main.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "os"
  13. "qfw/util"
  14. "regexp"
  15. "sync"
  16. "time"
  17. "github.com/cron"
  18. "gopkg.in/mgo.v2/bson"
  19. )
  20. var (
  21. Sysconfig map[string]interface{} //配置文件
  22. mconf map[string]interface{} //mongodb配置信息
  23. mgo *MongodbSim //mongodb操作对象
  24. extract string
  25. extract_back string
  26. udpclient mu.UdpClient //udp对象
  27. nextNode []map[string]interface{} //下节点数组
  28. dupdays = 5 //初始化判重范围
  29. DM *datamap //
  30. HM *historymap //判重数据
  31. lastid = ""
  32. //正则筛选相关
  33. FilterRegTitle = regexp.MustCompile("^_$")
  34. FilterRegTitle_0 = regexp.MustCompile("^_$")
  35. FilterRegTitle_1 = regexp.MustCompile("^_$")
  36. FilterRegTitle_2 = regexp.MustCompile("^_$")
  37. isMerger bool //是否合并
  38. Is_Sort bool //是否排序
  39. threadNum int //线程数量
  40. SiteMap map[string]map[string]interface{} //站点map
  41. LowHeavy bool //低质量数据判重
  42. TimingTask bool //是否定时任务
  43. timingSpanDay int64 //时间跨度
  44. timingPubScope int64 //发布时间周期
  45. sid, eid string //测试人员判重使用
  46. )
  47. func init() {
  48. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  49. flag.StringVar(&sid, "sid", "", "开始id")
  50. flag.StringVar(&eid, "eid", "", "结束id")
  51. flag.Parse()
  52. //172.17.145.163:27080
  53. util.ReadConfig(&Sysconfig)
  54. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  55. mconf = Sysconfig["mongodb"].(map[string]interface{})
  56. mgo = &MongodbSim{
  57. MongodbAddr: mconf["addr"].(string),
  58. DbName: mconf["db"].(string),
  59. Size: util.IntAllDef(mconf["pool"], 10),
  60. }
  61. mgo.InitPool()
  62. extract = mconf["extract"].(string)
  63. extract_back = mconf["extract_back"].(string)
  64. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  65. //加载数据
  66. DM = NewDatamap(dupdays, lastid)
  67. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  68. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  69. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  70. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  71. isMerger = Sysconfig["isMerger"].(bool)
  72. Is_Sort = Sysconfig["isSort"].(bool)
  73. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  74. LowHeavy = Sysconfig["lowHeavy"].(bool)
  75. TimingTask = Sysconfig["timingTask"].(bool)
  76. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  77. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  78. //站点配置
  79. site := mconf["site"].(map[string]interface{})
  80. SiteMap = make(map[string]map[string]interface{}, 0)
  81. start := int(time.Now().Unix())
  82. sess_site := mgo.GetMgoConn()
  83. defer mgo.DestoryMongoConn(sess_site)
  84. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  85. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  86. data_map := map[string]interface{}{
  87. "area": util.ObjToString(site_dict["area"]),
  88. "city": util.ObjToString(site_dict["city"]),
  89. "district": util.ObjToString(site_dict["district"]),
  90. "sitetype": util.ObjToString(site_dict["sitetype"]),
  91. "level": util.ObjToString(site_dict["level"]),
  92. }
  93. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  94. }
  95. log.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  96. }
  97. func main() {
  98. go checkMapJob()
  99. updport := Sysconfig["udpport"].(string)
  100. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  101. if TimingTask {
  102. go timedTaskDay()
  103. } else {
  104. udpclient.Listen(processUdpMsg)
  105. log.Println("Udp服务监听", updport)
  106. }
  107. time.Sleep(99999 * time.Hour)
  108. }
  109. //测试组人员使用
  110. func mainT() {
  111. /*
  112. ObjectId("5da3f31aa5cb26b9b798d3aa")
  113. ObjectId("5da418c4a5cb26b9b7e3e9a6")
  114. ObjectId("5da3f2c5a5cb26b9b79847fc")
  115. ObjectId("5db2735ba5cb26b9b7c99c6f")
  116. */
  117. log.Println("测试开始")
  118. if TimingTask {
  119. go timedTaskDay()
  120. time.Sleep(99999 * time.Hour)
  121. }else {
  122. //sid = "5da3f2c5a5cb26b9b79847fc"
  123. //eid = "5db2735ba5cb26b9b7c99c6f"
  124. log.Println(sid,"---",eid)
  125. mapinfo := map[string]interface{}{}
  126. if sid == "" || eid == "" {
  127. log.Println("sid,eid参数不能为空")
  128. os.Exit(0)
  129. }
  130. mapinfo["gtid"] = sid
  131. mapinfo["lteid"] = eid
  132. mapinfo["stop"] = "true"
  133. task([]byte{}, mapinfo)
  134. time.Sleep(10 * time.Second)
  135. }
  136. }
  137. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  138. fmt.Println("接受的段数据")
  139. switch act {
  140. case mu.OP_TYPE_DATA: //上个节点的数据
  141. //从表中开始处理
  142. var mapInfo map[string]interface{}
  143. err := json.Unmarshal(data, &mapInfo)
  144. log.Println("err:", err, "mapInfo:", mapInfo)
  145. if err != nil {
  146. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  147. } else if mapInfo != nil {
  148. taskType := util.ObjToString(mapInfo["stype"])
  149. if taskType == "historyTask" {
  150. //历史更新流程
  151. go historyTask(data, mapInfo)
  152. } else if taskType == "normalTask" {
  153. //判重流程
  154. go task(data, mapInfo)
  155. } else {
  156. //其他
  157. go task(data, mapInfo)
  158. }
  159. key, _ := mapInfo["key"].(string)
  160. if key == "" {
  161. key = "udpok"
  162. }
  163. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  164. }
  165. case mu.OP_NOOP: //下个节点回应
  166. ok := string(data)
  167. if ok != "" {
  168. log.Println("ok:", ok)
  169. udptaskmap.Delete(ok)
  170. }
  171. }
  172. }
  173. //开始判重程序
  174. func task(data []byte, mapInfo map[string]interface{}) {
  175. log.Println("开始数据判重")
  176. defer util.Catch()
  177. //区间id
  178. q := map[string]interface{}{
  179. "_id": map[string]interface{}{
  180. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  181. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  182. },
  183. }
  184. log.Println(mgo.DbName, extract, q)
  185. sess := mgo.GetMgoConn()
  186. defer mgo.DestoryMongoConn(sess)
  187. //是否排序
  188. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("_id").Iter()
  189. if Is_Sort {
  190. log.Println("排序:publishtime")
  191. it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  192. }
  193. //it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  194. updateExtract := [][]map[string]interface{}{}
  195. log.Println("线程数:", threadNum)
  196. pool := make(chan bool, threadNum)
  197. wg := &sync.WaitGroup{}
  198. n, repeateN := 0, 0
  199. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  200. if n%10000 == 0 {
  201. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  202. }
  203. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
  204. tmp = make(map[string]interface{})
  205. repeateN++
  206. continue
  207. }
  208. pool <- true
  209. wg.Add(1)
  210. go func(tmp map[string]interface{}) {
  211. defer func() {
  212. <-pool
  213. wg.Done()
  214. }()
  215. info := NewInfo(tmp)
  216. if !LowHeavy { //是否进行低质量数据判重
  217. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  218. updateExtract = append(updateExtract, []map[string]interface{}{
  219. map[string]interface{}{
  220. "_id": tmp["_id"],
  221. },
  222. map[string]interface{}{
  223. "$set": map[string]interface{}{
  224. "repeat": -1, //无效数据标签
  225. },
  226. },
  227. })
  228. if len(updateExtract) > 500 {
  229. mgo.UpSertBulk(extract, updateExtract...)
  230. updateExtract = [][]map[string]interface{}{}
  231. }
  232. return
  233. }
  234. }
  235. b, source, reason := DM.check(info)
  236. if b { //有重复,生成更新语句,更新抽取和更新招标
  237. repeateN++
  238. var is_replace = false
  239. var mergeArr = []int64{} //更改合并数组记录
  240. var newData = &Info{} //更换新的数据池数据
  241. var repeat_idMap = map[string]interface{}{} //记录判重的
  242. var merge_idMap = map[string]interface{}{} //记录合并的
  243. repeat_idMap["_id"] = StringTOBsonId(info.id)
  244. merge_idMap["_id"] = StringTOBsonId(source.id)
  245. repeat_id := source.id //初始化一个数据
  246. if isMerger { //合并相关
  247. basic_bool := basicDataScore(source, info)
  248. if basic_bool {
  249. //已原始数据为标准 - 对比数据打判重标签-
  250. newData, mergeArr, is_replace = mergeDataFields(source, info)
  251. DM.replaceSourceData(newData, source.id) //替换
  252. //对比数据打重复标签的id,原始数据id的记录
  253. repeat_idMap["_id"] = StringTOBsonId(info.id)
  254. merge_idMap["_id"] = StringTOBsonId(source.id)
  255. repeat_id = source.id
  256. } else {
  257. //已对比数据为标准 ,数据池的数据打判重标签
  258. newData, mergeArr, is_replace = mergeDataFields(info, source)
  259. DM.replaceSourceData(newData, source.id) //替换
  260. //原始数据打重复标签的id, 对比数据id的记录
  261. repeat_idMap["_id"] = StringTOBsonId(source.id)
  262. merge_idMap["_id"] = StringTOBsonId(info.id)
  263. repeat_id = info.id
  264. }
  265. merge_map := make(map[string]interface{}, 0)
  266. if is_replace { //有过合并-更新数据
  267. merge_map = map[string]interface{}{
  268. "$set": map[string]interface{}{
  269. "merge": newData.mergemap,
  270. },
  271. }
  272. //更新合并后的数据
  273. for _, value := range mergeArr {
  274. if value == 0 {
  275. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  276. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  277. } else if value == 1 {
  278. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  279. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  280. } else if value == 2 {
  281. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  282. } else if value == 3 {
  283. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  284. } else if value == 4 {
  285. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  286. } else if value == 5 {
  287. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  288. } else if value == 6 {
  289. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  290. } else if value == 7 {
  291. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  292. } else if value == 8 {
  293. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  294. } else if value == 9 {
  295. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  296. } else if value == 10 {
  297. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  298. } else if value == 11 {
  299. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  300. } else {
  301. }
  302. }
  303. //模板数据更新
  304. updateExtract = append(updateExtract, []map[string]interface{}{
  305. merge_idMap,
  306. merge_map,
  307. })
  308. }
  309. } else { //高质量数据
  310. basic_bool := basicDataScore(source, info)
  311. if !basic_bool {
  312. DM.replaceSourceData(info, source.id) //替换
  313. repeat_idMap["_id"] = StringTOBsonId(source.id)
  314. repeat_id = info.id
  315. }
  316. }
  317. //重复数据打标签
  318. updateExtract = append(updateExtract, []map[string]interface{}{
  319. repeat_idMap,
  320. map[string]interface{}{
  321. "$set": map[string]interface{}{
  322. "repeat": 1,
  323. "repeat_reason": reason,
  324. "repeat_id": repeat_id,
  325. },
  326. },
  327. })
  328. }
  329. }(tmp)
  330. if len(updateExtract) > 500 {
  331. mgo.UpSertBulk(extract, updateExtract...)
  332. updateExtract = [][]map[string]interface{}{}
  333. }
  334. tmp = make(map[string]interface{})
  335. }
  336. wg.Wait()
  337. if len(updateExtract) > 0 {
  338. mgo.UpSertBulk(extract, updateExtract...)
  339. }
  340. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  341. //任务完成,开始发送广播通知下面节点
  342. if n > repeateN && mapInfo["stop"] == nil {
  343. for _, to := range nextNode {
  344. sid, _ := mapInfo["gtid"].(string)
  345. eid, _ := mapInfo["lteid"].(string)
  346. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  347. by, _ := json.Marshal(map[string]interface{}{
  348. "gtid": sid,
  349. "lteid": eid,
  350. "stype": util.ObjToString(to["stype"]),
  351. "key": key,
  352. })
  353. addr := &net.UDPAddr{
  354. IP: net.ParseIP(to["addr"].(string)),
  355. Port: util.IntAll(to["port"]),
  356. }
  357. node := &udpNode{by, addr, time.Now().Unix(), 0}
  358. udptaskmap.Store(key, node)
  359. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  360. }
  361. }
  362. }
  363. //支持历史更新
  364. func historyTask(data []byte, mapInfo map[string]interface{}) {
  365. fmt.Println("开始取历史时间段")
  366. defer util.Catch()
  367. sess := mgo.GetMgoConn()
  368. defer mgo.DestoryMongoConn(sess)
  369. q := map[string]interface{}{
  370. "_id": map[string]interface{}{
  371. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  372. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  373. },
  374. }
  375. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  376. minTime, maxTime := int64(0), int64(0)
  377. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  378. //取出最大最小时间
  379. info_time := tmp["comeintime"]
  380. if Is_Sort {
  381. info_time = tmp["publishtime"]
  382. }
  383. if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
  384. minTime = util.Int64All(info_time)
  385. maxTime = util.Int64All(info_time)
  386. } else {
  387. t := util.Int64All(info_time)
  388. if t < minTime && t != 0 {
  389. minTime = t
  390. }
  391. if t > maxTime && t != 0 {
  392. maxTime = t
  393. }
  394. }
  395. }
  396. //时间不正确时
  397. if minTime == 0 && maxTime == 0 {
  398. log.Println("段数据区间 不符合")
  399. return
  400. }
  401. fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
  402. gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
  403. fmt.Println(gtid, lteid)
  404. HM = NewHistorymap(gtid, lteid, minTime, maxTime)
  405. fmt.Println("开始历史数据判重")
  406. defer util.Catch()
  407. //区间id
  408. sess_history := mgo.GetMgoConn()
  409. defer mgo.DestoryMongoConn(sess_history)
  410. q_history := map[string]interface{}{
  411. "_id": map[string]interface{}{
  412. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  413. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  414. },
  415. }
  416. log.Println(mgo.DbName, extract, q_history)
  417. //是否排序
  418. it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
  419. if Is_Sort {
  420. it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
  421. }
  422. updateExtract := [][]map[string]interface{}{}
  423. log.Println("线程数:", threadNum)
  424. pool := make(chan bool, threadNum)
  425. wg := &sync.WaitGroup{}
  426. n, repeateN := 0, 0
  427. for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
  428. if n%10000 == 0 {
  429. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  430. }
  431. if util.IntAll(tmp["dataging"]) == 1 {
  432. tmp = make(map[string]interface{})
  433. continue
  434. }
  435. pool <- true
  436. wg.Add(1)
  437. go func(tmp map[string]interface{}) {
  438. defer func() {
  439. <-pool
  440. wg.Done()
  441. }()
  442. info := NewInfo(tmp)
  443. if !LowHeavy { //是否进行低质量数据判重
  444. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  445. updateExtract = append(updateExtract, []map[string]interface{}{
  446. map[string]interface{}{
  447. "_id": tmp["_id"],
  448. },
  449. map[string]interface{}{
  450. "$set": map[string]interface{}{
  451. "repeat": -1, //无效数据标签
  452. },
  453. },
  454. })
  455. if len(updateExtract) > 500 {
  456. mgo.UpSertBulk(extract, updateExtract...)
  457. updateExtract = [][]map[string]interface{}{}
  458. }
  459. return
  460. }
  461. }
  462. b, source, reason := HM.checkHistory(info)
  463. if b { //有重复,生成更新语句,更新抽取和更新招标
  464. if reason == "未判重记录" {
  465. fmt.Println("未判重记录")
  466. //把info的数据判重的标签更换,并新增字段
  467. HM.replaceSourceData(info, info.id) //替换即添加
  468. updateExtract = append(updateExtract, []map[string]interface{}{
  469. map[string]interface{}{
  470. "_id": tmp["_id"],
  471. },
  472. map[string]interface{}{
  473. "$set": map[string]interface{}{
  474. "repeat": 0,
  475. "repeatid": -2,
  476. },
  477. },
  478. })
  479. } else {
  480. repeateN++
  481. var is_replace = false
  482. var mergeArr = []int64{} //更改合并数组记录
  483. var newData = &Info{} //更换新的数据池数据
  484. var repeat_idMap = map[string]interface{}{} //记录判重的
  485. var merge_idMap = map[string]interface{}{} //记录合并的
  486. repeat_idMap["_id"] = StringTOBsonId(info.id)
  487. merge_idMap["_id"] = StringTOBsonId(source.id)
  488. repeat_id := source.id
  489. //以下合并相关
  490. if isMerger {
  491. basic_bool := basicDataScore(source, info)
  492. if basic_bool {
  493. //已原始数据为标准 - 对比数据打判重标签-
  494. newData, mergeArr, is_replace = mergeDataFields(source, info)
  495. HM.replaceSourceData(newData, source.id) //替换
  496. //对比数据打重复标签的id,原始数据id的记录
  497. repeat_idMap["_id"] = StringTOBsonId(info.id)
  498. merge_idMap["_id"] = StringTOBsonId(source.id)
  499. repeat_id = source.id
  500. } else {
  501. //已对比数据为标准 ,数据池的数据打判重标签
  502. newData, mergeArr, is_replace = mergeDataFields(info, source)
  503. HM.replaceSourceData(newData, source.id) //替换
  504. //原始数据打重复标签的id, 对比数据id的记录
  505. repeat_idMap["_id"] = StringTOBsonId(source.id)
  506. merge_idMap["_id"] = StringTOBsonId(info.id)
  507. repeat_id = info.id
  508. }
  509. merge_map := make(map[string]interface{}, 0)
  510. if is_replace { //有过合并-更新数据
  511. merge_map = map[string]interface{}{
  512. "$set": map[string]interface{}{
  513. "merge": newData.mergemap,
  514. },
  515. }
  516. //更新合并后的数据
  517. for _, value := range mergeArr {
  518. if value == 0 {
  519. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  520. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  521. } else if value == 1 {
  522. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  523. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  524. } else if value == 2 {
  525. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  526. } else if value == 3 {
  527. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  528. } else if value == 4 {
  529. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  530. } else if value == 5 {
  531. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  532. } else if value == 6 {
  533. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  534. } else if value == 7 {
  535. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  536. } else if value == 8 {
  537. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  538. } else if value == 9 {
  539. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  540. } else if value == 10 {
  541. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  542. } else if value == 11 {
  543. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  544. } else {
  545. }
  546. }
  547. //模板数据更新
  548. updateExtract = append(updateExtract, []map[string]interface{}{
  549. merge_idMap,
  550. merge_map,
  551. })
  552. }
  553. } else { //高质量数据
  554. basic_bool := basicDataScore(source, info)
  555. if !basic_bool {
  556. HM.replaceSourceData(info, source.id) //替换
  557. repeat_idMap["_id"] = StringTOBsonId(source.id)
  558. repeat_id = info.id
  559. }
  560. }
  561. //重复数据打标签
  562. updateExtract = append(updateExtract, []map[string]interface{}{
  563. repeat_idMap,
  564. map[string]interface{}{
  565. "$set": map[string]interface{}{
  566. "repeat": 1,
  567. "repeat_reason": reason,
  568. "repeat_id": repeat_id,
  569. },
  570. },
  571. })
  572. }
  573. }
  574. }(tmp)
  575. if len(updateExtract) > 500 {
  576. mgo.UpSertBulk(extract, updateExtract...)
  577. updateExtract = [][]map[string]interface{}{}
  578. }
  579. tmp = make(map[string]interface{})
  580. }
  581. wg.Wait()
  582. if len(updateExtract) > 0 {
  583. mgo.UpSertBulk(extract, updateExtract...)
  584. //mgo.UpdateBulk(bidding, updateBidding...)
  585. }
  586. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  587. //任务完成,开始发送广播通知下面节点
  588. if n > repeateN && mapInfo["stop"] == nil {
  589. for _, to := range nextNode {
  590. sid, _ := mapInfo["gtid"].(string)
  591. eid, _ := mapInfo["lteid"].(string)
  592. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  593. by, _ := json.Marshal(map[string]interface{}{
  594. "gtid": sid,
  595. "lteid": eid,
  596. "stype": util.ObjToString(to["stype"]),
  597. "key": key,
  598. })
  599. addr := &net.UDPAddr{
  600. IP: net.ParseIP(to["addr"].(string)),
  601. Port: util.IntAll(to["port"]),
  602. }
  603. node := &udpNode{by, addr, time.Now().Unix(), 0}
  604. udptaskmap.Store(key, node)
  605. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  606. }
  607. }
  608. }
  609. //定时任务
  610. func timedTaskDay() {
  611. c := cron.New()
  612. c.AddFunc("0 0 0 * * ?", func() { timedTaskOnce() }) //每天凌晨执行一次
  613. c.AddFunc("0 0 2 * * ?", func() { movedata() }) //每天凌晨1点执行一次
  614. c.Start()
  615. timedTaskOnce()
  616. }
  617. func timedTaskOnce() {
  618. log.Println("开始一次定时任务")
  619. defer util.Catch()
  620. now := time.Now()
  621. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
  622. curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  623. task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  624. task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  625. //发布时间间隔时间 半年
  626. //测试数据 6点每个间隔6个月
  627. //task_sid = "5e20965785a9271abf0ad6bd"
  628. //task_eid = "5e20968d85a9271abf0ad6c2"
  629. //between_time := int64(1565801997)
  630. //测试数据 180个点 每个隔1天
  631. //task_sid = "5e208f9b50b5ea296eccbb8a"
  632. //task_eid = "5e20968d85a9271abf0ad6c2"
  633. //between_time := int64(1563641997)
  634. between_time := curTime.Unix()-(86400*timingPubScope)
  635. lasttime := int64(0)
  636. log.Println(task_sid, task_eid,curTime.Unix(),between_time)
  637. //区间id
  638. q_start := map[string]interface{}{
  639. "_id": map[string]interface{}{
  640. "$gte": StringTOBsonId(task_sid),
  641. "$lte": StringTOBsonId(task_eid),
  642. },
  643. }
  644. sess_start := mgo.GetMgoConn()
  645. defer mgo.DestoryMongoConn(sess_start)
  646. it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
  647. startNum := 0
  648. for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
  649. if startNum%10000 == 0 {
  650. log.Println("正序遍历:", startNum)
  651. }
  652. //取-符合-发布时间半年内的数据
  653. if util.IntAll(tmp_start["dataging"]) == 1 {
  654. pubtime := util.Int64All(tmp_start["publishtime"])
  655. //log.Println(startNum,"--",pubtime,"--",between_time)
  656. if pubtime>0 && pubtime>=between_time {
  657. lasttime = pubtime
  658. log.Println("找到第一条符合条件的数据")
  659. break
  660. }
  661. }
  662. }
  663. log.Println("... ...",lasttime,)
  664. if lasttime <=0 {
  665. log.Println("没找到dataging==1的数据")
  666. return
  667. }
  668. //构建第一条需要判重的数据 (数据池)
  669. log.Println("开始构建第一条需要判重的数据 ---(数据池)")
  670. DM = TimedTaskDatamap(dupdays,lasttime)
  671. sess := mgo.GetMgoConn()
  672. defer mgo.DestoryMongoConn(sess)
  673. q := map[string]interface{}{
  674. "_id": map[string]interface{}{
  675. "$gte": StringTOBsonId(task_sid),
  676. "$lte": StringTOBsonId(task_eid),
  677. },
  678. }
  679. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  680. updateExtract := [][]map[string]interface{}{}
  681. log.Println("线程数只能为1")
  682. pool := make(chan bool, threadNum)
  683. wg := &sync.WaitGroup{}
  684. n, repeateN := 0, 0
  685. pre_publishtime :=int64(0)
  686. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  687. if n%10000 == 0 {
  688. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  689. }
  690. //log.Println("当前测试重复数量:",repeateN)
  691. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
  692. tmp = make(map[string]interface{})
  693. continue
  694. }
  695. if util.IntAll(tmp["dataging"]) != 1 {
  696. tmp = make(map[string]interface{})
  697. continue
  698. }
  699. pool <- true
  700. wg.Add(1)
  701. go func(tmp map[string]interface{}) {
  702. defer func() {
  703. <-pool
  704. wg.Done()
  705. }()
  706. //log.Println("上个时间:",pre_publishtime,"当前时间--",util.Int64All(tmp["publishtime"]))
  707. if pre_publishtime==0 {
  708. pre_publishtime = util.Int64All(tmp["publishtime"])
  709. }else {
  710. //时间跨度是否大于X天
  711. if (util.Int64All(tmp["publishtime"])-pre_publishtime) >=(86400*timingSpanDay) {
  712. //重新构建数据池
  713. //log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
  714. pre_publishtime = util.Int64All(tmp["publishtime"])
  715. DM = TimedTaskDatamap(dupdays,pre_publishtime)
  716. }
  717. }
  718. info := NewInfo(tmp)
  719. if !LowHeavy { //是否进行低质量数据判重
  720. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  721. log.Println("测试-无效数据")
  722. updateExtract = append(updateExtract, []map[string]interface{}{
  723. map[string]interface{}{
  724. "_id": tmp["_id"],
  725. },
  726. map[string]interface{}{
  727. "$set": map[string]interface{}{
  728. "repeat": -1, //无效数据标签
  729. "dataging": 0,
  730. },
  731. },
  732. })
  733. if len(updateExtract) > 500 {
  734. mgo.UpSertBulk(extract, updateExtract...)
  735. updateExtract = [][]map[string]interface{}{}
  736. }
  737. return
  738. }
  739. }
  740. b, source, reason := DM.check(info)
  741. log.Println("判重结果",b,reason)
  742. if b { //有重复,生成更新语句,更新抽取和更新招标
  743. repeateN++
  744. var is_replace = false
  745. var mergeArr = []int64{} //更改合并数组记录
  746. var newData = &Info{} //更换新的数据池数据
  747. var repeat_idMap = map[string]interface{}{} //记录判重的
  748. var merge_idMap = map[string]interface{}{} //记录合并的
  749. repeat_idMap["_id"] = StringTOBsonId(info.id)
  750. merge_idMap["_id"] = StringTOBsonId(source.id)
  751. repeat_id := source.id //初始化一个数据
  752. if isMerger { //合并相关
  753. basic_bool := basicDataScore(source, info)
  754. if basic_bool {
  755. //已原始数据为标准 - 对比数据打判重标签-
  756. newData, mergeArr, is_replace = mergeDataFields(source, info)
  757. DM.replaceSourceData(newData, source.id) //替换
  758. //对比数据打重复标签的id,原始数据id的记录
  759. repeat_idMap["_id"] = StringTOBsonId(info.id)
  760. merge_idMap["_id"] = StringTOBsonId(source.id)
  761. repeat_id = source.id
  762. } else {
  763. //已对比数据为标准 ,数据池的数据打判重标签
  764. newData, mergeArr, is_replace = mergeDataFields(info, source)
  765. DM.replaceSourceData(newData, source.id) //替换
  766. //原始数据打重复标签的id, 对比数据id的记录
  767. repeat_idMap["_id"] = StringTOBsonId(source.id)
  768. merge_idMap["_id"] = StringTOBsonId(info.id)
  769. repeat_id = info.id
  770. }
  771. merge_map := make(map[string]interface{}, 0)
  772. if is_replace { //有过合并-更新数据
  773. merge_map = map[string]interface{}{
  774. "$set": map[string]interface{}{
  775. "merge": newData.mergemap,
  776. "dataging": 0,
  777. },
  778. }
  779. //更新合并后的数据
  780. for _, value := range mergeArr {
  781. if value == 0 {
  782. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  783. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  784. } else if value == 1 {
  785. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  786. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  787. } else if value == 2 {
  788. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  789. } else if value == 3 {
  790. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  791. } else if value == 4 {
  792. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  793. } else if value == 5 {
  794. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  795. } else if value == 6 {
  796. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  797. } else if value == 7 {
  798. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  799. } else if value == 8 {
  800. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  801. } else if value == 9 {
  802. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  803. } else if value == 10 {
  804. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  805. } else if value == 11 {
  806. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  807. } else {
  808. }
  809. }
  810. //模板数据更新
  811. updateExtract = append(updateExtract, []map[string]interface{}{
  812. merge_idMap,
  813. merge_map,
  814. })
  815. }
  816. } else { //高质量数据
  817. basic_bool := basicDataScore(source, info)
  818. if !basic_bool {
  819. DM.replaceSourceData(info, source.id) //替换
  820. repeat_idMap["_id"] = StringTOBsonId(source.id)
  821. repeat_id = info.id
  822. }
  823. }
  824. //重复数据打标签
  825. updateExtract = append(updateExtract, []map[string]interface{}{
  826. repeat_idMap,
  827. map[string]interface{}{
  828. "$set": map[string]interface{}{
  829. "repeat": 1,
  830. "repeat_reason": reason,
  831. "repeat_id": repeat_id,
  832. "dataging": 0,
  833. },
  834. },
  835. })
  836. }
  837. }(tmp)
  838. if len(updateExtract) > 500 {
  839. mgo.UpSertBulk(extract, updateExtract...)
  840. updateExtract = [][]map[string]interface{}{}
  841. }
  842. tmp = make(map[string]interface{})
  843. }
  844. wg.Wait()
  845. if len(updateExtract) > 0 {
  846. mgo.UpSertBulk(extract, updateExtract...)
  847. }
  848. log.Println("this timeTask over.", n, "repeateN:", repeateN)
  849. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  850. if n > repeateN {
  851. for _, to := range nextNode {
  852. next_sid := util.BsonIdToSId(task_sid)
  853. next_eid := util.BsonIdToSId(task_eid)
  854. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  855. by, _ := json.Marshal(map[string]interface{}{
  856. "gtid": next_sid,
  857. "lteid": next_eid,
  858. "stype": util.ObjToString(to["stype"]),
  859. "key": key,
  860. })
  861. addr := &net.UDPAddr{
  862. IP: net.ParseIP(to["addr"].(string)),
  863. Port: util.IntAll(to["port"]),
  864. }
  865. node := &udpNode{by, addr, time.Now().Unix(), 0}
  866. udptaskmap.Store(key, node)
  867. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  868. }
  869. }
  870. }
  871. //合并字段-并更新merge字段的值
  872. func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
  873. merge_recordMap := make(map[string]interface{}, 0)
  874. mergeArr := make([]int64, 0)
  875. //是否替换数据了-记录原始的数据
  876. is_replace := false
  877. //1、城市
  878. if source.area == "" || source.area == "全国" {
  879. //为空
  880. if info.area != "全国" && info.area != "" {
  881. merge_recordMap["area"] = source.area
  882. merge_recordMap["city"] = source.city
  883. source.area = info.area
  884. source.city = info.city
  885. mergeArr = append(mergeArr, 1)
  886. is_replace = true
  887. }
  888. } else {
  889. //不为空-查看站点相关-有值必替换
  890. if source.is_site {
  891. //是站点替换的城市
  892. merge_recordMap["site_area"] = source.area
  893. merge_recordMap["site_city"] = source.city
  894. mergeArr = append(mergeArr, 0)
  895. is_replace = true
  896. source.is_site = false
  897. }
  898. }
  899. //2、项目名称
  900. if source.projectname == "" && info.projectname != "" {
  901. merge_recordMap["projectname"] = source.projectname
  902. source.projectname = info.projectname
  903. mergeArr = append(mergeArr, 2)
  904. is_replace = true
  905. }
  906. //3、项目编号
  907. if source.projectcode == "" && info.projectcode != "" {
  908. merge_recordMap["projectcode"] = source.projectcode
  909. source.projectcode = info.projectcode
  910. mergeArr = append(mergeArr, 3)
  911. is_replace = true
  912. }
  913. //4、采购单位
  914. if source.buyer == "" && info.buyer != "" {
  915. merge_recordMap["buyer"] = source.buyer
  916. source.buyer = info.buyer
  917. mergeArr = append(mergeArr, 4)
  918. is_replace = true
  919. }
  920. //5、预算
  921. if source.budget == 0 && info.budget != 0 {
  922. merge_recordMap["budget"] = source.budget
  923. source.budget = info.budget
  924. mergeArr = append(mergeArr, 5)
  925. is_replace = true
  926. }
  927. //6、中标单位
  928. if source.winner == "" && info.winner != "" {
  929. merge_recordMap["winner"] = source.winner
  930. source.winner = info.winner
  931. mergeArr = append(mergeArr, 6)
  932. is_replace = true
  933. }
  934. //7、中标金额
  935. if source.bidamount == 0 && info.bidamount != 0 {
  936. merge_recordMap["bidamount"] = source.bidamount
  937. source.bidamount = info.bidamount
  938. mergeArr = append(mergeArr, 7)
  939. is_replace = true
  940. }
  941. //8、开标时间-地点
  942. if source.bidopentime == 0 && info.bidopentime != 0 {
  943. merge_recordMap["bidopentime"] = source.bidopentime
  944. source.bidopentime = info.bidopentime
  945. mergeArr = append(mergeArr, 8)
  946. is_replace = true
  947. }
  948. //9、合同编号
  949. if source.contractnumber == "" && info.contractnumber != "" {
  950. merge_recordMap["contractnumber"] = source.contractnumber
  951. source.contractnumber = info.contractnumber
  952. mergeArr = append(mergeArr, 9)
  953. is_replace = true
  954. }
  955. //10、发布时间
  956. if source.publishtime == 0 && info.publishtime != 0 {
  957. merge_recordMap["publishtime"] = source.publishtime
  958. source.publishtime = info.publishtime
  959. mergeArr = append(mergeArr, 10)
  960. is_replace = true
  961. }
  962. //11、代理机构
  963. if source.agency == "" && info.agency != "" {
  964. merge_recordMap["agency"] = source.agency
  965. source.agency = info.agency
  966. mergeArr = append(mergeArr, 11)
  967. is_replace = true
  968. }
  969. if is_replace { //有过替换更新
  970. //总次数+1
  971. source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
  972. merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
  973. //和哪一个数据id进行非空替换的-记录
  974. key := info.id
  975. source.mergemap[key] = merge_recordMap
  976. }
  977. //待进一步优化
  978. return source, mergeArr, is_replace
  979. }
  980. //权重评估
  981. func basicDataScore(v *Info, info *Info) bool {
  982. /*
  983. 权重评估
  984. 网站优先级判定规则:
  985. 1、中央>省>市>县区
  986. 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
  987. */
  988. v_score, info_score := -1, -1
  989. dict_v := SiteMap[v.site]
  990. dict_info := SiteMap[info.site]
  991. //先判断level
  992. if dict_v != nil {
  993. v_level := util.ObjToString(dict_v["level"])
  994. if v_level == "中央" {
  995. v_score = 4
  996. } else if v_level == "省级" {
  997. v_score = 3
  998. } else if v_level == "市级" {
  999. v_score = 2
  1000. } else if v_level == "县区" {
  1001. v_score = 1
  1002. } else if v_level == "" {
  1003. } else {
  1004. v_score = 0
  1005. }
  1006. }
  1007. if dict_info != nil {
  1008. info_level := util.ObjToString(dict_info["level"])
  1009. if info_level == "中央" {
  1010. info_score = 4
  1011. } else if info_level == "省级" {
  1012. info_score = 3
  1013. } else if info_level == "市级" {
  1014. info_score = 2
  1015. } else if info_level == "县区" {
  1016. info_score = 1
  1017. } else if info_level == "" {
  1018. } else {
  1019. v_score = 0
  1020. }
  1021. }
  1022. if v_score > info_score {
  1023. return true
  1024. }
  1025. if v_score < info_score {
  1026. return false
  1027. }
  1028. //判断sitetype
  1029. if dict_v != nil {
  1030. v_sitetype := util.ObjToString(dict_v["sitetype"])
  1031. if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
  1032. v_score = 4
  1033. } else if v_sitetype == "公共资源" {
  1034. v_score = 3
  1035. } else if v_sitetype == "官方网站" {
  1036. v_score = 2
  1037. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  1038. v_score = 1
  1039. } else if v_sitetype == "" {
  1040. } else {
  1041. v_score = 0
  1042. }
  1043. }
  1044. if dict_info != nil {
  1045. info_sitetype := util.ObjToString(dict_info["sitetype"])
  1046. if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
  1047. info_score = 4
  1048. } else if info_sitetype == "公共资源" {
  1049. info_score = 3
  1050. } else if info_sitetype == "官方网站" {
  1051. info_score = 2
  1052. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  1053. info_score = 1
  1054. } else if info_sitetype == "" {
  1055. } else {
  1056. info_score = 0
  1057. }
  1058. }
  1059. if v_score > info_score {
  1060. return true
  1061. }
  1062. if v_score < info_score {
  1063. return false
  1064. }
  1065. //网站评估
  1066. m, n := 0, 0
  1067. if v.projectname != "" {
  1068. m++
  1069. }
  1070. if v.buyer != "" {
  1071. m++
  1072. }
  1073. if v.projectcode != "" || v.contractnumber != "" {
  1074. m++
  1075. }
  1076. if v.budget != 0 {
  1077. m++
  1078. }
  1079. if v.bidamount != 0 {
  1080. m++
  1081. }
  1082. if v.winner != "" {
  1083. m++
  1084. }
  1085. if v.bidopentime != 0 {
  1086. m++
  1087. }
  1088. if v.bidopenaddress != "" {
  1089. m++
  1090. }
  1091. if v.agency != "" {
  1092. m = m + 2
  1093. }
  1094. if v.city != "" {
  1095. m = m + 2
  1096. }
  1097. if info.projectname != "" {
  1098. n++
  1099. }
  1100. if info.buyer != "" {
  1101. n++
  1102. }
  1103. if info.projectcode != "" || info.contractnumber != "" {
  1104. n++
  1105. }
  1106. if info.budget != 0 {
  1107. n++
  1108. }
  1109. if info.bidamount != 0 {
  1110. n++
  1111. }
  1112. if info.winner != "" {
  1113. n++
  1114. }
  1115. if info.bidopentime != 0 {
  1116. n++
  1117. }
  1118. if info.bidopenaddress != "" {
  1119. n++
  1120. }
  1121. if info.agency != "" {
  1122. n = n + 2
  1123. }
  1124. if info.city != "" {
  1125. n = n + 2
  1126. }
  1127. if m > n {
  1128. return true
  1129. } else if m == n {
  1130. if v.publishtime >= info.publishtime {
  1131. return true
  1132. } else {
  1133. return false
  1134. }
  1135. } else {
  1136. return false
  1137. }
  1138. }
  // invalidData reports whether all four values are empty strings, i.e. the
  // record carries none of the fields passed in.
  func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  	for _, field := range [...]string{d1, d2, d3, d4} {
  		if field != "" {
  			// A single populated field is enough to make the data valid.
  			return false
  		}
  	}
  	return true
  }
  1159. //迁移数据dupdays+5之前的数据
  1160. func movedata() {
  1161. sess := mgo.GetMgoConn()
  1162. defer mgo.DestoryMongoConn(sess)
  1163. year, month, day := time.Now().Date()
  1164. q := map[string]interface{}{
  1165. "comeintime": map[string]interface{}{
  1166. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+5) * 24 * time.Hour).Unix(),
  1167. },
  1168. }
  1169. log.Println(q)
  1170. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  1171. index := 0
  1172. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  1173. mgo.Save(extract+"_back", tmp)
  1174. tmp = map[string]interface{}{}
  1175. if index%1000 == 0 {
  1176. log.Println("index", index)
  1177. }
  1178. }
  1179. log.Println("save to", extract+"_back", " ok index", index)
  1180. delnum := mgo.Delete(extract, q)
  1181. log.Println("remove from ", extract, delnum)
  1182. }