123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- package comm
- import (
- "encoding/json"
- "errors"
- "fmt"
- "strings"
- "time"
- "github.com/nats-io/nats.go"
- )
- // 投递代理流消息
- func (q *NatsBus) PushPackMessage(streamAndTopic string, payload interface{}, options *RequestOptions) error {
- pack := "comm"
- timeout := time.Second * 5
- pubOpts := []nats.PubOpt{}
- if options != nil {
- if len(options.DeployPack) > 0 {
- pack = options.DeployPack
- }
- if options.Timeout > 0 {
- timeout = options.Timeout
- }
- if len(options.PubOpts) > 0 {
- pubOpts = options.PubOpts
- }
- }
- names := strings.Split(streamAndTopic, "#")
- if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
- return errors.New("streamAndTopic should like stream#topic")
- }
- req := &ReqStreamApi{Payload: payload, Subject: fmt.Sprintf("%s#%s", names[0], names[1]), PubOpts: pubOpts}
- payloadData, _ := json.Marshal(req)
- msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicMessageApi), payloadData, timeout)
- if err != nil {
- return err
- }
- result := &NatsResponse{}
- err = json.Unmarshal(msg.Data, result)
- if err != nil {
- return err
- }
- if result.ErrorNo != 200 {
- return errors.New(result.ErrorDesc)
- }
- fmt.Println("-----------------------pushPackMessage-------------------------------------")
- fmt.Println("pack:", pack)
- fmt.Println("streamAndTopic:", streamAndTopic)
- fmt.Println("streamName:", names[0])
- fmt.Println("topic:", names[1])
- return nil
- }
// ReqPullPackMessageApi is the request body that PullPackMessage sends to a
// pack's pull-message API.
type ReqPullPackMessageApi struct {
	// Subject is the "stream#topic" string the caller wants to receive.
	Subject string

	// Pack is not populated by PullPackMessage in this file.
	// NOTE(review): presumably the name of a deployment pack; confirm who
	// sets and reads it.
	Pack string

	// AckWaitMinute is the ack wait expressed in minutes; PullPackMessage
	// never sends a value below 0.1.
	AckWaitMinute float64

	// SubjectCallBack is the subject under which PullPackMessage registered
	// its local replyer, formatted as "<pack>#<stream>#<topic>".
	SubjectCallBack string
}
- func CreatePackMessageCallback(revEntity interface{}, callback PackMessageCallback) PackMessageCallback {
- return func(entity interface{}) (*RespPackMessage, error) {
- data := entity.(*[]byte)
- json.Unmarshal(*data, revEntity)
- return callback(revEntity)
- }
- }
- // 获取其他部署包的流消息,初始话时调用有效,运行时调用,不能生效
- func (q *NatsBus) PullPackMessage(pack, streamAndTopic string, ackWaitMinute float64, callback PackMessageCallback) error {
- if len(pack) < 1 {
- return errors.New("pack 不能为空")
- }
- names := strings.Split(streamAndTopic, "#")
- if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
- return errors.New("streamAndTopic should like stream#topic")
- }
- find := false
- subject := fmt.Sprintf("%s#%s", pack, streamAndTopic)
- for _, w := range q.replyers {
- if w.Subject == subject {
- find = true
- break
- }
- }
- if find {
- return fmt.Errorf("pack message Subject %s has registered", streamAndTopic)
- }
- if ackWaitMinute < 0.1 { //至少6s
- ackWaitMinute = 0.1 //
- }
- //创建消息回调请求响应器
- watcher := &NatsMsgReplyer{
- Subject: subject,
- Entity: func() interface{} { return &[]byte{} },
- Timeout: time.Duration(int64(ackWaitMinute * float64(time.Minute))),
- Cb2: func(msg *nats.Msg, entity interface{}) (interface{}, error) { //true = ack
- return callback(entity)
- },
- }
- q.replyers = append(q.replyers, watcher)
- //向bus发送PUllPackMessage 请求,由bus代理转发消息到本地Request watcher
- req := &ReqPullPackMessageApi{
- Subject: streamAndTopic,
- AckWaitMinute: ackWaitMinute,
- SubjectCallBack: subject,
- }
- q.RequestApi(fmt.Sprintf("%s.%s", pack, PackPullMessageApi), req, time.Second*5, nil)
- fmt.Println("-----------------------pullPackMessage-------------------------------------")
- fmt.Println("pack:", pack)
- fmt.Println("streamAndTopic:", streamAndTopic)
- fmt.Println("streamName:", names[0])
- fmt.Println("topic:", names[1])
- return nil
- }
- func (q *NatsBus) PullMessage(streamAndTopic string, callback WatcherCallback) error {
- names := strings.Split(streamAndTopic, "#")
- if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
- return errors.New("streamAndTopic should like stream#topic")
- }
- queueName := ""
- if len(names) == 2 {
- v := strings.ReplaceAll(names[1], ".", "-")
- queueName = v + "-queue"
- } else {
- queueName = names[2]
- }
- find := false
- for _, w := range q.streamWacther {
- if w.Stream == names[0] && w.Topic == names[1] && w.Queue == queueName {
- find = true
- break
- }
- }
- if find {
- return fmt.Errorf("pack message Subject %s has registered", streamAndTopic)
- }
- watcher := &NatsStreamWather{
- Topic: names[1],
- Stream: names[0],
- Queue: queueName,
- Entity: func() interface{} {
- var enity interface{}
- return &enity
- },
- Cb: callback,
- }
- q.streamWacther = append(q.streamWacther, watcher)
- fmt.Println("-----------------------pullMessage-------------------------------------")
- fmt.Println("streamAndTopic:", streamAndTopic)
- fmt.Println("streamName:", names[0])
- fmt.Println("topic:", names[1])
- return nil
- }
- func (q *NatsBus) PushMessage(streamAndTopic string, data interface{}, opts ...nats.PubOpt) error {
- names := strings.Split(streamAndTopic, "#")
- if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
- return errors.New("streamAndTopic should like stream#topic")
- }
- streamName := names[0]
- topic := names[1]
- Payload, err := json.Marshal(data)
- if err != nil {
- return err
- }
- js, err := q.nc.JetStream()
- if err != nil {
- return err
- }
- stream, _ := js.StreamInfo(streamName)
- if stream == nil {
- _, err = js.AddStream(&nats.StreamConfig{
- Name: streamName,
- Subjects: []string{topic},
- Retention: nats.WorkQueuePolicy,
- })
- if err != nil {
- return err
- }
- _, err = js.StreamInfo(streamName)
- if err != nil {
- return err
- }
- }
- _, err = js.PublishAsync(topic, Payload, opts...)
- fmt.Println("-----------------------pushMessage-------------------------------------")
- fmt.Println("streamAndTopic:", streamAndTopic)
- fmt.Println("streamName:", names[0])
- fmt.Println("topic:", names[1])
- return err
- }
|