123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package worker
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "runtime/debug"
- "strconv"
- "time"
- "github.com/tnclong/go-que"
- "github.com/tnclong/go-que/pg"
- "go.uber.org/multierr"
- "gorm.io/gorm"
- )
- type goque struct {
- q que.Queue
- db *gorm.DB
- wks []*que.Worker
- }
- func NewGoQueQueue(db *gorm.DB) Queue {
- if db == nil {
- panic("db can not be nil")
- }
- var q que.Queue
- {
- rdb, err := db.DB()
- if err != nil {
- panic(err)
- }
- q, err = pg.New(rdb)
- if err != nil {
- panic(err)
- }
- }
- return &goque{
- q: q,
- db: db,
- }
- }
- func (q *goque) Add(ctx context.Context, job QueJobInterface) error {
- jobInfo, err := job.GetJobInfo()
- if err != nil {
- return err
- }
- runAt := time.Now()
- if scheduler, ok := jobInfo.Argument.(Scheduler); ok && scheduler.GetScheduleTime() != nil {
- runAt = scheduler.GetScheduleTime().In(time.Local)
- job.SetStatus(JobStatusScheduled)
- }
- _, err = q.q.Enqueue(ctx, nil, que.Plan{
- Queue: "worker_" + jobInfo.JobName,
- Args: que.Args(jobInfo.JobID, jobInfo.Argument),
- RunAt: runAt,
- })
- if err != nil {
- return err
- }
- return nil
- }
- func (q *goque) run(ctx context.Context, job QueJobInterface) error {
- job.StartRefresh()
- defer job.StopRefresh()
- return job.GetHandler()(ctx, job)
- }
- func (q *goque) Kill(ctx context.Context, job QueJobInterface) error {
- return job.SetStatus(JobStatusKilled)
- }
- func (q *goque) Remove(ctx context.Context, job QueJobInterface) error {
- return job.SetStatus(JobStatusCancelled)
- }
- func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (QueJobInterface, error)) error {
- for i, _ := range jobDefs {
- jd := jobDefs[i]
- if jd.Handler == nil {
- panic(fmt.Sprintf("job %s handler is nil", jd.Name))
- }
- worker, err := que.NewWorker(que.WorkerOptions{
- Queue: "worker_" + jd.Name,
- Mutex: q.q.Mutex(),
- MaxLockPerSecond: 10,
- MaxBufferJobsCount: 0,
- MaxPerformPerSecond: 2,
- MaxConcurrentPerformCount: 1,
- Perform: func(ctx context.Context, qj que.Job) (err error) {
- var job QueJobInterface
- {
- var sid string
- err = q.parseArgs(qj.Plan().Args, &sid)
- if err != nil {
- return err
- }
- id, err := strconv.Atoi(sid)
- if err != nil {
- return err
- }
- job, err = getJob(uint(id))
- if err != nil {
- return err
- }
- }
- defer func() {
- if r := recover(); r != nil {
- job.AddLog(string(debug.Stack()))
- job.SetProgressText(fmt.Sprint(r))
- job.SetStatus(JobStatusException)
- panic(r)
- }
- }()
- if job.GetStatus() == JobStatusCancelled {
- return qj.Expire(ctx, errors.New("job is cancelled"))
- }
- if job.GetStatus() != JobStatusNew && job.GetStatus() != JobStatusScheduled {
- job.SetStatus(JobStatusKilled)
- return errors.New("invalid job status, current status: " + job.GetStatus())
- }
- err = job.SetStatus(JobStatusRunning)
- if err != nil {
- return err
- }
- hctx, cf := context.WithCancel(ctx)
- hDoneC := make(chan struct{})
- isAborted := false
- go func() {
- timer := time.NewTicker(time.Second)
- for {
- select {
- case <-hDoneC:
- return
- case <-timer.C:
- status, _ := job.FetchAndSetStatus()
- if status == JobStatusKilled {
- isAborted = true
- cf()
- return
- }
- }
- }
- }()
- err = q.run(hctx, job)
- if !isAborted {
- hDoneC <- struct{}{}
- }
- if err != nil {
- job.SetProgressText(err.Error())
- job.SetStatus(JobStatusException)
- return err
- }
- if isAborted {
- return qj.Expire(ctx, errors.New("manually aborted"))
- }
- err = job.SetStatus(JobStatusDone)
- if err != nil {
- return err
- }
- return qj.Done(ctx)
- },
- })
- if err != nil {
- panic(err)
- }
- q.wks = append(q.wks, worker)
- go func() {
- if err := worker.Run(); err != nil {
- q.db.Create(&GoQueError{
- Error: fmt.Sprintf("worker Run() error: %s", err.Error()),
- })
- }
- }()
- }
- return nil
- }
- func (q *goque) Shutdown(ctx context.Context) error {
- var errs error
- for _, wk := range q.wks {
- if err := wk.Stop(ctx); err != nil {
- errs = multierr.Append(errs, err)
- }
- }
- return errs
- }
- func (q *goque) parseArgs(data []byte, args ...interface{}) error {
- d := json.NewDecoder(bytes.NewReader(data))
- if _, err := d.Token(); err != nil {
- return err
- }
- for _, arg := range args {
- if err := d.Decode(arg); err != nil {
- return err
- }
- }
- return nil
- }
|