|
@@ -90,9 +90,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
case mu.OP_NOOP: //下个节点回应
|
|
case mu.OP_NOOP: //下个节点回应
|
|
log.Debug(string(data))
|
|
log.Debug(string(data))
|
|
case mu.OP_SEND_EMAIL:
|
|
case mu.OP_SEND_EMAIL:
|
|
- log.Debug(data,ra.IP)
|
|
|
|
- sendMail(string(data))
|
|
|
|
- cluster.ModifyInstanceAutoReleaseTime(qu.ObjToString("InstanceId"),1)
|
|
|
|
|
|
+ log.Debug(data, ra.IP)
|
|
|
|
+ rep := make(map[string]interface{})
|
|
|
|
+ err := json.Unmarshal(data, &rep)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Debug(err)
|
|
|
|
+ } else {
|
|
|
|
+ sendMail(string(data))
|
|
|
|
+ cluster.ModifyInstanceAutoReleaseTime(qu.ObjToString(rep["instanceId"]), qu.IntAll(ju.Config["deleteInstanceTimeHour"]))
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
func sendMail(content string) {
|
|
func sendMail(content string) {
|
|
@@ -103,6 +109,7 @@ func sendMail(content string) {
|
|
log.Debug("邮件发送:", string(read), err)
|
|
log.Debug("邮件发送:", string(read), err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
var ext *ExtractTask
|
|
var ext *ExtractTask
|
|
|
|
|
|
//根据id区间抽取
|
|
//根据id区间抽取
|
|
@@ -226,12 +233,22 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
|
|
"pagecurrent": i + 1,
|
|
"pagecurrent": i + 1,
|
|
}}, true, false)
|
|
}}, true, false)
|
|
}
|
|
}
|
|
- go Udpclient.WriteUdp([]byte(`分布式抽取完成,一小时后释放, sid:`+sid+`, eid:`+eid+`, count:`+fmt.Sprint(count)+`,index:`+fmt.Sprint(index)+`,bidtotal:`+fmt.Sprint(ext.BidTotal)+`,释放esc实例: `+instanceId[0]+`,`+instanceId[1]), mu.OP_SEND_EMAIL, ra)
|
|
|
|
|
|
+ des := make(map[string]interface{})
|
|
|
|
+ des["desc"]=`分布式抽取完成,一小时后释放, sid:`+sid+`, eid:`+eid+`, count:`+fmt.Sprint(count)+`,index:`+fmt.Sprint(index)+`,bidtotal:`+fmt.Sprint(ext.BidTotal)+`,释放esc实例: `+instanceId[0]+`,`+instanceId[1]
|
|
|
|
+ des["sid"] = sid
|
|
|
|
+ des["eid"] = eid
|
|
|
|
+ des["count"] = count
|
|
|
|
+ des["index"] = index
|
|
|
|
+ des["bidtotal"] = ext.BidTotal
|
|
|
|
+ des["instanceId"] = instanceId[0]
|
|
|
|
+ des["instanceIP"] = instanceId[1]
|
|
|
|
+ udpbytes, _ := json.Marshal(des)
|
|
|
|
+ go Udpclient.WriteUdp(udpbytes, mu.OP_SEND_EMAIL, ra)
|
|
log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal)
|
|
log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal)
|
|
} else { //普通抽取
|
|
} else { //普通抽取
|
|
query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
- log.Debug("查询条件为:",query,"查询条数:",count)
|
|
|
|
|
|
+ log.Debug("查询条件为:", query, "查询条数:", count)
|
|
pageNum := (count + PageSize - 1) / PageSize
|
|
pageNum := (count + PageSize - 1) / PageSize
|
|
limit := PageSize
|
|
limit := PageSize
|
|
if count < PageSize {
|
|
if count < PageSize {
|