// task.go

  1. package main
import (
	"encoding/json"
	"fmt"
	"log"
	"reflect"
	"regexp"
	"strings"
	"sync"
	"time"

	mu "mfw/util"
	"qfw/util"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson/primitive"
)
  15. /**
  16. 任务入口
  17. 全量、增量合并
  18. 更新、插入,内存清理
  19. 转换成info对象
  20. **/
  21. //项目合并对象
  22. type ProjectTask struct {
  23. InitMinTime int64 //最小时间,小于0的处理一次
  24. name string
  25. thread int //线程数
  26. //查找锁
  27. findLock sync.Mutex
  28. wg sync.WaitGroup
  29. //map锁
  30. AllIdsMapLock sync.Mutex
  31. //对应的id
  32. AllIdsMap map[string]*ID
  33. //采购单位、项目名称、项目编号
  34. mapPb, mapPn, mapPc map[string]*Key
  35. //流程数据 字段相同,直接合并
  36. mapHref map[string]string
  37. mapHrefLock sync.Mutex
  38. //站点
  39. mapSite map[string]*Site
  40. mapSiteLock sync.Mutex
  41. //bidtype、bidstatus 锁
  42. mapBidLock sync.Mutex
  43. //更新或新增通道
  44. updatePool chan []map[string]interface{}
  45. //savePool chan map[string]interface{}
  46. //saveSign, updateSign chan bool
  47. //表名
  48. coll string
  49. //当前状态是全量还是增量
  50. currentType string //当前是跑全量还是跑增量
  51. //
  52. clearContimes int
  53. //当前时间
  54. currentTime int64
  55. //保存长度
  56. saveSize int
  57. pici int64
  58. validTime int64
  59. statusTime int64
  60. // LockPool chan *sync.Mutex
  61. // LockPoolLock sync.Mutex
  62. // m1, m23, m4 map[int]int
  63. // l1, l23, l4 map[int]*sync.Mutex
  64. Brun bool
  65. }
  66. func NewPT() *ProjectTask {
  67. p := &ProjectTask{
  68. InitMinTime: int64(1325347200),
  69. name: "全/增量对象",
  70. thread: 4,
  71. updatePool: make(chan []map[string]interface{}, 5000),
  72. //savePool: make(chan map[string]interface{}, 2000),
  73. wg: sync.WaitGroup{},
  74. AllIdsMap: make(map[string]*ID, 5000000),
  75. mapPb: make(map[string]*Key, 1500000),
  76. mapPn: make(map[string]*Key, 5000000),
  77. mapPc: make(map[string]*Key, 5000000),
  78. mapHref: make(map[string]string, 1500000),
  79. mapSite: make(map[string]*Site, 1000000),
  80. saveSize: 400,
  81. //saveSign: make(chan bool, 1),
  82. //updateSign: make(chan bool, 1),
  83. coll: ProjectColl,
  84. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  85. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 7) * 86400),
  86. }
  87. return p
  88. }
  89. var P_QL *ProjectTask
  90. var sp = make(chan bool, 5)
  91. //初始化全量合并对象
  92. func init() {
  93. P_QL = NewPT()
  94. log.Println(len(P_QL.updatePool))
  95. go P_QL.updateAllQueue()
  96. go P_QL.clearMem()
  97. }
  98. func (p *ProjectTask) updateAllQueue() {
  99. arru := make([][]map[string]interface{}, p.saveSize)
  100. indexu := 0
  101. for {
  102. select {
  103. case v := <-p.updatePool:
  104. arru[indexu] = v
  105. indexu++
  106. if indexu == p.saveSize {
  107. sp <- true
  108. go func(arru [][]map[string]interface{}) {
  109. defer func() {
  110. <-sp
  111. }()
  112. MongoTool.UpSertBulk(p.coll, arru...)
  113. }(arru)
  114. arru = make([][]map[string]interface{}, p.saveSize)
  115. indexu = 0
  116. }
  117. case <-time.After(1000 * time.Millisecond):
  118. if indexu > 0 {
  119. sp <- true
  120. go func(arru [][]map[string]interface{}) {
  121. defer func() {
  122. <-sp
  123. }()
  124. MongoTool.UpSertBulk(p.coll, arru...)
  125. }(arru[:indexu])
  126. arru = make([][]map[string]interface{}, p.saveSize)
  127. indexu = 0
  128. }
  129. }
  130. }
  131. }
  132. //项目合并内存更新
  133. func (p *ProjectTask) clearMem() {
  134. c := cron.New()
  135. //在内存中保留最近6个月的信息
  136. //跑全量时每4分钟跑一次,跑增量时400分钟跑一次
  137. _ = c.AddFunc("50 0/15 * * * *", func() {
  138. if p.currentType == "ql" || p.clearContimes >= 60 {
  139. //跳过的次数清零
  140. p.clearContimes = 0
  141. //信息进入查找对比全局锁
  142. p.findLock.Lock()
  143. //defer p.findLock.Unlock()
  144. //合并进行的任务都完成
  145. p.wg.Wait()
  146. //遍历id
  147. //所有内存中的项目信息
  148. p.AllIdsMapLock.Lock()
  149. p.mapHrefLock.Lock()
  150. //清除计数
  151. clearNum := 0
  152. for k, v := range p.AllIdsMap {
  153. if p.currentTime-v.P.LastTime > p.validTime {
  154. clearNum++
  155. //删除id的map
  156. delete(p.AllIdsMap, k)
  157. //删除pb
  158. if v.P.Buyer != "" {
  159. ids := p.mapPb[v.P.Buyer]
  160. if ids != nil {
  161. ids.Lock.Lock()
  162. ids.Arr = deleteSlice(ids.Arr, k)
  163. if len(ids.Arr) == 0 {
  164. delete(p.mapPb, v.P.Buyer)
  165. }
  166. ids.Lock.Unlock()
  167. }
  168. }
  169. //删除mapPn
  170. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  171. if vn != "" {
  172. ids := p.mapPn[vn]
  173. if ids != nil {
  174. ids.Lock.Lock()
  175. ids.Arr = deleteSlice(ids.Arr, k)
  176. if len(ids.Arr) == 0 {
  177. delete(p.mapPn, vn)
  178. }
  179. ids.Lock.Unlock()
  180. }
  181. }
  182. }
  183. //删除mapPc
  184. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  185. if vn != "" {
  186. ids := p.mapPc[vn]
  187. if ids != nil {
  188. ids.Lock.Lock()
  189. ids.Arr = deleteSlice(ids.Arr, k)
  190. if len(ids.Arr) == 0 {
  191. delete(p.mapPc, vn)
  192. }
  193. ids.Lock.Unlock()
  194. }
  195. }
  196. }
  197. for kHref, pid := range p.mapHref {
  198. if pid == k {
  199. delete(p.mapHref, kHref)
  200. }
  201. }
  202. v = nil
  203. }
  204. }
  205. p.mapHrefLock.Unlock()
  206. p.AllIdsMapLock.Unlock()
  207. p.findLock.Unlock()
  208. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  209. } else {
  210. p.clearContimes++
  211. }
  212. })
  213. c.Start()
  214. select {}
  215. }
  216. //全量合并
  217. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  218. defer util.Catch()
  219. //1、检查pubilshtime索引
  220. db, _ := udpInfo["db"].(string)
  221. if db == "" {
  222. db = MongoTool.DbName
  223. }
  224. coll, _ := udpInfo["coll"].(string)
  225. if coll == "" {
  226. coll = ExtractColl
  227. }
  228. thread := util.IntAllDef(udpInfo["thread"], 4)
  229. if thread > 0 {
  230. p.thread = thread
  231. }
  232. q, _ := udpInfo["query"].(map[string]interface{})
  233. if q == nil {
  234. q = map[string]interface{}{}
  235. lteid, _ := udpInfo["lteid"].(string)
  236. var idmap map[string]interface{}
  237. if len(lteid) > 15 {
  238. idmap = map[string]interface{}{
  239. "$lte": StringTOBsonId(lteid),
  240. }
  241. }
  242. gtid, _ := udpInfo["gtid"].(string)
  243. if len(gtid) > 15 {
  244. if idmap == nil {
  245. idmap = map[string]interface{}{}
  246. }
  247. idmap["$gt"] = StringTOBsonId(gtid)
  248. }
  249. if idmap != nil {
  250. q["_id"] = idmap
  251. }
  252. }
  253. //生成查询语句执行
  254. log.Println("查询语句:", q)
  255. p.enter(db, coll, q)
  256. }
  257. //增量合并
  258. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  259. defer util.Catch()
  260. //1、检查pubilshtime索引
  261. db, _ := udpInfo["db"].(string)
  262. if db == "" {
  263. db = MongoTool.DbName
  264. }
  265. coll, _ := udpInfo["coll"].(string)
  266. if coll == "" {
  267. coll = ExtractColl
  268. }
  269. thread := util.IntAllDef(udpInfo["thread"], 4)
  270. if thread > 0 {
  271. p.thread = thread
  272. }
  273. //开始id和结束id
  274. q, _ := udpInfo["query"].(map[string]interface{})
  275. gtid := udpInfo["gtid"].(string)
  276. lteid := udpInfo["lteid"].(string)
  277. if q == nil {
  278. q = map[string]interface{}{
  279. "_id": map[string]interface{}{
  280. "$gt": StringTOBsonId(gtid),
  281. "$lte": StringTOBsonId(lteid),
  282. },
  283. }
  284. }
  285. if q != nil {
  286. //生成查询语句执行
  287. p.enter(db, coll, q)
  288. }
  289. if udpInfo["stop"] == nil {
  290. for i := 0; i < 5; i++ {
  291. sp <- true
  292. }
  293. for i := 0; i < 5; i++ {
  294. <-sp
  295. }
  296. log.Println("保存完成,生索引", p.pici)
  297. time.Sleep(5 * time.Second)
  298. nextNode(udpInfo, p.pici)
  299. }
  300. }
  301. //招标字段更新
  302. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  303. defer util.Catch()
  304. db, _ := udpInfo["db"].(string)
  305. if db == "" {
  306. db = MongoTool.DbName
  307. }
  308. coll, _ := udpInfo["coll"].(string)
  309. if coll == "" {
  310. coll = ExtractColl
  311. }
  312. thread := util.IntAllDef(udpInfo["thread"], 4)
  313. if thread > 0 {
  314. p.thread = thread
  315. }
  316. q, _ := udpInfo["query"].(map[string]interface{})
  317. gtid := udpInfo["gtid"].(string)
  318. lteid := udpInfo["lteid"].(string)
  319. if q == nil {
  320. q = map[string]interface{}{
  321. "_id": map[string]interface{}{
  322. "$gt": StringTOBsonId(gtid),
  323. "$lte": StringTOBsonId(lteid),
  324. },
  325. "is_m": 1,
  326. }
  327. }
  328. log.Println("查询语句:", q)
  329. p.enter(db, coll, q)
  330. }
  331. func StringTOBsonId(id string) primitive.ObjectID {
  332. objectId, _ := primitive.ObjectIDFromHex(id)
  333. return objectId
  334. }
  335. //通知下个节点nextNode
  336. func nextNode(mapInfo map[string]interface{}, pici int64) {
  337. mapInfo["stype"] = "project"
  338. mapInfo["query"] = map[string]interface{}{
  339. "pici": pici,
  340. }
  341. for n, to := range toaddr {
  342. key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
  343. mapInfo["key"] = key
  344. datas, _ := json.Marshal(mapInfo)
  345. node := &udpNode{datas, to, time.Now().Unix(), 0}
  346. udptaskmap.Store(key, node)
  347. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
  348. }
  349. }
  350. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  351. defer util.Catch()
  352. defer func() {
  353. p.Brun = false
  354. }()
  355. p.Brun = true
  356. count, taskcount := 0, 0
  357. pool := make(chan bool, p.thread)
  358. log.Println("start project", q)
  359. sess := MongoTool.GetMgoConn()
  360. defer MongoTool.DestoryMongoConn(sess)
  361. infoPool := make(chan map[string]interface{}, 2000)
  362. over := make(chan bool)
  363. go func() {
  364. L:
  365. for {
  366. select {
  367. case tmp := <-infoPool:
  368. pool <- true
  369. taskcount++
  370. go func(tmp map[string]interface{}) {
  371. defer func() {
  372. <-pool
  373. }()
  374. if util.IntAll(tmp["repeat"]) == 0 {
  375. p.fillInPlace(tmp)
  376. info := ParseInfo(tmp)
  377. p.currentTime = info.Publishtime
  378. if p.currentType == "updateInfo" {
  379. //招标信息更改合并
  380. p.updateJudge(tmp, info)
  381. } else {
  382. //普通合并
  383. p.CommonMerge(tmp, info)
  384. }
  385. } else {
  386. //信息错误,进行更新
  387. }
  388. }(tmp)
  389. case <-over:
  390. break L
  391. }
  392. }
  393. }()
  394. ms := sess.DB(db).C(coll).Find(q).Sort("publishtime")
  395. if Sysconfig["hints"] != nil {
  396. ms.Hint(Sysconfig["hints"])
  397. }
  398. query := ms.Iter()
  399. var lastid interface{}
  400. L:
  401. for {
  402. select {
  403. case <-queryClose:
  404. log.Println("receive interrupt sign")
  405. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  406. queryCloseOver <- true
  407. break L
  408. default:
  409. tmp := make(map[string]interface{})
  410. if query.Next(&tmp) {
  411. lastid = tmp["_id"]
  412. if count%10000 == 0 {
  413. log.Println("current", count, lastid)
  414. }
  415. infoPool <- tmp
  416. count++
  417. } else {
  418. break L
  419. }
  420. }
  421. }
  422. time.Sleep(5 * time.Second)
  423. over <- true
  424. //阻塞
  425. for n := 0; n < p.thread; n++ {
  426. pool <- true
  427. }
  428. log.Println("所有线程执行完成...", count, taskcount)
  429. }
  430. var (
  431. //从标题获取项目编号
  432. titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
  433. titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
  434. titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
  435. //项目编号过滤
  436. pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
  437. //项目编号只是数字或只是字母4个以下
  438. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  439. //纯数字或纯字母
  440. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  441. //含分包词,招标未识别分包 合并到一个项目
  442. KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
  443. )
  444. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  445. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  446. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  447. proHref := util.ObjToString(jsonData["projecthref"])
  448. if jsonData != nil && proHref != "" {
  449. //projectHref字段合并
  450. tmp["projecthref"] = proHref
  451. p.mapHrefLock.Lock()
  452. pid := p.mapHref[proHref]
  453. p.mapHrefLock.Unlock()
  454. if pid != "" {
  455. p.AllIdsMapLock.Lock()
  456. comparePro := p.AllIdsMap[pid].P
  457. p.AllIdsMapLock.Unlock()
  458. _, ex := p.CompareStatus(comparePro, info)
  459. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  460. } else {
  461. id, p1 := p.NewProject(tmp, info)
  462. p.mapHrefLock.Lock()
  463. p.mapHref[proHref] = id
  464. p.mapHrefLock.Unlock()
  465. p.AllIdsMapLock.Lock()
  466. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  467. p.AllIdsMapLock.Unlock()
  468. }
  469. } else {
  470. //项目合并
  471. p.startProjectMerge(info, tmp)
  472. }
  473. } else {
  474. //项目合并
  475. p.startProjectMerge(info, tmp)
  476. }
  477. }
  478. }
  479. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  480. bys, _ := json.Marshal(tmp)
  481. var thisinfo *Info
  482. _ = json.Unmarshal(bys, &thisinfo)
  483. if thisinfo == nil {
  484. return nil
  485. }
  486. if len(thisinfo.Topscopeclass) == 0 {
  487. thisinfo.Topscopeclass = []string{}
  488. }
  489. if len(thisinfo.Subscopeclass) == 0 {
  490. thisinfo.Subscopeclass = []string{}
  491. }
  492. if thisinfo.Publishtime == 0 {
  493. thisinfo.Publishtime = thisinfo.Comeintime
  494. tmp["publishtime"] = thisinfo.Comeintime
  495. }
  496. //从标题中查找项目编号
  497. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  498. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  499. thisinfo.PTC = res[1]
  500. } else {
  501. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  502. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  503. thisinfo.PTC = res[3]
  504. } else {
  505. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  506. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  507. thisinfo.PTC = res[1]
  508. }
  509. }
  510. }
  511. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  512. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  513. if thisinfo.ProjectName != "" {
  514. thisinfo.pnbval++
  515. }
  516. }
  517. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  518. if thisinfo.ProjectCode != "" {
  519. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  520. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  521. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  522. }
  523. } else {
  524. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  525. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  526. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  527. }
  528. }
  529. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  530. thisinfo.pnbval++
  531. }
  532. }
  533. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  534. thisinfo.PTC = ""
  535. }
  536. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  537. thisinfo.pnbval++
  538. } else {
  539. thisinfo.Buyer = ""
  540. }
  541. //winners整理
  542. winner, _ := tmp["winner"].(string)
  543. m1 := map[string]bool{}
  544. winners := []string{}
  545. if winner != "" {
  546. m1[winner] = true
  547. winners = append(winners, winner)
  548. }
  549. packageM, _ := tmp["package"].(map[string]interface{})
  550. if packageM != nil {
  551. thisinfo.HasPackage = true
  552. for _, p := range packageM {
  553. pm, _ := p.(map[string]interface{})
  554. pw, _ := pm["winner"].(string)
  555. if pw != "" && !m1[pw] {
  556. m1[pw] = true
  557. winners = append(winners, pw)
  558. }
  559. }
  560. }
  561. thisinfo.Winners = winners
  562. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  563. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  564. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  565. return thisinfo
  566. }
  567. func (p *ProjectTask) updateJudge(tmp map[string]interface{}, info *Info) {
  568. index := -1
  569. pInfoId := ""
  570. p.AllIdsMapLock.Lock()
  571. F:
  572. for k, ID := range p.AllIdsMap {
  573. for i, id := range ID.P.Ids {
  574. if info.Id == id {
  575. pInfoId = k
  576. index = i
  577. break F
  578. }
  579. }
  580. }
  581. p.AllIdsMapLock.Unlock()
  582. //未找到招标信息
  583. if index == -1 {
  584. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  585. p.currentTime = info.Publishtime
  586. p.startProjectMerge(info, tmp)
  587. }
  588. } else {
  589. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  590. infoList := []interface{}(tmpPro["list"].(primitive.A))
  591. infoMap := infoList[index].(map[string]interface{})
  592. modifyMap, f := modifyEle(infoMap, tmp)
  593. //projecthref字段
  594. jsonData := tmp["jsondata"].(map[string]interface{})
  595. if jsonData != nil && jsonData["projecthref"] != nil {
  596. proHref := jsonData["projecthref"].(string)
  597. tmp["projecthref"] = proHref
  598. p.mapHrefLock.Lock()
  599. pid := p.mapHref[proHref]
  600. p.mapHrefLock.Unlock()
  601. if pid == pInfoId {
  602. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  603. return
  604. }
  605. }
  606. if f {
  607. //合并、修改
  608. log.Println("合并修改更新", "----------------------------")
  609. p.mergeAndModify(pInfoId, index, info, tmp, tmpPro, modifyMap)
  610. } else {
  611. //修改
  612. log.Println("修改更新", "----------------------------")
  613. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  614. }
  615. }
  616. }
  617. var Elements = []string{
  618. "projectname",
  619. "projectcode",
  620. "agency",
  621. "budget",
  622. "bidamount",
  623. "buyerperson",
  624. "area",
  625. "city",
  626. "publishtime",
  627. }
  628. /**
  629. 判断修改的字段是否是影响合并流程的要素字段
  630. */
  631. func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
  632. modifyMap := map[string]interface{}{}
  633. for k, _ := range tmpPro {
  634. for k1, _ := range tmp {
  635. if k == k1 && tmpPro[k] != tmp[k1] {
  636. modifyMap[k] = tmp[k1]
  637. break
  638. }
  639. }
  640. }
  641. for k, _ := range modifyMap {
  642. for _, str := range Elements {
  643. if k == str {
  644. return modifyMap, true
  645. }
  646. }
  647. }
  648. delete(modifyMap, "_id")
  649. return modifyMap, false
  650. }
  651. //补全位置信息
  652. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  653. area := util.ObjToString(tmp["area"])
  654. city := util.ObjToString(tmp["city"])
  655. district := util.ObjToString(tmp["district"])
  656. if area != "" && city != "" && district != "" {
  657. return
  658. }
  659. tmpSite := util.ObjToString(tmp["site"])
  660. if tmpSite == "" {
  661. return
  662. }
  663. p.mapSiteLock.Lock()
  664. defer p.mapSiteLock.Unlock()
  665. site := p.mapSite[tmpSite]
  666. if site != nil {
  667. if area != "" {
  668. if area == "全国" {
  669. tmp["area"] = site.Area
  670. tmp["city"] = site.City
  671. tmp["district"] = site.District
  672. return
  673. }
  674. if area != site.Area {
  675. return
  676. } else {
  677. if city == site.City {
  678. if district == "" {
  679. tmp["district"] = site.District
  680. return
  681. }
  682. } else if city == "" {
  683. tmp["city"] = site.City
  684. tmp["district"] = site.District
  685. return
  686. }
  687. }
  688. } else {
  689. tmp["area"] = site.Area
  690. tmp["city"] = site.City
  691. tmp["district"] = site.District
  692. return
  693. }
  694. }
  695. }
  696. //从数组中删除元素
  697. func deleteSlice(arr []string, v string) []string {
  698. for k, v1 := range arr {
  699. if v1 == v {
  700. return append(arr[:k], arr[k+1:]...)
  701. }
  702. }
  703. return arr
  704. }