task.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937
  1. package main
import (
	"encoding/json"
	"fmt"
	"log"
	"reflect"
	"regexp"
	"strings"
	"sync"
	"time"

	mu "mfw/util"
	"qfw/util"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"gopkg.in/mgo.v2/bson"
)
/**
Task entry point:
full and incremental merging;
updates, inserts and in-memory cleanup;
conversion into info objects.
**/

// ProjectTask is the project-merge task object. It carries the in-memory
// indexes, the locks that guard them and the settings of the current run.
type ProjectTask struct {
	InitMinTime int64 // minimum time; values below 0 are processed once
	name        string
	thread      int // number of worker threads
	// lookup lock
	findLock sync.Mutex
	wg       sync.WaitGroup
	// lock for AllIdsMap
	AllIdsMapLock sync.Mutex
	// entries keyed by project id
	AllIdsMap map[string]*ID
	// indexes keyed by buyer, project name and project code
	mapPb, mapPn, mapPc map[string]*Key
	// process data: records sharing the same href value are merged directly
	mapHref     map[string]string
	mapHrefLock sync.Mutex
	// sites
	mapSite     map[string]*Site
	mapSiteLock sync.Mutex
	// bidtype / bidstatus lock
	mapBidLock sync.Mutex
	// channel of pending updates or inserts
	updatePool chan []map[string]interface{}
	//savePool chan map[string]interface{}
	//saveSign, updateSign chan bool
	// collection name
	coll string
	// whether the current run is full or incremental
	currentType string // current run mode: full or incremental
	// number of cleanup ticks skipped so far
	clearContimes int
	// current time
	currentTime int64
	// size of one save batch
	saveSize   int
	pici       int64
	validTime  int64
	statusTime int64
	// result-time update: announcements from the last two days no longer update jgtime
	jgTime int64
	// LockPool chan *sync.Mutex
	// LockPoolLock sync.Mutex
	// m1, m23, m4 map[int]int
	// l1, l23, l4 map[int]*sync.Mutex
	Brun bool
}
  69. func NewPT() *ProjectTask {
  70. p := &ProjectTask{
  71. InitMinTime: int64(1325347200),
  72. name: "全/增量对象",
  73. thread: Thread,
  74. updatePool: make(chan []map[string]interface{}, 5000),
  75. //savePool: make(chan map[string]interface{}, 2000),
  76. wg: sync.WaitGroup{},
  77. AllIdsMap: make(map[string]*ID, 5000000),
  78. mapPb: make(map[string]*Key, 1500000),
  79. mapPn: make(map[string]*Key, 5000000),
  80. mapPc: make(map[string]*Key, 5000000),
  81. mapHref: make(map[string]string, 1500000),
  82. mapSite: make(map[string]*Site, 1000000),
  83. saveSize: 100,
  84. //saveSign: make(chan bool, 1),
  85. //updateSign: make(chan bool, 1),
  86. coll: ProjectColl,
  87. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  88. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  89. jgTime: int64(util.IntAllDef(3, 3) * 86400),
  90. }
  91. return p
  92. }
// P_QL is the package-wide merge task; init assigns it.
var P_QL *ProjectTask

// sp is a five-slot semaphore: updateAllQueue takes a slot for every
// background bulk upsert, and taskZl fills all five slots to wait for them.
var sp = make(chan bool, 5)
  95. //初始化全量合并对象
  96. func init() {
  97. P_QL = NewPT()
  98. log.Println(len(P_QL.updatePool))
  99. go P_QL.updateAllQueue()
  100. go P_QL.clearMem()
  101. }
  102. func (p *ProjectTask) updateAllQueue() {
  103. arru := make([][]map[string]interface{}, p.saveSize)
  104. indexu := 0
  105. for {
  106. select {
  107. case v := <-p.updatePool:
  108. arru[indexu] = v
  109. indexu++
  110. if indexu == p.saveSize {
  111. sp <- true
  112. go func(arru [][]map[string]interface{}) {
  113. defer func() {
  114. <-sp
  115. }()
  116. MongoTool.UpSertBulk(p.coll, arru...)
  117. }(arru)
  118. arru = make([][]map[string]interface{}, p.saveSize)
  119. indexu = 0
  120. }
  121. case <-time.After(1000 * time.Millisecond):
  122. if indexu > 0 {
  123. sp <- true
  124. go func(arru [][]map[string]interface{}) {
  125. defer func() {
  126. <-sp
  127. }()
  128. MongoTool.UpSertBulk(p.coll, arru...)
  129. }(arru[:indexu])
  130. arru = make([][]map[string]interface{}, p.saveSize)
  131. indexu = 0
  132. }
  133. }
  134. }
  135. }
  136. //项目合并内存更新
  137. func (p *ProjectTask) clearMem() {
  138. c := cron.New()
  139. //在内存中保留最近6个月的信息
  140. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  141. _ = c.AddFunc("50 0/5 * * * *", func() {
  142. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  143. SingleClear = 1
  144. //跳过的次数清零
  145. p.clearContimes = 0
  146. //信息进入查找对比全局锁
  147. p.findLock.Lock()
  148. //defer p.findLock.Unlock()
  149. //合并进行的任务都完成
  150. p.wg.Wait()
  151. //遍历id
  152. //所有内存中的项目信息
  153. p.AllIdsMapLock.Lock()
  154. p.mapHrefLock.Lock()
  155. log.Println("清除开始")
  156. //清除计数
  157. clearNum := 0
  158. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  159. v := p.AllIdsMap[pid]
  160. if p.currentTime-v.P.LastTime > p.validTime {
  161. delete(p.mapHref, kHref)
  162. }
  163. }
  164. for k, v := range p.AllIdsMap {
  165. if p.currentTime-v.P.LastTime > p.validTime {
  166. clearNum++
  167. //删除id的map
  168. delete(p.AllIdsMap, k)
  169. //删除pb
  170. if v.P.Buyer != "" {
  171. ids := p.mapPb[v.P.Buyer]
  172. if ids != nil {
  173. ids.Lock.Lock()
  174. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  175. if len(ids.Arr) == 0 {
  176. delete(p.mapPb, v.P.Buyer)
  177. }
  178. ids.Lock.Unlock()
  179. }
  180. }
  181. //删除mapPn
  182. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  183. if vn != "" {
  184. ids := p.mapPn[vn]
  185. if ids != nil {
  186. ids.Lock.Lock()
  187. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  188. if len(ids.Arr) == 0 {
  189. delete(p.mapPn, vn)
  190. }
  191. ids.Lock.Unlock()
  192. }
  193. }
  194. }
  195. //删除mapPc
  196. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  197. if vn != "" {
  198. ids := p.mapPc[vn]
  199. if ids != nil {
  200. ids.Lock.Lock()
  201. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  202. if len(ids.Arr) == 0 {
  203. delete(p.mapPc, vn)
  204. }
  205. ids.Lock.Unlock()
  206. }
  207. }
  208. }
  209. v = nil
  210. }
  211. }
  212. p.mapHrefLock.Unlock()
  213. p.AllIdsMapLock.Unlock()
  214. p.findLock.Unlock()
  215. SingleClear = 0
  216. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  217. } else {
  218. p.clearContimes++
  219. }
  220. })
  221. c.Start()
  222. }
  223. //全量合并
  224. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  225. defer util.Catch()
  226. p.thread = util.IntAllDef(Thread, 4)
  227. q, _ := udpInfo["query"].(map[string]interface{})
  228. if q == nil {
  229. q = map[string]interface{}{}
  230. lteid, _ := udpInfo["lteid"].(string)
  231. var idmap map[string]interface{}
  232. if len(lteid) > 15 {
  233. idmap = map[string]interface{}{
  234. "$lte": StringTOBsonId(lteid),
  235. }
  236. }
  237. gtid, _ := udpInfo["gtid"].(string)
  238. if len(gtid) > 15 {
  239. if idmap == nil {
  240. idmap = map[string]interface{}{}
  241. }
  242. idmap["$gte"] = StringTOBsonId(gtid)
  243. }
  244. if idmap != nil {
  245. q["_id"] = idmap
  246. }
  247. }
  248. //生成查询语句执行
  249. log.Println("查询语句:", q)
  250. p.enter(MongoTool.DbName, ExtractColl, q)
  251. }
  252. //增量合并
  253. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  254. defer util.Catch()
  255. //1、检查pubilshtime索引
  256. db, _ := udpInfo["db"].(string)
  257. if db == "" {
  258. db = MongoTool.DbName
  259. }
  260. coll, _ := udpInfo["coll"].(string)
  261. if coll == "" {
  262. coll = ExtractColl
  263. }
  264. thread := util.IntAllDef(Thread, 4)
  265. if thread > 0 {
  266. p.thread = thread
  267. }
  268. //开始id和结束id
  269. q, _ := udpInfo["query"].(map[string]interface{})
  270. gtid := udpInfo["gtid"].(string)
  271. lteid := udpInfo["lteid"].(string)
  272. if q == nil {
  273. q = map[string]interface{}{
  274. "_id": map[string]interface{}{
  275. "$gt": StringTOBsonId(gtid),
  276. "$lte": StringTOBsonId(lteid),
  277. },
  278. }
  279. }
  280. if q != nil {
  281. //生成查询语句执行
  282. p.enter(db, coll, q)
  283. }
  284. if udpInfo["stop"] == nil {
  285. for i := 0; i < 5; i++ {
  286. sp <- true
  287. }
  288. for i := 0; i < 5; i++ {
  289. <-sp
  290. }
  291. log.Println("保存完成,生索引", p.pici)
  292. time.Sleep(5 * time.Second)
  293. nextNode(udpInfo, p.pici)
  294. }
  295. }
  296. //招标字段更新
  297. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  298. defer util.Catch()
  299. db, _ := udpInfo["db"].(string)
  300. if db == "" {
  301. db = MongoTool.DbName
  302. }
  303. coll, _ := udpInfo["coll"].(string)
  304. if coll == "" {
  305. coll = ExtractColl
  306. }
  307. thread := util.IntAllDef(Thread, 4)
  308. if thread > 0 {
  309. p.thread = thread
  310. }
  311. q, _ := udpInfo["query"].(map[string]interface{})
  312. gtid := udpInfo["gtid"].(string)
  313. lteid := udpInfo["lteid"].(string)
  314. if q == nil {
  315. q = map[string]interface{}{
  316. "_id": map[string]interface{}{
  317. "$gte": StringTOBsonId(gtid),
  318. "$lte": StringTOBsonId(lteid),
  319. },
  320. "is_m": 1,
  321. }
  322. }
  323. log.Println("查询语句:", q)
  324. p.enter(db, coll, q)
  325. }
  326. func (p *ProjectTask) taskQuery() {
  327. defer util.Catch()
  328. count := 0
  329. sess := MongoTool.GetMgoConn()
  330. defer MongoTool.DestoryMongoConn(sess)
  331. fields := map[string]interface{} {"budget": 1, "bidamount": 1, "package": 1}
  332. ms := sess.DB(MongoTool.DbName).C(UpdateColl).Find(map[string]interface{}{}).Select(fields)
  333. query := ms.Iter()
  334. L:
  335. for {
  336. tmp := make(map[string]interface{})
  337. if query.Next(&tmp) {
  338. lastid := tmp["_id"]
  339. tmp["id"] = tmp["_id"].(primitive.ObjectID).Hex();
  340. if count%1000 == 0 {
  341. log.Println("current modify", count, lastid)
  342. }
  343. p.taskUpdateMoney(tmp)
  344. count++
  345. } else {
  346. break L
  347. }
  348. }
  349. }
// taskUpdateMoney updates the budget / bid amount of one announcement inside
// the project that contains it.
//
// udpInfo must carry "id" (announcement id, string) and may carry "budget",
// "bidamount" and "package". The project is found through Elasticsearch
// (list.infoid == id) and loaded from ProjectColl. The matching list entry,
// its infofield entry and, for multi-package projects, the package entries are
// patched; project totals are recomputed and the project is written back.
// The cached copy in AllIdsMap is replaced under the loadStart condition
// below. Finally the ES document is deleted and a re-index request is sent
// over UDP.
func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
	defer util.Catch()
	// Unchecked assertion: a missing or non-string "id" panics here.
	id := udpInfo["id"].(string)
	budget := util.Float64All(udpInfo["budget"])
	bidamount := util.Float64All(udpInfo["bidamount"])
	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)
	// Find the project whose list contains this announcement id.
	esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+id+`"}}]}}}`
	data := Es.Get(Index, Itype, esquery)
	if len(*data) > 0 {
		pid := util.ObjToString((*data)[0]["_id"])
		pro := MongoTool.FindById(ProjectColl, pid)
		if len(pro) == 0 {
			util.Debug("未找到项目, pid=", pid)
			return
		}
		var info *map[string]interface{}
		for _, v := range []interface{}(pro["list"].(primitive.A)){
			v1 := v.(map[string]interface{})
			if util.ObjToString(v1["infoid"]) == id {
				info = util.ObjToMap(v)
				infoField := util.ObjToMap(pro["infofield"])
				if udpInfo["budget"] != nil{
					util.Debug("update-------", (*info)["infoid"])
					//if pro["budget"] == (*info)["budget"] {
					//	pro["budget"] = budget
					//}
					// amounts inside multi-package projects
					if util.IntAll(pro["multipackage"]) == 1 {
						if packages, ok := pro["package"].(map[string]interface{}); ok {
						M :
							for k, v := range packages{
								v1 := []interface{}(v.(primitive.A))
								for _, v2 := range v1{
									v3 := v2.(map[string]interface{})
									if util.ObjToString(v3["infoid"]) == id {
										if v3["budget"] != nil {
											pkg := udpInfo["package"].(map[string]interface{})
											tmp := pkg[k].(map[string]interface{})
											v3["budget"] = tmp["budget"]
										}
									}else {
										// NOTE(review): this leaves the whole package scan on the
										// first entry with another infoid; the bidamount branch
										// below has no such break. Confirm which is intended.
										break M
									}
								}
							}
						}
					}
					(*info)["budget"] = budget
					(*util.ObjToMap((*infoField)[id]))["budget"] = budget
					// NOTE(review): (*info)["budget"] was just set to budget, so this
					// compares sortprice with the new value, not the old one.
					if pro["sortprice"] == (*info)["budget"] {
						pro["sortprice"] = budget
					}
				}else {
					delete(*info, "budget")
				}
				if udpInfo["bidamount"] != nil{
					//if pro["bidamount"] == (*info)["bidamount"] {
					//	pro["bidamount"] = bidamount
					//}
					v1["bidamount"] = bidamount
					if util.IntAll(pro["multipackage"]) == 1 {
						if packages, ok := pro["package"].(map[string]interface{}); ok {
							for k, v := range packages{
								v1 := []interface{}(v.(primitive.A))
								for _, v2 := range v1{
									v3 := v2.(map[string]interface{})
									if util.ObjToString(v3["infoid"]) == id {
										if v3["bidamount"] != nil {
											pkg := udpInfo["package"].(map[string]interface{})
											tmp := pkg[k].(map[string]interface{})
											v3["bidamount"] = tmp["bidamount"]
										}
									}
								}
							}
						}
					}
					(*info)["bidamount"] = bidamount
					(*util.ObjToMap((*infoField)[id]))["bidamount"] = bidamount
					// NOTE(review): same as the budget branch, this compares against the
					// value assigned on the line above.
					if pro["sortprice"] == (*info)["bidamount"] {
						pro["sortprice"] = bidamount
					}
				}else {
					delete(*info, "bidamount")
				}
				break
			}
		}
		// Round-trip through JSON to get typed views of the project and the info.
		// NOTE(review): info stays nil when no list entry matched id; *info below
		// would then panic (recovery presumably happens in util.Catch; confirm).
		var project *ProjectInfo
		var pInfo *Info
		bys, _ := json.Marshal(pro)
		_ = json.Unmarshal(bys, &project)
		bys1, _ := json.Marshal(info)
		_ = json.Unmarshal(bys1, &pInfo)
		if len(project.Ids) > 1 {
			// Several announcements: recompute the totals with CountAmount and copy
			// them back only when they are positive.
			CountAmount(project, pInfo, *info)
			if project.Budget > 0 {
				pro["budget"] = project.Budget
			}
			if project.Bidamount > 0 {
				pro["bidamount"] = project.Bidamount
			}
		}else {
			// Single announcement: the project takes its amounts directly and
			// sortprice becomes the larger of the two.
			pro["budget"] = budget
			pro["bidamount"] = bidamount
			if budget > bidamount {
				pro["sortprice"] = budget
			}else {
				pro["sortprice"] = bidamount
			}
		}
		set := map[string]interface{}{
			"$set": pro,
		}
		MongoTool.UpdateById(ProjectColl, pid, set)
		// Replace the cached project when loadStart > -1 and LastTime > loadStart.
		// NOTE(review): p.AllIdsMap[pid] is not checked for nil before .P is set.
		loadStart := util.Int64All(Sysconfig["loadStart"])
		if loadStart > -1 && project.LastTime >loadStart {
			util.Debug("内存中存在该项目信息", project.Id)
			p.AllIdsMapLock.Lock()
			p.AllIdsMap[pid].P = project
			p.AllIdsMapLock.Unlock()
		}
		bol := Es.DelById(Index, Itype, pid)
		if bol {
			util.Debug("删除es索引, pid------", pid)
			// ask over UDP for the index to be rebuilt
			by, _ := json.Marshal(map[string]interface{}{
				"query": map[string]interface{}{
					"_id": bson.M{
						"$gte": pid,
						"$lte": pid,
					}},
				"stype": "project",
			})
			util.Debug(string(by))
			_ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
		}
	}
}
  491. func FindMoney(key string, project map[string]interface{}) float64 {
  492. money := -0.1
  493. for i, v := range []interface{}(project["list"].(primitive.A)){
  494. v1 := v.(map[string]interface{})
  495. if i == 0 {
  496. if v1[key] != nil {
  497. money = util.Float64All(v1[key])
  498. }
  499. }else {
  500. if v1[key] != nil && util.Float64All(v1[key]) > money {
  501. money = util.Float64All(v1[key])
  502. }
  503. }
  504. }
  505. return money
  506. }
  507. func StringTOBsonId(id string) primitive.ObjectID {
  508. objectId, _ := primitive.ObjectIDFromHex(id)
  509. return objectId
  510. }
  511. //通知下个节点nextNode
  512. func nextNode(mapInfo map[string]interface{}, pici int64) {
  513. mapInfo["stype"] = "project"
  514. mapInfo["query"] = map[string]interface{}{
  515. "pici": pici,
  516. }
  517. key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
  518. mapInfo["key"] = key
  519. datas, _ := json.Marshal(mapInfo)
  520. node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
  521. udptaskmap.Store(key, node)
  522. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, toaddr[0])
  523. }
// enter streams the documents matching q from db.coll, sorted by publishtime,
// and merges each one on a worker goroutine; at most p.thread workers run at
// once. Brun is true while the function runs.
//
// The reader loop (this goroutine) feeds infoPool. A dispatcher goroutine
// takes items from infoPool and starts one worker per item. A signal on
// queryClose closes the cursor early and is acknowledged on queryCloseOver.
func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
	defer util.Catch()
	defer func() {
		p.Brun = false
	}()
	p.Brun = true
	count, taskcount := 0, 0
	// pool caps the number of workers running at the same time.
	pool := make(chan bool, p.thread)
	log.Println("start project", q)
	sess := MongoTool.GetMgoConn()
	defer MongoTool.DestoryMongoConn(sess)
	infoPool := make(chan map[string]interface{}, 2000)
	over := make(chan bool)
	go func() {
	L:
		for {
			select {
			case tmp := <-infoPool:
				pool <- true
				taskcount++
				go func(tmp map[string]interface{}) {
					defer func() {
						<-pool
					}()
					if util.IntAll(tmp["repeat"]) == 0 {
						if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
							// incremental run: records with dataging == 1 are not merged
							return
						}
						p.fillInPlace(tmp)
						info := ParseInfo(tmp)
						// NOTE(review): ParseInfo can return nil, which would panic on
						// the next line; nothing in this worker recovers from it.
						p.currentTime = info.Publishtime
						if p.currentType == "updateInfo" {
							// merge for a modified announcement
							p.updateJudge(tmp, info)
						} else {
							// ordinary merge
							p.CommonMerge(tmp, info)
						}
					} else {
						// faulty record, to be updated (nothing is done here)
					}
				}(tmp)
			case <-over:
				break L
			}
		}
	}()
	fields := map[string]interface{} {"area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
		"projectcode": 1, "buyerclass": 1, "winner": 1, "s_winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1,
		"site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1, "jsondata": 1, "review_experts": 1, "purchasing": 1, "winnerorder": 1}
	ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
	if Sysconfig["hints"] != nil {
		ms.Hint(Sysconfig["hints"])
	}
	query := ms.Iter()
	//query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
	var lastid interface{}
L:
	for {
		select {
		case <-queryClose:
			log.Println("receive interrupt sign")
			log.Println("close iter..", lastid, query.Cursor.Close(nil))
			queryCloseOver <- true
			break L
		default:
			tmp := make(map[string]interface{})
			if query.Next(&tmp) {
				lastid = tmp["_id"]
				if count%10000 == 0 {
					log.Println("current", count, lastid)
				}
				infoPool <- tmp
				count++
			} else {
				break L
			}
		}
	}
	// NOTE(review): the pause presumably lets the dispatcher empty infoPool before
	// it is told to stop; items still queued when over fires may be dropped. Confirm.
	time.Sleep(5 * time.Second)
	over <- true
	// Block until every worker has given its slot back.
	for n := 0; n < p.thread; n++ {
		pool <- true
	}
	log.Println("所有线程执行完成...", count, taskcount)
}
var (
	// extract a project code from the title
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// filter applied to project names and project codes
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// project code made only of digits, or only of letters, at most 4 characters long
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// digits only or letters only
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// contains a package/section word while the announcement has no recognised packages: merge into one project
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
  626. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  627. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  628. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  629. proHref := util.ObjToString(jsonData["projecthref"])
  630. if jsonData != nil && proHref != "" {
  631. //projectHref字段合并
  632. tmp["projecthref"] = proHref
  633. p.mapHrefLock.Lock()
  634. pid := p.mapHref[proHref]
  635. p.mapHrefLock.Unlock()
  636. if pid != "" {
  637. p.AllIdsMapLock.Lock()
  638. comparePro := p.AllIdsMap[pid].P
  639. p.AllIdsMapLock.Unlock()
  640. _, ex := p.CompareStatus(comparePro, info)
  641. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  642. } else {
  643. id, p1 := p.NewProject(tmp, info)
  644. p.mapHrefLock.Lock()
  645. p.mapHref[proHref] = id
  646. p.mapHrefLock.Unlock()
  647. p.AllIdsMapLock.Lock()
  648. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  649. p.AllIdsMapLock.Unlock()
  650. }
  651. } else {
  652. //项目合并
  653. p.startProjectMerge(info, tmp)
  654. }
  655. } else {
  656. //项目合并
  657. p.startProjectMerge(info, tmp)
  658. }
  659. }
  660. }
  661. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  662. bys, _ := json.Marshal(tmp)
  663. var thisinfo *Info
  664. _ = json.Unmarshal(bys, &thisinfo)
  665. if thisinfo == nil {
  666. return nil
  667. }
  668. if len(thisinfo.Topscopeclass) == 0 {
  669. thisinfo.Topscopeclass = []string{}
  670. }
  671. if len(thisinfo.Subscopeclass) == 0 {
  672. thisinfo.Subscopeclass = []string{}
  673. }
  674. if thisinfo.SubType == "" {
  675. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  676. }
  677. //从标题中查找项目编号
  678. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  679. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  680. thisinfo.PTC = res[1]
  681. } else {
  682. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  683. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  684. thisinfo.PTC = res[3]
  685. } else {
  686. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  687. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  688. thisinfo.PTC = res[1]
  689. }
  690. }
  691. }
  692. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  693. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  694. if thisinfo.ProjectName != "" {
  695. thisinfo.pnbval++
  696. }
  697. }
  698. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  699. if thisinfo.ProjectCode != "" {
  700. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  701. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  702. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  703. }
  704. } else {
  705. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  706. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  707. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  708. }
  709. }
  710. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  711. thisinfo.pnbval++
  712. }
  713. }
  714. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  715. thisinfo.PTC = ""
  716. }
  717. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  718. thisinfo.pnbval++
  719. } else {
  720. thisinfo.Buyer = ""
  721. }
  722. //winners整理
  723. winner, _ := tmp["winner"].(string)
  724. m1 := map[string]bool{}
  725. winners := []string{}
  726. if winner != "" {
  727. m1[winner] = true
  728. winners = append(winners, winner)
  729. }
  730. packageM, _ := tmp["package"].(map[string]interface{})
  731. if packageM != nil {
  732. thisinfo.HasPackage = true
  733. for _, p := range packageM {
  734. pm, _ := p.(map[string]interface{})
  735. pw, _ := pm["winner"].(string)
  736. if pw != "" && !m1[pw] {
  737. m1[pw] = true
  738. winners = append(winners, pw)
  739. }
  740. }
  741. }
  742. thisinfo.Winners = winners
  743. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  744. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  745. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  746. //处理分包中数据异常问题
  747. for k, tmp := range thisinfo.Package {
  748. if ps, ok := tmp.([]map[string]interface{}); ok {
  749. for i, p := range ps {
  750. name, _ := p["name"].(string)
  751. if len([]rune(name)) > 100 {
  752. p["name"] = fmt.Sprint([]rune(name[:100]))
  753. }
  754. ps[i] = p
  755. }
  756. thisinfo.Package[k] = ps
  757. }
  758. }
  759. return thisinfo
  760. }
  761. func (p *ProjectTask) updateJudge(tmp map[string]interface{}, info *Info) {
  762. index := -1
  763. pInfoId := ""
  764. p.AllIdsMapLock.Lock()
  765. F:
  766. for k, ID := range p.AllIdsMap {
  767. for i, id := range ID.P.Ids {
  768. if info.Id == id {
  769. pInfoId = k
  770. index = i
  771. break F
  772. }
  773. }
  774. }
  775. p.AllIdsMapLock.Unlock()
  776. //未找到招标信息
  777. if index == -1 {
  778. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  779. p.currentTime = info.Publishtime
  780. p.startProjectMerge(info, tmp)
  781. }
  782. } else {
  783. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  784. infoList := []interface{}(tmpPro["list"].(primitive.A))
  785. infoMap := infoList[index].(map[string]interface{})
  786. modifyMap, f := modifyEle(infoMap, tmp)
  787. //projecthref字段
  788. jsonData := tmp["jsondata"].(map[string]interface{})
  789. if jsonData != nil && jsonData["projecthref"] != nil {
  790. proHref := jsonData["projecthref"].(string)
  791. tmp["projecthref"] = proHref
  792. p.mapHrefLock.Lock()
  793. pid := p.mapHref[proHref]
  794. p.mapHrefLock.Unlock()
  795. if pid == pInfoId {
  796. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  797. return
  798. }
  799. }
  800. if f {
  801. //合并、修改
  802. log.Println("合并修改更新", "----------------------------")
  803. p.mergeAndModify(pInfoId, index, info, tmp, tmpPro, modifyMap)
  804. } else {
  805. //修改
  806. log.Println("修改更新", "----------------------------")
  807. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  808. }
  809. }
  810. }
  811. var Elements = []string{
  812. "projectname",
  813. "projectcode",
  814. "agency",
  815. "budget",
  816. "bidamount",
  817. "buyerperson",
  818. "area",
  819. "city",
  820. "publishtime",
  821. }
  822. /**
  823. 判断修改的字段是否是影响合并流程的要素字段
  824. */
  825. func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
  826. modifyMap := map[string]interface{}{}
  827. for k := range tmpPro {
  828. for k1 := range tmp {
  829. if k == k1 && tmpPro[k] != tmp[k1] {
  830. modifyMap[k] = tmp[k1]
  831. break
  832. }
  833. }
  834. }
  835. for k := range modifyMap {
  836. for _, str := range Elements {
  837. if k == str {
  838. return modifyMap, true
  839. }
  840. }
  841. }
  842. delete(modifyMap, "_id")
  843. return modifyMap, false
  844. }
  845. //补全位置信息
  846. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  847. area := util.ObjToString(tmp["area"])
  848. city := util.ObjToString(tmp["city"])
  849. if area != "" && city != "" {
  850. return
  851. }
  852. tmpSite := util.ObjToString(tmp["site"])
  853. if tmpSite == "" {
  854. return
  855. }
  856. p.mapSiteLock.Lock()
  857. defer p.mapSiteLock.Unlock()
  858. site := p.mapSite[tmpSite]
  859. if site != nil {
  860. if area != "" {
  861. if area == "全国" {
  862. tmp["area"] = site.Area
  863. tmp["city"] = site.City
  864. tmp["district"] = site.District
  865. return
  866. }
  867. if area != site.Area {
  868. return
  869. } else {
  870. if site.City != "" {
  871. tmp["area"] = site.City
  872. }
  873. }
  874. } else {
  875. tmp["area"] = site.Area
  876. tmp["city"] = site.City
  877. return
  878. }
  879. }
  880. }
  881. //从数组中删除元素
  882. func deleteSlice(arr []string, v, stype string) []string {
  883. for k, v1 := range arr {
  884. if v1 == v {
  885. ts := time.Now().Unix()
  886. arr = append(arr[:k], arr[k+1:]...)
  887. rt := time.Now().Unix() - ts
  888. if rt > 0 {
  889. log.Println("deleteSlice", stype, rt, v, len(arr))
  890. }
  891. return arr
  892. }
  893. }
  894. return arr
  895. }