Browse Source

插入合并

zhangjinkun 6 năm trước
mục cha
commit
09a7e52ad0

+ 8 - 5
udpprojectset/src/config.json

@@ -8,10 +8,10 @@
     },
     "thread": 1,
     "extractColl": "bidding20190521",
-    "projectColl": "projectset",
+    "projectColl": "projectset_inc",
     "lenprojectname": 18,
     "redisPoolSize": 60,
-    "redisaddrs": "ids=192.168.3.18:3379,keys=192.168.3.18:3379,info=192.168.3.18:3379",
+    "redisaddrs": "ids=127.0.0.1:6379,keys=127.0.0.1:6379,info=127.0.0.1:6379",
     "clearedis": {
         "open": true,
         "clearcron": "0 10 15 ? * 4",
@@ -23,9 +23,12 @@
     },
     "taskstock": {
         "open": true,
-		"startTime":1325347200,
-        "startdate": "2015-11-01",
-        "endate": "2019-06-30"
+        "startTime": 1446543672
+    },
+    "insertmeger": {
+        "omitmax": 10000,
+        "deviationday": 90,
+        "hourinterval": 240
     },
     "udpport": ":1482",
     "nextNode": [

+ 0 - 5
udpprojectset/src/configinc.json

@@ -1,5 +0,0 @@
-{
-    "omitmax": 1000,
-    "deviationday": 90,
-    "hourinterval": 240
-}

+ 12 - 112
udpprojectset/src/main.go

@@ -42,6 +42,7 @@ var (
 
 	currentMegerTime  int64 //合并项目的时间位置,用来清理几个月之前的项目
 	currentMegerCount int   //合并项目的计数,用来定时清理
+
 )
 
 type MegerFields struct {
@@ -78,6 +79,13 @@ func init() {
 		ProjectNamelen: util.IntAllDef(megerfields["projectlen"], 5),
 		ProjectCodelen: util.IntAllDef(megerfields["projectcodelen"], 8),
 	}
+	//插入合并参数
+	if insertmeger, ok := Sysconfig["insertmeger"].(map[string]interface{}); ok {
+		OmitNumMax = util.Int64All(insertmeger["omitmax"])
+		DeviationDay = util.Int64All(insertmeger["deviationday"])
+		HourInterval = util.Int64All(insertmeger["hourinterval"])
+	}
+
 	redis.InitRedisBySize(Sysconfig["redisaddrs"].(string), util.IntAllDef(Sysconfig["redisPoolSize"], 100), 30, 300)
 	MQFW = mongodb.MongodbSim{
 		MongodbAddr: Sysconfig["mongodbServers"].(string),
@@ -148,11 +156,7 @@ func main() {
 	if taskstock, ok := Sysconfig["taskstock"].(map[string]interface{}); ok { //跑存量数据
 		if b, _ := taskstock["open"].(bool); b {
 			RunFullData(util.Int64All(taskstock["startTime"]))
-			//			startdate, _ := taskstock["startdate"].(string)
-			//			endate, _ := taskstock["endate"].(string)
-			//			taskStock(startdate, endate)
 		}
-
 	}
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -204,7 +208,7 @@ func taskInc(mapInfo map[string]interface{}) {
 	sess := MQFW.GetMgoConn()
 	defer MQFW.DestoryMongoConn(sess)
 	//数据正序处理
-	it := sess.DB(MQFW.DbName).C(extractColl).Find(map[string]interface{}{}).Sort("publishtime").Iter()
+	it := sess.DB(MQFW.DbName).C(extractColl).Find(q).Sort("publishtime").Iter()
 	count, index := 0, 0
 	pici := time.Now().Unix()
 	wg := &sync.WaitGroup{}
@@ -219,6 +223,9 @@ func taskInc(mapInfo map[string]interface{}) {
 			continue
 		}
 		pt := util.Int64All(tmp["publishtime"])
+		if time.Now().Unix()-DeviationDay*86400 > pt { //DeviationDay前的数据不处理,走插入合并
+			continue
+		}
 		if pt > currentMegerTime {
 			currentMegerTime = pt
 		}
@@ -291,113 +298,6 @@ func taskInc(mapInfo map[string]interface{}) {
 	}
 }
 
-func taskStock(startDate, endDate string) {
-	defer func() {
-		<-SingleThread
-	}()
-	defer util.Catch()
-	publishtimes := []map[string]interface{}{}
-	start, _ := time.ParseInLocation(util.Date_Short_Layout, startDate, time.Local)
-	end, _ := time.ParseInLocation(util.Date_Short_Layout, endDate, time.Local)
-	for {
-		publishtime := map[string]interface{}{
-			"date":  start.Format(util.Date_Short_Layout),
-			"stime": start.Unix(),
-			"etime": start.Add(24 * time.Hour).Unix(),
-		}
-		publishtimes = append(publishtimes, publishtime)
-		start = start.Add(24 * time.Hour)
-		if start.Unix() > end.Unix() {
-			break
-		}
-	}
-	sess := MQFW.GetMgoConn()
-	defer MQFW.DestoryMongoConn(sess)
-	wg := &sync.WaitGroup{}
-	idmap := &sync.Map{}
-	count, index := 0, 0
-	for _, v := range publishtimes {
-		q := map[string]interface{}{
-			"publishtime": map[string]interface{}{
-				"$gt":  util.Int64All(v["stime"]),
-				"$lte": util.Int64All(v["etime"]),
-			},
-		}
-		log.Println(q)
-		//数据正序处理
-		it := sess.DB(MQFW.DbName).C(extractColl).Find(&q).Sort("publishtime").Iter()
-		datenum := 0
-		for tmp := make(map[string]interface{}); it.Next(tmp); {
-			if index%10000 == 0 {
-				log.Println(index, tmp["_id"])
-			}
-			index++
-			datenum++
-			if util.IntAll(tmp["repeat"]) == 1 {
-				tmp = make(map[string]interface{})
-				continue
-			}
-			pt := util.Int64All(tmp["publishtime"])
-			if pt > currentMegerTime {
-				currentMegerTime = pt
-			}
-			count++
-			currentMegerCount++
-			if currentMegerCount > 300000 {
-				log.Println("执行清理", currentMegerTime)
-				time.Sleep(1 * time.Second)
-				clearPKey()
-				currentMegerCount = 0
-			}
-			thisid := util.BsonIdToSId(tmp["_id"])
-			b, err := redis.Exists(INFOID, thisid)
-			if err != nil {
-				log.Println("checkid err", err.Error())
-			}
-			if !b {
-				wg.Add(1)
-				idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
-				MultiThread <- true
-				go func(tmp map[string]interface{}, thisid string) {
-					defer func() {
-						<-MultiThread
-						wg.Done()
-						idmap.Delete(tmp["_id"])
-					}()
-					info := PreThisInfo(tmp)
-					if info != nil {
-						lockPNCBMap(info)
-						storeLock(info)
-						startProjectMerge(info, tmp)
-						redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
-						currentMegerTime = info.Publishtime
-						unlockPNCBMap(info)
-					}
-				}(tmp, thisid)
-			}
-			if count%1000 == 0 {
-				log.Println("count:", count)
-			}
-			tmp = make(map[string]interface{})
-		}
-		log.Println(v["date"], datenum)
-	}
-	for {
-		time.Sleep(5 * time.Second)
-		n := 0
-		idmap.Range(func(key interface{}, v interface{}) bool {
-			n++
-			log.Println(key, v)
-			return true
-		})
-		if n < 1 {
-			break
-		}
-	}
-	wg.Wait()
-	log.Println("taskStock over...", index, count)
-}
-
 func NewPushInfo(tmp map[string]interface{}) bson.M {
 	return bson.M{
 		"comeintime":  tmp["comeintime"],

+ 8 - 13
udpprojectset/src/projectmegerinsert.go

@@ -24,7 +24,6 @@ const (
 )
 
 var (
-	ConfigInc                              map[string]interface{}
 	OmitNumMax, DeviationDay, HourInterval int64 //提取最大遗漏数据量,项目查询时间修正区间,轮循间隔
 	InfoRScore                             = map[string][]*RScoreInfo{}
 	PnAll, PcAll, PbAll                    = map[string][]string{}, map[string][]string{}, map[string][]string{}
@@ -53,14 +52,6 @@ type MegerInfo struct { //待合并分段数据
 	Lock             *sync.Mutex
 }
 
-func init() {
-	qu.ReadConfig("./configinc.json", &ConfigInc)
-	OmitNumMax = qu.Int64All(ConfigInc["omitmax"])
-	DeviationDay = qu.Int64All(ConfigInc["deviationday"])
-	HourInterval = qu.Int64All(ConfigInc["hourinterval"])
-
-}
-
 var StartId string
 
 func main_inc() {
@@ -78,6 +69,10 @@ func main_inc() {
 func startInsertMeger(startId string) {
 	datas := getOmitData(startId)
 	for _, minfo := range datas {
+		if int64(minfo.Num) < OmitNumMax {
+			log.Println("分段信息量太小,不执行", minfo.Num)
+			continue
+		}
 		getPncbKey(minfo)
 		compareMeger(minfo)
 		//清空相关信息
@@ -97,6 +92,7 @@ func getOmitData(startId string) (list []*MegerInfo) {
 			"$gt": bson.ObjectIdHex(startId),
 		},
 	}
+	log.Println(MQFW.DbName, extractColl, q)
 	sess := MQFW.GetMgoConn()
 	defer MQFW.DestoryMongoConn(sess)
 	//数据正序处理
@@ -135,8 +131,6 @@ func getOmitData(startId string) (list []*MegerInfo) {
 		} else {
 			minfo.EndPublishtime = this.Publishtime
 		}
-		log.Println(this.Id)
-		lastId = this.Id
 		//分段
 		if (minfo.EndPublishtime-minfo.StartPublishtime)/int64(86400) > DeviationDay*int64(2) || OmitNumMax <= int64(minfo.Num) {
 			log.Println(len(list), "段信息加载完成,信息量", minfo.Num)
@@ -144,6 +138,7 @@ func getOmitData(startId string) (list []*MegerInfo) {
 			minfo = &MegerInfo{}
 			break
 		}
+		lastId = this.Id
 		minfo.Minfo = append(minfo.Minfo, this)
 		minfo.Num += 1
 	}
@@ -176,8 +171,8 @@ func getPncbKey(minfo *MegerInfo) {
 	//pn,pc,pb加载内存中
 	sess := MQFW.GetMgoConn()
 	defer MQFW.DestoryMongoConn(sess)
-	//it := sess.DB(MQFW.DbName).C(projectColl).Find(q).Sort("lastpublishtime").Iter()
-	it := sess.DB(MQFW.DbName).C(projectColl).Find(map[string]interface{}{}).Sort("pici").Iter()
+	it := sess.DB(MQFW.DbName).C(projectColl).Find(q).Sort("lastpublishtime").Iter()
+	//it := sess.DB(MQFW.DbName).C(projectColl).Find(map[string]interface{}{}).Sort("pici").Iter()
 	for tmp := make(map[string]interface{}); it.Next(tmp); {
 		if qu.ObjToString(tmp["meger_sflag"]) == "normal" {
 			pn := "pn_" + qu.ObjToString(tmp["projectname"])