Browse Source

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

# Conflicts:
#	udpfilterdup/src/main.go
apple 5 năm trước cách đây
mục cha
commit
2ad99d2213
2 tập tin đã thay đổi với 70 bổ sung61 xóa
  1. 69 60
      udpfilterdup/src/main.go
  2. 1 1
      udpfilterdup/src/mgo.go

+ 69 - 60
udpfilterdup/src/main.go

@@ -8,8 +8,6 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"github.com/cron"
-	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -18,6 +16,9 @@ import (
 	"regexp"
 	"sync"
 	"time"
+
+	"github.com/cron"
+	"gopkg.in/mgo.v2/bson"
 )
 
 var (
@@ -31,7 +32,7 @@ var (
 	DM        *datamap                 //
 	HM        *historymap              //判重数据
 
-	lastid    = ""
+	lastid = ""
 
 	//正则筛选相关
 	FilterRegTitle   = regexp.MustCompile("^_$")
@@ -39,13 +40,13 @@ var (
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
-	isMerger         bool                              //是否合并
-	Is_Sort          bool                              //是否排序
-	threadNum        int                               //线程数量
-	SiteMap          map[string]map[string]interface{} //站点map
-	LowHeavy		 bool							   //低质量数据判重
-	TimingTask		 bool							   //是否定时任务
-	sid, eid string                          		   //测试人员判重使用
+	isMerger   bool                              //是否合并
+	Is_Sort    bool                              //是否排序
+	threadNum  int                               //线程数量
+	SiteMap    map[string]map[string]interface{} //站点map
+	LowHeavy   bool                              //低质量数据判重
+	TimingTask bool                              //是否定时任务
+	sid, eid   string                            //测试人员判重使用
 )
 
 func init() {
@@ -74,8 +75,8 @@ func init() {
 	isMerger = Sysconfig["isMerger"].(bool)
 	Is_Sort = Sysconfig["isSort"].(bool)
 	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
-	LowHeavy =  Sysconfig["lowHeavy"].(bool)
-	TimingTask =  Sysconfig["timingTask"].(bool)
+	LowHeavy = Sysconfig["lowHeavy"].(bool)
+	TimingTask = Sysconfig["timingTask"].(bool)
 	//站点配置
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
@@ -102,15 +103,11 @@ func main() {
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	if TimingTask {
 		go timedTaskDay()
-	}else {
+	} else {
 		udpclient.Listen(processUdpMsg)
 		log.Println("Udp服务监听", updport)
 	}
 
-
-
-
-
 	time.Sleep(99999 * time.Hour)
 }
 
@@ -142,9 +139,6 @@ func mainT() {
 		task([]byte{}, mapinfo)
 		time.Sleep(10 * time.Second)
 	}
-
-
-
 }
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	fmt.Println("接受的段数据")
@@ -183,6 +177,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	}
 }
+
 //开始判重程序
 func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println("开始数据判重")
@@ -215,7 +210,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
-		if util.IntAll(tmp["repeat"]) == 1||util.IntAll(tmp["repeat"]) == -1 {
+		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
 			tmp = make(map[string]interface{})
 			repeateN++
 			continue
@@ -228,7 +223,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				wg.Done()
 			}()
 			info := NewInfo(tmp)
-			if !LowHeavy {	//是否进行低质量数据判重
+			if !LowHeavy { //是否进行低质量数据判重
 				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						map[string]interface{}{
@@ -236,7 +231,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						},
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"repeat": -1,//无效数据标签
+								"repeat": -1, //无效数据标签
 							},
 						},
 					})
@@ -258,9 +253,9 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				var merge_idMap = map[string]interface{}{}  //记录合并的
 				repeat_idMap["_id"] = StringTOBsonId(info.id)
 				merge_idMap["_id"] = StringTOBsonId(source.id)
-				repeat_id := source.id//初始化一个数据
+				repeat_id := source.id //初始化一个数据
 
-				if isMerger {//合并相关
+				if isMerger { //合并相关
 					basic_bool := basicDataScore(source, info)
 					if basic_bool {
 						//已原始数据为标准 - 对比数据打判重标签-
@@ -324,7 +319,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 							merge_map,
 						})
 					}
-				}else { //高质量数据
+				} else { //高质量数据
 					basic_bool := basicDataScore(source, info)
 					if !basic_bool {
 						DM.replaceSourceData(info, source.id) //替换
@@ -390,7 +385,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 
-	q:= map[string]interface{}{
+	q := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
 			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
@@ -468,7 +463,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 				wg.Done()
 			}()
 			info := NewInfo(tmp)
-			if !LowHeavy {	//是否进行低质量数据判重
+			if !LowHeavy { //是否进行低质量数据判重
 				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						map[string]interface{}{
@@ -476,7 +471,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 						},
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"repeat": -1,//无效数据标签
+								"repeat": -1, //无效数据标签
 							},
 						},
 					})
@@ -580,7 +575,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 								merge_map,
 							})
 						}
-					}else { //高质量数据
+					} else { //高质量数据
 						basic_bool := basicDataScore(source, info)
 						if !basic_bool {
 							HM.replaceSourceData(info, source.id) //替换
@@ -640,23 +635,23 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	}
 }
 
-
 //定时任务
-func timedTaskDay()  {
+func timedTaskDay() {
 	c := cron.New()
 	c.AddFunc("0 0 0 * * ?", func() { timedTaskOnce() }) //每天凌晨执行一次
+	c.AddFunc("0 1 0 * * ?", func() { movedata() })      //每天凌晨1点执行一次
 	c.Start()
 	timedTaskOnce()
 }
-func timedTaskOnce()  {
+func timedTaskOnce() {
 	log.Println("开始一次定时任务")
 	now := time.Now()
-	preTime:=time.Date(now.Year(),now.Month(),now.Day()-1,0,0,0,0,time.Local)
-	curTime:=time.Date(now.Year(),now.Month(),now.Day(),0,0,0,0,time.Local)
+	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
+	curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
 	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
-	lastid :=task_sid
-	log.Println(task_sid,task_eid)
+	lastid := task_sid
+	log.Println(task_sid, task_eid)
 
 	//ObjectId("5da3f31aa5cb26b9b798d3aa")
 	//ObjectId("5da418c4a5cb26b9b7e3e9a6")
@@ -667,7 +662,7 @@ func timedTaskOnce()  {
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gte":  StringTOBsonId(task_sid),
+			"$gte": StringTOBsonId(task_sid),
 			"$lte": StringTOBsonId(task_eid),
 		},
 	}
@@ -679,14 +674,13 @@ func timedTaskOnce()  {
 		if startNum%10000 == 0 {
 			log.Println("正序遍历:", startNum)
 		}
-		if util.IntAll(tmp_start["dataging"])==1 {//取起始id
+		if util.IntAll(tmp_start["dataging"]) == 1 { //取起始id
 			lastid = BsonTOStringId(tmp_start["_id"])
 			break
 		}
 	}
 
-
-	DM = NewDatamap(dupdays,lastid)
+	DM = NewDatamap(dupdays, lastid)
 	log.Println("本地数据加载完成,定时任务数据判重开始")
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
@@ -700,11 +694,11 @@ func timedTaskOnce()  {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
-		if util.IntAll(tmp["repeat"]) == 1||util.IntAll(tmp["repeat"]) == -1{
+		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
 			tmp = make(map[string]interface{})
 			continue
 		}
-		if util.IntAll(tmp["dataging"])!=1 {
+		if util.IntAll(tmp["dataging"]) != 1 {
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -717,7 +711,7 @@ func timedTaskOnce()  {
 				wg.Done()
 			}()
 			info := NewInfo(tmp)
-			if !LowHeavy {	//是否进行低质量数据判重
+			if !LowHeavy { //是否进行低质量数据判重
 				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						map[string]interface{}{
@@ -725,7 +719,7 @@ func timedTaskOnce()  {
 						},
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"repeat": -1,//无效数据标签
+								"repeat":   -1, //无效数据标签
 								"dataging": 0,
 							},
 						},
@@ -748,9 +742,9 @@ func timedTaskOnce()  {
 				var merge_idMap = map[string]interface{}{}  //记录合并的
 				repeat_idMap["_id"] = StringTOBsonId(info.id)
 				merge_idMap["_id"] = StringTOBsonId(source.id)
-				repeat_id := source.id//初始化一个数据
+				repeat_id := source.id //初始化一个数据
 
-				if isMerger {//合并相关
+				if isMerger { //合并相关
 					basic_bool := basicDataScore(source, info)
 					if basic_bool {
 						//已原始数据为标准 - 对比数据打判重标签-
@@ -774,7 +768,7 @@ func timedTaskOnce()  {
 					if is_replace { //有过合并-更新数据
 						merge_map = map[string]interface{}{
 							"$set": map[string]interface{}{
-								"merge": newData.mergemap,
+								"merge":    newData.mergemap,
 								"dataging": 0,
 							},
 						}
@@ -815,7 +809,7 @@ func timedTaskOnce()  {
 							merge_map,
 						})
 					}
-				}else { //高质量数据
+				} else { //高质量数据
 					basic_bool := basicDataScore(source, info)
 					if !basic_bool {
 						DM.replaceSourceData(info, source.id) //替换
@@ -832,7 +826,7 @@ func timedTaskOnce()  {
 							"repeat":        1,
 							"repeat_reason": reason,
 							"repeat_id":     repeat_id,
-							"dataging": 0,
+							"dataging":      0,
 						},
 					},
 				})
@@ -851,7 +845,6 @@ func timedTaskOnce()  {
 	}
 	log.Println("this timeTask over.", n, "repeateN:", repeateN)
 
-
 	//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
 	if n > repeateN {
 		for _, to := range nextNode {
@@ -875,15 +868,6 @@ func timedTaskOnce()  {
 	}
 }
 
-
-
-
-
-
-
-
-
-
 //合并字段-并更新merge字段的值
 func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
 
@@ -1190,3 +1174,28 @@ func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
 	}
 	return false
 }
+
+//迁移数据dupdays+5之前的数据
+func movedata() {
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	year, month, day := time.Now().Date()
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+5) * 24 * time.Hour).Unix(),
+		},
+	}
+	log.Println(q)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		mgo.Save(extract+"_back", tmp)
+		tmp = map[string]interface{}{}
+		if index%1000 == 0 {
+			log.Println("index", index)
+		}
+	}
+	log.Println("save to", extract+"_back", " ok index", index)
+	delnum := mgo.Delete(extract, q)
+	log.Println("remove from ", extract, delnum)
+}

+ 1 - 1
udpfilterdup/src/mgo.go

@@ -144,7 +144,7 @@ func (m *MongodbSim) InitPool() {
 	opts := options.Client()
 	opts.SetConnectTimeout(3 * time.Second)
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
-	opts.SetMaxPoolSize(uint16(m.Size))
+	opts.SetMaxPoolSize(uint64(m.Size))
 	m.pool = make(chan bool, m.Size)
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)