nats.go 17 KB


  1. package comm
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "os/signal"
  9. "sync"
  10. "syscall"
  11. "time"
  12. "github.com/nats-io/nats.go"
  13. )
// Connection state events sent on NatsBus.reconnChan by the connection
// handlers that NewNatsBus installs.
const (
	// ConnStateDisconned is sent by the disconnect handler.
	ConnStateDisconned = 1
	// ConnStateReconned is sent by the reconnect handler.
	ConnStateReconned = 2
)
// NatsBus bundles a NATS connection with the stream watchers and request
// repliers that Run starts on it.
type NatsBus struct {
	// mu is held by RequestApi for the duration of each request.
	mu sync.Mutex
	// nc is the underlying connection, assigned by NewNatsBus.
	nc *nats.Conn
	// reconnChan carries ConnStateDisconned / ConnStateReconned events from
	// the connection handlers to Run.
	reconnChan chan int
	// streamWacther lists the JetStream watchers that Run starts.
	streamWacther []*NatsStreamWather
	// topicWatcher is initialised empty by NewNatsBus; nothing in the visible
	// code reads it.
	topicWatcher []*NatsTopicWather
	// replyers lists the request handlers that Run subscribes.
	replyers []*NatsMsgReplyer
	// runtimeStartStreamWatcher is installed by Run and starts one more
	// watcher in its own goroutine.
	runtimeStartStreamWatcher StartStreamWatcherHandle
}
// StartStreamWatcherHandle starts a stream watcher described by conf.
type StartStreamWatcherHandle func(conf *NatsStreamWather) error
// NatsStreamWather describes one JetStream pull consumer and the callback
// that handles its messages.
type NatsStreamWather struct {
	// Stream is the JetStream stream name.
	Stream string
	// Topic is the subject the stream captures and the consumer filters on.
	Topic string
	// Queue is used as the durable consumer name.
	Queue string
	// AckWaitMinute is the ack wait in whole minutes; applied when > 0.
	AckWaitMinute int64
	// AckWaitMinuteFloat is the ack wait in fractional minutes; applied when
	// > 0 and, being applied last, wins over AckWaitMinute.
	AckWaitMinuteFloat float64
	// Entity creates the value a message payload is JSON-decoded into before
	// Cb is invoked.
	Entity CreateEnity
	// Cb receives the message and the decoded entity. Used only when Cb2 is nil.
	Cb WatcherCallback
	// Cb2 receives the raw message; it takes precedence over Cb.
	Cb2 WatcherCallback2
	// Exported reports whether this watcher may be called from other
	// deployment packages.
	Exported bool
	// MaxDeliver is passed to the consumer configuration by WatchStream.
	MaxDeliver int
	// AckPolicy is the configured ack policy.
	// NOTE(review): the visible WatchStream forces nats.AckExplicitPolicy and
	// never reads this field.
	AckPolicy nats.AckPolicy
}
// NatsMsgReplyer describes a request/reply handler that Run subscribes on
// Subject (queue group "<Subject>.queue").
type NatsMsgReplyer struct {
	// Subject is the subject to listen on.
	Subject string
	// Entity creates the value the request payload is JSON-decoded into;
	// when nil the handler receives a nil entity.
	Entity CreateEnity
	// Exported reports whether this replier may be called from other
	// deployment packages.
	Exported bool
	// Timeout is the maximum time a call may take.
	// NOTE(review): not read anywhere in the visible code.
	Timeout time.Duration
	// Cb produces a reply value that is JSON-encoded as-is; it takes
	// precedence over Cb2.
	Cb ReplyerHandle
	// Cb2 produces a result or an error, which Run wraps in a NatsResponse.
	Cb2 ReplyerHandle2
}
// NatsTopicWather pairs a topic with the callback meant to handle its
// messages. NOTE(review): no visible code consumes values of this type.
type NatsTopicWather struct {
	// Topic is the subject to watch.
	Topic string
	// Cb is the callback for messages on Topic.
	Cb WatcherCallback
}
// Stream is a callback taking a stream name, topic and group.
// NOTE(review): not used in the visible code.
type Stream func(streamName string, topic string, group string)

// ListenTopic is a callback taking a topic.
// NOTE(review): not used in the visible code.
type ListenTopic func(topic string)

// WatcherCallback handles a stream message together with its decoded entity.
type WatcherCallback func(msg *nats.Msg, entity interface{})

// WatcherCallback2 handles a raw stream message; no payload decoding is done
// for it.
type WatcherCallback2 func(msg *nats.Msg)

// ReplyerHandle handles a request and returns the reply value; a nil return
// means no reply is sent.
type ReplyerHandle func(msg *nats.Msg, entity interface{}) interface{}

// ReplyerHandle2 handles a request and returns a result or an error; Run
// wraps either one in a NatsResponse.
type ReplyerHandle2 func(msg *nats.Msg, entity interface{}) (interface{}, error)

// ReplyerHandle3 handles a raw request and returns a result or an error.
// NOTE(review): not used in the visible code.
type ReplyerHandle3 func(msg *nats.Msg) (interface{}, error)

// CreateEnity returns a fresh value for a payload to be JSON-decoded into.
type CreateEnity func() interface{}

// PackMessageCallback processes an entity and reports how to treat the message.
// NOTE(review): not used in the visible code.
type PackMessageCallback func(entity interface{}) (*RespPackMessage, error)
// RespPackMessage is the result returned by a PackMessageCallback.
type RespPackMessage struct {
	// CanAck presumably tells the caller whether the message may be
	// acknowledged — TODO confirm against the consumer of this type.
	CanAck bool
}
// NatsResponse is the reply envelope used by the request/reply helpers.
type NatsResponse struct {
	// ErrorNo is 200 on success; the request helpers treat any other value
	// as a failure.
	ErrorNo int
	// ErrorDesc is the error text returned to the caller when ErrorNo != 200.
	ErrorDesc string
	// Result is the JSON-encoded result string.
	Result string
}
// RequestOptions tunes a RequestPackApi call.
type RequestOptions struct {
	// Timeout overrides the default 5 second request timeout when > 0.
	Timeout time.Duration
	// PubOpts holds JetStream publish options.
	// NOTE(review): not read by the visible RequestPackApi.
	PubOpts []nats.PubOpt
	// DeployPack overrides the default "comm" subject prefix when non-empty.
	DeployPack string
}
// ReqPayload carries a payload together with a deployment package name.
// NOTE(review): not used in the visible code.
type ReqPayload struct {
	// Payload is the request body.
	Payload interface{}
	// DeployPack names the target deployment package.
	DeployPack string
}
// ReqPackApi is the JSON body that RequestPackApi sends to the package's
// public API subject.
type ReqPackApi struct {
	// Payload is the caller's request body.
	Payload interface{}
	// Subject is the subject the caller wants to reach.
	Subject string
	// Timeout is the timeout used for the request.
	Timeout time.Duration
}
// ReqStreamApi holds the parameters of a stream publish request.
// NOTE(review): not used in the visible code.
type ReqStreamApi struct {
	// Subject is the subject to publish to.
	Subject string
	// Payload is the request body.
	Payload interface{}
	// PubOpts holds JetStream publish options.
	PubOpts []nats.PubOpt
}
// CancelSubscribe removes the subscription it was returned for.
type CancelSubscribe func()

// SubscribeCallback handles a message and its decoded object; a non-nil
// return value is JSON-encoded and sent back as the reply.
type SubscribeCallback func(obj interface{}, msg *nats.Msg) interface{}
// SubOption configures SubscribeOnce and QueueSubscribe.
type SubOption struct {
	// Sub is the subject to subscribe to.
	Sub string
	// Obj, when non-nil, creates the value the payload is JSON-decoded into.
	Obj func() interface{}
	// Call handles each accepted message.
	Call SubscribeCallback
}
  100. func (q *NatsBus) Subscribe(topic string, msg nats.MsgHandler) (*nats.Subscription, error) {
  101. return q.nc.Subscribe(topic, msg)
  102. }
  103. func (q *NatsBus) SubscribeOnce(Option *SubOption) (CancelSubscribe, error) {
  104. var sub *nats.Subscription
  105. var cancel = func() {
  106. if sub != nil {
  107. sub.Unsubscribe()
  108. sub = nil
  109. }
  110. }
  111. s, err := q.nc.Subscribe(Option.Sub, func(msg *nats.Msg) {
  112. var p interface{}
  113. if Option.Obj != nil {
  114. p = Option.Obj()
  115. err := json.Unmarshal(msg.Data, p)
  116. if err != nil {
  117. msg.Term()
  118. fmt.Println(Option.Sub, " payload parse msg err", err)
  119. return
  120. }
  121. }
  122. cancel()
  123. out := Option.Call(p, msg)
  124. if out != nil {
  125. payload, _ := json.Marshal(out)
  126. msg.Respond(payload)
  127. return
  128. }
  129. })
  130. if err != nil {
  131. return nil, err
  132. }
  133. sub = s
  134. return cancel, nil
  135. }
  136. func (q *NatsBus) QueueSubscribe(Option *SubOption) (CancelSubscribe, error) {
  137. var sub *nats.Subscription
  138. var cancel = func() {
  139. if sub != nil {
  140. sub.Unsubscribe()
  141. sub = nil
  142. }
  143. }
  144. s, err := q.nc.QueueSubscribe(Option.Sub, Option.Sub+".queue", func(msg *nats.Msg) {
  145. var p interface{}
  146. if Option.Obj != nil {
  147. p = Option.Obj()
  148. err := json.Unmarshal(msg.Data, p)
  149. if err != nil {
  150. msg.Term()
  151. fmt.Println(Option.Sub, " payload parse msg err", err)
  152. return
  153. }
  154. }
  155. out := Option.Call(p, msg)
  156. if out != nil {
  157. payload, _ := json.Marshal(out)
  158. msg.Respond(payload)
  159. return
  160. }
  161. })
  162. if err != nil {
  163. return nil, err
  164. }
  165. sub = s
  166. return cancel, nil
  167. }
  168. func (q *NatsBus) Publish(topic string, data []byte) error {
  169. return q.nc.Publish(topic, data)
  170. }
  171. func (q *NatsBus) PublishObj(topic string, obj interface{}) error {
  172. data, _ := json.Marshal(obj)
  173. return q.nc.Publish(topic, data)
  174. }
  175. func (q *NatsBus) AddReplyers(w ...*NatsMsgReplyer) {
  176. q.replyers = append(q.replyers, w...)
  177. }
  178. func (q *NatsBus) AddStreamWacher(w ...*NatsStreamWather) {
  179. q.streamWacther = append(q.streamWacther, w...)
  180. }
  181. // 请求代理api
  182. func (q *NatsBus) RequestPackApi(subject string, Payload interface{}, out interface{}, options *RequestOptions) error {
  183. pack := "comm"
  184. timeout := time.Second * 5
  185. if options != nil {
  186. if len(options.DeployPack) > 0 {
  187. pack = options.DeployPack
  188. }
  189. if options.Timeout > 0 {
  190. timeout = options.Timeout
  191. }
  192. }
  193. payload, _ := json.Marshal(&ReqPackApi{Payload: Payload, Subject: subject, Timeout: timeout})
  194. msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicApi), payload, timeout)
  195. if err != nil {
  196. return err
  197. }
  198. result := &NatsResponse{}
  199. err = json.Unmarshal(msg.Data, result)
  200. if err != nil {
  201. return err
  202. }
  203. if result.ErrorNo != 200 {
  204. return errors.New(result.ErrorDesc)
  205. }
  206. if out != nil {
  207. return json.Unmarshal([]byte(result.Result), out)
  208. }
  209. return nil
  210. }
  211. // 请求本地api
  212. func (q *NatsBus) RequestApi(subject string, data interface{}, timeout time.Duration, out interface{}) error {
  213. // Lock so only one goroutine at a time can access the map c.v.
  214. q.mu.Lock()
  215. defer q.mu.Unlock()
  216. payload := []byte{}
  217. if data != nil {
  218. payload, _ = json.Marshal(data)
  219. }
  220. msg, err := q.nc.Request(subject, payload, timeout)
  221. if err != nil {
  222. return err
  223. }
  224. result := &NatsResponse{}
  225. err = json.Unmarshal(msg.Data, result)
  226. if err != nil {
  227. return err
  228. }
  229. if result.ErrorNo != 200 {
  230. return errors.New(result.ErrorDesc)
  231. }
  232. if out != nil {
  233. return json.Unmarshal([]byte(result.Result), out)
  234. }
  235. return nil
  236. }
  237. func (q *NatsBus) Request(subject string, data interface{}, timeout time.Duration, out interface{}) error {
  238. payload := []byte{}
  239. if data != nil {
  240. payload, _ = json.Marshal(data)
  241. }
  242. msg, err := q.nc.Request(subject, payload, timeout)
  243. if err != nil {
  244. return err
  245. }
  246. return json.Unmarshal(msg.Data, out)
  247. }
  248. // 退出
  249. func (q *NatsBus) Quit() {
  250. q.nc.Drain()
  251. q.nc.Close()
  252. }
  253. func NewNatsBus2(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers []*NatsMsgReplyer) (*NatsBus, error) {
  254. bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
  255. if err != nil {
  256. return nil, err
  257. }
  258. bus.replyers = replyers
  259. return bus, nil
  260. }
  261. // 可变参数
  262. func NewNatsBus3(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) {
  263. bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
  264. if err != nil {
  265. return nil, err
  266. }
  267. bus.replyers = replyers
  268. return bus, nil
  269. }
  270. func NewNatsBus(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) {
  271. url := natsUri
  272. if len(url) < 1 {
  273. url = nats.DefaultURL
  274. }
  275. fmt.Println("conning to nats ====> ", url)
  276. bus := &NatsBus{
  277. reconnChan: make(chan int),
  278. streamWacther: streamWacther,
  279. topicWatcher: []*NatsTopicWather{},
  280. }
  281. nc, err := nats.Connect(url,
  282. // nats.Name("render1"),
  283. nats.MaxReconnects(MaxReconnect),
  284. nats.ReconnectWait(time.Duration(ReconnDelaySecond*int(time.Second))),
  285. nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
  286. fmt.Printf("%s Got disconnected! Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), err)
  287. bus.reconnChan <- ConnStateDisconned
  288. }),
  289. nats.ReconnectHandler(func(nc *nats.Conn) {
  290. fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format("2006-01-02 15:04:05"), nc.ConnectedUrl())
  291. bus.reconnChan <- ConnStateReconned
  292. }),
  293. nats.ClosedHandler(func(nc *nats.Conn) {
  294. tip := fmt.Sprintf("%s Connection closed. Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), nc.LastError())
  295. fmt.Println(tip)
  296. // panic(tip)
  297. }),
  298. )
  299. if err != nil {
  300. fmt.Println("conn to nats failed ====> ", url, err)
  301. return nil, err
  302. }
  303. bus.nc = nc
  304. fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format("2006-01-02 15:04:05"), url)
  305. return bus, nil
  306. }
  307. // 创建jetStream
  308. func (q *NatsBus) JetStream() (nats.JetStreamContext, error) {
  309. return q.nc.JetStream()
  310. }
  311. // 创建jetStream
  312. func (q *NatsBus) CreateStream(streamName string, topic string) (nats.JetStreamContext, error) {
  313. js, err := q.nc.JetStream()
  314. if err != nil {
  315. return nil, err
  316. }
  317. stream, _ := js.StreamInfo(streamName)
  318. if stream != nil {
  319. return js, nil
  320. }
  321. _, err = js.AddStream(&nats.StreamConfig{
  322. Name: streamName,
  323. Subjects: []string{topic},
  324. Retention: nats.WorkQueuePolicy,
  325. MaxConsumers: 5,
  326. })
  327. if err != nil {
  328. fmt.Println("bus CreateStream err=>", err.Error())
  329. return nil, err
  330. }
  331. return js, nil
  332. }
  333. // 监听流数据
  334. func (q *NatsBus) WatchStream(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error {
  335. streamName := conf.Stream
  336. topic := conf.Topic
  337. queue := conf.Queue
  338. fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue)
  339. //创建对应的jet stream 以便模型创建消息不回丢失
  340. js, err := q.CreateStream(streamName, topic)
  341. if err != nil {
  342. fmt.Println("WatchStream error=>", err.Error())
  343. return err
  344. }
  345. // 设置消费者选项,包括 MaxDeliver
  346. consumerConfig := &nats.ConsumerConfig{
  347. Durable: queue,
  348. AckPolicy: nats.AckExplicitPolicy, // 强制使用 AckExplicitPolicy
  349. MaxDeliver: conf.MaxDeliver,
  350. FilterSubject: topic,
  351. }
  352. if conf.AckWaitMinute > 0 {
  353. consumerConfig.AckWait = time.Duration(conf.AckWaitMinute * int64(time.Minute))
  354. }
  355. if conf.AckWaitMinuteFloat > 0 {
  356. consumerConfig.AckWait = time.Duration(int64(conf.AckWaitMinuteFloat * float64(time.Minute)))
  357. }
  358. // 检查消费者是否存在
  359. consumer, err := js.ConsumerInfo(streamName, queue)
  360. if err == nil {
  361. // 消费者存在,检查并更新配置
  362. needUpdate := consumer.Config.MaxDeliver != conf.MaxDeliver ||
  363. consumer.Config.AckWait != consumerConfig.AckWait
  364. if needUpdate {
  365. _, err = js.UpdateConsumer(streamName, consumerConfig)
  366. if err != nil {
  367. fmt.Println("WatchStream UpdateConsumer err ", conf.Stream, conf.Topic, err)
  368. return err
  369. }
  370. }
  371. } else {
  372. // 消费者不存在,创建新的
  373. _, err = js.AddConsumer(streamName, consumerConfig)
  374. if err != nil {
  375. fmt.Println("WatchStream AddConsumer err ", conf.Stream, conf.Topic, err)
  376. return err
  377. }
  378. }
  379. // 使用 pull 订阅模式
  380. sub, err := js.PullSubscribe(topic, queue, nats.BindStream(streamName))
  381. if err != nil {
  382. fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err)
  383. return err
  384. }
  385. fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue)
  386. var currHandingMsg *nats.Msg = nil
  387. for {
  388. select {
  389. case <-ctxTerm.Done():
  390. fmt.Println("Watching Stream Termed", streamName, topic)
  391. if currHandingMsg != nil {
  392. err := currHandingMsg.Nak()
  393. fmt.Println("terminate currMsg ", err)
  394. }
  395. //中断链接
  396. q.Quit()
  397. return nil
  398. case <-ctxConn.Done():
  399. fmt.Println("Watching Stream Conn closed", streamName, topic)
  400. if currHandingMsg != nil {
  401. err := currHandingMsg.Nak()
  402. fmt.Println("terminate currMsg ", err)
  403. }
  404. return nil
  405. default:
  406. currHandingMsg = nil
  407. msgs, err := sub.Fetch(1)
  408. if err == nil && len(msgs) > 0 {
  409. msg := msgs[0]
  410. currHandingMsg = msg
  411. meta, _ := msg.Metadata()
  412. fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#")
  413. fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending)
  414. fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream)
  415. if conf.Cb2 != nil {
  416. conf.Cb2(msg)
  417. } else if conf.Cb != nil {
  418. var req interface{}
  419. if conf.Entity != nil {
  420. req = conf.Entity()
  421. }
  422. err = json.Unmarshal(msg.Data, req)
  423. if err != nil {
  424. msg.Term()
  425. fmt.Println("work msg err=>", err.Error())
  426. continue
  427. }
  428. conf.Cb(msg, req)
  429. } else {
  430. fmt.Println("error no stream wather callback!!!!")
  431. }
  432. currHandingMsg = nil
  433. }
  434. }
  435. }
  436. }
  437. // 监听流数据
  438. func (q *NatsBus) WatchStream1(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error {
  439. streamName := conf.Stream
  440. topic := conf.Topic
  441. queue := conf.Queue
  442. fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue)
  443. //创建对应的jet stream 以便模型创建消息不回丢失
  444. js, err := q.CreateStream(streamName, topic)
  445. if err != nil {
  446. fmt.Println("WatchStream error=>", err.Error())
  447. return err
  448. }
  449. opts := []nats.SubOpt{nats.BindStream(streamName), nats.MaxDeliver(5)} //nats.MaxDeliver(5)
  450. if conf.AckWaitMinute > 0 {
  451. opts = append(opts, nats.AckWait(time.Duration(conf.AckWaitMinute*int64(time.Minute))))
  452. }
  453. if conf.AckWaitMinuteFloat > 0 {
  454. opts = append(opts, nats.AckWait(time.Duration(int64(conf.AckWaitMinuteFloat*float64(time.Minute)))))
  455. }
  456. sub, err := js.PullSubscribe(topic, queue, opts...)
  457. if err != nil {
  458. fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err)
  459. return err
  460. }
  461. fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue)
  462. var currHandingMsg *nats.Msg = nil
  463. for {
  464. select {
  465. case <-ctxTerm.Done():
  466. fmt.Println("Watching Stream Termed", streamName, topic)
  467. if currHandingMsg != nil {
  468. err := currHandingMsg.Nak()
  469. fmt.Println("terminate currMsg ", err)
  470. }
  471. //中断链接
  472. q.Quit()
  473. return nil
  474. case <-ctxConn.Done():
  475. fmt.Println("Watching Stream Conn closed", streamName, topic)
  476. if currHandingMsg != nil {
  477. err := currHandingMsg.Nak()
  478. fmt.Println("terminate currMsg ", err)
  479. }
  480. return nil
  481. default:
  482. currHandingMsg = nil
  483. msgs, err := sub.Fetch(1)
  484. if err == nil && len(msgs) > 0 {
  485. msg := msgs[0]
  486. currHandingMsg = msg
  487. meta, _ := msg.Metadata()
  488. fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#")
  489. fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending)
  490. fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream)
  491. if conf.Cb2 != nil {
  492. conf.Cb2(msg)
  493. } else if conf.Cb != nil {
  494. var req interface{}
  495. if conf.Entity != nil {
  496. req = conf.Entity()
  497. }
  498. err = json.Unmarshal(msg.Data, req)
  499. if err != nil {
  500. msg.Term()
  501. fmt.Println("work msg err=>", err.Error())
  502. continue
  503. }
  504. conf.Cb(msg, req)
  505. } else {
  506. fmt.Println("error no stream wather callback!!!!")
  507. }
  508. currHandingMsg = nil
  509. }
  510. }
  511. }
  512. }
  513. func (q *NatsBus) GetNatsConn() *nats.Conn {
  514. return q.nc
  515. }
  516. func (q *NatsBus) Run(cb func()) {
  517. //ctx, cancel := context.WithCancel(context.Background())
  518. //ctx, stop := context.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  519. ctxTerm, cancelTerm := context.WithCancel(context.Background())
  520. ctxConn, cancelCnn := context.WithCancel(context.Background())
  521. defer cancelTerm()
  522. defer cancelCnn()
  523. go func() {
  524. exit := make(chan os.Signal, 1)
  525. signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
  526. <-exit
  527. cancelTerm()
  528. time.Sleep(1 * time.Second)
  529. os.Exit(1)
  530. }()
  531. var QueueSubscribe = func(rplyer *NatsMsgReplyer) {
  532. r1 := rplyer
  533. queue := fmt.Sprintf("%s.queue", r1.Subject)
  534. q.nc.QueueSubscribe(r1.Subject, queue, func(msg *nats.Msg) {
  535. defer func() {
  536. if err := recover(); err != nil {
  537. fmt.Println("catch error ", err)
  538. }
  539. }()
  540. fmt.Println("replay for ", r1.Subject)
  541. var req interface{}
  542. if r1.Entity != nil {
  543. req = r1.Entity()
  544. err := json.Unmarshal(msg.Data, req)
  545. if err != nil {
  546. msg.Term()
  547. fmt.Println("rplyer work msg err", err.Error())
  548. return
  549. }
  550. }
  551. if r1.Cb != nil {
  552. out := r1.Cb(msg, req)
  553. if out != nil {
  554. payload, err := json.Marshal(out)
  555. if err != nil {
  556. fmt.Println("error reply for", r1.Subject, err)
  557. }
  558. err = msg.Respond(payload)
  559. if err != nil {
  560. fmt.Println("error reply for", r1.Subject, out)
  561. }
  562. }
  563. return
  564. }
  565. if r1.Cb2 != nil {
  566. out, err := r1.Cb2(msg, req)
  567. resp := &NatsResponse{ErrorNo: 200}
  568. if err != nil {
  569. fmt.Println("error reply2 for", r1.Subject, err)
  570. resp.ErrorDesc = err.Error()
  571. resp.ErrorNo = 400
  572. } else {
  573. payload, _ := json.Marshal(out)
  574. resp.Result = string(payload)
  575. }
  576. payload, _ := json.Marshal(resp)
  577. err = msg.Respond(payload)
  578. if err != nil {
  579. fmt.Println("error reply2 for", r1.Subject, out)
  580. }
  581. return
  582. }
  583. })
  584. }
  585. var startSubscribe = func() {
  586. //启动所有watcher
  587. for _, item := range q.streamWacther {
  588. go q.WatchStream(ctxTerm, ctxConn, item)
  589. }
  590. for _, rplyer := range q.replyers {
  591. QueueSubscribe(rplyer)
  592. }
  593. }
  594. q.runtimeStartStreamWatcher = func(conf *NatsStreamWather) error {
  595. go q.WatchStream(ctxTerm, ctxConn, conf)
  596. return nil
  597. }
  598. startSubscribe()
  599. if cb != nil {
  600. cb()
  601. }
  602. for {
  603. select {
  604. case state := <-q.reconnChan:
  605. if state == ConnStateDisconned {
  606. cancelCnn()
  607. } else if state == ConnStateReconned {
  608. ctxConn, cancelCnn = context.WithCancel(context.Background())
  609. startSubscribe()
  610. }
  611. }
  612. }
  613. }