nats.go 14 KB


  1. package comm
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "os/signal"
  9. "sync"
  10. "syscall"
  11. "time"
  12. "github.com/nats-io/nats.go"
  13. )
  14. const (
  15. ConnStateDisconned = 1
  16. ConnStateReconned = 2
  17. )
  18. // NATSQueue queue for work
  19. type NatsBus struct {
  20. mu sync.Mutex
  21. nc *nats.Conn
  22. reconnChan chan int
  23. streamWacther []*NatsStreamWather
  24. topicWatcher []*NatsTopicWather
  25. replyers []*NatsMsgReplyer
  26. runtimeStartStreamWatcher StartStreamWatcherHandle
  27. }
  28. type StartStreamWatcherHandle func(conf *NatsStreamWather) error
  29. type NatsStreamWather struct {
  30. Stream string
  31. Topic string
  32. Queue string
  33. AckWaitMinute int64
  34. AckWaitMinuteFloat float64
  35. Entity CreateEnity
  36. Cb WatcherCallback
  37. Cb2 WatcherCallback2
  38. Exported bool //在其他部署包内,是否可以被调用
  39. }
  40. // 消息回复者
  41. type NatsMsgReplyer struct {
  42. Subject string
  43. Entity CreateEnity
  44. Exported bool //在其他部署包内,是否可以被调用
  45. Timeout time.Duration //调用的最大时间
  46. Cb ReplyerHandle
  47. Cb2 ReplyerHandle2
  48. }
  49. type NatsTopicWather struct {
  50. Topic string
  51. Cb WatcherCallback
  52. }
  53. type Stream func(streamName string, topic string, group string)
  54. type ListenTopic func(topic string)
  55. type WatcherCallback func(msg *nats.Msg, entity interface{})
  56. type WatcherCallback2 func(msg *nats.Msg)
  57. type ReplyerHandle func(msg *nats.Msg, entity interface{}) interface{}
  58. type ReplyerHandle2 func(msg *nats.Msg, entity interface{}) (interface{}, error)
  59. type ReplyerHandle3 func(msg *nats.Msg) (interface{}, error)
  60. type CreateEnity func() interface{}
  61. type PackMessageCallback func(entity interface{}) (*RespPackMessage, error)
  62. type RespPackMessage struct {
  63. CanAck bool
  64. }
  65. // 请求数据
  66. type NatsResponse struct {
  67. ErrorNo int
  68. ErrorDesc string
  69. Result string //json字符串
  70. }
  71. type RequestOptions struct {
  72. Timeout time.Duration
  73. PubOpts []nats.PubOpt
  74. DeployPack string
  75. }
  76. type ReqPayload struct {
  77. Payload interface{}
  78. DeployPack string
  79. }
  80. type ReqPackApi struct {
  81. Payload interface{}
  82. Subject string
  83. Timeout time.Duration
  84. }
  85. // req params
  86. type ReqStreamApi struct {
  87. Subject string
  88. Payload interface{}
  89. PubOpts []nats.PubOpt
  90. }
  91. type CancelSubscribe func()
  92. type SubscribeCallback func(obj interface{}, msg *nats.Msg) interface{}
  93. type SubOption struct {
  94. Sub string
  95. Obj func() interface{}
  96. Call SubscribeCallback
  97. }
  98. func (q *NatsBus) Subscribe(topic string, msg nats.MsgHandler) (*nats.Subscription, error) {
  99. return q.nc.Subscribe(topic, msg)
  100. }
  101. func (q *NatsBus) SubscribeOnce(Option *SubOption) (CancelSubscribe, error) {
  102. var sub *nats.Subscription
  103. var cancel = func() {
  104. if sub != nil {
  105. sub.Unsubscribe()
  106. sub = nil
  107. }
  108. }
  109. s, err := q.nc.Subscribe(Option.Sub, func(msg *nats.Msg) {
  110. var p interface{}
  111. if Option.Obj != nil {
  112. p = Option.Obj()
  113. err := json.Unmarshal(msg.Data, p)
  114. if err != nil {
  115. msg.Term()
  116. fmt.Println(Option.Sub, " payload parse msg err", err)
  117. return
  118. }
  119. }
  120. cancel()
  121. out := Option.Call(p, msg)
  122. if out != nil {
  123. payload, _ := json.Marshal(out)
  124. msg.Respond(payload)
  125. return
  126. }
  127. })
  128. if err != nil {
  129. return nil, err
  130. }
  131. sub = s
  132. return cancel, nil
  133. }
  134. func (q *NatsBus) QueueSubscribe(Option *SubOption) (CancelSubscribe, error) {
  135. var sub *nats.Subscription
  136. var cancel = func() {
  137. if sub != nil {
  138. sub.Unsubscribe()
  139. sub = nil
  140. }
  141. }
  142. s, err := q.nc.QueueSubscribe(Option.Sub, Option.Sub+".queue", func(msg *nats.Msg) {
  143. var p interface{}
  144. if Option.Obj != nil {
  145. p = Option.Obj()
  146. err := json.Unmarshal(msg.Data, p)
  147. if err != nil {
  148. msg.Term()
  149. fmt.Println(Option.Sub, " payload parse msg err", err)
  150. return
  151. }
  152. }
  153. out := Option.Call(p, msg)
  154. if out != nil {
  155. payload, _ := json.Marshal(out)
  156. msg.Respond(payload)
  157. return
  158. }
  159. })
  160. if err != nil {
  161. return nil, err
  162. }
  163. sub = s
  164. return cancel, nil
  165. }
  166. func (q *NatsBus) Publish(topic string, data []byte) error {
  167. return q.nc.Publish(topic, data)
  168. }
  169. func (q *NatsBus) PublishObj(topic string, obj interface{}) error {
  170. data, _ := json.Marshal(obj)
  171. return q.nc.Publish(topic, data)
  172. }
  173. func (q *NatsBus) AddReplyers(w ...*NatsMsgReplyer) {
  174. q.replyers = append(q.replyers, w...)
  175. }
  176. func (q *NatsBus) AddStreamWacher(w ...*NatsStreamWather) {
  177. q.streamWacther = append(q.streamWacther, w...)
  178. }
  179. // 请求代理api
  180. func (q *NatsBus) RequestPackApi(subject string, Payload interface{}, out interface{}, options *RequestOptions) error {
  181. pack := "comm"
  182. timeout := time.Second * 5
  183. if options != nil {
  184. if len(options.DeployPack) > 0 {
  185. pack = options.DeployPack
  186. }
  187. if options.Timeout > 0 {
  188. timeout = options.Timeout
  189. }
  190. }
  191. payload, _ := json.Marshal(&ReqPackApi{Payload: Payload, Subject: subject, Timeout: timeout})
  192. msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicApi), payload, timeout)
  193. if err != nil {
  194. return err
  195. }
  196. result := &NatsResponse{}
  197. err = json.Unmarshal(msg.Data, result)
  198. if err != nil {
  199. return err
  200. }
  201. if result.ErrorNo != 200 {
  202. return errors.New(result.ErrorDesc)
  203. }
  204. if out != nil {
  205. return json.Unmarshal([]byte(result.Result), out)
  206. }
  207. return nil
  208. }
  209. // 请求本地api
  210. func (q *NatsBus) RequestApi(subject string, data interface{}, timeout time.Duration, out interface{}) error {
  211. // Lock so only one goroutine at a time can access the map c.v.
  212. q.mu.Lock()
  213. defer q.mu.Unlock()
  214. payload := []byte{}
  215. if data != nil {
  216. payload, _ = json.Marshal(data)
  217. }
  218. msg, err := q.nc.Request(subject, payload, timeout)
  219. if err != nil {
  220. return err
  221. }
  222. result := &NatsResponse{}
  223. err = json.Unmarshal(msg.Data, result)
  224. if err != nil {
  225. return err
  226. }
  227. if result.ErrorNo != 200 {
  228. return errors.New(result.ErrorDesc)
  229. }
  230. if out != nil {
  231. return json.Unmarshal([]byte(result.Result), out)
  232. }
  233. return nil
  234. }
  235. func (q *NatsBus) Request(subject string, data interface{}, timeout time.Duration, out interface{}) error {
  236. payload := []byte{}
  237. if data != nil {
  238. payload, _ = json.Marshal(data)
  239. }
  240. msg, err := q.nc.Request(subject, payload, timeout)
  241. if err != nil {
  242. return err
  243. }
  244. return json.Unmarshal(msg.Data, out)
  245. }
  246. // 退出
  247. func (q *NatsBus) Quit() {
  248. q.nc.Drain()
  249. q.nc.Close()
  250. }
  251. func NewNatsBus2(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers []*NatsMsgReplyer) (*NatsBus, error) {
  252. bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
  253. if err != nil {
  254. return nil, err
  255. }
  256. bus.replyers = replyers
  257. return bus, nil
  258. }
  259. // 可变参数
  260. func NewNatsBus3(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) {
  261. bus, err := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
  262. if err != nil {
  263. return nil, err
  264. }
  265. bus.replyers = replyers
  266. return bus, nil
  267. }
  268. func NewNatsBus(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) {
  269. url := natsUri
  270. if len(url) < 1 {
  271. url = nats.DefaultURL
  272. }
  273. fmt.Println("conning to nats ====> ", url)
  274. bus := &NatsBus{
  275. reconnChan: make(chan int),
  276. streamWacther: streamWacther,
  277. topicWatcher: []*NatsTopicWather{},
  278. }
  279. nc, err := nats.Connect(url,
  280. // nats.Name("render1"),
  281. nats.MaxReconnects(MaxReconnect),
  282. nats.ReconnectWait(time.Duration(ReconnDelaySecond*int(time.Second))),
  283. nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
  284. fmt.Printf("%s Got disconnected! Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), err)
  285. bus.reconnChan <- ConnStateDisconned
  286. }),
  287. nats.ReconnectHandler(func(nc *nats.Conn) {
  288. fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format("2006-01-02 15:04:05"), nc.ConnectedUrl())
  289. bus.reconnChan <- ConnStateReconned
  290. }),
  291. nats.ClosedHandler(func(nc *nats.Conn) {
  292. tip := fmt.Sprintf("%s Connection closed. Reason: %q\n", time.Now().Format("2006-01-02 15:04:05"), nc.LastError())
  293. fmt.Println(tip)
  294. // panic(tip)
  295. }),
  296. )
  297. if err != nil {
  298. fmt.Println("conn to nats failed ====> ", url, err)
  299. return nil, err
  300. }
  301. bus.nc = nc
  302. fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format("2006-01-02 15:04:05"), url)
  303. return bus, nil
  304. }
  305. // 创建jetStream
  306. func (q *NatsBus) JetStream() (nats.JetStreamContext, error) {
  307. return q.nc.JetStream()
  308. }
  309. // 创建jetStream
  310. func (q *NatsBus) CreateStream(streamName string, topic string) (nats.JetStreamContext, error) {
  311. js, err := q.nc.JetStream()
  312. if err != nil {
  313. return nil, err
  314. }
  315. stream, _ := js.StreamInfo(streamName)
  316. if stream != nil {
  317. return js, nil
  318. }
  319. _, err = js.AddStream(&nats.StreamConfig{
  320. Name: streamName,
  321. Subjects: []string{topic},
  322. Retention: nats.WorkQueuePolicy,
  323. })
  324. if err != nil {
  325. fmt.Println("bus CreateStream err=>", err.Error())
  326. return nil, err
  327. }
  328. return js, nil
  329. }
  330. // 监听流数据
  331. func (q *NatsBus) WatchStream(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error {
  332. streamName := conf.Stream
  333. topic := conf.Topic
  334. queue := conf.Queue
  335. fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue)
  336. //创建对应的jet stream 以便模型创建消息不回丢失
  337. js, err := q.CreateStream(streamName, topic)
  338. if err != nil {
  339. fmt.Println("WatchStream error=>", err.Error())
  340. return err
  341. }
  342. opts := []nats.SubOpt{nats.BindStream(streamName)} //nats.MaxDeliver(3)
  343. if conf.AckWaitMinute > 0 {
  344. opts = append(opts, nats.AckWait(time.Duration(conf.AckWaitMinute*int64(time.Minute))))
  345. }
  346. if conf.AckWaitMinuteFloat > 0 {
  347. opts = append(opts, nats.AckWait(time.Duration(int64(conf.AckWaitMinuteFloat*float64(time.Minute)))))
  348. }
  349. sub, err := js.PullSubscribe(topic, queue, opts...)
  350. if err != nil {
  351. fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err)
  352. return err
  353. }
  354. fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue)
  355. var currHandingMsg *nats.Msg = nil
  356. for {
  357. select {
  358. case <-ctxTerm.Done():
  359. fmt.Println("Watching Stream Termed", streamName, topic)
  360. if currHandingMsg != nil {
  361. err := currHandingMsg.Nak()
  362. fmt.Println("terminate currMsg ", err)
  363. }
  364. //中断链接
  365. q.Quit()
  366. return nil
  367. case <-ctxConn.Done():
  368. fmt.Println("Watching Stream Conn closed", streamName, topic)
  369. if currHandingMsg != nil {
  370. err := currHandingMsg.Nak()
  371. fmt.Println("terminate currMsg ", err)
  372. }
  373. return nil
  374. default:
  375. currHandingMsg = nil
  376. msgs, err := sub.Fetch(1)
  377. if err == nil && len(msgs) > 0 {
  378. msg := msgs[0]
  379. currHandingMsg = msg
  380. meta, _ := msg.Metadata()
  381. fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#")
  382. fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending)
  383. fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream)
  384. if conf.Cb2 != nil {
  385. conf.Cb2(msg)
  386. } else if conf.Cb != nil {
  387. var req interface{}
  388. if conf.Entity != nil {
  389. req = conf.Entity()
  390. }
  391. err = json.Unmarshal(msg.Data, req)
  392. if err != nil {
  393. msg.Term()
  394. fmt.Println("work msg err=>", err.Error())
  395. continue
  396. }
  397. conf.Cb(msg, req)
  398. } else {
  399. fmt.Println("error no stream wather callback!!!!")
  400. }
  401. currHandingMsg = nil
  402. }
  403. }
  404. }
  405. }
  406. func (q *NatsBus) GetNatsConn() *nats.Conn {
  407. return q.nc
  408. }
  409. func (q *NatsBus) Run(cb func()) {
  410. //ctx, cancel := context.WithCancel(context.Background())
  411. //ctx, stop := context.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  412. ctxTerm, cancelTerm := context.WithCancel(context.Background())
  413. ctxConn, cancelCnn := context.WithCancel(context.Background())
  414. defer cancelTerm()
  415. defer cancelCnn()
  416. go func() {
  417. exit := make(chan os.Signal, 1)
  418. signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
  419. <-exit
  420. cancelTerm()
  421. time.Sleep(1 * time.Second)
  422. os.Exit(1)
  423. }()
  424. var QueueSubscribe = func(rplyer *NatsMsgReplyer) {
  425. r1 := rplyer
  426. queue := fmt.Sprintf("%s.queue", r1.Subject)
  427. q.nc.QueueSubscribe(r1.Subject, queue, func(msg *nats.Msg) {
  428. defer func() {
  429. if err := recover(); err != nil {
  430. fmt.Println("catch error ", err)
  431. }
  432. }()
  433. fmt.Println("replay for ", r1.Subject)
  434. var req interface{}
  435. if r1.Entity != nil {
  436. req = r1.Entity()
  437. err := json.Unmarshal(msg.Data, req)
  438. if err != nil {
  439. msg.Term()
  440. fmt.Println("rplyer work msg err", err.Error())
  441. return
  442. }
  443. }
  444. if r1.Cb != nil {
  445. out := r1.Cb(msg, req)
  446. if out != nil {
  447. payload, err := json.Marshal(out)
  448. if err != nil {
  449. fmt.Println("error reply for", r1.Subject, err)
  450. }
  451. err = msg.Respond(payload)
  452. if err != nil {
  453. fmt.Println("error reply for", r1.Subject, out)
  454. }
  455. }
  456. return
  457. }
  458. if r1.Cb2 != nil {
  459. out, err := r1.Cb2(msg, req)
  460. resp := &NatsResponse{ErrorNo: 200}
  461. if err != nil {
  462. fmt.Println("error reply2 for", r1.Subject, err)
  463. resp.ErrorDesc = err.Error()
  464. resp.ErrorNo = 400
  465. } else {
  466. payload, _ := json.Marshal(out)
  467. resp.Result = string(payload)
  468. }
  469. payload, _ := json.Marshal(resp)
  470. err = msg.Respond(payload)
  471. if err != nil {
  472. fmt.Println("error reply2 for", r1.Subject, out)
  473. }
  474. return
  475. }
  476. })
  477. }
  478. var startSubscribe = func() {
  479. //启动所有watcher
  480. for _, item := range q.streamWacther {
  481. go q.WatchStream(ctxTerm, ctxConn, item)
  482. }
  483. for _, rplyer := range q.replyers {
  484. QueueSubscribe(rplyer)
  485. }
  486. }
  487. q.runtimeStartStreamWatcher = func(conf *NatsStreamWather) error {
  488. go q.WatchStream(ctxTerm, ctxConn, conf)
  489. return nil
  490. }
  491. startSubscribe()
  492. if cb != nil {
  493. cb()
  494. }
  495. for {
  496. select {
  497. case state := <-q.reconnChan:
  498. if state == ConnStateDisconned {
  499. cancelCnn()
  500. } else if state == ConnStateReconned {
  501. ctxConn, cancelCnn = context.WithCancel(context.Background())
  502. startSubscribe()
  503. }
  504. }
  505. }
  506. }