package task import ( "bytes" "context" "encoding/csv" "encoding/json" "errors" "fmt" "log" mu "mfw/util" "net" "net/http" "os" "qfw/util" "reflect" "regexp" "strconv" "strings" "sync" "time" "tools" u "util" "github.com/cron" "github.com/donnie4w/go-logger/logger" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) func init() { REG, _ = regexp.Compile(`\(.*?\)\d*`) REG1, _ = regexp.Compile(`\(.*?\)`) //log.Println(REG.FindAllString("(平台|软件|电子商务|多媒体|通讯设备)1(建设|采购|开发)2", -1)) //加载所有任务 StartMonitor() if tools.NoAutoRun == 0 { rtask, b := tools.MgoClass.Find(tools.COLL_TASK, `{"i_status":1}`, nil, `{"_id":1}`, false, 0, 50) if b && rtask != nil && *rtask != nil { for _, t := range *rtask { _id := u.BsonIdToSId(t["_id"]) res := tools.JSON{} //LoadTask(_id, &res) NewLoadTask(_id, &res) } } } } // var UdpSess *tools.MongodbSim var REG *regexp.Regexp var REG1 *regexp.Regexp var TaskLock = sync.Mutex{} var NEWTASKPOOL = map[string]*TTask{} //存储当前任务 var TaskMap = map[string]*TTask{} var HangyeUdps = make(chan map[string]interface{}, 100) //初始化行业分类udp任务池 type Rule struct { Reg []interface{} //规则 DetailReg []interface{} //detail正则,目前只针对title、channel的二级招标分类中的中标规则 NotReg []interface{} //排除规则 Rid string Class *Class Rule_PreRule string S_name string S_code string S_pid string //Reg []*RuleDFA } type Class struct { Cid string TTask *TTask Rule []*Rule Class_PreRule string S_name string S_fields string S_code string S_default string S_pid string S_savefield string } type TTask struct { Class []*Class //任务中类集合 ID string //任务id S_name string //任务名称 I_rate int //任务执行频率 S_class string //任务中类id S_mgourl string //任务mgo addr S_mgodb string //任务mgo db S_collection string //查询、存储表 I_poolsize int //任务连接池个数 S_startid string //任务起始id I_status int //控制任务状态属性 S_attr string //标识属性key值 AttrVal int //标识属性val值 S_coll string //查询、存储表 LastId string //上次定时任务的结束id Lock sync.Mutex S_query string //任务查询条件 FlagQuit chan bool //任务结束控制 B_Running bool //任务是否执行 MgoTask *u.MongodbSim //任务查询表mgo I_multiclass int //是否支持多分类 I_savetype int //1 名称 2 值 I_thread int //线程数 I_wordcount int //词频统计 WordCount map[string]map[string]int //词频统计集合 WcLock sync.Mutex Task_PreRule string //任务前置过滤 S_table string //结果表 S_querycon string //查询方式 1是id查询 0是时间查询 S_starttime int64 //起始时间 S_timefieldname string //各表所查时间字段名称 S_asfield string //查询表与结果表关联字段 I_fieldUpdate int //分类中的保存字段信息 0:覆盖 1:更新 MulMgo *u.MongodbSim //联合查询的mgo配置信息 MulColl string //联合查询的表名 I_tasktype int //任务类型 0:常规任务 1:附件任务:2 S_idcoll string //正式环境查询数据id段 B_UpdateRule bool //是否更新任务下的规则 S_classField string //分类字段 Task_QueryFieldArr []string //用于合并数据 Task_QueryFieldMap map[string]interface{} //用于仅查询字段 Dbtype string //用于区分连得是哪个库,使用不同的用户密码 } type RuleDFA struct { Match []DFA //包含的敏感词 MatchNum []int //包含敏感词匹配个数 MisMatch DFA //不包含的敏感词 MisMatchNum int //不包含敏感词匹配个数 } type DFA struct { Link map[string]interface{} } // 初始化任务 func InitTaskData(_id string) { defer tools.Catch() taskData, _ := tools.MgoClass.FindById(tools.COLL_TASK, _id, nil) task := &TTask{} task.MgoTask = &u.MongodbSim{} s_querycon := "1" s_table := "" s_timefieldname := "" s_starttime := int64(0) s_asfield := "" if b_updaterule, ok := (*taskData)["b_updaterule"].(bool); ok { task.B_UpdateRule = b_updaterule //更新任务下的规则 } if (*taskData)["s_querycon"] != nil { // s_querycon = util.ObjToString((*taskData)["s_querycon"]) } if (*taskData)["s_starttime"] != nil { s_starttime = (*taskData)["s_starttime"].(int64) } if (*taskData)["s_table"] != nil { s_table = util.ObjToString((*taskData)["s_table"]) } if (*taskData)["s_timefieldname"] != nil { s_timefieldname = util.ObjToString((*taskData)["s_timefieldname"]) } if (*taskData)["s_asfield"] != nil { s_asfield = util.ObjToString((*taskData)["s_asfield"]) } //task.Class task.ID = u.BsonIdToSId((*taskData)["_id"]) task.S_name = util.ObjToString((*taskData)["s_name"]) task.S_class = util.ObjToString((*taskData)["s_class"]) task.S_mgourl = util.ObjToString((*taskData)["s_mgourl"]) task.S_mgodb = util.ObjToString((*taskData)["s_mgodb"]) task.S_coll = util.ObjToString((*taskData)["s_coll"]) task.I_poolsize = util.IntAll((*taskData)["i_poolsize"]) task.Task_PreRule = util.ObjToString((*taskData)["s_task_prerule"]) task.I_multiclass = util.IntAllDef((*taskData)["i_multiclass"], 0) task.I_wordcount = util.IntAllDef((*taskData)["i_wordcount"], 0) task.I_savetype = util.IntAllDef((*taskData)["i_savetype"], 1) task.I_thread = util.IntAllDef((*taskData)["i_thread"], 1) task.S_query = util.ObjToString((*taskData)["s_query"]) task.S_startid = util.ObjToString((*taskData)["s_startid"]) task.S_attr = util.ObjToString((*taskData)["s_attr"]) task.I_rate = util.IntAllDef((*taskData)["i_rate"], 10) task.I_tasktype = util.IntAllDef((*taskData)["i_tasktype"], 0) task.I_status = util.IntAll((*taskData)["i_status"]) task.S_idcoll = util.ObjToString((*taskData)["s_idcoll"]) task.S_querycon = s_querycon task.S_starttime = s_starttime task.S_table = s_table task.S_timefieldname = s_timefieldname task.S_asfield = s_asfield task.I_fieldUpdate = util.IntAllDef((*taskData)["i_fieldUpdate"], 0) task.S_classField = util.ObjToString((*taskData)["s_classfield"]) task.Dbtype = util.ObjToString((*taskData)["s_dbtype"]) flagAttrVal := 1 attrs := strings.Split(task.S_attr, "__") if len(attrs) == 2 { flagAttrVal = util.IntAll(attrs[1]) task.S_attr = attrs[0] } task.AttrVal = flagAttrVal //联表查询初始化mgo,线上只有行业分类用到;跑历史招标、行业分类的时候也用到两边查询 for _, v := range tools.Config { if m, ok := v.(map[string]interface{}); ok { if m["taskid"] == task.ID { if m["mgoaddr"] != nil && m["db"] != nil && m["coll"] != nil { dbtype := util.ObjToString(m["dbtype"]) addr, _ := m["mgoaddr"].(string) db, _ := m["db"].(string) coll, _ := m["coll"].(string) task.MulMgo = &u.MongodbSim{} task.MulMgo.MongodbAddr = addr task.MulMgo.Size = 3 task.MulMgo.DbName = db if dbtype != "" { task.MulMgo.UserName = tools.DbInfo[dbtype][0] task.MulMgo.Password = tools.DbInfo[dbtype][1] } task.MulMgo.InitPool() task.MulColl = coll } } } } //初始化查询字段信息 if task.Task_QueryFieldMap == nil { task.Task_QueryFieldMap = make(map[string]interface{}) } //Task_QueryFieldMap加入关联字段 for _, v := range strings.Split(s_asfield, "==") { if v != "" { task.Task_QueryFieldMap[v] = 1 } } //Task_QueryFieldMap加入分类字段 //Task_QueryFieldArr加入分类字段 for _, f := range strings.Split(task.S_classField, ",") { task.Task_QueryFieldMap[f] = 1 task.Task_QueryFieldArr = append(task.Task_QueryFieldArr, f) } //初始化任务下所有的分类和规则 InitClassAndRuleData(_id, task) } // InitClassAndRuleData 初始化任务下所有的分类和规则 func InitClassAndRuleData(_id string, task *TTask) { defer tools.Catch() classIdStr := task.S_class if classIdStr != "" { classIdArr := strings.Split(classIdStr, ",") classArr := make([]*Class, 0) for _, classid := range classIdArr { classData, _ := tools.MgoClass.FindById(tools.COLL_CLASS, classid, nil) if classData != nil { //初始化Class class := &Class{ //Rule: CidRuleMap[classid], Cid: classid, Class_PreRule: util.ObjToString((*classData)["s_class_prerule"]), S_name: util.ObjToString((*classData)["s_name"]), S_fields: util.ObjToString((*classData)["s_fields"]), S_code: util.ObjToString((*classData)["s_code"]), S_default: util.ObjToString((*classData)["s_default"]), S_pid: util.ObjToString((*classData)["s_pid"]), S_savefield: util.ObjToString((*classData)["s_savefield"]), } //根据classid查找对应的rule ruleList, _ := tools.MgoClass.Find(tools.COLL_RULE, `{"s_classid":"`+classid+`"}`, `{"i_order":1}`, nil, false, -1, -1) if ruleList != nil && len(*ruleList) > 0 { for _, v := range *ruleList { _id := u.BsonIdToSId(v["_id"]) //rule s_rule, _ := v["s_rule"].(primitive.A) i_rule := DealRules(s_rule) //detailrule s_detailrule, _ := v["s_detailrule"].(primitive.A) i_detailrule := DealRules(s_detailrule) //notrule s_notrule, _ := v["s_notrule"].(primitive.A) i_notrule := DealRules(s_notrule) rule := &Rule{ Rid: _id, Reg: i_rule, DetailReg: i_detailrule, NotReg: i_notrule, Rule_PreRule: util.ObjToString(v["s_rule_prerule"]), S_name: util.ObjToString(v["s_name"]), S_code: util.ObjToString(v["s_code"]), S_pid: util.ObjToString(v["s_pid"]), } class.Rule = append(class.Rule, rule) } } classArr = append(classArr, class) } } task.Class = classArr TaskMap[_id] = task } } func DealRules(rules []interface{}) (i_rule []interface{}) { for _, r := range tools.ObjArrToStringArr(rules) { if strings.HasPrefix(r, "'") && strings.HasSuffix(r, "'") { //正则 rs := []rune(r) ru := string(rs[1 : len(rs)-1]) rureg, err := regexp.Compile(ru) if err != nil { log.Println("error---rule:", r) continue } i_rule = append(i_rule, []interface{}{rureg}...) } else { //规则,加入到敏感词匹配 matchnum := 0 mismatchnum := 0 isnum1 := false isnum2 := false numArr := make([]int, 0) ruleDFA := &RuleDFA{ Match: []DFA{}, MisMatch: DFA{}, } tmpArr := strings.Split(r, "^") matchTmp := tmpArr[0] ruleTextArr := REG.FindAllString(matchTmp, -1) for _, match := range ruleTextArr { matchnum, isnum1 = GetNum(match) numArr = append(numArr, matchnum) matchArr := GetRule(match, isnum1) tmpDFA := DFA{ Link: make(map[string]interface{}), } tmpDFA.AddWord(matchArr...) ruleDFA.Match = append(ruleDFA.Match, tmpDFA) } if len(tmpArr) == 2 { mismatch := tmpArr[1] mismatchnum, isnum2 = GetNum(mismatch) mismatchArr := GetRule(mismatch, isnum2) ruleDFA.MisMatch.AddWord(mismatchArr...) } ruleDFA.MatchNum = numArr ruleDFA.MisMatchNum = mismatchnum i_rule = append(i_rule, []interface{}{ruleDFA}...) } } return } // 更新任务状态 func (tt *TTask) Sstatus() int { if tt.I_status == 0 && tt.B_Running { return 1 } else if tt.I_status == 0 && !tt.B_Running { return 0 } else if tt.I_status == 1 && tt.B_Running { return 2 } else if tt.I_status == 1 && !tt.B_Running { return 3 } return -1 } // 停止任务 func (tt *TTask) SStop() bool { tt.Lock.Lock() defer tt.Lock.Unlock() if tt.I_status == 1 { tt.I_status = 0 tt.FlagQuit <- true log.Println("开始停止...") } return true } var NN = 400 // 存放测试的数据 var TEST = &TestList{ Count: map[string][]int{}, } type TestList struct { Lock sync.Mutex Count map[string][]int } func (tl *TestList) Get(id string) []int { tl.Lock.Lock() defer tl.Lock.Unlock() return tl.Count[id] } func (tl *TestList) Put(id string, val ...int) { defer tools.Catch() tl.Lock.Lock() defer tl.Lock.Unlock() if len(val) == 3 { tl.Count[id] = val } else { tval := tl.Count[id] tl.Count[id][2] = val[0] if tval[1] <= val[0] { tl.Count[id][0] = 1 } } } func (tl *TestList) Del(id string) { tl.Lock.Lock() defer tl.Lock.Unlock() delete(tl.Count, id) os.Remove("csv/" + id) } // 任务测试 func (tt *TTask) RRunTest(s_startid, s_endid, s_query, filename string) { defer tools.Catch() defer func() { go func() { time.AfterFunc(10*time.Minute, func() { TEST.Del(filename) }) }() }() //开始识别 sess := tt.MgoTask.GetMgoConn() defer tt.MgoTask.DestoryMongoConn(sess) q := map[string]interface{}{} if s_query != "" { json.Unmarshal([]byte(strings.Replace(s_query, "'", "\"", -1)), &q) } if s_startid != "" { q["_id"] = map[string]interface{}{ "$gte": u.StringTOBsonId(s_startid), } } if s_endid != "" { if q["_id"] != nil { q["_id"].(map[string]interface{})["$lte"] = u.StringTOBsonId(s_endid) } else { q["_id"] = map[string]interface{}{ "$lte": u.StringTOBsonId(s_endid), } } } count := tt.MgoTask.Count(tt.S_coll, &q) if count == 0 { return } TEST.Put(filename, 0, count, 0) query := sess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Iter() arr := [][]string{} i := 0 for tmp := make(map[string]interface{}); query.Next(&tmp); i = i + 1 { if i > 2000 { //数据跑至2000停止 break } //按顺序识别 tid := tmp["_id"] res := make([]string, 3) res[0] = u.BsonIdToSId(tid) res[1] = util.ObjToString(tmp["title"]) if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例 res = append(res, "拟建", "拟建") } else if util.IntAll(tmp["infoformat"]) == 3 { res = append(res, "产权", "产权") } else { SMap := NewClassificationRun(tt, tmp) for _, k := range SMap.Keys { res = append(res, util.ObjToString((SMap.Map[k].([]string))[0])) } } arr = append(arr, res) TEST.Put(filename, i+1) tmp = make(map[string]interface{}) } if len(arr) > 0 { if !Exist("csv") { os.Mkdir("csv", 777) } f, _ := os.Create("csv/" + filename) w := csv.NewWriter(f) for _, str := range arr { w.Write(str) } w.Flush() f.Close() } log.Println("运行RUNTEST-OVER") } func Exist(filename string) bool { _, err := os.Stat(filename) return err == nil || os.IsExist(err) } func (tt *TTask) RRun() { first := make(chan bool, 1) if tt.I_status == 1 { first <- true } OVER: for tt.I_status == 1 { tt.B_Running = false select { case <-tt.FlagQuit: //结果任务控制 log.Println("退出,RUN", tt.S_name) tt = nil break OVER case <-first: //第一次执行控制 if tools.ControlTaskRun { //任务流程控制,现有模式用不到,默认false tools.AllTaskFinish = false } log.Println("第一次执行任务:", tt.S_name) newtaskrun(tt) case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制 //执行定时任务前,检查任务是否更新了rule if tt.B_UpdateRule { InitClassAndRuleData(tt.ID, tt) //重新加载任务的rule tt.B_UpdateRule = false UpdateTaskInfo(false, tt.ID) } //上一轮任务执行完毕再走下一轮(上一轮任务执行完毕的标志是业主分类执行完) //util.Debug("ControlTaskRun---", tools.ControlTaskRun, "AllTaskFinish---", tools.AllTaskFinish) if !tools.ControlTaskRun || tt.S_querycon == "0" { //线下环境不控制定时任务;或者线上通过时间定时执行 newtaskrun(tt) } else if tools.ControlTaskRun && tools.AllTaskFinish { //线上控制 tools.AllTaskFinish = false newtaskrun(tt) } else { log.Println("上轮任务暂未完成") } } } } func newtaskrun(tt *TTask) { NewTaskRunAll(tt, false, nil) } // NewTaskRunAll 常规任务和udp非合并数据处理方法 func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int { total := 0 tools.Try(func() { //不加这一层defer运行不了!!! timespan := false //时间间隔(控制数据条数打印) tt.B_Running = true defer func() { //业主分类执行完修改AllTaskFinish状态;控制流程的任务id(整个分类流程业主分类结尾,以此为标记) if tt.ID == tools.ControlLastTaskId { tools.AllTaskFinish = true } tt.B_Running = false }() //开始识别 pool := make(chan bool, tt.I_thread) wg := &sync.WaitGroup{} lock := &sync.Mutex{} q := make(map[string]interface{}) nextNodeSid, nextNodeEid := "", "" oid := tt.LastId //上次定时任务的结束id s_table := tt.S_table //结果表 s_asfield := tt.S_asfield //关联字段 asfields := strings.Split(s_asfield, "==") qfield := "" //查询表字段 rfield := "" //结果表字段 qtp := "" //查询字段的类型 rtp := "" //结果字段的类型 rtype := "" //结果字段的真实类型 if s_table != "" { //有结果表保存到结果表(更新,插入) tt.S_collection = s_table } if len(asfields) == 2 { qfield = asfields[0] //查询表字段 rfield = asfields[1] //结果表字段 //id处理 object string互转 qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield) queryExis := map[string]interface{}{ rfield: map[string]interface{}{ "$exists": true, }, } onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis) rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型 } //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新 //没有结果表在查询表上更新 //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname) sort := "" if !budp { //非udp查询条件 sort = "_id" if tt.S_query != "" { //有查询条件 json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q) } idcoll := tt.S_idcoll if idcoll != "" { //idcoll中查询id区间,bidding_processing_ids nextNodeSid, nextNodeEid = FindId(idcoll) //查询id段 if nextNodeSid != "" && nextNodeEid != "" && nextNodeSid <= nextNodeEid { q["_id"] = bson.M{ "$gt": u.StringTOBsonId(nextNodeSid), "$lte": u.StringTOBsonId(nextNodeEid), } } else { log.Println("定时任务", tt.S_name, "查询", tt.S_idcoll, "时间段出错", nextNodeSid, nextNodeEid) tools.AllTaskFinish = true //为查询到数据视为此轮任务完成 return } stime, _ := strconv.ParseInt(nextNodeSid[:8], 16, 64) //取id前8位转成时间戳 etime, _ := strconv.ParseInt(nextNodeEid[:8], 16, 64) // if etime-stime < 1800 { //时间跨度小于半小时 timespan = true } } else { if q["_id"] != nil { if _id, ok := q["_id"].(string); ok { q["_id"] = u.StringTOBsonId(_id) } else if _ids, ok := q["_id"].(map[string]interface{}); ok { for k, v := range _ids { if id, bk := v.(string); bk { _ids[k] = u.StringTOBsonId(id) } } } } if tt.S_querycon == "1" { //id查询 if tt.S_query != "" { //页面上配置了查询条件,直接使用,不再单独查询上次任务结束ID if tt.LastId != "" && q["_id"] == nil { q["_id"] = map[string]interface{}{ "$gt": u.StringTOBsonId(tt.LastId), } } } else { //临时修改查询id区间段 comeintime := time.Now().Unix() - 5*60 query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": comeintime, }, } qId := tt.MgoTask.GetMgoConn() defer tt.MgoTask.DestoryMongoConn(qId) tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter() eId := "" for tmp := make(map[string]interface{}); tmpData.Next(tmp); { eId = u.BsonIdToSId(tmp["_id"]) } if tt.LastId != "" && q["_id"] == nil { sid := tt.LastId q["_id"] = map[string]interface{}{ "$gt": u.StringTOBsonId(sid), } if eId != "" { q["_id"] = map[string]interface{}{ "$gt": u.StringTOBsonId(sid), "$lte": u.StringTOBsonId(eId), } } } //按id查询,为了保证有新数据入库,每次休息2分钟 time.Sleep(time.Second * 60) //测试环境q的赋值执行下述代码 //if tt.LastId != "" && q["_id"] == nil { // q["_id"] = map[string]interface{}{ // "$gt": u.StringTOBsonId(tt.LastId), // } //} } } else { //时间查询 name := tt.S_timefieldname q[name] = map[string]interface{}{ "$gt": tt.S_starttime, } } } } else { //udp查询条件 sort = "-_id" if tt.S_query != "" { //有查询条件 json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q) } tmpq := mapInfo["q"].(map[string]interface{}) for k, v := range tmpq { q[k] = v } sid := mapInfo["gtid"].(string) eid := mapInfo["lteid"].(string) stime, _ := strconv.ParseInt(sid[:8], 16, 64) etime, _ := strconv.ParseInt(eid[:8], 16, 64) if etime-stime < 1800 { //时间跨度小于半小时 timespan = true } } //task tasksess := tt.MgoTask.GetMgoConn() defer tt.MgoTask.DestoryMongoConn(tasksess) //通过ID 查询数据时,才打印日志 if tt.S_querycon == "1" { log.Println("运行", tt.S_name, "start") log.Println("线程数:", tt.I_thread, "查询语句", q) log.Println("查询---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table) log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr) } extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Sort(sort).Iter() arr := [][]map[string]interface{}{} if tt.I_wordcount == 1 { tt.WordCount = map[string]map[string]int{} } sum := 0 for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ { tid := tmp["_id"] if !timespan && sum%2000 == 0 { log.Println("current:", sum, tt.S_name) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() //按顺序识别 tid := tmp["_id"] update := []map[string]interface{}{} //如果有关联字段 根据关联字段更新或者保存 if len(asfields) == 2 { //log.Println("qfield====", qfield) field := tmp[qfield] if field != nil { //log.Println("field===", field, qtp, rtp) if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致 } else { //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype) if rtype == rtp { //转换后与真实类型不同,填写时类型错误 //将查询字段类型转换为对应的结果字段类型 if qtp == "bson.ObjectId" && rtp == "string" { field = u.BsonIdToSId(tmp[qfield]) } else if qtp == "string" && rtp == "bson.ObjectId" { field = u.StringTOBsonId(tmp[qfield].(string)) } } } update = append(update, map[string]interface{}{ //根据关联字段更新 rfield: field, }) } } else { update = append(update, map[string]interface{}{ //更新的条件 根据id更新 "_id": tid, }) } res := map[string]interface{}{} ksmap := make(map[string]map[string][]string) if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例 res["toptype"] = "拟建" res["subtype"] = "拟建" } else if util.IntAll(tmp["infoformat"]) == 3 { res["toptype"] = "产权" res["subtype"] = "产权" } else { SMap := &tools.SortMap{} if tt.I_tasktype == 2 { //标签任务 SMap = TagClassificationRun(tt, tmp) } else if tt.I_tasktype == 1 { //附件任务 SMap = FileClassificationRun(tt, tmp) //res["projectinfo"] = tmp["projectinfo"] } else { //常规任务 SMap = NewClassificationRun(tt, tmp) //1.针对招标分类的特殊处理 if tt.ID == "57982b4436b82b073c000001" && tt.S_name == "招标分类" { //1.一级分类时,符合结果中成交规则时 //todo 如果没有打上分类,调用ai 模型分类 if _, ok := SMap.Map["toptype"]; !ok { if util.ObjToString(tools.Config["aiurl"]) != "" { now := time.Now() data := map[string]interface{}{ "title": tmp["title"], "detail": tmp["detail"], } reqData := map[string]interface{}{ "texts": []interface{}{data}, } rai := SendAi(reqData, util.ObjToString(tools.Config["aiurl"])) log.Println("ai 分类时长", time.Since(now).Seconds()) if len(rai) > 0 { resa := rai["result"] if dataa, ok := resa.([]interface{}); ok { da := dataa[0] if len(util.ObjToString(da)) > 0 { cs := strings.Split(util.ObjToString(da), "-") SMap.Map["toptype"] = cs[0] SMap.Map["subtype"] = cs[1] } } } } } if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" { if _, ok := tmp["detail"]; ok { if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) { SMap.Map["toptype"] = "结果" resa := ReSub(tt, tmp, "结果") subtype := resa.Map["subtype"] delete(SMap.Map, "subtype") SMap.Map["subtype"] = subtype } } } //2.一级分类是预告,但是标题含有招标计划,同时含有 预公告|预公示,变为 采购意向 if SMap.Map["toptype"] == "预告" { if u.DealYuce(tmp["title"].(string)) { SMap.Map["toptype"] = "采购意向" SMap.Map["subtype"] = "采购意向" } } //3.针对 项目登记 相关数据处理,符合条件的归类为‘采购意向’ if u.IsPurchasingIntent(tmp) { SMap.Map["toptype"] = "采购意向" SMap.Map["subtype"] = "采购意向" } } } //2.针对用户行业分类,单独处理数据 if mapInfo["stype"] == "yonghuhangye" || strings.TrimSpace(tt.S_name) == "用户行业分类" { subs := SMap.Map["subscope_dy"] delete(SMap.Map, "topscope_dy") var tops []string if subscopes, ok := subs.([]string); ok { for _, sub := range subscopes { top := strings.Split(sub, "_")[0] tops = append(tops, top) } SMap.Map["topscope_dy"] = u.RemoveDuplicateString(tops) } } //追加时处理,//更新字段 I_fieldUpdate 0:覆盖 1:追加 if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //封装追加信息 if len(SMap.Keys) > 0 { for _, k := range SMap.Keys { ksarr := make([]string, 0) if k != "toptype" && k != "subtype" { ks, ok := SMap.Map[k].(string) if ok { ksarr = append(ksarr, ks) } else { ksarr = append(ksarr, SMap.Map[k].([]string)...) } } ksmap[k] = map[string][]string{ "$each": ksarr, } } } } else { //非多分类 res = SMap.Map if tt.I_tasktype == 1 { //附件任务 if tmp["projectinfo"] != nil { res["projectinfo"] = tmp["projectinfo"] } } } } //if len(res) > 0 || len(ksmap) > 0 { IS.NewAdd(tt, res) if tt.S_attr != "" { //res[tt.S_attr] = 1 res[tt.S_attr] = tt.AttrVal } // 添加分类时间 res["classification_time"] = time.Now().Unix() if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加 if len(ksmap) > 0 && len(res) > 0 { update = append(update, map[string]interface{}{ "$set": res, "$addToSet": ksmap, }) } else if len(ksmap) > 0 && len(res) == 0 { update = append(update, map[string]interface{}{ "$addToSet": ksmap, }) } else if len(ksmap) == 0 && len(res) > 0 { update = append(update, map[string]interface{}{ "$set": res, }) } } else { //非多分类或者多分类的覆盖 //log.Println("更新==", res) if len(res) > 0 { update = append(update, map[string]interface{}{ "$set": res, }) } } //更新 lock.Lock() if len(update) == 2 { //有更新条件和更新内容时才进行更新操作 arr = append(arr, update) } if len(arr) >= NN { //if s_table == "" { tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //} else { // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...) //} arr = [][]map[string]interface{}{} } lock.Unlock() //} }(tmp) ttid := u.BsonIdToSId(tid) if ttid > tt.LastId && !budp { tt.LastId = ttid } tmp = make(map[string]interface{}) } total = sum //通过ID 查询分类数据才打印日志 if tt.S_querycon == "1" { log.Println("总数:————", sum) if timespan { log.Println("current:————", sum) } } wg.Wait() lock.Lock() if len(arr) > 0 { //if s_table == "" { //没有结果表 tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新 //} else { //有结果表 // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...) //} arr = [][]map[string]interface{}{} } lock.Unlock() tt.WcLock.Lock() if len(tt.WordCount) > 0 { savem := []map[string]interface{}{} tn := time.Now().Unix() for wk, wm := range tt.WordCount { for ck, cm := range wm { m1 := map[string]interface{}{ "s_class": wk, "s_word": ck, "i_count": cm, "bz": tn, } savem = append(savem, m1) if len(savem) >= 1000 { tools.MgoClass.SaveBulk("wordcount", savem...) savem = []map[string]interface{}{} } } } if len(savem) > 0 { tools.MgoClass.SaveBulk("wordcount", savem...) savem = nil } } tt.WcLock.Unlock() //更新最后id if !budp && oid != tt.LastId { setid := map[string]interface{}{ "$set": map[string]interface{}{ "s_startid": tt.LastId, }, } go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false) } //更新最后时间 if !budp { nowtime := time.Now().Unix() settime := map[string]interface{}{ "$set": map[string]interface{}{ "s_starttime": nowtime, }, } go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false) } //InitRule() if tt.S_querycon == "1" { log.Println("运行", tt.S_name, "over", oid, " endid:", tt.LastId) } //定时任务完成发送udp信号调抽取 if tools.Extract["preNodeId"] == tt.ID { //常规招标定时任务udp调用抽取 if tt.S_idcoll == "" { nextNodeSid = oid nextNodeEid = tt.LastId } UdpRunExtract(nextNodeSid, nextNodeEid) } }) return total } // udp合并数据处理的方法 func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype string) int { total := 0 tools.Try(func() { //不加这一层defer运行不了!!! timespan := false tt.B_Running = true defer func() { tt.B_Running = false }() //开始识别 pool := make(chan bool, tt.I_thread) wg := &sync.WaitGroup{} lock := &sync.Mutex{} q := map[string]interface{}{} s_table := tt.S_table //结果表 s_asfield := tt.S_asfield //关联字段 asfields := strings.Split(s_asfield, "==") qfield := "" //查询表字段 rfield := "" //结果表字段 qtp := "" //查询字段的类型 rtp := "" //结果字段的类型 rtype := "" //结果字段的真实类型 if s_table != "" { //有结果表保存到结果表(更新,插入) tt.S_collection = s_table } if len(asfields) == 2 { qfield = asfields[0] //查询表字段 rfield = asfields[1] //结果表字段 //id处理 object string互转 qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield) queryExis := map[string]interface{}{ rfield: map[string]interface{}{ "$exists": true, }, } onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis) rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型 } //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新 //没有结果表在查询表上更新 //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname) q = mapInfo["q"].(map[string]interface{}) //计算起始id和结束id时间宽度,大于半小时的2000数据打印一次个数,小于半小时的每条数据都打印一次个数 sid := mapInfo["gtid"].(string) eid := mapInfo["lteid"].(string) stime, _ := strconv.ParseInt(sid[:8], 16, 64) etime, _ := strconv.ParseInt(eid[:8], 16, 64) if etime-stime < 1800 { //时间跨度小于半小时 timespan = true } udpsess := tt.MulMgo.GetMgoConn() if udpsess == nil { log.Println("连接为空", tt.S_name, mapInfo) return } defer tt.MulMgo.DestoryMongoConn(udpsess) udptmp := udpsess.DB(tt.MulMgo.DbName).C(tt.MulColl).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter() //task tasksess := tt.MgoTask.GetMgoConn() defer tt.MgoTask.DestoryMongoConn(tasksess) log.Println("线程数:", tt.I_thread, "查询语句", q) log.Println("task---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table) log.Println("select:", tt.Task_QueryFieldMap) extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter() arr := [][]map[string]interface{}{} oid := tt.LastId if tt.I_wordcount == 1 { tt.WordCount = map[string]map[string]int{} } sum := 0 //对比两张表数据,减少查询次数 var compare bson.M for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ { result := tmp //对比 for { if compare == nil { compare = make(bson.M) if !udptmp.Next(&compare) { //传参表数据赋值给compare break } } if compare != nil { //对比 cid := u.BsonIdToSId(compare["_id"]) //传参表id tid := u.BsonIdToSId(tmp["_id"]) //抽取表id if cid == tid { //更新抽取表的数据,再进行分类 for _, k := range tt.Task_QueryFieldArr { v1 := compare[k] v2 := tmp[k] if v2 == nil && v1 != nil { result[k] = v1 } } break } else { if tid > cid { //抽取表id大于传参表id compare = nil continue } else { break } } } else { break } } tid := tmp["_id"] if !timespan && sum%2000 == 0 { log.Println("current:", sum, tid) } pool <- true wg.Add(1) go func(result map[string]interface{}) { defer func() { <-pool wg.Done() }() //按顺序识别 if result != nil { tid := result["_id"] update := []map[string]interface{}{} //如果有关联字段 根据关联字段更新或者保存 if len(asfields) == 2 { //log.Println("qfield====", qfield) field := result[qfield] if field != nil { //log.Println("field===", field, qtp, rtp) if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致 } else { //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype) if rtype == rtp { //转换后与真实类型不同,填写时类型错误 //将查询字段类型转换为对应的结果字段类型 if qtp == "bson.ObjectId" && rtp == "string" { field = u.BsonIdToSId(result[qfield]) } else if qtp == "string" && rtp == "bson.ObjectId" { field = u.StringTOBsonId(result[qfield].(string)) } } } update = append(update, map[string]interface{}{ //根据关联字段更新 rfield: field, }) } } else { update = append(update, map[string]interface{}{ //更新的条件 根据id更新 "_id": tid, }) } res := map[string]interface{}{} ksmap := make(map[string]map[string][]string) if util.IntAll(result["infoformat"]) == 2 { //此处增加特例 res["toptype"] = "拟建" res["subtype"] = "拟建" } else if util.IntAll(result["infoformat"]) == 3 { res["toptype"] = "产权" res["subtype"] = "产权" } else { SMap := &tools.SortMap{} if tt.I_tasktype == 2 { //标签任务 SMap = TagClassificationRun(tt, result) } else if tt.I_tasktype == 1 { //附件任务 SMap = FileClassificationRun(tt, result) //res["projectinfo"] = tmp["projectinfo"] } else { //常规任务 SMap = NewClassificationRun(tt, result) //一级分类时,符合结果中成交规则时 if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" { if _, ok := tmp["detail"]; ok { if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) { SMap.Map["toptype"] = "结果" resa := ReSub(tt, tmp, "结果") subtype := resa.Map["subtype"] delete(SMap.Map, "subtype") SMap.Map["subtype"] = subtype } } } } if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //更新字段 I_fieldUpdate 0:覆盖 1:追加 //封装追加信息 if len(SMap.Keys) > 0 { for _, k := range SMap.Keys { ksarr := make([]string, 0) if k != "toptype" && k != "subtype" { ks, ok := SMap.Map[k].(string) if ok { ksarr = append(ksarr, ks) } else { ksarr = append(ksarr, SMap.Map[k].([]string)...) } } ksmap[k] = map[string][]string{ "$each": ksarr, } } } } else { res = SMap.Map } } IS.NewAdd(tt, res) if tt.S_attr != "" { res[tt.S_attr] = tt.AttrVal } if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加 if len(ksmap) > 0 && len(res) > 0 { update = append(update, map[string]interface{}{ "$set": res, "$addToSet": ksmap, }) } else if len(ksmap) > 0 && len(res) == 0 { update = append(update, map[string]interface{}{ "$addToSet": ksmap, }) } else if len(ksmap) == 0 && len(res) > 0 { update = append(update, map[string]interface{}{ "$set": res, }) } } else { if len(res) > 0 { update = append(update, map[string]interface{}{ "$set": res, }) } } //更新 lock.Lock() if len(update) == 2 { //有更新条件和更新内容时才进行更新操作 arr = append(arr, update) } if len(arr) >= NN { tt.MgoTask.UpdateBulk(tt.S_collection, arr...) arr = [][]map[string]interface{}{} } lock.Unlock() } }(result) ttid := u.BsonIdToSId(tid) if ttid > tt.LastId { tt.LastId = ttid } tmp = make(map[string]interface{}) } total = sum if timespan { log.Println("current:", sum) } wg.Wait() lock.Lock() if len(arr) > 0 { //if s_table == "" { //没有结果表 tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新 //} else { //有结果表 // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...) //} arr = [][]map[string]interface{}{} } lock.Unlock() tt.WcLock.Lock() if len(tt.WordCount) > 0 { savem := []map[string]interface{}{} tn := time.Now().Unix() for wk, wm := range tt.WordCount { for ck, cm := range wm { m1 := map[string]interface{}{ "s_class": wk, "s_word": ck, "i_count": cm, "bz": tn, } savem = append(savem, m1) if len(savem) >= 1000 { tools.MgoClass.SaveBulk("wordcount", savem...) savem = []map[string]interface{}{} } } } if len(savem) > 0 { tools.MgoClass.SaveBulk("wordcount", savem...) savem = nil } } tt.WcLock.Unlock() //更新最后id if !budp && oid != tt.LastId { setid := map[string]interface{}{ "$set": map[string]interface{}{ "s_startid": tt.LastId, }, } go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false) } //更新最后时间 if !budp { nowtime := time.Now().Unix() settime := map[string]interface{}{ "$set": map[string]interface{}{ "s_starttime": nowtime, }, } go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false) } //InitRule() log.Println("运行", tt.S_name, "over") }) return total } func UdpRunExtract(sid, eid string) { //5cb6c508a5cb26b9b70d6536 by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": tools.ExtractStype, }) log.Println("定时任务调下一节点分类:", tools.ExtractPort, string(by)) addr := &net.UDPAddr{ IP: net.ParseIP(tools.ExtractAddr), Port: tools.ExtractPort, } //node := &tools.UdpNode{by, addr, time.Now().Unix(), 0} //udptaskmap.Store(key, node) tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } func FindId_back(coll string) (gtid, lteid string) { sum := 0 //记录数据总量 data, _ := tools.MgoClass.Find(coll, `{"isused":false}`, `{"_id":1}`, nil, false, -1, -1) length := len(*data) set := bson.M{ "$set": bson.M{ "isused": true, "publishtime": time.Now().Unix(), }, } for i, d := range *data { id := d["_id"] count := util.IntAll(d["count"]) sum += count if gtid == "" { gtid = d["gtid"].(string) } if sum >= 5000 || i == length-1 { //总数大于5000或数据取完,取此id段数据 lteid := d["lteid"].(string) go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false) return gtid, lteid } else { //总数小于5000,删除已使用数据 go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false) } } return gtid, lteid } func FindId(coll string) (gtid, lteid string) { data, _ := tools.MgoClass.Find(coll, map[string]interface{}{"dataprocess": 0}, `{"_id":1}`, nil, false, -1, -1) for _, d := range *data { gtid = d["gtid"].(string) lteid = d["lteid"].(string) set := map[string]interface{}{ "$set": map[string]interface{}{ "dataprocess": 2, "updatetime": time.Now().Unix(), }, } tools.MgoClass.Update(coll, map[string]interface{}{"_id": d["_id"]}, set, false, false) break } return gtid, lteid } // NewLoadTestTask 测试任务 func NewLoadTestTask(_id, s_mgourl, s_mgodb, s_coll, i_poolsize, s_startid, s_endid, s_query string) (bs bool, filename string) { defer tools.Catch() r, t, _ := NewAnalyTask(_id, s_mgourl, s_mgodb, s_coll, tools.IntAllDef(i_poolsize, 5)) //log.Println(m) if r && t != nil { filename = time.Now().Format("150405") + ".csv" go t.RRunTest(s_startid, s_endid, s_query, filename) bs = true } return } // NewLoadTask 加载任务 func NewLoadTask(_id string, res *tools.JSON) { defer tools.Catch() //初始化任务信息 InitTaskData(_id) //初始化任务mgo配置信息 bres, tt, msg := NewAnalyTask(_id, "", "", "", 5) tt.I_status = 1 log.Println(tt.S_mgodb, tt.S_name, tt.I_thread) res.Msg = msg if bres && tt != nil { res.Status = true NEWTASKPOOL[_id] = tt //存入当前启动任务 log.Println("加载", tt.S_name, "完成...", tt.S_query) go tt.RRun() } } // 处理id的类型转换 func IdTypeConversion(q, r string) (string, string, string, string) { qtp, rtp := "bson.ObjectId", "bson.ObjectId" if strings.Contains(q, "ObjectId") || strings.Contains(q, "objectId") { q = q[9 : len(q)-1] } else if strings.Contains(q, "StringId") || strings.Contains(q, "stringId") { q = q[9 : len(q)-1] qtp = "string" } if strings.Contains(r, "ObjectId") || strings.Contains(r, "objectId") { r = r[9 : len(r)-1] } else if strings.Contains(r, "StringId") || strings.Contains(r, "stringId") { r = r[9 : len(r)-1] rtp = "string" } return q, r, qtp, rtp } // 获取匹配或不匹配的个数 func GetNum(rule string) (int, bool) { num := 1 isnum := strings.HasSuffix(rule, ")") if !isnum { //是数字 s := []rune(rule) last := string(s[len(s)-1:]) num = tools.IntAll(last) } return num, isnum } // 获取规则 func GetRule(text string, isnum bool) (matchArr []string) { if isnum { //最后一个不是数字 if strings.HasPrefix(text, "(") && strings.HasSuffix(text, ")") { text = text[1 : len(text)-1] matchArr = strings.Split(text, "|") } } else if strings.HasPrefix(text, "(") && !isnum { text = text[1 : len(text)-2] matchArr = strings.Split(text, "|") } return matchArr } func (d *DFA) AddWord(keys ...string) { d.AddWordAll(true, keys...) } func (d *DFA) AddWordAll(haskey bool, keys ...string) { if d.Link == nil { d.Link = make(map[string]interface{}) } for _, key := range keys { nowMap := &d.Link for i := 0; i < len(key); i++ { kc := key[i : i+1] if v, ok := (*nowMap)[kc]; ok { nowMap, _ = v.(*map[string]interface{}) } else { newMap := map[string]interface{}{} newMap["YN"] = "0" (*nowMap)[kc] = &newMap nowMap = &newMap } if i == len(key)-1 { (*nowMap)["YN"] = "1" if haskey { (*nowMap)["K"] = key } } } } } func (d *DFA) CheckSensitiveWord(src string, n int) (bool, []string) { res := make([]string, 0) tmpMap := make(map[string]int) for j := 0; j < len(src); j++ { nowMap := &d.Link for i := j; i < len(src); i++ { word := src[i : i+1] nowMap, _ = (*nowMap)[word].(*map[string]interface{}) if nowMap != nil { // 存在,则判断是否为最后一个 if "1" == util.ObjToString((*nowMap)["YN"]) { s := util.ObjToString((*nowMap)["K"]) tmpMap[s] = 1 //nowMap = &d.Link //匹配到之后继续匹配后边的内容 } } else { //nowMap = &d.Link break } } } if len(tmpMap) >= n { for k, _ := range tmpMap { res = append(res, k) } return true, res } return false, []string{} } func (d *DFA) CheckSensitiveWordTest(src string, n int) (bool, []string) { res := make([]string, 0) tmpMap := make(map[string]int) for j := 0; j < len(src); j++ { nowMap := &d.Link for i := j; i < len(src); i++ { word := src[i : i+1] nowMap, _ = (*nowMap)[word].(*map[string]interface{}) if nowMap != nil { // 存在,则判断是否为最后一个 if "1" == util.ObjToString((*nowMap)["YN"]) { s := util.ObjToString((*nowMap)["K"]) tmpMap[s] = 1 //nowMap = &d.Link //匹配到之后继续匹配后边的内容 } } else { //nowMap = &d.Link break } } } for k, _ := range tmpMap { res = append(res, k) } return len(tmpMap) >= n, res } // UpdateTaskInfo 更新任务信息 func UpdateTaskInfo(flag bool, tid string) bool { query := bson.M{ "_id": u.StringTOBsonId(tid), } set := bson.M{ "$set": bson.M{ "b_updaterule": flag, }, } return tools.MgoClass.Update(tools.COLL_TASK, query, set, false, false) } // o_projectinfo中数据分类定时任务 func RunTask() { if tools.IsStart { //是否开启定时任务 tt := InitTimeTask() //初始化任务 //StartTask(tt) //return c := cron.New() cronstr := "0 */" + fmt.Sprint(tt.I_rate) + " * * * ?" //每TaskTime分钟执行一次 c.AddFunc(cronstr, func() { StartTask(tt) }) c.Start() } } // 初始化任务 func InitTimeTask() *TTask { defer util.Catch() timeTaskTT := &TTask{} InitTaskData(tools.TimeTaskid) bres, tt, _ := NewAnalyTask(tools.TimeTaskid, "", "", "", 5) if bres && tt != nil { timeTaskTT = tt logger.Debug("初始化定时任务成功") } else { logger.Debug("初始化定时任务失败") } return timeTaskTT } // StartTask 开始任务 func StartTask(t *TTask) { defer util.Catch() logger.Debug("开始执行定时任务") query := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": u.StringTOBsonId(tools.IdCollSid), }, "dataprocess": 8, } order := map[string]interface{}{"_id": -1} logger.Debug("query:", query) list, _ := tools.MgoClass.Find(t.S_idcoll, query, order, nil, false, -1, -1) sid := t.S_startid eid := "" if list != nil && len(*list) > 0 { eid = util.ObjToString((*list)[0]["lteid"]) if eid <= sid { logger.Debug("id err. sid:", sid, " eid:", eid) return } t.S_startid = eid //更新任务中数据的起始id tools.IdCollSid = u.BsonIdToSId((*list)[0]["_id"]) //更新id表起始id //更新任务表中起始id setid := map[string]interface{}{ "$set": map[string]interface{}{ "s_startid": t.S_startid, }, } go tools.MgoClass.Update("rc_task", `{"_id":"`+t.ID+`"}`, setid, false, false) //查拟建数据 query := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": u.StringTOBsonId(sid), "$lte": u.StringTOBsonId(eid), }, "toptype": "拟建", } sess := t.MgoTask.GetMgoConn() defer t.MgoTask.DestoryMongoConn(sess) count, _ := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Count() logger.Debug("count:", count, " query:", query) if count == 0 { //此轮任务没有查到数据 return } arr := [][]map[string]interface{}{} wg := &sync.WaitGroup{} lock := &sync.Mutex{} pool := make(chan bool, t.I_thread) sum := 0 logger.Debug("select:", t.Task_QueryFieldMap) it := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Select(t.Task_QueryFieldMap).Iter() for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ { pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() update := []map[string]interface{}{} update = append(update, map[string]interface{}{"_id": tmp["_id"]}) SMap := &tools.SortMap{} SMap = NewClassificationRun(t, tmp) if len(SMap.Map) > 0 { //一级分类时,符合结果中成交规则时 if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" { if _, ok := tmp["detail"]; ok { if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) { SMap.Map["toptype"] = "结果" resa := ReSub(t, tmp, "结果") subtype := resa.Map["subtype"] delete(SMap.Map, "subtype") SMap.Map["subtype"] = subtype } } } update = append(update, map[string]interface{}{"$set": SMap.Map}) } //更新 lock.Lock() if len(update) == 2 { //有更新条件和更新内容时才进行更新操作 arr = append(arr, update) } if len(arr) >= NN { tmps := arr t.MgoTask.UpdateBulk(t.S_coll, tmps...) arr = [][]map[string]interface{}{} } lock.Unlock() }(tmp) if sum%100 == 0 { log.Println("current:", sum) } tmp = make(map[string]interface{}) } wg.Wait() lock.Lock() if len(arr) > 0 { t.MgoTask.UpdateBulk(t.S_coll, arr...) arr = [][]map[string]interface{}{} } lock.Unlock() logger.Debug("定时任务执行完毕 count:", sum) UdpRunProjectForecast(sid, eid) } logger.Debug("Udp通知项目预测执行完毕") } // udp通知项目预测 func UdpRunProjectForecast(sid, eid string) { by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, }) logger.Debug("定时任务通知项目预测:", string(by)) addr := &net.UDPAddr{ IP: net.ParseIP(tools.NextNodeAddr), Port: tools.NextNodePort, } tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } // SendAi 调用大模型招标分类;map[result:[结果-中标] status:200] func SendAi(data map[string]interface{}, url string) (res map[string]interface{}) { // 设置 2 秒的超时 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() //url := "http://192.168.3.109:16688" jsonData, err := json.Marshal(data) if err != nil { fmt.Println("JSON marshal error:", err) return } req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { fmt.Println("Request error:", err) return } req.Header.Set("Content-Type", "application/json") // 将请求与上下文关联 req = req.WithContext(ctx) client := &http.Client{} resp, err := client.Do(req) if err != nil { // 使用 errors.Is 检查错误是否是超时错误 if errors.Is(err, context.DeadlineExceeded) { fmt.Println("Request timed out") return } fmt.Println("Request error:", err) return } defer resp.Body.Close() err = json.NewDecoder(resp.Body).Decode(&res) if err != nil { fmt.Println("Response decoding error:", err) return } return }