main.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "os"
  13. "qfw/util"
  14. "regexp"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. Sysconfig map[string]interface{} //配置文件
  20. mconf map[string]interface{} //mongodb配置信息
  21. mgo *MongodbSim //mongodb操作对象
  22. extract string
  23. udpclient mu.UdpClient //udp对象
  24. nextNode []map[string]interface{} //下节点数组
  25. dupdays = 5 //初始化判重范围
  26. DM *datamap //
  27. HM *historymap //判重数据
  28. lastid = ""
  29. //正则筛选相关
  30. FilterRegTitle = regexp.MustCompile("^_$")
  31. FilterRegTitle_1 = regexp.MustCompile("^_$")
  32. FilterRegTitle_2 = regexp.MustCompile("^_$")
  33. isMerger bool //是否合并
  34. Is_Sort bool //是否排序
  35. threadNum int //线程数量
  36. SiteMap map[string]map[string]interface{} //站点map
  37. idtype, sid, eid string //测试人员判重使用
  38. )
  39. func init() {
  40. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  41. flag.StringVar(&sid, "sid", "", "开始id")
  42. flag.StringVar(&eid, "eid", "", "结束id")
  43. flag.StringVar(&idtype, "idtype", "", "id类型,默认ObjectId:0,String:1")
  44. flag.Parse()
  45. //172.17.145.163:27080
  46. util.ReadConfig(&Sysconfig)
  47. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  48. mconf = Sysconfig["mongodb"].(map[string]interface{})
  49. mgo = &MongodbSim{
  50. MongodbAddr: mconf["addr"].(string),
  51. DbName: mconf["db"].(string),
  52. Size: util.IntAllDef(mconf["pool"], 10),
  53. }
  54. mgo.InitPool()
  55. extract = mconf["extract"].(string)
  56. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  57. //加载数据
  58. DM = NewDatamap(dupdays, lastid)
  59. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  60. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  61. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  62. isMerger = Sysconfig["isMerger"].(bool)
  63. Is_Sort = Sysconfig["isSort"].(bool)
  64. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  65. //站点配置
  66. site := mconf["site"].(map[string]interface{})
  67. SiteMap = make(map[string]map[string]interface{}, 0)
  68. start := int(time.Now().Unix())
  69. sess_site := mgo.GetMgoConn()
  70. defer mgo.DestoryMongoConn(sess_site)
  71. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  72. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  73. data_map := map[string]interface{}{
  74. "area": util.ObjToString(site_dict["area"]),
  75. "city": util.ObjToString(site_dict["city"]),
  76. "district": util.ObjToString(site_dict["district"]),
  77. "sitetype": util.ObjToString(site_dict["sitetype"]),
  78. "level": util.ObjToString(site_dict["level"]),
  79. }
  80. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  81. }
  82. log.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  83. }
  84. func main() {
  85. go checkMapJob()
  86. updport := Sysconfig["udpport"].(string)
  87. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  88. udpclient.Listen(processUdpMsg)
  89. log.Println("Udp服务监听", updport)
  90. time.Sleep(99999 * time.Hour)
  91. }
  92. //测试组人员使用
  93. func mainT() {
  94. /*
  95. ObjectId("5da3f31aa5cb26b9b798d3aa")
  96. ObjectId("5da418c4a5cb26b9b7e3e9a6")
  97. ObjectId("5df5071ce9d1f601e495fa54")
  98. ObjectId("5e09c05f0cf41612e0626abc")
  99. */
  100. log.Println("测试开始")
  101. sid = "5da3f31aa5cb26b9b798d3aa"
  102. eid = "5da418c4a5cb26b9b7e3e9a6"
  103. mapinfo := map[string]interface{}{}
  104. if sid == "" || eid == "" {
  105. log.Println("sid,eid参数不能为空")
  106. os.Exit(0)
  107. }
  108. mapinfo["gtid"] = sid
  109. mapinfo["lteid"] = eid
  110. mapinfo["stop"] = "true"
  111. task([]byte{}, mapinfo)
  112. time.Sleep(10 * time.Second)
  113. }
  114. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  115. fmt.Println("接受的段数据")
  116. switch act {
  117. case mu.OP_TYPE_DATA: //上个节点的数据
  118. //从表中开始处理
  119. var mapInfo map[string]interface{}
  120. err := json.Unmarshal(data, &mapInfo)
  121. log.Println("err:", err, "mapInfo:", mapInfo)
  122. if err != nil {
  123. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  124. } else if mapInfo != nil {
  125. taskType := util.ObjToString(mapInfo["stype"])
  126. if taskType == "historyTask" {
  127. //更新流程
  128. go historyTask(data, mapInfo)
  129. } else if taskType == "normalTask" {
  130. //判重流程
  131. go task(data, mapInfo)
  132. } else {
  133. //其他
  134. go task(data, mapInfo)
  135. }
  136. key, _ := mapInfo["key"].(string)
  137. if key == "" {
  138. key = "udpok"
  139. }
  140. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  141. }
  142. case mu.OP_NOOP: //下个节点回应
  143. ok := string(data)
  144. if ok != "" {
  145. log.Println("ok:", ok)
  146. udptaskmap.Delete(ok)
  147. }
  148. }
  149. }
  150. //开始判重程序
  151. func task(data []byte, mapInfo map[string]interface{}) {
  152. fmt.Println("开始数据判重")
  153. defer util.Catch()
  154. //区间id
  155. q := map[string]interface{}{}
  156. if idtype == "1" {
  157. q = map[string]interface{}{
  158. "_id": map[string]interface{}{
  159. "$gt": mapInfo["gtid"].(string),
  160. "$lte": mapInfo["lteid"].(string),
  161. },
  162. }
  163. } else {
  164. q = map[string]interface{}{
  165. "_id": map[string]interface{}{
  166. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  167. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  168. },
  169. }
  170. }
  171. log.Println(mgo.DbName, extract, q)
  172. sess := mgo.GetMgoConn()
  173. defer mgo.DestoryMongoConn(sess)
  174. //是否排序
  175. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  176. if Is_Sort {
  177. it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  178. }
  179. //it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  180. updateExtract := [][]map[string]interface{}{}
  181. log.Println("线程数:", threadNum)
  182. pool := make(chan bool, threadNum)
  183. wg := &sync.WaitGroup{}
  184. //mapLock := &sync.Mutex{}
  185. n, repeateN := 0, 0
  186. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  187. if n%10000 == 0 {
  188. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  189. }
  190. pool <- true
  191. wg.Add(1)
  192. go func(tmp map[string]interface{}) {
  193. defer func() {
  194. <-pool
  195. wg.Done()
  196. }()
  197. info := NewInfo(tmp)
  198. //是否为无效数据
  199. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  200. updateExtract = append(updateExtract, []map[string]interface{}{
  201. map[string]interface{}{
  202. "_id": tmp["_id"],
  203. },
  204. map[string]interface{}{
  205. "$set": map[string]interface{}{
  206. "repeat": -1,
  207. },
  208. },
  209. })
  210. if len(updateExtract) > 500 {
  211. mgo.UpSertBulk(extract, updateExtract...)
  212. updateExtract = [][]map[string]interface{}{}
  213. }
  214. } else {
  215. b, source, reason := DM.check(info)
  216. if b { //有重复,生成更新语句,更新抽取和更新招标
  217. repeateN++
  218. var is_replace = false
  219. var mergeArr = []int64{} //更改合并数组记录
  220. var newData = &Info{} //更换新的数据池数据
  221. var repeat_idMap = map[string]interface{}{} //记录判重的
  222. var merge_idMap = map[string]interface{}{} //记录合并的
  223. if idtype == "1" { //先临时决定一个id
  224. repeat_idMap["_id"] = info.id
  225. merge_idMap["_id"] = source.id
  226. } else {
  227. repeat_idMap["_id"] = StringTOBsonId(info.id)
  228. merge_idMap["_id"] = StringTOBsonId(source.id)
  229. }
  230. repeat_id := source.id
  231. //以下合并相关
  232. if isMerger {
  233. basic_bool := basicDataScore(source, info)
  234. if basic_bool {
  235. //已原始数据为标准 - 对比数据打判重标签-
  236. newData, mergeArr, is_replace = mergeDataFields(source, info)
  237. DM.replaceSourceData(newData, source.id) //替换
  238. //对比数据打重复标签的id,原始数据id的记录
  239. if idtype == "1" {
  240. repeat_idMap["_id"] = info.id
  241. merge_idMap["_id"] = source.id
  242. } else {
  243. repeat_idMap["_id"] = StringTOBsonId(info.id)
  244. merge_idMap["_id"] = StringTOBsonId(source.id)
  245. }
  246. repeat_id = source.id
  247. } else {
  248. //已对比数据为标准 ,数据池的数据打判重标签
  249. newData, mergeArr, is_replace = mergeDataFields(info, source)
  250. DM.replaceSourceData(newData, source.id) //替换
  251. //原始数据打重复标签的id, 对比数据id的记录
  252. if idtype == "1" {
  253. repeat_idMap["_id"] = source.id
  254. merge_idMap["_id"] = info.id
  255. } else {
  256. repeat_idMap["_id"] = StringTOBsonId(source.id)
  257. merge_idMap["_id"] = StringTOBsonId(info.id)
  258. }
  259. repeat_id = info.id
  260. }
  261. merge_map := make(map[string]interface{}, 0)
  262. if is_replace { //有过合并-更新数据
  263. merge_map = map[string]interface{}{
  264. "$set": map[string]interface{}{
  265. "merge": newData.mergemap,
  266. },
  267. }
  268. //更新合并后的数据
  269. for _, value := range mergeArr {
  270. if value == 0 {
  271. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  272. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  273. } else if value == 1 {
  274. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  275. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  276. } else if value == 2 {
  277. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  278. } else if value == 3 {
  279. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  280. } else if value == 4 {
  281. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  282. } else if value == 5 {
  283. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  284. } else if value == 6 {
  285. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  286. } else if value == 7 {
  287. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  288. } else if value == 8 {
  289. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  290. } else if value == 9 {
  291. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  292. } else if value == 10 {
  293. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  294. } else if value == 11 {
  295. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  296. } else {
  297. }
  298. }
  299. //模板数据更新
  300. updateExtract = append(updateExtract, []map[string]interface{}{
  301. merge_idMap,
  302. merge_map,
  303. })
  304. }
  305. }
  306. //重复数据打标签
  307. updateExtract = append(updateExtract, []map[string]interface{}{
  308. repeat_idMap,
  309. map[string]interface{}{
  310. "$set": map[string]interface{}{
  311. "repeat": 1,
  312. "repeat_reason": reason,
  313. "repeat_id": repeat_id,
  314. },
  315. },
  316. })
  317. }
  318. }
  319. }(tmp)
  320. if len(updateExtract) > 500 {
  321. mgo.UpSertBulk(extract, updateExtract...)
  322. updateExtract = [][]map[string]interface{}{}
  323. }
  324. tmp = make(map[string]interface{})
  325. }
  326. wg.Wait()
  327. if len(updateExtract) > 0 {
  328. mgo.UpSertBulk(extract, updateExtract...)
  329. //mgo.UpdateBulk(bidding, updateBidding...)
  330. }
  331. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  332. //任务完成,开始发送广播通知下面节点
  333. if n > repeateN && mapInfo["stop"] == nil {
  334. for _, to := range nextNode {
  335. sid, _ := mapInfo["gtid"].(string)
  336. eid, _ := mapInfo["lteid"].(string)
  337. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  338. by, _ := json.Marshal(map[string]interface{}{
  339. "gtid": sid,
  340. "lteid": eid,
  341. "stype": util.ObjToString(to["stype"]),
  342. "key": key,
  343. })
  344. addr := &net.UDPAddr{
  345. IP: net.ParseIP(to["addr"].(string)),
  346. Port: util.IntAll(to["port"]),
  347. }
  348. node := &udpNode{by, addr, time.Now().Unix(), 0}
  349. udptaskmap.Store(key, node)
  350. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  351. }
  352. }
  353. }
  354. //支持历史更新
  355. func historyTask(data []byte, mapInfo map[string]interface{}) {
  356. fmt.Println("开始取历史时间段")
  357. defer util.Catch()
  358. sess := mgo.GetMgoConn()
  359. defer mgo.DestoryMongoConn(sess)
  360. var q map[string]interface{}
  361. if idtype == "1" {
  362. q = map[string]interface{}{
  363. "_id": map[string]interface{}{
  364. "$gt": mapInfo["gtid"].(string),
  365. "$lte": mapInfo["lteid"].(string),
  366. },
  367. }
  368. } else {
  369. q = map[string]interface{}{
  370. "_id": map[string]interface{}{
  371. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  372. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  373. },
  374. }
  375. }
  376. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  377. minTime, maxTime := int64(0), int64(0)
  378. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  379. //取出最大最小时间
  380. info_time:=tmp["comeintime"]
  381. if Is_Sort {
  382. info_time = tmp["publishtime"]
  383. }
  384. if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
  385. minTime = util.Int64All(info_time)
  386. maxTime = util.Int64All(info_time)
  387. } else {
  388. t := util.Int64All(info_time)
  389. if t < minTime && t != 0 {
  390. minTime = t
  391. }
  392. if t > maxTime && t != 0 {
  393. maxTime = t
  394. }
  395. }
  396. }
  397. //时间不正确时
  398. if minTime == 0 && maxTime == 0 {
  399. log.Println("段数据区间 不符合")
  400. return
  401. }
  402. fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
  403. gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
  404. fmt.Println(gtid, lteid)
  405. HM = NewHistorymap(gtid, lteid, minTime, maxTime)
  406. fmt.Println("开始历史数据判重")
  407. defer util.Catch()
  408. //区间id
  409. sess_history := mgo.GetMgoConn()
  410. defer mgo.DestoryMongoConn(sess_history)
  411. var q_history map[string]interface{}
  412. if idtype == "1" {
  413. q_history = map[string]interface{}{
  414. "_id": map[string]interface{}{
  415. "$gt": mapInfo["gtid"].(string),
  416. "$lte": mapInfo["lteid"].(string),
  417. },
  418. }
  419. } else {
  420. q_history = map[string]interface{}{
  421. "_id": map[string]interface{}{
  422. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  423. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  424. },
  425. }
  426. }
  427. log.Println(mgo.DbName, extract, q_history)
  428. //是否排序
  429. it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
  430. if Is_Sort {
  431. it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
  432. }
  433. updateExtract := [][]map[string]interface{}{}
  434. log.Println("线程数:", threadNum)
  435. pool := make(chan bool, threadNum)
  436. wg := &sync.WaitGroup{}
  437. //mapLock := &sync.Mutex{}
  438. n, repeateN := 0, 0
  439. for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
  440. if n%10000 == 0 {
  441. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  442. }
  443. pool <- true
  444. wg.Add(1)
  445. go func(tmp map[string]interface{}) {
  446. defer func() {
  447. <-pool
  448. wg.Done()
  449. }()
  450. info := NewInfo(tmp)
  451. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  452. updateExtract = append(updateExtract, []map[string]interface{}{
  453. map[string]interface{}{
  454. "_id": tmp["_id"],
  455. },
  456. map[string]interface{}{
  457. "$set": map[string]interface{}{
  458. "repeat": -1,
  459. },
  460. },
  461. })
  462. if len(updateExtract) > 500 {
  463. mgo.UpSertBulk(extract, updateExtract...)
  464. updateExtract = [][]map[string]interface{}{}
  465. }
  466. } else {
  467. b, source, reason := HM.checkHistory(info)
  468. if b { //有重复,生成更新语句,更新抽取和更新招标
  469. if reason == "未判重记录" {
  470. fmt.Println("未判重记录")
  471. //把info的数据判重的标签更换,并新增字段
  472. DM.replaceSourceData(info, info.id) //替换即添加
  473. updateExtract = append(updateExtract, []map[string]interface{}{
  474. map[string]interface{}{
  475. "_id": tmp["_id"],
  476. },
  477. map[string]interface{}{
  478. "$set": map[string]interface{}{
  479. "repeat": 0,
  480. "repeatid": -2,
  481. },
  482. },
  483. })
  484. } else {
  485. repeateN++
  486. var is_replace = false
  487. var mergeArr = []int64{} //更改合并数组记录
  488. var newData = &Info{} //更换新的数据池数据
  489. var repeat_idMap = map[string]interface{}{} //记录判重的
  490. var merge_idMap = map[string]interface{}{} //记录合并的
  491. if idtype == "1" { //先临时决定一个id
  492. repeat_idMap["_id"] = info.id
  493. merge_idMap["_id"] = source.id
  494. } else {
  495. repeat_idMap["_id"] = StringTOBsonId(info.id)
  496. merge_idMap["_id"] = StringTOBsonId(source.id)
  497. }
  498. repeat_id := source.id
  499. //以下合并相关
  500. if isMerger {
  501. basic_bool := basicDataScore(source, info)
  502. if basic_bool {
  503. //已原始数据为标准 - 对比数据打判重标签-
  504. newData, mergeArr, is_replace = mergeDataFields(source, info)
  505. DM.replaceSourceData(newData, source.id) //替换
  506. //对比数据打重复标签的id,原始数据id的记录
  507. if idtype == "1" {
  508. repeat_idMap["_id"] = info.id
  509. merge_idMap["_id"] = source.id
  510. } else {
  511. repeat_idMap["_id"] = StringTOBsonId(info.id)
  512. merge_idMap["_id"] = StringTOBsonId(source.id)
  513. }
  514. repeat_id = source.id
  515. } else {
  516. //已对比数据为标准 ,数据池的数据打判重标签
  517. newData, mergeArr, is_replace = mergeDataFields(info, source)
  518. DM.replaceSourceData(newData, source.id) //替换
  519. //原始数据打重复标签的id, 对比数据id的记录
  520. if idtype == "1" {
  521. repeat_idMap["_id"] = source.id
  522. merge_idMap["_id"] = info.id
  523. } else {
  524. repeat_idMap["_id"] = StringTOBsonId(source.id)
  525. merge_idMap["_id"] = StringTOBsonId(info.id)
  526. }
  527. repeat_id = info.id
  528. }
  529. merge_map := make(map[string]interface{}, 0)
  530. if is_replace { //有过合并-更新数据
  531. merge_map = map[string]interface{}{
  532. "$set": map[string]interface{}{
  533. "merge": newData.mergemap,
  534. },
  535. }
  536. //更新合并后的数据
  537. for _, value := range mergeArr {
  538. if value == 0 {
  539. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  540. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  541. } else if value == 1 {
  542. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  543. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  544. } else if value == 2 {
  545. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  546. } else if value == 3 {
  547. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  548. } else if value == 4 {
  549. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  550. } else if value == 5 {
  551. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  552. } else if value == 6 {
  553. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  554. } else if value == 7 {
  555. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  556. } else if value == 8 {
  557. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  558. } else if value == 9 {
  559. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  560. } else if value == 10 {
  561. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  562. } else if value == 11 {
  563. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  564. } else {
  565. }
  566. }
  567. //模板数据更新
  568. updateExtract = append(updateExtract, []map[string]interface{}{
  569. merge_idMap,
  570. merge_map,
  571. })
  572. }
  573. }
  574. //重复数据打标签
  575. updateExtract = append(updateExtract, []map[string]interface{}{
  576. repeat_idMap,
  577. map[string]interface{}{
  578. "$set": map[string]interface{}{
  579. "repeat": 1,
  580. "repeat_reason": reason,
  581. "repeat_id": repeat_id,
  582. },
  583. },
  584. })
  585. }
  586. }
  587. }
  588. }(tmp)
  589. if len(updateExtract) > 500 {
  590. mgo.UpSertBulk(extract, updateExtract...)
  591. updateExtract = [][]map[string]interface{}{}
  592. }
  593. tmp = make(map[string]interface{})
  594. }
  595. wg.Wait()
  596. if len(updateExtract) > 0 {
  597. mgo.UpSertBulk(extract, updateExtract...)
  598. //mgo.UpdateBulk(bidding, updateBidding...)
  599. }
  600. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  601. //任务完成,开始发送广播通知下面节点
  602. if n > repeateN && mapInfo["stop"] == nil {
  603. for _, to := range nextNode {
  604. sid, _ := mapInfo["gtid"].(string)
  605. eid, _ := mapInfo["lteid"].(string)
  606. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  607. by, _ := json.Marshal(map[string]interface{}{
  608. "gtid": sid,
  609. "lteid": eid,
  610. "stype": util.ObjToString(to["stype"]),
  611. "key": key,
  612. })
  613. addr := &net.UDPAddr{
  614. IP: net.ParseIP(to["addr"].(string)),
  615. Port: util.IntAll(to["port"]),
  616. }
  617. node := &udpNode{by, addr, time.Now().Unix(), 0}
  618. udptaskmap.Store(key, node)
  619. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  620. }
  621. }
  622. }
  623. //合并字段-并更新merge字段的值
  624. func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
  625. merge_recordMap := make(map[string]interface{}, 0)
  626. mergeArr := make([]int64, 0)
  627. //是否替换数据了-记录原始的数据
  628. is_replace := false
  629. //1、城市
  630. if source.area == "" || source.area == "全国" {
  631. //为空
  632. if info.area != "全国" && info.area != "" {
  633. merge_recordMap["area"] = source.area
  634. merge_recordMap["city"] = source.city
  635. source.area = info.area
  636. source.city = info.city
  637. mergeArr = append(mergeArr, 1)
  638. is_replace = true
  639. }
  640. } else {
  641. //不为空-查看站点相关-有值必替换
  642. if source.is_site {
  643. //是站点替换的城市
  644. merge_recordMap["site_area"] = source.area
  645. merge_recordMap["site_city"] = source.city
  646. mergeArr = append(mergeArr, 0)
  647. is_replace = true
  648. source.is_site = false
  649. }
  650. }
  651. //2、项目名称
  652. if source.projectname == "" && info.projectname != "" {
  653. merge_recordMap["projectname"] = source.projectname
  654. source.projectname = info.projectname
  655. mergeArr = append(mergeArr, 2)
  656. is_replace = true
  657. }
  658. //3、项目编号
  659. if source.projectcode == "" && info.projectcode != "" {
  660. merge_recordMap["projectcode"] = source.projectcode
  661. source.projectcode = info.projectcode
  662. mergeArr = append(mergeArr, 3)
  663. is_replace = true
  664. }
  665. //4、采购单位
  666. if source.buyer == "" && info.buyer != "" {
  667. merge_recordMap["buyer"] = source.buyer
  668. source.buyer = info.buyer
  669. mergeArr = append(mergeArr, 4)
  670. is_replace = true
  671. }
  672. //5、预算
  673. if source.budget == 0 && info.budget != 0 {
  674. merge_recordMap["budget"] = source.budget
  675. source.budget = info.budget
  676. mergeArr = append(mergeArr, 5)
  677. is_replace = true
  678. }
  679. //6、中标单位
  680. if source.winner == "" && info.winner != "" {
  681. merge_recordMap["winner"] = source.winner
  682. source.winner = info.winner
  683. mergeArr = append(mergeArr, 6)
  684. is_replace = true
  685. }
  686. //7、中标金额
  687. if source.bidamount == 0 && info.bidamount != 0 {
  688. merge_recordMap["bidamount"] = source.bidamount
  689. source.bidamount = info.bidamount
  690. mergeArr = append(mergeArr, 7)
  691. is_replace = true
  692. }
  693. //8、开标时间-地点
  694. if source.bidopentime == 0 && info.bidopentime != 0 {
  695. merge_recordMap["bidopentime"] = source.bidopentime
  696. source.bidopentime = info.bidopentime
  697. mergeArr = append(mergeArr, 8)
  698. is_replace = true
  699. }
  700. //9、合同编号
  701. if source.contractnumber == "" && info.contractnumber != "" {
  702. merge_recordMap["contractnumber"] = source.contractnumber
  703. source.contractnumber = info.contractnumber
  704. mergeArr = append(mergeArr, 9)
  705. is_replace = true
  706. }
  707. //10、发布时间
  708. if source.publishtime == 0 && info.publishtime != 0 {
  709. merge_recordMap["publishtime"] = source.publishtime
  710. source.publishtime = info.publishtime
  711. mergeArr = append(mergeArr, 10)
  712. is_replace = true
  713. }
  714. //11、代理机构
  715. if source.agency == "" && info.agency != "" {
  716. merge_recordMap["agency"] = source.agency
  717. source.agency = info.agency
  718. mergeArr = append(mergeArr, 11)
  719. is_replace = true
  720. }
  721. if is_replace { //有过替换更新
  722. //总次数+1
  723. source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
  724. merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
  725. //和哪一个数据id进行非空替换的-记录
  726. key := info.id
  727. source.mergemap[key] = merge_recordMap
  728. }
  729. //以上合并过于简单,待进一步优化
  730. return source, mergeArr, is_replace
  731. }
  732. //权重评估
  733. func basicDataScore(v *Info, info *Info) bool {
  734. /*
  735. 权重评估
  736. 网站优先级判定规则:
  737. 1、中央>省>市>县区
  738. 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
  739. */
  740. v_score, info_score := -1, -1
  741. dict_v := SiteMap[v.site]
  742. dict_info := SiteMap[info.site]
  743. //先判断level
  744. if dict_v != nil {
  745. v_level := util.ObjToString(dict_v["level"])
  746. if v_level == "中央" {
  747. v_score = 4
  748. } else if v_level == "省级" {
  749. v_score = 3
  750. } else if v_level == "市级" {
  751. v_score = 2
  752. } else if v_level == "县区" {
  753. v_score = 1
  754. } else if v_level == "" {
  755. } else {
  756. v_score = 0
  757. }
  758. }
  759. if dict_info != nil {
  760. info_level := util.ObjToString(dict_info["level"])
  761. if info_level == "中央" {
  762. info_score = 4
  763. } else if info_level == "省级" {
  764. info_score = 3
  765. } else if info_level == "市级" {
  766. info_score = 2
  767. } else if info_level == "县区" {
  768. info_score = 1
  769. } else if info_level == "" {
  770. } else {
  771. v_score = 0
  772. }
  773. }
  774. if v_score > info_score {
  775. return true
  776. }
  777. if v_score < info_score {
  778. return false
  779. }
  780. //判断sitetype
  781. if dict_v != nil {
  782. v_sitetype := util.ObjToString(dict_v["sitetype"])
  783. if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
  784. v_score = 4
  785. } else if v_sitetype == "公共资源" {
  786. v_score = 3
  787. } else if v_sitetype == "官方网站" {
  788. v_score = 2
  789. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  790. v_score = 1
  791. } else if v_sitetype == "" {
  792. } else {
  793. v_score = 0
  794. }
  795. }
  796. if dict_info != nil {
  797. info_sitetype := util.ObjToString(dict_info["sitetype"])
  798. if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
  799. info_score = 4
  800. } else if info_sitetype == "公共资源" {
  801. info_score = 3
  802. } else if info_sitetype == "官方网站" {
  803. info_score = 2
  804. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  805. info_score = 1
  806. } else if info_sitetype == "" {
  807. } else {
  808. info_score = 0
  809. }
  810. }
  811. if v_score > info_score {
  812. return true
  813. }
  814. if v_score < info_score {
  815. return false
  816. }
  817. //网站评估
  818. m, n := 0, 0
  819. if v.projectname != "" {
  820. m++
  821. }
  822. if v.buyer != "" {
  823. m++
  824. }
  825. if v.projectcode != "" || v.contractnumber != "" {
  826. m++
  827. }
  828. if v.budget != 0 {
  829. m++
  830. }
  831. if v.bidamount != 0 {
  832. m++
  833. }
  834. if v.winner != "" {
  835. m++
  836. }
  837. if v.bidopentime != 0 {
  838. m++
  839. }
  840. if v.agencyaddr != "" {
  841. m++
  842. }
  843. if v.agency != "" {
  844. m = m + 2
  845. }
  846. if v.city != "" {
  847. m = m + 2
  848. }
  849. if info.projectname != "" {
  850. n++
  851. }
  852. if info.buyer != "" {
  853. n++
  854. }
  855. if info.projectcode != "" || info.contractnumber != "" {
  856. n++
  857. }
  858. if info.budget != 0 {
  859. n++
  860. }
  861. if info.bidamount != 0 {
  862. n++
  863. }
  864. if info.winner != "" {
  865. n++
  866. }
  867. if info.bidopentime != 0 {
  868. n++
  869. }
  870. if info.agencyaddr != "" {
  871. n++
  872. }
  873. if info.agency != "" {
  874. n = n + 2
  875. }
  876. if info.city != "" {
  877. n = n + 2
  878. }
  879. if m > n {
  880. return true
  881. } else if m == n {
  882. if v.publishtime >= info.publishtime {
  883. return true
  884. } else {
  885. return false
  886. }
  887. } else {
  888. return false
  889. }
  890. }
  // invalidData reports whether a record carries no identifying data at all,
  // i.e. every one of the four values is empty. task and historyTask pass the
  // buyer, project name, project code and contract number, in that order.
  func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  	for _, field := range [...]string{d1, d2, d3, d4} {
  		if field != "" {
  			return false
  		}
  	}
  	return true
  }