Golang微服务架构实践使用Go-kit和Consul
推荐
在线提问>>
Golang 微服务架构实践:使用 Go-kit 和 Consul
随着互联网的不断发展和应用场景的不断扩展,微服务架构逐渐成为了一种趋势。在微服务架构中,每一个服务都是一个独立的进程,服务之间通过网络进行通信。而 Go 语言以其高并发、高性能的特性,成为了微服务架构中的主流语言。本文介绍了如何使用 Go-kit 和 Consul 实现微服务架构中的服务注册与发现、负载均衡和服务治理等功能。
一、Go-kit 简介
Go-kit 是一个用于编写微服务的工具集,它提供了一些基础组件,包括:
1.服务发现:支持多种服务发现机制,如 Consul、Etcd、Zookeeper 等。
2.请求路由:自动将请求路由到适当的服务实例。
3.负载均衡:支持多种负载均衡策略,如随机、轮询、加权轮询、一致性哈希等。
4.中间件:提供可插拔的中间件,如日志、链路追踪、限流等。
5.服务治理:提供可插拔的服务治理组件,如断路器、重试等。
Go-kit 基于 Go 语言标准库编写,使用了 Go 语言的特性,如 Context、Error 等。它提供了良好的可扩展性和可定制性,可以方便地扩展和定制自己需要的组件。
二、Consul 简介
Consul 是一个开源的服务发现和配置管理系统,它提供了服务发现、健康检查、KV 存储、多数据中心等功能。Consul 采用了 Raft 协议来保证数据的一致性和高可用性。
为了实现服务发现和负载均衡,我们使用 Consul 作为服务注册中心。服务注册中心是微服务架构中的重要组件之一,它用于将服务注册到中心,并提供服务的发现和负载均衡功能。
三、代码实现
现在我们来实现一个简单的微服务架构,包括一个用户服务和一个订单服务。用户服务提供了用户注册和查询用户信息的功能,订单服务提供了下单和查询订单的功能。我们使用 Go-kit 和 Consul 实现这个微服务架构。
1. 创建项目目录
首先,我们创建一个项目目录,包括用户服务和订单服务两个子目录,以及一个公共目录。公共目录用于存放通用的代码、工具和配置文件等。
2. 安装 Go-kit 和 Consul 包
使用以下命令安装 Go-kit 和 Consul 包:
go get github.com/go-kit/kitgo get github.com/hashicorp/consul/api
3. 编写服务接口和实现
我们先定义服务接口:
type UserService interface { Register(ctx context.Context, user *User) error GetUserInfo(ctx context.Context, userID int) (*User, error)}type OrderService interface { PlaceOrder(ctx context.Context, order *Order) error GetOrderInfo(ctx context.Context, orderID int) (*Order, error)}
然后,我们实现这些接口:
type userService struct { repo UserRepository}func NewUserService(repo UserRepository) UserService { return &userService{ repo: repo, }}func (s *userService) Register(ctx context.Context, user *User) error { return s.repo.CreateUser(user)}func (s *userService) GetUserInfo(ctx context.Context, userID int) (*User, error) { return s.repo.FindUserByID(userID)}type orderService struct { repo OrderRepository}func NewOrderService(repo OrderRepository) OrderService { return &orderService{ repo: repo, }}func (s *orderService) PlaceOrder(ctx context.Context, order *Order) error { return s.repo.CreateOrder(order)}func (s *orderService) GetOrderInfo(ctx context.Context, orderID int) (*Order, error) { return s.repo.FindOrderByID(orderID)}
4. 集成 Consul
我们使用 Consul 作为服务注册中心,为了方便起见,我们先启动一个本地的 Consul 服务。使用以下命令启动 Consul 服务:
consul agent -dev
然后,我们在公共目录创建一个 Consul 客户端:
type ConsulClient struct { client *api.Client}func NewConsulClient() (*ConsulClient, error) { config := api.DefaultConfig() config.Address = "localhost:8500" client, err := api.NewClient(config) if err != nil { return nil, err } return &ConsulClient{ client: client, }, nil}func (c *ConsulClient) RegisterService(ctx context.Context, serviceID, serviceName, serviceAddress string, servicePort int) error { registration := &api.AgentServiceRegistration{ ID: serviceID, Name: serviceName, Address: serviceAddress, Port: servicePort, Check: &api.AgentServiceCheck{ DeregisterCriticalServiceAfter: "1m", HTTP: fmt.Sprintf("http://%s:%d/health", serviceAddress, servicePort), Interval: "10s", }, } err := c.client.Agent().ServiceRegister(registration) if err != nil { return err } return nil}func (c *ConsulClient) DeregisterService(ctx context.Context, serviceID string) error { err := c.client.Agent().ServiceDeregister(serviceID) if err != nil { return err } return nil}func (c *ConsulClient) DiscoverService(ctx context.Context, serviceName string) (string, error) { services, _, err := c.client.Catalog().Service(serviceName, "", nil) if err != nil { return "", err } if len(services) == 0 { return "", errors.New("no available service") } randIndex := rand.Intn(len(services)) service := services return fmt.Sprintf("%s:%d", service.ServiceAddress, service.ServicePort), nil}
我们使用 NewConsulClient 函数创建一个 Consul 客户端,它连接到本地的 Consul 服务。然后,我们实现了 RegisterService、DeregisterService 和 DiscoverService 方法。RegisterService 用于将服务注册到 Consul 中心,DeregisterService 用于将服务从 Consul 中心注销,DiscoverService 用于发现服务实例。我们使用随机策略从所有服务实例中选择一个服务实例。
5. 实现 HTTP 传输协议
我们使用 HTTP 作为传输协议,接收 HTTP 请求,然后将请求转换为 Go-kit 的 endpoint,并将请求发送到对应的服务。
我们在公共目录创建一个 transport 包,用于实现 HTTP 传输协议:
type HTTPServer struct { server *http.Server}func NewHTTPServer(addr string, handler http.Handler) *HTTPServer { return &HTTPServer{ server: &http.Server{ Addr: addr, Handler: handler, }, }}func (s *HTTPServer) Start() error { return s.server.ListenAndServe()}func (s *HTTPServer) Stop(ctx context.Context) error { return s.server.Shutdown(ctx)}func EncodeJSONResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { w.Header().Set("Content-Type", "application/json") return json.NewEncoder(w).Encode(response)}func DecodeJSONRequest(ctx context.Context, r *http.Request, request interface{}) error { return json.NewDecoder(r.Body).Decode(request)}
我们实现了 NewHTTPServer、Start 和 Stop 方法,用于启动和停止 HTTP 服务器。同时,我们实现了 EncodeJSONResponse 和 DecodeJSONRequest 方法,用于将 HTTP 请求和响应转换为 Go-kit 的请求和响应。
6. 实现服务端
我们使用 Go-kit 的 grpc 包实现服务端。首先,我们在公共目录创建一个 endpoint 包:
func MakeRegisterEndpoint(svc UserService) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { req := request.(*RegisterRequest) user := &User{ Name: req.Name, Email: req.Email, Password: req.Password, } err := svc.Register(ctx, user) if err != nil { return nil, err } return &RegisterResponse{}, nil }}func MakeGetUserInfoEndpoint(svc UserService) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { req := request.(*GetUserInfoRequest) user, err := svc.GetUserInfo(ctx, req.UserID) if err != nil { return nil, err } return &GetUserInfoResponse{ User: user, }, nil }}func MakePlaceOrderEndpoint(svc OrderService) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { req := request.(*PlaceOrderRequest) order := &Order{ UserID: req.UserID, ProductID: req.ProductID, Quantity: req.Quantity, } err := svc.PlaceOrder(ctx, order) if err != nil { return nil, err } return &PlaceOrderResponse{}, nil }}func MakeGetOrderInfoEndpoint(svc OrderService) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { req := request.(*GetOrderInfoRequest) order, err := svc.GetOrderInfo(ctx, req.OrderID) if err != nil { return nil, err } return &GetOrderInfoResponse{ Order: order, }, nil }}
我们实现了 MakeRegisterEndpoint、MakeGetUserInfoEndpoint、MakePlaceOrderEndpoint 和 MakeGetOrderInfoEndpoint 方法,用于将服务实现转换为 Go-kit 的 endpoint。
然后,我们在用户服务和订单服务中分别创建一个服务实现,并实现 gRPC 服务:
用户服务:
func main() { client, err := NewConsulClient() if err != nil { log.Fatal(err) } repo := NewMemoryUserRepository() svc := NewUserService(repo) registerEndpoint := MakeRegisterEndpoint(svc) registerEndpoint = ratelimit.NewTokenBucketLimiter(rate.NewLimiter(1, 1))(registerEndpoint) getUserInfoEndpoint := MakeGetUserInfoEndpoint(svc) getUserInfoEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "GetUserInfo", OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) { log.Printf("%s: state changed from %s to %s\n", name, from, to) }, }))(getUserInfoEndpoint) endpoints := UserEndpoints{ RegisterEndpoint: registerEndpoint, GetUserInfoEndpoint: getUserInfoEndpoint, } svcHandler := NewGRPCServer(endpoints) grpcListener, err := net.Listen("tcp", ":0") if err != nil { log.Fatal(err) } defer grpcListener.Close() grpcServer := grpc.NewServer() pb.RegisterUserServiceServer(grpcServer, svcHandler) serviceID := fmt.Sprintf("UserService-%s", uuid.New().String()) err = client.RegisterService(context.Background(), serviceID, "UserService", "localhost", grpcListener.Addr().(*net.TCPAddr).Port) if err != nil { log.Fatal(err) } defer client.DeregisterService(context.Background(), serviceID) log.Printf("UserService has been registered to Consul: %s\n", serviceID) err = grpcServer.Serve(grpcListener) if err != nil { log.Fatal(err) }}
订单服务:
func main() { client, err := NewConsulClient() if err != nil { log.Fatal(err) } repo := NewMemoryOrderRepository() svc := NewOrderService(repo) placeOrderEndpoint := MakePlaceOrderEndpoint(svc) placeOrderEndpoint = ratelimit.NewTokenBucketLimiter(rate.NewLimiter(1, 1))(placeOrderEndpoint) getOrderInfoEndpoint := MakeGetOrderInfoEndpoint(svc) getOrderInfoEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "GetOrderInfo", OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) { log.Printf("%s: state changed from %s to %s\n", name, from, to) }, }))(getOrderInfoEndpoint) endpoints := OrderEndpoints{ PlaceOrderEndpoint: placeOrderEndpoint, GetOrderInfoEndpoint: getOrderInfoEndpoint, } svcHandler := NewGRPCServer(endpoints) grpcListener, err := net.Listen("tcp", ":0") if err != nil { log.Fatal(err) } defer grpcListener.Close() grpcServer := grpc.NewServer() pb.RegisterOrderServiceServer(grpcServer, svcHandler) serviceID := fmt.Sprintf("OrderService-%s", uuid.New().String()) err = client.RegisterService(context.Background(), serviceID, "OrderService", "localhost", grpcListener.Addr().(*net.TCPAddr).Port) if err != nil { log.Fatal(err) } defer client.DeregisterService(context.Background(), serviceID) log.Printf("OrderService has been registered to Consul: %s\n", serviceID) err = grpcServer.Serve(grpcListener) if err != nil { log.Fatal(err) }}
我们实现了 grpc 包中的 gRPC 服务器,然后将服务注册到 Consul 中心。
7. 实现客户端
最后,我们使用 Go-kit 的 grpc 包实现客户端。我们在公共目录创建一个 proxy 包:
type UserServiceProxy struct {
registerEndpoint endpoint.Endpoint
getUserInfoEndpoint endpoint.Endpoint
}
func NewUserServiceProxy(client *grpc.ClientConn) *UserServiceProxy {
registerEndpoint := kitgrpc.NewClient(
client,
"pb.UserService",
"Register",
EncodeGRPCEncRequest,
DecodeGRPCRegisterResponse,
pb.RegisterUserServiceClient{},
).Endpoint()
getUserInfoEndpoint := kitgrpc.NewClient(
client,
"pb.UserService",
"GetUserInfo",
EncodeGRPCEncRequest,
DecodeGRPCGetUserInfoResponse,
pb.RegisterUserServiceClient{},
).Endpoint()
return &UserServiceProxy{
registerEndpoint: registerEndpoint,
getUserInfoEndpoint: getUserInfoEndpoint,
}
}
func (p *UserServiceProxy) Register(ctx context.Context, user *User) error {
request := &RegisterRequest{
Name: user.Name,