|
@@ -21,6 +21,7 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+
|
|
|
var (
|
|
|
Sysconfig map[string]interface{} //配置文件
|
|
|
mconf map[string]interface{} //mongodb配置信息
|
|
@@ -33,7 +34,7 @@ var (
|
|
|
nextNode []map[string]interface{} //下节点数组
|
|
|
dupdays = 7 //初始化判重范围
|
|
|
DM *datamap //
|
|
|
-
|
|
|
+ Update *updateInfo
|
|
|
//正则筛选相关
|
|
|
FilterRegTitle = regexp.MustCompile("^_$")
|
|
|
FilterRegTitle_0 = regexp.MustCompile("^_$")
|
|
@@ -89,6 +90,10 @@ func init() {
|
|
|
dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
|
|
|
//加载数据
|
|
|
DM = NewDatamap(dupdays, lastid)
|
|
|
+ //更新池
|
|
|
+ Update = newUpdatePool()
|
|
|
+ go Update.updateData()
|
|
|
+
|
|
|
FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
|
|
|
FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
|
|
|
FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
|
|
@@ -215,7 +220,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
|
- updateExtract := [][]map[string]interface{}{}
|
|
|
pool := make(chan bool, threadNum)
|
|
|
wg := &sync.WaitGroup{}
|
|
|
n, repeateN := 0, 0
|
|
@@ -226,8 +230,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
|
|
|
if util.IntAll((*source)["sourcewebsite"]) == 1 {
|
|
|
repeateN++
|
|
|
- updatelock.Lock()
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ Update.updatePool <- []map[string]interface{}{
|
|
|
map[string]interface{}{
|
|
|
"_id": tmp["_id"],
|
|
|
},
|
|
@@ -238,12 +241,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
"repeat_reason": "sourcewebsite为1,重复",
|
|
|
},
|
|
|
},
|
|
|
- })
|
|
|
- if len(updateExtract) >= 500 {
|
|
|
- mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- updateExtract = [][]map[string]interface{}{}
|
|
|
}
|
|
|
- updatelock.Unlock()
|
|
|
tmp = make(map[string]interface{})
|
|
|
continue
|
|
|
}
|
|
@@ -274,7 +272,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
source.repeat_ids = repeat_ids
|
|
|
//替换数据池-更新
|
|
|
DM.replacePoolData(source)
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{//原始数据打标签
|
|
|
+
|
|
|
+ Update.updatePool <- []map[string]interface{}{//原始数据打标签
|
|
|
map[string]interface{}{
|
|
|
"_id": StringTOBsonId(source.id),
|
|
|
},
|
|
@@ -283,8 +282,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
"repeat_ids": repeat_ids,
|
|
|
},
|
|
|
},
|
|
|
- })
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
|
|
|
+ }
|
|
|
+ Update.updatePool <- []map[string]interface{}{//重复数据打标签
|
|
|
updateID,
|
|
|
map[string]interface{}{
|
|
|
"$set": map[string]interface{}{
|
|
@@ -293,21 +292,13 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
"repeat_id": source.id,
|
|
|
},
|
|
|
},
|
|
|
- })
|
|
|
+ }
|
|
|
}
|
|
|
}(tmp)
|
|
|
- updatelock.Lock()
|
|
|
- if len(updateExtract) >=500 {
|
|
|
- mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- updateExtract = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- updatelock.Unlock()
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
wg.Wait()
|
|
|
- if len(updateExtract) > 0 {
|
|
|
- mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- }
|
|
|
+
|
|
|
log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
|
|
|
|
time.Sleep(30 * time.Second)
|
|
@@ -368,8 +359,7 @@ func historyTaskDay() {
|
|
|
log.Println("查询的最后一个任务Id:",lteid)
|
|
|
break
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
|
|
|
time.Sleep(5 * time.Minute)
|
|
|
|