task.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023
  1. package main
  import (
	"encoding/json"
	"fmt"
	"log"
	"reflect"
	"regexp"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"gopkg.in/mgo.v2/bson"

	mu "mfw/util"
	"qfw/util"
)
  // Task entry point for project merging: full and incremental merges,
// batched updates/inserts, periodic in-memory cleanup, and conversion of raw
// notices into *Info objects.

// Company-name cleanup rules used by QyFilter, keyed by entity type (this
// file calls QyFilter with "winner" and "buyer"). They start empty here; where
// they are populated is not visible in this file.
var (
	// PreRegexp patterns are removed from the name first.
	PreRegexp = map[string][]*regexp.Regexp{}
	// BackRegexp patterns are removed after the prefix rules.
	BackRegexp = map[string][]*regexp.Regexp{}
	// BackRepRegexp patterns are replaced by their RegexpInfo.repstr.
	BackRepRegexp = map[string][]RegexpInfo{}
	// BlackRegexp: a name matching any of these is discarded.
	BlackRegexp = map[string][]*regexp.Regexp{}
)

// RegexpInfo pairs a compiled pattern with its replacement text.
type RegexpInfo struct {
	regs   *regexp.Regexp // pattern to search for
	repstr string         // replacement passed to ReplaceAllString
}
  31. //项目合并对象
  32. type ProjectTask struct {
  33. InitMinTime int64 //最小时间,小于0的处理一次
  34. name string
  35. thread int //线程数
  36. //查找锁
  37. findLock sync.Mutex
  38. wg sync.WaitGroup
  39. //map锁
  40. AllIdsMapLock sync.Mutex
  41. //对应的id
  42. AllIdsMap map[string]*ID
  43. //采购单位、项目名称、项目编号
  44. mapPb, mapPn, mapPc map[string]*Key
  45. //流程数据 字段相同,直接合并
  46. mapHref map[string]string
  47. mapHrefLock sync.Mutex
  48. //站点
  49. mapSite map[string]*Site
  50. mapSiteLock sync.Mutex
  51. //bidtype、bidstatus 锁
  52. mapBidLock sync.Mutex
  53. //更新或新增通道
  54. updatePool chan []map[string]interface{}
  55. //savePool chan map[string]interface{}
  56. //saveSign, updateSign chan bool
  57. //表名
  58. coll string
  59. //当前状态是全量还是增量
  60. currentType string //当前是跑全量还是跑增量
  61. //
  62. clearContimes int
  63. //当前时间
  64. currentTime int64
  65. //保存长度
  66. saveSize int
  67. pici int64
  68. validTime int64
  69. statusTime int64
  70. //结果时间的更新 最近两天的公告不再更新jgtime
  71. jgTime int64
  72. // LockPool chan *sync.Mutex
  73. // LockPoolLock sync.Mutex
  74. // m1, m23, m4 map[int]int
  75. // l1, l23, l4 map[int]*sync.Mutex
  76. Brun bool
  77. }
  78. func NewPT() *ProjectTask {
  79. p := &ProjectTask{
  80. InitMinTime: int64(1325347200),
  81. name: "全/增量对象",
  82. thread: Thread,
  83. updatePool: make(chan []map[string]interface{}, 5000),
  84. //savePool: make(chan map[string]interface{}, 2000),
  85. wg: sync.WaitGroup{},
  86. AllIdsMap: make(map[string]*ID, 5000000),
  87. mapPb: make(map[string]*Key, 1500000),
  88. mapPn: make(map[string]*Key, 5000000),
  89. mapPc: make(map[string]*Key, 5000000),
  90. mapHref: make(map[string]string, 1500000),
  91. mapSite: make(map[string]*Site, 1000000),
  92. saveSize: 100,
  93. //saveSign: make(chan bool, 1),
  94. //updateSign: make(chan bool, 1),
  95. coll: ProjectColl,
  96. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  97. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  98. jgTime: int64(util.IntAllDef(3, 3) * 86400),
  99. }
  100. return p
  101. }
  102. var P_QL *ProjectTask
  103. var sp = make(chan bool, 5)
  104. //初始化全量合并对象
  105. func init() {
  106. P_QL = NewPT()
  107. log.Println(len(P_QL.updatePool))
  108. go P_QL.updateAllQueue()
  109. go P_QL.clearMem()
  110. }
  111. func (p *ProjectTask) updateAllQueue() {
  112. arru := make([][]map[string]interface{}, p.saveSize)
  113. indexu := 0
  114. for {
  115. select {
  116. case v := <-p.updatePool:
  117. arru[indexu] = v
  118. indexu++
  119. if indexu == p.saveSize {
  120. sp <- true
  121. go func(arru [][]map[string]interface{}) {
  122. defer func() {
  123. <-sp
  124. }()
  125. MongoTool.UpSertBulk(p.coll, arru...)
  126. }(arru)
  127. arru = make([][]map[string]interface{}, p.saveSize)
  128. indexu = 0
  129. }
  130. case <-time.After(1000 * time.Millisecond):
  131. if indexu > 0 {
  132. sp <- true
  133. go func(arru [][]map[string]interface{}) {
  134. defer func() {
  135. <-sp
  136. }()
  137. MongoTool.UpSertBulk(p.coll, arru...)
  138. }(arru[:indexu])
  139. arru = make([][]map[string]interface{}, p.saveSize)
  140. indexu = 0
  141. }
  142. }
  143. }
  144. }
  145. //项目合并内存更新
  146. func (p *ProjectTask) clearMem() {
  147. c := cron.New()
  148. //在内存中保留最近6个月的信息
  149. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  150. _ = c.AddFunc("50 0/5 * * * *", func() {
  151. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  152. SingleClear = 1
  153. //跳过的次数清零
  154. p.clearContimes = 0
  155. //信息进入查找对比全局锁
  156. p.findLock.Lock()
  157. //defer p.findLock.Unlock()
  158. //合并进行的任务都完成
  159. p.wg.Wait()
  160. //遍历id
  161. //所有内存中的项目信息
  162. p.AllIdsMapLock.Lock()
  163. p.mapHrefLock.Lock()
  164. log.Println("清除开始")
  165. //清除计数
  166. clearNum := 0
  167. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  168. v := p.AllIdsMap[pid]
  169. if p.currentTime-v.P.LastTime > p.validTime {
  170. delete(p.mapHref, kHref)
  171. }
  172. }
  173. for k, v := range p.AllIdsMap {
  174. if p.currentTime-v.P.LastTime > p.validTime {
  175. clearNum++
  176. //删除id的map
  177. delete(p.AllIdsMap, k)
  178. //删除pb
  179. if v.P.Buyer != "" {
  180. ids := p.mapPb[v.P.Buyer]
  181. if ids != nil {
  182. ids.Lock.Lock()
  183. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  184. if len(ids.Arr) == 0 {
  185. delete(p.mapPb, v.P.Buyer)
  186. }
  187. ids.Lock.Unlock()
  188. }
  189. }
  190. //删除mapPn
  191. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  192. if vn != "" {
  193. ids := p.mapPn[vn]
  194. if ids != nil {
  195. ids.Lock.Lock()
  196. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  197. if len(ids.Arr) == 0 {
  198. delete(p.mapPn, vn)
  199. }
  200. ids.Lock.Unlock()
  201. }
  202. }
  203. }
  204. //删除mapPc
  205. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  206. if vn != "" {
  207. ids := p.mapPc[vn]
  208. if ids != nil {
  209. ids.Lock.Lock()
  210. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  211. if len(ids.Arr) == 0 {
  212. delete(p.mapPc, vn)
  213. }
  214. ids.Lock.Unlock()
  215. }
  216. }
  217. }
  218. v = nil
  219. }
  220. }
  221. p.mapHrefLock.Unlock()
  222. p.AllIdsMapLock.Unlock()
  223. p.findLock.Unlock()
  224. SingleClear = 0
  225. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  226. } else {
  227. p.clearContimes++
  228. }
  229. })
  230. c.Start()
  231. }
  232. //全量合并
  233. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  234. defer util.Catch()
  235. p.thread = util.IntAllDef(Thread, 4)
  236. q, _ := udpInfo["query"].(map[string]interface{})
  237. if q == nil {
  238. q = map[string]interface{}{}
  239. lteid, _ := udpInfo["lteid"].(string)
  240. var idmap map[string]interface{}
  241. if len(lteid) > 15 {
  242. idmap = map[string]interface{}{
  243. "$lte": StringTOBsonId(lteid),
  244. }
  245. }
  246. gtid, _ := udpInfo["gtid"].(string)
  247. if len(gtid) > 15 {
  248. if idmap == nil {
  249. idmap = map[string]interface{}{}
  250. }
  251. idmap["$gte"] = StringTOBsonId(gtid)
  252. }
  253. if idmap != nil {
  254. q["_id"] = idmap
  255. }
  256. }
  257. //生成查询语句执行
  258. log.Println("查询语句:", q)
  259. p.enter(MongoTool.DbName, ExtractColl, q)
  260. }
  261. //增量合并
  262. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  263. defer util.Catch()
  264. //1、检查pubilshtime索引
  265. db, _ := udpInfo["db"].(string)
  266. if db == "" {
  267. db = MongoTool.DbName
  268. }
  269. coll, _ := udpInfo["coll"].(string)
  270. if coll == "" {
  271. coll = ExtractColl
  272. }
  273. thread := util.IntAllDef(Thread, 4)
  274. if thread > 0 {
  275. p.thread = thread
  276. }
  277. //开始id和结束id
  278. q, _ := udpInfo["query"].(map[string]interface{})
  279. gtid := udpInfo["gtid"].(string)
  280. lteid := udpInfo["lteid"].(string)
  281. if q == nil {
  282. q = map[string]interface{}{
  283. "_id": map[string]interface{}{
  284. "$gt": StringTOBsonId(gtid),
  285. "$lte": StringTOBsonId(lteid),
  286. },
  287. }
  288. }
  289. if q != nil {
  290. //生成查询语句执行
  291. p.enter(db, coll, q)
  292. }
  293. if udpInfo["stop"] == nil {
  294. for i := 0; i < 5; i++ {
  295. sp <- true
  296. }
  297. for i := 0; i < 5; i++ {
  298. <-sp
  299. }
  300. log.Println("保存完成,生索引", p.pici)
  301. time.Sleep(5 * time.Second)
  302. nextNode(udpInfo, p.pici)
  303. }
  304. }
  305. //招标字段更新
  306. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  307. defer util.Catch()
  308. db, _ := udpInfo["db"].(string)
  309. if db == "" {
  310. db = MongoTool.DbName
  311. }
  312. coll, _ := udpInfo["coll"].(string)
  313. if coll == "" {
  314. coll = ExtractColl
  315. }
  316. thread := util.IntAllDef(Thread, 4)
  317. if thread > 0 {
  318. p.thread = thread
  319. }
  320. q, _ := udpInfo["query"].(map[string]interface{})
  321. gtid := udpInfo["gtid"].(string)
  322. lteid := udpInfo["lteid"].(string)
  323. if q == nil {
  324. q = map[string]interface{}{
  325. "_id": map[string]interface{}{
  326. "$gte": StringTOBsonId(gtid),
  327. "$lte": StringTOBsonId(lteid),
  328. },
  329. "is_m": 1,
  330. }
  331. }
  332. log.Println("查询语句:", q)
  333. p.enter(db, coll, q)
  334. }
  335. func (p *ProjectTask) taskQuery() {
  336. defer util.Catch()
  337. count := 0
  338. sess := MongoTool.GetMgoConn()
  339. defer MongoTool.DestoryMongoConn(sess)
  340. fields := map[string]interface{} {"budget": 1, "bidamount": 1, "package": 1}
  341. ms := sess.DB(MongoTool.DbName).C(UpdateColl).Find(map[string]interface{}{}).Select(fields)
  342. query := ms.Iter()
  343. L:
  344. for {
  345. tmp := make(map[string]interface{})
  346. if query.Next(&tmp) {
  347. lastid := tmp["_id"]
  348. tmp["id"] = tmp["_id"].(primitive.ObjectID).Hex();
  349. if count%1000 == 0 {
  350. log.Println("current modify", count, lastid)
  351. }
  352. p.taskUpdateMoney(tmp)
  353. count++
  354. } else {
  355. break L
  356. }
  357. }
  358. }
  359. //修改公告信息的预算/中标金额
  360. func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
  361. defer util.Catch()
  362. id := udpInfo["id"].(string)
  363. budget := util.Float64All(udpInfo["budget"])
  364. bidamount := util.Float64All(udpInfo["bidamount"])
  365. client := Es.GetEsConn()
  366. defer Es.DestoryEsConn(client)
  367. esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+id+`"}}]}}}`
  368. data := Es.Get(Index, Itype, esquery)
  369. if len(*data) > 0 {
  370. pid := util.ObjToString((*data)[0]["_id"])
  371. pro := MongoTool.FindById(ProjectColl, pid)
  372. if len(pro) == 0 {
  373. util.Debug("未找到项目, pid=", pid)
  374. return
  375. }
  376. var info *map[string]interface{}
  377. for _, v := range []interface{}(pro["list"].(primitive.A)){
  378. v1 := v.(map[string]interface{})
  379. if util.ObjToString(v1["infoid"]) == id {
  380. info = util.ObjToMap(v)
  381. infoField := util.ObjToMap(pro["infofield"])
  382. if udpInfo["budget"] != nil{
  383. util.Debug("update-------", (*info)["infoid"])
  384. //if pro["budget"] == (*info)["budget"] {
  385. // pro["budget"] = budget
  386. //}
  387. //多包中的金额
  388. if util.IntAll(pro["multipackage"]) == 1 {
  389. if packages, ok := pro["package"].(map[string]interface{}); ok {
  390. M :
  391. for k, v := range packages{
  392. v1 := []interface{}(v.(primitive.A))
  393. for _, v2 := range v1{
  394. v3 := v2.(map[string]interface{})
  395. if util.ObjToString(v3["infoid"]) == id {
  396. if v3["budget"] != nil {
  397. pkg := udpInfo["package"].(map[string]interface{})
  398. tmp := pkg[k].(map[string]interface{})
  399. v3["budget"] = tmp["budget"]
  400. }
  401. }else {
  402. break M
  403. }
  404. }
  405. }
  406. }
  407. }
  408. (*info)["budget"] = budget
  409. (*util.ObjToMap((*infoField)[id]))["budget"] = budget
  410. if pro["sortprice"] == (*info)["budget"] {
  411. pro["sortprice"] = budget
  412. }
  413. }else {
  414. delete(*info, "budget")
  415. }
  416. if udpInfo["bidamount"] != nil{
  417. //if pro["bidamount"] == (*info)["bidamount"] {
  418. // pro["bidamount"] = bidamount
  419. //}
  420. v1["bidamount"] = bidamount
  421. if util.IntAll(pro["multipackage"]) == 1 {
  422. if packages, ok := pro["package"].(map[string]interface{}); ok {
  423. for k, v := range packages{
  424. v1 := []interface{}(v.(primitive.A))
  425. for _, v2 := range v1{
  426. v3 := v2.(map[string]interface{})
  427. if util.ObjToString(v3["infoid"]) == id {
  428. if v3["bidamount"] != nil {
  429. pkg := udpInfo["package"].(map[string]interface{})
  430. tmp := pkg[k].(map[string]interface{})
  431. v3["bidamount"] = tmp["bidamount"]
  432. }
  433. }
  434. }
  435. }
  436. }
  437. }
  438. (*info)["bidamount"] = bidamount
  439. (*util.ObjToMap((*infoField)[id]))["bidamount"] = bidamount
  440. if pro["sortprice"] == (*info)["bidamount"] {
  441. pro["sortprice"] = bidamount
  442. }
  443. }else {
  444. delete(*info, "bidamount")
  445. }
  446. break
  447. }
  448. }
  449. var project *ProjectInfo
  450. var pInfo *Info
  451. bys, _ := json.Marshal(pro)
  452. _ = json.Unmarshal(bys, &project)
  453. bys1, _ := json.Marshal(info)
  454. _ = json.Unmarshal(bys1, &pInfo)
  455. if len(project.Ids) > 1 {
  456. CountAmount(project, pInfo, *info)
  457. if project.Budget > 0 {
  458. pro["budget"] = project.Budget
  459. }
  460. if project.Bidamount > 0 {
  461. pro["bidamount"] = project.Bidamount
  462. }
  463. }else {
  464. pro["budget"] = budget
  465. pro["bidamount"] = bidamount
  466. if budget > bidamount {
  467. pro["sortprice"] = budget
  468. }else {
  469. pro["sortprice"] = bidamount
  470. }
  471. }
  472. set := map[string]interface{}{
  473. "$set": pro,
  474. }
  475. MongoTool.UpdateById(ProjectColl, pid, set)
  476. loadStart := util.Int64All(Sysconfig["loadStart"])
  477. if loadStart > -1 && project.LastTime >loadStart {
  478. util.Debug("内存中存在该项目信息", project.Id)
  479. p.AllIdsMapLock.Lock()
  480. p.AllIdsMap[pid].P = project
  481. p.AllIdsMapLock.Unlock()
  482. }
  483. bol := Es.DelById(Index, Itype, pid)
  484. if bol {
  485. util.Debug("删除es索引, pid------", pid)
  486. //调udp生索引
  487. by, _ := json.Marshal(map[string]interface{}{
  488. "query": map[string]interface{}{
  489. "_id": bson.M{
  490. "$gte": pid,
  491. "$lte": pid,
  492. }},
  493. "stype": "project",
  494. })
  495. util.Debug(string(by))
  496. _ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
  497. }
  498. }
  499. }
  500. func FindMoney(key string, project map[string]interface{}) float64 {
  501. money := -0.1
  502. for i, v := range []interface{}(project["list"].(primitive.A)){
  503. v1 := v.(map[string]interface{})
  504. if i == 0 {
  505. if v1[key] != nil {
  506. money = util.Float64All(v1[key])
  507. }
  508. }else {
  509. if v1[key] != nil && util.Float64All(v1[key]) > money {
  510. money = util.Float64All(v1[key])
  511. }
  512. }
  513. }
  514. return money
  515. }
  516. func StringTOBsonId(id string) primitive.ObjectID {
  517. objectId, _ := primitive.ObjectIDFromHex(id)
  518. return objectId
  519. }
  520. //通知下个节点nextNode
  521. func nextNode(mapInfo map[string]interface{}, pici int64) {
  522. mapInfo["stype"] = "project"
  523. mapInfo["query"] = map[string]interface{}{
  524. "pici": pici,
  525. }
  526. key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
  527. mapInfo["key"] = key
  528. datas, _ := json.Marshal(mapInfo)
  529. node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
  530. udptaskmap.Store(key, node)
  531. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, toaddr[0])
  532. }
  533. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  534. defer util.Catch()
  535. defer func() {
  536. p.Brun = false
  537. }()
  538. p.Brun = true
  539. count, taskcount := 0, 0
  540. pool := make(chan bool, p.thread)
  541. log.Println("start project", q)
  542. sess := MongoTool.GetMgoConn()
  543. defer MongoTool.DestoryMongoConn(sess)
  544. infoPool := make(chan map[string]interface{}, 2000)
  545. over := make(chan bool)
  546. go func() {
  547. L:
  548. for {
  549. select {
  550. case tmp := <-infoPool:
  551. pool <- true
  552. taskcount++
  553. go func(tmp map[string]interface{}) {
  554. defer func() {
  555. <-pool
  556. }()
  557. if util.IntAll(tmp["repeat"]) == 0 {
  558. if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
  559. //增量 dataging为1不参与合并
  560. return
  561. }
  562. p.fillInPlace(tmp)
  563. info := ParseInfo(tmp)
  564. p.currentTime = info.Publishtime
  565. if p.currentType == "updateInfo" {
  566. //招标信息更改合并
  567. p.updateJudge(tmp, info)
  568. } else {
  569. //普通合并
  570. p.CommonMerge(tmp, info)
  571. }
  572. } else {
  573. //信息错误,进行更新
  574. }
  575. }(tmp)
  576. case <-over:
  577. break L
  578. }
  579. }
  580. }()
  581. fields := map[string]interface{} {"area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
  582. "projectcode": 1, "buyerclass": 1, "winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1,
  583. "site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1, "jsondata": 1, "review_experts": 1, "purchasing": 1, "winnerorder": 1}
  584. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  585. if Sysconfig["hints"] != nil {
  586. ms.Hint(Sysconfig["hints"])
  587. }
  588. query := ms.Iter()
  589. //query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  590. var lastid interface{}
  591. L:
  592. for {
  593. select {
  594. case <-queryClose:
  595. log.Println("receive interrupt sign")
  596. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  597. queryCloseOver <- true
  598. break L
  599. default:
  600. tmp := make(map[string]interface{})
  601. if query.Next(&tmp) {
  602. lastid = tmp["_id"]
  603. if count%10000 == 0 {
  604. log.Println("current", count, lastid)
  605. }
  606. infoPool <- tmp
  607. count++
  608. } else {
  609. break L
  610. }
  611. }
  612. }
  613. time.Sleep(5 * time.Second)
  614. over <- true
  615. //阻塞
  616. for n := 0; n < p.thread; n++ {
  617. pool <- true
  618. }
  619. log.Println("所有线程执行完成...", count, taskcount)
  620. }
  var (
	// Project-code extraction from a notice title:
	//   - titleGetPc matches a code at the start of the title;
	//   - titleGetPc1 matches a code inside brackets, optionally introduced by
	//     a label such as 编号/编码/项号/包号/代码/标号 and a colon or 为;
	//   - titleGetPc2 matches a code at the end, optionally followed by up to
	//     five characters and 公告.
	titleGetPc  = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project-code / project-name cleanup. Matches are:
	//   - a trailing bracketed re-tender marker (重, 第二次, ...);
	//   - runs of bracket, punctuation and space characters;
	//   - a trailing 号/重/第N次(招标) suffix;
	//   - underscores;
	//   - a trailing 采购/项目 word.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// A project code of at most 4 characters made only of digits, or only of
	// letters ('_' and '-' allowed).
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Only digits, or only letters ('_' and '-' allowed), of any length.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// Contains a lot/section word (包 or 段 next to a numeral or letter). Per
	// the original note, it is intended so that tenders whose lots were not
	// recognized are merged into one project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
  635. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  636. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  637. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  638. proHref := util.ObjToString(jsonData["projecthref"])
  639. if jsonData != nil && proHref != "" {
  640. //projectHref字段合并
  641. tmp["projecthref"] = proHref
  642. p.mapHrefLock.Lock()
  643. pid := p.mapHref[proHref]
  644. p.mapHrefLock.Unlock()
  645. if pid != "" {
  646. p.AllIdsMapLock.Lock()
  647. comparePro := p.AllIdsMap[pid].P
  648. p.AllIdsMapLock.Unlock()
  649. _, ex := p.CompareStatus(comparePro, info)
  650. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  651. } else {
  652. id, p1 := p.NewProject(tmp, info)
  653. p.mapHrefLock.Lock()
  654. p.mapHref[proHref] = id
  655. p.mapHrefLock.Unlock()
  656. p.AllIdsMapLock.Lock()
  657. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  658. p.AllIdsMapLock.Unlock()
  659. }
  660. } else {
  661. //项目合并
  662. p.startProjectMerge(info, tmp)
  663. }
  664. } else {
  665. //项目合并
  666. p.startProjectMerge(info, tmp)
  667. }
  668. }
  669. }
  670. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  671. bys, _ := json.Marshal(tmp)
  672. var thisinfo *Info
  673. _ = json.Unmarshal(bys, &thisinfo)
  674. if thisinfo == nil {
  675. return nil
  676. }
  677. if len(thisinfo.Topscopeclass) == 0 {
  678. thisinfo.Topscopeclass = []string{}
  679. }
  680. if len(thisinfo.Subscopeclass) == 0 {
  681. thisinfo.Subscopeclass = []string{}
  682. }
  683. if thisinfo.SubType == "" {
  684. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  685. }
  686. //从标题中查找项目编号
  687. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  688. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  689. thisinfo.PTC = res[1]
  690. } else {
  691. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  692. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  693. thisinfo.PTC = res[3]
  694. } else {
  695. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  696. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  697. thisinfo.PTC = res[1]
  698. }
  699. }
  700. }
  701. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  702. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  703. if thisinfo.ProjectName != "" {
  704. thisinfo.pnbval++
  705. }
  706. }
  707. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  708. if thisinfo.ProjectCode != "" {
  709. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  710. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  711. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  712. }
  713. } else {
  714. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  715. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  716. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  717. }
  718. }
  719. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  720. thisinfo.pnbval++
  721. }
  722. }
  723. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  724. thisinfo.PTC = ""
  725. }
  726. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  727. thisinfo.pnbval++
  728. } else {
  729. thisinfo.Buyer = ""
  730. }
  731. //清理评审专家名单
  732. if len(thisinfo.ReviewExperts) > 0 {
  733. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  734. }
  735. //winners整理、清理
  736. winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
  737. tmp["winner"] = winner
  738. m1 := map[string]bool{}
  739. winners := []string{}
  740. if winner != "" {
  741. m1[winner] = true
  742. winners = append(winners, winner)
  743. }
  744. packageM, _ := tmp["package"].(map[string]interface{})
  745. if packageM != nil {
  746. thisinfo.HasPackage = true
  747. for _, p := range packageM {
  748. pm, _ := p.(map[string]interface{})
  749. pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
  750. if pw != "" && !m1[pw] {
  751. m1[pw] = true
  752. winners = append(winners, pw)
  753. }
  754. }
  755. }
  756. thisinfo.Winners = winners
  757. //清理winnerorder
  758. var wins []map[string]interface{}
  759. for _, v := range thisinfo.WinnerOrder {
  760. w := QyFilter(util.ObjToString(v["entname"]), "winner")
  761. if w != "" {
  762. v["entname"] = w
  763. wins = append(wins, v)
  764. }
  765. }
  766. thisinfo.WinnerOrder = wins
  767. //清理buyer
  768. buyer := QyFilter(util.ObjToString(tmp["tmp"]), "buyer")
  769. tmp["buyer"] = buyer
  770. thisinfo.Buyer = buyer
  771. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  772. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  773. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  774. //处理分包中数据异常问题
  775. for k, tmp := range thisinfo.Package {
  776. if ps, ok := tmp.([]map[string]interface{}); ok {
  777. for i, p := range ps {
  778. name, _ := p["name"].(string)
  779. if len([]rune(name)) > 100 {
  780. p["name"] = fmt.Sprint([]rune(name[:100]))
  781. }
  782. ps[i] = p
  783. }
  784. thisinfo.Package[k] = ps
  785. }
  786. }
  787. return thisinfo
  788. }
  789. func (p *ProjectTask) updateJudge(tmp map[string]interface{}, info *Info) {
  790. index := -1
  791. pInfoId := ""
  792. p.AllIdsMapLock.Lock()
  793. F:
  794. for k, ID := range p.AllIdsMap {
  795. for i, id := range ID.P.Ids {
  796. if info.Id == id {
  797. pInfoId = k
  798. index = i
  799. break F
  800. }
  801. }
  802. }
  803. p.AllIdsMapLock.Unlock()
  804. //未找到招标信息
  805. if index == -1 {
  806. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  807. p.currentTime = info.Publishtime
  808. p.startProjectMerge(info, tmp)
  809. }
  810. } else {
  811. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  812. infoList := []interface{}(tmpPro["list"].(primitive.A))
  813. infoMap := infoList[index].(map[string]interface{})
  814. modifyMap, f := modifyEle(infoMap, tmp)
  815. //projecthref字段
  816. jsonData := tmp["jsondata"].(map[string]interface{})
  817. if jsonData != nil && jsonData["projecthref"] != nil {
  818. proHref := jsonData["projecthref"].(string)
  819. tmp["projecthref"] = proHref
  820. p.mapHrefLock.Lock()
  821. pid := p.mapHref[proHref]
  822. p.mapHrefLock.Unlock()
  823. if pid == pInfoId {
  824. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  825. return
  826. }
  827. }
  828. if f {
  829. //合并、修改
  830. log.Println("合并修改更新", "----------------------------")
  831. p.mergeAndModify(pInfoId, index, info, tmp, tmpPro, modifyMap)
  832. } else {
  833. //修改
  834. log.Println("修改更新", "----------------------------")
  835. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  836. }
  837. }
  838. }
  // Elements lists the notice fields that affect the merge process: a change
// to any of them makes modifyEle report a merge-relevant modification.
var Elements = []string{
	"projectname",
	"projectcode",
	"agency",
	"budget",
	"bidamount",
	"buyerperson",
	"area",
	"city",
	"publishtime",
}

// modifyEle compares a notice as stored in a project's list (tmpPro) with its
// new extraction (tmp). The two results are:
//   - every key present in both maps whose value differs, with tmp's value;
//   - whether any of those keys is listed in Elements.
//
// "_id" is never reported. Values are compared with reflect.DeepEqual,
// because == on interface values panics when both hold maps or slices (e.g.
// "package", "jsondata").
func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
	modifyMap := map[string]interface{}{}
	for k, oldVal := range tmpPro {
		if k == "_id" {
			continue
		}
		if newVal, ok := tmp[k]; ok && !reflect.DeepEqual(oldVal, newVal) {
			modifyMap[k] = newVal
		}
	}
	for _, key := range Elements {
		if _, ok := modifyMap[key]; ok {
			return modifyMap, true
		}
	}
	return modifyMap, false
}
  873. //补全位置信息
  874. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  875. area := util.ObjToString(tmp["area"])
  876. city := util.ObjToString(tmp["city"])
  877. if area != "" && city != "" {
  878. return
  879. }
  880. tmpSite := util.ObjToString(tmp["site"])
  881. if tmpSite == "" {
  882. return
  883. }
  884. p.mapSiteLock.Lock()
  885. defer p.mapSiteLock.Unlock()
  886. site := p.mapSite[tmpSite]
  887. if site != nil {
  888. if area != "" {
  889. if area == "全国" {
  890. tmp["area"] = site.Area
  891. tmp["city"] = site.City
  892. tmp["district"] = site.District
  893. return
  894. }
  895. if area != site.Area {
  896. return
  897. } else {
  898. if site.City != "" {
  899. tmp["area"] = site.Area
  900. tmp["city"] = site.City
  901. tmp["district"] = site.District
  902. }
  903. }
  904. } else {
  905. tmp["area"] = site.Area
  906. tmp["city"] = site.City
  907. tmp["district"] = site.District
  908. return
  909. }
  910. }
  911. }
  // deleteSlice removes the first element equal to v from arr and returns the
// shortened slice. arr is returned unchanged when v is absent. Removal shifts
// the later elements within arr's backing array, so callers must use the
// returned slice. stype only labels the log line written when the removal
// takes at least one second.
func deleteSlice(arr []string, v, stype string) []string {
	idx := -1
	for i := range arr {
		if arr[i] == v {
			idx = i
			break
		}
	}
	if idx < 0 {
		return arr
	}
	start := time.Now().Unix()
	arr = append(arr[:idx], arr[idx+1:]...)
	if elapsed := time.Now().Unix() - start; elapsed > 0 {
		log.Println("deleteSlice", stype, elapsed, v, len(arr))
	}
	return arr
}
  927. //校验评审专家
  928. func ClearRp(tmp []string) []string {
  929. arrTmp := []string{}
  930. for _, v := range tmp {
  931. // 汉字过滤(全汉字,2-4个字)
  932. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  933. continue
  934. }
  935. //黑名单过滤
  936. if BlaskListMap[v] {
  937. continue
  938. }
  939. arrTmp = append(arrTmp, v)
  940. }
  941. return arrTmp
  942. }
  943. func QyFilter(name, stype string) string {
  944. name = strings.ReplaceAll(name, " ", "")
  945. preReg := PreRegexp[stype]
  946. for _, v := range preReg {
  947. name = v.ReplaceAllString(name, "")
  948. }
  949. backReg := BackRegexp[stype]
  950. for _, v := range backReg {
  951. name = v.ReplaceAllString(name, "")
  952. }
  953. backRepReg := BackRepRegexp[stype]
  954. for _, v := range backRepReg {
  955. name = v.regs.ReplaceAllString(name, v.repstr)
  956. }
  957. blackReg := BlackRegexp[stype]
  958. for _, v := range blackReg {
  959. if v.MatchString(name) {
  960. name = ""
  961. break
  962. }
  963. }
  964. if !regexp.MustCompile("[\\p{Han}]{4,}").MatchString(name) {
  965. name = ""
  966. }
  967. if utf8.RuneCountInString(name) > 60 {
  968. name = ""
  969. }
  970. return name
  971. }