|
- package comm
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- "github.com/nats-io/nats.go"
- )
// Connection state events sent on NatsBus.reconnChan by the connection
// callbacks installed in the NewNatsBus* constructors.
const (
	// ConnStateDisconned is sent by the disconnect handler.
	ConnStateDisconned = iota + 1
	// ConnStateReconned is sent by the reconnect handler.
	ConnStateReconned
)
- // NATSQueue queue for work
- type NatsBus struct {
- mu sync.Mutex
- nc *nats.Conn
- reconnChan chan int
- streamWacther []*NatsStreamWather
- topicWatcher []*NatsTopicWather
- replyers []*NatsMsgReplyer
- runtimeStartStreamWatcher StartStreamWatcherHandle
- }
- type StartStreamWatcherHandle func(conf *NatsStreamWather) error
- type NatsStreamWather struct {
- Stream string
- Topic string
- Queue string
- AckWaitMinute int64
- AckWaitMinuteFloat float64
- Entity CreateEnity
- Cb WatcherCallback
- Cb2 WatcherCallback2
- Exported bool //在其他部署包内,是否可以被调用
- }
- // 消息回复者
- type NatsMsgReplyer struct {
- Subject string
- Entity CreateEnity
- Exported bool //在其他部署包内,是否可以被调用
- Timeout time.Duration //调用的最大时间
- Cb ReplyerHandle
- Cb2 ReplyerHandle2
- }
- type NatsTopicWather struct {
- Topic string
- Cb WatcherCallback
- }
- type Stream func(streamName string, topic string, group string)
- type ListenTopic func(topic string)
- type WatcherCallback func(msg *nats.Msg, entity interface{})
- type WatcherCallback2 func(msg *nats.Msg)
- type ReplyerHandle func(msg *nats.Msg, entity interface{}) interface{}
- type ReplyerHandle2 func(msg *nats.Msg, entity interface{}) (interface{}, error)
- type ReplyerHandle3 func(msg *nats.Msg) (interface{}, error)
- type CreateEnity func() interface{}
// PackMessageCallback processes an entity and returns a RespPackMessage or an
// error.
type PackMessageCallback func(entity interface{}) (*RespPackMessage, error)

// RespPackMessage is the result returned by a PackMessageCallback.
type RespPackMessage struct {
	// CanAck is the acknowledgement flag set by the callback.
	CanAck bool
}
// NatsResponse is the reply envelope decoded by RequestApi and RequestPackApi
// and produced by Run for ReplyerHandle2 handlers.
type NatsResponse struct {
	// ErrorNo is 200 on success; any other value is treated as a failure.
	ErrorNo int
	// ErrorDesc is the error text returned to the caller on failure.
	ErrorDesc string
	// Result is the JSON-encoded result.
	Result string
}
- type RequestOptions struct {
- Timeout time.Duration
- PubOpts []nats.PubOpt
- DeployPack string
- }
// ReqPayload pairs a request payload with a deployment package name.
type ReqPayload struct {
	// Payload is the request body.
	Payload interface{}
	// DeployPack names the deployment package.
	DeployPack string
}
// ReqPackApi is the JSON body RequestPackApi sends to a package's public API
// subject.
type ReqPackApi struct {
	// Payload is the caller's request body.
	Payload interface{}
	// Subject is the subject the caller asked for.
	Subject string
	// Timeout is the timeout used for the request.
	Timeout time.Duration
}
- // req params
- type ReqStreamApi struct {
- Subject string
- Payload interface{}
- PubOpts []nats.PubOpt
- }
- type CancelSubscribe func()
- type SubscribeCallback func(obj interface{}, msg *nats.Msg) interface{}
- type SubOption struct {
- Sub string
- Obj func() interface{}
- Call SubscribeCallback
- }
- func (q *NatsBus) Subscribe(topic string, msg nats.MsgHandler) (*nats.Subscription, error) {
- return q.nc.Subscribe(topic, msg)
- }
- func (q *NatsBus) SubscribeOnce(Option *SubOption) (CancelSubscribe, error) {
- var sub *nats.Subscription
- var cancel = func() {
- if sub != nil {
- sub.Unsubscribe()
- sub = nil
- }
- }
- s, err := q.nc.Subscribe(Option.Sub, func(msg *nats.Msg) {
- var p interface{}
- if Option.Obj != nil {
- p = Option.Obj()
- err := json.Unmarshal(msg.Data, p)
- if err != nil {
- msg.Term()
- fmt.Println(Option.Sub, " payload parse msg err", err)
- return
- }
- }
- cancel()
- out := Option.Call(p, msg)
- if out != nil {
- payload, _ := json.Marshal(out)
- msg.Respond(payload)
- return
- }
- })
- if err != nil {
- return nil, err
- }
- sub = s
- return cancel, nil
- }
- func (q *NatsBus) QueueSubscribe(Option *SubOption) (CancelSubscribe, error) {
- var sub *nats.Subscription
- var cancel = func() {
- if sub != nil {
- sub.Unsubscribe()
- sub = nil
- }
- }
- s, err := q.nc.QueueSubscribe(Option.Sub, Option.Sub+".queue", func(msg *nats.Msg) {
- var p interface{}
- if Option.Obj != nil {
- p = Option.Obj()
- err := json.Unmarshal(msg.Data, p)
- if err != nil {
- msg.Term()
- fmt.Println(Option.Sub, " payload parse msg err", err)
- return
- }
- }
- out := Option.Call(p, msg)
- if out != nil {
- payload, _ := json.Marshal(out)
- msg.Respond(payload)
- return
- }
- })
- if err != nil {
- return nil, err
- }
- sub = s
- return cancel, nil
- }
- func (q *NatsBus) Publish(topic string, data []byte) error {
- return q.nc.Publish(topic, data)
- }
- func (q *NatsBus) PublishObj(topic string, obj interface{}) error {
- data, _ := json.Marshal(obj)
- return q.nc.Publish(topic, data)
- }
- func (q *NatsBus) AddReplyers(w ...*NatsMsgReplyer) {
- q.replyers = append(q.replyers, w...)
- }
- func (q *NatsBus) AddStreamWacher(w ...*NatsStreamWather) {
- q.streamWacther = append(q.streamWacther, w...)
- }
- // 请求代理api
- func (q *NatsBus) RequestPackApi(subject string, Payload interface{}, out interface{}, options *RequestOptions) error {
- pack := "comm"
- timeout := time.Second * 5
- if options != nil {
- if len(options.DeployPack) > 0 {
- pack = options.DeployPack
- }
- if options.Timeout > 0 {
- timeout = options.Timeout
- }
- }
- payload, _ := json.Marshal(&ReqPackApi{Payload: Payload, Subject: subject, Timeout: timeout})
- msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicApi), payload, timeout)
- if err != nil {
- return err
- }
- result := &NatsResponse{}
- err = json.Unmarshal(msg.Data, result)
- if err != nil {
- return err
- }
- if result.ErrorNo != 200 {
- return errors.New(result.ErrorDesc)
- }
- if out != nil {
- return json.Unmarshal([]byte(result.Result), out)
- }
- return nil
- }
- // 请求本地api
- func (q *NatsBus) RequestApi(subject string, data interface{}, timeout time.Duration, out interface{}) error {
- // Lock so only one goroutine at a time can access the map c.v.
- q.mu.Lock()
- defer q.mu.Unlock()
- payload := []byte{}
- if data != nil {
- payload, _ = json.Marshal(data)
- }
- msg, err := q.nc.Request(subject, payload, timeout)
- if err != nil {
- return err
- }
- result := &NatsResponse{}
- err = json.Unmarshal(msg.Data, result)
- if err != nil {
- return err
- }
- if result.ErrorNo != 200 {
- return errors.New(result.ErrorDesc)
- }
- if out != nil {
- return json.Unmarshal([]byte(result.Result), out)
- }
- return nil
- }
- func (q *NatsBus) Request(subject string, data interface{}, timeout time.Duration, out interface{}) error {
- payload := []byte{}
- if data != nil {
- payload, _ = json.Marshal(data)
- }
- msg, err := q.nc.Request(subject, payload, timeout)
- if err != nil {
- return err
- }
- if out != nil {
- return json.Unmarshal(msg.Data, out)
- }
- return nil
- }
- // 退出
- func (q *NatsBus) Quit() {
- q.nc.Drain()
- q.nc.Close()
- }
- func NewNatsBus2(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers []*NatsMsgReplyer) (*NatsBus, error) {
- bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
- if err != nil {
- return nil, err
- }
- bus.replyers = replyers
- return bus, nil
- }
- // 可变参数
- func NewNatsBus3(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) {
- bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
- if err != nil {
- return nil, err
- }
- bus.replyers = replyers
- return bus, nil
- }
- func NewNatsBusWithName(name string, natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) {
- url := natsUri
- if len(url) < 1 {
- url = nats.DefaultURL
- }
- fmt.Println("conning to nats ====> ", url)
- bus := &NatsBus{
- reconnChan: make(chan int),
- streamWacther: streamWacther,
- topicWatcher: []*NatsTopicWather{},
- }
- nc, err := nats.Connect(url,
- nats.Name(name),
- nats.MaxReconnects(MaxReconnect),
- nats.ReconnectWait(time.Duration(ReconnDelaySecond*int(time.Second))),
- nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
- fmt.Printf("%s Got disconnected! Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), err)
- bus.reconnChan <- ConnStateDisconned
- }),
- nats.ReconnectHandler(func(nc *nats.Conn) {
- fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format("2006-01-02 15:04:05"), nc.ConnectedUrl())
- bus.reconnChan <- ConnStateReconned
- }),
- nats.ClosedHandler(func(nc *nats.Conn) {
- tip := fmt.Sprintf("%s Connection closed. Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), nc.LastError())
- fmt.Println(tip)
- // panic(tip)
- }),
- )
- if err != nil {
- fmt.Println("conn to nats failed ====> ", url, err)
- return nil, err
- }
- bus.nc = nc
- fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format("2006-01-02 15:04:05"), url)
- return bus, nil
- }
- // 可变参数
- func NewNatsBus4(name string, natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) {
- bus, err := NewNatsBusWithName(name, natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
- if err != nil {
- return nil, err
- }
- bus.replyers = replyers
- return bus, nil
- }
- func NewNatsBus(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) {
- url := natsUri
- if len(url) < 1 {
- url = nats.DefaultURL
- }
- fmt.Println("conning to nats ====> ", url)
- bus := &NatsBus{
- reconnChan: make(chan int),
- streamWacther: streamWacther,
- topicWatcher: []*NatsTopicWather{},
- }
- nc, err := nats.Connect(url,
- // nats.Name("render1"),
- nats.MaxReconnects(MaxReconnect),
- nats.ReconnectWait(time.Duration(ReconnDelaySecond*int(time.Second))),
- nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
- fmt.Printf("%s Got disconnected! Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), err)
- bus.reconnChan <- ConnStateDisconned
- }),
- nats.ReconnectHandler(func(nc *nats.Conn) {
- fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format("2006-01-02 15:04:05"), nc.ConnectedUrl())
- bus.reconnChan <- ConnStateReconned
- }),
- nats.ClosedHandler(func(nc *nats.Conn) {
- tip := fmt.Sprintf("%s Connection closed. Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), nc.LastError())
- fmt.Println(tip)
- // panic(tip)
- }),
- )
- if err != nil {
- fmt.Println("conn to nats failed ====> ", url, err)
- return nil, err
- }
- bus.nc = nc
- fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format("2006-01-02 15:04:05"), url)
- return bus, nil
- }
- // 创建jetStream
- func (q *NatsBus) JetStream() (nats.JetStreamContext, error) {
- return q.nc.JetStream()
- }
- // 创建jetStream
- func (q *NatsBus) CreateStream(streamName string, topic string) (nats.JetStreamContext, error) {
- js, err := q.nc.JetStream()
- if err != nil {
- return nil, err
- }
- stream, _ := js.StreamInfo(streamName)
- if stream != nil {
- return js, nil
- }
- _, err = js.AddStream(&nats.StreamConfig{
- Name: streamName,
- Subjects: []string{topic},
- Retention: nats.WorkQueuePolicy,
- })
- if err != nil {
- fmt.Println("bus CreateStream err=>", err.Error())
- return nil, err
- }
- return js, nil
- }
- // 监听流数据
- func (q *NatsBus) WatchStream(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error {
- streamName := conf.Stream
- topic := conf.Topic
- queue := conf.Queue
- fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue)
- //创建对应的jet stream 以便模型创建消息不回丢失
- js, err := q.CreateStream(streamName, topic)
- if err != nil {
- fmt.Println("WatchStream error=>", err.Error())
- return err
- }
- opts := []nats.SubOpt{nats.BindStream(streamName)} //nats.MaxDeliver(3)
- if conf.AckWaitMinute > 0 {
- opts = append(opts, nats.AckWait(time.Duration(conf.AckWaitMinute*int64(time.Minute))))
- }
- if conf.AckWaitMinuteFloat > 0 {
- opts = append(opts, nats.AckWait(time.Duration(int64(conf.AckWaitMinuteFloat*float64(time.Minute)))))
- }
- sub, err := js.PullSubscribe(topic, queue, opts...)
- if err != nil {
- fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err)
- return err
- }
- fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue)
- var currHandingMsg *nats.Msg = nil
- for {
- select {
- case <-ctxTerm.Done():
- fmt.Println("Watching Stream Termed", streamName, topic)
- if currHandingMsg != nil {
- err := currHandingMsg.Nak()
- fmt.Println("terminate currMsg ", err)
- }
- //中断链接
- q.Quit()
- return nil
- case <-ctxConn.Done():
- fmt.Println("Watching Stream Conn closed", streamName, topic)
- if currHandingMsg != nil {
- err := currHandingMsg.Nak()
- fmt.Println("terminate currMsg ", err)
- }
- return nil
- default:
- currHandingMsg = nil
- msgs, err := sub.Fetch(1)
- if err == nil && len(msgs) > 0 {
- msg := msgs[0]
- currHandingMsg = msg
- meta, _ := msg.Metadata()
- fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#")
- fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending)
- fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream)
- if conf.Cb2 != nil {
- conf.Cb2(msg)
- } else if conf.Cb != nil {
- var req interface{}
- if conf.Entity != nil {
- req = conf.Entity()
- }
- err = json.Unmarshal(msg.Data, req)
- if err != nil {
- msg.Term()
- fmt.Println("work msg err=>", err.Error())
- continue
- }
- conf.Cb(msg, req)
- } else {
- fmt.Println("error no stream wather callback!!!!")
- }
- currHandingMsg = nil
- }
- }
- }
- }
- func (q *NatsBus) GetNatsConn() *nats.Conn {
- return q.nc
- }
- func (q *NatsBus) Run(cb func()) {
- //ctx, cancel := context.WithCancel(context.Background())
- //ctx, stop := context.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
- ctxTerm, cancelTerm := context.WithCancel(context.Background())
- ctxConn, cancelCnn := context.WithCancel(context.Background())
- defer cancelTerm()
- defer cancelCnn()
- go func() {
- exit := make(chan os.Signal, 1)
- signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
- <-exit
- cancelTerm()
- time.Sleep(1 * time.Second)
- os.Exit(1)
- }()
- var QueueSubscribe = func(rplyer *NatsMsgReplyer) {
- r1 := rplyer
- queue := fmt.Sprintf("%s.queue", r1.Subject)
- q.nc.QueueSubscribe(r1.Subject, queue, func(msg *nats.Msg) {
- defer func() {
- if err := recover(); err != nil {
- fmt.Println("catch error ", err)
- }
- }()
- fmt.Println("replay for ", r1.Subject)
- var req interface{}
- if r1.Entity != nil {
- req = r1.Entity()
- err := json.Unmarshal(msg.Data, req)
- if err != nil {
- msg.Term()
- fmt.Println("rplyer work msg err", err.Error())
- return
- }
- }
- if r1.Cb != nil {
- out := r1.Cb(msg, req)
- if out != nil {
- payload, err := json.Marshal(out)
- if err != nil {
- fmt.Println("error reply for", r1.Subject, err)
- }
- err = msg.Respond(payload)
- if err != nil {
- fmt.Println("error reply for", r1.Subject, out)
- }
- }
- return
- }
- if r1.Cb2 != nil {
- out, err := r1.Cb2(msg, req)
- resp := &NatsResponse{ErrorNo: 200}
- if err != nil {
- fmt.Println("error reply2 for", r1.Subject, err)
- resp.ErrorDesc = err.Error()
- resp.ErrorNo = 400
- } else {
- payload, _ := json.Marshal(out)
- resp.Result = string(payload)
- }
- payload, _ := json.Marshal(resp)
- err = msg.Respond(payload)
- if err != nil {
- fmt.Println("error reply2 for", r1.Subject, out, err.Error())
- }
- return
- }
- })
- }
- var startSubscribe = func() {
- //启动所有watcher
- for _, item := range q.streamWacther {
- go q.WatchStream(ctxTerm, ctxConn, item)
- }
- for _, rplyer := range q.replyers {
- QueueSubscribe(rplyer)
- }
- }
- q.runtimeStartStreamWatcher = func(conf *NatsStreamWather) error {
- go q.WatchStream(ctxTerm, ctxConn, conf)
- return nil
- }
- startSubscribe()
- if cb != nil {
- cb()
- }
- for {
- select {
- case state := <-q.reconnChan:
- if state == ConnStateDisconned {
- cancelCnn()
- } else if state == ConnStateReconned {
- ctxConn, cancelCnn = context.WithCancel(context.Background())
- startSubscribe()
- }
- }
- }
- }
|