task.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package task
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "mats/conf"
  7. "mats/db"
  8. "mats/log"
  9. "runtime"
  10. "sync"
  11. "time"
  12. "github.com/RichardKnop/machinery/v2"
  13. "github.com/RichardKnop/machinery/v2/config"
  14. "github.com/RichardKnop/machinery/v2/tasks"
  15. redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
  16. amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
  17. eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
  18. )
  19. func StartServer(AppConfig *conf.TaskConfig) (*machinery.Server, error) {
  20. cnf := &config.Config{
  21. Broker: AppConfig.Broker,
  22. DefaultQueue: AppConfig.DefaultQueue,
  23. // ResultBackend: AppConfig.ResultBackend,
  24. ResultsExpireIn: int(AppConfig.ResultsExpireIn),
  25. AMQP: &config.AMQPConfig{
  26. Exchange: AppConfig.AmqpExchange,
  27. ExchangeType: AppConfig.AmqpExchangeType,
  28. BindingKey: AppConfig.AmqpBindingKey,
  29. PrefetchCount: int(AppConfig.AmqpPrefetchCount),
  30. },
  31. Redis: &config.RedisConfig{
  32. MaxIdle: 3,
  33. IdleTimeout: 240,
  34. ReadTimeout: 15,
  35. WriteTimeout: 15,
  36. ConnectTimeout: 15,
  37. NormalTasksPollPeriod: 1000,
  38. DelayedTasksPollPeriod: 500,
  39. },
  40. }
  41. // Create server instance
  42. broker := amqpbroker.New(cnf)
  43. backend := redisbackend.NewGR(cnf, []string{conf.AppConfig.Redis.Addr}, conf.AppConfig.Redis.Db)
  44. lock := eagerlock.New()
  45. server := machinery.NewServer(cnf, broker, backend, lock)
  46. return server, nil
  47. }
  48. func RunRenderTask(taskId string, db *db.MongoDB) error {
  49. defer func() {
  50. if r := recover(); r != nil {
  51. fmt.Println("RunRenderTask success.")
  52. fmt.Println(r)
  53. buf := make([]byte, 1<<16)
  54. runtime.Stack(buf, true)
  55. fmt.Println("buf", string(buf))
  56. log.Error(string(buf))
  57. }
  58. }()
  59. return nil
  60. // renderServer := GetOrCreateServer(&conf.AppConfig.TaskRender)
  61. // if renderServer == nil {
  62. // return errors.New("GetOrCreateServer null")
  63. // }
  64. // renderTask := &tasks.Signature{
  65. // Name: "render",
  66. // Args: []tasks.Arg{
  67. // {
  68. // Name: "taskId",
  69. // Type: "string",
  70. // Value: taskId,
  71. // },
  72. // },
  73. // }
  74. // asyncResult, err := renderServer.SendTaskWithContext(context.Background(), renderTask)
  75. // go func() {
  76. // results, err := asyncResult.Get(time.Duration(time.Microsecond * 200))
  77. // update := bson.M{}
  78. // if err != nil {
  79. // log.Errorf("Getting task result failed with error: %s", err.Error())
  80. // update["runState"] = model.TaskRunState_Fail
  81. // update["runPercent"] = 0
  82. // } else {
  83. // result := tasks.HumanReadableResults(results)
  84. // log.Infof("render result = %v\n", result)
  85. // update["runState"] = model.TaskRunState_Succ
  86. // update["runPercent"] = 100
  87. // contentLen := len("result:")
  88. // renderRet := result[contentLen:]
  89. // runResult := &model.TaskRunResult{}
  90. // err = json.Unmarshal([]byte(renderRet), runResult)
  91. // if err == nil {
  92. // update["runResult"] = runResult
  93. // }
  94. // }
  95. // _, err = repo.RepoUpdateSetDocProps(&repo.RepoSession{Ctx: context.Background(), Client: db}, repo.CollectionTask, taskId, bson.M{"$set": update})
  96. // if err != nil {
  97. // log.Errorf("render task update error")
  98. // }
  99. // }()
  100. // return err
  101. }
  102. // func RunConvmeshTask(meshUrl string, createThumbnail bool) (*model.ConvMeshRunResult, error) {
  103. // //先转换fbx
  104. // file, err := RunFbxConverterTask(meshUrl)
  105. // if err != nil {
  106. // return nil, err
  107. // }
  108. // taskConf := &tasks.Signature{
  109. // Name: "convmesh",
  110. // Args: []tasks.Arg{
  111. // {
  112. // Name: "meshUrl",
  113. // Type: "string",
  114. // Value: meshUrl,
  115. // },
  116. // {
  117. // Name: "thumbnail",
  118. // Type: "bool",
  119. // Value: createThumbnail,
  120. // },
  121. // },
  122. // }
  123. // runResult := &model.ConvMeshRunResult{}
  124. // err = RunTask(&conf.AppConfig.TaskConvMesh, taskConf, runResult)
  125. // if err != nil {
  126. // return nil, err
  127. // }
  128. // runResult.File = file
  129. // return runResult, nil
  130. // }
  131. // func RunFbxConverterTask(meshUrl string) (*model.OssType, error) {
  132. // renderTask := &tasks.Signature{
  133. // Name: "fbxConverter",
  134. // Args: []tasks.Arg{
  135. // {
  136. // Name: "meshUrl",
  137. // Type: "string",
  138. // Value: meshUrl,
  139. // },
  140. // },
  141. // }
  142. // runResult := &model.OssType{}
  143. // err := RunTask(&conf.AppConfig.TaskFbxConverter, renderTask, runResult)
  144. // if err != nil {
  145. // return nil, err
  146. // }
  147. // return runResult, nil
  148. // }
  149. var taskServers map[string]*machinery.Server = map[string]*machinery.Server{}
  150. var lock sync.Mutex
  151. func GetOrCreateServer(config *conf.TaskConfig) *machinery.Server {
  152. lock.Lock()
  153. defer lock.Unlock()
  154. server := taskServers[config.Name]
  155. if server == nil {
  156. taskServer, err := StartServer(config)
  157. if err != nil {
  158. fmt.Sprintln("GetOrCreateServer failed")
  159. return nil
  160. }
  161. server = taskServer
  162. taskServers[config.Name] = server
  163. }
  164. return server
  165. }
  166. func RunTask(config *conf.TaskConfig, task *tasks.Signature, runResult interface{}) error {
  167. server := GetOrCreateServer(config)
  168. if server == nil {
  169. return errors.New("GetOrCreateServer null")
  170. }
  171. defer func() {
  172. if r := recover(); r != nil {
  173. fmt.Println(r)
  174. buf := make([]byte, 1<<16)
  175. runtime.Stack(buf, true)
  176. fmt.Println("buf", string(buf))
  177. log.Error(string(buf))
  178. }
  179. }()
  180. asyncResult, err := server.SendTask(task)
  181. if err != nil {
  182. return err
  183. }
  184. if runResult == nil {
  185. return nil
  186. }
  187. results, err := asyncResult.Get(time.Duration(time.Microsecond * 200))
  188. if err != nil {
  189. log.Errorf("Getting task result failed with error: %s", err.Error())
  190. fmt.Println(err)
  191. return err
  192. }
  193. msg := tasks.HumanReadableResults(results)
  194. log.Infof("task result = %v\n", msg)
  195. contentLen := len("result:")
  196. renderRet := msg[contentLen:]
  197. err = json.Unmarshal([]byte(renderRet), runResult)
  198. if err != nil {
  199. return errors.New("转换结果解析失败!")
  200. }
  201. return nil
  202. }
  203. // func RunShadowTask(meshId string) (*model.OssType, error) {
  204. // taskReq := &tasks.Signature{
  205. // Name: "shadow",
  206. // Args: []tasks.Arg{
  207. // {
  208. // Name: "from",
  209. // Type: "string",
  210. // Value: "db",
  211. // },
  212. // {
  213. // Name: "meshId",
  214. // Type: "string",
  215. // Value: meshId,
  216. // },
  217. // },
  218. // }
  219. // runResult := &model.OssType{}
  220. // err := RunTask(&conf.AppConfig.TaskShadow, taskReq, runResult)
  221. // return runResult, err
  222. // }
  223. func RunMeshPostProcessTask(meshId string, collection string, itemId string, itemCollection string) error {
  224. go func() {
  225. RunMeshShadowTask(meshId, collection)
  226. }()
  227. taskReq := &tasks.Signature{
  228. Name: "uploadmesh",
  229. Args: []tasks.Arg{
  230. {
  231. Name: "meshId",
  232. Type: "string",
  233. Value: meshId,
  234. },
  235. {
  236. Name: "collection",
  237. Type: "string",
  238. Value: collection,
  239. },
  240. {
  241. Name: "itemId",
  242. Type: "string",
  243. Value: itemId,
  244. },
  245. {
  246. Name: "itemCollection",
  247. Type: "string",
  248. Value: itemCollection,
  249. },
  250. },
  251. }
  252. fmt.Sprintln(taskReq)
  253. // runResult := ""
  254. // return RunTask(&conf.AppConfig.TaskUploadProcess, taskReq, runResult)
  255. return nil
  256. }
  257. func RunMeshShadowTask(meshId string, collection string) error {
  258. taskReq := &tasks.Signature{
  259. Name: "shadow",
  260. Args: []tasks.Arg{
  261. {
  262. Name: "meshId",
  263. Type: "string",
  264. Value: meshId,
  265. },
  266. {
  267. Name: "collection",
  268. Type: "string",
  269. Value: collection,
  270. },
  271. },
  272. }
  273. // runResult := &model.OssType{}
  274. // return RunTask(&conf.AppConfig.TaskShadow, taskReq, runResult)
  275. fmt.Sprintln(taskReq)
  276. return nil
  277. }