|
@@ -47,19 +47,20 @@ type ProjectTask struct {
|
|
|
currentTime int64
|
|
|
//保存长度
|
|
|
saveSize int
|
|
|
+ pici int64
|
|
|
}
|
|
|
|
|
|
func NewPT() *ProjectTask {
|
|
|
return &ProjectTask{
|
|
|
InitMinTime: int64(1325347200),
|
|
|
name: "全/增量对象",
|
|
|
- thread: 3,
|
|
|
+ thread: 4,
|
|
|
updatePool: make(chan []map[string]interface{}, 2000),
|
|
|
wg: sync.WaitGroup{},
|
|
|
- AllIdsMap: make(map[string]*ID, 5000000),
|
|
|
- mapPb: make(map[string]*Key, 5000000),
|
|
|
- mapPn: make(map[string]*Key, 5000000),
|
|
|
- mapPc: make(map[string]*Key, 5000000),
|
|
|
+ AllIdsMap: make(map[string]*ID, 10000000),
|
|
|
+ mapPb: make(map[string]*Key, 3000000),
|
|
|
+ mapPn: make(map[string]*Key, 10000000),
|
|
|
+ mapPc: make(map[string]*Key, 10000000),
|
|
|
saveSize: 200,
|
|
|
coll: ProjectColl,
|
|
|
}
|
|
@@ -197,15 +198,34 @@ func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
|
|
|
// log.Println("publishtime_1索引不存在")
|
|
|
// }
|
|
|
// MongoTool.DestoryMongoConn(sess)
|
|
|
- thread := util.IntAllDef(udpInfo["thread"], 3)
|
|
|
+ thread := util.IntAllDef(udpInfo["thread"], 4)
|
|
|
if thread > 0 {
|
|
|
p.thread = thread
|
|
|
}
|
|
|
- bcon := true
|
|
|
- if bcon {
|
|
|
- //生成查询语句执行
|
|
|
- p.enter(db, coll, map[string]interface{}{})
|
|
|
+ q, _ := udpInfo["query"].(map[string]interface{})
|
|
|
+ if q == nil {
|
|
|
+ q = map[string]interface{}{}
|
|
|
+ lteid, _ := udpInfo["lteid"].(string)
|
|
|
+ var idmap map[string]interface{}
|
|
|
+ if len(lteid) > 15 {
|
|
|
+ idmap = map[string]interface{}{
|
|
|
+ "$lte": util.StringTOBsonId(lteid),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ gtid, _ := udpInfo["gtid"].(string)
|
|
|
+ if len(gtid) > 15 {
|
|
|
+ if idmap == nil {
|
|
|
+ idmap = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ idmap["$gt"] = util.StringTOBsonId(gtid)
|
|
|
+ }
|
|
|
+ if idmap != nil {
|
|
|
+ q["_id"] = idmap
|
|
|
+ }
|
|
|
}
|
|
|
+ //生成查询语句执行
|
|
|
+ p.enter(db, coll, q)
|
|
|
+
|
|
|
}
|
|
|
|
|
|
//增量合并
|
|
@@ -236,12 +256,11 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
|
|
|
},
|
|
|
}
|
|
|
}
|
|
|
- pici := time.Now().Unix()
|
|
|
if q != nil {
|
|
|
//生成查询语句执行
|
|
|
p.enter(db, coll, q)
|
|
|
}
|
|
|
- nextNode(gtid, lteid, "project", pici)
|
|
|
+ nextNode(gtid, lteid, "project", p.pici)
|
|
|
}
|
|
|
|
|
|
//通知下个节点nextNode
|