|
@@ -116,15 +116,22 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
limit = count
|
|
|
}
|
|
|
log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
|
|
|
+ sidback := sid
|
|
|
//接着上次任务执行
|
|
|
startI := 0
|
|
|
if len(instanceId) > 0 {
|
|
|
esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
|
|
|
startI = qu.IntAll((*esc)["pagecurrent"])
|
|
|
+ if qu.ObjToString((*esc)["lastId"]) != "" {
|
|
|
+ sid = qu.ObjToString((*esc)["lastId"])
|
|
|
+ }
|
|
|
+ if qu.ObjToString((*esc)["lastIdback"]) != "" {
|
|
|
+ sidback = qu.ObjToString((*esc)["lastIdback"])
|
|
|
+ }
|
|
|
}
|
|
|
- sidback := sid
|
|
|
+
|
|
|
for i := startI; i < pageNum; i++ {
|
|
|
- query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
|
|
|
+ query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
log.Printf("page=%d,query=%v", i+1, query)
|
|
|
if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 {
|
|
|
list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
|
|
@@ -135,11 +142,15 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
go ext.ExtractProcess(j)
|
|
|
sid = qu.BsonIdToSId(v["_id"])
|
|
|
}
|
|
|
+ db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
|
|
|
+ map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ "lastId": sid,
|
|
|
+ }}, true, false)
|
|
|
}
|
|
|
queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
|
|
|
log.Printf("page=%d,queryback=%v", i+1, queryback)
|
|
|
if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
|
|
|
- list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, 0, limit)
|
|
|
+ list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
|
|
|
for _, v := range *list2 {
|
|
|
//log.Println(v["_id"])
|
|
|
j := PreInfo(v)
|
|
@@ -147,6 +158,10 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
go ext.ExtractProcess(j)
|
|
|
sidback = qu.BsonIdToSId(v["_id"])
|
|
|
}
|
|
|
+ db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
|
|
|
+ map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ "lastIdback": sidback,
|
|
|
+ }}, true, false)
|
|
|
}
|
|
|
//分布式抽取进度
|
|
|
if len(instanceId) > 0 {
|