goque.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package worker
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "runtime/debug"
  9. "strconv"
  10. "time"
  11. "github.com/tnclong/go-que"
  12. "github.com/tnclong/go-que/pg"
  13. "gorm.io/gorm"
  14. )
  15. type goque struct {
  16. q que.Queue
  17. db *gorm.DB
  18. }
  19. func NewGoQueQueue(db *gorm.DB) Queue {
  20. if db == nil {
  21. panic("db can not be nil")
  22. }
  23. var q que.Queue
  24. {
  25. rdb, err := db.DB()
  26. if err != nil {
  27. panic(err)
  28. }
  29. q, err = pg.New(rdb)
  30. if err != nil {
  31. panic(err)
  32. }
  33. }
  34. return &goque{
  35. q: q,
  36. db: db,
  37. }
  38. }
  39. func (q *goque) Add(job QueJobInterface) error {
  40. jobInfo, err := job.GetJobInfo()
  41. if err != nil {
  42. return err
  43. }
  44. runAt := time.Now()
  45. if scheduler, ok := jobInfo.Argument.(Scheduler); ok && scheduler.GetScheduleTime() != nil {
  46. runAt = scheduler.GetScheduleTime().In(time.Local)
  47. job.SetStatus(JobStatusScheduled)
  48. }
  49. _, err = q.q.Enqueue(context.Background(), nil, que.Plan{
  50. Queue: "worker_" + jobInfo.JobName,
  51. Args: que.Args(jobInfo.JobID, jobInfo.Argument),
  52. RunAt: runAt,
  53. })
  54. if err != nil {
  55. return err
  56. }
  57. return nil
  58. }
  59. func (q *goque) run(ctx context.Context, job QueJobInterface) error {
  60. job.StartRefresh()
  61. defer job.StopRefresh()
  62. return job.GetHandler()(ctx, job)
  63. }
  64. func (q *goque) Kill(job QueJobInterface) error {
  65. return job.SetStatus(JobStatusKilled)
  66. }
  67. func (q *goque) Remove(job QueJobInterface) error {
  68. return job.SetStatus(JobStatusCancelled)
  69. }
  70. func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (QueJobInterface, error)) error {
  71. for i, _ := range jobDefs {
  72. jd := jobDefs[i]
  73. if jd.Handler == nil {
  74. panic(fmt.Sprintf("job %s handler is nil", jd.Name))
  75. }
  76. worker, err := que.NewWorker(que.WorkerOptions{
  77. Queue: "worker_" + jd.Name,
  78. Mutex: q.q.Mutex(),
  79. MaxLockPerSecond: 10,
  80. MaxBufferJobsCount: 0,
  81. MaxPerformPerSecond: 2,
  82. MaxConcurrentPerformCount: 1,
  83. Perform: func(ctx context.Context, qj que.Job) (err error) {
  84. var job QueJobInterface
  85. {
  86. var sid string
  87. err = q.parseArgs(qj.Plan().Args, &sid)
  88. if err != nil {
  89. return err
  90. }
  91. id, err := strconv.Atoi(sid)
  92. if err != nil {
  93. return err
  94. }
  95. job, err = getJob(uint(id))
  96. if err != nil {
  97. return err
  98. }
  99. }
  100. defer func() {
  101. if r := recover(); r != nil {
  102. job.AddLog(string(debug.Stack()))
  103. job.SetProgressText(fmt.Sprint(r))
  104. job.SetStatus(JobStatusException)
  105. panic(r)
  106. }
  107. }()
  108. if job.GetStatus() == JobStatusCancelled {
  109. return qj.Expire(ctx, errors.New("job is cancelled"))
  110. }
  111. if job.GetStatus() != JobStatusNew && job.GetStatus() != JobStatusScheduled {
  112. job.SetStatus(JobStatusKilled)
  113. return errors.New("invalid job status, current status: " + job.GetStatus())
  114. }
  115. err = job.SetStatus(JobStatusRunning)
  116. if err != nil {
  117. return err
  118. }
  119. hctx, cf := context.WithCancel(context.Background())
  120. hDoneC := make(chan struct{})
  121. isAborted := false
  122. go func() {
  123. timer := time.NewTicker(time.Second)
  124. for {
  125. select {
  126. case <-hDoneC:
  127. return
  128. case <-timer.C:
  129. status, _ := job.FetchAndSetStatus()
  130. if status == JobStatusKilled {
  131. isAborted = true
  132. cf()
  133. return
  134. }
  135. }
  136. }
  137. }()
  138. err = q.run(hctx, job)
  139. if !isAborted {
  140. hDoneC <- struct{}{}
  141. }
  142. if err != nil {
  143. job.SetProgressText(err.Error())
  144. job.SetStatus(JobStatusException)
  145. return err
  146. }
  147. if isAborted {
  148. return qj.Expire(ctx, errors.New("manually aborted"))
  149. }
  150. err = job.SetStatus(JobStatusDone)
  151. if err != nil {
  152. return err
  153. }
  154. return qj.Done(ctx)
  155. },
  156. })
  157. if err != nil {
  158. panic(err)
  159. }
  160. go func() {
  161. if err := worker.Run(); err != nil {
  162. errStr := fmt.Sprintf("worker Run() error: %s", err.Error())
  163. fmt.Println(errStr)
  164. q.db.Create(&GoQueError{
  165. Error: errStr,
  166. })
  167. }
  168. }()
  169. }
  170. return nil
  171. }
  172. func (q *goque) parseArgs(data []byte, args ...interface{}) error {
  173. d := json.NewDecoder(bytes.NewReader(data))
  174. if _, err := d.Token(); err != nil {
  175. return err
  176. }
  177. for _, arg := range args {
  178. if err := d.Decode(arg); err != nil {
  179. return err
  180. }
  181. }
  182. return nil
  183. }