|
- package task
- import (
- "encoding/json"
- "errors"
- "fmt"
- "mats/conf"
- "mats/db"
- "mats/log"
- "runtime"
- "sync"
- "time"
- "github.com/RichardKnop/machinery/v2"
- "github.com/RichardKnop/machinery/v2/config"
- "github.com/RichardKnop/machinery/v2/tasks"
- redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
- amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
- eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
- )
- func StartServer(AppConfig *conf.TaskConfig) (*machinery.Server, error) {
- cnf := &config.Config{
- Broker: AppConfig.Broker,
- DefaultQueue: AppConfig.DefaultQueue,
-
- ResultsExpireIn: int(AppConfig.ResultsExpireIn),
- AMQP: &config.AMQPConfig{
- Exchange: AppConfig.AmqpExchange,
- ExchangeType: AppConfig.AmqpExchangeType,
- BindingKey: AppConfig.AmqpBindingKey,
- PrefetchCount: int(AppConfig.AmqpPrefetchCount),
- },
- Redis: &config.RedisConfig{
- MaxIdle: 3,
- IdleTimeout: 240,
- ReadTimeout: 15,
- WriteTimeout: 15,
- ConnectTimeout: 15,
- NormalTasksPollPeriod: 1000,
- DelayedTasksPollPeriod: 500,
- },
- }
-
- broker := amqpbroker.New(cnf)
- backend := redisbackend.NewGR(cnf, []string{conf.AppConfig.Redis.Addr}, conf.AppConfig.Redis.Db)
- lock := eagerlock.New()
- server := machinery.NewServer(cnf, broker, backend, lock)
- return server, nil
- }
- func RunRenderTask(taskId string, db *db.MongoDB) error {
- defer func() {
- if r := recover(); r != nil {
- fmt.Println("RunRenderTask success.")
- fmt.Println(r)
- buf := make([]byte, 1<<16)
- runtime.Stack(buf, true)
- fmt.Println("buf", string(buf))
- log.Error(string(buf))
- }
- }()
- return nil
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- }
- var taskServers map[string]*machinery.Server = map[string]*machinery.Server{}
- var lock sync.Mutex
- func GetOrCreateServer(config *conf.TaskConfig) *machinery.Server {
- lock.Lock()
- defer lock.Unlock()
- server := taskServers[config.Name]
- if server == nil {
- taskServer, err := StartServer(config)
- if err != nil {
- fmt.Sprintln("GetOrCreateServer failed")
- return nil
- }
- server = taskServer
- taskServers[config.Name] = server
- }
- return server
- }
- func RunTask(config *conf.TaskConfig, task *tasks.Signature, runResult interface{}) error {
- server := GetOrCreateServer(config)
- if server == nil {
- return errors.New("GetOrCreateServer null")
- }
- defer func() {
- if r := recover(); r != nil {
- fmt.Println(r)
- buf := make([]byte, 1<<16)
- runtime.Stack(buf, true)
- fmt.Println("buf", string(buf))
- log.Error(string(buf))
- }
- }()
- asyncResult, err := server.SendTask(task)
- if err != nil {
- return err
- }
- if runResult == nil {
- return nil
- }
- results, err := asyncResult.Get(time.Duration(time.Microsecond * 200))
- if err != nil {
- log.Errorf("Getting task result failed with error: %s", err.Error())
- fmt.Println(err)
- return err
- }
- msg := tasks.HumanReadableResults(results)
- log.Infof("task result = %v\n", msg)
- contentLen := len("result:")
- renderRet := msg[contentLen:]
- err = json.Unmarshal([]byte(renderRet), runResult)
- if err != nil {
- return errors.New("转换结果解析失败!")
- }
- return nil
- }
- func RunMeshPostProcessTask(meshId string, collection string, itemId string, itemCollection string) error {
- go func() {
- RunMeshShadowTask(meshId, collection)
- }()
- taskReq := &tasks.Signature{
- Name: "uploadmesh",
- Args: []tasks.Arg{
- {
- Name: "meshId",
- Type: "string",
- Value: meshId,
- },
- {
- Name: "collection",
- Type: "string",
- Value: collection,
- },
- {
- Name: "itemId",
- Type: "string",
- Value: itemId,
- },
- {
- Name: "itemCollection",
- Type: "string",
- Value: itemCollection,
- },
- },
- }
- fmt.Sprintln(taskReq)
-
-
- return nil
- }
- func RunMeshShadowTask(meshId string, collection string) error {
- taskReq := &tasks.Signature{
- Name: "shadow",
- Args: []tasks.Arg{
- {
- Name: "meshId",
- Type: "string",
- Value: meshId,
- },
- {
- Name: "collection",
- Type: "string",
- Value: collection,
- },
- },
- }
-
-
- fmt.Sprintln(taskReq)
- return nil
- }
|