Sfoglia il codice sorgente

worker: ctx for Queue methods

xuxin 1 anno fa
parent
commit
710fc71edc
7 ha cambiato i file con 137 aggiunte e 68 eliminazioni
  1. 2 2
      worker/action_job.go
  2. 11 10
      worker/builder.go
  3. 9 9
      worker/cron.go
  4. 6 6
      worker/goque.go
  5. 6 3
      worker/integration_test/test_que.go
  6. 97 34
      worker/mock/queue.go
  7. 6 4
      worker/queue.go

+ 2 - 2
worker/action_job.go

@@ -253,9 +253,9 @@ func (b *Builder) eventActionJobClose(ctx *web.EventContext) (er web.EventRespon
 
 	switch inst.Status {
 	case JobStatusRunning:
-		err = b.q.Kill(inst)
+		err = b.q.Kill(ctx.R.Context(), inst)
 	case JobStatusNew, JobStatusScheduled:
-		err = b.q.Remove(inst)
+		err = b.q.Remove(ctx.R.Context(), inst)
 	}
 
 	return er, err

+ 11 - 10
worker/builder.go

@@ -1,6 +1,7 @@
 package worker
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -392,8 +393,8 @@ func (b *Builder) Listen() {
 	}
 }
 
-func (b *Builder) Shutdown() error {
-	return b.q.Shutdown()
+func (b *Builder) Shutdown(ctx context.Context) error {
+	return b.q.Shutdown(ctx)
 }
 
 func (b *Builder) createJob(ctx *web.EventContext, qorJob *QorJob) (j *QorJob, err error) {
@@ -443,7 +444,7 @@ func (b *Builder) createJob(ctx *web.EventContext, qorJob *QorJob) (j *QorJob, e
 		if err != nil {
 			return err
 		}
-		return b.q.Add(inst)
+		return b.q.Add(ctx.R.Context(), inst)
 	})
 	return
 }
@@ -481,7 +482,7 @@ func (b *Builder) eventAbortJob(ctx *web.EventContext) (er web.EventResponse, er
 	}
 	isScheduled := inst.Status == JobStatusScheduled
 
-	err = b.doAbortJob(inst)
+	err = b.doAbortJob(ctx.R.Context(), inst)
 	if err != nil {
 		_, ok := err.(*cannotAbortError)
 		if !ok {
@@ -521,12 +522,12 @@ func (e *cannotAbortError) Error() string {
 	return e.err.Error()
 }
 
-func (b *Builder) doAbortJob(inst *QorJobInstance) (err error) {
+func (b *Builder) doAbortJob(ctx context.Context, inst *QorJobInstance) (err error) {
 	switch inst.Status {
 	case JobStatusRunning:
-		return b.q.Kill(inst)
+		return b.q.Kill(ctx, inst)
 	case JobStatusNew, JobStatusScheduled:
-		return b.q.Remove(inst)
+		return b.q.Remove(ctx, inst)
 	default:
 		return &cannotAbortError{
 			err: fmt.Errorf("job status is %s, cannot be aborted/canceled", inst.Status),
@@ -559,7 +560,7 @@ func (b *Builder) eventRerunJob(ctx *web.EventContext) (er web.EventResponse, er
 	if err != nil {
 		return er, err
 	}
-	err = b.q.Add(inst)
+	err = b.q.Add(ctx.R.Context(), inst)
 	if err != nil {
 		return er, err
 	}
@@ -608,7 +609,7 @@ func (b *Builder) eventUpdateJob(ctx *web.EventContext) (er web.EventResponse, e
 		return er, err
 	}
 	oldArgs, _ := jb.parseArgs(old.Args)
-	err = b.doAbortJob(old)
+	err = b.doAbortJob(ctx.R.Context(), old)
 	if err != nil {
 		_, ok := err.(*cannotAbortError)
 		if !ok {
@@ -628,7 +629,7 @@ func (b *Builder) eventUpdateJob(ctx *web.EventContext) (er web.EventResponse, e
 	if err != nil {
 		return er, err
 	}
-	err = b.q.Add(newInst)
+	err = b.q.Add(ctx.R.Context(), newInst)
 	if err != nil {
 		return er, err
 	}

+ 9 - 9
worker/cron.go

@@ -94,7 +94,7 @@ func (c *cron) writeCronJob() error {
 }
 
 // Add a job to cron queue
-func (c *cron) Add(job QueJobInterface) (err error) {
+func (c *cron) Add(ctx context.Context, job QueJobInterface) (err error) {
 	c.parseJobs()
 	defer c.writeCronJob()
 
@@ -135,7 +135,7 @@ func (c *cron) Add(job QueJobInterface) (err error) {
 }
 
 // Run a job from cron queue
-func (c *cron) run(qorJob QueJobInterface) (err error) {
+func (c *cron) run(ctx context.Context, qorJob QueJobInterface) (err error) {
 	jobInfo, err := qorJob.GetJobInfo()
 	if err != nil {
 		return err
@@ -166,7 +166,7 @@ func (c *cron) run(qorJob QueJobInterface) (err error) {
 	qorJob.StartRefresh()
 	defer qorJob.StopRefresh()
 
-	err = h(context.Background(), qorJob)
+	err = h(ctx, qorJob)
 	if err == nil {
 		c.parseJobs()
 		defer c.writeCronJob()
@@ -180,7 +180,7 @@ func (c *cron) run(qorJob QueJobInterface) (err error) {
 }
 
 // Kill a job from cron queue
-func (c *cron) Kill(job QueJobInterface) (err error) {
+func (c *cron) Kill(ctx context.Context, job QueJobInterface) (err error) {
 	c.parseJobs()
 	defer c.writeCronJob()
 
@@ -204,7 +204,7 @@ func (c *cron) Kill(job QueJobInterface) (err error) {
 }
 
 // Remove a job from cron queue
-func (c *cron) Remove(job QueJobInterface) error {
+func (c *cron) Remove(ctx context.Context, job QueJobInterface) error {
 	c.parseJobs()
 	defer c.writeCronJob()
 
@@ -242,7 +242,7 @@ func (c *cron) Listen(_ []*QorJobDefinition, getJob func(qorJobID uint) (QueJobI
 			fmt.Println(err)
 			os.Exit(1)
 		}
-		if err := c.doRunJob(job); err == nil {
+		if err := c.doRunJob(context.Background(), job); err == nil {
 			os.Exit(0)
 		} else {
 			fmt.Println(err)
@@ -253,7 +253,7 @@ func (c *cron) Listen(_ []*QorJobDefinition, getJob func(qorJobID uint) (QueJobI
 	return nil
 }
 
-func (c *cron) doRunJob(job QueJobInterface) error {
+func (c *cron) doRunJob(ctx context.Context, job QueJobInterface) error {
 	defer func() {
 		if r := recover(); r != nil {
 			job.AddLog(string(debug.Stack()))
@@ -267,7 +267,7 @@ func (c *cron) doRunJob(job QueJobInterface) error {
 	}
 
 	if err := job.SetStatus(JobStatusRunning); err == nil {
-		if err := c.run(job); err == nil {
+		if err := c.run(ctx, job); err == nil {
 			return job.SetStatus(JobStatusDone)
 		}
 
@@ -278,6 +278,6 @@ func (c *cron) doRunJob(job QueJobInterface) error {
 	return nil
 }
 
-func (c *cron) Shutdown() error {
+func (c *cron) Shutdown(ctx context.Context) error {
 	return nil
 }

+ 6 - 6
worker/goque.go

@@ -45,7 +45,7 @@ func NewGoQueQueue(db *gorm.DB) Queue {
 	}
 }
 
-func (q *goque) Add(job QueJobInterface) error {
+func (q *goque) Add(ctx context.Context, job QueJobInterface) error {
 	jobInfo, err := job.GetJobInfo()
 
 	if err != nil {
@@ -57,7 +57,7 @@ func (q *goque) Add(job QueJobInterface) error {
 		job.SetStatus(JobStatusScheduled)
 	}
 
-	_, err = q.q.Enqueue(context.Background(), nil, que.Plan{
+	_, err = q.q.Enqueue(ctx, nil, que.Plan{
 		Queue: "worker_" + jobInfo.JobName,
 		Args:  que.Args(jobInfo.JobID, jobInfo.Argument),
 		RunAt: runAt,
@@ -76,11 +76,11 @@ func (q *goque) run(ctx context.Context, job QueJobInterface) error {
 	return job.GetHandler()(ctx, job)
 }
 
-func (q *goque) Kill(job QueJobInterface) error {
+func (q *goque) Kill(ctx context.Context, job QueJobInterface) error {
 	return job.SetStatus(JobStatusKilled)
 }
 
-func (q *goque) Remove(job QueJobInterface) error {
+func (q *goque) Remove(ctx context.Context, job QueJobInterface) error {
 	return job.SetStatus(JobStatusCancelled)
 }
 
@@ -192,10 +192,10 @@ func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (
 	return nil
 }
 
-func (q *goque) Shutdown() error {
+func (q *goque) Shutdown(ctx context.Context) error {
 	var errs error
 	for _, wk := range q.wks {
-		if err := wk.Stop(context.Background()); err != nil {
+		if err := wk.Stop(ctx); err != nil {
 			errs = multierr.Append(errs, err)
 		}
 	}

+ 6 - 3
worker/integration_test/test_que.go

@@ -14,7 +14,7 @@ import (
 var items []worker.QueJobInterface
 
 var Que = &mock.QueueMock{
-	AddFunc: func(job worker.QueJobInterface) error {
+	AddFunc: func(ctx context.Context, job worker.QueJobInterface) error {
 		jobInfo, err := job.GetJobInfo()
 
 		if err != nil {
@@ -26,15 +26,18 @@ var Que = &mock.QueueMock{
 		items = append(items, job)
 		return nil
 	},
-	KillFunc: func(job worker.QueJobInterface) error {
+	KillFunc: func(ctx context.Context, job worker.QueJobInterface) error {
 		return job.SetStatus(worker.JobStatusKilled)
 	},
 	ListenFunc: func(jobDefs []*worker.QorJobDefinition, getJob func(qorJobID uint) (worker.QueJobInterface, error)) error {
 		return nil
 	},
-	RemoveFunc: func(job worker.QueJobInterface) error {
+	RemoveFunc: func(ctx context.Context, job worker.QueJobInterface) error {
 		return job.SetStatus(worker.JobStatusCancelled)
 	},
+	ShutdownFunc: func(ctx context.Context) error {
+		return nil
+	},
 }
 
 func ConsumeQueItem() (err error) {

+ 97 - 34
worker/mock/queue.go

@@ -4,6 +4,7 @@
 package mock
 
 import (
+	"context"
 	"github.com/qor5/admin/worker"
 	"sync"
 )
@@ -18,18 +19,21 @@ var _ worker.Queue = &QueueMock{}
 //
 //		// make and configure a mocked worker.Queue
 //		mockedQueue := &QueueMock{
-//			AddFunc: func(queJobInterface worker.QueJobInterface) error {
+//			AddFunc: func(ctx context.Context, job worker.QueJobInterface) error {
 //				panic("mock out the Add method")
 //			},
-//			KillFunc: func(queJobInterface worker.QueJobInterface) error {
+//			KillFunc: func(ctx context.Context, job worker.QueJobInterface) error {
 //				panic("mock out the Kill method")
 //			},
 //			ListenFunc: func(jobDefs []*worker.QorJobDefinition, getJob func(qorJobID uint) (worker.QueJobInterface, error)) error {
 //				panic("mock out the Listen method")
 //			},
-//			RemoveFunc: func(queJobInterface worker.QueJobInterface) error {
+//			RemoveFunc: func(ctx context.Context, job worker.QueJobInterface) error {
 //				panic("mock out the Remove method")
 //			},
+//			ShutdownFunc: func(ctx context.Context) error {
+//				panic("mock out the Shutdown method")
+//			},
 //		}
 //
 //		// use mockedQueue in code that requires worker.Queue
@@ -38,28 +42,35 @@ var _ worker.Queue = &QueueMock{}
 //	}
 type QueueMock struct {
 	// AddFunc mocks the Add method.
-	AddFunc func(queJobInterface worker.QueJobInterface) error
+	AddFunc func(ctx context.Context, job worker.QueJobInterface) error
 
 	// KillFunc mocks the Kill method.
-	KillFunc func(queJobInterface worker.QueJobInterface) error
+	KillFunc func(ctx context.Context, job worker.QueJobInterface) error
 
 	// ListenFunc mocks the Listen method.
 	ListenFunc func(jobDefs []*worker.QorJobDefinition, getJob func(qorJobID uint) (worker.QueJobInterface, error)) error
 
 	// RemoveFunc mocks the Remove method.
-	RemoveFunc func(queJobInterface worker.QueJobInterface) error
+	RemoveFunc func(ctx context.Context, job worker.QueJobInterface) error
+
+	// ShutdownFunc mocks the Shutdown method.
+	ShutdownFunc func(ctx context.Context) error
 
 	// calls tracks calls to the methods.
 	calls struct {
 		// Add holds details about calls to the Add method.
 		Add []struct {
-			// QueJobInterface is the queJobInterface argument value.
-			QueJobInterface worker.QueJobInterface
+			// Ctx is the ctx argument value.
+			Ctx context.Context
+			// Job is the job argument value.
+			Job worker.QueJobInterface
 		}
 		// Kill holds details about calls to the Kill method.
 		Kill []struct {
-			// QueJobInterface is the queJobInterface argument value.
-			QueJobInterface worker.QueJobInterface
+			// Ctx is the ctx argument value.
+			Ctx context.Context
+			// Job is the job argument value.
+			Job worker.QueJobInterface
 		}
 		// Listen holds details about calls to the Listen method.
 		Listen []struct {
@@ -70,30 +81,40 @@ type QueueMock struct {
 		}
 		// Remove holds details about calls to the Remove method.
 		Remove []struct {
-			// QueJobInterface is the queJobInterface argument value.
-			QueJobInterface worker.QueJobInterface
+			// Ctx is the ctx argument value.
+			Ctx context.Context
+			// Job is the job argument value.
+			Job worker.QueJobInterface
+		}
+		// Shutdown holds details about calls to the Shutdown method.
+		Shutdown []struct {
+			// Ctx is the ctx argument value.
+			Ctx context.Context
 		}
 	}
-	lockAdd    sync.RWMutex
-	lockKill   sync.RWMutex
-	lockListen sync.RWMutex
-	lockRemove sync.RWMutex
+	lockAdd      sync.RWMutex
+	lockKill     sync.RWMutex
+	lockListen   sync.RWMutex
+	lockRemove   sync.RWMutex
+	lockShutdown sync.RWMutex
 }
 
 // Add calls AddFunc.
-func (mock *QueueMock) Add(queJobInterface worker.QueJobInterface) error {
+func (mock *QueueMock) Add(ctx context.Context, job worker.QueJobInterface) error {
 	if mock.AddFunc == nil {
 		panic("QueueMock.AddFunc: method is nil but Queue.Add was just called")
 	}
 	callInfo := struct {
-		QueJobInterface worker.QueJobInterface
+		Ctx context.Context
+		Job worker.QueJobInterface
 	}{
-		QueJobInterface: queJobInterface,
+		Ctx: ctx,
+		Job: job,
 	}
 	mock.lockAdd.Lock()
 	mock.calls.Add = append(mock.calls.Add, callInfo)
 	mock.lockAdd.Unlock()
-	return mock.AddFunc(queJobInterface)
+	return mock.AddFunc(ctx, job)
 }
 
 // AddCalls gets all the calls that were made to Add.
@@ -101,10 +122,12 @@ func (mock *QueueMock) Add(queJobInterface worker.QueJobInterface) error {
 //
 //	len(mockedQueue.AddCalls())
 func (mock *QueueMock) AddCalls() []struct {
-	QueJobInterface worker.QueJobInterface
+	Ctx context.Context
+	Job worker.QueJobInterface
 } {
 	var calls []struct {
-		QueJobInterface worker.QueJobInterface
+		Ctx context.Context
+		Job worker.QueJobInterface
 	}
 	mock.lockAdd.RLock()
 	calls = mock.calls.Add
@@ -113,19 +136,21 @@ func (mock *QueueMock) AddCalls() []struct {
 }
 
 // Kill calls KillFunc.
-func (mock *QueueMock) Kill(queJobInterface worker.QueJobInterface) error {
+func (mock *QueueMock) Kill(ctx context.Context, job worker.QueJobInterface) error {
 	if mock.KillFunc == nil {
 		panic("QueueMock.KillFunc: method is nil but Queue.Kill was just called")
 	}
 	callInfo := struct {
-		QueJobInterface worker.QueJobInterface
+		Ctx context.Context
+		Job worker.QueJobInterface
 	}{
-		QueJobInterface: queJobInterface,
+		Ctx: ctx,
+		Job: job,
 	}
 	mock.lockKill.Lock()
 	mock.calls.Kill = append(mock.calls.Kill, callInfo)
 	mock.lockKill.Unlock()
-	return mock.KillFunc(queJobInterface)
+	return mock.KillFunc(ctx, job)
 }
 
 // KillCalls gets all the calls that were made to Kill.
@@ -133,10 +158,12 @@ func (mock *QueueMock) Kill(queJobInterface worker.QueJobInterface) error {
 //
 //	len(mockedQueue.KillCalls())
 func (mock *QueueMock) KillCalls() []struct {
-	QueJobInterface worker.QueJobInterface
+	Ctx context.Context
+	Job worker.QueJobInterface
 } {
 	var calls []struct {
-		QueJobInterface worker.QueJobInterface
+		Ctx context.Context
+		Job worker.QueJobInterface
 	}
 	mock.lockKill.RLock()
 	calls = mock.calls.Kill
@@ -181,19 +208,21 @@ func (mock *QueueMock) ListenCalls() []struct {
 }
 
 // Remove calls RemoveFunc.
-func (mock *QueueMock) Remove(queJobInterface worker.QueJobInterface) error {
+func (mock *QueueMock) Remove(ctx context.Context, job worker.QueJobInterface) error {
 	if mock.RemoveFunc == nil {
 		panic("QueueMock.RemoveFunc: method is nil but Queue.Remove was just called")
 	}
 	callInfo := struct {
-		QueJobInterface worker.QueJobInterface
+		Ctx context.Context
+		Job worker.QueJobInterface
 	}{
-		QueJobInterface: queJobInterface,
+		Ctx: ctx,
+		Job: job,
 	}
 	mock.lockRemove.Lock()
 	mock.calls.Remove = append(mock.calls.Remove, callInfo)
 	mock.lockRemove.Unlock()
-	return mock.RemoveFunc(queJobInterface)
+	return mock.RemoveFunc(ctx, job)
 }
 
 // RemoveCalls gets all the calls that were made to Remove.
@@ -201,13 +230,47 @@ func (mock *QueueMock) Remove(queJobInterface worker.QueJobInterface) error {
 //
 //	len(mockedQueue.RemoveCalls())
 func (mock *QueueMock) RemoveCalls() []struct {
-	QueJobInterface worker.QueJobInterface
+	Ctx context.Context
+	Job worker.QueJobInterface
 } {
 	var calls []struct {
-		QueJobInterface worker.QueJobInterface
+		Ctx context.Context
+		Job worker.QueJobInterface
 	}
 	mock.lockRemove.RLock()
 	calls = mock.calls.Remove
 	mock.lockRemove.RUnlock()
 	return calls
 }
+
+// Shutdown calls ShutdownFunc.
+func (mock *QueueMock) Shutdown(ctx context.Context) error {
+	if mock.ShutdownFunc == nil {
+		panic("QueueMock.ShutdownFunc: method is nil but Queue.Shutdown was just called")
+	}
+	callInfo := struct {
+		Ctx context.Context
+	}{
+		Ctx: ctx,
+	}
+	mock.lockShutdown.Lock()
+	mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo)
+	mock.lockShutdown.Unlock()
+	return mock.ShutdownFunc(ctx)
+}
+
+// ShutdownCalls gets all the calls that were made to Shutdown.
+// Check the length with:
+//
+//	len(mockedQueue.ShutdownCalls())
+func (mock *QueueMock) ShutdownCalls() []struct {
+	Ctx context.Context
+} {
+	var calls []struct {
+		Ctx context.Context
+	}
+	mock.lockShutdown.RLock()
+	calls = mock.calls.Shutdown
+	mock.lockShutdown.RUnlock()
+	return calls
+}

+ 6 - 4
worker/queue.go

@@ -1,5 +1,7 @@
 package worker
 
+import "context"
+
 //go:generate moq -pkg mock -out mock/queue.go . Queue
 
 type QorJobDefinition struct {
@@ -8,9 +10,9 @@ type QorJobDefinition struct {
 }
 
 type Queue interface {
-	Add(QueJobInterface) error
-	Kill(QueJobInterface) error
-	Remove(QueJobInterface) error
+	Add(ctx context.Context, job QueJobInterface) error
+	Kill(ctx context.Context, job QueJobInterface) error
+	Remove(ctx context.Context, job QueJobInterface) error
 	Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (QueJobInterface, error)) error
-	Shutdown() error
+	Shutdown(ctx context.Context) error
 }