package comm import ( "encoding/json" "errors" "fmt" "strings" "time" "github.com/nats-io/nats.go" ) // 投递代理流消息 func (q *NatsBus) PushPackMessage(streamAndTopic string, payload interface{}, options *RequestOptions) error { pack := "comm" timeout := time.Second * 5 pubOpts := []nats.PubOpt{} if options != nil { if len(options.DeployPack) > 0 { pack = options.DeployPack } if options.Timeout > 0 { timeout = options.Timeout } if len(options.PubOpts) > 0 { pubOpts = options.PubOpts } } names := strings.Split(streamAndTopic, "#") if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 { return errors.New("streamAndTopic should like stream#topic") } req := &ReqStreamApi{Payload: payload, Subject: fmt.Sprintf("%s#%s", names[0], names[1]), PubOpts: pubOpts} payloadData, _ := json.Marshal(req) msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicMessageApi), payloadData, timeout) if err != nil { return err } result := &NatsResponse{} err = json.Unmarshal(msg.Data, result) if err != nil { return err } if result.ErrorNo != 200 { return errors.New(result.ErrorDesc) } fmt.Println("-----------------------pushPackMessage-------------------------------------") fmt.Println("pack:", pack) fmt.Println("streamAndTopic:", streamAndTopic) fmt.Println("streamName:", names[0]) fmt.Println("topic:", names[1]) return nil } // 消息回复者 type ReqPullPackMessageApi struct { Subject string Pack string AckWaitMinute float64 SubjectCallBack string } func CreatePackMessageCallback(revEntity interface{}, callback PackMessageCallback) PackMessageCallback { return func(entity interface{}) (*RespPackMessage, error) { data := entity.(*[]byte) json.Unmarshal(*data, revEntity) return callback(revEntity) } } // 获取其他部署包的流消息,初始话时调用有效,运行时调用,不能生效 func (q *NatsBus) PullPackMessage(pack, streamAndTopic string, ackWaitMinute float64, callback PackMessageCallback) error { if len(pack) < 1 { return errors.New("pack 不能为空") } names := strings.Split(streamAndTopic, "#") if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 { return errors.New("streamAndTopic should like stream#topic") } find := false subject := fmt.Sprintf("%s#%s", pack, streamAndTopic) for _, w := range q.replyers { if w.Subject == subject { find = true break } } if find { return fmt.Errorf("pack message Subject %s has registered", streamAndTopic) } if ackWaitMinute < 0.1 { //至少6s ackWaitMinute = 0.1 // } //创建消息回调请求响应器 watcher := &NatsMsgReplyer{ Subject: subject, Entity: func() interface{} { return &[]byte{} }, Timeout: time.Duration(int64(ackWaitMinute * float64(time.Minute))), Cb2: func(msg *nats.Msg, entity interface{}) (interface{}, error) { //true = ack return callback(entity) }, } q.replyers = append(q.replyers, watcher) //向bus发送PUllPackMessage 请求,由bus代理转发消息到本地Request watcher req := &ReqPullPackMessageApi{ Subject: streamAndTopic, AckWaitMinute: ackWaitMinute, SubjectCallBack: subject, } q.RequestApi(fmt.Sprintf("%s.%s", pack, PackPullMessageApi), req, time.Second*5, nil) fmt.Println("-----------------------pullPackMessage-------------------------------------") fmt.Println("pack:", pack) fmt.Println("streamAndTopic:", streamAndTopic) fmt.Println("streamName:", names[0]) fmt.Println("topic:", names[1]) return nil } func (q *NatsBus) PullMessage(streamAndTopic string, callback WatcherCallback) error { names := strings.Split(streamAndTopic, "#") if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 { return errors.New("streamAndTopic should like stream#topic") } queueName := "" if len(names) == 2 { v := strings.ReplaceAll(names[1], ".", "-") queueName = v + "-queue" } else { queueName = names[2] } find := false for _, w := range q.streamWacther { if w.Stream == names[0] && w.Topic == names[1] && w.Queue == queueName { find = true break } } if find { return fmt.Errorf("pack message Subject %s has registered", streamAndTopic) } watcher := &NatsStreamWather{ Topic: names[1], Stream: names[0], Queue: queueName, Entity: func() interface{} { var enity interface{} return &enity }, Cb: callback, } q.streamWacther = append(q.streamWacther, watcher) fmt.Println("-----------------------pullMessage-------------------------------------") fmt.Println("streamAndTopic:", streamAndTopic) fmt.Println("streamName:", names[0]) fmt.Println("topic:", names[1]) return nil } func (q *NatsBus) PushMessage(streamAndTopic string, data interface{}, opts ...nats.PubOpt) error { names := strings.Split(streamAndTopic, "#") if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 { return errors.New("streamAndTopic should like stream#topic") } streamName := names[0] topic := names[1] Payload, err := json.Marshal(data) if err != nil { return err } js, err := q.nc.JetStream() if err != nil { return err } stream, _ := js.StreamInfo(streamName) if stream == nil { _, err = js.AddStream(&nats.StreamConfig{ Name: streamName, Subjects: []string{topic}, Retention: nats.WorkQueuePolicy, }) if err != nil { return err } _, err = js.StreamInfo(streamName) if err != nil { return err } } _, err = js.PublishAsync(topic, Payload, opts...) fmt.Println("-----------------------pushMessage-------------------------------------") fmt.Println("streamAndTopic:", streamAndTopic) fmt.Println("streamName:", names[0]) fmt.Println("topic:", names[1]) return err }