|
@@ -90,7 +90,7 @@ type TTask struct {
|
|
|
S_attr string //标识属性key值
|
|
|
AttrVal int //标识属性val值
|
|
|
S_coll string //查询、存储表
|
|
|
- LastId string
|
|
|
+ LastId string //上次定时任务的结束id
|
|
|
Lock sync.Mutex
|
|
|
S_query string //任务查询条件
|
|
|
FlagQuit chan bool //任务结束控制
|
|
@@ -141,9 +141,9 @@ func InitTaskData(_id string) {
|
|
|
s_starttime := int64(0)
|
|
|
s_asfield := ""
|
|
|
if b_updaterule, ok := (*taskData)["b_updaterule"].(bool); ok {
|
|
|
- task.B_UpdateRule = b_updaterule
|
|
|
+ task.B_UpdateRule = b_updaterule //更新任务下的规则
|
|
|
}
|
|
|
- if (*taskData)["s_querycon"] != nil {
|
|
|
+ if (*taskData)["s_querycon"] != nil { //
|
|
|
s_querycon = util.ObjToString((*taskData)["s_querycon"])
|
|
|
}
|
|
|
if (*taskData)["s_starttime"] != nil {
|
|
@@ -194,7 +194,8 @@ func InitTaskData(_id string) {
|
|
|
}
|
|
|
task.AttrVal = flagAttrVal
|
|
|
|
|
|
- for _, v := range tools.Config { //联表查询初始化mgo,线上只有行业分类用到;跑历史招标、行业分类的时候也用到两边查询
|
|
|
+ //联表查询初始化mgo,线上只有行业分类用到;跑历史招标、行业分类的时候也用到两边查询
|
|
|
+ for _, v := range tools.Config {
|
|
|
if m, ok := v.(map[string]interface{}); ok {
|
|
|
if m["taskid"] == task.ID {
|
|
|
if m["mgoaddr"] != nil && m["db"] != nil && m["coll"] != nil {
|
|
@@ -232,10 +233,11 @@ func InitTaskData(_id string) {
|
|
|
task.Task_QueryFieldMap[f] = 1
|
|
|
task.Task_QueryFieldArr = append(task.Task_QueryFieldArr, f)
|
|
|
}
|
|
|
+ //初始化任务下所有的分类和规则
|
|
|
InitClassAndRuleData(_id, task)
|
|
|
}
|
|
|
|
|
|
-//初始化任务下所有的分类和规则
|
|
|
+//InitClassAndRuleData 初始化任务下所有的分类和规则
|
|
|
func InitClassAndRuleData(_id string, task *TTask) {
|
|
|
defer tools.Catch()
|
|
|
classIdStr := task.S_class
|
|
@@ -245,7 +247,8 @@ func InitClassAndRuleData(_id string, task *TTask) {
|
|
|
for _, classid := range classIdArr {
|
|
|
classData, _ := tools.MgoClass.FindById(tools.COLL_CLASS, classid, nil)
|
|
|
if classData != nil {
|
|
|
- class := &Class{ //初始化Class
|
|
|
+ //初始化Class
|
|
|
+ class := &Class{
|
|
|
//Rule: CidRuleMap[classid],
|
|
|
Cid: classid,
|
|
|
Class_PreRule: util.ObjToString((*classData)["s_class_prerule"]),
|
|
@@ -499,9 +502,10 @@ OVER:
|
|
|
tt = nil
|
|
|
break OVER
|
|
|
case <-first: //第一次执行控制
|
|
|
- if tools.ControlTaskRun { //任务流程控制
|
|
|
+ if tools.ControlTaskRun { //任务流程控制,现有模式用不到,默认false
|
|
|
tools.AllTaskFinish = false
|
|
|
}
|
|
|
+ log.Println("第一次执行任务:", tt.S_name)
|
|
|
newtaskrun(tt)
|
|
|
case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制
|
|
|
//执行定时任务前,检查任务是否更新了rule
|
|
@@ -528,14 +532,14 @@ func newtaskrun(tt *TTask) {
|
|
|
NewTaskRunAll(tt, false, nil)
|
|
|
}
|
|
|
|
|
|
-//常规任务和udp非合并数据处理方法
|
|
|
+//NewTaskRunAll 常规任务和udp非合并数据处理方法
|
|
|
func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
|
total := 0
|
|
|
tools.Try(func() { //不加这一层defer运行不了!!!
|
|
|
timespan := false //时间间隔(控制数据条数打印)
|
|
|
tt.B_Running = true
|
|
|
defer func() {
|
|
|
- //业主分类执行完修改AllTaskFinish状态
|
|
|
+ //业主分类执行完修改AllTaskFinish状态;控制流程的任务id(整个分类流程业主分类结尾,以此为标记)
|
|
|
if tt.ID == tools.ControlLastTaskId {
|
|
|
tools.AllTaskFinish = true
|
|
|
}
|
|
@@ -580,7 +584,7 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
|
json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
|
|
|
}
|
|
|
idcoll := tt.S_idcoll
|
|
|
- if idcoll != "" { //idcoll中查询id区间
|
|
|
+ if idcoll != "" { //idcoll中查询id区间,bidding_processing_ids
|
|
|
nextNodeSid, nextNodeEid = FindId(idcoll) //查询id段
|
|
|
if nextNodeSid != "" && nextNodeEid != "" && nextNodeSid <= nextNodeEid {
|
|
|
q["_id"] = bson.M{
|
|
@@ -1267,7 +1271,9 @@ func NewLoadTestTask(_id, s_mgourl, s_mgodb, s_coll, i_poolsize, s_startid, s_en
|
|
|
//加载任务
|
|
|
func NewLoadTask(_id string, res *tools.JSON) {
|
|
|
defer tools.Catch()
|
|
|
- InitTaskData(_id) //初始化任务信息
|
|
|
+ //初始化任务信息
|
|
|
+ InitTaskData(_id)
|
|
|
+ //初始化任务mgo配置信息
|
|
|
bres, tt, msg := NewAnalyTask(_id, "", "", "", 5)
|
|
|
tt.I_status = 1
|
|
|
log.Println(tt.S_mgodb, tt.S_name, tt.I_thread)
|