task.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940
  1. package main
import (
	"encoding/json"
	"fmt"
	"log"
	"reflect"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	mu "mfw/util"
	"qfw/util"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"gopkg.in/mgo.v2/bson"
)
  17. /**
  18. 任务入口
  19. 全量、增量合并
  20. 更新、插入,内存清理
  21. 转换成info对象
  22. **/
  23. //项目合并对象
  24. type ProjectTask struct {
  25. InitMinTime int64 //最小时间,小于0的处理一次
  26. name string
  27. thread int //线程数
  28. //查找锁
  29. findLock sync.Mutex
  30. wg sync.WaitGroup
  31. //map锁
  32. AllIdsMapLock sync.Mutex
  33. //对应的id
  34. AllIdsMap map[string]*ID
  35. //采购单位、项目名称、项目编号
  36. mapPb, mapPn, mapPc map[string]*Key
  37. //流程数据 字段相同,直接合并
  38. mapHref map[string]string
  39. mapHrefLock sync.Mutex
  40. //站点
  41. mapSite map[string]*Site
  42. mapSiteLock sync.Mutex
  43. //bidtype、bidstatus 锁
  44. mapBidLock sync.Mutex
  45. //更新或新增通道
  46. updatePool chan []map[string]interface{}
  47. //savePool chan map[string]interface{}
  48. //saveSign, updateSign chan bool
  49. //表名
  50. coll string
  51. //当前状态是全量还是增量
  52. currentType string //当前是跑全量还是跑增量
  53. //
  54. clearContimes int
  55. //当前时间
  56. currentTime int64
  57. //保存长度
  58. saveSize int
  59. pici int64
  60. validTime int64
  61. statusTime int64
  62. //结果时间的更新 最近两天的公告不再更新jgtime
  63. jgTime int64
  64. // LockPool chan *sync.Mutex
  65. // LockPoolLock sync.Mutex
  66. // m1, m23, m4 map[int]int
  67. // l1, l23, l4 map[int]*sync.Mutex
  68. Brun bool
  69. }
  70. func NewPT() *ProjectTask {
  71. p := &ProjectTask{
  72. InitMinTime: int64(1325347200),
  73. name: "全/增量对象",
  74. thread: Thread,
  75. updatePool: make(chan []map[string]interface{}, 5000),
  76. //savePool: make(chan map[string]interface{}, 2000),
  77. wg: sync.WaitGroup{},
  78. AllIdsMap: make(map[string]*ID, 5000000),
  79. mapPb: make(map[string]*Key, 1500000),
  80. mapPn: make(map[string]*Key, 5000000),
  81. mapPc: make(map[string]*Key, 5000000),
  82. mapHref: make(map[string]string, 1500000),
  83. mapSite: make(map[string]*Site, 1000000),
  84. saveSize: 100,
  85. //saveSign: make(chan bool, 1),
  86. //updateSign: make(chan bool, 1),
  87. coll: ProjectColl,
  88. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  89. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  90. jgTime: int64(util.IntAllDef(3, 3) * 86400),
  91. }
  92. return p
  93. }
  94. var P_QL *ProjectTask
  95. var sp = make(chan bool, 5)
  96. //初始化全量合并对象
  97. func init() {
  98. util.Debug("task init...")
  99. P_QL = NewPT()
  100. log.Println(len(P_QL.updatePool))
  101. go P_QL.updateAllQueue()
  102. go P_QL.clearMem()
  103. util.Debug("task init end")
  104. }
  105. func (p *ProjectTask) updateAllQueue() {
  106. arru := make([][]map[string]interface{}, p.saveSize)
  107. indexu := 0
  108. for {
  109. select {
  110. case v := <-p.updatePool:
  111. arru[indexu] = v
  112. indexu++
  113. if indexu == p.saveSize {
  114. sp <- true
  115. go func(arru [][]map[string]interface{}) {
  116. defer func() {
  117. <-sp
  118. }()
  119. MongoTool.UpSertBulk(p.coll, arru...)
  120. }(arru)
  121. arru = make([][]map[string]interface{}, p.saveSize)
  122. indexu = 0
  123. }
  124. case <-time.After(1000 * time.Millisecond):
  125. if indexu > 0 {
  126. sp <- true
  127. go func(arru [][]map[string]interface{}) {
  128. defer func() {
  129. <-sp
  130. }()
  131. MongoTool.UpSertBulk(p.coll, arru...)
  132. }(arru[:indexu])
  133. arru = make([][]map[string]interface{}, p.saveSize)
  134. indexu = 0
  135. }
  136. }
  137. }
  138. }
  139. //项目合并内存更新
  140. func (p *ProjectTask) clearMem() {
  141. c := cron.New()
  142. //在内存中保留最近6个月的信息
  143. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  144. _ = c.AddFunc("50 0/5 * * * *", func() {
  145. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  146. SingleClear = 1
  147. //跳过的次数清零
  148. p.clearContimes = 0
  149. //信息进入查找对比全局锁
  150. p.findLock.Lock()
  151. //defer p.findLock.Unlock()
  152. //合并进行的任务都完成
  153. p.wg.Wait()
  154. //遍历id
  155. //所有内存中的项目信息
  156. p.AllIdsMapLock.Lock()
  157. p.mapHrefLock.Lock()
  158. log.Println("清除开始")
  159. //清除计数
  160. clearNum := 0
  161. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  162. v := p.AllIdsMap[pid]
  163. if p.currentTime-v.P.LastTime > p.validTime {
  164. delete(p.mapHref, kHref)
  165. }
  166. }
  167. for k, v := range p.AllIdsMap {
  168. if p.currentTime-v.P.LastTime > p.validTime {
  169. clearNum++
  170. //删除id的map
  171. delete(p.AllIdsMap, k)
  172. //删除pb
  173. if v.P.Buyer != "" {
  174. ids := p.mapPb[v.P.Buyer]
  175. if ids != nil {
  176. ids.Lock.Lock()
  177. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  178. if len(ids.Arr) == 0 {
  179. delete(p.mapPb, v.P.Buyer)
  180. }
  181. ids.Lock.Unlock()
  182. }
  183. }
  184. //删除mapPn
  185. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  186. if vn != "" {
  187. ids := p.mapPn[vn]
  188. if ids != nil {
  189. ids.Lock.Lock()
  190. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  191. if len(ids.Arr) == 0 {
  192. delete(p.mapPn, vn)
  193. }
  194. ids.Lock.Unlock()
  195. }
  196. }
  197. }
  198. //删除mapPc
  199. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  200. if vn != "" {
  201. ids := p.mapPc[vn]
  202. if ids != nil {
  203. ids.Lock.Lock()
  204. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  205. if len(ids.Arr) == 0 {
  206. delete(p.mapPc, vn)
  207. }
  208. ids.Lock.Unlock()
  209. }
  210. }
  211. }
  212. v = nil
  213. }
  214. }
  215. p.mapHrefLock.Unlock()
  216. p.AllIdsMapLock.Unlock()
  217. p.findLock.Unlock()
  218. SingleClear = 0
  219. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  220. } else {
  221. p.clearContimes++
  222. }
  223. })
  224. c.Start()
  225. }
  226. //全量合并
  227. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  228. defer util.Catch()
  229. p.thread = util.IntAllDef(Thread, 4)
  230. q, _ := udpInfo["query"].(map[string]interface{})
  231. if q == nil {
  232. q = map[string]interface{}{}
  233. lteid, _ := udpInfo["lteid"].(string)
  234. var idmap map[string]interface{}
  235. if len(lteid) > 15 {
  236. idmap = map[string]interface{}{
  237. "$lte": StringTOBsonId(lteid),
  238. }
  239. }
  240. gtid, _ := udpInfo["gtid"].(string)
  241. if len(gtid) > 15 {
  242. if idmap == nil {
  243. idmap = map[string]interface{}{}
  244. }
  245. idmap["$gte"] = StringTOBsonId(gtid)
  246. }
  247. if idmap != nil {
  248. q["_id"] = idmap
  249. }
  250. }
  251. //生成查询语句执行
  252. log.Println("查询语句:", q)
  253. p.enter(MongoTool.DbName, ExtractColl, q)
  254. }
  255. //增量合并
  256. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  257. defer util.Catch()
  258. //1、检查pubilshtime索引
  259. db, _ := udpInfo["db"].(string)
  260. if db == "" {
  261. db = MongoTool.DbName
  262. }
  263. coll, _ := udpInfo["coll"].(string)
  264. if coll == "" {
  265. coll = ExtractColl
  266. }
  267. thread := util.IntAllDef(Thread, 4)
  268. if thread > 0 {
  269. p.thread = thread
  270. }
  271. //开始id和结束id
  272. q, _ := udpInfo["query"].(map[string]interface{})
  273. gtid := udpInfo["gtid"].(string)
  274. lteid := udpInfo["lteid"].(string)
  275. if q == nil {
  276. q = map[string]interface{}{
  277. "_id": map[string]interface{}{
  278. "$gt": StringTOBsonId(gtid),
  279. "$lte": StringTOBsonId(lteid),
  280. },
  281. }
  282. }
  283. if q != nil {
  284. //生成查询语句执行
  285. p.enter(db, coll, q)
  286. }
  287. if udpInfo["stop"] == nil {
  288. for i := 0; i < 5; i++ {
  289. sp <- true
  290. }
  291. for i := 0; i < 5; i++ {
  292. <-sp
  293. }
  294. log.Println("保存完成,生索引", p.pici)
  295. time.Sleep(5 * time.Second)
  296. nextNode(udpInfo, p.pici)
  297. }
  298. }
  299. //招标字段更新
  300. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  301. defer util.Catch()
  302. db, _ := udpInfo["db"].(string)
  303. if db == "" {
  304. db = MongoTool.DbName
  305. }
  306. coll, _ := udpInfo["coll"].(string)
  307. if coll == "" {
  308. coll = ExtractColl
  309. }
  310. thread := util.IntAllDef(Thread, 4)
  311. if thread > 0 {
  312. p.thread = thread
  313. }
  314. q, _ := udpInfo["query"].(map[string]interface{})
  315. gtid := udpInfo["gtid"].(string)
  316. lteid := udpInfo["lteid"].(string)
  317. if q == nil {
  318. q = map[string]interface{}{
  319. "_id": map[string]interface{}{
  320. "$gte": StringTOBsonId(gtid),
  321. "$lte": StringTOBsonId(lteid),
  322. },
  323. "is_m": 1,
  324. }
  325. }
  326. log.Println("查询语句:", q)
  327. p.enter(db, coll, q)
  328. }
  329. //修改公告信息的预算/中标金额
  330. func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
  331. defer util.Catch()
  332. id := udpInfo["id"].(string)
  333. budget := util.ObjToString(udpInfo["budget"])
  334. bidamount := util.ObjToString(udpInfo["bidamount"])
  335. if budget == "" && bidamount == "" {
  336. util.Debug("")
  337. return
  338. }
  339. client := Es.GetEsConn()
  340. defer Es.DestoryEsConn(client)
  341. esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+id+`"}}]}}}`
  342. data := Es.Get(Index, Itype, esquery)
  343. util.Debug(*data)
  344. if data != nil {
  345. pid := util.ObjToString((*data)[0]["_id"])
  346. pro := MongoTool.FindById(ProjectColl, pid)
  347. var info *map[string]interface{}
  348. for _, v := range []interface{}(pro["list"].(primitive.A)){
  349. v1 := v.(map[string]interface{})
  350. if util.ObjToString(v1["infoid"]) == id {
  351. info = util.ObjToMap(v)
  352. infoField := util.ObjToMap(pro["infofield"])
  353. if budget != "" {
  354. if budget != "del" {
  355. newBudget, _ := strconv.ParseFloat(budget, 64)
  356. if pro["budget"] == (*info)["budget"] {
  357. pro["budget"] = newBudget
  358. }
  359. if util.IntAll(pro["multipackage"]) == 1 {
  360. if packages, ok := pro["package"].(map[string]interface{}); ok {
  361. M :
  362. for _, v := range packages{
  363. v1 := []interface{}(v.(primitive.A))
  364. for _, v2 := range v1{
  365. v3 := v2.(map[string]interface{})
  366. if util.ObjToString(v3["infoid"]) == id {
  367. if v3["budget"] != nil {
  368. v3["budget"] = newBudget
  369. }
  370. }else {
  371. break M
  372. }
  373. }
  374. }
  375. }
  376. }
  377. if pro["sortprice"] == (*info)["budget"] {
  378. pro["sortprice"] = newBudget
  379. }
  380. (*info)["budget"] = newBudget
  381. (*util.ObjToMap((*infoField)[id]))["budget"] = newBudget
  382. }else {
  383. delete(*info, "budget")
  384. delete(*util.ObjToMap((*infoField)[id]), "budget")
  385. if pro["budget"] == (*info)["budget"] {
  386. money := FindMoney("budget", pro)
  387. if money >= 0 {
  388. pro["budget"] = money
  389. }
  390. }
  391. }
  392. }
  393. if bidamount != "" {
  394. if bidamount != "del" {
  395. newBidamount, _ := strconv.ParseFloat(bidamount, 64)
  396. if pro["bidamount"] == (*info)["bidamount"] {
  397. pro["bidamount"] = newBidamount
  398. }
  399. if util.IntAll(pro["multipackage"]) == 1 {
  400. if packages, ok := pro["package"].(map[string]interface{}); ok {
  401. N :
  402. for _, v := range packages{
  403. v1 := []interface{}(v.(primitive.A))
  404. for _, v2 := range v1{
  405. v3 := v2.(map[string]interface{})
  406. if util.ObjToString(v3["infoid"]) == id {
  407. if v3["bidamount"] != nil {
  408. v3["bidamount"] = newBidamount
  409. }
  410. }else {
  411. break N
  412. }
  413. }
  414. }
  415. }
  416. }
  417. if pro["sortprice"] == (*info)["bidamount"] {
  418. pro["sortprice"] = newBidamount
  419. }
  420. (*info)["bidamount"] = newBidamount
  421. (*util.ObjToMap((*infoField)[id]))["bidamount"] = newBidamount
  422. }else {
  423. delete((*info), "bidamount")
  424. delete((*util.ObjToMap((*infoField)[id])), "bidamount")
  425. if pro["bidamount"] == (*info)["bidamount"] {
  426. money := FindMoney("bidamount", pro)
  427. if money >= 0 {
  428. pro["bidamount"] = money
  429. }
  430. }
  431. }
  432. }
  433. break
  434. }
  435. }
  436. var project *ProjectInfo
  437. var pInfo *Info
  438. bys, _ := json.Marshal(pro)
  439. _ = json.Unmarshal(bys, &project)
  440. bys1, _ := json.Marshal(info)
  441. _ = json.Unmarshal(bys1, &pInfo)
  442. CountAmount(project, pInfo, *info)
  443. if project.Budget > 0 {
  444. util.Debug(project.Budget)
  445. pro["budget"] = project.Budget
  446. pro["budgettag"] = 0
  447. }
  448. if project.Bidamount > 0 {
  449. util.Debug(project.Bidamount)
  450. pro["bidamount"] = project.Bidamount
  451. pro["bidamounttag"] = 0
  452. }
  453. set := map[string]interface{}{
  454. "$set": pro,
  455. }
  456. MongoTool.UpdateById(ProjectColl, pid, set)
  457. loadStart := util.Int64All(Sysconfig["loadStart"])
  458. if loadStart > -1 && project.LastTime >loadStart {
  459. util.Debug("内存中存在该项目信息", project.Id)
  460. p.AllIdsMapLock.Lock()
  461. p.AllIdsMap[pid].P = project
  462. p.AllIdsMapLock.Unlock()
  463. }
  464. bol := Es.DelById(Index, Itype, pid)
  465. if bol {
  466. util.Debug("删除es索引, pid------", pid)
  467. //调udp生索引
  468. by, _ := json.Marshal(map[string]interface{}{
  469. "query": map[string]interface{}{
  470. "_id": bson.M{
  471. "$gte": pid,
  472. "$lte": pid,
  473. }},
  474. "stype": "project",
  475. })
  476. _ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
  477. }
  478. }
  479. }
  480. func FindMoney(key string, project map[string]interface{}) float64 {
  481. money := -0.1
  482. for i, v := range []interface{}(project["list"].(primitive.A)){
  483. v1 := v.(map[string]interface{})
  484. if i == 0 {
  485. if v1[key] != nil {
  486. money = util.Float64All(v1[key])
  487. }
  488. }else {
  489. if v1[key] != nil && util.Float64All(v1[key]) > money {
  490. money = util.Float64All(v1[key])
  491. }
  492. }
  493. }
  494. return money
  495. }
  496. func StringTOBsonId(id string) primitive.ObjectID {
  497. objectId, _ := primitive.ObjectIDFromHex(id)
  498. return objectId
  499. }
  500. //通知下个节点nextNode
  501. func nextNode(mapInfo map[string]interface{}, pici int64) {
  502. mapInfo["stype"] = "project"
  503. mapInfo["query"] = map[string]interface{}{
  504. "pici": pici,
  505. }
  506. for n, to := range toaddr {
  507. key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
  508. mapInfo["key"] = key
  509. datas, _ := json.Marshal(mapInfo)
  510. node := &udpNode{datas, to, time.Now().Unix(), 0}
  511. udptaskmap.Store(key, node)
  512. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
  513. }
  514. }
  515. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  516. defer util.Catch()
  517. defer func() {
  518. p.Brun = false
  519. }()
  520. p.Brun = true
  521. count, taskcount := 0, 0
  522. pool := make(chan bool, p.thread)
  523. log.Println("start project", q)
  524. sess := MongoTool.GetMgoConn()
  525. defer MongoTool.DestoryMongoConn(sess)
  526. infoPool := make(chan map[string]interface{}, 2000)
  527. over := make(chan bool)
  528. go func() {
  529. L:
  530. for {
  531. select {
  532. case tmp := <-infoPool:
  533. pool <- true
  534. taskcount++
  535. go func(tmp map[string]interface{}) {
  536. defer func() {
  537. <-pool
  538. }()
  539. if util.IntAll(tmp["repeat"]) == 0 {
  540. if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
  541. //增量 dataging为1不参与合并
  542. return
  543. }
  544. p.fillInPlace(tmp)
  545. info := ParseInfo(tmp)
  546. p.currentTime = info.Publishtime
  547. if p.currentType == "updateInfo" {
  548. //招标信息更改合并
  549. p.updateJudge(tmp, info)
  550. } else {
  551. //普通合并
  552. p.CommonMerge(tmp, info)
  553. }
  554. } else {
  555. //信息错误,进行更新
  556. }
  557. }(tmp)
  558. case <-over:
  559. break L
  560. }
  561. }
  562. }()
  563. fields := map[string]interface{} {"area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
  564. "projectcode": 1, "buyerclass": 1, "winner": 1, "s_winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1,
  565. "site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1, "jsondata": 1}
  566. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  567. if Sysconfig["hints"] != nil {
  568. ms.Hint(Sysconfig["hints"])
  569. }
  570. query := ms.Iter()
  571. //query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  572. var lastid interface{}
  573. L:
  574. for {
  575. select {
  576. case <-queryClose:
  577. log.Println("receive interrupt sign")
  578. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  579. queryCloseOver <- true
  580. break L
  581. default:
  582. tmp := make(map[string]interface{})
  583. if query.Next(&tmp) {
  584. lastid = tmp["_id"]
  585. if count%10000 == 0 {
  586. log.Println("current", count, lastid)
  587. }
  588. infoPool <- tmp
  589. count++
  590. } else {
  591. break L
  592. }
  593. }
  594. }
  595. time.Sleep(5 * time.Second)
  596. over <- true
  597. //阻塞
  598. for n := 0; n < p.thread; n++ {
  599. pool <- true
  600. }
  601. log.Println("所有线程执行完成...", count, taskcount)
  602. }
// Regular expressions used while parsing announcements. They are compiled
// once at package initialisation.
var (
	// Extract a project code from the title: three variants, tried in order
	// by ParseInfo (capture groups 1, 3 and 1 respectively).
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project code filter: ParseInfo removes every match from project names
	// and project codes.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// Project code consisting only of digits or only of letters, at most 4
	// characters long.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Digits only or letters only, any length.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// Text contains a package/section word; when the bidding record did not
	// identify packages, such records are merged into one project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
  617. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  618. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  619. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  620. proHref := util.ObjToString(jsonData["projecthref"])
  621. if jsonData != nil && proHref != "" {
  622. //projectHref字段合并
  623. tmp["projecthref"] = proHref
  624. p.mapHrefLock.Lock()
  625. pid := p.mapHref[proHref]
  626. p.mapHrefLock.Unlock()
  627. if pid != "" {
  628. p.AllIdsMapLock.Lock()
  629. comparePro := p.AllIdsMap[pid].P
  630. p.AllIdsMapLock.Unlock()
  631. _, ex := p.CompareStatus(comparePro, info)
  632. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  633. } else {
  634. id, p1 := p.NewProject(tmp, info)
  635. p.mapHrefLock.Lock()
  636. p.mapHref[proHref] = id
  637. p.mapHrefLock.Unlock()
  638. p.AllIdsMapLock.Lock()
  639. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  640. p.AllIdsMapLock.Unlock()
  641. }
  642. } else {
  643. //项目合并
  644. p.startProjectMerge(info, tmp)
  645. }
  646. } else {
  647. //项目合并
  648. p.startProjectMerge(info, tmp)
  649. }
  650. }
  651. }
  652. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  653. bys, _ := json.Marshal(tmp)
  654. var thisinfo *Info
  655. _ = json.Unmarshal(bys, &thisinfo)
  656. if thisinfo == nil {
  657. return nil
  658. }
  659. if len(thisinfo.Topscopeclass) == 0 {
  660. thisinfo.Topscopeclass = []string{}
  661. }
  662. if len(thisinfo.Subscopeclass) == 0 {
  663. thisinfo.Subscopeclass = []string{}
  664. }
  665. if thisinfo.SubType == "" {
  666. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  667. }
  668. //从标题中查找项目编号
  669. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  670. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  671. thisinfo.PTC = res[1]
  672. } else {
  673. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  674. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  675. thisinfo.PTC = res[3]
  676. } else {
  677. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  678. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  679. thisinfo.PTC = res[1]
  680. }
  681. }
  682. }
  683. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  684. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  685. if thisinfo.ProjectName != "" {
  686. thisinfo.pnbval++
  687. }
  688. }
  689. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  690. if thisinfo.ProjectCode != "" {
  691. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  692. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  693. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  694. }
  695. } else {
  696. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  697. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  698. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  699. }
  700. }
  701. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  702. thisinfo.pnbval++
  703. }
  704. }
  705. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  706. thisinfo.PTC = ""
  707. }
  708. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  709. thisinfo.pnbval++
  710. } else {
  711. thisinfo.Buyer = ""
  712. }
  713. //winners整理
  714. winner, _ := tmp["winner"].(string)
  715. m1 := map[string]bool{}
  716. winners := []string{}
  717. if winner != "" {
  718. m1[winner] = true
  719. winners = append(winners, winner)
  720. }
  721. packageM, _ := tmp["package"].(map[string]interface{})
  722. if packageM != nil {
  723. thisinfo.HasPackage = true
  724. for _, p := range packageM {
  725. pm, _ := p.(map[string]interface{})
  726. pw, _ := pm["winner"].(string)
  727. if pw != "" && !m1[pw] {
  728. m1[pw] = true
  729. winners = append(winners, pw)
  730. }
  731. }
  732. }
  733. thisinfo.Winners = winners
  734. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  735. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  736. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  737. //处理分包中数据异常问题
  738. for k, tmp := range thisinfo.Package {
  739. if ps, ok := tmp.([]map[string]interface{}); ok {
  740. for i, p := range ps {
  741. name, _ := p["name"].(string)
  742. if len([]rune(name)) > 100 {
  743. p["name"] = fmt.Sprint([]rune(name[:100]))
  744. }
  745. ps[i] = p
  746. }
  747. thisinfo.Package[k] = ps
  748. }
  749. }
  750. return thisinfo
  751. }
  752. func (p *ProjectTask) updateJudge(tmp map[string]interface{}, info *Info) {
  753. index := -1
  754. pInfoId := ""
  755. p.AllIdsMapLock.Lock()
  756. F:
  757. for k, ID := range p.AllIdsMap {
  758. for i, id := range ID.P.Ids {
  759. if info.Id == id {
  760. pInfoId = k
  761. index = i
  762. break F
  763. }
  764. }
  765. }
  766. p.AllIdsMapLock.Unlock()
  767. //未找到招标信息
  768. if index == -1 {
  769. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  770. p.currentTime = info.Publishtime
  771. p.startProjectMerge(info, tmp)
  772. }
  773. } else {
  774. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  775. infoList := []interface{}(tmpPro["list"].(primitive.A))
  776. infoMap := infoList[index].(map[string]interface{})
  777. modifyMap, f := modifyEle(infoMap, tmp)
  778. //projecthref字段
  779. jsonData := tmp["jsondata"].(map[string]interface{})
  780. if jsonData != nil && jsonData["projecthref"] != nil {
  781. proHref := jsonData["projecthref"].(string)
  782. tmp["projecthref"] = proHref
  783. p.mapHrefLock.Lock()
  784. pid := p.mapHref[proHref]
  785. p.mapHrefLock.Unlock()
  786. if pid == pInfoId {
  787. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  788. return
  789. }
  790. }
  791. if f {
  792. //合并、修改
  793. log.Println("合并修改更新", "----------------------------")
  794. p.mergeAndModify(pInfoId, index, info, tmp, tmpPro, modifyMap)
  795. } else {
  796. //修改
  797. log.Println("修改更新", "----------------------------")
  798. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  799. }
  800. }
  801. }
  802. var Elements = []string{
  803. "projectname",
  804. "projectcode",
  805. "agency",
  806. "budget",
  807. "bidamount",
  808. "buyerperson",
  809. "area",
  810. "city",
  811. "publishtime",
  812. }
  813. /**
  814. 判断修改的字段是否是影响合并流程的要素字段
  815. */
  816. func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
  817. modifyMap := map[string]interface{}{}
  818. for k := range tmpPro {
  819. for k1 := range tmp {
  820. if k == k1 && tmpPro[k] != tmp[k1] {
  821. modifyMap[k] = tmp[k1]
  822. break
  823. }
  824. }
  825. }
  826. for k := range modifyMap {
  827. for _, str := range Elements {
  828. if k == str {
  829. return modifyMap, true
  830. }
  831. }
  832. }
  833. delete(modifyMap, "_id")
  834. return modifyMap, false
  835. }
  836. //补全位置信息
  837. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  838. area := util.ObjToString(tmp["area"])
  839. city := util.ObjToString(tmp["city"])
  840. district := util.ObjToString(tmp["district"])
  841. if area != "" && city != "" && district != "" {
  842. return
  843. }
  844. tmpSite := util.ObjToString(tmp["site"])
  845. if tmpSite == "" {
  846. return
  847. }
  848. p.mapSiteLock.Lock()
  849. defer p.mapSiteLock.Unlock()
  850. site := p.mapSite[tmpSite]
  851. if site != nil {
  852. if area != "" {
  853. if area == "全国" {
  854. tmp["area"] = site.Area
  855. tmp["city"] = site.City
  856. tmp["district"] = site.District
  857. return
  858. }
  859. if area != site.Area {
  860. return
  861. } else {
  862. if city == site.City {
  863. if district == "" {
  864. tmp["district"] = site.District
  865. return
  866. }
  867. } else if city == "" {
  868. tmp["city"] = site.City
  869. tmp["district"] = site.District
  870. return
  871. } else if site.City == "" {
  872. return
  873. }
  874. }
  875. } else {
  876. tmp["area"] = site.Area
  877. tmp["city"] = site.City
  878. tmp["district"] = site.District
  879. return
  880. }
  881. }
  882. }
  883. //从数组中删除元素
  884. func deleteSlice(arr []string, v, stype string) []string {
  885. for k, v1 := range arr {
  886. if v1 == v {
  887. ts := time.Now().Unix()
  888. arr = append(arr[:k], arr[k+1:]...)
  889. rt := time.Now().Unix() - ts
  890. if rt > 0 {
  891. log.Println("deleteSlice", stype, rt, v, len(arr))
  892. }
  893. return arr
  894. }
  895. }
  896. return arr
  897. }