task.go 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792
  1. package task
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/csv"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "net/http"
  13. "os"
  14. "qfw/util"
  15. "reflect"
  16. "regexp"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. "tools"
  22. u "util"
  23. "github.com/cron"
  24. "github.com/donnie4w/go-logger/logger"
  25. "go.mongodb.org/mongo-driver/bson"
  26. "go.mongodb.org/mongo-driver/bson/primitive"
  27. )
  28. func init() {
  29. REG, _ = regexp.Compile(`\(.*?\)\d*`)
  30. REG1, _ = regexp.Compile(`\(.*?\)`)
  31. //log.Println(REG.FindAllString("(平台|软件|电子商务|多媒体|通讯设备)1(建设|采购|开发)2", -1))
  32. //加载所有任务
  33. StartMonitor()
  34. if tools.NoAutoRun == 0 {
  35. rtask, b := tools.MgoClass.Find(tools.COLL_TASK, `{"i_status":1}`, nil, `{"_id":1}`, false, 0, 50)
  36. if b && rtask != nil && *rtask != nil {
  37. for _, t := range *rtask {
  38. _id := u.BsonIdToSId(t["_id"])
  39. res := tools.JSON{}
  40. //LoadTask(_id, &res)
  41. NewLoadTask(_id, &res)
  42. }
  43. }
  44. }
  45. }
  46. // var UdpSess *tools.MongodbSim
  47. var REG *regexp.Regexp
  48. var REG1 *regexp.Regexp
  49. var TaskLock = sync.Mutex{}
  50. var NEWTASKPOOL = map[string]*TTask{} //存储当前任务
  51. var TaskMap = map[string]*TTask{}
  52. var HangyeUdps = make(chan map[string]interface{}, 100) //初始化行业分类udp任务池
  53. type Rule struct {
  54. Reg []interface{} //规则
  55. DetailReg []interface{} //detail正则,目前只针对title、channel的二级招标分类中的中标规则
  56. NotReg []interface{} //排除规则
  57. Rid string
  58. Class *Class
  59. Rule_PreRule string
  60. S_name string
  61. S_code string
  62. S_pid string
  63. //Reg []*RuleDFA
  64. }
  65. type Class struct {
  66. Cid string
  67. TTask *TTask
  68. Rule []*Rule
  69. Class_PreRule string
  70. S_name string
  71. S_fields string
  72. S_code string
  73. S_default string
  74. S_pid string
  75. S_savefield string
  76. }
  77. type TTask struct {
  78. Class []*Class //任务中类集合
  79. ID string //任务id
  80. S_name string //任务名称
  81. I_rate int //任务执行频率
  82. S_class string //任务中类id
  83. S_mgourl string //任务mgo addr
  84. S_mgodb string //任务mgo db
  85. S_collection string //查询、存储表
  86. I_poolsize int //任务连接池个数
  87. S_startid string //任务起始id
  88. I_status int //控制任务状态属性
  89. S_attr string //标识属性key值
  90. AttrVal int //标识属性val值
  91. S_coll string //查询、存储表
  92. LastId string //上次定时任务的结束id
  93. Lock sync.Mutex
  94. S_query string //任务查询条件
  95. FlagQuit chan bool //任务结束控制
  96. B_Running bool //任务是否执行
  97. MgoTask *u.MongodbSim //任务查询表mgo
  98. I_multiclass int //是否支持多分类
  99. I_savetype int //1 名称 2 值
  100. I_thread int //线程数
  101. I_wordcount int //词频统计
  102. WordCount map[string]map[string]int //词频统计集合
  103. WcLock sync.Mutex
  104. Task_PreRule string //任务前置过滤
  105. S_table string //结果表
  106. S_querycon string //查询方式 1是id查询 0是时间查询
  107. S_starttime int64 //起始时间
  108. S_timefieldname string //各表所查时间字段名称
  109. S_asfield string //查询表与结果表关联字段
  110. I_fieldUpdate int //分类中的保存字段信息 0:覆盖 1:更新
  111. MulMgo *u.MongodbSim //联合查询的mgo配置信息
  112. MulColl string //联合查询的表名
  113. I_tasktype int //任务类型 0:常规任务 1:附件任务:2
  114. S_idcoll string //正式环境查询数据id段
  115. B_UpdateRule bool //是否更新任务下的规则
  116. S_classField string //分类字段
  117. Task_QueryFieldArr []string //用于合并数据
  118. Task_QueryFieldMap map[string]interface{} //用于仅查询字段
  119. Dbtype string //用于区分连得是哪个库,使用不同的用户密码
  120. }
  121. type RuleDFA struct {
  122. Match []DFA //包含的敏感词
  123. MatchNum []int //包含敏感词匹配个数
  124. MisMatch DFA //不包含的敏感词
  125. MisMatchNum int //不包含敏感词匹配个数
  126. }
  // DFA holds keyword-matching data in Link, filled through AddWord (defined
  // elsewhere; presumably a per-character trie — TODO confirm).
  type DFA struct {
  	Link map[string]interface{} // nested keyword links
  }
  130. // 初始化任务
  131. func InitTaskData(_id string) {
  132. defer tools.Catch()
  133. taskData, _ := tools.MgoClass.FindById(tools.COLL_TASK, _id, nil)
  134. task := &TTask{}
  135. task.MgoTask = &u.MongodbSim{}
  136. s_querycon := "1"
  137. s_table := ""
  138. s_timefieldname := ""
  139. s_starttime := int64(0)
  140. s_asfield := ""
  141. if b_updaterule, ok := (*taskData)["b_updaterule"].(bool); ok {
  142. task.B_UpdateRule = b_updaterule //更新任务下的规则
  143. }
  144. if (*taskData)["s_querycon"] != nil { //
  145. s_querycon = util.ObjToString((*taskData)["s_querycon"])
  146. }
  147. if (*taskData)["s_starttime"] != nil {
  148. s_starttime = (*taskData)["s_starttime"].(int64)
  149. }
  150. if (*taskData)["s_table"] != nil {
  151. s_table = util.ObjToString((*taskData)["s_table"])
  152. }
  153. if (*taskData)["s_timefieldname"] != nil {
  154. s_timefieldname = util.ObjToString((*taskData)["s_timefieldname"])
  155. }
  156. if (*taskData)["s_asfield"] != nil {
  157. s_asfield = util.ObjToString((*taskData)["s_asfield"])
  158. }
  159. //task.Class
  160. task.ID = u.BsonIdToSId((*taskData)["_id"])
  161. task.S_name = util.ObjToString((*taskData)["s_name"])
  162. task.S_class = util.ObjToString((*taskData)["s_class"])
  163. task.S_mgourl = util.ObjToString((*taskData)["s_mgourl"])
  164. task.S_mgodb = util.ObjToString((*taskData)["s_mgodb"])
  165. task.S_coll = util.ObjToString((*taskData)["s_coll"])
  166. task.I_poolsize = util.IntAll((*taskData)["i_poolsize"])
  167. task.Task_PreRule = util.ObjToString((*taskData)["s_task_prerule"])
  168. task.I_multiclass = util.IntAllDef((*taskData)["i_multiclass"], 0)
  169. task.I_wordcount = util.IntAllDef((*taskData)["i_wordcount"], 0)
  170. task.I_savetype = util.IntAllDef((*taskData)["i_savetype"], 1)
  171. task.I_thread = util.IntAllDef((*taskData)["i_thread"], 1)
  172. task.S_query = util.ObjToString((*taskData)["s_query"])
  173. task.S_startid = util.ObjToString((*taskData)["s_startid"])
  174. task.S_attr = util.ObjToString((*taskData)["s_attr"])
  175. task.I_rate = util.IntAllDef((*taskData)["i_rate"], 10)
  176. task.I_tasktype = util.IntAllDef((*taskData)["i_tasktype"], 0)
  177. task.I_status = util.IntAll((*taskData)["i_status"])
  178. task.S_idcoll = util.ObjToString((*taskData)["s_idcoll"])
  179. task.S_querycon = s_querycon
  180. task.S_starttime = s_starttime
  181. task.S_table = s_table
  182. task.S_timefieldname = s_timefieldname
  183. task.S_asfield = s_asfield
  184. task.I_fieldUpdate = util.IntAllDef((*taskData)["i_fieldUpdate"], 0)
  185. task.S_classField = util.ObjToString((*taskData)["s_classfield"])
  186. task.Dbtype = util.ObjToString((*taskData)["s_dbtype"])
  187. flagAttrVal := 1
  188. attrs := strings.Split(task.S_attr, "__")
  189. if len(attrs) == 2 {
  190. flagAttrVal = util.IntAll(attrs[1])
  191. task.S_attr = attrs[0]
  192. }
  193. task.AttrVal = flagAttrVal
  194. //联表查询初始化mgo,线上只有行业分类用到;跑历史招标、行业分类的时候也用到两边查询
  195. for _, v := range tools.Config {
  196. if m, ok := v.(map[string]interface{}); ok {
  197. if m["taskid"] == task.ID {
  198. if m["mgoaddr"] != nil && m["db"] != nil && m["coll"] != nil {
  199. dbtype := util.ObjToString(m["dbtype"])
  200. addr, _ := m["mgoaddr"].(string)
  201. db, _ := m["db"].(string)
  202. coll, _ := m["coll"].(string)
  203. task.MulMgo = &u.MongodbSim{}
  204. task.MulMgo.MongodbAddr = addr
  205. task.MulMgo.Size = 3
  206. task.MulMgo.DbName = db
  207. if dbtype != "" {
  208. task.MulMgo.UserName = tools.DbInfo[dbtype][0]
  209. task.MulMgo.Password = tools.DbInfo[dbtype][1]
  210. }
  211. task.MulMgo.InitPool()
  212. task.MulColl = coll
  213. }
  214. }
  215. }
  216. }
  217. //初始化查询字段信息
  218. if task.Task_QueryFieldMap == nil {
  219. task.Task_QueryFieldMap = make(map[string]interface{})
  220. }
  221. //Task_QueryFieldMap加入关联字段
  222. for _, v := range strings.Split(s_asfield, "==") {
  223. if v != "" {
  224. task.Task_QueryFieldMap[v] = 1
  225. }
  226. }
  227. //Task_QueryFieldMap加入分类字段
  228. //Task_QueryFieldArr加入分类字段
  229. for _, f := range strings.Split(task.S_classField, ",") {
  230. task.Task_QueryFieldMap[f] = 1
  231. task.Task_QueryFieldArr = append(task.Task_QueryFieldArr, f)
  232. }
  233. //初始化任务下所有的分类和规则
  234. InitClassAndRuleData(_id, task)
  235. }
  236. // InitClassAndRuleData 初始化任务下所有的分类和规则
  237. func InitClassAndRuleData(_id string, task *TTask) {
  238. defer tools.Catch()
  239. classIdStr := task.S_class
  240. if classIdStr != "" {
  241. classIdArr := strings.Split(classIdStr, ",")
  242. classArr := make([]*Class, 0)
  243. for _, classid := range classIdArr {
  244. classData, _ := tools.MgoClass.FindById(tools.COLL_CLASS, classid, nil)
  245. if classData != nil {
  246. //初始化Class
  247. class := &Class{
  248. //Rule: CidRuleMap[classid],
  249. Cid: classid,
  250. Class_PreRule: util.ObjToString((*classData)["s_class_prerule"]),
  251. S_name: util.ObjToString((*classData)["s_name"]),
  252. S_fields: util.ObjToString((*classData)["s_fields"]),
  253. S_code: util.ObjToString((*classData)["s_code"]),
  254. S_default: util.ObjToString((*classData)["s_default"]),
  255. S_pid: util.ObjToString((*classData)["s_pid"]),
  256. S_savefield: util.ObjToString((*classData)["s_savefield"]),
  257. }
  258. //根据classid查找对应的rule
  259. ruleList, _ := tools.MgoClass.Find(tools.COLL_RULE, `{"s_classid":"`+classid+`"}`, `{"i_order":1}`, nil, false, -1, -1)
  260. if ruleList != nil && len(*ruleList) > 0 {
  261. for _, v := range *ruleList {
  262. _id := u.BsonIdToSId(v["_id"])
  263. //rule
  264. s_rule, _ := v["s_rule"].(primitive.A)
  265. i_rule := DealRules(s_rule)
  266. //detailrule
  267. s_detailrule, _ := v["s_detailrule"].(primitive.A)
  268. i_detailrule := DealRules(s_detailrule)
  269. //notrule
  270. s_notrule, _ := v["s_notrule"].(primitive.A)
  271. i_notrule := DealRules(s_notrule)
  272. rule := &Rule{
  273. Rid: _id,
  274. Reg: i_rule,
  275. DetailReg: i_detailrule,
  276. NotReg: i_notrule,
  277. Rule_PreRule: util.ObjToString(v["s_rule_prerule"]),
  278. S_name: util.ObjToString(v["s_name"]),
  279. S_code: util.ObjToString(v["s_code"]),
  280. S_pid: util.ObjToString(v["s_pid"]),
  281. }
  282. class.Rule = append(class.Rule, rule)
  283. }
  284. }
  285. classArr = append(classArr, class)
  286. }
  287. }
  288. task.Class = classArr
  289. TaskMap[_id] = task
  290. }
  291. }
  292. func DealRules(rules []interface{}) (i_rule []interface{}) {
  293. for _, r := range tools.ObjArrToStringArr(rules) {
  294. if strings.HasPrefix(r, "'") && strings.HasSuffix(r, "'") { //正则
  295. rs := []rune(r)
  296. ru := string(rs[1 : len(rs)-1])
  297. rureg, err := regexp.Compile(ru)
  298. if err != nil {
  299. log.Println("error---rule:", r)
  300. continue
  301. }
  302. i_rule = append(i_rule, []interface{}{rureg}...)
  303. } else { //规则,加入到敏感词匹配
  304. matchnum := 0
  305. mismatchnum := 0
  306. isnum1 := false
  307. isnum2 := false
  308. numArr := make([]int, 0)
  309. ruleDFA := &RuleDFA{
  310. Match: []DFA{},
  311. MisMatch: DFA{},
  312. }
  313. tmpArr := strings.Split(r, "^")
  314. matchTmp := tmpArr[0]
  315. ruleTextArr := REG.FindAllString(matchTmp, -1)
  316. for _, match := range ruleTextArr {
  317. matchnum, isnum1 = GetNum(match)
  318. numArr = append(numArr, matchnum)
  319. matchArr := GetRule(match, isnum1)
  320. tmpDFA := DFA{
  321. Link: make(map[string]interface{}),
  322. }
  323. tmpDFA.AddWord(matchArr...)
  324. ruleDFA.Match = append(ruleDFA.Match, tmpDFA)
  325. }
  326. if len(tmpArr) == 2 {
  327. mismatch := tmpArr[1]
  328. mismatchnum, isnum2 = GetNum(mismatch)
  329. mismatchArr := GetRule(mismatch, isnum2)
  330. ruleDFA.MisMatch.AddWord(mismatchArr...)
  331. }
  332. ruleDFA.MatchNum = numArr
  333. ruleDFA.MisMatchNum = mismatchnum
  334. i_rule = append(i_rule, []interface{}{ruleDFA}...)
  335. }
  336. }
  337. return
  338. }
  339. // 更新任务状态
  340. func (tt *TTask) Sstatus() int {
  341. if tt.I_status == 0 && tt.B_Running {
  342. return 1
  343. } else if tt.I_status == 0 && !tt.B_Running {
  344. return 0
  345. } else if tt.I_status == 1 && tt.B_Running {
  346. return 2
  347. } else if tt.I_status == 1 && !tt.B_Running {
  348. return 3
  349. }
  350. return -1
  351. }
  352. // 停止任务
  353. func (tt *TTask) SStop() bool {
  354. tt.Lock.Lock()
  355. defer tt.Lock.Unlock()
  356. if tt.I_status == 1 {
  357. tt.I_status = 0
  358. tt.FlagQuit <- true
  359. log.Println("开始停止...")
  360. }
  361. return true
  362. }
  var (
  	// NN is a package-level constant-like value (its users are not visible here).
  	NN = 400
  	// TEST stores the progress of task test runs (RRunTest keys it by the
  	// output file name).
  	TEST = &TestList{Count: make(map[string][]int)}
  )

  // TestList is a mutex-guarded map from a test id to its progress counters.
  type TestList struct {
  	Lock  sync.Mutex
  	Count map[string][]int
  }
  372. func (tl *TestList) Get(id string) []int {
  373. tl.Lock.Lock()
  374. defer tl.Lock.Unlock()
  375. return tl.Count[id]
  376. }
  377. func (tl *TestList) Put(id string, val ...int) {
  378. defer tools.Catch()
  379. tl.Lock.Lock()
  380. defer tl.Lock.Unlock()
  381. if len(val) == 3 {
  382. tl.Count[id] = val
  383. } else {
  384. tval := tl.Count[id]
  385. tl.Count[id][2] = val[0]
  386. if tval[1] <= val[0] {
  387. tl.Count[id][0] = 1
  388. }
  389. }
  390. }
  391. func (tl *TestList) Del(id string) {
  392. tl.Lock.Lock()
  393. defer tl.Lock.Unlock()
  394. delete(tl.Count, id)
  395. os.Remove("csv/" + id)
  396. }
  397. // 任务测试
  398. func (tt *TTask) RRunTest(s_startid, s_endid, s_query, filename string) {
  399. defer tools.Catch()
  400. defer func() {
  401. go func() {
  402. time.AfterFunc(10*time.Minute, func() {
  403. TEST.Del(filename)
  404. })
  405. }()
  406. }()
  407. //开始识别
  408. sess := tt.MgoTask.GetMgoConn()
  409. defer tt.MgoTask.DestoryMongoConn(sess)
  410. q := map[string]interface{}{}
  411. if s_query != "" {
  412. json.Unmarshal([]byte(strings.Replace(s_query, "'", "\"", -1)), &q)
  413. }
  414. if s_startid != "" {
  415. q["_id"] = map[string]interface{}{
  416. "$gte": u.StringTOBsonId(s_startid),
  417. }
  418. }
  419. if s_endid != "" {
  420. if q["_id"] != nil {
  421. q["_id"].(map[string]interface{})["$lte"] = u.StringTOBsonId(s_endid)
  422. } else {
  423. q["_id"] = map[string]interface{}{
  424. "$lte": u.StringTOBsonId(s_endid),
  425. }
  426. }
  427. }
  428. count := tt.MgoTask.Count(tt.S_coll, &q)
  429. if count == 0 {
  430. return
  431. }
  432. TEST.Put(filename, 0, count, 0)
  433. query := sess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Iter()
  434. arr := [][]string{}
  435. i := 0
  436. for tmp := make(map[string]interface{}); query.Next(&tmp); i = i + 1 {
  437. if i > 2000 { //数据跑至2000停止
  438. break
  439. }
  440. //按顺序识别
  441. tid := tmp["_id"]
  442. res := make([]string, 3)
  443. res[0] = u.BsonIdToSId(tid)
  444. res[1] = util.ObjToString(tmp["title"])
  445. if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例
  446. res = append(res, "拟建", "拟建")
  447. } else if util.IntAll(tmp["infoformat"]) == 3 {
  448. res = append(res, "产权", "产权")
  449. } else {
  450. SMap := NewClassificationRun(tt, tmp)
  451. for _, k := range SMap.Keys {
  452. res = append(res, util.ObjToString((SMap.Map[k].([]string))[0]))
  453. }
  454. }
  455. arr = append(arr, res)
  456. TEST.Put(filename, i+1)
  457. tmp = make(map[string]interface{})
  458. }
  459. if len(arr) > 0 {
  460. if !Exist("csv") {
  461. os.Mkdir("csv", 777)
  462. }
  463. f, _ := os.Create("csv/" + filename)
  464. w := csv.NewWriter(f)
  465. for _, str := range arr {
  466. w.Write(str)
  467. }
  468. w.Flush()
  469. f.Close()
  470. }
  471. log.Println("运行RUNTEST-OVER")
  472. }
  // Exist reports whether filename can be stat'ed, i.e. the file or directory
  // exists and is reachable. Any os.Stat error (not found, permission denied,
  // ...) yields false. The former `|| os.IsExist(err)` clause was dead code:
  // os.Stat never returns an "already exists" error.
  func Exist(filename string) bool {
  	_, err := os.Stat(filename)
  	return err == nil
  }
  477. func (tt *TTask) RRun() {
  478. first := make(chan bool, 1)
  479. if tt.I_status == 1 {
  480. first <- true
  481. }
  482. OVER:
  483. for tt.I_status == 1 {
  484. tt.B_Running = false
  485. select {
  486. case <-tt.FlagQuit: //结果任务控制
  487. log.Println("退出,RUN", tt.S_name)
  488. tt = nil
  489. break OVER
  490. case <-first: //第一次执行控制
  491. if tools.ControlTaskRun { //任务流程控制,现有模式用不到,默认false
  492. tools.AllTaskFinish = false
  493. }
  494. log.Println("第一次执行任务:", tt.S_name)
  495. newtaskrun(tt)
  496. case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制
  497. //执行定时任务前,检查任务是否更新了rule
  498. if tt.B_UpdateRule {
  499. InitClassAndRuleData(tt.ID, tt) //重新加载任务的rule
  500. tt.B_UpdateRule = false
  501. UpdateTaskInfo(false, tt.ID)
  502. }
  503. //上一轮任务执行完毕再走下一轮(上一轮任务执行完毕的标志是业主分类执行完)
  504. //util.Debug("ControlTaskRun---", tools.ControlTaskRun, "AllTaskFinish---", tools.AllTaskFinish)
  505. if !tools.ControlTaskRun || tt.S_querycon == "0" { //线下环境不控制定时任务;或者线上通过时间定时执行
  506. newtaskrun(tt)
  507. } else if tools.ControlTaskRun && tools.AllTaskFinish { //线上控制
  508. tools.AllTaskFinish = false
  509. newtaskrun(tt)
  510. } else {
  511. log.Println("上轮任务暂未完成")
  512. }
  513. }
  514. }
  515. }
  516. func newtaskrun(tt *TTask) {
  517. NewTaskRunAll(tt, false, nil)
  518. }
  519. // NewTaskRunAll 常规任务和udp非合并数据处理方法
  520. func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
  521. total := 0
  522. tools.Try(func() { //不加这一层defer运行不了!!!
  523. timespan := false //时间间隔(控制数据条数打印)
  524. tt.B_Running = true
  525. defer func() {
  526. //业主分类执行完修改AllTaskFinish状态;控制流程的任务id(整个分类流程业主分类结尾,以此为标记)
  527. if tt.ID == tools.ControlLastTaskId {
  528. tools.AllTaskFinish = true
  529. }
  530. tt.B_Running = false
  531. }()
  532. //开始识别
  533. pool := make(chan bool, tt.I_thread)
  534. wg := &sync.WaitGroup{}
  535. lock := &sync.Mutex{}
  536. q := make(map[string]interface{})
  537. nextNodeSid, nextNodeEid := "", ""
  538. oid := tt.LastId //上次定时任务的结束id
  539. s_table := tt.S_table //结果表
  540. s_asfield := tt.S_asfield //关联字段
  541. asfields := strings.Split(s_asfield, "==")
  542. qfield := "" //查询表字段
  543. rfield := "" //结果表字段
  544. qtp := "" //查询字段的类型
  545. rtp := "" //结果字段的类型
  546. rtype := "" //结果字段的真实类型
  547. if s_table != "" { //有结果表保存到结果表(更新,插入)
  548. tt.S_collection = s_table
  549. }
  550. if len(asfields) == 2 {
  551. qfield = asfields[0] //查询表字段
  552. rfield = asfields[1] //结果表字段
  553. //id处理 object string互转
  554. qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield)
  555. queryExis := map[string]interface{}{
  556. rfield: map[string]interface{}{
  557. "$exists": true,
  558. },
  559. }
  560. onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis)
  561. rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型
  562. }
  563. //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
  564. //没有结果表在查询表上更新
  565. //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
  566. sort := ""
  567. if !budp { //非udp查询条件
  568. sort = "_id"
  569. if tt.S_query != "" { //有查询条件
  570. json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
  571. }
  572. idcoll := tt.S_idcoll
  573. if idcoll != "" { //idcoll中查询id区间,bidding_processing_ids
  574. nextNodeSid, nextNodeEid = FindId(idcoll) //查询id段
  575. if nextNodeSid != "" && nextNodeEid != "" && nextNodeSid <= nextNodeEid {
  576. q["_id"] = bson.M{
  577. "$gt": u.StringTOBsonId(nextNodeSid),
  578. "$lte": u.StringTOBsonId(nextNodeEid),
  579. }
  580. } else {
  581. log.Println("定时任务", tt.S_name, "查询", tt.S_idcoll, "时间段出错", nextNodeSid, nextNodeEid)
  582. tools.AllTaskFinish = true //为查询到数据视为此轮任务完成
  583. return
  584. }
  585. stime, _ := strconv.ParseInt(nextNodeSid[:8], 16, 64) //取id前8位转成时间戳
  586. etime, _ := strconv.ParseInt(nextNodeEid[:8], 16, 64) //
  587. if etime-stime < 1800 { //时间跨度小于半小时
  588. timespan = true
  589. }
  590. } else {
  591. if q["_id"] != nil {
  592. if _id, ok := q["_id"].(string); ok {
  593. q["_id"] = u.StringTOBsonId(_id)
  594. } else if _ids, ok := q["_id"].(map[string]interface{}); ok {
  595. for k, v := range _ids {
  596. if id, bk := v.(string); bk {
  597. _ids[k] = u.StringTOBsonId(id)
  598. }
  599. }
  600. }
  601. }
  602. if tt.S_querycon == "1" { //id查询
  603. if tt.S_query != "" {
  604. //页面上配置了查询条件,直接使用,不再单独查询上次任务结束ID
  605. if tt.LastId != "" && q["_id"] == nil {
  606. q["_id"] = map[string]interface{}{
  607. "$gt": u.StringTOBsonId(tt.LastId),
  608. }
  609. }
  610. } else {
  611. //临时修改查询id区间段
  612. comeintime := time.Now().Unix() - 5*60
  613. query := map[string]interface{}{
  614. "comeintime": map[string]interface{}{
  615. "$gt": comeintime,
  616. },
  617. }
  618. qId := tt.MgoTask.GetMgoConn()
  619. defer tt.MgoTask.DestoryMongoConn(qId)
  620. tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter()
  621. eId := ""
  622. for tmp := make(map[string]interface{}); tmpData.Next(tmp); {
  623. eId = u.BsonIdToSId(tmp["_id"])
  624. }
  625. if tt.LastId != "" && q["_id"] == nil {
  626. sid := tt.LastId
  627. q["_id"] = map[string]interface{}{
  628. "$gt": u.StringTOBsonId(sid),
  629. }
  630. if eId != "" {
  631. q["_id"] = map[string]interface{}{
  632. "$gt": u.StringTOBsonId(sid),
  633. "$lte": u.StringTOBsonId(eId),
  634. }
  635. }
  636. }
  637. //按id查询,为了保证有新数据入库,每次休息2分钟
  638. //time.Sleep(time.Second * 60)
  639. //测试环境q的赋值执行下述代码
  640. //if tt.LastId != "" && q["_id"] == nil {
  641. // q["_id"] = map[string]interface{}{
  642. // "$gt": u.StringTOBsonId(tt.LastId),
  643. // }
  644. //}
  645. }
  646. } else { //时间查询
  647. name := tt.S_timefieldname
  648. q[name] = map[string]interface{}{
  649. "$gt": tt.S_starttime,
  650. }
  651. }
  652. }
  653. } else { //udp查询条件
  654. sort = "-_id"
  655. if tt.S_query != "" { //有查询条件
  656. json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
  657. }
  658. tmpq := mapInfo["q"].(map[string]interface{})
  659. for k, v := range tmpq {
  660. q[k] = v
  661. }
  662. sid := mapInfo["gtid"].(string)
  663. eid := mapInfo["lteid"].(string)
  664. stime, _ := strconv.ParseInt(sid[:8], 16, 64)
  665. etime, _ := strconv.ParseInt(eid[:8], 16, 64)
  666. if etime-stime < 1800 { //时间跨度小于半小时
  667. timespan = true
  668. }
  669. }
  670. //task
  671. tasksess := tt.MgoTask.GetMgoConn()
  672. defer tt.MgoTask.DestoryMongoConn(tasksess)
  673. //通过ID 查询数据时,才打印日志
  674. if tt.S_querycon == "1" {
  675. log.Println("运行", tt.S_name, "start")
  676. log.Println("线程数:", tt.I_thread, "查询语句", q)
  677. log.Println("查询---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
  678. log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr)
  679. }
  680. extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Sort(sort).Iter()
  681. arr := [][]map[string]interface{}{}
  682. if tt.I_wordcount == 1 {
  683. tt.WordCount = map[string]map[string]int{}
  684. }
  685. sum := 0
  686. for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
  687. tid := tmp["_id"]
  688. if !timespan && sum%2000 == 0 {
  689. log.Println("current:", sum, tt.S_name)
  690. }
  691. pool <- true
  692. wg.Add(1)
  693. go func(tmp map[string]interface{}) {
  694. defer func() {
  695. <-pool
  696. wg.Done()
  697. }()
  698. //按顺序识别
  699. tid := tmp["_id"]
  700. update := []map[string]interface{}{}
  701. //如果有关联字段 根据关联字段更新或者保存
  702. if len(asfields) == 2 {
  703. //log.Println("qfield====", qfield)
  704. field := tmp[qfield]
  705. if field != nil {
  706. //log.Println("field===", field, qtp, rtp)
  707. if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致
  708. } else {
  709. //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype)
  710. if rtype == rtp { //转换后与真实类型不同,填写时类型错误
  711. //将查询字段类型转换为对应的结果字段类型
  712. if qtp == "bson.ObjectId" && rtp == "string" {
  713. field = u.BsonIdToSId(tmp[qfield])
  714. } else if qtp == "string" && rtp == "bson.ObjectId" {
  715. field = u.StringTOBsonId(tmp[qfield].(string))
  716. }
  717. }
  718. }
  719. update = append(update, map[string]interface{}{ //根据关联字段更新
  720. rfield: field,
  721. })
  722. }
  723. } else {
  724. update = append(update, map[string]interface{}{ //更新的条件 根据id更新
  725. "_id": tid,
  726. })
  727. }
  728. res := map[string]interface{}{}
  729. ksmap := make(map[string]map[string][]string)
  730. if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例
  731. res["toptype"] = "拟建"
  732. res["subtype"] = "拟建"
  733. } else if util.IntAll(tmp["infoformat"]) == 3 {
  734. res["toptype"] = "产权"
  735. res["subtype"] = "产权"
  736. } else {
  737. SMap := &tools.SortMap{}
  738. if tt.I_tasktype == 2 { //标签任务
  739. SMap = TagClassificationRun(tt, tmp)
  740. } else if tt.I_tasktype == 1 { //附件任务
  741. SMap = FileClassificationRun(tt, tmp)
  742. //res["projectinfo"] = tmp["projectinfo"]
  743. } else { //常规任务
  744. SMap = NewClassificationRun(tt, tmp)
  745. //1.针对招标分类的特殊处理
  746. //if tt.ID == "57982b4436b82b073c000001" && tt.S_name == "招标分类" {
  747. if strings.Contains(tt.S_name, "招标分类") || tt.S_name == "招标分类" {
  748. //1.一级分类时,符合结果中成交规则时
  749. //todo 如果没有打上分类,调用ai 模型分类
  750. if _, ok := SMap.Map["toptype"]; !ok {
  751. if util.ObjToString(tools.Config["aiurl"]) != "" {
  752. data := map[string]interface{}{
  753. "title": tmp["title"],
  754. "detail": tmp["detail"],
  755. }
  756. reqData := map[string]interface{}{
  757. "texts": []interface{}{data},
  758. }
  759. rai := SendAi(reqData, util.ObjToString(tools.Config["aiurl"]))
  760. if len(rai) > 0 {
  761. resa := rai["result"]
  762. if dataa, ok := resa.([]interface{}); ok {
  763. da := dataa[0]
  764. if len(util.ObjToString(da)) > 0 {
  765. cs := strings.Split(util.ObjToString(da), "-")
  766. SMap.Map["toptype"] = cs[0]
  767. SMap.Map["subtype"] = cs[1]
  768. }
  769. }
  770. }
  771. }
  772. }
  773. if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
  774. if _, ok := tmp["detail"]; ok {
  775. if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) {
  776. SMap.Map["toptype"] = "结果"
  777. resa := ReSub(tt, tmp, "结果")
  778. subtype := resa.Map["subtype"]
  779. delete(SMap.Map, "subtype")
  780. SMap.Map["subtype"] = subtype
  781. }
  782. }
  783. }
  784. //2.一级分类是预告,但是标题含有招标计划,同时含有 预公告|预公示,变为 采购意向
  785. if SMap.Map["toptype"] == "预告" {
  786. if u.DealYuce(tmp["title"].(string)) {
  787. SMap.Map["toptype"] = "采购意向"
  788. SMap.Map["subtype"] = "采购意向"
  789. }
  790. }
  791. //3.针对 项目登记 相关数据处理,符合条件的归类为‘采购意向’
  792. if u.IsPurchasingIntent(tmp) {
  793. SMap.Map["toptype"] = "采购意向"
  794. SMap.Map["subtype"] = "采购意向"
  795. }
  796. //一级分类成功,但是二级没有分类成功,并且原值里有subtype
  797. if _, ok := SMap.Map["toptype"]; ok {
  798. if _, ok2 := SMap.Map["subtype"]; !ok2 {
  799. //没有二级分类,直接设置为 其它
  800. SMap.Map["subtype"] = "其它"
  801. }
  802. } else {
  803. SMap.Map["toptype"] = "其它"
  804. SMap.Map["subtype"] = "其它"
  805. }
  806. }
  807. }
  808. //2.针对用户行业分类,单独处理数据
  809. if mapInfo["stype"] == "yonghuhangye" || strings.TrimSpace(tt.S_name) == "用户行业分类" {
  810. subs := SMap.Map["subscope_dy"]
  811. delete(SMap.Map, "topscope_dy")
  812. var tops []string
  813. if subscopes, ok := subs.([]string); ok {
  814. for _, sub := range subscopes {
  815. top := strings.Split(sub, "_")[0]
  816. tops = append(tops, top)
  817. }
  818. SMap.Map["topscope_dy"] = u.RemoveDuplicateString(tops)
  819. }
  820. }
  821. //追加时处理,//更新字段 I_fieldUpdate 0:覆盖 1:追加
  822. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 {
  823. //封装追加信息
  824. if len(SMap.Keys) > 0 {
  825. for _, k := range SMap.Keys {
  826. ksarr := make([]string, 0)
  827. if k != "toptype" && k != "subtype" {
  828. ks, ok := SMap.Map[k].(string)
  829. if ok {
  830. ksarr = append(ksarr, ks)
  831. } else {
  832. ksarr = append(ksarr, SMap.Map[k].([]string)...)
  833. }
  834. }
  835. ksmap[k] = map[string][]string{
  836. "$each": ksarr,
  837. }
  838. }
  839. }
  840. } else { //非多分类
  841. res = SMap.Map
  842. if tt.I_tasktype == 1 { //附件任务
  843. if tmp["projectinfo"] != nil {
  844. res["projectinfo"] = tmp["projectinfo"]
  845. }
  846. }
  847. }
  848. }
  849. //if len(res) > 0 || len(ksmap) > 0 {
  850. IS.NewAdd(tt, res)
  851. if tt.S_attr != "" {
  852. //res[tt.S_attr] = 1
  853. res[tt.S_attr] = tt.AttrVal
  854. }
  855. // 添加分类时间
  856. res["classification_time"] = time.Now().Unix()
  857. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加
  858. if len(ksmap) > 0 && len(res) > 0 {
  859. update = append(update, map[string]interface{}{
  860. "$set": res,
  861. "$addToSet": ksmap,
  862. })
  863. } else if len(ksmap) > 0 && len(res) == 0 {
  864. update = append(update, map[string]interface{}{
  865. "$addToSet": ksmap,
  866. })
  867. } else if len(ksmap) == 0 && len(res) > 0 {
  868. update = append(update, map[string]interface{}{
  869. "$set": res,
  870. })
  871. }
  872. } else { //非多分类或者多分类的覆盖
  873. //log.Println("更新==", res)
  874. if len(res) > 0 {
  875. update = append(update, map[string]interface{}{
  876. "$set": res,
  877. })
  878. }
  879. }
  880. //更新
  881. lock.Lock()
  882. if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
  883. arr = append(arr, update)
  884. }
  885. if len(arr) >= NN {
  886. //if s_table == "" {
  887. tt.MgoTask.UpdateBulk(tt.S_collection, arr...)
  888. //} else {
  889. // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
  890. //}
  891. arr = [][]map[string]interface{}{}
  892. }
  893. lock.Unlock()
  894. //}
  895. }(tmp)
  896. ttid := u.BsonIdToSId(tid)
  897. if ttid > tt.LastId && !budp {
  898. tt.LastId = ttid
  899. }
  900. tmp = make(map[string]interface{})
  901. }
  902. total = sum
  903. //通过ID 查询分类数据才打印日志
  904. if tt.S_querycon == "1" {
  905. log.Println("总数:————", sum)
  906. if timespan {
  907. log.Println("current:————", sum)
  908. }
  909. }
  910. wg.Wait()
  911. lock.Lock()
  912. if len(arr) > 0 {
  913. //if s_table == "" { //没有结果表
  914. tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新
  915. //} else { //有结果表
  916. // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
  917. //}
  918. arr = [][]map[string]interface{}{}
  919. }
  920. lock.Unlock()
  921. tt.WcLock.Lock()
  922. if len(tt.WordCount) > 0 {
  923. savem := []map[string]interface{}{}
  924. tn := time.Now().Unix()
  925. for wk, wm := range tt.WordCount {
  926. for ck, cm := range wm {
  927. m1 := map[string]interface{}{
  928. "s_class": wk,
  929. "s_word": ck,
  930. "i_count": cm,
  931. "bz": tn,
  932. }
  933. savem = append(savem, m1)
  934. if len(savem) >= 1000 {
  935. tools.MgoClass.SaveBulk("wordcount", savem...)
  936. savem = []map[string]interface{}{}
  937. }
  938. }
  939. }
  940. if len(savem) > 0 {
  941. tools.MgoClass.SaveBulk("wordcount", savem...)
  942. savem = nil
  943. }
  944. }
  945. tt.WcLock.Unlock()
  946. //更新最后id
  947. if !budp && oid != tt.LastId {
  948. setid := map[string]interface{}{
  949. "$set": map[string]interface{}{
  950. "s_startid": tt.LastId,
  951. },
  952. }
  953. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
  954. }
  955. //更新最后时间
  956. if !budp {
  957. nowtime := time.Now().Unix()
  958. settime := map[string]interface{}{
  959. "$set": map[string]interface{}{
  960. "s_starttime": nowtime,
  961. },
  962. }
  963. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
  964. }
  965. //InitRule()
  966. if tt.S_querycon == "1" {
  967. log.Println("运行", tt.S_name, "over", oid, " endid:", tt.LastId)
  968. }
  969. //定时任务完成发送udp信号调抽取
  970. if tools.Extract["preNodeId"] == tt.ID { //常规招标定时任务udp调用抽取
  971. if tt.S_idcoll == "" {
  972. nextNodeSid = oid
  973. nextNodeEid = tt.LastId
  974. }
  975. UdpRunExtract(nextNodeSid, nextNodeEid)
  976. }
  977. })
  978. return total
  979. }
  980. // udp合并数据处理的方法
  981. func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype string) int {
  982. total := 0
  983. tools.Try(func() { //不加这一层defer运行不了!!!
  984. timespan := false
  985. tt.B_Running = true
  986. defer func() {
  987. tt.B_Running = false
  988. }()
  989. //开始识别
  990. pool := make(chan bool, tt.I_thread)
  991. wg := &sync.WaitGroup{}
  992. lock := &sync.Mutex{}
  993. q := map[string]interface{}{}
  994. s_table := tt.S_table //结果表
  995. s_asfield := tt.S_asfield //关联字段
  996. asfields := strings.Split(s_asfield, "==")
  997. qfield := "" //查询表字段
  998. rfield := "" //结果表字段
  999. qtp := "" //查询字段的类型
  1000. rtp := "" //结果字段的类型
  1001. rtype := "" //结果字段的真实类型
  1002. if s_table != "" { //有结果表保存到结果表(更新,插入)
  1003. tt.S_collection = s_table
  1004. }
  1005. if len(asfields) == 2 {
  1006. qfield = asfields[0] //查询表字段
  1007. rfield = asfields[1] //结果表字段
  1008. //id处理 object string互转
  1009. qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield)
  1010. queryExis := map[string]interface{}{
  1011. rfield: map[string]interface{}{
  1012. "$exists": true,
  1013. },
  1014. }
  1015. onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis)
  1016. rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型
  1017. }
  1018. //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
  1019. //没有结果表在查询表上更新
  1020. //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
  1021. q = mapInfo["q"].(map[string]interface{})
  1022. //计算起始id和结束id时间宽度,大于半小时的2000数据打印一次个数,小于半小时的每条数据都打印一次个数
  1023. sid := mapInfo["gtid"].(string)
  1024. eid := mapInfo["lteid"].(string)
  1025. stime, _ := strconv.ParseInt(sid[:8], 16, 64)
  1026. etime, _ := strconv.ParseInt(eid[:8], 16, 64)
  1027. if etime-stime < 1800 { //时间跨度小于半小时
  1028. timespan = true
  1029. }
  1030. udpsess := tt.MulMgo.GetMgoConn()
  1031. if udpsess == nil {
  1032. log.Println("连接为空", tt.S_name, mapInfo)
  1033. return
  1034. }
  1035. defer tt.MulMgo.DestoryMongoConn(udpsess)
  1036. udptmp := udpsess.DB(tt.MulMgo.DbName).C(tt.MulColl).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
  1037. //task
  1038. tasksess := tt.MgoTask.GetMgoConn()
  1039. defer tt.MgoTask.DestoryMongoConn(tasksess)
  1040. log.Println("线程数:", tt.I_thread, "查询语句", q)
  1041. log.Println("task---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
  1042. log.Println("select:", tt.Task_QueryFieldMap)
  1043. extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
  1044. arr := [][]map[string]interface{}{}
  1045. oid := tt.LastId
  1046. if tt.I_wordcount == 1 {
  1047. tt.WordCount = map[string]map[string]int{}
  1048. }
  1049. sum := 0
  1050. //对比两张表数据,减少查询次数
  1051. var compare bson.M
  1052. for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
  1053. result := tmp
  1054. //对比
  1055. for {
  1056. if compare == nil {
  1057. compare = make(bson.M)
  1058. if !udptmp.Next(&compare) { //传参表数据赋值给compare
  1059. break
  1060. }
  1061. }
  1062. if compare != nil {
  1063. //对比
  1064. cid := u.BsonIdToSId(compare["_id"]) //传参表id
  1065. tid := u.BsonIdToSId(tmp["_id"]) //抽取表id
  1066. if cid == tid {
  1067. //更新抽取表的数据,再进行分类
  1068. for _, k := range tt.Task_QueryFieldArr {
  1069. v1 := compare[k]
  1070. v2 := tmp[k]
  1071. if v2 == nil && v1 != nil {
  1072. result[k] = v1
  1073. }
  1074. }
  1075. break
  1076. } else {
  1077. if tid > cid { //抽取表id大于传参表id
  1078. compare = nil
  1079. continue
  1080. } else {
  1081. break
  1082. }
  1083. }
  1084. } else {
  1085. break
  1086. }
  1087. }
  1088. tid := tmp["_id"]
  1089. if !timespan && sum%2000 == 0 {
  1090. log.Println("current:", sum, tid)
  1091. }
  1092. pool <- true
  1093. wg.Add(1)
  1094. go func(result map[string]interface{}) {
  1095. defer func() {
  1096. <-pool
  1097. wg.Done()
  1098. }()
  1099. //按顺序识别
  1100. if result != nil {
  1101. tid := result["_id"]
  1102. update := []map[string]interface{}{}
  1103. //如果有关联字段 根据关联字段更新或者保存
  1104. if len(asfields) == 2 {
  1105. //log.Println("qfield====", qfield)
  1106. field := result[qfield]
  1107. if field != nil {
  1108. //log.Println("field===", field, qtp, rtp)
  1109. if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致
  1110. } else {
  1111. //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype)
  1112. if rtype == rtp { //转换后与真实类型不同,填写时类型错误
  1113. //将查询字段类型转换为对应的结果字段类型
  1114. if qtp == "bson.ObjectId" && rtp == "string" {
  1115. field = u.BsonIdToSId(result[qfield])
  1116. } else if qtp == "string" && rtp == "bson.ObjectId" {
  1117. field = u.StringTOBsonId(result[qfield].(string))
  1118. }
  1119. }
  1120. }
  1121. update = append(update, map[string]interface{}{ //根据关联字段更新
  1122. rfield: field,
  1123. })
  1124. }
  1125. } else {
  1126. update = append(update, map[string]interface{}{ //更新的条件 根据id更新
  1127. "_id": tid,
  1128. })
  1129. }
  1130. res := map[string]interface{}{}
  1131. ksmap := make(map[string]map[string][]string)
  1132. if util.IntAll(result["infoformat"]) == 2 { //此处增加特例
  1133. res["toptype"] = "拟建"
  1134. res["subtype"] = "拟建"
  1135. } else if util.IntAll(result["infoformat"]) == 3 {
  1136. res["toptype"] = "产权"
  1137. res["subtype"] = "产权"
  1138. } else {
  1139. SMap := &tools.SortMap{}
  1140. if tt.I_tasktype == 2 { //标签任务
  1141. SMap = TagClassificationRun(tt, result)
  1142. } else if tt.I_tasktype == 1 { //附件任务
  1143. SMap = FileClassificationRun(tt, result)
  1144. //res["projectinfo"] = tmp["projectinfo"]
  1145. } else { //常规任务
  1146. SMap = NewClassificationRun(tt, result)
  1147. //一级分类时,符合结果中成交规则时
  1148. if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
  1149. if _, ok := tmp["detail"]; ok {
  1150. if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) {
  1151. SMap.Map["toptype"] = "结果"
  1152. resa := ReSub(tt, tmp, "结果")
  1153. subtype := resa.Map["subtype"]
  1154. delete(SMap.Map, "subtype")
  1155. SMap.Map["subtype"] = subtype
  1156. }
  1157. }
  1158. }
  1159. //3.针对行业分类,添加默认值
  1160. if mapInfo["stype"] == "hangye" || strings.TrimSpace(tt.S_name) == "行业分类" {
  1161. /**没有一级分类是,默认一级 '其它',二级'其它_其它'
  1162. **/
  1163. //tops := SMap.Map["topscopeclass"]
  1164. //subs := SMap.Map["subscopeclass"]
  1165. resultSubs := make([]string, 0)
  1166. resultTobs := make([]string, 0)
  1167. //存在行业一级分类
  1168. if tops, ok := SMap.Map["topscopeclass"]; ok {
  1169. if topps, ok2 := tops.([]string); ok2 {
  1170. for _, v := range topps {
  1171. top := util.ObjToString(v)
  1172. if top != "" {
  1173. resultTobs = append(resultTobs, top)
  1174. }
  1175. }
  1176. }
  1177. if subs, ok3 := SMap.Map["subscopeclass"]; ok3 {
  1178. if subbs, ok4 := subs.([]string); ok4 {
  1179. for _, v := range subbs {
  1180. sub := util.ObjToString(v)
  1181. if sub != "" {
  1182. resultSubs = append(resultSubs, sub)
  1183. }
  1184. }
  1185. }
  1186. }
  1187. newTops, newSubs := u.ProcessTopscopeclass(resultTobs, resultSubs)
  1188. SMap.Map["topscopeclass"] = newTops
  1189. SMap.Map["subscopeclass"] = newSubs
  1190. } else {
  1191. SMap.Map["topscopeclass"] = []string{"其它"}
  1192. SMap.Map["subscopeclass"] = []string{"其它"}
  1193. }
  1194. }
  1195. }
  1196. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //更新字段 I_fieldUpdate 0:覆盖 1:追加
  1197. //封装追加信息
  1198. if len(SMap.Keys) > 0 {
  1199. for _, k := range SMap.Keys {
  1200. ksarr := make([]string, 0)
  1201. if k != "toptype" && k != "subtype" {
  1202. ks, ok := SMap.Map[k].(string)
  1203. if ok {
  1204. ksarr = append(ksarr, ks)
  1205. } else {
  1206. ksarr = append(ksarr, SMap.Map[k].([]string)...)
  1207. }
  1208. }
  1209. ksmap[k] = map[string][]string{
  1210. "$each": ksarr,
  1211. }
  1212. }
  1213. }
  1214. } else {
  1215. res = SMap.Map
  1216. }
  1217. }
  1218. IS.NewAdd(tt, res)
  1219. if tt.S_attr != "" {
  1220. res[tt.S_attr] = tt.AttrVal
  1221. }
  1222. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加
  1223. if len(ksmap) > 0 && len(res) > 0 {
  1224. update = append(update, map[string]interface{}{
  1225. "$set": res,
  1226. "$addToSet": ksmap,
  1227. })
  1228. } else if len(ksmap) > 0 && len(res) == 0 {
  1229. update = append(update, map[string]interface{}{
  1230. "$addToSet": ksmap,
  1231. })
  1232. } else if len(ksmap) == 0 && len(res) > 0 {
  1233. update = append(update, map[string]interface{}{
  1234. "$set": res,
  1235. })
  1236. }
  1237. } else {
  1238. if len(res) > 0 {
  1239. update = append(update, map[string]interface{}{
  1240. "$set": res,
  1241. })
  1242. }
  1243. }
  1244. //更新
  1245. lock.Lock()
  1246. if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
  1247. arr = append(arr, update)
  1248. }
  1249. if len(arr) >= NN {
  1250. tt.MgoTask.UpdateBulk(tt.S_collection, arr...)
  1251. arr = [][]map[string]interface{}{}
  1252. }
  1253. lock.Unlock()
  1254. }
  1255. }(result)
  1256. ttid := u.BsonIdToSId(tid)
  1257. if ttid > tt.LastId {
  1258. tt.LastId = ttid
  1259. }
  1260. tmp = make(map[string]interface{})
  1261. }
  1262. total = sum
  1263. if timespan {
  1264. log.Println("current:", sum)
  1265. }
  1266. wg.Wait()
  1267. lock.Lock()
  1268. if len(arr) > 0 {
  1269. //if s_table == "" { //没有结果表
  1270. tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新
  1271. //} else { //有结果表
  1272. // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
  1273. //}
  1274. arr = [][]map[string]interface{}{}
  1275. }
  1276. lock.Unlock()
  1277. tt.WcLock.Lock()
  1278. if len(tt.WordCount) > 0 {
  1279. savem := []map[string]interface{}{}
  1280. tn := time.Now().Unix()
  1281. for wk, wm := range tt.WordCount {
  1282. for ck, cm := range wm {
  1283. m1 := map[string]interface{}{
  1284. "s_class": wk,
  1285. "s_word": ck,
  1286. "i_count": cm,
  1287. "bz": tn,
  1288. }
  1289. savem = append(savem, m1)
  1290. if len(savem) >= 1000 {
  1291. tools.MgoClass.SaveBulk("wordcount", savem...)
  1292. savem = []map[string]interface{}{}
  1293. }
  1294. }
  1295. }
  1296. if len(savem) > 0 {
  1297. tools.MgoClass.SaveBulk("wordcount", savem...)
  1298. savem = nil
  1299. }
  1300. }
  1301. tt.WcLock.Unlock()
  1302. //更新最后id
  1303. if !budp && oid != tt.LastId {
  1304. setid := map[string]interface{}{
  1305. "$set": map[string]interface{}{
  1306. "s_startid": tt.LastId,
  1307. },
  1308. }
  1309. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
  1310. }
  1311. //更新最后时间
  1312. if !budp {
  1313. nowtime := time.Now().Unix()
  1314. settime := map[string]interface{}{
  1315. "$set": map[string]interface{}{
  1316. "s_starttime": nowtime,
  1317. },
  1318. }
  1319. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
  1320. }
  1321. //InitRule()
  1322. log.Println("运行", tt.S_name, "over")
  1323. })
  1324. return total
  1325. }
  1326. func UdpRunExtract(sid, eid string) { //5cb6c508a5cb26b9b70d6536
  1327. by, _ := json.Marshal(map[string]interface{}{
  1328. "gtid": sid,
  1329. "lteid": eid,
  1330. "stype": tools.ExtractStype,
  1331. })
  1332. log.Println("定时任务调下一节点分类:", tools.ExtractPort, string(by))
  1333. addr := &net.UDPAddr{
  1334. IP: net.ParseIP(tools.ExtractAddr),
  1335. Port: tools.ExtractPort,
  1336. }
  1337. //node := &tools.UdpNode{by, addr, time.Now().Unix(), 0}
  1338. //udptaskmap.Store(key, node)
  1339. tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  1340. }
  1341. func FindId_back(coll string) (gtid, lteid string) {
  1342. sum := 0 //记录数据总量
  1343. data, _ := tools.MgoClass.Find(coll, `{"isused":false}`, `{"_id":1}`, nil, false, -1, -1)
  1344. length := len(*data)
  1345. set := bson.M{
  1346. "$set": bson.M{
  1347. "isused": true,
  1348. "publishtime": time.Now().Unix(),
  1349. },
  1350. }
  1351. for i, d := range *data {
  1352. id := d["_id"]
  1353. count := util.IntAll(d["count"])
  1354. sum += count
  1355. if gtid == "" {
  1356. gtid = d["gtid"].(string)
  1357. }
  1358. if sum >= 5000 || i == length-1 { //总数大于5000或数据取完,取此id段数据
  1359. lteid := d["lteid"].(string)
  1360. go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false)
  1361. return gtid, lteid
  1362. } else { //总数小于5000,删除已使用数据
  1363. go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false)
  1364. }
  1365. }
  1366. return gtid, lteid
  1367. }
  1368. func FindId(coll string) (gtid, lteid string) {
  1369. data, _ := tools.MgoClass.Find(coll, map[string]interface{}{"dataprocess": 0}, `{"_id":1}`, nil, false, -1, -1)
  1370. for _, d := range *data {
  1371. gtid = d["gtid"].(string)
  1372. lteid = d["lteid"].(string)
  1373. set := map[string]interface{}{
  1374. "$set": map[string]interface{}{
  1375. "dataprocess": 2,
  1376. "updatetime": time.Now().Unix(),
  1377. },
  1378. }
  1379. tools.MgoClass.Update(coll, map[string]interface{}{"_id": d["_id"]}, set, false, false)
  1380. break
  1381. }
  1382. return gtid, lteid
  1383. }
  1384. // NewLoadTestTask 测试任务
  1385. func NewLoadTestTask(_id, s_mgourl, s_mgodb, s_coll, i_poolsize, s_startid, s_endid, s_query string) (bs bool, filename string) {
  1386. defer tools.Catch()
  1387. r, t, _ := NewAnalyTask(_id, s_mgourl, s_mgodb, s_coll, tools.IntAllDef(i_poolsize, 5))
  1388. //log.Println(m)
  1389. if r && t != nil {
  1390. filename = time.Now().Format("150405") + ".csv"
  1391. go t.RRunTest(s_startid, s_endid, s_query, filename)
  1392. bs = true
  1393. }
  1394. return
  1395. }
  1396. // NewLoadTask 加载任务
  1397. func NewLoadTask(_id string, res *tools.JSON) {
  1398. defer tools.Catch()
  1399. //初始化任务信息
  1400. InitTaskData(_id)
  1401. //初始化任务mgo配置信息
  1402. bres, tt, msg := NewAnalyTask(_id, "", "", "", 5)
  1403. tt.I_status = 1
  1404. log.Println(tt.S_mgodb, tt.S_name, tt.I_thread)
  1405. res.Msg = msg
  1406. if bres && tt != nil {
  1407. res.Status = true
  1408. NEWTASKPOOL[_id] = tt //存入当前启动任务
  1409. log.Println("加载", tt.S_name, "完成...", tt.S_query)
  1410. go tt.RRun()
  1411. }
  1412. }
  1413. // 处理id的类型转换
  1414. func IdTypeConversion(q, r string) (string, string, string, string) {
  1415. qtp, rtp := "bson.ObjectId", "bson.ObjectId"
  1416. if strings.Contains(q, "ObjectId") || strings.Contains(q, "objectId") {
  1417. q = q[9 : len(q)-1]
  1418. } else if strings.Contains(q, "StringId") || strings.Contains(q, "stringId") {
  1419. q = q[9 : len(q)-1]
  1420. qtp = "string"
  1421. }
  1422. if strings.Contains(r, "ObjectId") || strings.Contains(r, "objectId") {
  1423. r = r[9 : len(r)-1]
  1424. } else if strings.Contains(r, "StringId") || strings.Contains(r, "stringId") {
  1425. r = r[9 : len(r)-1]
  1426. rtp = "string"
  1427. }
  1428. return q, r, qtp, rtp
  1429. }
  1430. // 获取匹配或不匹配的个数
  1431. func GetNum(rule string) (int, bool) {
  1432. num := 1
  1433. isnum := strings.HasSuffix(rule, ")")
  1434. if !isnum { //是数字
  1435. s := []rune(rule)
  1436. last := string(s[len(s)-1:])
  1437. num = tools.IntAll(last)
  1438. }
  1439. return num, isnum
  1440. }
  1441. // 获取规则
  1442. func GetRule(text string, isnum bool) (matchArr []string) {
  1443. if isnum { //最后一个不是数字
  1444. if strings.HasPrefix(text, "(") && strings.HasSuffix(text, ")") {
  1445. text = text[1 : len(text)-1]
  1446. matchArr = strings.Split(text, "|")
  1447. }
  1448. } else if strings.HasPrefix(text, "(") && !isnum {
  1449. text = text[1 : len(text)-2]
  1450. matchArr = strings.Split(text, "|")
  1451. }
  1452. return matchArr
  1453. }
  1454. func (d *DFA) AddWord(keys ...string) {
  1455. d.AddWordAll(true, keys...)
  1456. }
  1457. func (d *DFA) AddWordAll(haskey bool, keys ...string) {
  1458. if d.Link == nil {
  1459. d.Link = make(map[string]interface{})
  1460. }
  1461. for _, key := range keys {
  1462. nowMap := &d.Link
  1463. for i := 0; i < len(key); i++ {
  1464. kc := key[i : i+1]
  1465. if v, ok := (*nowMap)[kc]; ok {
  1466. nowMap, _ = v.(*map[string]interface{})
  1467. } else {
  1468. newMap := map[string]interface{}{}
  1469. newMap["YN"] = "0"
  1470. (*nowMap)[kc] = &newMap
  1471. nowMap = &newMap
  1472. }
  1473. if i == len(key)-1 {
  1474. (*nowMap)["YN"] = "1"
  1475. if haskey {
  1476. (*nowMap)["K"] = key
  1477. }
  1478. }
  1479. }
  1480. }
  1481. }
  1482. func (d *DFA) CheckSensitiveWord(src string, n int) (bool, []string) {
  1483. res := make([]string, 0)
  1484. tmpMap := make(map[string]int)
  1485. for j := 0; j < len(src); j++ {
  1486. nowMap := &d.Link
  1487. for i := j; i < len(src); i++ {
  1488. word := src[i : i+1]
  1489. nowMap, _ = (*nowMap)[word].(*map[string]interface{})
  1490. if nowMap != nil { // 存在,则判断是否为最后一个
  1491. if "1" == util.ObjToString((*nowMap)["YN"]) {
  1492. s := util.ObjToString((*nowMap)["K"])
  1493. tmpMap[s] = 1
  1494. //nowMap = &d.Link //匹配到之后继续匹配后边的内容
  1495. }
  1496. } else {
  1497. //nowMap = &d.Link
  1498. break
  1499. }
  1500. }
  1501. }
  1502. if len(tmpMap) >= n {
  1503. for k, _ := range tmpMap {
  1504. res = append(res, k)
  1505. }
  1506. return true, res
  1507. }
  1508. return false, []string{}
  1509. }
  1510. func (d *DFA) CheckSensitiveWordTest(src string, n int) (bool, []string) {
  1511. res := make([]string, 0)
  1512. tmpMap := make(map[string]int)
  1513. for j := 0; j < len(src); j++ {
  1514. nowMap := &d.Link
  1515. for i := j; i < len(src); i++ {
  1516. word := src[i : i+1]
  1517. nowMap, _ = (*nowMap)[word].(*map[string]interface{})
  1518. if nowMap != nil { // 存在,则判断是否为最后一个
  1519. if "1" == util.ObjToString((*nowMap)["YN"]) {
  1520. s := util.ObjToString((*nowMap)["K"])
  1521. tmpMap[s] = 1
  1522. //nowMap = &d.Link //匹配到之后继续匹配后边的内容
  1523. }
  1524. } else {
  1525. //nowMap = &d.Link
  1526. break
  1527. }
  1528. }
  1529. }
  1530. for k, _ := range tmpMap {
  1531. res = append(res, k)
  1532. }
  1533. return len(tmpMap) >= n, res
  1534. }
  1535. // UpdateTaskInfo 更新任务信息
  1536. func UpdateTaskInfo(flag bool, tid string) bool {
  1537. query := bson.M{
  1538. "_id": u.StringTOBsonId(tid),
  1539. }
  1540. set := bson.M{
  1541. "$set": bson.M{
  1542. "b_updaterule": flag,
  1543. },
  1544. }
  1545. return tools.MgoClass.Update(tools.COLL_TASK, query, set, false, false)
  1546. }
  1547. // o_projectinfo中数据分类定时任务
  1548. func RunTask() {
  1549. if tools.IsStart { //是否开启定时任务
  1550. tt := InitTimeTask() //初始化任务
  1551. //StartTask(tt)
  1552. //return
  1553. c := cron.New()
  1554. cronstr := "0 */" + fmt.Sprint(tt.I_rate) + " * * * ?" //每TaskTime分钟执行一次
  1555. c.AddFunc(cronstr, func() { StartTask(tt) })
  1556. c.Start()
  1557. }
  1558. }
  1559. // 初始化任务
  1560. func InitTimeTask() *TTask {
  1561. defer util.Catch()
  1562. timeTaskTT := &TTask{}
  1563. InitTaskData(tools.TimeTaskid)
  1564. bres, tt, _ := NewAnalyTask(tools.TimeTaskid, "", "", "", 5)
  1565. if bres && tt != nil {
  1566. timeTaskTT = tt
  1567. logger.Debug("初始化定时任务成功")
  1568. } else {
  1569. logger.Debug("初始化定时任务失败")
  1570. }
  1571. return timeTaskTT
  1572. }
  1573. // StartTask 开始任务
  1574. func StartTask(t *TTask) {
  1575. defer util.Catch()
  1576. logger.Debug("开始执行定时任务")
  1577. query := map[string]interface{}{
  1578. "_id": map[string]interface{}{
  1579. "$gt": u.StringTOBsonId(tools.IdCollSid),
  1580. },
  1581. "dataprocess": 8,
  1582. }
  1583. order := map[string]interface{}{"_id": -1}
  1584. logger.Debug("query:", query)
  1585. list, _ := tools.MgoClass.Find(t.S_idcoll, query, order, nil, false, -1, -1)
  1586. sid := t.S_startid
  1587. eid := ""
  1588. if list != nil && len(*list) > 0 {
  1589. eid = util.ObjToString((*list)[0]["lteid"])
  1590. if eid <= sid {
  1591. logger.Debug("id err. sid:", sid, " eid:", eid)
  1592. return
  1593. }
  1594. t.S_startid = eid //更新任务中数据的起始id
  1595. tools.IdCollSid = u.BsonIdToSId((*list)[0]["_id"]) //更新id表起始id
  1596. //更新任务表中起始id
  1597. setid := map[string]interface{}{
  1598. "$set": map[string]interface{}{
  1599. "s_startid": t.S_startid,
  1600. },
  1601. }
  1602. go tools.MgoClass.Update("rc_task", `{"_id":"`+t.ID+`"}`, setid, false, false)
  1603. //查拟建数据
  1604. query := map[string]interface{}{
  1605. "_id": map[string]interface{}{
  1606. "$gt": u.StringTOBsonId(sid),
  1607. "$lte": u.StringTOBsonId(eid),
  1608. },
  1609. "toptype": "拟建",
  1610. }
  1611. sess := t.MgoTask.GetMgoConn()
  1612. defer t.MgoTask.DestoryMongoConn(sess)
  1613. count, _ := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Count()
  1614. logger.Debug("count:", count, " query:", query)
  1615. if count == 0 { //此轮任务没有查到数据
  1616. return
  1617. }
  1618. arr := [][]map[string]interface{}{}
  1619. wg := &sync.WaitGroup{}
  1620. lock := &sync.Mutex{}
  1621. pool := make(chan bool, t.I_thread)
  1622. sum := 0
  1623. logger.Debug("select:", t.Task_QueryFieldMap)
  1624. it := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Select(t.Task_QueryFieldMap).Iter()
  1625. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  1626. pool <- true
  1627. wg.Add(1)
  1628. go func(tmp map[string]interface{}) {
  1629. defer func() {
  1630. <-pool
  1631. wg.Done()
  1632. }()
  1633. update := []map[string]interface{}{}
  1634. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  1635. SMap := &tools.SortMap{}
  1636. SMap = NewClassificationRun(t, tmp)
  1637. if len(SMap.Map) > 0 {
  1638. //一级分类时,符合结果中成交规则时
  1639. if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
  1640. if _, ok := tmp["detail"]; ok {
  1641. if u.ChargeDetailResult(util.ObjToString(tmp["detail"])) {
  1642. SMap.Map["toptype"] = "结果"
  1643. resa := ReSub(t, tmp, "结果")
  1644. subtype := resa.Map["subtype"]
  1645. delete(SMap.Map, "subtype")
  1646. SMap.Map["subtype"] = subtype
  1647. }
  1648. }
  1649. }
  1650. update = append(update, map[string]interface{}{"$set": SMap.Map})
  1651. }
  1652. //更新
  1653. lock.Lock()
  1654. if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
  1655. arr = append(arr, update)
  1656. }
  1657. if len(arr) >= NN {
  1658. tmps := arr
  1659. t.MgoTask.UpdateBulk(t.S_coll, tmps...)
  1660. arr = [][]map[string]interface{}{}
  1661. }
  1662. lock.Unlock()
  1663. }(tmp)
  1664. if sum%100 == 0 {
  1665. log.Println("current:", sum)
  1666. }
  1667. tmp = make(map[string]interface{})
  1668. }
  1669. wg.Wait()
  1670. lock.Lock()
  1671. if len(arr) > 0 {
  1672. t.MgoTask.UpdateBulk(t.S_coll, arr...)
  1673. arr = [][]map[string]interface{}{}
  1674. }
  1675. lock.Unlock()
  1676. logger.Debug("定时任务执行完毕 count:", sum)
  1677. UdpRunProjectForecast(sid, eid)
  1678. }
  1679. logger.Debug("Udp通知项目预测执行完毕")
  1680. }
  1681. // udp通知项目预测
  1682. func UdpRunProjectForecast(sid, eid string) {
  1683. by, _ := json.Marshal(map[string]interface{}{
  1684. "gtid": sid,
  1685. "lteid": eid,
  1686. })
  1687. logger.Debug("定时任务通知项目预测:", string(by))
  1688. addr := &net.UDPAddr{
  1689. IP: net.ParseIP(tools.NextNodeAddr),
  1690. Port: tools.NextNodePort,
  1691. }
  1692. tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  1693. }
  1694. // SendAi 调用大模型招标分类;map[result:[结果-中标] status:200]
  1695. func SendAi(data map[string]interface{}, url string) (res map[string]interface{}) {
  1696. // 设置 2 秒的超时
  1697. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  1698. defer cancel()
  1699. //url := "http://192.168.3.109:16688"
  1700. jsonData, err := json.Marshal(data)
  1701. if err != nil {
  1702. fmt.Println("JSON marshal error:", err)
  1703. return
  1704. }
  1705. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  1706. if err != nil {
  1707. fmt.Println("Request error:", err)
  1708. return
  1709. }
  1710. req.Header.Set("Content-Type", "application/json")
  1711. // 将请求与上下文关联
  1712. req = req.WithContext(ctx)
  1713. client := &http.Client{}
  1714. resp, err := client.Do(req)
  1715. if err != nil {
  1716. // 使用 errors.Is 检查错误是否是超时错误
  1717. if errors.Is(err, context.DeadlineExceeded) {
  1718. fmt.Println("Request timed out")
  1719. return
  1720. }
  1721. fmt.Println("Request error:", err)
  1722. return
  1723. }
  1724. defer resp.Body.Close()
  1725. err = json.NewDecoder(resp.Body).Decode(&res)
  1726. if err != nil {
  1727. fmt.Println("Response decoding error:", err)
  1728. return
  1729. }
  1730. return
  1731. }