|
@@ -51,7 +51,7 @@ var (
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
|
-
|
|
|
+
|
|
|
flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
|
|
|
flag.StringVar(&sid, "sid", "", "开始id")
|
|
|
flag.StringVar(&eid, "eid", "", "结束id")
|
|
@@ -141,9 +141,9 @@ func mainT() {
|
|
|
}
|
|
|
mapinfo["gtid"] = sid
|
|
|
mapinfo["lteid"] = eid
|
|
|
- //mapinfo["stop"] = "true"
|
|
|
+ mapinfo["stop"] = "true"
|
|
|
task([]byte{}, mapinfo)
|
|
|
- time.Sleep(99999 * time.Second)
|
|
|
+ time.Sleep(99999 * time.Hour)
|
|
|
}
|
|
|
}
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
@@ -204,17 +204,16 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
|
|
|
//是否排序
|
|
|
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("_id").Iter()
|
|
|
+ sortName :="_id"
|
|
|
if Is_Sort {
|
|
|
- log.Println("排序:publishtime")
|
|
|
- it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
|
+ sortName = "publishtime"
|
|
|
}
|
|
|
+ it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort(sortName).Iter()
|
|
|
updateExtract := [][]map[string]interface{}{}
|
|
|
log.Println("线程数:", threadNum)
|
|
|
pool := make(chan bool, threadNum)
|
|
|
wg := &sync.WaitGroup{}
|
|
|
n, repeateN := 0, 0
|
|
|
-
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
|
|
|
if n%10000 == 0 {
|
|
|
log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
|
|
@@ -244,7 +243,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
},
|
|
|
},
|
|
|
})
|
|
|
- if len(updateExtract) > 500 {
|
|
|
+ if len(updateExtract) >= 200 {
|
|
|
mgo.UpSertBulk(extract, updateExtract...)
|
|
|
updateExtract = [][]map[string]interface{}{}
|
|
|
}
|
|
@@ -351,10 +350,11 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
repeat_id = info.id
|
|
|
}
|
|
|
}
|
|
|
+ if repeateN%90==0&&repeateN>0 {
|
|
|
+ fmt.Println("最终结果","目标id:",repeat_idMap["_id"])
|
|
|
+ }
|
|
|
|
|
|
|
|
|
- log.Println("最终结果","目标id:",repeat_idMap["_id"])
|
|
|
-
|
|
|
|
|
|
//重复数据打标签
|
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
@@ -370,7 +370,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
}
|
|
|
}(tmp)
|
|
|
- if len(updateExtract) > 500 {
|
|
|
+ if len(updateExtract) >= 200 {
|
|
|
mgo.UpSertBulk(extract, updateExtract...)
|
|
|
updateExtract = [][]map[string]interface{}{}
|
|
|
}
|