|
@@ -36,7 +36,9 @@ type ProjectTask struct {
|
|
|
//采购单位、项目名称、项目编号
|
|
|
mapPb, mapPn, mapPc map[string]*Key
|
|
|
//更新或新增通道
|
|
|
- updatePool chan []map[string]interface{}
|
|
|
+ updatePool chan []map[string]interface{}
|
|
|
+ savePool chan map[string]interface{}
|
|
|
+ saveSign, updateSign chan bool
|
|
|
//表名
|
|
|
coll string
|
|
|
//当前状态是全量还是增量
|
|
@@ -55,13 +57,16 @@ func NewPT() *ProjectTask {
|
|
|
InitMinTime: int64(1325347200),
|
|
|
name: "全/增量对象",
|
|
|
thread: 4,
|
|
|
- updatePool: make(chan []map[string]interface{}, 2000),
|
|
|
+ updatePool: make(chan []map[string]interface{}, 1000),
|
|
|
+ savePool: make(chan map[string]interface{}, 2000),
|
|
|
wg: sync.WaitGroup{},
|
|
|
- AllIdsMap: make(map[string]*ID, 10000000),
|
|
|
- mapPb: make(map[string]*Key, 3000000),
|
|
|
- mapPn: make(map[string]*Key, 10000000),
|
|
|
- mapPc: make(map[string]*Key, 10000000),
|
|
|
+ AllIdsMap: make(map[string]*ID, 100000),
|
|
|
+ mapPb: make(map[string]*Key, 1000000),
|
|
|
+ mapPn: make(map[string]*Key, 1000000),
|
|
|
+ mapPc: make(map[string]*Key, 1500000),
|
|
|
saveSize: 200,
|
|
|
+ saveSign: make(chan bool, 1),
|
|
|
+ updateSign: make(chan bool, 1),
|
|
|
coll: ProjectColl,
|
|
|
}
|
|
|
}
|
|
@@ -71,30 +76,65 @@ var P_QL *ProjectTask
|
|
|
//初始化全量合并对象
|
|
|
func init() {
|
|
|
P_QL = NewPT()
|
|
|
+ go P_QL.saveQueue()
|
|
|
go P_QL.updateQueue()
|
|
|
go P_QL.clearMem()
|
|
|
+}
|
|
|
|
|
|
+func (p *ProjectTask) saveQueue() {
|
|
|
+ arr := make([]map[string]interface{}, p.saveSize)
|
|
|
+ indexs := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-p.saveSign:
|
|
|
+ if indexs > 0 {
|
|
|
+ MongoTool.SaveBulk(p.coll, arr[:indexs]...)
|
|
|
+ arr = make([]map[string]interface{}, p.saveSize)
|
|
|
+ indexs = 0
|
|
|
+ }
|
|
|
+ p.updateSign <- true
|
|
|
+ case v := <-p.savePool:
|
|
|
+ arr[indexs] = v
|
|
|
+ indexs++
|
|
|
+ if indexs == p.saveSize {
|
|
|
+ MongoTool.SaveBulk(p.coll, arr...)
|
|
|
+ arr = make([]map[string]interface{}, p.saveSize)
|
|
|
+ indexs = 0
|
|
|
+ }
|
|
|
+ case <-time.After(100 * time.Millisecond):
|
|
|
+ if indexs > 0 {
|
|
|
+ MongoTool.SaveBulk(p.coll, arr[:indexs]...)
|
|
|
+ arr = make([]map[string]interface{}, p.saveSize)
|
|
|
+ indexs = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//项目保存和更新通道
|
|
|
func (p *ProjectTask) updateQueue() {
|
|
|
- arr := make([][]map[string]interface{}, p.saveSize)
|
|
|
- index := 0
|
|
|
+ arru := make([][]map[string]interface{}, p.saveSize)
|
|
|
+ indexu := 0
|
|
|
for {
|
|
|
select {
|
|
|
case v := <-p.updatePool:
|
|
|
- arr[index] = v
|
|
|
- index++
|
|
|
- if index == p.saveSize {
|
|
|
- MongoTool.UpSertBulk(p.coll, arr...)
|
|
|
- arr = make([][]map[string]interface{}, p.saveSize)
|
|
|
- index = 0
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == p.saveSize {
|
|
|
+ //更新之前先保存
|
|
|
+ p.saveSign <- true
|
|
|
+ <-p.updateSign
|
|
|
+ MongoTool.UpdateBulk(p.coll, arru...)
|
|
|
+ arru = make([][]map[string]interface{}, p.saveSize)
|
|
|
+ indexu = 0
|
|
|
}
|
|
|
- case <-time.After(2 * time.Second):
|
|
|
- if index > 0 {
|
|
|
- MongoTool.UpSertBulk(p.coll, arr[:index]...)
|
|
|
- arr = make([][]map[string]interface{}, p.saveSize)
|
|
|
- index = 0
|
|
|
+ case <-time.After(100 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ p.saveSign <- true
|
|
|
+ <-p.updateSign
|
|
|
+ MongoTool.UpdateBulk(p.coll, arru[:indexu]...)
|
|
|
+ arru = make([][]map[string]interface{}, p.saveSize)
|
|
|
+ indexu = 0
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -106,8 +146,8 @@ func (p *ProjectTask) clearMem() {
|
|
|
//在内存中保留最近6个月的信息
|
|
|
validTime := int64(6 * 30 * 86400)
|
|
|
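//Runs every 10 minutes during a full run; during an incremental run only every 60th tick does the work, i.e. every 600 minutes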
|
|
|
- c.AddFunc("50 0/4 * * * *", func() {
|
|
|
- if p.currentType == "ql" || p.clearContimes >= 100 {
|
|
|
+ c.AddFunc("50 0/10 * * * *", func() {
|
|
|
+ if p.currentType == "ql" || p.clearContimes >= 60 {
|
|
|
//跳过的次数清零
|
|
|
p.clearContimes = 0
|
|
|
//信息进入查找对比全局锁
|
|
@@ -224,6 +264,7 @@ func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
//生成查询语句执行
|
|
|
+ log.Println("查询语句:", q)
|
|
|
p.enter(db, coll, q)
|
|
|
|
|
|
}
|
|
@@ -286,35 +327,74 @@ func nextNode(gtid, lteid, stype string, pici int64) {
|
|
|
|
|
|
func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
|
|
|
defer util.Catch()
|
|
|
+ count, taskcount := 0, 0
|
|
|
+ //var lastid interface{}
|
|
|
+ // for {
|
|
|
+ // if lastid != nil {
|
|
|
+ // if q == nil {
|
|
|
+ // q = map[string]interface{}{}
|
|
|
+ // }
|
|
|
+ // if q["_id"] == nil {
|
|
|
+ // q["_id"] = map[string]interface{}{}
|
|
|
+ // }
|
|
|
+ // q["_id"].(map[string]interface{})["$gt"] = lastid
|
|
|
+ // }
|
|
|
+ pool := make(chan bool, p.thread)
|
|
|
+ log.Println("start project", q)
|
|
|
sess := MongoTool.GetMgoConn()
|
|
|
defer MongoTool.DestoryMongoConn(sess)
|
|
|
- query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
|
|
|
- pool := make(chan bool, p.thread)
|
|
|
- count := 0
|
|
|
+ query := sess.DB(db).C(coll).Find(q).Sort("_id").Select(map[string]interface{}{
|
|
|
+ "blocks": 0,
|
|
|
+ "fieldall": 0,
|
|
|
+ }).Iter()
|
|
|
+ //over := 0
|
|
|
for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
|
|
|
- info := ParseInfo(tmp)
|
|
|
- if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
|
|
|
- pool <- true
|
|
|
- go func(info *Info, tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- p.currentTime = info.Publishtime
|
|
|
- <-pool
|
|
|
- }()
|
|
|
- p.startProjectMerge(info, tmp)
|
|
|
- }(info, tmp)
|
|
|
- } else {
|
|
|
- //信息错误,进行更新
|
|
|
+ if util.IntAll(tmp["repeat"]) == 0 {
|
|
|
+ info := ParseInfo(tmp)
|
|
|
+ if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
|
|
|
+ pool <- true
|
|
|
+ taskcount++
|
|
|
+ go func(info *Info, tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ p.currentTime = info.Publishtime
|
|
|
+ <-pool
|
|
|
+ }()
|
|
|
+ p.startProjectMerge(info, tmp)
|
|
|
+ }(info, tmp)
|
|
|
+ } else {
|
|
|
+ //信息错误,进行更新
|
|
|
+ }
|
|
|
}
|
|
|
if count%1000 == 0 {
|
|
|
log.Println("current", count)
|
|
|
}
|
|
|
+ // if taskcount > 0 && taskcount%50000 == 0 { //歇歇
|
|
|
+ // log.Println("pause start..", taskcount)
|
|
|
+ // for n := 0; n < p.thread; n++ {
|
|
|
+ // pool <- true
|
|
|
+ // }
|
|
|
+ // for n := 0; n < p.thread; n++ {
|
|
|
+ // <-pool
|
|
|
+ // }
|
|
|
+ // log.Println("pause over..")
|
|
|
+ // }
|
|
|
+ //lastid = tmp["_id"]
|
|
|
tmp = make(map[string]interface{})
|
|
|
+ // if count > 40000 {
|
|
|
+ // query.Close()
|
|
|
+ // break
|
|
|
+ // }
|
|
|
+ //over++
|
|
|
}
|
|
|
//阻塞
|
|
|
for n := 0; n < p.thread; n++ {
|
|
|
pool <- true
|
|
|
}
|
|
|
- log.Println("所有线程执行完成...", count)
|
|
|
+ // if over == 0 {
|
|
|
+ // break
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ log.Println("所有线程执行完成...", count, taskcount)
|
|
|
|
|
|
}
|
|
|
|