osgconv.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package bus
  2. import (
  3. "assetcenter/conf"
  4. "assetcenter/db/model"
  5. "strings"
  6. "github.com/nats-io/nats.go"
  7. "infish.cn/comm"
  8. )
  9. type OsgConvertorResult struct {
  10. AssetId string `json:"assetId"`
  11. DbId string `json:"dbId"`
  12. Table string `json:"table"`
  13. Error string `json:"error"`
  14. MeshUrl string `json:"meshUrl"`
  15. Osgjs *model.OssType `json:"Osgjs"`
  16. }
  17. func CreateOsgConvResultWather(app *conf.AppConf) *comm.NatsStreamWather {
  18. topics := strings.Split(app.Nats.OsgConvStreamTopic, "#")
  19. return &comm.NatsStreamWather{
  20. Stream: topics[0],
  21. Topic: topics[1],
  22. Queue: topics[2],
  23. MaxDeliver: 5, // 添加 MaxDeliver 配置
  24. AckPolicy: nats.AckExplicitPolicy,
  25. AckWaitMinute: 5,
  26. Entity: func() interface{} { return &OsgConvertorResult{} },
  27. Cb: func(msg *nats.Msg, entity interface{}) {
  28. result := entity.(*OsgConvertorResult)
  29. //保存osgjs
  30. if result.Osgjs == nil || len(result.Osgjs.Url) < 1 || len(result.DbId) < 1 || len(result.Table) < 1 {
  31. msg.AckSync()
  32. return
  33. }
  34. err := RepoOsgconvResult(result)
  35. if err != nil {
  36. msg.Nak()
  37. return
  38. }
  39. msg.AckSync()
  40. },
  41. }
  42. }