package comm import ( "context" "encoding/json" "errors" "fmt" "os" "os/signal" "sync" "syscall" "time" "github.com/nats-io/nats.go" ) const ( ConnStateDisconned = 1 ConnStateReconned = 2 ) // NATSQueue queue for work type NatsBus struct { mu sync.Mutex nc *nats.Conn reconnChan chan int streamWacther []*NatsStreamWather topicWatcher []*NatsTopicWather replyers []*NatsMsgReplyer runtimeStartStreamWatcher StartStreamWatcherHandle } type StartStreamWatcherHandle func(conf *NatsStreamWather) error type NatsStreamWather struct { Stream string Topic string Queue string AckWaitMinute int64 AckWaitMinuteFloat float64 Entity CreateEnity Cb WatcherCallback Cb2 WatcherCallback2 Exported bool //在其他部署包内,是否可以被调用 } // 消息回复者 type NatsMsgReplyer struct { Subject string Entity CreateEnity Exported bool //在其他部署包内,是否可以被调用 Timeout time.Duration //调用的最大时间 Cb ReplyerHandle Cb2 ReplyerHandle2 } type NatsTopicWather struct { Topic string Cb WatcherCallback } type Stream func(streamName string, topic string, group string) type ListenTopic func(topic string) type WatcherCallback func(msg *nats.Msg, entity interface{}) type WatcherCallback2 func(msg *nats.Msg) type ReplyerHandle func(msg *nats.Msg, entity interface{}) interface{} type ReplyerHandle2 func(msg *nats.Msg, entity interface{}) (interface{}, error) type ReplyerHandle3 func(msg *nats.Msg) (interface{}, error) type CreateEnity func() interface{} type PackMessageCallback func(entity interface{}) (*RespPackMessage, error) type RespPackMessage struct { CanAck bool } // 请求数据 type NatsResponse struct { ErrorNo int ErrorDesc string Result string //json字符串 } type RequestOptions struct { Timeout time.Duration PubOpts []nats.PubOpt DeployPack string } type ReqPayload struct { Payload interface{} DeployPack string } type ReqPackApi struct { Payload interface{} Subject string Timeout time.Duration } // req params type ReqStreamApi struct { Subject string Payload interface{} PubOpts []nats.PubOpt } type CancelSubscribe func() type SubscribeCallback func(obj interface{}, msg *nats.Msg) interface{} type SubOption struct { Sub string Obj func() interface{} Call SubscribeCallback } func (q *NatsBus) Subscribe(topic string, msg nats.MsgHandler) (*nats.Subscription, error) { return q.nc.Subscribe(topic, msg) } func (q *NatsBus) SubscribeOnce(Option *SubOption) (CancelSubscribe, error) { var sub *nats.Subscription var cancel = func() { if sub != nil { sub.Unsubscribe() sub = nil } } s, err := q.nc.Subscribe(Option.Sub, func(msg *nats.Msg) { var p interface{} if Option.Obj != nil { p = Option.Obj() err := json.Unmarshal(msg.Data, p) if err != nil { msg.Term() fmt.Println(Option.Sub, " payload parse msg err", err) return } } cancel() out := Option.Call(p, msg) if out != nil { payload, _ := json.Marshal(out) msg.Respond(payload) return } }) if err != nil { return nil, err } sub = s return cancel, nil } func (q *NatsBus) QueueSubscribe(Option *SubOption) (CancelSubscribe, error) { var sub *nats.Subscription var cancel = func() { if sub != nil { sub.Unsubscribe() sub = nil } } s, err := q.nc.QueueSubscribe(Option.Sub, Option.Sub+".queue", func(msg *nats.Msg) { var p interface{} if Option.Obj != nil { p = Option.Obj() err := json.Unmarshal(msg.Data, p) if err != nil { msg.Term() fmt.Println(Option.Sub, " payload parse msg err", err) return } } out := Option.Call(p, msg) if out != nil { payload, _ := json.Marshal(out) msg.Respond(payload) return } }) if err != nil { return nil, err } sub = s return cancel, nil } func (q *NatsBus) Publish(topic string, data []byte) error { return q.nc.Publish(topic, data) } func (q *NatsBus) PublishObj(topic string, obj interface{}) error { data, _ := json.Marshal(obj) return q.nc.Publish(topic, data) } func (q *NatsBus) AddReplyers(w ...*NatsMsgReplyer) { q.replyers = append(q.replyers, w...) } func (q *NatsBus) AddStreamWacher(w ...*NatsStreamWather) { q.streamWacther = append(q.streamWacther, w...) } // 请求代理api func (q *NatsBus) RequestPackApi(subject string, Payload interface{}, out interface{}, options *RequestOptions) error { pack := "comm" timeout := time.Second * 5 if options != nil { if len(options.DeployPack) > 0 { pack = options.DeployPack } if options.Timeout > 0 { timeout = options.Timeout } } payload, _ := json.Marshal(&ReqPackApi{Payload: Payload, Subject: subject, Timeout: timeout}) msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicApi), payload, timeout) if err != nil { return err } result := &NatsResponse{} err = json.Unmarshal(msg.Data, result) if err != nil { return err } if result.ErrorNo != 200 { return errors.New(result.ErrorDesc) } if out != nil { return json.Unmarshal([]byte(result.Result), out) } return nil } // 请求本地api func (q *NatsBus) RequestApi(subject string, data interface{}, timeout time.Duration, out interface{}) error { // Lock so only one goroutine at a time can access the map c.v. q.mu.Lock() defer q.mu.Unlock() payload := []byte{} if data != nil { payload, _ = json.Marshal(data) } msg, err := q.nc.Request(subject, payload, timeout) if err != nil { return err } result := &NatsResponse{} err = json.Unmarshal(msg.Data, result) if err != nil { return err } if result.ErrorNo != 200 { return errors.New(result.ErrorDesc) } if out != nil { return json.Unmarshal([]byte(result.Result), out) } return nil } func (q *NatsBus) Request(subject string, data interface{}, timeout time.Duration, out interface{}) error { payload := []byte{} if data != nil { payload, _ = json.Marshal(data) } msg, err := q.nc.Request(subject, payload, timeout) if err != nil { return err } return json.Unmarshal(msg.Data, out) } // 退出 func (q *NatsBus) Quit() { q.nc.Drain() q.nc.Close() } func NewNatsBus2(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers []*NatsMsgReplyer) (*NatsBus, error) { bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther) if err != nil { return nil, err } bus.replyers = replyers return bus, nil } // 可变参数 func NewNatsBus3(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) { bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther) if err != nil { return nil, err } bus.replyers = replyers return bus, nil } func NewNatsBus(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) { url := natsUri if len(url) < 1 { url = nats.DefaultURL } fmt.Println("conning to nats ====> ", url) bus := &NatsBus{ reconnChan: make(chan int), streamWacther: streamWacther, topicWatcher: []*NatsTopicWather{}, } nc, err := nats.Connect(url, // nats.Name("render1"), nats.MaxReconnects(MaxReconnect), nats.ReconnectWait(time.Duration(ReconnDelaySecond*int(time.Second))), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { fmt.Printf("%s Got disconnected! Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), err) bus.reconnChan <- ConnStateDisconned }), nats.ReconnectHandler(func(nc *nats.Conn) { fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format("2006-01-02 15:04:05"), nc.ConnectedUrl()) bus.reconnChan <- ConnStateReconned }), nats.ClosedHandler(func(nc *nats.Conn) { tip := fmt.Sprintf("%s Connection closed. Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), nc.LastError()) fmt.Println(tip) // panic(tip) }), ) if err != nil { fmt.Println("conn to nats failed ====> ", url, err) return nil, err } bus.nc = nc fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format("2006-01-02 15:04:05"), url) return bus, nil } // 创建jetStream func (q *NatsBus) JetStream() (nats.JetStreamContext, error) { return q.nc.JetStream() } // 创建jetStream func (q *NatsBus) CreateStream(streamName string, topic string) (nats.JetStreamContext, error) { js, err := q.nc.JetStream() if err != nil { return nil, err } stream, _ := js.StreamInfo(streamName) if stream != nil { return js, nil } _, err = js.AddStream(&nats.StreamConfig{ Name: streamName, Subjects: []string{topic}, Retention: nats.WorkQueuePolicy, }) if err != nil { fmt.Println("bus CreateStream err=>", err.Error()) return nil, err } return js, nil } // 监听流数据 func (q *NatsBus) WatchStream(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error { streamName := conf.Stream topic := conf.Topic queue := conf.Queue fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue) //创建对应的jet stream 以便模型创建消息不回丢失 js, err := q.CreateStream(streamName, topic) if err != nil { fmt.Println("WatchStream error=>", err.Error()) return err } opts := []nats.SubOpt{nats.BindStream(streamName)} //nats.MaxDeliver(3) if conf.AckWaitMinute > 0 { opts = append(opts, nats.AckWait(time.Duration(conf.AckWaitMinute*int64(time.Minute)))) } if conf.AckWaitMinuteFloat > 0 { opts = append(opts, nats.AckWait(time.Duration(int64(conf.AckWaitMinuteFloat*float64(time.Minute))))) } sub, err := js.PullSubscribe(topic, queue, opts...) if err != nil { fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err) return err } fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue) var currHandingMsg *nats.Msg = nil for { select { case <-ctxTerm.Done(): fmt.Println("Watching Stream Termed", streamName, topic) if currHandingMsg != nil { err := currHandingMsg.Nak() fmt.Println("terminate currMsg ", err) } //中断链接 q.Quit() return nil case <-ctxConn.Done(): fmt.Println("Watching Stream Conn closed", streamName, topic) if currHandingMsg != nil { err := currHandingMsg.Nak() fmt.Println("terminate currMsg ", err) } return nil default: currHandingMsg = nil msgs, err := sub.Fetch(1) if err == nil && len(msgs) > 0 { msg := msgs[0] currHandingMsg = msg meta, _ := msg.Metadata() fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#") fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending) fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream) if conf.Cb2 != nil { conf.Cb2(msg) } else if conf.Cb != nil { var req interface{} if conf.Entity != nil { req = conf.Entity() } err = json.Unmarshal(msg.Data, req) if err != nil { msg.Term() fmt.Println("work msg err=>", err.Error()) continue } conf.Cb(msg, req) } else { fmt.Println("error no stream wather callback!!!!") } currHandingMsg = nil } } } } func (q *NatsBus) GetNatsConn() *nats.Conn { return q.nc } func (q *NatsBus) Run(cb func()) { //ctx, cancel := context.WithCancel(context.Background()) //ctx, stop := context.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) ctxTerm, cancelTerm := context.WithCancel(context.Background()) ctxConn, cancelCnn := context.WithCancel(context.Background()) defer cancelTerm() defer cancelCnn() go func() { exit := make(chan os.Signal, 1) signal.Notify(exit, os.Interrupt, syscall.SIGTERM) <-exit cancelTerm() time.Sleep(1 * time.Second) os.Exit(1) }() var QueueSubscribe = func(rplyer *NatsMsgReplyer) { r1 := rplyer queue := fmt.Sprintf("%s.queue", r1.Subject) q.nc.QueueSubscribe(r1.Subject, queue, func(msg *nats.Msg) { defer func() { if err := recover(); err != nil { fmt.Println("catch error ", err) } }() fmt.Println("replay for ", r1.Subject) var req interface{} if r1.Entity != nil { req = r1.Entity() err := json.Unmarshal(msg.Data, req) if err != nil { msg.Term() fmt.Println("rplyer work msg err", err.Error()) return } } if r1.Cb != nil { out := r1.Cb(msg, req) if out != nil { payload, err := json.Marshal(out) if err != nil { fmt.Println("error reply for", r1.Subject, err) } err = msg.Respond(payload) if err != nil { fmt.Println("error reply for", r1.Subject, out) } } return } if r1.Cb2 != nil { out, err := r1.Cb2(msg, req) resp := &NatsResponse{ErrorNo: 200} if err != nil { fmt.Println("error reply2 for", r1.Subject, err) resp.ErrorDesc = err.Error() resp.ErrorNo = 400 } else { payload, _ := json.Marshal(out) resp.Result = string(payload) } payload, _ := json.Marshal(resp) err = msg.Respond(payload) if err != nil { fmt.Println("error reply2 for", r1.Subject, out) } return } }) } var startSubscribe = func() { //启动所有watcher for _, item := range q.streamWacther { go q.WatchStream(ctxTerm, ctxConn, item) } for _, rplyer := range q.replyers { QueueSubscribe(rplyer) } } q.runtimeStartStreamWatcher = func(conf *NatsStreamWather) error { go q.WatchStream(ctxTerm, ctxConn, conf) return nil } startSubscribe() if cb != nil { cb() } for { select { case state := <-q.reconnChan: if state == ConnStateDisconned { cancelCnn() } else if state == ConnStateReconned { ctxConn, cancelCnn = context.WithCancel(context.Background()) startSubscribe() } } } }