nats-proxy.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package comm
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/nats-io/nats.go"
  8. )
  9. type ReplyerApi struct {
  10. Subject string
  11. Timeout time.Duration
  12. }
  13. type MessageApi struct {
  14. Stream string
  15. Subject string
  16. }
  17. func CreateBusProxyReplyers(adpters map[string]*NatsBus, localBus *NatsBus) ([]*NatsMsgReplyer, error) {
  18. out := []*NatsMsgReplyer{}
  19. for pack, adapter := range adpters {
  20. var remoteBus = adapter
  21. if remoteBus == nil {
  22. continue
  23. }
  24. //每个包创建一个代理
  25. subject := fmt.Sprintf("%s.%s", pack, PackPublicApi)
  26. out = append(out, &NatsMsgReplyer{
  27. Subject: subject, //包名.PackPublicApi
  28. Entity: func() interface{} { return &ReqPackApi{} },
  29. Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) {
  30. entity, ok := payload.(*ReqPackApi)
  31. if !ok {
  32. return nil, fmt.Errorf("subject %s need payload type of ReqPackApi", subject)
  33. }
  34. var ret interface{}
  35. timout := 5 * time.Second
  36. if entity.Timeout > 0 {
  37. timout = entity.Timeout
  38. }
  39. fmt.Println("proxying api=>", entity.Subject, time.Now().Format("2006-01-02 15:04:05"), entity.Payload, remoteBus.nc.Opts.Url)
  40. err := remoteBus.RequestApi(entity.Subject, entity.Payload, timout, &ret)
  41. fmt.Println("proxying api back =>", entity.Subject, time.Now().Format("2006-01-02 15:04:05"))
  42. return ret, err
  43. },
  44. })
  45. //代理转发所有远程暴露消息
  46. subjectPush := fmt.Sprintf("%s.%s", pack, PackPublicMessageApi)
  47. out = append(out, &NatsMsgReplyer{
  48. Subject: subjectPush, //包名.PackPublicMessageApi
  49. Entity: func() interface{} { return &ReqStreamApi{} },
  50. Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) {
  51. pld, ok := payload.(*ReqStreamApi)
  52. if !ok {
  53. return nil, fmt.Errorf("subject %s need payload type of ReqStreamApi", subjectPush)
  54. }
  55. fmt.Println("proxying message api=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
  56. err := remoteBus.PushMessage(pld.Subject, pld.Payload, pld.PubOpts...)
  57. fmt.Println("proxying message api back=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
  58. return true, err
  59. },
  60. })
  61. //代理远程消息的返回转发
  62. subjectPull := fmt.Sprintf("%s.%s", pack, PackPullMessageApi) //包名.PackPullMessageApi
  63. out = append(out, &NatsMsgReplyer{
  64. Subject: subjectPull,
  65. Entity: func() interface{} { return &ReqPullPackMessageApi{} },
  66. Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) {
  67. pld, ok := payload.(*ReqPullPackMessageApi)
  68. if !ok {
  69. return nil, fmt.Errorf("subject %s need payload type of ReqPullPackMessageApi", subjectPull)
  70. }
  71. fmt.Println("proxying pull message api=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
  72. err := remoteBus.CreateRemotePackMessageWatcher(pld, localBus)
  73. fmt.Println("proxying pull message api back=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
  74. return true, err
  75. },
  76. })
  77. }
  78. return out, nil
  79. }
  80. func (q *NatsBus) CreateRemotePackMessageWatcher(req *ReqPullPackMessageApi, localBus *NatsBus) error {
  81. if q.runtimeStartStreamWatcher == nil {
  82. return errors.New("remote bus is not running")
  83. }
  84. streamAndTopic := req.Subject
  85. names := strings.Split(streamAndTopic, "#")
  86. if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
  87. return errors.New("streamAndTopic should like stream#topic")
  88. }
  89. queueName := ""
  90. if len(names) == 2 {
  91. v := strings.ReplaceAll(names[1], ".", "-")
  92. queueName = v + "-queue"
  93. } else {
  94. queueName = names[2]
  95. }
  96. find := false
  97. for _, w := range q.streamWacther {
  98. if w.Stream == names[0] && w.Topic == names[1] && w.Queue == queueName {
  99. find = true
  100. break
  101. }
  102. }
  103. if find {
  104. return fmt.Errorf("pack message Subject %s has registered", streamAndTopic)
  105. }
  106. subjectCallBack := req.SubjectCallBack
  107. ackWaitMinute := req.AckWaitMinute
  108. watcher := &NatsStreamWather{
  109. Topic: names[1],
  110. Stream: names[0],
  111. Queue: queueName,
  112. AckWaitMinuteFloat: ackWaitMinute,
  113. Cb2: func(msg *nats.Msg) {
  114. Resp := &RespPackMessage{}
  115. fmt.Println("proxy message coming", names[1], names[0], queueName, "calling=>", subjectCallBack, "timeout minut", ackWaitMinute)
  116. err := localBus.RequestApi(subjectCallBack, msg.Data, time.Duration(int64(ackWaitMinute*float64(time.Minute))), Resp)
  117. fmt.Println("calling back error =>", err, Resp.CanAck)
  118. if Resp.CanAck {
  119. msg.Ack()
  120. }
  121. },
  122. }
  123. q.streamWacther = append(q.streamWacther, watcher)
  124. q.runtimeStartStreamWatcher(watcher)
  125. return nil
  126. }