|
@@ -118,7 +118,10 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
ext.ResultSave(true)
|
|
|
ext.BidSave(true)
|
|
|
ext.IsRun = true
|
|
|
+ } else {
|
|
|
+ ext.BidTotal = 0
|
|
|
}
|
|
|
+ index := 0
|
|
|
if len(instanceId) > 0 { //分布式抽取进度
|
|
|
query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
@@ -163,6 +166,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
ext.TaskInfo.ProcessPool <- true
|
|
|
go ext.ExtractProcess(j, jf)
|
|
|
sid = _id
|
|
|
+ index++
|
|
|
}
|
|
|
db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
|
|
|
map[string]interface{}{"$set": map[string]interface{}{
|
|
@@ -189,6 +193,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
ext.TaskInfo.ProcessPool <- true
|
|
|
go ext.ExtractProcess(j, jf)
|
|
|
sidback = _id
|
|
|
+ index++
|
|
|
}
|
|
|
db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
|
|
|
map[string]interface{}{"$set": map[string]interface{}{
|
|
@@ -201,6 +206,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
"pagecurrent": i + 1,
|
|
|
}}, true, false)
|
|
|
}
|
|
|
+ log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal)
|
|
|
} else { //普通抽取
|
|
|
query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
@@ -214,7 +220,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
|
|
|
fmt.Printf("page=%d,query=%v", i+1, query)
|
|
|
list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
|
|
|
- for k, v := range *list {
|
|
|
+ for _, v := range *list {
|
|
|
if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
|
|
|
continue
|
|
|
}
|
|
@@ -232,13 +238,15 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
|
|
|
defer wg.Done()
|
|
|
ext.ExtractProcess(j, jf)
|
|
|
}()
|
|
|
- if k%1000 == 0 {
|
|
|
- log.Debug(i, k, _id)
|
|
|
+ index++
|
|
|
+ if index%1000 == 0 {
|
|
|
+ log.Debug("index:", index, "页码:", i+1, "_id:", _id)
|
|
|
}
|
|
|
sid = _id
|
|
|
}
|
|
|
}
|
|
|
wg.Wait()
|
|
|
ext.BidSave(false)
|
|
|
+ log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal, "sid:", eid)
|
|
|
}
|
|
|
}
|