package comm import ( "bufio" "context" "errors" "fmt" "io" "io/fs" "io/ioutil" "os" "os/exec" "path" "path/filepath" "runtime" "strconv" "strings" "time" ) type CmdBackgroundTaskData struct { CmdSuccFlag string //启动成功标志, 如果不设置, CmdSuccFlag2 string CmdDurationForSucc int //如果没有启动标志,则启动多少时间后,没有报错,则判断启动成功 CmdDurationForFail int //有启动标识,启动多少时间后,没有检测到成功标识判为启动失败 KillPreExe bool ExeFile string //执行文件 ExeDir string //执行目录 Envs []string //执行环境变量 Params []string //执行参数 } type CmdLogFunction func(string) type CmdBackgroundTask struct { MaxExceptRestartTimes int //最大异常重启次数 ExceptRestartWatting int //重启等待时间 CmdSuccStarted bool LogFunc CmdLogFunction RunningState int //运行状态 msgChan chan int SessionId string bus *NatsBus Data *CmdBackgroundTaskData InitRunCmd bool LifeCircle TaskLifeCirle Name string } func (t *CmdBackgroundTask) GetName() string { return t.Name } func (t *CmdBackgroundTask) SetName(n string) { t.Name = n } func (t *CmdBackgroundTask) GetID() string { return t.SessionId } func (t *CmdBackgroundTask) SetID(id string) { t.SessionId = id } func NewCmdBackgroundTask(data *CmdBackgroundTaskData) *CmdBackgroundTask { t := &CmdBackgroundTask{} t.Data = data t.msgChan = make(chan int) t.CreateTaskId() t.InitRunCmd = true return t } func (t *CmdBackgroundTask) CancelRuning() { go func() { //开启新线程投递重启消息,避免再LOOP中调用,导致死锁 t.msgChan <- MSG_CANCEL_RUNING }() } func (t *CmdBackgroundTask) Restart() { go func() { //开启新线程投递重启消息,避免再LOOP中调用,导致死锁 t.msgChan <- MSG_RESTART_RUNING }() } func (t *CmdBackgroundTask) GetNats() *NatsBus { return t.bus } func (t *CmdBackgroundTask) SetNats(b *NatsBus) { t.bus = b } func (t *CmdBackgroundTask) IsAsync() bool { return true } func (t *CmdBackgroundTask) GetRuningState() TaskState { return TaskState(t.RunningState) } func (t *CmdBackgroundTask) CreateTaskId() string { t.SessionId = "cmd-" + CreateSessionId() return t.SessionId } func (t *CmdBackgroundTask) EmitEvent(event string, data interface{}) (chan interface{}, error) { return nil, nil } func (t *CmdBackgroundTask) Run(overCallback TaskRunCallback) error { err := t.LifeCircle.OnBeforeRun() if err != nil { return err } var taskLoop = func() { t.RunningState = BT_STATE_INIT loopQuitChan := make(chan error) var ctx context.Context var cancel context.CancelFunc var StartLoop = func() { t.RunningState = BT_STATE_RUNING defer func() { if r := recover(); r != nil { loopQuitChan <- errors.New(fmt.Sprintf("loop exception :%v", r)) } }() ctx, cancel = context.WithCancel(context.Background()) loopQuitChan <- t.runCmd(ctx) } if t.InitRunCmd { go StartLoop() } loopErrorIndex := 0 var OnLoopQuit = func(err error) bool { fmt.Println("cmd quit ==>", err) if t.RunningState == BT_STATE_QUITED_CACEL { return true } if t.RunningState == BT_STATE_NORMAL_RESTARTING { //主动重启中 go StartLoop() return false } if err != nil { //loop异常退出 if t.RunningState == BT_STATE_QUITED_CACEL { return true } loopErrorIndex = loopErrorIndex + 1 if loopErrorIndex > t.MaxExceptRestartTimes { t.RunningState = BT_STATE_QUITED_EXCEPTION return true } t.RunningState = BT_STATE_EXCEPTION_RESTARTING if t.ExceptRestartWatting > 0 { time.Sleep(time.Second * time.Duration(t.ExceptRestartWatting)) } go StartLoop() return false } //程序正常终止了 t.RunningState = BT_STATE_QUITED_NORMAL return true } var OnLifeCycleMessage = func(msg int) bool { switch msg { case MSG_CANCEL_RUNING: t.RunningState = BT_STATE_QUITED_CACEL //取消执行退出 if cancel != nil { cancel() } else { return true } case MSG_RESTART_RUNING: t.RunningState = BT_STATE_NORMAL_RESTARTING if cancel != nil { cancel() } else { go StartLoop() } } return false } ticket := 0 for { quitTask := false select { case msg := <-t.msgChan: //msg quitTask = OnLifeCycleMessage(msg) case err := <-loopQuitChan: quitTask = OnLoopQuit(err) default: ticket = ticket + 1 fmt.Printf("[%s]cmd-bg-task tick %d \n", t.Name, ticket) quitTask = t.LifeCircle.Loop() } if quitTask { break } } t.LifeCircle.OnAfterRun() if overCallback != nil { overCallback(nil) } } go taskLoop() return nil } func (t *CmdBackgroundTask) GetExeDir() string { data := t.Data if len(data.ExeDir) > 0 { return path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH)) } return filepath.Dir(data.ExeFile) } func (t *CmdBackgroundTask) KillPreExeImpl() { //判断exename 是否存在 data := t.Data if !data.KillPreExe { return } pid := filepath.Join(t.GetExeDir(), "pid") pidstr, _ := ioutil.ReadFile(pid) if len(pidstr) < 1 { return } p, _ := strconv.Atoi(string(pidstr)) if p < 1 { return } proc, er := os.FindProcess(p) if er != nil { fmt.Println("find process fail", p, er) return } err := proc.Kill() fmt.Println("kill=>", p, err) } func (t *CmdBackgroundTask) runCmd(ctx context.Context) error { data := t.Data exef := data.ExeFile if runtime.GOOS == "windows" { if strings.LastIndex(exef, ".exe") == -1 { exef = exef + ".exe" } } t.KillPreExeImpl() cmd := exec.CommandContext(ctx, exef, data.Params...) if len(data.ExeDir) > 0 { cmd.Dir = path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH)) } if len(data.Envs) > 0 { cmd.Env = data.Envs } fmt.Println("start cmd==>", data.ExeDir, data.ExeFile) // cmd.SysProcAttr = xdaemon.NewSysProcAttr() stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() reader := bufio.NewReader(stdoutIn) reader2 := bufio.NewReader(stderrIn) hasSuccFlag := len(data.CmdSuccFlag) > 0 go func() { checked := false for { line, err2 := reader.ReadString('\n') if err2 != nil || io.EOF == err2 { break } fmt.Println(line) if t.LogFunc != nil { t.LogFunc(line) } if !checked && hasSuccFlag && ((strings.LastIndex(line, data.CmdSuccFlag) != -1) || (len(data.CmdSuccFlag2) > 0 && strings.LastIndex(line, data.CmdSuccFlag2) != -1)) { checked = true t.CmdSuccStarted = true } } }() go func() { checked := false for { line, err2 := reader2.ReadString('\n') if err2 != nil || io.EOF == err2 { break } fmt.Println(line) if !checked && hasSuccFlag && ((strings.LastIndex(line, data.CmdSuccFlag) != -1) || (len(data.CmdSuccFlag2) > 0 && strings.LastIndex(line, data.CmdSuccFlag2) != -1)) { t.CmdSuccStarted = true } } }() exeBase := path.Base(data.ExeFile) err := cmd.Start() if err != nil { fmt.Println(exeBase, " start failed", err.Error()) return err } if data.KillPreExe { ioutil.WriteFile(filepath.Join(t.GetExeDir(), "pid"), []byte(fmt.Sprintf("%d", cmd.Process.Pid)), fs.ModePerm) } return cmd.Wait() }