main.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "log"
  11. mu "mfw/util"
  12. "net"
  13. "os"
  14. "qfw/util"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. "gopkg.in/mgo.v2/bson"
  20. )
  21. var (
  22. Sysconfig map[string]interface{} //配置文件
  23. mconf map[string]interface{} //mongodb配置信息
  24. mgo *MongodbSim //mongodb操作对象
  25. extract string
  26. extract_back string
  27. udpclient mu.UdpClient //udp对象
  28. nextNode []map[string]interface{} //下节点数组
  29. dupdays = 5 //初始化判重范围
  30. DM *datamap //
  31. //正则筛选相关
  32. FilterRegTitle = regexp.MustCompile("^_$")
  33. FilterRegTitle_0 = regexp.MustCompile("^_$")
  34. FilterRegTitle_1 = regexp.MustCompile("^_$")
  35. FilterRegTitle_2 = regexp.MustCompile("^_$")
  36. isMerger bool //是否合并
  37. Is_Sort bool //是否排序
  38. threadNum int //线程数量
  39. SiteMap map[string]map[string]interface{} //站点map
  40. LowHeavy bool //低质量数据判重
  41. TimingTask bool //是否定时任务
  42. timingSpanDay int64 //时间跨度
  43. timingPubScope int64 //发布时间周期
  44. sid,eid,lastid string //测试人员判重使用
  45. IdType bool //默认object类型
  46. )
  47. func init() {
  48. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  49. flag.StringVar(&sid, "sid", "", "开始id")
  50. flag.StringVar(&eid, "eid", "", "结束id")
  51. flag.Parse()
  52. //172.17.145.163:27080
  53. util.ReadConfig(&Sysconfig)
  54. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  55. mconf = Sysconfig["mongodb"].(map[string]interface{})
  56. mgo = &MongodbSim{
  57. MongodbAddr: mconf["addr"].(string),
  58. DbName: mconf["db"].(string),
  59. Size: util.IntAllDef(mconf["pool"], 10),
  60. }
  61. mgo.InitPool()
  62. extract = mconf["extract"].(string)
  63. extract_back = mconf["extract_back"].(string)
  64. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  65. //加载数据
  66. DM = NewDatamap(dupdays, lastid)
  67. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  68. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  69. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  70. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  71. isMerger = Sysconfig["isMerger"].(bool)
  72. Is_Sort = Sysconfig["isSort"].(bool)
  73. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  74. LowHeavy = Sysconfig["lowHeavy"].(bool)
  75. TimingTask = Sysconfig["timingTask"].(bool)
  76. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  77. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  78. //站点配置
  79. site := mconf["site"].(map[string]interface{})
  80. SiteMap = make(map[string]map[string]interface{}, 0)
  81. start := int(time.Now().Unix())
  82. sess_site := mgo.GetMgoConn()
  83. defer mgo.DestoryMongoConn(sess_site)
  84. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  85. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  86. data_map := map[string]interface{}{
  87. "area": util.ObjToString(site_dict["area"]),
  88. "city": util.ObjToString(site_dict["city"]),
  89. "district": util.ObjToString(site_dict["district"]),
  90. "sitetype": util.ObjToString(site_dict["sitetype"]),
  91. "level": util.ObjToString(site_dict["level"]),
  92. "weight": util.ObjToString(site_dict["weight"]),
  93. }
  94. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  95. }
  96. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  97. }
  98. func main() {
  99. go checkMapJob()
  100. updport := Sysconfig["udpport"].(string)
  101. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  102. udpclient.Listen(processUdpMsg)
  103. log.Println("Udp服务监听", updport)
  104. if TimingTask {
  105. go timedTaskDay()
  106. }
  107. time.Sleep(99999 * time.Hour)
  108. }
  109. //测试组人员使用
  110. func mainT() {
  111. if TimingTask {
  112. log.Println("定时任务测试开始")
  113. go timedTaskDay()
  114. time.Sleep(99999 * time.Hour)
  115. } else {
  116. //2019年8月1日-8月17日 712646
  117. /*
  118. sid = "5d55031fa5cb26b9b7f57570"
  119. eid = "5e8c02b150b5ea296eed4509"
  120. 5e933b1a50b5ea296ef0e839
  121. */
  122. //IdType = true
  123. sid = "5ece4b1b9e628c59915eb257"
  124. eid = "5ed55b6d9e628c599161977c"
  125. log.Println("正常判重测试开始")
  126. log.Println(sid, "---", eid)
  127. mapinfo := map[string]interface{}{}
  128. if sid == "" || eid == "" {
  129. log.Println("sid,eid参数不能为空")
  130. os.Exit(0)
  131. }
  132. mapinfo["gtid"] = sid
  133. mapinfo["lteid"] = eid
  134. mapinfo["stop"] = "true"
  135. task([]byte{}, mapinfo)
  136. time.Sleep(99999 * time.Hour)
  137. }
  138. }
  139. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  140. fmt.Println("接受的段数据")
  141. switch act {
  142. case mu.OP_TYPE_DATA: //上个节点的数据
  143. //从表中开始处理
  144. var mapInfo map[string]interface{}
  145. err := json.Unmarshal(data, &mapInfo)
  146. log.Println("err:", err, "mapInfo:", mapInfo)
  147. if err != nil {
  148. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  149. } else if mapInfo != nil {
  150. taskType := util.ObjToString(mapInfo["stype"])
  151. if taskType == "normalTask" {
  152. //判重流程
  153. go task(data, mapInfo)
  154. } else {
  155. //其他
  156. go task(data, mapInfo)
  157. }
  158. key, _ := mapInfo["key"].(string)
  159. if key == "" {
  160. key = "udpok"
  161. }
  162. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  163. }
  164. case mu.OP_NOOP: //下个节点回应
  165. ok := string(data)
  166. if ok != "" {
  167. log.Println("ok:", ok)
  168. udptaskmap.Delete(ok)
  169. }
  170. }
  171. }
  172. //开始判重程序
  173. func task(data []byte, mapInfo map[string]interface{}) {
  174. log.Println("开始数据判重")
  175. defer util.Catch()
  176. //区间id
  177. q := map[string]interface{}{
  178. "_id": map[string]interface{}{
  179. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  180. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  181. },
  182. }
  183. if IdType {
  184. q = map[string]interface{}{
  185. "_id": map[string]interface{}{
  186. "$gt": mapInfo["gtid"].(string),
  187. "$lte": mapInfo["lteid"].(string),
  188. },
  189. }
  190. }
  191. log.Println(mgo.DbName, extract, q)
  192. sess := mgo.GetMgoConn()
  193. defer mgo.DestoryMongoConn(sess)
  194. //是否排序
  195. sortName :="_id"
  196. if Is_Sort {
  197. sortName = "publishtime"
  198. log.Println("排序")
  199. }
  200. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort(sortName).Iter()
  201. updateExtract := [][]map[string]interface{}{}
  202. ids:=[]string{}
  203. log.Println("线程数:", threadNum)
  204. pool := make(chan bool, threadNum)
  205. wg := &sync.WaitGroup{}
  206. n, repeateN := 0, 0
  207. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  208. if n%10000 == 0 {
  209. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  210. }
  211. source := util.ObjToMap(tmp["jsondata"])
  212. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  213. repeateN++
  214. updateExtract = append(updateExtract, []map[string]interface{}{
  215. map[string]interface{}{
  216. "_id": tmp["_id"],
  217. },
  218. map[string]interface{}{
  219. "$set": map[string]interface{}{
  220. "repeat": 1,
  221. "repeat_reason": "sourcewebsite为1,重复",
  222. },
  223. },
  224. })
  225. if len(updateExtract) >= 200 {
  226. mgo.UpSertBulk(extract, updateExtract...)
  227. updateExtract = [][]map[string]interface{}{}
  228. }
  229. tmp = make(map[string]interface{})
  230. continue
  231. }
  232. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
  233. util.IntAll(tmp["dataging"]) == 1 {
  234. if util.IntAll(tmp["repeat"]) == 1 {
  235. repeateN++
  236. }
  237. tmp = make(map[string]interface{})
  238. continue
  239. }
  240. pool <- true
  241. wg.Add(1)
  242. go func(tmp map[string]interface{}) {
  243. defer func() {
  244. <-pool
  245. wg.Done()
  246. }()
  247. info := NewInfo(tmp)
  248. if !LowHeavy { //是否进行低质量数据判重
  249. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  250. updateExtract = append(updateExtract, []map[string]interface{}{
  251. map[string]interface{}{
  252. "_id": tmp["_id"],
  253. },
  254. map[string]interface{}{
  255. "$set": map[string]interface{}{
  256. "repeat": -1, //无效数据标签
  257. },
  258. },
  259. })
  260. if len(updateExtract) >= 200 {
  261. mgo.UpSertBulk(extract, updateExtract...)
  262. updateExtract = [][]map[string]interface{}{}
  263. }
  264. return
  265. }
  266. }
  267. b, source, reason := DM.check(info)
  268. if b { //有重复,生成更新语句,更新抽取和更新招标
  269. repeateN++
  270. var is_replace = false
  271. var mergeArr = []int64{} //更改合并数组记录
  272. var newData = &Info{} //更换新的数据池数据
  273. var repeat_idMap = map[string]interface{}{} //记录判重的
  274. var merge_idMap = map[string]interface{}{} //记录合并的
  275. repeat_idMap["_id"] = StringTOBsonId(info.id)
  276. if IdType {
  277. repeat_idMap["_id"] = info.id
  278. }
  279. merge_idMap["_id"] = StringTOBsonId(source.id)
  280. repeat_id := source.id //初始化一个数据
  281. if isMerger { //合并相关
  282. basic_bool := basicDataScore(source, info)
  283. if basic_bool {
  284. //已原始数据为标准 - 对比数据打判重标签-
  285. newData, mergeArr, is_replace = mergeDataFields(source, info)
  286. DM.replaceSourceData(newData, source) //替换
  287. //对比数据打重复标签的id,原始数据id的记录
  288. repeat_idMap["_id"] = StringTOBsonId(info.id)
  289. merge_idMap["_id"] = StringTOBsonId(source.id)
  290. if IdType {
  291. repeat_idMap["_id"] = info.id
  292. merge_idMap["_id"] = source.id
  293. }
  294. repeat_id = source.id
  295. } else {
  296. //已对比数据为标准 ,数据池的数据打判重标签
  297. newData, mergeArr, is_replace = mergeDataFields(info, source)
  298. DM.replaceSourceData(newData, source) //替换
  299. //原始数据打重复标签的id, 对比数据id的记录
  300. repeat_idMap["_id"] = StringTOBsonId(source.id)
  301. merge_idMap["_id"] = StringTOBsonId(info.id)
  302. if IdType {
  303. repeat_idMap["_id"] = source.id
  304. merge_idMap["_id"] = info.id
  305. }
  306. repeat_id = info.id
  307. }
  308. merge_map := make(map[string]interface{}, 0)
  309. if is_replace { //有过合并-更新数据
  310. merge_map = map[string]interface{}{
  311. "$set": map[string]interface{}{
  312. "merge": newData.mergemap,
  313. },
  314. }
  315. //更新合并后的数据
  316. for _, value := range mergeArr {
  317. if value == 0 {
  318. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  319. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  320. } else if value == 1 {
  321. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  322. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  323. } else if value == 2 {
  324. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  325. } else if value == 3 {
  326. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  327. } else if value == 4 {
  328. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  329. } else if value == 5 {
  330. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  331. } else if value == 6 {
  332. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  333. } else if value == 7 {
  334. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  335. } else if value == 8 {
  336. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  337. } else if value == 9 {
  338. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  339. } else if value == 10 {
  340. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  341. } else if value == 11 {
  342. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  343. } else {
  344. }
  345. }
  346. //模板数据更新
  347. updateExtract = append(updateExtract, []map[string]interface{}{
  348. merge_idMap,
  349. merge_map,
  350. })
  351. }
  352. } else { //高质量数据
  353. basic_bool := basicDataScore(source, info)
  354. if !basic_bool {
  355. DM.replaceSourceData(info, source) //替换
  356. repeat_idMap["_id"] = StringTOBsonId(source.id)
  357. if IdType {
  358. repeat_idMap["_id"] = source.id
  359. }
  360. repeat_id = info.id
  361. if len(ids)>=9 {
  362. ids=append(ids,source.id)
  363. for _, to := range nextNode {
  364. key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
  365. by, _ := json.Marshal(map[string]interface{}{
  366. "gtid": source.id,
  367. "lteid": source.id,
  368. "stype": util.ObjToString(to["stype"]),
  369. "key": key,
  370. "ids": strings.Join(ids, ","),
  371. })
  372. addr := &net.UDPAddr{
  373. IP: net.ParseIP(to["addr"].(string)),
  374. Port: util.IntAll(to["port"]),
  375. }
  376. node := &udpNode{by, addr, time.Now().Unix(), 0}
  377. udptaskmap.Store(key, node)
  378. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  379. }
  380. //
  381. ids = []string{}
  382. }else {
  383. ids=append(ids,source.id)
  384. }
  385. }
  386. }
  387. //if repeateN%150==0&&repeateN>0 {
  388. // fmt.Println("最终结果","目标id:",repeat_idMap["_id"])
  389. //}
  390. //重复数据打标签
  391. updateExtract = append(updateExtract, []map[string]interface{}{
  392. repeat_idMap,
  393. map[string]interface{}{
  394. "$set": map[string]interface{}{
  395. "repeat": 1,
  396. "repeat_reason": reason,
  397. "repeat_id": repeat_id,
  398. },
  399. },
  400. })
  401. }
  402. }(tmp)
  403. if len(updateExtract) >= 200 {
  404. mgo.UpSertBulk(extract, updateExtract...)
  405. updateExtract = [][]map[string]interface{}{}
  406. }
  407. tmp = make(map[string]interface{})
  408. }
  409. wg.Wait()
  410. if len(updateExtract) > 0 {
  411. mgo.UpSertBulk(extract, updateExtract...)
  412. }
  413. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  414. //任务完成,开始发送广播通知下面节点
  415. if n > repeateN && mapInfo["stop"] == nil {
  416. log.Println("判重任务完成发送udp")
  417. for _, to := range nextNode {
  418. sid, _ := mapInfo["gtid"].(string)
  419. eid, _ := mapInfo["lteid"].(string)
  420. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  421. by, _ := json.Marshal(map[string]interface{}{
  422. "gtid": sid,
  423. "lteid": eid,
  424. "stype": util.ObjToString(to["stype"]),
  425. "key": key,
  426. "ids": strings.Join(ids, ","),
  427. })
  428. addr := &net.UDPAddr{
  429. IP: net.ParseIP(to["addr"].(string)),
  430. Port: util.IntAll(to["port"]),
  431. }
  432. node := &udpNode{by, addr, time.Now().Unix(), 0}
  433. udptaskmap.Store(key, node)
  434. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  435. }
  436. }
  437. }
  438. //定时任务
  439. func timedTaskDay() {
  440. log.Println("部署定时任务")
  441. c := cron.New()
  442. c.AddFunc("0 0 1 * * ?", func() { movedata() }) //每天凌晨1点执行一次
  443. c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() }) //每天凌晨2点执行一次
  444. c.Start()
  445. //timedTaskOnce()
  446. }
  447. func timedTaskOnce() {
  448. log.Println("开始一次定时任务")
  449. defer util.Catch()
  450. //当前时间-8 -4 小时
  451. now := time.Now()
  452. log.Println(now)
  453. preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
  454. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
  455. log.Println(preTime,curTime)
  456. task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  457. task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  458. between_time := curTime.Unix() - (86400 * timingPubScope)
  459. log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
  460. //区间id
  461. q_start := map[string]interface{}{
  462. "_id": map[string]interface{}{
  463. "$gte": StringTOBsonId(task_sid),
  464. "$lte": StringTOBsonId(task_eid),
  465. },
  466. }
  467. sess := mgo.GetMgoConn()
  468. defer mgo.DestoryMongoConn(sess)
  469. it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
  470. num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
  471. updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
  472. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  473. dayArr := []map[string]interface{}{}
  474. for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
  475. if num%10000 == 0 {
  476. log.Println("正序遍历:", num)
  477. }
  478. source := util.ObjToMap(tmp["jsondata"])
  479. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  480. updateExtract = append(updateExtract, []map[string]interface{}{
  481. map[string]interface{}{
  482. "_id": tmp["_id"],
  483. },
  484. map[string]interface{}{
  485. "$set": map[string]interface{}{
  486. "repeat": 1,
  487. "dataging": 0,
  488. "repeat_reason": "sourcewebsite为1 重复",
  489. },
  490. },
  491. })
  492. if len(updateExtract) > 50 {
  493. mgo.UpSertBulk(extract, updateExtract...)
  494. updateExtract = [][]map[string]interface{}{}
  495. }
  496. tmp = make(map[string]interface{})
  497. continue
  498. }
  499. //取-符合-发布时间X年内的数据
  500. if util.IntAll(tmp["dataging"]) == 1 {
  501. pubtime := util.Int64All(tmp["publishtime"])
  502. if pubtime > 0 && pubtime >= between_time {
  503. oknum++
  504. if deterTime==0 {
  505. log.Println("找到第一条符合条件的数据")
  506. deterTime = util.Int64All(tmp["publishtime"])
  507. dayArr = append(dayArr,tmp)
  508. }else {
  509. if pubtime-deterTime >timingSpanDay*86400 {
  510. //新数组重新构建,当前组数据加到全部组数据
  511. pendAllArr = append(pendAllArr,dayArr)
  512. dayArr = []map[string]interface{}{}
  513. deterTime = util.Int64All(tmp["publishtime"])
  514. dayArr = append(dayArr,tmp)
  515. }else {
  516. dayArr = append(dayArr,tmp)
  517. }
  518. }
  519. }else {
  520. //不在两年内的也清标记
  521. updateExtract = append(updateExtract, []map[string]interface{}{
  522. map[string]interface{}{
  523. "_id": tmp["_id"],
  524. },
  525. map[string]interface{}{
  526. "$set": map[string]interface{}{
  527. "dataging": 0,
  528. },
  529. },
  530. })
  531. if len(updateExtract) > 50 {
  532. mgo.UpSertBulk(extract, updateExtract...)
  533. updateExtract = [][]map[string]interface{}{}
  534. }
  535. }
  536. }
  537. tmp = make(map[string]interface{})
  538. }
  539. //批量更新标记
  540. if len(updateExtract) > 0 {
  541. mgo.UpSertBulk(extract, updateExtract...)
  542. updateExtract = [][]map[string]interface{}{}
  543. }
  544. if len(dayArr)>0 {
  545. pendAllArr = append(pendAllArr,dayArr)
  546. dayArr = []map[string]interface{}{}
  547. }
  548. log.Println("查询数量:",num,"符合条件:",oknum)
  549. if len(pendAllArr) <= 0 {
  550. log.Println("没找到dataging==1的数据")
  551. return
  552. }
  553. //测试分组数量是否正确
  554. testNum:=0
  555. for k,v:=range pendAllArr {
  556. log.Println("第",k,"组--","数量:",len(v))
  557. testNum = testNum+len(v)
  558. }
  559. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  560. n, repeateN := 0, 0
  561. for k,v:=range pendAllArr { //每组结束更新一波数据
  562. //构建当前组的数据池
  563. log.Println("构建第",k,"组---(数据池)")
  564. //当前组的第一个发布时间
  565. first_pt :=util.Int64All(v[0]["publishtime"])
  566. coll :=extract_back
  567. if isTaskTimeCycle(first_pt) {
  568. coll = extract
  569. }
  570. DM = TimedTaskDatamap(dupdays, first_pt,coll)
  571. log.Println("开始遍历判重第",k,"组 共计数量:",len(v))
  572. n = n+len(v)
  573. log.Println("统计目前总数量:",n,"重复数量:",repeateN)
  574. for _,tmp:=range v {
  575. info := NewInfo(tmp)
  576. if !LowHeavy { //是否进行低质量数据判重
  577. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  578. log.Println("无效数据")
  579. updateExtract = append(updateExtract, []map[string]interface{}{
  580. map[string]interface{}{
  581. "_id": tmp["_id"],
  582. },
  583. map[string]interface{}{
  584. "$set": map[string]interface{}{
  585. "repeat": -1, //无效数据标签
  586. "dataging": 0,
  587. },
  588. },
  589. })
  590. if len(updateExtract) > 50 {
  591. mgo.UpSertBulk(extract, updateExtract...)
  592. updateExtract = [][]map[string]interface{}{}
  593. }
  594. continue
  595. }
  596. }
  597. b, source, reason := DM.check(info)
  598. if b { //有重复,生成更新语句,更新抽取和更新招标
  599. log.Println("判重结果", b, reason,"目标id",info.id)
  600. repeateN++
  601. //重复数据打标签
  602. updateExtract = append(updateExtract, []map[string]interface{}{
  603. map[string]interface{}{
  604. "_id": tmp["_id"],
  605. },
  606. map[string]interface{}{
  607. "$set": map[string]interface{}{
  608. "repeat": 1,
  609. "repeat_reason": reason,
  610. "repeat_id": source.id,
  611. "dataging": 0,
  612. },
  613. },
  614. })
  615. }else {
  616. updateExtract = append(updateExtract, []map[string]interface{}{
  617. map[string]interface{}{
  618. "_id": tmp["_id"],
  619. },
  620. map[string]interface{}{
  621. "$set": map[string]interface{}{
  622. "dataging": 0,//符合条件的都为dataging==0
  623. },
  624. },
  625. })
  626. }
  627. if len(updateExtract) > 50 {
  628. mgo.UpSertBulk(extract, updateExtract...)
  629. updateExtract = [][]map[string]interface{}{}
  630. }
  631. }
  632. //每组数据结束-更新数据
  633. if len(updateExtract) > 0 {
  634. mgo.UpSertBulk(extract, updateExtract...)
  635. updateExtract = [][]map[string]interface{}{}
  636. }
  637. }
  638. if len(updateExtract) > 0 {
  639. mgo.UpSertBulk(extract, updateExtract...)
  640. updateExtract = [][]map[string]interface{}{}
  641. }
  642. log.Println("this timeTask over.", n, "repeateN:", repeateN)
  643. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  644. if n > repeateN {
  645. for _, to := range nextNode {
  646. next_sid := util.BsonIdToSId(task_sid)
  647. next_eid := util.BsonIdToSId(task_eid)
  648. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  649. by, _ := json.Marshal(map[string]interface{}{
  650. "gtid": next_sid,
  651. "lteid": next_eid,
  652. "stype": util.ObjToString(to["stype"]),
  653. "key": key,
  654. })
  655. addr := &net.UDPAddr{
  656. IP: net.ParseIP(to["addr"].(string)),
  657. Port: util.IntAll(to["port"]),
  658. }
  659. node := &udpNode{by, addr, time.Now().Unix(), 0}
  660. udptaskmap.Store(key, node)
  661. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  662. }
  663. }
  664. }
  665. //判断是否在周期天内
  666. func isTaskTimeCycle(pt int64) bool {
  667. year, month, day := time.Now().Date()
  668. predur_pt:=time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix()
  669. log.Println(predur_pt)
  670. if pt >= predur_pt {
  671. return true
  672. }else {
  673. return false
  674. }
  675. }
  676. //合并字段-并更新merge字段的值
  677. func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
  678. merge_recordMap := make(map[string]interface{}, 0)
  679. mergeArr := make([]int64, 0)
  680. //是否替换数据了-记录原始的数据
  681. is_replace := false
  682. //1、城市
  683. if source.area == "" || source.area == "全国" {
  684. //为空
  685. if info.area != "全国" && info.area != "" {
  686. merge_recordMap["area"] = source.area
  687. merge_recordMap["city"] = source.city
  688. source.area = info.area
  689. source.city = info.city
  690. mergeArr = append(mergeArr, 1)
  691. is_replace = true
  692. }
  693. } else {
  694. //不为空-查看站点相关-有值必替换
  695. if source.is_site {
  696. //是站点替换的城市
  697. merge_recordMap["site_area"] = source.area
  698. merge_recordMap["site_city"] = source.city
  699. mergeArr = append(mergeArr, 0)
  700. is_replace = true
  701. source.is_site = false
  702. }
  703. }
  704. //2、项目名称
  705. if source.projectname == "" && info.projectname != "" {
  706. merge_recordMap["projectname"] = source.projectname
  707. source.projectname = info.projectname
  708. mergeArr = append(mergeArr, 2)
  709. is_replace = true
  710. }
  711. //3、项目编号
  712. if source.projectcode == "" && info.projectcode != "" {
  713. merge_recordMap["projectcode"] = source.projectcode
  714. source.projectcode = info.projectcode
  715. mergeArr = append(mergeArr, 3)
  716. is_replace = true
  717. }
  718. //4、采购单位
  719. if source.buyer == "" && info.buyer != "" {
  720. merge_recordMap["buyer"] = source.buyer
  721. source.buyer = info.buyer
  722. mergeArr = append(mergeArr, 4)
  723. is_replace = true
  724. }
  725. //5、预算
  726. if source.budget == 0 && info.budget != 0 {
  727. merge_recordMap["budget"] = source.budget
  728. source.budget = info.budget
  729. mergeArr = append(mergeArr, 5)
  730. is_replace = true
  731. }
  732. //6、中标单位
  733. if source.winner == "" && info.winner != "" {
  734. merge_recordMap["winner"] = source.winner
  735. source.winner = info.winner
  736. mergeArr = append(mergeArr, 6)
  737. is_replace = true
  738. }
  739. //7、中标金额
  740. if source.bidamount == 0 && info.bidamount != 0 {
  741. merge_recordMap["bidamount"] = source.bidamount
  742. source.bidamount = info.bidamount
  743. mergeArr = append(mergeArr, 7)
  744. is_replace = true
  745. }
  746. //8、开标时间-地点
  747. if source.bidopentime == 0 && info.bidopentime != 0 {
  748. merge_recordMap["bidopentime"] = source.bidopentime
  749. source.bidopentime = info.bidopentime
  750. mergeArr = append(mergeArr, 8)
  751. is_replace = true
  752. }
  753. //9、合同编号
  754. if source.contractnumber == "" && info.contractnumber != "" {
  755. merge_recordMap["contractnumber"] = source.contractnumber
  756. source.contractnumber = info.contractnumber
  757. mergeArr = append(mergeArr, 9)
  758. is_replace = true
  759. }
  760. //10、发布时间
  761. if source.publishtime == 0 && info.publishtime != 0 {
  762. merge_recordMap["publishtime"] = source.publishtime
  763. source.publishtime = info.publishtime
  764. mergeArr = append(mergeArr, 10)
  765. is_replace = true
  766. }
  767. //11、代理机构
  768. if source.agency == "" && info.agency != "" {
  769. merge_recordMap["agency"] = source.agency
  770. source.agency = info.agency
  771. mergeArr = append(mergeArr, 11)
  772. is_replace = true
  773. }
  774. if is_replace { //有过替换更新
  775. //总次数+1
  776. source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
  777. merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
  778. //和哪一个数据id进行非空替换的-记录
  779. key := info.id
  780. source.mergemap[key] = merge_recordMap
  781. }
  782. //待进一步优化
  783. return source, mergeArr, is_replace
  784. }
  785. //权重评估
  786. func basicDataScore(v *Info, info *Info) bool {
  787. /*
  788. 权重评估
  789. 网站优先级判定规则:
  790. 1、国家>省级>市级>县区
  791. 2、政府采购>公共资源>官方网站|政府门户>社会公共招标平台|企业招标平台
  792. 3、同sitetype-分析weight
  793. 4、要素打分-分析
  794. */
  795. v_score, info_score := -1, -1
  796. dict_v := SiteMap[v.site]
  797. dict_info := SiteMap[info.site]
  798. //先判断level
  799. if dict_v != nil {
  800. v_level := util.ObjToString(dict_v["level"])
  801. if v_level == "国家" {
  802. v_score = 4
  803. } else if v_level == "省级" {
  804. v_score = 3
  805. } else if v_level == "市级" {
  806. v_score = 2
  807. } else if v_level == "县区" {
  808. v_score = 1
  809. } else if v_level == "" {
  810. } else {
  811. v_score = 0
  812. }
  813. }
  814. if dict_info != nil {
  815. info_level := util.ObjToString(dict_info["level"])
  816. if info_level == "国家" {
  817. info_score = 4
  818. } else if info_level == "省级" {
  819. info_score = 3
  820. } else if info_level == "市级" {
  821. info_score = 2
  822. } else if info_level == "县区" {
  823. info_score = 1
  824. } else if info_level == "" {
  825. } else {
  826. v_score = 0
  827. }
  828. }
  829. if v_score > info_score {
  830. return true
  831. }
  832. if v_score < info_score {
  833. return false
  834. }
  835. //判断sitetype
  836. if dict_v != nil {
  837. v_sitetype := util.ObjToString(dict_v["sitetype"])
  838. if v_sitetype == "政府采购" {
  839. v_score = 4
  840. } else if v_sitetype == "公共资源" {
  841. v_score = 3
  842. } else if v_sitetype == "官方网站"|| v_sitetype == "政府门户" {
  843. v_score = 2
  844. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  845. v_score = 1
  846. } else if v_sitetype == "" {
  847. } else {
  848. v_score = 0
  849. }
  850. }
  851. if dict_info != nil {
  852. info_sitetype := util.ObjToString(dict_info["sitetype"])
  853. if info_sitetype == "政府采购" {
  854. info_score = 4
  855. } else if info_sitetype == "公共资源" {
  856. info_score = 3
  857. } else if info_sitetype == "官方网站"|| info_sitetype == "政府门户" {
  858. info_score = 2
  859. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  860. info_score = 1
  861. } else if info_sitetype == "" {
  862. } else {
  863. info_score = 0
  864. }
  865. }
  866. if v_score > info_score {
  867. return true
  868. }
  869. if v_score < info_score {
  870. return false
  871. }
  872. if v_score == info_score {//同sitetype 情况下 分析weight
  873. v_weight := util.IntAll(dict_v["weight"])
  874. info_weight := util.IntAll(dict_info["weight"])
  875. if v_weight>info_weight {
  876. return true
  877. }
  878. if info_weight>v_weight {
  879. return false
  880. }
  881. }
  882. //网站评估
  883. m, n := 0, 0
  884. if v.projectname != "" {
  885. m++
  886. }
  887. if v.buyer != "" {
  888. m++
  889. }
  890. if v.projectcode != "" || v.contractnumber != "" {
  891. m++
  892. }
  893. if v.budget != 0 {
  894. m++
  895. }
  896. if v.bidamount != 0 {
  897. m++
  898. }
  899. if v.winner != "" {
  900. m++
  901. }
  902. if v.bidopentime != 0 {
  903. m++
  904. }
  905. if v.bidopenaddress != "" {
  906. m++
  907. }
  908. if v.agency != "" {
  909. m = m + 2
  910. }
  911. if v.city != "" {
  912. m = m + 2
  913. }
  914. if info.projectname != "" {
  915. n++
  916. }
  917. if info.buyer != "" {
  918. n++
  919. }
  920. if info.projectcode != "" || info.contractnumber != "" {
  921. n++
  922. }
  923. if info.budget != 0 {
  924. n++
  925. }
  926. if info.bidamount != 0 {
  927. n++
  928. }
  929. if info.winner != "" {
  930. n++
  931. }
  932. if info.bidopentime != 0 {
  933. n++
  934. }
  935. if info.bidopenaddress != "" {
  936. n++
  937. }
  938. if info.agency != "" {
  939. n = n + 2
  940. }
  941. if info.city != "" {
  942. n = n + 2
  943. }
  944. if m > n {
  945. return true
  946. } else if m == n {
  947. if v.publishtime >= info.publishtime {
  948. return true
  949. } else {
  950. return false
  951. }
  952. } else {
  953. return false
  954. }
  955. }
  956. //无效数据
  957. func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  958. var n int
  959. if d1 != "" {
  960. n++
  961. }
  962. if d2 != "" {
  963. n++
  964. }
  965. if d3 != "" {
  966. n++
  967. }
  968. if d4 != "" {
  969. n++
  970. }
  971. if n == 0 {
  972. return true
  973. }
  974. return false
  975. }
  976. //迁移数据dupdays+5之前的数据
  977. func movedata() {
  978. sess := mgo.GetMgoConn()
  979. defer mgo.DestoryMongoConn(sess)
  980. year, month, day := time.Now().Date()
  981. q := map[string]interface{}{
  982. "comeintime": map[string]interface{}{
  983. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
  984. },
  985. }
  986. log.Println(q)
  987. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  988. index := 0
  989. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  990. mgo.Save(extract_back, tmp)
  991. tmp = map[string]interface{}{}
  992. if index%1000 == 0 {
  993. log.Println("index", index)
  994. }
  995. }
  996. log.Println("save to", extract_back, " ok index", index)
  997. qv := map[string]interface{}{
  998. "comeintime": map[string]interface{}{
  999. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
  1000. },
  1001. }
  1002. delnum := mgo.Delete(extract, qv)
  1003. log.Println("remove from ", extract, delnum)
  1004. }