-
Notifications
You must be signed in to change notification settings - Fork 22
/
taskinfo.go
120 lines (104 loc) · 2.62 KB
/
taskinfo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package task
import (
"context"
"sync"
"time"
)
// Package-level defaults.
const (
DefaultPeriod = time.Second // default execution period: one second
// defaultTimeLayout is a timestamp layout written in Go's reference-time
// form (year-month-day hour:minute:second).
defaultTimeLayout = "2006-01-02 15:04:05"
)
type (
	// TaskInfo holds the state shared by the task methods in this file:
	// identity, run state, the handler to invoke and the per-task counters.
	TaskInfo struct {
		// taskID is the task's identifier, exposed through TaskID().
		// It is unexported, so encoding/json never serializes it; a json tag
		// here would be ignored (and is reported by go vet), so none is set.
		taskID string

		// IsRun reports whether the task is enabled; Stop is a no-op when false.
		IsRun bool `json:"isrun"`

		// taskService is the owning service; its contextPool supplies the
		// TaskContext values used by getTaskContext/putTaskContext.
		taskService *TaskService

		// mutex is available for guarding task state.
		// NOTE(review): no method in this section of the file takes it - confirm usage elsewhere.
		mutex sync.RWMutex

		// TimeTicker is the ticker halted by Stop. Excluded from JSON.
		TimeTicker *time.Ticker `json:"-"`

		// TaskType names the kind of task; valid values are defined outside this section.
		TaskType string `json:"tasktype"`

		// handler is the function invoked with a TaskContext (see RunOnce).
		handler TaskHandle

		// TaskData is caller-supplied data copied into every TaskContext.
		TaskData interface{}

		// State matches TaskState_Init, TaskState_Run or TaskState_Stop.
		State string `json:"state"`

		// DueTime is the delay before the task starts, in milliseconds;
		// a value <= 0 means no delay.
		DueTime int64 `json:"duetime"`

		// Timeout is the handler timeout in seconds; getTaskContext only
		// attaches a timeout context when it is > 0.
		Timeout int64

		// counters is set by initCounters and returned by CounterInfo().
		counters *CounterInfo
	}

	// TaskConfig groups the settings that describe a task.
	// NOTE(review): how each field is consumed is not visible in this
	// section; Interval and Express are presumably the run interval and the
	// schedule expression - confirm units/format against the code that reads them.
	TaskConfig struct {
		TaskID   string
		TaskType string
		IsRun    bool
		Handler  TaskHandle `json:"-"`
		DueTime  int64
		Interval int64
		Express  string
		TaskData interface{}
	}

	// CounterInfo bundles the per-task counters created by initCounters.
	CounterInfo struct {
		// StartTime is the moment the counters were created.
		StartTime time.Time

		// RunCounter and ErrorCounter presumably count executions and failed
		// executions respectively - they are only created, not updated, in this section.
		RunCounter   Counter
		ErrorCounter Counter
	}

	// ShowCountInfo is a flat (task, label, count) record.
	ShowCountInfo struct {
		TaskID string
		// Lable is a misspelling of "Label"; the name is kept because it is
		// part of the exported API.
		Lable string
		Count int64
	}
)
// Stop halts a running task.
//
// It does nothing when the task is disabled (IsRun is false) or when its
// State is not TaskState_Run. Otherwise it stops the task's ticker, if one
// exists, and sets State to TaskState_Stop.
//
// NOTE(review): State is modified without holding task.mutex; confirm that
// callers serialize Stop with whatever starts the task.
func (task *TaskInfo) Stop() {
	if !task.IsRun || task.State != TaskState_Run {
		return
	}
	// Guard against a missing ticker: calling Stop on a nil *time.Ticker
	// panics, and the task should still be marked as stopped.
	if task.TimeTicker != nil {
		task.TimeTicker.Stop()
	}
	task.State = TaskState_Stop
}
// SetTimeout stores timeout in the task's Timeout field.
// getTaskContext interprets the value as a number of seconds and only
// attaches a timeout context when it is greater than zero.
func (task *TaskInfo) SetTimeout(timeout int64) {
task.Timeout = timeout
}
// TaskID returns the task's identifier (the unexported taskID field).
func (task *TaskInfo) TaskID() string {
return task.taskID
}
// SetTaskService associates the task with the given service.
// The service's contextPool is what getTaskContext draws from and
// putTaskContext returns to.
func (task *TaskInfo) SetTaskService(service *TaskService) {
task.taskService = service
}
// RunOnce invokes the task's handler exactly once, synchronously, and
// returns the handler's error.
//
// It does not consult Express or Interval, and it does not recover from a
// panic raised by the handler: the panic propagates to the caller.
// Added in response to issue #6 (request for a RunOnce method).
//
// The TaskContext obtained for the call is always cleaned up afterwards,
// even if the handler panics: the timeout's cancel function (set when
// Timeout > 0) is called so the timer behind context.WithTimeout is
// released, and the context is handed back through putTaskContext.
func (task *TaskInfo) RunOnce() error {
	ctx := task.getTaskContext()
	defer func() {
		// Cancel before releasing: putTaskContext resets the context, after
		// which the cancel function may no longer be reachable.
		if ctx.TimeoutCancel != nil {
			ctx.TimeoutCancel()
		}
		task.putTaskContext(ctx)
	}()
	return task.handler(ctx)
}
// getTaskContext takes a *TaskContext from the owning service's context
// pool and prepares it for one invocation of this task: it copies the task
// ID and TaskData into it and gives it a fresh, empty Header map.
//
// When the task's Timeout is greater than zero, the context additionally
// receives a timeout context expiring after Timeout seconds, the matching
// cancel function, and a new doneChan. Otherwise those fields are left as
// the pool delivered them.
//
// putTaskContext is the counterpart that returns the context to the pool.
func (task *TaskInfo) getTaskContext() *TaskContext {
	taskCtx := task.taskService.contextPool.Get().(*TaskContext)
	taskCtx.TaskID, taskCtx.TaskData = task.taskID, task.TaskData
	taskCtx.Header = make(map[string]interface{})

	// No positive timeout configured: nothing more to set up.
	if task.Timeout <= 0 {
		return taskCtx
	}

	limit := time.Duration(task.Timeout) * time.Second
	taskCtx.TimeoutContext, taskCtx.TimeoutCancel = context.WithTimeout(context.Background(), limit)
	taskCtx.doneChan = make(chan struct{})
	return taskCtx
}
// putTaskContext resets ctx and returns it to the owning service's context
// pool, the same pool getTaskContext draws from. Callers should not use ctx
// afterwards, since it may be handed out again by a later getTaskContext.
func (task *TaskInfo) putTaskContext(ctx *TaskContext) {
// Reset first so the pooled value carries no state from this run.
ctx.reset()
task.taskService.contextPool.Put(ctx)
}
// initCounters installs a fresh CounterInfo on the task: StartTime is the
// current time and RunCounter and ErrorCounter are newly created counters.
// Any previously installed CounterInfo is replaced.
func (task *TaskInfo) initCounters() {
	task.counters = &CounterInfo{
		StartTime:    time.Now(),
		RunCounter:   NewCounter(),
		ErrorCounter: NewCounter(),
	}
}
// CounterInfo returns the task's counters. The pointer is the one stored by
// initCounters, so it is nil until the counters have been initialised.
func (task *TaskInfo) CounterInfo() *CounterInfo {
return task.counters
}