package comm import ( "fmt" ) type EventListener func(data interface{}) type GetSignalChannel func() chan interface{} type Event struct { Name string Listener EventListener Data interface{} } type Signal struct { Name string Get GetSignalChannel Listener EventListener Data interface{} } type TaskLifeCirle interface { OnBeforeRun() error OnAfterRun() error Loop() bool } type BackgroundTask struct { LifeCircle TaskLifeCirle MaxExceptRestartTimes int //最大异常重启次数 ExceptRestartWatting int //重启等待时间 runningState int //运行状态 msgChan chan int eventsMsgChan chan *Event _eventMaps map[string]*Event _signalMaps map[string]*Signal _runData interface{} SessionId string bus *NatsBus data interface{} Name string } func NewBackgroundTask(lifeCircle TaskLifeCirle, data interface{}) *BackgroundTask { t := &BackgroundTask{LifeCircle: lifeCircle} t.InitTask(data) return t } func (t *BackgroundTask) GetName() string { return t.Name } func (t *BackgroundTask) SetName(n string) { t.Name = n } func (t *BackgroundTask) GetID() string { return t.SessionId } func (t *BackgroundTask) SetID(id string) { t.SessionId = id } func (t *BackgroundTask) CancelRuning() { go func() { t.msgChan <- MSG_CANCEL_RUNING }() } func (t *BackgroundTask) Restart() { go func() { t.msgChan <- MSG_RESTART_RUNING }() } func (t *BackgroundTask) InitTask(data interface{}) { t.data = data t.msgChan = make(chan int) t.eventsMsgChan = make(chan *Event) t._eventMaps = make(map[string]*Event) t._signalMaps = make(map[string]*Signal) t.CreateTaskId() } func (t *BackgroundTask) CreateTaskId() string { t.SessionId = "bgt" + CreateSessionId() return t.SessionId } func (t *BackgroundTask) RegistorEvent(events string, listen EventListener) { // if t._eventMaps == nil { // t._eventMaps = make(map[string]*Event) // } // for _, name := range events { // t._eventMaps[name] = &Event{Listener: listen, Name: name} // } } func (t *BackgroundTask) RegistorEventAsync(name string, channel GetSignalChannel, listen EventListener) { if t._signalMaps == nil { t._signalMaps = make(map[string]*Signal) } t._signalMaps[name] = &Signal{Listener: listen, Name: name, Get: channel} } func (t *BackgroundTask) EmitEvent(event string, data interface{}) (interface{}, error) { return nil, nil } func (t *BackgroundTask) EmitEventAsync(event string, data interface{}) (chan interface{}, error) { return nil, nil } func (t *BackgroundTask) EmitEventSync(event string, data interface{}) (interface{}, error) { return nil, nil } func (t *BackgroundTask) EmitSingal(sigal string, data interface{}) { } func (t *BackgroundTask) GetRuningState() TaskState { return TaskState(t.runningState) } func (t *BackgroundTask) IsAsync() bool { return true } func (t *BackgroundTask) GetNats() *NatsBus { return t.bus } func (t *BackgroundTask) SetNats(b *NatsBus) { t.bus = b } //data //state //action // swiftTask func (t *BackgroundTask) Run(overCallback TaskRunCallback) error { life := t.LifeCircle err := life.OnBeforeRun() if err != nil { return err } go func() { t.runningState = BT_STATE_INIT var OnLifeCycleMessage = func(msg int) bool { switch msg { case MSG_CANCEL_RUNING: t.runningState = BT_STATE_QUITED_CACEL //取消执行退出 return true case MSG_RESTART_RUNING: t.runningState = BT_STATE_NORMAL_RESTARTING } return false } ticket := 0 for { quitTask := false select { case msg := <-t.msgChan: //msg quitTask = OnLifeCycleMessage(msg) default: ticket = ticket + 1 fmt.Printf("[%s]background-task tick %d\n", t.Name, ticket) quitTask = life.Loop() } if quitTask { break } } if overCallback != nil { overCallback(nil) } life.OnAfterRun() }() return nil }