|
- package comm
- import (
- "context"
- "fmt"
- "sync"
- "time"
- )
// LoopCtx is the argument handed to a MainLoopFunc when it is invoked.
type LoopCtx struct {
	// RunCtx is the context.Context made available to the running loop.
	// NOTE(review): presumably cancelled when the task must stop — confirm
	// against the code that builds a LoopCtx.
	RunCtx context.Context
}
// CreateLoopCtx is the argument handed to a CreateMainLoopFunc. It bundles a
// context, a quit channel and a set of registration hooks.
type CreateLoopCtx struct {
	// RunCtx is the context.Context associated with the loop being created.
	RunCtx context.Context

	// Quit is a channel of errors.
	// NOTE(review): presumably used to report why the loop quit — confirm.
	Quit chan error

	// SetFpsWait is a hook taking a time.Duration.
	// NOTE(review): presumably sets the wait between loop iterations — confirm.
	SetFpsWait func(t time.Duration)

	// RegEvent registers listener under the given event name.
	RegEvent func(name string, listener func(v interface{}))

	// RegSignal registers listener under the given signal name together with
	// the channel c.
	// NOTE(review): presumably listener receives values arriving on c — confirm.
	RegSignal func(name string, c chan interface{}, listener func(v interface{}))
}
// Lifecycle messages sent to TaskDispatcher.Run over its message channel.
const (
	// MSG_CANCEL_RUNING (1) requests that the running loop be cancelled.
	MSG_CANCEL_RUNING = iota + 1
	// MSG_RESTART_RUNING (2) requests that the running loop be restarted.
	MSG_RESTART_RUNING
)
- // 此函数为BackgroundTask的主循环, 应该阻塞在此,以响应其事件和持续的输出数据
- // 当前此函数退出时,表面BackgroundTask任务取消执行或重启退出中
- // MainLoop会在独立的线程中被执行
- type MainLoopFunc func(ctx *LoopCtx) error
- type CreateMainLoopFunc func(ctx *CreateLoopCtx) MainLoopFunc
- type TaskCreator func(data interface{}) (t EventTask, createOut interface{}, err error)
- type TaskDispatcher struct {
- runningState int //运行状态
- msgChan chan int
- emiterChan chan *EmiterRouter
- mtx sync.Mutex
- _taskRoutes map[string]TaskCreator
- _tasks map[string]EventTask
- _tasksAsyncRuning map[string]bool //正在异步执行的任务
- bus *NatsBus
- }
- func (t *TaskDispatcher) GetNats() *NatsBus {
- return t.bus
- }
- func (t *TaskDispatcher) SetNats(b *NatsBus) {
- t.bus = b
- }
- func NewTaskDispatcher() *TaskDispatcher {
- return &TaskDispatcher{msgChan: make(chan int), emiterChan: make(chan *EmiterRouter), _taskRoutes: map[string]TaskCreator{}, _tasksAsyncRuning: make(map[string]bool)}
- }
- func (t *TaskDispatcher) CancelRuning() {
- t.msgChan <- MSG_CANCEL_RUNING
- }
- func (t *TaskDispatcher) IsAsync() bool {
- return true
- }
- func (t *TaskDispatcher) InitTask() {}
- func (t *TaskDispatcher) CreateTaskId() string {
- return "task-dispatcher-id"
- }
- func (t *TaskDispatcher) Restart() {
- t.msgChan <- MSG_RESTART_RUNING
- }
- type EmiterRouter struct {
- Data interface{}
- Name string
- Cb EmitRouterCallback
- }
- func (t *TaskDispatcher) GetRuningState() TaskState {
- return TaskState(t.runningState)
- }
// EmitEventSync sends a synchronous event.
//
// The current implementation is a stub: it ignores name and data and always
// returns (nil, nil).
func EmitEventSync(name string, data interface{}) (interface{}, error) {
	var (
		result interface{}
		err    error
	)
	return result, err
}
// EmitEventAsync sends an asynchronous event.
//
// The current implementation is a stub: it ignores name and data and always
// returns a nil channel and a nil error.
func EmitEventAsync(name string, data interface{}) (chan interface{}, error) {
	var (
		results chan interface{}
		err     error
	)
	return results, err
}
- // 添加路由
- func (t *TaskDispatcher) AddRoute(name string, taskCreator TaskCreator) {
- t.mtx.Lock()
- defer t.mtx.Unlock()
- if t._taskRoutes == nil {
- t._taskRoutes = make(map[string]TaskCreator)
- }
- t._taskRoutes[name] = taskCreator
- }
- // 添加路由
- func (t *TaskDispatcher) GetRouter(name string) TaskCreator {
- t.mtx.Lock()
- defer t.mtx.Unlock()
- return t._taskRoutes[name]
- }
- // 移除路由
- func (t *TaskDispatcher) RemoveRoute(name string, listen Task) {
- t.mtx.Lock()
- defer t.mtx.Unlock()
- if t._taskRoutes == nil {
- return
- }
- delete(t._taskRoutes, name)
- }
// EmitRouterCallback is the callback attached to a routed request. Run calls
// it with the output and the error returned by the route's TaskCreator.
type EmitRouterCallback func(out interface{}, err error)
- func (t *TaskDispatcher) EmitRouter(name string, data interface{}, cb EmitRouterCallback) {
- t.emiterChan <- &EmiterRouter{Name: name, Data: data, Cb: cb}
- }
- func (t *TaskDispatcher) EmitRouterEvent(routerName string, EventName string, data interface{}) (interface{}, error) {
- if !t._tasksAsyncRuning[routerName] {
- return nil, fmt.Errorf("%s 没有启动", routerName)
- }
- if t._tasks[routerName] == nil {
- return nil, fmt.Errorf("任务 %s 没有注册", routerName)
- }
- return t._tasks[routerName].EmitEvent(EventName, data)
- }
- func (t *TaskDispatcher) Run(overCallback TaskRunCallback) error {
- t.runningState = BT_STATE_INIT
- var OnLifeCycleMessage = func(msg int) bool {
- quitRuning := true
- switch msg {
- case MSG_CANCEL_RUNING:
- t.runningState = BT_STATE_QUITED_CACEL //取消执行退出
- case MSG_RESTART_RUNING:
- t.runningState = BT_STATE_NORMAL_RESTARTING
- default:
- quitRuning = false
- }
- if quitRuning {
- for name, _ := range t._tasksAsyncRuning {
- if t._tasks[name] != nil {
- t._tasks[name].CancelRuning()
- }
- }
- }
- return false
- }
- var RunAsyncEvent = func(taskWillRun Task, emiter *EmiterRouter) {
- t._tasksAsyncRuning[emiter.Name] = true
- go func() {
- taskWillRun.Run(nil)
- t._tasksAsyncRuning[emiter.Name] = false
- }()
- }
- ticket := 0
- // cases := make([]reflect.SelectCase, 0)
- // cases = append(cases, reflect.SelectCase{
- // Dir: reflect.SelectRecv,
- // Chan: reflect.ValueOf(t.msgChan),
- // })
- // cases = append(cases, reflect.SelectCase{
- // Dir: reflect.SelectRecv,
- // Chan: reflect.ValueOf(t.eventsMsgChan),
- // })
- // signalArray := []*Signal{}
- // for _, v := range t._signalMaps {
- // cases = append(cases, reflect.SelectCase{
- // Dir: reflect.SelectRecv,
- // Chan: reflect.ValueOf(v.Get()),
- // })
- // signalArray = append(signalArray, v)
- // }
- for {
- quitTask := false
- // chosen, recv, ok := reflect.Select(cases)
- // if !ok {
- // ticket = ticket + 1
- // fmt.Println("back-ground-task running: ", ticket)
- // time.Sleep(time.Second * 1)
- // continue
- // }
- // fmt.Printf("chosen: %d, recv: %v\n", chosen, recv)
- select {
- case msg := <-t.msgChan:
- quitTask = OnLifeCycleMessage(msg)
- case emiter := <-t.emiterChan:
- creator := t.GetRouter(emiter.Name)
- if creator != nil {
- t, out, err := creator(emiter.Data)
- if emiter.Cb != nil {
- emiter.Cb(out, err)
- }
- RunAsyncEvent(t, emiter)
- }
- default:
- ticket = ticket + 1
- fmt.Println("backgrount Task runing tick ", ticket)
- }
- if quitTask {
- break
- }
- }
- if overCallback != nil {
- overCallback(nil)
- }
- return nil
- }
|