This article covers how to use the tinyrpc framework in Golang, a question that comes up regularly in real projects. The sections below walk through the framework module by module; read on and you should come away with a working picture of how it fits together.
tinyrpc is an RPC framework built on top of TCP. It supports several compression formats and uses Protocol Buffers as its serialization protocol. The RPC layer itself is built on Go's native net/rpc package, with additional capabilities integrated on top. (The original post shows the project structure as a figure.)
The functional directories are as follows:
codec – encoding/decoding module
compressor – compression module
header – request/response header module
protoc-gen-tinyrpc – code generation plugin
serializer – serialization module
The client is built on top of net/rpc's rpc.Client. On top of it, an Option type is defined for configuring the compression method and the serializer:
type Option func(o *options)

type options struct {
    compressType compressor.CompressType
    serializer   serializer.Serializer
}
When a client is created, the configured compression algorithm and serializer are passed in as parameters:
func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
    options := options{
        compressType: compressor.Raw,
        serializer:   serializer.Proto,
    }
    for _, option := range opts {
        option(&options)
    }
    return &Client{rpc.NewClientWithCodec(
        codec.NewClientCodec(conn, options.compressType, options.serializer))}
}
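The article does not show how Option values are built; with the functional-options pattern above, the constructors would presumably look something like this (WithCompress and WithSerializer are assumed names, not taken from the source):

// Hypothetical option constructors following the functional-options pattern.
func WithCompress(t compressor.CompressType) Option {
    return func(o *options) { o.compressType = t }
}

func WithSerializer(s serializer.Serializer) Option {
    return func(o *options) { o.serializer = s }
}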
The server is built on top of net/rpc's rpc.Server, and extends it with its own Server definition:
type Server struct {
    *rpc.Server
    serializer.Serializer
}
The serializer is passed in when the server is created, and it is used when the service starts serving connections:
func NewServer(opts ...Option) *Server {
    options := options{serializer: serializer.Proto}
    for _, option := range opts {
        option(&options)
    }
    return &Server{&rpc.Server{}, options.serializer}
}

func (s *Server) Serve(lis net.Listener) {
    log.Printf("tinyrpc started on: %s", lis.Addr().String())
    for {
        conn, err := lis.Accept()
        if err != nil {
            continue
        }
        go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
    }
}
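A minimal usage sketch follows. The module path, the ArithService type, and the port are illustrative assumptions, not taken from the article:

package main

import (
    "log"
    "net"

    tinyrpc "path/to/tinyrpc" // assumed import path; replace with the actual module path
)

func main() {
    lis, err := net.Listen("tcp", ":8082")
    if err != nil {
        log.Fatal(err)
    }
    srv := tinyrpc.NewServer()
    // Register is promoted from the embedded *rpc.Server; ArithService is a
    // hypothetical service type that follows net/rpc's method conventions.
    if err := srv.Register(new(ArithService)); err != nil {
        log.Fatal(err)
    }
    srv.Serve(lis)
}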
The compression implementation starts by defining a Compressor interface:
type Compressor interface {
    Zip([]byte) ([]byte, error)
    Unzip([]byte) ([]byte, error)
}
The interface contains a compression method (Zip) and a decompression method (Unzip). The compression type is a uint16, its constants are declared with iota, and a map is used to manage all of the available Compressor implementations:
type CompressType uint16

const (
    Raw CompressType = iota
    Gzip
    Snappy
    Zlib
)

// Compressors which supported by rpc
var Compressors = map[CompressType]Compressor{
    Raw:    RawCompressor{},
    Gzip:   GzipCompressor{},
    Snappy: SnappyCompressor{},
    Zlib:   ZlibCompressor{},
}
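As a concrete example of the interface, a Gzip implementation can be written with the standard compress/gzip package along the following lines (a sketch only; tinyrpc's own GzipCompressor may differ in details):

import (
    "bytes"
    "compress/gzip"
    "io"
)

// GzipCompressor implements Compressor using the standard compress/gzip package.
type GzipCompressor struct{}

// Zip compresses the payload with gzip.
func (GzipCompressor) Zip(data []byte) ([]byte, error) {
    var buf bytes.Buffer
    w := gzip.NewWriter(&buf)
    if _, err := w.Write(data); err != nil {
        return nil, err
    }
    if err := w.Close(); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

// Unzip decompresses a gzip payload.
func (GzipCompressor) Unzip(data []byte) ([]byte, error) {
    r, err := gzip.NewReader(bytes.NewReader(data))
    if err != nil {
        return nil, err
    }
    defer r.Close()
    return io.ReadAll(r)
}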
The serialization code is very simple; it exposes a single interface:
type Serializer interface {
    Marshal(message interface{}) ([]byte, error)
    Unmarshal(data []byte, message interface{}) error
}
Currently ProtoSerializer is the only implementation. Internally it simply delegates to "google.golang.org/protobuf/proto" without any special handling, so the article does not spend much ink on it.
tinyrpc defines its own request and response headers:
// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType |     Method     |    ID    | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// |    uint16    | uvarint+string | uvarint  |  uvarint   |  uint32  |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
    sync.RWMutex
    CompressType compressor.CompressType
    Method       string
    ID           uint64
    RequestLen   uint32
    Checksum     uint32
}
The request header consists of the compression type, the method name, an ID, the request length, and a checksum.
// ResponseHeader request header structure looks like:
// +--------------+---------+----------------+-------------+----------+
// | CompressType |   ID    |     Error      | ResponseLen | Checksum |
// +--------------+---------+----------------+-------------+----------+
// |    uint16    | uvarint | uvarint+string |   uvarint   |  uint32  |
// +--------------+---------+----------------+-------------+----------+
type ResponseHeader struct {
    sync.RWMutex
    CompressType compressor.CompressType
    ID           uint64
    Error        string
    ResponseLen  uint32
    Checksum     uint32
}
The response header consists of the compression type, an ID, an error string, the response length, and a checksum.
To reuse header objects, tinyrpc maintains object pools for them:
var (
    RequestPool  sync.Pool
    ResponsePool sync.Pool
)

func init() {
    RequestPool = sync.Pool{New: func() interface{} {
        return &RequestHeader{}
    }}
    ResponsePool = sync.Pool{New: func() interface{} {
        return &ResponseHeader{}
    }}
}
A header is taken out with Get when needed and put back into the pool at the end of its lifetime; before it is put back it must be reset:
h := header.RequestPool.Get().(*header.RequestHeader)
defer func() {
    h.ResetHeader()
    header.RequestPool.Put(h)
}()
// ResetHeader reset request header
func (r *RequestHeader) ResetHeader() {
    r.Lock()
    defer r.Unlock()
    r.ID = 0
    r.Checksum = 0
    r.Method = ""
    r.CompressType = 0
    r.RequestLen = 0
}

// ResetHeader reset response header
func (r *ResponseHeader) ResetHeader() {
    r.Lock()
    defer r.Unlock()
    r.Error = ""
    r.ID = 0
    r.CompressType = 0
    r.Checksum = 0
    r.ResponseLen = 0
}
With the header layout and the pool-reuse logic clear, the actual encoding and decoding of headers is straightforward packing and unpacking, so it is not dissected line by line here; interested readers can study it on their own.
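For orientation only, packing a RequestHeader according to the layout table above could be written roughly as follows (a sketch derived from the struct definition; the byte order and details of the real tinyrpc Marshal may differ):

// Sketch of a Marshal following the layout table above (requires Go 1.19+ for
// the binary Append helpers). Not the verbatim tinyrpc implementation.
func (r *RequestHeader) Marshal() []byte {
    r.RLock()
    defer r.RUnlock()
    buf := make([]byte, 0, 2+binary.MaxVarintLen64*3+len(r.Method)+4)
    buf = binary.LittleEndian.AppendUint16(buf, uint16(r.CompressType)) // CompressType: uint16
    buf = binary.AppendUvarint(buf, uint64(len(r.Method)))              // Method: uvarint length ...
    buf = append(buf, r.Method...)                                      // ... followed by the bytes
    buf = binary.AppendUvarint(buf, r.ID)                               // ID: uvarint
    buf = binary.AppendUvarint(buf, uint64(r.RequestLen))               // RequestLen: uvarint
    buf = binary.LittleEndian.AppendUint32(buf, r.Checksum)             // Checksum: uint32
    return buf
}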
Since tinyrpc is built on net/rpc, its codec module naturally relies on net/rpc's ClientCodec and ServerCodec interfaces.
The client side implements its capabilities against ClientCodec:
type ClientCodec interface {
    WriteRequest(*Request, any) error
    ReadResponseHeader(*Response) error
    ReadResponseBody(any) error

    Close() error
}
The client defines a clientCodec type and implements the ClientCodec interface methods on it:
type clientCodec struct {
    r io.Reader
    w io.Writer
    c io.Closer

    compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
    serializer serializer.Serializer
    response   header.ResponseHeader // rpc response header
    mutex      sync.Mutex            // protect pending map
    pending    map[uint64]string
}
The WriteRequest implementation:
// WriteRequest Write the rpc request header and body to the io stream
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
    c.mutex.Lock()
    c.pending[r.Seq] = r.ServiceMethod
    c.mutex.Unlock()

    if _, ok := compressor.Compressors[c.compressor]; !ok {
        return NotFoundCompressorError
    }
    reqBody, err := c.serializer.Marshal(param)
    if err != nil {
        return err
    }
    compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
    if err != nil {
        return err
    }
    h := header.RequestPool.Get().(*header.RequestHeader)
    defer func() {
        h.ResetHeader()
        header.RequestPool.Put(h)
    }()
    h.ID = r.Seq
    h.Method = r.ServiceMethod
    h.RequestLen = uint32(len(compressedReqBody))
    h.CompressType = compressor.CompressType(c.compressor)
    h.Checksum = crc32.ChecksumIEEE(compressedReqBody)

    if err := sendFrame(c.w, h.Marshal()); err != nil {
        return err
    }
    if err := write(c.w, compressedReqBody); err != nil {
        return err
    }
    c.w.(*bufio.Writer).Flush()
    return nil
}
The implementation is fairly clear and breaks down into a few steps (the sendFrame and write helpers it relies on are sketched after this list):
Serialize the data to form the request body
Compress it with the selected compression algorithm
Get a request header instance from the pool and fill in all fields to build the final header
Send the processed header and body over the connection as separate io operations
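WriteRequest leans on two helpers, sendFrame and write, which the article does not show. Presumably they do something along these lines (a sketch of the assumed behaviour: a uvarint length prefix followed by the payload, and a loop that writes the whole buffer):

// Assumed behaviour of the helpers used above; the real tinyrpc versions may differ.
func sendFrame(w io.Writer, data []byte) error {
    // Prefix the frame with its length encoded as a uvarint, then send the payload.
    var size [binary.MaxVarintLen64]byte
    n := binary.PutUvarint(size[:], uint64(len(data)))
    if err := write(w, size[:n]); err != nil {
        return err
    }
    if len(data) == 0 {
        return nil
    }
    return write(w, data)
}

func write(w io.Writer, data []byte) error {
    // Keep writing until the whole buffer has gone out on the stream.
    for written := 0; written < len(data); {
        n, err := w.Write(data[written:])
        if err != nil {
            return err
        }
        written += n
    }
    return nil
}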
The ReadResponseHeader implementation:
// ReadResponseHeader read the rpc response header from the io stream
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
    c.response.ResetHeader()
    data, err := recvFrame(c.r)
    if err != nil {
        return err
    }
    err = c.response.Unmarshal(data)
    if err != nil {
        return err
    }
    c.mutex.Lock()
    r.Seq = c.response.ID
    r.Error = c.response.Error
    r.ServiceMethod = c.pending[r.Seq]
    delete(c.pending, r.Seq)
    c.mutex.Unlock()
    return nil
}
This method reads the returned response header and parses it into the concrete struct.
The ReadResponseBody implementation:
func (c *clientCodec) ReadResponseBody(param interface{}) error {
    if param == nil {
        if c.response.ResponseLen != 0 {
            if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
                return err
            }
        }
        return nil
    }

    respBody := make([]byte, c.response.ResponseLen)
    err := read(c.r, respBody)
    if err != nil {
        return err
    }
    if c.response.Checksum != 0 {
        if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
            return UnexpectedChecksumError
        }
    }

    if c.response.GetCompressType() != c.compressor {
        return CompressorTypeMismatchError
    }

    resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
    if err != nil {
        return err
    }

    return c.serializer.Unmarshal(resp, param)
}
This method reads the returned response body; the flow is as follows (the recvFrame and read helpers are sketched after this list):
Read the response body from the stream
Compare it against the checksum in the response header to confirm the body is intact
Decompress it with the matching compression algorithm
Deserialize it into the target struct
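On the reading side, recvFrame and read are the counterparts of sendFrame and write. A plausible sketch (again an assumption about helpers not shown in the article; the reader is assumed to be buffered so it can serve the uvarint length prefix byte by byte):

// Assumed counterparts of the sending helpers; not the verbatim tinyrpc code.
func recvFrame(r io.Reader) ([]byte, error) {
    // Read the uvarint length prefix, then exactly that many payload bytes.
    size, err := binary.ReadUvarint(r.(*bufio.Reader))
    if err != nil {
        return nil, err
    }
    data := make([]byte, size)
    if err := read(r, data); err != nil {
        return nil, err
    }
    return data, nil
}

func read(r io.Reader, data []byte) error {
    // io.ReadFull blocks until len(data) bytes have been read (or an error occurs).
    _, err := io.ReadFull(r, data)
    return err
}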
The server side implements its capabilities against ServerCodec:
type ServerCodec interface {
    ReadRequestHeader(*Request) error
    ReadRequestBody(any) error
    WriteResponse(*Response, any) error

    // Close can be called multiple times and must be idempotent.
    Close() error
}
Similar to the client, the server defines a serverCodec type and implements the ServerCodec interface methods on it (the reqCtx type it references is sketched right after the struct):
type serverCodec struct {
    r io.Reader
    w io.Writer
    c io.Closer

    request    header.RequestHeader
    serializer serializer.Serializer
    mutex      sync.Mutex // protects seq, pending
    seq        uint64
    pending    map[uint64]*reqCtx
}
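The reqCtx type is not shown in the article; judging from how ReadRequestHeader and WriteResponse use it below, it presumably just pairs the original request ID with the request's compression type:

// Inferred from its usage in this article, not copied from the tinyrpc source.
type reqCtx struct {
    requestID   uint64
    compareType compressor.CompressType
}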
The ReadRequestHeader implementation:
// ReadRequestHeader read the rpc request header from the io stream
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
    s.request.ResetHeader()
    data, err := recvFrame(s.r)
    if err != nil {
        return err
    }
    err = s.request.Unmarshal(data)
    if err != nil {
        return err
    }
    s.mutex.Lock()
    s.seq++
    s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
    r.ServiceMethod = s.request.Method
    r.Seq = s.seq
    s.mutex.Unlock()
    return nil
}
This method reads the request header and parses it into a struct.
The ReadRequestBody implementation:
// ReadRequestBody read the rpc request body from the io stream
func (s *serverCodec) ReadRequestBody(param interface{}) error {
    if param == nil {
        if s.request.RequestLen != 0 {
            if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
                return err
            }
        }
        return nil
    }

    reqBody := make([]byte, s.request.RequestLen)
    err := read(s.r, reqBody)
    if err != nil {
        return err
    }
    if s.request.Checksum != 0 {
        if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
            return UnexpectedChecksumError
        }
    }

    if _, ok := compressor.Compressors[s.request.GetCompressType()]; !ok {
        return NotFoundCompressorError
    }
    req, err := compressor.Compressors[s.request.GetCompressType()].Unzip(reqBody)
    if err != nil {
        return err
    }

    return s.serializer.Unmarshal(req, param)
}
This method reads the request body; the flow is much the same as reading the response body:
Read the stream and extract the request body
Verify it against the checksum in the request header
Decompress it with the corresponding compression algorithm
Deserialize it
The WriteResponse implementation:
// WriteResponse Write the rpc response header and body to the io stream
func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
    s.mutex.Lock()
    reqCtx, ok := s.pending[r.Seq]
    if !ok {
        s.mutex.Unlock()
        return InvalidSequenceError
    }
    delete(s.pending, r.Seq)
    s.mutex.Unlock()

    if r.Error != "" {
        param = nil
    }
    if _, ok := compressor.Compressors[reqCtx.compareType]; !ok {
        return NotFoundCompressorError
    }

    var respBody []byte
    var err error
    if param != nil {
        respBody, err = s.serializer.Marshal(param)
        if err != nil {
            return err
        }
    }

    compressedRespBody, err := compressor.Compressors[reqCtx.compareType].Zip(respBody)
    if err != nil {
        return err
    }
    h := header.ResponsePool.Get().(*header.ResponseHeader)
    defer func() {
        h.ResetHeader()
        header.ResponsePool.Put(h)
    }()
    h.ID = reqCtx.requestID
    h.Error = r.Error
    h.ResponseLen = uint32(len(compressedRespBody))
    h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
    h.CompressType = reqCtx.compareType

    if err = sendFrame(s.w, h.Marshal()); err != nil {
        return err
    }
    if err = write(s.w, compressedRespBody); err != nil {
        return err
    }
    s.w.(*bufio.Writer).Flush()
    return nil
}
This method writes the response and largely mirrors writing the request:
Serialize the response body
Compress the response body with the chosen compression algorithm
Manage the response header through the pool
Send the response header and response body separately
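Putting the pieces together, a client call would look roughly like this. The module path, the service name, and the protobuf-generated ArithRequest/ArithResponse types are illustrative assumptions, not taken from the article:

package main

import (
    "log"
    "net"

    tinyrpc "path/to/tinyrpc" // assumed import path; replace with the actual module path
)

func main() {
    conn, err := net.Dial("tcp", ":8082")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Defaults to Raw compression and the proto serializer; Options can override both.
    cli := tinyrpc.NewClient(conn)
    defer cli.Close()

    req := &ArithRequest{A: 20, B: 5} // hypothetical protobuf message
    resp := &ArithResponse{}          // hypothetical protobuf message
    // Call is promoted from the embedded *rpc.Client.
    if err := cli.Call("ArithService.Add", req, resp); err != nil {
        log.Fatal(err)
    }
    log.Println("20 + 5 =", resp.C)
}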
That covers how to use the tinyrpc framework in Golang. Thanks for reading.