main.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "os"
  13. "qfw/util"
  14. "qfw/util/mongodb"
  15. "regexp"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. Sysconfig map[string]interface{} //配置文件
  21. mconf map[string]interface{} //mongodb配置信息
  22. mgo *mongodb.MongodbSim //mongodb操作对象
  23. extract string
  24. udpclient mu.UdpClient //udp对象
  25. nextNode []map[string]interface{} //下节点数组
  26. dupdays = 5 //初始化判重范围
  27. DM *datamap //
  28. HM *historymap //判重数据
  29. lastid = ""
  30. /*
  31. 5da3f31aa5cb26b9b798d3aa
  32. */
  33. //正则筛选相关
  34. FilterRegTitle = regexp.MustCompile("^_$")
  35. FilterRegTitle_1 = regexp.MustCompile("^_$")
  36. FilterRegTitle_2 = regexp.MustCompile("^_$")
  37. isMerger bool //是否合并
  38. threadNum int //线程数量
  39. SiteMap map[string]map[string]interface{} //站点map
  40. idtype, sid, eid string //测试人员判重使用
  41. )
  42. func init() {
  43. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  44. flag.StringVar(&sid, "sid", "", "开始id")
  45. flag.StringVar(&eid, "eid", "", "结束id")
  46. flag.StringVar(&idtype, "idtype", "", "id类型,默认ObjectId:0,String:1")
  47. flag.Parse()
  48. //172.17.145.163:27080
  49. util.ReadConfig(&Sysconfig)
  50. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  51. mconf = Sysconfig["mongodb"].(map[string]interface{})
  52. mgo = &mongodb.MongodbSim{
  53. MongodbAddr: mconf["addr"].(string),
  54. DbName: mconf["db"].(string),
  55. Size: util.IntAllDef(mconf["pool"], 10),
  56. }
  57. extract = mconf["extract"].(string)
  58. mgo.InitPool()
  59. //测试可以临时注释
  60. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  61. //加载数据
  62. DM = NewDatamap(dupdays, lastid)
  63. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  64. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  65. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  66. isMerger = Sysconfig["isMerger"].(bool)
  67. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  68. //站点配置
  69. site := mconf["site"].(map[string]interface{})
  70. SiteMap = make(map[string]map[string]interface{}, 0)
  71. start := int(time.Now().Unix())
  72. sess_site := mgo.GetMgoConn()
  73. defer sess_site.Close()
  74. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(nil).Sort("_id").Iter()
  75. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  76. data_map := map[string]interface{}{
  77. "area": util.ObjToString(site_dict["area"]),
  78. "city": util.ObjToString(site_dict["city"]),
  79. "district": util.ObjToString(site_dict["district"]),
  80. "sitetype": util.ObjToString(site_dict["sitetype"]),
  81. "level": util.ObjToString(site_dict["level"]),
  82. }
  83. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  84. }
  85. fmt.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  86. }
  87. func main() {
  88. go checkMapJob()
  89. updport := Sysconfig["udpport"].(string)
  90. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  91. udpclient.Listen(processUdpMsg)
  92. log.Println("Udp服务监听", updport)
  93. time.Sleep(99999 * time.Hour)
  94. }
  95. //测试组人员使用
  96. func mainT() {
  97. //sid = "5da3f31aa5cb26b9b798d3aa"
  98. //eid = "5da422fba5cb26b9b706984b"
  99. mapinfo := map[string]interface{}{}
  100. if sid == "" || eid == "" {
  101. log.Println("sid,eid参数不能为空")
  102. os.Exit(0)
  103. }
  104. mapinfo["gtid"] = sid
  105. mapinfo["lteid"] = eid
  106. mapinfo["stop"] = "true"
  107. task([]byte{}, mapinfo)
  108. time.Sleep(10 * time.Second)
  109. }
  110. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  111. fmt.Println("接受的段数据")
  112. switch act {
  113. case mu.OP_TYPE_DATA: //上个节点的数据
  114. //从表中开始处理
  115. var mapInfo map[string]interface{}
  116. err := json.Unmarshal(data, &mapInfo)
  117. log.Println("err:", err, "mapInfo:", mapInfo)
  118. if err != nil {
  119. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  120. } else if mapInfo != nil {
  121. taskType := util.ObjToString(mapInfo["stype"])
  122. if taskType == "historyTask" {
  123. //更新流程
  124. go historyTask(data, mapInfo)
  125. } else if taskType == "normalTask" {
  126. //判重流程
  127. go task(data, mapInfo)
  128. } else {
  129. //其他
  130. go task(data, mapInfo)
  131. }
  132. key, _ := mapInfo["key"].(string)
  133. if key == "" {
  134. key = "udpok"
  135. }
  136. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  137. }
  138. case mu.OP_NOOP: //下个节点回应
  139. ok := string(data)
  140. if ok != "" {
  141. log.Println("ok:", ok)
  142. udptaskmap.Delete(ok)
  143. }
  144. }
  145. }
  146. //开始判重程序
  147. func task(data []byte, mapInfo map[string]interface{}) {
  148. fmt.Println("开始数据判重")
  149. defer util.Catch()
  150. //区间id
  151. sess := mgo.GetMgoConn()
  152. defer mgo.DestoryMongoConn(sess)
  153. var q map[string]interface{}
  154. if idtype == "1" {
  155. q = map[string]interface{}{
  156. "_id": map[string]interface{}{
  157. "$gt": mapInfo["gtid"].(string),
  158. "$lte": mapInfo["lteid"].(string),
  159. },
  160. }
  161. } else {
  162. q = map[string]interface{}{
  163. "_id": map[string]interface{}{
  164. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  165. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  166. },
  167. }
  168. }
  169. log.Println(mgo.DbName,extract,q)
  170. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  171. updateExtract := [][]map[string]interface{}{}
  172. log.Println("线程数:",threadNum)
  173. pool := make(chan bool, threadNum)
  174. wg := &sync.WaitGroup{}
  175. //mapLock := &sync.Mutex{}
  176. n, repeateN := 0, 0
  177. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  178. if n%10000 == 0 {
  179. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  180. }
  181. pool <- true
  182. wg.Add(1)
  183. go func(tmp map[string]interface{}) {
  184. defer func() {
  185. <-pool
  186. wg.Done()
  187. }()
  188. info := NewInfo(tmp)
  189. //是否为无效数据
  190. if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
  191. updateExtract = append(updateExtract, []map[string]interface{}{
  192. map[string]interface{}{
  193. "_id": tmp["_id"],
  194. },
  195. map[string]interface{}{
  196. "$set": map[string]interface{}{
  197. "repeat": -1,
  198. },
  199. },
  200. })
  201. if len(updateExtract) > 500 {
  202. mgo.UpdateBulk(extract, updateExtract...)
  203. updateExtract = [][]map[string]interface{}{}
  204. }
  205. } else {
  206. b, source, reason := DM.check(info)
  207. if b { //有重复,生成更新语句,更新抽取和更新招标
  208. repeateN++
  209. var mergeArr = []int64{} //更改合并数组记录
  210. var newData = &Info{} //更换新的数据池数据
  211. var id_map = map[string]interface{}{}
  212. repeat_id := source.id
  213. if idtype == "1" {
  214. id_map["_id"] = info.id
  215. } else {
  216. id_map["_id"] = util.StringTOBsonId(info.id)
  217. }
  218. if isMerger {
  219. //需要合并相关操作-合并操作--评功权重打分-合并完替换原始数据池
  220. basic_bool := basicDataScore(source, info)
  221. if basic_bool {
  222. //已原始数据为标准-对比数据打判重标签
  223. newData, mergeArr = mergeDataFields(source, info)
  224. DM.replaceSourceData(newData, source.id) //替换
  225. if idtype == "1" {
  226. id_map["_id"] = source.id
  227. } else {
  228. id_map["_id"] = util.StringTOBsonId(source.id)
  229. }
  230. repeat_id = source.id
  231. } else {
  232. //已对比数据为标准 ,数据池的数据打判重标签
  233. newData, mergeArr = mergeDataFields(info, source)
  234. DM.replaceSourceData(newData, source.id) //替换
  235. if idtype == "1" {
  236. id_map["_id"] = info.id
  237. } else {
  238. id_map["_id"] = util.StringTOBsonId(info.id)
  239. }
  240. repeat_id = info.id
  241. }
  242. }
  243. var update_map = map[string]interface{}{
  244. "$set": map[string]interface{}{
  245. "repeat_reason": reason,
  246. "repeat": 1,
  247. "repeatid": repeat_id,
  248. },
  249. }
  250. if isMerger {
  251. if len(newData.mergemap) > 0 {
  252. update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
  253. }
  254. //更新合并后的数据
  255. for _, value := range mergeArr {
  256. if value == 1 {
  257. update_map["$set"].(map[string]interface{})["area"] = newData.area
  258. update_map["$set"].(map[string]interface{})["city"] = newData.city
  259. } else if value == 2 {
  260. update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  261. } else if value == 3 {
  262. update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  263. } else if value == 4 {
  264. update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  265. } else if value == 5 {
  266. update_map["$set"].(map[string]interface{})["budget"] = newData.budget
  267. } else if value == 6 {
  268. update_map["$set"].(map[string]interface{})["winner"] = newData.winner
  269. } else if value == 7 {
  270. update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  271. } else if value == 8 {
  272. update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  273. } else if value == 9 {
  274. update_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  275. }else {
  276. }
  277. }
  278. }
  279. //构建数据库更新用到的
  280. updateExtract = append(updateExtract, []map[string]interface{}{
  281. id_map,
  282. update_map,
  283. })
  284. }
  285. }
  286. }(tmp)
  287. if len(updateExtract) > 500 {
  288. mgo.UpdateBulk(extract, updateExtract...)
  289. updateExtract = [][]map[string]interface{}{}
  290. }
  291. tmp = make(map[string]interface{})
  292. }
  293. wg.Wait()
  294. if len(updateExtract) > 0 {
  295. mgo.UpdateBulk(extract, updateExtract...)
  296. //mgo.UpdateBulk(bidding, updateBidding...)
  297. }
  298. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  299. //任务完成,开始发送广播通知下面节点
  300. if n > repeateN && mapInfo["stop"] == nil {
  301. for _, to := range nextNode {
  302. sid, _ := mapInfo["gtid"].(string)
  303. eid, _ := mapInfo["lteid"].(string)
  304. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  305. by, _ := json.Marshal(map[string]interface{}{
  306. "gtid": sid,
  307. "lteid": eid,
  308. "stype": util.ObjToString(to["stype"]),
  309. "key": key,
  310. })
  311. addr := &net.UDPAddr{
  312. IP: net.ParseIP(to["addr"].(string)),
  313. Port: util.IntAll(to["port"]),
  314. }
  315. node := &udpNode{by, addr, time.Now().Unix(), 0}
  316. udptaskmap.Store(key, node)
  317. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  318. }
  319. }
  320. }
  321. //支持历史更新
  322. func historyTask(data []byte, mapInfo map[string]interface{}) {
  323. fmt.Println("开始取历史时间段")
  324. defer util.Catch()
  325. sess := mgo.GetMgoConn()
  326. defer mgo.DestoryMongoConn(sess)
  327. var q map[string]interface{}
  328. if idtype == "1" {
  329. q = map[string]interface{}{
  330. "_id": map[string]interface{}{
  331. "$gt": mapInfo["gtid"].(string),
  332. "$lte": mapInfo["lteid"].(string),
  333. },
  334. }
  335. } else {
  336. q = map[string]interface{}{
  337. "_id": map[string]interface{}{
  338. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  339. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  340. },
  341. }
  342. }
  343. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  344. minTime, maxTime := int64(0), int64(0)
  345. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  346. //取出最大最小时间
  347. if minTime == 0 || maxTime == 0 &&util.Int64All(tmp["publishtime"])!=0{
  348. minTime = util.Int64All(tmp["publishtime"])
  349. maxTime = util.Int64All(tmp["publishtime"])
  350. } else {
  351. t := util.Int64All(tmp["publishtime"])
  352. if t < minTime && t != 0 {
  353. minTime = t
  354. }
  355. if t > maxTime && t != 0 {
  356. maxTime = t
  357. }
  358. }
  359. }
  360. //时间不正确时
  361. if minTime==0&&maxTime==0 {
  362. log.Println("段数据区间 publishtime不符合")
  363. return
  364. }
  365. fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
  366. gtid,lteid:= util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
  367. fmt.Println(gtid,lteid)
  368. HM = NewHistorymap(gtid,lteid, minTime, maxTime)
  369. fmt.Println("开始历史数据判重")
  370. defer util.Catch()
  371. //区间id
  372. sess_history:= mgo.GetMgoConn()
  373. defer mgo.DestoryMongoConn(sess_history)
  374. var q_history map[string]interface{}
  375. if idtype == "1" {
  376. q_history = map[string]interface{}{
  377. "_id": map[string]interface{}{
  378. "$gt": mapInfo["gtid"].(string),
  379. "$lte": mapInfo["lteid"].(string),
  380. },
  381. }
  382. } else {
  383. q_history = map[string]interface{}{
  384. "_id": map[string]interface{}{
  385. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  386. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  387. },
  388. }
  389. }
  390. log.Println(mgo.DbName,extract,q_history)
  391. it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
  392. updateExtract := [][]map[string]interface{}{}
  393. log.Println("线程数:",threadNum)
  394. pool := make(chan bool, threadNum)
  395. wg := &sync.WaitGroup{}
  396. //mapLock := &sync.Mutex{}
  397. n, repeateN := 0, 0
  398. for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
  399. if n%10000 == 0 {
  400. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  401. }
  402. pool <- true
  403. wg.Add(1)
  404. go func(tmp map[string]interface{}) {
  405. defer func() {
  406. <-pool
  407. wg.Done()
  408. }()
  409. info := NewInfo(tmp)
  410. if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
  411. //mapLock.Lock()
  412. updateExtract = append(updateExtract, []map[string]interface{}{
  413. map[string]interface{}{
  414. "_id": tmp["_id"],
  415. },
  416. map[string]interface{}{
  417. "$set": map[string]interface{}{
  418. "repeat": -1,
  419. },
  420. },
  421. })
  422. if len(updateExtract) > 500 {
  423. mgo.UpdateBulk(extract, updateExtract...)
  424. updateExtract = [][]map[string]interface{}{}
  425. }
  426. //mapLock.Unlock()
  427. } else {
  428. b, source, reason := HM.checkHistory(info)
  429. if b { //有重复,生成更新语句,更新抽取和更新招标
  430. if reason == "未判重记录" {
  431. fmt.Println("未判重记录")
  432. //把info的数据判重的标签更换,并新增字段
  433. DM.replaceSourceData(info, info.id) //替换即添加
  434. updateExtract = append(updateExtract, []map[string]interface{}{
  435. map[string]interface{}{
  436. "_id": tmp["_id"],
  437. },
  438. map[string]interface{}{
  439. "$set": map[string]interface{}{
  440. "repeat": 0,
  441. "repeatid": -2,
  442. },
  443. },
  444. })
  445. } else {
  446. repeateN++
  447. var mergeArr = []int64{} //更改合并数组记录
  448. var newData = &Info{} //更换新的数据池数据
  449. var id_map = map[string]interface{}{}
  450. repeat_id := source.id
  451. if idtype == "1" {
  452. id_map["_id"] = info.id
  453. } else {
  454. id_map["_id"] = util.StringTOBsonId(info.id)
  455. }
  456. if isMerger {
  457. //需要合并相关操作-合并操作--评功权重打分-合并完替换原始数据池
  458. basic_bool := basicDataScore(source, info)
  459. if basic_bool {
  460. //已原始数据为标准-对比数据打判重标签
  461. newData, mergeArr = mergeDataFields(source, info)
  462. DM.replaceSourceData(newData, source.id) //替换
  463. if idtype == "1" {
  464. id_map["_id"] = source.id
  465. } else {
  466. id_map["_id"] = util.StringTOBsonId(source.id)
  467. }
  468. repeat_id = source.id
  469. } else {
  470. //已对比数据为标准 ,数据池的数据打判重标签
  471. newData, mergeArr = mergeDataFields(info, source)
  472. DM.replaceSourceData(newData, source.id) //替换
  473. if idtype == "1" {
  474. id_map["_id"] = info.id
  475. } else {
  476. id_map["_id"] = util.StringTOBsonId(info.id)
  477. }
  478. repeat_id = info.id
  479. }
  480. }
  481. var update_map = map[string]interface{}{
  482. "$set": map[string]interface{}{
  483. "repeat_reason": reason,
  484. "repeat": 1,
  485. "repeatid": repeat_id,
  486. },
  487. }
  488. if isMerger {
  489. //合并记录
  490. if len(newData.mergemap) > 0 {
  491. update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
  492. }
  493. //更新合并后的数据
  494. for _, value := range mergeArr {
  495. if value == 1 {
  496. update_map["$set"].(map[string]interface{})["area"] = newData.area
  497. update_map["$set"].(map[string]interface{})["city"] = newData.city
  498. } else if value == 2 {
  499. update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  500. } else if value == 3 {
  501. update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  502. } else if value == 4 {
  503. update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  504. } else if value == 5 {
  505. update_map["$set"].(map[string]interface{})["budget"] = newData.budget
  506. } else if value == 6 {
  507. update_map["$set"].(map[string]interface{})["winner"] = newData.winner
  508. } else if value == 7 {
  509. update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  510. } else if value == 8 {
  511. update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  512. } else if value == 9 {
  513. update_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  514. }else {
  515. }
  516. }
  517. }
  518. //构建数据库更新用到的
  519. updateExtract = append(updateExtract, []map[string]interface{}{
  520. id_map,
  521. update_map,
  522. })
  523. }
  524. }
  525. }
  526. }(tmp)
  527. if len(updateExtract) > 500 {
  528. mgo.UpdateBulk(extract, updateExtract...)
  529. updateExtract = [][]map[string]interface{}{}
  530. }
  531. tmp = make(map[string]interface{})
  532. }
  533. wg.Wait()
  534. if len(updateExtract) > 0 {
  535. mgo.UpdateBulk(extract, updateExtract...)
  536. //mgo.UpdateBulk(bidding, updateBidding...)
  537. }
  538. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  539. //任务完成,开始发送广播通知下面节点
  540. if n > repeateN && mapInfo["stop"] == nil {
  541. for _, to := range nextNode {
  542. sid, _ := mapInfo["gtid"].(string)
  543. eid, _ := mapInfo["lteid"].(string)
  544. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  545. by, _ := json.Marshal(map[string]interface{}{
  546. "gtid": sid,
  547. "lteid": eid,
  548. "stype": util.ObjToString(to["stype"]),
  549. "key": key,
  550. })
  551. addr := &net.UDPAddr{
  552. IP: net.ParseIP(to["addr"].(string)),
  553. Port: util.IntAll(to["port"]),
  554. }
  555. node := &udpNode{by, addr, time.Now().Unix(), 0}
  556. udptaskmap.Store(key, node)
  557. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  558. }
  559. }
  560. }
  561. //合并字段
  562. func mergeDataFields(source *Info, info *Info) (*Info, []int64) {
  563. var mergeArr []int64
  564. mergeArr = make([]int64, 0)
  565. //1、城市
  566. if (source.area == "" || source.area == "全国") && info.area != "全国" && info.area != "" {
  567. var arrA []string
  568. if source.mergemap["area"] == nil {
  569. arrA = make([]string, 0)
  570. } else {
  571. arrA = source.mergemap["area"].([]string)
  572. }
  573. arrA = append(arrA, source.area)
  574. source.mergemap["area"] = arrA
  575. var arrC []string
  576. if source.mergemap["city"] == nil {
  577. arrC = make([]string, 0)
  578. } else {
  579. arrC = source.mergemap["city"].([]string)
  580. }
  581. arrC = append(arrC, source.city)
  582. source.mergemap["city"] = arrC
  583. source.area = info.area
  584. source.city = info.city
  585. mergeArr = append(mergeArr, 1)
  586. }
  587. //2、项目名称
  588. if source.projectname == "" && info.projectname != "" {
  589. var arr []string
  590. if source.mergemap["projectname"] == nil {
  591. arr = make([]string, 0)
  592. } else {
  593. arr = source.mergemap["projectname"].([]string)
  594. }
  595. arr = append(arr, source.projectname)
  596. source.mergemap["projectname"] = arr
  597. source.projectname = info.projectname
  598. mergeArr = append(mergeArr, 2)
  599. }
  600. //3、项目编号
  601. if source.projectcode == "" && info.projectcode != "" {
  602. var arr []string
  603. if source.mergemap["projectcode"] == nil {
  604. arr = make([]string, 0)
  605. } else {
  606. arr = source.mergemap["projectcode"].([]string)
  607. }
  608. arr = append(arr, source.projectcode)
  609. source.mergemap["projectcode"] = arr
  610. source.projectcode = info.projectcode
  611. mergeArr = append(mergeArr, 3)
  612. }
  613. //4、采购单位
  614. if source.buyer == "" && info.buyer != "" {
  615. var arr []string
  616. if source.mergemap["buyer"] == nil {
  617. arr = make([]string, 0)
  618. } else {
  619. arr = source.mergemap["buyer"].([]string)
  620. }
  621. arr = append(arr, source.buyer)
  622. source.mergemap["buyer"] = arr
  623. source.buyer = info.buyer
  624. mergeArr = append(mergeArr, 4)
  625. }
  626. //5、预算
  627. if source.budget == 0 && info.budget != 0 {
  628. var arr []float64
  629. if source.mergemap["budget"] == nil {
  630. arr = make([]float64, 0)
  631. } else {
  632. arr = source.mergemap["budget"].([]float64)
  633. }
  634. arr = append(arr, source.budget)
  635. source.mergemap["budget"] = arr
  636. source.budget = info.budget
  637. mergeArr = append(mergeArr, 5)
  638. }
  639. //6、中标单位
  640. if source.winner == "" && info.winner != "" {
  641. var arr []string
  642. if source.mergemap["winner"] == nil {
  643. arr = make([]string, 0)
  644. } else {
  645. arr = source.mergemap["winner"].([]string)
  646. }
  647. arr = append(arr, source.winner)
  648. source.mergemap["winner"] = arr
  649. source.winner = info.winner
  650. mergeArr = append(mergeArr, 6)
  651. }
  652. //7、中标金额
  653. if source.bidamount == 0 && info.bidamount != 0 {
  654. var arr []float64
  655. if source.mergemap["bidamount"] == nil {
  656. arr = make([]float64, 0)
  657. } else {
  658. arr = source.mergemap["bidamount"].([]float64)
  659. }
  660. arr = append(arr, source.bidamount)
  661. source.mergemap["bidamount"] = arr
  662. source.bidamount = info.bidamount
  663. mergeArr = append(mergeArr, 7)
  664. }
  665. //8、开标时间-地点
  666. if source.bidopentime == 0 && info.bidopentime != 0 {
  667. var arr []int64
  668. if source.mergemap["bidopentime"] == nil {
  669. arr = make([]int64, 0)
  670. } else {
  671. arr = source.mergemap["bidopentime"].([]int64)
  672. }
  673. arr = append(arr, source.bidopentime)
  674. source.mergemap["bidopentime"] = arr
  675. source.bidopentime = info.bidopentime
  676. mergeArr = append(mergeArr, 8)
  677. }
  678. //9、合同编号
  679. if source.contractnumber == "" && info.contractnumber != "" {
  680. var arr []string
  681. if source.mergemap["contractnumber"] == nil {
  682. arr = make([]string, 0)
  683. } else {
  684. arr = source.mergemap["contractnumber"].([]string)
  685. }
  686. arr = append(arr, source.contractnumber)
  687. source.mergemap["contractnumber"] = arr
  688. source.contractnumber = info.contractnumber
  689. mergeArr = append(mergeArr, 9)
  690. }
  691. //以上合并过于简单,待进一步优化
  692. return source, mergeArr
  693. }
  694. //权重评估
  695. func basicDataScore(v *Info, info *Info) bool {
  696. /*
  697. 权重评估
  698. 网站优先级判定规则:
  699. 1、中央>省>市>县区
  700. 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
  701. */
  702. v_score, info_score := -1, -1
  703. dict_v := SiteMap[v.site]
  704. dict_info := SiteMap[info.site]
  705. //先判断level
  706. if dict_v != nil {
  707. v_level := util.ObjToString(dict_v["level"])
  708. if v_level == "中央" {
  709. v_score = 4
  710. } else if v_level == "省级" {
  711. v_score = 3
  712. } else if v_level == "市级" {
  713. v_score = 2
  714. } else if v_level == "县区" {
  715. v_score = 1
  716. } else if v_level == "" {
  717. } else {
  718. v_score = 0
  719. }
  720. }
  721. if dict_info != nil {
  722. info_level := util.ObjToString(dict_info["level"])
  723. if info_level == "中央" {
  724. info_score = 4
  725. } else if info_level == "省级" {
  726. info_score = 3
  727. } else if info_level == "市级" {
  728. info_score = 2
  729. } else if info_level == "县区" {
  730. info_score = 1
  731. } else if info_level == "" {
  732. } else {
  733. v_score = 0
  734. }
  735. }
  736. if v_score > info_score {
  737. return true
  738. }
  739. if v_score < info_score {
  740. return false
  741. }
  742. //判断sitetype
  743. if dict_v != nil {
  744. v_sitetype := util.ObjToString(dict_v["sitetype"])
  745. if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
  746. v_score = 4
  747. } else if v_sitetype == "公共资源" {
  748. v_score = 3
  749. } else if v_sitetype == "官方网站" {
  750. v_score = 2
  751. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  752. v_score = 1
  753. } else if v_sitetype == "" {
  754. } else {
  755. v_score = 0
  756. }
  757. }
  758. if dict_info != nil {
  759. info_sitetype := util.ObjToString(dict_info["sitetype"])
  760. if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
  761. info_score = 4
  762. } else if info_sitetype == "公共资源" {
  763. info_score = 3
  764. } else if info_sitetype == "官方网站" {
  765. info_score = 2
  766. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  767. info_score = 1
  768. } else if info_sitetype == "" {
  769. } else {
  770. info_score = 0
  771. }
  772. }
  773. if v_score > info_score {
  774. return true
  775. }
  776. if v_score < info_score {
  777. return false
  778. }
  779. //网站评估
  780. m, n := 0, 0
  781. if v.projectname != "" {
  782. m++
  783. }
  784. if v.buyer != "" {
  785. m++
  786. }
  787. if v.projectcode != ""||v.contractnumber != "" {
  788. m++
  789. }
  790. if v.budget != 0 {
  791. m++
  792. }
  793. if v.bidamount != 0 {
  794. m++
  795. }
  796. if v.winner != "" {
  797. m++
  798. }
  799. if v.bidopentime != 0 {
  800. m++
  801. }
  802. if v.agencyaddr != "" {
  803. m++
  804. }
  805. if v.agency != "" {
  806. m = m + 2
  807. }
  808. if v.city != "" {
  809. m = m + 2
  810. }
  811. if info.projectname != "" {
  812. n++
  813. }
  814. if info.buyer != "" {
  815. n++
  816. }
  817. if info.projectcode != "" || info.contractnumber != ""{
  818. n++
  819. }
  820. if info.budget != 0 {
  821. n++
  822. }
  823. if info.bidamount != 0 {
  824. n++
  825. }
  826. if info.winner != "" {
  827. n++
  828. }
  829. if info.bidopentime != 0 {
  830. n++
  831. }
  832. if info.agencyaddr != "" {
  833. n++
  834. }
  835. if info.agency != "" {
  836. n = n + 2
  837. }
  838. if info.city != "" {
  839. n = n + 2
  840. }
  841. if m > n {
  842. return true
  843. } else if m == n {
  844. if v.publishtime >= info.publishtime {
  845. return true
  846. } else {
  847. return false
  848. }
  849. } else {
  850. return false
  851. }
  852. }
  853. //无效数据
  854. func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  855. var n int
  856. if d1 != "" {
  857. n++
  858. }
  859. if d2 != "" {
  860. n++
  861. }
  862. if d3 != "" {
  863. n++
  864. }
  865. if d4 != "" {
  866. n++
  867. }
  868. if n == 0 {
  869. return true
  870. }
  871. return false
  872. }