goque.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package worker
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "runtime/debug"
  9. "strconv"
  10. "time"
  11. "github.com/tnclong/go-que"
  12. "github.com/tnclong/go-que/pg"
  13. "go.uber.org/multierr"
  14. "gorm.io/gorm"
  15. )
  16. type goque struct {
  17. q que.Queue
  18. db *gorm.DB
  19. wks []*que.Worker
  20. }
  21. func NewGoQueQueue(db *gorm.DB) Queue {
  22. if db == nil {
  23. panic("db can not be nil")
  24. }
  25. var q que.Queue
  26. {
  27. rdb, err := db.DB()
  28. if err != nil {
  29. panic(err)
  30. }
  31. q, err = pg.New(rdb)
  32. if err != nil {
  33. panic(err)
  34. }
  35. }
  36. return &goque{
  37. q: q,
  38. db: db,
  39. }
  40. }
  41. func (q *goque) Add(ctx context.Context, job QueJobInterface) error {
  42. jobInfo, err := job.GetJobInfo()
  43. if err != nil {
  44. return err
  45. }
  46. runAt := time.Now()
  47. if scheduler, ok := jobInfo.Argument.(Scheduler); ok && scheduler.GetScheduleTime() != nil {
  48. runAt = scheduler.GetScheduleTime().In(time.Local)
  49. job.SetStatus(JobStatusScheduled)
  50. }
  51. _, err = q.q.Enqueue(ctx, nil, que.Plan{
  52. Queue: "worker_" + jobInfo.JobName,
  53. Args: que.Args(jobInfo.JobID, jobInfo.Argument),
  54. RunAt: runAt,
  55. })
  56. if err != nil {
  57. return err
  58. }
  59. return nil
  60. }
  61. func (q *goque) run(ctx context.Context, job QueJobInterface) error {
  62. job.StartRefresh()
  63. defer job.StopRefresh()
  64. return job.GetHandler()(ctx, job)
  65. }
  66. func (q *goque) Kill(ctx context.Context, job QueJobInterface) error {
  67. return job.SetStatus(JobStatusKilled)
  68. }
  69. func (q *goque) Remove(ctx context.Context, job QueJobInterface) error {
  70. return job.SetStatus(JobStatusCancelled)
  71. }
  72. func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (QueJobInterface, error)) error {
  73. for i, _ := range jobDefs {
  74. jd := jobDefs[i]
  75. if jd.Handler == nil {
  76. panic(fmt.Sprintf("job %s handler is nil", jd.Name))
  77. }
  78. worker, err := que.NewWorker(que.WorkerOptions{
  79. Queue: "worker_" + jd.Name,
  80. Mutex: q.q.Mutex(),
  81. MaxLockPerSecond: 10,
  82. MaxBufferJobsCount: 0,
  83. MaxPerformPerSecond: 2,
  84. MaxConcurrentPerformCount: 1,
  85. Perform: func(ctx context.Context, qj que.Job) (err error) {
  86. var job QueJobInterface
  87. {
  88. var sid string
  89. err = q.parseArgs(qj.Plan().Args, &sid)
  90. if err != nil {
  91. return err
  92. }
  93. id, err := strconv.Atoi(sid)
  94. if err != nil {
  95. return err
  96. }
  97. job, err = getJob(uint(id))
  98. if err != nil {
  99. return err
  100. }
  101. }
  102. defer func() {
  103. if r := recover(); r != nil {
  104. job.AddLog(string(debug.Stack()))
  105. job.SetProgressText(fmt.Sprint(r))
  106. job.SetStatus(JobStatusException)
  107. panic(r)
  108. }
  109. }()
  110. if job.GetStatus() == JobStatusCancelled {
  111. return qj.Expire(ctx, errors.New("job is cancelled"))
  112. }
  113. if job.GetStatus() != JobStatusNew && job.GetStatus() != JobStatusScheduled {
  114. job.SetStatus(JobStatusKilled)
  115. return errors.New("invalid job status, current status: " + job.GetStatus())
  116. }
  117. err = job.SetStatus(JobStatusRunning)
  118. if err != nil {
  119. return err
  120. }
  121. hctx, cf := context.WithCancel(ctx)
  122. hDoneC := make(chan struct{})
  123. isAborted := false
  124. go func() {
  125. timer := time.NewTicker(time.Second)
  126. for {
  127. select {
  128. case <-hDoneC:
  129. return
  130. case <-timer.C:
  131. status, _ := job.FetchAndSetStatus()
  132. if status == JobStatusKilled {
  133. isAborted = true
  134. cf()
  135. return
  136. }
  137. }
  138. }
  139. }()
  140. err = q.run(hctx, job)
  141. if !isAborted {
  142. hDoneC <- struct{}{}
  143. }
  144. if err != nil {
  145. job.SetProgressText(err.Error())
  146. job.SetStatus(JobStatusException)
  147. return err
  148. }
  149. if isAborted {
  150. return qj.Expire(ctx, errors.New("manually aborted"))
  151. }
  152. err = job.SetStatus(JobStatusDone)
  153. if err != nil {
  154. return err
  155. }
  156. return qj.Done(ctx)
  157. },
  158. })
  159. if err != nil {
  160. panic(err)
  161. }
  162. q.wks = append(q.wks, worker)
  163. go func() {
  164. if err := worker.Run(); err != nil {
  165. q.db.Create(&GoQueError{
  166. Error: fmt.Sprintf("worker Run() error: %s", err.Error()),
  167. })
  168. }
  169. }()
  170. }
  171. return nil
  172. }
  173. func (q *goque) Shutdown(ctx context.Context) error {
  174. var errs error
  175. for _, wk := range q.wks {
  176. if err := wk.Stop(ctx); err != nil {
  177. errs = multierr.Append(errs, err)
  178. }
  179. }
  180. return errs
  181. }
  182. func (q *goque) parseArgs(data []byte, args ...interface{}) error {
  183. d := json.NewDecoder(bytes.NewReader(data))
  184. if _, err := d.Token(); err != nil {
  185. return err
  186. }
  187. for _, arg := range args {
  188. if err := d.Decode(arg); err != nil {
  189. return err
  190. }
  191. }
  192. return nil
  193. }