main.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "log"
  11. mu "mfw/util"
  12. "net"
  13. "os"
  14. "qfw/util"
  15. "regexp"
  16. "sync"
  17. "time"
  18. "gopkg.in/mgo.v2/bson"
  19. )
// Package-level state. Most of these values are filled in by init from the
// configuration file and the command-line flags.
var (
	Sysconfig    map[string]interface{}   // configuration file contents (loaded by util.ReadConfig in init)
	mconf        map[string]interface{}   // the "mongodb" section of Sysconfig
	mgo          *MongodbSim              // mongodb access object
	extract      string                   // collection name read from mconf["extract"]
	extract_back string                   // collection name read from mconf["extract_back"]
	udpclient    mu.UdpClient             // udp client
	nextNode     []map[string]interface{} // downstream nodes
	dupdays      = 5                      // initial deduplication range; overwritten in init from Sysconfig["dupdays"]
	DM           *datamap                 // data pool used by task for duplicate checks
	HM           *historymap              // deduplication data for the history flow
	lastid       = ""
	// regular expressions used for filtering; the placeholders below are
	// replaced in init with the patterns from the configuration
	FilterRegTitle   = regexp.MustCompile("^_$")
	FilterRegTitle_0 = regexp.MustCompile("^_$")
	FilterRegTitle_1 = regexp.MustCompile("^_$")
	FilterRegTitle_2 = regexp.MustCompile("^_$")
	isMerger         bool                              // whether duplicates are merged
	Is_Sort          bool                              // whether queries are sorted by publishtime
	threadNum        int                               // number of worker goroutines
	SiteMap          map[string]map[string]interface{} // site name -> site attributes
	LowHeavy         bool                              // whether low-quality data is deduplicated too
	TimingTask       bool                              // whether the process runs as a scheduled task
	timingSpanDay    int64                             // time span in days (scheduled task)
	timingPubScope   int64                             // publish-time window in days (scheduled task)
	sid, eid         string                            // id range used by testers for deduplication runs
)
  47. func init() {
  48. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  49. flag.StringVar(&sid, "sid", "", "开始id")
  50. flag.StringVar(&eid, "eid", "", "结束id")
  51. flag.Parse()
  52. //172.17.145.163:27080
  53. util.ReadConfig(&Sysconfig)
  54. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  55. mconf = Sysconfig["mongodb"].(map[string]interface{})
  56. mgo = &MongodbSim{
  57. MongodbAddr: mconf["addr"].(string),
  58. DbName: mconf["db"].(string),
  59. Size: util.IntAllDef(mconf["pool"], 10),
  60. }
  61. mgo.InitPool()
  62. extract = mconf["extract"].(string)
  63. extract_back = mconf["extract_back"].(string)
  64. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  65. //加载数据
  66. DM = NewDatamap(dupdays, lastid)
  67. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  68. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  69. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  70. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  71. isMerger = Sysconfig["isMerger"].(bool)
  72. Is_Sort = Sysconfig["isSort"].(bool)
  73. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  74. LowHeavy = Sysconfig["lowHeavy"].(bool)
  75. TimingTask = Sysconfig["timingTask"].(bool)
  76. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  77. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  78. //站点配置
  79. site := mconf["site"].(map[string]interface{})
  80. SiteMap = make(map[string]map[string]interface{}, 0)
  81. start := int(time.Now().Unix())
  82. sess_site := mgo.GetMgoConn()
  83. defer mgo.DestoryMongoConn(sess_site)
  84. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  85. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  86. data_map := map[string]interface{}{
  87. "area": util.ObjToString(site_dict["area"]),
  88. "city": util.ObjToString(site_dict["city"]),
  89. "district": util.ObjToString(site_dict["district"]),
  90. "sitetype": util.ObjToString(site_dict["sitetype"]),
  91. "level": util.ObjToString(site_dict["level"]),
  92. }
  93. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  94. }
  95. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  96. }
  97. func main() {
  98. //TestXiuFu()
  99. //
  100. //return
  101. go checkMapJob()
  102. updport := Sysconfig["udpport"].(string)
  103. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  104. if TimingTask {
  105. go timedTaskDay()
  106. } else {
  107. udpclient.Listen(processUdpMsg)
  108. log.Println("Udp服务监听", updport)
  109. }
  110. time.Sleep(99999 * time.Hour)
  111. }
  112. //测试组人员使用
  113. func mainT() {
  114. /*
  115. ObjectId("5da3f31aa5cb26b9b798d3aa")
  116. ObjectId("5da418c4a5cb26b9b7e3e9a6")
  117. ObjectId("5da3f2c5a5cb26b9b79847fc")
  118. ObjectId("5db2735ba5cb26b9b7c99c6f")
  119. */
  120. if TimingTask {
  121. log.Println("定时任务测试开始")
  122. go timedTaskDay()
  123. time.Sleep(99999 * time.Hour)
  124. } else {
  125. sid = "5c2c10fda5cb26b9b75e6f7f"
  126. eid = "5e976e4a50b5ea296ef376b9"
  127. log.Println("正常判重测试开始")
  128. log.Println(sid, "---", eid)
  129. mapinfo := map[string]interface{}{}
  130. if sid == "" || eid == "" {
  131. log.Println("sid,eid参数不能为空")
  132. os.Exit(0)
  133. }
  134. mapinfo["gtid"] = sid
  135. mapinfo["lteid"] = eid
  136. mapinfo["stop"] = "true"
  137. task([]byte{}, mapinfo)
  138. time.Sleep(10 * time.Second)
  139. }
  140. }
  141. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  142. fmt.Println("接受的段数据")
  143. switch act {
  144. case mu.OP_TYPE_DATA: //上个节点的数据
  145. //从表中开始处理
  146. var mapInfo map[string]interface{}
  147. err := json.Unmarshal(data, &mapInfo)
  148. log.Println("err:", err, "mapInfo:", mapInfo)
  149. if err != nil {
  150. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  151. } else if mapInfo != nil {
  152. taskType := util.ObjToString(mapInfo["stype"])
  153. if taskType == "historyTask" {
  154. //历史更新流程
  155. go historyTask(data, mapInfo)
  156. } else if taskType == "normalTask" {
  157. //判重流程
  158. go task(data, mapInfo)
  159. } else {
  160. //其他
  161. go task(data, mapInfo)
  162. }
  163. key, _ := mapInfo["key"].(string)
  164. if key == "" {
  165. key = "udpok"
  166. }
  167. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  168. }
  169. case mu.OP_NOOP: //下个节点回应
  170. ok := string(data)
  171. if ok != "" {
  172. log.Println("ok:", ok)
  173. udptaskmap.Delete(ok)
  174. }
  175. }
  176. }
  177. //开始判重程序
  178. func task(data []byte, mapInfo map[string]interface{}) {
  179. log.Println("开始数据判重")
  180. defer util.Catch()
  181. //区间id
  182. q := map[string]interface{}{
  183. "_id": map[string]interface{}{
  184. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  185. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  186. },
  187. }
  188. log.Println(mgo.DbName, extract, q)
  189. sess := mgo.GetMgoConn()
  190. defer mgo.DestoryMongoConn(sess)
  191. //是否排序
  192. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("_id").Iter()
  193. if Is_Sort {
  194. log.Println("排序:publishtime")
  195. it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  196. }
  197. //it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  198. updateExtract := [][]map[string]interface{}{}
  199. log.Println("线程数:", threadNum)
  200. pool := make(chan bool, threadNum)
  201. wg := &sync.WaitGroup{}
  202. n, repeateN := 0, 0
  203. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  204. if n%10000 == 0 {
  205. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  206. }
  207. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
  208. tmp = make(map[string]interface{})
  209. repeateN++
  210. continue
  211. }
  212. pool <- true
  213. wg.Add(1)
  214. go func(tmp map[string]interface{}) {
  215. defer func() {
  216. <-pool
  217. wg.Done()
  218. }()
  219. info := NewInfo(tmp)
  220. if !LowHeavy { //是否进行低质量数据判重
  221. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  222. updateExtract = append(updateExtract, []map[string]interface{}{
  223. map[string]interface{}{
  224. "_id": tmp["_id"],
  225. },
  226. map[string]interface{}{
  227. "$set": map[string]interface{}{
  228. "repeat": -1, //无效数据标签
  229. },
  230. },
  231. })
  232. if len(updateExtract) > 500 {
  233. mgo.UpSertBulk(extract, updateExtract...)
  234. updateExtract = [][]map[string]interface{}{}
  235. }
  236. return
  237. }
  238. }
  239. b, source, reason := DM.check(info)
  240. if b { //有重复,生成更新语句,更新抽取和更新招标
  241. repeateN++
  242. var is_replace = false
  243. var mergeArr = []int64{} //更改合并数组记录
  244. var newData = &Info{} //更换新的数据池数据
  245. var repeat_idMap = map[string]interface{}{} //记录判重的
  246. var merge_idMap = map[string]interface{}{} //记录合并的
  247. repeat_idMap["_id"] = StringTOBsonId(info.id)
  248. merge_idMap["_id"] = StringTOBsonId(source.id)
  249. repeat_id := source.id //初始化一个数据
  250. if isMerger { //合并相关
  251. basic_bool := basicDataScore(source, info)
  252. if basic_bool {
  253. //已原始数据为标准 - 对比数据打判重标签-
  254. newData, mergeArr, is_replace = mergeDataFields(source, info)
  255. DM.replaceSourceData(newData, source.id) //替换
  256. //对比数据打重复标签的id,原始数据id的记录
  257. repeat_idMap["_id"] = StringTOBsonId(info.id)
  258. merge_idMap["_id"] = StringTOBsonId(source.id)
  259. repeat_id = source.id
  260. } else {
  261. //已对比数据为标准 ,数据池的数据打判重标签
  262. newData, mergeArr, is_replace = mergeDataFields(info, source)
  263. DM.replaceSourceData(newData, source.id) //替换
  264. //原始数据打重复标签的id, 对比数据id的记录
  265. repeat_idMap["_id"] = StringTOBsonId(source.id)
  266. merge_idMap["_id"] = StringTOBsonId(info.id)
  267. repeat_id = info.id
  268. }
  269. merge_map := make(map[string]interface{}, 0)
  270. if is_replace { //有过合并-更新数据
  271. merge_map = map[string]interface{}{
  272. "$set": map[string]interface{}{
  273. "merge": newData.mergemap,
  274. },
  275. }
  276. //更新合并后的数据
  277. for _, value := range mergeArr {
  278. if value == 0 {
  279. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  280. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  281. } else if value == 1 {
  282. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  283. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  284. } else if value == 2 {
  285. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  286. } else if value == 3 {
  287. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  288. } else if value == 4 {
  289. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  290. } else if value == 5 {
  291. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  292. } else if value == 6 {
  293. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  294. } else if value == 7 {
  295. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  296. } else if value == 8 {
  297. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  298. } else if value == 9 {
  299. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  300. } else if value == 10 {
  301. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  302. } else if value == 11 {
  303. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  304. } else {
  305. }
  306. }
  307. //模板数据更新
  308. updateExtract = append(updateExtract, []map[string]interface{}{
  309. merge_idMap,
  310. merge_map,
  311. })
  312. }
  313. } else { //高质量数据
  314. basic_bool := basicDataScore(source, info)
  315. if !basic_bool {
  316. DM.replaceSourceData(info, source.id) //替换
  317. repeat_idMap["_id"] = StringTOBsonId(source.id)
  318. repeat_id = info.id
  319. }
  320. }
  321. //重复数据打标签
  322. updateExtract = append(updateExtract, []map[string]interface{}{
  323. repeat_idMap,
  324. map[string]interface{}{
  325. "$set": map[string]interface{}{
  326. "repeat": 1,
  327. "repeat_reason": reason,
  328. "repeat_id": repeat_id,
  329. },
  330. },
  331. })
  332. }
  333. }(tmp)
  334. if len(updateExtract) > 500 {
  335. mgo.UpSertBulk(extract, updateExtract...)
  336. updateExtract = [][]map[string]interface{}{}
  337. }
  338. tmp = make(map[string]interface{})
  339. }
  340. wg.Wait()
  341. if len(updateExtract) > 0 {
  342. mgo.UpSertBulk(extract, updateExtract...)
  343. }
  344. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  345. //任务完成,开始发送广播通知下面节点
  346. if n > repeateN && mapInfo["stop"] == nil {
  347. for _, to := range nextNode {
  348. sid, _ := mapInfo["gtid"].(string)
  349. eid, _ := mapInfo["lteid"].(string)
  350. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  351. by, _ := json.Marshal(map[string]interface{}{
  352. "gtid": sid,
  353. "lteid": eid,
  354. "stype": util.ObjToString(to["stype"]),
  355. "key": key,
  356. })
  357. addr := &net.UDPAddr{
  358. IP: net.ParseIP(to["addr"].(string)),
  359. Port: util.IntAll(to["port"]),
  360. }
  361. node := &udpNode{by, addr, time.Now().Unix(), 0}
  362. udptaskmap.Store(key, node)
  363. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  364. }
  365. }
  366. }
  367. //支持历史更新
  368. func historyTask(data []byte, mapInfo map[string]interface{}) {
  369. fmt.Println("开始取历史时间段")
  370. defer util.Catch()
  371. sess := mgo.GetMgoConn()
  372. defer mgo.DestoryMongoConn(sess)
  373. q := map[string]interface{}{
  374. "_id": map[string]interface{}{
  375. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  376. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  377. },
  378. }
  379. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  380. minTime, maxTime := int64(0), int64(0)
  381. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  382. //取出最大最小时间
  383. info_time := tmp["comeintime"]
  384. if Is_Sort {
  385. info_time = tmp["publishtime"]
  386. }
  387. if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
  388. minTime = util.Int64All(info_time)
  389. maxTime = util.Int64All(info_time)
  390. } else {
  391. t := util.Int64All(info_time)
  392. if t < minTime && t != 0 {
  393. minTime = t
  394. }
  395. if t > maxTime && t != 0 {
  396. maxTime = t
  397. }
  398. }
  399. }
  400. //时间不正确时
  401. if minTime == 0 && maxTime == 0 {
  402. log.Println("段数据区间 不符合")
  403. return
  404. }
  405. fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
  406. gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
  407. fmt.Println(gtid, lteid)
  408. HM = NewHistorymap(gtid, lteid, minTime, maxTime)
  409. fmt.Println("开始历史数据判重")
  410. defer util.Catch()
  411. //区间id
  412. sess_history := mgo.GetMgoConn()
  413. defer mgo.DestoryMongoConn(sess_history)
  414. q_history := map[string]interface{}{
  415. "_id": map[string]interface{}{
  416. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  417. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  418. },
  419. }
  420. log.Println(mgo.DbName, extract, q_history)
  421. //是否排序
  422. it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
  423. if Is_Sort {
  424. it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
  425. }
  426. updateExtract := [][]map[string]interface{}{}
  427. log.Println("线程数:", threadNum)
  428. pool := make(chan bool, threadNum)
  429. wg := &sync.WaitGroup{}
  430. n, repeateN := 0, 0
  431. for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
  432. if n%10000 == 0 {
  433. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  434. }
  435. if util.IntAll(tmp["dataging"]) == 1 {
  436. tmp = make(map[string]interface{})
  437. continue
  438. }
  439. pool <- true
  440. wg.Add(1)
  441. go func(tmp map[string]interface{}) {
  442. defer func() {
  443. <-pool
  444. wg.Done()
  445. }()
  446. info := NewInfo(tmp)
  447. if !LowHeavy { //是否进行低质量数据判重
  448. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  449. updateExtract = append(updateExtract, []map[string]interface{}{
  450. map[string]interface{}{
  451. "_id": tmp["_id"],
  452. },
  453. map[string]interface{}{
  454. "$set": map[string]interface{}{
  455. "repeat": -1, //无效数据标签
  456. },
  457. },
  458. })
  459. if len(updateExtract) > 500 {
  460. mgo.UpSertBulk(extract, updateExtract...)
  461. updateExtract = [][]map[string]interface{}{}
  462. }
  463. return
  464. }
  465. }
  466. b, source, reason := HM.checkHistory(info)
  467. if b { //有重复,生成更新语句,更新抽取和更新招标
  468. if reason == "未判重记录" {
  469. fmt.Println("未判重记录")
  470. //把info的数据判重的标签更换,并新增字段
  471. HM.replaceSourceData(info, info.id) //替换即添加
  472. updateExtract = append(updateExtract, []map[string]interface{}{
  473. map[string]interface{}{
  474. "_id": tmp["_id"],
  475. },
  476. map[string]interface{}{
  477. "$set": map[string]interface{}{
  478. "repeat": 0,
  479. "repeatid": -2,
  480. },
  481. },
  482. })
  483. } else {
  484. repeateN++
  485. var is_replace = false
  486. var mergeArr = []int64{} //更改合并数组记录
  487. var newData = &Info{} //更换新的数据池数据
  488. var repeat_idMap = map[string]interface{}{} //记录判重的
  489. var merge_idMap = map[string]interface{}{} //记录合并的
  490. repeat_idMap["_id"] = StringTOBsonId(info.id)
  491. merge_idMap["_id"] = StringTOBsonId(source.id)
  492. repeat_id := source.id
  493. //以下合并相关
  494. if isMerger {
  495. basic_bool := basicDataScore(source, info)
  496. if basic_bool {
  497. //已原始数据为标准 - 对比数据打判重标签-
  498. newData, mergeArr, is_replace = mergeDataFields(source, info)
  499. HM.replaceSourceData(newData, source.id) //替换
  500. //对比数据打重复标签的id,原始数据id的记录
  501. repeat_idMap["_id"] = StringTOBsonId(info.id)
  502. merge_idMap["_id"] = StringTOBsonId(source.id)
  503. repeat_id = source.id
  504. } else {
  505. //已对比数据为标准 ,数据池的数据打判重标签
  506. newData, mergeArr, is_replace = mergeDataFields(info, source)
  507. HM.replaceSourceData(newData, source.id) //替换
  508. //原始数据打重复标签的id, 对比数据id的记录
  509. repeat_idMap["_id"] = StringTOBsonId(source.id)
  510. merge_idMap["_id"] = StringTOBsonId(info.id)
  511. repeat_id = info.id
  512. }
  513. merge_map := make(map[string]interface{}, 0)
  514. if is_replace { //有过合并-更新数据
  515. merge_map = map[string]interface{}{
  516. "$set": map[string]interface{}{
  517. "merge": newData.mergemap,
  518. },
  519. }
  520. //更新合并后的数据
  521. for _, value := range mergeArr {
  522. if value == 0 {
  523. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  524. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  525. } else if value == 1 {
  526. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  527. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  528. } else if value == 2 {
  529. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  530. } else if value == 3 {
  531. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  532. } else if value == 4 {
  533. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  534. } else if value == 5 {
  535. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  536. } else if value == 6 {
  537. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  538. } else if value == 7 {
  539. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  540. } else if value == 8 {
  541. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  542. } else if value == 9 {
  543. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  544. } else if value == 10 {
  545. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  546. } else if value == 11 {
  547. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  548. } else {
  549. }
  550. }
  551. //模板数据更新
  552. updateExtract = append(updateExtract, []map[string]interface{}{
  553. merge_idMap,
  554. merge_map,
  555. })
  556. }
  557. } else { //高质量数据
  558. basic_bool := basicDataScore(source, info)
  559. if !basic_bool {
  560. HM.replaceSourceData(info, source.id) //替换
  561. repeat_idMap["_id"] = StringTOBsonId(source.id)
  562. repeat_id = info.id
  563. }
  564. }
  565. //重复数据打标签
  566. updateExtract = append(updateExtract, []map[string]interface{}{
  567. repeat_idMap,
  568. map[string]interface{}{
  569. "$set": map[string]interface{}{
  570. "repeat": 1,
  571. "repeat_reason": reason,
  572. "repeat_id": repeat_id,
  573. },
  574. },
  575. })
  576. }
  577. }
  578. }(tmp)
  579. if len(updateExtract) > 500 {
  580. mgo.UpSertBulk(extract, updateExtract...)
  581. updateExtract = [][]map[string]interface{}{}
  582. }
  583. tmp = make(map[string]interface{})
  584. }
  585. wg.Wait()
  586. if len(updateExtract) > 0 {
  587. mgo.UpSertBulk(extract, updateExtract...)
  588. //mgo.UpdateBulk(bidding, updateBidding...)
  589. }
  590. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  591. //任务完成,开始发送广播通知下面节点
  592. if n > repeateN && mapInfo["stop"] == nil {
  593. for _, to := range nextNode {
  594. sid, _ := mapInfo["gtid"].(string)
  595. eid, _ := mapInfo["lteid"].(string)
  596. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  597. by, _ := json.Marshal(map[string]interface{}{
  598. "gtid": sid,
  599. "lteid": eid,
  600. "stype": util.ObjToString(to["stype"]),
  601. "key": key,
  602. })
  603. addr := &net.UDPAddr{
  604. IP: net.ParseIP(to["addr"].(string)),
  605. Port: util.IntAll(to["port"]),
  606. }
  607. node := &udpNode{by, addr, time.Now().Unix(), 0}
  608. udptaskmap.Store(key, node)
  609. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  610. }
  611. }
  612. }
  613. //定时任务
  614. func timedTaskDay() {
  615. log.Println("部署定时任务")
  616. c := cron.New()
  617. c.AddFunc("0 0 1 * * ?", func() { movedata() }) //每天凌晨1点执行一次
  618. c.AddFunc("0 0 2 * * ?", func() { timedTaskOnce() }) //每天凌晨2点执行一次
  619. c.Start()
  620. //timedTaskOnce()
  621. }
  622. func timedTaskOnce() {
  623. log.Println("开始一次定时任务")
  624. defer util.Catch()
  625. now := time.Now()
  626. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
  627. curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  628. task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  629. task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  630. between_time := curTime.Unix() - (86400 * timingPubScope)
  631. log.Println(task_sid, task_eid, curTime.Unix(), between_time)
  632. //区间id
  633. q_start := map[string]interface{}{
  634. "_id": map[string]interface{}{
  635. "$gte": StringTOBsonId(task_sid),
  636. "$lte": StringTOBsonId(task_eid),
  637. },
  638. }
  639. sess := mgo.GetMgoConn()
  640. defer mgo.DestoryMongoConn(sess)
  641. it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
  642. num, deterTime:= int64(0),int64(0) //计数
  643. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  644. dayArr := []map[string]interface{}{}
  645. for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
  646. if num%10000 == 0 {
  647. log.Println("正序遍历:", num)
  648. }
  649. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
  650. tmp = make(map[string]interface{})
  651. continue
  652. }
  653. //取-符合-发布时间半年内的数据
  654. if util.IntAll(tmp["dataging"]) == 1 {
  655. pubtime := util.Int64All(tmp["publishtime"])
  656. if pubtime > 0 && pubtime >= between_time {
  657. if deterTime==0 {
  658. log.Println("找到第一条符合条件的数据")
  659. deterTime = util.Int64All(tmp["publishtime"])
  660. dayArr = append(dayArr,tmp)
  661. }else {
  662. if util.Int64All(tmp["publishtime"])-deterTime >timingSpanDay*86400 {
  663. //新数组重新构建,当前组数据加到全部组数据
  664. pendAllArr = append(pendAllArr,dayArr)
  665. dayArr = []map[string]interface{}{}
  666. deterTime = util.Int64All(tmp["publishtime"])
  667. dayArr = append(dayArr,tmp)
  668. }else {
  669. dayArr = append(dayArr,tmp)
  670. }
  671. }
  672. }
  673. }
  674. tmp = make(map[string]interface{})
  675. }
  676. if len(pendAllArr) <= 0 {
  677. log.Println("没找到dataging==1的数据")
  678. return
  679. }
  680. log.Println("本地构建分组完成:",len(pendAllArr),"组")
  681. updateExtract := [][]map[string]interface{}{}
  682. n, repeateN := 0, 0
  683. for k,v:=range pendAllArr {
  684. //构建当前组的数据池
  685. log.Println("构建第",k,"组---(数据池)")
  686. DM = TimedTaskDatamap(dupdays, util.Int64All(v[0]["publishtime"]))
  687. log.Println("开始遍历判重第",k,"组 共计数量:",len(v))
  688. log.Println("统计目前总数量:",n,"重复数量:",repeateN)
  689. n = n+len(v)
  690. for _,tmp:=range v {
  691. info := NewInfo(tmp)
  692. if !LowHeavy { //是否进行低质量数据判重
  693. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  694. log.Println("无效数据")
  695. updateExtract = append(updateExtract, []map[string]interface{}{
  696. map[string]interface{}{
  697. "_id": tmp["_id"],
  698. },
  699. map[string]interface{}{
  700. "$set": map[string]interface{}{
  701. "repeat": -1, //无效数据标签
  702. "dataging": 0,
  703. },
  704. },
  705. })
  706. if len(updateExtract) > 500 {
  707. mgo.UpSertBulk(extract, updateExtract...)
  708. updateExtract = [][]map[string]interface{}{}
  709. }
  710. continue
  711. }
  712. }
  713. b, source, reason := DM.check(info)
  714. if b { //有重复,生成更新语句,更新抽取和更新招标
  715. log.Println("判重结果", b, reason)
  716. repeateN++
  717. var is_replace = false
  718. var mergeArr = []int64{} //更改合并数组记录
  719. var newData = &Info{} //更换新的数据池数据
  720. var repeat_idMap = map[string]interface{}{} //记录判重的
  721. var merge_idMap = map[string]interface{}{} //记录合并的
  722. repeat_idMap["_id"] = StringTOBsonId(info.id)
  723. merge_idMap["_id"] = StringTOBsonId(source.id)
  724. repeat_id := source.id //初始化一个数据
  725. if isMerger { //合并相关
  726. basic_bool := basicDataScore(source, info)
  727. if basic_bool {
  728. //已原始数据为标准 - 对比数据打判重标签-
  729. newData, mergeArr, is_replace = mergeDataFields(source, info)
  730. DM.replaceSourceData(newData, source.id) //替换
  731. //对比数据打重复标签的id,原始数据id的记录
  732. repeat_idMap["_id"] = StringTOBsonId(info.id)
  733. merge_idMap["_id"] = StringTOBsonId(source.id)
  734. repeat_id = source.id
  735. } else {
  736. //已对比数据为标准 ,数据池的数据打判重标签
  737. newData, mergeArr, is_replace = mergeDataFields(info, source)
  738. DM.replaceSourceData(newData, source.id) //替换
  739. //原始数据打重复标签的id, 对比数据id的记录
  740. repeat_idMap["_id"] = StringTOBsonId(source.id)
  741. merge_idMap["_id"] = StringTOBsonId(info.id)
  742. repeat_id = info.id
  743. }
  744. merge_map := make(map[string]interface{}, 0)
  745. if is_replace { //有过合并-更新数据
  746. merge_map = map[string]interface{}{
  747. "$set": map[string]interface{}{
  748. "merge": newData.mergemap,
  749. "dataging": 0,
  750. },
  751. }
  752. //更新合并后的数据
  753. for _, value := range mergeArr {
  754. if value == 0 {
  755. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  756. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  757. } else if value == 1 {
  758. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  759. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  760. } else if value == 2 {
  761. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  762. } else if value == 3 {
  763. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  764. } else if value == 4 {
  765. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  766. } else if value == 5 {
  767. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  768. } else if value == 6 {
  769. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  770. } else if value == 7 {
  771. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  772. } else if value == 8 {
  773. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  774. } else if value == 9 {
  775. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  776. } else if value == 10 {
  777. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  778. } else if value == 11 {
  779. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  780. } else {
  781. }
  782. }
  783. //模板数据更新
  784. updateExtract = append(updateExtract, []map[string]interface{}{
  785. merge_idMap,
  786. merge_map,
  787. })
  788. }
  789. } else { //高质量数据
  790. basic_bool := basicDataScore(source, info)
  791. if !basic_bool {
  792. log.Println("高质量数据替换:",source.id,info.id)
  793. DM.replaceSourceData(info, source.id) //替换
  794. repeat_idMap["_id"] = StringTOBsonId(source.id)
  795. repeat_id = info.id
  796. }
  797. }
  798. //重复数据打标签
  799. updateExtract = append(updateExtract, []map[string]interface{}{
  800. repeat_idMap,
  801. map[string]interface{}{
  802. "$set": map[string]interface{}{
  803. "repeat": 1,
  804. "repeat_reason": reason,
  805. "repeat_id": repeat_id,
  806. "dataging": 0,
  807. },
  808. },
  809. })
  810. }else {
  811. updateExtract = append(updateExtract, []map[string]interface{}{
  812. map[string]interface{}{
  813. "_id": tmp["_id"],
  814. },
  815. map[string]interface{}{
  816. "$set": map[string]interface{}{
  817. "dataging": 0,//符合条件的都为dataging==0
  818. },
  819. },
  820. })
  821. }
  822. if len(updateExtract) > 500 {
  823. mgo.UpSertBulk(extract, updateExtract...)
  824. updateExtract = [][]map[string]interface{}{}
  825. }
  826. }
  827. }
  828. if len(updateExtract) > 0 {
  829. mgo.UpSertBulk(extract, updateExtract...)
  830. }
  831. log.Println("this timeTask over.", n, "repeateN:", repeateN)
  832. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  833. if n > repeateN {
  834. for _, to := range nextNode {
  835. next_sid := util.BsonIdToSId(task_sid)
  836. next_eid := util.BsonIdToSId(task_eid)
  837. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  838. by, _ := json.Marshal(map[string]interface{}{
  839. "gtid": next_sid,
  840. "lteid": next_eid,
  841. "stype": util.ObjToString(to["stype"]),
  842. "key": key,
  843. })
  844. addr := &net.UDPAddr{
  845. IP: net.ParseIP(to["addr"].(string)),
  846. Port: util.IntAll(to["port"]),
  847. }
  848. node := &udpNode{by, addr, time.Now().Unix(), 0}
  849. udptaskmap.Store(key, node)
  850. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  851. }
  852. }
  853. }
  854. //合并字段-并更新merge字段的值
  855. func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
  856. merge_recordMap := make(map[string]interface{}, 0)
  857. mergeArr := make([]int64, 0)
  858. //是否替换数据了-记录原始的数据
  859. is_replace := false
  860. //1、城市
  861. if source.area == "" || source.area == "全国" {
  862. //为空
  863. if info.area != "全国" && info.area != "" {
  864. merge_recordMap["area"] = source.area
  865. merge_recordMap["city"] = source.city
  866. source.area = info.area
  867. source.city = info.city
  868. mergeArr = append(mergeArr, 1)
  869. is_replace = true
  870. }
  871. } else {
  872. //不为空-查看站点相关-有值必替换
  873. if source.is_site {
  874. //是站点替换的城市
  875. merge_recordMap["site_area"] = source.area
  876. merge_recordMap["site_city"] = source.city
  877. mergeArr = append(mergeArr, 0)
  878. is_replace = true
  879. source.is_site = false
  880. }
  881. }
  882. //2、项目名称
  883. if source.projectname == "" && info.projectname != "" {
  884. merge_recordMap["projectname"] = source.projectname
  885. source.projectname = info.projectname
  886. mergeArr = append(mergeArr, 2)
  887. is_replace = true
  888. }
  889. //3、项目编号
  890. if source.projectcode == "" && info.projectcode != "" {
  891. merge_recordMap["projectcode"] = source.projectcode
  892. source.projectcode = info.projectcode
  893. mergeArr = append(mergeArr, 3)
  894. is_replace = true
  895. }
  896. //4、采购单位
  897. if source.buyer == "" && info.buyer != "" {
  898. merge_recordMap["buyer"] = source.buyer
  899. source.buyer = info.buyer
  900. mergeArr = append(mergeArr, 4)
  901. is_replace = true
  902. }
  903. //5、预算
  904. if source.budget == 0 && info.budget != 0 {
  905. merge_recordMap["budget"] = source.budget
  906. source.budget = info.budget
  907. mergeArr = append(mergeArr, 5)
  908. is_replace = true
  909. }
  910. //6、中标单位
  911. if source.winner == "" && info.winner != "" {
  912. merge_recordMap["winner"] = source.winner
  913. source.winner = info.winner
  914. mergeArr = append(mergeArr, 6)
  915. is_replace = true
  916. }
  917. //7、中标金额
  918. if source.bidamount == 0 && info.bidamount != 0 {
  919. merge_recordMap["bidamount"] = source.bidamount
  920. source.bidamount = info.bidamount
  921. mergeArr = append(mergeArr, 7)
  922. is_replace = true
  923. }
  924. //8、开标时间-地点
  925. if source.bidopentime == 0 && info.bidopentime != 0 {
  926. merge_recordMap["bidopentime"] = source.bidopentime
  927. source.bidopentime = info.bidopentime
  928. mergeArr = append(mergeArr, 8)
  929. is_replace = true
  930. }
  931. //9、合同编号
  932. if source.contractnumber == "" && info.contractnumber != "" {
  933. merge_recordMap["contractnumber"] = source.contractnumber
  934. source.contractnumber = info.contractnumber
  935. mergeArr = append(mergeArr, 9)
  936. is_replace = true
  937. }
  938. //10、发布时间
  939. if source.publishtime == 0 && info.publishtime != 0 {
  940. merge_recordMap["publishtime"] = source.publishtime
  941. source.publishtime = info.publishtime
  942. mergeArr = append(mergeArr, 10)
  943. is_replace = true
  944. }
  945. //11、代理机构
  946. if source.agency == "" && info.agency != "" {
  947. merge_recordMap["agency"] = source.agency
  948. source.agency = info.agency
  949. mergeArr = append(mergeArr, 11)
  950. is_replace = true
  951. }
  952. if is_replace { //有过替换更新
  953. //总次数+1
  954. source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
  955. merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
  956. //和哪一个数据id进行非空替换的-记录
  957. key := info.id
  958. source.mergemap[key] = merge_recordMap
  959. }
  960. //待进一步优化
  961. return source, mergeArr, is_replace
  962. }
  963. //权重评估
  964. func basicDataScore(v *Info, info *Info) bool {
  965. /*
  966. 权重评估
  967. 网站优先级判定规则:
  968. 1、中央>省>市>县区
  969. 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
  970. */
  971. v_score, info_score := -1, -1
  972. dict_v := SiteMap[v.site]
  973. dict_info := SiteMap[info.site]
  974. //先判断level
  975. if dict_v != nil {
  976. v_level := util.ObjToString(dict_v["level"])
  977. if v_level == "中央" {
  978. v_score = 4
  979. } else if v_level == "省级" {
  980. v_score = 3
  981. } else if v_level == "市级" {
  982. v_score = 2
  983. } else if v_level == "县区" {
  984. v_score = 1
  985. } else if v_level == "" {
  986. } else {
  987. v_score = 0
  988. }
  989. }
  990. if dict_info != nil {
  991. info_level := util.ObjToString(dict_info["level"])
  992. if info_level == "中央" {
  993. info_score = 4
  994. } else if info_level == "省级" {
  995. info_score = 3
  996. } else if info_level == "市级" {
  997. info_score = 2
  998. } else if info_level == "县区" {
  999. info_score = 1
  1000. } else if info_level == "" {
  1001. } else {
  1002. v_score = 0
  1003. }
  1004. }
  1005. if v_score > info_score {
  1006. return true
  1007. }
  1008. if v_score < info_score {
  1009. return false
  1010. }
  1011. //判断sitetype
  1012. if dict_v != nil {
  1013. v_sitetype := util.ObjToString(dict_v["sitetype"])
  1014. if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
  1015. v_score = 4
  1016. } else if v_sitetype == "公共资源" {
  1017. v_score = 3
  1018. } else if v_sitetype == "官方网站" {
  1019. v_score = 2
  1020. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  1021. v_score = 1
  1022. } else if v_sitetype == "" {
  1023. } else {
  1024. v_score = 0
  1025. }
  1026. }
  1027. if dict_info != nil {
  1028. info_sitetype := util.ObjToString(dict_info["sitetype"])
  1029. if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
  1030. info_score = 4
  1031. } else if info_sitetype == "公共资源" {
  1032. info_score = 3
  1033. } else if info_sitetype == "官方网站" {
  1034. info_score = 2
  1035. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  1036. info_score = 1
  1037. } else if info_sitetype == "" {
  1038. } else {
  1039. info_score = 0
  1040. }
  1041. }
  1042. if v_score > info_score {
  1043. return true
  1044. }
  1045. if v_score < info_score {
  1046. return false
  1047. }
  1048. //网站评估
  1049. m, n := 0, 0
  1050. if v.projectname != "" {
  1051. m++
  1052. }
  1053. if v.buyer != "" {
  1054. m++
  1055. }
  1056. if v.projectcode != "" || v.contractnumber != "" {
  1057. m++
  1058. }
  1059. if v.budget != 0 {
  1060. m++
  1061. }
  1062. if v.bidamount != 0 {
  1063. m++
  1064. }
  1065. if v.winner != "" {
  1066. m++
  1067. }
  1068. if v.bidopentime != 0 {
  1069. m++
  1070. }
  1071. if v.bidopenaddress != "" {
  1072. m++
  1073. }
  1074. if v.agency != "" {
  1075. m = m + 2
  1076. }
  1077. if v.city != "" {
  1078. m = m + 2
  1079. }
  1080. if info.projectname != "" {
  1081. n++
  1082. }
  1083. if info.buyer != "" {
  1084. n++
  1085. }
  1086. if info.projectcode != "" || info.contractnumber != "" {
  1087. n++
  1088. }
  1089. if info.budget != 0 {
  1090. n++
  1091. }
  1092. if info.bidamount != 0 {
  1093. n++
  1094. }
  1095. if info.winner != "" {
  1096. n++
  1097. }
  1098. if info.bidopentime != 0 {
  1099. n++
  1100. }
  1101. if info.bidopenaddress != "" {
  1102. n++
  1103. }
  1104. if info.agency != "" {
  1105. n = n + 2
  1106. }
  1107. if info.city != "" {
  1108. n = n + 2
  1109. }
  1110. if m > n {
  1111. return true
  1112. } else if m == n {
  1113. if v.publishtime >= info.publishtime {
  1114. return true
  1115. } else {
  1116. return false
  1117. }
  1118. } else {
  1119. return false
  1120. }
  1121. }
  // invalidData reports whether all four supplied values are empty strings.
  // It returns false as soon as any one of them is non-empty.
  func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  	for _, field := range [...]string{d1, d2, d3, d4} {
  		if field != "" {
  			return false
  		}
  	}
  	return true
  }
  1142. //迁移数据dupdays+5之前的数据
  1143. func movedata() {
  1144. sess := mgo.GetMgoConn()
  1145. defer mgo.DestoryMongoConn(sess)
  1146. year, month, day := time.Now().Date()
  1147. q := map[string]interface{}{
  1148. "comeintime": map[string]interface{}{
  1149. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
  1150. },
  1151. }
  1152. log.Println(q)
  1153. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  1154. index := 0
  1155. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  1156. mgo.Save(extract_back, tmp)
  1157. tmp = map[string]interface{}{}
  1158. if index%1000 == 0 {
  1159. log.Println("index", index)
  1160. }
  1161. }
  1162. log.Println("save to", extract_back, " ok index", index)
  1163. delnum := mgo.Delete(extract, q)
  1164. log.Println("remove from ", extract, delnum)
  1165. }