123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- package worker
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "log"
- "net/http"
- "reflect"
- "strings"
- "time"
- "github.com/qor5/admin/presets"
- vx "github.com/qor5/ui/vuetifyx"
- "github.com/qor5/web"
- "github.com/qor5/x/i18n"
- . "github.com/theplant/htmlgo"
- "gorm.io/gorm"
- )
- //go:generate moq -pkg mock -out mock/qor_job.go . QorJobInterface
- type JobBuilder struct {
- b *Builder
- name string
- r interface{}
- rmb *presets.ModelBuilder
- h JobHandler
- contextHandler func(*web.EventContext) map[string]interface{} //optional
- global bool
- }
- func newJob(b *Builder, name string) *JobBuilder {
- if b == nil {
- panic("builder is nil")
- }
- if strings.TrimSpace(name) == "" {
- panic("name is empty")
- }
- return &JobBuilder{
- b: b,
- name: name,
- global: true,
- }
- }
- type JobHandler func(context.Context, QorJobInterface) error
- // r should be ptr to struct
- func (jb *JobBuilder) Resource(r interface{}) *JobBuilder {
- {
- v := reflect.TypeOf(r)
- if v.Kind() != reflect.Ptr {
- panic("resource is not ptr to struct")
- }
- if v.Elem().Kind() != reflect.Struct {
- panic("resource is not ptr to struct")
- }
- }
- jb.r = r
- jb.rmb = jb.b.jpb.Model(r)
- if _, ok := r.(Scheduler); ok {
- jb.rmb.Editing().Field("ScheduleTime").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) HTMLComponent {
- msgr := i18n.MustGetModuleMessages(ctx.R, I18nWorkerKey, Messages_en_US).(*Messages)
- t := obj.(Scheduler).GetScheduleTime()
- var v string
- if t != nil {
- v = t.Local().Format("2006-01-02 15:04")
- }
- return vx.VXDateTimePicker().FieldName(field.Name).Label(msgr.ScheduleTime).
- Value(v).
- TimePickerProps(vx.TimePickerProps{
- Format: "24hr",
- Scrollable: true,
- }).
- ClearText(msgr.DateTimePickerClearText).OkText(msgr.DateTimePickerOkText)
- }).SetterFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) (err error) {
- v := ctx.R.Form.Get(field.Name)
- if v == "" {
- return nil
- }
- t, err := time.ParseInLocation("2006-01-02 15:04", v, time.Local)
- if err != nil {
- return err
- }
- obj.(Scheduler).SetScheduleTime(&t)
- return nil
- })
- }
- return jb
- }
- func (jb *JobBuilder) GetResourceBuilder() *presets.ModelBuilder {
- return jb.rmb
- }
- func (jb *JobBuilder) Handler(h JobHandler) *JobBuilder {
- jb.h = h
- return jb
- }
- func (jb *JobBuilder) ContextHandler(handler func(*web.EventContext) map[string]interface{}) *JobBuilder {
- jb.contextHandler = handler
- return jb
- }
- func (jb *JobBuilder) newResourceObject() interface{} {
- if jb.r == nil {
- return nil
- }
- return reflect.New(reflect.TypeOf(jb.r).Elem()).Interface()
- }
- func (jb *JobBuilder) unmarshalForm(ctx *web.EventContext) (args interface{}, vErr web.ValidationErrors) {
- args = jb.newResourceObject()
- if args != nil {
- vErr = jb.rmb.Editing().RunSetterFunc(ctx, false, args)
- }
- return args, vErr
- }
- func (jb *JobBuilder) parseArgs(in string) (args interface{}, err error) {
- if jb.r == nil {
- return nil, nil
- }
- args = jb.newResourceObject()
- err = json.Unmarshal([]byte(in), args)
- if err != nil {
- return nil, err
- }
- return args, nil
- }
- func getModelQorJobInstance(db *gorm.DB, qorJobID uint) (*QorJobInstance, error) {
- var insts []*QorJobInstance
- err := db.Where("qor_job_id = ?", qorJobID).
- Order("created_at desc").
- Limit(1).
- Find(&insts).
- Error
- if err != nil {
- return nil, err
- }
- if len(insts) == 0 {
- return nil, errors.New("no qor job instance")
- }
- return insts[0], nil
- }
- func (jb *JobBuilder) getJobInstance(qorJobID uint) (*QorJobInstance, error) {
- inst, err := getModelQorJobInstance(jb.b.db, qorJobID)
- if err != nil {
- return nil, err
- }
- inst.jb = jb
- return inst, nil
- }
- func (jb *JobBuilder) newJobInstance(
- r *http.Request,
- qorJobID uint,
- qorJobName string,
- args interface{},
- context interface{},
- ) (*QorJobInstance, error) {
- var mArgs string
- if v, ok := args.(string); ok {
- mArgs = v
- } else {
- bArgs, err := json.Marshal(args)
- if err != nil {
- return nil, err
- }
- mArgs = string(bArgs)
- }
- var ctx string
- if v, ok := context.(string); ok {
- ctx = v
- } else {
- bArgs, err := json.Marshal(context)
- if err != nil {
- return nil, err
- }
- ctx = string(bArgs)
- }
- inst := QorJobInstance{
- QorJobID: qorJobID,
- Args: mArgs,
- Context: ctx,
- Job: qorJobName,
- Status: JobStatusNew,
- }
- if jb.b.getCurrentUserIDFunc != nil {
- inst.Operator = jb.b.getCurrentUserIDFunc(r)
- }
- err := jb.b.db.Create(&inst).Error
- if err != nil {
- return nil, err
- }
- return jb.getJobInstance(qorJobID)
- }
- type QueJobInterface interface {
- QorJobInterface
- GetStatus() string
- FetchAndSetStatus() (string, error)
- SetStatus(string) error
- StartRefresh()
- StopRefresh()
- GetHandler() JobHandler
- }
- type JobInfo struct {
- JobID string
- JobName string
- Operator string
- Argument interface{}
- Context map[string]interface{}
- }
- // for job handler
- type QorJobInterface interface {
- GetJobInfo() (*JobInfo, error)
- SetProgress(uint) error
- SetProgressText(string) error
- AddLog(string) error
- AddLogf(format string, a ...interface{}) error
- }
- var _ QueJobInterface = (*QorJobInstance)(nil)
- func (job *QorJobInstance) GetJobInfo() (ji *JobInfo, err error) {
- arg, err := job.getArgument()
- if err != nil {
- return
- }
- context, err := job.getContext()
- if err != nil {
- return
- }
- return &JobInfo{
- JobID: fmt.Sprint(job.QorJobID),
- JobName: job.Job,
- Operator: job.Operator,
- Argument: arg,
- Context: context,
- }, nil
- }
- func (job *QorJobInstance) GetStatus() string {
- return job.Status
- }
- func (job *QorJobInstance) FetchAndSetStatus() (string, error) {
- var status string
- {
- db, err := job.jb.b.db.DB()
- if err != nil {
- return job.Status, err
- }
- err = db.QueryRow("select status from qor_job_instances where id = $1", job.ID).Scan(&status)
- if err != nil {
- return job.Status, err
- }
- if status == "" {
- return job.Status, errors.New("failed to fetch qor_job_instance status")
- }
- }
- if job.Status != status {
- err := job.SetStatus(status)
- if err != nil {
- return job.Status, err
- }
- }
- return job.Status, nil
- }
- func (job *QorJobInstance) SetStatus(status string) error {
- job.mutex.Lock()
- defer job.mutex.Unlock()
- job.Status = status
- if status == JobStatusDone {
- job.Progress = 100
- }
- if job.shouldCallSave() {
- return job.callSave()
- }
- return nil
- }
- func (job *QorJobInstance) SetProgress(progress uint) error {
- job.mutex.Lock()
- defer job.mutex.Unlock()
- if progress > 100 {
- progress = 100
- }
- job.Progress = progress
- if job.shouldCallSave() {
- return job.callSave()
- }
- return nil
- }
- func (job *QorJobInstance) SetProgressText(s string) error {
- job.mutex.Lock()
- defer job.mutex.Unlock()
- job.ProgressText = s
- if job.shouldCallSave() {
- return job.callSave()
- }
- return nil
- }
- func (job *QorJobInstance) AddLog(log string) error {
- if err := job.jb.b.db.Create(&QorJobLog{
- QorJobInstanceID: job.ID,
- Log: log,
- }).Error; err != nil {
- return err
- }
- return nil
- }
- func (job *QorJobInstance) AddLogf(format string, a ...interface{}) error {
- return job.AddLog(fmt.Sprintf(format, a...))
- }
- func (job *QorJobInstance) StartRefresh() {
- job.mutex.Lock()
- defer job.mutex.Unlock()
- if !job.inRefresh {
- job.inRefresh = true
- job.stopRefresh = false
- go func() {
- job.refresh()
- }()
- }
- }
- func (job *QorJobInstance) StopRefresh() {
- job.mutex.Lock()
- defer job.mutex.Unlock()
- err := job.callSave()
- if err != nil {
- log.Println(err)
- }
- job.stopRefresh = true
- }
- func (job *QorJobInstance) GetHandler() JobHandler {
- return job.jb.h
- }
- func (job *QorJobInstance) getArgument() (interface{}, error) {
- return job.jb.parseArgs(job.Args)
- }
- func (job *QorJobInstance) getContext() (map[string]interface{}, error) {
- var context = make(map[string]interface{})
- err := json.Unmarshal([]byte(job.Context), &context)
- return context, err
- }
- func (job *QorJobInstance) shouldCallSave() bool {
- return !job.inRefresh || job.stopRefresh
- }
- func (job *QorJobInstance) callSave() error {
- err := job.jb.b.setStatus(job.QorJobID, job.Status)
- if err != nil {
- return err
- }
- return job.jb.b.db.Save(job).Error
- }
- func (job *QorJobInstance) refresh() {
- job.mutex.Lock()
- defer job.mutex.Unlock()
- err := job.callSave()
- if err != nil {
- log.Println(err)
- }
- if job.stopRefresh {
- job.inRefresh = false
- job.stopRefresh = false
- } else {
- time.AfterFunc(5*time.Second, job.refresh)
- }
- }
|