package comm import ( "context" "fmt" "sync" "time" ) type LoopCtx struct { RunCtx context.Context } type CreateLoopCtx struct { RunCtx context.Context Quit chan error SetFpsWait func(t time.Duration) RegEvent func(name string, listener func(v interface{})) RegSignal func(name string, c chan interface{}, listener func(v interface{})) } const ( MSG_CANCEL_RUNING = 1 MSG_RESTART_RUNING = 2 ) // 此函数为BackgroundTask的主循环, 应该阻塞在此,以响应其事件和持续的输出数据 // 当前此函数退出时,表面BackgroundTask任务取消执行或重启退出中 // MainLoop会在独立的线程中被执行 type MainLoopFunc func(ctx *LoopCtx) error type CreateMainLoopFunc func(ctx *CreateLoopCtx) MainLoopFunc type TaskCreator func(data interface{}) (t EventTask, createOut interface{}, err error) type TaskDispatcher struct { runningState int //运行状态 msgChan chan int emiterChan chan *EmiterRouter mtx sync.Mutex _taskRoutes map[string]TaskCreator _tasks map[string]EventTask _tasksAsyncRuning map[string]bool //正在异步执行的任务 bus *NatsBus } func (t *TaskDispatcher) GetNats() *NatsBus { return t.bus } func (t *TaskDispatcher) SetNats(b *NatsBus) { t.bus = b } func NewTaskDispatcher() *TaskDispatcher { return &TaskDispatcher{msgChan: make(chan int), emiterChan: make(chan *EmiterRouter), _taskRoutes: map[string]TaskCreator{}, _tasksAsyncRuning: make(map[string]bool)} } func (t *TaskDispatcher) CancelRuning() { t.msgChan <- MSG_CANCEL_RUNING } func (t *TaskDispatcher) IsAsync() bool { return true } func (t *TaskDispatcher) InitTask() {} func (t *TaskDispatcher) CreateTaskId() string { return "task-dispatcher-id" } func (t *TaskDispatcher) Restart() { t.msgChan <- MSG_RESTART_RUNING } type EmiterRouter struct { Data interface{} Name string Cb EmitRouterCallback } func (t *TaskDispatcher) GetRuningState() TaskState { return TaskState(t.runningState) } // 发送同步事件 func EmitEventSync(name string, data interface{}) (interface{}, error) { return nil, nil } // 发送异步事件 func EmitEventAsync(name string, data interface{}) (chan interface{}, error) { return nil, nil } // 添加路由 func (t *TaskDispatcher) AddRoute(name string, taskCreator TaskCreator) { t.mtx.Lock() defer t.mtx.Unlock() if t._taskRoutes == nil { t._taskRoutes = make(map[string]TaskCreator) } t._taskRoutes[name] = taskCreator } // 添加路由 func (t *TaskDispatcher) GetRouter(name string) TaskCreator { t.mtx.Lock() defer t.mtx.Unlock() return t._taskRoutes[name] } // 移除路由 func (t *TaskDispatcher) RemoveRoute(name string, listen Task) { t.mtx.Lock() defer t.mtx.Unlock() if t._taskRoutes == nil { return } delete(t._taskRoutes, name) } type EmitRouterCallback func(interface{}, error) func (t *TaskDispatcher) EmitRouter(name string, data interface{}, cb EmitRouterCallback) { t.emiterChan <- &EmiterRouter{Name: name, Data: data, Cb: cb} } func (t *TaskDispatcher) EmitRouterEvent(routerName string, EventName string, data interface{}) (interface{}, error) { if !t._tasksAsyncRuning[routerName] { return nil, fmt.Errorf("%s 没有启动", routerName) } if t._tasks[routerName] == nil { return nil, fmt.Errorf("任务 %s 没有注册", routerName) } return t._tasks[routerName].EmitEvent(EventName, data) } func (t *TaskDispatcher) Run(overCallback TaskRunCallback) error { t.runningState = BT_STATE_INIT var OnLifeCycleMessage = func(msg int) bool { quitRuning := true switch msg { case MSG_CANCEL_RUNING: t.runningState = BT_STATE_QUITED_CACEL //取消执行退出 case MSG_RESTART_RUNING: t.runningState = BT_STATE_NORMAL_RESTARTING default: quitRuning = false } if quitRuning { for name, _ := range t._tasksAsyncRuning { if t._tasks[name] != nil { t._tasks[name].CancelRuning() } } } return false } var RunAsyncEvent = func(taskWillRun Task, emiter *EmiterRouter) { t._tasksAsyncRuning[emiter.Name] = true go func() { taskWillRun.Run(nil) t._tasksAsyncRuning[emiter.Name] = false }() } ticket := 0 // cases := make([]reflect.SelectCase, 0) // cases = append(cases, reflect.SelectCase{ // Dir: reflect.SelectRecv, // Chan: reflect.ValueOf(t.msgChan), // }) // cases = append(cases, reflect.SelectCase{ // Dir: reflect.SelectRecv, // Chan: reflect.ValueOf(t.eventsMsgChan), // }) // signalArray := []*Signal{} // for _, v := range t._signalMaps { // cases = append(cases, reflect.SelectCase{ // Dir: reflect.SelectRecv, // Chan: reflect.ValueOf(v.Get()), // }) // signalArray = append(signalArray, v) // } for { quitTask := false // chosen, recv, ok := reflect.Select(cases) // if !ok { // ticket = ticket + 1 // fmt.Println("back-ground-task running: ", ticket) // time.Sleep(time.Second * 1) // continue // } // fmt.Printf("chosen: %d, recv: %v\n", chosen, recv) select { case msg := <-t.msgChan: quitTask = OnLifeCycleMessage(msg) case emiter := <-t.emiterChan: creator := t.GetRouter(emiter.Name) if creator != nil { t, out, err := creator(emiter.Data) if emiter.Cb != nil { emiter.Cb(out, err) } RunAsyncEvent(t, emiter) } default: ticket = ticket + 1 fmt.Println("backgrount Task runing tick ", ticket) } if quitTask { break } } if overCallback != nil { overCallback(nil) } return nil }