task-background.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package comm
  2. import (
  3. "fmt"
  4. )
  // Function signatures shared by events and signals.
  type (
  	// EventListener is a callback invoked with an arbitrary payload.
  	EventListener func(data interface{})

  	// GetSignalChannel is a provider function that returns a channel
  	// carrying arbitrary values.
  	GetSignalChannel func() chan interface{}
  )
  7. type Event struct {
  8. Name string
  9. Listener EventListener
  10. Data interface{}
  11. }
  12. type Signal struct {
  13. Name string
  14. Get GetSignalChannel
  15. Listener EventListener
  16. Data interface{}
  17. }
  // TaskLifeCirle is the set of hooks a BackgroundTask drives from Run.
  type TaskLifeCirle interface {
  	// OnBeforeRun is called synchronously by Run before the background
  	// goroutine starts; a non-nil error aborts the start.
  	OnBeforeRun() error

  	// OnAfterRun is called once the run loop has ended.
  	OnAfterRun() error

  	// Loop performs one iteration of work. Returning true ends the run loop.
  	Loop() bool
  }
  23. type BackgroundTask struct {
  24. LifeCircle TaskLifeCirle
  25. MaxExceptRestartTimes int //最大异常重启次数
  26. ExceptRestartWatting int //重启等待时间
  27. runningState int //运行状态
  28. msgChan chan int
  29. eventsMsgChan chan *Event
  30. _eventMaps map[string]*Event
  31. _signalMaps map[string]*Signal
  32. _runData interface{}
  33. SessionId string
  34. bus *NatsBus
  35. data interface{}
  36. Name string
  37. }
  38. func NewBackgroundTask(lifeCircle TaskLifeCirle, data interface{}) *BackgroundTask {
  39. t := &BackgroundTask{LifeCircle: lifeCircle}
  40. t.InitTask(data)
  41. return t
  42. }
  43. func (t *BackgroundTask) GetName() string { return t.Name }
  44. func (t *BackgroundTask) SetName(n string) { t.Name = n }
  45. func (t *BackgroundTask) GetID() string {
  46. return t.SessionId
  47. }
  48. func (t *BackgroundTask) SetID(id string) {
  49. t.SessionId = id
  50. }
  51. func (t *BackgroundTask) CancelRuning() {
  52. go func() {
  53. t.msgChan <- MSG_CANCEL_RUNING
  54. }()
  55. }
  56. func (t *BackgroundTask) Restart() {
  57. go func() {
  58. t.msgChan <- MSG_RESTART_RUNING
  59. }()
  60. }
  61. func (t *BackgroundTask) InitTask(data interface{}) {
  62. t.data = data
  63. t.msgChan = make(chan int)
  64. t.eventsMsgChan = make(chan *Event)
  65. t._eventMaps = make(map[string]*Event)
  66. t._signalMaps = make(map[string]*Signal)
  67. t.CreateTaskId()
  68. }
  69. func (t *BackgroundTask) CreateTaskId() string {
  70. t.SessionId = "bgt" + CreateSessionId()
  71. return t.SessionId
  72. }
  73. func (t *BackgroundTask) RegistorEvent(events string, listen EventListener) {
  74. // if t._eventMaps == nil {
  75. // t._eventMaps = make(map[string]*Event)
  76. // }
  77. // for _, name := range events {
  78. // t._eventMaps[name] = &Event{Listener: listen, Name: name}
  79. // }
  80. }
  81. func (t *BackgroundTask) RegistorEventAsync(name string, channel GetSignalChannel, listen EventListener) {
  82. if t._signalMaps == nil {
  83. t._signalMaps = make(map[string]*Signal)
  84. }
  85. t._signalMaps[name] = &Signal{Listener: listen, Name: name, Get: channel}
  86. }
  87. func (t *BackgroundTask) EmitEvent(event string, data interface{}) (interface{}, error) {
  88. return nil, nil
  89. }
  90. func (t *BackgroundTask) EmitEventAsync(event string, data interface{}) (chan interface{}, error) {
  91. return nil, nil
  92. }
  93. func (t *BackgroundTask) EmitEventSync(event string, data interface{}) (interface{}, error) {
  94. return nil, nil
  95. }
  96. func (t *BackgroundTask) EmitSingal(sigal string, data interface{}) {
  97. }
  98. func (t *BackgroundTask) GetRuningState() TaskState {
  99. return TaskState(t.runningState)
  100. }
  101. func (t *BackgroundTask) IsAsync() bool {
  102. return true
  103. }
  104. func (t *BackgroundTask) GetNats() *NatsBus {
  105. return t.bus
  106. }
  107. func (t *BackgroundTask) SetNats(b *NatsBus) {
  108. t.bus = b
  109. }
  110. //data
  111. //state
  112. //action
  113. // swiftTask
  114. func (t *BackgroundTask) Run(overCallback TaskRunCallback) error {
  115. life := t.LifeCircle
  116. err := life.OnBeforeRun()
  117. if err != nil {
  118. return err
  119. }
  120. go func() {
  121. t.runningState = BT_STATE_INIT
  122. var OnLifeCycleMessage = func(msg int) bool {
  123. switch msg {
  124. case MSG_CANCEL_RUNING:
  125. t.runningState = BT_STATE_QUITED_CACEL //取消执行退出
  126. return true
  127. case MSG_RESTART_RUNING:
  128. t.runningState = BT_STATE_NORMAL_RESTARTING
  129. }
  130. return false
  131. }
  132. ticket := 0
  133. for {
  134. quitTask := false
  135. select {
  136. case msg := <-t.msgChan: //msg
  137. quitTask = OnLifeCycleMessage(msg)
  138. default:
  139. ticket = ticket + 1
  140. fmt.Printf("[%s]background-task tick %d\n", t.Name, ticket)
  141. quitTask = life.Loop()
  142. }
  143. if quitTask {
  144. break
  145. }
  146. }
  147. if overCallback != nil {
  148. overCallback(nil)
  149. }
  150. life.OnAfterRun()
  151. }()
  152. return nil
  153. }