فهرست منبع

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

6 سال پیش
والد
کامیت
8e400c2a16
3 فایل‌های تغییر یافته به همراه 124 افزوده شده و 6 حذف شده
  1. 3 1
      udpprojectset/src/cleareids.go
  2. 7 3
      udpprojectset/src/config.json
  3. 114 2
      udpprojectset/src/main.go

+ 3 - 1
udpprojectset/src/cleareids.go

@@ -31,7 +31,7 @@ func clearedis() {
 
 func clearPKey() {
 	log.Println("开始清理")
-	nowtime := currentMegerTime //time.Now().Unix()
+	nowtime := currentMegerTime //time.Now().Unix() int64(1461204000)
 	wg := sync.WaitGroup{}
 	for _, pncb := range []*KeyMap{PNKey, PCKey, PBKey} {
 		wg.Add(1)
@@ -56,6 +56,7 @@ func clearPNCBKey(pncb *KeyMap, nowtime int64) {
 func clearIdsKeys(pKey *KeyMap, nowtime int64) []string {
 	defer util.Catch()
 	delkey := []string{}
+	pKey.Lock.Lock()
 	for k, ma := range pKey.Map {
 		ids := ma.Arr
 		delids := []interface{}{}
@@ -94,6 +95,7 @@ func clearIdsKeys(pKey *KeyMap, nowtime int64) []string {
 			}
 		}
 	}
+	pKey.Lock.Unlock()
 	return delkey
 }
 

+ 7 - 3
udpprojectset/src/config.json

@@ -6,12 +6,12 @@
         "to": "zhangjinkun@topnet.net.cn",
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
-    "thread": 30,
-    "extractColl": "bidding20190423",
+    "thread": 1,
+    "extractColl": "bidding20190521",
     "projectColl": "projectset",
     "lenprojectname": 18,
     "redisPoolSize": 60,
-    "redisaddrs": "ids=127.0.0.1:6379,keys=127.0.0.1:6379,info=127.0.0.1:6379",
+    "redisaddrs": "ids=192.168.3.18:3379,keys=192.168.3.18:3379,info=192.168.3.18:3379",
     "clearedis": {
         "open": true,
         "clearcron": "0 10 15 ? * 4",
@@ -21,6 +21,10 @@
         "projectlen": 5,
         "projectcodelen": 8
     },
+    "taskstock": {
+        "open": true,
+        "endate": "2019-06-30"
+    },
     "udpport": ":1482",
     "nextNode": [
         {

+ 114 - 2
udpprojectset/src/main.go

@@ -146,6 +146,12 @@ func main() {
 	log.Println("load data from redis finished.", n)
 	//清理redis
 	//clearedis()
+	if taskstock, ok := Sysconfig["taskstock"].(map[string]interface{}); ok { //跑存量数据
+		if b, _ := taskstock["open"].(bool); b {
+			endate, _ := taskstock["endate"].(string)
+			taskStock(endate)
+		}
+	}
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
@@ -168,7 +174,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 			go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
 			SingleThread <- true
-			go task(data, mapInfo)
+			go taskInc(mapInfo)
 		}
 	case mu.OP_NOOP: //下个节点回应
 		ok := string(data)
@@ -179,7 +185,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
-func task(data []byte, mapInfo map[string]interface{}) {
+func taskInc(mapInfo map[string]interface{}) {
 	defer func() {
 		<-SingleThread
 	}()
@@ -282,6 +288,112 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	}
 }
 
+func taskStock(endDate string) {
+	defer func() {
+		<-SingleThread
+	}()
+	defer util.Catch()
+	publishtimes := []map[string]interface{}{}
+	start, _ := time.ParseInLocation(util.Date_Short_Layout, "2015-11-01", time.Local)
+	end, _ := time.ParseInLocation(util.Date_Short_Layout, endDate, time.Local)
+	for {
+		publishtime := map[string]interface{}{
+			"date":  start.Format(util.Date_Short_Layout),
+			"stime": start.Unix(),
+			"etime": start.Add(24 * time.Hour).Unix(),
+		}
+		publishtimes = append(publishtimes, publishtime)
+		start = start.Add(24 * time.Hour)
+		if start.Unix() > end.Unix() {
+			break
+		}
+	}
+	sess := MQFW.GetMgoConn()
+	defer MQFW.DestoryMongoConn(sess)
+	wg := &sync.WaitGroup{}
+	idmap := &sync.Map{}
+	count, index := 0, 0
+	for _, v := range publishtimes {
+		q := map[string]interface{}{
+			"publishtime": map[string]interface{}{
+				"$gt":  util.Int64All(v["stime"]),
+				"$lte": util.Int64All(v["etime"]),
+			},
+		}
+		log.Println(q)
+		//数据正序处理
+		it := sess.DB(MQFW.DbName).C(extractColl).Find(&q).Sort("publishtime").Iter()
+		datenum := 0
+		for tmp := make(map[string]interface{}); it.Next(tmp); {
+			if index%10000 == 0 {
+				log.Println(index, tmp["_id"])
+			}
+			index++
+			datenum++
+			if util.IntAll(tmp["repeat"]) == 1 {
+				tmp = make(map[string]interface{})
+				continue
+			}
+			pt := util.Int64All(tmp["publishtime"])
+			if pt > currentMegerTime {
+				currentMegerTime = pt
+			}
+			count++
+			currentMegerCount++
+			if currentMegerCount > 300000 {
+				log.Println("执行清理", currentMegerTime)
+				time.Sleep(1 * time.Second)
+				clearPKey()
+				currentMegerCount = 0
+			}
+			thisid := util.BsonIdToSId(tmp["_id"])
+			b, err := redis.Exists(INFOID, thisid)
+			if err != nil {
+				log.Println("checkid err", err.Error())
+			}
+			if !b {
+				wg.Add(1)
+				idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
+				MultiThread <- true
+				go func(tmp map[string]interface{}, thisid string) {
+					defer func() {
+						<-MultiThread
+						wg.Done()
+						idmap.Delete(tmp["_id"])
+					}()
+					info := PreThisInfo(tmp)
+					if info != nil {
+						lockPNCBMap(info)
+						startProjectMerge(info, tmp)
+						redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
+						currentMegerTime = info.Publishtime
+						unlockPNCBMap(info)
+					}
+				}(tmp, thisid)
+			}
+			if count%1000 == 0 {
+				log.Println("count:", count)
+			}
+			tmp = make(map[string]interface{})
+		}
+		log.Println(v["date"], datenum)
+	}
+	for {
+		time.Sleep(5 * time.Second)
+		n := 0
+		idmap.Range(func(key interface{}, v interface{}) bool {
+			n++
+			log.Println(key, v)
+			return true
+		})
+		if n < 1 {
+			break
+		}
+	}
+	wg.Wait()
+	log.Println("taskStock over...", index, count)
+}
+
 func NewPushInfo(tmp map[string]interface{}) bson.M {
 	return bson.M{
 		"comeintime":  tmp["comeintime"],