123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- package task
- import (
- "encoding/json"
- "errors"
- "fmt"
- "mats/conf"
- "mats/db"
- "mats/log"
- "runtime"
- "sync"
- "time"
- "github.com/RichardKnop/machinery/v2"
- "github.com/RichardKnop/machinery/v2/config"
- "github.com/RichardKnop/machinery/v2/tasks"
- redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
- amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
- eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
- )
- func StartServer(AppConfig *conf.TaskConfig) (*machinery.Server, error) {
- cnf := &config.Config{
- Broker: AppConfig.Broker,
- DefaultQueue: AppConfig.DefaultQueue,
- // ResultBackend: AppConfig.ResultBackend,
- ResultsExpireIn: int(AppConfig.ResultsExpireIn),
- AMQP: &config.AMQPConfig{
- Exchange: AppConfig.AmqpExchange,
- ExchangeType: AppConfig.AmqpExchangeType,
- BindingKey: AppConfig.AmqpBindingKey,
- PrefetchCount: int(AppConfig.AmqpPrefetchCount),
- },
- Redis: &config.RedisConfig{
- MaxIdle: 3,
- IdleTimeout: 240,
- ReadTimeout: 15,
- WriteTimeout: 15,
- ConnectTimeout: 15,
- NormalTasksPollPeriod: 1000,
- DelayedTasksPollPeriod: 500,
- },
- }
- // Create server instance
- broker := amqpbroker.New(cnf)
- backend := redisbackend.NewGR(cnf, []string{conf.AppConfig.Redis.Addr}, conf.AppConfig.Redis.Db)
- lock := eagerlock.New()
- server := machinery.NewServer(cnf, broker, backend, lock)
- return server, nil
- }
- func RunRenderTask(taskId string, db *db.MongoDB) error {
- defer func() {
- if r := recover(); r != nil {
- fmt.Println("RunRenderTask success.")
- fmt.Println(r)
- buf := make([]byte, 1<<16)
- runtime.Stack(buf, true)
- fmt.Println("buf", string(buf))
- log.Error(string(buf))
- }
- }()
- return nil
- // renderServer := GetOrCreateServer(&conf.AppConfig.TaskRender)
- // if renderServer == nil {
- // return errors.New("GetOrCreateServer null")
- // }
- // renderTask := &tasks.Signature{
- // Name: "render",
- // Args: []tasks.Arg{
- // {
- // Name: "taskId",
- // Type: "string",
- // Value: taskId,
- // },
- // },
- // }
- // asyncResult, err := renderServer.SendTaskWithContext(context.Background(), renderTask)
- // go func() {
- // results, err := asyncResult.Get(time.Duration(time.Microsecond * 200))
- // update := bson.M{}
- // if err != nil {
- // log.Errorf("Getting task result failed with error: %s", err.Error())
- // update["runState"] = model.TaskRunState_Fail
- // update["runPercent"] = 0
- // } else {
- // result := tasks.HumanReadableResults(results)
- // log.Infof("render result = %v\n", result)
- // update["runState"] = model.TaskRunState_Succ
- // update["runPercent"] = 100
- // contentLen := len("result:")
- // renderRet := result[contentLen:]
- // runResult := &model.TaskRunResult{}
- // err = json.Unmarshal([]byte(renderRet), runResult)
- // if err == nil {
- // update["runResult"] = runResult
- // }
- // }
- // _, err = repo.RepoUpdateSetDocProps(&repo.RepoSession{Ctx: context.Background(), Client: db}, repo.CollectionTask, taskId, bson.M{"$set": update})
- // if err != nil {
- // log.Errorf("render task update error")
- // }
- // }()
- // return err
- }
- // func RunConvmeshTask(meshUrl string, createThumbnail bool) (*model.ConvMeshRunResult, error) {
- // // Convert the FBX first.
- // file, err := RunFbxConverterTask(meshUrl)
- // if err != nil {
- // return nil, err
- // }
- // taskConf := &tasks.Signature{
- // Name: "convmesh",
- // Args: []tasks.Arg{
- // {
- // Name: "meshUrl",
- // Type: "string",
- // Value: meshUrl,
- // },
- // {
- // Name: "thumbnail",
- // Type: "bool",
- // Value: createThumbnail,
- // },
- // },
- // }
- // runResult := &model.ConvMeshRunResult{}
- // err = RunTask(&conf.AppConfig.TaskConvMesh, taskConf, runResult)
- // if err != nil {
- // return nil, err
- // }
- // runResult.File = file
- // return runResult, nil
- // }
- // func RunFbxConverterTask(meshUrl string) (*model.OssType, error) {
- // renderTask := &tasks.Signature{
- // Name: "fbxConverter",
- // Args: []tasks.Arg{
- // {
- // Name: "meshUrl",
- // Type: "string",
- // Value: meshUrl,
- // },
- // },
- // }
- // runResult := &model.OssType{}
- // err := RunTask(&conf.AppConfig.TaskFbxConverter, renderTask, runResult)
- // if err != nil {
- // return nil, err
- // }
- // return runResult, nil
- // }
- var taskServers map[string]*machinery.Server = map[string]*machinery.Server{}
- var lock sync.Mutex
- func GetOrCreateServer(config *conf.TaskConfig) *machinery.Server {
- lock.Lock()
- defer lock.Unlock()
- server := taskServers[config.Name]
- if server == nil {
- taskServer, err := StartServer(config)
- if err != nil {
- fmt.Sprintln("GetOrCreateServer failed")
- return nil
- }
- server = taskServer
- taskServers[config.Name] = server
- }
- return server
- }
- func RunTask(config *conf.TaskConfig, task *tasks.Signature, runResult interface{}) error {
- server := GetOrCreateServer(config)
- if server == nil {
- return errors.New("GetOrCreateServer null")
- }
- defer func() {
- if r := recover(); r != nil {
- fmt.Println(r)
- buf := make([]byte, 1<<16)
- runtime.Stack(buf, true)
- fmt.Println("buf", string(buf))
- log.Error(string(buf))
- }
- }()
- asyncResult, err := server.SendTask(task)
- if err != nil {
- return err
- }
- if runResult == nil {
- return nil
- }
- results, err := asyncResult.Get(time.Duration(time.Microsecond * 200))
- if err != nil {
- log.Errorf("Getting task result failed with error: %s", err.Error())
- fmt.Println(err)
- return err
- }
- msg := tasks.HumanReadableResults(results)
- log.Infof("task result = %v\n", msg)
- contentLen := len("result:")
- renderRet := msg[contentLen:]
- err = json.Unmarshal([]byte(renderRet), runResult)
- if err != nil {
- return errors.New("转换结果解析失败!")
- }
- return nil
- }
- // func RunShadowTask(meshId string) (*model.OssType, error) {
- // taskReq := &tasks.Signature{
- // Name: "shadow",
- // Args: []tasks.Arg{
- // {
- // Name: "from",
- // Type: "string",
- // Value: "db",
- // },
- // {
- // Name: "meshId",
- // Type: "string",
- // Value: meshId,
- // },
- // },
- // }
- // runResult := &model.OssType{}
- // err := RunTask(&conf.AppConfig.TaskShadow, taskReq, runResult)
- // return runResult, err
- // }
- func RunMeshPostProcessTask(meshId string, collection string, itemId string, itemCollection string) error {
- go func() {
- RunMeshShadowTask(meshId, collection)
- }()
- taskReq := &tasks.Signature{
- Name: "uploadmesh",
- Args: []tasks.Arg{
- {
- Name: "meshId",
- Type: "string",
- Value: meshId,
- },
- {
- Name: "collection",
- Type: "string",
- Value: collection,
- },
- {
- Name: "itemId",
- Type: "string",
- Value: itemId,
- },
- {
- Name: "itemCollection",
- Type: "string",
- Value: itemCollection,
- },
- },
- }
- fmt.Sprintln(taskReq)
- // runResult := ""
- // return RunTask(&conf.AppConfig.TaskUploadProcess, taskReq, runResult)
- return nil
- }
- func RunMeshShadowTask(meshId string, collection string) error {
- taskReq := &tasks.Signature{
- Name: "shadow",
- Args: []tasks.Arg{
- {
- Name: "meshId",
- Type: "string",
- Value: meshId,
- },
- {
- Name: "collection",
- Type: "string",
- Value: collection,
- },
- },
- }
- // runResult := &model.OssType{}
- // return RunTask(&conf.AppConfig.TaskShadow, taskReq, runResult)
- fmt.Sprintln(taskReq)
- return nil
- }
|