task.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "gopkg.in/mgo.v2/bson"
  6. "log"
  7. mu "mfw/util"
  8. "qfw/util"
  9. "regexp"
  10. "strings"
  11. "sync"
  12. "time"
  13. "unicode/utf8"
  14. "github.com/robfig/cron"
  15. "go.mongodb.org/mongo-driver/bson/primitive"
  16. )
  17. /**
  18. 任务入口
  19. 全量、增量合并
  20. 更新、插入,内存清理
  21. 转换成info对象
  22. **/
  23. var PreRegexp = map[string][]*regexp.Regexp{}
  24. var BackRegexp = map[string][]*regexp.Regexp{}
  25. var BackRepRegexp = map[string][]RegexpInfo{}
  26. var BlackRegexp = map[string][]*regexp.Regexp{}
  27. type RegexpInfo struct {
  28. regs *regexp.Regexp
  29. repstr string
  30. }
  31. //项目合并对象
  32. type ProjectTask struct {
  33. InitMinTime int64 //最小时间,小于0的处理一次
  34. name string
  35. thread int //线程数
  36. //查找锁
  37. findLock sync.Mutex
  38. wg sync.WaitGroup
  39. //map锁
  40. AllIdsMapLock sync.Mutex
  41. //对应的id
  42. AllIdsMap map[string]*ID
  43. //采购单位、项目名称、项目编号
  44. mapPb, mapPn, mapPc map[string]*Key
  45. //流程数据 字段相同,直接合并
  46. mapHref map[string]string
  47. mapHrefLock sync.Mutex
  48. //站点
  49. mapSite map[string]*Site
  50. mapSiteLock sync.Mutex
  51. //bidtype、bidstatus 锁
  52. mapBidLock sync.Mutex
  53. //更新或新增通道
  54. updatePool chan []map[string]interface{}
  55. //savePool chan map[string]interface{}
  56. //saveSign, updateSign chan bool
  57. //表名
  58. coll string
  59. //当前状态是全量还是增量
  60. currentType string //当前是跑全量还是跑增量
  61. //
  62. clearContimes int
  63. //当前时间
  64. currentTime int64
  65. //保存长度
  66. saveSize int
  67. pici int64
  68. validTime int64
  69. statusTime int64
  70. //结果时间的更新 最近两天的公告不再更新jgtime
  71. jgTime int64
  72. // LockPool chan *sync.Mutex
  73. // LockPoolLock sync.Mutex
  74. // m1, m23, m4 map[int]int
  75. // l1, l23, l4 map[int]*sync.Mutex
  76. Brun bool
  77. }
  78. func NewPT() *ProjectTask {
  79. p := &ProjectTask{
  80. InitMinTime: int64(1325347200),
  81. name: "全/增量对象",
  82. thread: Thread,
  83. updatePool: make(chan []map[string]interface{}, 5000),
  84. //savePool: make(chan map[string]interface{}, 2000),
  85. wg: sync.WaitGroup{},
  86. AllIdsMap: make(map[string]*ID, 5000000),
  87. mapPb: make(map[string]*Key, 1500000),
  88. mapPn: make(map[string]*Key, 5000000),
  89. mapPc: make(map[string]*Key, 5000000),
  90. mapHref: make(map[string]string, 1500000),
  91. mapSite: make(map[string]*Site, 1000000),
  92. saveSize: 100,
  93. //saveSign: make(chan bool, 1),
  94. //updateSign: make(chan bool, 1),
  95. coll: ProjectColl,
  96. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  97. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  98. jgTime: int64(util.IntAllDef(3, 3) * 86400),
  99. }
  100. return p
  101. }
  102. var P_QL *ProjectTask
  103. var sp = make(chan bool, 5)
  104. //初始化全量合并对象
  105. func init() {
  106. P_QL = NewPT()
  107. log.Println(len(P_QL.updatePool))
  108. go P_QL.updateAllQueue()
  109. go P_QL.clearMem()
  110. }
  111. func (p *ProjectTask) updateAllQueue() {
  112. arru := make([][]map[string]interface{}, p.saveSize)
  113. indexu := 0
  114. for {
  115. select {
  116. case v := <-p.updatePool:
  117. arru[indexu] = v
  118. indexu++
  119. if indexu == p.saveSize {
  120. sp <- true
  121. go func(arru [][]map[string]interface{}) {
  122. defer func() {
  123. <-sp
  124. }()
  125. MongoTool.UpSertBulk(p.coll, arru...)
  126. }(arru)
  127. arru = make([][]map[string]interface{}, p.saveSize)
  128. indexu = 0
  129. }
  130. case <-time.After(1000 * time.Millisecond):
  131. if indexu > 0 {
  132. sp <- true
  133. go func(arru [][]map[string]interface{}) {
  134. defer func() {
  135. <-sp
  136. }()
  137. MongoTool.UpSertBulk(p.coll, arru...)
  138. }(arru[:indexu])
  139. arru = make([][]map[string]interface{}, p.saveSize)
  140. indexu = 0
  141. }
  142. }
  143. }
  144. }
  145. //项目合并内存更新
  146. func (p *ProjectTask) clearMem() {
  147. c := cron.New()
  148. //在内存中保留最近6个月的信息
  149. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  150. _ = c.AddFunc("50 0/5 * * * *", func() {
  151. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  152. SingleClear = 1
  153. //跳过的次数清零
  154. p.clearContimes = 0
  155. //信息进入查找对比全局锁
  156. p.findLock.Lock()
  157. //defer p.findLock.Unlock()
  158. //合并进行的任务都完成
  159. p.wg.Wait()
  160. //遍历id
  161. //所有内存中的项目信息
  162. p.AllIdsMapLock.Lock()
  163. p.mapHrefLock.Lock()
  164. log.Println("清除开始")
  165. //清除计数
  166. clearNum := 0
  167. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  168. v := p.AllIdsMap[pid]
  169. if p.currentTime-v.P.LastTime > p.validTime {
  170. delete(p.mapHref, kHref)
  171. }
  172. }
  173. for k, v := range p.AllIdsMap {
  174. if p.currentTime-v.P.LastTime > p.validTime {
  175. clearNum++
  176. //删除id的map
  177. delete(p.AllIdsMap, k)
  178. //删除pb
  179. if v.P.Buyer != "" {
  180. ids := p.mapPb[v.P.Buyer]
  181. if ids != nil {
  182. ids.Lock.Lock()
  183. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  184. if len(ids.Arr) == 0 {
  185. delete(p.mapPb, v.P.Buyer)
  186. }
  187. ids.Lock.Unlock()
  188. }
  189. }
  190. //删除mapPn
  191. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  192. if vn != "" {
  193. ids := p.mapPn[vn]
  194. if ids != nil {
  195. ids.Lock.Lock()
  196. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  197. if len(ids.Arr) == 0 {
  198. delete(p.mapPn, vn)
  199. }
  200. ids.Lock.Unlock()
  201. }
  202. }
  203. }
  204. //删除mapPc
  205. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  206. if vn != "" {
  207. ids := p.mapPc[vn]
  208. if ids != nil {
  209. ids.Lock.Lock()
  210. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  211. if len(ids.Arr) == 0 {
  212. delete(p.mapPc, vn)
  213. }
  214. ids.Lock.Unlock()
  215. }
  216. }
  217. }
  218. v = nil
  219. }
  220. }
  221. p.mapHrefLock.Unlock()
  222. p.AllIdsMapLock.Unlock()
  223. p.findLock.Unlock()
  224. SingleClear = 0
  225. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  226. } else {
  227. p.clearContimes++
  228. }
  229. })
  230. c.Start()
  231. }
  232. //全量合并
  233. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  234. defer util.Catch()
  235. p.thread = util.IntAllDef(Thread, 4)
  236. q, _ := udpInfo["query"].(map[string]interface{})
  237. if q == nil {
  238. q = map[string]interface{}{}
  239. lteid, _ := udpInfo["lteid"].(string)
  240. var idmap map[string]interface{}
  241. if len(lteid) > 15 {
  242. idmap = map[string]interface{}{
  243. "$lte": StringTOBsonId(lteid),
  244. }
  245. }
  246. gtid, _ := udpInfo["gtid"].(string)
  247. if len(gtid) > 15 {
  248. if idmap == nil {
  249. idmap = map[string]interface{}{}
  250. }
  251. idmap["$gte"] = StringTOBsonId(gtid)
  252. }
  253. if idmap != nil {
  254. q["_id"] = idmap
  255. }
  256. }
  257. //生成查询语句执行
  258. log.Println("查询语句:", q)
  259. p.enter(MongoTool.DbName, ExtractColl, q)
  260. }
  261. //增量合并
  262. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  263. defer util.Catch()
  264. //1、检查pubilshtime索引
  265. db, _ := udpInfo["db"].(string)
  266. if db == "" {
  267. db = MongoTool.DbName
  268. }
  269. coll, _ := udpInfo["coll"].(string)
  270. if coll == "" {
  271. coll = ExtractColl
  272. }
  273. thread := util.IntAllDef(Thread, 4)
  274. if thread > 0 {
  275. p.thread = thread
  276. }
  277. //开始id和结束id
  278. q, _ := udpInfo["query"].(map[string]interface{})
  279. gtid := udpInfo["gtid"].(string)
  280. lteid := udpInfo["lteid"].(string)
  281. if q == nil {
  282. q = map[string]interface{}{
  283. "_id": map[string]interface{}{
  284. "$gt": StringTOBsonId(gtid),
  285. "$lte": StringTOBsonId(lteid),
  286. },
  287. }
  288. }
  289. if q != nil {
  290. //生成查询语句执行
  291. p.enter(db, coll, q)
  292. }
  293. if udpInfo["stop"] == nil {
  294. for i := 0; i < 5; i++ {
  295. sp <- true
  296. }
  297. for i := 0; i < 5; i++ {
  298. <-sp
  299. }
  300. log.Println("保存完成,生索引", p.pici)
  301. time.Sleep(5 * time.Second)
  302. nextNode(udpInfo, p.pici)
  303. }
  304. }
  305. //招标字段更新
  306. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  307. defer util.Catch()
  308. infoid := udpInfo["infoid"].(string)
  309. infoMap := MongoTool.FindById(ExtractColl, infoid)
  310. client := Es.GetEsConn()
  311. defer Es.DestoryEsConn(client)
  312. esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+infoid+`"}}]}}}`
  313. data := Es.Get(Index, Itype, esquery)
  314. if len(*data) > 0 {
  315. info := ParseInfo(infoMap)
  316. p.updateJudge(infoMap, info, util.ObjToString((*data)[0]["_id"]))
  317. }else {
  318. util.Debug("Not find project---", infoid)
  319. }
  320. }
  321. func (p *ProjectTask) taskQuery() {
  322. defer util.Catch()
  323. count := 0
  324. sess := MongoTool.GetMgoConn()
  325. defer MongoTool.DestoryMongoConn(sess)
  326. fields := map[string]interface{} {"budget": 1, "bidamount": 1, "package": 1}
  327. ms := sess.DB(MongoTool.DbName).C(UpdateColl).Find(map[string]interface{}{}).Select(fields)
  328. query := ms.Iter()
  329. L:
  330. for {
  331. tmp := make(map[string]interface{})
  332. if query.Next(&tmp) {
  333. lastid := tmp["_id"]
  334. tmp["id"] = tmp["_id"].(primitive.ObjectID).Hex();
  335. if count%1000 == 0 {
  336. log.Println("current modify", count, lastid)
  337. }
  338. p.taskUpdateMoney(tmp)
  339. count++
  340. } else {
  341. break L
  342. }
  343. }
  344. }
  345. //修改公告信息的预算/中标金额
  346. func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
  347. defer util.Catch()
  348. id := udpInfo["id"].(string)
  349. budget := util.Float64All(udpInfo["budget"])
  350. bidamount := util.Float64All(udpInfo["bidamount"])
  351. client := Es.GetEsConn()
  352. defer Es.DestoryEsConn(client)
  353. esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+id+`"}}]}}}`
  354. data := Es.Get(Index, Itype, esquery)
  355. if len(*data) > 0 {
  356. pid := util.ObjToString((*data)[0]["_id"])
  357. pro := MongoTool.FindById(ProjectColl, pid)
  358. if len(pro) == 0 {
  359. util.Debug("未找到项目, pid=", pid)
  360. return
  361. }
  362. var info *map[string]interface{}
  363. for _, v := range []interface{}(pro["list"].(primitive.A)){
  364. v1 := v.(map[string]interface{})
  365. if util.ObjToString(v1["infoid"]) == id {
  366. info = util.ObjToMap(v)
  367. infoField := util.ObjToMap(pro["infofield"])
  368. if udpInfo["budget"] != nil{
  369. util.Debug("update-------", (*info)["infoid"])
  370. //if pro["budget"] == (*info)["budget"] {
  371. // pro["budget"] = budget
  372. //}
  373. //多包中的金额
  374. if util.IntAll(pro["multipackage"]) == 1 {
  375. if packages, ok := pro["package"].(map[string]interface{}); ok {
  376. M :
  377. for k, v := range packages{
  378. v1 := []interface{}(v.(primitive.A))
  379. for _, v2 := range v1{
  380. v3 := v2.(map[string]interface{})
  381. if util.ObjToString(v3["infoid"]) == id {
  382. if v3["budget"] != nil {
  383. pkg := udpInfo["package"].(map[string]interface{})
  384. tmp := pkg[k].(map[string]interface{})
  385. v3["budget"] = tmp["budget"]
  386. }
  387. }else {
  388. break M
  389. }
  390. }
  391. }
  392. }
  393. }
  394. (*info)["budget"] = budget
  395. (*util.ObjToMap((*infoField)[id]))["budget"] = budget
  396. if pro["sortprice"] == (*info)["budget"] {
  397. pro["sortprice"] = budget
  398. }
  399. }else {
  400. delete(*info, "budget")
  401. }
  402. if udpInfo["bidamount"] != nil{
  403. //if pro["bidamount"] == (*info)["bidamount"] {
  404. // pro["bidamount"] = bidamount
  405. //}
  406. v1["bidamount"] = bidamount
  407. if util.IntAll(pro["multipackage"]) == 1 {
  408. if packages, ok := pro["package"].(map[string]interface{}); ok {
  409. for k, v := range packages{
  410. v1 := []interface{}(v.(primitive.A))
  411. for _, v2 := range v1{
  412. v3 := v2.(map[string]interface{})
  413. if util.ObjToString(v3["infoid"]) == id {
  414. if v3["bidamount"] != nil {
  415. pkg := udpInfo["package"].(map[string]interface{})
  416. tmp := pkg[k].(map[string]interface{})
  417. v3["bidamount"] = tmp["bidamount"]
  418. }
  419. }
  420. }
  421. }
  422. }
  423. }
  424. (*info)["bidamount"] = bidamount
  425. (*util.ObjToMap((*infoField)[id]))["bidamount"] = bidamount
  426. if pro["sortprice"] == (*info)["bidamount"] {
  427. pro["sortprice"] = bidamount
  428. }
  429. }else {
  430. delete(*info, "bidamount")
  431. }
  432. break
  433. }
  434. }
  435. var project *ProjectInfo
  436. var pInfo *Info
  437. bys, _ := json.Marshal(pro)
  438. _ = json.Unmarshal(bys, &project)
  439. bys1, _ := json.Marshal(info)
  440. _ = json.Unmarshal(bys1, &pInfo)
  441. if len(project.Ids) > 1 {
  442. CountAmount(project, pInfo, *info)
  443. if project.Budget > 0 {
  444. pro["budget"] = project.Budget
  445. }
  446. if project.Bidamount > 0 {
  447. pro["bidamount"] = project.Bidamount
  448. }
  449. }else {
  450. pro["budget"] = budget
  451. pro["bidamount"] = bidamount
  452. if budget > bidamount {
  453. pro["sortprice"] = budget
  454. }else {
  455. pro["sortprice"] = bidamount
  456. }
  457. }
  458. set := map[string]interface{}{
  459. "$set": pro,
  460. }
  461. MongoTool.UpdateById(ProjectColl, pid, set)
  462. loadStart := util.Int64All(Sysconfig["loadStart"])
  463. if loadStart > -1 && project.LastTime >loadStart {
  464. util.Debug("内存中存在该项目信息", project.Id)
  465. p.AllIdsMapLock.Lock()
  466. p.AllIdsMap[pid].P = project
  467. p.AllIdsMapLock.Unlock()
  468. }
  469. bol := Es.DelById(Index, Itype, pid)
  470. if bol {
  471. util.Debug("删除es索引, pid------", pid)
  472. //调udp生索引
  473. by, _ := json.Marshal(map[string]interface{}{
  474. "query": map[string]interface{}{
  475. "_id": bson.M{
  476. "$gte": pid,
  477. "$lte": pid,
  478. }},
  479. "stype": "project",
  480. })
  481. util.Debug(string(by))
  482. _ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
  483. }
  484. }
  485. }
  486. func FindMoney(key string, project map[string]interface{}) float64 {
  487. money := -0.1
  488. for i, v := range []interface{}(project["list"].(primitive.A)){
  489. v1 := v.(map[string]interface{})
  490. if i == 0 {
  491. if v1[key] != nil {
  492. money = util.Float64All(v1[key])
  493. }
  494. }else {
  495. if v1[key] != nil && util.Float64All(v1[key]) > money {
  496. money = util.Float64All(v1[key])
  497. }
  498. }
  499. }
  500. return money
  501. }
  502. func StringTOBsonId(id string) primitive.ObjectID {
  503. objectId, _ := primitive.ObjectIDFromHex(id)
  504. return objectId
  505. }
  506. //通知下个节点nextNode
  507. func nextNode(mapInfo map[string]interface{}, pici int64) {
  508. mapInfo["stype"] = "project"
  509. mapInfo["query"] = map[string]interface{}{
  510. "pici": pici,
  511. }
  512. key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
  513. mapInfo["key"] = key
  514. datas, _ := json.Marshal(mapInfo)
  515. node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
  516. udptaskmap.Store(key, node)
  517. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, toaddr[0])
  518. }
  519. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  520. defer util.Catch()
  521. defer func() {
  522. p.Brun = false
  523. }()
  524. p.Brun = true
  525. count, taskcount := 0, 0
  526. countRepeat := 0
  527. pool := make(chan bool, p.thread)
  528. log.Println("start project", q)
  529. sess := MongoTool.GetMgoConn()
  530. defer MongoTool.DestoryMongoConn(sess)
  531. infoPool := make(chan map[string]interface{}, 2000)
  532. over := make(chan bool)
  533. go func() {
  534. L:
  535. for {
  536. select {
  537. case tmp := <-infoPool:
  538. pool <- true
  539. taskcount++
  540. go func(tmp map[string]interface{}) {
  541. defer func() {
  542. <-pool
  543. }()
  544. if util.IntAll(tmp["repeat"]) == 0 {
  545. if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
  546. //增量 dataging为1不参与合并
  547. util.Debug("增量 dataging == 1 ", tmp["_id"])
  548. return
  549. }
  550. p.fillInPlace(tmp)
  551. info := ParseInfo(tmp)
  552. p.currentTime = info.Publishtime
  553. //普通合并
  554. p.CommonMerge(tmp, info)
  555. } else {
  556. //信息错误,进行更新
  557. util.Debug(tmp["_id"])
  558. countRepeat++
  559. }
  560. }(tmp)
  561. case <-over:
  562. break L
  563. }
  564. }
  565. }()
  566. //fields := map[string]interface{} {"repeat": 1, "dataging": 1, "area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
  567. // "projectcode": 1, "buyerclass": 1, "winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1, "projectscope": 1, "contractcode": 1,
  568. // "site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1, "jsondata": 1, "review_experts": 1, "purchasing": 1, "winnerorder": 1}
  569. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
  570. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  571. if Sysconfig["hints"] != nil {
  572. ms.Hint(Sysconfig["hints"])
  573. }
  574. query := ms.Iter()
  575. //query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  576. var lastid interface{}
  577. L:
  578. for {
  579. select {
  580. case <-queryClose:
  581. log.Println("receive interrupt sign")
  582. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  583. queryCloseOver <- true
  584. break L
  585. default:
  586. tmp := make(map[string]interface{})
  587. if query.Next(&tmp) {
  588. lastid = tmp["_id"]
  589. if count%10000 == 0 {
  590. log.Println("current", count, lastid)
  591. }
  592. infoPool <- tmp
  593. count++
  594. } else {
  595. break L
  596. }
  597. }
  598. }
  599. time.Sleep(5 * time.Second)
  600. over <- true
  601. //阻塞
  602. for n := 0; n < p.thread; n++ {
  603. pool <- true
  604. }
  605. log.Println("所有线程执行完成...", count, taskcount, countRepeat)
  606. }
var (
// Patterns for extracting a project code from the title (used by ParseInfo).
// titleGetPc: code at the start of the title; ParseInfo reads capture group 1.
titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
// titleGetPc1: bracketed code, optionally preceded by a label such as 编号/编码; ParseInfo reads capture group 3.
titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
// titleGetPc2: code at the end of the title, optionally followed by up to 5 characters and 公告; ParseInfo reads capture group 1.
titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
// pcReplace: noise stripped from project names/codes — bracketed re-tender markers, bracket and
// punctuation characters, trailing 号/重/第N次 markers, underscores/spaces, and trailing 项目/采购 words.
pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
// StrOrNum: a project code of only digits or only letters, 1-4 characters long.
StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
// StrOrNum2: only digits or only letters, any length.
StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
// KeyPackage: text containing a lot/section word (包/段); per the original note, tenders whose lots were
// not recognised are merged into one project.
KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
  621. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  622. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  623. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  624. proHref := util.ObjToString(jsonData["projecthref"])
  625. if jsonData != nil && proHref != "" {
  626. //projectHref字段合并
  627. tmp["projecthref"] = proHref
  628. p.mapHrefLock.Lock()
  629. pid := p.mapHref[proHref]
  630. p.mapHrefLock.Unlock()
  631. if pid != "" {
  632. p.AllIdsMapLock.Lock()
  633. comparePro := p.AllIdsMap[pid].P
  634. p.AllIdsMapLock.Unlock()
  635. _, ex := p.CompareStatus(comparePro, info)
  636. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  637. } else {
  638. id, p1 := p.NewProject(tmp, info)
  639. p.mapHrefLock.Lock()
  640. p.mapHref[proHref] = id
  641. p.mapHrefLock.Unlock()
  642. p.AllIdsMapLock.Lock()
  643. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  644. p.AllIdsMapLock.Unlock()
  645. }
  646. } else {
  647. //项目合并
  648. p.startProjectMerge(info, tmp)
  649. }
  650. } else {
  651. //项目合并
  652. p.startProjectMerge(info, tmp)
  653. }
  654. }
  655. }
  656. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  657. bys, _ := json.Marshal(tmp)
  658. var thisinfo *Info
  659. _ = json.Unmarshal(bys, &thisinfo)
  660. if thisinfo == nil {
  661. return nil
  662. }
  663. if len(thisinfo.Topscopeclass) == 0 {
  664. thisinfo.Topscopeclass = []string{}
  665. }
  666. if len(thisinfo.Subscopeclass) == 0 {
  667. thisinfo.Subscopeclass = []string{}
  668. }
  669. if thisinfo.SubType == "" {
  670. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  671. }
  672. //从标题中查找项目编号
  673. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  674. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  675. thisinfo.PTC = res[1]
  676. } else {
  677. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  678. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  679. thisinfo.PTC = res[3]
  680. } else {
  681. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  682. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  683. thisinfo.PTC = res[1]
  684. }
  685. }
  686. }
  687. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  688. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  689. if thisinfo.ProjectName != "" {
  690. thisinfo.pnbval++
  691. }
  692. }
  693. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  694. if thisinfo.ProjectCode != "" {
  695. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  696. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  697. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  698. }
  699. } else {
  700. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  701. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  702. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  703. }
  704. }
  705. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  706. thisinfo.pnbval++
  707. }
  708. }
  709. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  710. thisinfo.PTC = ""
  711. }
  712. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  713. thisinfo.pnbval++
  714. } else {
  715. thisinfo.Buyer = ""
  716. }
  717. //清理评审专家名单
  718. if len(thisinfo.ReviewExperts) > 0 {
  719. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  720. }
  721. //winners整理、清理
  722. winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
  723. tmp["winner"] = winner
  724. m1 := map[string]bool{}
  725. winners := []string{}
  726. if winner != "" {
  727. m1[winner] = true
  728. winners = append(winners, winner)
  729. }
  730. packageM, _ := tmp["package"].(map[string]interface{})
  731. if packageM != nil {
  732. thisinfo.HasPackage = true
  733. for _, p := range packageM {
  734. pm, _ := p.(map[string]interface{})
  735. pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
  736. if pw != "" && !m1[pw] {
  737. m1[pw] = true
  738. winners = append(winners, pw)
  739. }
  740. }
  741. }
  742. thisinfo.Winners = winners
  743. //清理winnerorder
  744. var wins []map[string]interface{}
  745. for _, v := range thisinfo.WinnerOrder {
  746. w := QyFilter(util.ObjToString(v["entname"]), "winner")
  747. if w != "" {
  748. v["entname"] = w
  749. wins = append(wins, v)
  750. }
  751. }
  752. thisinfo.WinnerOrder = wins
  753. //清理buyer
  754. buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
  755. tmp["buyer"] = buyer
  756. thisinfo.Buyer = buyer
  757. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  758. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  759. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  760. //处理分包中数据异常问题
  761. for k, tmp := range thisinfo.Package {
  762. if ps, ok := tmp.([]map[string]interface{}); ok {
  763. for i, p := range ps {
  764. name, _ := p["name"].(string)
  765. if len([]rune(name)) > 100 {
  766. p["name"] = fmt.Sprint([]rune(name[:100]))
  767. }
  768. ps[i] = p
  769. }
  770. thisinfo.Package[k] = ps
  771. }
  772. }
  773. return thisinfo
  774. }
  775. func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, info *Info, pInfoId string) {
  776. var index = -1 //0:第一个,1:中间,2:最后一个
  777. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  778. ids := []interface{}(tmpPro["ids"].(primitive.A))
  779. infoList := []interface{}(tmpPro["list"].(primitive.A))
  780. for i, v := range ids {
  781. if util.ObjToString(v) == util.ObjToString(infoMap["_id"]) {
  782. if i == 0 {
  783. index = 0
  784. }else if i == len(ids) - 1 {
  785. index = 2
  786. }else {
  787. index = 1
  788. }
  789. }
  790. }
  791. proListMap := infoList[index].(map[string]interface{})
  792. modifyMap, f := modifyEle(proListMap, infoMap)
  793. //projecthref字段
  794. jsonData := infoMap["jsondata"].(map[string]interface{})
  795. if jsonData != nil && jsonData["projecthref"] != nil {
  796. proHref := jsonData["projecthref"].(string)
  797. infoMap["projecthref"] = proHref
  798. p.mapHrefLock.Lock()
  799. pid := p.mapHref[proHref]
  800. p.mapHrefLock.Unlock()
  801. if pid == pInfoId {
  802. p.modifyUpdate(pInfoId, index, info, infoMap, tmpPro, modifyMap)
  803. return
  804. }
  805. }
  806. if f {
  807. //合并、修改
  808. log.Println("合并修改更新", "----------------------------")
  809. p.mergeAndModify(pInfoId, index, info, infoMap, tmpPro, modifyMap)
  810. } else {
  811. //修改
  812. log.Println("修改更新", "----------------------------")
  813. p.modifyUpdate(pInfoId, index, info, infoMap, tmpPro, modifyMap)
  814. }
  815. }
  816. var Elements = []string{
  817. "projectname",
  818. "projectcode",
  819. "buyer",
  820. "agency",
  821. "budget",
  822. "bidamount",
  823. "area",
  824. "city",
  825. "publishtime",
  826. }
  827. /**
  828. 判断修改的字段是否是影响合并流程的要素字段
  829. */
  830. func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
  831. modifyMap := map[string]interface{}{}
  832. for k := range tmpPro {
  833. for k1 := range tmp {
  834. if k == k1 && tmpPro[k] != tmp[k1] {
  835. modifyMap[k] = tmp[k1]
  836. break
  837. }
  838. }
  839. }
  840. for k := range modifyMap {
  841. for _, str := range Elements {
  842. if k == str {
  843. return modifyMap, true
  844. }
  845. }
  846. }
  847. delete(modifyMap, "_id")
  848. return modifyMap, false
  849. }
  850. //补全位置信息
  851. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  852. area := util.ObjToString(tmp["area"])
  853. city := util.ObjToString(tmp["city"])
  854. if area != "" && city != "" {
  855. return
  856. }
  857. tmpSite := util.ObjToString(tmp["site"])
  858. if tmpSite == "" {
  859. return
  860. }
  861. p.mapSiteLock.Lock()
  862. defer p.mapSiteLock.Unlock()
  863. site := p.mapSite[tmpSite]
  864. if site != nil {
  865. if area != "" {
  866. if area == "全国" {
  867. tmp["area"] = site.Area
  868. tmp["city"] = site.City
  869. tmp["district"] = site.District
  870. return
  871. }
  872. if area != site.Area {
  873. return
  874. } else {
  875. if site.City != "" {
  876. tmp["area"] = site.Area
  877. tmp["city"] = site.City
  878. tmp["district"] = site.District
  879. }
  880. }
  881. } else {
  882. tmp["area"] = site.Area
  883. tmp["city"] = site.City
  884. tmp["district"] = site.District
  885. return
  886. }
  887. }
  888. }
  889. //从数组中删除元素
  890. func deleteSlice(arr []string, v, stype string) []string {
  891. for k, v1 := range arr {
  892. if v1 == v {
  893. ts := time.Now().Unix()
  894. arr = append(arr[:k], arr[k+1:]...)
  895. rt := time.Now().Unix() - ts
  896. if rt > 0 {
  897. log.Println("deleteSlice", stype, rt, v, len(arr))
  898. }
  899. return arr
  900. }
  901. }
  902. return arr
  903. }
  904. //校验评审专家
  905. func ClearRp(tmp []string) []string {
  906. arrTmp := []string{}
  907. for _, v := range tmp {
  908. // 汉字过滤(全汉字,2-4个字)
  909. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  910. continue
  911. }
  912. //黑名单过滤
  913. if BlaskListMap[v] {
  914. continue
  915. }
  916. arrTmp = append(arrTmp, v)
  917. }
  918. return arrTmp
  919. }
  920. func QyFilter(name, stype string) string {
  921. name = strings.ReplaceAll(name, " ", "")
  922. preReg := PreRegexp[stype]
  923. for _, v := range preReg {
  924. name = v.ReplaceAllString(name, "")
  925. }
  926. backReg := BackRegexp[stype]
  927. for _, v := range backReg {
  928. name = v.ReplaceAllString(name, "")
  929. }
  930. backRepReg := BackRepRegexp[stype]
  931. for _, v := range backRepReg {
  932. name = v.regs.ReplaceAllString(name, v.repstr)
  933. }
  934. blackReg := BlackRegexp[stype]
  935. for _, v := range blackReg {
  936. if v.MatchString(name) {
  937. name = ""
  938. break
  939. }
  940. }
  941. if !regexp.MustCompile("[\\p{Han}]{4,}").MatchString(name) {
  942. name = ""
  943. }
  944. if utf8.RuneCountInString(name) > 60 {
  945. name = ""
  946. }
  947. return name
  948. }