|
@@ -8,6 +8,7 @@ import (
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"errors"
|
|
"errors"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "go.uber.org/zap"
|
|
"log"
|
|
"log"
|
|
mu "mfw/util"
|
|
mu "mfw/util"
|
|
"net"
|
|
"net"
|
|
@@ -22,6 +23,7 @@ import (
|
|
"time"
|
|
"time"
|
|
"tools"
|
|
"tools"
|
|
u "util"
|
|
u "util"
|
|
|
|
+ "zlog"
|
|
|
|
|
|
"github.com/cron"
|
|
"github.com/cron"
|
|
"github.com/donnie4w/go-logger/logger"
|
|
"github.com/donnie4w/go-logger/logger"
|
|
@@ -30,6 +32,7 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
func init() {
|
|
func init() {
|
|
|
|
+ InitLog()
|
|
REG, _ = regexp.Compile(`\(.*?\)\d*`)
|
|
REG, _ = regexp.Compile(`\(.*?\)\d*`)
|
|
REG1, _ = regexp.Compile(`\(.*?\)`)
|
|
REG1, _ = regexp.Compile(`\(.*?\)`)
|
|
//log.Println(REG.FindAllString("(平台|软件|电子商务|多媒体|通讯设备)1(建设|采购|开发)2", -1))
|
|
//log.Println(REG.FindAllString("(平台|软件|电子商务|多媒体|通讯设备)1(建设|采购|开发)2", -1))
|
|
@@ -48,6 +51,22 @@ func init() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func InitLog() {
|
|
|
|
+ err := zlog.InitLog(
|
|
|
|
+ zlog.Path("./logs/log.out"),
|
|
|
|
+ //zlog.Path(""),
|
|
|
|
+ zlog.Level("info"),
|
|
|
|
+ zlog.Compress(true),
|
|
|
|
+ zlog.MaxSize(10),
|
|
|
|
+ zlog.MaxBackups(10),
|
|
|
|
+ zlog.MaxAge(7),
|
|
|
|
+ zlog.Format("json"),
|
|
|
|
+ )
|
|
|
|
+ if err != nil {
|
|
|
|
+ fmt.Printf("InitLog failed: %v\n", err)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
// var UdpSess *tools.MongodbSim
|
|
// var UdpSess *tools.MongodbSim
|
|
var REG *regexp.Regexp
|
|
var REG *regexp.Regexp
|
|
var REG1 *regexp.Regexp
|
|
var REG1 *regexp.Regexp
|
|
@@ -123,6 +142,7 @@ type TTask struct {
|
|
Task_QueryFieldArr []string //用于合并数据
|
|
Task_QueryFieldArr []string //用于合并数据
|
|
Task_QueryFieldMap map[string]interface{} //用于仅查询字段
|
|
Task_QueryFieldMap map[string]interface{} //用于仅查询字段
|
|
Dbtype string //用于区分连得是哪个库,使用不同的用户密码
|
|
Dbtype string //用于区分连得是哪个库,使用不同的用户密码
|
|
|
|
+ isWait bool //是否等待;非UDP 请求,都需要等待上次任务执行完毕
|
|
}
|
|
}
|
|
type RuleDFA struct {
|
|
type RuleDFA struct {
|
|
Match []DFA //包含的敏感词
|
|
Match []DFA //包含的敏感词
|
|
@@ -306,7 +326,8 @@ func DealRules(rules []interface{}) (i_rule []interface{}) {
|
|
ru := string(rs[1 : len(rs)-1])
|
|
ru := string(rs[1 : len(rs)-1])
|
|
rureg, err := regexp.Compile(ru)
|
|
rureg, err := regexp.Compile(ru)
|
|
if err != nil {
|
|
if err != nil {
|
|
- log.Println("error---rule:", r)
|
|
|
|
|
|
+ //log.Println("error---rule:", r)
|
|
|
|
+ zlog.Info("DealRules", zap.String("error---rule:", r))
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
i_rule = append(i_rule, []interface{}{rureg}...)
|
|
i_rule = append(i_rule, []interface{}{rureg}...)
|
|
@@ -368,7 +389,8 @@ func (tt *TTask) SStop() bool {
|
|
if tt.I_status == 1 {
|
|
if tt.I_status == 1 {
|
|
tt.I_status = 0
|
|
tt.I_status = 0
|
|
tt.FlagQuit <- true
|
|
tt.FlagQuit <- true
|
|
- log.Println("开始停止...")
|
|
|
|
|
|
+ zlog.Info("SStop", zap.String("开始停止任务", tt.S_name))
|
|
|
|
+ //log.Println("开始停止...")
|
|
}
|
|
}
|
|
return true
|
|
return true
|
|
}
|
|
}
|
|
@@ -487,7 +509,8 @@ func (tt *TTask) RRunTest(s_startid, s_endid, s_query, filename string) {
|
|
w.Flush()
|
|
w.Flush()
|
|
f.Close()
|
|
f.Close()
|
|
}
|
|
}
|
|
- log.Println("运行RUNTEST-OVER")
|
|
|
|
|
|
+ //log.Println("运行RUNTEST-OVER")
|
|
|
|
+ zlog.Info("RRunTest", zap.String("任务测试结束", tt.S_name))
|
|
}
|
|
}
|
|
|
|
|
|
func Exist(filename string) bool {
|
|
func Exist(filename string) bool {
|
|
@@ -505,14 +528,16 @@ OVER:
|
|
tt.B_Running = false
|
|
tt.B_Running = false
|
|
select {
|
|
select {
|
|
case <-tt.FlagQuit: //结果任务控制
|
|
case <-tt.FlagQuit: //结果任务控制
|
|
- log.Println("退出,RUN", tt.S_name)
|
|
|
|
|
|
+ //log.Println("退出,RUN", tt.S_name)
|
|
|
|
+ zlog.Info("RRun", zap.String("退出任务", tt.S_name))
|
|
tt = nil
|
|
tt = nil
|
|
break OVER
|
|
break OVER
|
|
case <-first: //第一次执行控制
|
|
case <-first: //第一次执行控制
|
|
if tools.ControlTaskRun { //任务流程控制,现有模式用不到,默认false
|
|
if tools.ControlTaskRun { //任务流程控制,现有模式用不到,默认false
|
|
tools.AllTaskFinish = false
|
|
tools.AllTaskFinish = false
|
|
}
|
|
}
|
|
- log.Println("第一次执行任务:", tt.S_name)
|
|
|
|
|
|
+ //log.Println("第一次执行任务:", tt.S_name)
|
|
|
|
+ zlog.Info("RRun", zap.String("第一次执行任务:", tt.S_name))
|
|
newtaskrun(tt)
|
|
newtaskrun(tt)
|
|
case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制
|
|
case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制
|
|
//执行定时任务前,检查任务是否更新了rule
|
|
//执行定时任务前,检查任务是否更新了rule
|
|
@@ -529,22 +554,31 @@ OVER:
|
|
tools.AllTaskFinish = false
|
|
tools.AllTaskFinish = false
|
|
newtaskrun(tt)
|
|
newtaskrun(tt)
|
|
} else {
|
|
} else {
|
|
- log.Println("上轮任务暂未完成")
|
|
|
|
|
|
+ //log.Println("上轮任务暂未完成")
|
|
|
|
+ zlog.Info("RRun", zap.String("上轮任务暂未完成", tt.S_name))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func newtaskrun(tt *TTask) {
|
|
func newtaskrun(tt *TTask) {
|
|
|
|
+ //不需要等待上次迭代结束时
|
|
|
|
+ if !tt.isWait {
|
|
|
|
+ go NewTaskRunAll(tt, false, nil)
|
|
|
|
+ } else {
|
|
|
|
+ zlog.Info("newtaskrun", zap.String("上轮迭代暂未完成", tt.S_name), zap.Any("上次查询语句", tt.S_query))
|
|
|
|
+ }
|
|
|
|
|
|
- NewTaskRunAll(tt, false, nil)
|
|
|
|
}
|
|
}
|
|
|
|
|
|
// NewTaskRunAll 常规任务和udp非合并数据处理方法
|
|
// NewTaskRunAll 常规任务和udp非合并数据处理方法
|
|
func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
total := 0
|
|
total := 0
|
|
tools.Try(func() { //不加这一层defer运行不了!!!
|
|
tools.Try(func() { //不加这一层defer运行不了!!!
|
|
- timespan := false //时间间隔(控制数据条数打印)
|
|
|
|
|
|
+ if !budp { //如果不是udp调用,说明是页面轮询迭代任务
|
|
|
|
+ tt.isWait = true
|
|
|
|
+ }
|
|
|
|
+ //timespan := false //时间间隔(控制数据条数打印)
|
|
tt.B_Running = true
|
|
tt.B_Running = true
|
|
defer func() {
|
|
defer func() {
|
|
//业主分类执行完修改AllTaskFinish状态;控制流程的任务id(整个分类流程业主分类结尾,以此为标记)
|
|
//业主分类执行完修改AllTaskFinish状态;控制流程的任务id(整个分类流程业主分类结尾,以此为标记)
|
|
@@ -552,6 +586,7 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
tools.AllTaskFinish = true
|
|
tools.AllTaskFinish = true
|
|
}
|
|
}
|
|
tt.B_Running = false
|
|
tt.B_Running = false
|
|
|
|
+ tt.isWait = false
|
|
}()
|
|
}()
|
|
//开始识别
|
|
//开始识别
|
|
pool := make(chan bool, tt.I_thread)
|
|
pool := make(chan bool, tt.I_thread)
|
|
@@ -587,9 +622,9 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
//有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
|
|
//有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
|
|
//没有结果表在查询表上更新
|
|
//没有结果表在查询表上更新
|
|
//log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
|
|
//log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
|
|
- sort := ""
|
|
|
|
|
|
+ //sort := ""
|
|
if !budp { //非udp查询条件
|
|
if !budp { //非udp查询条件
|
|
- sort = "_id"
|
|
|
|
|
|
+ //sort = "_id"
|
|
if tt.S_query != "" { //有查询条件
|
|
if tt.S_query != "" { //有查询条件
|
|
json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
|
|
json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
|
|
}
|
|
}
|
|
@@ -602,15 +637,16 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
"$lte": u.StringTOBsonId(nextNodeEid),
|
|
"$lte": u.StringTOBsonId(nextNodeEid),
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- log.Println("定时任务", tt.S_name, "查询", tt.S_idcoll, "时间段出错", nextNodeSid, nextNodeEid)
|
|
|
|
|
|
+ zlog.Info("NewTaskRunAll", zap.String("定时任务", tt.S_name), zap.String("查询条件", tt.S_idcoll), zap.String("时间段出错", fmt.Sprintf("%s-%s", nextNodeSid, nextNodeEid)))
|
|
|
|
+ //log.Println("定时任务", tt.S_name, "查询", tt.S_idcoll, "时间段出错", nextNodeSid, nextNodeEid)
|
|
tools.AllTaskFinish = true //为查询到数据视为此轮任务完成
|
|
tools.AllTaskFinish = true //为查询到数据视为此轮任务完成
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- stime, _ := strconv.ParseInt(nextNodeSid[:8], 16, 64) //取id前8位转成时间戳
|
|
|
|
- etime, _ := strconv.ParseInt(nextNodeEid[:8], 16, 64) //
|
|
|
|
- if etime-stime < 1800 { //时间跨度小于半小时
|
|
|
|
- timespan = true
|
|
|
|
- }
|
|
|
|
|
|
+ //stime, _ := strconv.ParseInt(nextNodeSid[:8], 16, 64) //取id前8位转成时间戳
|
|
|
|
+ //etime, _ := strconv.ParseInt(nextNodeEid[:8], 16, 64) //
|
|
|
|
+ //if etime-stime < 60 { //时间跨度小于半小时,1分钟
|
|
|
|
+ // timespan = true
|
|
|
|
+ //}
|
|
} else {
|
|
} else {
|
|
if q["_id"] != nil {
|
|
if q["_id"] != nil {
|
|
if _id, ok := q["_id"].(string); ok {
|
|
if _id, ok := q["_id"].(string); ok {
|
|
@@ -677,7 +713,7 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else { //udp查询条件
|
|
} else { //udp查询条件
|
|
- sort = "-_id"
|
|
|
|
|
|
+ //sort = "-_id"
|
|
if tt.S_query != "" { //有查询条件
|
|
if tt.S_query != "" { //有查询条件
|
|
json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
|
|
json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
|
|
}
|
|
}
|
|
@@ -685,26 +721,42 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
for k, v := range tmpq {
|
|
for k, v := range tmpq {
|
|
q[k] = v
|
|
q[k] = v
|
|
}
|
|
}
|
|
- sid := mapInfo["gtid"].(string)
|
|
|
|
- eid := mapInfo["lteid"].(string)
|
|
|
|
- stime, _ := strconv.ParseInt(sid[:8], 16, 64)
|
|
|
|
- etime, _ := strconv.ParseInt(eid[:8], 16, 64)
|
|
|
|
- if etime-stime < 1800 { //时间跨度小于半小时
|
|
|
|
- timespan = true
|
|
|
|
- }
|
|
|
|
|
|
+ //sid := mapInfo["gtid"].(string)
|
|
|
|
+ //eid := mapInfo["lteid"].(string)
|
|
|
|
+ //stime, _ := strconv.ParseInt(sid[:8], 16, 64)
|
|
|
|
+ //etime, _ := strconv.ParseInt(eid[:8], 16, 64)
|
|
|
|
+ //if etime-stime < 60 { //时间跨度小于半小时
|
|
|
|
+ // timespan = true
|
|
|
|
+ //}
|
|
}
|
|
}
|
|
//task
|
|
//task
|
|
tasksess := tt.MgoTask.GetMgoConn()
|
|
tasksess := tt.MgoTask.GetMgoConn()
|
|
defer tt.MgoTask.DestoryMongoConn(tasksess)
|
|
defer tt.MgoTask.DestoryMongoConn(tasksess)
|
|
//通过ID 查询数据时,才打印日志
|
|
//通过ID 查询数据时,才打印日志
|
|
- if tt.S_querycon == "1" {
|
|
|
|
- log.Println("运行", tt.S_name, "start")
|
|
|
|
- log.Println("线程数:", tt.I_thread, "查询语句", q)
|
|
|
|
- log.Println("查询---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
|
|
|
|
- log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr)
|
|
|
|
- }
|
|
|
|
|
|
+ //if tt.S_querycon == "1" {
|
|
|
|
+ zlog.Info("NewTaskRunAll", zap.String("开始执行任务", tt.S_name), zap.Any("线程数:", tt.I_thread), zap.Any("查询语句:", q))
|
|
|
|
+ zlog.Info("NewTaskRunAll", zap.Any("查询---", fmt.Sprintf("%s,%s,%s,%s", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)))
|
|
|
|
+ zlog.Info("NewTaskRunAll", zap.Any("select:", tt.Task_QueryFieldMap), zap.Any("select", tt.Task_QueryFieldArr))
|
|
|
|
+
|
|
|
|
+ //log.Println("运行", tt.S_name, "start")
|
|
|
|
+ //log.Println("线程数:", tt.I_thread, "查询语句", q)
|
|
|
|
+ //log.Println("查询---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
|
|
|
|
+ //log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr)
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ zlog.Info("aaaaaaaaaaaaaa", zap.Any("aaaa", tt.MgoTask.Count(tt.S_coll, q)))
|
|
|
|
+
|
|
|
|
+ count, err := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Count()
|
|
|
|
+ zlog.Info("bbbbbbbbbbb")
|
|
|
|
+ zlog.Info("NewTaskRunAll", zap.Error(err), zap.Int64("count", count), zap.String("任务名称", tt.S_name))
|
|
|
|
+
|
|
|
|
+ //if count > 0 {
|
|
|
|
+ // zlog.Info("NewTaskRunAll", zap.String("开始执行任务", tt.S_name), zap.Any("线程数:", tt.I_thread), zap.Any("查询语句:", q))
|
|
|
|
+ // zlog.Info("NewTaskRunAll", zap.Any("查询---", fmt.Sprintf("%s,%s,%s,%s", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)))
|
|
|
|
+ // zlog.Info("NewTaskRunAll", zap.Any("select:", tt.Task_QueryFieldMap), zap.Any("select", tt.Task_QueryFieldArr))
|
|
|
|
+ //}
|
|
|
|
|
|
- extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Sort(sort).Iter()
|
|
|
|
|
|
+ extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Iter()
|
|
arr := [][]map[string]interface{}{}
|
|
arr := [][]map[string]interface{}{}
|
|
if tt.I_wordcount == 1 {
|
|
if tt.I_wordcount == 1 {
|
|
tt.WordCount = map[string]map[string]int{}
|
|
tt.WordCount = map[string]map[string]int{}
|
|
@@ -712,9 +764,11 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
sum := 0
|
|
sum := 0
|
|
for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
|
|
for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
|
|
tid := tmp["_id"]
|
|
tid := tmp["_id"]
|
|
- if !timespan && sum%2000 == 0 {
|
|
|
|
- log.Println("current:", sum, tt.S_name)
|
|
|
|
|
|
+ if sum%1000 == 0 {
|
|
|
|
+ zlog.Info("current:", zap.String("当前任务名称", tt.S_name), zap.Any("当前数量:", sum))
|
|
|
|
+ //log.Println("current:", sum, tt.S_name)
|
|
}
|
|
}
|
|
|
|
+
|
|
pool <- true
|
|
pool <- true
|
|
wg.Add(1)
|
|
wg.Add(1)
|
|
go func(tmp map[string]interface{}) {
|
|
go func(tmp map[string]interface{}) {
|
|
@@ -759,7 +813,8 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
id := mongodb.BsonIdToSId(tmp["_id"])
|
|
id := mongodb.BsonIdToSId(tmp["_id"])
|
|
val := tools.OssGetObject(id, tools.BucketName)
|
|
val := tools.OssGetObject(id, tools.BucketName)
|
|
if val == "" {
|
|
if val == "" {
|
|
- log.Println("获取oss 内容为空", id)
|
|
|
|
|
|
+ //log.Println("获取oss 内容为空", id)
|
|
|
|
+ zlog.Info("NewTaskRunAll", zap.Any("获取oss 内容为空", id))
|
|
} else {
|
|
} else {
|
|
tmp["detail"] = val
|
|
tmp["detail"] = val
|
|
}
|
|
}
|
|
@@ -950,12 +1005,13 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
|
|
|
|
total = sum
|
|
total = sum
|
|
//通过ID 查询分类数据才打印日志
|
|
//通过ID 查询分类数据才打印日志
|
|
- if tt.S_querycon == "1" {
|
|
|
|
- log.Println("总数:————", sum)
|
|
|
|
- if timespan {
|
|
|
|
- log.Println("current:————", sum)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ zlog.Info("current:", zap.String("当前任务结束:", tt.S_name), zap.Int("总数;", sum))
|
|
|
|
+ //if tt.S_querycon == "1" {
|
|
|
|
+ // log.Println("总数:————", sum)
|
|
|
|
+ // if timespan {
|
|
|
|
+ // log.Println("current:————", sum)
|
|
|
|
+ // }
|
|
|
|
+ //}
|
|
|
|
|
|
wg.Wait()
|
|
wg.Wait()
|
|
lock.Lock()
|
|
lock.Lock()
|
|
@@ -1015,7 +1071,8 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
|
|
}
|
|
}
|
|
//InitRule()
|
|
//InitRule()
|
|
if tt.S_querycon == "1" {
|
|
if tt.S_querycon == "1" {
|
|
- log.Println("运行", tt.S_name, "over", oid, " endid:", tt.LastId)
|
|
|
|
|
|
+ zlog.Info("运行任务结束:"+tt.S_name, zap.String("over", oid), zap.String("endid", tt.LastId))
|
|
|
|
+ //log.Println("运行", tt.S_name, "over", oid, " endid:", tt.LastId)
|
|
}
|
|
}
|
|
//定时任务完成发送udp信号调抽取
|
|
//定时任务完成发送udp信号调抽取
|
|
if tools.Extract["preNodeId"] == tt.ID { //常规招标定时任务udp调用抽取
|
|
if tools.Extract["preNodeId"] == tt.ID { //常规招标定时任务udp调用抽取
|
|
@@ -1083,7 +1140,8 @@ func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype s
|
|
}
|
|
}
|
|
udpsess := tt.MulMgo.GetMgoConn()
|
|
udpsess := tt.MulMgo.GetMgoConn()
|
|
if udpsess == nil {
|
|
if udpsess == nil {
|
|
- log.Println("连接为空", tt.S_name, mapInfo)
|
|
|
|
|
|
+ zlog.Info("UdpTaskRunAll", zap.String("连接为空", tt.S_name), zap.Any("查询条件", mapInfo))
|
|
|
|
+ //log.Println("连接为空", tt.S_name, mapInfo)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
defer tt.MulMgo.DestoryMongoConn(udpsess)
|
|
defer tt.MulMgo.DestoryMongoConn(udpsess)
|
|
@@ -1091,9 +1149,12 @@ func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype s
|
|
//task
|
|
//task
|
|
tasksess := tt.MgoTask.GetMgoConn()
|
|
tasksess := tt.MgoTask.GetMgoConn()
|
|
defer tt.MgoTask.DestoryMongoConn(tasksess)
|
|
defer tt.MgoTask.DestoryMongoConn(tasksess)
|
|
- log.Println("线程数:", tt.I_thread, "查询语句", q)
|
|
|
|
- log.Println("task---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
|
|
|
|
- log.Println("select:", tt.Task_QueryFieldMap)
|
|
|
|
|
|
+ zlog.Info("UdpTaskRunAll", zap.String("当前任务:", tt.S_name), zap.Any("线程数:", tt.I_thread), zap.Any("查询语句:", q))
|
|
|
|
+ zlog.Info("UdpTaskRunAll", zap.Any("task---", fmt.Sprintf("%s,%s,%s,%s", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)))
|
|
|
|
+ zlog.Info("UdpTaskRunAll", zap.Any("select:", tt.Task_QueryFieldMap))
|
|
|
|
+ //log.Println("线程数:", tt.I_thread, "查询语句", q)
|
|
|
|
+ //log.Println("task---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
|
|
|
|
+ //log.Println("select:", tt.Task_QueryFieldMap)
|
|
extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
|
|
extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
|
|
arr := [][]map[string]interface{}{}
|
|
arr := [][]map[string]interface{}{}
|
|
oid := tt.LastId
|
|
oid := tt.LastId
|
|
@@ -1141,7 +1202,8 @@ func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype s
|
|
}
|
|
}
|
|
tid := tmp["_id"]
|
|
tid := tmp["_id"]
|
|
if !timespan && sum%2000 == 0 {
|
|
if !timespan && sum%2000 == 0 {
|
|
- log.Println("current:", sum, tid)
|
|
|
|
|
|
+ zlog.Info("UdpTaskRunAll", zap.Any("current:", sum), zap.Any("_id", tid))
|
|
|
|
+ //log.Println("current:", sum, tid)
|
|
}
|
|
}
|
|
pool <- true
|
|
pool <- true
|
|
wg.Add(1)
|
|
wg.Add(1)
|
|
@@ -1380,7 +1442,8 @@ func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype s
|
|
go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
|
|
go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
|
|
}
|
|
}
|
|
//InitRule()
|
|
//InitRule()
|
|
- log.Println("运行", tt.S_name, "over")
|
|
|
|
|
|
+ //log.Println("运行", tt.S_name, "over")
|
|
|
|
+ zlog.Info("UdpTaskRunAll", zap.String("运行 -- over ", tt.S_name))
|
|
})
|
|
})
|
|
|
|
|
|
return total
|
|
return total
|
|
@@ -1492,7 +1555,8 @@ func NewLoadTask(_id string, res *tools.JSON) {
|
|
if bres && tt != nil {
|
|
if bres && tt != nil {
|
|
res.Status = true
|
|
res.Status = true
|
|
NEWTASKPOOL[_id] = tt //存入当前启动任务
|
|
NEWTASKPOOL[_id] = tt //存入当前启动任务
|
|
- log.Println("加载", tt.S_name, "完成...", tt.S_query)
|
|
|
|
|
|
+ zlog.Info("NewLoadTask", zap.String("加载任务完成", tt.S_name), zap.Any("任务查询条件:", tt.S_query))
|
|
|
|
+ //log.Println("加载", tt.S_name, "完成...", tt.S_query)
|
|
go tt.RRun()
|
|
go tt.RRun()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1669,6 +1733,7 @@ func InitTimeTask() *TTask {
|
|
func StartTask(t *TTask) {
|
|
func StartTask(t *TTask) {
|
|
defer util.Catch()
|
|
defer util.Catch()
|
|
logger.Debug("开始执行定时任务")
|
|
logger.Debug("开始执行定时任务")
|
|
|
|
+ zlog.Info("StartTask", zap.String("开始执行定时任务", t.S_name))
|
|
query := make(map[string]interface{})
|
|
query := make(map[string]interface{})
|
|
if util.ObjToString(tools.Config["dbname_old"]) != "" {
|
|
if util.ObjToString(tools.Config["dbname_old"]) != "" {
|
|
query = map[string]interface{}{
|
|
query = map[string]interface{}{
|
|
@@ -1687,7 +1752,8 @@ func StartTask(t *TTask) {
|
|
}
|
|
}
|
|
|
|
|
|
order := map[string]interface{}{"_id": -1}
|
|
order := map[string]interface{}{"_id": -1}
|
|
- logger.Debug("query:", query)
|
|
|
|
|
|
+ //logger.Debug("query:", query)
|
|
|
|
+ zlog.Info("StartTask", zap.Any("query", query))
|
|
list, _ := tools.MgoClass.Find(t.S_idcoll, query, order, nil, false, -1, -1)
|
|
list, _ := tools.MgoClass.Find(t.S_idcoll, query, order, nil, false, -1, -1)
|
|
sid := t.S_startid
|
|
sid := t.S_startid
|
|
eid := ""
|
|
eid := ""
|
|
@@ -1717,7 +1783,8 @@ func StartTask(t *TTask) {
|
|
sess := t.MgoTask.GetMgoConn()
|
|
sess := t.MgoTask.GetMgoConn()
|
|
defer t.MgoTask.DestoryMongoConn(sess)
|
|
defer t.MgoTask.DestoryMongoConn(sess)
|
|
count, _ := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Count()
|
|
count, _ := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Count()
|
|
- logger.Debug("count:", count, " query:", query)
|
|
|
|
|
|
+ //logger.Debug("count:", count, " query:", query)
|
|
|
|
+ zlog.Info("StartTask", zap.Any("count", count), zap.Any("query", query))
|
|
if count == 0 { //此轮任务没有查到数据
|
|
if count == 0 { //此轮任务没有查到数据
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -1727,6 +1794,7 @@ func StartTask(t *TTask) {
|
|
pool := make(chan bool, t.I_thread)
|
|
pool := make(chan bool, t.I_thread)
|
|
sum := 0
|
|
sum := 0
|
|
logger.Debug("select:", t.Task_QueryFieldMap)
|
|
logger.Debug("select:", t.Task_QueryFieldMap)
|
|
|
|
+ zlog.Info("StartTask", zap.Any("select:", t.Task_QueryFieldMap))
|
|
it := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Select(t.Task_QueryFieldMap).Iter()
|
|
it := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Select(t.Task_QueryFieldMap).Iter()
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
|
|
pool <- true
|
|
pool <- true
|
|
@@ -1779,7 +1847,8 @@ func StartTask(t *TTask) {
|
|
arr = [][]map[string]interface{}{}
|
|
arr = [][]map[string]interface{}{}
|
|
}
|
|
}
|
|
lock.Unlock()
|
|
lock.Unlock()
|
|
- logger.Debug("定时任务执行完毕 count:", sum)
|
|
|
|
|
|
+ //logger.Debug("定时任务执行完毕 count:", sum)
|
|
|
|
+ zlog.Info("StartTask", zap.Int("定时任务执行完毕 count:", sum))
|
|
UdpRunProjectForecast(sid, eid)
|
|
UdpRunProjectForecast(sid, eid)
|
|
}
|
|
}
|
|
logger.Debug("Udp通知项目预测执行完毕")
|
|
logger.Debug("Udp通知项目预测执行完毕")
|
|
@@ -1792,6 +1861,7 @@ func UdpRunProjectForecast(sid, eid string) {
|
|
"lteid": eid,
|
|
"lteid": eid,
|
|
})
|
|
})
|
|
logger.Debug("定时任务通知项目预测:", string(by))
|
|
logger.Debug("定时任务通知项目预测:", string(by))
|
|
|
|
+ zlog.Info("UdpRunProjectForecast", zap.String("定时任务通知项目预测", string(by)))
|
|
addr := &net.UDPAddr{
|
|
addr := &net.UDPAddr{
|
|
IP: net.ParseIP(tools.NextNodeAddr),
|
|
IP: net.ParseIP(tools.NextNodeAddr),
|
|
Port: tools.NextNodePort,
|
|
Port: tools.NextNodePort,
|
|
@@ -1802,19 +1872,21 @@ func UdpRunProjectForecast(sid, eid string) {
|
|
// SendAi 调用大模型招标分类;map[result:[结果-中标] status:200]
|
|
// SendAi 调用大模型招标分类;map[result:[结果-中标] status:200]
|
|
func SendAi(data map[string]interface{}, url string) (res map[string]interface{}) {
|
|
func SendAi(data map[string]interface{}, url string) (res map[string]interface{}) {
|
|
// 设置 2 秒的超时
|
|
// 设置 2 秒的超时
|
|
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
|
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Microsecond)
|
|
defer cancel()
|
|
defer cancel()
|
|
|
|
|
|
//url := "http://192.168.3.109:16688"
|
|
//url := "http://192.168.3.109:16688"
|
|
jsonData, err := json.Marshal(data)
|
|
jsonData, err := json.Marshal(data)
|
|
if err != nil {
|
|
if err != nil {
|
|
- fmt.Println("JSON marshal error:", err)
|
|
|
|
|
|
+ zlog.Info("SendAi", zap.Any("JSON marshal error:", err))
|
|
|
|
+ //fmt.Println("JSON marshal error:", err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
|
if err != nil {
|
|
if err != nil {
|
|
- fmt.Println("Request error:", err)
|
|
|
|
|
|
+ zlog.Info("SendAi", zap.Any("Request error:", err))
|
|
|
|
+ //fmt.Println("Request error:", err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1828,17 +1900,19 @@ func SendAi(data map[string]interface{}, url string) (res map[string]interface{}
|
|
if err != nil {
|
|
if err != nil {
|
|
// 使用 errors.Is 检查错误是否是超时错误
|
|
// 使用 errors.Is 检查错误是否是超时错误
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
- fmt.Println("Request timed out")
|
|
|
|
|
|
+ //fmt.Println("Request timed out")
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- fmt.Println("Request error:", err)
|
|
|
|
|
|
+ //fmt.Println("Request error:", err)
|
|
|
|
+ zlog.Info("SendAi", zap.Any("Request error:", err))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
defer resp.Body.Close()
|
|
defer resp.Body.Close()
|
|
|
|
|
|
err = json.NewDecoder(resp.Body).Decode(&res)
|
|
err = json.NewDecoder(resp.Body).Decode(&res)
|
|
if err != nil {
|
|
if err != nil {
|
|
- fmt.Println("Response decoding error:", err)
|
|
|
|
|
|
+ zlog.Info("SendAi", zap.Any("Response decoding error:", err))
|
|
|
|
+ //fmt.Println("Response decoding error:", err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|