|
@@ -65,6 +65,20 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
|
|
|
log.Debug("udp通知抽取id段", sid, " ", eid)
|
|
|
ExtractByUdp(sid, eid, ra)
|
|
|
+ for _, m := range nextNodes {
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "gtid": sid,
|
|
|
+ "lteid": eid,
|
|
|
+ "stype": qu.ObjToString(m["stype"]),
|
|
|
+ })
|
|
|
+ err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(m["addr"].(string)),
|
|
|
+ Port: qu.IntAll(m["port"]),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ log.Debug(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
log.Debug("udp通知抽取完成,eid=", eid)
|
|
|
}
|
|
|
}
|
|
@@ -189,8 +203,9 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
|
|
|
}, true, false)
|
|
|
}
|
|
|
log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
|
|
|
- } else { //普通抽取
|
|
|
- query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
+ } else {
|
|
|
+ //普通抽取
|
|
|
+ query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
|
log.Debug("查询条件为:", query, "查询条数:", count)
|
|
|
pageNum := (count + PageSize - 1) / PageSize
|
|
@@ -199,15 +214,17 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
|
|
|
limit = count
|
|
|
}
|
|
|
wg := sync.WaitGroup{}
|
|
|
- var nn int8
|
|
|
- var tmpsid string
|
|
|
for i := 0; i < pageNum; i++ {
|
|
|
- query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
|
|
|
+ query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
|
|
|
fmt.Printf("page=%d,query=%v\n", i+1, query)
|
|
|
list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
|
|
|
for _, v := range *list {
|
|
|
- if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
|
|
|
- log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
|
|
|
+ //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
|
|
|
+ // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
+ if qu.ObjToString(v["spidercode"]) == "a_gjggzyjypt_gcjs_kbjl" { //临时开标记录
|
|
|
+ log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
|
|
|
continue
|
|
|
}
|
|
|
_id := qu.BsonIdToSId(v["_id"])
|
|
@@ -227,10 +244,6 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
|
|
|
ext.ExtractProcess(j, jf,isSite)
|
|
|
}(&wg, j, jf)
|
|
|
index++
|
|
|
- nn++
|
|
|
- if nn ==1{
|
|
|
- tmpsid = _id
|
|
|
- }
|
|
|
if index%1000 == 0 {
|
|
|
log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
|
|
|
}
|
|
@@ -243,20 +256,5 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
|
|
|
wg.Wait()
|
|
|
ext.BidSave(false)
|
|
|
log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
|
|
|
- for _, m := range nextNodes {
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": tmpsid,
|
|
|
- "lteid": eid,
|
|
|
- "stype": qu.ObjToString(m["stype"]),
|
|
|
- })
|
|
|
- err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- IP: net.ParseIP(m["addr"].(string)),
|
|
|
- Port: qu.IntAll(m["port"]),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- log.Debug(err)
|
|
|
- }
|
|
|
- log.Debug(m,string(by))
|
|
|
- }
|
|
|
}
|
|
|
}
|