task.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025
  1. package main
  import (
  	"encoding/json"
  	"fmt"
  	"log"
  	"reflect"
  	"regexp"
  	"strings"
  	"sync"
  	"time"
  	"unicode/utf8"

  	mu "mfw/util"
  	"qfw/util"

  	"github.com/robfig/cron"
  	"go.mongodb.org/mongo-driver/bson/primitive"
  	"gopkg.in/mgo.v2/bson"
  )
  17. /**
  18. 任务入口
  19. 全量、增量合并
  20. 更新、插入,内存清理
  21. 转换成info对象
  22. **/
  23. var PreRegexp = map[string][]*regexp.Regexp{}
  24. var BackRegexp = map[string][]*regexp.Regexp{}
  25. var BackRepRegexp = map[string][]RegexpInfo{}
  26. var BlackRegexp = map[string][]*regexp.Regexp{}
  // RegexpInfo pairs a compiled pattern with the text that replaces its matches.
  type RegexpInfo struct {
  	// regs is the pattern to look for.
  	regs *regexp.Regexp
  	// repstr is the replacement handed to ReplaceAllString.
  	repstr string
  }
  31. //项目合并对象
  32. type ProjectTask struct {
  33. InitMinTime int64 //最小时间,小于0的处理一次
  34. name string
  35. thread int //线程数
  36. //查找锁
  37. findLock sync.Mutex
  38. wg sync.WaitGroup
  39. //map锁
  40. AllIdsMapLock sync.Mutex
  41. //对应的id
  42. AllIdsMap map[string]*ID
  43. //采购单位、项目名称、项目编号
  44. mapPb, mapPn, mapPc map[string]*Key
  45. //流程数据 字段相同,直接合并
  46. mapHref map[string]string
  47. mapHrefLock sync.Mutex
  48. //站点
  49. mapSite map[string]*Site
  50. mapSiteLock sync.Mutex
  51. //bidtype、bidstatus 锁
  52. mapBidLock sync.Mutex
  53. //更新或新增通道
  54. updatePool chan []map[string]interface{}
  55. //savePool chan map[string]interface{}
  56. //saveSign, updateSign chan bool
  57. //表名
  58. coll string
  59. //当前状态是全量还是增量
  60. currentType string //当前是跑全量还是跑增量
  61. //
  62. clearContimes int
  63. //当前时间
  64. currentTime int64
  65. //保存长度
  66. saveSize int
  67. pici int64
  68. validTime int64
  69. statusTime int64
  70. //结果时间的更新 最近两天的公告不再更新jgtime
  71. jgTime int64
  72. // LockPool chan *sync.Mutex
  73. // LockPoolLock sync.Mutex
  74. // m1, m23, m4 map[int]int
  75. // l1, l23, l4 map[int]*sync.Mutex
  76. Brun bool
  77. }
  78. func NewPT() *ProjectTask {
  79. p := &ProjectTask{
  80. InitMinTime: int64(1325347200),
  81. name: "全/增量对象",
  82. thread: Thread,
  83. updatePool: make(chan []map[string]interface{}, 5000),
  84. //savePool: make(chan map[string]interface{}, 2000),
  85. wg: sync.WaitGroup{},
  86. AllIdsMap: make(map[string]*ID, 5000000),
  87. mapPb: make(map[string]*Key, 1500000),
  88. mapPn: make(map[string]*Key, 5000000),
  89. mapPc: make(map[string]*Key, 5000000),
  90. mapHref: make(map[string]string, 1500000),
  91. mapSite: make(map[string]*Site, 1000000),
  92. saveSize: 100,
  93. //saveSign: make(chan bool, 1),
  94. //updateSign: make(chan bool, 1),
  95. coll: ProjectColl,
  96. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  97. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  98. jgTime: int64(util.IntAllDef(3, 3) * 86400),
  99. }
  100. return p
  101. }
  102. var P_QL *ProjectTask
  103. var sp = make(chan bool, 5)
  104. //初始化全量合并对象
  105. func init() {
  106. P_QL = NewPT()
  107. log.Println(len(P_QL.updatePool))
  108. go P_QL.updateAllQueue()
  109. go P_QL.clearMem()
  110. }
  111. func (p *ProjectTask) updateAllQueue() {
  112. arru := make([][]map[string]interface{}, p.saveSize)
  113. indexu := 0
  114. for {
  115. select {
  116. case v := <-p.updatePool:
  117. arru[indexu] = v
  118. indexu++
  119. if indexu == p.saveSize {
  120. sp <- true
  121. go func(arru [][]map[string]interface{}) {
  122. defer func() {
  123. <-sp
  124. }()
  125. MongoTool.UpSertBulk(p.coll, arru...)
  126. }(arru)
  127. arru = make([][]map[string]interface{}, p.saveSize)
  128. indexu = 0
  129. }
  130. case <-time.After(1000 * time.Millisecond):
  131. if indexu > 0 {
  132. sp <- true
  133. go func(arru [][]map[string]interface{}) {
  134. defer func() {
  135. <-sp
  136. }()
  137. MongoTool.UpSertBulk(p.coll, arru...)
  138. }(arru[:indexu])
  139. arru = make([][]map[string]interface{}, p.saveSize)
  140. indexu = 0
  141. }
  142. }
  143. }
  144. }
  145. //项目合并内存更新
  146. func (p *ProjectTask) clearMem() {
  147. c := cron.New()
  148. //在内存中保留最近6个月的信息
  149. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  150. _ = c.AddFunc("50 0/5 * * * *", func() {
  151. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  152. SingleClear = 1
  153. //跳过的次数清零
  154. p.clearContimes = 0
  155. //信息进入查找对比全局锁
  156. p.findLock.Lock()
  157. //defer p.findLock.Unlock()
  158. //合并进行的任务都完成
  159. p.wg.Wait()
  160. //遍历id
  161. //所有内存中的项目信息
  162. p.AllIdsMapLock.Lock()
  163. p.mapHrefLock.Lock()
  164. log.Println("清除开始")
  165. //清除计数
  166. clearNum := 0
  167. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  168. v := p.AllIdsMap[pid]
  169. if p.currentTime-v.P.LastTime > p.validTime {
  170. delete(p.mapHref, kHref)
  171. }
  172. }
  173. for k, v := range p.AllIdsMap {
  174. if p.currentTime-v.P.LastTime > p.validTime {
  175. clearNum++
  176. //删除id的map
  177. delete(p.AllIdsMap, k)
  178. //删除pb
  179. if v.P.Buyer != "" {
  180. ids := p.mapPb[v.P.Buyer]
  181. if ids != nil {
  182. ids.Lock.Lock()
  183. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  184. if len(ids.Arr) == 0 {
  185. delete(p.mapPb, v.P.Buyer)
  186. }
  187. ids.Lock.Unlock()
  188. }
  189. }
  190. //删除mapPn
  191. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  192. if vn != "" {
  193. ids := p.mapPn[vn]
  194. if ids != nil {
  195. ids.Lock.Lock()
  196. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  197. if len(ids.Arr) == 0 {
  198. delete(p.mapPn, vn)
  199. }
  200. ids.Lock.Unlock()
  201. }
  202. }
  203. }
  204. //删除mapPc
  205. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  206. if vn != "" {
  207. ids := p.mapPc[vn]
  208. if ids != nil {
  209. ids.Lock.Lock()
  210. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  211. if len(ids.Arr) == 0 {
  212. delete(p.mapPc, vn)
  213. }
  214. ids.Lock.Unlock()
  215. }
  216. }
  217. }
  218. v = nil
  219. }
  220. }
  221. p.mapHrefLock.Unlock()
  222. p.AllIdsMapLock.Unlock()
  223. p.findLock.Unlock()
  224. SingleClear = 0
  225. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  226. } else {
  227. p.clearContimes++
  228. }
  229. })
  230. c.Start()
  231. }
  232. //全量合并
  233. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  234. defer util.Catch()
  235. p.thread = util.IntAllDef(Thread, 4)
  236. q, _ := udpInfo["query"].(map[string]interface{})
  237. if q == nil {
  238. q = map[string]interface{}{}
  239. lteid, _ := udpInfo["lteid"].(string)
  240. var idmap map[string]interface{}
  241. if len(lteid) > 15 {
  242. idmap = map[string]interface{}{
  243. "$lte": StringTOBsonId(lteid),
  244. }
  245. }
  246. gtid, _ := udpInfo["gtid"].(string)
  247. if len(gtid) > 15 {
  248. if idmap == nil {
  249. idmap = map[string]interface{}{}
  250. }
  251. idmap["$gte"] = StringTOBsonId(gtid)
  252. }
  253. if idmap != nil {
  254. q["_id"] = idmap
  255. }
  256. }
  257. //生成查询语句执行
  258. log.Println("查询语句:", q)
  259. p.enter(MongoTool.DbName, ExtractColl, q)
  260. }
  261. //增量合并
  262. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  263. defer util.Catch()
  264. //1、检查pubilshtime索引
  265. db, _ := udpInfo["db"].(string)
  266. if db == "" {
  267. db = MongoTool.DbName
  268. }
  269. coll, _ := udpInfo["coll"].(string)
  270. if coll == "" {
  271. coll = ExtractColl
  272. }
  273. thread := util.IntAllDef(Thread, 4)
  274. if thread > 0 {
  275. p.thread = thread
  276. }
  277. //开始id和结束id
  278. q, _ := udpInfo["query"].(map[string]interface{})
  279. gtid := udpInfo["gtid"].(string)
  280. lteid := udpInfo["lteid"].(string)
  281. if q == nil {
  282. q = map[string]interface{}{
  283. "_id": map[string]interface{}{
  284. "$gt": StringTOBsonId(gtid),
  285. "$lte": StringTOBsonId(lteid),
  286. },
  287. }
  288. }
  289. if q != nil {
  290. //生成查询语句执行
  291. p.enter(db, coll, q)
  292. }
  293. if udpInfo["stop"] == nil {
  294. for i := 0; i < 5; i++ {
  295. sp <- true
  296. }
  297. for i := 0; i < 5; i++ {
  298. <-sp
  299. }
  300. log.Println("保存完成,生索引", p.pici)
  301. time.Sleep(5 * time.Second)
  302. nextNode(udpInfo, p.pici)
  303. }
  304. }
  305. //招标字段更新
  306. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  307. defer util.Catch()
  308. db := MongoTool.DbName
  309. coll, _ := udpInfo["coll"].(string)
  310. if coll == "" {
  311. coll = ExtractColl
  312. }
  313. thread := util.IntAllDef(Thread, 4)
  314. if thread > 0 {
  315. p.thread = thread
  316. }
  317. q, _ := udpInfo["query"].(map[string]interface{})
  318. gtid := udpInfo["gtid"].(string)
  319. lteid := udpInfo["lteid"].(string)
  320. if q == nil {
  321. q = map[string]interface{}{
  322. "_id": map[string]interface{}{
  323. "$gte": StringTOBsonId(gtid),
  324. "$lte": StringTOBsonId(lteid),
  325. },
  326. "is_m": 1,
  327. }
  328. }
  329. log.Println("查询语句:", q)
  330. p.enter(db, coll, q)
  331. }
  332. func (p *ProjectTask) taskQuery() {
  333. defer util.Catch()
  334. count := 0
  335. sess := MongoTool.GetMgoConn()
  336. defer MongoTool.DestoryMongoConn(sess)
  337. fields := map[string]interface{} {"budget": 1, "bidamount": 1, "package": 1}
  338. ms := sess.DB(MongoTool.DbName).C(UpdateColl).Find(map[string]interface{}{}).Select(fields)
  339. query := ms.Iter()
  340. L:
  341. for {
  342. tmp := make(map[string]interface{})
  343. if query.Next(&tmp) {
  344. lastid := tmp["_id"]
  345. tmp["id"] = tmp["_id"].(primitive.ObjectID).Hex();
  346. if count%1000 == 0 {
  347. log.Println("current modify", count, lastid)
  348. }
  349. p.taskUpdateMoney(tmp)
  350. count++
  351. } else {
  352. break L
  353. }
  354. }
  355. }
  356. // taskUpdateMoney applies a changed budget / bid amount of one announcement
  // to the project that contains it.
  //
  // udpInfo keys read here: "id" (announcement id, must be a string),
  // "budget", "bidamount" and, for multi-package projects, "package".
  //
  // Steps: find the project through the ES index by list.infoid, load it from
  // ProjectColl, patch the matching list entry / package entries / infofield,
  // recompute the project totals, write the project back, refresh the
  // in-memory copy when it is loaded, then delete the ES document and ask the
  // index node over UDP to rebuild it.
  //
  // Any panic (e.g. from a failed type assertion) is handled by util.Catch.
  357. func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
  358. defer util.Catch()
  359. id := udpInfo["id"].(string)
  360. budget := util.Float64All(udpInfo["budget"])
  361. bidamount := util.Float64All(udpInfo["bidamount"])
  // NOTE(review): client is only acquired and released here; the query below
  // goes through Es.Get — confirm the connection is actually needed.
  362. client := Es.GetEsConn()
  363. defer Es.DestoryEsConn(client)
  // NOTE(review): id is concatenated into the JSON query without escaping.
  364. esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+id+`"}}]}}}`
  365. data := Es.Get(Index, Itype, esquery)
  366. if len(*data) > 0 {
  // Only the first hit is used as the owning project.
  367. pid := util.ObjToString((*data)[0]["_id"])
  368. pro := MongoTool.FindById(ProjectColl, pid)
  369. if len(pro) == 0 {
  370. util.Debug("未找到项目, pid=", pid)
  371. return
  372. }
  // info stays nil when no list entry carries this announcement id.
  373. var info *map[string]interface{}
  374. for _, v := range []interface{}(pro["list"].(primitive.A)){
  375. v1 := v.(map[string]interface{})
  376. if util.ObjToString(v1["infoid"]) == id {
  377. info = util.ObjToMap(v)
  378. infoField := util.ObjToMap(pro["infofield"])
  379. if udpInfo["budget"] != nil{
  380. util.Debug("update-------", (*info)["infoid"])
  381. //if pro["budget"] == (*info)["budget"] {
  382. // pro["budget"] = budget
  383. //}
  384. // amounts inside a multi-package project
  385. if util.IntAll(pro["multipackage"]) == 1 {
  386. if packages, ok := pro["package"].(map[string]interface{}); ok {
  387. M :
  388. for k, v := range packages{
  389. v1 := []interface{}(v.(primitive.A))
  390. for _, v2 := range v1{
  391. v3 := v2.(map[string]interface{})
  392. if util.ObjToString(v3["infoid"]) == id {
  393. if v3["budget"] != nil {
  394. pkg := udpInfo["package"].(map[string]interface{})
  395. tmp := pkg[k].(map[string]interface{})
  396. v3["budget"] = tmp["budget"]
  397. }
  398. }else {
  // NOTE(review): this leaves the whole package scan at the first entry
  // that belongs to another announcement; the bidamount branch below has
  // no such break — confirm which behaviour is intended.
  399. break M
  400. }
  401. }
  402. }
  403. }
  404. }
  405. (*info)["budget"] = budget
  406. (*util.ObjToMap((*infoField)[id]))["budget"] = budget
  // NOTE(review): (*info)["budget"] was overwritten two lines above, so
  // this compares sortprice with the new value, not the previous one.
  407. if pro["sortprice"] == (*info)["budget"] {
  408. pro["sortprice"] = budget
  409. }
  410. }else {
  411. delete(*info, "budget")
  412. }
  413. if udpInfo["bidamount"] != nil{
  414. //if pro["bidamount"] == (*info)["bidamount"] {
  415. // pro["bidamount"] = bidamount
  416. //}
  417. v1["bidamount"] = bidamount
  418. if util.IntAll(pro["multipackage"]) == 1 {
  419. if packages, ok := pro["package"].(map[string]interface{}); ok {
  420. for k, v := range packages{
  421. v1 := []interface{}(v.(primitive.A))
  422. for _, v2 := range v1{
  423. v3 := v2.(map[string]interface{})
  424. if util.ObjToString(v3["infoid"]) == id {
  425. if v3["bidamount"] != nil {
  426. pkg := udpInfo["package"].(map[string]interface{})
  427. tmp := pkg[k].(map[string]interface{})
  428. v3["bidamount"] = tmp["bidamount"]
  429. }
  430. }
  431. }
  432. }
  433. }
  434. }
  435. (*info)["bidamount"] = bidamount
  436. (*util.ObjToMap((*infoField)[id]))["bidamount"] = bidamount
  // NOTE(review): same compare-after-assign pattern as for budget above.
  437. if pro["sortprice"] == (*info)["bidamount"] {
  438. pro["sortprice"] = bidamount
  439. }
  440. }else {
  441. delete(*info, "bidamount")
  442. }
  // Only the first matching list entry is patched.
  443. break
  444. }
  445. }
  // Round-trip through JSON to obtain typed copies of the project and of the
  // patched announcement; marshal/unmarshal errors are ignored.
  446. var project *ProjectInfo
  447. var pInfo *Info
  448. bys, _ := json.Marshal(pro)
  449. _ = json.Unmarshal(bys, &project)
  450. bys1, _ := json.Marshal(info)
  451. _ = json.Unmarshal(bys1, &pInfo)
  452. if len(project.Ids) > 1 {
  // NOTE(review): *info panics here when no list entry matched (info == nil).
  453. CountAmount(project, pInfo, *info)
  454. if project.Budget > 0 {
  455. pro["budget"] = project.Budget
  456. }
  457. if project.Bidamount > 0 {
  458. pro["bidamount"] = project.Bidamount
  459. }
  460. }else {
  // Single-announcement project: the project amounts are the announcement's,
  // and sortprice is the larger of the two.
  461. pro["budget"] = budget
  462. pro["bidamount"] = bidamount
  463. if budget > bidamount {
  464. pro["sortprice"] = budget
  465. }else {
  466. pro["sortprice"] = bidamount
  467. }
  468. }
  469. set := map[string]interface{}{
  470. "$set": pro,
  471. }
  472. MongoTool.UpdateById(ProjectColl, pid, set)
  473. loadStart := util.Int64All(Sysconfig["loadStart"])
  474. if loadStart > -1 && project.LastTime >loadStart {
  475. util.Debug("内存中存在该项目信息", project.Id)
  476. p.AllIdsMapLock.Lock()
  // NOTE(review): p.AllIdsMap[pid] is dereferenced without checking that
  // the entry exists; a missing entry panics while the lock is held.
  477. p.AllIdsMap[pid].P = project
  478. p.AllIdsMapLock.Unlock()
  479. }
  480. bol := Es.DelById(Index, Itype, pid)
  481. if bol {
  482. util.Debug("删除es索引, pid------", pid)
  483. // ask the index node over UDP to rebuild the index for this project
  484. by, _ := json.Marshal(map[string]interface{}{
  485. "query": map[string]interface{}{
  486. "_id": bson.M{
  487. "$gte": pid,
  488. "$lte": pid,
  489. }},
  490. "stype": "project",
  491. })
  492. util.Debug(string(by))
  493. _ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
  494. }
  495. }
  496. }
  497. func FindMoney(key string, project map[string]interface{}) float64 {
  498. money := -0.1
  499. for i, v := range []interface{}(project["list"].(primitive.A)){
  500. v1 := v.(map[string]interface{})
  501. if i == 0 {
  502. if v1[key] != nil {
  503. money = util.Float64All(v1[key])
  504. }
  505. }else {
  506. if v1[key] != nil && util.Float64All(v1[key]) > money {
  507. money = util.Float64All(v1[key])
  508. }
  509. }
  510. }
  511. return money
  512. }
  513. func StringTOBsonId(id string) primitive.ObjectID {
  514. objectId, _ := primitive.ObjectIDFromHex(id)
  515. return objectId
  516. }
  517. //通知下个节点nextNode
  518. func nextNode(mapInfo map[string]interface{}, pici int64) {
  519. mapInfo["stype"] = "project"
  520. mapInfo["query"] = map[string]interface{}{
  521. "pici": pici,
  522. }
  523. key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
  524. mapInfo["key"] = key
  525. datas, _ := json.Marshal(mapInfo)
  526. node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
  527. udptaskmap.Store(key, node)
  528. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, toaddr[0])
  529. }
  // enter runs one merge pass: it streams the documents matching q from
  // db.coll (sorted by publishtime, without kvtext / repeat_reason) into
  // infoPool, while a dispatcher goroutine hands each document to a worker
  // goroutine; at most p.thread workers run at once.
  //
  // Brun is true for the duration of the call. A value on queryClose stops the
  // scan early and is acknowledged on queryCloseOver.
  530. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  531. defer util.Catch()
  532. defer func() {
  533. p.Brun = false
  534. }()
  535. p.Brun = true
  536. count, taskcount := 0, 0
  537. countRepeat := 0
  // pool is the worker semaphore: one slot per running worker.
  538. pool := make(chan bool, p.thread)
  539. log.Println("start project", q)
  540. sess := MongoTool.GetMgoConn()
  541. defer MongoTool.DestoryMongoConn(sess)
  542. infoPool := make(chan map[string]interface{}, 2000)
  543. over := make(chan bool)
  // Dispatcher: takes documents from infoPool and starts one worker each.
  // NOTE(review): taskcount is written here and countRepeat in the workers,
  // both read by the final log line without synchronisation.
  544. go func() {
  545. L:
  546. for {
  547. select {
  548. case tmp := <-infoPool:
  549. pool <- true
  550. taskcount++
  551. go func(tmp map[string]interface{}) {
  552. defer func() {
  553. <-pool
  554. }()
  555. if util.IntAll(tmp["repeat"]) == 0 {
  // NOTE(review): this reads the global P_QL rather than p.
  556. if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
  557. // incremental run: records with dataging == 1 do not take part in the merge
  558. util.Debug("增量 dataging == 1 ", tmp["_id"])
  559. return
  560. }
  561. p.fillInPlace(tmp)
  562. info := ParseInfo(tmp)
  // NOTE(review): p.currentTime is written by every worker without a lock,
  // and info is dereferenced although ParseInfo can return nil.
  563. p.currentTime = info.Publishtime
  564. if p.currentType == "updateInfo" {
  565. // merge for a changed announcement
  566. p.updateJudge(tmp, info)
  567. } else {
  568. // ordinary merge
  569. p.CommonMerge(tmp, info)
  570. }
  571. } else {
  572. // record is flagged as repeat: only log and count it
  573. util.Debug(tmp["_id"])
  574. countRepeat++
  575. }
  576. }(tmp)
  577. case <-over:
  578. break L
  579. }
  580. }
  581. }()
  582. //fields := map[string]interface{} {"repeat": 1, "dataging": 1, "area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
  583. // "projectcode": 1, "buyerclass": 1, "winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1, "projectscope": 1, "contractcode": 1,
  584. // "site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1, "jsondata": 1, "review_experts": 1, "purchasing": 1, "winnerorder": 1}
  585. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
  586. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  587. if Sysconfig["hints"] != nil {
  588. ms.Hint(Sysconfig["hints"])
  589. }
  590. query := ms.Iter()
  591. //query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  592. var lastid interface{}
  // Producer loop: push every document into infoPool until the cursor is
  // exhausted or an interrupt arrives on queryClose.
  593. L:
  594. for {
  595. select {
  596. case <-queryClose:
  597. log.Println("receive interrupt sign")
  598. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  599. queryCloseOver <- true
  600. break L
  601. default:
  602. tmp := make(map[string]interface{})
  603. if query.Next(&tmp) {
  604. lastid = tmp["_id"]
  605. if count%10000 == 0 {
  606. log.Println("current", count, lastid)
  607. }
  608. infoPool <- tmp
  609. count++
  610. } else {
  611. break L
  612. }
  613. }
  614. }
  // NOTE(review): shutdown relies on a fixed 5 s pause for the dispatcher to
  // drain infoPool; if documents are still queued when over is sent, the
  // dispatcher's select may pick over first and leave them unprocessed.
  615. time.Sleep(5 * time.Second)
  616. over <- true
  617. // block until every worker slot can be taken, i.e. all workers have finished
  618. for n := 0; n < p.thread; n++ {
  619. pool <- true
  620. }
  621. log.Println("所有线程执行完成...", count, taskcount, countRepeat)
  622. }
  // Patterns used by ParseInfo to pull a project code out of a title and to
  // normalise project names / codes.
  623. var (
  624. // project code taken from the start of the title
  625. titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
  // project code enclosed in brackets, optionally introduced by a label;
  // ParseInfo reads capture group 3
  626. titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
  // project code at the end of the title, optionally followed by a short
  // suffix ending in 公告
  627. titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
  628. // project code / name filter: every match is removed
  629. pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
  630. // project code that is only digits or only letters, 4 characters or fewer
  631. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  632. // digits only or letters only, any length
  633. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  634. // package / lot wording; per the original note, used to merge into one
  635. // project when the announcement did not identify its packages
  635. KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
  636. )
  637. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  638. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  639. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  640. proHref := util.ObjToString(jsonData["projecthref"])
  641. if jsonData != nil && proHref != "" {
  642. //projectHref字段合并
  643. tmp["projecthref"] = proHref
  644. p.mapHrefLock.Lock()
  645. pid := p.mapHref[proHref]
  646. p.mapHrefLock.Unlock()
  647. if pid != "" {
  648. p.AllIdsMapLock.Lock()
  649. comparePro := p.AllIdsMap[pid].P
  650. p.AllIdsMapLock.Unlock()
  651. _, ex := p.CompareStatus(comparePro, info)
  652. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  653. } else {
  654. id, p1 := p.NewProject(tmp, info)
  655. p.mapHrefLock.Lock()
  656. p.mapHref[proHref] = id
  657. p.mapHrefLock.Unlock()
  658. p.AllIdsMapLock.Lock()
  659. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  660. p.AllIdsMapLock.Unlock()
  661. }
  662. } else {
  663. //项目合并
  664. p.startProjectMerge(info, tmp)
  665. }
  666. } else {
  667. //项目合并
  668. p.startProjectMerge(info, tmp)
  669. }
  670. }
  671. }
  672. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  673. bys, _ := json.Marshal(tmp)
  674. var thisinfo *Info
  675. _ = json.Unmarshal(bys, &thisinfo)
  676. if thisinfo == nil {
  677. return nil
  678. }
  679. if len(thisinfo.Topscopeclass) == 0 {
  680. thisinfo.Topscopeclass = []string{}
  681. }
  682. if len(thisinfo.Subscopeclass) == 0 {
  683. thisinfo.Subscopeclass = []string{}
  684. }
  685. if thisinfo.SubType == "" {
  686. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  687. }
  688. //从标题中查找项目编号
  689. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  690. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  691. thisinfo.PTC = res[1]
  692. } else {
  693. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  694. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  695. thisinfo.PTC = res[3]
  696. } else {
  697. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  698. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  699. thisinfo.PTC = res[1]
  700. }
  701. }
  702. }
  703. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  704. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  705. if thisinfo.ProjectName != "" {
  706. thisinfo.pnbval++
  707. }
  708. }
  709. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  710. if thisinfo.ProjectCode != "" {
  711. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  712. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  713. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  714. }
  715. } else {
  716. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  717. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  718. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  719. }
  720. }
  721. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  722. thisinfo.pnbval++
  723. }
  724. }
  725. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  726. thisinfo.PTC = ""
  727. }
  728. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  729. thisinfo.pnbval++
  730. } else {
  731. thisinfo.Buyer = ""
  732. }
  733. //清理评审专家名单
  734. if len(thisinfo.ReviewExperts) > 0 {
  735. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  736. }
  737. //winners整理、清理
  738. winner := QyFilter(util.ObjToString(tmp["winner"]), "winner")
  739. tmp["winner"] = winner
  740. m1 := map[string]bool{}
  741. winners := []string{}
  742. if winner != "" {
  743. m1[winner] = true
  744. winners = append(winners, winner)
  745. }
  746. packageM, _ := tmp["package"].(map[string]interface{})
  747. if packageM != nil {
  748. thisinfo.HasPackage = true
  749. for _, p := range packageM {
  750. pm, _ := p.(map[string]interface{})
  751. pw := QyFilter(util.ObjToString(pm["winner"]), "winner")
  752. if pw != "" && !m1[pw] {
  753. m1[pw] = true
  754. winners = append(winners, pw)
  755. }
  756. }
  757. }
  758. thisinfo.Winners = winners
  759. //清理winnerorder
  760. var wins []map[string]interface{}
  761. for _, v := range thisinfo.WinnerOrder {
  762. w := QyFilter(util.ObjToString(v["entname"]), "winner")
  763. if w != "" {
  764. v["entname"] = w
  765. wins = append(wins, v)
  766. }
  767. }
  768. thisinfo.WinnerOrder = wins
  769. //清理buyer
  770. buyer := QyFilter(util.ObjToString(tmp["buyer"]), "buyer")
  771. tmp["buyer"] = buyer
  772. thisinfo.Buyer = buyer
  773. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  774. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  775. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  776. //处理分包中数据异常问题
  777. for k, tmp := range thisinfo.Package {
  778. if ps, ok := tmp.([]map[string]interface{}); ok {
  779. for i, p := range ps {
  780. name, _ := p["name"].(string)
  781. if len([]rune(name)) > 100 {
  782. p["name"] = fmt.Sprint([]rune(name[:100]))
  783. }
  784. ps[i] = p
  785. }
  786. thisinfo.Package[k] = ps
  787. }
  788. }
  789. return thisinfo
  790. }
  791. func (p *ProjectTask) updateJudge(tmp map[string]interface{}, info *Info) {
  792. index := -1
  793. pInfoId := ""
  794. p.AllIdsMapLock.Lock()
  795. F:
  796. for k, ID := range p.AllIdsMap {
  797. for i, id := range ID.P.Ids {
  798. if info.Id == id {
  799. pInfoId = k
  800. index = i
  801. break F
  802. }
  803. }
  804. }
  805. p.AllIdsMapLock.Unlock()
  806. //未找到招标信息
  807. if index == -1 {
  808. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  809. p.currentTime = info.Publishtime
  810. p.startProjectMerge(info, tmp)
  811. }
  812. } else {
  813. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  814. infoList := []interface{}(tmpPro["list"].(primitive.A))
  815. infoMap := infoList[index].(map[string]interface{})
  816. modifyMap, f := modifyEle(infoMap, tmp)
  817. //projecthref字段
  818. jsonData := tmp["jsondata"].(map[string]interface{})
  819. if jsonData != nil && jsonData["projecthref"] != nil {
  820. proHref := jsonData["projecthref"].(string)
  821. tmp["projecthref"] = proHref
  822. p.mapHrefLock.Lock()
  823. pid := p.mapHref[proHref]
  824. p.mapHrefLock.Unlock()
  825. if pid == pInfoId {
  826. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  827. return
  828. }
  829. }
  830. if f {
  831. //合并、修改
  832. log.Println("合并修改更新", "----------------------------")
  833. p.mergeAndModify(pInfoId, index, info, tmp, tmpPro, modifyMap)
  834. } else {
  835. //修改
  836. log.Println("修改更新", "----------------------------")
  837. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  838. }
  839. }
  840. }
  // Elements lists the announcement fields that influence the merge flow; a
  // change to any of them requires the announcement to be re-merged.
  var Elements = []string{
  	"projectname",
  	"projectcode",
  	"agency",
  	"budget",
  	"bidamount",
  	"buyerperson",
  	"area",
  	"city",
  	"publishtime",
  }

  // modifyEle reports which fields of the stored entry tmpPro differ in tmp and
  // whether any of them is a merge-relevant element.
  //
  // The returned map holds, for every key present in both maps with differing
  // values, the new value from tmp. The bool is true when at least one changed
  // key is listed in Elements. "_id" is removed from the map only when the
  // result is false (unchanged from the previous behaviour).
  //
  // Values are compared with reflect.DeepEqual: comparing two interface values
  // with != panics when both hold the same uncomparable type (maps, slices),
  // which nested fields such as "package" do.
  func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
  	modifyMap := map[string]interface{}{}
  	for k, oldVal := range tmpPro {
  		if newVal, ok := tmp[k]; ok && !reflect.DeepEqual(oldVal, newVal) {
  			modifyMap[k] = newVal
  		}
  	}
  	for _, name := range Elements {
  		if _, changed := modifyMap[name]; changed {
  			return modifyMap, true
  		}
  	}
  	delete(modifyMap, "_id")
  	return modifyMap, false
  }
  875. //补全位置信息
  876. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  877. area := util.ObjToString(tmp["area"])
  878. city := util.ObjToString(tmp["city"])
  879. if area != "" && city != "" {
  880. return
  881. }
  882. tmpSite := util.ObjToString(tmp["site"])
  883. if tmpSite == "" {
  884. return
  885. }
  886. p.mapSiteLock.Lock()
  887. defer p.mapSiteLock.Unlock()
  888. site := p.mapSite[tmpSite]
  889. if site != nil {
  890. if area != "" {
  891. if area == "全国" {
  892. tmp["area"] = site.Area
  893. tmp["city"] = site.City
  894. tmp["district"] = site.District
  895. return
  896. }
  897. if area != site.Area {
  898. return
  899. } else {
  900. if site.City != "" {
  901. tmp["area"] = site.Area
  902. tmp["city"] = site.City
  903. tmp["district"] = site.District
  904. }
  905. }
  906. } else {
  907. tmp["area"] = site.Area
  908. tmp["city"] = site.City
  909. tmp["district"] = site.District
  910. return
  911. }
  912. }
  913. }
  // deleteSlice removes the first occurrence of v from arr and returns the
  // shortened slice; arr is returned unchanged when v is absent. The removal
  // shifts elements inside arr's backing array. stype only labels the log line
  // that is written when the removal spans a wall-clock second boundary.
  func deleteSlice(arr []string, v, stype string) []string {
  	pos := -1
  	for i := range arr {
  		if arr[i] == v {
  			pos = i
  			break
  		}
  	}
  	if pos < 0 {
  		return arr
  	}

  	began := time.Now().Unix()
  	arr = append(arr[:pos], arr[pos+1:]...)
  	if cost := time.Now().Unix() - began; cost > 0 {
  		log.Println("deleteSlice", stype, cost, v, len(arr))
  	}
  	return arr
  }
  929. //校验评审专家
  930. func ClearRp(tmp []string) []string {
  931. arrTmp := []string{}
  932. for _, v := range tmp {
  933. // 汉字过滤(全汉字,2-4个字)
  934. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  935. continue
  936. }
  937. //黑名单过滤
  938. if BlaskListMap[v] {
  939. continue
  940. }
  941. arrTmp = append(arrTmp, v)
  942. }
  943. return arrTmp
  944. }
  945. func QyFilter(name, stype string) string {
  946. name = strings.ReplaceAll(name, " ", "")
  947. preReg := PreRegexp[stype]
  948. for _, v := range preReg {
  949. name = v.ReplaceAllString(name, "")
  950. }
  951. backReg := BackRegexp[stype]
  952. for _, v := range backReg {
  953. name = v.ReplaceAllString(name, "")
  954. }
  955. backRepReg := BackRepRegexp[stype]
  956. for _, v := range backRepReg {
  957. name = v.regs.ReplaceAllString(name, v.repstr)
  958. }
  959. blackReg := BlackRegexp[stype]
  960. for _, v := range blackReg {
  961. if v.MatchString(name) {
  962. name = ""
  963. break
  964. }
  965. }
  966. if !regexp.MustCompile("[\\p{Han}]{4,}").MatchString(name) {
  967. name = ""
  968. }
  969. if utf8.RuneCountInString(name) > 60 {
  970. name = ""
  971. }
  972. return name
  973. }