package bus

import (
	"assetcenter/conf"
	"assetcenter/db/model"
	"strings"

	"github.com/nats-io/nats.go"
	"infish.cn/comm"
)

// OsgConvertorResult is the JSON payload decoded from messages on the OSG
// conversion result stream.
//
// NOTE(review): field meanings are inferred from their names (AssetId/DbId/Table
// identifying the record, Osgjs carrying the stored conversion output) — confirm
// against the publisher.
type OsgConvertorResult struct {
	AssetId string         `json:"assetId"`
	DbId    string         `json:"dbId"`
	Table   string         `json:"table"`
	Error   string         `json:"error"`
	MeshUrl string         `json:"meshUrl"`
	Osgjs   *model.OssType `json:"Osgjs"`
}

// CreateOsgConvResultWather builds the stream watcher that consumes OSG
// conversion results and persists them through RepoOsgconvResult.
//
// app.Nats.OsgConvStreamTopic must contain at least three "#"-separated parts,
// used in order as stream, topic and queue ("stream#topic#queue"); any extra
// parts are ignored. The function panics with a descriptive message when the
// setting has fewer than three parts, so a bad configuration is reported
// clearly instead of as an index-out-of-range error.
func CreateOsgConvResultWather(app *conf.AppConf) *comm.NatsStreamWather {
	raw := app.Nats.OsgConvStreamTopic
	topics := strings.Split(raw, "#")
	if len(topics) < 3 {
		panic("bus: Nats.OsgConvStreamTopic must look like \"stream#topic#queue\", got \"" + raw + "\"")
	}

	return &comm.NatsStreamWather{
		Stream:        topics[0],
		Topic:         topics[1],
		Queue:         topics[2],
		MaxDeliver:    5, // add MaxDeliver configuration
		AckPolicy:     nats.AckExplicitPolicy,
		AckWaitMinute: 5, // presumably minutes, judging by the field name — confirm in comm
		Entity: func() interface{} {
			return &OsgConvertorResult{}
		},
		Cb: func(msg *nats.Msg, entity interface{}) {
			result, ok := entity.(*OsgConvertorResult)
			if !ok || result == nil {
				// The payload cannot be interpreted and a redelivery would not
				// change that, so ask the server to stop redelivering instead
				// of panicking inside the callback.
				_ = msg.Term()
				return
			}

			// Save osgjs: only results that carry a stored file and identify
			// their target row are persisted; anything else is acknowledged
			// and dropped.
			if result.Osgjs == nil || len(result.Osgjs.Url) < 1 || len(result.DbId) < 1 || len(result.Table) < 1 {
				// Ack errors are deliberately ignored: nothing useful can be
				// done here, and an unacknowledged message is left to the
				// server's redelivery policy.
				_ = msg.AckSync()
				return
			}

			if err := RepoOsgconvResult(result); err != nil {
				// Persisting failed: negatively acknowledge so the message is
				// redelivered (bounded by MaxDeliver above).
				_ = msg.Nak()
				return
			}
			_ = msg.AckSync()
		},
	}
}