test_que.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package integration
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "runtime/debug"
  7. "time"
  8. "github.com/qor5/admin/worker"
  9. "github.com/qor5/admin/worker/mock"
  10. )
  11. var items []worker.QueJobInterface
  12. var Que = &mock.QueueMock{
  13. AddFunc: func(job worker.QueJobInterface) error {
  14. jobInfo, err := job.GetJobInfo()
  15. if err != nil {
  16. return err
  17. }
  18. if scheduler, ok := jobInfo.Argument.(worker.Scheduler); ok && scheduler.GetScheduleTime() != nil {
  19. job.SetStatus(worker.JobStatusScheduled)
  20. }
  21. items = append(items, job)
  22. return nil
  23. },
  24. KillFunc: func(job worker.QueJobInterface) error {
  25. return job.SetStatus(worker.JobStatusKilled)
  26. },
  27. ListenFunc: func(jobDefs []*worker.QorJobDefinition, getJob func(qorJobID uint) (worker.QueJobInterface, error)) error {
  28. return nil
  29. },
  30. RemoveFunc: func(job worker.QueJobInterface) error {
  31. return job.SetStatus(worker.JobStatusCancelled)
  32. },
  33. }
  34. func ConsumeQueItem() (err error) {
  35. if len(items) == 0 {
  36. return
  37. }
  38. job := items[0]
  39. items = items[1:]
  40. defer func() {
  41. if r := recover(); r != nil {
  42. job.AddLog(string(debug.Stack()))
  43. job.SetProgressText(fmt.Sprint(r))
  44. job.SetStatus(worker.JobStatusException)
  45. panic(r)
  46. }
  47. }()
  48. if job.GetStatus() == worker.JobStatusCancelled {
  49. return
  50. }
  51. if job.GetStatus() != worker.JobStatusNew && job.GetStatus() != worker.JobStatusScheduled {
  52. job.SetStatus(worker.JobStatusKilled)
  53. return errors.New("invalid job status, current status: " + job.GetStatus())
  54. }
  55. err = job.SetStatus(worker.JobStatusRunning)
  56. if err != nil {
  57. return err
  58. }
  59. hctx, cf := context.WithCancel(context.Background())
  60. hDoneC := make(chan struct{})
  61. isAborted := false
  62. go func() {
  63. timer := time.NewTicker(time.Second)
  64. for {
  65. select {
  66. case <-hDoneC:
  67. return
  68. case <-timer.C:
  69. status, _ := job.FetchAndSetStatus()
  70. if status == worker.JobStatusKilled {
  71. isAborted = true
  72. cf()
  73. return
  74. }
  75. }
  76. }
  77. }()
  78. job.StartRefresh()
  79. err = job.GetHandler()(hctx, job)
  80. job.StopRefresh()
  81. if !isAborted {
  82. hDoneC <- struct{}{}
  83. }
  84. if err != nil {
  85. job.SetProgressText(err.Error())
  86. job.SetStatus(worker.JobStatusException)
  87. return err
  88. }
  89. if isAborted {
  90. return
  91. }
  92. err = job.SetStatus(worker.JobStatusDone)
  93. if err != nil {
  94. return err
  95. }
  96. return
  97. }