schedule_publish_builder.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package publish
  2. import (
  3. "context"
  4. "log"
  5. "reflect"
  6. "github.com/hashicorp/go-multierror"
  7. "gorm.io/gorm"
  8. )
  9. type SchedulePublishBuilder struct {
  10. publisher *Builder
  11. context context.Context
  12. }
  13. func NewSchedulePublishBuilder(publisher *Builder) *SchedulePublishBuilder {
  14. return &SchedulePublishBuilder{
  15. publisher: publisher,
  16. context: context.Background(),
  17. }
  18. }
  19. func (b *SchedulePublishBuilder) WithValue(key, val interface{}) *SchedulePublishBuilder {
  20. b.context = context.WithValue(b.context, key, val)
  21. return b
  22. }
  23. type SchedulePublisher interface {
  24. SchedulePublisherDBScope(db *gorm.DB) *gorm.DB
  25. }
  26. // model is a empty struct
  27. // example: Product{}
  28. func (b *SchedulePublishBuilder) Run(model interface{}) (err error) {
  29. var scope *gorm.DB
  30. if m, ok := model.(SchedulePublisher); ok {
  31. scope = m.SchedulePublisherDBScope(b.publisher.db)
  32. } else {
  33. scope = b.publisher.db
  34. }
  35. // If model is Product{}
  36. // Generate a records: []*Product{}
  37. records := reflect.MakeSlice(reflect.SliceOf(reflect.New(reflect.TypeOf(model)).Type()), 0, 0).Interface()
  38. flagTime := scope.NowFunc()
  39. var unpublishAfterPublishRecords []interface{}
  40. {
  41. tempRecords := records
  42. err = scope.Where("scheduled_end_at <= ?", flagTime).Order("scheduled_end_at").Find(&tempRecords).Error
  43. if err != nil {
  44. return
  45. }
  46. needUnpublishReflectValues := reflect.ValueOf(tempRecords)
  47. for i := 0; i < needUnpublishReflectValues.Len(); i++ {
  48. {
  49. record := needUnpublishReflectValues.Index(i).Interface().(ScheduleInterface)
  50. if record.GetScheduledStartAt() != nil && record.GetScheduledStartAt().Sub(*record.GetScheduledEndAt()) < 0 {
  51. unpublishAfterPublishRecords = append(unpublishAfterPublishRecords, record)
  52. continue
  53. }
  54. }
  55. if record, ok := needUnpublishReflectValues.Index(i).Interface().(UnPublishInterface); ok {
  56. if err2 := b.publisher.UnPublish(record); err2 != nil {
  57. log.Printf("error: %s\n", err2)
  58. err = multierror.Append(err, err2).ErrorOrNil()
  59. }
  60. }
  61. }
  62. }
  63. {
  64. tempRecords := records
  65. err = scope.Where("scheduled_start_at <= ?", flagTime).Order("scheduled_start_at").Find(&tempRecords).Error
  66. if err != nil {
  67. return
  68. }
  69. needPublishReflectValues := reflect.ValueOf(tempRecords)
  70. for i := 0; i < needPublishReflectValues.Len(); i++ {
  71. if record, ok := needPublishReflectValues.Index(i).Interface().(PublishInterface); ok {
  72. if err2 := b.publisher.Publish(record); err2 != nil {
  73. log.Printf("error: %s\n", err2)
  74. err = multierror.Append(err, err2).ErrorOrNil()
  75. }
  76. }
  77. }
  78. }
  79. {
  80. for _, interfaceRecord := range unpublishAfterPublishRecords {
  81. if record, ok := interfaceRecord.(UnPublishInterface); ok {
  82. if err2 := b.publisher.UnPublish(record); err2 != nil {
  83. log.Printf("error: %s\n", err2)
  84. err = multierror.Append(err, err2).ErrorOrNil()
  85. }
  86. }
  87. }
  88. }
  89. return
  90. }