|
@@ -26,11 +26,13 @@ var (
|
|
|
Sysconfig map[string]interface{} //配置文件
|
|
|
mconf map[string]interface{} //mongodb配置信息
|
|
|
mgo *MongodbSim //mongodb操作对象
|
|
|
+ task_mgo *MongodbSim //mongodb操作对象
|
|
|
+ task_collName string
|
|
|
extract string
|
|
|
extract_back string
|
|
|
udpclient mu.UdpClient //udp对象
|
|
|
nextNode []map[string]interface{} //下节点数组
|
|
|
- dupdays = 5 //初始化判重范围
|
|
|
+ dupdays = 7 //初始化判重范围
|
|
|
DM *datamap //
|
|
|
|
|
|
//正则筛选相关
|
|
@@ -61,6 +63,17 @@ func init() {
|
|
|
flag.Parse()
|
|
|
//172.17.145.163:27080
|
|
|
util.ReadConfig(&Sysconfig)
|
|
|
+
|
|
|
+ task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
|
|
|
+ task_mgo = &MongodbSim{
|
|
|
+ MongodbAddr: task_mconf["task_addrName"].(string),
|
|
|
+ DbName: task_mconf["task_dbName"].(string),
|
|
|
+ Size: util.IntAllDef(task_mconf["task_pool"], 10),
|
|
|
+ }
|
|
|
+ task_mgo.InitPool()
|
|
|
+ task_collName = task_mconf["task_collName"].(string)
|
|
|
+
|
|
|
+
|
|
|
nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
|
|
|
mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
|
mgo = &MongodbSim{
|
|
@@ -130,8 +143,8 @@ func mainT() {
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
} else {
|
|
|
//IdType = true //打开id字符串模式
|
|
|
- sid := "4f16936d52c1d9fbf843c60e"
|
|
|
- eid := "6f16936d52c1d9fbf843c60e"
|
|
|
+ sid := "1f16936d52c1d9fbf843c60e"
|
|
|
+ eid := "9f16936d52c1d9fbf843c60e"
|
|
|
log.Println("正常判重测试开始")
|
|
|
log.Println(sid, "---", eid)
|
|
|
mapinfo := map[string]interface{}{}
|
|
@@ -355,7 +368,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
|
|
|
|
- time.Sleep(60 * time.Second)
|
|
|
+ time.Sleep(30 * time.Second)
|
|
|
|
|
|
//任务完成,开始发送广播通知下面节点
|
|
|
if n >= repeateN && mapInfo["stop"] == nil {
|
|
@@ -401,18 +414,25 @@ func historyTaskDay() {
|
|
|
}
|
|
|
|
|
|
//查询表最后一个id
|
|
|
- sess := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
- q:=map[string]interface{}{}
|
|
|
+ task_sess := task_mgo.GetMgoConn()
|
|
|
+ defer task_mgo.DestoryMongoConn(task_sess)
|
|
|
+ q:=map[string]interface{}{
|
|
|
+ "isused":true,
|
|
|
+ }
|
|
|
between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
|
|
|
- it_last := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("-_id").Iter()
|
|
|
+ it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
|
|
|
for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
|
|
|
- lteid = BsonTOStringId(tmp["_id"])
|
|
|
+ lteid = util.ObjToString(tmp["gtid"])
|
|
|
+ log.Println("查询的最后一个任务Id:",lteid)
|
|
|
break
|
|
|
}
|
|
|
-
|
|
|
+ //
|
|
|
+ log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
|
|
|
time.Sleep(5 * time.Minute)
|
|
|
|
|
|
+
|
|
|
+ sess := mgo.GetMgoConn()//连接器
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
//开始判重
|
|
|
q = map[string]interface{}{
|
|
|
"_id": map[string]interface{}{
|