// shadow.go (1.2 KB)

  1. package bus
  2. import (
  3. "assetcenter/conf"
  4. "assetcenter/db/model"
  5. "assetcenter/log"
  6. "strings"
  7. "github.com/nats-io/nats.go"
  8. "infish.cn/comm"
  9. )
  10. type OsgShadowResult struct {
  11. AssetId string `json:"assetId"`
  12. DbId string `json:"dbId"`
  13. Table string `json:"table"`
  14. MeshUrl string `json:"meshUrl"`
  15. Shadow *model.OssType `json:"shadow"`
  16. }
  17. func CreateShadowResultWather(app *conf.AppConf) *comm.NatsStreamWather {
  18. topics := strings.Split(app.Nats.OsgShadowStreamTopic, "#")
  19. return &comm.NatsStreamWather{
  20. Stream: topics[0],
  21. Topic: topics[1],
  22. Queue: topics[2],
  23. MaxDeliver: 5, // 添加 MaxDeliver 配置
  24. AckPolicy: nats.AckExplicitPolicy,
  25. AckWaitMinute: 5,
  26. Entity: func() interface{} { return &OsgShadowResult{} },
  27. Cb: func(msg *nats.Msg, entity interface{}) {
  28. result := entity.(*OsgShadowResult)
  29. log.Info("------------------------[shadow-result]-------------------------")
  30. log.Infof("%#v\n", *result)
  31. //保存osgjs
  32. if result.Shadow == nil || len(result.Shadow.Url) < 1 || len(result.DbId) < 1 || len(result.Table) < 1 {
  33. msg.AckSync()
  34. return
  35. }
  36. err := RepoShadowResult(result)
  37. if err != nil {
  38. msg.Nak()
  39. return
  40. }
  41. msg.AckSync()
  42. },
  43. }
  44. }