task.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/goinggo/mapstructure"
  6. "github.com/robfig/cron"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  14. "project/config"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. "unicode/utf8"
  20. )
  // Project-merge task: entry points for full and incremental merges, update/insert of
// project documents, periodic in-memory cleanup, and conversion of bidding documents
// into *Info objects.

// The pre/back regex tables below are disabled (their use in QyFilter is commented out).
// var PreRegexp = map[string][]*regexp.Regexp{}
// var BackRegexp = map[string][]*regexp.Regexp{}
// var BackRepRegexp = map[string][]RegexpInfo{}

// BlackRegexp maps a name kind (ParseInfo passes "winner" and "buyer") to blacklist
// patterns; QyFilter blanks a name matching any pattern of its kind.
var BlackRegexp = map[string][]*regexp.Regexp{}
var (
	// Project code at the start of the title.
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	// Project code inside a bracketed part of the title, optionally preceded by a label
	// such as 编号/编码/项号/包号/代码/标段号; the code itself is capture group 3.
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	// Project code at the end of the title, optionally followed by up to 5 chars + "公告".
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project-code cleanup pattern (its uses in ParseInfo are currently commented out).
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// Project code that is only digits or only letters, 4 characters or fewer.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Digits only or letters only (any length).
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// Contains a lot/section keyword (包/段); per the original note, tenders whose lots were
	// not recognized are merged into one project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
  // RegexpInfo pairs a compiled pattern with its replacement text. In the visible code it is
// only referenced by the disabled BackRepRegexp table (see the commented-out loop in QyFilter).
type RegexpInfo struct {
	regs   *regexp.Regexp // pattern to match
	repstr string         // replacement passed to ReplaceAllString
}
  // ProjectTask is the project-merge object: the in-memory indexes used to match incoming
// bidding documents to existing projects, plus the state of the current run.
type ProjectTask struct {
	InitMinTime int64 // minimum time (NewPT: 1325347200); original note: values below 0 are processed once
	name        string
	thread      int // number of concurrent merge workers
	// lookup lock; clearMem holds it for the whole eviction sweep
	findLock sync.Mutex
	wg       sync.WaitGroup
	// guards AllIdsMap
	AllIdsMapLock sync.Mutex
	// project id -> cached project
	AllIdsMap map[string]*ID
	// indexes keyed by buyer (purchasing unit), project name and project code
	mapPb, mapPn, mapPc map[string]*Key
	// flow data: documents sharing a projecthref are merged directly (href -> project id)
	mapHref     map[string]string
	mapHrefLock sync.Mutex
	// site name -> location info (used by fillInPlace)
	mapSite     map[string]*Site
	mapSiteLock sync.Mutex
	// spider isflow
	mapSpider     map[string]int
	mapSpiderLock sync.Mutex
	// lock for bidtype / bidstatus
	mapBidLock sync.Mutex
	// batches waiting for a bulk upsert (consumed by updateAllQueue)
	updatePool chan []map[string]interface{}
	//savePool chan map[string]interface{}
	//saveSign, updateSign chan bool
	// project collection name
	coll string
	// current mode: "ql" (full) or an incremental mode such as "project" / "project_history"
	currentType string //当前是跑全量还是跑增量
	// consecutive clearMem ticks skipped without a sweep
	clearContimes int
	// publishtime of the most recently parsed document; clearMem uses it as "now"
	currentTime int64
	// batch size for bulk upserts
	saveSize   int
	pici       int64
	validTime  int64 // seconds a project stays in memory after its LastTime
	statusTime int64
	// original note: result-time update — announcements from the last two days no longer
	// update jgtime (NewPT sets this to 7 days)
	jgTime int64
	Brun   bool // true while enter() is running
}
  95. func NewPT() *ProjectTask {
  96. p := &ProjectTask{
  97. InitMinTime: int64(1325347200),
  98. name: "全/增量对象",
  99. thread: Thread,
  100. updatePool: make(chan []map[string]interface{}, 5000),
  101. //savePool: make(chan map[string]interface{}, 2000),
  102. wg: sync.WaitGroup{},
  103. AllIdsMap: make(map[string]*ID, 5000000),
  104. mapPb: make(map[string]*Key, 1500000),
  105. mapPn: make(map[string]*Key, 5000000),
  106. mapPc: make(map[string]*Key, 5000000),
  107. mapHref: make(map[string]string, 1500000),
  108. mapSite: make(map[string]*Site, 1000000),
  109. mapSpider: make(map[string]int, 1000000),
  110. saveSize: 200,
  111. //saveSign: make(chan bool, 1),
  112. //updateSign: make(chan bool, 1),
  113. coll: ProjectColl,
  114. validTime: int64(util.IntAllDef(config.Conf.Serve.ValidDays, 150)) * 86400,
  115. statusTime: int64(util.IntAllDef(config.Conf.Serve.StatusDays, 15) * 86400),
  116. jgTime: int64(util.IntAllDef(7, 7) * 86400),
  117. currentType: "ql",
  118. }
  119. return p
  120. }
  121. var P_QL *ProjectTask
  122. var sp = make(chan bool, 1)
  // updateAllQueue batches the update lists arriving on p.updatePool and writes each batch to
// p.coll with MgoP.UpSertBulk. A batch is flushed when it holds p.saveSize entries, or on the
// next one-second tick when it is non-empty. Writes are serialized through the one-slot
// semaphore sp, which is held while the write goroutine runs. The loop never returns.
//
// A ticker is used instead of time.After inside the select: time.After re-armed the
// one-second timer on every received item, so a steady trickle of fewer than saveSize items
// was never flushed by time.
func (p *ProjectTask) updateAllQueue() {
	batch := make([][]map[string]interface{}, 0, p.saveSize)
	// flush hands the current batch to a write goroutine and starts a fresh one.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		sp <- true
		go func(rows [][]map[string]interface{}) {
			defer func() {
				<-sp
			}()
			MgoP.UpSertBulk(p.coll, rows...)
		}(batch)
		batch = make([][]map[string]interface{}, 0, p.saveSize)
	}
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case v := <-p.updatePool:
			batch = append(batch, v)
			if len(batch) >= p.saveSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}
  157. // 项目合并内存更新
  158. func (p *ProjectTask) clearMem() {
  159. c := cron.New()
  160. // 创建项目的时间大于7天
  161. //在内存中保留最近6个月的信息
  162. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  163. _ = c.AddFunc("0 0/1 * * * ?", func() {
  164. log.Info("1")
  165. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  166. log.Info("2")
  167. SingleClear = 1
  168. //跳过的次数清零
  169. p.clearContimes = 0
  170. //信息进入查找对比全局锁
  171. p.findLock.Lock()
  172. //defer p.findLock.Unlock()
  173. //合并进行的任务都完成
  174. p.wg.Wait()
  175. log.Info("3")
  176. //遍历id
  177. //所有内存中的项目信息
  178. p.AllIdsMapLock.Lock()
  179. p.mapHrefLock.Lock()
  180. log.Info("清除开始")
  181. //清除计数
  182. clearNum := 0
  183. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  184. v := p.AllIdsMap[pid]
  185. if v != nil && p.currentTime-v.P.LastTime > p.validTime {
  186. delete(p.mapHref, kHref)
  187. }
  188. }
  189. for k, v := range p.AllIdsMap {
  190. if p.currentTime-v.P.LastTime > p.validTime {
  191. clearNum++
  192. redis.Del(RedisProject, k)
  193. //删除id的map
  194. delete(p.AllIdsMap, k)
  195. //删除pb
  196. if v.P.Buyer != "" {
  197. ids := p.mapPb[v.P.Buyer]
  198. if ids != nil {
  199. ids.Lock.Lock()
  200. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  201. if len(ids.Arr) == 0 {
  202. delete(p.mapPb, v.P.Buyer)
  203. }
  204. ids.Lock.Unlock()
  205. }
  206. }
  207. //删除mapPn
  208. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  209. if vn != "" {
  210. ids := p.mapPn[vn]
  211. if ids != nil {
  212. ids.Lock.Lock()
  213. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  214. if len(ids.Arr) == 0 {
  215. delete(p.mapPn, vn)
  216. }
  217. ids.Lock.Unlock()
  218. }
  219. }
  220. }
  221. //删除mapPc
  222. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  223. if vn != "" {
  224. ids := p.mapPc[vn]
  225. if ids != nil {
  226. ids.Lock.Lock()
  227. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  228. if len(ids.Arr) == 0 {
  229. delete(p.mapPc, vn)
  230. }
  231. ids.Lock.Unlock()
  232. }
  233. }
  234. }
  235. v = nil
  236. }
  237. }
  238. p.mapHrefLock.Unlock()
  239. p.AllIdsMapLock.Unlock()
  240. p.findLock.Unlock()
  241. SingleClear = 0
  242. log.Info("清除完成:", zap.Int("clearNum", clearNum), zap.Int("AllIdsMap:", len(p.AllIdsMap)), zap.Int("mapPn:", len(p.mapPn)),
  243. zap.Int("mapPc:", len(p.mapPc)), zap.Int("mapPb:", len(p.mapPb)), zap.Int("mapHref:", len(p.mapHref)))
  244. } else {
  245. p.clearContimes++
  246. }
  247. })
  248. c.Start()
  249. }
  250. // 全量合并
  251. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  252. defer util.Catch()
  253. p.thread = util.IntAllDef(Thread, 4)
  254. q, _ := udpInfo["query"].(map[string]interface{})
  255. if q == nil {
  256. q = map[string]interface{}{}
  257. lteid, _ := udpInfo["lteid"].(string)
  258. var idmap map[string]interface{}
  259. if len(lteid) > 15 {
  260. idmap = map[string]interface{}{
  261. "$lte": mongodb.StringTOBsonId(lteid),
  262. }
  263. }
  264. gtid, _ := udpInfo["gtid"].(string)
  265. if len(gtid) > 15 {
  266. if idmap == nil {
  267. idmap = map[string]interface{}{}
  268. }
  269. idmap["$gte"] = mongodb.StringTOBsonId(gtid)
  270. }
  271. if idmap != nil {
  272. q["_id"] = idmap
  273. }
  274. }
  275. if c := util.ObjToString(udpInfo["coll"]); c != "" {
  276. BiddingColl = c
  277. } else {
  278. BiddingColl = config.Conf.DB.MongoB.Coll
  279. }
  280. //生成查询语句执行
  281. p.enter(MgoB.DbName, BiddingColl, q)
  282. }
  // taskZl runs an incremental merge over the bidding documents with gtid < _id <= lteid.
//
// udpInfo keys: "gtid" and "lteid" (required id strings), "db" (defaults to MgoB.DbName),
// "coll" (defaults to config.Conf.DB.MongoB.Coll) and "stop". Unless "stop" is set, it waits
// for the bulk upsert currently in flight (semaphore sp) before notifying the next node.
func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
	defer util.Catch()
	db, _ := udpInfo["db"].(string)
	if db == "" {
		db = MgoB.DbName
	}
	if c := util.ObjToString(udpInfo["coll"]); c != "" {
		BiddingColl = c
	} else {
		BiddingColl = config.Conf.DB.MongoB.Coll
	}
	if thread := util.IntAllDef(Thread, 1); thread > 0 {
		p.thread = thread
	}
	// Both bounds are required; unchecked assertions used to panic on a missing id.
	gtid, okGt := udpInfo["gtid"].(string)
	lteid, okLte := udpInfo["lteid"].(string)
	if !okGt || !okLte {
		log.Error("taskZl: gtid/lteid missing or not a string")
		return
	}
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}
	p.enter(db, BiddingColl, q)
	if udpInfo["stop"] == nil {
		// Acquire then release the one-slot semaphore: blocks until the bulk upsert
		// currently being written (if any) has finished.
		sp <- true
		<-sp
	}
	log.Info("保存完成,生索引", zap.Int64("pici:", p.pici))
	time.Sleep(5 * time.Second)
	nextNode(udpInfo, p.pici)
}
  323. // 招标字段更新
  324. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  325. defer util.Catch()
  326. infoid := udpInfo["infoid"].(string)
  327. infoMap, _ := MgoB.FindById(util.ObjToString(udpInfo["coll"]), infoid, nil)
  328. if (*infoMap)["modifyinfo"] == nil {
  329. log.Info("does not exist modifyinfo ---," + infoid)
  330. return
  331. }
  332. //client := Es.GetEsConn()
  333. //defer Es.DestoryEsConn(client)
  334. //esquery := `{"query": {"bool": {"must": [{"match": {"ids": "` + infoid + `"}}]}}}`
  335. //data := Es.Get(config.Conf.DB.Es.Index, esquery)
  336. data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
  337. if len(*data) > 0 {
  338. pid := mongodb.BsonIdToSId((*data)["_id"])
  339. p.updateJudge(*infoMap, pid)
  340. } else {
  341. log.Info("not find project---," + infoid)
  342. }
  343. }
  // taskUpdatePro applies a direct field update to a project.
//
// udpInfo["pid"] is the project id and udpInfo["updateField"] the fields to set; values of
// keys containing "time" are converted with util.Int64All. The current document is backed up
// (backupPro) first, then updated with a "modifyinfo" map flagging each changed key. On success
// an ES re-index request is sent over UDP and the cached copy in AllIdsMap (if present) is
// replaced with the updated project.
func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
	defer util.Catch()
	pid := util.ObjToString(udpInfo["pid"])
	updateMap := util.ObjToMap(udpInfo["updateField"])
	if pid == "" || updateMap == nil || len(*updateMap) == 0 {
		log.Error("参数有误")
		return
	}
	proMap, _ := MgoP.FindById(ProjectColl, pid, nil)
	if proMap == nil || len(*proMap) <= 1 {
		log.Info("Not find project---" + pid)
		return
	}
	(*proMap)["reason"] = "直接修改项目字段信息"
	backupPro(*proMap)
	delete(*proMap, "reason")
	updataMap := make(map[string]interface{}, len(*updateMap)+1)
	modifyInfo := make(map[string]interface{}, len(*updateMap))
	for k, v := range *updateMap {
		if strings.Contains(k, "time") {
			updataMap[k] = util.Int64All(v)
		} else {
			updataMap[k] = v
		}
		modifyInfo[k] = true
	}
	updataMap["modifyinfo"] = modifyInfo
	if !MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap}) {
		// Nothing was written, so leave the cache as it is.
		log.Info("update project failed---" + pid)
		return
	}
	// es index
	by, _ := json.Marshal(map[string]interface{}{
		"query": map[string]interface{}{
			"_id": bson.M{
				"$gte": pid,
				"$lte": pid,
			}},
		"stype": "project",
	})
	_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
	// Refresh the cache from the values just written. Decoding the pre-update document
	// (as before) left the cached project stale.
	for k, v := range updataMap {
		(*proMap)[k] = v
	}
	var pro ProjectCache
	if err := mapstructure.Decode(proMap, &pro); err != nil {
		// Do not install a partially decoded project in the cache.
		log.Error(err.Error())
		return
	}
	p.AllIdsMapLock.Lock()
	if v, ok := p.AllIdsMap[pid]; ok {
		v.P = &pro
	}
	p.AllIdsMapLock.Unlock()
}
  396. func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
  397. defer util.Catch()
  398. infoid := util.ObjToString(udpInfo["infoid"])
  399. if infoid == "" {
  400. return
  401. }
  402. //client := Es.GetEsConn()
  403. //defer Es.DestoryEsConn(client)
  404. //esquery := `{"query": {"bool": {"must": [{"term": {"ids": "` + infoid + `"}}]}}}`
  405. //data := Es.Get(config.Conf.DB.Es.Index, esquery)
  406. data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
  407. if len(*data) > 0 {
  408. pid := mongodb.BsonIdToSId((*data)["_id"])
  409. p.delJudge(infoid, pid)
  410. } else {
  411. log.Info("not find project---," + infoid)
  412. }
  413. }
  // nextNode notifies the next pipeline node (toaddr[0]) that batch pici is done. mapInfo is
// rewritten in place (stype, query {pici}, key), the message is stored in udptaskmap under
// its key (presumably for re-sending until acknowledged — confirm) and sent over UDP.
func nextNode(mapInfo map[string]interface{}, pici int64) {
	key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
	mapInfo["stype"] = "project"
	mapInfo["query"] = map[string]interface{}{"pici": pici}
	mapInfo["key"] = key
	payload, _ := json.Marshal(mapInfo)
	addr := toaddr[0]
	udptaskmap.Store(key, &udpNode{payload, addr, time.Now().Unix(), 0})
	_ = udpclient.WriteUdp(payload, udp.OP_TYPE_DATA, addr)
}
  // enter streams the bidding documents matching q from db.coll (large text fields excluded)
// and merges every accepted document with up to p.thread concurrent workers.
//
// A document is accepted when its spidercode is not in SkipSiteList, extracttype == 1 (not a
// duplicate), toptype is not 采购意向/产权/拟建, and the current mode allows it: "ql" takes
// everything, "project" requires dataging == 0, "project_history" requires history_updatetime.
// A signal on queryClose stops the scan early. enter returns once every dispatched merge is done.
func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
	defer util.Catch()
	defer func() {
		p.Brun = false
	}()
	p.Brun = true
	count := 0
	countRepeat := 0
	// An unbuffered pool would deadlock the dispatcher, so always run at least one worker.
	threads := p.thread
	if threads < 1 {
		threads = 1
	}
	pool := make(chan bool, threads)
	log.Info("start project", zap.Any("q:", q), zap.String("coll:", coll), zap.Int64("pici", p.pici))
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	// mergeOne runs on a worker goroutine and frees its pool slot when done.
	mergeOne := func(tmp map[string]interface{}) {
		defer func() {
			<-pool
		}()
		p.fillInPlace(tmp)
		info := ParseInfo(tmp)
		if info == nil {
			// ParseInfo returns nil when tmp cannot be JSON round-tripped; dereferencing it
			// would panic on this unrecovered goroutine and take the whole process down.
			return
		}
		p.currentTime = info.Publishtime
		// Regular merge.
		p.CommonMerge(tmp, info)
	}

	// The dispatcher drains infoPool until it is closed, so no queued document is dropped.
	infoPool := make(chan map[string]interface{}, 2000)
	dispatched := make(chan struct{})
	go func() {
		defer close(dispatched)
		for tmp := range infoPool {
			pool <- true
			go mergeOne(tmp)
		}
	}()

	//fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0}
	fields := map[string]interface{}{"detail": 0, "contenthtml": 0, "jsondata": 0, "regions_log": 0, "field_source": 0}
	if p.currentType == "project" || p.currentType == "project_history" {
		c, _ := sess.DB(db).C(coll).Find(q).Count()
		log.Info(fmt.Sprintf("共查询: %d条", c))
	}
	query := sess.DB(db).C(coll).Find(q).Select(fields).Iter()
	var lastid interface{}
L:
	for {
		select {
		case <-queryClose:
			log.Info("receive interrupt sign")
			log.Info("close iter..", zap.Any("lastid:", lastid), zap.Any("err:", query.Cursor.Close(nil)))
			queryCloseOver <- true
			break L
		default:
		}
		tmp := make(map[string]interface{})
		if !query.Next(&tmp) {
			break
		}
		lastid = tmp["_id"]
		logEvery := 2000
		if p.currentType == "ql" {
			logEvery = 50000
		}
		if count%logEvery == 0 {
			log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
		}
		// Documents from skipped (winning-bid record) sites are ignored and not counted.
		if siteJudge(util.ObjToString(tmp["spidercode"])) {
			continue
		}
		// extracttype -1: duplicate, 1: not duplicate
		if util.IntAll(tmp["extracttype"]) != 1 {
			countRepeat++
		} else if toptype := util.ObjToString(tmp["toptype"]); toptype != "采购意向" && toptype != "产权" && toptype != "拟建" {
			switch p.currentType {
			case "ql":
				infoPool <- tmp
			case "project":
				// id-range incremental data
				if util.IntAll(tmp["dataging"]) == 0 {
					infoPool <- tmp
				}
			case "project_history":
				// id-range historical data
				if tmp["history_updatetime"] != nil {
					infoPool <- tmp
				}
			}
		}
		count++
	}
	close(infoPool)
	<-dispatched
	// Block until the running workers finish: each holds a pool slot until it is done.
	for n := 0; n < threads; n++ {
		pool <- true
	}
	log.Info("所有线程执行完成...", zap.Int("count:", count), zap.Int("countRepeat", countRepeat))
}
  543. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  544. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  545. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  546. proHref := util.ObjToString(jsonData["projecthref"]) // 网站本身发布的公告具有招投标流程,直接参与合并
  547. if jsonData != nil && proHref != "" {
  548. //projectHref字段合并
  549. tmp["projecthref"] = proHref
  550. p.mapHrefLock.Lock()
  551. pid := p.mapHref[proHref]
  552. p.mapHrefLock.Unlock()
  553. if pid != "" {
  554. p.AllIdsMapLock.Lock()
  555. res := p.AllIdsMap[pid]
  556. p.AllIdsMapLock.Unlock()
  557. if res != nil {
  558. comparePro := res.P
  559. _, ex := p.CompareStatus(comparePro, info)
  560. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  561. } else {
  562. p.startProjectMerge(info, tmp)
  563. }
  564. } else {
  565. id, p1 := p.NewProject(tmp, info)
  566. p.mapHrefLock.Lock()
  567. p.mapHref[proHref] = id
  568. p.mapHrefLock.Unlock()
  569. p.AllIdsMapLock.Lock()
  570. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  571. p.AllIdsMapLock.Unlock()
  572. }
  573. } else {
  574. //项目合并
  575. p.startProjectMerge(info, tmp)
  576. }
  577. } else {
  578. //项目合并
  579. p.startProjectMerge(info, tmp)
  580. }
  581. } else {
  582. }
  583. }
  // ParseInfo converts a raw bidding document into an *Info (via a JSON round trip) and
// normalizes it for merging. It returns nil when the document cannot be decoded.
//
// Side effects on tmp: "publishtime" is filled from the first non-nil DateTimeSelect field
// when the decoded Publishtime is <= 0, and "winner"/"buyer" are replaced by their QyFilter
// results.
//
// pnbval counts usable merge keys: +1 for a project name, +1 for a project code (the
// document's own or one extracted from the title into PTC), +1 for a buyer longer than two runes.
func ParseInfo(tmp map[string]interface{}) (info *Info) {
	bys, _ := json.Marshal(tmp)
	var thisinfo *Info
	_ = json.Unmarshal(bys, &thisinfo)
	if thisinfo == nil {
		return nil
	}
	// Missing publishtime: take the first available date field.
	if thisinfo.Publishtime <= 0 {
		for _, d := range DateTimeSelect {
			if tmp[d] != nil {
				thisinfo.Publishtime = util.Int64All(tmp[d])
				tmp["publishtime"] = tmp[d]
				break
			}
		}
	}
	if len(thisinfo.Topscopeclass) == 0 {
		thisinfo.Topscopeclass = []string{}
	}
	if len(thisinfo.Subscopeclass) == 0 {
		thisinfo.Subscopeclass = []string{}
	}
	if thisinfo.SubType == "" {
		thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
	}
	// A title-derived project code must be longer than 6 bytes, differ from ProjectCode and
	// not be rejected by numCheckPc / _zimureg1. The title patterns are tried in order.
	acceptPTC := func(c string) bool {
		return len(c) > 6 && thisinfo.ProjectCode != c && !numCheckPc.MatchString(c) && !_zimureg1.MatchString(c)
	}
	if r := titleGetPc.FindStringSubmatch(thisinfo.Title); len(r) > 1 && acceptPTC(r[1]) {
		thisinfo.PTC = r[1]
	} else if r1 := titleGetPc1.FindStringSubmatch(thisinfo.Title); len(r1) > 3 && acceptPTC(r1[3]) {
		thisinfo.PTC = r1[3]
	} else if r2 := titleGetPc2.FindStringSubmatch(thisinfo.Title); len(r2) > 1 && acceptPTC(r2[1]) {
		thisinfo.PTC = r2[1]
	}
	if thisinfo.ProjectName != "" {
		thisinfo.pnbval++
	}
	if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
		// Without a project name, a code shorter than 5 runes is dropped when it is only
		// digits or only letters.
		if thisinfo.ProjectCode != "" {
			if thisinfo.pnbval == 0 && utf8.RuneCountInString(thisinfo.ProjectCode) < 5 {
				thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
			}
		} else if thisinfo.pnbval == 0 && utf8.RuneCountInString(thisinfo.PTC) < 5 {
			thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
		}
		thisinfo.pnbval++
	}
	// A title code already contained in ProjectCode adds nothing.
	if strings.Contains(thisinfo.ProjectCode, thisinfo.PTC) {
		thisinfo.PTC = ""
	}
	if utf8.RuneCountInString(thisinfo.Buyer) > 2 {
		thisinfo.pnbval++
	} else {
		thisinfo.Buyer = ""
	}
	// Clean the review-expert list.
	if thisinfo.ReviewExperts != "" {
		thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
	}
	// Collect distinct cleaned winners: the top-level winner first, then package winners.
	winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
	tmp["winner"] = winner
	seen := map[string]bool{}
	winners := []string{}
	if winner != "" {
		seen[winner] = true
		winners = append(winners, winner)
	}
	packageM, _ := tmp["package"].(map[string]interface{})
	if packageM != nil {
		thisinfo.HasPackage = true
		for _, pv := range packageM {
			pm, _ := pv.(map[string]interface{})
			pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
			if pw != "" && !seen[pw] {
				seen[pw] = true
				winners = append(winners, pw)
			}
		}
	}
	thisinfo.Winners = winners
	// Keep only winnerorder entries whose cleaned entname is non-empty.
	var wins []map[string]interface{}
	for _, v := range thisinfo.WinnerOrder {
		if w := QyFilter(util.ObjToString(v["entname"]), "winner"); w != "" {
			v["entname"] = w
			wins = append(wins, v)
		}
	}
	thisinfo.WinnerOrder = wins
	// Clean the buyer.
	buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
	tmp["buyer"] = buyer
	thisinfo.Buyer = buyer
	thisinfo.LenPC = utf8.RuneCountInString(thisinfo.ProjectCode)
	thisinfo.LenPTC = utf8.RuneCountInString(thisinfo.PTC)
	thisinfo.LenPN = utf8.RuneCountInString(thisinfo.ProjectName)
	// Truncate over-long package names to 100 runes. The old code sliced the first 100 bytes
	// (which can split a multi-byte character) and stored fmt.Sprint of a []rune, i.e. a list
	// of code points like "[20013 22269 ...]" instead of text. The maps are modified in place.
	for _, pkg := range thisinfo.Package {
		ps, ok := pkg.([]map[string]interface{})
		if !ok {
			continue
		}
		for _, pm := range ps {
			name, _ := pm["name"].(string)
			if utf8.RuneCountInString(name) > 100 {
				pm["name"] = string([]rune(name)[:100])
			}
		}
	}
	return thisinfo
}
  711. func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
  712. tmpPro, _ := MgoP.FindById(ProjectColl, pid, nil)
  713. modifyProMap := make(map[string]interface{}) // 修改项目的字段
  714. if modifyMap, ok := infoMap["modifyinfo"].(map[string]interface{}); ok {
  715. for k := range modifyMap {
  716. if modifyMap[k] != nil && modifyMap[k] != "" && (*tmpPro)[k] != nil {
  717. modifyProMap[k] = infoMap[k]
  718. }
  719. }
  720. }
  721. if len(modifyProMap) == 0 {
  722. log.Info("修改招标公告信息不需要修改项目信息字段", zap.Any("id", infoMap["_id"]))
  723. return
  724. }
  725. p.AllIdsMapLock.Lock()
  726. _, ok := p.AllIdsMap[pid]
  727. p.AllIdsMapLock.Unlock()
  728. ids := (*tmpPro)["ids"].([]interface{})
  729. index, position := -1, 0 // index 0:第一个,1:中间,2:最后一个 position list中位置
  730. for i, v := range ids {
  731. if util.ObjToString(v) == mongodb.BsonIdToSId(infoMap["_id"]) {
  732. position = i
  733. if i == 0 {
  734. index = 0
  735. } else if i == len(ids)-1 {
  736. index = 2
  737. } else {
  738. index = 1
  739. }
  740. }
  741. }
  742. if ok {
  743. // 周期内
  744. //projecthref字段
  745. if infoMap["jsondata"] != nil {
  746. jsonData := infoMap["jsondata"].(map[string]interface{})
  747. if proHref, ok := jsonData["projecthref"].(string); ok {
  748. p.mapHrefLock.Lock()
  749. tempId := p.mapHref[proHref]
  750. p.mapHrefLock.Unlock()
  751. if pid == tempId {
  752. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  753. } else {
  754. log.Info("projecthref data id err---pid=" + pid + "---" + tempId)
  755. }
  756. } else {
  757. f := modifyEle(modifyProMap)
  758. if f {
  759. //合并、修改
  760. log.Info("合并修改更新" + "----------------------------")
  761. p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
  762. } else {
  763. //修改
  764. log.Info("修改更新" + "----------------------------")
  765. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  766. }
  767. }
  768. } else {
  769. f := modifyEle(modifyProMap)
  770. if f {
  771. //合并、修改
  772. log.Info("合并修改更新" + "----------------------------")
  773. p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
  774. } else {
  775. //修改
  776. log.Info("修改更新" + "----------------------------")
  777. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  778. }
  779. }
  780. } else {
  781. // 周期外
  782. log.Info("周期外数据直接修改" + "----------------------------")
  783. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  784. }
  785. }
  // Elements lists the project fields that take part in merge matching.
var Elements = []string{
	"projectname",
	"projectcode",
	"buyer",
	"agency",
	"area",
	"city",
	"publishtime",
	"toptype",
	"subtype",
}

// modifyEle reports whether the modified fields in tmp include any of Elements with a
// non-nil value, i.e. whether the change can affect how the project is merged.
func modifyEle(tmp map[string]interface{}) bool {
	for _, field := range Elements {
		if tmp[field] != nil {
			return true
		}
	}
	return false
}
  812. // 补全位置信息
  813. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  814. area := util.ObjToString(tmp["area"])
  815. city := util.ObjToString(tmp["city"])
  816. if area != "" && city != "" {
  817. return
  818. }
  819. tmpSite := util.ObjToString(tmp["site"])
  820. if tmpSite == "" {
  821. return
  822. }
  823. p.mapSiteLock.Lock()
  824. defer p.mapSiteLock.Unlock()
  825. site := p.mapSite[tmpSite]
  826. if site != nil {
  827. if area != "" {
  828. if area == "全国" {
  829. tmp["area"] = site.Area
  830. tmp["city"] = site.City
  831. tmp["district"] = site.District
  832. return
  833. }
  834. if area != site.Area {
  835. return
  836. } else {
  837. if site.City != "" {
  838. tmp["area"] = site.Area
  839. tmp["city"] = site.City
  840. tmp["district"] = site.District
  841. }
  842. }
  843. } else {
  844. tmp["area"] = site.Area
  845. tmp["city"] = site.City
  846. tmp["district"] = site.District
  847. return
  848. }
  849. }
  850. }
  // deleteSlice removes the first element equal to v from arr and returns the shortened slice.
// The removal reuses arr's backing array, so callers must use the returned slice; arr is
// returned unchanged when v is absent. stype is unused.
func deleteSlice(arr []string, v, stype string) []string {
	idx := -1
	for i := 0; i < len(arr); i++ {
		if arr[i] == v {
			idx = i
			break
		}
	}
	if idx < 0 {
		return arr
	}
	return append(arr[:idx], arr[idx+1:]...)
}
  // reviewExpertNameReg accepts an entry made only of 2–4 Han characters. It is compiled once;
// the previous regexp.MatchString call recompiled the pattern for every entry.
var reviewExpertNameReg = regexp.MustCompile("^[\\p{Han}]{2,4}$")

// ClearRp cleans a comma-separated review-expert list. An entry is kept only when it consists
// of 2–4 Han characters and is not blacklisted in BlaskListMap; kept entries are joined with ",".
func ClearRp(tmp string) string {
	arrTmp := []string{}
	for _, v := range strings.Split(tmp, ",") {
		// Han characters only, 2-4 of them.
		if !reviewExpertNameReg.MatchString(v) {
			continue
		}
		// Blacklist filter.
		if BlaskListMap[v] {
			continue
		}
		arrTmp = append(arrTmp, v)
	}
	return strings.Join(arrTmp, ",")
}
  // qyHanRunReg matches a run of at least 4 Han characters. It is compiled once instead of on
// every QyFilter call (QyFilter runs for every winner and buyer of every document).
var qyHanRunReg = regexp.MustCompile("[\\p{Han}]{4,}")

// QyFilter cleans an organization name of kind stype (ParseInfo passes "winner" and "buyer").
// Spaces are removed, and the result is "" when the name matches any BlackRegexp[stype]
// pattern, contains no run of at least 4 Han characters, or is longer than 60 runes.
// The pre/back regex rewriting (PreRegexp, BackRegexp, BackRepRegexp) is currently disabled.
func QyFilter(name, stype string) string {
	name = strings.ReplaceAll(name, " ", "")
	for _, v := range BlackRegexp[stype] {
		if v.MatchString(name) {
			return ""
		}
	}
	if !qyHanRunReg.MatchString(name) {
		return ""
	}
	if utf8.RuneCountInString(name) > 60 {
		return ""
	}
	return name
}
  906. // @Description 过滤中标记录数据
  907. // @Author J 2023/6/12 15:11
  908. func siteJudge(code string) bool {
  909. for _, s := range SkipSiteList {
  910. if code == s {
  911. return true
  912. }
  913. }
  914. return false
  915. }