cron.go 6.0 KB


  1. package worker
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "flag"
  7. "fmt"
  8. "os"
  9. "os/exec"
  10. "os/signal"
  11. "path/filepath"
  12. "reflect"
  13. "runtime/debug"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "syscall"
  18. "time"
  19. )
  // cronJob describes one QOR-managed entry stored in the user's crontab.
  // Its exported, untagged fields are what ToString serializes as JSON into
  // the entry's "## BEGIN QOR JOB" header and what parseJobs decodes again.
  type cronJob struct {
  	// JobID identifies the job this entry belongs to.
  	JobID string
  	// Pid is the process ID recorded when the job was started right away;
  	// it stays 0 for an entry that only carries a schedule.
  	Pid int
  	// Command is the crontab line for a scheduled job (empty otherwise).
  	Command string
  	// Delete marks the entry to be left out of the next crontab write.
  	// It is excluded from the JSON form.
  	Delete bool `json:"-"`
  }
  26. func (job cronJob) ToString() string {
  27. marshal, _ := json.Marshal(job)
  28. return fmt.Sprintf("## BEGIN QOR JOB %v # %v\n%v\n## END QOR JOB\n", job.JobID, string(marshal), job.Command)
  29. }
  30. // Cron implemented a worker Queue based on cronjob
  31. type cron struct {
  32. Jobs []*cronJob
  33. CronJobs []string
  34. mutex sync.Mutex `sql:"-"`
  35. }
  36. // NewCronQueue initialize a Cron queue
  37. func NewCronQueue() Queue {
  38. return &cron{}
  39. }
  40. func (c *cron) parseJobs() []*cronJob {
  41. c.mutex.Lock()
  42. c.Jobs = []*cronJob{}
  43. c.CronJobs = []string{}
  44. if out, err := exec.Command("crontab", "-l").Output(); err == nil {
  45. var inQorJob bool
  46. for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
  47. if strings.HasPrefix(line, "## BEGIN QOR JOB") {
  48. inQorJob = true
  49. if idx := strings.Index(line, "{"); idx > 1 {
  50. var job cronJob
  51. if json.Unmarshal([]byte(line[idx-1:]), &job) == nil {
  52. c.Jobs = append(c.Jobs, &job)
  53. }
  54. }
  55. }
  56. if !inQorJob {
  57. c.CronJobs = append(c.CronJobs, line)
  58. }
  59. if strings.HasPrefix(line, "## END QOR JOB") {
  60. inQorJob = false
  61. }
  62. }
  63. }
  64. return c.Jobs
  65. }
  66. func (c *cron) writeCronJob() error {
  67. defer c.mutex.Unlock()
  68. cmd := exec.Command("crontab", "-")
  69. cmd.Stdout = os.Stdout
  70. cmd.Stderr = os.Stderr
  71. stdin, _ := cmd.StdinPipe()
  72. for _, cronJob := range c.CronJobs {
  73. stdin.Write([]byte(cronJob + "\n"))
  74. }
  75. for _, job := range c.Jobs {
  76. if !job.Delete {
  77. stdin.Write([]byte(job.ToString() + "\n"))
  78. }
  79. }
  80. stdin.Close()
  81. return cmd.Run()
  82. }
  83. // Add a job to cron queue
  84. func (c *cron) Add(job QueJobInterface) (err error) {
  85. c.parseJobs()
  86. defer c.writeCronJob()
  87. jobInfo, err := job.GetJobInfo()
  88. if err != nil {
  89. return err
  90. }
  91. var binaryFile string
  92. if binaryFile, err = filepath.Abs(os.Args[0]); err == nil {
  93. var jobs []*cronJob
  94. for _, cronJob := range c.Jobs {
  95. if cronJob.JobID != jobInfo.JobID {
  96. jobs = append(jobs, cronJob)
  97. }
  98. }
  99. if scheduler, ok := jobInfo.Argument.(Scheduler); ok && scheduler.GetScheduleTime() != nil {
  100. scheduleTime := scheduler.GetScheduleTime().In(time.Local)
  101. job.SetStatus(JobStatusScheduled)
  102. currentPath, _ := os.Getwd()
  103. jobs = append(jobs, &cronJob{
  104. JobID: jobInfo.JobID,
  105. Command: fmt.Sprintf("%d %d %d %d * cd %v; %v --qor-job %v\n", scheduleTime.Minute(), scheduleTime.Hour(), scheduleTime.Day(), scheduleTime.Month(), currentPath, binaryFile, jobInfo.JobID),
  106. })
  107. } else {
  108. cmd := exec.Command(binaryFile, "--qor-job", jobInfo.JobID)
  109. if err = cmd.Start(); err == nil {
  110. jobs = append(jobs, &cronJob{JobID: jobInfo.JobID, Pid: cmd.Process.Pid})
  111. cmd.Process.Release()
  112. }
  113. }
  114. c.Jobs = jobs
  115. }
  116. return
  117. }
  118. // Run a job from cron queue
  119. func (c *cron) run(qorJob QueJobInterface) (err error) {
  120. jobInfo, err := qorJob.GetJobInfo()
  121. if err != nil {
  122. return err
  123. }
  124. h := qorJob.GetHandler()
  125. if h == nil {
  126. panic(fmt.Sprintf("job %v no handler", jobInfo.JobName))
  127. }
  128. go func() {
  129. sigint := make(chan os.Signal, 1)
  130. // interrupt signal sent from terminal
  131. signal.Notify(sigint, syscall.SIGINT)
  132. // sigterm signal sent from kubernetes
  133. signal.Notify(sigint, syscall.SIGTERM)
  134. i := <-sigint
  135. qorJob.SetProgressText(fmt.Sprintf("Worker killed by signal %s", i.String()))
  136. qorJob.SetStatus(JobStatusKilled)
  137. qorJob.StopRefresh()
  138. os.Exit(int(reflect.ValueOf(i).Int()))
  139. }()
  140. qorJob.StartRefresh()
  141. defer qorJob.StopRefresh()
  142. err = h(context.Background(), qorJob)
  143. if err == nil {
  144. c.parseJobs()
  145. defer c.writeCronJob()
  146. for _, cronJob := range c.Jobs {
  147. if cronJob.JobID == jobInfo.JobID {
  148. cronJob.Delete = true
  149. }
  150. }
  151. }
  152. return err
  153. }
  154. // Kill a job from cron queue
  155. func (c *cron) Kill(job QueJobInterface) (err error) {
  156. c.parseJobs()
  157. defer c.writeCronJob()
  158. jobInfo, err := job.GetJobInfo()
  159. if err != nil {
  160. return err
  161. }
  162. for _, cronJob := range c.Jobs {
  163. if cronJob.JobID == jobInfo.JobID {
  164. if process, err := os.FindProcess(cronJob.Pid); err == nil {
  165. if err = process.Kill(); err == nil {
  166. cronJob.Delete = true
  167. return job.SetStatus(JobStatusKilled)
  168. }
  169. }
  170. return err
  171. }
  172. }
  173. return errors.New("failed to find job")
  174. }
  175. // Remove a job from cron queue
  176. func (c *cron) Remove(job QueJobInterface) error {
  177. c.parseJobs()
  178. defer c.writeCronJob()
  179. jobInfo, err := job.GetJobInfo()
  180. if err != nil {
  181. return err
  182. }
  183. for _, cronJob := range c.Jobs {
  184. if cronJob.JobID == jobInfo.JobID {
  185. if cronJob.Pid == 0 {
  186. cronJob.Delete = true
  187. return job.SetStatus(JobStatusKilled)
  188. }
  189. return errors.New("failed to remove current job as it is running")
  190. }
  191. }
  192. return errors.New("failed to find job")
  193. }
  194. func (c *cron) Listen(_ []*QorJobDefinition, getJob func(qorJobID uint) (QueJobInterface, error)) error {
  195. cmdLine := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
  196. qorJobID := cmdLine.String("qor-job", "", "Qor Job ID")
  197. cmdLine.Parse(os.Args[1:])
  198. if *qorJobID != "" {
  199. id, err := strconv.ParseUint(*qorJobID, 10, 64)
  200. if err != nil {
  201. fmt.Println(err)
  202. os.Exit(1)
  203. }
  204. job, err := getJob(uint(id))
  205. if err != nil {
  206. fmt.Println(err)
  207. os.Exit(1)
  208. }
  209. if err := c.doRunJob(job); err == nil {
  210. os.Exit(0)
  211. } else {
  212. fmt.Println(err)
  213. os.Exit(1)
  214. }
  215. }
  216. return nil
  217. }
  218. func (c *cron) doRunJob(job QueJobInterface) error {
  219. defer func() {
  220. if r := recover(); r != nil {
  221. job.AddLog(string(debug.Stack()))
  222. job.SetProgressText(fmt.Sprint(r))
  223. job.SetStatus(JobStatusException)
  224. }
  225. }()
  226. if job.GetStatus() != JobStatusNew && job.GetStatus() != JobStatusScheduled {
  227. return errors.New("invalid job status, current status: " + job.GetStatus())
  228. }
  229. if err := job.SetStatus(JobStatusRunning); err == nil {
  230. if err := c.run(job); err == nil {
  231. return job.SetStatus(JobStatusDone)
  232. }
  233. job.SetProgressText(err.Error())
  234. job.SetStatus(JobStatusException)
  235. }
  236. return nil
  237. }