|
@@ -14,24 +14,23 @@ import (
|
|
|
|
|
|
func TimeTask() {
|
|
|
c := cron.New()
|
|
|
- //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?"
|
|
|
- cronstr := "0 0 */" + fmt.Sprint(TaskTime) + " * * ?" //每TaskTime小时执行一次
|
|
|
- _ = c.AddFunc(cronstr, func() { StartTask() })
|
|
|
+ cronstrBd := "0 0 */" + fmt.Sprint(BdTaskTime) + " * * ?" //每TaskTime小时执行一次
|
|
|
+ //cronstr := "0 0 " + fmt.Sprint(TaskTime) + " * * ?" //每天TaskTime跑一次
|
|
|
+ cronstrPa := "0 0 15 ? * " + fmt.Sprint(PaTaskTime) //凭安增量数据每周三跑一次
|
|
|
+ util.Debug(cronstrPa)
|
|
|
+ _ = c.AddFunc(cronstrBd, func() { GetBdData() })
|
|
|
+ _ = c.AddFunc(cronstrPa, func() { GetPaData() })
|
|
|
c.Start()
|
|
|
}
|
|
|
-func StartTask() {
|
|
|
- GetBdData() //百度数据
|
|
|
- GetPaData() //凭安数据
|
|
|
-}
|
|
|
|
|
|
func GetBdData() {
|
|
|
count := 0
|
|
|
lastid := ""
|
|
|
- sess := MongoTool.GetMgoConn()
|
|
|
- defer MongoTool.DestoryMongoConn(sess)
|
|
|
+ sess := MgoBd.GetMgoConn()
|
|
|
+ defer MgoBd.DestoryMongoConn(sess)
|
|
|
fields := map[string]interface{}{"data": 1, "down_time": 1}
|
|
|
q := bson.M{"down_time": bson.M{"$gte": LastTime}}
|
|
|
- query := sess.DB(Dbname).C(CollBd).Find(q).Select(fields).Iter()
|
|
|
+ query := sess.DB(Dbname_bd).C(CollBd).Find(q).Select(fields).Iter()
|
|
|
tmp := make(map[string]interface{})
|
|
|
for query.Next(&tmp) {
|
|
|
lastid = mongodb.BsonIdToSId(tmp["_id"])
|
|
@@ -41,15 +40,16 @@ func GetBdData() {
|
|
|
findEnt(tmp)
|
|
|
count++
|
|
|
}
|
|
|
+ util.Debug("baidu 处理", count, "条数据")
|
|
|
}
|
|
|
|
|
|
func GetPaData() {
|
|
|
count := 0
|
|
|
lastid := ""
|
|
|
- sess := MongoTool.GetMgoConn()
|
|
|
- defer MongoTool.DestoryMongoConn(sess)
|
|
|
+ sess := MgoMix.GetMgoConn()
|
|
|
+ defer MgoMix.DestoryMongoConn(sess)
|
|
|
fields := map[string]interface{}{"changes": 1, "company_id": 1, "company_name": 1}
|
|
|
- query := sess.DB(Dbname).C(CollPa).Find(nil).Select(fields).Iter()
|
|
|
+ query := sess.DB(Dbname_pa).C(CollPa).Find(nil).Select(fields).Iter()
|
|
|
tmp := make(map[string]interface{})
|
|
|
for query.Next(&tmp) {
|
|
|
lastid = mongodb.BsonIdToSId(tmp["_id"])
|
|
@@ -59,9 +59,8 @@ func GetPaData() {
|
|
|
if tmp["changes"] != nil && len(tmp["changes"].([]interface{})) > 0 {
|
|
|
currentTime := time.Now().Unix()
|
|
|
q := bson.M{"company_name": tmp["company_name"]}
|
|
|
- changeEnt, _ := MongoTool.FindOne(CollSave, q)
|
|
|
+ changeEnt, _ := MgoMix.FindOne(CollSave, q)
|
|
|
if changeEnt != nil && len(*changeEnt) > 0 {
|
|
|
- util.Debug("凭安数据---更新企业信息-----ID----", tmp["_id"])
|
|
|
changeList := tmp["changes"].([]interface{})
|
|
|
if len(changeList) < len((*changeEnt)["changes"].([]interface{})) {
|
|
|
tmp["changes"] = (*changeEnt)["changes"].([]interface{})
|
|
@@ -73,7 +72,6 @@ func GetPaData() {
|
|
|
tmp["updatetime"] = currentTime
|
|
|
}
|
|
|
}else {
|
|
|
- util.Debug("凭安数据---新增企业信息-----ID----", tmp["_id"])
|
|
|
infoList := tmp["changes"].([]interface{})
|
|
|
for _, item := range infoList {
|
|
|
item1 := item.(map[string]interface{})
|
|
@@ -95,6 +93,7 @@ func GetPaData() {
|
|
|
count++
|
|
|
}
|
|
|
}
|
|
|
+ util.Debug("pingan 处理", count, "条数据")
|
|
|
}
|
|
|
|
|
|
func findEnt(tmp map[string]interface{}) {
|
|
@@ -107,13 +106,11 @@ func findEnt(tmp map[string]interface{}) {
|
|
|
infoList := (*changeData)["list"].([]interface{})
|
|
|
currentTime := time.Now().Unix()
|
|
|
q := bson.M{"company_name": (*ent)["entName"]}
|
|
|
- changeEnt, _ := MongoTool.FindOne(CollSave, q)
|
|
|
- util.Debug(*changeEnt)
|
|
|
+ changeEnt, _ := MgoMix.FindOne(CollSave, q)
|
|
|
update := map[string]interface{}{}
|
|
|
if changeEnt != nil && len(*changeEnt) > 0 {
|
|
|
//1、企业变更库有该企业信息
|
|
|
if (*changeEnt)["changes"] != nil{
|
|
|
- util.Debug("百度----更新企业信息-----ID----", (*changeEnt)["_id"])
|
|
|
(*changeEnt)["updatetime"] = currentTime
|
|
|
if len(infoList) > len(tmp["changes"].([]interface{})) {
|
|
|
mapArr := setChangeInfo(infoList)
|
|
@@ -133,11 +130,10 @@ func findEnt(tmp map[string]interface{}) {
|
|
|
}
|
|
|
} else {
|
|
|
//2、企业变更库没有该企业信息
|
|
|
- paEnt, _ := MongoTool.FindOne(CollQy, q)
|
|
|
+ paEnt, _ := MgoMix.FindOne(CollQy, q)
|
|
|
saveEnt := map[string]interface{}{}
|
|
|
if saveEnt != nil && len(*paEnt) > 0 {
|
|
|
//3、企业库有该企业信息
|
|
|
- util.Debug("百度----新增企业信息----ID-----", (*paEnt)["_id"])
|
|
|
saveEnt["_id"] = primitive.NewObjectID()
|
|
|
saveEnt["company_id"] = (*paEnt)["company_id"]
|
|
|
saveEnt["company_name"] = (*ent)["entName"]
|
|
@@ -167,7 +163,7 @@ func findEnt(tmp map[string]interface{}) {
|
|
|
saveEnt["company_name"] = (*ent)["entName"]
|
|
|
saveEnt["createtime"] = currentTime
|
|
|
saveEnt["changes"] = setChangeInfo(infoList)
|
|
|
- MongoTool.Save(CollBack, saveEnt)
|
|
|
+ MgoMix.Save(CollBack, saveEnt)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -219,7 +215,7 @@ func SaveData() {
|
|
|
defer func() {
|
|
|
<-SP
|
|
|
}()
|
|
|
- MongoTool.UpSertBulk(CollSave, arru...)
|
|
|
+ MgoMix.UpSertBulk(CollSave, arru...)
|
|
|
}(arru)
|
|
|
arru = make([][]map[string]interface{}, 200)
|
|
|
indexu = 0
|
|
@@ -231,7 +227,7 @@ func SaveData() {
|
|
|
defer func() {
|
|
|
<-SP
|
|
|
}()
|
|
|
- MongoTool.UpSertBulk(CollSave, arru...)
|
|
|
+ MgoMix.UpSertBulk(CollSave, arru...)
|
|
|
}(arru[:indexu])
|
|
|
arru = make([][]map[string]interface{}, 200)
|
|
|
indexu = 0
|