12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747 |
- package task
- import (
- "bytes"
- "context"
- "encoding/csv"
- "encoding/json"
- "errors"
- "fmt"
- "log"
- mu "mfw/util"
- "net"
- "net/http"
- "os"
- "qfw/util"
- "reflect"
- "regexp"
- "strconv"
- "strings"
- "sync"
- "time"
- "tools"
- u "util"
- "github.com/cron"
- "github.com/donnie4w/go-logger/logger"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- )
- func init() {
- REG, _ = regexp.Compile(`\(.*?\)\d*`)
- REG1, _ = regexp.Compile(`\(.*?\)`)
- //log.Println(REG.FindAllString("(平台|软件|电子商务|多媒体|通讯设备)1(建设|采购|开发)2", -1))
- //加载所有任务
- StartMonitor()
- if tools.NoAutoRun == 0 {
- rtask, b := tools.MgoClass.Find(tools.COLL_TASK, `{"i_status":1}`, nil, `{"_id":1}`, false, 0, 50)
- if b && rtask != nil && *rtask != nil {
- for _, t := range *rtask {
- _id := u.BsonIdToSId(t["_id"])
- res := tools.JSON{}
- //LoadTask(_id, &res)
- NewLoadTask(_id, &res)
- }
- }
- }
- }
- // var UdpSess *tools.MongodbSim
- var REG *regexp.Regexp
- var REG1 *regexp.Regexp
- var TaskLock = sync.Mutex{}
- var NEWTASKPOOL = map[string]*TTask{} //存储当前任务
- var TaskMap = map[string]*TTask{}
- var HangyeUdps = make(chan map[string]interface{}, 100) //初始化行业分类udp任务池
- type Rule struct {
- Reg []interface{} //规则
- DetailReg []interface{} //detail正则,目前只针对title、channel的二级招标分类中的中标规则
- NotReg []interface{} //排除规则
- Rid string
- Class *Class
- Rule_PreRule string
- S_name string
- S_code string
- S_pid string
- //Reg []*RuleDFA
- }
- type Class struct {
- Cid string
- TTask *TTask
- Rule []*Rule
- Class_PreRule string
- S_name string
- S_fields string
- S_code string
- S_default string
- S_pid string
- S_savefield string
- }
- type TTask struct {
- Class []*Class //任务中类集合
- ID string //任务id
- S_name string //任务名称
- I_rate int //任务执行频率
- S_class string //任务中类id
- S_mgourl string //任务mgo addr
- S_mgodb string //任务mgo db
- S_collection string //查询、存储表
- I_poolsize int //任务连接池个数
- S_startid string //任务起始id
- I_status int //控制任务状态属性
- S_attr string //标识属性key值
- AttrVal int //标识属性val值
- S_coll string //查询、存储表
- LastId string //上次定时任务的结束id
- Lock sync.Mutex
- S_query string //任务查询条件
- FlagQuit chan bool //任务结束控制
- B_Running bool //任务是否执行
- MgoTask *u.MongodbSim //任务查询表mgo
- I_multiclass int //是否支持多分类
- I_savetype int //1 名称 2 值
- I_thread int //线程数
- I_wordcount int //词频统计
- WordCount map[string]map[string]int //词频统计集合
- WcLock sync.Mutex
- Task_PreRule string //任务前置过滤
- S_table string //结果表
- S_querycon string //查询方式 1是id查询 0是时间查询
- S_starttime int64 //起始时间
- S_timefieldname string //各表所查时间字段名称
- S_asfield string //查询表与结果表关联字段
- I_fieldUpdate int //分类中的保存字段信息 0:覆盖 1:更新
- MulMgo *u.MongodbSim //联合查询的mgo配置信息
- MulColl string //联合查询的表名
- I_tasktype int //任务类型 0:常规任务 1:附件任务:2
- S_idcoll string //正式环境查询数据id段
- B_UpdateRule bool //是否更新任务下的规则
- S_classField string //分类字段
- Task_QueryFieldArr []string //用于合并数据
- Task_QueryFieldMap map[string]interface{} //用于仅查询字段
- Dbtype string //用于区分连得是哪个库,使用不同的用户密码
- }
- type RuleDFA struct {
- Match []DFA //包含的敏感词
- MatchNum []int //包含敏感词匹配个数
- MisMatch DFA //不包含的敏感词
- MisMatchNum int //不包含敏感词匹配个数
- }
// DFA is a keyword matcher; Link is its root node and is filled via AddWord.
type DFA struct {
	Link map[string]interface{}
}
- // 初始化任务
- func InitTaskData(_id string) {
- defer tools.Catch()
- taskData, _ := tools.MgoClass.FindById(tools.COLL_TASK, _id, nil)
- task := &TTask{}
- task.MgoTask = &u.MongodbSim{}
- s_querycon := "1"
- s_table := ""
- s_timefieldname := ""
- s_starttime := int64(0)
- s_asfield := ""
- if b_updaterule, ok := (*taskData)["b_updaterule"].(bool); ok {
- task.B_UpdateRule = b_updaterule //更新任务下的规则
- }
- if (*taskData)["s_querycon"] != nil { //
- s_querycon = util.ObjToString((*taskData)["s_querycon"])
- }
- if (*taskData)["s_starttime"] != nil {
- s_starttime = (*taskData)["s_starttime"].(int64)
- }
- if (*taskData)["s_table"] != nil {
- s_table = util.ObjToString((*taskData)["s_table"])
- }
- if (*taskData)["s_timefieldname"] != nil {
- s_timefieldname = util.ObjToString((*taskData)["s_timefieldname"])
- }
- if (*taskData)["s_asfield"] != nil {
- s_asfield = util.ObjToString((*taskData)["s_asfield"])
- }
- //task.Class
- task.ID = u.BsonIdToSId((*taskData)["_id"])
- task.S_name = util.ObjToString((*taskData)["s_name"])
- task.S_class = util.ObjToString((*taskData)["s_class"])
- task.S_mgourl = util.ObjToString((*taskData)["s_mgourl"])
- task.S_mgodb = util.ObjToString((*taskData)["s_mgodb"])
- task.S_coll = util.ObjToString((*taskData)["s_coll"])
- task.I_poolsize = util.IntAll((*taskData)["i_poolsize"])
- task.Task_PreRule = util.ObjToString((*taskData)["s_task_prerule"])
- task.I_multiclass = util.IntAllDef((*taskData)["i_multiclass"], 0)
- task.I_wordcount = util.IntAllDef((*taskData)["i_wordcount"], 0)
- task.I_savetype = util.IntAllDef((*taskData)["i_savetype"], 1)
- task.I_thread = util.IntAllDef((*taskData)["i_thread"], 1)
- task.S_query = util.ObjToString((*taskData)["s_query"])
- task.S_startid = util.ObjToString((*taskData)["s_startid"])
- task.S_attr = util.ObjToString((*taskData)["s_attr"])
- task.I_rate = util.IntAllDef((*taskData)["i_rate"], 10)
- task.I_tasktype = util.IntAllDef((*taskData)["i_tasktype"], 0)
- task.I_status = util.IntAll((*taskData)["i_status"])
- task.S_idcoll = util.ObjToString((*taskData)["s_idcoll"])
- task.S_querycon = s_querycon
- task.S_starttime = s_starttime
- task.S_table = s_table
- task.S_timefieldname = s_timefieldname
- task.S_asfield = s_asfield
- task.I_fieldUpdate = util.IntAllDef((*taskData)["i_fieldUpdate"], 0)
- task.S_classField = util.ObjToString((*taskData)["s_classfield"])
- task.Dbtype = util.ObjToString((*taskData)["s_dbtype"])
- flagAttrVal := 1
- attrs := strings.Split(task.S_attr, "__")
- if len(attrs) == 2 {
- flagAttrVal = util.IntAll(attrs[1])
- task.S_attr = attrs[0]
- }
- task.AttrVal = flagAttrVal
- //联表查询初始化mgo,线上只有行业分类用到;跑历史招标、行业分类的时候也用到两边查询
- for _, v := range tools.Config {
- if m, ok := v.(map[string]interface{}); ok {
- if m["taskid"] == task.ID {
- if m["mgoaddr"] != nil && m["db"] != nil && m["coll"] != nil {
- dbtype := util.ObjToString(m["dbtype"])
- addr, _ := m["mgoaddr"].(string)
- db, _ := m["db"].(string)
- coll, _ := m["coll"].(string)
- task.MulMgo = &u.MongodbSim{}
- task.MulMgo.MongodbAddr = addr
- task.MulMgo.Size = 3
- task.MulMgo.DbName = db
- if dbtype != "" {
- task.MulMgo.UserName = tools.DbInfo[dbtype][0]
- task.MulMgo.Password = tools.DbInfo[dbtype][1]
- }
- task.MulMgo.InitPool()
- task.MulColl = coll
- }
- }
- }
- }
- //初始化查询字段信息
- if task.Task_QueryFieldMap == nil {
- task.Task_QueryFieldMap = make(map[string]interface{})
- }
- //Task_QueryFieldMap加入关联字段
- for _, v := range strings.Split(s_asfield, "==") {
- if v != "" {
- task.Task_QueryFieldMap[v] = 1
- }
- }
- //Task_QueryFieldMap加入分类字段
- //Task_QueryFieldArr加入分类字段
- for _, f := range strings.Split(task.S_classField, ",") {
- task.Task_QueryFieldMap[f] = 1
- task.Task_QueryFieldArr = append(task.Task_QueryFieldArr, f)
- }
- //初始化任务下所有的分类和规则
- InitClassAndRuleData(_id, task)
- }
- // InitClassAndRuleData 初始化任务下所有的分类和规则
- func InitClassAndRuleData(_id string, task *TTask) {
- defer tools.Catch()
- classIdStr := task.S_class
- if classIdStr != "" {
- classIdArr := strings.Split(classIdStr, ",")
- classArr := make([]*Class, 0)
- for _, classid := range classIdArr {
- classData, _ := tools.MgoClass.FindById(tools.COLL_CLASS, classid, nil)
- if classData != nil {
- //初始化Class
- class := &Class{
- //Rule: CidRuleMap[classid],
- Cid: classid,
- Class_PreRule: util.ObjToString((*classData)["s_class_prerule"]),
- S_name: util.ObjToString((*classData)["s_name"]),
- S_fields: util.ObjToString((*classData)["s_fields"]),
- S_code: util.ObjToString((*classData)["s_code"]),
- S_default: util.ObjToString((*classData)["s_default"]),
- S_pid: util.ObjToString((*classData)["s_pid"]),
- S_savefield: util.ObjToString((*classData)["s_savefield"]),
- }
- //根据classid查找对应的rule
- ruleList, _ := tools.MgoClass.Find(tools.COLL_RULE, `{"s_classid":"`+classid+`"}`, `{"i_order":1}`, nil, false, -1, -1)
- if ruleList != nil && len(*ruleList) > 0 {
- for _, v := range *ruleList {
- _id := u.BsonIdToSId(v["_id"])
- //rule
- s_rule, _ := v["s_rule"].(primitive.A)
- i_rule := DealRules(s_rule)
- //detailrule
- s_detailrule, _ := v["s_detailrule"].(primitive.A)
- i_detailrule := DealRules(s_detailrule)
- //notrule
- s_notrule, _ := v["s_notrule"].(primitive.A)
- i_notrule := DealRules(s_notrule)
- rule := &Rule{
- Rid: _id,
- Reg: i_rule,
- DetailReg: i_detailrule,
- NotReg: i_notrule,
- Rule_PreRule: util.ObjToString(v["s_rule_prerule"]),
- S_name: util.ObjToString(v["s_name"]),
- S_code: util.ObjToString(v["s_code"]),
- S_pid: util.ObjToString(v["s_pid"]),
- }
- class.Rule = append(class.Rule, rule)
- }
- }
- classArr = append(classArr, class)
- }
- }
- task.Class = classArr
- TaskMap[_id] = task
- }
- }
- func DealRules(rules []interface{}) (i_rule []interface{}) {
- for _, r := range tools.ObjArrToStringArr(rules) {
- if strings.HasPrefix(r, "'") && strings.HasSuffix(r, "'") { //正则
- rs := []rune(r)
- ru := string(rs[1 : len(rs)-1])
- rureg, err := regexp.Compile(ru)
- if err != nil {
- log.Println("error---rule:", r)
- continue
- }
- i_rule = append(i_rule, []interface{}{rureg}...)
- } else { //规则,加入到敏感词匹配
- matchnum := 0
- mismatchnum := 0
- isnum1 := false
- isnum2 := false
- numArr := make([]int, 0)
- ruleDFA := &RuleDFA{
- Match: []DFA{},
- MisMatch: DFA{},
- }
- tmpArr := strings.Split(r, "^")
- matchTmp := tmpArr[0]
- ruleTextArr := REG.FindAllString(matchTmp, -1)
- for _, match := range ruleTextArr {
- matchnum, isnum1 = GetNum(match)
- numArr = append(numArr, matchnum)
- matchArr := GetRule(match, isnum1)
- tmpDFA := DFA{
- Link: make(map[string]interface{}),
- }
- tmpDFA.AddWord(matchArr...)
- ruleDFA.Match = append(ruleDFA.Match, tmpDFA)
- }
- if len(tmpArr) == 2 {
- mismatch := tmpArr[1]
- mismatchnum, isnum2 = GetNum(mismatch)
- mismatchArr := GetRule(mismatch, isnum2)
- ruleDFA.MisMatch.AddWord(mismatchArr...)
- }
- ruleDFA.MatchNum = numArr
- ruleDFA.MisMatchNum = mismatchnum
- i_rule = append(i_rule, []interface{}{ruleDFA}...)
- }
- }
- return
- }
- // 更新任务状态
- func (tt *TTask) Sstatus() int {
- if tt.I_status == 0 && tt.B_Running {
- return 1
- } else if tt.I_status == 0 && !tt.B_Running {
- return 0
- } else if tt.I_status == 1 && tt.B_Running {
- return 2
- } else if tt.I_status == 1 && !tt.B_Running {
- return 3
- }
- return -1
- }
- // 停止任务
- func (tt *TTask) SStop() bool {
- tt.Lock.Lock()
- defer tt.Lock.Unlock()
- if tt.I_status == 1 {
- tt.I_status = 0
- tt.FlagQuit <- true
- log.Println("开始停止...")
- }
- return true
- }
- var NN = 400
- // 存放测试的数据
- var TEST = &TestList{
- Count: map[string][]int{},
- }
// TestList is a mutex-guarded map of test-run counters.
//
// RRunTest registers an entry with three values via Put(id, 0, total, 0);
// later single-value Put calls store the processed count at index 2 and set
// index 0 to 1 once that count reaches the value at index 1.
type TestList struct {
	Lock  sync.Mutex
	Count map[string][]int
}
- func (tl *TestList) Get(id string) []int {
- tl.Lock.Lock()
- defer tl.Lock.Unlock()
- return tl.Count[id]
- }
- func (tl *TestList) Put(id string, val ...int) {
- defer tools.Catch()
- tl.Lock.Lock()
- defer tl.Lock.Unlock()
- if len(val) == 3 {
- tl.Count[id] = val
- } else {
- tval := tl.Count[id]
- tl.Count[id][2] = val[0]
- if tval[1] <= val[0] {
- tl.Count[id][0] = 1
- }
- }
- }
- func (tl *TestList) Del(id string) {
- tl.Lock.Lock()
- defer tl.Lock.Unlock()
- delete(tl.Count, id)
- os.Remove("csv/" + id)
- }
- // 任务测试
- func (tt *TTask) RRunTest(s_startid, s_endid, s_query, filename string) {
- defer tools.Catch()
- defer func() {
- go func() {
- time.AfterFunc(10*time.Minute, func() {
- TEST.Del(filename)
- })
- }()
- }()
- //开始识别
- sess := tt.MgoTask.GetMgoConn()
- defer tt.MgoTask.DestoryMongoConn(sess)
- q := map[string]interface{}{}
- if s_query != "" {
- json.Unmarshal([]byte(strings.Replace(s_query, "'", "\"", -1)), &q)
- }
- if s_startid != "" {
- q["_id"] = map[string]interface{}{
- "$gte": u.StringTOBsonId(s_startid),
- }
- }
- if s_endid != "" {
- if q["_id"] != nil {
- q["_id"].(map[string]interface{})["$lte"] = u.StringTOBsonId(s_endid)
- } else {
- q["_id"] = map[string]interface{}{
- "$lte": u.StringTOBsonId(s_endid),
- }
- }
- }
- count := tt.MgoTask.Count(tt.S_coll, &q)
- if count == 0 {
- return
- }
- TEST.Put(filename, 0, count, 0)
- query := sess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Iter()
- arr := [][]string{}
- i := 0
- for tmp := make(map[string]interface{}); query.Next(&tmp); i = i + 1 {
- if i > 2000 { //数据跑至2000停止
- break
- }
- //按顺序识别
- tid := tmp["_id"]
- res := make([]string, 3)
- res[0] = u.BsonIdToSId(tid)
- res[1] = util.ObjToString(tmp["title"])
- if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例
- res = append(res, "拟建", "拟建")
- } else if util.IntAll(tmp["infoformat"]) == 3 {
- res = append(res, "产权", "产权")
- } else {
- SMap := NewClassificationRun(tt, tmp)
- for _, k := range SMap.Keys {
- res = append(res, util.ObjToString((SMap.Map[k].([]string))[0]))
- }
- }
- arr = append(arr, res)
- TEST.Put(filename, i+1)
- tmp = make(map[string]interface{})
- }
- if len(arr) > 0 {
- if !Exist("csv") {
- os.Mkdir("csv", 777)
- }
- f, _ := os.Create("csv/" + filename)
- w := csv.NewWriter(f)
- for _, str := range arr {
- w.Write(str)
- }
- w.Flush()
- f.Close()
- }
- log.Println("运行RUNTEST-OVER")
- }
// Exist reports whether filename can be stat'ed, or the stat error says that
// the file exists.
func Exist(filename string) bool {
	if _, err := os.Stat(filename); err != nil {
		return os.IsExist(err)
	}
	return true
}
- func (tt *TTask) RRun() {
- first := make(chan bool, 1)
- if tt.I_status == 1 {
- first <- true
- }
- OVER:
- for tt.I_status == 1 {
- tt.B_Running = false
- select {
- case <-tt.FlagQuit: //结果任务控制
- log.Println("退出,RUN", tt.S_name)
- tt = nil
- break OVER
- case <-first: //第一次执行控制
- if tools.ControlTaskRun { //任务流程控制,现有模式用不到,默认false
- tools.AllTaskFinish = false
- }
- log.Println("第一次执行任务:", tt.S_name)
- newtaskrun(tt)
- case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制
- //执行定时任务前,检查任务是否更新了rule
- if tt.B_UpdateRule {
- InitClassAndRuleData(tt.ID, tt) //重新加载任务的rule
- tt.B_UpdateRule = false
- UpdateTaskInfo(false, tt.ID)
- }
- //上一轮任务执行完毕再走下一轮(上一轮任务执行完毕的标志是业主分类执行完)
- //util.Debug("ControlTaskRun---", tools.ControlTaskRun, "AllTaskFinish---", tools.AllTaskFinish)
- if !tools.ControlTaskRun || tt.S_querycon == "0" { //线下环境不控制定时任务;或者线上通过时间定时执行
- newtaskrun(tt)
- } else if tools.ControlTaskRun && tools.AllTaskFinish { //线上控制
- tools.AllTaskFinish = false
- newtaskrun(tt)
- } else {
- log.Println("上轮任务暂未完成")
- }
- }
- }
- }
- func newtaskrun(tt *TTask) {
- NewTaskRunAll(tt, false, nil)
- }
- // NewTaskRunAll 常规任务和udp非合并数据处理方法
- func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
- total := 0
- tools.Try(func() { //不加这一层defer运行不了!!!
- timespan := false //时间间隔(控制数据条数打印)
- tt.B_Running = true
- defer func() {
- //业主分类执行完修改AllTaskFinish状态;控制流程的任务id(整个分类流程业主分类结尾,以此为标记)
- if tt.ID == tools.ControlLastTaskId {
- tools.AllTaskFinish = true
- }
- tt.B_Running = false
- }()
- //开始识别
- pool := make(chan bool, tt.I_thread)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- q := make(map[string]interface{})
- nextNodeSid, nextNodeEid := "", ""
- oid := tt.LastId //上次定时任务的结束id
- s_table := tt.S_table //结果表
- s_asfield := tt.S_asfield //关联字段
- asfields := strings.Split(s_asfield, "==")
- qfield := "" //查询表字段
- rfield := "" //结果表字段
- qtp := "" //查询字段的类型
- rtp := "" //结果字段的类型
- rtype := "" //结果字段的真实类型
- if s_table != "" { //有结果表保存到结果表(更新,插入)
- tt.S_collection = s_table
- }
- if len(asfields) == 2 {
- qfield = asfields[0] //查询表字段
- rfield = asfields[1] //结果表字段
- //id处理 object string互转
- qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield)
- queryExis := map[string]interface{}{
- rfield: map[string]interface{}{
- "$exists": true,
- },
- }
- onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis)
- rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型
- }
- //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
- //没有结果表在查询表上更新
- //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
- sort := ""
- if !budp { //非udp查询条件
- sort = "_id"
- if tt.S_query != "" { //有查询条件
- json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
- }
- idcoll := tt.S_idcoll
- if idcoll != "" { //idcoll中查询id区间,bidding_processing_ids
- nextNodeSid, nextNodeEid = FindId(idcoll) //查询id段
- if nextNodeSid != "" && nextNodeEid != "" && nextNodeSid <= nextNodeEid {
- q["_id"] = bson.M{
- "$gt": u.StringTOBsonId(nextNodeSid),
- "$lte": u.StringTOBsonId(nextNodeEid),
- }
- } else {
- log.Println("定时任务", tt.S_name, "查询", tt.S_idcoll, "时间段出错", nextNodeSid, nextNodeEid)
- tools.AllTaskFinish = true //为查询到数据视为此轮任务完成
- return
- }
- stime, _ := strconv.ParseInt(nextNodeSid[:8], 16, 64) //取id前8位转成时间戳
- etime, _ := strconv.ParseInt(nextNodeEid[:8], 16, 64) //
- if etime-stime < 1800 { //时间跨度小于半小时
- timespan = true
- }
- } else {
- if q["_id"] != nil {
- if _id, ok := q["_id"].(string); ok {
- q["_id"] = u.StringTOBsonId(_id)
- } else if _ids, ok := q["_id"].(map[string]interface{}); ok {
- for k, v := range _ids {
- if id, bk := v.(string); bk {
- _ids[k] = u.StringTOBsonId(id)
- }
- }
- }
- }
- if tt.S_querycon == "1" { //id查询
- if tt.S_query != "" {
- //页面上配置了查询条件,直接使用,不再单独查询上次任务结束ID
- if tt.LastId != "" && q["_id"] == nil {
- q["_id"] = map[string]interface{}{
- "$gt": u.StringTOBsonId(tt.LastId),
- }
- }
- } else {
- //临时修改查询id区间段
- comeintime := time.Now().Unix() - 5*60
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gt": comeintime,
- },
- }
- qId := tt.MgoTask.GetMgoConn()
- defer tt.MgoTask.DestoryMongoConn(qId)
- tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter()
- eId := ""
- for tmp := make(map[string]interface{}); tmpData.Next(tmp); {
- eId = u.BsonIdToSId(tmp["_id"])
- }
- if tt.LastId != "" && q["_id"] == nil {
- sid := tt.LastId
- q["_id"] = map[string]interface{}{
- "$gt": u.StringTOBsonId(sid),
- }
- if eId != "" {
- q["_id"] = map[string]interface{}{
- "$gt": u.StringTOBsonId(sid),
- "$lte": u.StringTOBsonId(eId),
- }
- }
- }
- //按id查询,为了保证有新数据入库,每次休息2分钟
- time.Sleep(time.Second * 60)
- //测试环境q的赋值执行下述代码
- //if tt.LastId != "" && q["_id"] == nil {
- // q["_id"] = map[string]interface{}{
- // "$gt": u.StringTOBsonId(tt.LastId),
- // }
- //}
- }
- } else { //时间查询
- name := tt.S_timefieldname
- q[name] = map[string]interface{}{
- "$gt": tt.S_starttime,
- }
- }
- }
- } else { //udp查询条件
- sort = "-_id"
- if tt.S_query != "" { //有查询条件
- json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
- }
- tmpq := mapInfo["q"].(map[string]interface{})
- for k, v := range tmpq {
- q[k] = v
- }
- sid := mapInfo["gtid"].(string)
- eid := mapInfo["lteid"].(string)
- stime, _ := strconv.ParseInt(sid[:8], 16, 64)
- etime, _ := strconv.ParseInt(eid[:8], 16, 64)
- if etime-stime < 1800 { //时间跨度小于半小时
- timespan = true
- }
- }
- //task
- tasksess := tt.MgoTask.GetMgoConn()
- defer tt.MgoTask.DestoryMongoConn(tasksess)
- //通过ID 查询数据时,才打印日志
- if tt.S_querycon == "1" {
- log.Println("运行", tt.S_name, "start")
- log.Println("线程数:", tt.I_thread, "查询语句", q)
- log.Println("查询---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
- log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr)
- }
- extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Sort(sort).Iter()
- arr := [][]map[string]interface{}{}
- if tt.I_wordcount == 1 {
- tt.WordCount = map[string]map[string]int{}
- }
- sum := 0
- for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
- tid := tmp["_id"]
- if !timespan && sum%2000 == 0 {
- log.Println("current:", sum, tt.S_name)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //按顺序识别
- tid := tmp["_id"]
- update := []map[string]interface{}{}
- //如果有关联字段 根据关联字段更新或者保存
- if len(asfields) == 2 {
- //log.Println("qfield====", qfield)
- field := tmp[qfield]
- if field != nil {
- //log.Println("field===", field, qtp, rtp)
- if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致
- } else {
- //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype)
- if rtype == rtp { //转换后与真实类型不同,填写时类型错误
- //将查询字段类型转换为对应的结果字段类型
- if qtp == "bson.ObjectId" && rtp == "string" {
- field = u.BsonIdToSId(tmp[qfield])
- } else if qtp == "string" && rtp == "bson.ObjectId" {
- field = u.StringTOBsonId(tmp[qfield].(string))
- }
- }
- }
- update = append(update, map[string]interface{}{ //根据关联字段更新
- rfield: field,
- })
- }
- } else {
- update = append(update, map[string]interface{}{ //更新的条件 根据id更新
- "_id": tid,
- })
- }
- res := map[string]interface{}{}
- ksmap := make(map[string]map[string][]string)
- if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例
- res["toptype"] = "拟建"
- res["subtype"] = "拟建"
- } else if util.IntAll(tmp["infoformat"]) == 3 {
- res["toptype"] = "产权"
- res["subtype"] = "产权"
- } else {
- SMap := &tools.SortMap{}
- if tt.I_tasktype == 2 { //标签任务
- SMap = TagClassificationRun(tt, tmp)
- } else if tt.I_tasktype == 1 { //附件任务
- SMap = FileClassificationRun(tt, tmp)
- //res["projectinfo"] = tmp["projectinfo"]
- } else { //常规任务
- SMap = NewClassificationRun(tt, tmp)
- //1.针对招标分类的特殊处理
- if tt.ID == "57982b4436b82b073c000001" && tt.S_name == "招标分类" {
- //1.一级分类时,符合结果中成交规则时
- //todo 如果没有打上分类,调用ai 模型分类
- if _, ok := SMap.Map["toptype"]; !ok {
- if util.ObjToString(tools.Config["aiurl"]) != "" {
- now := time.Now()
- data := map[string]interface{}{
- "title": tmp["title"],
- "detail": tmp["detail"],
- }
- reqData := map[string]interface{}{
- "texts": []interface{}{data},
- }
- rai := SendAi(reqData, util.ObjToString(tools.Config["aiurl"]))
- log.Println("ai 分类时长", time.Since(now).Seconds())
- if len(rai) > 0 {
- resa := rai["result"]
- if dataa, ok := resa.([]interface{}); ok {
- da := dataa[0]
- if len(util.ObjToString(da)) > 0 {
- cs := strings.Split(util.ObjToString(da), "-")
- SMap.Map["toptype"] = cs[0]
- SMap.Map["subtype"] = cs[1]
- }
- }
- }
- }
- }
- if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
- if _, ok := tmp["detail"]; ok {
- if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) {
- SMap.Map["toptype"] = "结果"
- resa := ReSub(tt, tmp, "结果")
- subtype := resa.Map["subtype"]
- delete(SMap.Map, "subtype")
- SMap.Map["subtype"] = subtype
- }
- }
- }
- //2.一级分类是预告,但是标题含有招标计划,同时含有 预公告|预公示,变为 采购意向
- if SMap.Map["toptype"] == "预告" {
- if u.DealYuce(tmp["title"].(string)) {
- SMap.Map["toptype"] = "采购意向"
- SMap.Map["subtype"] = "采购意向"
- }
- }
- //3.针对 项目登记 相关数据处理,符合条件的归类为‘采购意向’
- if u.IsPurchasingIntent(tmp) {
- SMap.Map["toptype"] = "采购意向"
- SMap.Map["subtype"] = "采购意向"
- }
- }
- }
- //2.针对用户行业分类,单独处理数据
- if mapInfo["stype"] == "yonghuhangye" || strings.TrimSpace(tt.S_name) == "用户行业分类" {
- subs := SMap.Map["subscope_dy"]
- delete(SMap.Map, "topscope_dy")
- var tops []string
- if subscopes, ok := subs.([]string); ok {
- for _, sub := range subscopes {
- top := strings.Split(sub, "_")[0]
- tops = append(tops, top)
- }
- SMap.Map["topscope_dy"] = u.RemoveDuplicateString(tops)
- }
- }
- //追加时处理,//更新字段 I_fieldUpdate 0:覆盖 1:追加
- if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 {
- //封装追加信息
- if len(SMap.Keys) > 0 {
- for _, k := range SMap.Keys {
- ksarr := make([]string, 0)
- if k != "toptype" && k != "subtype" {
- ks, ok := SMap.Map[k].(string)
- if ok {
- ksarr = append(ksarr, ks)
- } else {
- ksarr = append(ksarr, SMap.Map[k].([]string)...)
- }
- }
- ksmap[k] = map[string][]string{
- "$each": ksarr,
- }
- }
- }
- } else { //非多分类
- res = SMap.Map
- if tt.I_tasktype == 1 { //附件任务
- if tmp["projectinfo"] != nil {
- res["projectinfo"] = tmp["projectinfo"]
- }
- }
- }
- }
- //if len(res) > 0 || len(ksmap) > 0 {
- IS.NewAdd(tt, res)
- if tt.S_attr != "" {
- //res[tt.S_attr] = 1
- res[tt.S_attr] = tt.AttrVal
- }
- // 添加分类时间
- res["classification_time"] = time.Now().Unix()
- if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加
- if len(ksmap) > 0 && len(res) > 0 {
- update = append(update, map[string]interface{}{
- "$set": res,
- "$addToSet": ksmap,
- })
- } else if len(ksmap) > 0 && len(res) == 0 {
- update = append(update, map[string]interface{}{
- "$addToSet": ksmap,
- })
- } else if len(ksmap) == 0 && len(res) > 0 {
- update = append(update, map[string]interface{}{
- "$set": res,
- })
- }
- } else { //非多分类或者多分类的覆盖
- //log.Println("更新==", res)
- if len(res) > 0 {
- update = append(update, map[string]interface{}{
- "$set": res,
- })
- }
- }
- //更新
- lock.Lock()
- if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
- arr = append(arr, update)
- }
- if len(arr) >= NN {
- //if s_table == "" {
- tt.MgoTask.UpdateBulk(tt.S_collection, arr...)
- //} else {
- // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
- //}
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- //}
- }(tmp)
- ttid := u.BsonIdToSId(tid)
- if ttid > tt.LastId && !budp {
- tt.LastId = ttid
- }
- tmp = make(map[string]interface{})
- }
- total = sum
- //通过ID 查询分类数据才打印日志
- if tt.S_querycon == "1" {
- log.Println("总数:————", sum)
- if timespan {
- log.Println("current:————", sum)
- }
- }
- wg.Wait()
- lock.Lock()
- if len(arr) > 0 {
- //if s_table == "" { //没有结果表
- tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新
- //} else { //有结果表
- // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
- //}
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- tt.WcLock.Lock()
- if len(tt.WordCount) > 0 {
- savem := []map[string]interface{}{}
- tn := time.Now().Unix()
- for wk, wm := range tt.WordCount {
- for ck, cm := range wm {
- m1 := map[string]interface{}{
- "s_class": wk,
- "s_word": ck,
- "i_count": cm,
- "bz": tn,
- }
- savem = append(savem, m1)
- if len(savem) >= 1000 {
- tools.MgoClass.SaveBulk("wordcount", savem...)
- savem = []map[string]interface{}{}
- }
- }
- }
- if len(savem) > 0 {
- tools.MgoClass.SaveBulk("wordcount", savem...)
- savem = nil
- }
- }
- tt.WcLock.Unlock()
- //更新最后id
- if !budp && oid != tt.LastId {
- setid := map[string]interface{}{
- "$set": map[string]interface{}{
- "s_startid": tt.LastId,
- },
- }
- go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
- }
- //更新最后时间
- if !budp {
- nowtime := time.Now().Unix()
- settime := map[string]interface{}{
- "$set": map[string]interface{}{
- "s_starttime": nowtime,
- },
- }
- go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
- }
- //InitRule()
- if tt.S_querycon == "1" {
- log.Println("运行", tt.S_name, "over", oid, " endid:", tt.LastId)
- }
- //定时任务完成发送udp信号调抽取
- if tools.Extract["preNodeId"] == tt.ID { //常规招标定时任务udp调用抽取
- if tt.S_idcoll == "" {
- nextNodeSid = oid
- nextNodeEid = tt.LastId
- }
- UdpRunExtract(nextNodeSid, nextNodeEid)
- }
- })
- return total
- }
- // udp合并数据处理的方法
- func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype string) int {
- total := 0
- tools.Try(func() { //不加这一层defer运行不了!!!
- timespan := false
- tt.B_Running = true
- defer func() {
- tt.B_Running = false
- }()
- //开始识别
- pool := make(chan bool, tt.I_thread)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- q := map[string]interface{}{}
- s_table := tt.S_table //结果表
- s_asfield := tt.S_asfield //关联字段
- asfields := strings.Split(s_asfield, "==")
- qfield := "" //查询表字段
- rfield := "" //结果表字段
- qtp := "" //查询字段的类型
- rtp := "" //结果字段的类型
- rtype := "" //结果字段的真实类型
- if s_table != "" { //有结果表保存到结果表(更新,插入)
- tt.S_collection = s_table
- }
- if len(asfields) == 2 {
- qfield = asfields[0] //查询表字段
- rfield = asfields[1] //结果表字段
- //id处理 object string互转
- qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield)
- queryExis := map[string]interface{}{
- rfield: map[string]interface{}{
- "$exists": true,
- },
- }
- onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis)
- rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型
- }
- //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
- //没有结果表在查询表上更新
- //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
- q = mapInfo["q"].(map[string]interface{})
- //计算起始id和结束id时间宽度,大于半小时的2000数据打印一次个数,小于半小时的每条数据都打印一次个数
- sid := mapInfo["gtid"].(string)
- eid := mapInfo["lteid"].(string)
- stime, _ := strconv.ParseInt(sid[:8], 16, 64)
- etime, _ := strconv.ParseInt(eid[:8], 16, 64)
- if etime-stime < 1800 { //时间跨度小于半小时
- timespan = true
- }
- udpsess := tt.MulMgo.GetMgoConn()
- if udpsess == nil {
- log.Println("连接为空", tt.S_name, mapInfo)
- return
- }
- defer tt.MulMgo.DestoryMongoConn(udpsess)
- udptmp := udpsess.DB(tt.MulMgo.DbName).C(tt.MulColl).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
- //task
- tasksess := tt.MgoTask.GetMgoConn()
- defer tt.MgoTask.DestoryMongoConn(tasksess)
- log.Println("线程数:", tt.I_thread, "查询语句", q)
- log.Println("task---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
- log.Println("select:", tt.Task_QueryFieldMap)
- extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
- arr := [][]map[string]interface{}{}
- oid := tt.LastId
- if tt.I_wordcount == 1 {
- tt.WordCount = map[string]map[string]int{}
- }
- sum := 0
- //对比两张表数据,减少查询次数
- var compare bson.M
- for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
- result := tmp
- //对比
- for {
- if compare == nil {
- compare = make(bson.M)
- if !udptmp.Next(&compare) { //传参表数据赋值给compare
- break
- }
- }
- if compare != nil {
- //对比
- cid := u.BsonIdToSId(compare["_id"]) //传参表id
- tid := u.BsonIdToSId(tmp["_id"]) //抽取表id
- if cid == tid {
- //更新抽取表的数据,再进行分类
- for _, k := range tt.Task_QueryFieldArr {
- v1 := compare[k]
- v2 := tmp[k]
- if v2 == nil && v1 != nil {
- result[k] = v1
- }
- }
- break
- } else {
- if tid > cid { //抽取表id大于传参表id
- compare = nil
- continue
- } else {
- break
- }
- }
- } else {
- break
- }
- }
- tid := tmp["_id"]
- if !timespan && sum%2000 == 0 {
- log.Println("current:", sum, tid)
- }
- pool <- true
- wg.Add(1)
- go func(result map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //按顺序识别
- if result != nil {
- tid := result["_id"]
- update := []map[string]interface{}{}
- //如果有关联字段 根据关联字段更新或者保存
- if len(asfields) == 2 {
- //log.Println("qfield====", qfield)
- field := result[qfield]
- if field != nil {
- //log.Println("field===", field, qtp, rtp)
- if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致
- } else {
- //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype)
- if rtype == rtp { //转换后与真实类型不同,填写时类型错误
- //将查询字段类型转换为对应的结果字段类型
- if qtp == "bson.ObjectId" && rtp == "string" {
- field = u.BsonIdToSId(result[qfield])
- } else if qtp == "string" && rtp == "bson.ObjectId" {
- field = u.StringTOBsonId(result[qfield].(string))
- }
- }
- }
- update = append(update, map[string]interface{}{ //根据关联字段更新
- rfield: field,
- })
- }
- } else {
- update = append(update, map[string]interface{}{ //更新的条件 根据id更新
- "_id": tid,
- })
- }
- res := map[string]interface{}{}
- ksmap := make(map[string]map[string][]string)
- if util.IntAll(result["infoformat"]) == 2 { //此处增加特例
- res["toptype"] = "拟建"
- res["subtype"] = "拟建"
- } else if util.IntAll(result["infoformat"]) == 3 {
- res["toptype"] = "产权"
- res["subtype"] = "产权"
- } else {
- SMap := &tools.SortMap{}
- if tt.I_tasktype == 2 { //标签任务
- SMap = TagClassificationRun(tt, result)
- } else if tt.I_tasktype == 1 { //附件任务
- SMap = FileClassificationRun(tt, result)
- //res["projectinfo"] = tmp["projectinfo"]
- } else { //常规任务
- SMap = NewClassificationRun(tt, result)
- //一级分类时,符合结果中成交规则时
- if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
- if _, ok := tmp["detail"]; ok {
- if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) {
- SMap.Map["toptype"] = "结果"
- resa := ReSub(tt, tmp, "结果")
- subtype := resa.Map["subtype"]
- delete(SMap.Map, "subtype")
- SMap.Map["subtype"] = subtype
- }
- }
- }
- }
- if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //更新字段 I_fieldUpdate 0:覆盖 1:追加
- //封装追加信息
- if len(SMap.Keys) > 0 {
- for _, k := range SMap.Keys {
- ksarr := make([]string, 0)
- if k != "toptype" && k != "subtype" {
- ks, ok := SMap.Map[k].(string)
- if ok {
- ksarr = append(ksarr, ks)
- } else {
- ksarr = append(ksarr, SMap.Map[k].([]string)...)
- }
- }
- ksmap[k] = map[string][]string{
- "$each": ksarr,
- }
- }
- }
- } else {
- res = SMap.Map
- }
- }
- IS.NewAdd(tt, res)
- if tt.S_attr != "" {
- res[tt.S_attr] = tt.AttrVal
- }
- if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加
- if len(ksmap) > 0 && len(res) > 0 {
- update = append(update, map[string]interface{}{
- "$set": res,
- "$addToSet": ksmap,
- })
- } else if len(ksmap) > 0 && len(res) == 0 {
- update = append(update, map[string]interface{}{
- "$addToSet": ksmap,
- })
- } else if len(ksmap) == 0 && len(res) > 0 {
- update = append(update, map[string]interface{}{
- "$set": res,
- })
- }
- } else {
- if len(res) > 0 {
- update = append(update, map[string]interface{}{
- "$set": res,
- })
- }
- }
- //更新
- lock.Lock()
- if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
- arr = append(arr, update)
- }
- if len(arr) >= NN {
- tt.MgoTask.UpdateBulk(tt.S_collection, arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }
- }(result)
- ttid := u.BsonIdToSId(tid)
- if ttid > tt.LastId {
- tt.LastId = ttid
- }
- tmp = make(map[string]interface{})
- }
- total = sum
- if timespan {
- log.Println("current:", sum)
- }
- wg.Wait()
- lock.Lock()
- if len(arr) > 0 {
- //if s_table == "" { //没有结果表
- tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新
- //} else { //有结果表
- // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
- //}
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- tt.WcLock.Lock()
- if len(tt.WordCount) > 0 {
- savem := []map[string]interface{}{}
- tn := time.Now().Unix()
- for wk, wm := range tt.WordCount {
- for ck, cm := range wm {
- m1 := map[string]interface{}{
- "s_class": wk,
- "s_word": ck,
- "i_count": cm,
- "bz": tn,
- }
- savem = append(savem, m1)
- if len(savem) >= 1000 {
- tools.MgoClass.SaveBulk("wordcount", savem...)
- savem = []map[string]interface{}{}
- }
- }
- }
- if len(savem) > 0 {
- tools.MgoClass.SaveBulk("wordcount", savem...)
- savem = nil
- }
- }
- tt.WcLock.Unlock()
- //更新最后id
- if !budp && oid != tt.LastId {
- setid := map[string]interface{}{
- "$set": map[string]interface{}{
- "s_startid": tt.LastId,
- },
- }
- go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
- }
- //更新最后时间
- if !budp {
- nowtime := time.Now().Unix()
- settime := map[string]interface{}{
- "$set": map[string]interface{}{
- "s_starttime": nowtime,
- },
- }
- go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
- }
- //InitRule()
- log.Println("运行", tt.S_name, "over")
- })
- return total
- }
- func UdpRunExtract(sid, eid string) { //5cb6c508a5cb26b9b70d6536
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": tools.ExtractStype,
- })
- log.Println("定时任务调下一节点分类:", tools.ExtractPort, string(by))
- addr := &net.UDPAddr{
- IP: net.ParseIP(tools.ExtractAddr),
- Port: tools.ExtractPort,
- }
- //node := &tools.UdpNode{by, addr, time.Now().Unix(), 0}
- //udptaskmap.Store(key, node)
- tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- func FindId_back(coll string) (gtid, lteid string) {
- sum := 0 //记录数据总量
- data, _ := tools.MgoClass.Find(coll, `{"isused":false}`, `{"_id":1}`, nil, false, -1, -1)
- length := len(*data)
- set := bson.M{
- "$set": bson.M{
- "isused": true,
- "publishtime": time.Now().Unix(),
- },
- }
- for i, d := range *data {
- id := d["_id"]
- count := util.IntAll(d["count"])
- sum += count
- if gtid == "" {
- gtid = d["gtid"].(string)
- }
- if sum >= 5000 || i == length-1 { //总数大于5000或数据取完,取此id段数据
- lteid := d["lteid"].(string)
- go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false)
- return gtid, lteid
- } else { //总数小于5000,删除已使用数据
- go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false)
- }
- }
- return gtid, lteid
- }
- func FindId(coll string) (gtid, lteid string) {
- data, _ := tools.MgoClass.Find(coll, map[string]interface{}{"dataprocess": 0}, `{"_id":1}`, nil, false, -1, -1)
- for _, d := range *data {
- gtid = d["gtid"].(string)
- lteid = d["lteid"].(string)
- set := map[string]interface{}{
- "$set": map[string]interface{}{
- "dataprocess": 2,
- "updatetime": time.Now().Unix(),
- },
- }
- tools.MgoClass.Update(coll, map[string]interface{}{"_id": d["_id"]}, set, false, false)
- break
- }
- return gtid, lteid
- }
- // NewLoadTestTask 测试任务
- func NewLoadTestTask(_id, s_mgourl, s_mgodb, s_coll, i_poolsize, s_startid, s_endid, s_query string) (bs bool, filename string) {
- defer tools.Catch()
- r, t, _ := NewAnalyTask(_id, s_mgourl, s_mgodb, s_coll, tools.IntAllDef(i_poolsize, 5))
- //log.Println(m)
- if r && t != nil {
- filename = time.Now().Format("150405") + ".csv"
- go t.RRunTest(s_startid, s_endid, s_query, filename)
- bs = true
- }
- return
- }
- // NewLoadTask 加载任务
- func NewLoadTask(_id string, res *tools.JSON) {
- defer tools.Catch()
- //初始化任务信息
- InitTaskData(_id)
- //初始化任务mgo配置信息
- bres, tt, msg := NewAnalyTask(_id, "", "", "", 5)
- tt.I_status = 1
- log.Println(tt.S_mgodb, tt.S_name, tt.I_thread)
- res.Msg = msg
- if bres && tt != nil {
- res.Status = true
- NEWTASKPOOL[_id] = tt //存入当前启动任务
- log.Println("加载", tt.S_name, "完成...", tt.S_query)
- go tt.RRun()
- }
- }
// IdTypeConversion strips the id-type wrapper from a query field spec q and a
// result field spec r and reports the type each one declares.
//
// A spec containing "ObjectId"/"objectId" or "StringId"/"stringId" is expected
// to look like `ObjectId(name)` / `StringId(name)`: the first 9 bytes and the
// last byte are removed. The type is "string" for the StringId forms and
// "bson.ObjectId" otherwise (including specs without any wrapper).
//
// It returns the unwrapped q, the unwrapped r, q's type and r's type.
func IdTypeConversion(q, r string) (string, string, string, string) {
	q, qtp := unwrapIDField(q)
	r, rtp := unwrapIDField(r)
	return q, r, qtp, rtp
}

// unwrapIDField applies the wrapper rules described on IdTypeConversion to one spec.
func unwrapIDField(field string) (name, typ string) {
	const wrapLen = len("ObjectId(") // "StringId(" has the same length
	typ = "bson.ObjectId"
	switch {
	case strings.Contains(field, "ObjectId") || strings.Contains(field, "objectId"):
		// keeps the default type
	case strings.Contains(field, "StringId") || strings.Contains(field, "stringId"):
		typ = "string"
	default:
		return field, typ
	}
	// A spec too short to hold "Xxxxxxxx(" + ")" cannot be sliced; leave it
	// untouched instead of panicking with slice bounds out of range.
	if len(field) <= wrapLen {
		return field, typ
	}
	return field[wrapLen : len(field)-1], typ
}
- // 获取匹配或不匹配的个数
- func GetNum(rule string) (int, bool) {
- num := 1
- isnum := strings.HasSuffix(rule, ")")
- if !isnum { //是数字
- s := []rune(rule)
- last := string(s[len(s)-1:])
- num = tools.IntAll(last)
- }
- return num, isnum
- }
// GetRule splits a rule of the form "(a|b|c)" or "(a|b|c)N" into its alternatives.
//
// isnum is the flag returned by GetNum: true means the text has no trailing
// count and must be fully parenthesised; false means one trailing count byte
// follows the closing parenthesis. Text that does not start with "(" or does
// not fit the expected shape yields nil.
func GetRule(text string, isnum bool) (matchArr []string) {
	if !strings.HasPrefix(text, "(") {
		return nil
	}
	if isnum { // no trailing count: strip "(" and ")"
		if !strings.HasSuffix(text, ")") {
			return nil
		}
		return strings.Split(text[1:len(text)-1], "|")
	}
	// Trailing count: strip "(" plus the final ")N". Shorter text cannot be
	// sliced and previously panicked with slice bounds out of range.
	if len(text) < 3 {
		return nil
	}
	return strings.Split(text[1:len(text)-2], "|")
}
- func (d *DFA) AddWord(keys ...string) {
- d.AddWordAll(true, keys...)
- }
- func (d *DFA) AddWordAll(haskey bool, keys ...string) {
- if d.Link == nil {
- d.Link = make(map[string]interface{})
- }
- for _, key := range keys {
- nowMap := &d.Link
- for i := 0; i < len(key); i++ {
- kc := key[i : i+1]
- if v, ok := (*nowMap)[kc]; ok {
- nowMap, _ = v.(*map[string]interface{})
- } else {
- newMap := map[string]interface{}{}
- newMap["YN"] = "0"
- (*nowMap)[kc] = &newMap
- nowMap = &newMap
- }
- if i == len(key)-1 {
- (*nowMap)["YN"] = "1"
- if haskey {
- (*nowMap)["K"] = key
- }
- }
- }
- }
- }
- func (d *DFA) CheckSensitiveWord(src string, n int) (bool, []string) {
- res := make([]string, 0)
- tmpMap := make(map[string]int)
- for j := 0; j < len(src); j++ {
- nowMap := &d.Link
- for i := j; i < len(src); i++ {
- word := src[i : i+1]
- nowMap, _ = (*nowMap)[word].(*map[string]interface{})
- if nowMap != nil { // 存在,则判断是否为最后一个
- if "1" == util.ObjToString((*nowMap)["YN"]) {
- s := util.ObjToString((*nowMap)["K"])
- tmpMap[s] = 1
- //nowMap = &d.Link //匹配到之后继续匹配后边的内容
- }
- } else {
- //nowMap = &d.Link
- break
- }
- }
- }
- if len(tmpMap) >= n {
- for k, _ := range tmpMap {
- res = append(res, k)
- }
- return true, res
- }
- return false, []string{}
- }
- func (d *DFA) CheckSensitiveWordTest(src string, n int) (bool, []string) {
- res := make([]string, 0)
- tmpMap := make(map[string]int)
- for j := 0; j < len(src); j++ {
- nowMap := &d.Link
- for i := j; i < len(src); i++ {
- word := src[i : i+1]
- nowMap, _ = (*nowMap)[word].(*map[string]interface{})
- if nowMap != nil { // 存在,则判断是否为最后一个
- if "1" == util.ObjToString((*nowMap)["YN"]) {
- s := util.ObjToString((*nowMap)["K"])
- tmpMap[s] = 1
- //nowMap = &d.Link //匹配到之后继续匹配后边的内容
- }
- } else {
- //nowMap = &d.Link
- break
- }
- }
- }
- for k, _ := range tmpMap {
- res = append(res, k)
- }
- return len(tmpMap) >= n, res
- }
- // UpdateTaskInfo 更新任务信息
- func UpdateTaskInfo(flag bool, tid string) bool {
- query := bson.M{
- "_id": u.StringTOBsonId(tid),
- }
- set := bson.M{
- "$set": bson.M{
- "b_updaterule": flag,
- },
- }
- return tools.MgoClass.Update(tools.COLL_TASK, query, set, false, false)
- }
- // o_projectinfo中数据分类定时任务
- func RunTask() {
- if tools.IsStart { //是否开启定时任务
- tt := InitTimeTask() //初始化任务
- //StartTask(tt)
- //return
- c := cron.New()
- cronstr := "0 */" + fmt.Sprint(tt.I_rate) + " * * * ?" //每TaskTime分钟执行一次
- c.AddFunc(cronstr, func() { StartTask(tt) })
- c.Start()
- }
- }
- // 初始化任务
- func InitTimeTask() *TTask {
- defer util.Catch()
- timeTaskTT := &TTask{}
- InitTaskData(tools.TimeTaskid)
- bres, tt, _ := NewAnalyTask(tools.TimeTaskid, "", "", "", 5)
- if bres && tt != nil {
- timeTaskTT = tt
- logger.Debug("初始化定时任务成功")
- } else {
- logger.Debug("初始化定时任务失败")
- }
- return timeTaskTT
- }
- // StartTask 开始任务
- func StartTask(t *TTask) {
- defer util.Catch()
- logger.Debug("开始执行定时任务")
- query := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": u.StringTOBsonId(tools.IdCollSid),
- },
- "dataprocess": 8,
- }
- order := map[string]interface{}{"_id": -1}
- logger.Debug("query:", query)
- list, _ := tools.MgoClass.Find(t.S_idcoll, query, order, nil, false, -1, -1)
- sid := t.S_startid
- eid := ""
- if list != nil && len(*list) > 0 {
- eid = util.ObjToString((*list)[0]["lteid"])
- if eid <= sid {
- logger.Debug("id err. sid:", sid, " eid:", eid)
- return
- }
- t.S_startid = eid //更新任务中数据的起始id
- tools.IdCollSid = u.BsonIdToSId((*list)[0]["_id"]) //更新id表起始id
- //更新任务表中起始id
- setid := map[string]interface{}{
- "$set": map[string]interface{}{
- "s_startid": t.S_startid,
- },
- }
- go tools.MgoClass.Update("rc_task", `{"_id":"`+t.ID+`"}`, setid, false, false)
- //查拟建数据
- query := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": u.StringTOBsonId(sid),
- "$lte": u.StringTOBsonId(eid),
- },
- "toptype": "拟建",
- }
- sess := t.MgoTask.GetMgoConn()
- defer t.MgoTask.DestoryMongoConn(sess)
- count, _ := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Count()
- logger.Debug("count:", count, " query:", query)
- if count == 0 { //此轮任务没有查到数据
- return
- }
- arr := [][]map[string]interface{}{}
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- pool := make(chan bool, t.I_thread)
- sum := 0
- logger.Debug("select:", t.Task_QueryFieldMap)
- it := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Select(t.Task_QueryFieldMap).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- update := []map[string]interface{}{}
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- SMap := &tools.SortMap{}
- SMap = NewClassificationRun(t, tmp)
- if len(SMap.Map) > 0 {
- //一级分类时,符合结果中成交规则时
- if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
- if _, ok := tmp["detail"]; ok {
- if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) {
- SMap.Map["toptype"] = "结果"
- resa := ReSub(t, tmp, "结果")
- subtype := resa.Map["subtype"]
- delete(SMap.Map, "subtype")
- SMap.Map["subtype"] = subtype
- }
- }
- }
- update = append(update, map[string]interface{}{"$set": SMap.Map})
- }
- //更新
- lock.Lock()
- if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
- arr = append(arr, update)
- }
- if len(arr) >= NN {
- tmps := arr
- t.MgoTask.UpdateBulk(t.S_coll, tmps...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if sum%100 == 0 {
- log.Println("current:", sum)
- }
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- lock.Lock()
- if len(arr) > 0 {
- t.MgoTask.UpdateBulk(t.S_coll, arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- logger.Debug("定时任务执行完毕 count:", sum)
- UdpRunProjectForecast(sid, eid)
- }
- logger.Debug("Udp通知项目预测执行完毕")
- }
- // udp通知项目预测
- func UdpRunProjectForecast(sid, eid string) {
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- })
- logger.Debug("定时任务通知项目预测:", string(by))
- addr := &net.UDPAddr{
- IP: net.ParseIP(tools.NextNodeAddr),
- Port: tools.NextNodePort,
- }
- tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
// SendAi posts data as JSON to the large-model bidding classifier at url and
// returns the decoded JSON object of the response, e.g.
// map[result:[结果-中标] status:200].
//
// The whole call is bounded by a 2 second timeout. On any failure (encoding,
// request construction, transport, timeout, response decoding) the problem is
// printed to stdout and nil is returned.
func SendAi(data map[string]interface{}, url string) (res map[string]interface{}) {
	// Bound the whole request, including reading the body, to 2 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	jsonData, err := json.Marshal(data)
	if err != nil {
		fmt.Println("JSON marshal error:", err)
		return nil
	}

	// Attach the context when the request is created rather than copying it afterwards.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(jsonData))
	if err != nil {
		fmt.Println("Request error:", err)
		return nil
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		// The client wraps the context error, so errors.Is sees through it.
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("Request timed out")
			return nil
		}
		fmt.Println("Request error:", err)
		return nil
	}
	defer resp.Body.Close()

	// Decode into a local so a failed decode never leaks a partial result.
	var decoded map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil {
		fmt.Println("Response decoding error:", err)
		return nil
	}
	return decoded
}
|