123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- package comm
- import (
- "errors"
- "fmt"
- "strings"
- "time"
- "github.com/nats-io/nats.go"
- )
// ReplyerApi pairs a subject with a timeout.
// NOTE(review): presumably describes a request/reply endpoint exposed on the
// bus; no usage is visible in this part of the file, so confirm.
type ReplyerApi struct {
	// Subject is the subject name of the API.
	Subject string
	// Timeout is the duration associated with the API.
	Timeout time.Duration
}
// MessageApi pairs a stream name with a subject.
// NOTE(review): presumably describes a stream message exposed on the bus; no
// usage is visible in this part of the file, so confirm.
type MessageApi struct {
	// Stream is the name of the stream.
	Stream string
	// Subject is the subject name within the stream.
	Subject string
}
- func CreateBusProxyReplyers(adpters map[string]*NatsBus, localBus *NatsBus) ([]*NatsMsgReplyer, error) {
- out := []*NatsMsgReplyer{}
- for pack, adapter := range adpters {
- var remoteBus = adapter
- if remoteBus == nil {
- continue
- }
- //每个包创建一个代理
- subject := fmt.Sprintf("%s.%s", pack, PackPublicApi)
- out = append(out, &NatsMsgReplyer{
- Subject: subject, //包名.PackPublicApi
- Entity: func() interface{} { return &ReqPackApi{} },
- Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) {
- entity, ok := payload.(*ReqPackApi)
- if !ok {
- return nil, fmt.Errorf("subject %s need payload type of ReqPackApi", subject)
- }
- var ret interface{}
- timout := 5 * time.Second
- if entity.Timeout > 0 {
- timout = entity.Timeout
- }
- fmt.Println("proxying api=>", entity.Subject, time.Now().Format("2006-01-02 15:04:05"), entity.Payload, remoteBus.nc.Opts.Url)
- err := remoteBus.RequestApi(entity.Subject, entity.Payload, timout, &ret)
- fmt.Println("proxying api back =>", entity.Subject, time.Now().Format("2006-01-02 15:04:05"))
- return ret, err
- },
- })
- //代理转发所有远程暴露消息
- subjectPush := fmt.Sprintf("%s.%s", pack, PackPublicMessageApi)
- out = append(out, &NatsMsgReplyer{
- Subject: subjectPush, //包名.PackPublicMessageApi
- Entity: func() interface{} { return &ReqStreamApi{} },
- Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) {
- pld, ok := payload.(*ReqStreamApi)
- if !ok {
- return nil, fmt.Errorf("subject %s need payload type of ReqStreamApi", subjectPush)
- }
- fmt.Println("proxying message api=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
- err := remoteBus.PushMessage(pld.Subject, pld.Payload, pld.PubOpts...)
- fmt.Println("proxying message api back=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
- return true, err
- },
- })
- //代理远程消息的返回转发
- subjectPull := fmt.Sprintf("%s.%s", pack, PackPullMessageApi) //包名.PackPullMessageApi
- out = append(out, &NatsMsgReplyer{
- Subject: subjectPull,
- Entity: func() interface{} { return &ReqPullPackMessageApi{} },
- Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) {
- pld, ok := payload.(*ReqPullPackMessageApi)
- if !ok {
- return nil, fmt.Errorf("subject %s need payload type of ReqPullPackMessageApi", subjectPull)
- }
- fmt.Println("proxying pull message api=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
- err := remoteBus.CreateRemotePackMessageWatcher(pld, localBus)
- fmt.Println("proxying pull message api back=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05"))
- return true, err
- },
- })
- }
- return out, nil
- }
- func (q *NatsBus) CreateRemotePackMessageWatcher(req *ReqPullPackMessageApi, localBus *NatsBus) error {
- if q.runtimeStartStreamWatcher == nil {
- return errors.New("remote bus is not running")
- }
- streamAndTopic := req.Subject
- names := strings.Split(streamAndTopic, "#")
- if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 {
- return errors.New("streamAndTopic should like stream#topic")
- }
- queueName := ""
- if len(names) == 2 {
- v := strings.ReplaceAll(names[1], ".", "-")
- queueName = v + "-queue"
- } else {
- queueName = names[2]
- }
- find := false
- for _, w := range q.streamWacther {
- if w.Stream == names[0] && w.Topic == names[1] && w.Queue == queueName {
- find = true
- break
- }
- }
- if find {
- return fmt.Errorf("pack message Subject %s has registered", streamAndTopic)
- }
- subjectCallBack := req.SubjectCallBack
- ackWaitMinute := req.AckWaitMinute
- watcher := &NatsStreamWather{
- Topic: names[1],
- Stream: names[0],
- Queue: queueName,
- AckWaitMinuteFloat: ackWaitMinute,
- Cb2: func(msg *nats.Msg) {
- Resp := &RespPackMessage{}
- fmt.Println("proxy message coming", names[1], names[0], queueName, "calling=>", subjectCallBack, "timeout minut", ackWaitMinute)
- err := localBus.RequestApi(subjectCallBack, msg.Data, time.Duration(int64(ackWaitMinute*float64(time.Minute))), Resp)
- fmt.Println("calling back error =>", err, Resp.CanAck)
- if Resp.CanAck {
- msg.Ack()
- }
- },
- }
- q.streamWacther = append(q.streamWacther, watcher)
- q.runtimeStartStreamWatcher(watcher)
- return nil
- }
|