job.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package worker
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "reflect"
  10. "strings"
  11. "time"
  12. "github.com/qor5/admin/presets"
  13. vx "github.com/qor5/ui/vuetifyx"
  14. "github.com/qor5/web"
  15. "github.com/qor5/x/i18n"
  16. . "github.com/theplant/htmlgo"
  17. "gorm.io/gorm"
  18. )
  19. //go:generate moq -pkg mock -out mock/qor_job.go . QorJobInterface
  20. type JobBuilder struct {
  21. b *Builder
  22. name string
  23. r interface{}
  24. rmb *presets.ModelBuilder
  25. h JobHandler
  26. contextHandler func(*web.EventContext) map[string]interface{} //optional
  27. global bool
  28. }
  29. func newJob(b *Builder, name string) *JobBuilder {
  30. if b == nil {
  31. panic("builder is nil")
  32. }
  33. if strings.TrimSpace(name) == "" {
  34. panic("name is empty")
  35. }
  36. return &JobBuilder{
  37. b: b,
  38. name: name,
  39. global: true,
  40. }
  41. }
  42. type JobHandler func(context.Context, QorJobInterface) error
  43. // r should be ptr to struct
  44. func (jb *JobBuilder) Resource(r interface{}) *JobBuilder {
  45. {
  46. v := reflect.TypeOf(r)
  47. if v.Kind() != reflect.Ptr {
  48. panic("resource is not ptr to struct")
  49. }
  50. if v.Elem().Kind() != reflect.Struct {
  51. panic("resource is not ptr to struct")
  52. }
  53. }
  54. jb.r = r
  55. jb.rmb = jb.b.jpb.Model(r)
  56. if _, ok := r.(Scheduler); ok {
  57. jb.rmb.Editing().Field("ScheduleTime").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) HTMLComponent {
  58. msgr := i18n.MustGetModuleMessages(ctx.R, I18nWorkerKey, Messages_en_US).(*Messages)
  59. t := obj.(Scheduler).GetScheduleTime()
  60. var v string
  61. if t != nil {
  62. v = t.Local().Format("2006-01-02 15:04")
  63. }
  64. return vx.VXDateTimePicker().FieldName(field.Name).Label(msgr.ScheduleTime).
  65. Value(v).
  66. TimePickerProps(vx.TimePickerProps{
  67. Format: "24hr",
  68. Scrollable: true,
  69. }).
  70. ClearText(msgr.DateTimePickerClearText).OkText(msgr.DateTimePickerOkText)
  71. }).SetterFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) (err error) {
  72. v := ctx.R.Form.Get(field.Name)
  73. if v == "" {
  74. return nil
  75. }
  76. t, err := time.ParseInLocation("2006-01-02 15:04", v, time.Local)
  77. if err != nil {
  78. return err
  79. }
  80. obj.(Scheduler).SetScheduleTime(&t)
  81. return nil
  82. })
  83. }
  84. return jb
  85. }
  86. func (jb *JobBuilder) GetResourceBuilder() *presets.ModelBuilder {
  87. return jb.rmb
  88. }
  89. func (jb *JobBuilder) Handler(h JobHandler) *JobBuilder {
  90. jb.h = h
  91. return jb
  92. }
  93. func (jb *JobBuilder) ContextHandler(handler func(*web.EventContext) map[string]interface{}) *JobBuilder {
  94. jb.contextHandler = handler
  95. return jb
  96. }
  97. func (jb *JobBuilder) newResourceObject() interface{} {
  98. if jb.r == nil {
  99. return nil
  100. }
  101. return reflect.New(reflect.TypeOf(jb.r).Elem()).Interface()
  102. }
  103. func (jb *JobBuilder) unmarshalForm(ctx *web.EventContext) (args interface{}, vErr web.ValidationErrors) {
  104. args = jb.newResourceObject()
  105. if args != nil {
  106. vErr = jb.rmb.Editing().RunSetterFunc(ctx, false, args)
  107. }
  108. return args, vErr
  109. }
  110. func (jb *JobBuilder) parseArgs(in string) (args interface{}, err error) {
  111. if jb.r == nil {
  112. return nil, nil
  113. }
  114. args = jb.newResourceObject()
  115. err = json.Unmarshal([]byte(in), args)
  116. if err != nil {
  117. return nil, err
  118. }
  119. return args, nil
  120. }
  121. func getModelQorJobInstance(db *gorm.DB, qorJobID uint) (*QorJobInstance, error) {
  122. var insts []*QorJobInstance
  123. err := db.Where("qor_job_id = ?", qorJobID).
  124. Order("created_at desc").
  125. Limit(1).
  126. Find(&insts).
  127. Error
  128. if err != nil {
  129. return nil, err
  130. }
  131. if len(insts) == 0 {
  132. return nil, errors.New("no qor job instance")
  133. }
  134. return insts[0], nil
  135. }
  136. func (jb *JobBuilder) getJobInstance(qorJobID uint) (*QorJobInstance, error) {
  137. inst, err := getModelQorJobInstance(jb.b.db, qorJobID)
  138. if err != nil {
  139. return nil, err
  140. }
  141. inst.jb = jb
  142. return inst, nil
  143. }
  144. func (jb *JobBuilder) newJobInstance(
  145. r *http.Request,
  146. qorJobID uint,
  147. qorJobName string,
  148. args interface{},
  149. context interface{},
  150. ) (*QorJobInstance, error) {
  151. var mArgs string
  152. if v, ok := args.(string); ok {
  153. mArgs = v
  154. } else {
  155. bArgs, err := json.Marshal(args)
  156. if err != nil {
  157. return nil, err
  158. }
  159. mArgs = string(bArgs)
  160. }
  161. var ctx string
  162. if v, ok := context.(string); ok {
  163. ctx = v
  164. } else {
  165. bArgs, err := json.Marshal(context)
  166. if err != nil {
  167. return nil, err
  168. }
  169. ctx = string(bArgs)
  170. }
  171. inst := QorJobInstance{
  172. QorJobID: qorJobID,
  173. Args: mArgs,
  174. Context: ctx,
  175. Job: qorJobName,
  176. Status: JobStatusNew,
  177. }
  178. if jb.b.getCurrentUserIDFunc != nil {
  179. inst.Operator = jb.b.getCurrentUserIDFunc(r)
  180. }
  181. err := jb.b.db.Create(&inst).Error
  182. if err != nil {
  183. return nil, err
  184. }
  185. return jb.getJobInstance(qorJobID)
  186. }
  187. type QueJobInterface interface {
  188. QorJobInterface
  189. GetStatus() string
  190. FetchAndSetStatus() (string, error)
  191. SetStatus(string) error
  192. StartRefresh()
  193. StopRefresh()
  194. GetHandler() JobHandler
  195. }
  196. type JobInfo struct {
  197. JobID string
  198. JobName string
  199. Operator string
  200. Argument interface{}
  201. Context map[string]interface{}
  202. }
  203. // for job handler
  204. type QorJobInterface interface {
  205. GetJobInfo() (*JobInfo, error)
  206. SetProgress(uint) error
  207. SetProgressText(string) error
  208. AddLog(string) error
  209. AddLogf(format string, a ...interface{}) error
  210. }
  211. var _ QueJobInterface = (*QorJobInstance)(nil)
  212. func (job *QorJobInstance) GetJobInfo() (ji *JobInfo, err error) {
  213. arg, err := job.getArgument()
  214. if err != nil {
  215. return
  216. }
  217. context, err := job.getContext()
  218. if err != nil {
  219. return
  220. }
  221. return &JobInfo{
  222. JobID: fmt.Sprint(job.QorJobID),
  223. JobName: job.Job,
  224. Operator: job.Operator,
  225. Argument: arg,
  226. Context: context,
  227. }, nil
  228. }
  229. func (job *QorJobInstance) GetStatus() string {
  230. return job.Status
  231. }
  232. func (job *QorJobInstance) FetchAndSetStatus() (string, error) {
  233. var status string
  234. {
  235. db, err := job.jb.b.db.DB()
  236. if err != nil {
  237. return job.Status, err
  238. }
  239. err = db.QueryRow("select status from qor_job_instances where id = $1", job.ID).Scan(&status)
  240. if err != nil {
  241. return job.Status, err
  242. }
  243. if status == "" {
  244. return job.Status, errors.New("failed to fetch qor_job_instance status")
  245. }
  246. }
  247. if job.Status != status {
  248. err := job.SetStatus(status)
  249. if err != nil {
  250. return job.Status, err
  251. }
  252. }
  253. return job.Status, nil
  254. }
  255. func (job *QorJobInstance) SetStatus(status string) error {
  256. job.mutex.Lock()
  257. defer job.mutex.Unlock()
  258. job.Status = status
  259. if status == JobStatusDone {
  260. job.Progress = 100
  261. }
  262. if job.shouldCallSave() {
  263. return job.callSave()
  264. }
  265. return nil
  266. }
  267. func (job *QorJobInstance) SetProgress(progress uint) error {
  268. job.mutex.Lock()
  269. defer job.mutex.Unlock()
  270. if progress > 100 {
  271. progress = 100
  272. }
  273. job.Progress = progress
  274. if job.shouldCallSave() {
  275. return job.callSave()
  276. }
  277. return nil
  278. }
  279. func (job *QorJobInstance) SetProgressText(s string) error {
  280. job.mutex.Lock()
  281. defer job.mutex.Unlock()
  282. job.ProgressText = s
  283. if job.shouldCallSave() {
  284. return job.callSave()
  285. }
  286. return nil
  287. }
  288. func (job *QorJobInstance) AddLog(log string) error {
  289. if err := job.jb.b.db.Create(&QorJobLog{
  290. QorJobInstanceID: job.ID,
  291. Log: log,
  292. }).Error; err != nil {
  293. return err
  294. }
  295. return nil
  296. }
  297. func (job *QorJobInstance) AddLogf(format string, a ...interface{}) error {
  298. return job.AddLog(fmt.Sprintf(format, a...))
  299. }
  300. func (job *QorJobInstance) StartRefresh() {
  301. job.mutex.Lock()
  302. defer job.mutex.Unlock()
  303. if !job.inRefresh {
  304. job.inRefresh = true
  305. job.stopRefresh = false
  306. go func() {
  307. job.refresh()
  308. }()
  309. }
  310. }
  311. func (job *QorJobInstance) StopRefresh() {
  312. job.mutex.Lock()
  313. defer job.mutex.Unlock()
  314. err := job.callSave()
  315. if err != nil {
  316. log.Println(err)
  317. }
  318. job.stopRefresh = true
  319. }
  320. func (job *QorJobInstance) GetHandler() JobHandler {
  321. return job.jb.h
  322. }
  323. func (job *QorJobInstance) getArgument() (interface{}, error) {
  324. return job.jb.parseArgs(job.Args)
  325. }
  326. func (job *QorJobInstance) getContext() (map[string]interface{}, error) {
  327. var context = make(map[string]interface{})
  328. err := json.Unmarshal([]byte(job.Context), &context)
  329. return context, err
  330. }
  331. func (job *QorJobInstance) shouldCallSave() bool {
  332. return !job.inRefresh || job.stopRefresh
  333. }
  334. func (job *QorJobInstance) callSave() error {
  335. err := job.jb.b.setStatus(job.QorJobID, job.Status)
  336. if err != nil {
  337. return err
  338. }
  339. return job.jb.b.db.Save(job).Error
  340. }
  341. func (job *QorJobInstance) refresh() {
  342. job.mutex.Lock()
  343. defer job.mutex.Unlock()
  344. err := job.callSave()
  345. if err != nil {
  346. log.Println(err)
  347. }
  348. if job.stopRefresh {
  349. job.inRefresh = false
  350. job.stopRefresh = false
  351. } else {
  352. time.AfterFunc(5*time.Second, job.refresh)
  353. }
  354. }