task-cmd-background.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package comm
  2. import (
  3. "bufio"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/fs"
  9. "io/ioutil"
  10. "os"
  11. "os/exec"
  12. "path"
  13. "path/filepath"
  14. "runtime"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. type CmdBackgroundTaskData struct {
  20. CmdSuccFlag string //启动成功标志, 如果不设置,
  21. CmdSuccFlag2 string
  22. CmdDurationForSucc int //如果没有启动标志,则启动多少时间后,没有报错,则判断启动成功
  23. CmdDurationForFail int //有启动标识,启动多少时间后,没有检测到成功标识判为启动失败
  24. KillPreExe bool
  25. ExeFile string //执行文件
  26. ExeDir string //执行目录
  27. Envs []string //执行环境变量
  28. Params []string //执行参数
  29. }
  30. type CmdLogFunction func(string)
  31. type CmdBackgroundTask struct {
  32. MaxExceptRestartTimes int //最大异常重启次数
  33. ExceptRestartWatting int //重启等待时间
  34. CmdSuccStarted bool
  35. LogFunc CmdLogFunction
  36. RunningState int //运行状态
  37. msgChan chan int
  38. SessionId string
  39. bus *NatsBus
  40. Data *CmdBackgroundTaskData
  41. InitRunCmd bool
  42. LifeCircle TaskLifeCirle
  43. Name string
  44. }
  45. func (t *CmdBackgroundTask) GetName() string { return t.Name }
  46. func (t *CmdBackgroundTask) SetName(n string) { t.Name = n }
  47. func (t *CmdBackgroundTask) GetID() string {
  48. return t.SessionId
  49. }
  50. func (t *CmdBackgroundTask) SetID(id string) {
  51. t.SessionId = id
  52. }
  53. func NewCmdBackgroundTask(data *CmdBackgroundTaskData) *CmdBackgroundTask {
  54. t := &CmdBackgroundTask{}
  55. t.Data = data
  56. t.msgChan = make(chan int)
  57. t.CreateTaskId()
  58. t.InitRunCmd = true
  59. return t
  60. }
  61. func (t *CmdBackgroundTask) CancelRuning() {
  62. go func() { //开启新线程投递重启消息,避免再LOOP中调用,导致死锁
  63. t.msgChan <- MSG_CANCEL_RUNING
  64. }()
  65. }
  66. func (t *CmdBackgroundTask) Restart() {
  67. go func() { //开启新线程投递重启消息,避免再LOOP中调用,导致死锁
  68. t.msgChan <- MSG_RESTART_RUNING
  69. }()
  70. }
  71. func (t *CmdBackgroundTask) GetNats() *NatsBus {
  72. return t.bus
  73. }
  74. func (t *CmdBackgroundTask) SetNats(b *NatsBus) {
  75. t.bus = b
  76. }
  77. func (t *CmdBackgroundTask) IsAsync() bool {
  78. return true
  79. }
  80. func (t *CmdBackgroundTask) GetRuningState() TaskState {
  81. return TaskState(t.RunningState)
  82. }
  83. func (t *CmdBackgroundTask) CreateTaskId() string {
  84. t.SessionId = "cmd-" + CreateSessionId()
  85. return t.SessionId
  86. }
  87. func (t *CmdBackgroundTask) EmitEvent(event string, data interface{}) (chan interface{}, error) {
  88. return nil, nil
  89. }
  90. func (t *CmdBackgroundTask) Run(overCallback TaskRunCallback) error {
  91. err := t.LifeCircle.OnBeforeRun()
  92. if err != nil {
  93. return err
  94. }
  95. var taskLoop = func() {
  96. t.RunningState = BT_STATE_INIT
  97. loopQuitChan := make(chan error)
  98. var ctx context.Context
  99. var cancel context.CancelFunc
  100. var StartLoop = func() {
  101. t.RunningState = BT_STATE_RUNING
  102. defer func() {
  103. if r := recover(); r != nil {
  104. loopQuitChan <- errors.New(fmt.Sprintf("loop exception :%v", r))
  105. }
  106. }()
  107. ctx, cancel = context.WithCancel(context.Background())
  108. loopQuitChan <- t.runCmd(ctx)
  109. }
  110. if t.InitRunCmd {
  111. go StartLoop()
  112. }
  113. loopErrorIndex := 0
  114. var OnLoopQuit = func(err error) bool {
  115. fmt.Println("cmd quit ==>", err)
  116. if t.RunningState == BT_STATE_QUITED_CACEL {
  117. return true
  118. }
  119. if t.RunningState == BT_STATE_NORMAL_RESTARTING { //主动重启中
  120. go StartLoop()
  121. return false
  122. }
  123. if err != nil { //loop异常退出
  124. if t.RunningState == BT_STATE_QUITED_CACEL {
  125. return true
  126. }
  127. loopErrorIndex = loopErrorIndex + 1
  128. if loopErrorIndex > t.MaxExceptRestartTimes {
  129. t.RunningState = BT_STATE_QUITED_EXCEPTION
  130. return true
  131. }
  132. t.RunningState = BT_STATE_EXCEPTION_RESTARTING
  133. if t.ExceptRestartWatting > 0 {
  134. time.Sleep(time.Second * time.Duration(t.ExceptRestartWatting))
  135. }
  136. go StartLoop()
  137. return false
  138. }
  139. //程序正常终止了
  140. t.RunningState = BT_STATE_QUITED_NORMAL
  141. return true
  142. }
  143. var OnLifeCycleMessage = func(msg int) bool {
  144. switch msg {
  145. case MSG_CANCEL_RUNING:
  146. t.RunningState = BT_STATE_QUITED_CACEL //取消执行退出
  147. if cancel != nil {
  148. cancel()
  149. } else {
  150. return true
  151. }
  152. case MSG_RESTART_RUNING:
  153. t.RunningState = BT_STATE_NORMAL_RESTARTING
  154. if cancel != nil {
  155. cancel()
  156. } else {
  157. go StartLoop()
  158. }
  159. }
  160. return false
  161. }
  162. ticket := 0
  163. for {
  164. quitTask := false
  165. select {
  166. case msg := <-t.msgChan: //msg
  167. quitTask = OnLifeCycleMessage(msg)
  168. case err := <-loopQuitChan:
  169. quitTask = OnLoopQuit(err)
  170. default:
  171. ticket = ticket + 1
  172. fmt.Printf("[%s]cmd-bg-task tick %d \n", t.Name, ticket)
  173. quitTask = t.LifeCircle.Loop()
  174. }
  175. if quitTask {
  176. break
  177. }
  178. }
  179. t.LifeCircle.OnAfterRun()
  180. if overCallback != nil {
  181. overCallback(nil)
  182. }
  183. }
  184. go taskLoop()
  185. return nil
  186. }
  187. func (t *CmdBackgroundTask) GetExeDir() string {
  188. data := t.Data
  189. if len(data.ExeDir) > 0 {
  190. return path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH))
  191. }
  192. return filepath.Dir(data.ExeFile)
  193. }
  194. func (t *CmdBackgroundTask) KillPreExeImpl() {
  195. //判断exename 是否存在
  196. data := t.Data
  197. if !data.KillPreExe {
  198. return
  199. }
  200. pid := filepath.Join(t.GetExeDir(), "pid")
  201. pidstr, _ := ioutil.ReadFile(pid)
  202. if len(pidstr) < 1 {
  203. return
  204. }
  205. p, _ := strconv.Atoi(string(pidstr))
  206. if p < 1 {
  207. return
  208. }
  209. proc, er := os.FindProcess(p)
  210. if er != nil {
  211. fmt.Println("find process fail", p, er)
  212. return
  213. }
  214. err := proc.Kill()
  215. fmt.Println("kill=>", p, err)
  216. }
  217. func (t *CmdBackgroundTask) runCmd(ctx context.Context) error {
  218. data := t.Data
  219. exef := data.ExeFile
  220. if runtime.GOOS == "windows" {
  221. if strings.LastIndex(exef, ".exe") == -1 {
  222. exef = exef + ".exe"
  223. }
  224. }
  225. t.KillPreExeImpl()
  226. cmd := exec.CommandContext(ctx, exef, data.Params...)
  227. if len(data.ExeDir) > 0 {
  228. cmd.Dir = path.Join(data.ExeDir, fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH))
  229. }
  230. if len(data.Envs) > 0 {
  231. cmd.Env = data.Envs
  232. }
  233. fmt.Println("start cmd==>", data.ExeDir, data.ExeFile)
  234. // cmd.SysProcAttr = xdaemon.NewSysProcAttr()
  235. stdoutIn, _ := cmd.StdoutPipe()
  236. stderrIn, _ := cmd.StderrPipe()
  237. reader := bufio.NewReader(stdoutIn)
  238. reader2 := bufio.NewReader(stderrIn)
  239. hasSuccFlag := len(data.CmdSuccFlag) > 0
  240. go func() {
  241. checked := false
  242. for {
  243. line, err2 := reader.ReadString('\n')
  244. if err2 != nil || io.EOF == err2 {
  245. break
  246. }
  247. fmt.Println(line)
  248. if t.LogFunc != nil {
  249. t.LogFunc(line)
  250. }
  251. if !checked && hasSuccFlag && ((strings.LastIndex(line, data.CmdSuccFlag) != -1) || (len(data.CmdSuccFlag2) > 0 && strings.LastIndex(line, data.CmdSuccFlag2) != -1)) {
  252. checked = true
  253. t.CmdSuccStarted = true
  254. }
  255. }
  256. }()
  257. go func() {
  258. checked := false
  259. for {
  260. line, err2 := reader2.ReadString('\n')
  261. if err2 != nil || io.EOF == err2 {
  262. break
  263. }
  264. fmt.Println(line)
  265. if !checked && hasSuccFlag && ((strings.LastIndex(line, data.CmdSuccFlag) != -1) || (len(data.CmdSuccFlag2) > 0 && strings.LastIndex(line, data.CmdSuccFlag2) != -1)) {
  266. t.CmdSuccStarted = true
  267. }
  268. }
  269. }()
  270. exeBase := path.Base(data.ExeFile)
  271. err := cmd.Start()
  272. if err != nil {
  273. fmt.Println(exeBase, " start failed", err.Error())
  274. return err
  275. }
  276. if data.KillPreExe {
  277. ioutil.WriteFile(filepath.Join(t.GetExeDir(), "pid"), []byte(fmt.Sprintf("%d", cmd.Process.Pid)), fs.ModePerm)
  278. }
  279. return cmd.Wait()
  280. }