Etcd
安装
go get github.com/kitex-contrib/registry-etcd
服务注册
创建 Registry
提供了三个创建 Registry 的函数
NewEtcdRegistry
NewEtcdRegistry 使用 etcd 创建一个新的服务注册中心,需要传入端点值。可自定义服务注册中心配置,配置详情见 Option。
函数签名:
func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error)
示例代码:
package main
import (
    ...
    "github.com/cloudwego/kitex/pkg/rpcinfo"
    "github.com/cloudwego/kitex/server"
    etcd "github.com/kitex-contrib/registry-etcd"
    ...
)
func main() {
    ...
    r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:2379"}) // r should not be reused.
    if err != nil {
        log.Fatal(err)
    }
    // /docs/tutorials/framework-exten/registry/#integrate-into-kitex
    server, err := echo.NewServer(new(EchoImpl), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo"}), server.WithRegistry(r))
    if err != nil {
        log.Fatal(err)
    }
    err = server.Run()
    if err != nil {
        log.Fatal(err)
    }
    ...
}
NewEtcdRegistryWithAuth
NewEtcdRegistryWithAuth 创建服务注册中心需要传入 auth 参数。
函数签名:
func NewEtcdRegistryWithAuth(endpoints []string, username, password string) (registry.Registry, error)
代码示例:
package main
import (
    ...
	"github.com/cloudwego/kitex/server"
	etcd "github.com/kitex-contrib/registry-etcd"
)
type HelloImpl struct{}
func (h *HelloImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
	resp = &api.Response{
		Message: req.Message,
	}
	return
}
func main() {
	// creates a etcd based registry with given username and password
	r, err := etcd.NewEtcdRegistryWithAuth([]string{"127.0.0.1:2379"}, "username", "password")
	if err != nil {
		log.Fatal(err)
	}
	server := hello.NewServer(new(HelloImpl), server.WithRegistry(r), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
		ServiceName: "Hello",
	}))
	err = server.Run()
	if err != nil {
		log.Fatal(err)
	}
}
NewEtcdRegistryWithRetry
NewEtcdRegistryWithRetry 创建服务注册中心传入自定义 Retry 配置。
函数签名:
func NewEtcdRegistryWithRetry(endpoints []string, retryConfig *retry.Config, opts ...Option) (registry.Registry, error)
使用 NewRetryConfig(opts ...Option) *Config 生成 Retry 配置,配置详情见 Option。
代码示例:
package main
import (
	"context"
	"log"
	"time"
	"github.com/cloudwego/kitex-examples/hello/kitex_gen/api"
	"github.com/cloudwego/kitex-examples/hello/kitex_gen/api/hello"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
	"github.com/cloudwego/kitex/server"
	etcd "github.com/kitex-contrib/registry-etcd"
	"github.com/kitex-contrib/registry-etcd/retry"
)
type HelloImpl struct{}
func (h *HelloImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
	resp = &api.Response{
		Message: req.Message,
	}
	return
}
func main() {
	retryConfig := retry.NewRetryConfig(
		retry.WithMaxAttemptTimes(10),
		retry.WithObserveDelay(20*time.Second),
		retry.WithRetryDelay(5*time.Second),
	)
	r, err := etcd.NewEtcdRegistryWithRetry([]string{"127.0.0.1:2379"}, retryConfig)
	if err != nil {
		log.Fatal(err)
	}
	server := hello.NewServer(new(HelloImpl), server.WithRegistry(r), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
		ServiceName: "Hello",
	}))
	err = server.Run()
	if err != nil {
		log.Fatal(err)
	}
}
Option
Etcd 拓展在服务注册部分中提供了 option 配置。
WithTLSOpt
Etcd 扩展提供了 WithTLSOpt 用于帮助用户配置 Etcd 中的 TLS 选项。
函数签名:
func WithTLSOpt(certFile, keyFile, caFile string) Option
WithAuthOpt
Etcd 扩展提供了 WithAuthOpt 用于帮助用户配置 Etcd 中的 Username 和 Password 选项。
函数签名:
func WithAuthOpt(username, password string) Option
WithDialTimeoutOpt
Etcd 扩展提供了 WithTimeoutOpt 用于帮助用户配置连接超时时间。
func WithDialTimeoutOpt(dialTimeout time.Duration) Option
WithEtcdServicePrefix
Etcd 扩展提供了 WithEtcdServicePrefix 来帮助用户配置服务注册键前缀。
函数签名:
func WithEtcdServicePrefix(prefix string) Option
Retry
在服务注册到 etcd 之后,它会定期检查服务的状态。如果发现任何异常状态,它将尝试重新注册服务。observeDelay 是正常情况下检查服务状态的延迟时间,而 retryDelay 是断开连接后尝试注册服务的延迟时间。
默认配置
| 配置名 | 默认值 | 描述 | 
|---|---|---|
WithMaxAttemptTimes(maxAttemptTimes uint) Option | 5 | 用于设置最大尝试次数,如果为 0,则表示无限尝试 | 
WithObserveDelay(observeDelay time.Duration) Option | 30 * time.Second | 用于设置正常连接条件下检查服务状态的延迟时间 | 
WithRetryDelay(t time.Duration) Option | 10 * time.Second | 用于设置断开连接后重试的延迟时间 | 
服务发现
创建 Resolver
提供了两个创建 Resolver 的函数。
NewEtcdResolver
NewEtcdResolver 使用 etcd 创建一个新的服务发现中心,需要传入端点值。可自定义服务发现中心配置,配置详情见 Option。
函数签名:
func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, error)
示例代码:
package main
import (
    ...
    "github.com/cloudwego/kitex/client"
    etcd "github.com/kitex-contrib/registry-etcd"
    ...
)
func main() {
    ...
    r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"})
    if err != nil {
        log.Fatal(err)
    }
    client, err := echo.NewClient("echo", client.WithResolver(r))
    if err != nil {
        log.Fatal(err)
    }
    ...
}
NewEtcdResolverWithAuth
NewEtcdResolverWithAuth 服务发现中心,需要传入 Auth 参数。
函数签名:
func NewEtcdResolverWithAuth(endpoints []string, username, password string) (discovery.Resolver, error)
示例代码:
package main
import (
    ...
	"github.com/cloudwego/kitex/client"
	etcd "github.com/kitex-contrib/registry-etcd"
)
func main() {
	// creates a etcd based resolver with given username and password
	r, err := etcd.NewEtcdResolverWithAuth([]string{"127.0.0.1:2379"}, "username", "password")
	if err != nil {
		log.Fatal(err)
	}
	client := hello.MustNewClient("Hello", client.WithResolver(r))
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
		resp, err := client.Echo(ctx, &api.Request{Message: "Hello"})
		cancel()
		if err != nil {
			log.Fatal(err)
		}
		log.Println(resp)
		time.Sleep(time.Second)
	}
}
Option
Etcd 拓展在服务发现部分中提供了 option 配置。
WithTLSOpt
Etcd 扩展提供了 WithTLSOpt 用于帮助用户配置 Etcd 中的TLS选项。
函数签名:
func WithTLSOpt(certFile, keyFile, caFile string) Option
WithAuthOpt
Etcd 扩展提供了WithAuthOpt用于帮助用户配置 Etcd 中的Username和Password选项。
函数签名:
func WithAuthOpt(username, password string) Option
WithDialTimeoutOpt
Etcd 扩展提供了WithTimeoutOpt用于帮助用户配置连接超时时间。
func WithDialTimeoutOpt(dialTimeout time.Duration) Option
WithEtcdServicePrefix
Etcd 扩展提供了 WithEtcdServicePrefix 来帮助用户配置服务注册键的前缀。
函数签名:
func WithEtcdServicePrefix(prefix string) Option
使用示例
服务端
package main
import (
	"context"
	"log"
	"github.com/cloudwego/kitex-examples/hello/kitex_gen/api"
	"github.com/cloudwego/kitex-examples/hello/kitex_gen/api/hello"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
	"github.com/cloudwego/kitex/server"
	etcd "github.com/kitex-contrib/registry-etcd"
)
type HelloImpl struct{}
func (h *HelloImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
	resp = &api.Response{
		Message: req.Message,
	}
	return
}
func main() {
	r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:2379"})
	if err != nil {
		log.Fatal(err)
	}
	server := hello.NewServer(new(HelloImpl), server.WithRegistry(r), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
		ServiceName: "Hello",
	}))
	err = server.Run()
	if err != nil {
		log.Fatal(err)
	}
}
客户端
package main
import (
	"context"
	"log"
	"time"
	"github.com/cloudwego/kitex-examples/hello/kitex_gen/api"
	"github.com/cloudwego/kitex-examples/hello/kitex_gen/api/hello"
	"github.com/cloudwego/kitex/client"
	etcd "github.com/kitex-contrib/registry-etcd"
)
func main() {
	r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"})
	if err != nil {
		log.Fatal(err)
	}
	client := hello.MustNewClient("Hello", client.WithResolver(r))
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
		resp, err := client.Echo(ctx, &api.Request{Message: "Hello"})
		cancel()
		if err != nil {
			log.Fatal(err)
		}
		log.Println(resp)
		time.Sleep(time.Second)
	}
}
实例信息
Etcd 拓展对服务实例的注册信息进行了封装,使用 Kitex 与其它服务互通时,可参考此处进行手动封装。
每个实例的 Key 由 Prefix 与服务元信息组成,格式如下:
{Prefix}/{serviceName}{addr}
默认Prefix为kitex/registry-etcd
实例的 Value 为一个 JSON 字符串,其序列化了一个 instanceInfo 结构体,定义如下:
type instanceInfo struct {
	Network string            `json:"network"`
	Address string            `json:"address"`
	Weight  int               `json:"weight"`
	Tags    map[string]string `json:"tags"`
}
在其它服务中解析 Kitex 服务实例时,可以使用 {Prefix}/{serviceName} 前缀进行查找,获取到指定服务的全部实例,并根据 instanceInfo 信息来选择实例。
配置
可自定义 Etcd 客户端以及服务端的配置,参考 etcd-client 配置。
完整示例
完整用法示例详见 example 。