123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package worker
- import (
- "context"
- "encoding/json"
- "errors"
- "flag"
- "fmt"
- "os"
- "os/exec"
- "os/signal"
- "path/filepath"
- "reflect"
- "runtime/debug"
- "strconv"
- "strings"
- "sync"
- "syscall"
- "time"
- )
- type cronJob struct {
- JobID string
- Pid int
- Command string
- Delete bool `json:"-"`
- }
- func (job cronJob) ToString() string {
- marshal, _ := json.Marshal(job)
- return fmt.Sprintf("## BEGIN QOR JOB %v # %v\n%v\n## END QOR JOB\n", job.JobID, string(marshal), job.Command)
- }
- // Cron implemented a worker Queue based on cronjob
- type cron struct {
- Jobs []*cronJob
- CronJobs []string
- mutex sync.Mutex `sql:"-"`
- }
- // NewCronQueue initialize a Cron queue
- func NewCronQueue() Queue {
- return &cron{}
- }
- func (c *cron) parseJobs() []*cronJob {
- c.mutex.Lock()
- c.Jobs = []*cronJob{}
- c.CronJobs = []string{}
- if out, err := exec.Command("crontab", "-l").Output(); err == nil {
- var inQorJob bool
- for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
- if strings.HasPrefix(line, "## BEGIN QOR JOB") {
- inQorJob = true
- if idx := strings.Index(line, "{"); idx > 1 {
- var job cronJob
- if json.Unmarshal([]byte(line[idx-1:]), &job) == nil {
- c.Jobs = append(c.Jobs, &job)
- }
- }
- }
- if !inQorJob {
- c.CronJobs = append(c.CronJobs, line)
- }
- if strings.HasPrefix(line, "## END QOR JOB") {
- inQorJob = false
- }
- }
- }
- return c.Jobs
- }
- func (c *cron) writeCronJob() error {
- defer c.mutex.Unlock()
- cmd := exec.Command("crontab", "-")
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- stdin, _ := cmd.StdinPipe()
- for _, cronJob := range c.CronJobs {
- stdin.Write([]byte(cronJob + "\n"))
- }
- for _, job := range c.Jobs {
- if !job.Delete {
- stdin.Write([]byte(job.ToString() + "\n"))
- }
- }
- stdin.Close()
- return cmd.Run()
- }
- // Add a job to cron queue
- func (c *cron) Add(ctx context.Context, job QueJobInterface) (err error) {
- c.parseJobs()
- defer c.writeCronJob()
- jobInfo, err := job.GetJobInfo()
- if err != nil {
- return err
- }
- var binaryFile string
- if binaryFile, err = filepath.Abs(os.Args[0]); err == nil {
- var jobs []*cronJob
- for _, cronJob := range c.Jobs {
- if cronJob.JobID != jobInfo.JobID {
- jobs = append(jobs, cronJob)
- }
- }
- if scheduler, ok := jobInfo.Argument.(Scheduler); ok && scheduler.GetScheduleTime() != nil {
- scheduleTime := scheduler.GetScheduleTime().In(time.Local)
- job.SetStatus(JobStatusScheduled)
- currentPath, _ := os.Getwd()
- jobs = append(jobs, &cronJob{
- JobID: jobInfo.JobID,
- Command: fmt.Sprintf("%d %d %d %d * cd %v; %v --qor-job %v\n", scheduleTime.Minute(), scheduleTime.Hour(), scheduleTime.Day(), scheduleTime.Month(), currentPath, binaryFile, jobInfo.JobID),
- })
- } else {
- cmd := exec.Command(binaryFile, "--qor-job", jobInfo.JobID)
- if err = cmd.Start(); err == nil {
- jobs = append(jobs, &cronJob{JobID: jobInfo.JobID, Pid: cmd.Process.Pid})
- cmd.Process.Release()
- }
- }
- c.Jobs = jobs
- }
- return
- }
- // Run a job from cron queue
- func (c *cron) run(ctx context.Context, qorJob QueJobInterface) (err error) {
- jobInfo, err := qorJob.GetJobInfo()
- if err != nil {
- return err
- }
- h := qorJob.GetHandler()
- if h == nil {
- panic(fmt.Sprintf("job %v no handler", jobInfo.JobName))
- }
- go func() {
- sigint := make(chan os.Signal, 1)
- // interrupt signal sent from terminal
- signal.Notify(sigint, syscall.SIGINT)
- // sigterm signal sent from kubernetes
- signal.Notify(sigint, syscall.SIGTERM)
- i := <-sigint
- qorJob.SetProgressText(fmt.Sprintf("Worker killed by signal %s", i.String()))
- qorJob.SetStatus(JobStatusKilled)
- qorJob.StopRefresh()
- os.Exit(int(reflect.ValueOf(i).Int()))
- }()
- qorJob.StartRefresh()
- defer qorJob.StopRefresh()
- err = h(ctx, qorJob)
- if err == nil {
- c.parseJobs()
- defer c.writeCronJob()
- for _, cronJob := range c.Jobs {
- if cronJob.JobID == jobInfo.JobID {
- cronJob.Delete = true
- }
- }
- }
- return err
- }
- // Kill a job from cron queue
- func (c *cron) Kill(ctx context.Context, job QueJobInterface) (err error) {
- c.parseJobs()
- defer c.writeCronJob()
- jobInfo, err := job.GetJobInfo()
- if err != nil {
- return err
- }
- for _, cronJob := range c.Jobs {
- if cronJob.JobID == jobInfo.JobID {
- if process, err := os.FindProcess(cronJob.Pid); err == nil {
- if err = process.Kill(); err == nil {
- cronJob.Delete = true
- return job.SetStatus(JobStatusKilled)
- }
- }
- return err
- }
- }
- return errors.New("failed to find job")
- }
- // Remove a job from cron queue
- func (c *cron) Remove(ctx context.Context, job QueJobInterface) error {
- c.parseJobs()
- defer c.writeCronJob()
- jobInfo, err := job.GetJobInfo()
- if err != nil {
- return err
- }
- for _, cronJob := range c.Jobs {
- if cronJob.JobID == jobInfo.JobID {
- if cronJob.Pid == 0 {
- cronJob.Delete = true
- return job.SetStatus(JobStatusKilled)
- }
- return errors.New("failed to remove current job as it is running")
- }
- }
- return errors.New("failed to find job")
- }
- func (c *cron) Listen(_ []*QorJobDefinition, getJob func(qorJobID uint) (QueJobInterface, error)) error {
- cmdLine := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
- qorJobID := cmdLine.String("qor-job", "", "Qor Job ID")
- cmdLine.Parse(os.Args[1:])
- if *qorJobID != "" {
- id, err := strconv.ParseUint(*qorJobID, 10, 64)
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- job, err := getJob(uint(id))
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- if err := c.doRunJob(context.Background(), job); err == nil {
- os.Exit(0)
- } else {
- fmt.Println(err)
- os.Exit(1)
- }
- }
- return nil
- }
- func (c *cron) doRunJob(ctx context.Context, job QueJobInterface) error {
- defer func() {
- if r := recover(); r != nil {
- job.AddLog(string(debug.Stack()))
- job.SetProgressText(fmt.Sprint(r))
- job.SetStatus(JobStatusException)
- }
- }()
- if job.GetStatus() != JobStatusNew && job.GetStatus() != JobStatusScheduled {
- return errors.New("invalid job status, current status: " + job.GetStatus())
- }
- if err := job.SetStatus(JobStatusRunning); err == nil {
- if err := c.run(ctx, job); err == nil {
- return job.SetStatus(JobStatusDone)
- }
- job.SetProgressText(err.Error())
- job.SetStatus(JobStatusException)
- }
- return nil
- }
- func (c *cron) Shutdown(ctx context.Context) error {
- return nil
- }
|