|
@@ -76,43 +76,43 @@ type Class struct {
|
|
S_savefield string
|
|
S_savefield string
|
|
}
|
|
}
|
|
type TTask struct {
|
|
type TTask struct {
|
|
- Class []*Class
|
|
|
|
- ID string
|
|
|
|
- S_name string
|
|
|
|
- I_rate int
|
|
|
|
- S_class string
|
|
|
|
- S_mgourl string
|
|
|
|
- S_mgodb string
|
|
|
|
- S_collection string
|
|
|
|
- I_poolsize int
|
|
|
|
- S_startid string
|
|
|
|
- I_status int
|
|
|
|
- S_attr string
|
|
|
|
- AttrVal int
|
|
|
|
- S_coll string
|
|
|
|
|
|
+ Class []*Class //任务中类集合
|
|
|
|
+ ID string //任务id
|
|
|
|
+ S_name string //任务名称
|
|
|
|
+ I_rate int //任务执行频率
|
|
|
|
+ S_class string //任务中类id
|
|
|
|
+ S_mgourl string //任务mgo addr
|
|
|
|
+ S_mgodb string //任务mgo db
|
|
|
|
+ S_collection string //查询、存储表
|
|
|
|
+ I_poolsize int //任务连接池个数
|
|
|
|
+ S_startid string //任务起始id
|
|
|
|
+ I_status int //控制任务状态属性
|
|
|
|
+ S_attr string //标识属性key值
|
|
|
|
+ AttrVal int //标识属性val值
|
|
|
|
+ S_coll string //查询、存储表
|
|
LastId string
|
|
LastId string
|
|
Lock sync.Mutex
|
|
Lock sync.Mutex
|
|
- S_query string
|
|
|
|
- FlagQuit chan bool
|
|
|
|
- B_Running bool
|
|
|
|
- MgoTask *u.MongodbSim
|
|
|
|
- I_multiclass int //是否支持多分类
|
|
|
|
- I_savetype int //1 名称 2 值
|
|
|
|
- I_thread int //线程数
|
|
|
|
- I_wordcount int
|
|
|
|
- WordCount map[string]map[string]int
|
|
|
|
|
|
+ S_query string //任务查询条件
|
|
|
|
+ FlagQuit chan bool //任务结束控制
|
|
|
|
+ B_Running bool //任务是否执行
|
|
|
|
+ MgoTask *u.MongodbSim //任务查询表mgo
|
|
|
|
+ I_multiclass int //是否支持多分类
|
|
|
|
+ I_savetype int //1 名称 2 值
|
|
|
|
+ I_thread int //线程数
|
|
|
|
+ I_wordcount int //词频统计
|
|
|
|
+ WordCount map[string]map[string]int //词频统计集合
|
|
WcLock sync.Mutex
|
|
WcLock sync.Mutex
|
|
- Task_PreRule string
|
|
|
|
- S_table string //自定义表
|
|
|
|
- S_querycon string //查询方式 1是id查询 0是时间查询
|
|
|
|
- S_starttime int64 //起始时间
|
|
|
|
- S_timefieldname string //各表所查时间字段名称
|
|
|
|
- S_asfield string //查询表与结果表关联字段
|
|
|
|
- I_fieldUpdate int //分类中的保存字段信息 0:覆盖 1:更新
|
|
|
|
- MulMgo *u.MongodbSim
|
|
|
|
- MulColl string
|
|
|
|
|
|
+ Task_PreRule string //任务前置过滤
|
|
|
|
+ S_table string //结果表
|
|
|
|
+ S_querycon string //查询方式 1是id查询 0是时间查询
|
|
|
|
+ S_starttime int64 //起始时间
|
|
|
|
+ S_timefieldname string //各表所查时间字段名称
|
|
|
|
+ S_asfield string //查询表与结果表关联字段
|
|
|
|
+ I_fieldUpdate int //分类中的保存字段信息 0:覆盖 1:更新
|
|
|
|
+ MulMgo *u.MongodbSim //联合查询的mgo配置信息
|
|
|
|
+ MulColl string //联合查询的表名
|
|
I_tasktype int //任务类型 0:常规任务 1:附件任务
|
|
I_tasktype int //任务类型 0:常规任务 1:附件任务
|
|
- S_idcoll string //正式环境查询附件保存的数据id段
|
|
|
|
|
|
+ S_idcoll string //正式环境查询数据id段
|
|
B_UpdateRule bool //是否更新任务下的规则
|
|
B_UpdateRule bool //是否更新任务下的规则
|
|
S_classField string //分类字段
|
|
S_classField string //分类字段
|
|
Task_QueryFieldArr []string //用于合并数据
|
|
Task_QueryFieldArr []string //用于合并数据
|
|
@@ -216,8 +216,8 @@ func InitTaskData(_id string) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- if task.Task_QueryFieldMap == nil { //初始化查询字段
|
|
|
|
|
|
+ //初始化查询字段信息
|
|
|
|
+ if task.Task_QueryFieldMap == nil {
|
|
task.Task_QueryFieldMap = make(map[string]interface{})
|
|
task.Task_QueryFieldMap = make(map[string]interface{})
|
|
}
|
|
}
|
|
//Task_QueryFieldMap加入关联字段
|
|
//Task_QueryFieldMap加入关联字段
|
|
@@ -494,16 +494,16 @@ OVER:
|
|
for tt.I_status == 1 {
|
|
for tt.I_status == 1 {
|
|
tt.B_Running = false
|
|
tt.B_Running = false
|
|
select {
|
|
select {
|
|
- case <-tt.FlagQuit:
|
|
|
|
|
|
+ case <-tt.FlagQuit: //结果任务控制
|
|
log.Println("退出,RUN", tt.S_name)
|
|
log.Println("退出,RUN", tt.S_name)
|
|
tt = nil
|
|
tt = nil
|
|
break OVER
|
|
break OVER
|
|
- case <-first:
|
|
|
|
- if tools.ControlTaskRun {
|
|
|
|
|
|
+ case <-first: //第一次执行控制
|
|
|
|
+ if tools.ControlTaskRun { //任务流程控制
|
|
tools.AllTaskFinish = false
|
|
tools.AllTaskFinish = false
|
|
}
|
|
}
|
|
newtaskrun(tt)
|
|
newtaskrun(tt)
|
|
- case <-time.Tick(time.Duration(tt.I_rate) * time.Second):
|
|
|
|
|
|
+ case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制
|
|
//执行定时任务前,检查任务是否更新了rule
|
|
//执行定时任务前,检查任务是否更新了rule
|
|
if tt.B_UpdateRule {
|
|
if tt.B_UpdateRule {
|
|
InitClassAndRuleData(tt.ID, tt) //重新加载任务的rule
|
|
InitClassAndRuleData(tt.ID, tt) //重新加载任务的rule
|
|
@@ -532,7 +532,7 @@ func newtaskrun(tt *TTask) {
|
|
func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
total := 0
|
|
total := 0
|
|
tools.Try(func() { //不加这一层defer运行不了!!!
|
|
tools.Try(func() { //不加这一层defer运行不了!!!
|
|
- timespan := false //时间间隔
|
|
|
|
|
|
+ timespan := false //时间间隔(控制数据条数打印)
|
|
tt.B_Running = true
|
|
tt.B_Running = true
|
|
defer func() {
|
|
defer func() {
|
|
//业主分类执行完修改AllTaskFinish状态
|
|
//业主分类执行完修改AllTaskFinish状态
|
|
@@ -634,8 +634,8 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
"$lte": u.StringTOBsonId(eId),
|
|
"$lte": u.StringTOBsonId(eId),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- time.Sleep(time.Minute * 2)
|
|
|
|
-
|
|
|
|
|
|
+ time.Sleep(time.Minute * 2) //按id查询,为了保证有新数据入库,每次休息2分钟
|
|
|
|
+ //测试环境q的赋值执行下述代码
|
|
//if tt.LastId != "" && q["_id"] == nil {
|
|
//if tt.LastId != "" && q["_id"] == nil {
|
|
// q["_id"] = map[string]interface{}{
|
|
// q["_id"] = map[string]interface{}{
|
|
// "$gt": u.StringTOBsonId(tt.LastId),
|
|
// "$gt": u.StringTOBsonId(tt.LastId),
|
|
@@ -668,7 +668,7 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
tasksess := tt.MgoTask.GetMgoConn()
|
|
tasksess := tt.MgoTask.GetMgoConn()
|
|
defer tt.MgoTask.DestoryMongoConn(tasksess)
|
|
defer tt.MgoTask.DestoryMongoConn(tasksess)
|
|
log.Println("线程数:", tt.I_thread, "查询语句", q)
|
|
log.Println("线程数:", tt.I_thread, "查询语句", q)
|
|
- log.Println("查询---111", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
|
|
|
|
|
|
+ log.Println("查询---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
|
|
log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr)
|
|
log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr)
|
|
extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
|
|
extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
|
|
arr := [][]map[string]interface{}{}
|
|
arr := [][]map[string]interface{}{}
|