task.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "project/config"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "time"
  10. "unicode/utf8"
  11. "github.com/goinggo/mapstructure"
  12. "github.com/robfig/cron"
  13. "go.mongodb.org/mongo-driver/bson"
  14. "go.uber.org/zap"
  15. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  16. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  17. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  18. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  19. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  20. )
  21. /**
  22. 任务入口
  23. 全量、增量合并
  24. 更新、插入,内存清理
  25. 转换成info对象
  26. **/
// Commented-out regex tables for QyFilter's disabled prefix-strip,
// suffix-strip and suffix-replace stages.
// var PreRegexp = map[string][]*regexp.Regexp{}
// var BackRegexp = map[string][]*regexp.Regexp{}
// var BackRepRegexp = map[string][]RegexpInfo{}

// BlackRegexp maps an entity type (QyFilter's stype, e.g. "winner", "buyer")
// to patterns that make QyFilter reject a name outright. It is not populated
// in this file — presumably filled at startup; confirm.
var BlackRegexp = map[string][]*regexp.Regexp{}
var (
	// Patterns that extract a project code from an announcement title;
	// ParseInfo tries them in this order (capture group 1, 3, 1).
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project-code clean-up pattern (brackets, "re-tender" markers, trailing
	// 项目/采购 words). Its only visible uses in this file are commented out.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// StrOrNum matches a code of 1-4 characters that is only digits or only
	// letters (plus '_' and '-').
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// StrOrNum2 matches a code of any length that is only digits or only
	// letters (plus '_' and '-').
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// KeyPackage detects lot/section words (包/段 next to a number). Per the
	// original note: such notices, when the tender did not identify lots, are
	// merged into one project. Not used in this file.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
// RegexpInfo pairs a pattern with its replacement string. Its only visible
// user is the commented-out BackRepRegexp stage of QyFilter.
type RegexpInfo struct {
	regs   *regexp.Regexp // pattern to match
	repstr string         // replacement passed to ReplaceAllString
}
// ProjectTask is the project-merge object: it reads bidding documents, merges
// them into projects and keeps recently active projects in in-memory indexes.
type ProjectTask struct {
	InitMinTime int64 // minimum time; original note: "values below 0 are processed once"
	name        string
	thread      int // number of merge worker goroutines used by enter
	// findLock is the lookup lock; clearMem holds it while purging memory.
	findLock sync.Mutex
	// wg is waited on by clearMem before purging.
	// NOTE(review): its Add/Done callers are not in this file — confirm.
	wg sync.WaitGroup
	// AllIdsMapLock guards AllIdsMap.
	AllIdsMapLock sync.Mutex
	// AllIdsMap holds the in-memory projects keyed by project id.
	AllIdsMap map[string]*ID
	// Indexes from buyer, project name and project code to the ids of the
	// projects carrying them (clearMem removes ids from Key.Arr).
	mapPb, mapPn, mapPc map[string]*Key
	// mapHref maps a projecthref ("flow" data whose field matches is merged
	// directly) to its project id; guarded by mapHrefLock.
	mapHref     map[string]string
	mapHrefLock sync.Mutex
	// mapSite maps a site name to its location; guarded by mapSiteLock.
	mapSite     map[string]*Site
	mapSiteLock sync.Mutex
	// mapSpider maps a spider code to its isflow flag (per original note).
	mapSpider     map[string]int
	mapSpiderLock sync.Mutex
	// mapBidLock is the lock for bidtype / bidstatus handling.
	mapBidLock sync.Mutex
	// updatePool carries upsert batches consumed by updateAllQueue.
	updatePool chan []map[string]interface{}
	//savePool chan map[string]interface{}
	//saveSign, updateSign chan bool
	// coll is the project collection name.
	coll string
	// currentType is the run mode: "ql" is a full run; enter also checks
	// "project" and "project_history".
	currentType string // current run mode (full or incremental)
	// clearContimes counts clearMem ticks skipped since the last purge.
	clearContimes int
	// currentTime is the publishtime of the most recently parsed document;
	// clearMem measures expiry against it.
	currentTime int64
	// saveSize is the number of batches per bulk upsert.
	saveSize int
	// pici is the batch number (logged by enter, passed to nextNode).
	pici       int64
	validTime  int64 // seconds a project stays in memory past its LastTime
	statusTime int64 // StatusDays in seconds; its use is not in this file
	// jgTime is the result-time (jgtime) window in seconds. Original note:
	// announcements from the last two days no longer update jgtime.
	// NOTE(review): NewPT sets 7 days, not 2 — confirm.
	jgTime int64
	Brun   bool // true while enter is running
}
  95. func NewPT() *ProjectTask {
  96. p := &ProjectTask{
  97. InitMinTime: int64(1325347200),
  98. name: "全/增量对象",
  99. thread: Thread,
  100. updatePool: make(chan []map[string]interface{}, 5000),
  101. //savePool: make(chan map[string]interface{}, 2000),
  102. wg: sync.WaitGroup{},
  103. AllIdsMap: make(map[string]*ID, 5000000),
  104. mapPb: make(map[string]*Key, 1500000),
  105. mapPn: make(map[string]*Key, 5000000),
  106. mapPc: make(map[string]*Key, 5000000),
  107. mapHref: make(map[string]string, 1500000),
  108. mapSite: make(map[string]*Site, 1000000),
  109. mapSpider: make(map[string]int, 1000000),
  110. saveSize: 200,
  111. //saveSign: make(chan bool, 1),
  112. //updateSign: make(chan bool, 1),
  113. coll: ProjectColl,
  114. validTime: int64(util.IntAllDef(config.Conf.Serve.ValidDays, 150)) * 86400,
  115. statusTime: int64(util.IntAllDef(config.Conf.Serve.StatusDays, 15) * 86400),
  116. jgTime: int64(util.IntAllDef(7, 7) * 86400),
  117. currentType: "ql",
  118. }
  119. return p
  120. }
// P_QL is the package-level ProjectTask instance; it is not assigned in this
// file (presumably set at startup from NewPT — confirm).
var P_QL *ProjectTask

// sp is a one-slot semaphore: updateAllQueue holds it while a bulk upsert is
// in flight, and taskZl acquires and releases it to wait for that write.
var sp = make(chan bool, 1)
  123. func (p *ProjectTask) updateAllQueue() {
  124. arru := make([][]map[string]interface{}, p.saveSize)
  125. indexu := 0
  126. for {
  127. select {
  128. case v := <-p.updatePool:
  129. arru[indexu] = v
  130. indexu++
  131. if indexu == p.saveSize {
  132. sp <- true
  133. go func(arru [][]map[string]interface{}) {
  134. defer func() {
  135. <-sp
  136. }()
  137. MgoP.UpSertBulk(p.coll, arru...)
  138. }(arru)
  139. arru = make([][]map[string]interface{}, p.saveSize)
  140. indexu = 0
  141. }
  142. case <-time.After(1 * time.Second):
  143. if indexu > 0 {
  144. sp <- true
  145. go func(arru [][]map[string]interface{}) {
  146. defer func() {
  147. <-sp
  148. }()
  149. MgoP.UpSertBulk(p.coll, arru...)
  150. }(arru[:indexu])
  151. arru = make([][]map[string]interface{}, p.saveSize)
  152. indexu = 0
  153. }
  154. }
  155. }
  156. }
  157. // 项目合并内存更新
  158. func (p *ProjectTask) clearMem() {
  159. c := cron.New()
  160. // 创建项目的时间大于7天
  161. //在内存中保留最近6个月的信息
  162. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  163. _ = c.AddFunc("0 */30 * * * ?", func() {
  164. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 20 {
  165. SingleClear = 1
  166. //跳过的次数清零
  167. p.clearContimes = 0
  168. //信息进入查找对比全局锁
  169. p.findLock.Lock()
  170. //defer p.findLock.Unlock()
  171. //合并进行的任务都完成
  172. p.wg.Wait()
  173. log.Info("3")
  174. //遍历id
  175. //所有内存中的项目信息
  176. p.AllIdsMapLock.Lock()
  177. p.mapHrefLock.Lock()
  178. log.Info("清除开始")
  179. //清除计数
  180. clearNum := 0
  181. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  182. v := p.AllIdsMap[pid]
  183. if v != nil && p.currentTime-v.P.LastTime > p.validTime {
  184. delete(p.mapHref, kHref)
  185. }
  186. }
  187. for k, v := range p.AllIdsMap {
  188. if p.currentTime-v.P.LastTime > p.validTime {
  189. clearNum++
  190. redis.Del(RedisProject, k)
  191. //删除id的map
  192. delete(p.AllIdsMap, k)
  193. //删除pb
  194. if v.P.Buyer != "" {
  195. ids := p.mapPb[v.P.Buyer]
  196. if ids != nil {
  197. ids.Lock.Lock()
  198. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  199. if len(ids.Arr) == 0 {
  200. delete(p.mapPb, v.P.Buyer)
  201. }
  202. ids.Lock.Unlock()
  203. }
  204. }
  205. //删除mapPn
  206. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  207. if vn != "" {
  208. ids := p.mapPn[vn]
  209. if ids != nil {
  210. ids.Lock.Lock()
  211. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  212. if len(ids.Arr) == 0 {
  213. delete(p.mapPn, vn)
  214. }
  215. ids.Lock.Unlock()
  216. }
  217. }
  218. }
  219. //删除mapPc
  220. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  221. if vn != "" {
  222. ids := p.mapPc[vn]
  223. if ids != nil {
  224. ids.Lock.Lock()
  225. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  226. if len(ids.Arr) == 0 {
  227. delete(p.mapPc, vn)
  228. }
  229. ids.Lock.Unlock()
  230. }
  231. }
  232. }
  233. v = nil
  234. }
  235. }
  236. p.mapHrefLock.Unlock()
  237. p.AllIdsMapLock.Unlock()
  238. p.findLock.Unlock()
  239. SingleClear = 0
  240. log.Info("清除完成:", zap.Int("clearNum", clearNum), zap.Int("AllIdsMap:", len(p.AllIdsMap)), zap.Int("mapPn:", len(p.mapPn)),
  241. zap.Int("mapPc:", len(p.mapPc)), zap.Int("mapPb:", len(p.mapPb)), zap.Int("mapHref:", len(p.mapHref)))
  242. } else {
  243. p.clearContimes++
  244. }
  245. })
  246. c.Start()
  247. }
  248. // 全量合并
  249. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  250. defer util.Catch()
  251. p.thread = util.IntAllDef(Thread, 4)
  252. q, _ := udpInfo["query"].(map[string]interface{})
  253. if q == nil {
  254. q = map[string]interface{}{}
  255. lteid, _ := udpInfo["lteid"].(string)
  256. var idmap map[string]interface{}
  257. if len(lteid) > 15 {
  258. idmap = map[string]interface{}{
  259. "$lte": mongodb.StringTOBsonId(lteid),
  260. }
  261. }
  262. gtid, _ := udpInfo["gtid"].(string)
  263. if len(gtid) > 15 {
  264. if idmap == nil {
  265. idmap = map[string]interface{}{}
  266. }
  267. idmap["$gte"] = mongodb.StringTOBsonId(gtid)
  268. }
  269. if idmap != nil {
  270. q["_id"] = idmap
  271. }
  272. }
  273. if c := util.ObjToString(udpInfo["coll"]); c != "" {
  274. BiddingColl = c
  275. } else {
  276. BiddingColl = config.Conf.DB.MongoB.Coll
  277. }
  278. //生成查询语句执行
  279. p.enter(MgoB.DbName, BiddingColl, q)
  280. }
  281. // 增量合并
  282. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  283. defer util.Catch()
  284. //1、检查pubilshtime索引
  285. db, _ := udpInfo["db"].(string)
  286. if db == "" {
  287. db = MgoB.DbName
  288. }
  289. if c := util.ObjToString(udpInfo["coll"]); c != "" {
  290. BiddingColl = c
  291. } else {
  292. BiddingColl = config.Conf.DB.MongoB.Coll
  293. }
  294. thread := util.IntAllDef(Thread, 1)
  295. if thread > 0 {
  296. p.thread = thread
  297. }
  298. //开始id和结束id
  299. q, _ := udpInfo["query"].(map[string]interface{})
  300. gtid := udpInfo["gtid"].(string)
  301. lteid := udpInfo["lteid"].(string)
  302. q = map[string]interface{}{
  303. "_id": map[string]interface{}{
  304. "$gt": mongodb.StringTOBsonId(gtid),
  305. "$lte": mongodb.StringTOBsonId(lteid),
  306. },
  307. }
  308. p.enter(db, BiddingColl, q)
  309. if udpInfo["stop"] == nil {
  310. for i := 0; i < 1; i++ {
  311. sp <- true
  312. }
  313. for i := 0; i < 1; i++ {
  314. <-sp
  315. }
  316. }
  317. log.Info("保存完成,生索引", zap.Int64("pici:", p.pici))
  318. time.Sleep(5 * time.Second)
  319. nextNode(udpInfo, p.pici)
  320. }
  321. // 招标字段更新
  322. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  323. defer util.Catch()
  324. infoid := udpInfo["infoid"].(string)
  325. infoMap, _ := MgoB.FindById(util.ObjToString(udpInfo["coll"]), infoid, nil)
  326. if (*infoMap)["modifyinfo"] == nil {
  327. log.Info("does not exist modifyinfo ---," + infoid)
  328. return
  329. }
  330. //client := Es.GetEsConn()
  331. //defer Es.DestoryEsConn(client)
  332. //esquery := `{"query": {"bool": {"must": [{"match": {"ids": "` + infoid + `"}}]}}}`
  333. //data := Es.Get(config.Conf.DB.Es.Index, esquery)
  334. data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
  335. if len(*data) > 0 {
  336. pid := mongodb.BsonIdToSId((*data)["_id"])
  337. p.updateJudge(*infoMap, pid)
  338. } else {
  339. log.Info("not find project---," + infoid)
  340. }
  341. }
  342. func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
  343. defer util.Catch()
  344. pid := util.ObjToString(udpInfo["pid"])
  345. updateMap := util.ObjToMap(udpInfo["updateField"])
  346. if pid == "" || len(*updateMap) == 0 {
  347. log.Error("参数有误")
  348. return
  349. }
  350. proMap, _ := MgoP.FindById(ProjectColl, pid, nil)
  351. if len(*proMap) > 1 {
  352. (*proMap)["reason"] = "直接修改项目字段信息"
  353. backupPro(*proMap)
  354. delete(*proMap, "reason")
  355. updataMap := make(map[string]interface{})
  356. modifyInfo := make(map[string]interface{})
  357. for k, v := range *updateMap {
  358. if strings.Contains(k, "time") {
  359. updataMap[k] = util.Int64All(v)
  360. } else {
  361. updataMap[k] = v
  362. }
  363. modifyInfo[k] = true
  364. }
  365. updataMap["modifyinfo"] = modifyInfo
  366. bol := MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap})
  367. if bol {
  368. //es索引
  369. by, _ := json.Marshal(map[string]interface{}{
  370. "query": map[string]interface{}{
  371. "_id": bson.M{
  372. "$gte": pid,
  373. "$lte": pid,
  374. }},
  375. "stype": "project",
  376. })
  377. _ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
  378. }
  379. // 内存
  380. var pro ProjectCache
  381. err := mapstructure.Decode(proMap, &pro)
  382. if err != nil {
  383. log.Error(err.Error())
  384. }
  385. p.AllIdsMapLock.Lock()
  386. if v, ok := p.AllIdsMap[pid]; ok {
  387. v.P = &pro
  388. }
  389. p.AllIdsMapLock.Unlock()
  390. } else {
  391. log.Info("Not find project---" + pid)
  392. }
  393. }
  394. func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
  395. defer util.Catch()
  396. infoid := util.ObjToString(udpInfo["infoid"])
  397. if infoid == "" {
  398. return
  399. }
  400. //client := Es.GetEsConn()
  401. //defer Es.DestoryEsConn(client)
  402. //esquery := `{"query": {"bool": {"must": [{"term": {"ids": "` + infoid + `"}}]}}}`
  403. //data := Es.Get(config.Conf.DB.Es.Index, esquery)
  404. data, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, bson.M{"ids": infoid})
  405. if len(*data) > 0 {
  406. pid := mongodb.BsonIdToSId((*data)["_id"])
  407. p.delJudge(infoid, pid)
  408. } else {
  409. log.Info("not find project---," + infoid)
  410. }
  411. }
  412. // 通知下个节点nextNode
  413. func nextNode(mapInfo map[string]interface{}, pici int64) {
  414. mapInfo["stype"] = "project"
  415. mapInfo["query"] = map[string]interface{}{
  416. "pici": pici,
  417. }
  418. key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
  419. mapInfo["key"] = key
  420. datas, _ := json.Marshal(mapInfo)
  421. node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
  422. udptaskmap.Store(key, node)
  423. _ = udpclient.WriteUdp(datas, udp.OP_TYPE_DATA, toaddr[0])
  424. }
  425. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  426. defer util.Catch()
  427. defer func() {
  428. p.Brun = false
  429. }()
  430. p.Brun = true
  431. count := 0
  432. countRepeat := 0
  433. pool := make(chan bool, p.thread)
  434. log.Info("start project", zap.Any("q:", q), zap.String("coll:", coll), zap.Int64("pici", p.pici))
  435. sess := MgoB.GetMgoConn()
  436. defer MgoB.DestoryMongoConn(sess)
  437. infoPool := make(chan map[string]interface{}, 2000)
  438. over := make(chan bool)
  439. go func() {
  440. L:
  441. for {
  442. select {
  443. case tmp := <-infoPool:
  444. pool <- true
  445. go func(tmp map[string]interface{}) {
  446. defer func() {
  447. <-pool
  448. }()
  449. p.fillInPlace(tmp)
  450. info := ParseInfo(tmp)
  451. p.currentTime = info.Publishtime
  452. //普通合并
  453. p.CommonMerge(tmp, info)
  454. }(tmp)
  455. default:
  456. select {
  457. case tmp := <-infoPool:
  458. pool <- true
  459. go func(tmp map[string]interface{}) {
  460. defer func() {
  461. <-pool
  462. }()
  463. p.fillInPlace(tmp)
  464. info := ParseInfo(tmp)
  465. p.currentTime = info.Publishtime
  466. //普通合并
  467. p.CommonMerge(tmp, info)
  468. }(tmp)
  469. case <-over:
  470. break L
  471. }
  472. }
  473. }
  474. }()
  475. //fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0}
  476. fields := map[string]interface{}{"detail": 0, "contenthtml": 0, "jsondata": 0, "regions_log": 0, "field_source": 0}
  477. if p.currentType == "project" || p.currentType == "project_history" {
  478. c, _ := sess.DB(db).C(coll).Find(q).Count()
  479. log.Info(fmt.Sprintf("共查询: %d条", c))
  480. }
  481. ms := sess.DB(db).C(coll).Find(q).Select(fields)
  482. query := ms.Iter()
  483. var lastid interface{}
  484. L:
  485. for {
  486. select {
  487. case <-queryClose:
  488. log.Info("receive interrupt sign")
  489. log.Info("close iter..", zap.Any("lastid:", lastid), zap.Any("err:", query.Cursor.Close(nil)))
  490. queryCloseOver <- true
  491. break L
  492. default:
  493. tmp := make(map[string]interface{})
  494. if query.Next(&tmp) {
  495. lastid = tmp["_id"]
  496. if P_QL.currentType == "ql" {
  497. if count%50000 == 0 {
  498. log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
  499. }
  500. } else {
  501. if count%2000 == 0 {
  502. log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
  503. }
  504. }
  505. // 判重过滤 中标记录
  506. if !siteJudge(util.ObjToString(tmp["spidercode"])) {
  507. //extracttype -1: 重复,1: 不重复
  508. if util.IntAll(tmp["extracttype"]) == 1 {
  509. if util.ObjToString(tmp["toptype"]) != "采购意向" && util.ObjToString(tmp["toptype"]) != "产权" && util.ObjToString(tmp["toptype"]) != "拟建" {
  510. if P_QL.currentType == "ql" {
  511. infoPool <- tmp
  512. } else if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
  513. // id段增量数据
  514. infoPool <- tmp
  515. } else if P_QL.currentType == "project_history" && tmp["history_updatetime"] != nil {
  516. // id段 历史数据
  517. infoPool <- tmp
  518. }
  519. }
  520. } else {
  521. countRepeat++
  522. //if P_QL.currentType == "project" {
  523. // log.Info("repeat err---", tmp["_id"])
  524. //}
  525. }
  526. count++
  527. }
  528. } else {
  529. break L
  530. }
  531. }
  532. }
  533. time.Sleep(5 * time.Second)
  534. over <- true
  535. //阻塞
  536. for n := 0; n < p.thread; n++ {
  537. pool <- true
  538. }
  539. log.Info("所有线程执行完成...", zap.Int("count:", count), zap.Int("countRepeat", countRepeat))
  540. }
  541. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  542. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  543. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  544. proHref := util.ObjToString(jsonData["projecthref"]) // 网站本身发布的公告具有招投标流程,直接参与合并
  545. if jsonData != nil && proHref != "" {
  546. //projectHref字段合并
  547. tmp["projecthref"] = proHref
  548. p.mapHrefLock.Lock()
  549. pid := p.mapHref[proHref]
  550. p.mapHrefLock.Unlock()
  551. if pid != "" {
  552. p.AllIdsMapLock.Lock()
  553. res := p.AllIdsMap[pid]
  554. p.AllIdsMapLock.Unlock()
  555. if res != nil {
  556. comparePro := res.P
  557. _, ex := p.CompareStatus(comparePro, info)
  558. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  559. } else {
  560. p.startProjectMerge(info, tmp)
  561. }
  562. } else {
  563. id, p1 := p.NewProject(tmp, info)
  564. p.mapHrefLock.Lock()
  565. p.mapHref[proHref] = id
  566. p.mapHrefLock.Unlock()
  567. p.AllIdsMapLock.Lock()
  568. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  569. p.AllIdsMapLock.Unlock()
  570. }
  571. } else {
  572. //项目合并
  573. p.startProjectMerge(info, tmp)
  574. }
  575. } else {
  576. //项目合并
  577. p.startProjectMerge(info, tmp)
  578. }
  579. } else {
  580. }
  581. }
  582. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  583. bys, _ := json.Marshal(tmp)
  584. var thisinfo *Info
  585. _ = json.Unmarshal(bys, &thisinfo)
  586. if thisinfo == nil {
  587. return nil
  588. }
  589. // 处理publishtime为空
  590. if thisinfo.Publishtime <= 0 {
  591. for _, d := range DateTimeSelect {
  592. if tmp[d] != nil {
  593. thisinfo.Publishtime = util.Int64All(tmp[d])
  594. tmp["publishtime"] = tmp[d]
  595. break
  596. }
  597. }
  598. }
  599. if len(thisinfo.Topscopeclass) == 0 {
  600. thisinfo.Topscopeclass = []string{}
  601. }
  602. if len(thisinfo.Subscopeclass) == 0 {
  603. thisinfo.Subscopeclass = []string{}
  604. }
  605. if thisinfo.SubType == "" {
  606. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  607. }
  608. //从标题中查找项目编号
  609. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  610. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  611. thisinfo.PTC = res[1]
  612. } else {
  613. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  614. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  615. thisinfo.PTC = res[3]
  616. } else {
  617. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  618. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  619. thisinfo.PTC = res[1]
  620. }
  621. }
  622. }
  623. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  624. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  625. //if thisinfo.ProjectName != "" {
  626. thisinfo.pnbval++
  627. //}
  628. }
  629. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  630. if thisinfo.ProjectCode != "" {
  631. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  632. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  633. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  634. }
  635. } else {
  636. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  637. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  638. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  639. }
  640. }
  641. thisinfo.pnbval++
  642. }
  643. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  644. thisinfo.PTC = ""
  645. }
  646. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  647. thisinfo.pnbval++
  648. } else {
  649. thisinfo.Buyer = ""
  650. }
  651. //清理评审专家名单
  652. if thisinfo.ReviewExperts != "" {
  653. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  654. }
  655. //winners整理、清理
  656. winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
  657. tmp["winner"] = winner
  658. m1 := map[string]bool{}
  659. winners := []string{}
  660. if winner != "" {
  661. m1[winner] = true
  662. winners = append(winners, winner)
  663. }
  664. packageM, _ := tmp["package"].(map[string]interface{})
  665. if packageM != nil {
  666. thisinfo.HasPackage = true
  667. for _, p := range packageM {
  668. pm, _ := p.(map[string]interface{})
  669. pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
  670. if pw != "" && !m1[pw] {
  671. m1[pw] = true
  672. winners = append(winners, pw)
  673. }
  674. }
  675. }
  676. thisinfo.Winners = winners
  677. //清理winnerorder
  678. var wins []map[string]interface{}
  679. for _, v := range thisinfo.WinnerOrder {
  680. w := QyFilter(util.ObjToString(v["entname"]), "winner")
  681. if w != "" {
  682. v["entname"] = w
  683. wins = append(wins, v)
  684. }
  685. }
  686. thisinfo.WinnerOrder = wins
  687. //清理buyer
  688. buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
  689. tmp["buyer"] = buyer
  690. thisinfo.Buyer = buyer
  691. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  692. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  693. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  694. //处理分包中数据异常问题
  695. for k, tmp := range thisinfo.Package {
  696. if ps, ok := tmp.([]map[string]interface{}); ok {
  697. for i, p := range ps {
  698. name, _ := p["name"].(string)
  699. if len([]rune(name)) > 100 {
  700. p["name"] = fmt.Sprint([]rune(name[:100]))
  701. }
  702. ps[i] = p
  703. }
  704. thisinfo.Package[k] = ps
  705. }
  706. }
  707. return thisinfo
  708. }
  709. func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
  710. tmpPro, _ := MgoP.FindById(ProjectColl, pid, nil)
  711. modifyProMap := make(map[string]interface{}) // 修改项目的字段
  712. if modifyMap, ok := infoMap["modifyinfo"].(map[string]interface{}); ok {
  713. for k := range modifyMap {
  714. if modifyMap[k] != nil && modifyMap[k] != "" && (*tmpPro)[k] != nil {
  715. modifyProMap[k] = infoMap[k]
  716. }
  717. }
  718. }
  719. if len(modifyProMap) == 0 {
  720. log.Info("修改招标公告信息不需要修改项目信息字段", zap.Any("id", infoMap["_id"]))
  721. return
  722. }
  723. p.AllIdsMapLock.Lock()
  724. _, ok := p.AllIdsMap[pid]
  725. p.AllIdsMapLock.Unlock()
  726. ids := (*tmpPro)["ids"].([]interface{})
  727. index, position := -1, 0 // index 0:第一个,1:中间,2:最后一个 position list中位置
  728. for i, v := range ids {
  729. if util.ObjToString(v) == mongodb.BsonIdToSId(infoMap["_id"]) {
  730. position = i
  731. if i == 0 {
  732. index = 0
  733. } else if i == len(ids)-1 {
  734. index = 2
  735. } else {
  736. index = 1
  737. }
  738. }
  739. }
  740. if ok {
  741. // 周期内
  742. //projecthref字段
  743. if infoMap["jsondata"] != nil {
  744. jsonData := infoMap["jsondata"].(map[string]interface{})
  745. if proHref, ok := jsonData["projecthref"].(string); ok {
  746. p.mapHrefLock.Lock()
  747. tempId := p.mapHref[proHref]
  748. p.mapHrefLock.Unlock()
  749. if pid == tempId {
  750. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  751. } else {
  752. log.Info("projecthref data id err---pid=" + pid + "---" + tempId)
  753. }
  754. } else {
  755. f := modifyEle(modifyProMap)
  756. if f {
  757. //合并、修改
  758. log.Info("合并修改更新" + "----------------------------")
  759. p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
  760. } else {
  761. //修改
  762. log.Info("修改更新" + "----------------------------")
  763. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  764. }
  765. }
  766. } else {
  767. f := modifyEle(modifyProMap)
  768. if f {
  769. //合并、修改
  770. log.Info("合并修改更新" + "----------------------------")
  771. p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
  772. } else {
  773. //修改
  774. log.Info("修改更新" + "----------------------------")
  775. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  776. }
  777. }
  778. } else {
  779. // 周期外
  780. log.Info("周期外数据直接修改" + "----------------------------")
  781. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  782. }
  783. }
  784. var Elements = []string{
  785. "projectname",
  786. "projectcode",
  787. "buyer",
  788. "agency",
  789. "area",
  790. "city",
  791. "publishtime",
  792. "toptype",
  793. "subtype",
  794. }
  795. /*
  796. *
  797. 修改的字段
  798. 修改的字段是否是影响合并流程的要素字段
  799. */
  800. func modifyEle(tmp map[string]interface{}) bool {
  801. merge := false
  802. for _, str := range Elements {
  803. if tmp[str] != nil {
  804. merge = true
  805. break
  806. }
  807. }
  808. return merge
  809. }
  810. // 补全位置信息
  811. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  812. area := util.ObjToString(tmp["area"])
  813. city := util.ObjToString(tmp["city"])
  814. if area != "" && city != "" {
  815. return
  816. }
  817. tmpSite := util.ObjToString(tmp["site"])
  818. if tmpSite == "" {
  819. return
  820. }
  821. p.mapSiteLock.Lock()
  822. defer p.mapSiteLock.Unlock()
  823. site := p.mapSite[tmpSite]
  824. if site != nil {
  825. if area != "" {
  826. if area == "全国" {
  827. tmp["area"] = site.Area
  828. tmp["city"] = site.City
  829. tmp["district"] = site.District
  830. return
  831. }
  832. if area != site.Area {
  833. return
  834. } else {
  835. if site.City != "" {
  836. tmp["area"] = site.Area
  837. tmp["city"] = site.City
  838. tmp["district"] = site.District
  839. }
  840. }
  841. } else {
  842. tmp["area"] = site.Area
  843. tmp["city"] = site.City
  844. tmp["district"] = site.District
  845. return
  846. }
  847. }
  848. }
  849. // 从数组中删除元素
  850. func deleteSlice(arr []string, v, stype string) []string {
  851. for k, v1 := range arr {
  852. if v1 == v {
  853. arr = append(arr[:k], arr[k+1:]...)
  854. return arr
  855. }
  856. }
  857. return arr
  858. }
  859. // 校验评审专家
  860. func ClearRp(tmp string) string {
  861. arrTmp := []string{}
  862. for _, v := range strings.Split(tmp, ",") {
  863. // 汉字过滤(全汉字,2-4个字)
  864. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  865. continue
  866. }
  867. //黑名单过滤
  868. if BlaskListMap[v] {
  869. continue
  870. }
  871. arrTmp = append(arrTmp, v)
  872. }
  873. return strings.Join(arrTmp, ",")
  874. }
  875. func QyFilter(name, stype string) string {
  876. name = strings.ReplaceAll(name, " ", "")
  877. //preReg := PreRegexp[stype]
  878. //for _, v := range preReg {
  879. // name = v.ReplaceAllString(name, "")
  880. //}
  881. //backReg := BackRegexp[stype]
  882. //for _, v := range backReg {
  883. // name = v.ReplaceAllString(name, "")
  884. //}
  885. //backRepReg := BackRepRegexp[stype]
  886. //for _, v := range backRepReg {
  887. // name = v.regs.ReplaceAllString(name, v.repstr)
  888. //}
  889. blackReg := BlackRegexp[stype]
  890. for _, v := range blackReg {
  891. if v.MatchString(name) {
  892. name = ""
  893. break
  894. }
  895. }
  896. if !regexp.MustCompile("[\\p{Han}]{4,}").MatchString(name) {
  897. name = ""
  898. }
  899. if utf8.RuneCountInString(name) > 60 {
  900. name = ""
  901. }
  902. return name
  903. }
  904. // @Description 过滤中标记录数据
  905. // @Author J 2023/6/12 15:11
  906. func siteJudge(code string) bool {
  907. for _, s := range SkipSiteList {
  908. if code == s {
  909. return true
  910. }
  911. }
  912. return false
  913. }