Jelajahi Sumber

worker: add Shutdown

xuxin 1 tahun lalu
induk
melakukan
32db71c9cb
4 mengubah file dengan 26 tambahan dan 7 penghapusan
  1. 4 0
      worker/builder.go
  2. 4 0
      worker/cron.go
  3. 17 7
      worker/goque.go
  4. 1 0
      worker/queue.go

+ 4 - 0
worker/builder.go

@@ -392,6 +392,10 @@ func (b *Builder) Listen() {
 	}
 }
 
+func (b *Builder) Shutdown() error {
+	return b.q.Shutdown()
+}
+
 func (b *Builder) createJob(ctx *web.EventContext, qorJob *QorJob) (j *QorJob, err error) {
 	if err = editIsAllowed(ctx.R, qorJob.Job); err != nil {
 		return

+ 4 - 0
worker/cron.go

@@ -277,3 +277,7 @@ func (c *cron) doRunJob(job QueJobInterface) error {
 
 	return nil
 }
+
+func (c *cron) Shutdown() error {
+	return nil
+}

+ 17 - 7
worker/goque.go

@@ -12,12 +12,14 @@ import (
 
 	"github.com/tnclong/go-que"
 	"github.com/tnclong/go-que/pg"
+	"go.uber.org/multierr"
 	"gorm.io/gorm"
 )
 
 type goque struct {
-	q  que.Queue
-	db *gorm.DB
+	q   que.Queue
+	db  *gorm.DB
+	wks []*que.Worker
 }
 
 func NewGoQueQueue(db *gorm.DB) Queue {
@@ -135,7 +137,7 @@ func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (
 					return err
 				}
 
-				hctx, cf := context.WithCancel(context.Background())
+				hctx, cf := context.WithCancel(ctx)
 				hDoneC := make(chan struct{})
 				isAborted := false
 				go func() {
@@ -177,13 +179,11 @@ func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (
 		if err != nil {
 			panic(err)
 		}
-
+		q.wks = append(q.wks, worker)
 		go func() {
 			if err := worker.Run(); err != nil {
-				errStr := fmt.Sprintf("worker Run() error: %s", err.Error())
-				fmt.Println(errStr)
 				q.db.Create(&GoQueError{
-					Error: errStr,
+					Error: fmt.Sprintf("worker Run() error: %s", err.Error()),
 				})
 			}
 		}()
@@ -192,6 +192,16 @@ func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (
 	return nil
 }
 
+func (q *goque) Shutdown() error {
+	var errs error
+	for _, wk := range q.wks {
+		if err := wk.Stop(context.Background()); err != nil {
+			errs = multierr.Append(errs, err)
+		}
+	}
+	return errs
+}
+
 func (q *goque) parseArgs(data []byte, args ...interface{}) error {
 	d := json.NewDecoder(bytes.NewReader(data))
 	if _, err := d.Token(); err != nil {

+ 1 - 0
worker/queue.go

@@ -12,4 +12,5 @@ type Queue interface {
 	Kill(QueJobInterface) error
 	Remove(QueJobInterface) error
 	Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (QueJobInterface, error)) error
+	Shutdown() error
 }