|
@@ -196,11 +196,11 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
sess := MQFW.GetMgoConn()
|
|
|
defer MQFW.DestoryMongoConn(sess)
|
|
|
//数据正序处理
|
|
|
- it := sess.DB(MQFW.DbName).C(extractColl).Find(&q).Sort("publishtime").Iter()
|
|
|
+ it := sess.DB(MQFW.DbName).C(extractColl).Find(map[string]interface{}{}).Sort("publishtime").Iter()
|
|
|
count, index := 0, 0
|
|
|
pici := time.Now().Unix()
|
|
|
wg := &sync.WaitGroup{}
|
|
|
- idmap := &sync.Map{}
|
|
|
+ //idmap := &sync.Map{}
|
|
|
for tmp := make(map[string]interface{}); it.Next(tmp); {
|
|
|
if index%10000 == 0 {
|
|
|
log.Println(index, tmp["_id"])
|
|
@@ -228,13 +228,13 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
if !b {
|
|
|
wg.Add(1)
|
|
|
- idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
|
|
|
+ //idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
|
|
|
MultiThread <- true
|
|
|
go func(tmp map[string]interface{}, thisid string) {
|
|
|
defer func() {
|
|
|
<-MultiThread
|
|
|
wg.Done()
|
|
|
- idmap.Delete(tmp["_id"])
|
|
|
+ //idmap.Delete(tmp["_id"])
|
|
|
}()
|
|
|
info := PreThisInfo(tmp)
|
|
|
if info != nil {
|
|
@@ -251,18 +251,18 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- for {
|
|
|
- time.Sleep(5 * time.Second)
|
|
|
- n := 0
|
|
|
- idmap.Range(func(key interface{}, v interface{}) bool {
|
|
|
- n++
|
|
|
- log.Println(key, v)
|
|
|
- return true
|
|
|
- })
|
|
|
- if n < 1 {
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
+ // for {
|
|
|
+ // time.Sleep(5 * time.Second)
|
|
|
+ // n := 0
|
|
|
+ // idmap.Range(func(key interface{}, v interface{}) bool {
|
|
|
+ // n++
|
|
|
+ // log.Println(key, v)
|
|
|
+ // return true
|
|
|
+ // })
|
|
|
+ // if n < 1 {
|
|
|
+ // break
|
|
|
+ // }
|
|
|
+ // }
|
|
|
wg.Wait()
|
|
|
log.Println("task over...", index, count)
|
|
|
//发送udp,调用生成项目索引
|