task.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "gopkg.in/mgo.v2/bson"
  6. "log"
  7. mu "mfw/util"
  8. "mongodb"
  9. "qfw/util"
  10. "regexp"
  11. "strings"
  12. "sync"
  13. "time"
  14. "unicode/utf8"
  15. "github.com/goinggo/mapstructure"
  16. "github.com/robfig/cron"
  17. "go.mongodb.org/mongo-driver/bson/primitive"
  18. )
  // Task entry points for project merging: full ("ql") and incremental runs,
// update/insert of merged projects, in-memory eviction, and conversion of
// raw documents into *Info values.

// Per-field (e.g. "winner", "buyer") cleanup regexes used by QyFilter.
var (
	PreRegexp     = map[string][]*regexp.Regexp{}
	BackRegexp    = map[string][]*regexp.Regexp{}
	BackRepRegexp = map[string][]RegexpInfo{}
	BlackRegexp   = map[string][]*regexp.Regexp{}
)

var (
	// Project code taken from the title: a code-like run at the start of the title.
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	// Project code taken from the title: a bracketed code, optionally labelled (编号/编码/…); the code is group 3.
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	// Project code taken from the title: a code-like run at the end, optionally followed by "…公告".
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project-code cleanup: trailing re-tender markers, bracket/punctuation characters,
	// underscores/spaces and trailing "(采购)项目" suffixes.
	// The hyphen in the punctuation class is escaped: unescaped, "﹞-;" is parsed as a
	// character range (reversed, hence a compile panic, or a huge unintended span).
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞\\-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// A project code that is only digits or only letters, at most 4 characters long.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Purely digits or purely letters, any length.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// Contains a package/lot word; notices whose packages were not recognised are merged into one project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)

// RegexpInfo pairs a regex with its replacement string (see BackRepRegexp / QyFilter).
type RegexpInfo struct {
	regs   *regexp.Regexp
	repstr string
}
  47. //项目合并对象
  48. type ProjectTask struct {
  49. InitMinTime int64 //最小时间,小于0的处理一次
  50. name string
  51. thread int //线程数
  52. //查找锁
  53. findLock sync.Mutex
  54. wg sync.WaitGroup
  55. //map锁
  56. AllIdsMapLock sync.Mutex
  57. //对应的id
  58. AllIdsMap map[string]*ID
  59. //采购单位、项目名称、项目编号
  60. mapPb, mapPn, mapPc map[string]*Key
  61. //流程数据 字段相同,直接合并
  62. mapHref map[string]string
  63. mapHrefLock sync.Mutex
  64. //站点
  65. mapSite map[string]*Site
  66. mapSiteLock sync.Mutex
  67. //spider isflow
  68. mapSpider map[string]int
  69. mapSpiderLock sync.Mutex
  70. //bidtype、bidstatus 锁
  71. mapBidLock sync.Mutex
  72. //更新或新增通道
  73. updatePool chan []map[string]interface{}
  74. //savePool chan map[string]interface{}
  75. //saveSign, updateSign chan bool
  76. //表名
  77. coll string
  78. //当前状态是全量还是增量
  79. currentType string //当前是跑全量还是跑增量
  80. //
  81. clearContimes int
  82. //当前时间
  83. currentTime int64
  84. //保存长度
  85. saveSize int
  86. pici int64
  87. validTime int64
  88. statusTime int64
  89. //结果时间的更新 最近两天的公告不再更新jgtime
  90. jgTime int64
  91. // LockPool chan *sync.Mutex
  92. // LockPoolLock sync.Mutex
  93. // m1, m23, m4 map[int]int
  94. // l1, l23, l4 map[int]*sync.Mutex
  95. Brun bool
  96. }
  97. func NewPT() *ProjectTask {
  98. p := &ProjectTask{
  99. InitMinTime: int64(1325347200),
  100. name: "全/增量对象",
  101. thread: Thread,
  102. updatePool: make(chan []map[string]interface{}, 5000),
  103. //savePool: make(chan map[string]interface{}, 2000),
  104. wg: sync.WaitGroup{},
  105. AllIdsMap: make(map[string]*ID, 5000000),
  106. mapPb: make(map[string]*Key, 1500000),
  107. mapPn: make(map[string]*Key, 5000000),
  108. mapPc: make(map[string]*Key, 5000000),
  109. mapHref: make(map[string]string, 1500000),
  110. mapSite: make(map[string]*Site, 1000000),
  111. mapSpider: make(map[string]int, 1000000),
  112. saveSize: 100,
  113. //saveSign: make(chan bool, 1),
  114. //updateSign: make(chan bool, 1),
  115. coll: ProjectColl,
  116. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  117. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  118. jgTime: int64(util.IntAllDef(7, 7) * 86400),
  119. }
  120. return p
  121. }
  122. var P_QL *ProjectTask
  123. var sp = make(chan bool, 5)
  124. //初始化全量合并对象
  125. func init() {
  126. P_QL = NewPT()
  127. log.Println(len(P_QL.updatePool))
  128. go P_QL.updateAllQueue()
  129. go P_QL.clearMem()
  130. }
  131. func (p *ProjectTask) updateAllQueue() {
  132. arru := make([][]map[string]interface{}, p.saveSize)
  133. indexu := 0
  134. for {
  135. select {
  136. case v := <-p.updatePool:
  137. arru[indexu] = v
  138. indexu++
  139. if indexu == p.saveSize {
  140. sp <- true
  141. go func(arru [][]map[string]interface{}) {
  142. defer func() {
  143. <-sp
  144. }()
  145. MongoTool.UpSertBulk(p.coll, arru...)
  146. }(arru)
  147. arru = make([][]map[string]interface{}, p.saveSize)
  148. indexu = 0
  149. }
  150. case <-time.After(1000 * time.Millisecond):
  151. if indexu > 0 {
  152. sp <- true
  153. go func(arru [][]map[string]interface{}) {
  154. defer func() {
  155. <-sp
  156. }()
  157. MongoTool.UpSertBulk(p.coll, arru...)
  158. }(arru[:indexu])
  159. arru = make([][]map[string]interface{}, p.saveSize)
  160. indexu = 0
  161. }
  162. }
  163. }
  164. }
  165. //项目合并内存更新
  166. func (p *ProjectTask) clearMem() {
  167. c := cron.New()
  168. //在内存中保留最近6个月的信息
  169. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  170. _ = c.AddFunc("50 0/5 * * * *", func() {
  171. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  172. SingleClear = 1
  173. //跳过的次数清零
  174. p.clearContimes = 0
  175. //信息进入查找对比全局锁
  176. p.findLock.Lock()
  177. //defer p.findLock.Unlock()
  178. //合并进行的任务都完成
  179. p.wg.Wait()
  180. //遍历id
  181. //所有内存中的项目信息
  182. p.AllIdsMapLock.Lock()
  183. p.mapHrefLock.Lock()
  184. log.Println("清除开始")
  185. //清除计数
  186. clearNum := 0
  187. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  188. v := p.AllIdsMap[pid]
  189. if p.currentTime-v.P.LastTime > p.validTime {
  190. delete(p.mapHref, kHref)
  191. }
  192. }
  193. for k, v := range p.AllIdsMap {
  194. if p.currentTime-v.P.LastTime > p.validTime {
  195. clearNum++
  196. //删除id的map
  197. delete(p.AllIdsMap, k)
  198. //删除pb
  199. if v.P.Buyer != "" {
  200. ids := p.mapPb[v.P.Buyer]
  201. if ids != nil {
  202. ids.Lock.Lock()
  203. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  204. if len(ids.Arr) == 0 {
  205. delete(p.mapPb, v.P.Buyer)
  206. }
  207. ids.Lock.Unlock()
  208. }
  209. }
  210. //删除mapPn
  211. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  212. if vn != "" {
  213. ids := p.mapPn[vn]
  214. if ids != nil {
  215. ids.Lock.Lock()
  216. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  217. if len(ids.Arr) == 0 {
  218. delete(p.mapPn, vn)
  219. }
  220. ids.Lock.Unlock()
  221. }
  222. }
  223. }
  224. //删除mapPc
  225. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  226. if vn != "" {
  227. ids := p.mapPc[vn]
  228. if ids != nil {
  229. ids.Lock.Lock()
  230. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  231. if len(ids.Arr) == 0 {
  232. delete(p.mapPc, vn)
  233. }
  234. ids.Lock.Unlock()
  235. }
  236. }
  237. }
  238. v = nil
  239. }
  240. }
  241. p.mapHrefLock.Unlock()
  242. p.AllIdsMapLock.Unlock()
  243. p.findLock.Unlock()
  244. SingleClear = 0
  245. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  246. } else {
  247. p.clearContimes++
  248. }
  249. })
  250. c.Start()
  251. }
  252. //全量合并
  253. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  254. defer util.Catch()
  255. p.thread = util.IntAllDef(Thread, 4)
  256. q, _ := udpInfo["query"].(map[string]interface{})
  257. if q == nil {
  258. q = map[string]interface{}{}
  259. lteid, _ := udpInfo["lteid"].(string)
  260. var idmap map[string]interface{}
  261. if len(lteid) > 15 {
  262. idmap = map[string]interface{}{
  263. "$lte": StringTOBsonId(lteid),
  264. }
  265. }
  266. gtid, _ := udpInfo["gtid"].(string)
  267. if len(gtid) > 15 {
  268. if idmap == nil {
  269. idmap = map[string]interface{}{}
  270. }
  271. idmap["$gte"] = StringTOBsonId(gtid)
  272. }
  273. if idmap != nil {
  274. q["_id"] = idmap
  275. }
  276. }
  277. //生成查询语句执行
  278. log.Println("查询语句:", q)
  279. p.enter(MongoTool.DbName, ExtractColl, q)
  280. }
  281. //增量合并
  282. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  283. defer util.Catch()
  284. //1、检查pubilshtime索引
  285. db, _ := udpInfo["db"].(string)
  286. if db == "" {
  287. db = MongoTool.DbName
  288. }
  289. coll, _ := udpInfo["coll"].(string)
  290. if coll == "" {
  291. coll = ExtractColl
  292. }
  293. thread := util.IntAllDef(Thread, 4)
  294. if thread > 0 {
  295. p.thread = thread
  296. }
  297. //开始id和结束id
  298. q, _ := udpInfo["query"].(map[string]interface{})
  299. gtid := udpInfo["gtid"].(string)
  300. lteid := udpInfo["lteid"].(string)
  301. q = map[string]interface{}{
  302. "_id": map[string]interface{}{
  303. "$gt": StringTOBsonId(gtid),
  304. "$lte": StringTOBsonId(lteid),
  305. },
  306. }
  307. p.enter(db, coll, q)
  308. if udpInfo["stop"] == nil {
  309. for i := 0; i < 5; i++ {
  310. sp <- true
  311. }
  312. for i := 0; i < 5; i++ {
  313. <-sp
  314. }
  315. log.Println("保存完成,生索引", p.pici)
  316. time.Sleep(5 * time.Second)
  317. nextNode(udpInfo, p.pici)
  318. }
  319. }
  320. //招标字段更新
  321. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  322. defer util.Catch()
  323. infoid := udpInfo["infoid"].(string)
  324. infoMap := MgoBidding.FindById(util.ObjToString(udpInfo["coll"]), infoid)
  325. if infoMap["modifyinfo"] == nil {
  326. util.Debug("does not exist modifyinfo ---,", infoid)
  327. return
  328. }
  329. client := Es.GetEsConn()
  330. defer Es.DestoryEsConn(client)
  331. esquery := `{"query": {"bool": {"must": [{"match": {"ids": "`+infoid+`"}}]}}}`
  332. data := Es.Get(Index, Itype, esquery)
  333. if len(*data) > 0 {
  334. pid := util.ObjToString(((*data)[0])["_id"])
  335. p.updateJudge(infoMap, pid)
  336. }else {
  337. util.Debug("not find project---,", infoid)
  338. }
  339. }
  340. func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
  341. defer util.Catch()
  342. util.Debug(udpInfo)
  343. pid := util.ObjToString(udpInfo["pid"])
  344. updateMap := util.ObjToMap(udpInfo["updateField"])
  345. if pid == "" || len(*updateMap) == 0 {
  346. util.Debug("参数有误")
  347. return
  348. }
  349. proMap := MongoTool.FindById(ProjectColl, pid)
  350. if len(proMap) > 1 {
  351. proMap["reason"] = "直接修改项目字段信息"
  352. backupPro(proMap)
  353. delete(proMap, "reason")
  354. updataMap := make(map[string]interface{})
  355. modifyInfo := make(map[string]interface{})
  356. for k, v := range *updateMap{
  357. if strings.Contains(k, "time") {
  358. updataMap[k] = util.Int64All(v)
  359. }else {
  360. updataMap[k] = v
  361. }
  362. modifyInfo[k] = true
  363. }
  364. updataMap["modifyinfo"] = modifyInfo
  365. util.Debug(updataMap)
  366. bol := MongoTool.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap})
  367. if bol {
  368. //es索引
  369. by, _ := json.Marshal(map[string]interface{}{
  370. "query": map[string]interface{}{
  371. "_id": bson.M{
  372. "$gte": pid,
  373. "$lte": pid,
  374. }},
  375. "stype": "project",
  376. })
  377. util.Debug(string(by))
  378. _ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
  379. }
  380. // 内存
  381. var pro ProjectInfo
  382. err := mapstructure.Decode(proMap, &pro)
  383. if err != nil {
  384. util.Debug(err)
  385. }
  386. p.AllIdsMapLock.Lock()
  387. if v, ok := p.AllIdsMap[pid]; ok {
  388. v.P = &pro
  389. }
  390. p.AllIdsMapLock.Unlock()
  391. }else {
  392. util.Debug("Not find project---", pid)
  393. }
  394. }
  395. func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
  396. defer util.Catch()
  397. util.Debug(udpInfo)
  398. infoid := util.ObjToString(udpInfo["infoid"])
  399. if infoid == "" {
  400. util.Debug("参数有误")
  401. return
  402. }
  403. client := Es.GetEsConn()
  404. defer Es.DestoryEsConn(client)
  405. esquery := `{"query": {"bool": {"must": [{"term": {"ids": "`+infoid+`"}}]}}}`
  406. data := Es.Get(Index, Itype, esquery)
  407. if len(*data) > 0 {
  408. pid := util.ObjToString(((*data)[0])["_id"])
  409. p.delJudge(infoid, pid)
  410. }else {
  411. util.Debug("not find project---,", infoid)
  412. }
  413. }
  414. //通知下个节点nextNode
  415. func nextNode(mapInfo map[string]interface{}, pici int64) {
  416. mapInfo["stype"] = "project"
  417. mapInfo["query"] = map[string]interface{}{
  418. "pici": pici,
  419. }
  420. key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
  421. mapInfo["key"] = key
  422. datas, _ := json.Marshal(mapInfo)
  423. node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
  424. udptaskmap.Store(key, node)
  425. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, toaddr[0])
  426. }
  427. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  428. defer util.Catch()
  429. defer func() {
  430. p.Brun = false
  431. }()
  432. p.Brun = true
  433. count := 0
  434. countRepeat := 0
  435. pool := make(chan bool, p.thread)
  436. log.Println("start project", q, p.pici)
  437. sess := MongoTool.GetMgoConn()
  438. defer MongoTool.DestoryMongoConn(sess)
  439. infoPool := make(chan map[string]interface{}, 2000)
  440. over := make(chan bool)
  441. go func() {
  442. L:
  443. for {
  444. select {
  445. case tmp := <-infoPool:
  446. pool <- true
  447. go func(tmp map[string]interface{}) {
  448. defer func() {
  449. <-pool
  450. }()
  451. p.fillInPlace(tmp)
  452. info := ParseInfo(tmp)
  453. util.Debug(tmp["projectname"])
  454. util.Debug(info.ProjectName)
  455. p.currentTime = info.Publishtime
  456. //普通合并
  457. p.CommonMerge(tmp, info)
  458. }(tmp)
  459. case <-over:
  460. break L
  461. }
  462. }
  463. }()
  464. //fields := map[string]interface{} {"repeat": 1, "dataging": 1, "area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
  465. // "projectcode": 1, "buyerclass": 1, "winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1, "projectscope": 1, "contractcode": 1,
  466. // "site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1, "jsondata": 1, "review_experts": 1, "purchasing": 1, "winnerorder": 1}
  467. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
  468. if p.currentType == "project" {
  469. c, _ := sess.DB(db).C(coll).Find(q).Count()
  470. util.Debug("共查询:", c, "条")
  471. }
  472. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  473. if Sysconfig["hints"] != nil {
  474. ms.Hint(Sysconfig["hints"])
  475. }
  476. query := ms.Iter()
  477. var lastid interface{}
  478. L:
  479. for {
  480. select {
  481. case <-queryClose:
  482. log.Println("receive interrupt sign")
  483. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  484. queryCloseOver <- true
  485. break L
  486. default:
  487. tmp := make(map[string]interface{})
  488. if query.Next(&tmp) {
  489. lastid = tmp["_id"]
  490. if P_QL.currentType == "ql" {
  491. if count%20000 == 0 {
  492. log.Println("current", count, lastid)
  493. }
  494. }else {
  495. if count%1000 == 0 {
  496. log.Println("current", count, lastid)
  497. }
  498. }
  499. if util.IntAll(tmp["repeat"]) == 0 {
  500. if P_QL.currentType == "ql" {
  501. infoPool <- tmp
  502. }else if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
  503. infoPool <- tmp
  504. }else {
  505. util.Debug("增量 dataging == 1 ", tmp["_id"])
  506. }
  507. }else {
  508. countRepeat++
  509. //if P_QL.currentType == "project" {
  510. // util.Debug("repeat err---", tmp["_id"])
  511. //}
  512. }
  513. count++
  514. } else {
  515. break L
  516. }
  517. }
  518. }
  519. time.Sleep(5 * time.Second)
  520. over <- true
  521. //阻塞
  522. for n := 0; n < p.thread; n++ {
  523. pool <- true
  524. }
  525. log.Println("所有线程执行完成...", count, countRepeat)
  526. }
  527. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  528. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  529. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  530. proHref := util.ObjToString(jsonData["projecthref"])
  531. if jsonData != nil && proHref != "" {
  532. //projectHref字段合并
  533. tmp["projecthref"] = proHref
  534. p.mapHrefLock.Lock()
  535. pid := p.mapHref[proHref]
  536. p.mapHrefLock.Unlock()
  537. if pid != "" {
  538. p.AllIdsMapLock.Lock()
  539. comparePro := p.AllIdsMap[pid].P
  540. p.AllIdsMapLock.Unlock()
  541. _, ex := p.CompareStatus(comparePro, info)
  542. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  543. } else {
  544. id, p1 := p.NewProject(tmp, info)
  545. p.mapHrefLock.Lock()
  546. p.mapHref[proHref] = id
  547. p.mapHrefLock.Unlock()
  548. p.AllIdsMapLock.Lock()
  549. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  550. p.AllIdsMapLock.Unlock()
  551. }
  552. } else {
  553. //项目合并
  554. p.startProjectMerge(info, tmp)
  555. }
  556. } else {
  557. //项目合并
  558. p.startProjectMerge(info, tmp)
  559. }
  560. }
  561. }
  562. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  563. bys, _ := json.Marshal(tmp)
  564. var thisinfo *Info
  565. _ = json.Unmarshal(bys, &thisinfo)
  566. if thisinfo == nil {
  567. return nil
  568. }
  569. if len(thisinfo.Topscopeclass) == 0 {
  570. thisinfo.Topscopeclass = []string{}
  571. }
  572. if len(thisinfo.Subscopeclass) == 0 {
  573. thisinfo.Subscopeclass = []string{}
  574. }
  575. if thisinfo.SubType == "" {
  576. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  577. }
  578. //从标题中查找项目编号
  579. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  580. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  581. thisinfo.PTC = res[1]
  582. } else {
  583. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  584. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  585. thisinfo.PTC = res[3]
  586. } else {
  587. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  588. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  589. thisinfo.PTC = res[1]
  590. }
  591. }
  592. }
  593. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  594. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  595. //if thisinfo.ProjectName != "" {
  596. thisinfo.pnbval++
  597. //}
  598. }
  599. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  600. if thisinfo.ProjectCode != "" {
  601. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  602. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  603. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  604. }
  605. } else {
  606. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  607. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  608. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  609. }
  610. }
  611. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  612. thisinfo.pnbval++
  613. }
  614. }
  615. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  616. thisinfo.PTC = ""
  617. }
  618. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  619. thisinfo.pnbval++
  620. } else {
  621. thisinfo.Buyer = ""
  622. }
  623. //清理评审专家名单
  624. if len(thisinfo.ReviewExperts) > 0 {
  625. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  626. }
  627. //winners整理、清理
  628. winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
  629. tmp["winner"] = winner
  630. m1 := map[string]bool{}
  631. winners := []string{}
  632. if winner != "" {
  633. m1[winner] = true
  634. winners = append(winners, winner)
  635. }
  636. packageM, _ := tmp["package"].(map[string]interface{})
  637. if packageM != nil {
  638. thisinfo.HasPackage = true
  639. for _, p := range packageM {
  640. pm, _ := p.(map[string]interface{})
  641. pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
  642. if pw != "" && !m1[pw] {
  643. m1[pw] = true
  644. winners = append(winners, pw)
  645. }
  646. }
  647. }
  648. thisinfo.Winners = winners
  649. //清理winnerorder
  650. var wins []map[string]interface{}
  651. for _, v := range thisinfo.WinnerOrder {
  652. w := QyFilter(util.ObjToString(v["entname"]), "winner")
  653. if w != "" {
  654. v["entname"] = w
  655. wins = append(wins, v)
  656. }
  657. }
  658. thisinfo.WinnerOrder = wins
  659. //清理buyer
  660. buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
  661. tmp["buyer"] = buyer
  662. thisinfo.Buyer = buyer
  663. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  664. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  665. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  666. //处理分包中数据异常问题
  667. for k, tmp := range thisinfo.Package {
  668. if ps, ok := tmp.([]map[string]interface{}); ok {
  669. for i, p := range ps {
  670. name, _ := p["name"].(string)
  671. if len([]rune(name)) > 100 {
  672. p["name"] = fmt.Sprint([]rune(name[:100]))
  673. }
  674. ps[i] = p
  675. }
  676. thisinfo.Package[k] = ps
  677. }
  678. }
  679. return thisinfo
  680. }
  681. func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
  682. tmpPro := MongoTool.FindById(ProjectColl, pid)
  683. modifyProMap := make(map[string]interface{}) // 修改项目的字段
  684. if modifyMap, ok := infoMap["modifyinfo"].(map[string]interface{}); ok {
  685. for k := range modifyMap {
  686. if modifyMap[k] != nil && modifyMap[k] != "" && tmpPro[k] != nil {
  687. modifyProMap[k] = infoMap[k]
  688. }
  689. }
  690. }
  691. if len(modifyProMap) == 0 {
  692. util.Debug("修改招标公告信息不需要修改项目信息字段", infoMap["_id"])
  693. return
  694. }
  695. p.AllIdsMapLock.Lock()
  696. _, ok := p.AllIdsMap[pid]
  697. p.AllIdsMapLock.Unlock()
  698. ids := []interface{}(tmpPro["ids"].(primitive.A))
  699. index, position := -1, 0 // index 0:第一个,1:中间,2:最后一个 position list中位置
  700. for i, v := range ids {
  701. if util.ObjToString(v) == mongodb.BsonIdToSId(infoMap["_id"]) {
  702. position = i
  703. if i == 0 {
  704. index = 0
  705. }else if i == len(ids) - 1 {
  706. index = 2
  707. }else {
  708. index = 1
  709. }
  710. }
  711. }
  712. if ok {
  713. // 周期内
  714. //projecthref字段
  715. if infoMap["jsondata"] != nil {
  716. jsonData := infoMap["jsondata"].(map[string]interface{})
  717. if proHref, ok := jsonData["projecthref"].(string); ok {
  718. p.mapHrefLock.Lock()
  719. tempId := p.mapHref[proHref]
  720. p.mapHrefLock.Unlock()
  721. if pid == tempId {
  722. p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
  723. }else {
  724. util.Debug("projecthref data id err---pid=" + pid, "---"+tempId)
  725. }
  726. }else {
  727. f := modifyEle(modifyProMap)
  728. if f {
  729. //合并、修改
  730. util.Debug("合并修改更新", "----------------------------")
  731. p.mergeAndModify(pid, index, position, infoMap, tmpPro, modifyProMap)
  732. } else {
  733. //修改
  734. util.Debug("修改更新", "----------------------------")
  735. p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
  736. }
  737. }
  738. }else {
  739. f := modifyEle(modifyProMap)
  740. if f {
  741. //合并、修改
  742. util.Debug("合并修改更新", "----------------------------")
  743. p.mergeAndModify(pid, index, position, infoMap, tmpPro, modifyProMap)
  744. } else {
  745. //修改
  746. util.Debug("修改更新", "----------------------------")
  747. p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
  748. }
  749. }
  750. }else {
  751. // 周期外
  752. util.Debug("周期外数据直接修改", "----------------------------")
  753. p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
  754. }
  755. }
  // Elements lists the fields that take part in the project-merge decision;
// editing any of them requires re-merging (see modifyEle).
var Elements = []string{
	"projectname", "projectcode",
	"buyer", "agency",
	"area", "city",
	"publishtime",
	"toptype", "subtype",
}
  767. /**
  768. 修改的字段
  769. 修改的字段是否是影响合并流程的要素字段
  770. */
  771. func modifyEle(tmp map[string]interface{}) bool {
  772. merge := false
  773. for _, str := range Elements {
  774. if tmp[str] != nil {
  775. merge = true
  776. break
  777. }
  778. }
  779. return merge
  780. }
  781. //补全位置信息
  782. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  783. area := util.ObjToString(tmp["area"])
  784. city := util.ObjToString(tmp["city"])
  785. if area != "" && city != "" {
  786. return
  787. }
  788. tmpSite := util.ObjToString(tmp["site"])
  789. if tmpSite == "" {
  790. return
  791. }
  792. p.mapSiteLock.Lock()
  793. defer p.mapSiteLock.Unlock()
  794. site := p.mapSite[tmpSite]
  795. if site != nil {
  796. if area != "" {
  797. if area == "全国" {
  798. tmp["area"] = site.Area
  799. tmp["city"] = site.City
  800. tmp["district"] = site.District
  801. return
  802. }
  803. if area != site.Area {
  804. return
  805. } else {
  806. if site.City != "" {
  807. tmp["area"] = site.Area
  808. tmp["city"] = site.City
  809. tmp["district"] = site.District
  810. }
  811. }
  812. } else {
  813. tmp["area"] = site.Area
  814. tmp["city"] = site.City
  815. tmp["district"] = site.District
  816. return
  817. }
  818. }
  819. }
  // deleteSlice removes the first occurrence of v from arr, reusing arr's backing
// array, and returns the result (arr unchanged if v is absent).
// stype only labels the log line emitted when the removal takes a second or more.
func deleteSlice(arr []string, v, stype string) []string {
	at := -1
	for i := range arr {
		if arr[i] == v {
			at = i
			break
		}
	}
	if at < 0 {
		return arr
	}
	started := time.Now().Unix()
	arr = append(arr[:at], arr[at+1:]...)
	if elapsed := time.Now().Unix() - started; elapsed > 0 {
		log.Println("deleteSlice", stype, elapsed, v, len(arr))
	}
	return arr
}
  835. //校验评审专家
  836. func ClearRp(tmp []string) []string {
  837. arrTmp := []string{}
  838. for _, v := range tmp {
  839. // 汉字过滤(全汉字,2-4个字)
  840. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  841. continue
  842. }
  843. //黑名单过滤
  844. if BlaskListMap[v] {
  845. continue
  846. }
  847. arrTmp = append(arrTmp, v)
  848. }
  849. return arrTmp
  850. }
  851. func QyFilter(name, stype string) string {
  852. name = strings.ReplaceAll(name, " ", "")
  853. preReg := PreRegexp[stype]
  854. for _, v := range preReg {
  855. name = v.ReplaceAllString(name, "")
  856. }
  857. backReg := BackRegexp[stype]
  858. for _, v := range backReg {
  859. name = v.ReplaceAllString(name, "")
  860. }
  861. backRepReg := BackRepRegexp[stype]
  862. for _, v := range backRepReg {
  863. name = v.regs.ReplaceAllString(name, v.repstr)
  864. }
  865. blackReg := BlackRegexp[stype]
  866. for _, v := range blackReg {
  867. if v.MatchString(name) {
  868. name = ""
  869. break
  870. }
  871. }
  872. if !regexp.MustCompile("[\\p{Han}]{4,}").MatchString(name) {
  873. name = ""
  874. }
  875. if utf8.RuneCountInString(name) > 60 {
  876. name = ""
  877. }
  878. return name
  879. }