nats.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. package comm
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "os/signal"
  9. "sync"
  10. "syscall"
  11. "time"
  12. "github.com/nats-io/nats.go"
  13. )
  14. const (
  15. ConnStateDisconned = 1
  16. ConnStateReconned = 2
  17. )
  18. // NATSQueue queue for work
  19. type NatsBus struct {
  20. mu sync.Mutex
  21. nc *nats.Conn
  22. reconnChan chan int
  23. streamWacther []*NatsStreamWather
  24. topicWatcher []*NatsTopicWather
  25. replyers []*NatsMsgReplyer
  26. runtimeStartStreamWatcher StartStreamWatcherHandle
  27. }
  28. type StartStreamWatcherHandle func(conf *NatsStreamWather) error
  29. type NatsStreamWather struct {
  30. Stream string
  31. Topic string
  32. Queue string
  33. AckWaitMinute int64
  34. AckWaitMinuteFloat float64
  35. Entity CreateEnity
  36. Cb WatcherCallback
  37. Cb2 WatcherCallback2
  38. Exported bool //在其他部署包内,是否可以被调用
  39. }
  40. // 消息回复者
  41. type NatsMsgReplyer struct {
  42. Subject string
  43. Entity CreateEnity
  44. Exported bool //在其他部署包内,是否可以被调用
  45. Timeout time.Duration //调用的最大时间
  46. Cb ReplyerHandle
  47. Cb2 ReplyerHandle2
  48. }
  49. type NatsTopicWather struct {
  50. Topic string
  51. Cb WatcherCallback
  52. }
  53. type Stream func(streamName string, topic string, group string)
  54. type ListenTopic func(topic string)
  55. type WatcherCallback func(msg *nats.Msg, entity interface{})
  56. type WatcherCallback2 func(msg *nats.Msg)
  57. type ReplyerHandle func(msg *nats.Msg, entity interface{}) interface{}
  58. type ReplyerHandle2 func(msg *nats.Msg, entity interface{}) (interface{}, error)
  59. type ReplyerHandle3 func(msg *nats.Msg) (interface{}, error)
  60. type CreateEnity func() interface{}
  61. type PackMessageCallback func(entity interface{}) (*RespPackMessage, error)
  62. type RespPackMessage struct {
  63. CanAck bool
  64. }
  65. // 请求数据
  66. type NatsResponse struct {
  67. ErrorNo int
  68. ErrorDesc string
  69. Result string //json字符串
  70. }
  71. type RequestOptions struct {
  72. Timeout time.Duration
  73. PubOpts []nats.PubOpt
  74. DeployPack string
  75. }
  76. type ReqPayload struct {
  77. Payload interface{}
  78. DeployPack string
  79. }
  80. type ReqPackApi struct {
  81. Payload interface{}
  82. Subject string
  83. Timeout time.Duration
  84. }
  85. // req params
  86. type ReqStreamApi struct {
  87. Subject string
  88. Payload interface{}
  89. PubOpts []nats.PubOpt
  90. }
  91. type CancelSubscribe func()
  92. type SubscribeCallback func(obj interface{}, msg *nats.Msg) interface{}
  93. type SubOption struct {
  94. Sub string
  95. Obj func() interface{}
  96. Call SubscribeCallback
  97. }
  98. func (q *NatsBus) Subscribe(topic string, msg nats.MsgHandler) (*nats.Subscription, error) {
  99. return q.nc.Subscribe(topic, msg)
  100. }
  101. func (q *NatsBus) SubscribeOnce(Option *SubOption) (CancelSubscribe, error) {
  102. var sub *nats.Subscription
  103. var cancel = func() {
  104. if sub != nil {
  105. sub.Unsubscribe()
  106. sub = nil
  107. }
  108. }
  109. s, err := q.nc.Subscribe(Option.Sub, func(msg *nats.Msg) {
  110. var p interface{}
  111. if Option.Obj != nil {
  112. p = Option.Obj()
  113. err := json.Unmarshal(msg.Data, p)
  114. if err != nil {
  115. msg.Term()
  116. fmt.Println(Option.Sub, " payload parse msg err", err)
  117. return
  118. }
  119. }
  120. cancel()
  121. out := Option.Call(p, msg)
  122. if out != nil {
  123. payload, _ := json.Marshal(out)
  124. msg.Respond(payload)
  125. return
  126. }
  127. })
  128. if err != nil {
  129. return nil, err
  130. }
  131. sub = s
  132. return cancel, nil
  133. }
  134. func (q *NatsBus) QueueSubscribe(Option *SubOption) (CancelSubscribe, error) {
  135. var sub *nats.Subscription
  136. var cancel = func() {
  137. if sub != nil {
  138. sub.Unsubscribe()
  139. sub = nil
  140. }
  141. }
  142. s, err := q.nc.QueueSubscribe(Option.Sub, Option.Sub+".queue", func(msg *nats.Msg) {
  143. var p interface{}
  144. if Option.Obj != nil {
  145. p = Option.Obj()
  146. err := json.Unmarshal(msg.Data, p)
  147. if err != nil {
  148. msg.Term()
  149. fmt.Println(Option.Sub, " payload parse msg err", err)
  150. return
  151. }
  152. }
  153. out := Option.Call(p, msg)
  154. if out != nil {
  155. payload, _ := json.Marshal(out)
  156. msg.Respond(payload)
  157. return
  158. }
  159. })
  160. if err != nil {
  161. return nil, err
  162. }
  163. sub = s
  164. return cancel, nil
  165. }
  166. func (q *NatsBus) Publish(topic string, data []byte) error {
  167. return q.nc.Publish(topic, data)
  168. }
  169. func (q *NatsBus) PublishObj(topic string, obj interface{}) error {
  170. data, _ := json.Marshal(obj)
  171. return q.nc.Publish(topic, data)
  172. }
  173. func (q *NatsBus) AddReplyers(w ...*NatsMsgReplyer) {
  174. q.replyers = append(q.replyers, w...)
  175. }
  176. func (q *NatsBus) AddStreamWacher(w ...*NatsStreamWather) {
  177. q.streamWacther = append(q.streamWacther, w...)
  178. }
  179. // 请求代理api
  180. func (q *NatsBus) RequestPackApi(subject string, Payload interface{}, out interface{}, options *RequestOptions) error {
  181. pack := "comm"
  182. timeout := time.Second * 5
  183. if options != nil {
  184. if len(options.DeployPack) > 0 {
  185. pack = options.DeployPack
  186. }
  187. if options.Timeout > 0 {
  188. timeout = options.Timeout
  189. }
  190. }
  191. payload, _ := json.Marshal(&ReqPackApi{Payload: Payload, Subject: subject, Timeout: timeout})
  192. msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicApi), payload, timeout)
  193. if err != nil {
  194. return err
  195. }
  196. result := &NatsResponse{}
  197. err = json.Unmarshal(msg.Data, result)
  198. if err != nil {
  199. return err
  200. }
  201. if result.ErrorNo != 200 {
  202. return errors.New(result.ErrorDesc)
  203. }
  204. if out != nil {
  205. return json.Unmarshal([]byte(result.Result), out)
  206. }
  207. return nil
  208. }
  209. // 请求本地api
  210. func (q *NatsBus) RequestApi(subject string, data interface{}, timeout time.Duration, out interface{}) error {
  211. // Lock so only one goroutine at a time can access the map c.v.
  212. q.mu.Lock()
  213. defer q.mu.Unlock()
  214. payload := []byte{}
  215. if data != nil {
  216. payload, _ = json.Marshal(data)
  217. }
  218. msg, err := q.nc.Request(subject, payload, timeout)
  219. if err != nil {
  220. return err
  221. }
  222. result := &NatsResponse{}
  223. err = json.Unmarshal(msg.Data, result)
  224. if err != nil {
  225. return err
  226. }
  227. if result.ErrorNo != 200 {
  228. return errors.New(result.ErrorDesc)
  229. }
  230. if out != nil {
  231. return json.Unmarshal([]byte(result.Result), out)
  232. }
  233. return nil
  234. }
  235. func (q *NatsBus) Request(subject string, data interface{}, timeout time.Duration, out interface{}) error {
  236. payload := []byte{}
  237. if data != nil {
  238. payload, _ = json.Marshal(data)
  239. }
  240. msg, err := q.nc.Request(subject, payload, timeout)
  241. if err != nil {
  242. return err
  243. }
  244. if out != nil {
  245. return json.Unmarshal(msg.Data, out)
  246. }
  247. return nil
  248. }
  249. // 退出
  250. func (q *NatsBus) Quit() {
  251. q.nc.Drain()
  252. q.nc.Close()
  253. }
  254. func NewNatsBus2(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers []*NatsMsgReplyer) (*NatsBus, error) {
  255. bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
  256. if err != nil {
  257. return nil, err
  258. }
  259. bus.replyers = replyers
  260. return bus, nil
  261. }
  262. // 可变参数
  263. func NewNatsBus3(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) {
  264. bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
  265. if err != nil {
  266. return nil, err
  267. }
  268. bus.replyers = replyers
  269. return bus, nil
  270. }
  271. func NewNatsBusWithName(name string, natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) {
  272. url := natsUri
  273. if len(url) < 1 {
  274. url = nats.DefaultURL
  275. }
  276. fmt.Println("conning to nats ====> ", url)
  277. bus := &NatsBus{
  278. reconnChan: make(chan int),
  279. streamWacther: streamWacther,
  280. topicWatcher: []*NatsTopicWather{},
  281. }
  282. nc, err := nats.Connect(url,
  283. nats.Name(name),
  284. nats.MaxReconnects(MaxReconnect),
  285. nats.ReconnectWait(time.Duration(ReconnDelaySecond*int(time.Second))),
  286. nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
  287. fmt.Printf("%s Got disconnected! Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), err)
  288. bus.reconnChan <- ConnStateDisconned
  289. }),
  290. nats.ReconnectHandler(func(nc *nats.Conn) {
  291. fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format("2006-01-02 15:04:05"), nc.ConnectedUrl())
  292. bus.reconnChan <- ConnStateReconned
  293. }),
  294. nats.ClosedHandler(func(nc *nats.Conn) {
  295. tip := fmt.Sprintf("%s Connection closed. Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), nc.LastError())
  296. fmt.Println(tip)
  297. // panic(tip)
  298. }),
  299. )
  300. if err != nil {
  301. fmt.Println("conn to nats failed ====> ", url, err)
  302. return nil, err
  303. }
  304. bus.nc = nc
  305. fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format("2006-01-02 15:04:05"), url)
  306. return bus, nil
  307. }
  308. // 可变参数
  309. func NewNatsBus4(name string, natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) {
  310. bus, err := NewNatsBusWithName(name, natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
  311. if err != nil {
  312. return nil, err
  313. }
  314. bus.replyers = replyers
  315. return bus, nil
  316. }
  317. func NewNatsBus(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) {
  318. url := natsUri
  319. if len(url) < 1 {
  320. url = nats.DefaultURL
  321. }
  322. fmt.Println("conning to nats ====> ", url)
  323. bus := &NatsBus{
  324. reconnChan: make(chan int),
  325. streamWacther: streamWacther,
  326. topicWatcher: []*NatsTopicWather{},
  327. }
  328. nc, err := nats.Connect(url,
  329. // nats.Name("render1"),
  330. nats.MaxReconnects(MaxReconnect),
  331. nats.ReconnectWait(time.Duration(ReconnDelaySecond*int(time.Second))),
  332. nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
  333. fmt.Printf("%s Got disconnected! Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), err)
  334. bus.reconnChan <- ConnStateDisconned
  335. }),
  336. nats.ReconnectHandler(func(nc *nats.Conn) {
  337. fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format("2006-01-02 15:04:05"), nc.ConnectedUrl())
  338. bus.reconnChan <- ConnStateReconned
  339. }),
  340. nats.ClosedHandler(func(nc *nats.Conn) {
  341. tip := fmt.Sprintf("%s Connection closed. Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), nc.LastError())
  342. fmt.Println(tip)
  343. // panic(tip)
  344. }),
  345. )
  346. if err != nil {
  347. fmt.Println("conn to nats failed ====> ", url, err)
  348. return nil, err
  349. }
  350. bus.nc = nc
  351. fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format("2006-01-02 15:04:05"), url)
  352. return bus, nil
  353. }
  354. // 创建jetStream
  355. func (q *NatsBus) JetStream() (nats.JetStreamContext, error) {
  356. return q.nc.JetStream()
  357. }
  358. // 创建jetStream
  359. func (q *NatsBus) CreateStream(streamName string, topic string) (nats.JetStreamContext, error) {
  360. js, err := q.nc.JetStream()
  361. if err != nil {
  362. return nil, err
  363. }
  364. stream, _ := js.StreamInfo(streamName)
  365. if stream != nil {
  366. return js, nil
  367. }
  368. _, err = js.AddStream(&nats.StreamConfig{
  369. Name: streamName,
  370. Subjects: []string{topic},
  371. Retention: nats.WorkQueuePolicy,
  372. })
  373. if err != nil {
  374. fmt.Println("bus CreateStream err=>", err.Error())
  375. return nil, err
  376. }
  377. return js, nil
  378. }
  379. // 监听流数据
  380. func (q *NatsBus) WatchStream(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error {
  381. streamName := conf.Stream
  382. topic := conf.Topic
  383. queue := conf.Queue
  384. fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue)
  385. //创建对应的jet stream 以便模型创建消息不回丢失
  386. js, err := q.CreateStream(streamName, topic)
  387. if err != nil {
  388. fmt.Println("WatchStream error=>", err.Error())
  389. return err
  390. }
  391. opts := []nats.SubOpt{nats.BindStream(streamName)} //nats.MaxDeliver(3)
  392. if conf.AckWaitMinute > 0 {
  393. opts = append(opts, nats.AckWait(time.Duration(conf.AckWaitMinute*int64(time.Minute))))
  394. }
  395. if conf.AckWaitMinuteFloat > 0 {
  396. opts = append(opts, nats.AckWait(time.Duration(int64(conf.AckWaitMinuteFloat*float64(time.Minute)))))
  397. }
  398. sub, err := js.PullSubscribe(topic, queue, opts...)
  399. if err != nil {
  400. fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err)
  401. return err
  402. }
  403. fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue)
  404. var currHandingMsg *nats.Msg = nil
  405. for {
  406. select {
  407. case <-ctxTerm.Done():
  408. fmt.Println("Watching Stream Termed", streamName, topic)
  409. if currHandingMsg != nil {
  410. err := currHandingMsg.Nak()
  411. fmt.Println("terminate currMsg ", err)
  412. }
  413. //中断链接
  414. q.Quit()
  415. return nil
  416. case <-ctxConn.Done():
  417. fmt.Println("Watching Stream Conn closed", streamName, topic)
  418. if currHandingMsg != nil {
  419. err := currHandingMsg.Nak()
  420. fmt.Println("terminate currMsg ", err)
  421. }
  422. return nil
  423. default:
  424. currHandingMsg = nil
  425. msgs, err := sub.Fetch(1)
  426. if err == nil && len(msgs) > 0 {
  427. msg := msgs[0]
  428. currHandingMsg = msg
  429. meta, _ := msg.Metadata()
  430. fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#")
  431. fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending)
  432. fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream)
  433. if conf.Cb2 != nil {
  434. conf.Cb2(msg)
  435. } else if conf.Cb != nil {
  436. var req interface{}
  437. if conf.Entity != nil {
  438. req = conf.Entity()
  439. }
  440. err = json.Unmarshal(msg.Data, req)
  441. if err != nil {
  442. msg.Term()
  443. fmt.Println("work msg err=>", err.Error())
  444. continue
  445. }
  446. conf.Cb(msg, req)
  447. } else {
  448. fmt.Println("error no stream wather callback!!!!")
  449. }
  450. currHandingMsg = nil
  451. }
  452. }
  453. }
  454. }
  455. func (q *NatsBus) GetNatsConn() *nats.Conn {
  456. return q.nc
  457. }
  458. func (q *NatsBus) Run(cb func()) {
  459. //ctx, cancel := context.WithCancel(context.Background())
  460. //ctx, stop := context.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  461. ctxTerm, cancelTerm := context.WithCancel(context.Background())
  462. ctxConn, cancelCnn := context.WithCancel(context.Background())
  463. defer cancelTerm()
  464. defer cancelCnn()
  465. go func() {
  466. exit := make(chan os.Signal, 1)
  467. signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
  468. <-exit
  469. cancelTerm()
  470. time.Sleep(1 * time.Second)
  471. os.Exit(1)
  472. }()
  473. var QueueSubscribe = func(rplyer *NatsMsgReplyer) {
  474. r1 := rplyer
  475. queue := fmt.Sprintf("%s.queue", r1.Subject)
  476. q.nc.QueueSubscribe(r1.Subject, queue, func(msg *nats.Msg) {
  477. defer func() {
  478. if err := recover(); err != nil {
  479. fmt.Println("catch error ", err)
  480. }
  481. }()
  482. fmt.Println("replay for ", r1.Subject)
  483. var req interface{}
  484. if r1.Entity != nil {
  485. req = r1.Entity()
  486. err := json.Unmarshal(msg.Data, req)
  487. if err != nil {
  488. msg.Term()
  489. fmt.Println("rplyer work msg err", err.Error())
  490. return
  491. }
  492. }
  493. if r1.Cb != nil {
  494. out := r1.Cb(msg, req)
  495. if out != nil {
  496. payload, err := json.Marshal(out)
  497. if err != nil {
  498. fmt.Println("error reply for", r1.Subject, err)
  499. }
  500. err = msg.Respond(payload)
  501. if err != nil {
  502. fmt.Println("error reply for", r1.Subject, out)
  503. }
  504. }
  505. return
  506. }
  507. if r1.Cb2 != nil {
  508. out, err := r1.Cb2(msg, req)
  509. resp := &NatsResponse{ErrorNo: 200}
  510. if err != nil {
  511. fmt.Println("error reply2 for", r1.Subject, err)
  512. resp.ErrorDesc = err.Error()
  513. resp.ErrorNo = 400
  514. } else {
  515. payload, _ := json.Marshal(out)
  516. resp.Result = string(payload)
  517. }
  518. payload, _ := json.Marshal(resp)
  519. err = msg.Respond(payload)
  520. if err != nil {
  521. fmt.Println("error reply2 for", r1.Subject, out, err.Error())
  522. }
  523. return
  524. }
  525. })
  526. }
  527. var startSubscribe = func() {
  528. //启动所有watcher
  529. for _, item := range q.streamWacther {
  530. go q.WatchStream(ctxTerm, ctxConn, item)
  531. }
  532. for _, rplyer := range q.replyers {
  533. QueueSubscribe(rplyer)
  534. }
  535. }
  536. q.runtimeStartStreamWatcher = func(conf *NatsStreamWather) error {
  537. go q.WatchStream(ctxTerm, ctxConn, conf)
  538. return nil
  539. }
  540. startSubscribe()
  541. if cb != nil {
  542. cb()
  543. }
  544. for {
  545. select {
  546. case state := <-q.reconnChan:
  547. if state == ConnStateDisconned {
  548. cancelCnn()
  549. } else if state == ConnStateReconned {
  550. ctxConn, cancelCnn = context.WithCancel(context.Background())
  551. startSubscribe()
  552. }
  553. }
  554. }
  555. }