main.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "os"
  13. "qfw/util"
  14. "qfw/util/mongodb"
  15. "regexp"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. Sysconfig map[string]interface{} //配置文件
  21. mconf map[string]interface{} //mongodb配置信息
  22. mgo *mongodb.MongodbSim //mongodb操作对象
  23. extract string
  24. udpclient mu.UdpClient //udp对象
  25. nextNode []map[string]interface{} //下节点数组
  26. dupdays = 5 //初始化判重范围
  27. DM *datamap //
  28. HM *historymap //判重数据
  29. lastid = ""
  30. /*
  31. 5da3f31aa5cb26b9b798d3aa
  32. */
  33. //正则筛选相关
  34. FilterRegTitle = regexp.MustCompile("^_$")
  35. FilterRegTitle_1 = regexp.MustCompile("^_$")
  36. FilterRegTitle_2 = regexp.MustCompile("^_$")
  37. isMerger bool //是否合并
  38. threadNum int //线程数量
  39. SiteMap map[string]map[string]interface{} //站点map
  40. idtype, sid, eid string //测试人员判重使用
  41. )
  42. func init() {
  43. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  44. flag.StringVar(&sid, "sid", "", "开始id")
  45. flag.StringVar(&eid, "eid", "", "结束id")
  46. flag.StringVar(&idtype, "idtype", "", "id类型,默认ObjectId:0,String:1")
  47. flag.Parse()
  48. //172.17.145.163:27080
  49. util.ReadConfig(&Sysconfig)
  50. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  51. mconf = Sysconfig["mongodb"].(map[string]interface{})
  52. mgo = &mongodb.MongodbSim{
  53. MongodbAddr: mconf["addr"].(string),
  54. DbName: mconf["db"].(string),
  55. Size: util.IntAllDef(mconf["pool"], 10),
  56. }
  57. extract = mconf["extract"].(string)
  58. mgo.InitPool()
  59. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  60. //加载数据
  61. DM = NewDatamap(dupdays, lastid)
  62. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  63. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  64. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  65. isMerger = Sysconfig["isMerger"].(bool)
  66. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  67. //站点配置
  68. site := mconf["site"].(map[string]interface{})
  69. SiteMap = make(map[string]map[string]interface{}, 0)
  70. start := int(time.Now().Unix())
  71. sess_site := mgo.GetMgoConn()
  72. defer sess_site.Close()
  73. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(nil).Sort("_id").Iter()
  74. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  75. data_map := map[string]interface{}{
  76. "area": util.ObjToString(site_dict["area"]),
  77. "city": util.ObjToString(site_dict["city"]),
  78. "district": util.ObjToString(site_dict["district"]),
  79. "sitetype": util.ObjToString(site_dict["sitetype"]),
  80. "level": util.ObjToString(site_dict["level"]),
  81. }
  82. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  83. }
  84. fmt.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  85. }
  86. func main() {
  87. go checkMapJob()
  88. updport := Sysconfig["udpport"].(string)
  89. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  90. udpclient.Listen(processUdpMsg)
  91. log.Println("Udp服务监听", updport)
  92. time.Sleep(99999 * time.Hour)
  93. }
  94. //测试组人员使用
  95. func mainT() {
  96. /*
  97. ObjectId("5da3f31aa5cb26b9b798d3aa")
  98. ObjectId("5da418c4a5cb26b9b7e3e9a6")
  99. */
  100. //sid = "5da3f31aa5cb26b9b798d3aa"
  101. //eid = "5da418c4a5cb26b9b7e3e9a6"
  102. mapinfo := map[string]interface{}{}
  103. if sid == "" || eid == "" {
  104. log.Println("sid,eid参数不能为空")
  105. os.Exit(0)
  106. }
  107. mapinfo["gtid"] = sid
  108. mapinfo["lteid"] = eid
  109. mapinfo["stop"] = "true"
  110. task([]byte{}, mapinfo)
  111. time.Sleep(10 * time.Second)
  112. }
  113. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  114. fmt.Println("接受的段数据")
  115. switch act {
  116. case mu.OP_TYPE_DATA: //上个节点的数据
  117. //从表中开始处理
  118. var mapInfo map[string]interface{}
  119. err := json.Unmarshal(data, &mapInfo)
  120. log.Println("err:", err, "mapInfo:", mapInfo)
  121. if err != nil {
  122. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  123. } else if mapInfo != nil {
  124. taskType := util.ObjToString(mapInfo["stype"])
  125. if taskType == "historyTask" {
  126. //更新流程
  127. go historyTask(data, mapInfo)
  128. } else if taskType == "normalTask" {
  129. //判重流程
  130. go task(data, mapInfo)
  131. } else {
  132. //其他
  133. go task(data, mapInfo)
  134. }
  135. key, _ := mapInfo["key"].(string)
  136. if key == "" {
  137. key = "udpok"
  138. }
  139. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  140. }
  141. case mu.OP_NOOP: //下个节点回应
  142. ok := string(data)
  143. if ok != "" {
  144. log.Println("ok:", ok)
  145. udptaskmap.Delete(ok)
  146. }
  147. }
  148. }
  149. //开始判重程序
  150. func task(data []byte, mapInfo map[string]interface{}) {
  151. fmt.Println("开始数据判重")
  152. defer util.Catch()
  153. //区间id
  154. sess := mgo.GetMgoConn()
  155. defer mgo.DestoryMongoConn(sess)
  156. var q map[string]interface{}
  157. if idtype == "1" {
  158. q = map[string]interface{}{
  159. "_id": map[string]interface{}{
  160. "$gt": mapInfo["gtid"].(string),
  161. "$lte": mapInfo["lteid"].(string),
  162. },
  163. }
  164. } else {
  165. q = map[string]interface{}{
  166. "_id": map[string]interface{}{
  167. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  168. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  169. },
  170. }
  171. }
  172. log.Println(mgo.DbName,extract,q)
  173. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  174. updateExtract := [][]map[string]interface{}{}
  175. log.Println("线程数:",threadNum)
  176. pool := make(chan bool, threadNum)
  177. wg := &sync.WaitGroup{}
  178. //mapLock := &sync.Mutex{}
  179. n, repeateN := 0, 0
  180. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  181. if n%10000 == 0 {
  182. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  183. }
  184. pool <- true
  185. wg.Add(1)
  186. go func(tmp map[string]interface{}) {
  187. defer func() {
  188. <-pool
  189. wg.Done()
  190. }()
  191. info := NewInfo(tmp)
  192. //是否为无效数据
  193. if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
  194. updateExtract = append(updateExtract, []map[string]interface{}{
  195. map[string]interface{}{
  196. "_id": tmp["_id"],
  197. },
  198. map[string]interface{}{
  199. "$set": map[string]interface{}{
  200. "repeat": -1,
  201. },
  202. },
  203. })
  204. if len(updateExtract) > 500 {
  205. mgo.UpdateBulk(extract, updateExtract...)
  206. updateExtract = [][]map[string]interface{}{}
  207. }
  208. } else {
  209. b, source, reason := DM.check(info)
  210. if b { //有重复,生成更新语句,更新抽取和更新招标
  211. repeateN++
  212. var mergeArr = []int64{} //更改合并数组记录
  213. var newData = &Info{} //更换新的数据池数据
  214. var id_map = map[string]interface{}{}
  215. repeat_id := source.id
  216. if idtype == "1" {
  217. id_map["_id"] = info.id
  218. } else {
  219. id_map["_id"] = util.StringTOBsonId(info.id)
  220. }
  221. if isMerger {
  222. //需要合并相关操作-合并操作--评功权重打分-合并完替换原始数据池
  223. basic_bool := basicDataScore(source, info)
  224. if basic_bool {
  225. //已原始数据为标准-对比数据打判重标签
  226. newData, mergeArr = mergeDataFields(source, info)
  227. DM.replaceSourceData(newData, source.id) //替换
  228. if idtype == "1" {
  229. id_map["_id"] = info.id
  230. } else {
  231. id_map["_id"] = util.StringTOBsonId(info.id)
  232. }
  233. repeat_id = source.id
  234. } else {
  235. //已对比数据为标准 ,数据池的数据打判重标签
  236. newData, mergeArr = mergeDataFields(info, source)
  237. DM.replaceSourceData(newData, source.id) //替换
  238. if idtype == "1" {
  239. id_map["_id"] = source.id
  240. } else {
  241. id_map["_id"] = util.StringTOBsonId(source.id)
  242. }
  243. repeat_id = info.id
  244. }
  245. }
  246. var update_map = map[string]interface{}{
  247. "$set": map[string]interface{}{
  248. "repeat_reason": reason,
  249. "repeat": 1,
  250. "repeatid": repeat_id,
  251. },
  252. }
  253. if isMerger {
  254. if len(newData.mergemap) > 0 {
  255. update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
  256. }
  257. //更新合并后的数据
  258. for _, value := range mergeArr {
  259. if value == 1 {
  260. update_map["$set"].(map[string]interface{})["area"] = newData.area
  261. update_map["$set"].(map[string]interface{})["city"] = newData.city
  262. } else if value == 2 {
  263. update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  264. } else if value == 3 {
  265. update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  266. } else if value == 4 {
  267. update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  268. } else if value == 5 {
  269. update_map["$set"].(map[string]interface{})["budget"] = newData.budget
  270. } else if value == 6 {
  271. update_map["$set"].(map[string]interface{})["winner"] = newData.winner
  272. } else if value == 7 {
  273. update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  274. } else if value == 8 {
  275. update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  276. } else if value == 9 {
  277. update_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  278. }else {
  279. }
  280. }
  281. }
  282. //构建数据库更新用到的
  283. updateExtract = append(updateExtract, []map[string]interface{}{
  284. id_map,
  285. update_map,
  286. })
  287. }
  288. }
  289. }(tmp)
  290. if len(updateExtract) > 500 {
  291. mgo.UpdateBulk(extract, updateExtract...)
  292. updateExtract = [][]map[string]interface{}{}
  293. }
  294. tmp = make(map[string]interface{})
  295. }
  296. wg.Wait()
  297. if len(updateExtract) > 0 {
  298. mgo.UpdateBulk(extract, updateExtract...)
  299. //mgo.UpdateBulk(bidding, updateBidding...)
  300. }
  301. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  302. //任务完成,开始发送广播通知下面节点
  303. if n > repeateN && mapInfo["stop"] == nil {
  304. for _, to := range nextNode {
  305. sid, _ := mapInfo["gtid"].(string)
  306. eid, _ := mapInfo["lteid"].(string)
  307. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  308. by, _ := json.Marshal(map[string]interface{}{
  309. "gtid": sid,
  310. "lteid": eid,
  311. "stype": util.ObjToString(to["stype"]),
  312. "key": key,
  313. })
  314. addr := &net.UDPAddr{
  315. IP: net.ParseIP(to["addr"].(string)),
  316. Port: util.IntAll(to["port"]),
  317. }
  318. node := &udpNode{by, addr, time.Now().Unix(), 0}
  319. udptaskmap.Store(key, node)
  320. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  321. }
  322. }
  323. }
  324. //支持历史更新
  325. func historyTask(data []byte, mapInfo map[string]interface{}) {
  326. fmt.Println("开始取历史时间段")
  327. defer util.Catch()
  328. sess := mgo.GetMgoConn()
  329. defer mgo.DestoryMongoConn(sess)
  330. var q map[string]interface{}
  331. if idtype == "1" {
  332. q = map[string]interface{}{
  333. "_id": map[string]interface{}{
  334. "$gt": mapInfo["gtid"].(string),
  335. "$lte": mapInfo["lteid"].(string),
  336. },
  337. }
  338. } else {
  339. q = map[string]interface{}{
  340. "_id": map[string]interface{}{
  341. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  342. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  343. },
  344. }
  345. }
  346. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  347. minTime, maxTime := int64(0), int64(0)
  348. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  349. //取出最大最小时间
  350. if minTime == 0 || maxTime == 0 &&util.Int64All(tmp["publishtime"])!=0{
  351. minTime = util.Int64All(tmp["publishtime"])
  352. maxTime = util.Int64All(tmp["publishtime"])
  353. } else {
  354. t := util.Int64All(tmp["publishtime"])
  355. if t < minTime && t != 0 {
  356. minTime = t
  357. }
  358. if t > maxTime && t != 0 {
  359. maxTime = t
  360. }
  361. }
  362. }
  363. //时间不正确时
  364. if minTime==0&&maxTime==0 {
  365. log.Println("段数据区间 publishtime不符合")
  366. return
  367. }
  368. fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
  369. gtid,lteid:= util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
  370. fmt.Println(gtid,lteid)
  371. HM = NewHistorymap(gtid,lteid, minTime, maxTime)
  372. fmt.Println("开始历史数据判重")
  373. defer util.Catch()
  374. //区间id
  375. sess_history:= mgo.GetMgoConn()
  376. defer mgo.DestoryMongoConn(sess_history)
  377. var q_history map[string]interface{}
  378. if idtype == "1" {
  379. q_history = map[string]interface{}{
  380. "_id": map[string]interface{}{
  381. "$gt": mapInfo["gtid"].(string),
  382. "$lte": mapInfo["lteid"].(string),
  383. },
  384. }
  385. } else {
  386. q_history = map[string]interface{}{
  387. "_id": map[string]interface{}{
  388. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  389. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  390. },
  391. }
  392. }
  393. log.Println(mgo.DbName,extract,q_history)
  394. it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
  395. updateExtract := [][]map[string]interface{}{}
  396. log.Println("线程数:",threadNum)
  397. pool := make(chan bool, threadNum)
  398. wg := &sync.WaitGroup{}
  399. //mapLock := &sync.Mutex{}
  400. n, repeateN := 0, 0
  401. for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
  402. if n%10000 == 0 {
  403. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  404. }
  405. pool <- true
  406. wg.Add(1)
  407. go func(tmp map[string]interface{}) {
  408. defer func() {
  409. <-pool
  410. wg.Done()
  411. }()
  412. info := NewInfo(tmp)
  413. if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
  414. //mapLock.Lock()
  415. updateExtract = append(updateExtract, []map[string]interface{}{
  416. map[string]interface{}{
  417. "_id": tmp["_id"],
  418. },
  419. map[string]interface{}{
  420. "$set": map[string]interface{}{
  421. "repeat": -1,
  422. },
  423. },
  424. })
  425. if len(updateExtract) > 500 {
  426. mgo.UpdateBulk(extract, updateExtract...)
  427. updateExtract = [][]map[string]interface{}{}
  428. }
  429. //mapLock.Unlock()
  430. } else {
  431. b, source, reason := HM.checkHistory(info)
  432. if b { //有重复,生成更新语句,更新抽取和更新招标
  433. if reason == "未判重记录" {
  434. fmt.Println("未判重记录")
  435. //把info的数据判重的标签更换,并新增字段
  436. DM.replaceSourceData(info, info.id) //替换即添加
  437. updateExtract = append(updateExtract, []map[string]interface{}{
  438. map[string]interface{}{
  439. "_id": tmp["_id"],
  440. },
  441. map[string]interface{}{
  442. "$set": map[string]interface{}{
  443. "repeat": 0,
  444. "repeatid": -2,
  445. },
  446. },
  447. })
  448. } else {
  449. repeateN++
  450. var mergeArr = []int64{} //更改合并数组记录
  451. var newData = &Info{} //更换新的数据池数据
  452. var id_map = map[string]interface{}{}
  453. repeat_id := source.id
  454. if idtype == "1" {
  455. id_map["_id"] = info.id
  456. } else {
  457. id_map["_id"] = util.StringTOBsonId(info.id)
  458. }
  459. if isMerger {
  460. //需要合并相关操作-合并操作--评功权重打分-合并完替换原始数据池
  461. basic_bool := basicDataScore(source, info)
  462. if basic_bool {
  463. //已原始数据为标准-对比数据打判重标签
  464. newData, mergeArr = mergeDataFields(source, info)
  465. DM.replaceSourceData(newData, source.id) //替换
  466. if idtype == "1" {
  467. id_map["_id"] = info.id
  468. } else {
  469. id_map["_id"] = util.StringTOBsonId(info.id)
  470. }
  471. repeat_id = source.id
  472. } else {
  473. //已对比数据为标准 ,数据池的数据打判重标签
  474. newData, mergeArr = mergeDataFields(info, source)
  475. DM.replaceSourceData(newData, source.id) //替换
  476. if idtype == "1" {
  477. id_map["_id"] = source.id
  478. } else {
  479. id_map["_id"] = util.StringTOBsonId(source.id)
  480. }
  481. repeat_id = info.id
  482. }
  483. }
  484. var update_map = map[string]interface{}{
  485. "$set": map[string]interface{}{
  486. "repeat_reason": reason,
  487. "repeat": 1,
  488. "repeatid": repeat_id,
  489. },
  490. }
  491. if isMerger {
  492. //合并记录
  493. if len(newData.mergemap) > 0 {
  494. update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
  495. }
  496. //更新合并后的数据
  497. for _, value := range mergeArr {
  498. if value == 1 {
  499. update_map["$set"].(map[string]interface{})["area"] = newData.area
  500. update_map["$set"].(map[string]interface{})["city"] = newData.city
  501. } else if value == 2 {
  502. update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  503. } else if value == 3 {
  504. update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  505. } else if value == 4 {
  506. update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  507. } else if value == 5 {
  508. update_map["$set"].(map[string]interface{})["budget"] = newData.budget
  509. } else if value == 6 {
  510. update_map["$set"].(map[string]interface{})["winner"] = newData.winner
  511. } else if value == 7 {
  512. update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  513. } else if value == 8 {
  514. update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  515. } else if value == 9 {
  516. update_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  517. }else {
  518. }
  519. }
  520. }
  521. //构建数据库更新用到的
  522. updateExtract = append(updateExtract, []map[string]interface{}{
  523. id_map,
  524. update_map,
  525. })
  526. }
  527. }
  528. }
  529. }(tmp)
  530. if len(updateExtract) > 500 {
  531. mgo.UpdateBulk(extract, updateExtract...)
  532. updateExtract = [][]map[string]interface{}{}
  533. }
  534. tmp = make(map[string]interface{})
  535. }
  536. wg.Wait()
  537. if len(updateExtract) > 0 {
  538. mgo.UpdateBulk(extract, updateExtract...)
  539. //mgo.UpdateBulk(bidding, updateBidding...)
  540. }
  541. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  542. //任务完成,开始发送广播通知下面节点
  543. if n > repeateN && mapInfo["stop"] == nil {
  544. for _, to := range nextNode {
  545. sid, _ := mapInfo["gtid"].(string)
  546. eid, _ := mapInfo["lteid"].(string)
  547. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  548. by, _ := json.Marshal(map[string]interface{}{
  549. "gtid": sid,
  550. "lteid": eid,
  551. "stype": util.ObjToString(to["stype"]),
  552. "key": key,
  553. })
  554. addr := &net.UDPAddr{
  555. IP: net.ParseIP(to["addr"].(string)),
  556. Port: util.IntAll(to["port"]),
  557. }
  558. node := &udpNode{by, addr, time.Now().Unix(), 0}
  559. udptaskmap.Store(key, node)
  560. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  561. }
  562. }
  563. }
  564. //合并字段
  565. func mergeDataFields(source *Info, info *Info) (*Info, []int64) {
  566. var mergeArr []int64
  567. mergeArr = make([]int64, 0)
  568. //1、城市
  569. if (source.area == "" || source.area == "全国") && info.area != "全国" && info.area != "" {
  570. var arrA []string
  571. if source.mergemap["area"] == nil {
  572. arrA = make([]string, 0)
  573. } else {
  574. arrA = source.mergemap["area"].([]string)
  575. }
  576. arrA = append(arrA, source.area)
  577. source.mergemap["area"] = arrA
  578. var arrC []string
  579. if source.mergemap["city"] == nil {
  580. arrC = make([]string, 0)
  581. } else {
  582. arrC = source.mergemap["city"].([]string)
  583. }
  584. arrC = append(arrC, source.city)
  585. source.mergemap["city"] = arrC
  586. source.area = info.area
  587. source.city = info.city
  588. mergeArr = append(mergeArr, 1)
  589. }
  590. //2、项目名称
  591. if source.projectname == "" && info.projectname != "" {
  592. var arr []string
  593. if source.mergemap["projectname"] == nil {
  594. arr = make([]string, 0)
  595. } else {
  596. arr = source.mergemap["projectname"].([]string)
  597. }
  598. arr = append(arr, source.projectname)
  599. source.mergemap["projectname"] = arr
  600. source.projectname = info.projectname
  601. mergeArr = append(mergeArr, 2)
  602. }
  603. //3、项目编号
  604. if source.projectcode == "" && info.projectcode != "" {
  605. var arr []string
  606. if source.mergemap["projectcode"] == nil {
  607. arr = make([]string, 0)
  608. } else {
  609. arr = source.mergemap["projectcode"].([]string)
  610. }
  611. arr = append(arr, source.projectcode)
  612. source.mergemap["projectcode"] = arr
  613. source.projectcode = info.projectcode
  614. mergeArr = append(mergeArr, 3)
  615. }
  616. //4、采购单位
  617. if source.buyer == "" && info.buyer != "" {
  618. var arr []string
  619. if source.mergemap["buyer"] == nil {
  620. arr = make([]string, 0)
  621. } else {
  622. arr = source.mergemap["buyer"].([]string)
  623. }
  624. arr = append(arr, source.buyer)
  625. source.mergemap["buyer"] = arr
  626. source.buyer = info.buyer
  627. mergeArr = append(mergeArr, 4)
  628. }
  629. //5、预算
  630. if source.budget == 0 && info.budget != 0 {
  631. var arr []float64
  632. if source.mergemap["budget"] == nil {
  633. arr = make([]float64, 0)
  634. } else {
  635. arr = source.mergemap["budget"].([]float64)
  636. }
  637. arr = append(arr, source.budget)
  638. source.mergemap["budget"] = arr
  639. source.budget = info.budget
  640. mergeArr = append(mergeArr, 5)
  641. }
  642. //6、中标单位
  643. if source.winner == "" && info.winner != "" {
  644. var arr []string
  645. if source.mergemap["winner"] == nil {
  646. arr = make([]string, 0)
  647. } else {
  648. arr = source.mergemap["winner"].([]string)
  649. }
  650. arr = append(arr, source.winner)
  651. source.mergemap["winner"] = arr
  652. source.winner = info.winner
  653. mergeArr = append(mergeArr, 6)
  654. }
  655. //7、中标金额
  656. if source.bidamount == 0 && info.bidamount != 0 {
  657. var arr []float64
  658. if source.mergemap["bidamount"] == nil {
  659. arr = make([]float64, 0)
  660. } else {
  661. arr = source.mergemap["bidamount"].([]float64)
  662. }
  663. arr = append(arr, source.bidamount)
  664. source.mergemap["bidamount"] = arr
  665. source.bidamount = info.bidamount
  666. mergeArr = append(mergeArr, 7)
  667. }
  668. //8、开标时间-地点
  669. if source.bidopentime == 0 && info.bidopentime != 0 {
  670. var arr []int64
  671. if source.mergemap["bidopentime"] == nil {
  672. arr = make([]int64, 0)
  673. } else {
  674. arr = source.mergemap["bidopentime"].([]int64)
  675. }
  676. arr = append(arr, source.bidopentime)
  677. source.mergemap["bidopentime"] = arr
  678. source.bidopentime = info.bidopentime
  679. mergeArr = append(mergeArr, 8)
  680. }
  681. //9、合同编号
  682. if source.contractnumber == "" && info.contractnumber != "" {
  683. var arr []string
  684. if source.mergemap["contractnumber"] == nil {
  685. arr = make([]string, 0)
  686. } else {
  687. arr = source.mergemap["contractnumber"].([]string)
  688. }
  689. arr = append(arr, source.contractnumber)
  690. source.mergemap["contractnumber"] = arr
  691. source.contractnumber = info.contractnumber
  692. mergeArr = append(mergeArr, 9)
  693. }
  694. //以上合并过于简单,待进一步优化
  695. return source, mergeArr
  696. }
  697. //权重评估
  698. func basicDataScore(v *Info, info *Info) bool {
  699. /*
  700. 权重评估
  701. 网站优先级判定规则:
  702. 1、中央>省>市>县区
  703. 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
  704. */
  705. v_score, info_score := -1, -1
  706. dict_v := SiteMap[v.site]
  707. dict_info := SiteMap[info.site]
  708. //先判断level
  709. if dict_v != nil {
  710. v_level := util.ObjToString(dict_v["level"])
  711. if v_level == "中央" {
  712. v_score = 4
  713. } else if v_level == "省级" {
  714. v_score = 3
  715. } else if v_level == "市级" {
  716. v_score = 2
  717. } else if v_level == "县区" {
  718. v_score = 1
  719. } else if v_level == "" {
  720. } else {
  721. v_score = 0
  722. }
  723. }
  724. if dict_info != nil {
  725. info_level := util.ObjToString(dict_info["level"])
  726. if info_level == "中央" {
  727. info_score = 4
  728. } else if info_level == "省级" {
  729. info_score = 3
  730. } else if info_level == "市级" {
  731. info_score = 2
  732. } else if info_level == "县区" {
  733. info_score = 1
  734. } else if info_level == "" {
  735. } else {
  736. v_score = 0
  737. }
  738. }
  739. if v_score > info_score {
  740. return true
  741. }
  742. if v_score < info_score {
  743. return false
  744. }
  745. //判断sitetype
  746. if dict_v != nil {
  747. v_sitetype := util.ObjToString(dict_v["sitetype"])
  748. if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
  749. v_score = 4
  750. } else if v_sitetype == "公共资源" {
  751. v_score = 3
  752. } else if v_sitetype == "官方网站" {
  753. v_score = 2
  754. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  755. v_score = 1
  756. } else if v_sitetype == "" {
  757. } else {
  758. v_score = 0
  759. }
  760. }
  761. if dict_info != nil {
  762. info_sitetype := util.ObjToString(dict_info["sitetype"])
  763. if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
  764. info_score = 4
  765. } else if info_sitetype == "公共资源" {
  766. info_score = 3
  767. } else if info_sitetype == "官方网站" {
  768. info_score = 2
  769. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  770. info_score = 1
  771. } else if info_sitetype == "" {
  772. } else {
  773. info_score = 0
  774. }
  775. }
  776. if v_score > info_score {
  777. return true
  778. }
  779. if v_score < info_score {
  780. return false
  781. }
  782. //网站评估
  783. m, n := 0, 0
  784. if v.projectname != "" {
  785. m++
  786. }
  787. if v.buyer != "" {
  788. m++
  789. }
  790. if v.projectcode != ""||v.contractnumber != "" {
  791. m++
  792. }
  793. if v.budget != 0 {
  794. m++
  795. }
  796. if v.bidamount != 0 {
  797. m++
  798. }
  799. if v.winner != "" {
  800. m++
  801. }
  802. if v.bidopentime != 0 {
  803. m++
  804. }
  805. if v.agencyaddr != "" {
  806. m++
  807. }
  808. if v.agency != "" {
  809. m = m + 2
  810. }
  811. if v.city != "" {
  812. m = m + 2
  813. }
  814. if info.projectname != "" {
  815. n++
  816. }
  817. if info.buyer != "" {
  818. n++
  819. }
  820. if info.projectcode != "" || info.contractnumber != ""{
  821. n++
  822. }
  823. if info.budget != 0 {
  824. n++
  825. }
  826. if info.bidamount != 0 {
  827. n++
  828. }
  829. if info.winner != "" {
  830. n++
  831. }
  832. if info.bidopentime != 0 {
  833. n++
  834. }
  835. if info.agencyaddr != "" {
  836. n++
  837. }
  838. if info.agency != "" {
  839. n = n + 2
  840. }
  841. if info.city != "" {
  842. n = n + 2
  843. }
  844. if m > n {
  845. return true
  846. } else if m == n {
  847. if v.publishtime >= info.publishtime {
  848. return true
  849. } else {
  850. return false
  851. }
  852. } else {
  853. return false
  854. }
  855. }
  856. //无效数据
  857. func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  858. var n int
  859. if d1 != "" {
  860. n++
  861. }
  862. if d2 != "" {
  863. n++
  864. }
  865. if d3 != "" {
  866. n++
  867. }
  868. if d4 != "" {
  869. n++
  870. }
  871. if n == 0 {
  872. return true
  873. }
  874. return false
  875. }