package comm import ( "errors" "fmt" "strings" "time" "github.com/nats-io/nats.go" ) type ReplyerApi struct { Subject string Timeout time.Duration } type MessageApi struct { Stream string Subject string } func CreateBusProxyReplyers(adpters map[string]*NatsBus, localBus *NatsBus) ([]*NatsMsgReplyer, error) { out := []*NatsMsgReplyer{} for pack, adapter := range adpters { var remoteBus = adapter if remoteBus == nil { continue } //每个包创建一个代理 subject := fmt.Sprintf("%s.%s", pack, PackPublicApi) out = append(out, &NatsMsgReplyer{ Subject: subject, //包名.PackPublicApi Entity: func() interface{} { return &ReqPackApi{} }, Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) { entity, ok := payload.(*ReqPackApi) if !ok { return nil, fmt.Errorf("subject %s need payload type of ReqPackApi", subject) } var ret interface{} timout := 5 * time.Second if entity.Timeout > 0 { timout = entity.Timeout } fmt.Println("proxying api=>", entity.Subject, time.Now().Format("2006-01-02 15:04:05"), entity.Payload, remoteBus.nc.Opts.Url) err := remoteBus.RequestApi(entity.Subject, entity.Payload, timout, &ret) fmt.Println("proxying api back =>", entity.Subject, time.Now().Format("2006-01-02 15:04:05")) return ret, err }, }) //代理转发所有远程暴露消息 subjectPush := fmt.Sprintf("%s.%s", pack, PackPublicMessageApi) out = append(out, &NatsMsgReplyer{ Subject: subjectPush, //包名.PackPublicMessageApi Entity: func() interface{} { return &ReqStreamApi{} }, Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) { pld, ok := payload.(*ReqStreamApi) if !ok { return nil, fmt.Errorf("subject %s need payload type of ReqStreamApi", subjectPush) } fmt.Println("proxying message api=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05")) err := remoteBus.PushMessage(pld.Subject, pld.Payload, pld.PubOpts...) fmt.Println("proxying message api back=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05")) return true, err }, }) //代理远程消息的返回转发 subjectPull := fmt.Sprintf("%s.%s", pack, PackPullMessageApi) //包名.PackPullMessageApi out = append(out, &NatsMsgReplyer{ Subject: subjectPull, Entity: func() interface{} { return &ReqPullPackMessageApi{} }, Cb2: func(msg *nats.Msg, payload interface{}) (interface{}, error) { pld, ok := payload.(*ReqPullPackMessageApi) if !ok { return nil, fmt.Errorf("subject %s need payload type of ReqPullPackMessageApi", subjectPull) } fmt.Println("proxying pull message api=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05")) err := remoteBus.CreateRemotePackMessageWatcher(pld, localBus) fmt.Println("proxying pull message api back=>", pld.Subject, time.Now().Format("2006-01-02 15:04:05")) return true, err }, }) } return out, nil } func (q *NatsBus) CreateRemotePackMessageWatcher(req *ReqPullPackMessageApi, localBus *NatsBus) error { if q.runtimeStartStreamWatcher == nil { return errors.New("remote bus is not running") } streamAndTopic := req.Subject names := strings.Split(streamAndTopic, "#") if len(names) < 2 || len(names[0]) < 1 || len(names[1]) < 1 { return errors.New("streamAndTopic should like stream#topic") } queueName := "" if len(names) == 2 { v := strings.ReplaceAll(names[1], ".", "-") queueName = v + "-queue" } else { queueName = names[2] } find := false for _, w := range q.streamWacther { if w.Stream == names[0] && w.Topic == names[1] && w.Queue == queueName { find = true break } } if find { return fmt.Errorf("pack message Subject %s has registered", streamAndTopic) } subjectCallBack := req.SubjectCallBack ackWaitMinute := req.AckWaitMinute watcher := &NatsStreamWather{ Topic: names[1], Stream: names[0], Queue: queueName, AckWaitMinuteFloat: ackWaitMinute, Cb2: func(msg *nats.Msg) { Resp := &RespPackMessage{} fmt.Println("proxy message coming", names[1], names[0], queueName, "calling=>", subjectCallBack, "timeout minut", ackWaitMinute) err := localBus.RequestApi(subjectCallBack, msg.Data, time.Duration(int64(ackWaitMinute*float64(time.Minute))), Resp) fmt.Println("calling back error =>", err, Resp.CanAck) if Resp.CanAck { msg.Ack() } }, } q.streamWacther = append(q.streamWacther, watcher) q.runtimeStartStreamWatcher(watcher) return nil }