worker_mock_que.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package example_basics
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "runtime/debug"
  7. "time"
  8. "github.com/qor5/admin/presets"
  9. "github.com/qor5/admin/presets/gorm2op"
  10. "github.com/qor5/admin/worker"
  11. "github.com/qor5/admin/worker/mock"
  12. )
  13. func WorkerExampleMock(b *presets.Builder) {
  14. b.URIPrefix(WorkerExamplePath).
  15. DataOperator(gorm2op.DataOperator(DB))
  16. wb := worker.NewWithQueue(DB, Que)
  17. wb.Configure(b)
  18. addJobs(wb)
  19. wb.Listen()
  20. }
  21. const WorkerExamplePath = "/samples/worker"
  22. var Que = &mock.QueueMock{
  23. AddFunc: func(job worker.QueJobInterface) error {
  24. jobInfo, err := job.GetJobInfo()
  25. if err != nil {
  26. return err
  27. }
  28. if scheduler, ok := jobInfo.Argument.(worker.Scheduler); ok && scheduler.GetScheduleTime() != nil {
  29. job.SetStatus(worker.JobStatusScheduled)
  30. go func() {
  31. time.Sleep(scheduler.GetScheduleTime().Sub(time.Now()))
  32. ConsumeQueItem(job)
  33. }()
  34. } else {
  35. go func() {
  36. ConsumeQueItem(job)
  37. }()
  38. }
  39. return nil
  40. },
  41. KillFunc: func(job worker.QueJobInterface) error {
  42. return job.SetStatus(worker.JobStatusKilled)
  43. },
  44. ListenFunc: func(jobDefs []*worker.QorJobDefinition, getJob func(qorJobID uint) (worker.QueJobInterface, error)) error {
  45. return nil
  46. },
  47. RemoveFunc: func(job worker.QueJobInterface) error {
  48. return job.SetStatus(worker.JobStatusCancelled)
  49. },
  50. }
  51. func ConsumeQueItem(job worker.QueJobInterface) (err error) {
  52. defer func() {
  53. if r := recover(); r != nil {
  54. job.AddLog(string(debug.Stack()))
  55. job.SetProgressText(fmt.Sprint(r))
  56. job.SetStatus(worker.JobStatusException)
  57. job.StopRefresh()
  58. }
  59. }()
  60. if job.GetStatus() == worker.JobStatusCancelled {
  61. return
  62. }
  63. if job.GetStatus() != worker.JobStatusNew && job.GetStatus() != worker.JobStatusScheduled {
  64. job.SetStatus(worker.JobStatusKilled)
  65. return errors.New("invalid job status, current status: " + job.GetStatus())
  66. }
  67. err = job.SetStatus(worker.JobStatusRunning)
  68. if err != nil {
  69. return err
  70. }
  71. time.Sleep(100 * time.Millisecond)
  72. hctx, cf := context.WithCancel(context.Background())
  73. hDoneC := make(chan struct{})
  74. isAborted := false
  75. go func() {
  76. timer := time.NewTicker(time.Second)
  77. for {
  78. select {
  79. case <-hDoneC:
  80. return
  81. case <-timer.C:
  82. status, _ := job.FetchAndSetStatus()
  83. if status == worker.JobStatusKilled {
  84. isAborted = true
  85. cf()
  86. return
  87. }
  88. }
  89. }
  90. }()
  91. job.StartRefresh()
  92. err = job.GetHandler()(hctx, job)
  93. job.StopRefresh()
  94. if !isAborted {
  95. hDoneC <- struct{}{}
  96. }
  97. if err != nil {
  98. job.SetProgressText(err.Error())
  99. job.SetStatus(worker.JobStatusException)
  100. return err
  101. }
  102. if isAborted {
  103. return
  104. }
  105. err = job.SetStatus(worker.JobStatusDone)
  106. if err != nil {
  107. return err
  108. }
  109. return
  110. }