test_que.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package integration
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "runtime/debug"
  7. "time"
  8. "github.com/qor5/admin/worker"
  9. "github.com/qor5/admin/worker/mock"
  10. )
  11. var items []worker.QueJobInterface
  12. var Que = &mock.QueueMock{
  13. AddFunc: func(ctx context.Context, job worker.QueJobInterface) error {
  14. jobInfo, err := job.GetJobInfo()
  15. if err != nil {
  16. return err
  17. }
  18. if scheduler, ok := jobInfo.Argument.(worker.Scheduler); ok && scheduler.GetScheduleTime() != nil {
  19. job.SetStatus(worker.JobStatusScheduled)
  20. }
  21. items = append(items, job)
  22. return nil
  23. },
  24. KillFunc: func(ctx context.Context, job worker.QueJobInterface) error {
  25. return job.SetStatus(worker.JobStatusKilled)
  26. },
  27. ListenFunc: func(jobDefs []*worker.QorJobDefinition, getJob func(qorJobID uint) (worker.QueJobInterface, error)) error {
  28. return nil
  29. },
  30. RemoveFunc: func(ctx context.Context, job worker.QueJobInterface) error {
  31. return job.SetStatus(worker.JobStatusCancelled)
  32. },
  33. ShutdownFunc: func(ctx context.Context) error {
  34. return nil
  35. },
  36. }
  37. func ConsumeQueItem() (err error) {
  38. if len(items) == 0 {
  39. return
  40. }
  41. job := items[0]
  42. items = items[1:]
  43. defer func() {
  44. if r := recover(); r != nil {
  45. job.AddLog(string(debug.Stack()))
  46. job.SetProgressText(fmt.Sprint(r))
  47. job.SetStatus(worker.JobStatusException)
  48. panic(r)
  49. }
  50. }()
  51. if job.GetStatus() == worker.JobStatusCancelled {
  52. return
  53. }
  54. if job.GetStatus() != worker.JobStatusNew && job.GetStatus() != worker.JobStatusScheduled {
  55. job.SetStatus(worker.JobStatusKilled)
  56. return errors.New("invalid job status, current status: " + job.GetStatus())
  57. }
  58. err = job.SetStatus(worker.JobStatusRunning)
  59. if err != nil {
  60. return err
  61. }
  62. hctx, cf := context.WithCancel(context.Background())
  63. hDoneC := make(chan struct{})
  64. isAborted := false
  65. go func() {
  66. timer := time.NewTicker(time.Second)
  67. for {
  68. select {
  69. case <-hDoneC:
  70. return
  71. case <-timer.C:
  72. status, _ := job.FetchAndSetStatus()
  73. if status == worker.JobStatusKilled {
  74. isAborted = true
  75. cf()
  76. return
  77. }
  78. }
  79. }
  80. }()
  81. job.StartRefresh()
  82. err = job.GetHandler()(hctx, job)
  83. job.StopRefresh()
  84. if !isAborted {
  85. hDoneC <- struct{}{}
  86. }
  87. if err != nil {
  88. job.SetProgressText(err.Error())
  89. job.SetStatus(worker.JobStatusException)
  90. return err
  91. }
  92. if isAborted {
  93. return
  94. }
  95. err = job.SetStatus(worker.JobStatusDone)
  96. if err != nil {
  97. return err
  98. }
  99. return
  100. }