go-micro,redis stream学习笔记
go-micro
go-micro是一个可插拔的微服务框架,我的demo插的组件是,grpcServer,grpcClient,roundRobin,etcd,其他的组件为默认。
先说一下主要的接口:
Service
Server
Client
Register
Selector
Codec
Broker
Transport
server.go
srv := grpc.NewServer(
server.Name("go.micro.server"),
server.Version("myVersion"),
server.Wait(nil),
server.RegisterTTL(15*time.Second),
server.RegisterInterval(5*time.Second),
grpc.MaxMsgSize(2*1024*1024),
grpc.go
func NewServer(opts ...server.Option) server.Server {
return newGRPCServer(opts...)
func newGRPCServer(opts ...server.Option) server.Server {
options := newOptions(opts...)
srv := &grpcServer{
opts: options,
rpc: &rServer{
serviceMap: make(map[string]*service),
handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
wg: wait(options.Context),
srv.configure()
return srv
func (g *grpcServer) configure(opts ...server.Option) {
maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxMsgSize),
grpc.MaxSendMsgSize(maxMsgSize),
grpc.UnknownServiceHandler(g.handler),
if creds := g.getCredentials(); creds != nil {
gopts = append(gopts, grpc.Creds(creds))
if opts := g.getGrpcOptions(); opts != nil {
gopts = append(gopts, opts...)
g.srv = grpc.NewServer(gopts...)
一个options结构体为:
type Options struct {
Codecs map[string]codec.NewCodec
Broker broker.Broker
Registry registry.Registry
Transport transport.Transport
Metadata map[string]string
Name string
Address string
Advertise string
Id string
Version string
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
RegisterCheck func(context.Context) error
RegisterTTL time.Duration
RegisterInterval time.Duration
Router Router
Context context.Context
我们传进来的配置函数基本就是直接修改这个结构体,有些还没搞懂是干嘛的,暂不理会。
比较不同的是 grpc.MaxMsgSize(210241024) 它是将值存储在 context 中的,不是所有server组件都有的参数就放到context中。
func MaxMsgSize(s int) server.Option {
return func(o *server.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, maxMsgSizeKey{}, s)
grpc 其实有很多配置,go-micro 提供了3个需要特殊处理的 Codec,AuthTLS,MaxMsgSize,其他的可以通过 grpc.Options 函数直接修改:
func Options(opts ...grpc.ServerOption) server.Option {
return func(o *server.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, grpcOptions{}, opts)
server.go
reg := etcd.NewRegistry(
registry.Addrs("127.0.0.1:2379"),
registry.Timeout(10*time.Second),
道理是一样的,registry.Timeout表示请求超时时间。
func NewRegistry(opts ...registry.Option) registry.Registry {
e := &etcdRegistry{
options: registry.Options{},
register: make(map[string]uint64),
leases: make(map[string]clientv3.LeaseID),
configure(e, opts...)
return e
type Options struct {
Addrs []string
Timeout time.Duration
Secure bool
TLSConfig *tls.Config
Context context.Context
把我们的组件插到框架上。
server.go
service := micro.NewService(
// 调用micro.Registry生成的函数的时候会修改当前Server,Client的配置,要注意它们的顺序,否则就改不到了哦。
micro.Server(srv),
micro.Registry(reg),
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
o.Client.Init(client.Registry(r))
o.Server.Init(server.Registry(r))
接下来就是注册路由了。
server.go
err := proto.RegisterGreeterHandler(service.Server(), &Greeter{})
if err != nil {
fmt.Println(err.Error())
return
稍微跟一下go-micro生成的协议文件就能来到下面两个注册路由的方法。
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption)
return newRpcHandler(h, opts...)
func newRpcHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
if e := extractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
endpoints = append(endpoints, e)
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
func (g *grpcServer) Handle(h server.Handler) error {
if err := g.rpc.register(h.Handler()); err != nil {
return err
g.handlers[h.Name()] = h
return nil
func (server *rServer) register(rcvr interface{}) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
...
s.name = sname
s.method = make(map[string]*methodType)
for m := 0; m < s.typ.NumMethod(); m++ {
method := s.typ.Method(m)
if mt := prepareEndpoint(method); mt != nil {
s.method[method.Name] = mt
server.serviceMap[s.name] = s
return nil
不往真正的 server 上注册路由,那到时候怎么处理请求呢?
原来初始化 goolge/grpc 的 server 的时候配置了 grpc.UnknownServiceHandler(g.handler),
所有请求都会路由到这个UnknownHandler,这个handler再处理路由,这就实现了自己的路由器,相当与只把 goolge/grpc 当做一个传输层。
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
fullMethod, ok := grpc.MethodFromServerStream(stream)
serviceName, methodName, err := mgrpc.ServiceMethod(fullMethod)
gmd, ok := metadata.FromIncomingContext(stream.Context())
md := meta.Metadata{}
for k, v := range gmd {
md[k] = strings.Join(v, ", ")
to := md["timeout"]
ct := defaultContentType
if ctype, ok := md["x-content-type"]; ok {
ct = ctype
delete(md, "x-content-type")
delete(md, "timeout")
ctx := meta.NewContext(stream.Context(), md)
if p, ok := peer.FromContext(stream.Context()); ok {
md["Remote"] = p.Addr.String()
ctx = peer.NewContext(ctx, p)
if len(to) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(n))
defer cancel()
if g.opts.Router != nil {
g.rpc.mu.Lock()
service := g.rpc.serviceMap[serviceName]
g.rpc.mu.Unlock()
if service == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
mtype := service.method[methodName]
if mtype == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err()
if !mtype.stream {
return g.processRequest(stream, service, mtype, ct, ctx)
return g.processStream(stream, service, mtype, ct, ctx)
只看处理非流式方法,处理流式的比较简单:
func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error {
for {
var argv, replyv reflect.Value
argIsValue := false
if mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType.Elem())
} else {
argv = reflect.New(mtype.ArgType)
argIsValue = true
if err := stream.RecvMsg(argv.Interface()); err != nil {
return err
replyv = reflect.New(mtype.ReplyType.Elem())
function := mtype.method.Func
var returnValues []reflect.Value
cc, err := g.newGRPCCodec(ct)
b, err := cc.Marshal(argv.Interface())
r := &rpcRequest{
service: g.opts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
body: b,
payload: argv.Interface(),
fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
defer func() {
if r := recover(); r != nil {
...
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
return err
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
fn = g.opts.HdlrWrappers[i-1](fn)
statusCode := codes.OK
statusDesc := ""
if appErr := fn(ctx, r, replyv.Interface()); appErr != nil {
...
if err := stream.SendMsg(replyv.Interface()); err != nil {
return err
return status.New(statusCode, statusDesc).Err()
看完了怎么处理请求的,接下来我们的server要运行啦。
server.go
err = service.Run()
if err != nil {
fmt.Println(err.Error())
return
func (s *service) Run() error {
...
if err := s.Start(); err != nil {
return err
select {
case <-ch:
case <-s.opts.Context.Done():
return s.Stop()
func (g *grpcServer) Start() error {
ts, err := net.Listen("tcp", config.Address)
g.opts.Address = ts.Addr().String()
if err := config.Broker.Connect(); err != nil {
return err
if err := g.Register(); err != nil {
log.Log("Server register error: ", err)
go func() {
if err := g.srv.Serve(ts); err != nil {
log.Log("gRPC Server start error: ", err)
go func() {
t := new(time.Ticker)
if g.opts.RegisterInterval > time.Duration(0) {
t = time.NewTicker(g.opts.RegisterInterval)
var ch chan error
Loop:
for {
select {
case <-t.C:
if err := g.Register(); err != nil {
log.Log("Server register error: ", err)
case ch = <-g.exit:
break Loop
if err := g.Deregister(); err != nil {
log.Log("Server deregister error: ", err)
if g.wg != nil {
g.wg.Wait()
g.srv.GracefulStop()
return nil
func (g *grpcServer) Stop() error {
ch := make(chan error)
g.exit <- ch
return err
如果不保活键会在15s后过期(我们的配置是这样的),每隔5s保活一次是因为之前的组长跟我说3次是真理!
client.go
// 服务配置
sel := selector.NewSelector(selector.SetStrategy(selector.RoundRobin))
cli := grpc.NewClient(
client.Selector(sel),
client.ContentType("application/json"), // 改为json编码方式
client.PoolSize(200),
client.PoolTTL(2*time.Minute),
client.RequestTimeout(10*time.Second),
client.Retries(3),
grpc.MaxSendMsgSize(2*1024*1024),
grpc.MaxRecvMsgSize(2*1024*1024),
reg := etcd.NewRegistry(registry.Timeout(5*time.Second))
service := micro.NewService(
micro.Client(cli),
micro.Registry(reg),
// greeter1
greeter := proto.NewGreeterService("go.micro.server", service.Client())
rsp, _ := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
fmt.Println(rsp.Greeting)
// greeter2
req := cli.NewRequest("go.micro.server", "Greeter.Hello", &proto.HelloRequest{Name: "Jack"})
rsp1 := new(proto.HelloResponse)
_ = cli.Call(context.TODO(), req, rsp1)
fmt.Println(rsp1.Greeting)
服务配置的就不废话了,client初始化时就会建立连接池。
只要稍微分析一下go-micro生成的协议文件就会发现 greeter1 和 greeter2 的调用方式是等价的。
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request {
var opts client.RequestOptions
for _, o := range reqOpts {
o(&opts)
if len(opts.ContentType) > 0 {
contentType = opts.ContentType
return &grpcRequest{
service: service,
method: method,
request: request,
contentType: contentType,
opts: opts,
生成的req代表我们需要 go.micro.server 服务(这个就是server配置的Name),需要请求这个服务上的 Greeter.Hello 路由(还记得ServiceHandler吗)。
HelloRequest,HelloResponse为协议文件的类。
只截取了部分cli.Call()的代码
{......
call := func(i int) error {
......
next, err := g.next(req, callOpts)
......
node, err := next()
service := req.Service()
......
err = gcall(ctx, node, req, rsp, callOpts)
g.opts.Selector.Mark(service, node, err)
return err
......
for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
ch <- call(i)
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
case err := <-ch:
if err == nil {
return nil
retry, rerr := callOpts.Retry(ctx, req, i, err)
if rerr != nil {
return rerr
if !retry {
return err
gerr = err
return gerr
selector是怎么选择node的呢?
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
services, err := c.rc.GetService(service)
for _, filter := range sopts.Filters {
services = filter(services)
return sopts.Strategy(services), nil
func (c *cache) get(service string) ([]*registry.Service, error) {
services := c.cache[service]
ttl := c.ttls[service]
cp := registry.Copy(services)
if c.isValid(cp, ttl) {
return cp, nil
get := func(service string, cached []*registry.Service) ([]*registry.Service, error) {
services, err := c.Registry.GetService(service)
......
c.set(service, registry.Copy(services))
return services, nil
.....
return get(service, cp)
func RoundRobin(services []*registry.Service) Next {
......
var i = rand.Int()
var mtx sync.Mutex
return func() (*registry.Node, error) {
mtx.Lock()
node := nodes[i%len(nodes)]
mtx.Unlock()
return node, nil
继续看,gcall就是g.call:
func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
address := node.Address
header := make(map[string]string)
if md, ok := metadata.FromContext(ctx); ok {
for k, v := range md {
header[k] = v
header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
header["x-content-type"] = req.ContentType()
md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
cf, err := g.newGRPCCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
maxRecvMsgSize := g.maxRecvMsgSizeValue()
maxSendMsgSize := g.maxSendMsgSizeValue()
var grr error
grpcDialOptions := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)),
grpc.WithTimeout(opts.DialTimeout),
g.secure(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
cc, err := g.pool.getConn(address, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
defer func() {
g.pool.release(address, cc, grr)
ch := make(chan error, 1)
go func() {
grpcCallOptions := []grpc.CallOption{grpc.CallContentSubtype(cf.Name())}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpcCallOptions...)
ch <- microError(err)
select {
case err := <-ch:
grr = err
case <-ctx.Done():
grr = ctx.Err()
return grr
到这里一个client和server的请求响应就完成了。
rpc 流请求的处理可以参考这里。
看完代码后最大的问题就是handler编解码那里,还有很多内容比如 stream,wrap,grpc,反射 需要学习!
go-micro 主要的作用是 可插拔(实现了大量插件),服务注册与发现,服务缓存,轻量级负载均衡(只针对一次Call !),连接池,至于相对独立的 broker,只是锦上添花。
redis stream
redis stream用字典树来存储数据,先简单介绍一下字典树:
字典树查找,插入,删除一个字符串的时间复杂度都为O(m),m为字符串长度。
字典树用空间换时间,一个支持大小写字母和数字的字典树需要的空间为O(26n+26n+10n),n为节点个数。
字典树也支持高效的范围查询,比如我想找 romanus - ruber 之间的单词,第一步以O(m)找到 romanus,第二步从 romanus 节点开始遍历直到大于等于 ruber 的节点为止。时间复杂度为O(m*k),m为单词平均长度,k为范围内的个数,这个复杂度虽然与挨个查找一样,但是常数应该小得多,因为遍历时查找的节点数量肯定比每次从根节点开始找少得多。
stream 用一颗只支持数字的字典树存储数据,字典树中的单词形如 1576820296133-0, 前面代表时间戳,后面代表这个时间戳内生成的第几个单词,单词对应的值就是key-value对,比如 jack hello。
一个简化的stream结构体为:
typedef struct stream {
rax *rax;
uint64_t length;
streamID last_id;
rax *cgroups;
} stream;
知道了这些,我们就可以随心所欲了,比如查找某个单词 1576820296133-0,查找某个范围内的所有单词
1576820296132-0 — 1576820296140-0。
一个消费者组可以看成按字典树上所有单词的队列,每个消费者组都有一个last_delivered_id记录消费到了哪个单词,只要有一个消费者读了消息,这个id就会增加,其他消费者就读不到了。
那如果读走消息的消费者发生了意外不能处理消息了怎么办?原来redis会记录这个消费者读走了哪些消息到pending_ids,只有消费者ack才会把消息从这里删除。redis是不支持消息重新分发的,赶紧把发生了意外的消费者起起来干活吧!
读取消费者组要跟其他消费者竞争,我想独占一个消息队列怎么办?xread!但是你要自己记住消费到了哪个单词哦。
现在我知道了怎么向stream添加值,知道了stream支持哪些高效的查询,但是每次都要我主动,它就不能主动一点吗?至少新增了什么单词要跟我说吧?xread是支持阻塞的!可以用 for + xread 实现持续推送:
for {
res := xread()
redis只支持物理删除最小的那部分单词,不支持随机物理删除。
xdel只是把这个单词标记为删除,物理上并没有被删除,xadd时可以带上maxlen参数让redis把比较老的单词删除,最多只保留maxlen个。
参考资料:
1.挑战Kafka!Redis5.0重量级特性Stream尝鲜