task.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "gopkg.in/mgo.v2/bson"
  6. "log"
  7. mu "mfw/util"
  8. "mongodb"
  9. "qfw/util"
  10. "qfw/util/redis"
  11. "regexp"
  12. "strings"
  13. "sync"
  14. "time"
  15. "unicode/utf8"
  16. "github.com/goinggo/mapstructure"
  17. "github.com/robfig/cron"
  18. )
  19. /**
  20. 任务入口
  21. 全量、增量合并
  22. 更新、插入,内存清理
  23. 转换成info对象
  24. **/
  25. //var PreRegexp = map[string][]*regexp.Regexp{}
  26. //var BackRegexp = map[string][]*regexp.Regexp{}
  27. //var BackRepRegexp = map[string][]RegexpInfo{}
  28. var BlackRegexp = map[string][]*regexp.Regexp{}
  29. var (
  30. //从标题获取项目编号
  31. titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
  32. titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
  33. titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
  34. //项目编号过滤
  35. pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
  36. //项目编号只是数字或只是字母4个以下
  37. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  38. //纯数字或纯字母
  39. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  40. //含分包词,招标未识别分包 合并到一个项目
  41. KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
  42. )
  43. type RegexpInfo struct {
  44. regs *regexp.Regexp
  45. repstr string
  46. }
  47. //项目合并对象
  48. type ProjectTask struct {
  49. InitMinTime int64 //最小时间,小于0的处理一次
  50. name string
  51. thread int //线程数
  52. //查找锁
  53. findLock sync.Mutex
  54. wg sync.WaitGroup
  55. //map锁
  56. AllIdsMapLock sync.Mutex
  57. //对应的id
  58. AllIdsMap map[string]*ID
  59. //采购单位、项目名称、项目编号
  60. mapPb, mapPn, mapPc map[string]*Key
  61. //流程数据 字段相同,直接合并
  62. mapHref map[string]string
  63. mapHrefLock sync.Mutex
  64. //站点
  65. mapSite map[string]*Site
  66. mapSiteLock sync.Mutex
  67. //spider isflow
  68. mapSpider map[string]int
  69. mapSpiderLock sync.Mutex
  70. //bidtype、bidstatus 锁
  71. mapBidLock sync.Mutex
  72. //更新或新增通道
  73. updatePool chan []map[string]interface{}
  74. //savePool chan map[string]interface{}
  75. //saveSign, updateSign chan bool
  76. //表名
  77. coll string
  78. //当前状态是全量还是增量
  79. currentType string //当前是跑全量还是跑增量
  80. //
  81. clearContimes int
  82. //当前时间
  83. currentTime int64
  84. //保存长度
  85. saveSize int
  86. pici int64
  87. validTime int64
  88. statusTime int64
  89. //结果时间的更新 最近两天的公告不再更新jgtime
  90. jgTime int64
  91. Brun bool
  92. }
  93. func NewPT() *ProjectTask {
  94. p := &ProjectTask{
  95. InitMinTime: int64(1325347200),
  96. name: "全/增量对象",
  97. thread: Thread,
  98. updatePool: make(chan []map[string]interface{}, 5000),
  99. //savePool: make(chan map[string]interface{}, 2000),
  100. wg: sync.WaitGroup{},
  101. AllIdsMap: make(map[string]*ID, 5000000),
  102. mapPb: make(map[string]*Key, 1500000),
  103. mapPn: make(map[string]*Key, 5000000),
  104. mapPc: make(map[string]*Key, 5000000),
  105. mapHref: make(map[string]string, 1500000),
  106. mapSite: make(map[string]*Site, 1000000),
  107. mapSpider: make(map[string]int, 1000000),
  108. saveSize: 200,
  109. //saveSign: make(chan bool, 1),
  110. //updateSign: make(chan bool, 1),
  111. coll: ProjectColl,
  112. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  113. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  114. jgTime: int64(util.IntAllDef(7, 7) * 86400),
  115. }
  116. return p
  117. }
  118. var P_QL *ProjectTask
  119. var sp = make(chan bool, 1)
  120. func (p *ProjectTask) updateAllQueue() {
  121. arru := make([][]map[string]interface{}, p.saveSize)
  122. indexu := 0
  123. for {
  124. select {
  125. case v := <-p.updatePool:
  126. arru[indexu] = v
  127. indexu++
  128. if indexu == p.saveSize {
  129. sp <- true
  130. go func(arru [][]map[string]interface{}) {
  131. defer func() {
  132. <-sp
  133. }()
  134. MongoTool.UpSertBulk(p.coll, arru...)
  135. }(arru)
  136. arru = make([][]map[string]interface{}, p.saveSize)
  137. indexu = 0
  138. }
  139. case <-time.After(1 * time.Second):
  140. if indexu > 0 {
  141. sp <- true
  142. go func(arru [][]map[string]interface{}) {
  143. defer func() {
  144. <-sp
  145. }()
  146. MongoTool.UpSertBulk(p.coll, arru...)
  147. }(arru[:indexu])
  148. arru = make([][]map[string]interface{}, p.saveSize)
  149. indexu = 0
  150. }
  151. }
  152. }
  153. }
  154. //项目合并内存更新
  155. func (p *ProjectTask) clearMem() {
  156. c := cron.New()
  157. // 创建项目的时间大于7天
  158. //在内存中保留最近6个月的信息
  159. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  160. _ = c.AddFunc("50 0/5 * * * *", func() {
  161. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  162. SingleClear = 1
  163. //跳过的次数清零
  164. p.clearContimes = 0
  165. //信息进入查找对比全局锁
  166. p.findLock.Lock()
  167. //defer p.findLock.Unlock()
  168. //合并进行的任务都完成
  169. p.wg.Wait()
  170. //遍历id
  171. //所有内存中的项目信息
  172. p.AllIdsMapLock.Lock()
  173. p.mapHrefLock.Lock()
  174. log.Println("清除开始")
  175. //清除计数
  176. clearNum := 0
  177. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  178. v := p.AllIdsMap[pid]
  179. if v != nil && p.currentTime-v.P.LastTime > p.validTime {
  180. delete(p.mapHref, kHref)
  181. }
  182. }
  183. for k, v := range p.AllIdsMap {
  184. if p.currentTime-v.P.LastTime > p.validTime {
  185. clearNum++
  186. redis.Del("project", k)
  187. //删除id的map
  188. delete(p.AllIdsMap, k)
  189. //删除pb
  190. if v.P.Buyer != "" {
  191. ids := p.mapPb[v.P.Buyer]
  192. if ids != nil {
  193. ids.Lock.Lock()
  194. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  195. if len(ids.Arr) == 0 {
  196. delete(p.mapPb, v.P.Buyer)
  197. }
  198. ids.Lock.Unlock()
  199. }
  200. }
  201. //删除mapPn
  202. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  203. if vn != "" {
  204. ids := p.mapPn[vn]
  205. if ids != nil {
  206. ids.Lock.Lock()
  207. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  208. if len(ids.Arr) == 0 {
  209. delete(p.mapPn, vn)
  210. }
  211. ids.Lock.Unlock()
  212. }
  213. }
  214. }
  215. //删除mapPc
  216. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  217. if vn != "" {
  218. ids := p.mapPc[vn]
  219. if ids != nil {
  220. ids.Lock.Lock()
  221. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  222. if len(ids.Arr) == 0 {
  223. delete(p.mapPc, vn)
  224. }
  225. ids.Lock.Unlock()
  226. }
  227. }
  228. }
  229. v = nil
  230. }
  231. }
  232. p.mapHrefLock.Unlock()
  233. p.AllIdsMapLock.Unlock()
  234. p.findLock.Unlock()
  235. SingleClear = 0
  236. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  237. } else {
  238. p.clearContimes++
  239. }
  240. })
  241. c.Start()
  242. }
  243. //全量合并
  244. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  245. defer util.Catch()
  246. p.thread = util.IntAllDef(Thread, 4)
  247. q, _ := udpInfo["query"].(map[string]interface{})
  248. if q == nil {
  249. q = map[string]interface{}{}
  250. lteid, _ := udpInfo["lteid"].(string)
  251. var idmap map[string]interface{}
  252. if len(lteid) > 15 {
  253. idmap = map[string]interface{}{
  254. "$lte": mongodb.StringTOBsonId(lteid),
  255. }
  256. }
  257. gtid, _ := udpInfo["gtid"].(string)
  258. if len(gtid) > 15 {
  259. if idmap == nil {
  260. idmap = map[string]interface{}{}
  261. }
  262. idmap["$gte"] = mongodb.StringTOBsonId(gtid)
  263. }
  264. if idmap != nil {
  265. q["_id"] = idmap
  266. }
  267. }
  268. //生成查询语句执行
  269. log.Println("查询语句:", q)
  270. p.enter(MongoTool.DbName, ExtractColl, q)
  271. }
  272. //增量合并
  273. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  274. defer util.Catch()
  275. //1、检查pubilshtime索引
  276. db, _ := udpInfo["db"].(string)
  277. if db == "" {
  278. db = MongoTool.DbName
  279. }
  280. coll, _ := udpInfo["coll"].(string)
  281. if coll == "" {
  282. coll = ExtractColl
  283. }
  284. thread := util.IntAllDef(Thread, 4)
  285. if thread > 0 {
  286. p.thread = thread
  287. }
  288. //开始id和结束id
  289. q, _ := udpInfo["query"].(map[string]interface{})
  290. gtid := udpInfo["gtid"].(string)
  291. lteid := udpInfo["lteid"].(string)
  292. q = map[string]interface{}{
  293. "_id": map[string]interface{}{
  294. "$gt": mongodb.StringTOBsonId(gtid),
  295. "$lte": mongodb.StringTOBsonId(lteid),
  296. },
  297. }
  298. p.enter(db, coll, q)
  299. if udpInfo["stop"] == nil {
  300. for i := 0; i < 1; i++ {
  301. sp <- true
  302. }
  303. for i := 0; i < 1; i++ {
  304. <-sp
  305. }
  306. }
  307. log.Println("保存完成,生索引", p.pici)
  308. time.Sleep(5 * time.Second)
  309. nextNode(udpInfo, p.pici)
  310. }
  311. //招标字段更新
  312. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  313. defer util.Catch()
  314. infoid := udpInfo["infoid"].(string)
  315. infoMap, _ := MgoBidding.FindById(util.ObjToString(udpInfo["coll"]), infoid, nil)
  316. if (*infoMap)["modifyinfo"] == nil {
  317. util.Debug("does not exist modifyinfo ---,", infoid)
  318. return
  319. }
  320. client := Es.GetEsConn()
  321. defer Es.DestoryEsConn(client)
  322. esquery := `{"query": {"bool": {"must": [{"match": {"ids": "` + infoid + `"}}]}}}`
  323. data := Es.Get(Index, Itype, esquery)
  324. if len(*data) > 0 {
  325. pid := util.ObjToString(((*data)[0])["_id"])
  326. p.updateJudge(*infoMap, pid)
  327. } else {
  328. util.Debug("not find project---,", infoid)
  329. }
  330. }
  331. func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
  332. defer util.Catch()
  333. util.Debug(udpInfo)
  334. pid := util.ObjToString(udpInfo["pid"])
  335. updateMap := util.ObjToMap(udpInfo["updateField"])
  336. if pid == "" || len(*updateMap) == 0 {
  337. util.Debug("参数有误")
  338. return
  339. }
  340. proMap, _ := MongoTool.FindById(ProjectColl, pid, nil)
  341. if len(*proMap) > 1 {
  342. (*proMap)["reason"] = "直接修改项目字段信息"
  343. backupPro(*proMap)
  344. delete(*proMap, "reason")
  345. updataMap := make(map[string]interface{})
  346. modifyInfo := make(map[string]interface{})
  347. for k, v := range *updateMap {
  348. if strings.Contains(k, "time") {
  349. updataMap[k] = util.Int64All(v)
  350. } else {
  351. updataMap[k] = v
  352. }
  353. modifyInfo[k] = true
  354. }
  355. updataMap["modifyinfo"] = modifyInfo
  356. util.Debug(updataMap)
  357. bol := MongoTool.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap})
  358. if bol {
  359. //es索引
  360. by, _ := json.Marshal(map[string]interface{}{
  361. "query": map[string]interface{}{
  362. "_id": bson.M{
  363. "$gte": pid,
  364. "$lte": pid,
  365. }},
  366. "stype": "project",
  367. })
  368. util.Debug(string(by))
  369. _ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
  370. }
  371. // 内存
  372. var pro ProjectCache
  373. err := mapstructure.Decode(proMap, &pro)
  374. if err != nil {
  375. util.Debug(err)
  376. }
  377. p.AllIdsMapLock.Lock()
  378. if v, ok := p.AllIdsMap[pid]; ok {
  379. v.P = &pro
  380. }
  381. p.AllIdsMapLock.Unlock()
  382. } else {
  383. util.Debug("Not find project---", pid)
  384. }
  385. }
  386. func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
  387. defer util.Catch()
  388. util.Debug(udpInfo)
  389. infoid := util.ObjToString(udpInfo["infoid"])
  390. if infoid == "" {
  391. util.Debug("参数有误")
  392. return
  393. }
  394. client := Es.GetEsConn()
  395. defer Es.DestoryEsConn(client)
  396. esquery := `{"query": {"bool": {"must": [{"term": {"ids": "` + infoid + `"}}]}}}`
  397. data := Es.Get(Index, Itype, esquery)
  398. if len(*data) > 0 {
  399. pid := util.ObjToString(((*data)[0])["_id"])
  400. p.delJudge(infoid, pid)
  401. } else {
  402. util.Debug("not find project---,", infoid)
  403. }
  404. }
  405. //通知下个节点nextNode
  406. func nextNode(mapInfo map[string]interface{}, pici int64) {
  407. mapInfo["stype"] = "project"
  408. mapInfo["query"] = map[string]interface{}{
  409. "pici": pici,
  410. }
  411. key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
  412. mapInfo["key"] = key
  413. datas, _ := json.Marshal(mapInfo)
  414. node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
  415. udptaskmap.Store(key, node)
  416. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, toaddr[0])
  417. }
  418. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  419. defer util.Catch()
  420. defer func() {
  421. p.Brun = false
  422. }()
  423. p.Brun = true
  424. count := 0
  425. countRepeat := 0
  426. pool := make(chan bool, p.thread)
  427. util.Debug("start project", q, p.pici)
  428. sess := MongoTool.GetMgoConn()
  429. defer MongoTool.DestoryMongoConn(sess)
  430. infoPool := make(chan map[string]interface{}, 2000)
  431. over := make(chan bool)
  432. go func() {
  433. L:
  434. for {
  435. select {
  436. case tmp := <-infoPool:
  437. pool <- true
  438. go func(tmp map[string]interface{}) {
  439. defer func() {
  440. <-pool
  441. }()
  442. p.fillInPlace(tmp)
  443. info := ParseInfo(tmp)
  444. p.currentTime = info.Publishtime
  445. //普通合并
  446. p.CommonMerge(tmp, info)
  447. }(tmp)
  448. default:
  449. select {
  450. case tmp := <-infoPool:
  451. pool <- true
  452. go func(tmp map[string]interface{}) {
  453. defer func() {
  454. <-pool
  455. }()
  456. p.fillInPlace(tmp)
  457. info := ParseInfo(tmp)
  458. p.currentTime = info.Publishtime
  459. //普通合并
  460. p.CommonMerge(tmp, info)
  461. }(tmp)
  462. case <-over:
  463. break L
  464. }
  465. }
  466. }
  467. }()
  468. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0}
  469. if p.currentType == "project" || p.currentType == "project_history" {
  470. c, _ := sess.DB(db).C(coll).Find(q).Count()
  471. util.Debug("共查询:", c, "条")
  472. }
  473. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  474. query := ms.Iter()
  475. var lastid interface{}
  476. L:
  477. for {
  478. select {
  479. case <-queryClose:
  480. log.Println("receive interrupt sign")
  481. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  482. queryCloseOver <- true
  483. break L
  484. default:
  485. tmp := make(map[string]interface{})
  486. if query.Next(&tmp) {
  487. lastid = tmp["_id"]
  488. if P_QL.currentType == "ql" {
  489. if count%50000 == 0 {
  490. util.Debug("current", count, lastid)
  491. }
  492. } else {
  493. if count%2000 == 0 {
  494. util.Debug("current", count, lastid)
  495. }
  496. }
  497. if util.IntAll(tmp["repeat"]) == 0 {
  498. if util.ObjToString(tmp["toptype"]) != "采购意向" {
  499. if P_QL.currentType == "ql" {
  500. infoPool <- tmp
  501. } else if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
  502. // id段增量数据
  503. infoPool <- tmp
  504. } else if P_QL.currentType == "project_history" && util.ObjToString(tmp["history_updatetime"]) != "" {
  505. // id段 历史数据
  506. infoPool <- tmp
  507. }
  508. }
  509. } else {
  510. countRepeat++
  511. //if P_QL.currentType == "project" {
  512. // util.Debug("repeat err---", tmp["_id"])
  513. //}
  514. }
  515. count++
  516. } else {
  517. break L
  518. }
  519. }
  520. }
  521. time.Sleep(5 * time.Second)
  522. over <- true
  523. //阻塞
  524. for n := 0; n < p.thread; n++ {
  525. pool <- true
  526. }
  527. log.Println("所有线程执行完成...", count, countRepeat)
  528. }
  529. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  530. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  531. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  532. proHref := util.ObjToString(jsonData["projecthref"]) // 网站本身发布的公告具有招投标流程,直接参与合并
  533. if jsonData != nil && proHref != "" {
  534. //projectHref字段合并
  535. tmp["projecthref"] = proHref
  536. p.mapHrefLock.Lock()
  537. pid := p.mapHref[proHref]
  538. p.mapHrefLock.Unlock()
  539. if pid != "" {
  540. p.AllIdsMapLock.Lock()
  541. res := p.AllIdsMap[pid]
  542. p.AllIdsMapLock.Unlock()
  543. if res != nil {
  544. comparePro := res.P
  545. _, ex := p.CompareStatus(comparePro, info)
  546. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  547. } else {
  548. p.startProjectMerge(info, tmp)
  549. }
  550. } else {
  551. id, p1 := p.NewProject(tmp, info)
  552. p.mapHrefLock.Lock()
  553. p.mapHref[proHref] = id
  554. p.mapHrefLock.Unlock()
  555. p.AllIdsMapLock.Lock()
  556. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  557. p.AllIdsMapLock.Unlock()
  558. }
  559. } else {
  560. //项目合并
  561. p.startProjectMerge(info, tmp)
  562. }
  563. } else {
  564. //项目合并
  565. p.startProjectMerge(info, tmp)
  566. }
  567. } else {
  568. }
  569. }
  570. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  571. bys, _ := json.Marshal(tmp)
  572. var thisinfo *Info
  573. _ = json.Unmarshal(bys, &thisinfo)
  574. if thisinfo == nil {
  575. return nil
  576. }
  577. if tmp["pt_modify"] != nil {
  578. thisinfo.Publishtime = util.Int64All(tmp["pt_modify"])
  579. tmp["publishtime"] = tmp["pt_modify"]
  580. }
  581. // 处理publishtime为空
  582. if thisinfo.Publishtime <= 0 {
  583. for _, d := range DateTimeSelect {
  584. if tmp[d] != nil {
  585. thisinfo.Publishtime = util.Int64All(tmp[d])
  586. tmp["publishtime"] = tmp[d]
  587. break
  588. }
  589. }
  590. }
  591. if len(thisinfo.Topscopeclass) == 0 {
  592. thisinfo.Topscopeclass = []string{}
  593. }
  594. if len(thisinfo.Subscopeclass) == 0 {
  595. thisinfo.Subscopeclass = []string{}
  596. }
  597. if thisinfo.SubType == "" {
  598. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  599. }
  600. //从标题中查找项目编号
  601. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  602. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  603. thisinfo.PTC = res[1]
  604. } else {
  605. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  606. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  607. thisinfo.PTC = res[3]
  608. } else {
  609. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  610. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  611. thisinfo.PTC = res[1]
  612. }
  613. }
  614. }
  615. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  616. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  617. //if thisinfo.ProjectName != "" {
  618. thisinfo.pnbval++
  619. //}
  620. }
  621. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  622. if thisinfo.ProjectCode != "" {
  623. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  624. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  625. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  626. }
  627. } else {
  628. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  629. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  630. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  631. }
  632. }
  633. thisinfo.pnbval++
  634. }
  635. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  636. thisinfo.PTC = ""
  637. }
  638. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  639. thisinfo.pnbval++
  640. } else {
  641. thisinfo.Buyer = ""
  642. }
  643. //清理评审专家名单
  644. if len(thisinfo.ReviewExperts) > 0 {
  645. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  646. }
  647. //winners整理、清理
  648. winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
  649. tmp["winner"] = winner
  650. m1 := map[string]bool{}
  651. winners := []string{}
  652. if winner != "" {
  653. m1[winner] = true
  654. winners = append(winners, winner)
  655. }
  656. packageM, _ := tmp["package"].(map[string]interface{})
  657. if packageM != nil {
  658. thisinfo.HasPackage = true
  659. for _, p := range packageM {
  660. pm, _ := p.(map[string]interface{})
  661. pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
  662. if pw != "" && !m1[pw] {
  663. m1[pw] = true
  664. winners = append(winners, pw)
  665. }
  666. }
  667. }
  668. thisinfo.Winners = winners
  669. //清理winnerorder
  670. var wins []map[string]interface{}
  671. for _, v := range thisinfo.WinnerOrder {
  672. w := QyFilter(util.ObjToString(v["entname"]), "winner")
  673. if w != "" {
  674. v["entname"] = w
  675. wins = append(wins, v)
  676. }
  677. }
  678. thisinfo.WinnerOrder = wins
  679. //清理buyer
  680. buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
  681. tmp["buyer"] = buyer
  682. thisinfo.Buyer = buyer
  683. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  684. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  685. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  686. //处理分包中数据异常问题
  687. for k, tmp := range thisinfo.Package {
  688. if ps, ok := tmp.([]map[string]interface{}); ok {
  689. for i, p := range ps {
  690. name, _ := p["name"].(string)
  691. if len([]rune(name)) > 100 {
  692. p["name"] = fmt.Sprint([]rune(name[:100]))
  693. }
  694. ps[i] = p
  695. }
  696. thisinfo.Package[k] = ps
  697. }
  698. }
  699. return thisinfo
  700. }
  701. func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
  702. tmpPro, _ := MongoTool.FindById(ProjectColl, pid, nil)
  703. modifyProMap := make(map[string]interface{}) // 修改项目的字段
  704. if modifyMap, ok := infoMap["modifyinfo"].(map[string]interface{}); ok {
  705. for k := range modifyMap {
  706. if modifyMap[k] != nil && modifyMap[k] != "" && (*tmpPro)[k] != nil {
  707. modifyProMap[k] = infoMap[k]
  708. }
  709. }
  710. }
  711. if len(modifyProMap) == 0 {
  712. util.Debug("修改招标公告信息不需要修改项目信息字段", infoMap["_id"])
  713. return
  714. }
  715. p.AllIdsMapLock.Lock()
  716. _, ok := p.AllIdsMap[pid]
  717. p.AllIdsMapLock.Unlock()
  718. ids := (*tmpPro)["ids"].([]interface{})
  719. index, position := -1, 0 // index 0:第一个,1:中间,2:最后一个 position list中位置
  720. for i, v := range ids {
  721. if util.ObjToString(v) == mongodb.BsonIdToSId(infoMap["_id"]) {
  722. position = i
  723. if i == 0 {
  724. index = 0
  725. } else if i == len(ids)-1 {
  726. index = 2
  727. } else {
  728. index = 1
  729. }
  730. }
  731. }
  732. if ok {
  733. // 周期内
  734. //projecthref字段
  735. if infoMap["jsondata"] != nil {
  736. jsonData := infoMap["jsondata"].(map[string]interface{})
  737. if proHref, ok := jsonData["projecthref"].(string); ok {
  738. p.mapHrefLock.Lock()
  739. tempId := p.mapHref[proHref]
  740. p.mapHrefLock.Unlock()
  741. if pid == tempId {
  742. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  743. } else {
  744. util.Debug("projecthref data id err---pid="+pid, "---"+tempId)
  745. }
  746. } else {
  747. f := modifyEle(modifyProMap)
  748. if f {
  749. //合并、修改
  750. util.Debug("合并修改更新", "----------------------------")
  751. p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
  752. } else {
  753. //修改
  754. util.Debug("修改更新", "----------------------------")
  755. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  756. }
  757. }
  758. } else {
  759. f := modifyEle(modifyProMap)
  760. if f {
  761. //合并、修改
  762. util.Debug("合并修改更新", "----------------------------")
  763. p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
  764. } else {
  765. //修改
  766. util.Debug("修改更新", "----------------------------")
  767. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  768. }
  769. }
  770. } else {
  771. // 周期外
  772. util.Debug("周期外数据直接修改", "----------------------------")
  773. p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
  774. }
  775. }
  776. var Elements = []string{
  777. "projectname",
  778. "projectcode",
  779. "buyer",
  780. "agency",
  781. "area",
  782. "city",
  783. "publishtime",
  784. "toptype",
  785. "subtype",
  786. }
  787. /**
  788. 修改的字段
  789. 修改的字段是否是影响合并流程的要素字段
  790. */
  791. func modifyEle(tmp map[string]interface{}) bool {
  792. merge := false
  793. for _, str := range Elements {
  794. if tmp[str] != nil {
  795. merge = true
  796. break
  797. }
  798. }
  799. return merge
  800. }
  801. //补全位置信息
  802. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  803. area := util.ObjToString(tmp["area"])
  804. city := util.ObjToString(tmp["city"])
  805. if area != "" && city != "" {
  806. return
  807. }
  808. tmpSite := util.ObjToString(tmp["site"])
  809. if tmpSite == "" {
  810. return
  811. }
  812. p.mapSiteLock.Lock()
  813. defer p.mapSiteLock.Unlock()
  814. site := p.mapSite[tmpSite]
  815. if site != nil {
  816. if area != "" {
  817. if area == "全国" {
  818. tmp["area"] = site.Area
  819. tmp["city"] = site.City
  820. tmp["district"] = site.District
  821. return
  822. }
  823. if area != site.Area {
  824. return
  825. } else {
  826. if site.City != "" {
  827. tmp["area"] = site.Area
  828. tmp["city"] = site.City
  829. tmp["district"] = site.District
  830. }
  831. }
  832. } else {
  833. tmp["area"] = site.Area
  834. tmp["city"] = site.City
  835. tmp["district"] = site.District
  836. return
  837. }
  838. }
  839. }
  840. //从数组中删除元素
  841. func deleteSlice(arr []string, v, stype string) []string {
  842. for k, v1 := range arr {
  843. if v1 == v {
  844. ts := time.Now().Unix()
  845. arr = append(arr[:k], arr[k+1:]...)
  846. rt := time.Now().Unix() - ts
  847. if rt > 0 {
  848. log.Println("deleteSlice", stype, rt, v, len(arr))
  849. }
  850. return arr
  851. }
  852. }
  853. return arr
  854. }
  855. //校验评审专家
  856. func ClearRp(tmp []string) []string {
  857. arrTmp := []string{}
  858. for _, v := range tmp {
  859. // 汉字过滤(全汉字,2-4个字)
  860. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  861. continue
  862. }
  863. //黑名单过滤
  864. if BlaskListMap[v] {
  865. continue
  866. }
  867. arrTmp = append(arrTmp, v)
  868. }
  869. return arrTmp
  870. }
  871. func QyFilter(name, stype string) string {
  872. name = strings.ReplaceAll(name, " ", "")
  873. //preReg := PreRegexp[stype]
  874. //for _, v := range preReg {
  875. // name = v.ReplaceAllString(name, "")
  876. //}
  877. //backReg := BackRegexp[stype]
  878. //for _, v := range backReg {
  879. // name = v.ReplaceAllString(name, "")
  880. //}
  881. //backRepReg := BackRepRegexp[stype]
  882. //for _, v := range backRepReg {
  883. // name = v.regs.ReplaceAllString(name, v.repstr)
  884. //}
  885. blackReg := BlackRegexp[stype]
  886. for _, v := range blackReg {
  887. if v.MatchString(name) {
  888. name = ""
  889. break
  890. }
  891. }
  892. if !regexp.MustCompile("[\\p{Han}]{4,}").MatchString(name) {
  893. name = ""
  894. }
  895. if utf8.RuneCountInString(name) > 60 {
  896. name = ""
  897. }
  898. return name
  899. }