|
- package comm
- import (
- "fmt"
- )
- type EventListener func(data interface{})
- type GetSignalChannel func() chan interface{}
- type Event struct {
- Name string
- Listener EventListener
- Data interface{}
- }
- type Signal struct {
- Name string
- Get GetSignalChannel
- Listener EventListener
- Data interface{}
- }
- type TaskLifeCirle interface {
- OnBeforeRun() error
- OnAfterRun() error
- Loop() bool
- }
- type BackgroundTask struct {
- LifeCircle TaskLifeCirle
- MaxExceptRestartTimes int //最大异常重启次数
- ExceptRestartWatting int //重启等待时间
- runningState int //运行状态
- msgChan chan int
- eventsMsgChan chan *Event
- _eventMaps map[string]*Event
- _signalMaps map[string]*Signal
- _runData interface{}
- SessionId string
- bus *NatsBus
- data interface{}
- Name string
- }
- func NewBackgroundTask(lifeCircle TaskLifeCirle, data interface{}) *BackgroundTask {
- t := &BackgroundTask{LifeCircle: lifeCircle}
- t.InitTask(data)
- return t
- }
- func (t *BackgroundTask) GetName() string { return t.Name }
- func (t *BackgroundTask) SetName(n string) { t.Name = n }
- func (t *BackgroundTask) GetID() string {
- return t.SessionId
- }
- func (t *BackgroundTask) SetID(id string) {
- t.SessionId = id
- }
- func (t *BackgroundTask) CancelRuning() {
- go func() {
- t.msgChan <- MSG_CANCEL_RUNING
- }()
- }
- func (t *BackgroundTask) Restart() {
- go func() {
- t.msgChan <- MSG_RESTART_RUNING
- }()
- }
- func (t *BackgroundTask) InitTask(data interface{}) {
- t.data = data
- t.msgChan = make(chan int)
- t.eventsMsgChan = make(chan *Event)
- t._eventMaps = make(map[string]*Event)
- t._signalMaps = make(map[string]*Signal)
- t.CreateTaskId()
- }
- func (t *BackgroundTask) CreateTaskId() string {
- t.SessionId = "bgt" + CreateSessionId()
- return t.SessionId
- }
- func (t *BackgroundTask) RegistorEvent(events string, listen EventListener) {
- // if t._eventMaps == nil {
- // t._eventMaps = make(map[string]*Event)
- // }
- // for _, name := range events {
- // t._eventMaps[name] = &Event{Listener: listen, Name: name}
- // }
- }
- func (t *BackgroundTask) RegistorEventAsync(name string, channel GetSignalChannel, listen EventListener) {
- if t._signalMaps == nil {
- t._signalMaps = make(map[string]*Signal)
- }
- t._signalMaps[name] = &Signal{Listener: listen, Name: name, Get: channel}
- }
- func (t *BackgroundTask) EmitEvent(event string, data interface{}) (interface{}, error) {
- return nil, nil
- }
- func (t *BackgroundTask) EmitEventAsync(event string, data interface{}) (chan interface{}, error) {
- return nil, nil
- }
- func (t *BackgroundTask) EmitEventSync(event string, data interface{}) (interface{}, error) {
- return nil, nil
- }
- func (t *BackgroundTask) EmitSingal(sigal string, data interface{}) {
- }
- func (t *BackgroundTask) GetRuningState() TaskState {
- return TaskState(t.runningState)
- }
- func (t *BackgroundTask) IsAsync() bool {
- return true
- }
- func (t *BackgroundTask) GetNats() *NatsBus {
- return t.bus
- }
- func (t *BackgroundTask) SetNats(b *NatsBus) {
- t.bus = b
- }
- //data
- //state
- //action
- // swiftTask
- func (t *BackgroundTask) Run(overCallback TaskRunCallback) error {
- life := t.LifeCircle
- err := life.OnBeforeRun()
- if err != nil {
- return err
- }
- go func() {
- t.runningState = BT_STATE_INIT
- var OnLifeCycleMessage = func(msg int) bool {
- switch msg {
- case MSG_CANCEL_RUNING:
- t.runningState = BT_STATE_QUITED_CACEL //取消执行退出
- return true
- case MSG_RESTART_RUNING:
- t.runningState = BT_STATE_NORMAL_RESTARTING
- }
- return false
- }
- ticket := 0
- for {
- quitTask := false
- select {
- case msg := <-t.msgChan: //msg
- quitTask = OnLifeCycleMessage(msg)
- default:
- ticket = ticket + 1
- fmt.Printf("[%s]background-task tick %d\n", t.Name, ticket)
- quitTask = life.Loop()
- }
- if quitTask {
- break
- }
- }
- if overCallback != nil {
- overCallback(nil)
- }
- life.OnAfterRun()
- }()
- return nil
- }
|