返回顶部
首页 > 资讯 > 精选 >如何理解tcpServer 中的IOLoop方法
  • 713
分享到

如何理解tcpServer 中的IOLoop方法

2023-06-19 10:06:05 713人浏览 安东尼
摘要

本篇文章为大家展示了如何理解tcpServer 中的ioLoop方法,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。今天我们就分析一下IOLoop这个方法废话不多说,直接上代码吧(代码位于nsq/ns

本篇文章为大家展示了如何理解tcpServer 中的ioLoop方法,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

今天我们就分析一下IOLoop这个方法

废话不多说,直接上代码吧(代码位于nsq/nsqlookupd/lookup_protocol_v1.Go这个文件中)

//这段代码位于nsq/nsqlookupd/client_v1.go这个文件中type ClientV1 struct {net.Conn //组合net.Conn接口peerInfo *PeerInfo //client的信息也就是前面所讲的product的信息}//初始化一个ClientV1func NewClientV1(conn net.Conn) *ClientV1 {return &ClientV1{Conn: conn,}}//实现String接口func (c *ClientV1) String() string {return c.RemoteAddr().String()}//定义ClientV1结束type LookupProtocolV1 struct {ctx *Context //一直贯穿整个代码的Context,具体可翻看前面章节}func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {var err errorvar line strinGClient := NewClientV1(conn) //新建一个client版本为V1err = nilreader := bufio.NewReader(client) //由client 创建一个 带有buffer 的Reader 默认 buffer size 为4096,这里的NewReader参数为io.Reader 接口,为何net.Conn接口也能作为参数呢?因为net.Conn 接口其实也是实现了io.Reader接口了,具体概念大家可翻看golang教程for {line, err = reader.ReadString('\n') //按行读取if err != nil {break}line = strings.TrimSpace(line) //去掉这行两头的空格符params := strings.Split(line, " ") //字符串按一个空格字符串分割,并获取相应的Commad 以及 该command 的相应的paramsresponse, err := p.Exec(client, reader, params) //执行相应的Commandif err != nil {ctx := ""if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {ctx = " - " + parentErr.Error()}p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx)_, err = protocol.SendResponse(client, []byte(err.Error())) //返回错误给clientif err != nil {break}// errors of type FatalClientErr should forceably close the connectionif _, ok := err.(*protocol.FatalClientErr); ok { break}continue}if response != nil {_, err = protocol.SendResponse(client, response) //返回命令处理结果给clientif err != nil {break}}}//for 循环结束了 说明程序要退出了,调用ReGIStrationDB 中的 RemoveProducer从producer 的注册数据中删除 producer信息//这里的RegistrationDB下章再具体讲解p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client)if client.peerInfo != nil {registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)for _, r := range registrations {if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",client, r.Category, r.Key, r.SubKey)}}}return err}//这个方法就是执行相应的命令动作 有 PING IDENTIFY REGISTER UNREGISTER 这四个类型func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {switch params[0] {case "PING": //用于client的心跳处理return p.PING(client, params)case "IDENTIFY": //用于client端的信息注册,执行PING REGISTER UNREGISTER 命令前必须先执行此命令return p.IDENTIFY(client, reader, params[1:])case "REGISTER": //用于client端注册topic以及channel的命令return p.REGISTER(client, reader, params[1:])case "UNREGISTER": //执行与REGISTER命令相反的逻辑return p.UNREGISTER(client, reader, params[1:])}return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))}//INDENTIFY命令处理逻辑//该命令用于注册client的producer信息,并返回nsqlookupd的TCP 以及 Http 端口信息给client//大致的报文如下//  V1 INDENTIFY\n   注意了这里前面提过的V1前面是两个空格字符//123\n  这里是后面JSON数据(producer 信息的json数据的字节长度)//{....}\n 一串表示producer信息的json数据,具体的可参考 nsq/nsqlookupd/registration_db.go文件中的PeerInfo structfunc (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {var err errorif client.peerInfo != nil { //如果有该client 的PeerInfo数据则返回错误return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")}var bodyLen int32err = binary.Read(reader, binary.BigEndian, &bodyLen) //获取producer PeerInfo json数据的字节长度 大端二进制格式if err != nil {return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")}body := make([]byte, bodyLen) //初始化一个producer PeerInfo json数据长度的bytes_, err = io.ReadFull(reader, body) //读取所有的json数据if err != nil {return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body")}// body is a json structure with producer infORMationpeerInfo := PeerInfo{id: client.RemoteAddr().String()} //实例化一个PeerInfoerr = json.Unmarshal(body, &peerInfo) //解析producer PeerInfo json数据到peerInfoif err != nil {//json 数据解析失败 返回错误return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")}peerInfo.RemoteAddress = client.RemoteAddr().String() //获取PeerInfo remote address// require all fields//校验获取的PeerInfo 数据if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")}//将当前系统时间(纳秒)更新到PeerInfo 中的lastUpdate中 用于 client的心跳判断atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())p.ctx.nsqlookupd.logf("CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)client.peerInfo = &peerInfo//注册producer PeerInfo 信息到 RegistrationDB中 其中Registration的 Category 为client Key 和 SubKey为空if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")}// build a response//将nsqlookupd的TCP端口,HTTP端口信息,版本信息,broadcast address信息,以及host name信息 已json数据格式返回给clientdata := make(map[string]interface{})data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Portdata["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Portdata["version"] = version.Binaryhostname, err := os.Hostname()if err != nil {log.Fatalf("ERROR: unable to get hostname %s", err)}data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddressdata["hostname"] = hostnameresponse, err := json.Marshal(data)if err != nil {p.ctx.nsqlookupd.logf("ERROR: marshaling %v", data)return []byte("OK"), nil}return response, nil}//获取params中的topic 以及 channelfunc getTopicChan(command string, params []string) (string, string, error) {if len(params) == 0 {return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command))}topicName := params[0]var channelName stringif len(params) >= 2 {channelName = params[1]}//校验是否是topicif !protocol.IsValidTopicName(topicName) {return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName))}//校验是否是channelif channelName != "" && !protocol.IsValidChannelName(channelName) {return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName))}return topicName, channelName, nil}//REGISTER 命令 用于注册client的topic 以及 channel信息//一个topic下可以有多个channel//一个消费者订阅的是一个topic 那么 生成者给这个topic 下的channel的信息 这个消费者也能接收得到这个信息,如果消费者订阅的不是这个channel的信息,那么这个消费者则接受不到这个信息//nsq topic 与channel的关系,大家可以多搜索下资料,我这里感觉讲的也不太清晰,请大家谅解一下//REGISTER 命令必须在INDENTIFY 之后才能调用//具体协议报文如下//REGISTER topic1\n 这个只创建一个名字为topic1的topic//或如下报文//REGISTER topic1 channel1\n 这个只创建一个名字为topic1的topic 且topic1下面创建一个名字为channel1的channelfunc (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {if client.peerInfo == nil {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")}//获取REGISTER 命令时的topic 以及channel名字topic, channel, err := getTopicChan("REGISTER", params) if err != nil {return nil, err}//如果有channelif channel != "" {//添加channel信息到RegistrationDB中其中Registration的 Category 为"channel"字符串,Key为topic,SubKey为channelkey := Registration{"channel", topic, channel}if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s",client, "channel", topic, channel)}}//添加topic信息到RegistrationDB中其中Registration的 Category 为"topic"字符串,Key为topic,SubKey为空key := Registration{"topic", topic, ""}if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s",client, "topic", topic, "")}//返回OKreturn []byte("OK"), nil}//UNREGISTER命令用于取消注册topic 或取消注册某个topic下的某一个channel//报文格式如下//UNREGISTER topic1 channel1\n 这个报文表示取消注册名字为topic1的topic下的名字为channel1的channel,这个名字 只取消注册这个channel1,不取消注册topic1下的其他channel 以及这个topic1本身//或这个格式的报文//UNREGISTER topic1\n 这个报文表示取消注册名字为topic1的topic,这个时候topic1以及topic1下的所有channel都取消注册了func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {if client.peerInfo == nil {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")}topic, channel, err := getTopicChan("UNREGISTER", params) //获取topic 以及channel 的名字if err != nil {return nil, err}if channel != "" {//如果有channel 则只取消注册这个topic下的这个channelkey := Registration{"channel", topic, channel}removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id)if removed {p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",client, "channel", topic, channel)}// for ephemeral channels, remove the channel as well if it has no producersif left == 0 && strings.HasSuffix(channel, "#ephemeral") {p.ctx.nsqlookupd.DB.RemoveRegistration(key)}} else {// no channel was specified so this is a topic unregistration// remove all of the channel registrations...// normally this shouldn't happen which is why we print a warning message// if anything is actually removed//如果没有channel 这个取消注册这个topic 以及这个topic下的所有channelregistrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")for _, r := range registrations {if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {p.ctx.nsqlookupd.logf("WARNING: client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",client, "channel", topic, r.SubKey)}}key := Registration{"topic", topic, ""}if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed {p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",client, "topic", topic, "")}}//返回OKreturn []byte("OK"), nil}//PING 用于client中的心跳处理命令func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) {if client.peerInfo != nil {// we could get a PING before other commands on the same client connection//获取上一次心跳的时间cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate))//获取当前时间now := time.Now()//这里日志输出两次心跳之间的间隔时间p.ctx.nsqlookupd.logf("CLIENT(%s): pinged (last ping %s)", client.peerInfo.id,now.Sub(cur))//更新PeerInfo中的lastUpdate时间为当前时间atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano())}//返回OKreturn []byte("OK"), nil}

nsqlookupd中的tcpServer中的主要协议处理的IOLoop这里基本讲解完了。

上述内容就是如何理解tcpServer 中的IOLoop方法,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注编程网精选频道。

--结束END--

本文标题: 如何理解tcpServer 中的IOLoop方法

本文链接: https://lsjlt.com/news/295437.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 如何理解tcpServer 中的IOLoop方法
    本篇文章为大家展示了如何理解tcpServer 中的IOLoop方法,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。今天我们就分析一下IOLoop这个方法废话不多说,直接上代码吧(代码位于nsq/ns...
    99+
    2023-06-19
  • 如何理解jQuery中ajax - get()方法
    这期内容当中小编将会给大家带来有关如何理解jQuery中ajax - get()方法,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。在jquery中使用get,post和a...
    99+
    2024-04-02
  • 如何理解Java8接口中的默认方法和静态方法
    本篇内容介绍了“如何理解Java8接口中的默认方法和静态方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1.接口中的默认方法和静态方法Ja...
    99+
    2023-06-25
  • 如何理解Kurbernetes中服务暴露方法
    本篇内容主要讲解“如何理解Kurbernetes中服务暴露方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何理解Kurbernetes中服务暴露方法”吧!由...
    99+
    2024-04-02
  • 如何理解Java中的语法糖
    本篇内容介绍了“如何理解Java中的语法糖”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!语法糖在聊之前我们需要先了解一下 语法糖 的概念:语...
    99+
    2023-06-15
  • 如何理解JavaScript数值方法
    这篇文章将为大家详细讲解有关如何理解JavaScript数值方法,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。可以使用内置方法和属性对数字执行哪些有用的操作...
    99+
    2024-04-02
  • 如何理解C#虚拟方法
    这期内容当中小编将会给大家带来有关如何理解C#虚拟方法,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。当类中的方法声明前加上了virtual修饰符,我们称之为C#虚拟方法 ,反之为非虚。使用了virtual...
    99+
    2023-06-17
  • 如何理解Ruby加密方法
    如何理解Ruby加密方法,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Ruby语言虽然是一个结构比较简单的编程语言,但是其中也有程序加密的功能。在这里我们就以几...
    99+
    2023-06-17
  • 如何解析Java中的clone方法
    如何解析Java中的clone方法,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。Java中对象的创建clone顾名思义就是复制, 在Java语言中, clone方法被对象调用,...
    99+
    2023-06-17
  • 如何理解Shell中的中括号用法
    本篇文章给大家分享的是有关如何理解Shell中的中括号用法,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。导读在计算机科学中,Shell俗称壳(用来区别于核),是指"为...
    99+
    2023-06-05
  • 如何理解SQL中Group By的用法
    这期内容当中小编将会给大家带来有关如何理解SQL中Group By的用法,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。GROUP BY 语句用于结合合计函数,根据一个或多...
    99+
    2024-04-02
  • python中如何理解算法的度量
    这期内容当中小编将会给大家带来有关python中如何理解算法的度量,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1 机器学习算法性能度量这里要评估一下这个算法到底效果如何。 评价的度量是有很多种的, 不同...
    99+
    2023-06-19
  • 如何理解Document对象的属性和方法
    如何理解Document对象的属性和方法,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一个文档对象模型或者说DOM就是一个API,它定义了...
    99+
    2024-04-02
  • 如何理解C++实现程序方法
    这篇文章将为大家详细讲解有关如何理解C++实现程序方法,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。C++实现程序解决问题,本程序采用射线法,由待测试点(v)水平引出一条射线B(v,w),计...
    99+
    2023-06-17
  • 如何理解Ansi_Padding的用法
    这篇文章给大家介绍如何理解Ansi_Padding的用法,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。你对Ansi_Padding的用法是否了解,这里和大家分享一下,当设置为OFF时,...
    99+
    2024-04-02
  • 如何理解VB.NET发送邮件的两种方法
    今天就跟大家聊聊有关如何理解VB.NET发送邮件的两种方法,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。VB.NET功能非常强大,程序界面标准,可以帮助程序员提高开发效率。它能够支持...
    99+
    2023-06-17
  • 如何理解Scrapy关于item pipeline的传递方法
    这篇文章主要介绍“如何理解Scrapy关于item pipeline的传递方法”,在日常操作中,相信很多人在如何理解Scrapy关于item pipeline的传递方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家...
    99+
    2023-06-01
  • 深入理解Python中的super()方法
    前言 python的类分别有新式类和经典类,都支持多继承。在类的继承中,如果你想要重写父类的方法而不是覆盖的父类方法,这个时候我们可以使用super()方法来实现 python语言与C++有相似的类继承,在...
    99+
    2022-06-04
    方法 Python super
  • 理解Android中Activity的方法回调
    为什么需要方法回调? 方法回调是功能定义和功能分离的一种手段,是一种松耦合的设计思想。在JAVA中回调是通过接口来实现的。作为一种系统架构,必须要有自己的运行环境,并且要提供用...
    99+
    2022-06-06
    方法 回调 activity Android
  • Java中URL的处理方法详解
    目录前言URL 类方法URLConnections 类方法方法实例前言 URL(Uniform Resource Locator)中文名为统一资源定位符,有时也被俗称为网页地址。表示...
    99+
    2023-05-20
    Java URL处理方法 Java URL处理 Java URL
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作