task.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777
  1. package main
import (
	"encoding/json"
	"fmt"
	"log"
	"reflect"
	"regexp"
	"strings"
	"sync"
	"time"

	mu "mfw/util"
	"qfw/util"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson/primitive"
)
  15. /**
  16. 任务入口
  17. 全量、增量合并
  18. 更新、插入,内存清理
  19. 转换成info对象
  20. **/
// ProjectTask is the project-merge object. It carries the in-memory lookup
// maps, their locks, the outgoing update channel and the timing settings
// used by the merge methods defined on it.
type ProjectTask struct {
	InitMinTime int64 // minimum time; values below 0 are handled once
	name        string
	thread      int // number of threads
	// lookup lock
	findLock sync.Mutex
	wg       sync.WaitGroup
	// lock for the id map
	AllIdsMapLock sync.Mutex
	// the corresponding ids (project id -> ID entry)
	AllIdsMap map[string]*ID
	// buyer, project name, project code indexes
	mapPb, mapPn, mapPc map[string]*Key
	// process data: identical field value, merged directly
	mapHref     map[string]string
	mapHrefLock sync.Mutex
	// sites
	mapSite     map[string]*Site
	mapSiteLock sync.Mutex
	// bidtype / bidstatus lock
	mapBidLock sync.Mutex
	// channel of updates or inserts
	updatePool chan []map[string]interface{}
	//savePool chan map[string]interface{}
	//saveSign, updateSign chan bool
	// collection (table) name
	coll string
	// whether the current run is full or incremental
	currentType string // whether a full run or an incremental run is in progress
	// number of cron ticks on which the memory clean-up was skipped
	clearContimes int
	// current time
	currentTime int64
	// save batch length
	saveSize   int
	pici       int64
	validTime  int64
	statusTime int64
	// result-time update: announcements from the last two days no longer update jgtime
	jgTime int64
	// LockPool chan *sync.Mutex
	// LockPoolLock sync.Mutex
	// m1, m23, m4 map[int]int
	// l1, l23, l4 map[int]*sync.Mutex
	Brun bool
}
  68. func NewPT() *ProjectTask {
  69. p := &ProjectTask{
  70. InitMinTime: int64(1325347200),
  71. name: "全/增量对象",
  72. thread: 4,
  73. updatePool: make(chan []map[string]interface{}, 5000),
  74. //savePool: make(chan map[string]interface{}, 2000),
  75. wg: sync.WaitGroup{},
  76. AllIdsMap: make(map[string]*ID, 5000000),
  77. mapPb: make(map[string]*Key, 1500000),
  78. mapPn: make(map[string]*Key, 5000000),
  79. mapPc: make(map[string]*Key, 5000000),
  80. mapHref: make(map[string]string, 1500000),
  81. mapSite: make(map[string]*Site, 1000000),
  82. saveSize: 100,
  83. //saveSign: make(chan bool, 1),
  84. //updateSign: make(chan bool, 1),
  85. coll: ProjectColl,
  86. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  87. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  88. jgTime: int64(util.IntAllDef(3, 3) * 86400),
  89. }
  90. return p
  91. }
// P_QL is the package-wide merge task; it is assigned in init.
var P_QL *ProjectTask

// sp is a 5-slot channel used as a semaphore around the background bulk
// upserts started by updateAllQueue.
var sp = make(chan bool, 5)
  94. //初始化全量合并对象
  95. func init() {
  96. P_QL = NewPT()
  97. log.Println(len(P_QL.updatePool))
  98. go P_QL.updateAllQueue()
  99. go P_QL.clearMem()
  100. }
  101. func (p *ProjectTask) updateAllQueue() {
  102. arru := make([][]map[string]interface{}, p.saveSize)
  103. indexu := 0
  104. for {
  105. select {
  106. case v := <-p.updatePool:
  107. arru[indexu] = v
  108. indexu++
  109. if indexu == p.saveSize {
  110. sp <- true
  111. go func(arru [][]map[string]interface{}) {
  112. defer func() {
  113. <-sp
  114. }()
  115. MongoTool.UpSertBulk(p.coll, arru...)
  116. }(arru)
  117. arru = make([][]map[string]interface{}, p.saveSize)
  118. indexu = 0
  119. }
  120. case <-time.After(1000 * time.Millisecond):
  121. if indexu > 0 {
  122. sp <- true
  123. go func(arru [][]map[string]interface{}) {
  124. defer func() {
  125. <-sp
  126. }()
  127. MongoTool.UpSertBulk(p.coll, arru...)
  128. }(arru[:indexu])
  129. arru = make([][]map[string]interface{}, p.saveSize)
  130. indexu = 0
  131. }
  132. }
  133. }
  134. }
  135. //项目合并内存更新
  136. func (p *ProjectTask) clearMem() {
  137. c := cron.New()
  138. //在内存中保留最近6个月的信息
  139. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  140. _ = c.AddFunc("50 0/5 * * * *", func() {
  141. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  142. SingleClear = 1
  143. //跳过的次数清零
  144. p.clearContimes = 0
  145. //信息进入查找对比全局锁
  146. p.findLock.Lock()
  147. //defer p.findLock.Unlock()
  148. //合并进行的任务都完成
  149. p.wg.Wait()
  150. //遍历id
  151. //所有内存中的项目信息
  152. p.AllIdsMapLock.Lock()
  153. p.mapHrefLock.Lock()
  154. log.Println("清除开始")
  155. //清除计数
  156. clearNum := 0
  157. for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
  158. v := p.AllIdsMap[pid]
  159. if p.currentTime-v.P.LastTime > p.validTime {
  160. delete(p.mapHref, kHref)
  161. }
  162. }
  163. for k, v := range p.AllIdsMap {
  164. if p.currentTime-v.P.LastTime > p.validTime {
  165. clearNum++
  166. //删除id的map
  167. delete(p.AllIdsMap, k)
  168. //删除pb
  169. if v.P.Buyer != "" {
  170. ids := p.mapPb[v.P.Buyer]
  171. if ids != nil {
  172. ids.Lock.Lock()
  173. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  174. if len(ids.Arr) == 0 {
  175. delete(p.mapPb, v.P.Buyer)
  176. }
  177. ids.Lock.Unlock()
  178. }
  179. }
  180. //删除mapPn
  181. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  182. if vn != "" {
  183. ids := p.mapPn[vn]
  184. if ids != nil {
  185. ids.Lock.Lock()
  186. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  187. if len(ids.Arr) == 0 {
  188. delete(p.mapPn, vn)
  189. }
  190. ids.Lock.Unlock()
  191. }
  192. }
  193. }
  194. //删除mapPc
  195. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  196. if vn != "" {
  197. ids := p.mapPc[vn]
  198. if ids != nil {
  199. ids.Lock.Lock()
  200. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  201. if len(ids.Arr) == 0 {
  202. delete(p.mapPc, vn)
  203. }
  204. ids.Lock.Unlock()
  205. }
  206. }
  207. }
  208. v = nil
  209. }
  210. }
  211. p.mapHrefLock.Unlock()
  212. p.AllIdsMapLock.Unlock()
  213. p.findLock.Unlock()
  214. SingleClear = 0
  215. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  216. } else {
  217. p.clearContimes++
  218. }
  219. })
  220. c.Start()
  221. select {}
  222. }
  223. //全量合并
  224. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  225. defer util.Catch()
  226. //1、检查pubilshtime索引
  227. db, _ := udpInfo["db"].(string)
  228. if db == "" {
  229. db = MongoTool.DbName
  230. }
  231. coll, _ := udpInfo["coll"].(string)
  232. if coll == "" {
  233. coll = ExtractColl
  234. }
  235. thread := util.IntAllDef(Thread, 4)
  236. if thread > 0 {
  237. p.thread = thread
  238. }
  239. q, _ := udpInfo["query"].(map[string]interface{})
  240. if q == nil {
  241. q = map[string]interface{}{}
  242. lteid, _ := udpInfo["lteid"].(string)
  243. var idmap map[string]interface{}
  244. if len(lteid) > 15 {
  245. idmap = map[string]interface{}{
  246. "$lte": StringTOBsonId(lteid),
  247. }
  248. }
  249. gtid, _ := udpInfo["gtid"].(string)
  250. if len(gtid) > 15 {
  251. if idmap == nil {
  252. idmap = map[string]interface{}{}
  253. }
  254. idmap["$gte"] = StringTOBsonId(gtid)
  255. }
  256. if idmap != nil {
  257. q["_id"] = idmap
  258. }
  259. }
  260. //生成查询语句执行
  261. log.Println("查询语句:", q)
  262. p.enter(db, coll, q)
  263. }
  264. //增量合并
  265. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  266. defer util.Catch()
  267. //1、检查pubilshtime索引
  268. db, _ := udpInfo["db"].(string)
  269. if db == "" {
  270. db = MongoTool.DbName
  271. }
  272. coll, _ := udpInfo["coll"].(string)
  273. if coll == "" {
  274. coll = ExtractColl
  275. }
  276. thread := util.IntAllDef(Thread, 4)
  277. if thread > 0 {
  278. p.thread = thread
  279. }
  280. //开始id和结束id
  281. q, _ := udpInfo["query"].(map[string]interface{})
  282. gtid := udpInfo["gtid"].(string)
  283. lteid := udpInfo["lteid"].(string)
  284. if q == nil {
  285. q = map[string]interface{}{
  286. "_id": map[string]interface{}{
  287. "$gt": StringTOBsonId(gtid),
  288. "$lte": StringTOBsonId(lteid),
  289. },
  290. }
  291. }
  292. if q != nil {
  293. //生成查询语句执行
  294. p.enter(db, coll, q)
  295. }
  296. if udpInfo["stop"] == nil {
  297. for i := 0; i < 5; i++ {
  298. sp <- true
  299. }
  300. for i := 0; i < 5; i++ {
  301. <-sp
  302. }
  303. log.Println("保存完成,生索引", p.pici)
  304. time.Sleep(5 * time.Second)
  305. nextNode(udpInfo, p.pici)
  306. }
  307. }
  308. //招标字段更新
  309. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  310. defer util.Catch()
  311. db, _ := udpInfo["db"].(string)
  312. if db == "" {
  313. db = MongoTool.DbName
  314. }
  315. coll, _ := udpInfo["coll"].(string)
  316. if coll == "" {
  317. coll = ExtractColl
  318. }
  319. thread := util.IntAllDef(Thread, 4)
  320. if thread > 0 {
  321. p.thread = thread
  322. }
  323. q, _ := udpInfo["query"].(map[string]interface{})
  324. gtid := udpInfo["gtid"].(string)
  325. lteid := udpInfo["lteid"].(string)
  326. if q == nil {
  327. q = map[string]interface{}{
  328. "_id": map[string]interface{}{
  329. "$gte": StringTOBsonId(gtid),
  330. "$lte": StringTOBsonId(lteid),
  331. },
  332. "is_m": 1,
  333. }
  334. }
  335. log.Println("查询语句:", q)
  336. p.enter(db, coll, q)
  337. }
  338. func StringTOBsonId(id string) primitive.ObjectID {
  339. objectId, _ := primitive.ObjectIDFromHex(id)
  340. return objectId
  341. }
  342. //通知下个节点nextNode
  343. func nextNode(mapInfo map[string]interface{}, pici int64) {
  344. mapInfo["stype"] = "project"
  345. mapInfo["query"] = map[string]interface{}{
  346. "pici": pici,
  347. }
  348. for n, to := range toaddr {
  349. key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
  350. mapInfo["key"] = key
  351. datas, _ := json.Marshal(mapInfo)
  352. node := &udpNode{datas, to, time.Now().Unix(), 0}
  353. udptaskmap.Store(key, node)
  354. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
  355. }
  356. }
  357. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  358. defer util.Catch()
  359. defer func() {
  360. p.Brun = false
  361. }()
  362. p.Brun = true
  363. count, taskcount := 0, 0
  364. pool := make(chan bool, p.thread)
  365. log.Println("start project", q)
  366. sess := MongoTool.GetMgoConn()
  367. defer MongoTool.DestoryMongoConn(sess)
  368. infoPool := make(chan map[string]interface{}, 2000)
  369. over := make(chan bool)
  370. go func() {
  371. L:
  372. for {
  373. select {
  374. case tmp := <-infoPool:
  375. pool <- true
  376. taskcount++
  377. go func(tmp map[string]interface{}) {
  378. defer func() {
  379. <-pool
  380. }()
  381. if util.IntAll(tmp["repeat"]) == 0 {
  382. if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
  383. //增量 dataging为1不参与合并
  384. return
  385. }
  386. p.fillInPlace(tmp)
  387. info := ParseInfo(tmp)
  388. p.currentTime = info.Publishtime
  389. if p.currentType == "updateInfo" {
  390. //招标信息更改合并
  391. p.updateJudge(tmp, info)
  392. } else {
  393. //普通合并
  394. p.CommonMerge(tmp, info)
  395. }
  396. } else {
  397. //信息错误,进行更新
  398. }
  399. }(tmp)
  400. case <-over:
  401. break L
  402. }
  403. }
  404. }()
  405. fields := map[string]interface{} {"area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
  406. "projectcode": 1, "buyerclass": 1, "winner": 1, "s_winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1,
  407. "site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1}
  408. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  409. if Sysconfig["hints"] != nil {
  410. ms.Hint(Sysconfig["hints"])
  411. }
  412. query := ms.Iter()
  413. //query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  414. var lastid interface{}
  415. L:
  416. for {
  417. select {
  418. case <-queryClose:
  419. log.Println("receive interrupt sign")
  420. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  421. queryCloseOver <- true
  422. break L
  423. default:
  424. tmp := make(map[string]interface{})
  425. if query.Next(&tmp) {
  426. lastid = tmp["_id"]
  427. if count%10000 == 0 {
  428. log.Println("current", count, lastid)
  429. }
  430. infoPool <- tmp
  431. count++
  432. } else {
  433. break L
  434. }
  435. }
  436. }
  437. time.Sleep(5 * time.Second)
  438. over <- true
  439. //阻塞
  440. for n := 0; n < p.thread; n++ {
  441. pool <- true
  442. }
  443. log.Println("所有线程执行完成...", count, taskcount)
  444. }
// Patterns used to pull a project code out of a title and to normalise
// project names and codes.
//
// NOTE(review): several character classes contain doubled characters such as
// "((", "))", "::" and "__". That suggests full-width variants were
// normalised to half-width when this text was extracted — verify the patterns
// against the original file before relying on them.
var (
	// Project code taken from the start of the title.
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	// Project code enclosed in brackets anywhere in the title; the code is capture group 3.
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	// Project code at the end of the title, optionally followed by up to five characters and "公告".
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Filter applied to project codes (and project names) to strip noise.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// Project code that is only digits or only letters, at most 4 characters.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Digits only or letters only.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// Contains a package/section word while the bid record did not identify packages: merged into one project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
  459. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  460. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  461. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  462. proHref := util.ObjToString(jsonData["projecthref"])
  463. if jsonData != nil && proHref != "" {
  464. //projectHref字段合并
  465. tmp["projecthref"] = proHref
  466. p.mapHrefLock.Lock()
  467. pid := p.mapHref[proHref]
  468. p.mapHrefLock.Unlock()
  469. if pid != "" {
  470. p.AllIdsMapLock.Lock()
  471. comparePro := p.AllIdsMap[pid].P
  472. p.AllIdsMapLock.Unlock()
  473. _, ex := p.CompareStatus(comparePro, info)
  474. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  475. } else {
  476. id, p1 := p.NewProject(tmp, info)
  477. p.mapHrefLock.Lock()
  478. p.mapHref[proHref] = id
  479. p.mapHrefLock.Unlock()
  480. p.AllIdsMapLock.Lock()
  481. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  482. p.AllIdsMapLock.Unlock()
  483. }
  484. } else {
  485. //项目合并
  486. p.startProjectMerge(info, tmp)
  487. }
  488. } else {
  489. //项目合并
  490. p.startProjectMerge(info, tmp)
  491. }
  492. }
  493. }
  494. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  495. bys, _ := json.Marshal(tmp)
  496. var thisinfo *Info
  497. _ = json.Unmarshal(bys, &thisinfo)
  498. if thisinfo == nil {
  499. return nil
  500. }
  501. if len(thisinfo.Topscopeclass) == 0 {
  502. thisinfo.Topscopeclass = []string{}
  503. }
  504. if len(thisinfo.Subscopeclass) == 0 {
  505. thisinfo.Subscopeclass = []string{}
  506. }
  507. if thisinfo.SubType == "" {
  508. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  509. }
  510. //从标题中查找项目编号
  511. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  512. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  513. thisinfo.PTC = res[1]
  514. } else {
  515. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  516. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  517. thisinfo.PTC = res[3]
  518. } else {
  519. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  520. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  521. thisinfo.PTC = res[1]
  522. }
  523. }
  524. }
  525. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  526. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  527. if thisinfo.ProjectName != "" {
  528. thisinfo.pnbval++
  529. }
  530. }
  531. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  532. if thisinfo.ProjectCode != "" {
  533. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  534. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  535. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  536. }
  537. } else {
  538. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  539. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  540. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  541. }
  542. }
  543. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  544. thisinfo.pnbval++
  545. }
  546. }
  547. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  548. thisinfo.PTC = ""
  549. }
  550. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  551. thisinfo.pnbval++
  552. } else {
  553. thisinfo.Buyer = ""
  554. }
  555. //winners整理
  556. winner, _ := tmp["winner"].(string)
  557. m1 := map[string]bool{}
  558. winners := []string{}
  559. if winner != "" {
  560. m1[winner] = true
  561. winners = append(winners, winner)
  562. }
  563. packageM, _ := tmp["package"].(map[string]interface{})
  564. if packageM != nil {
  565. thisinfo.HasPackage = true
  566. for _, p := range packageM {
  567. pm, _ := p.(map[string]interface{})
  568. pw, _ := pm["winner"].(string)
  569. if pw != "" && !m1[pw] {
  570. m1[pw] = true
  571. winners = append(winners, pw)
  572. }
  573. }
  574. }
  575. thisinfo.Winners = winners
  576. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  577. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  578. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  579. //处理分包中数据异常问题
  580. for k, tmp := range thisinfo.Package {
  581. if ps, ok := tmp.([]map[string]interface{}); ok {
  582. for i, p := range ps {
  583. name, _ := p["name"].(string)
  584. if len([]rune(name)) > 100 {
  585. p["name"] = fmt.Sprint([]rune(name[:100]))
  586. }
  587. ps[i] = p
  588. }
  589. thisinfo.Package[k] = ps
  590. }
  591. }
  592. return thisinfo
  593. }
  594. func (p *ProjectTask) updateJudge(tmp map[string]interface{}, info *Info) {
  595. index := -1
  596. pInfoId := ""
  597. p.AllIdsMapLock.Lock()
  598. F:
  599. for k, ID := range p.AllIdsMap {
  600. for i, id := range ID.P.Ids {
  601. if info.Id == id {
  602. pInfoId = k
  603. index = i
  604. break F
  605. }
  606. }
  607. }
  608. p.AllIdsMapLock.Unlock()
  609. //未找到招标信息
  610. if index == -1 {
  611. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  612. p.currentTime = info.Publishtime
  613. p.startProjectMerge(info, tmp)
  614. }
  615. } else {
  616. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  617. infoList := []interface{}(tmpPro["list"].(primitive.A))
  618. infoMap := infoList[index].(map[string]interface{})
  619. modifyMap, f := modifyEle(infoMap, tmp)
  620. //projecthref字段
  621. jsonData := tmp["jsondata"].(map[string]interface{})
  622. if jsonData != nil && jsonData["projecthref"] != nil {
  623. proHref := jsonData["projecthref"].(string)
  624. tmp["projecthref"] = proHref
  625. p.mapHrefLock.Lock()
  626. pid := p.mapHref[proHref]
  627. p.mapHrefLock.Unlock()
  628. if pid == pInfoId {
  629. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  630. return
  631. }
  632. }
  633. if f {
  634. //合并、修改
  635. log.Println("合并修改更新", "----------------------------")
  636. p.mergeAndModify(pInfoId, index, info, tmp, tmpPro, modifyMap)
  637. } else {
  638. //修改
  639. log.Println("修改更新", "----------------------------")
  640. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  641. }
  642. }
  643. }
  644. var Elements = []string{
  645. "projectname",
  646. "projectcode",
  647. "agency",
  648. "budget",
  649. "bidamount",
  650. "buyerperson",
  651. "area",
  652. "city",
  653. "publishtime",
  654. }
  655. /**
  656. 判断修改的字段是否是影响合并流程的要素字段
  657. */
  658. func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
  659. modifyMap := map[string]interface{}{}
  660. for k := range tmpPro {
  661. for k1 := range tmp {
  662. if k == k1 && tmpPro[k] != tmp[k1] {
  663. modifyMap[k] = tmp[k1]
  664. break
  665. }
  666. }
  667. }
  668. for k := range modifyMap {
  669. for _, str := range Elements {
  670. if k == str {
  671. return modifyMap, true
  672. }
  673. }
  674. }
  675. delete(modifyMap, "_id")
  676. return modifyMap, false
  677. }
  678. //补全位置信息
  679. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  680. area := util.ObjToString(tmp["area"])
  681. city := util.ObjToString(tmp["city"])
  682. district := util.ObjToString(tmp["district"])
  683. if area != "" && city != "" && district != "" {
  684. return
  685. }
  686. tmpSite := util.ObjToString(tmp["site"])
  687. if tmpSite == "" {
  688. return
  689. }
  690. p.mapSiteLock.Lock()
  691. defer p.mapSiteLock.Unlock()
  692. site := p.mapSite[tmpSite]
  693. if site != nil {
  694. if area != "" {
  695. if area == "全国" {
  696. tmp["area"] = site.Area
  697. tmp["city"] = site.City
  698. tmp["district"] = site.District
  699. return
  700. }
  701. if area != site.Area {
  702. return
  703. } else {
  704. if city == site.City {
  705. if district == "" {
  706. tmp["district"] = site.District
  707. return
  708. }
  709. } else if city == "" {
  710. tmp["city"] = site.City
  711. tmp["district"] = site.District
  712. return
  713. } else if site.City == "" {
  714. return
  715. }
  716. }
  717. } else {
  718. tmp["area"] = site.Area
  719. tmp["city"] = site.City
  720. tmp["district"] = site.District
  721. return
  722. }
  723. }
  724. }
  725. //从数组中删除元素
  726. func deleteSlice(arr []string, v, stype string) []string {
  727. for k, v1 := range arr {
  728. if v1 == v {
  729. ts := time.Now().Unix()
  730. arr = append(arr[:k], arr[k+1:]...)
  731. rt := time.Now().Unix() - ts
  732. if rt > 0 {
  733. log.Println("deleteSlice", stype, rt, v, len(arr))
  734. }
  735. return arr
  736. }
  737. }
  738. return arr
  739. }