task.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731
  1. package task
  2. import (
  3. "encoding/csv"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "os"
  10. "qfw/util"
  11. "reflect"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "tools"
  18. u "util"
  19. "github.com/cron"
  20. "github.com/donnie4w/go-logger/logger"
  21. "go.mongodb.org/mongo-driver/bson"
  22. "go.mongodb.org/mongo-driver/bson/primitive"
  23. )
  24. func init() {
  25. REG, _ = regexp.Compile(`\(.*?\)\d*`)
  26. REG1, _ = regexp.Compile(`\(.*?\)`)
  27. //log.Println(REG.FindAllString("(平台|软件|电子商务|多媒体|通讯设备)1(建设|采购|开发)2", -1))
  28. //加载所有任务
  29. StartMonitor()
  30. if tools.NoAutoRun == 0 {
  31. rtask, b := tools.MgoClass.Find(tools.COLL_TASK, `{"i_status":1}`, nil, `{"_id":1}`, false, 0, 50)
  32. if b && rtask != nil && *rtask != nil {
  33. for _, t := range *rtask {
  34. _id := u.BsonIdToSId(t["_id"])
  35. res := tools.JSON{}
  36. //LoadTask(_id, &res)
  37. NewLoadTask(_id, &res)
  38. }
  39. }
  40. }
  41. }
  42. //var UdpSess *tools.MongodbSim
  43. var REG *regexp.Regexp
  44. var REG1 *regexp.Regexp
  45. var TaskLock = sync.Mutex{}
  46. var NEWTASKPOOL = map[string]*TTask{} //存储当前任务
  47. var TaskMap = map[string]*TTask{}
  48. var HangyeUdps = make(chan map[string]interface{}, 100) //初始化行业分类udp任务池
  49. type Rule struct {
  50. Reg []interface{} //规则
  51. DetailReg []interface{} //detail正则,目前只针对title、channel的二级招标分类中的中标规则
  52. NotReg []interface{} //排除规则
  53. Rid string
  54. Class *Class
  55. Rule_PreRule string
  56. S_name string
  57. S_code string
  58. S_pid string
  59. //Reg []*RuleDFA
  60. }
  61. type Class struct {
  62. Cid string
  63. TTask *TTask
  64. Rule []*Rule
  65. Class_PreRule string
  66. S_name string
  67. S_fields string
  68. S_code string
  69. S_default string
  70. S_pid string
  71. S_savefield string
  72. }
  73. type TTask struct {
  74. Class []*Class //任务中类集合
  75. ID string //任务id
  76. S_name string //任务名称
  77. I_rate int //任务执行频率
  78. S_class string //任务中类id
  79. S_mgourl string //任务mgo addr
  80. S_mgodb string //任务mgo db
  81. S_collection string //查询、存储表
  82. I_poolsize int //任务连接池个数
  83. S_startid string //任务起始id
  84. I_status int //控制任务状态属性
  85. S_attr string //标识属性key值
  86. AttrVal int //标识属性val值
  87. S_coll string //查询、存储表
  88. LastId string //上次定时任务的结束id
  89. Lock sync.Mutex
  90. S_query string //任务查询条件
  91. FlagQuit chan bool //任务结束控制
  92. B_Running bool //任务是否执行
  93. MgoTask *u.MongodbSim //任务查询表mgo
  94. I_multiclass int //是否支持多分类
  95. I_savetype int //1 名称 2 值
  96. I_thread int //线程数
  97. I_wordcount int //词频统计
  98. WordCount map[string]map[string]int //词频统计集合
  99. WcLock sync.Mutex
  100. Task_PreRule string //任务前置过滤
  101. S_table string //结果表
  102. S_querycon string //查询方式 1是id查询 0是时间查询
  103. S_starttime int64 //起始时间
  104. S_timefieldname string //各表所查时间字段名称
  105. S_asfield string //查询表与结果表关联字段
  106. I_fieldUpdate int //分类中的保存字段信息 0:覆盖 1:更新
  107. MulMgo *u.MongodbSim //联合查询的mgo配置信息
  108. MulColl string //联合查询的表名
  109. I_tasktype int //任务类型 0:常规任务 1:附件任务:2
  110. S_idcoll string //正式环境查询数据id段
  111. B_UpdateRule bool //是否更新任务下的规则
  112. S_classField string //分类字段
  113. Task_QueryFieldArr []string //用于合并数据
  114. Task_QueryFieldMap map[string]interface{} //用于仅查询字段
  115. Dbtype string //用于区分连得是哪个库,使用不同的用户密码
  116. }
  117. type RuleDFA struct {
  118. Match []DFA //包含的敏感词
  119. MatchNum []int //包含敏感词匹配个数
  120. MisMatch DFA //不包含的敏感词
  121. MisMatchNum int //不包含敏感词匹配个数
  122. }
  // DFA wraps a string-keyed word-lookup structure. DealRules creates one per
  // rule group and fills it through AddWord (defined elsewhere). The layout
  // of Link is defined by that method — presumably a character trie, TODO
  // confirm.
  type DFA struct {
  	Link map[string]interface{}
  }
  126. //初始化任务
  127. func InitTaskData(_id string) {
  128. defer tools.Catch()
  129. taskData, _ := tools.MgoClass.FindById(tools.COLL_TASK, _id, nil)
  130. task := &TTask{}
  131. task.MgoTask = &u.MongodbSim{}
  132. s_querycon := "1"
  133. s_table := ""
  134. s_timefieldname := ""
  135. s_starttime := int64(0)
  136. s_asfield := ""
  137. if b_updaterule, ok := (*taskData)["b_updaterule"].(bool); ok {
  138. task.B_UpdateRule = b_updaterule //更新任务下的规则
  139. }
  140. if (*taskData)["s_querycon"] != nil { //
  141. s_querycon = util.ObjToString((*taskData)["s_querycon"])
  142. }
  143. if (*taskData)["s_starttime"] != nil {
  144. s_starttime = (*taskData)["s_starttime"].(int64)
  145. }
  146. if (*taskData)["s_table"] != nil {
  147. s_table = util.ObjToString((*taskData)["s_table"])
  148. }
  149. if (*taskData)["s_timefieldname"] != nil {
  150. s_timefieldname = util.ObjToString((*taskData)["s_timefieldname"])
  151. }
  152. if (*taskData)["s_asfield"] != nil {
  153. s_asfield = util.ObjToString((*taskData)["s_asfield"])
  154. }
  155. //task.Class
  156. task.ID = u.BsonIdToSId((*taskData)["_id"])
  157. task.S_name = util.ObjToString((*taskData)["s_name"])
  158. task.S_class = util.ObjToString((*taskData)["s_class"])
  159. task.S_mgourl = util.ObjToString((*taskData)["s_mgourl"])
  160. task.S_mgodb = util.ObjToString((*taskData)["s_mgodb"])
  161. task.S_coll = util.ObjToString((*taskData)["s_coll"])
  162. task.I_poolsize = util.IntAll((*taskData)["i_poolsize"])
  163. task.Task_PreRule = util.ObjToString((*taskData)["s_task_prerule"])
  164. task.I_multiclass = util.IntAllDef((*taskData)["i_multiclass"], 0)
  165. task.I_wordcount = util.IntAllDef((*taskData)["i_wordcount"], 0)
  166. task.I_savetype = util.IntAllDef((*taskData)["i_savetype"], 1)
  167. task.I_thread = util.IntAllDef((*taskData)["i_thread"], 1)
  168. task.S_query = util.ObjToString((*taskData)["s_query"])
  169. task.S_startid = util.ObjToString((*taskData)["s_startid"])
  170. task.S_attr = util.ObjToString((*taskData)["s_attr"])
  171. task.I_rate = util.IntAllDef((*taskData)["i_rate"], 10)
  172. task.I_tasktype = util.IntAllDef((*taskData)["i_tasktype"], 0)
  173. task.I_status = util.IntAll((*taskData)["i_status"])
  174. task.S_idcoll = util.ObjToString((*taskData)["s_idcoll"])
  175. task.S_querycon = s_querycon
  176. task.S_starttime = s_starttime
  177. task.S_table = s_table
  178. task.S_timefieldname = s_timefieldname
  179. task.S_asfield = s_asfield
  180. task.I_fieldUpdate = util.IntAllDef((*taskData)["i_fieldUpdate"], 0)
  181. task.S_classField = util.ObjToString((*taskData)["s_classfield"])
  182. task.Dbtype = util.ObjToString((*taskData)["s_dbtype"])
  183. flagAttrVal := 1
  184. attrs := strings.Split(task.S_attr, "__")
  185. if len(attrs) == 2 {
  186. flagAttrVal = util.IntAll(attrs[1])
  187. task.S_attr = attrs[0]
  188. }
  189. task.AttrVal = flagAttrVal
  190. //联表查询初始化mgo,线上只有行业分类用到;跑历史招标、行业分类的时候也用到两边查询
  191. for _, v := range tools.Config {
  192. if m, ok := v.(map[string]interface{}); ok {
  193. if m["taskid"] == task.ID {
  194. if m["mgoaddr"] != nil && m["db"] != nil && m["coll"] != nil {
  195. dbtype := util.ObjToString(m["dbtype"])
  196. addr, _ := m["mgoaddr"].(string)
  197. db, _ := m["db"].(string)
  198. coll, _ := m["coll"].(string)
  199. task.MulMgo = &u.MongodbSim{}
  200. task.MulMgo.MongodbAddr = addr
  201. task.MulMgo.Size = 3
  202. task.MulMgo.DbName = db
  203. if dbtype != "" {
  204. task.MulMgo.UserName = tools.DbInfo[dbtype][0]
  205. task.MulMgo.Password = tools.DbInfo[dbtype][1]
  206. }
  207. task.MulMgo.InitPool()
  208. task.MulColl = coll
  209. }
  210. }
  211. }
  212. }
  213. //初始化查询字段信息
  214. if task.Task_QueryFieldMap == nil {
  215. task.Task_QueryFieldMap = make(map[string]interface{})
  216. }
  217. //Task_QueryFieldMap加入关联字段
  218. for _, v := range strings.Split(s_asfield, "==") {
  219. if v != "" {
  220. task.Task_QueryFieldMap[v] = 1
  221. }
  222. }
  223. //Task_QueryFieldMap加入分类字段
  224. //Task_QueryFieldArr加入分类字段
  225. for _, f := range strings.Split(task.S_classField, ",") {
  226. task.Task_QueryFieldMap[f] = 1
  227. task.Task_QueryFieldArr = append(task.Task_QueryFieldArr, f)
  228. }
  229. //初始化任务下所有的分类和规则
  230. InitClassAndRuleData(_id, task)
  231. }
  232. //InitClassAndRuleData 初始化任务下所有的分类和规则
  233. func InitClassAndRuleData(_id string, task *TTask) {
  234. defer tools.Catch()
  235. classIdStr := task.S_class
  236. if classIdStr != "" {
  237. classIdArr := strings.Split(classIdStr, ",")
  238. classArr := make([]*Class, 0)
  239. for _, classid := range classIdArr {
  240. classData, _ := tools.MgoClass.FindById(tools.COLL_CLASS, classid, nil)
  241. if classData != nil {
  242. //初始化Class
  243. class := &Class{
  244. //Rule: CidRuleMap[classid],
  245. Cid: classid,
  246. Class_PreRule: util.ObjToString((*classData)["s_class_prerule"]),
  247. S_name: util.ObjToString((*classData)["s_name"]),
  248. S_fields: util.ObjToString((*classData)["s_fields"]),
  249. S_code: util.ObjToString((*classData)["s_code"]),
  250. S_default: util.ObjToString((*classData)["s_default"]),
  251. S_pid: util.ObjToString((*classData)["s_pid"]),
  252. S_savefield: util.ObjToString((*classData)["s_savefield"]),
  253. }
  254. //根据classid查找对应的rule
  255. ruleList, _ := tools.MgoClass.Find(tools.COLL_RULE, `{"s_classid":"`+classid+`"}`, `{"i_order":1}`, nil, false, -1, -1)
  256. if ruleList != nil && len(*ruleList) > 0 {
  257. for _, v := range *ruleList {
  258. _id := u.BsonIdToSId(v["_id"])
  259. //rule
  260. s_rule, _ := v["s_rule"].(primitive.A)
  261. i_rule := DealRules(s_rule)
  262. //detailrule
  263. s_detailrule, _ := v["s_detailrule"].(primitive.A)
  264. i_detailrule := DealRules(s_detailrule)
  265. //notrule
  266. s_notrule, _ := v["s_notrule"].(primitive.A)
  267. i_notrule := DealRules(s_notrule)
  268. rule := &Rule{
  269. Rid: _id,
  270. Reg: i_rule,
  271. DetailReg: i_detailrule,
  272. NotReg: i_notrule,
  273. Rule_PreRule: util.ObjToString(v["s_rule_prerule"]),
  274. S_name: util.ObjToString(v["s_name"]),
  275. S_code: util.ObjToString(v["s_code"]),
  276. S_pid: util.ObjToString(v["s_pid"]),
  277. }
  278. class.Rule = append(class.Rule, rule)
  279. }
  280. }
  281. classArr = append(classArr, class)
  282. }
  283. }
  284. task.Class = classArr
  285. TaskMap[_id] = task
  286. }
  287. }
  288. func DealRules(rules []interface{}) (i_rule []interface{}) {
  289. for _, r := range tools.ObjArrToStringArr(rules) {
  290. if strings.HasPrefix(r, "'") && strings.HasSuffix(r, "'") { //正则
  291. rs := []rune(r)
  292. ru := string(rs[1 : len(rs)-1])
  293. rureg, err := regexp.Compile(ru)
  294. if err != nil {
  295. log.Println("error---rule:", r)
  296. continue
  297. }
  298. i_rule = append(i_rule, []interface{}{rureg}...)
  299. } else { //规则,加入到敏感词匹配
  300. matchnum := 0
  301. mismatchnum := 0
  302. isnum1 := false
  303. isnum2 := false
  304. numArr := make([]int, 0)
  305. ruleDFA := &RuleDFA{
  306. Match: []DFA{},
  307. MisMatch: DFA{},
  308. }
  309. tmpArr := strings.Split(r, "^")
  310. matchTmp := tmpArr[0]
  311. ruleTextArr := REG.FindAllString(matchTmp, -1)
  312. for _, match := range ruleTextArr {
  313. matchnum, isnum1 = GetNum(match)
  314. numArr = append(numArr, matchnum)
  315. matchArr := GetRule(match, isnum1)
  316. tmpDFA := DFA{
  317. Link: make(map[string]interface{}),
  318. }
  319. tmpDFA.AddWord(matchArr...)
  320. ruleDFA.Match = append(ruleDFA.Match, tmpDFA)
  321. }
  322. if len(tmpArr) == 2 {
  323. mismatch := tmpArr[1]
  324. mismatchnum, isnum2 = GetNum(mismatch)
  325. mismatchArr := GetRule(mismatch, isnum2)
  326. ruleDFA.MisMatch.AddWord(mismatchArr...)
  327. }
  328. ruleDFA.MatchNum = numArr
  329. ruleDFA.MisMatchNum = mismatchnum
  330. i_rule = append(i_rule, []interface{}{ruleDFA}...)
  331. }
  332. }
  333. return
  334. }
  335. //更新任务状态
  336. func (tt *TTask) Sstatus() int {
  337. if tt.I_status == 0 && tt.B_Running {
  338. return 1
  339. } else if tt.I_status == 0 && !tt.B_Running {
  340. return 0
  341. } else if tt.I_status == 1 && tt.B_Running {
  342. return 2
  343. } else if tt.I_status == 1 && !tt.B_Running {
  344. return 3
  345. }
  346. return -1
  347. }
  348. //停止任务
  349. func (tt *TTask) SStop() bool {
  350. tt.Lock.Lock()
  351. defer tt.Lock.Unlock()
  352. if tt.I_status == 1 {
  353. tt.I_status = 0
  354. tt.FlagQuit <- true
  355. log.Println("开始停止...")
  356. }
  357. return true
  358. }
  359. var NN = 400
  360. //存放测试的数据
  361. var TEST = &TestList{
  362. Count: map[string][]int{},
  363. }
  364. type TestList struct {
  365. Lock sync.Mutex
  366. Count map[string][]int
  367. }
  368. func (tl *TestList) Get(id string) []int {
  369. tl.Lock.Lock()
  370. defer tl.Lock.Unlock()
  371. return tl.Count[id]
  372. }
  373. func (tl *TestList) Put(id string, val ...int) {
  374. defer tools.Catch()
  375. tl.Lock.Lock()
  376. defer tl.Lock.Unlock()
  377. if len(val) == 3 {
  378. tl.Count[id] = val
  379. } else {
  380. tval := tl.Count[id]
  381. tl.Count[id][2] = val[0]
  382. if tval[1] <= val[0] {
  383. tl.Count[id][0] = 1
  384. }
  385. }
  386. }
  387. func (tl *TestList) Del(id string) {
  388. tl.Lock.Lock()
  389. defer tl.Lock.Unlock()
  390. delete(tl.Count, id)
  391. os.Remove("csv/" + id)
  392. }
  393. //任务测试
  394. func (tt *TTask) RRunTest(s_startid, s_endid, s_query, filename string) {
  395. defer tools.Catch()
  396. defer func() {
  397. go func() {
  398. time.AfterFunc(10*time.Minute, func() {
  399. TEST.Del(filename)
  400. })
  401. }()
  402. }()
  403. //开始识别
  404. sess := tt.MgoTask.GetMgoConn()
  405. defer tt.MgoTask.DestoryMongoConn(sess)
  406. q := map[string]interface{}{}
  407. if s_query != "" {
  408. json.Unmarshal([]byte(strings.Replace(s_query, "'", "\"", -1)), &q)
  409. }
  410. if s_startid != "" {
  411. q["_id"] = map[string]interface{}{
  412. "$gte": u.StringTOBsonId(s_startid),
  413. }
  414. }
  415. if s_endid != "" {
  416. if q["_id"] != nil {
  417. q["_id"].(map[string]interface{})["$lte"] = u.StringTOBsonId(s_endid)
  418. } else {
  419. q["_id"] = map[string]interface{}{
  420. "$lte": u.StringTOBsonId(s_endid),
  421. }
  422. }
  423. }
  424. count := tt.MgoTask.Count(tt.S_coll, &q)
  425. if count == 0 {
  426. return
  427. }
  428. TEST.Put(filename, 0, count, 0)
  429. query := sess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Iter()
  430. arr := [][]string{}
  431. i := 0
  432. for tmp := make(map[string]interface{}); query.Next(&tmp); i = i + 1 {
  433. if i > 2000 { //数据跑至2000停止
  434. break
  435. }
  436. //按顺序识别
  437. tid := tmp["_id"]
  438. res := make([]string, 3)
  439. res[0] = u.BsonIdToSId(tid)
  440. res[1] = util.ObjToString(tmp["title"])
  441. if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例
  442. res = append(res, "拟建", "拟建")
  443. } else if util.IntAll(tmp["infoformat"]) == 3 {
  444. res = append(res, "产权", "产权")
  445. } else {
  446. SMap := NewClassificationRun(tt, tmp)
  447. for _, k := range SMap.Keys {
  448. res = append(res, util.ObjToString((SMap.Map[k].([]string))[0]))
  449. }
  450. }
  451. arr = append(arr, res)
  452. TEST.Put(filename, i+1)
  453. tmp = make(map[string]interface{})
  454. }
  455. if len(arr) > 0 {
  456. if !Exist("csv") {
  457. os.Mkdir("csv", 777)
  458. }
  459. f, _ := os.Create("csv/" + filename)
  460. w := csv.NewWriter(f)
  461. for _, str := range arr {
  462. w.Write(str)
  463. }
  464. w.Flush()
  465. f.Close()
  466. }
  467. log.Println("运行RUNTEST-OVER")
  468. }
  // Exist reports whether filename can be stat'ed. Any Stat error yields
  // false unless it is an "already exists" error, so for example a
  // permission error reads as "does not exist".
  func Exist(filename string) bool {
  	if _, statErr := os.Stat(filename); statErr != nil {
  		return os.IsExist(statErr)
  	}
  	return true
  }
  473. func (tt *TTask) RRun() {
  474. first := make(chan bool, 1)
  475. if tt.I_status == 1 {
  476. first <- true
  477. }
  478. OVER:
  479. for tt.I_status == 1 {
  480. tt.B_Running = false
  481. select {
  482. case <-tt.FlagQuit: //结果任务控制
  483. log.Println("退出,RUN", tt.S_name)
  484. tt = nil
  485. break OVER
  486. case <-first: //第一次执行控制
  487. if tools.ControlTaskRun { //任务流程控制,现有模式用不到,默认false
  488. tools.AllTaskFinish = false
  489. }
  490. log.Println("第一次执行任务:", tt.S_name)
  491. newtaskrun(tt)
  492. case <-time.Tick(time.Duration(tt.I_rate) * time.Second): //任务定时控制
  493. //执行定时任务前,检查任务是否更新了rule
  494. if tt.B_UpdateRule {
  495. InitClassAndRuleData(tt.ID, tt) //重新加载任务的rule
  496. tt.B_UpdateRule = false
  497. UpdateTaskInfo(false, tt.ID)
  498. }
  499. //上一轮任务执行完毕再走下一轮(上一轮任务执行完毕的标志是业主分类执行完)
  500. //util.Debug("ControlTaskRun---", tools.ControlTaskRun, "AllTaskFinish---", tools.AllTaskFinish)
  501. if !tools.ControlTaskRun { //线下环境不控制定时任务
  502. newtaskrun(tt)
  503. } else if tools.ControlTaskRun && tools.AllTaskFinish { //线上控制
  504. tools.AllTaskFinish = false
  505. newtaskrun(tt)
  506. } else {
  507. log.Println("上轮任务暂未完成")
  508. }
  509. }
  510. }
  511. }
  512. func newtaskrun(tt *TTask) {
  513. //针对用户行业标签,需要单独处理
  514. if tt.S_name == "用户行业分类" {
  515. log.Println("执行任务:->", tt.S_name)
  516. DealUserKey(tt)
  517. } else {
  518. NewTaskRunAll(tt, false, nil)
  519. }
  520. }
  521. //DealUserKey 用户行业标签分类前预处理
  522. func DealUserKey(tt *TTask) {
  523. //最终更新的数据
  524. var updateUserPool [][]map[string]interface{}
  525. //开始识别
  526. pool := make(chan bool, tt.I_thread)
  527. wg := &sync.WaitGroup{}
  528. lock := &sync.Mutex{}
  529. q := make(map[string]interface{})
  530. var lastID string
  531. //1.获取查询条件
  532. comeintime := time.Now().Unix() - 5*60
  533. query := map[string]interface{}{
  534. "l_registedate": map[string]interface{}{
  535. "$lt": comeintime,
  536. },
  537. }
  538. qId := tt.MgoTask.GetMgoConn()
  539. defer tt.MgoTask.DestoryMongoConn(qId)
  540. tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter()
  541. eId := ""
  542. for tmp := make(map[string]interface{}); tmpData.Next(tmp); {
  543. eId = u.BsonIdToSId(tmp["_id"])
  544. }
  545. if tt.LastId != "" {
  546. sid := tt.LastId
  547. if eId <= sid || eId == "" {
  548. return
  549. }
  550. q["_id"] = map[string]interface{}{
  551. "$gt": u.StringTOBsonId(sid),
  552. "$lte": u.StringTOBsonId(eId),
  553. }
  554. } else {
  555. q["_id"] = map[string]interface{}{
  556. "$lte": u.StringTOBsonId(eId),
  557. }
  558. }
  559. //2.条件封装完毕,开始查询数据
  560. sess := tt.MgoTask.GetMgoConn()
  561. defer tt.MgoTask.DestoryMongoConn(sess)
  562. log.Println(tt.S_name, " 查询条件:=>", q)
  563. extractquery := sess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(nil).Sort("_id").Iter()
  564. sum := 0
  565. realCount := 0
  566. for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
  567. lastID = u.BsonIdToSId(tmp["_id"])
  568. pool <- true
  569. wg.Add(1)
  570. go func(tmp map[string]interface{}) {
  571. defer func() {
  572. <-pool
  573. wg.Done()
  574. }()
  575. keys := u.GetUserKeys(tmp)
  576. tags := []string{}
  577. for _, v := range keys {
  578. tag := util.ObjToString(v)
  579. tags = append(tags, tag)
  580. }
  581. //按顺序识别
  582. update := map[string]interface{}{}
  583. if len(keys) > 0 {
  584. //用户关键词
  585. update["key_list"] = strings.Join(tags, ",")
  586. }
  587. lock.Lock()
  588. tmp["key_list"] = strings.Join(tags, ",")
  589. SMap := NewClassificationRun(tt, tmp)
  590. //fmt.Println("SMap=>", SMap)
  591. subtype := SMap.Map["subscope_dy"]
  592. if subtype != nil {
  593. if subs, ok := subtype.([]string); ok {
  594. update["subscope_dy"] = strings.Join(subs, ",")
  595. updatePool := []map[string]interface{}{
  596. {"_id": tmp["_id"]},
  597. {"$set": update},
  598. }
  599. updateUserPool = append(updateUserPool, updatePool)
  600. realCount++
  601. }
  602. }
  603. if len(updateUserPool) > 10 {
  604. tt.MgoTask.UpdateBulk(tt.S_collection, updateUserPool...)
  605. updateUserPool = [][]map[string]interface{}{}
  606. //log.Println("current:", tt.S_name, "number:", sum)
  607. }
  608. lock.Unlock()
  609. }(tmp)
  610. tmp = make(map[string]interface{})
  611. }
  612. wg.Wait()
  613. if lastID > tt.LastId {
  614. tt.LastId = lastID
  615. setid := map[string]interface{}{
  616. "$set": map[string]interface{}{
  617. "s_startid": tt.LastId,
  618. "s_starttime": time.Now().Unix(),
  619. },
  620. }
  621. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
  622. }
  623. log.Println("运行", tt.S_name, "over", sum)
  624. }
  625. //NewTaskRunAll 常规任务和udp非合并数据处理方法
  626. func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
  627. total := 0
  628. tools.Try(func() { //不加这一层defer运行不了!!!
  629. timespan := false //时间间隔(控制数据条数打印)
  630. tt.B_Running = true
  631. defer func() {
  632. //业主分类执行完修改AllTaskFinish状态;控制流程的任务id(整个分类流程业主分类结尾,以此为标记)
  633. if tt.ID == tools.ControlLastTaskId {
  634. tools.AllTaskFinish = true
  635. }
  636. tt.B_Running = false
  637. }()
  638. //开始识别
  639. pool := make(chan bool, tt.I_thread)
  640. wg := &sync.WaitGroup{}
  641. lock := &sync.Mutex{}
  642. q := make(map[string]interface{})
  643. nextNodeSid, nextNodeEid := "", ""
  644. oid := tt.LastId //上次定时任务的结束id
  645. s_table := tt.S_table //结果表
  646. s_asfield := tt.S_asfield //关联字段
  647. asfields := strings.Split(s_asfield, "==")
  648. qfield := "" //查询表字段
  649. rfield := "" //结果表字段
  650. qtp := "" //查询字段的类型
  651. rtp := "" //结果字段的类型
  652. rtype := "" //结果字段的真实类型
  653. if s_table != "" { //有结果表保存到结果表(更新,插入)
  654. tt.S_collection = s_table
  655. }
  656. if len(asfields) == 2 {
  657. qfield = asfields[0] //查询表字段
  658. rfield = asfields[1] //结果表字段
  659. //id处理 object string互转
  660. qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield)
  661. queryExis := map[string]interface{}{
  662. rfield: map[string]interface{}{
  663. "$exists": true,
  664. },
  665. }
  666. onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis)
  667. rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型
  668. }
  669. //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
  670. //没有结果表在查询表上更新
  671. //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
  672. if !budp { //非udp查询条件
  673. if tt.S_query != "" { //有查询条件
  674. json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
  675. }
  676. idcoll := tt.S_idcoll
  677. if idcoll != "" { //idcoll中查询id区间,bidding_processing_ids
  678. nextNodeSid, nextNodeEid = FindId(idcoll) //查询id段
  679. if nextNodeSid != "" && nextNodeEid != "" && nextNodeSid <= nextNodeEid {
  680. q["_id"] = bson.M{
  681. "$gt": u.StringTOBsonId(nextNodeSid),
  682. "$lte": u.StringTOBsonId(nextNodeEid),
  683. }
  684. } else {
  685. log.Println("定时任务", tt.S_name, "查询", tt.S_idcoll, "时间段出错", nextNodeSid, nextNodeEid)
  686. tools.AllTaskFinish = true //为查询到数据视为此轮任务完成
  687. return
  688. }
  689. stime, _ := strconv.ParseInt(nextNodeSid[:8], 16, 64) //取id前8位转成时间戳
  690. etime, _ := strconv.ParseInt(nextNodeEid[:8], 16, 64) //
  691. if etime-stime < 1800 { //时间跨度小于半小时
  692. timespan = true
  693. }
  694. } else {
  695. if q["_id"] != nil {
  696. if _id, ok := q["_id"].(string); ok {
  697. q["_id"] = u.StringTOBsonId(_id)
  698. } else if _ids, ok := q["_id"].(map[string]interface{}); ok {
  699. for k, v := range _ids {
  700. if id, bk := v.(string); bk {
  701. _ids[k] = u.StringTOBsonId(id)
  702. }
  703. }
  704. }
  705. }
  706. if tt.S_querycon == "1" { //id查询
  707. //临时修改查询id区间段
  708. comeintime := time.Now().Unix() - 5*60
  709. query := map[string]interface{}{
  710. "comeintime": map[string]interface{}{
  711. "$gt": comeintime,
  712. },
  713. }
  714. qId := tt.MgoTask.GetMgoConn()
  715. defer tt.MgoTask.DestoryMongoConn(qId)
  716. tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter()
  717. eId := ""
  718. for tmp := make(map[string]interface{}); tmpData.Next(tmp); {
  719. eId = u.BsonIdToSId(tmp["_id"])
  720. }
  721. if tt.LastId != "" && q["_id"] == nil {
  722. sid := tt.LastId
  723. if eId <= sid || eId == "" {
  724. return
  725. }
  726. q["_id"] = map[string]interface{}{
  727. "$gt": u.StringTOBsonId(sid),
  728. "$lte": u.StringTOBsonId(eId),
  729. }
  730. }
  731. time.Sleep(time.Minute * 2) //按id查询,为了保证有新数据入库,每次休息2分钟
  732. //测试环境q的赋值执行下述代码
  733. //if tt.LastId != "" && q["_id"] == nil {
  734. // q["_id"] = map[string]interface{}{
  735. // "$gt": u.StringTOBsonId(tt.LastId),
  736. // }
  737. //}
  738. } else { //时间查询
  739. name := tt.S_timefieldname
  740. q[name] = map[string]interface{}{
  741. "$gt": tt.S_starttime,
  742. }
  743. }
  744. }
  745. } else { //udp查询条件
  746. if tt.S_query != "" { //有查询条件
  747. json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
  748. }
  749. tmpq := mapInfo["q"].(map[string]interface{})
  750. for k, v := range tmpq {
  751. q[k] = v
  752. }
  753. sid := mapInfo["gtid"].(string)
  754. eid := mapInfo["lteid"].(string)
  755. stime, _ := strconv.ParseInt(sid[:8], 16, 64)
  756. etime, _ := strconv.ParseInt(eid[:8], 16, 64)
  757. if etime-stime < 1800 { //时间跨度小于半小时
  758. timespan = true
  759. }
  760. }
  761. //task
  762. tasksess := tt.MgoTask.GetMgoConn()
  763. defer tt.MgoTask.DestoryMongoConn(tasksess)
  764. log.Println("线程数:", tt.I_thread, "查询语句", q)
  765. log.Println("查询---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
  766. log.Println("select:", tt.Task_QueryFieldMap, tt.Task_QueryFieldArr)
  767. extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
  768. arr := [][]map[string]interface{}{}
  769. if tt.I_wordcount == 1 {
  770. tt.WordCount = map[string]map[string]int{}
  771. }
  772. sum := 0
  773. for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
  774. tid := tmp["_id"]
  775. if !timespan && sum%2000 == 0 {
  776. log.Println("current:", sum)
  777. }
  778. pool <- true
  779. wg.Add(1)
  780. go func(tmp map[string]interface{}) {
  781. defer func() {
  782. <-pool
  783. wg.Done()
  784. }()
  785. //按顺序识别
  786. tid := tmp["_id"]
  787. update := []map[string]interface{}{}
  788. //如果有关联字段 根据关联字段更新或者保存
  789. if len(asfields) == 2 {
  790. //log.Println("qfield====", qfield)
  791. field := tmp[qfield]
  792. if field != nil {
  793. //log.Println("field===", field, qtp, rtp)
  794. if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致
  795. } else {
  796. //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype)
  797. if rtype == rtp { //转换后与真实类型不同,填写时类型错误
  798. //将查询字段类型转换为对应的结果字段类型
  799. if qtp == "bson.ObjectId" && rtp == "string" {
  800. field = u.BsonIdToSId(tmp[qfield])
  801. } else if qtp == "string" && rtp == "bson.ObjectId" {
  802. field = u.StringTOBsonId(tmp[qfield].(string))
  803. }
  804. }
  805. }
  806. update = append(update, map[string]interface{}{ //根据关联字段更新
  807. rfield: field,
  808. })
  809. }
  810. } else {
  811. update = append(update, map[string]interface{}{ //更新的条件 根据id更新
  812. "_id": tid,
  813. })
  814. }
  815. res := map[string]interface{}{}
  816. ksmap := make(map[string]map[string][]string)
  817. if util.IntAll(tmp["infoformat"]) == 2 { //此处增加特例
  818. res["toptype"] = "拟建"
  819. res["subtype"] = "拟建"
  820. } else if util.IntAll(tmp["infoformat"]) == 3 {
  821. res["toptype"] = "产权"
  822. res["subtype"] = "产权"
  823. } else {
  824. SMap := &tools.SortMap{}
  825. if tt.I_tasktype == 2 { //标签任务
  826. SMap = TagClassificationRun(tt, tmp)
  827. } else if tt.I_tasktype == 1 { //附件任务
  828. SMap = FileClassificationRun(tt, tmp)
  829. //res["projectinfo"] = tmp["projectinfo"]
  830. } else { //常规任务
  831. SMap = NewClassificationRun(tt, tmp)
  832. //一级分类时,符合结果中成交规则时
  833. if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
  834. if u.ChargeDetailResult(tmp["detail"].(string)) {
  835. SMap.Map["toptype"] = "结果"
  836. resa := ReSub(tt, tmp, "结果")
  837. subtype := resa.Map["subtype"]
  838. delete(SMap.Map, "subtype")
  839. SMap.Map["subtype"] = subtype
  840. }
  841. }
  842. }
  843. //追加时处理,//更新字段 I_fieldUpdate 0:覆盖 1:追加
  844. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 {
  845. //封装追加信息
  846. if len(SMap.Keys) > 0 {
  847. for _, k := range SMap.Keys {
  848. ksarr := make([]string, 0)
  849. if k != "toptype" && k != "subtype" {
  850. ks, ok := SMap.Map[k].(string)
  851. if ok {
  852. ksarr = append(ksarr, ks)
  853. } else {
  854. ksarr = append(ksarr, SMap.Map[k].([]string)...)
  855. }
  856. }
  857. ksmap[k] = map[string][]string{
  858. "$each": ksarr,
  859. }
  860. }
  861. }
  862. } else { //非多分类
  863. res = SMap.Map
  864. if tt.I_tasktype == 1 { //附件任务
  865. if tmp["projectinfo"] != nil {
  866. res["projectinfo"] = tmp["projectinfo"]
  867. }
  868. }
  869. }
  870. }
  871. //if len(res) > 0 || len(ksmap) > 0 {
  872. IS.NewAdd(tt, res)
  873. if tt.S_attr != "" {
  874. //res[tt.S_attr] = 1
  875. res[tt.S_attr] = tt.AttrVal
  876. }
  877. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加
  878. if len(ksmap) > 0 && len(res) > 0 {
  879. update = append(update, map[string]interface{}{
  880. "$set": res,
  881. "$addToSet": ksmap,
  882. })
  883. } else if len(ksmap) > 0 && len(res) == 0 {
  884. update = append(update, map[string]interface{}{
  885. "$addToSet": ksmap,
  886. })
  887. } else if len(ksmap) == 0 && len(res) > 0 {
  888. update = append(update, map[string]interface{}{
  889. "$set": res,
  890. })
  891. }
  892. } else { //非多分类或者多分类的覆盖
  893. //log.Println("更新==", res)
  894. if len(res) > 0 {
  895. update = append(update, map[string]interface{}{
  896. "$set": res,
  897. })
  898. }
  899. }
  900. //更新
  901. lock.Lock()
  902. if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
  903. arr = append(arr, update)
  904. }
  905. if len(arr) >= NN {
  906. //if s_table == "" {
  907. tt.MgoTask.UpdateBulk(tt.S_collection, arr...)
  908. //} else {
  909. // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
  910. //}
  911. arr = [][]map[string]interface{}{}
  912. }
  913. lock.Unlock()
  914. //}
  915. }(tmp)
  916. ttid := u.BsonIdToSId(tid)
  917. if ttid > tt.LastId {
  918. tt.LastId = ttid
  919. }
  920. tmp = make(map[string]interface{})
  921. }
  922. total = sum
  923. log.Println("总数:————", sum)
  924. if timespan {
  925. log.Println("current:————", sum)
  926. }
  927. wg.Wait()
  928. lock.Lock()
  929. if len(arr) > 0 {
  930. //if s_table == "" { //没有结果表
  931. tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新
  932. //} else { //有结果表
  933. // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
  934. //}
  935. arr = [][]map[string]interface{}{}
  936. }
  937. lock.Unlock()
  938. tt.WcLock.Lock()
  939. if len(tt.WordCount) > 0 {
  940. savem := []map[string]interface{}{}
  941. tn := time.Now().Unix()
  942. for wk, wm := range tt.WordCount {
  943. for ck, cm := range wm {
  944. m1 := map[string]interface{}{
  945. "s_class": wk,
  946. "s_word": ck,
  947. "i_count": cm,
  948. "bz": tn,
  949. }
  950. savem = append(savem, m1)
  951. if len(savem) >= 1000 {
  952. tools.MgoClass.SaveBulk("wordcount", savem...)
  953. savem = []map[string]interface{}{}
  954. }
  955. }
  956. }
  957. if len(savem) > 0 {
  958. tools.MgoClass.SaveBulk("wordcount", savem...)
  959. savem = nil
  960. }
  961. }
  962. tt.WcLock.Unlock()
  963. //更新最后id
  964. if !budp && oid != tt.LastId {
  965. setid := map[string]interface{}{
  966. "$set": map[string]interface{}{
  967. "s_startid": tt.LastId,
  968. },
  969. }
  970. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
  971. }
  972. //更新最后时间
  973. if !budp {
  974. nowtime := time.Now().Unix()
  975. settime := map[string]interface{}{
  976. "$set": map[string]interface{}{
  977. "s_starttime": nowtime,
  978. },
  979. }
  980. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
  981. }
  982. //InitRule()
  983. log.Println("运行", tt.S_name, "over", oid, " endid:", tt.LastId)
  984. //定时任务完成发送udp信号调抽取
  985. if tools.Extract["preNodeId"] == tt.ID { //常规招标定时任务udp调用抽取
  986. if tt.S_idcoll == "" {
  987. nextNodeSid = oid
  988. nextNodeEid = tt.LastId
  989. }
  990. UdpRunExtract(nextNodeSid, nextNodeEid)
  991. }
  992. })
  993. return total
  994. }
  995. //udp合并数据处理的方法
  996. func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype string) int {
  997. total := 0
  998. tools.Try(func() { //不加这一层defer运行不了!!!
  999. timespan := false
  1000. tt.B_Running = true
  1001. defer func() {
  1002. tt.B_Running = false
  1003. }()
  1004. //开始识别
  1005. pool := make(chan bool, tt.I_thread)
  1006. wg := &sync.WaitGroup{}
  1007. lock := &sync.Mutex{}
  1008. q := map[string]interface{}{}
  1009. s_table := tt.S_table //结果表
  1010. s_asfield := tt.S_asfield //关联字段
  1011. asfields := strings.Split(s_asfield, "==")
  1012. qfield := "" //查询表字段
  1013. rfield := "" //结果表字段
  1014. qtp := "" //查询字段的类型
  1015. rtp := "" //结果字段的类型
  1016. rtype := "" //结果字段的真实类型
  1017. if s_table != "" { //有结果表保存到结果表(更新,插入)
  1018. tt.S_collection = s_table
  1019. }
  1020. if len(asfields) == 2 {
  1021. qfield = asfields[0] //查询表字段
  1022. rfield = asfields[1] //结果表字段
  1023. //id处理 object string互转
  1024. qfield, rfield, qtp, rtp = IdTypeConversion(qfield, rfield)
  1025. queryExis := map[string]interface{}{
  1026. rfield: map[string]interface{}{
  1027. "$exists": true,
  1028. },
  1029. }
  1030. onedata, _ := tt.MgoTask.FindOne(tt.S_collection, queryExis)
  1031. rtype = reflect.TypeOf((*onedata)[rfield]).String() //结果表字段真实类型
  1032. }
  1033. //有结果表有关联字段,在结果表上根据关联字段更新;有结果表没有关联字段,在结果表根据_id段更新
  1034. //没有结果表在查询表上更新
  1035. //log.Println("lastid:", tt.LastId, "查询方式:", tt.S_querycon, "时间:", tt.S_starttime, "条件:", tt.S_query, "table:", tt.S_table, "s_timefieldname:", tt.S_timefieldname)
  1036. q = mapInfo["q"].(map[string]interface{})
  1037. //计算起始id和结束id时间宽度,大于半小时的2000数据打印一次个数,小于半小时的每条数据都打印一次个数
  1038. sid := mapInfo["gtid"].(string)
  1039. eid := mapInfo["lteid"].(string)
  1040. stime, _ := strconv.ParseInt(sid[:8], 16, 64)
  1041. etime, _ := strconv.ParseInt(eid[:8], 16, 64)
  1042. if etime-stime < 1800 { //时间跨度小于半小时
  1043. timespan = true
  1044. }
  1045. udpsess := tt.MulMgo.GetMgoConn()
  1046. if udpsess == nil {
  1047. log.Println("连接为空", tt.S_name, mapInfo)
  1048. return
  1049. }
  1050. defer tt.MulMgo.DestoryMongoConn(udpsess)
  1051. udptmp := udpsess.DB(tt.MulMgo.DbName).C(tt.MulColl).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
  1052. //task
  1053. tasksess := tt.MgoTask.GetMgoConn()
  1054. defer tt.MgoTask.DestoryMongoConn(tasksess)
  1055. log.Println("线程数:", tt.I_thread, "查询语句", q)
  1056. log.Println("task---", tt.S_mgodb, tt.S_coll, tt.S_collection, tt.S_table)
  1057. log.Println("select:", tt.Task_QueryFieldMap)
  1058. extractquery := tasksess.DB(tt.S_mgodb).C(tt.S_coll).Find(&q).Select(tt.Task_QueryFieldMap).Sort("_id").Iter()
  1059. arr := [][]map[string]interface{}{}
  1060. oid := tt.LastId
  1061. if tt.I_wordcount == 1 {
  1062. tt.WordCount = map[string]map[string]int{}
  1063. }
  1064. sum := 0
  1065. //对比两张表数据,减少查询次数
  1066. var compare bson.M
  1067. for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
  1068. result := tmp
  1069. //对比
  1070. for {
  1071. if compare == nil {
  1072. compare = make(bson.M)
  1073. if !udptmp.Next(&compare) { //传参表数据赋值给compare
  1074. break
  1075. }
  1076. }
  1077. if compare != nil {
  1078. //对比
  1079. cid := u.BsonIdToSId(compare["_id"]) //传参表id
  1080. tid := u.BsonIdToSId(tmp["_id"]) //抽取表id
  1081. if cid == tid {
  1082. //更新抽取表的数据,再进行分类
  1083. for _, k := range tt.Task_QueryFieldArr {
  1084. v1 := compare[k]
  1085. v2 := tmp[k]
  1086. if v2 == nil && v1 != nil {
  1087. result[k] = v1
  1088. }
  1089. }
  1090. break
  1091. } else {
  1092. if tid > cid { //抽取表id大于传参表id
  1093. compare = nil
  1094. continue
  1095. } else {
  1096. break
  1097. }
  1098. }
  1099. } else {
  1100. break
  1101. }
  1102. }
  1103. tid := tmp["_id"]
  1104. if !timespan && sum%2000 == 0 {
  1105. log.Println("current:", sum, tid)
  1106. }
  1107. pool <- true
  1108. wg.Add(1)
  1109. go func(result map[string]interface{}) {
  1110. defer func() {
  1111. <-pool
  1112. wg.Done()
  1113. }()
  1114. //按顺序识别
  1115. if result != nil {
  1116. tid := result["_id"]
  1117. update := []map[string]interface{}{}
  1118. //如果有关联字段 根据关联字段更新或者保存
  1119. if len(asfields) == 2 {
  1120. //log.Println("qfield====", qfield)
  1121. field := result[qfield]
  1122. if field != nil {
  1123. //log.Println("field===", field, qtp, rtp)
  1124. if qtp == "bson.ObjectId" && rtp == "bson.ObjectId" { //俩字段类型一致
  1125. } else {
  1126. //log.Println(field, " 查询字段", qfield, "查询类型", qtp, "结果字段", rfield, " 结果类型", rtp, "结果真实类型", rtype)
  1127. if rtype == rtp { //转换后与真实类型不同,填写时类型错误
  1128. //将查询字段类型转换为对应的结果字段类型
  1129. if qtp == "bson.ObjectId" && rtp == "string" {
  1130. field = u.BsonIdToSId(result[qfield])
  1131. } else if qtp == "string" && rtp == "bson.ObjectId" {
  1132. field = u.StringTOBsonId(result[qfield].(string))
  1133. }
  1134. }
  1135. }
  1136. update = append(update, map[string]interface{}{ //根据关联字段更新
  1137. rfield: field,
  1138. })
  1139. }
  1140. } else {
  1141. update = append(update, map[string]interface{}{ //更新的条件 根据id更新
  1142. "_id": tid,
  1143. })
  1144. }
  1145. res := map[string]interface{}{}
  1146. ksmap := make(map[string]map[string][]string)
  1147. if util.IntAll(result["infoformat"]) == 2 { //此处增加特例
  1148. res["toptype"] = "拟建"
  1149. res["subtype"] = "拟建"
  1150. } else if util.IntAll(result["infoformat"]) == 3 {
  1151. res["toptype"] = "产权"
  1152. res["subtype"] = "产权"
  1153. } else {
  1154. SMap := &tools.SortMap{}
  1155. if tt.I_tasktype == 2 { //标签任务
  1156. SMap = TagClassificationRun(tt, result)
  1157. } else if tt.I_tasktype == 1 { //附件任务
  1158. SMap = FileClassificationRun(tt, result)
  1159. //res["projectinfo"] = tmp["projectinfo"]
  1160. } else { //常规任务
  1161. SMap = NewClassificationRun(tt, result)
  1162. //一级分类时,符合结果中成交规则时
  1163. if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
  1164. if u.ChargeDetailResult(tmp["detail"].(string)) {
  1165. SMap.Map["toptype"] = "结果"
  1166. resa := ReSub(tt, tmp, "结果")
  1167. subtype := resa.Map["subtype"]
  1168. delete(SMap.Map, "subtype")
  1169. SMap.Map["subtype"] = subtype
  1170. }
  1171. }
  1172. }
  1173. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //更新字段 I_fieldUpdate 0:覆盖 1:追加
  1174. //封装追加信息
  1175. if len(SMap.Keys) > 0 {
  1176. for _, k := range SMap.Keys {
  1177. ksarr := make([]string, 0)
  1178. if k != "toptype" && k != "subtype" {
  1179. ks, ok := SMap.Map[k].(string)
  1180. if ok {
  1181. ksarr = append(ksarr, ks)
  1182. } else {
  1183. ksarr = append(ksarr, SMap.Map[k].([]string)...)
  1184. }
  1185. }
  1186. ksmap[k] = map[string][]string{
  1187. "$each": ksarr,
  1188. }
  1189. }
  1190. }
  1191. } else {
  1192. res = SMap.Map
  1193. }
  1194. }
  1195. IS.NewAdd(tt, res)
  1196. if tt.S_attr != "" {
  1197. res[tt.S_attr] = tt.AttrVal
  1198. }
  1199. if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 { //I_fieldUpdate 0:覆盖 1:追加
  1200. if len(ksmap) > 0 && len(res) > 0 {
  1201. update = append(update, map[string]interface{}{
  1202. "$set": res,
  1203. "$addToSet": ksmap,
  1204. })
  1205. } else if len(ksmap) > 0 && len(res) == 0 {
  1206. update = append(update, map[string]interface{}{
  1207. "$addToSet": ksmap,
  1208. })
  1209. } else if len(ksmap) == 0 && len(res) > 0 {
  1210. update = append(update, map[string]interface{}{
  1211. "$set": res,
  1212. })
  1213. }
  1214. } else {
  1215. if len(res) > 0 {
  1216. update = append(update, map[string]interface{}{
  1217. "$set": res,
  1218. })
  1219. }
  1220. }
  1221. //更新
  1222. lock.Lock()
  1223. if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
  1224. arr = append(arr, update)
  1225. }
  1226. if len(arr) >= NN {
  1227. tt.MgoTask.UpdateBulk(tt.S_collection, arr...)
  1228. arr = [][]map[string]interface{}{}
  1229. }
  1230. lock.Unlock()
  1231. }
  1232. }(result)
  1233. ttid := u.BsonIdToSId(tid)
  1234. if ttid > tt.LastId {
  1235. tt.LastId = ttid
  1236. }
  1237. tmp = make(map[string]interface{})
  1238. }
  1239. total = sum
  1240. if timespan {
  1241. log.Println("current:", sum)
  1242. }
  1243. wg.Wait()
  1244. lock.Lock()
  1245. if len(arr) > 0 {
  1246. //if s_table == "" { //没有结果表
  1247. tt.MgoTask.UpdateBulk(tt.S_collection, arr...) //在原表上更新
  1248. //} else { //有结果表
  1249. // tt.Mgo.UpdateAndSaveBulk(tt.S_collection, arr...)
  1250. //}
  1251. arr = [][]map[string]interface{}{}
  1252. }
  1253. lock.Unlock()
  1254. tt.WcLock.Lock()
  1255. if len(tt.WordCount) > 0 {
  1256. savem := []map[string]interface{}{}
  1257. tn := time.Now().Unix()
  1258. for wk, wm := range tt.WordCount {
  1259. for ck, cm := range wm {
  1260. m1 := map[string]interface{}{
  1261. "s_class": wk,
  1262. "s_word": ck,
  1263. "i_count": cm,
  1264. "bz": tn,
  1265. }
  1266. savem = append(savem, m1)
  1267. if len(savem) >= 1000 {
  1268. tools.MgoClass.SaveBulk("wordcount", savem...)
  1269. savem = []map[string]interface{}{}
  1270. }
  1271. }
  1272. }
  1273. if len(savem) > 0 {
  1274. tools.MgoClass.SaveBulk("wordcount", savem...)
  1275. savem = nil
  1276. }
  1277. }
  1278. tt.WcLock.Unlock()
  1279. //更新最后id
  1280. if !budp && oid != tt.LastId {
  1281. setid := map[string]interface{}{
  1282. "$set": map[string]interface{}{
  1283. "s_startid": tt.LastId,
  1284. },
  1285. }
  1286. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
  1287. }
  1288. //更新最后时间
  1289. if !budp {
  1290. nowtime := time.Now().Unix()
  1291. settime := map[string]interface{}{
  1292. "$set": map[string]interface{}{
  1293. "s_starttime": nowtime,
  1294. },
  1295. }
  1296. go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, settime, false, false)
  1297. }
  1298. //InitRule()
  1299. log.Println("运行", tt.S_name, "over")
  1300. })
  1301. return total
  1302. }
  1303. func UdpRunExtract(sid, eid string) { //5cb6c508a5cb26b9b70d6536
  1304. by, _ := json.Marshal(map[string]interface{}{
  1305. "gtid": sid,
  1306. "lteid": eid,
  1307. "stype": tools.ExtractStype,
  1308. })
  1309. log.Println("定时任务调下一节点分类:", tools.ExtractPort, string(by))
  1310. addr := &net.UDPAddr{
  1311. IP: net.ParseIP(tools.ExtractAddr),
  1312. Port: tools.ExtractPort,
  1313. }
  1314. //node := &tools.UdpNode{by, addr, time.Now().Unix(), 0}
  1315. //udptaskmap.Store(key, node)
  1316. tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  1317. }
  1318. func FindId_back(coll string) (gtid, lteid string) {
  1319. sum := 0 //记录数据总量
  1320. data, _ := tools.MgoClass.Find(coll, `{"isused":false}`, `{"_id":1}`, nil, false, -1, -1)
  1321. length := len(*data)
  1322. set := bson.M{
  1323. "$set": bson.M{
  1324. "isused": true,
  1325. "publishtime": time.Now().Unix(),
  1326. },
  1327. }
  1328. for i, d := range *data {
  1329. id := d["_id"]
  1330. count := util.IntAll(d["count"])
  1331. sum += count
  1332. if gtid == "" {
  1333. gtid = d["gtid"].(string)
  1334. }
  1335. if sum >= 5000 || i == length-1 { //总数大于5000或数据取完,取此id段数据
  1336. lteid := d["lteid"].(string)
  1337. go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false)
  1338. return gtid, lteid
  1339. } else { //总数小于5000,删除已使用数据
  1340. go tools.MgoClass.Update(coll, bson.M{"_id": id}, set, false, false)
  1341. }
  1342. }
  1343. return gtid, lteid
  1344. }
  1345. func FindId(coll string) (gtid, lteid string) {
  1346. data, _ := tools.MgoClass.Find(coll, map[string]interface{}{"dataprocess": 0}, `{"_id":1}`, nil, false, -1, -1)
  1347. for _, d := range *data {
  1348. gtid = d["gtid"].(string)
  1349. lteid = d["lteid"].(string)
  1350. set := map[string]interface{}{
  1351. "$set": map[string]interface{}{
  1352. "dataprocess": 1,
  1353. "updatetime": time.Now().Unix(),
  1354. },
  1355. }
  1356. tools.MgoClass.Update(coll, map[string]interface{}{"_id": d["_id"]}, set, false, false)
  1357. break
  1358. }
  1359. return gtid, lteid
  1360. }
  1361. func NewLoadTestTask(_id, s_mgourl, s_mgodb, s_coll, i_poolsize, s_startid, s_endid, s_query string) (bs bool, filename string) {
  1362. defer tools.Catch()
  1363. r, t, _ := NewAnalyTask(_id, s_mgourl, s_mgodb, s_coll, tools.IntAllDef(i_poolsize, 5))
  1364. //log.Println(m)
  1365. if r && t != nil {
  1366. filename = time.Now().Format("150405") + ".csv"
  1367. go t.RRunTest(s_startid, s_endid, s_query, filename)
  1368. bs = true
  1369. }
  1370. return
  1371. }
  1372. //加载任务
  1373. func NewLoadTask(_id string, res *tools.JSON) {
  1374. defer tools.Catch()
  1375. //初始化任务信息
  1376. InitTaskData(_id)
  1377. //初始化任务mgo配置信息
  1378. bres, tt, msg := NewAnalyTask(_id, "", "", "", 5)
  1379. tt.I_status = 1
  1380. log.Println(tt.S_mgodb, tt.S_name, tt.I_thread)
  1381. res.Msg = msg
  1382. if bres && tt != nil {
  1383. res.Status = true
  1384. NEWTASKPOOL[_id] = tt //存入当前启动任务
  1385. log.Println("加载", tt.S_name, "完成...", tt.S_query)
  1386. go tt.RRun()
  1387. }
  1388. }
  // IdTypeConversion unwraps the id-type annotations used in an association
// field spec and returns (query field, result field, query type, result type).
// A name containing "ObjectId"/"objectId" is treated as ObjectId(x): x is
// returned with type "bson.ObjectId". A name containing "StringId"/"stringId"
// is treated as StringId(x): x is returned with type "string". Any other name
// is returned unchanged with the default type "bson.ObjectId".
func IdTypeConversion(q, r string) (string, string, string, string) {
	q, qtp := unwrapIdFieldName(q)
	r, rtp := unwrapIdFieldName(r)
	return q, r, qtp, rtp
}

// unwrapIdFieldName strips a 9-character "ObjectId(" / "StringId(" wrapper and
// the final character from name and reports the id type it denotes.
func unwrapIdFieldName(name string) (string, string) {
	tp := "bson.ObjectId"
	switch {
	case strings.Contains(name, "ObjectId") || strings.Contains(name, "objectId"):
	case strings.Contains(name, "StringId") || strings.Contains(name, "stringId"):
		tp = "string"
	default:
		return name, tp
	}
	// The original sliced unconditionally and panicked on names shorter than
	// "ObjectId()"; such names are now returned unchanged.
	if len(name) >= len("ObjectId()") {
		name = name[len("ObjectId(") : len(name)-1]
	}
	return name, tp
}
  1406. //获取匹配或不匹配的个数
  1407. func GetNum(rule string) (int, bool) {
  1408. num := 1
  1409. isnum := strings.HasSuffix(rule, ")")
  1410. if !isnum { //是数字
  1411. s := []rune(rule)
  1412. last := string(s[len(s)-1:])
  1413. num = tools.IntAll(last)
  1414. }
  1415. return num, isnum
  1416. }
  // GetRule returns the "|"-separated alternatives of a rule.
// isnum is the flag returned by GetNum:
//   - true: text must look like "(a|b|c)".
//   - false: text is "(a|b|c)N" with a single trailing count byte.
//
// It returns nil when text does not start with "(", when it does not end with
// ")" in the isnum case, or when it is too short to hold "(", ")" and a count.
func GetRule(text string, isnum bool) (matchArr []string) {
	if !strings.HasPrefix(text, "(") {
		return nil
	}
	if isnum {
		if !strings.HasSuffix(text, ")") {
			return nil
		}
		return strings.Split(text[1:len(text)-1], "|")
	}
	// Drop "(" plus the trailing ")N". The original panicked on "(" and "(x".
	if len(text) < 3 {
		return nil
	}
	return strings.Split(text[1:len(text)-2], "|")
}
  1430. func (d *DFA) AddWord(keys ...string) {
  1431. d.AddWordAll(true, keys...)
  1432. }
  1433. func (d *DFA) AddWordAll(haskey bool, keys ...string) {
  1434. if d.Link == nil {
  1435. d.Link = make(map[string]interface{})
  1436. }
  1437. for _, key := range keys {
  1438. nowMap := &d.Link
  1439. for i := 0; i < len(key); i++ {
  1440. kc := key[i : i+1]
  1441. if v, ok := (*nowMap)[kc]; ok {
  1442. nowMap, _ = v.(*map[string]interface{})
  1443. } else {
  1444. newMap := map[string]interface{}{}
  1445. newMap["YN"] = "0"
  1446. (*nowMap)[kc] = &newMap
  1447. nowMap = &newMap
  1448. }
  1449. if i == len(key)-1 {
  1450. (*nowMap)["YN"] = "1"
  1451. if haskey {
  1452. (*nowMap)["K"] = key
  1453. }
  1454. }
  1455. }
  1456. }
  1457. }
  1458. func (d *DFA) CheckSensitiveWord(src string, n int) (bool, []string) {
  1459. res := make([]string, 0)
  1460. tmpMap := make(map[string]int)
  1461. for j := 0; j < len(src); j++ {
  1462. nowMap := &d.Link
  1463. for i := j; i < len(src); i++ {
  1464. word := src[i : i+1]
  1465. nowMap, _ = (*nowMap)[word].(*map[string]interface{})
  1466. if nowMap != nil { // 存在,则判断是否为最后一个
  1467. if "1" == util.ObjToString((*nowMap)["YN"]) {
  1468. s := util.ObjToString((*nowMap)["K"])
  1469. tmpMap[s] = 1
  1470. //nowMap = &d.Link //匹配到之后继续匹配后边的内容
  1471. }
  1472. } else {
  1473. //nowMap = &d.Link
  1474. break
  1475. }
  1476. }
  1477. }
  1478. if len(tmpMap) >= n {
  1479. for k, _ := range tmpMap {
  1480. res = append(res, k)
  1481. }
  1482. return true, res
  1483. }
  1484. return false, []string{}
  1485. }
  1486. func (d *DFA) CheckSensitiveWordTest(src string, n int) (bool, []string) {
  1487. res := make([]string, 0)
  1488. tmpMap := make(map[string]int)
  1489. for j := 0; j < len(src); j++ {
  1490. nowMap := &d.Link
  1491. for i := j; i < len(src); i++ {
  1492. word := src[i : i+1]
  1493. nowMap, _ = (*nowMap)[word].(*map[string]interface{})
  1494. if nowMap != nil { // 存在,则判断是否为最后一个
  1495. if "1" == util.ObjToString((*nowMap)["YN"]) {
  1496. s := util.ObjToString((*nowMap)["K"])
  1497. tmpMap[s] = 1
  1498. //nowMap = &d.Link //匹配到之后继续匹配后边的内容
  1499. }
  1500. } else {
  1501. //nowMap = &d.Link
  1502. break
  1503. }
  1504. }
  1505. }
  1506. for k, _ := range tmpMap {
  1507. res = append(res, k)
  1508. }
  1509. return len(tmpMap) >= n, res
  1510. }
  1511. func UpdateTaskInfo(flag bool, tid string) bool {
  1512. query := bson.M{
  1513. "_id": u.StringTOBsonId(tid),
  1514. }
  1515. set := bson.M{
  1516. "$set": bson.M{
  1517. "b_updaterule": flag,
  1518. },
  1519. }
  1520. return tools.MgoClass.Update(tools.COLL_TASK, query, set, false, false)
  1521. }
  1522. //o_projectinfo中数据分类定时任务
  1523. func RunTask() {
  1524. if tools.IsStart { //是否开启定时任务
  1525. tt := InitTimeTask() //初始化任务
  1526. //StartTask(tt)
  1527. //return
  1528. c := cron.New()
  1529. cronstr := "0 */" + fmt.Sprint(tt.I_rate) + " * * * ?" //每TaskTime分钟执行一次
  1530. c.AddFunc(cronstr, func() { StartTask(tt) })
  1531. c.Start()
  1532. }
  1533. }
  1534. //初始化任务
  1535. func InitTimeTask() *TTask {
  1536. defer util.Catch()
  1537. timeTaskTT := &TTask{}
  1538. InitTaskData(tools.TimeTaskid)
  1539. bres, tt, _ := NewAnalyTask(tools.TimeTaskid, "", "", "", 5)
  1540. if bres && tt != nil {
  1541. timeTaskTT = tt
  1542. logger.Debug("初始化定时任务成功")
  1543. } else {
  1544. logger.Debug("初始化定时任务失败")
  1545. }
  1546. return timeTaskTT
  1547. }
  1548. //开始任务
  1549. func StartTask(t *TTask) {
  1550. defer util.Catch()
  1551. logger.Debug("开始执行定时任务")
  1552. query := map[string]interface{}{
  1553. "_id": map[string]interface{}{
  1554. "$gt": u.StringTOBsonId(tools.IdCollSid),
  1555. },
  1556. "dataprocess": 8,
  1557. }
  1558. order := map[string]interface{}{"_id": -1}
  1559. logger.Debug("query:", query)
  1560. list, _ := tools.MgoClass.Find(t.S_idcoll, query, order, nil, false, -1, -1)
  1561. sid := t.S_startid
  1562. eid := ""
  1563. if list != nil && len(*list) > 0 {
  1564. eid = util.ObjToString((*list)[0]["lteid"])
  1565. if eid <= sid {
  1566. logger.Debug("id err. sid:", sid, " eid:", eid)
  1567. return
  1568. }
  1569. t.S_startid = eid //更新任务中数据的起始id
  1570. tools.IdCollSid = u.BsonIdToSId((*list)[0]["_id"]) //更新id表起始id
  1571. //更新任务表中起始id
  1572. setid := map[string]interface{}{
  1573. "$set": map[string]interface{}{
  1574. "s_startid": t.S_startid,
  1575. },
  1576. }
  1577. go tools.MgoClass.Update("rc_task", `{"_id":"`+t.ID+`"}`, setid, false, false)
  1578. //查拟建数据
  1579. query := map[string]interface{}{
  1580. "_id": map[string]interface{}{
  1581. "$gt": u.StringTOBsonId(sid),
  1582. "$lte": u.StringTOBsonId(eid),
  1583. },
  1584. "toptype": "拟建",
  1585. }
  1586. sess := t.MgoTask.GetMgoConn()
  1587. defer t.MgoTask.DestoryMongoConn(sess)
  1588. count, _ := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Count()
  1589. logger.Debug("count:", count, " query:", query)
  1590. if count == 0 { //此轮任务没有查到数据
  1591. return
  1592. }
  1593. arr := [][]map[string]interface{}{}
  1594. wg := &sync.WaitGroup{}
  1595. lock := &sync.Mutex{}
  1596. pool := make(chan bool, t.I_thread)
  1597. sum := 0
  1598. logger.Debug("select:", t.Task_QueryFieldMap)
  1599. it := sess.DB(t.S_mgodb).C(t.S_coll).Find(&query).Select(t.Task_QueryFieldMap).Iter()
  1600. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  1601. pool <- true
  1602. wg.Add(1)
  1603. go func(tmp map[string]interface{}) {
  1604. defer func() {
  1605. <-pool
  1606. wg.Done()
  1607. }()
  1608. update := []map[string]interface{}{}
  1609. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  1610. SMap := &tools.SortMap{}
  1611. SMap = NewClassificationRun(t, tmp)
  1612. if len(SMap.Map) > 0 {
  1613. //一级分类时,符合结果中成交规则时
  1614. if SMap.Map["toptype"] == "招标" && SMap.Map["subtype"] != "单一" {
  1615. if u.ChargeDetailResult(tmp["detail"].(string)) {
  1616. SMap.Map["toptype"] = "结果"
  1617. resa := ReSub(t, tmp, "结果")
  1618. subtype := resa.Map["subtype"]
  1619. delete(SMap.Map, "subtype")
  1620. SMap.Map["subtype"] = subtype
  1621. }
  1622. }
  1623. update = append(update, map[string]interface{}{"$set": SMap.Map})
  1624. }
  1625. //更新
  1626. lock.Lock()
  1627. if len(update) == 2 { //有更新条件和更新内容时才进行更新操作
  1628. arr = append(arr, update)
  1629. }
  1630. if len(arr) >= NN {
  1631. tmps := arr
  1632. t.MgoTask.UpdateBulk(t.S_coll, tmps...)
  1633. arr = [][]map[string]interface{}{}
  1634. }
  1635. lock.Unlock()
  1636. }(tmp)
  1637. if sum%100 == 0 {
  1638. log.Println("current:", sum)
  1639. }
  1640. tmp = make(map[string]interface{})
  1641. }
  1642. wg.Wait()
  1643. lock.Lock()
  1644. if len(arr) > 0 {
  1645. t.MgoTask.UpdateBulk(t.S_coll, arr...)
  1646. arr = [][]map[string]interface{}{}
  1647. }
  1648. lock.Unlock()
  1649. logger.Debug("定时任务执行完毕 count:", sum)
  1650. UdpRunProjectForecast(sid, eid)
  1651. }
  1652. logger.Debug("Udp通知项目预测执行完毕")
  1653. }
  1654. //udp通知项目预测
  1655. func UdpRunProjectForecast(sid, eid string) {
  1656. by, _ := json.Marshal(map[string]interface{}{
  1657. "gtid": sid,
  1658. "lteid": eid,
  1659. })
  1660. logger.Debug("定时任务通知项目预测:", string(by))
  1661. addr := &net.UDPAddr{
  1662. IP: net.ParseIP(tools.NextNodeAddr),
  1663. Port: tools.NextNodePort,
  1664. }
  1665. tools.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  1666. }