Przeglądaj źródła

时间publishtime修改

apple 5 lat temu
rodzic
commit
9f862dd1aa

+ 1 - 1
udpfilterdup/src/config.json

@@ -5,7 +5,7 @@
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "zk",
+        "extract": "zheng_test1_jd1",
         "site": {
             "dbname": "zhaolongyue",
             "coll": "site"

+ 0 - 1
udpfilterdup/src/datamap.go

@@ -24,7 +24,6 @@ type Info struct {
 	projectname string  //项目名称
 	projectcode string  //项目编号
 	publishtime int64   //发布时间
-	comeintime  int64   //采集时间
 	bidopentime int64   //开标时间
 	agencyaddr  string  //开标地点
 	site        string  //站点

+ 66 - 31
udpfilterdup/src/main.go

@@ -90,7 +90,7 @@ func init() {
 		}
 		SiteMap[util.ObjToString(site_dict["site"])] = data_map
 	}
-	fmt.Printf("用时:%d秒,%d个", int(time.Now().Unix())-start, len(SiteMap))
+	fmt.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
 }
 
 func main() {
@@ -105,8 +105,13 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-	//sid = "568551000000000000000000"
-	//eid = "5e0f65000000000000000000"
+	//ObjectId("5df8c03ee9d1f601e4ea5ffc")
+	//ObjectId("5e0d4cb40cf41612e063fc2d")
+
+	//ObjectId("5dfbd43ce9d1f601e43fa402")
+	//ObjectId("5e0954b30cf41612e061d0c8")
+	//sid = "5dfbd43ce9d1f601e43fa402"
+	//eid = "5e0954b30cf41612e061d0c8"
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
 		log.Println("sid,eid参数不能为空")
@@ -115,7 +120,7 @@ func mainT() {
 	mapinfo["gtid"] = sid
 	mapinfo["lteid"] = eid
 	mapinfo["stop"] = "true"
-	task([]byte{}, mapinfo)
+	historyTask([]byte{}, mapinfo)
 	time.Sleep(5 * time.Second)
 }
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
@@ -179,7 +184,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			},
 		}
 	}
-	log.Println(extract,mgo.DbName,q)
+	log.Println(mgo.DbName,extract,q)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
 	log.Println("线程数:",threadNum)
@@ -192,7 +197,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
-
 		pool <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {
@@ -329,22 +333,33 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	defer util.Catch()
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
-			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
-		},
+
+	var q map[string]interface{}
+	if idtype == "1" {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mapInfo["gtid"].(string),
+				"$lte": mapInfo["lteid"].(string),
+			},
+		}
+	} else {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
 	}
 
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
 	minTime, maxTime := int64(0), int64(0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); {
 		//取出最大最小时间
-		if minTime == 0 || maxTime == 0 {
-			minTime = util.Int64All(tmp["comeintime"])
-			maxTime = util.Int64All(tmp["comeintime"])
+		if minTime == 0 || maxTime == 0 &&util.Int64All(tmp["publishtime"])!=0{
+			minTime = util.Int64All(tmp["publishtime"])
+			maxTime = util.Int64All(tmp["publishtime"])
 		} else {
-			t := util.Int64All(tmp["comeintime"])
+			t := util.Int64All(tmp["publishtime"])
 			if t < minTime && t != 0 {
 				minTime = t
 			}
@@ -353,29 +368,49 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 			}
 		}
 	}
+	//时间不正确时
+	if minTime==0&&maxTime==0 {
+		log.Println("段数据区间 publishtime不符合")
+		return
+	}
 	fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
-	//最小时间== 1568087634 最大时间== 1568103381
-	HM = NewHistorymap(util.ObjToString(mapInfo["gtid"]),
-		util.ObjToString(mapInfo["lteid"]), minTime, maxTime)
+	gtid,lteid:= util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
+	fmt.Println(gtid,lteid)
+	HM = NewHistorymap(gtid,lteid, minTime, maxTime)
+	fmt.Println("开始历史数据判重")
+
+	return
+
 
-	//return
-	//开始判重...
 	defer util.Catch()
-	sess_task := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess_task)
-	q_task := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
-			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
-		},
-	}
-	it_task := sess.DB(mgo.DbName).C(extract).Find(&q_task).Iter()
+	//区间id
+	sess_history:= mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess_history)
+	var q_history map[string]interface{}
+	if idtype == "1" {
+		q_history = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mapInfo["gtid"].(string),
+				"$lte": mapInfo["lteid"].(string),
+			},
+		}
+	} else {
+		q_history = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+	log.Println(mgo.DbName,extract,q_history)
+	it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
+	log.Println("线程数:",threadNum)
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	//mapLock := &sync.Mutex{}
 	n, repeateN := 0, 0
-	for tmp := make(map[string]interface{}); it_task.Next(&tmp); n++ {
+	for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
@@ -820,7 +855,7 @@ func basicDataScore(v *Info, info *Info) bool {
 	if m > n {
 		return true
 	} else if m == n {
-		if v.comeintime >= info.comeintime {
+		if v.publishtime >= info.publishtime {
 			return true
 		} else {
 			return false