123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- package task
- import (
- "encoding/json"
- "errors"
- "fmt"
- "mats/conf"
- "mats/db"
- "mats/log"
- "runtime"
- "sync"
- "time"
- "github.com/RichardKnop/machinery/v2"
- "github.com/RichardKnop/machinery/v2/config"
- "github.com/RichardKnop/machinery/v2/tasks"
- redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
- amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
- eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
- )
- func StartServer(AppConfig *conf.TaskConfig) (*machinery.Server, error) {
- cnf := &config.Config{
- Broker: AppConfig.Broker,
- DefaultQueue: AppConfig.DefaultQueue,
-
- ResultsExpireIn: int(AppConfig.ResultsExpireIn),
- AMQP: &config.AMQPConfig{
- Exchange: AppConfig.AmqpExchange,
- ExchangeType: AppConfig.AmqpExchangeType,
- BindingKey: AppConfig.AmqpBindingKey,
- PrefetchCount: int(AppConfig.AmqpPrefetchCount),
- },
- Redis: &config.RedisConfig{
- MaxIdle: 3,
- IdleTimeout: 240,
- ReadTimeout: 15,
- WriteTimeout: 15,
- ConnectTimeout: 15,
- NormalTasksPollPeriod: 1000,
- DelayedTasksPollPeriod: 500,
- },
- }
-
- broker := amqpbroker.New(cnf)
- backend := redisbackend.NewGR(cnf, []string{conf.AppConfig.Redis.Addr}, conf.AppConfig.Redis.Db)
- lock := eagerlock.New()
- server := machinery.NewServer(cnf, broker, backend, lock)
- return server, nil
- }
- func RunRenderTask(taskId string, db *db.MongoDB) error {
- defer func() {
- if r := recover(); r != nil {
- fmt.Println("RunRenderTask success.")
- fmt.Println(r)
- buf := make([]byte, 1<<16)
- runtime.Stack(buf, true)
- fmt.Println("buf", string(buf))
- log.Error(string(buf))
- }
- }()
- return nil
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- }
- var taskServers map[string]*machinery.Server = map[string]*machinery.Server{}
- var lock sync.Mutex
- func GetOrCreateServer(config *conf.TaskConfig) *machinery.Server {
- lock.Lock()
- defer lock.Unlock()
- server := taskServers[config.Name]
- if server == nil {
- taskServer, err := StartServer(config)
- if err != nil {
- fmt.Sprintln("GetOrCreateServer failed")
- return nil
- }
- server = taskServer
- taskServers[config.Name] = server
- }
- return server
- }
- func RunTask(config *conf.TaskConfig, task *tasks.Signature, runResult interface{}) error {
- server := GetOrCreateServer(config)
- if server == nil {
- return errors.New("GetOrCreateServer null")
- }
- defer func() {
- if r := recover(); r != nil {
- fmt.Println(r)
- buf := make([]byte, 1<<16)
- runtime.Stack(buf, true)
- fmt.Println("buf", string(buf))
- log.Error(string(buf))
- }
- }()
- asyncResult, err := server.SendTask(task)
- if err != nil {
- return err
- }
- if runResult == nil {
- return nil
- }
- results, err := asyncResult.Get(time.Duration(time.Microsecond * 200))
- if err != nil {
- log.Errorf("Getting task result failed with error: %s", err.Error())
- fmt.Println(err)
- return err
- }
- msg := tasks.HumanReadableResults(results)
- log.Infof("task result = %v\n", msg)
- contentLen := len("result:")
- renderRet := msg[contentLen:]
- err = json.Unmarshal([]byte(renderRet), runResult)
- if err != nil {
- return errors.New("转换结果解析失败!")
- }
- return nil
- }
- func RunMeshPostProcessTask(meshId string, collection string, itemId string, itemCollection string) error {
- go func() {
- RunMeshShadowTask(meshId, collection)
- }()
- taskReq := &tasks.Signature{
- Name: "uploadmesh",
- Args: []tasks.Arg{
- {
- Name: "meshId",
- Type: "string",
- Value: meshId,
- },
- {
- Name: "collection",
- Type: "string",
- Value: collection,
- },
- {
- Name: "itemId",
- Type: "string",
- Value: itemId,
- },
- {
- Name: "itemCollection",
- Type: "string",
- Value: itemCollection,
- },
- },
- }
- fmt.Sprintln(taskReq)
-
-
- return nil
- }
- func RunMeshShadowTask(meshId string, collection string) error {
- taskReq := &tasks.Signature{
- Name: "shadow",
- Args: []tasks.Arg{
- {
- Name: "meshId",
- Type: "string",
- Value: meshId,
- },
- {
- Name: "collection",
- Type: "string",
- Value: collection,
- },
- },
- }
-
-
- fmt.Sprintln(taskReq)
- return nil
- }
|