全国旗舰校区

不同学习城市 同样授课品质

北京

深圳

上海

广州

郑州

大连

武汉

成都

西安

杭州

青岛

重庆

长沙

哈尔滨

南京

太原

沈阳

合肥

贵阳

济南

下一个校区
就在你家门口
+
当前位置:首页  >  技术干货  >  详情

Golang微服务架构实践使用Go-kit和Consul

来源:千锋教育
发布人:xqq
2023-12-27

推荐

在线提问>>

Golang 微服务架构实践:使用 Go-kit 和 Consul

随着互联网的不断发展和应用场景的不断扩展,微服务架构逐渐成为了一种趋势。在微服务架构中,每一个服务都是一个独立的进程,服务之间通过网络进行通信。而 Go 语言以其高并发、高性能的特性,成为了微服务架构中的主流语言。本文介绍了如何使用 Go-kit 和 Consul 实现微服务架构中的服务注册与发现、负载均衡和服务治理等功能。

一、Go-kit 简介

Go-kit 是一个用于编写微服务的工具集,它提供了一些基础组件,包括:

1.服务发现:支持多种服务发现机制,如 Consul、Etcd、Zookeeper 等。

2.请求路由:自动将请求路由到适当的服务实例。

3.负载均衡:支持多种负载均衡策略,如随机、轮询、加权轮询、一致性哈希等。

4.中间件:提供可插拔的中间件,如日志、链路追踪、限流等。

5.服务治理:提供可插拔的服务治理组件,如断路器、重试等。

Go-kit 基于 Go 语言标准库编写,使用了 Go 语言的特性,如 Context、Error 等。它提供了良好的可扩展性和可定制性,可以方便地扩展和定制自己需要的组件。

二、Consul 简介

Consul 是一个开源的服务发现和配置管理系统,它提供了服务发现、健康检查、KV 存储、多数据中心等功能。Consul 采用了 Raft 协议来保证数据的一致性和高可用性。

为了实现服务发现和负载均衡,我们使用 Consul 作为服务注册中心。服务注册中心是微服务架构中的重要组件之一,它用于将服务注册到中心,并提供服务的发现和负载均衡功能。

三、代码实现

现在我们来实现一个简单的微服务架构,包括一个用户服务和一个订单服务。用户服务提供了用户注册和查询用户信息的功能,订单服务提供了下单和查询订单的功能。我们使用 Go-kit 和 Consul 实现这个微服务架构。

1. 创建项目目录

首先,我们创建一个项目目录,包括用户服务和订单服务两个子目录,以及一个公共目录。公共目录用于存放通用的代码、工具和配置文件等。

2. 安装 Go-kit 和 Consul 包

使用以下命令安装 Go-kit 和 Consul 包:

go get github.com/go-kit/kitgo get github.com/hashicorp/consul/api

3. 编写服务接口和实现

我们先定义服务接口:

type UserService interface {    Register(ctx context.Context, user *User) error    GetUserInfo(ctx context.Context, userID int) (*User, error)}type OrderService interface {    PlaceOrder(ctx context.Context, order *Order) error    GetOrderInfo(ctx context.Context, orderID int) (*Order, error)}

然后,我们实现这些接口:

type userService struct {    repo UserRepository}func NewUserService(repo UserRepository) UserService {    return &userService{        repo: repo,    }}func (s *userService) Register(ctx context.Context, user *User) error {    return s.repo.CreateUser(user)}func (s *userService) GetUserInfo(ctx context.Context, userID int) (*User, error) {    return s.repo.FindUserByID(userID)}type orderService struct {    repo OrderRepository}func NewOrderService(repo OrderRepository) OrderService {    return &orderService{        repo: repo,    }}func (s *orderService) PlaceOrder(ctx context.Context, order *Order) error {    return s.repo.CreateOrder(order)}func (s *orderService) GetOrderInfo(ctx context.Context, orderID int) (*Order, error) {    return s.repo.FindOrderByID(orderID)}

4. 集成 Consul

我们使用 Consul 作为服务注册中心,为了方便起见,我们先启动一个本地的 Consul 服务。使用以下命令启动 Consul 服务:

consul agent -dev

然后,我们在公共目录创建一个 Consul 客户端:

type ConsulClient struct {    client *api.Client}func NewConsulClient() (*ConsulClient, error) {    config := api.DefaultConfig()    config.Address = "localhost:8500"    client, err := api.NewClient(config)    if err != nil {        return nil, err    }    return &ConsulClient{        client: client,    }, nil}func (c *ConsulClient) RegisterService(ctx context.Context, serviceID, serviceName, serviceAddress string, servicePort int) error {    registration := &api.AgentServiceRegistration{        ID:      serviceID,        Name:    serviceName,        Address: serviceAddress,        Port:    servicePort,        Check: &api.AgentServiceCheck{            DeregisterCriticalServiceAfter: "1m",            HTTP:                           fmt.Sprintf("http://%s:%d/health", serviceAddress, servicePort),            Interval:                       "10s",        },    }    err := c.client.Agent().ServiceRegister(registration)    if err != nil {        return err    }    return nil}func (c *ConsulClient) DeregisterService(ctx context.Context, serviceID string) error {    err := c.client.Agent().ServiceDeregister(serviceID)    if err != nil {        return err    }    return nil}func (c *ConsulClient) DiscoverService(ctx context.Context, serviceName string) (string, error) {    services, _, err := c.client.Catalog().Service(serviceName, "", nil)    if err != nil {        return "", err    }    if len(services) == 0 {        return "", errors.New("no available service")    }    randIndex := rand.Intn(len(services))    service := services    return fmt.Sprintf("%s:%d", service.ServiceAddress, service.ServicePort), nil}

我们使用 NewConsulClient 函数创建一个 Consul 客户端,它连接到本地的 Consul 服务。然后,我们实现了 RegisterService、DeregisterService 和 DiscoverService 方法。RegisterService 用于将服务注册到 Consul 中心,DeregisterService 用于将服务从 Consul 中心注销,DiscoverService 用于发现服务实例。我们使用随机策略从所有服务实例中选择一个服务实例。

5. 实现 HTTP 传输协议

我们使用 HTTP 作为传输协议,接收 HTTP 请求,然后将请求转换为 Go-kit 的 endpoint,并将请求发送到对应的服务。

我们在公共目录创建一个 transport 包,用于实现 HTTP 传输协议:

type HTTPServer struct {    server *http.Server}func NewHTTPServer(addr string, handler http.Handler) *HTTPServer {    return &HTTPServer{        server: &http.Server{            Addr:    addr,            Handler: handler,        },    }}func (s *HTTPServer) Start() error {    return s.server.ListenAndServe()}func (s *HTTPServer) Stop(ctx context.Context) error {    return s.server.Shutdown(ctx)}func EncodeJSONResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {    w.Header().Set("Content-Type", "application/json")    return json.NewEncoder(w).Encode(response)}func DecodeJSONRequest(ctx context.Context, r *http.Request, request interface{}) error {    return json.NewDecoder(r.Body).Decode(request)}

我们实现了 NewHTTPServer、Start 和 Stop 方法,用于启动和停止 HTTP 服务器。同时,我们实现了 EncodeJSONResponse 和 DecodeJSONRequest 方法,用于将 HTTP 请求和响应转换为 Go-kit 的请求和响应。

6. 实现服务端

我们使用 Go-kit 的 grpc 包实现服务端。首先,我们在公共目录创建一个 endpoint 包:

func MakeRegisterEndpoint(svc UserService) endpoint.Endpoint {    return func(ctx context.Context, request interface{}) (interface{}, error) {        req := request.(*RegisterRequest)        user := &User{            Name:     req.Name,            Email:    req.Email,            Password: req.Password,        }        err := svc.Register(ctx, user)        if err != nil {            return nil, err        }        return &RegisterResponse{}, nil    }}func MakeGetUserInfoEndpoint(svc UserService) endpoint.Endpoint {    return func(ctx context.Context, request interface{}) (interface{}, error) {        req := request.(*GetUserInfoRequest)        user, err := svc.GetUserInfo(ctx, req.UserID)        if err != nil {            return nil, err        }        return &GetUserInfoResponse{            User: user,        }, nil    }}func MakePlaceOrderEndpoint(svc OrderService) endpoint.Endpoint {    return func(ctx context.Context, request interface{}) (interface{}, error) {        req := request.(*PlaceOrderRequest)        order := &Order{            UserID:    req.UserID,            ProductID: req.ProductID,            Quantity:  req.Quantity,        }        err := svc.PlaceOrder(ctx, order)        if err != nil {            return nil, err        }        return &PlaceOrderResponse{}, nil    }}func MakeGetOrderInfoEndpoint(svc OrderService) endpoint.Endpoint {    return func(ctx context.Context, request interface{}) (interface{}, error) {        req := request.(*GetOrderInfoRequest)        order, err := svc.GetOrderInfo(ctx, req.OrderID)        if err != nil {            return nil, err        }        return &GetOrderInfoResponse{            Order: order,        }, nil    }}

我们实现了 MakeRegisterEndpoint、MakeGetUserInfoEndpoint、MakePlaceOrderEndpoint 和 MakeGetOrderInfoEndpoint 方法,用于将服务实现转换为 Go-kit 的 endpoint。

然后,我们在用户服务和订单服务中分别创建一个服务实现,并实现 gRPC 服务:

用户服务:

func main() {    client, err := NewConsulClient()    if err != nil {        log.Fatal(err)    }    repo := NewMemoryUserRepository()    svc := NewUserService(repo)    registerEndpoint := MakeRegisterEndpoint(svc)    registerEndpoint = ratelimit.NewTokenBucketLimiter(rate.NewLimiter(1, 1))(registerEndpoint)    getUserInfoEndpoint := MakeGetUserInfoEndpoint(svc)    getUserInfoEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{        Name: "GetUserInfo",        OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {            log.Printf("%s: state changed from %s to %s\n", name, from, to)        },    }))(getUserInfoEndpoint)    endpoints := UserEndpoints{        RegisterEndpoint:    registerEndpoint,        GetUserInfoEndpoint: getUserInfoEndpoint,    }    svcHandler := NewGRPCServer(endpoints)    grpcListener, err := net.Listen("tcp", ":0")    if err != nil {        log.Fatal(err)    }    defer grpcListener.Close()    grpcServer := grpc.NewServer()    pb.RegisterUserServiceServer(grpcServer, svcHandler)    serviceID := fmt.Sprintf("UserService-%s", uuid.New().String())    err = client.RegisterService(context.Background(), serviceID, "UserService", "localhost", grpcListener.Addr().(*net.TCPAddr).Port)    if err != nil {        log.Fatal(err)    }    defer client.DeregisterService(context.Background(), serviceID)    log.Printf("UserService has been registered to Consul: %s\n", serviceID)    err = grpcServer.Serve(grpcListener)    if err != nil {        log.Fatal(err)    }}

订单服务:

func main() {    client, err := NewConsulClient()    if err != nil {        log.Fatal(err)    }    repo := NewMemoryOrderRepository()    svc := NewOrderService(repo)    placeOrderEndpoint := MakePlaceOrderEndpoint(svc)    placeOrderEndpoint = ratelimit.NewTokenBucketLimiter(rate.NewLimiter(1, 1))(placeOrderEndpoint)    getOrderInfoEndpoint := MakeGetOrderInfoEndpoint(svc)    getOrderInfoEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{        Name: "GetOrderInfo",        OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {            log.Printf("%s: state changed from %s to %s\n", name, from, to)        },    }))(getOrderInfoEndpoint)    endpoints := OrderEndpoints{        PlaceOrderEndpoint:   placeOrderEndpoint,        GetOrderInfoEndpoint: getOrderInfoEndpoint,    }    svcHandler := NewGRPCServer(endpoints)    grpcListener, err := net.Listen("tcp", ":0")    if err != nil {        log.Fatal(err)    }    defer grpcListener.Close()    grpcServer := grpc.NewServer()    pb.RegisterOrderServiceServer(grpcServer, svcHandler)    serviceID := fmt.Sprintf("OrderService-%s", uuid.New().String())    err = client.RegisterService(context.Background(), serviceID, "OrderService", "localhost", grpcListener.Addr().(*net.TCPAddr).Port)    if err != nil {        log.Fatal(err)    }    defer client.DeregisterService(context.Background(), serviceID)    log.Printf("OrderService has been registered to Consul: %s\n", serviceID)    err = grpcServer.Serve(grpcListener)    if err != nil {        log.Fatal(err)    }}

我们实现了 grpc 包中的 gRPC 服务器,然后将服务注册到 Consul 中心。

7. 实现客户端

最后,我们使用 Go-kit 的 grpc 包实现客户端。我们在公共目录创建一个 proxy 包:

type UserServiceProxy struct {

registerEndpoint endpoint.Endpoint

getUserInfoEndpoint endpoint.Endpoint

}

func NewUserServiceProxy(client *grpc.ClientConn) *UserServiceProxy {

registerEndpoint := kitgrpc.NewClient(

client,

"pb.UserService",

"Register",

EncodeGRPCEncRequest,

DecodeGRPCRegisterResponse,

pb.RegisterUserServiceClient{},

).Endpoint()

getUserInfoEndpoint := kitgrpc.NewClient(

client,

"pb.UserService",

"GetUserInfo",

EncodeGRPCEncRequest,

DecodeGRPCGetUserInfoResponse,

pb.RegisterUserServiceClient{},

).Endpoint()

return &UserServiceProxy{

registerEndpoint: registerEndpoint,

getUserInfoEndpoint: getUserInfoEndpoint,

}

}

func (p *UserServiceProxy) Register(ctx context.Context, user *User) error {

request := &RegisterRequest{

Name: user.Name,

相关文章

从Docker到Kubernetes容器技术的演进之路

从部署到监控快速搭建自己的Prometheus监控系统

AI技术在网络安全中的应用:如何提高网络安全防护水平?

从攻击事件看网络安全:利用大数据分析来升级企业安全防护

浅析加密算法:RSA、AES、DES等主流加密技术解析

开班信息 更多>>

课程名称
全部学科
咨询

HTML5大前端

Java分布式开发

Python数据分析

Linux运维+云计算

全栈软件测试

大数据+数据智能

智能物联网+嵌入式

网络安全

全链路UI/UE设计

Unity游戏开发

新媒体短视频直播电商

影视剪辑包装

游戏原画

    在线咨询 免费试学 教程领取