task-dispatcher-background.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package comm
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. type LoopCtx struct {
  9. RunCtx context.Context
  10. }
  11. type CreateLoopCtx struct {
  12. RunCtx context.Context
  13. Quit chan error
  14. SetFpsWait func(t time.Duration)
  15. RegEvent func(name string, listener func(v interface{}))
  16. RegSignal func(name string, c chan interface{}, listener func(v interface{}))
  17. }
  18. const (
  19. MSG_CANCEL_RUNING = 1
  20. MSG_RESTART_RUNING = 2
  21. )
  22. // 此函数为BackgroundTask的主循环, 应该阻塞在此,以响应其事件和持续的输出数据
  23. // 当前此函数退出时,表面BackgroundTask任务取消执行或重启退出中
  24. // MainLoop会在独立的线程中被执行
  25. type MainLoopFunc func(ctx *LoopCtx) error
  26. type CreateMainLoopFunc func(ctx *CreateLoopCtx) MainLoopFunc
  27. type TaskCreator func(data interface{}) (t EventTask, createOut interface{}, err error)
  28. type TaskDispatcher struct {
  29. runningState int //运行状态
  30. msgChan chan int
  31. emiterChan chan *EmiterRouter
  32. mtx sync.Mutex
  33. _taskRoutes map[string]TaskCreator
  34. _tasks map[string]EventTask
  35. _tasksAsyncRuning map[string]bool //正在异步执行的任务
  36. bus *NatsBus
  37. }
  38. func (t *TaskDispatcher) GetNats() *NatsBus {
  39. return t.bus
  40. }
  41. func (t *TaskDispatcher) SetNats(b *NatsBus) {
  42. t.bus = b
  43. }
  44. func NewTaskDispatcher() *TaskDispatcher {
  45. return &TaskDispatcher{msgChan: make(chan int), emiterChan: make(chan *EmiterRouter), _taskRoutes: map[string]TaskCreator{}, _tasksAsyncRuning: make(map[string]bool)}
  46. }
  47. func (t *TaskDispatcher) CancelRuning() {
  48. t.msgChan <- MSG_CANCEL_RUNING
  49. }
  50. func (t *TaskDispatcher) IsAsync() bool {
  51. return true
  52. }
  53. func (t *TaskDispatcher) InitTask() {}
  54. func (t *TaskDispatcher) CreateTaskId() string {
  55. return "task-dispatcher-id"
  56. }
  57. func (t *TaskDispatcher) Restart() {
  58. t.msgChan <- MSG_RESTART_RUNING
  59. }
  60. type EmiterRouter struct {
  61. Data interface{}
  62. Name string
  63. Cb EmitRouterCallback
  64. }
  65. func (t *TaskDispatcher) GetRuningState() TaskState {
  66. return TaskState(t.runningState)
  67. }
  68. // 发送同步事件
  69. func EmitEventSync(name string, data interface{}) (interface{}, error) { return nil, nil }
  70. // 发送异步事件
  71. func EmitEventAsync(name string, data interface{}) (chan interface{}, error) { return nil, nil }
  72. // 添加路由
  73. func (t *TaskDispatcher) AddRoute(name string, taskCreator TaskCreator) {
  74. t.mtx.Lock()
  75. defer t.mtx.Unlock()
  76. if t._taskRoutes == nil {
  77. t._taskRoutes = make(map[string]TaskCreator)
  78. }
  79. t._taskRoutes[name] = taskCreator
  80. }
  81. // 添加路由
  82. func (t *TaskDispatcher) GetRouter(name string) TaskCreator {
  83. t.mtx.Lock()
  84. defer t.mtx.Unlock()
  85. return t._taskRoutes[name]
  86. }
  87. // 移除路由
  88. func (t *TaskDispatcher) RemoveRoute(name string, listen Task) {
  89. t.mtx.Lock()
  90. defer t.mtx.Unlock()
  91. if t._taskRoutes == nil {
  92. return
  93. }
  94. delete(t._taskRoutes, name)
  95. }
  96. type EmitRouterCallback func(interface{}, error)
  97. func (t *TaskDispatcher) EmitRouter(name string, data interface{}, cb EmitRouterCallback) {
  98. t.emiterChan <- &EmiterRouter{Name: name, Data: data, Cb: cb}
  99. }
  100. func (t *TaskDispatcher) EmitRouterEvent(routerName string, EventName string, data interface{}) (interface{}, error) {
  101. if !t._tasksAsyncRuning[routerName] {
  102. return nil, fmt.Errorf("%s 没有启动", routerName)
  103. }
  104. if t._tasks[routerName] == nil {
  105. return nil, fmt.Errorf("任务 %s 没有注册", routerName)
  106. }
  107. return t._tasks[routerName].EmitEvent(EventName, data)
  108. }
  109. func (t *TaskDispatcher) Run(overCallback TaskRunCallback) error {
  110. t.runningState = BT_STATE_INIT
  111. var OnLifeCycleMessage = func(msg int) bool {
  112. quitRuning := true
  113. switch msg {
  114. case MSG_CANCEL_RUNING:
  115. t.runningState = BT_STATE_QUITED_CACEL //取消执行退出
  116. case MSG_RESTART_RUNING:
  117. t.runningState = BT_STATE_NORMAL_RESTARTING
  118. default:
  119. quitRuning = false
  120. }
  121. if quitRuning {
  122. for name, _ := range t._tasksAsyncRuning {
  123. if t._tasks[name] != nil {
  124. t._tasks[name].CancelRuning()
  125. }
  126. }
  127. }
  128. return false
  129. }
  130. var RunAsyncEvent = func(taskWillRun Task, emiter *EmiterRouter) {
  131. t._tasksAsyncRuning[emiter.Name] = true
  132. go func() {
  133. taskWillRun.Run(nil)
  134. t._tasksAsyncRuning[emiter.Name] = false
  135. }()
  136. }
  137. ticket := 0
  138. // cases := make([]reflect.SelectCase, 0)
  139. // cases = append(cases, reflect.SelectCase{
  140. // Dir: reflect.SelectRecv,
  141. // Chan: reflect.ValueOf(t.msgChan),
  142. // })
  143. // cases = append(cases, reflect.SelectCase{
  144. // Dir: reflect.SelectRecv,
  145. // Chan: reflect.ValueOf(t.eventsMsgChan),
  146. // })
  147. // signalArray := []*Signal{}
  148. // for _, v := range t._signalMaps {
  149. // cases = append(cases, reflect.SelectCase{
  150. // Dir: reflect.SelectRecv,
  151. // Chan: reflect.ValueOf(v.Get()),
  152. // })
  153. // signalArray = append(signalArray, v)
  154. // }
  155. for {
  156. quitTask := false
  157. // chosen, recv, ok := reflect.Select(cases)
  158. // if !ok {
  159. // ticket = ticket + 1
  160. // fmt.Println("back-ground-task running: ", ticket)
  161. // time.Sleep(time.Second * 1)
  162. // continue
  163. // }
  164. // fmt.Printf("chosen: %d, recv: %v\n", chosen, recv)
  165. select {
  166. case msg := <-t.msgChan:
  167. quitTask = OnLifeCycleMessage(msg)
  168. case emiter := <-t.emiterChan:
  169. creator := t.GetRouter(emiter.Name)
  170. if creator != nil {
  171. t, out, err := creator(emiter.Data)
  172. if emiter.Cb != nil {
  173. emiter.Cb(out, err)
  174. }
  175. RunAsyncEvent(t, emiter)
  176. }
  177. default:
  178. ticket = ticket + 1
  179. fmt.Println("backgrount Task runing tick ", ticket)
  180. }
  181. if quitTask {
  182. break
  183. }
  184. }
  185. if overCallback != nil {
  186. overCallback(nil)
  187. }
  188. return nil
  189. }