main.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "gopkg.in/mgo.v2/bson"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "qfw/util"
  13. "qfw/util/mongodb"
  14. "regexp"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. Sysconfig map[string]interface{} //配置文件
  20. mconf map[string]interface{} //mongodb配置信息
  21. mgo *mongodb.MongodbSim //mongodb操作对象
  22. siteMgo *mongodb.MongodbSim
  23. //mgoTest *mongodb.MongodbSim //mongodb操作对象
  24. extract string
  25. extract_copy string
  26. bidding string
  27. udpclient mu.UdpClient //udp对象
  28. nextNode []map[string]interface{} //下节点数组
  29. dupdays = 5 //初始化判重范围
  30. DM *datamap //
  31. HM *historymap //判重数据
  32. lastid = "5d767728a5cb26b9b7748868"
  33. //ObjectId("5d767728a5cb26b9b7748868")
  34. //正则筛选相关
  35. FilterRegTitle = regexp.MustCompile("^_$")
  36. FilterRegTitle_1 = regexp.MustCompile("^_$")
  37. FilterRegTitle_2 = regexp.MustCompile("^_$")
  38. SiteMap map[string]interface{} //站点map
  39. )
  40. func init() {
  41. //flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  42. //flag.Parse()
  43. //172.17.145.163:27080
  44. util.ReadConfig(&Sysconfig)
  45. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  46. mconf = Sysconfig["mongodb"].(map[string]interface{})
  47. mgo = &mongodb.MongodbSim{
  48. MongodbAddr: mconf["addr"].(string),
  49. DbName: mconf["db"].(string),
  50. Size: util.IntAllDef(mconf["pool"], 10),
  51. }
  52. extract = mconf["extract"].(string)
  53. extract_copy = mconf["extract_copy"].(string)
  54. //bidding = mconf["bidding"].(string)
  55. mgo.InitPool()
  56. //测试需临时注释
  57. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  58. //加载数据
  59. DM = NewDatamap(dupdays, lastid)
  60. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  61. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  62. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  63. //站点相关数据库
  64. mongodb.InitMongodbPool(5, "192.168.3.207:27082", "")
  65. siteMgo = &mongodb.MongodbSim{
  66. MongodbAddr: "192.168.3.207:27082",
  67. Size: 5,
  68. DbName: "zhaolongyue",
  69. }
  70. siteMgo.InitPool()
  71. SiteMap = make(map[string]interface{},0)
  72. start := int(time.Now().Unix())
  73. //站点配置
  74. sess_site := siteMgo.GetMgoConn()
  75. defer sess_site.Close()
  76. res_site := sess_site.DB("zhaolongyue").C("site").Find(nil).Sort("_id").Iter()
  77. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  78. data_map := map[string]string{
  79. "area":util.ObjToString(site_dict["area"]),
  80. "city":util.ObjToString(site_dict["city"]),
  81. "district":util.ObjToString(site_dict["district"]),
  82. }
  83. SiteMap[site_dict["site"].(string)]= data_map
  84. }
  85. fmt.Printf("用时:%d秒,%d个",int(time.Now().Unix())-start,len(SiteMap))
  86. }
  87. //新增一个方法 判断
  88. func mainTest() {
  89. //log.Println("1")
  90. //代码copy数据
  91. //sessTest :=mgoTest.GetMgoConn()
  92. //defer sessTest.Close()
  93. //
  94. //sess := mgo.GetMgoConn()
  95. //defer sess.Close()
  96. //
  97. ////var arr []map[string]interface{}
  98. //
  99. //res_test := sessTest.DB("qfw").C("bidding").Find(mongodb.ObjToMQ(`{"comeintime":{"$gte": 1571025600, "$lte": 1571976000}}`, true)).Iter()
  100. //res :=sess.DB("extract_kf").C("a_testbidding")
  101. //5
  102. //
  103. //
  104. //
  105. //
  106. //i:=0
  107. //for dict := make(map[string]interface{}); res_test.Next(&dict); i++{
  108. //
  109. // //插入
  110. // if i%2000==0 {
  111. // log.Println("当前:",i)
  112. // }
  113. // res.Insert(dict)
  114. // //if len(arr)>=500 {
  115. // // arr = make([]map[string]interface{},0)
  116. // //}else {
  117. // // arr = append(arr,dict)
  118. // //}
  119. //}
  120. sess := mgo.GetMgoConn()
  121. defer mgo.DestoryMongoConn(sess)
  122. res_copy := sess.DB("extract_kf").C(extract_copy).Find(nil).Iter()
  123. m1 :=map[string]int{} //老版本
  124. m2 :=map[string]int{} //新版本
  125. i:=0
  126. j:=0
  127. for v1 := make(map[string]interface{}); res_copy.Next(&v1); i++{
  128. if i%2000==0 {
  129. log.Println("当前i:",i)
  130. }
  131. m1[(v1["_id"].(bson.ObjectId).Hex())]= util.IntAll(v1["repeat"])
  132. }
  133. sesss := mgo.GetMgoConn()
  134. defer mgo.DestoryMongoConn(sesss)
  135. res := sesss.DB("extract_kf").C(extract).Find(nil).Iter()
  136. for v2 := make(map[string]interface{}); res.Next(&v2); j++{
  137. if j%2000==0 {
  138. log.Println("当前j:",j)
  139. }
  140. m2[(v2["_id"].(bson.ObjectId).Hex())]= util.IntAll(v2["repeat"])
  141. }
  142. fmt.Println(len(m1),len(m2))
  143. n1:=0
  144. n2:=0
  145. n3:=0
  146. n4:=0
  147. n5:=0
  148. n6:=0
  149. var arr1 []string
  150. var arr2 []string
  151. for k,v:=range m1{
  152. if m2[k]==1&&v==0{//0:1
  153. n1++
  154. arr2 = append(arr2,fmt.Sprintf("目标_id:%s",k))
  155. }
  156. if m2[k]==0&&v==1{ //1:0
  157. n2++
  158. arr1 = append(arr1,fmt.Sprintf("目标_id:%s",k))
  159. }
  160. if m2[k]==0&&v==0{ //0:0
  161. n3++
  162. }
  163. if m2[k]==1&&v==1{//1:1
  164. n4++
  165. }
  166. if m2[k]==-1&&v==0{ //0:-1
  167. n5++
  168. }
  169. if m2[k]==-1&&v==1{//1:-1
  170. n6++
  171. }
  172. }
  173. //打印 1:0情况 66989;
  174. mm:=0
  175. for _,v:=range arr1 {
  176. mm++
  177. if mm%222==0 {
  178. log.Println(v)
  179. }
  180. }
  181. log.Println("分割线---------------")
  182. log.Println("分割线---------------")
  183. //打印 0:1情况 8729
  184. nn:=0
  185. for _,v:=range arr2 {
  186. nn++
  187. if nn%30==0 {
  188. log.Println(v)
  189. }
  190. }
  191. log.Println("V1 0:1---",n1)
  192. log.Println("V1 1:0---",n2)
  193. log.Println("V1 0:0---",n3)
  194. log.Println("V1 1:1---",n4)
  195. log.Println("V1 0:-1---",n5)
  196. log.Println("V1 1:-1---",n6)
  197. }
  198. func main() {
  199. go checkMapJob()
  200. updport := Sysconfig["udpport"].(string)
  201. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  202. udpclient.Listen(processUdpMsg)
  203. log.Println("Udp服务监听", updport)
  204. time.Sleep(99999 * time.Hour)
  205. }
  206. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  207. fmt.Println("接受的段数据")
  208. switch act {
  209. case mu.OP_TYPE_DATA: //上个节点的数据
  210. //从表中开始处理
  211. var mapInfo map[string]interface{}
  212. err := json.Unmarshal(data, &mapInfo)
  213. log.Println("err:", err, "mapInfo:", mapInfo)
  214. if err != nil {
  215. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  216. } else if mapInfo != nil {
  217. //更新流程
  218. go historyTask(data,mapInfo)
  219. //判重流程
  220. //go task(data, mapInfo)
  221. key, _ := mapInfo["key"].(string)
  222. if key == "" {
  223. key = "udpok"
  224. }
  225. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  226. }
  227. case mu.OP_NOOP: //下个节点回应
  228. ok := string(data)
  229. if ok != "" {
  230. log.Println("ok:", ok)
  231. udptaskmap.Delete(ok)
  232. }
  233. }
  234. }
  235. //开始判重程序
  236. func task(data []byte, mapInfo map[string]interface{}) {
  237. fmt.Println("开始数据判重")
  238. defer util.Catch()
  239. //区间id
  240. sess := mgo.GetMgoConn()
  241. defer mgo.DestoryMongoConn(sess)
  242. q := map[string]interface{}{
  243. "_id": map[string]interface{}{
  244. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  245. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  246. },
  247. }
  248. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  249. updateExtract := [][]map[string]interface{}{}
  250. pool := make(chan bool, 16)
  251. wg := &sync.WaitGroup{}
  252. mapLock := &sync.Mutex{}
  253. n, repeateN := 0, 0
  254. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  255. if n%10000 == 0 {
  256. log.Println("current:", n, tmp["_id"],"repeateN:",repeateN)
  257. }
  258. pool <- true
  259. wg.Add(1)
  260. go func(tmp map[string]interface{}) {
  261. defer func() {
  262. <-pool
  263. wg.Done()
  264. }()
  265. info := NewInfo(tmp)
  266. //是否为无效数据
  267. if invalidData(info.buyer,info.projectname,info.projectcode) {
  268. mapLock.Lock()
  269. updateExtract = append(updateExtract, []map[string]interface{}{
  270. map[string]interface{}{
  271. "_id": tmp["_id"],
  272. },
  273. map[string]interface{}{
  274. "$set": map[string]interface{}{
  275. "repeat": -1,
  276. },
  277. },
  278. })
  279. if len(updateExtract) > 500 {
  280. mgo.UpdateBulk(extract, updateExtract...)
  281. updateExtract = [][]map[string]interface{}{}
  282. }
  283. mapLock.Unlock()
  284. }else {
  285. //判重原因 reason tmp["_id"] 对比id id原始id
  286. mapLock.Lock()
  287. b, source,reason := DM.check(info)
  288. if b { //有重复,生成更新语句,更新抽取和更新招标
  289. repeateN++
  290. var mergeArr = []int64{} //更改合并数组记录
  291. var newData = &Info{} //更换新的数据池数据
  292. var id_map = map[string]interface{}{}
  293. repeat_id := ""
  294. //合并操作--评功权重打分-合并完替换原始数据池
  295. basic_bool := basicDataScore(source,info)
  296. if basic_bool {
  297. //已原始数据为标准-对比数据打判重标签
  298. newData,mergeArr= mergeDataFields(source,info)
  299. DM.replaceSourceData(newData,source.id) //替换
  300. id_map["_id"]= util.StringTOBsonId(source.id)
  301. repeat_id = source.id
  302. }else {
  303. //已对比数据为标准 ,数据池的数据打判重标签
  304. newData,mergeArr= mergeDataFields(info,source)
  305. DM.replaceSourceData(newData,source.id)//替换
  306. id_map["_id"]= util.StringTOBsonId(info.id)
  307. repeat_id = info.id
  308. }
  309. var update_map = map[string]interface{}{
  310. "$set": map[string]interface{}{
  311. "reason":reason,
  312. "repeat":"1",
  313. "repeatid":repeat_id,
  314. },
  315. }
  316. //合并记录
  317. if len(newData.mergemap)>0 {
  318. update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
  319. }
  320. //更新合并后的数据
  321. for _,value :=range mergeArr {
  322. if value==1 {
  323. update_map["$set"].(map[string]interface{})["area"] = newData.area
  324. update_map["$set"].(map[string]interface{})["city"] = newData.city
  325. }else if value==2 {
  326. update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  327. }else if value==3 {
  328. update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  329. }else if value==4 {
  330. update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  331. }else if value==5 {
  332. update_map["$set"].(map[string]interface{})["budget"] = newData.budget
  333. }else if value==6 {
  334. update_map["$set"].(map[string]interface{})["winner"] = newData.winner
  335. }else if value==7 {
  336. update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  337. }else if value==8 {
  338. update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  339. }else {
  340. }
  341. }
  342. //构建数据库更新用到的
  343. updateExtract = append(updateExtract, []map[string]interface{}{
  344. id_map,
  345. update_map,
  346. })
  347. if len(updateExtract) > 500 {
  348. mgo.UpdateBulk(extract, updateExtract...)
  349. updateExtract = [][]map[string]interface{}{}
  350. }
  351. mapLock.Unlock()
  352. } else {
  353. mapLock.Unlock()
  354. }
  355. }
  356. }(tmp)
  357. tmp = make(map[string]interface{})
  358. }
  359. wg.Wait()
  360. if len(updateExtract) > 0 {
  361. mgo.UpdateBulk(extract, updateExtract...)
  362. //mgo.UpdateBulk(bidding, updateBidding...)
  363. }
  364. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  365. //任务完成,开始发送广播通知下面节点
  366. if n > repeateN && mapInfo["stop"] == nil {
  367. for _, to := range nextNode {
  368. sid, _ := mapInfo["gtid"].(string)
  369. eid, _ := mapInfo["lteid"].(string)
  370. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  371. by, _ := json.Marshal(map[string]interface{}{
  372. "gtid": sid,
  373. "lteid": eid,
  374. "stype": util.ObjToString(to["stype"]),
  375. "key": key,
  376. })
  377. addr := &net.UDPAddr{
  378. IP: net.ParseIP(to["addr"].(string)),
  379. Port: util.IntAll(to["port"]),
  380. }
  381. node := &udpNode{by, addr, time.Now().Unix(), 0}
  382. udptaskmap.Store(key, node)
  383. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  384. }
  385. }
  386. }
  387. //支持历史更新
  388. func historyTask(data []byte, mapInfo map[string]interface{}) {
  389. fmt.Println("开始取历史时间段")
  390. defer util.Catch()
  391. sess := mgo.GetMgoConn()
  392. defer mgo.DestoryMongoConn(sess)
  393. q := map[string]interface{}{
  394. "_id": map[string]interface{}{
  395. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  396. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  397. },
  398. }
  399. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  400. minTime,maxTime:=int64(0),int64(0)
  401. for tmp := make(map[string]interface{}); it.Next(&tmp);{
  402. //取出最大最小时间
  403. if minTime==0||maxTime ==0 {
  404. minTime = util.Int64All(tmp["comeintime"])
  405. maxTime = util.Int64All(tmp["comeintime"])
  406. }else {
  407. t := util.Int64All(tmp["comeintime"])
  408. if t<minTime&&t!=0 {
  409. minTime = t
  410. }
  411. if t>maxTime&&t!=0 {
  412. maxTime = t
  413. }
  414. }
  415. }
  416. fmt.Println("最小时间==",minTime,"最大时间==",maxTime)
  417. //最小时间== 1568087634 最大时间== 1568103381
  418. HM = NewHistorymap(util.ObjToString(mapInfo["gtid"]),
  419. util.ObjToString(mapInfo["lteid"]),minTime,maxTime)
  420. //return
  421. //开始判重...
  422. defer util.Catch()
  423. sess_task := mgo.GetMgoConn()
  424. defer mgo.DestoryMongoConn(sess_task)
  425. q_task := map[string]interface{}{
  426. "_id": map[string]interface{}{
  427. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  428. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  429. },
  430. }
  431. it_task := sess.DB(mgo.DbName).C(extract).Find(&q_task).Iter()
  432. updateExtract := [][]map[string]interface{}{}
  433. pool := make(chan bool, 16)
  434. wg := &sync.WaitGroup{}
  435. mapLock := &sync.Mutex{}
  436. n, repeateN := 0, 0
  437. for tmp := make(map[string]interface{}); it_task.Next(&tmp); n++ {
  438. if n%10000 == 0 {
  439. log.Println("current:", n, tmp["_id"],"repeateN:",repeateN)
  440. }
  441. pool <- true
  442. wg.Add(1)
  443. go func(tmp map[string]interface{}) {
  444. defer func() {
  445. <-pool
  446. wg.Done()
  447. }()
  448. info := NewInfo(tmp)
  449. //是否为无效数据
  450. if invalidData(info.buyer,info.projectname,info.projectcode) {
  451. mapLock.Lock()
  452. updateExtract = append(updateExtract, []map[string]interface{}{
  453. map[string]interface{}{
  454. "_id": tmp["_id"],
  455. },
  456. map[string]interface{}{
  457. "$set": map[string]interface{}{
  458. "repeat": -1,
  459. },
  460. },
  461. })
  462. if len(updateExtract) > 500 {
  463. mgo.UpdateBulk(extract, updateExtract...)
  464. updateExtract = [][]map[string]interface{}{}
  465. }
  466. mapLock.Unlock()
  467. }else {
  468. mapLock.Lock()
  469. b, source,reason := HM.check(info)
  470. if b { //有重复,生成更新语句,更新抽取和更新招标
  471. if reason == "未判重记录" {
  472. fmt.Println("未判重记录")
  473. //把info的数据判重的标签更换,并新增字段
  474. DM.replaceSourceData(info, info.id) //替换即添加
  475. updateExtract = append(updateExtract, []map[string]interface{}{
  476. map[string]interface{}{
  477. "_id": tmp["_id"],
  478. },
  479. map[string]interface{}{
  480. "$set": map[string]interface{}{
  481. "repeat": 0,
  482. "repeatid": "-1",
  483. },
  484. },
  485. })
  486. if len(updateExtract) > 500 {
  487. mgo.UpdateBulk(extract, updateExtract...)
  488. updateExtract = [][]map[string]interface{}{}
  489. }
  490. mapLock.Unlock()
  491. }else {
  492. repeateN++
  493. var mergeArr = []int64{} //更改合并数组记录
  494. var newData = &Info{} //更换新的数据池数据
  495. var id_map = map[string]interface{}{}
  496. repeat_id := ""
  497. //合并操作--评功权重打分-合并完替换原始数据池
  498. basic_bool := basicDataScore(source,info)
  499. if basic_bool {
  500. //已原始数据为标准-对比数据打判重标签
  501. newData,mergeArr= mergeDataFields(source,info)
  502. DM.replaceSourceData(newData,source.id) //替换
  503. id_map["_id"]= util.StringTOBsonId(source.id)
  504. repeat_id = source.id
  505. }else {
  506. //已对比数据为标准 ,数据池的数据打判重标签
  507. newData,mergeArr= mergeDataFields(info,source)
  508. DM.replaceSourceData(newData,source.id)//替换
  509. id_map["_id"]= util.StringTOBsonId(info.id)
  510. repeat_id = info.id
  511. }
  512. var update_map = map[string]interface{}{
  513. "$set": map[string]interface{}{
  514. "reason":reason,
  515. "repeat":"1",
  516. "repeatid":repeat_id,
  517. },
  518. }
  519. //合并记录
  520. if len(newData.mergemap)>0 {
  521. update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
  522. }
  523. //更新合并后的数据
  524. for _,value :=range mergeArr {
  525. if value==1 {
  526. update_map["$set"].(map[string]interface{})["area"] = newData.area
  527. update_map["$set"].(map[string]interface{})["city"] = newData.city
  528. }else if value==2 {
  529. update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  530. }else if value==3 {
  531. update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  532. }else if value==4 {
  533. update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  534. }else if value==5 {
  535. update_map["$set"].(map[string]interface{})["budget"] = newData.budget
  536. }else if value==6 {
  537. update_map["$set"].(map[string]interface{})["winner"] = newData.winner
  538. }else if value==7 {
  539. update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  540. }else if value==8 {
  541. update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  542. }else {
  543. }
  544. }
  545. //构建数据库更新用到的
  546. updateExtract = append(updateExtract, []map[string]interface{}{
  547. id_map,
  548. update_map,
  549. })
  550. if len(updateExtract) > 500 {
  551. mgo.UpdateBulk(extract, updateExtract...)
  552. updateExtract = [][]map[string]interface{}{}
  553. }
  554. mapLock.Unlock()
  555. }
  556. }else {
  557. mapLock.Unlock()
  558. }
  559. }
  560. }(tmp)
  561. tmp = make(map[string]interface{})
  562. }
  563. wg.Wait()
  564. if len(updateExtract) > 0 {
  565. mgo.UpdateBulk(extract, updateExtract...)
  566. //mgo.UpdateBulk(bidding, updateBidding...)
  567. }
  568. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  569. //任务完成,开始发送广播通知下面节点
  570. if n > repeateN &&mapInfo["stop"] == nil {
  571. for _, to := range nextNode {
  572. sid, _ := mapInfo["gtid"].(string)
  573. eid, _ := mapInfo["lteid"].(string)
  574. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  575. by, _ := json.Marshal(map[string]interface{}{
  576. "gtid": sid,
  577. "lteid": eid,
  578. "stype": util.ObjToString(to["stype"]),
  579. "key": key,
  580. })
  581. addr := &net.UDPAddr{
  582. IP: net.ParseIP(to["addr"].(string)),
  583. Port: util.IntAll(to["port"]),
  584. }
  585. node := &udpNode{by, addr, time.Now().Unix(), 0}
  586. udptaskmap.Store(key, node)
  587. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  588. }
  589. }
  590. }
  591. //合并字段
  592. func mergeDataFields(source *Info, info *Info) (*Info,[]int64){
  593. var mergeArr []int64
  594. mergeArr = make([]int64,0)
  595. //1、城市
  596. if (source.area==""||source.area=="全国")&&info.area!="全国"&&info.area!=""{
  597. var arrA []string
  598. if source.mergemap["area"]==nil {
  599. arrA = make([]string, 0)
  600. }else {
  601. arrA = source.mergemap["area"].([]string)
  602. }
  603. arrA = append(arrA,source.area)
  604. source.mergemap["area"] = arrA
  605. var arrC []string
  606. if source.mergemap["city"]==nil {
  607. arrC = make([]string, 0)
  608. }else {
  609. arrC = source.mergemap["city"].([]string)
  610. }
  611. arrC = append(arrC,source.city)
  612. source.mergemap["city"] = arrC
  613. source.area = info.area
  614. source.city = info.city
  615. mergeArr = append(mergeArr,1)
  616. }
  617. //2、项目名称
  618. if source.projectname==""&&info.projectname!=""{
  619. var arr []string
  620. if source.mergemap["projectname"]==nil {
  621. arr = make([]string, 0)
  622. }else {
  623. arr = source.mergemap["projectname"].([]string)
  624. }
  625. arr = append(arr,source.projectname)
  626. source.mergemap["projectname"] = arr
  627. source.projectname = info.projectname
  628. mergeArr = append(mergeArr,2)
  629. }
  630. //3、项目编号
  631. if source.projectcode==""&&info.projectcode!=""{
  632. var arr []string
  633. if source.mergemap["projectcode"]==nil {
  634. arr = make([]string, 0)
  635. }else {
  636. arr = source.mergemap["projectcode"].([]string)
  637. }
  638. arr = append(arr,source.projectcode)
  639. source.mergemap["projectcode"] = arr
  640. source.projectcode = info.projectcode
  641. mergeArr = append(mergeArr,3)
  642. }
  643. //4、采购单位
  644. if source.buyer==""&&info.buyer!=""{
  645. var arr []string
  646. if source.mergemap["buyer"]==nil {
  647. arr = make([]string, 0)
  648. }else {
  649. arr = source.mergemap["buyer"].([]string)
  650. }
  651. arr = append(arr,source.buyer)
  652. source.mergemap["buyer"] = arr
  653. source.buyer = info.buyer
  654. mergeArr = append(mergeArr,4)
  655. }
  656. //5、预算
  657. if source.budget==0&&info.budget!=0{
  658. var arr []float64
  659. if source.mergemap["budget"]==nil {
  660. arr = make([]float64, 0)
  661. }else {
  662. arr = source.mergemap["budget"].([]float64)
  663. }
  664. arr = append(arr,source.budget)
  665. source.mergemap["budget"] = arr
  666. source.budget = info.budget
  667. mergeArr = append(mergeArr,5)
  668. }
  669. //6、中标单位
  670. if source.winner==""&&info.winner!=""{
  671. var arr []string
  672. if source.mergemap["winner"]==nil {
  673. arr = make([]string, 0)
  674. }else {
  675. arr = source.mergemap["winner"].([]string)
  676. }
  677. arr = append(arr,source.winner)
  678. source.mergemap["winner"] = arr
  679. source.winner = info.winner
  680. mergeArr = append(mergeArr,6)
  681. }
  682. //7、中标金额
  683. if source.bidamount==0&&info.bidamount!=0{
  684. var arr []float64
  685. if source.mergemap["bidamount"]==nil {
  686. arr = make([]float64, 0)
  687. }else {
  688. arr = source.mergemap["bidamount"].([]float64)
  689. }
  690. arr = append(arr,source.bidamount)
  691. source.mergemap["bidamount"] = arr
  692. source.bidamount = info.bidamount
  693. mergeArr = append(mergeArr,7)
  694. }
  695. //8、开天时间-地点
  696. if source.bidopentime==0&&info.bidopentime!=0{
  697. var arr []int64
  698. if source.mergemap["bidopentime"]==nil {
  699. arr = make([]int64, 0)
  700. }else {
  701. arr = source.mergemap["bidopentime"].([]int64)
  702. }
  703. arr = append(arr,source.bidopentime)
  704. source.mergemap["bidopentime"] = arr
  705. source.bidopentime = info.bidopentime
  706. mergeArr = append(mergeArr,8)
  707. }
  708. //以上合并过于简单,待进一步优化
  709. return source,mergeArr
  710. }
  711. //权重评估
  712. func basicDataScore(v *Info, info *Info) bool {
  713. m,n:=0,0
  714. if v.projectname!="" {m++}
  715. if v.buyer!="" {m++}
  716. if v.projectcode!="" {m++}
  717. if v.budget!=0 {m++}
  718. if v.bidamount!=0 {m++}
  719. if v.winner!="" {m++}
  720. if v.bidopentime!=0 {m++}
  721. if v.agencyaddr!="" {m++}
  722. if v.agency!="" {m=m+2}
  723. if v.city!="" {m=m+2}
  724. if info.projectname!="" {n++}
  725. if info.buyer!="" {n++}
  726. if info.projectcode!="" {n++}
  727. if info.budget!=0 {n++}
  728. if info.bidamount!=0 {n++}
  729. if info.winner!="" {n++}
  730. if info.bidopentime!=0 {n++}
  731. if info.agencyaddr!="" {n++}
  732. if info.agency!="" {n=m+2}
  733. if info.city!="" {n=m+2}
  734. if m>n {
  735. return true
  736. }else if m==n {
  737. if v.comeintime>=info.comeintime {
  738. return true
  739. }else {
  740. return false
  741. }
  742. }else {
  743. return false
  744. }
  745. }
  746. //无效数据
  747. func invalidData(d1 string,d2 string,d3 string) bool{
  748. var n int
  749. if d1 != "" {
  750. n++
  751. }
  752. if d2 != "" {
  753. n++
  754. }
  755. if d3 != "" {
  756. n++
  757. }
  758. if n==0 {
  759. return true
  760. }
  761. return false
  762. }