main.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. package src1
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "log"
  11. mu "mfw/util"
  12. "net"
  13. "os"
  14. "qfw/util"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. "gopkg.in/mgo.v2/bson"
  20. )
  21. var (
  22. Sysconfig map[string]interface{} //配置文件
  23. mconf map[string]interface{} //mongodb配置信息
  24. mgo *MongodbSim //mongodb操作对象
  25. extract string
  26. extract_back string
  27. udpclient mu.UdpClient //udp对象
  28. nextNode []map[string]interface{} //下节点数组
  29. dupdays = 5 //初始化判重范围
  30. DM *datamap //
  31. //正则筛选相关
  32. FilterRegTitle = regexp.MustCompile("^_$")
  33. FilterRegTitle_0 = regexp.MustCompile("^_$")
  34. FilterRegTitle_1 = regexp.MustCompile("^_$")
  35. FilterRegTitle_2 = regexp.MustCompile("^_$")
  36. isMerger bool //是否合并
  37. threadNum int //线程数量
  38. SiteMap map[string]map[string]interface{} //站点map
  39. LowHeavy bool //低质量数据判重
  40. TimingTask bool //是否定时任务
  41. timingSpanDay int64 //时间跨度
  42. timingPubScope int64 //发布时间周期
  43. sid,eid,lastid string //测试人员判重使用
  44. IdType bool //默认object类型
  45. )
  46. func init() {
  47. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  48. flag.StringVar(&sid, "sid", "", "开始id")
  49. flag.StringVar(&eid, "eid", "", "结束id")
  50. flag.Parse()
  51. //172.17.145.163:27080
  52. util.ReadConfig(&Sysconfig)
  53. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  54. mconf = Sysconfig["mongodb"].(map[string]interface{})
  55. mgo = &MongodbSim{
  56. MongodbAddr: mconf["addr"].(string),
  57. DbName: mconf["db"].(string),
  58. Size: util.IntAllDef(mconf["pool"], 10),
  59. }
  60. mgo.InitPool()
  61. extract = mconf["extract"].(string)
  62. extract_back = mconf["extract_back"].(string)
  63. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  64. //加载数据
  65. DM = NewDatamap(dupdays, lastid)
  66. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  67. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  68. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  69. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  70. isMerger = Sysconfig["isMerger"].(bool)
  71. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  72. LowHeavy = Sysconfig["lowHeavy"].(bool)
  73. TimingTask = Sysconfig["timingTask"].(bool)
  74. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  75. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  76. //站点配置
  77. site := mconf["site"].(map[string]interface{})
  78. SiteMap = make(map[string]map[string]interface{}, 0)
  79. start := int(time.Now().Unix())
  80. sess_site := mgo.GetMgoConn()
  81. defer mgo.DestoryMongoConn(sess_site)
  82. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  83. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  84. data_map := map[string]interface{}{
  85. "area": util.ObjToString(site_dict["area"]),
  86. "city": util.ObjToString(site_dict["city"]),
  87. "district": util.ObjToString(site_dict["district"]),
  88. "sitetype": util.ObjToString(site_dict["sitetype"]),
  89. "level": util.ObjToString(site_dict["level"]),
  90. "weight": util.ObjToString(site_dict["weight"]),
  91. }
  92. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  93. }
  94. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  95. }
  96. func mainT() {
  97. go checkMapJob()
  98. updport := Sysconfig["udpport"].(string)
  99. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  100. udpclient.Listen(processUdpMsg)
  101. log.Println("Udp服务监听", updport)
  102. if TimingTask {
  103. go timedTaskDay()
  104. }
  105. time.Sleep(99999 * time.Hour)
  106. }
  107. //测试组人员使用
  108. func main() {
  109. if TimingTask {
  110. log.Println("定时任务测试开始")
  111. go timedTaskDay()
  112. time.Sleep(99999 * time.Hour)
  113. } else {
  114. //IdType = true //打开id字符串模式
  115. sid = "5ef01220801f744d045f51f1"
  116. eid = "5ef61eb3801f744d046402dd"
  117. log.Println("正常判重测试开始")
  118. log.Println(sid, "---", eid)
  119. mapinfo := map[string]interface{}{}
  120. if sid == "" || eid == "" {
  121. log.Println("sid,eid参数不能为空")
  122. os.Exit(0)
  123. }
  124. mapinfo["gtid"] = sid
  125. mapinfo["lteid"] = eid
  126. mapinfo["stop"] = "true"
  127. task([]byte{}, mapinfo)
  128. time.Sleep(99999 * time.Hour)
  129. }
  130. }
  131. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  132. fmt.Println("接受的段数据")
  133. switch act {
  134. case mu.OP_TYPE_DATA: //上个节点的数据
  135. //从表中开始处理
  136. var mapInfo map[string]interface{}
  137. err := json.Unmarshal(data, &mapInfo)
  138. log.Println("err:", err, "mapInfo:", mapInfo)
  139. if err != nil {
  140. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  141. } else if mapInfo != nil {
  142. taskType := util.ObjToString(mapInfo["stype"])
  143. if taskType == "normalTask" {
  144. //判重流程
  145. go task(data, mapInfo)
  146. } else {
  147. //其他
  148. go task(data, mapInfo)
  149. }
  150. key, _ := mapInfo["key"].(string)
  151. if key == "" {
  152. key = "udpok"
  153. }
  154. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  155. }
  156. case mu.OP_NOOP: //下个节点回应
  157. ok := string(data)
  158. if ok != "" {
  159. log.Println("ok:", ok)
  160. udptaskmap.Delete(ok)
  161. }
  162. }
  163. }
  164. //开始判重程序
  165. func task(data []byte, mapInfo map[string]interface{}) {
  166. log.Println("开始数据判重")
  167. defer util.Catch()
  168. //区间id
  169. q := map[string]interface{}{
  170. "_id": map[string]interface{}{
  171. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  172. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  173. },
  174. }
  175. if IdType {
  176. q = map[string]interface{}{
  177. "_id": map[string]interface{}{
  178. "$gt": mapInfo["gtid"].(string),
  179. "$lte": mapInfo["lteid"].(string),
  180. },
  181. }
  182. }
  183. log.Println(mgo.DbName, extract, q)
  184. sess := mgo.GetMgoConn()
  185. defer mgo.DestoryMongoConn(sess)
  186. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  187. updateExtract := [][]map[string]interface{}{}
  188. ids:=[]string{}
  189. pool := make(chan bool, threadNum)
  190. wg := &sync.WaitGroup{}
  191. n, repeateN := 0, 0
  192. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  193. if n%10000 == 0 {
  194. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  195. }
  196. source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
  197. if util.IntAll((*source)["sourcewebsite"]) == 1 && tmp["attach_text"]!=nil{
  198. repeateN++
  199. updateExtract = append(updateExtract, []map[string]interface{}{
  200. map[string]interface{}{
  201. "_id": tmp["_id"],
  202. },
  203. map[string]interface{}{
  204. "$set": map[string]interface{}{
  205. "repeat": 1,
  206. "dataging":0,
  207. "repeat_reason": "sourcewebsite为1,重复",
  208. },
  209. },
  210. })
  211. if len(updateExtract) >= 200 {
  212. mgo.UpSertBulk(extract, updateExtract...)
  213. updateExtract = [][]map[string]interface{}{}
  214. }
  215. tmp = make(map[string]interface{})
  216. continue
  217. }
  218. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
  219. util.IntAll(tmp["dataging"]) == 1 {
  220. if util.IntAll(tmp["repeat"]) == 1 {
  221. repeateN++
  222. }
  223. tmp = make(map[string]interface{})
  224. continue
  225. }
  226. pool <- true
  227. wg.Add(1)
  228. go func(tmp map[string]interface{}) {
  229. defer func() {
  230. <-pool
  231. wg.Done()
  232. }()
  233. info := NewInfo(tmp)
  234. if !LowHeavy { //是否进行低质量数据判重
  235. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  236. updateExtract = append(updateExtract, []map[string]interface{}{
  237. map[string]interface{}{
  238. "_id": tmp["_id"],
  239. },
  240. map[string]interface{}{
  241. "$set": map[string]interface{}{
  242. "repeat": -1, //无效数据标签
  243. },
  244. },
  245. })
  246. if len(updateExtract) >= 200 {
  247. mgo.UpSertBulk(extract, updateExtract...)
  248. updateExtract = [][]map[string]interface{}{}
  249. }
  250. return
  251. }
  252. }
  253. //正常判重
  254. b, source, reason := DM.check(info)
  255. if b { //有重复,生成更新语句,更新抽取和更新招标
  256. repeateN++
  257. var updateID = map[string]interface{}{} //记录更新判重的
  258. updateID["_id"] = StringTOBsonId(info.id)
  259. if IdType {
  260. updateID["_id"] = info.id
  261. }
  262. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  263. updateID,
  264. map[string]interface{}{
  265. "$set": map[string]interface{}{
  266. "repeat": 1,
  267. "repeat_reason": reason,
  268. "repeat_id": source.id,
  269. },
  270. },
  271. })
  272. }
  273. }(tmp)
  274. if len(updateExtract) >= 200 {
  275. mgo.UpSertBulk(extract, updateExtract...)
  276. updateExtract = [][]map[string]interface{}{}
  277. }
  278. tmp = make(map[string]interface{})
  279. }
  280. wg.Wait()
  281. if len(updateExtract) > 0 {
  282. mgo.UpSertBulk(extract, updateExtract...)
  283. }
  284. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  285. //任务完成,开始发送广播通知下面节点
  286. if n > repeateN && mapInfo["stop"] == nil {
  287. log.Println("判重任务完成发送udp")
  288. for _, to := range nextNode {
  289. sid, _ := mapInfo["gtid"].(string)
  290. eid, _ := mapInfo["lteid"].(string)
  291. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  292. by, _ := json.Marshal(map[string]interface{}{
  293. "gtid": sid,
  294. "lteid": eid,
  295. "stype": util.ObjToString(to["stype"]),
  296. "key": key,
  297. "ids": strings.Join(ids, ","),
  298. })
  299. addr := &net.UDPAddr{
  300. IP: net.ParseIP(to["addr"].(string)),
  301. Port: util.IntAll(to["port"]),
  302. }
  303. node := &udpNode{by, addr, time.Now().Unix(), 0}
  304. udptaskmap.Store(key, node)
  305. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  306. }
  307. }
  308. }
  309. //定时任务--定时任务--定时任务
  310. func timedTaskDay() {
  311. log.Println("部署定时任务")
  312. c := cron.New()
  313. c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() })
  314. c.Start()
  315. }
  316. func timedTaskOnce() {
  317. defer util.Catch()
  318. log.Println("开始一次迁移任务")
  319. movedata()
  320. log.Println("开始一次任务判重")
  321. //当前时间-8 -4 小时
  322. now := time.Now()
  323. log.Println(now)
  324. preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
  325. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
  326. log.Println(preTime,curTime)
  327. task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  328. task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  329. between_time := curTime.Unix() - (86400 * timingPubScope)
  330. log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
  331. //区间id
  332. q_start := map[string]interface{}{
  333. "_id": map[string]interface{}{
  334. "$gte": StringTOBsonId(task_sid),
  335. "$lte": StringTOBsonId(task_eid),
  336. },
  337. }
  338. sess := mgo.GetMgoConn()
  339. defer mgo.DestoryMongoConn(sess)
  340. it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
  341. num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
  342. updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
  343. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  344. dayArr := []map[string]interface{}{}
  345. for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
  346. if num%10000 == 0 {
  347. log.Println("正序遍历:", num)
  348. }
  349. source := util.ObjToMap(tmp["jsondata"])
  350. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  351. updateExtract = append(updateExtract, []map[string]interface{}{
  352. map[string]interface{}{
  353. "_id": tmp["_id"],
  354. },
  355. map[string]interface{}{
  356. "$set": map[string]interface{}{
  357. "repeat": 1,
  358. "dataging": 0,
  359. "repeat_reason": "sourcewebsite为1 重复",
  360. },
  361. },
  362. })
  363. if len(updateExtract) > 50 {
  364. mgo.UpSertBulk(extract, updateExtract...)
  365. updateExtract = [][]map[string]interface{}{}
  366. }
  367. tmp = make(map[string]interface{})
  368. continue
  369. }
  370. //取-符合-发布时间X年内的数据
  371. if util.IntAll(tmp["dataging"]) == 1 {
  372. pubtime := util.Int64All(tmp["publishtime"])
  373. if pubtime > 0 && pubtime >= between_time {
  374. oknum++
  375. if deterTime==0 {
  376. log.Println("找到第一条符合条件的数据")
  377. deterTime = util.Int64All(tmp["publishtime"])
  378. dayArr = append(dayArr,tmp)
  379. }else {
  380. if pubtime-deterTime >timingSpanDay*86400 {
  381. //新数组重新构建,当前组数据加到全部组数据
  382. pendAllArr = append(pendAllArr,dayArr)
  383. dayArr = []map[string]interface{}{}
  384. deterTime = util.Int64All(tmp["publishtime"])
  385. dayArr = append(dayArr,tmp)
  386. }else {
  387. dayArr = append(dayArr,tmp)
  388. }
  389. }
  390. }else {
  391. //不在两年内的也清标记
  392. updateExtract = append(updateExtract, []map[string]interface{}{
  393. map[string]interface{}{
  394. "_id": tmp["_id"],
  395. },
  396. map[string]interface{}{
  397. "$set": map[string]interface{}{
  398. "dataging": 0,
  399. },
  400. },
  401. })
  402. if len(updateExtract) > 50 {
  403. mgo.UpSertBulk(extract, updateExtract...)
  404. updateExtract = [][]map[string]interface{}{}
  405. }
  406. }
  407. }
  408. tmp = make(map[string]interface{})
  409. }
  410. //批量更新标记
  411. if len(updateExtract) > 0 {
  412. mgo.UpSertBulk(extract, updateExtract...)
  413. updateExtract = [][]map[string]interface{}{}
  414. }
  415. if len(dayArr)>0 {
  416. pendAllArr = append(pendAllArr,dayArr)
  417. dayArr = []map[string]interface{}{}
  418. }
  419. log.Println("查询数量:",num,"符合条件:",oknum)
  420. if len(pendAllArr) <= 0 {
  421. log.Println("没找到dataging==1的数据")
  422. return
  423. }
  424. //测试分组数量是否正确
  425. testNum:=0
  426. for k,v:=range pendAllArr {
  427. log.Println("第",k,"组--","数量:",len(v))
  428. testNum = testNum+len(v)
  429. }
  430. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  431. n, repeateN := 0, 0
  432. for k,v:=range pendAllArr { //每组结束更新一波数据
  433. //构建当前组的数据池
  434. log.Println("构建第",k,"组---(数据池)")
  435. //当前组的第一个发布时间
  436. first_pt :=util.Int64All(v[0]["publishtime"])
  437. DM = TimedTaskDatamap(dupdays, first_pt)
  438. log.Println("开始遍历判重第",k,"组 共计数量:",len(v))
  439. n = n+len(v)
  440. log.Println("统计目前总数量:",n,"重复数量:",repeateN)
  441. for _,tmp:=range v {
  442. info := NewInfo(tmp)
  443. if !LowHeavy { //是否进行低质量数据判重
  444. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  445. log.Println("无效数据")
  446. updateExtract = append(updateExtract, []map[string]interface{}{
  447. map[string]interface{}{
  448. "_id": tmp["_id"],
  449. },
  450. map[string]interface{}{
  451. "$set": map[string]interface{}{
  452. "repeat": -1, //无效数据标签
  453. "dataging": 0,
  454. },
  455. },
  456. })
  457. if len(updateExtract) > 50 {
  458. mgo.UpSertBulk(extract, updateExtract...)
  459. updateExtract = [][]map[string]interface{}{}
  460. }
  461. continue
  462. }
  463. }
  464. b, source, reason := DM.check(info)
  465. if b { //有重复,生成更新语句,更新抽取和更新招标
  466. log.Println("判重结果", b, reason,"目标id",info.id)
  467. repeateN++
  468. //重复数据打标签
  469. updateExtract = append(updateExtract, []map[string]interface{}{
  470. map[string]interface{}{
  471. "_id": tmp["_id"],
  472. },
  473. map[string]interface{}{
  474. "$set": map[string]interface{}{
  475. "repeat": 1,
  476. "repeat_reason": reason,
  477. "repeat_id": source.id,
  478. "dataging": 0,
  479. },
  480. },
  481. })
  482. }else {
  483. updateExtract = append(updateExtract, []map[string]interface{}{
  484. map[string]interface{}{
  485. "_id": tmp["_id"],
  486. },
  487. map[string]interface{}{
  488. "$set": map[string]interface{}{
  489. "dataging": 0,//符合条件的都为dataging==0
  490. },
  491. },
  492. })
  493. }
  494. if len(updateExtract) > 50 {
  495. mgo.UpSertBulk(extract, updateExtract...)
  496. updateExtract = [][]map[string]interface{}{}
  497. }
  498. }
  499. //每组数据结束-更新数据
  500. if len(updateExtract) > 0 {
  501. mgo.UpSertBulk(extract, updateExtract...)
  502. updateExtract = [][]map[string]interface{}{}
  503. }
  504. }
  505. if len(updateExtract) > 0 {
  506. mgo.UpSertBulk(extract, updateExtract...)
  507. updateExtract = [][]map[string]interface{}{}
  508. }
  509. log.Println("this timeTask over.", n, "repeateN:", repeateN)
  510. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  511. if n > repeateN {
  512. for _, to := range nextNode {
  513. next_sid := util.BsonIdToSId(task_sid)
  514. next_eid := util.BsonIdToSId(task_eid)
  515. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  516. by, _ := json.Marshal(map[string]interface{}{
  517. "gtid": next_sid,
  518. "lteid": next_eid,
  519. "stype": util.ObjToString(to["stype"]),
  520. "key": key,
  521. })
  522. addr := &net.UDPAddr{
  523. IP: net.ParseIP(to["addr"].(string)),
  524. Port: util.IntAll(to["port"]),
  525. }
  526. node := &udpNode{by, addr, time.Now().Unix(), 0}
  527. udptaskmap.Store(key, node)
  528. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  529. }
  530. }
  531. }
  532. //迁移数据dupdays+5之前的数据
  533. func movedata() {
  534. sess := mgo.GetMgoConn()
  535. defer mgo.DestoryMongoConn(sess)
  536. year, month, day := time.Now().Date()
  537. now:=time.Now()
  538. move_time := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local).Unix()
  539. q := map[string]interface{}{
  540. "comeintime": map[string]interface{}{
  541. "$lt": move_time,
  542. },
  543. }
  544. log.Println(q)
  545. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  546. index := 0
  547. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  548. mgo.Save(extract_back, tmp)
  549. tmp = map[string]interface{}{}
  550. if index%1000 == 0 {
  551. log.Println("index", index)
  552. }
  553. }
  554. log.Println("save to", extract_back, " ok index", index)
  555. qv := map[string]interface{}{
  556. "comeintime": map[string]interface{}{
  557. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
  558. },
  559. }
  560. delnum := mgo.Delete(extract, qv)
  561. log.Println("remove from ", extract, delnum)
  562. }