Go语言实战之实现一个简单分布式系统

2022-10-27 20:06:33
目录
引子思路实战节点通信主节点工作节点将它们放在一起代码效果总结

引子

如今很多云原生系统、分布式系统,例如>

思路

在开始写代码之前,我们先思考一下需要实现些什么。

    主节点(Master>工作节点(Worker Node):执行者,相当于军队中的士兵,执行任务

    除了上面的概念以外,我们需要实现一些简单功能。

      上报运行状态(Report Status):工作节点向主节点上报当前状态分派任务(Assign Task):通过 API 向主节点发起请求,主节点再向工作节点分派任务运行脚本(Execute Script):工作节点执行任务中的脚本

      整个流程示意图如下。

      实战

      节点通信

      节点之间的通信在分布式系统中非常重要,毕竟每个节点或机器如果孤立运行,就失去了分布式系统的意义。因此,节点通信在分布式系统中是核心模块。

      gRPC>

      首先,我们来想一下,如何让节点之间进行相互通信。最常用的通信方式就是 API,不过这个通信方式有个缺点,就是需要将各个节点的 IP 地址及端口显示暴露给其他节点,这在公网中是不太安全的。因此,我们选择了 gRPC,一种流行的远程过程调用(Remote Procedure Call,RPC)框架。这里我们不过多的解释 RPC 或 gRPC 的原理,简而言之,就是能让调用者在远程机器上执行命令的协议方式。

      为了使用 gRPC 框架,我们先创建 go.mod 并输入以下内容,并执行 go mod download。注意:对于国内的朋友,或许需要添加代理才能正常下载,可以先执行 export GOPROXY=goproxy.cn,direct 后再执行下载命令。

      module go-distributed-system
      ​
      go 1.17
      ​
      require (
        github.com/golang/protobuf v1.5.0
        google.golang.org/grpc v1.27.0
        google.golang.org/protobuf v1.27.1
      )

      然后,我们创建 Protocol Buffers 文件 node.proto(表示节点对应的 gRPC 协议文件),并输入以下内容。

      syntax = "proto3";
      ​
      package core;
      option go_package = ".;core";
      ​
      message Request {
        string action = 1;
      }
      ​
      message Response {
        string data = 1;
      }
      ​
      service NodeService {
        rpc ReportStatus(Request) returns (Response){};       // Simple RPC
        rpc AssignTask(Request) returns (stream Response){};  // Server-Side RPC
      }

      在这里我们创建了两个 RPC 服务,分别是负责上报状态的 Simple RPC ReportStatus 以及 Server-Side RPC AssignTask。Simple RPC 和 Server-Side RPC 的区别如下图所示,主要区别在于 Server-Side RPC 可以从通过流(Stream)向客户端(Client)主动发送数据,而 Simple RPC 只能从客户端向服务端(Server)发请求。

      创建好 .proto 文件后,我们需要将这个 gRPC 协议文件转化为 .go 代码文件,从而能被 Go 程序引用。在命令行窗口中执行如下命令。注意:编译工具 protoc 不是自带的,需要单独下载,具体可以参考文档 https://grpc.io/docs/protoc-installation/。

      mkdir core
      protoc --go_out=./core \
          --go-grpc_out=./core \
          node.proto
      

      执行完后,可以在 core 目录下看到两个 Go 代码文件, node.pb.gonode_grpc.pb.go,这相当于 Go 程序中对应的 gRPC 库。

      gRPC 服务端

      现在开始编写服务端逻辑。

      咱们先创建一个新文件 core/node_service_server.go,输入以下内容。主要逻辑就是实现了之前创建好的 gRPC 协议中的两个调用方法。其中,暴露了 CmdChannel 这个通道(Channel)来获取需要发送到工作节点的命令。

      package core
      ​
      import (
        "context"
      )
      ​
      type NodeServiceGrpcServer struct {
        UnimplementedNodeServiceServer
      ​
        // channel to receive command
        CmdChannel chan string
      }
      ​
      func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
        return &Response{Data: "ok"}, nil
      }
      ​
      func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
        for {
          select {
          case cmd := <-n.CmdChannel:
            // receive command and send to worker node (client)
            if err := server.Send(&Response{Data: cmd}); err != nil {
              return err
            }
          }
        }
      }
      ​
      var server *NodeServiceGrpcServer
      ​
      // GetNodeServiceGrpcServer singleton service
      func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
        if server == nil {
          server = &NodeServiceGrpcServer{
            CmdChannel: make(chan string),
          }
        }
        return server
      }

      gRPC 客户端

      gRPC 客户端不需要具体实现,我们通常只需要调用 gRPC 客户端的方法,程序会自动发起向服务端的请求以及获取后续的响应。

      主节点

      编写好了节点通信的基础部分,现在我们需要实现主节点了,这是整个中心化分布式系统的核心。

      咱们创建一个新的文件>node.go,输入以下内容。

      package core
      ​
      import (
        "github.com/gin-gonic/gin"
        "google.golang.org/grpc"
        "net"
        "net/http"
      )
      ​
      // MasterNode is the node instance
      type MasterNode struct {
        api     *gin.Engine            // api server
        ln      net.Listener           // listener
        svr     *grpc.Server           // grpc server
        nodeSvr *NodeServiceGrpcServer // node service
      }
      ​
      func (n *MasterNode) Init() (err error) {
        // TODO: implement me
        panic("implement me")
      }
      ​
      func (n *MasterNode) Start() {
        // TODO: implement me
        panic("implement me")
      }
      ​
      var node *MasterNode
      ​
      // GetMasterNode returns the node instance
      func GetMasterNode() *MasterNode {
        if node == nil {
          // node
          node = &MasterNode{}
      ​
          // initialize node
          if err := node.Init(); err != nil {
            panic(err)
          }
        }
      ​
        return node
      }

      其中,我们创建了两个占位方法 InitStart,我们分别实现。

      在初始化方法 Init 中,我们需要做几件事情:

        注册 gRPC 服务注册 API 服务

        现在,在 Init 方法中加入如下代码。

        func (n *MasterNode) Init() (err error) {
          // grpc server listener with port as 50051
          n.ln, err = net.Listen("tcp", ":50051")
          if err != nil {
            return err
          }
        ​
          // grpc server
          n.svr = grpc.NewServer()
        ​
          // node service
          n.nodeSvr = GetNodeServiceGrpcServer()
        ​
          // register node service to grpc server
          RegisterNodeServiceServer(node.svr, n.nodeSvr)
        ​
          // api
          n.api = gin.Default()
          n.api.POST("/tasks", func(c *gin.Context) {
            // parse payload
            var payload struct {
              Cmd string `json:"cmd"`
            }
            if err := c.ShouldBindJSON(&payload); err != nil {
              c.AbortWithStatus(http.StatusBadRequest)
              return
            }
        ​
            // send command to node service
            n.nodeSvr.CmdChannel <- payload.Cmd
        ​
            c.AbortWithStatus(http.StatusOK)
          })
        ​
          return nil
        }

        可以看到,我们新建了一个 gRPC Server,并将之前的 NodeServiceGrpcServer 注册了进去。另外,我们还用 gin 框架创建了一个简单的 API 服务,可以 POST 请求到 /tasksNodeServiceGrpcServer 中的命令通道 CmdChannel 传送命令。这样就将各个部件串接起来了!

        启动方法 Start 很简单,就是启动 gRPC Server 以及 API Server。

        func (n *MasterNode) Start() {
          // start grpc server
          go n.svr.Serve(n.ln)
        ​
          // start api server
          _ = n.api.Run(":9092")
        ​
          // wait for exit
          n.svr.Stop()
        }

        下一步,我们就要实现实际做任务的工作节点了。

        工作节点

        现在,我们创建一个新文件>core/worker_node.go,输入以下内容。

        package core
        ​
        import (
          "context"
          "google.golang.org/grpc"
          "os/exec"
        )
        ​
        type WorkerNode struct {
          conn *grpc.ClientConn  // grpc client connection
          c    NodeServiceClient // grpc client
        }
        ​
        func (n *WorkerNode) Init() (err error) {
          // connect to master node
          n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
          if err != nil {
            return err
          }
        ​
          // grpc client
          n.c = NewNodeServiceClient(n.conn)
        ​
          return nil
        }
        ​
        func (n *WorkerNode) Start() {
          // log
          fmt.Println("worker node started")
        ​
          // report status
          _, _ = n.c.ReportStatus(context.Background(), &Request{})
        ​
          // assign task
          stream, _ := n.c.AssignTask(context.Background(), &Request{})
          for {
            // receive command from master node
            res, err := stream.Recv()
            if err != nil {
              return
            }
        ​
            // log command
            fmt.Println("received command: ", res.Data)
        ​
            // execute command
            parts := strings.Split(res.Data, " ")
            if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
              fmt.Println(err)
            }
          }
        }
        ​
        var workerNode *WorkerNode
        ​
        func GetWorkerNode() *WorkerNode {
          if workerNode == nil {
            // node
            workerNode = &WorkerNode{}
        ​
            // initialize node
            if err := workerNode.Init(); err != nil {
              panic(err)
            }
          }
        ​
          return workerNode
        }

        其中,我们在初始化方法 Init 中创建了gRPC 客户端,并连接了主节点的 gRPC 服务端。

        在启动方法 Start 中做了几件事情:

          调用上报状态(Report Status)的 Simple RPC 方法调用分配任务(Assign Task)的 Server-Side RPC 方法,获取到了流(Stream)通过循环不断接受流传输过来的来自服务端(也就是主节点)的信息,并执行命令

          这样,整个包含主节点、工作节点的分布式系统核心逻辑就写好了!

          将它们放在一起

          最后,我们需要将这些核心逻辑用命令行工具封装一下,以便启用。

          创建主程序文件>main.go,并输入以下内容。

          package main
          ​
          import (
            "go-distributed-system/core"
            "os"
          )
          ​
          func main() {
            nodeType := os.Args[0]
            switch nodeType {
            case "master":
              core.GetMasterNode().Start()
            case "worker":
              core.GetWorkerNode().Start()
            default:
              panic("invalid node type")
            }
          }

          这样,整个简单的分布式系统就创建好了!

          代码效果

          下面我们来运行一下代码。

          打开两个命令行窗口,其中一个输入>go run main.go master 启动主节点,另一个输入 go run main.go worker 启动工作节点。

          如果主节点启动成功,将会看到如下日志信息。

          [GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
          ​
          [GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
           - using env:   export GIN_MODE=release
           - using code:  gin.SetMode(gin.ReleaseMode)
          ​
          [GIN-debug] POST   /tasks                    --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers)
          [GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
          Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
          [GIN-debug] Listening and serving HTTP on :9092
          

          如果工作节点启动成功,将会看到如下日志信息。

          worker node started

          主节点、工作节点都启动成功后,我们在另外一个命令行窗口中输入如下命令来发起 API 请求。

          curl -X POST \
            -H "Content-Type: application/json" \
            -d '{"cmd": "touch /tmp/hello-distributed-system"}' \
            http://localhost:9092/tasks

          在工作节点窗口应该可以看到日志 received command: touch /tmp/hello-distributed-system

          然后查看文件是否顺利生成,执行 ls -l /tmp/hello-distributed-system

          -rw-r--r--  1 marvzhang  wheel     0B Oct 26 12:22 /tmp/hello-distributed-system

          文件成功生成,表示已经通过工作节点执行成功了!大功告成!

          总结

          本篇文章通过>

            gRPCProtocol Bufferschannelginos/exec

            整个代码示例仓库在 GitHub 上: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system

            到此这篇关于Go语言实战之实现一个简单分布式系统的文章就介绍到这了,更多相关Go语言分布式系统内容请搜索易采站长站以前的文章或继续浏览下面的相关文章希望大家以后多多支持易采站长站!