package comm import ( "bufio" "context" "errors" "fmt" "io" "os/exec" "path" "path/filepath" "runtime" "strings" "sync" "time" ) type CmdTaskData struct { TimeWaitingForRunSucc int //多少时间后判定为执行成功 Timeout int //多少时间判定为超时 ExeFile string //执行文件 ExeDir string //执行目录 CmdSuccFlag string Envs []string //执行环境变量 Params []string //执行参数 } type CmdTask struct { CmdSuccStarted bool LogFunc CmdLogFunction RunningState int //运行状态 msgChan chan int SessionId string bus *NatsBus Data *CmdTaskData LifeCircle TaskLifeCirle Name string } func (t *CmdTask) GetName() string { return t.Name } func (t *CmdTask) SetName(n string) { t.Name = n } func (t *CmdTask) GetID() string { return t.SessionId } func (t *CmdTask) SetID(id string) { t.SessionId = id } func NewCmdTask(data *CmdTaskData) *CmdTask { t := &CmdTask{} t.Data = data t.msgChan = make(chan int) t.CreateTaskId() return t } func (t *CmdTask) CancelRuning() { go func() { //开启新线程投递重启消息,避免再LOOP中调用,导致死锁 t.msgChan <- MSG_CANCEL_RUNING }() } func (t *CmdTask) Restart() { panic("不支持重启操作") } func (t *CmdTask) GetNats() *NatsBus { return t.bus } func (t *CmdTask) SetNats(b *NatsBus) { t.bus = b } func (t *CmdTask) IsAsync() bool { return true } func (t *CmdTask) GetRuningState() TaskState { return TaskState(t.RunningState) } func (t *CmdTask) CreateTaskId() string { t.SessionId = "cmd-" + CreateSessionId() return t.SessionId } func (t *CmdTask) Run(overCallback TaskRunCallback) error { if t.LifeCircle != nil { err := t.LifeCircle.OnBeforeRun() if err != nil { return err } } var wg sync.WaitGroup wg.Add(1) var out error var taskRun = func() { t.RunningState = BT_STATE_INIT loopQuitChan := make(chan error) var ctx context.Context var cancel context.CancelFunc timeOutCanceled := false var StartLoop = func() { t.RunningState = BT_STATE_RUNING defer func() { if r := recover(); r != nil { loopQuitChan <- errors.New(fmt.Sprintf("loop exception :%v", r)) } }() ctx, cancel = context.WithCancel(context.Background()) e := t.runCmd(ctx) if timeOutCanceled { e = nil } loopQuitChan <- e } go StartLoop() var OnLoopQuit = func(err error) bool { fmt.Println("cmd quit ==>", err) if t.RunningState == BT_STATE_QUITED_CACEL { err = nil } else { if err != nil { //loop异常退出 t.RunningState = BT_STATE_QUITED_EXCEPTION out = err } //程序正常终止了 t.RunningState = BT_STATE_QUITED_NORMAL } return true } var OnLifeCycleMessage = func(msg int) bool { switch msg { case MSG_CANCEL_RUNING: t.RunningState = BT_STATE_QUITED_CACEL //取消执行退出 out = fmt.Errorf("取消执行") if cancel != nil { cancel() } else { return true } } return false } ticket := 0 sucFlag := false for { quitTask := false select { case msg := <-t.msgChan: //msg quitTask = OnLifeCycleMessage(msg) case err := <-loopQuitChan: quitTask = OnLoopQuit(err) default: ticket = ticket + 1 time.Sleep(time.Second) if !sucFlag && ticket > t.Data.TimeWaitingForRunSucc { wg.Done() sucFlag = true } if !timeOutCanceled && t.Data.Timeout > 0 && ticket > t.Data.Timeout { fmt.Printf("cmd:%s 执行超时,取消执行中", t.Name) out = fmt.Errorf("执行超时") timeOutCanceled = true if cancel != nil { cancel() } } } if quitTask { break } } if t.LifeCircle != nil { t.LifeCircle.OnAfterRun() } if overCallback != nil { overCallback(out) } } go taskRun() wg.Wait() return out } func (t *CmdTask) GetExeDir() string { data := t.Data if len(data.ExeDir) > 0 { return path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH)) } return filepath.Dir(data.ExeFile) } func (t *CmdTask) runCmd(ctx context.Context) error { data := t.Data exef := data.ExeFile if runtime.GOOS == "windows" { if strings.LastIndex(exef, ".exe") == -1 { exef = exef + ".exe" } } cmd := exec.CommandContext(ctx, exef, data.Params...) if len(data.ExeDir) > 0 { cmd.Dir = path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH)) } if len(data.Envs) > 0 { cmd.Env = data.Envs } fmt.Println("start cmd==>", data.ExeDir, data.ExeFile) // cmd.SysProcAttr = xdaemon.NewSysProcAttr() stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() reader := bufio.NewReader(stdoutIn) reader2 := bufio.NewReader(stderrIn) hasSuccFlag := len(data.CmdSuccFlag) > 0 go func() { checked := false for { line, err2 := reader.ReadString('\n') if err2 != nil || io.EOF == err2 { break } fmt.Println(line) if t.LogFunc != nil { t.LogFunc(line) } if !checked && hasSuccFlag && (strings.LastIndex(line, data.CmdSuccFlag) != -1) { checked = true t.CmdSuccStarted = true } } }() go func() { checked := false for { line, err2 := reader2.ReadString('\n') if err2 != nil || io.EOF == err2 { break } fmt.Println(line) if !checked && hasSuccFlag && (strings.LastIndex(line, data.CmdSuccFlag) != -1) { t.CmdSuccStarted = true } } }() exeBase := path.Base(data.ExeFile) err := cmd.Start() if err != nil { fmt.Println(exeBase, " start failed", err.Error()) return err } return cmd.Wait() }