// Package task wires the application to machinery task servers and exposes
// helpers for dispatching asynchronous tasks and collecting their results.
package task

import (
	"encoding/json"
	"errors"
	"fmt"
	"runtime"
	"sync"
	"time"

	"github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"

	"mats/conf"
	"mats/db"
	"mats/log"
)

// StartServer builds a machinery server from AppConfig using an AMQP broker,
// a Redis result backend and an eager lock.
//
// Note that the Redis address and database index are read from the global
// conf.AppConfig, not from the AppConfig argument. The returned error is
// currently always nil; it is kept in the signature for callers that check it.
func StartServer(AppConfig *conf.TaskConfig) (*machinery.Server, error) {
	cnf := &config.Config{
		Broker:       AppConfig.Broker,
		DefaultQueue: AppConfig.DefaultQueue,
		// ResultBackend: AppConfig.ResultBackend,
		ResultsExpireIn: int(AppConfig.ResultsExpireIn),
		AMQP: &config.AMQPConfig{
			Exchange:      AppConfig.AmqpExchange,
			ExchangeType:  AppConfig.AmqpExchangeType,
			BindingKey:    AppConfig.AmqpBindingKey,
			PrefetchCount: int(AppConfig.AmqpPrefetchCount),
		},
		Redis: &config.RedisConfig{
			MaxIdle:                3,
			IdleTimeout:            240,
			ReadTimeout:            15,
			WriteTimeout:           15,
			ConnectTimeout:         15,
			NormalTasksPollPeriod:  1000,
			DelayedTasksPollPeriod: 500,
		},
	}

	// Create server instance
	broker := amqpbroker.New(cnf)
	backend := redisbackend.NewGR(cnf, []string{conf.AppConfig.Redis.Addr}, conf.AppConfig.Redis.Db)
	// Named taskLock so it does not shadow the package-level mutex "lock".
	taskLock := eagerlock.New()
	server := machinery.NewServer(cnf, broker, backend, taskLock)

	return server, nil
}

// logRecoveredPanic prints and logs a recovered panic value together with the
// stack traces of all goroutines.
func logRecoveredPanic(r interface{}) {
	fmt.Println(r)
	buf := make([]byte, 1<<16)
	// runtime.Stack reports how many bytes it wrote; only that prefix is
	// meaningful, the rest of the buffer is zero bytes.
	n := runtime.Stack(buf, true)
	stack := string(buf[:n])
	fmt.Println("buf", stack)
	log.Error(stack)
}

// RunRenderTask is the entry point for dispatching a "render" task for taskId.
//
// Dispatching is currently disabled: the function returns nil immediately and
// neither taskId nor db is used. The previous implementation is kept below,
// commented out, for reference.
func RunRenderTask(taskId string, db *db.MongoDB) error {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("RunRenderTask panicked.")
			logRecoveredPanic(r)
		}
	}()
	return nil

	// renderServer := GetOrCreateServer(&conf.AppConfig.TaskRender)
	// if renderServer == nil {
	// 	return errors.New("GetOrCreateServer null")
	// }
	// renderTask := &tasks.Signature{
	// 	Name: "render",
	// 	Args: []tasks.Arg{
	// 		{
	// 			Name:  "taskId",
	// 			Type:  "string",
	// 			Value: taskId,
	// 		},
	// 	},
	// }
	// asyncResult, err := renderServer.SendTaskWithContext(context.Background(), renderTask)
	// go func() {
	// 	results, err := asyncResult.Get(time.Duration(time.Microsecond * 200))
	// 	update := bson.M{}
	// 	if err != nil {
	// 		log.Errorf("Getting task result failed with error: %s", err.Error())
	// 		update["runState"] = model.TaskRunState_Fail
	// 		update["runPercent"] = 0
	// 	} else {
	// 		result := tasks.HumanReadableResults(results)
	// 		log.Infof("render result = %v\n", result)
	// 		update["runState"] = model.TaskRunState_Succ
	// 		update["runPercent"] = 100
	// 		contentLen := len("result:")
	// 		renderRet := result[contentLen:]
	// 		runResult := &model.TaskRunResult{}
	// 		err = json.Unmarshal([]byte(renderRet), runResult)
	// 		if err == nil {
	// 			update["runResult"] = runResult
	// 		}
	// 	}
	// 	_, err = repo.RepoUpdateSetDocProps(&repo.RepoSession{Ctx: context.Background(), Client: db}, repo.CollectionTask, taskId, bson.M{"$set": update})
	// 	if err != nil {
	// 		log.Errorf("render task update error")
	// 	}
	// }()
	// return err
}

// func RunConvmeshTask(meshUrl string, createThumbnail bool) (*model.ConvMeshRunResult, error) {
// 	// convert the fbx first
// 	file, err := RunFbxConverterTask(meshUrl)
// 	if err != nil {
// 		return nil, err
// 	}
// 	taskConf := &tasks.Signature{
// 		Name: "convmesh",
// 		Args: []tasks.Arg{
// 			{
// 				Name:  "meshUrl",
// 				Type:  "string",
// 				Value: meshUrl,
// 			},
// 			{
// 				Name:  "thumbnail",
// 				Type:  "bool",
// 				Value: createThumbnail,
// 			},
// 		},
// 	}
// 	runResult := &model.ConvMeshRunResult{}
// 	err = RunTask(&conf.AppConfig.TaskConvMesh, taskConf, runResult)
// 	if err != nil {
// 		return nil, err
// 	}
// 	runResult.File = file
// 	return runResult, nil
// }

// func RunFbxConverterTask(meshUrl string) (*model.OssType, error) {
// 	renderTask := &tasks.Signature{
// 		Name: "fbxConverter",
// 		Args: []tasks.Arg{
// 			{
// 				Name:  "meshUrl",
// 				Type:  "string",
// 				Value: meshUrl,
// 			},
// 		},
// 	}
// 	runResult := &model.OssType{}
// 	err := RunTask(&conf.AppConfig.TaskFbxConverter, renderTask, runResult)
// 	if err != nil {
// 		return nil, err
// 	}
// 	return runResult, nil
// }

var (
	// taskServers caches one machinery server per task configuration name.
	taskServers = map[string]*machinery.Server{}
	// lock guards taskServers.
	lock sync.Mutex
)

// GetOrCreateServer returns the cached machinery server for config.Name,
// creating and caching it via StartServer on first use. It returns nil when
// config is nil or the server cannot be created.
func GetOrCreateServer(config *conf.TaskConfig) *machinery.Server {
	if config == nil {
		return nil
	}

	lock.Lock()
	defer lock.Unlock()

	if server := taskServers[config.Name]; server != nil {
		return server
	}

	server, err := StartServer(config)
	if err != nil {
		log.Errorf("GetOrCreateServer failed: %s", err.Error())
		return nil
	}
	taskServers[config.Name] = server
	return server
}

// RunTask sends task to the server configured by config.
//
// When runResult is nil the task is fire-and-forget and RunTask returns as
// soon as the task has been sent. Otherwise it blocks until the task result is
// available, strips the leading "result:" marker from the human-readable
// result and JSON-decodes the remainder into runResult.
//
// A panic raised while sending or decoding is logged and reported to the
// caller as an error instead of being silently turned into a nil error.
func RunTask(config *conf.TaskConfig, task *tasks.Signature, runResult interface{}) (err error) {
	server := GetOrCreateServer(config)
	if server == nil {
		return errors.New("GetOrCreateServer null")
	}

	defer func() {
		if r := recover(); r != nil {
			logRecoveredPanic(r)
			// Without this the recovered panic would surface as success.
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()

	asyncResult, err := server.SendTask(task)
	if err != nil {
		return err
	}
	if runResult == nil {
		return nil
	}

	// NOTE(review): per the machinery docs the argument to Get is the sleep
	// interval between backend polls, not a timeout - confirm 200µs is intended.
	results, err := asyncResult.Get(200 * time.Microsecond)
	if err != nil {
		log.Errorf("Getting task result failed with error: %s", err.Error())
		fmt.Println(err)
		return err
	}

	msg := tasks.HumanReadableResults(results)
	log.Infof("task result = %v\n", msg)

	// The payload is expected to follow a "result:" marker; guard the slice so
	// a short or empty result yields an error rather than a panic.
	contentLen := len("result:")
	if len(msg) < contentLen {
		return errors.New("转换结果解析失败!")
	}
	renderRet := msg[contentLen:]

	if unmarshalErr := json.Unmarshal([]byte(renderRet), runResult); unmarshalErr != nil {
		// Keep the user-facing message stable, but do not lose the cause.
		log.Errorf("Decoding task result failed with error: %s", unmarshalErr.Error())
		return errors.New("转换结果解析失败!")
	}
	return nil
}

// func RunShadowTask(meshId string) (*model.OssType, error) {
// 	taskReq := &tasks.Signature{
// 		Name: "shadow",
// 		Args: []tasks.Arg{
// 			{
// 				Name:  "from",
// 				Type:  "string",
// 				Value: "db",
// 			},
// 			{
// 				Name:  "meshId",
// 				Type:  "string",
// 				Value: meshId,
// 			},
// 		},
// 	}
// 	runResult := &model.OssType{}
// 	err := RunTask(&conf.AppConfig.TaskShadow, taskReq, runResult)
// 	return runResult, err
// }

// RunMeshPostProcessTask starts RunMeshShadowTask for the mesh in a background
// goroutine and builds the "uploadmesh" task signature.
//
// Sending the "uploadmesh" task is currently disabled (see the commented-out
// RunTask call), so the function always returns nil.
func RunMeshPostProcessTask(meshId string, collection string, itemId string, itemCollection string) error {
	go func() {
		if err := RunMeshShadowTask(meshId, collection); err != nil {
			log.Errorf("RunMeshShadowTask failed with error: %s", err.Error())
		}
	}()

	taskReq := &tasks.Signature{
		Name: "uploadmesh",
		Args: []tasks.Arg{
			{
				Name:  "meshId",
				Type:  "string",
				Value: meshId,
			},
			{
				Name:  "collection",
				Type:  "string",
				Value: collection,
			},
			{
				Name:  "itemId",
				Type:  "string",
				Value: itemId,
			},
			{
				Name:  "itemCollection",
				Type:  "string",
				Value: itemCollection,
			},
		},
	}
	// taskReq is only referenced to keep it compiled while dispatch is disabled.
	_ = taskReq
	// runResult := ""
	// return RunTask(&conf.AppConfig.TaskUploadProcess, taskReq, runResult)
	return nil
}

// RunMeshShadowTask builds the "shadow" task signature for the given mesh.
//
// Sending the task is currently disabled (see the commented-out RunTask call),
// so the function always returns nil.
func RunMeshShadowTask(meshId string, collection string) error {
	taskReq := &tasks.Signature{
		Name: "shadow",
		Args: []tasks.Arg{
			{
				Name:  "meshId",
				Type:  "string",
				Value: meshId,
			},
			{
				Name:  "collection",
				Type:  "string",
				Value: collection,
			},
		},
	}
	// runResult := &model.OssType{}
	// return RunTask(&conf.AppConfig.TaskShadow, taskReq, runResult)
	// taskReq is only referenced to keep it compiled while dispatch is disabled.
	_ = taskReq
	return nil
}