zhangjinkun před 5 roky
rodič
revize
ad9636077d
1 změněný soubor, 38 přidání a 41 odebrání
  1. 38 41
      udpfilterdup/src/main.go

+ 38 - 41
udpfilterdup/src/main.go

@@ -22,16 +22,16 @@ import (
 )
 
 var (
-	Sysconfig map[string]interface{} //配置文件
-	mconf     map[string]interface{} //mongodb配置信息
-	mgo       *MongodbSim            //mongodb操作对象
-	extract   string
+	Sysconfig    map[string]interface{} //配置文件
+	mconf        map[string]interface{} //mongodb配置信息
+	mgo          *MongodbSim            //mongodb操作对象
+	extract      string
 	extract_back string
-	udpclient mu.UdpClient             //udp对象
-	nextNode  []map[string]interface{} //下节点数组
-	dupdays   = 5                      //初始化判重范围
-	DM        *datamap                 //
-	HM        *historymap              //判重数据
+	udpclient    mu.UdpClient             //udp对象
+	nextNode     []map[string]interface{} //下节点数组
+	dupdays      = 5                      //初始化判重范围
+	DM           *datamap                 //
+	HM           *historymap              //判重数据
 
 	lastid = ""
 
@@ -41,15 +41,15 @@ var (
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
-	isMerger   bool                              //是否合并
-	Is_Sort    bool                              //是否排序
-	threadNum  int                               //线程数量
-	SiteMap    map[string]map[string]interface{} //站点map
-	LowHeavy   bool                              //低质量数据判重
-	TimingTask bool                              //是否定时任务
-	timingSpanDay int64							//时间跨度
-	timingPubScope int64						//发布时间周期
-	sid, eid   string                            //测试人员判重使用
+	isMerger       bool                              //是否合并
+	Is_Sort        bool                              //是否排序
+	threadNum      int                               //线程数量
+	SiteMap        map[string]map[string]interface{} //站点map
+	LowHeavy       bool                              //低质量数据判重
+	TimingTask     bool                              //是否定时任务
+	timingSpanDay  int64                             //时间跨度
+	timingPubScope int64                             //发布时间周期
+	sid, eid       string                            //测试人员判重使用
 )
 
 func init() {
@@ -120,7 +120,7 @@ func main() {
 }
 
 //测试组人员使用
-func mainT() {
+func main() {
 	/*
 		ObjectId("5da3f31aa5cb26b9b798d3aa")
 		ObjectId("5da418c4a5cb26b9b7e3e9a6")
@@ -132,11 +132,11 @@ func mainT() {
 		log.Println("定时任务测试开始")
 		go timedTaskDay()
 		time.Sleep(99999 * time.Hour)
-	}else {
+	} else {
 		sid = "5c2c10fda5cb26b9b75e6f7f"
 		eid = "5e976e4a50b5ea296ef376b9"
 		log.Println("正常判重测试开始")
-		log.Println(sid,"---",eid)
+		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
 		if sid == "" || eid == "" {
 			log.Println("sid,eid参数不能为空")
@@ -649,8 +649,8 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 //定时任务
 func timedTaskDay() {
 	c := cron.New()
-	c.AddFunc("0 0 0 * * ?", func() { timedTaskOnce() }) //每天凌晨执行一次
-	c.AddFunc("0 0 2 * * ?", func() { movedata() })      //每天凌晨1点执行一次
+	c.AddFunc("0 0 1 * * ?", func() { movedata() })      //每天凌晨1点执行一次
+	c.AddFunc("0 0 2 * * ?", func() { timedTaskOnce() }) //每天凌晨2点执行一次
 	c.Start()
 	timedTaskOnce()
 }
@@ -659,15 +659,12 @@ func timedTaskOnce() {
 	log.Println("开始一次定时任务")
 	defer util.Catch()
 
-
 	now := time.Now()
 	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
 	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
 
-
-
 	//发布时间间隔时间 半年
 	//测试数据 6点每个间隔6个月
 	//task_sid = "5e20965785a9271abf0ad6bd"
@@ -679,9 +676,9 @@ func timedTaskOnce() {
 	//task_eid = "5e20968d85a9271abf0ad6c2"
 	//between_time := int64(1563641997)
 
-	between_time := curTime.Unix()-(86400*timingPubScope)
+	between_time := curTime.Unix() - (86400 * timingPubScope)
 	lasttime := int64(0)
-	log.Println(task_sid, task_eid,curTime.Unix(),between_time)
+	log.Println(task_sid, task_eid, curTime.Unix(), between_time)
 	//区间id
 	q_start := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -702,7 +699,7 @@ func timedTaskOnce() {
 		if util.IntAll(tmp_start["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp_start["publishtime"])
 			//log.Println(startNum,"--",pubtime,"--",between_time)
-			if pubtime>0 && pubtime>=between_time {
+			if pubtime > 0 && pubtime >= between_time {
 				lasttime = pubtime
 				log.Println("找到第一条符合条件的数据")
 				break
@@ -710,15 +707,15 @@ func timedTaskOnce() {
 		}
 	}
 
-	log.Println("... ...",lasttime,)
-	if lasttime <=0 {
+	log.Println("... ...", lasttime)
+	if lasttime <= 0 {
 		log.Println("没找到dataging==1的数据")
 		return
 	}
 
 	//构建第一条需要判重的数据   (数据池)
 	log.Println("开始构建第一条需要判重的数据 ---(数据池)")
-	DM = TimedTaskDatamap(dupdays,lasttime)
+	DM = TimedTaskDatamap(dupdays, lasttime)
 
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
@@ -735,7 +732,7 @@ func timedTaskOnce() {
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	n, repeateN := 0, 0
-	pre_publishtime :=int64(0)
+	pre_publishtime := int64(0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
@@ -760,15 +757,15 @@ func timedTaskOnce() {
 
 			//log.Println("上个时间:",pre_publishtime,"当前时间--",util.Int64All(tmp["publishtime"]))
 
-			if pre_publishtime==0 {
+			if pre_publishtime == 0 {
 				pre_publishtime = util.Int64All(tmp["publishtime"])
-			}else {
+			} else {
 				//时间跨度是否大于X天
-				if (util.Int64All(tmp["publishtime"])-pre_publishtime) >=(86400*timingSpanDay) {
+				if (util.Int64All(tmp["publishtime"]) - pre_publishtime) >= (86400 * timingSpanDay) {
 					//重新构建数据池
 					//log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
 					pre_publishtime = util.Int64All(tmp["publishtime"])
-					DM = TimedTaskDatamap(dupdays,pre_publishtime)
+					DM = TimedTaskDatamap(dupdays, pre_publishtime)
 				}
 			}
 
@@ -796,7 +793,7 @@ func timedTaskOnce() {
 			}
 
 			b, source, reason := DM.check(info)
-			log.Println("判重结果",b,reason)
+			log.Println("判重结果", b, reason)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
 				repeateN++
 				var is_replace = false
@@ -1246,20 +1243,20 @@ func movedata() {
 	year, month, day := time.Now().Date()
 	q := map[string]interface{}{
 		"comeintime": map[string]interface{}{
-			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+5) * 24 * time.Hour).Unix(),
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
 		},
 	}
 	log.Println(q)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
 	index := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-		mgo.Save(extract+"_back", tmp)
+		mgo.Save(extract_back, tmp)
 		tmp = map[string]interface{}{}
 		if index%1000 == 0 {
 			log.Println("index", index)
 		}
 	}
-	log.Println("save to", extract+"_back", " ok index", index)
+	log.Println("save to", extract_back, " ok index", index)
 	delnum := mgo.Delete(extract, q)
 	log.Println("remove from ", extract, delnum)
 }