package bus import ( "assetcenter/conf" "assetcenter/db/model" "assetcenter/log" "strings" "github.com/nats-io/nats.go" "infish.cn/comm" ) type OsgShadowResult struct { AssetId string `json:"assetId"` DbId string `json:"dbId"` Table string `json:"table"` MeshUrl string `json:"meshUrl"` Shadow *model.OssType `json:"shadow"` } func CreateShadowResultWather(app *conf.AppConf) *comm.NatsStreamWather { topics := strings.Split(app.Nats.OsgShadowStreamTopic, "#") return &comm.NatsStreamWather{ Stream: topics[0], Topic: topics[1], Queue: topics[2], MaxDeliver: 5, // 添加 MaxDeliver 配置 AckPolicy: nats.AckExplicitPolicy, AckWaitMinute: 5, Entity: func() interface{} { return &OsgShadowResult{} }, Cb: func(msg *nats.Msg, entity interface{}) { result := entity.(*OsgShadowResult) log.Info("------------------------[shadow-result]-------------------------") log.Infof("%#v\n", *result) //保存osgjs if result.Shadow == nil || len(result.Shadow.Url) < 1 || len(result.DbId) < 1 || len(result.Table) < 1 { msg.AckSync() return } err := RepoShadowResult(result) if err != nil { msg.Nak() return } msg.AckSync() }, } }