nats-stream.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package comm
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/nats-io/nats.go"
  9. )
  10. // 投递代理流消息
  11. func (q *NatsBus) PushPackMessage(streamAndTopic string, payload interface{}, options *RequestOptions) error {
  12. pack := "comm"
  13. timeout := time.Second * 5
  14. pubOpts := []nats.PubOpt{}
  15. if options != nil {
  16. if len(options.DeployPack) > 0 {
  17. pack = options.DeployPack
  18. }
  19. if options.Timeout > 0 {
  20. timeout = options.Timeout
  21. }
  22. if len(options.PubOpts) > 0 {
  23. pubOpts = options.PubOpts
  24. }
  25. }
  26. names := strings.Split(streamAndTopic, "#")
  27. if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
  28. return errors.New("streamAndTopic should like stream#topic")
  29. }
  30. req := &ReqStreamApi{Payload: payload, Subject: fmt.Sprintf("%s#%s", names[0], names[1]), PubOpts: pubOpts}
  31. payloadData, _ := json.Marshal(req)
  32. msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicMessageApi), payloadData, timeout)
  33. if err != nil {
  34. return err
  35. }
  36. result := &NatsResponse{}
  37. err = json.Unmarshal(msg.Data, result)
  38. if err != nil {
  39. return err
  40. }
  41. if result.ErrorNo != 200 {
  42. return errors.New(result.ErrorDesc)
  43. }
  44. fmt.Println("-----------------------pushPackMessage-------------------------------------")
  45. fmt.Println("pack:", pack)
  46. fmt.Println("streamAndTopic:", streamAndTopic)
  47. fmt.Println("streamName:", names[0])
  48. fmt.Println("topic:", names[1])
  49. return nil
  50. }
  51. // ReqPullPackMessageApi is the request body that PullPackMessage sends to a pack's pull-message API.
  52. type ReqPullPackMessageApi struct {
  53. Subject string // stream and topic to pull, in "stream#topic" form as passed to PullPackMessage
  54. Pack string // NOTE(review): not populated by PullPackMessage in this file; intended meaning not visible here
  55. AckWaitMinute float64 // acknowledgement wait time in minutes; PullPackMessage raises values below 0.1 to 0.1
  56. SubjectCallBack string // subject of the local replyer registered by PullPackMessage ("pack#stream#topic")
  57. }
  58. func CreatePackMessageCallback(revEntity interface{}, callback PackMessageCallback) PackMessageCallback {
  59. return func(entity interface{}) (*RespPackMessage, error) {
  60. data := entity.(*[]byte)
  61. json.Unmarshal(*data, revEntity)
  62. return callback(revEntity)
  63. }
  64. }
  65. // 获取其他部署包的流消息,初始话时调用有效,运行时调用,不能生效
  66. func (q *NatsBus) PullPackMessage(pack, streamAndTopic string, ackWaitMinute float64, callback PackMessageCallback) error {
  67. if len(pack) < 1 {
  68. return errors.New("pack 不能为空")
  69. }
  70. names := strings.Split(streamAndTopic, "#")
  71. if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
  72. return errors.New("streamAndTopic should like stream#topic")
  73. }
  74. find := false
  75. subject := fmt.Sprintf("%s#%s", pack, streamAndTopic)
  76. for _, w := range q.replyers {
  77. if w.Subject == subject {
  78. find = true
  79. break
  80. }
  81. }
  82. if find {
  83. return fmt.Errorf("pack message Subject %s has registered", streamAndTopic)
  84. }
  85. if ackWaitMinute < 0.1 { //至少6s
  86. ackWaitMinute = 0.1 //
  87. }
  88. //创建消息回调请求响应器
  89. watcher := &NatsMsgReplyer{
  90. Subject: subject,
  91. Entity: func() interface{} { return &[]byte{} },
  92. Timeout: time.Duration(int64(ackWaitMinute * float64(time.Minute))),
  93. Cb2: func(msg *nats.Msg, entity interface{}) (interface{}, error) { //true = ack
  94. return callback(entity)
  95. },
  96. }
  97. q.replyers = append(q.replyers, watcher)
  98. //向bus发送PUllPackMessage 请求,由bus代理转发消息到本地Request watcher
  99. req := &ReqPullPackMessageApi{
  100. Subject: streamAndTopic,
  101. AckWaitMinute: ackWaitMinute,
  102. SubjectCallBack: subject,
  103. }
  104. q.RequestApi(fmt.Sprintf("%s.%s", pack, PackPullMessageApi), req, time.Second*5, nil)
  105. fmt.Println("-----------------------pullPackMessage-------------------------------------")
  106. fmt.Println("pack:", pack)
  107. fmt.Println("streamAndTopic:", streamAndTopic)
  108. fmt.Println("streamName:", names[0])
  109. fmt.Println("topic:", names[1])
  110. return nil
  111. }
  112. func (q *NatsBus) PullMessage(streamAndTopic string, callback WatcherCallback) error {
  113. names := strings.Split(streamAndTopic, "#")
  114. if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
  115. return errors.New("streamAndTopic should like stream#topic")
  116. }
  117. queueName := ""
  118. if len(names) == 2 {
  119. v := strings.ReplaceAll(names[1], ".", "-")
  120. queueName = v + "-queue"
  121. } else {
  122. queueName = names[2]
  123. }
  124. find := false
  125. for _, w := range q.streamWacther {
  126. if w.Stream == names[0] && w.Topic == names[1] && w.Queue == queueName {
  127. find = true
  128. break
  129. }
  130. }
  131. if find {
  132. return fmt.Errorf("pack message Subject %s has registered", streamAndTopic)
  133. }
  134. watcher := &NatsStreamWather{
  135. Topic: names[1],
  136. Stream: names[0],
  137. Queue: queueName,
  138. Entity: func() interface{} {
  139. var enity interface{}
  140. return &enity
  141. },
  142. Cb: callback,
  143. }
  144. q.streamWacther = append(q.streamWacther, watcher)
  145. fmt.Println("-----------------------pullMessage-------------------------------------")
  146. fmt.Println("streamAndTopic:", streamAndTopic)
  147. fmt.Println("streamName:", names[0])
  148. fmt.Println("topic:", names[1])
  149. return nil
  150. }
  151. func (q *NatsBus) PushMessage(streamAndTopic string, data interface{}, opts ...nats.PubOpt) error {
  152. names := strings.Split(streamAndTopic, "#")
  153. if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
  154. return errors.New("streamAndTopic should like stream#topic")
  155. }
  156. streamName := names[0]
  157. topic := names[1]
  158. Payload, err := json.Marshal(data)
  159. if err != nil {
  160. return err
  161. }
  162. js, err := q.nc.JetStream()
  163. if err != nil {
  164. return err
  165. }
  166. stream, _ := js.StreamInfo(streamName)
  167. if stream == nil {
  168. _, err = js.AddStream(&nats.StreamConfig{
  169. Name: streamName,
  170. Subjects: []string{topic},
  171. Retention: nats.WorkQueuePolicy,
  172. })
  173. if err != nil {
  174. return err
  175. }
  176. _, err = js.StreamInfo(streamName)
  177. if err != nil {
  178. return err
  179. }
  180. }
  181. _, err = js.PublishAsync(topic, Payload, opts...)
  182. fmt.Println("-----------------------pushMessage-------------------------------------")
  183. fmt.Println("streamAndTopic:", streamAndTopic)
  184. fmt.Println("streamName:", names[0])
  185. fmt.Println("topic:", names[1])
  186. return err
  187. }