task-cmd.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package comm
  2. import (
  3. "bufio"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os/exec"
  9. "path"
  10. "path/filepath"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. type CmdTaskData struct {
  17. TimeWaitingForRunSucc int //多少时间后判定为执行成功
  18. Timeout int //多少时间判定为超时
  19. ExeFile string //执行文件
  20. ExeDir string //执行目录
  21. CmdSuccFlag string
  22. Envs []string //执行环境变量
  23. Params []string //执行参数
  24. }
  25. type CmdTask struct {
  26. CmdSuccStarted bool
  27. LogFunc CmdLogFunction
  28. RunningState int //运行状态
  29. msgChan chan int
  30. SessionId string
  31. bus *NatsBus
  32. Data *CmdTaskData
  33. LifeCircle TaskLifeCirle
  34. Name string
  35. }
  36. func (t *CmdTask) GetName() string { return t.Name }
  37. func (t *CmdTask) SetName(n string) { t.Name = n }
  38. func (t *CmdTask) GetID() string {
  39. return t.SessionId
  40. }
  41. func (t *CmdTask) SetID(id string) {
  42. t.SessionId = id
  43. }
  44. func NewCmdTask(data *CmdTaskData) *CmdTask {
  45. t := &CmdTask{}
  46. t.Data = data
  47. t.msgChan = make(chan int)
  48. t.CreateTaskId()
  49. return t
  50. }
  51. func (t *CmdTask) CancelRuning() {
  52. go func() { //开启新线程投递重启消息,避免再LOOP中调用,导致死锁
  53. t.msgChan <- MSG_CANCEL_RUNING
  54. }()
  55. }
  56. func (t *CmdTask) Restart() {
  57. panic("不支持重启操作")
  58. }
  59. func (t *CmdTask) GetNats() *NatsBus {
  60. return t.bus
  61. }
  62. func (t *CmdTask) SetNats(b *NatsBus) {
  63. t.bus = b
  64. }
  65. func (t *CmdTask) IsAsync() bool {
  66. return true
  67. }
  68. func (t *CmdTask) GetRuningState() TaskState {
  69. return TaskState(t.RunningState)
  70. }
  71. func (t *CmdTask) CreateTaskId() string {
  72. t.SessionId = "cmd-" + CreateSessionId()
  73. return t.SessionId
  74. }
  75. func (t *CmdTask) Run(overCallback TaskRunCallback) error {
  76. if t.LifeCircle != nil {
  77. err := t.LifeCircle.OnBeforeRun()
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. var wg sync.WaitGroup
  83. wg.Add(1)
  84. var out error
  85. var taskRun = func() {
  86. t.RunningState = BT_STATE_INIT
  87. loopQuitChan := make(chan error)
  88. var ctx context.Context
  89. var cancel context.CancelFunc
  90. timeOutCanceled := false
  91. var StartLoop = func() {
  92. t.RunningState = BT_STATE_RUNING
  93. defer func() {
  94. if r := recover(); r != nil {
  95. loopQuitChan <- errors.New(fmt.Sprintf("loop exception :%v", r))
  96. }
  97. }()
  98. ctx, cancel = context.WithCancel(context.Background())
  99. e := t.runCmd(ctx)
  100. if timeOutCanceled {
  101. e = nil
  102. }
  103. loopQuitChan <- e
  104. }
  105. go StartLoop()
  106. var OnLoopQuit = func(err error) bool {
  107. fmt.Println("cmd quit ==>", err)
  108. if t.RunningState == BT_STATE_QUITED_CACEL {
  109. err = nil
  110. } else {
  111. if err != nil { //loop异常退出
  112. t.RunningState = BT_STATE_QUITED_EXCEPTION
  113. out = err
  114. }
  115. //程序正常终止了
  116. t.RunningState = BT_STATE_QUITED_NORMAL
  117. }
  118. return true
  119. }
  120. var OnLifeCycleMessage = func(msg int) bool {
  121. switch msg {
  122. case MSG_CANCEL_RUNING:
  123. t.RunningState = BT_STATE_QUITED_CACEL //取消执行退出
  124. out = fmt.Errorf("取消执行")
  125. if cancel != nil {
  126. cancel()
  127. } else {
  128. return true
  129. }
  130. }
  131. return false
  132. }
  133. ticket := 0
  134. sucFlag := false
  135. for {
  136. quitTask := false
  137. select {
  138. case msg := <-t.msgChan: //msg
  139. quitTask = OnLifeCycleMessage(msg)
  140. case err := <-loopQuitChan:
  141. quitTask = OnLoopQuit(err)
  142. default:
  143. ticket = ticket + 1
  144. time.Sleep(time.Second)
  145. if !sucFlag && ticket > t.Data.TimeWaitingForRunSucc {
  146. wg.Done()
  147. sucFlag = true
  148. }
  149. if !timeOutCanceled && t.Data.Timeout > 0 && ticket > t.Data.Timeout {
  150. fmt.Printf("cmd:%s 执行超时,取消执行中", t.Name)
  151. out = fmt.Errorf("执行超时")
  152. timeOutCanceled = true
  153. if cancel != nil {
  154. cancel()
  155. }
  156. }
  157. }
  158. if quitTask {
  159. break
  160. }
  161. }
  162. if t.LifeCircle != nil {
  163. t.LifeCircle.OnAfterRun()
  164. }
  165. if overCallback != nil {
  166. overCallback(out)
  167. }
  168. }
  169. go taskRun()
  170. wg.Wait()
  171. return out
  172. }
  173. func (t *CmdTask) GetExeDir() string {
  174. data := t.Data
  175. if len(data.ExeDir) > 0 {
  176. return path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH))
  177. }
  178. return filepath.Dir(data.ExeFile)
  179. }
  180. func (t *CmdTask) runCmd(ctx context.Context) error {
  181. data := t.Data
  182. exef := data.ExeFile
  183. if runtime.GOOS == "windows" {
  184. if strings.LastIndex(exef, ".exe") == -1 {
  185. exef = exef + ".exe"
  186. }
  187. }
  188. cmd := exec.CommandContext(ctx, exef, data.Params...)
  189. if len(data.ExeDir) > 0 {
  190. cmd.Dir = path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH))
  191. }
  192. if len(data.Envs) > 0 {
  193. cmd.Env = data.Envs
  194. }
  195. fmt.Println("start cmd==>", data.ExeDir, data.ExeFile)
  196. // cmd.SysProcAttr = xdaemon.NewSysProcAttr()
  197. stdoutIn, _ := cmd.StdoutPipe()
  198. stderrIn, _ := cmd.StderrPipe()
  199. reader := bufio.NewReader(stdoutIn)
  200. reader2 := bufio.NewReader(stderrIn)
  201. hasSuccFlag := len(data.CmdSuccFlag) > 0
  202. go func() {
  203. checked := false
  204. for {
  205. line, err2 := reader.ReadString('\n')
  206. if err2 != nil || io.EOF == err2 {
  207. break
  208. }
  209. fmt.Println(line)
  210. if t.LogFunc != nil {
  211. t.LogFunc(line)
  212. }
  213. if !checked && hasSuccFlag && (strings.LastIndex(line, data.CmdSuccFlag) != -1) {
  214. checked = true
  215. t.CmdSuccStarted = true
  216. }
  217. }
  218. }()
  219. go func() {
  220. checked := false
  221. for {
  222. line, err2 := reader2.ReadString('\n')
  223. if err2 != nil || io.EOF == err2 {
  224. break
  225. }
  226. fmt.Println(line)
  227. if !checked && hasSuccFlag && (strings.LastIndex(line, data.CmdSuccFlag) != -1) {
  228. t.CmdSuccStarted = true
  229. }
  230. }
  231. }()
  232. exeBase := path.Base(data.ExeFile)
  233. err := cmd.Start()
  234. if err != nil {
  235. fmt.Println(exeBase, " start failed", err.Error())
  236. return err
  237. }
  238. return cmd.Wait()
  239. }