Tendermint ABCI 应用 KVStore 源码详解

标签:底层链开发
发布时间:2018年11月30日 价值:20000.00 / 共识:23

这篇文章以 ABCI 示例 KVStore 应用及默认的 socket 连接为例说明 ABCI 应用的启动及 abci-cli 客户端与其交互的过程,以加深开发 ABCI 应用的模式及源码组织方式的理解。

整体流程说明

  1. ABCI 应用服务端:在命令行执行 abci-cli kvstore 启动应用后,它会在 46658 端口等待客户端的 TCP 连接。
  2. abci-cli 客户端:这个客户端指的是 echoinfodeliverTx 等子命令。执行这些命令会建立一条与 ABCI 应用服务端的 TCP 连接,并将子命令后面的参数当作请求消息发送给应用服务端进行处理(这里的 ABCI 应用与 Tendermint 节点绑定在一起)。

构建命令过程

程序入口在 abci/cmd/abci-cli/main.goExecute 函数。

这里面做事情有:

  • 构建 RootCmd 命令,即 abci-cli 命令及各子命令。

  • 注册全局 Flags,主要包括:

    • ABCI 应用服务端监听地址 flagAddress ,默认为:tcp://0.0.0.0:46658
    • abci-cli 客户端与 ABCI 应用服务端的通信协议 flagAbci,默认为:socket
    • 日志等级 flagLogLevel,默认为:debug
  • 添加 ABCI 应用 kvstoredummy 以及 echoCmdinfoCmddeliverTxCmdcommitCmd 等客户端命令。

RootCmd、kvstore 命令的实现及启动

RootCmd 主命令逻辑

所有子命令都要添加到 RootCmd 主命令下面。

执行 abci-cli 命令只会列出其使用文档,在执行具体子命令时才会执行其定义的相应应用逻辑。

这里只需看这段代码:

  1. // 执行主命令时如果 client 为空,会创建 client 并启动,会根据 flagAbci 参数来判断要创建 socket 客户端
  2. // 还是 RPC 客户端
  3. if client == nil {
  4. var err error
  5. client, err = abcicli.NewClient(flagAddress, flagAbci, false)
  6. if err != nil {
  7. return err
  8. }
  9. client.SetLogger(logger.With("module", "abci-client"))
  10. // 启动客户端,实际执行的是 client.OnStart() 函数
  11. if err := client.Start(); err != nil {
  12. return err
  13. }
  14. }

abci-cli 客户端

在命令行执行 abci-cli echo hello,会与 ABCI 应用服务端建立一条 TCP 连接并将 “abc” 发送到服务端进行处理,收到应后断开连接。但这样比较麻烦,可以使用 abci-cli console 命令可以在交互式命令行中与应用服务端交互。

通过调用 abcicli.NewClient 函数来创建客户端。

返回的是接口 abci/client/Client,这个接口有 socketgrpclocal 三种实现,但 flag 只可以指定前两种,默认为 socket

  1. func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
  2. switch transport {
  3. case "socket":
  4. client = NewSocketClient(addr, mustConnect)
  5. case "grpc":
  6. client = NewGRPCClient(addr, mustConnect)
  7. default:
  8. err = fmt.Errorf("Unknown abci transport %s", transport)
  9. }
  10. return
  11. }

abci/client/Client 接口继承了 tmlibs/common/service/Service 接口,可以启动、停止和重置。客户端和服务端都需要这些功能,使用时可以通过把 BaseService 作为自定义结构的匿名字段来实现。

首先看 socketClient 函数的结构,它实现了 abci/client/Client 接口,由于 cmn.BaseService 结构是它的匿名字段,也间接实现了 tmlibs/common/service/Service 接口:

  1. type socketClient struct {
  2. // 这个结构实现了 Service 接口,内部包含 impl Service字段,即具体实现 Service 的结构
  3. cmn.BaseService
  4. // 用来传递请求及应答消息的通道,
  5. reqQueue chan *ReqRes
  6. flushTimer *cmn.ThrottleTimer
  7. mustConnect bool
  8. mtx sync.Mutex
  9. addr string
  10. conn net.Conn
  11. err error
  12. // 标准库中的双向链表,从 reqQueue 读取到的请求消息都会先放入这个链表的尾端
  13. reqSent *list.List
  14. resCb func(*types.Request, *types.Response) // listens to all callbacks
  15. }

创建 socketClient 的函数:

  1. func NewSocketClient(addr string, mustConnect bool) *socketClient {
  2. cli := &socketClient{
  3. reqQueue: make(chan *ReqRes, reqQueueSize),
  4. flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
  5. mustConnect: mustConnect,
  6. addr: addr,
  7. reqSent: list.New(),
  8. resCb: nil,
  9. }
  10. // 这里传入了具体实现 Service 的结构 cli
  11. cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
  12. return cli
  13. }

在命令行执行 abci-cli kvstore 命令时会执行 client.Start() 启动服务,这里用默认的 socket 连接及 kvstore 应用举例说明。这里执行的是 socketClient 结构中匿名字段 cmn.BaseService 的方法:

  1. func (bs *BaseService) Start() error {
  2. if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
  3. if atomic.LoadUint32(&bs.stopped) == 1 {
  4. bs.Logger.Error(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
  5. return ErrAlreadyStopped
  6. }
  7. bs.Logger.Info(Fmt("Starting %v", bs.name), "impl", bs.impl)
  8. // 这里实际执行上面传入的 cli(即 socketClient 结构) 的 OnStart 函数来启动服务
  9. err := bs.impl.OnStart()
  10. if err != nil {
  11. // revert flag
  12. atomic.StoreUint32(&bs.started, 0)
  13. return err
  14. }
  15. return nil
  16. }
  17. bs.Logger.Debug(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
  18. return ErrAlreadyStarted
  19. }

以上就是客户端的启动过程。

ABCI 应用服务端

现在看一下执行 abci-cli kvstore 命令都做了什么。

执行此命令时,实际执行的是 cmdKVStore 函数,启动了应用服务端,在 tcp://0.0.0.0:46658 监听连接。

启动服务端:

  1. // 默认创建 socket 服务端
  2. srv, err := server.NewServer(flagAddrD, flagAbci, app)
  3. if err != nil {
  4. return err
  5. }
  6. srv.SetLogger(logger.With("module", "abci-server"))
  7. if err := srv.Start(); err != nil {
  8. return err
  9. }

NewSocketServer 创建服务端:

  1. func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
  2. proto, addr := cmn.ProtocolAndAddress(protoAddr)
  3. s := &SocketServer{
  4. proto: proto,
  5. addr: addr,
  6. listener: nil,
  7. app: app,
  8. conns: make(map[int]net.Conn),
  9. }
  10. // 这里使用的模式与 Client 段一致
  11. s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
  12. return s
  13. }

执行 srv.Start() 函数时,实际执行的是 SocketServer 的实现,通过 BaseService 结构的 Start 方法调用:

  1. func (s *SocketServer) OnStart() error {
  2. if err := s.BaseService.OnStart(); err != nil {
  3. return err
  4. }
  5. ln, err := net.Listen(s.proto, s.addr)
  6. if err != nil {
  7. return err
  8. }
  9. s.listener = ln
  10. // 启动一个协程来监听连接
  11. go s.acceptConnectionsRoutine()
  12. return nil
  13. }

至此已经把 ABCI 应用服务端是如何启动的说明了,下面的部分会详细说明请求及应答处理的细节。

请求及应答处理

这部分以 deliver_tx 命令为例来进行说明。

abci-cli 客户端

为了方便,这里再看一下 socketClient 的数据结构:

  1. type socketClient struct {
  2. cmn.BaseService
  3. reqQueue chan *ReqRes
  4. flushTimer *cmn.ThrottleTimer
  5. mustConnect bool
  6. mtx sync.Mutex
  7. addr string
  8. conn net.Conn
  9. err error
  10. // 这里会把请求写入双向链表 reqSent 的尾端,在 recvResponseRoutine 函数中接收到应答时会从此链表取出
  11. // 第一个请求进行类型比较,如果与应答类型一样则返回给前端
  12. reqSent *list.List
  13. resCb func(*types.Request, *types.Response) // listens to all callbacks
  14. }

OnStart 函数所做的就是与服务端建立连接,启动两个协程来处理请求与应答。

先看处理请求的函数 sendRequestsRoutine

  1. func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
  2. w := bufio.NewWriter(conn)
  3. for {
  4. select {
  5. // 发送 flush 类型请求的定时器
  6. case <-cli.flushTimer.Ch:
  7. select {
  8. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  9. default:
  10. // Probably will fill the buffer, or retry later.
  11. }
  12. case <-cli.Quit():
  13. return
  14. case reqres := <-cli.reqQueue:
  15. // 这里会把请求写入双向链表 reqSent 的尾端
  16. cli.willSendReq(reqres)
  17. // 把请求消息写入连接缓冲,这时还没有发送给服务端
  18. err := types.WriteMessage(reqres.Request, w)
  19. if err != nil {
  20. cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
  21. return
  22. }
  23. // 如果请求是 flush 类型,会把缓冲的请求消息 (包括此 flush 请求) 写入连接,
  24. // 由 kvstore 服务端接收并处理。
  25. // 有两种方式发送 flush 类型的请求:1) 定时器触发;2) DeliverTxSync 函数中主动发送
  26. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  27. err = w.Flush()
  28. if err != nil {
  29. cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
  30. return
  31. }
  32. }
  33. }
  34. }
  35. }

现在看处理应答的 recvResponseRoutine 函数:

  1. func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
  2. r := bufio.NewReader(conn) // Buffer reads
  3. for {
  4. var res = &types.Response{}
  5. // 从连接中读取应答,出错时会关闭连接并执行 flushQueue() 释放 wg.WaitGroup
  6. err := types.ReadMessage(r, res)
  7. if err != nil {
  8. cli.StopForError(err)
  9. return
  10. }
  11. switch r := res.Value.(type) {
  12. case *types.Response_Exception:
  13. cli.StopForError(errors.New(r.Exception.Error))
  14. return
  15. default:
  16. // 应答处理逻辑在这里
  17. err := cli.didRecvResponse(res)
  18. if err != nil {
  19. cli.StopForError(err)
  20. return
  21. }
  22. }
  23. }
  24. }
  25. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  26. cli.mtx.Lock()
  27. defer cli.mtx.Unlock()
  28. // 从双向链表 reqSent 中取出第一个请求
  29. next := cli.reqSent.Front()
  30. if next == nil {
  31. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  32. }
  33. reqres := next.Value.(*ReqRes)
  34. // 检查请求与应答的类型是否匹配
  35. if !resMatchesReq(reqres.Request, res) {
  36. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  37. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  38. }
  39. reqres.Response = res // Set response
  40. reqres.Done() // 释放此请求创建时执行的 wg.Add(1)
  41. cli.reqSent.Remove(next) // 从链表中删除第一个请求
  42. // Notify reqRes listener if set
  43. if cb := reqres.GetCallback(); cb != nil {
  44. cb(res)
  45. }
  46. // Notify client listener if set
  47. if cli.resCb != nil {
  48. cli.resCb(reqres.Request, res)
  49. }
  50. return nil
  51. }

ABCI 应用服务端

创建应用细节

先看应用的数据结构 KVStoreApplication

  1. // 这个结构只实现了 Info、DeliverTx、CheckTx、Commit 和 Query 方法
  2. type KVStoreApplication struct {
  3. // 这个基础结构实现了 "tendermint/abci/types/Application" 接口(此项目中基本都是用的这种模式)。
  4. // 这个结构实现的接口的方法中没有具体应用逻辑,以供开发者在自己的应用结构中继承此结构后,可以只实现
  5. // 必须的方法,而无需实现接口的全部方法
  6. types.BaseApplication
  7. state State
  8. }

创建应用:

  1. func NewKVStoreApplication() *KVStoreApplication {
  2. state := loadState(dbm.NewMemDB())
  3. return &KVStoreApplication{state: state}
  4. }

主要看 loadState 函数,它根据键 stateKey 从内存存储 MemDB 结构中获取对应状态,因为是初始化,肯定没有对应值,返回的是一个带有新建 MemDB (就是一个带锁的 map)的 State

  1. func loadState(db dbm.DB) State {
  2. stateBytes := db.Get(stateKey)
  3. var state State
  4. if len(stateBytes) != 0 {
  5. err := json.Unmarshal(stateBytes, &state)
  6. if err != nil {
  7. panic(err)
  8. }
  9. }
  10. state.db = db
  11. return state
  12. }

服务端处理请求及应答细节

重点看接受连接的函数:

  1. func (s *SocketServer) acceptConnectionsRoutine() {
  2. for {
  3. // 接受连接,下面这些日志就是命令行启动 kvstore 后看到的信息
  4. s.Logger.Info("Waiting for new connection...")
  5. conn, err := s.listener.Accept()
  6. if err != nil {
  7. if !s.IsRunning() {
  8. return // Ignore error from listener closing.
  9. }
  10. s.Logger.Error("Failed to accept connection: " + err.Error())
  11. continue
  12. }
  13. s.Logger.Info("Accepted a new connection")
  14. // 可以接受多条连接并记录
  15. connID := s.addConn(conn)
  16. closeConn := make(chan error, 2) // Push to signal connection closed
  17. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  18. // 从连接读取请求并处理
  19. go s.handleRequests(closeConn, conn, responses)
  20. // 从 'responses' 获取应答并写到连接中
  21. go s.handleResponses(closeConn, conn, responses)
  22. // 等待信号来关闭连接
  23. go s.waitForClose(closeConn, connID)
  24. }
  25. }

先看处理请求的 handleRequests 函数:

  1. func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
  2. var count int
  3. var bufReader = bufio.NewReader(conn)
  4. for {
  5. var req = &types.Request{}
  6. // 从连接上读取请求消息,读取完毕或出错后要通知 waitForClose 协程来关闭连接
  7. err := types.ReadMessage(bufReader, req)
  8. if err != nil {
  9. if err == io.EOF {
  10. closeConn <- err
  11. } else {
  12. closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
  13. }
  14. return
  15. }
  16. s.appMtx.Lock()
  17. count++
  18. // 处理请求时要加锁,这个函数会根据请求的类型调用具体函数来处理,
  19. // 比如 types.Request_DeliverTx 类型时就会调用 KVStoreApplication.DeliverTx 函数来处理,
  20. // 应答会写入 responses 通道,以便 handleResponses 函数处理
  21. s.handleRequest(req, responses)
  22. s.appMtx.Unlock()
  23. }
  24. }

现在看 handleResponses 函数:

  1. func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
  2. var count int
  3. var bufWriter = bufio.NewWriter(conn)
  4. for {
  5. // 从 responses 通道读取应答并写入连接。同样,出错时要通知 waitForClose 来关闭连接
  6. var res = <-responses
  7. err := types.WriteMessage(res, bufWriter)
  8. if err != nil {
  9. closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
  10. return
  11. }
  12. // flush 类型的应答是哪里来的?
  13. // 与客户端处理类似,如果是此类型要进行 Flush 处理,把缓冲的数据写入连接
  14. if _, ok := res.Value.(*types.Response_Flush); ok {
  15. err = bufWriter.Flush()
  16. if err != nil {
  17. closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
  18. return
  19. }
  20. }
  21. count++
  22. }
  23. }

deliver_tx 子命令执行过程

现在以此命令为例说明发起请求及收到应答的整体过程。

  1. 服务端启动,启动两个协程,一个处理请求,一个处理应答。
  2. 在命令行输入 abci-cli console 进入交互模式,创建客户端并与服务端建立了持久连接。服务端和客户端各启动两个协程,一个处理请求,一个处理应答。
  3. 输入 deliver_tx "abc",客户端会识别子命令,调用 cmdDeliverTx 函数,此函数在解析到 tx 后进行编码,随后会调用 cli.DeliverTxSync(txBytes) (同步的) 函数。
  4. 请求消息写入连接后,服务端读取到此请求。
  5. 根据请求类型,调用 KVStore 应用的 DeliverTx 函数进行处理。
  6. 服务端处理完毕后的应答写入连接,客户端接收到应答,前端打印显示到命令行。
  • 分享 收藏
0 条评论
  • 这篇文章暂无评论,赶紧评论一下吧~