task.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. mu "mfw/util"
  7. "qfw/util"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/robfig/cron"
  13. "go.mongodb.org/mongo-driver/bson/primitive"
  14. )
  15. /**
  16. 任务入口
  17. 全量、增量合并
  18. 更新、插入,内存清理
  19. 转换成info对象
  20. **/
  21. //项目合并对象
  22. type ProjectTask struct {
  23. InitMinTime int64 //最小时间,小于0的处理一次
  24. name string
  25. thread int //线程数
  26. //查找锁
  27. findLock sync.Mutex
  28. wg sync.WaitGroup
  29. //map锁
  30. AllIdsMapLock sync.Mutex
  31. //对应的id
  32. AllIdsMap map[string]*ID
  33. //采购单位、项目名称、项目编号
  34. mapPb, mapPn, mapPc map[string]*Key
  35. //流程数据 字段相同,直接合并
  36. mapHref map[string]string
  37. mapHrefLock sync.Mutex
  38. //站点
  39. mapSite map[string]*Site
  40. mapSiteLock sync.Mutex
  41. //更新或新增通道
  42. updatePool chan []map[string]interface{}
  43. //savePool chan map[string]interface{}
  44. //saveSign, updateSign chan bool
  45. //表名
  46. coll string
  47. //当前状态是全量还是增量
  48. currentType string //当前是跑全量还是跑增量
  49. //
  50. clearContimes int
  51. //当前时间
  52. currentTime int64
  53. //保存长度
  54. saveSize int
  55. pici int64
  56. validTime int64
  57. // LockPool chan *sync.Mutex
  58. // LockPoolLock sync.Mutex
  59. // m1, m23, m4 map[int]int
  60. // l1, l23, l4 map[int]*sync.Mutex
  61. Brun bool
  62. }
  63. func NewPT() *ProjectTask {
  64. p := &ProjectTask{
  65. InitMinTime: int64(1325347200),
  66. name: "全/增量对象",
  67. thread: 4,
  68. updatePool: make(chan []map[string]interface{}, 5000),
  69. //savePool: make(chan map[string]interface{}, 2000),
  70. wg: sync.WaitGroup{},
  71. AllIdsMap: make(map[string]*ID, 5000000),
  72. mapPb: make(map[string]*Key, 1500000),
  73. mapPn: make(map[string]*Key, 5000000),
  74. mapPc: make(map[string]*Key, 5000000),
  75. mapHref: make(map[string]string, 1500000),
  76. mapSite: make(map[string]*Site, 1000000),
  77. saveSize: 400,
  78. //saveSign: make(chan bool, 1),
  79. //updateSign: make(chan bool, 1),
  80. coll: ProjectColl,
  81. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  82. }
  83. return p
  84. }
  85. var P_QL *ProjectTask
  86. var sp = make(chan bool, 5)
  87. //初始化全量合并对象
  88. func init() {
  89. P_QL = NewPT()
  90. log.Println(len(P_QL.updatePool))
  91. go P_QL.updateAllQueue()
  92. go P_QL.clearMem()
  93. }
  94. func (p *ProjectTask) updateAllQueue() {
  95. arru := make([][]map[string]interface{}, p.saveSize)
  96. indexu := 0
  97. for {
  98. select {
  99. case v := <-p.updatePool:
  100. arru[indexu] = v
  101. indexu++
  102. if indexu == p.saveSize {
  103. sp <- true
  104. go func(arru [][]map[string]interface{}) {
  105. defer func() {
  106. <-sp
  107. }()
  108. MongoTool.UpSertBulk(p.coll, arru...)
  109. }(arru)
  110. arru = make([][]map[string]interface{}, p.saveSize)
  111. indexu = 0
  112. }
  113. case <-time.After(1000 * time.Millisecond):
  114. if indexu > 0 {
  115. sp <- true
  116. go func(arru [][]map[string]interface{}) {
  117. defer func() {
  118. <-sp
  119. }()
  120. MongoTool.UpSertBulk(p.coll, arru...)
  121. }(arru[:indexu])
  122. arru = make([][]map[string]interface{}, p.saveSize)
  123. indexu = 0
  124. }
  125. }
  126. }
  127. }
  128. //项目合并内存更新
  129. func (p *ProjectTask) clearMem() {
  130. c := cron.New()
  131. //在内存中保留最近6个月的信息
  132. //跑全量时每4分钟跑一次,跑增量时400分钟跑一次
  133. _ = c.AddFunc("50 0/15 * * * *", func() {
  134. if p.currentType == "ql" || p.clearContimes >= 60 {
  135. //跳过的次数清零
  136. p.clearContimes = 0
  137. //信息进入查找对比全局锁
  138. p.findLock.Lock()
  139. //defer p.findLock.Unlock()
  140. //合并进行的任务都完成
  141. p.wg.Wait()
  142. //遍历id
  143. //所有内存中的项目信息
  144. p.AllIdsMapLock.Lock()
  145. p.mapHrefLock.Lock()
  146. //清除计数
  147. clearNum := 0
  148. for k, v := range p.AllIdsMap {
  149. if p.currentTime-v.P.LastTime > p.validTime {
  150. log.Println(p.currentTime)
  151. log.Println(k)
  152. clearNum++
  153. //删除id的map
  154. delete(p.AllIdsMap, k)
  155. //删除pb
  156. if v.P.Buyer != "" {
  157. ids := p.mapPb[v.P.Buyer]
  158. if ids != nil {
  159. ids.Lock.Lock()
  160. ids.Arr = deleteSlice(ids.Arr, k)
  161. if len(ids.Arr) == 0 {
  162. delete(p.mapPb, v.P.Buyer)
  163. }
  164. ids.Lock.Unlock()
  165. }
  166. }
  167. //删除mapPn
  168. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  169. if vn != "" {
  170. ids := p.mapPn[vn]
  171. if ids != nil {
  172. ids.Lock.Lock()
  173. ids.Arr = deleteSlice(ids.Arr, k)
  174. if len(ids.Arr) == 0 {
  175. delete(p.mapPn, vn)
  176. }
  177. ids.Lock.Unlock()
  178. }
  179. }
  180. }
  181. //删除mapPc
  182. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  183. if vn != "" {
  184. ids := p.mapPc[vn]
  185. if ids != nil {
  186. ids.Lock.Lock()
  187. ids.Arr = deleteSlice(ids.Arr, k)
  188. if len(ids.Arr) == 0 {
  189. delete(p.mapPc, vn)
  190. }
  191. ids.Lock.Unlock()
  192. }
  193. }
  194. }
  195. for kHref, pid := range p.mapHref{
  196. if pid == k {
  197. delete(p.mapHref, kHref)
  198. }
  199. }
  200. v = nil
  201. }
  202. }
  203. p.mapHrefLock.Unlock()
  204. p.AllIdsMapLock.Unlock()
  205. p.findLock.Unlock()
  206. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
  207. } else {
  208. p.clearContimes++
  209. }
  210. })
  211. c.Start()
  212. select {}
  213. }
  214. //全量合并
  215. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  216. defer util.Catch()
  217. //1、检查pubilshtime索引
  218. db, _ := udpInfo["db"].(string)
  219. if db == "" {
  220. db = MongoTool.DbName
  221. }
  222. coll, _ := udpInfo["coll"].(string)
  223. if coll == "" {
  224. coll = ExtractColl
  225. }
  226. thread := util.IntAllDef(udpInfo["thread"], 4)
  227. if thread > 0 {
  228. p.thread = thread
  229. }
  230. q, _ := udpInfo["query"].(map[string]interface{})
  231. if q == nil {
  232. q = map[string]interface{}{}
  233. lteid, _ := udpInfo["lteid"].(string)
  234. var idmap map[string]interface{}
  235. if len(lteid) > 15 {
  236. idmap = map[string]interface{}{
  237. "$lte": StringTOBsonId(lteid),
  238. }
  239. }
  240. gtid, _ := udpInfo["gtid"].(string)
  241. if len(gtid) > 15 {
  242. if idmap == nil {
  243. idmap = map[string]interface{}{}
  244. }
  245. idmap["$gt"] = StringTOBsonId(gtid)
  246. }
  247. if idmap != nil {
  248. q["_id"] = idmap
  249. }
  250. }
  251. //生成查询语句执行
  252. log.Println("查询语句:", q)
  253. p.enter(db, coll, q)
  254. }
  255. //增量合并
  256. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  257. defer util.Catch()
  258. //1、检查pubilshtime索引
  259. db, _ := udpInfo["db"].(string)
  260. if db == "" {
  261. db = MongoTool.DbName
  262. }
  263. coll, _ := udpInfo["coll"].(string)
  264. if coll == "" {
  265. coll = ExtractColl
  266. }
  267. thread := util.IntAllDef(udpInfo["thread"], 4)
  268. if thread > 0 {
  269. p.thread = thread
  270. }
  271. //开始id和结束id
  272. q, _ := udpInfo["query"].(map[string]interface{})
  273. gtid := udpInfo["gtid"].(string)
  274. lteid := udpInfo["lteid"].(string)
  275. if q == nil {
  276. q = map[string]interface{}{
  277. "_id": map[string]interface{}{
  278. "$gt": StringTOBsonId(gtid),
  279. "$lte": StringTOBsonId(lteid),
  280. },
  281. }
  282. }
  283. if q != nil {
  284. //生成查询语句执行
  285. p.enter(db, coll, q)
  286. }
  287. if udpInfo["stop"] == nil {
  288. for i := 0; i < 5; i++ {
  289. sp <- true
  290. }
  291. for i := 0; i < 5; i++ {
  292. <-sp
  293. }
  294. log.Println("保存完成,生索引", p.pici)
  295. time.Sleep(5 * time.Second)
  296. nextNode(udpInfo, p.pici)
  297. }
  298. }
  299. //招标字段更新
  300. func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
  301. defer util.Catch()
  302. db, _ := udpInfo["db"].(string)
  303. if db == "" {
  304. db = MongoTool.DbName
  305. }
  306. coll, _ := udpInfo["coll"].(string)
  307. if coll == "" {
  308. coll = ExtractColl
  309. }
  310. thread := util.IntAllDef(udpInfo["thread"], 4)
  311. if thread > 0 {
  312. p.thread = thread
  313. }
  314. q, _ := udpInfo["query"].(map[string]interface{})
  315. gtid := udpInfo["gtid"].(string)
  316. lteid := udpInfo["lteid"].(string)
  317. if q == nil {
  318. q = map[string]interface{}{
  319. "_id": map[string]interface{}{
  320. "$gt": StringTOBsonId(gtid),
  321. "$lte": StringTOBsonId(lteid),
  322. },
  323. "is_m": 1,
  324. }
  325. }
  326. log.Println("查询语句:", q)
  327. p.enter(db, coll, q)
  328. }
  329. func StringTOBsonId(id string) primitive.ObjectID {
  330. objectId, _ := primitive.ObjectIDFromHex(id)
  331. return objectId
  332. }
  333. //通知下个节点nextNode
  334. func nextNode(mapInfo map[string]interface{}, pici int64) {
  335. mapInfo["stype"] = "project"
  336. mapInfo["query"] = map[string]interface{}{
  337. "pici": pici,
  338. }
  339. for n, to := range toaddr {
  340. key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
  341. mapInfo["key"] = key
  342. datas, _ := json.Marshal(mapInfo)
  343. node := &udpNode{datas, to, time.Now().Unix(), 0}
  344. udptaskmap.Store(key, node)
  345. _ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
  346. }
  347. }
  348. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  349. defer util.Catch()
  350. defer func() {
  351. p.Brun = false
  352. }()
  353. p.Brun = true
  354. count, taskcount := 0, 0
  355. pool := make(chan bool, p.thread)
  356. log.Println("start project", q)
  357. sess := MongoTool.GetMgoConn()
  358. defer MongoTool.DestoryMongoConn(sess)
  359. infoPool := make(chan map[string]interface{}, 2000)
  360. over := make(chan bool)
  361. go func() {
  362. L:
  363. for {
  364. select {
  365. case tmp := <-infoPool:
  366. pool <- true
  367. taskcount++
  368. go func(tmp map[string]interface{}) {
  369. defer func() {
  370. <-pool
  371. }()
  372. if util.IntAll(tmp["repeat"]) == 0 {
  373. p.fillInPlace(tmp)
  374. info := ParseInfo(tmp)
  375. p.currentTime = info.Publishtime
  376. if p.currentType == "updateInfo" {
  377. //招标信息更改合并
  378. p.updateJudge(tmp, info)
  379. }else {
  380. //普通合并
  381. p.CommonMerge(tmp, info)
  382. }
  383. }else {
  384. //信息错误,进行更新
  385. }
  386. }(tmp)
  387. case <-over:
  388. break L
  389. }
  390. }
  391. }()
  392. ms := sess.DB(db).C(coll).Find(q).Sort("publishtime")
  393. //if Sysconfig["hints"] != nil {
  394. // ms.Hint(Sysconfig["hints"])
  395. //}
  396. query := ms.Iter()
  397. //
  398. var lastid interface{}
  399. L:
  400. for {
  401. select {
  402. case <-queryClose:
  403. log.Println("receive interrupt sign")
  404. log.Println("close iter..", lastid, query.Cursor.Close(nil))
  405. queryCloseOver <- true
  406. break L
  407. default:
  408. tmp := make(map[string]interface{})
  409. if query.Next(&tmp) {
  410. lastid = tmp["_id"]
  411. if count%2000 == 0 {
  412. log.Println("current", count, lastid)
  413. }
  414. infoPool <- tmp
  415. count++
  416. } else {
  417. break L
  418. }
  419. }
  420. }
  421. time.Sleep(5 * time.Second)
  422. over <- true
  423. //阻塞
  424. for n := 0; n < p.thread; n++ {
  425. pool <- true
  426. }
  427. log.Println("所有线程执行完成...", count, taskcount)
  428. }
  429. var (
  430. //从标题获取项目编号
  431. titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
  432. titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
  433. titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
  434. //项目编号过滤
  435. pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
  436. //项目编号只是数字或只是字母4个以下
  437. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  438. //纯数字或纯字母
  439. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  440. )
  441. func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
  442. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  443. if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
  444. if jsonData != nil && jsonData["projecthref"] != nil {
  445. //projectHref字段合并
  446. proHref := jsonData["projecthref"].(string)
  447. tmp["projecthref"] = proHref
  448. p.mapHrefLock.Lock()
  449. defer p.mapHrefLock.Unlock()
  450. p.AllIdsMapLock.Lock()
  451. defer p.AllIdsMapLock.Unlock()
  452. if p.mapHref[proHref] != "" {
  453. pid := p.mapHref[proHref]
  454. comparePro := p.AllIdsMap[pid].P
  455. _, ex := CompareStatus(comparePro, info)
  456. p.UpdateProject(tmp, info, comparePro, -1, "AAAAAAAAAA", ex)
  457. } else {
  458. id, p1 := p.NewProject(tmp, info)
  459. p.mapHref[proHref] = id
  460. p.AllIdsMap[id] = &ID{Id: id, P: p1}
  461. }
  462. }else {
  463. //项目合并
  464. p.startProjectMerge(info, tmp)
  465. }
  466. }else {
  467. //项目合并
  468. p.startProjectMerge(info, tmp)
  469. }
  470. }
  471. }
  472. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  473. bys, _ := json.Marshal(tmp)
  474. var thisinfo *Info
  475. _ = json.Unmarshal(bys, &thisinfo)
  476. if thisinfo == nil {
  477. return nil
  478. }
  479. if len(thisinfo.Topscopeclass) == 0 {
  480. thisinfo.Topscopeclass = []string{}
  481. }
  482. if len(thisinfo.Subscopeclass) == 0 {
  483. thisinfo.Subscopeclass = []string{}
  484. }
  485. if thisinfo.Publishtime == 0 {
  486. thisinfo.Publishtime = thisinfo.Comeintime
  487. }
  488. //从标题中查找项目编号
  489. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  490. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  491. thisinfo.PTC = res[1]
  492. } else {
  493. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  494. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  495. thisinfo.PTC = res[3]
  496. } else {
  497. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  498. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  499. thisinfo.PTC = res[1]
  500. }
  501. }
  502. }
  503. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  504. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  505. if thisinfo.ProjectName != "" {
  506. thisinfo.pnbval++
  507. }
  508. }
  509. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  510. if thisinfo.ProjectCode != "" {
  511. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  512. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  513. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  514. }
  515. } else {
  516. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  517. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  518. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  519. }
  520. }
  521. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  522. thisinfo.pnbval++
  523. }
  524. }
  525. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  526. thisinfo.PTC = ""
  527. }
  528. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  529. thisinfo.pnbval++
  530. } else {
  531. thisinfo.Buyer = ""
  532. }
  533. //winners整理
  534. winner, _ := tmp["winner"].(string)
  535. m1 := map[string]bool{}
  536. winners := []string{}
  537. if winner != "" {
  538. m1[winner] = true
  539. winners = append(winners, winner)
  540. }
  541. packageM, _ := tmp["package"].(map[string]interface{})
  542. if packageM != nil {
  543. thisinfo.HasPackage = true
  544. for _, p := range packageM {
  545. pm, _ := p.(map[string]interface{})
  546. pw, _ := pm["winner"].(string)
  547. if pw != "" && !m1[pw] {
  548. m1[pw] = true
  549. winners = append(winners, pw)
  550. }
  551. }
  552. }
  553. thisinfo.Winners = winners
  554. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  555. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  556. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  557. return thisinfo
  558. }
  559. func (p *ProjectTask) updateJudge(tmp map[string]interface{}, info *Info) {
  560. index := -1
  561. pInfoId := ""
  562. p.AllIdsMapLock.Lock()
  563. F:
  564. for k, ID := range p.AllIdsMap {
  565. for i, id := range ID.P.Ids{
  566. if info.Id == id {
  567. pInfoId = k
  568. index = i
  569. break F
  570. }
  571. }
  572. }
  573. p.AllIdsMapLock.Unlock()
  574. //未找到招标信息
  575. if index == -1 {
  576. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  577. p.currentTime = info.Publishtime
  578. p.startProjectMerge(info, tmp)
  579. }
  580. }else {
  581. tmpPro := MongoTool.FindById(ProjectColl, pInfoId)
  582. infoList := []interface{}(tmpPro["list"].(primitive.A))
  583. infoMap := infoList[index].(map[string]interface{})
  584. modifyMap, f := modifyEle(infoMap, tmp)
  585. //projecthref字段
  586. jsonData := tmp["jsondata"].(map[string]interface{})
  587. if jsonData != nil && jsonData["projecthref"] != nil {
  588. proHref := jsonData["projecthref"].(string)
  589. tmp["projecthref"] = proHref
  590. p.mapHrefLock.Lock()
  591. defer p.mapHrefLock.Unlock()
  592. pid := p.mapHref[proHref]
  593. if pid == pInfoId {
  594. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  595. return
  596. }
  597. }
  598. if f {
  599. //合并、修改
  600. log.Println("合并修改更新", "----------------------------")
  601. p.mergeAndModify(pInfoId, index, info, tmp, tmpPro, modifyMap)
  602. }else {
  603. //修改
  604. log.Println("修改更新", "----------------------------")
  605. p.modifyUpdate(pInfoId, index, info, tmp, tmpPro, modifyMap)
  606. }
  607. }
  608. }
  609. var Elements = []string{
  610. "projectname",
  611. "projectcode",
  612. "agency",
  613. "budget",
  614. "bidamount",
  615. "buyerperson",
  616. "area",
  617. "city",
  618. "publishtime",
  619. }
  620. /**
  621. 判断修改的字段是否是影响合并流程的要素字段
  622. */
  623. func modifyEle(tmpPro map[string]interface{}, tmp map[string]interface{}) (map[string]interface{}, bool) {
  624. modifyMap := map[string]interface{}{}
  625. for k, _ := range tmpPro {
  626. for k1, _ := range tmp {
  627. if k == k1 && tmpPro[k] != tmp[k1] {
  628. modifyMap[k] = tmp[k1]
  629. break
  630. }
  631. }
  632. }
  633. for k, _ := range modifyMap {
  634. for _, str := range Elements{
  635. if k == str {
  636. return modifyMap, true
  637. }
  638. }
  639. }
  640. delete(modifyMap, "_id")
  641. return modifyMap, false
  642. }
  643. //补全位置信息
  644. func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
  645. area := tmp["area"].(string)
  646. city := tmp["city"].(string)
  647. district := tmp["district"].(string)
  648. if area != "" && city != "" && district != "" {
  649. return
  650. }
  651. p.mapSiteLock.Lock()
  652. defer p.mapSiteLock.Unlock()
  653. tmpSite := tmp["site"].(string)
  654. site := p.mapSite[tmpSite]
  655. if site != nil {
  656. if area != "" {
  657. if area == "全国" {
  658. tmp["area"] = site.Area
  659. tmp["city"] = site.City
  660. tmp["district"] = site.District
  661. return
  662. }
  663. if area != site.Area {
  664. return
  665. }else {
  666. if city == site.City {
  667. if district == "" {
  668. tmp["district"] = site.District
  669. return
  670. }
  671. }else if city == "" {
  672. tmp["city"] = site.City
  673. tmp["district"] = site.District
  674. return
  675. }
  676. }
  677. }else {
  678. tmp["area"] = site.Area
  679. tmp["city"] = site.City
  680. tmp["district"] = site.District
  681. return
  682. }
  683. }
  684. }
  685. //从数组中删除元素
  686. func deleteSlice(arr []string, v string) []string {
  687. for k, v1 := range arr {
  688. if v1 == v {
  689. return append(arr[:k], arr[k+1:]...)
  690. }
  691. }
  692. return arr
  693. }