|
@@ -10,11 +10,13 @@ import (
|
|
|
|
|
|
//定时增量数据处理
|
|
|
func AddTaskSensitiveWordsData() {
|
|
|
- log.Println("部署增量......")
|
|
|
+ log.Println("部署定时新增...")
|
|
|
+ runStartOnce()
|
|
|
for {
|
|
|
tick := time.Tick(time.Hour * 24 * 7) //查询七天前
|
|
|
ctime := <-tick
|
|
|
- cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local)
|
|
|
+ log.Println("相隔一周~~~执行一次")
|
|
|
+ cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, 0, 0, 0, 0, time.Local)
|
|
|
q := map[string]interface{}{
|
|
|
"updatetime": map[string]interface{}{
|
|
|
"$gte": cronData.Unix(),
|
|
@@ -27,8 +29,11 @@ func AddTaskSensitiveWordsData() {
|
|
|
"updatetime": 1,
|
|
|
"company_type": 1,
|
|
|
}).Iter()
|
|
|
- total:=0
|
|
|
+ total,isok:=0,0
|
|
|
for tmp := map[string]interface{}{}; iter.Next(&tmp); total++{
|
|
|
+ if total%10000==0 {
|
|
|
+ log.Println("cur index ",total,isok)
|
|
|
+ }
|
|
|
if company_name, ok := tmp["company_name"].(string); ok {
|
|
|
if reglen.MatchString(company_name) || !unstart_strReg.MatchString(company_name) ||
|
|
|
con_strReg.MatchString(company_name) {
|
|
@@ -43,14 +48,62 @@ func AddTaskSensitiveWordsData() {
|
|
|
"qy_name": company_name,
|
|
|
})
|
|
|
if new_id!=nil {
|
|
|
- dealWithEsData(company_name, BsonTOStringId(BsonTOStringId(new_id)))
|
|
|
+ isok++
|
|
|
+ dealWithEsData(company_name, BsonTOStringId(new_id))
|
|
|
}
|
|
|
}
|
|
|
tmp = map[string]interface{}{}
|
|
|
}
|
|
|
- log.Println("tick ok", cronData)
|
|
|
+ log.Println("is over ",total)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//执行一次
|
|
|
+func runStartOnce() {
|
|
|
+ log.Println("立即执行一次...")
|
|
|
+ nowtime := time.Now()
|
|
|
+ cronData := time.Date(nowtime.Year(), nowtime.Month(), nowtime.Day()-7, 0, 0, 0, 0, time.Local)
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "updatetime": map[string]interface{}{
|
|
|
+ "$gte": cronData.Unix(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ sess := Qfw_Mgo.GetMgoConn()
|
|
|
+ defer Qfw_Mgo.DestoryMongoConn(sess)
|
|
|
+ iter := sess.DB(Qfw_Mgo.DbName).C(QfwCollName).Find(&q).Select(map[string]interface{}{
|
|
|
+ "company_name": 1,
|
|
|
+ "updatetime": 1,
|
|
|
+ "company_type": 1,
|
|
|
+ }).Iter()
|
|
|
+ total,isok:=0,0
|
|
|
+ for tmp := map[string]interface{}{}; iter.Next(&tmp); total++{
|
|
|
+ if total%10000==0 {
|
|
|
+ log.Println("cur index ",total,isok)
|
|
|
+ }
|
|
|
+ if company_name, ok := tmp["company_name"].(string); ok {
|
|
|
+ if reglen.MatchString(company_name) || !unstart_strReg.MatchString(company_name) ||
|
|
|
+ con_strReg.MatchString(company_name) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ company_type:= qu.ObjToString(tmp["company_type"])
|
|
|
+ if strings.Contains(company_type,"个人")||strings.Contains(company_type,"个体"){
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //存mgo
|
|
|
+ new_id :=Qfw_Mgo.Save("unique_qyxy", map[string]interface{}{
|
|
|
+ "qy_name": company_name,
|
|
|
+ })
|
|
|
+ if new_id!=nil {
|
|
|
+ isok++
|
|
|
+ dealWithEsData(company_name, BsonTOStringId(new_id))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
}
|
|
|
+ log.Println("is over ",total)
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
//处理是否新增es
|
|
|
func dealWithEsData(name string, tmpid string) {
|
|
|
query := `{"query":{"bool":{"must":[{"term":{"` + Es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"facets":{}}`
|