package main

import (
	"fmt"
	"log"
	"regexp"
	"strings"
	"time"

	"github.com/cron"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"mongodb"
	"qfw/util"
)

// TimeTask registers the Baidu and Ping An import jobs on a cron scheduler
// and starts the scheduler.
//
// A schedule that cannot be registered is logged instead of being silently
// dropped, because a rejected spec means the job will never run.
func TimeTask() {
	c := cron.New()
	// Baidu data: run every BdTaskTime hours.
	// (Alternative kept from the original: "0 0 <TaskTime> * * ?" would run
	// once a day at hour TaskTime.)
	cronstrBd := "0 0 */" + fmt.Sprint(BdTaskTime) + " * * ?"
	// Ping An incremental data: run at 15:00 on the weekday given by
	// PaTaskTime (the original note says it runs once a week, on Tuesday).
	cronstrPa := "0 0 15 ? * " + fmt.Sprint(PaTaskTime)

	if err := c.AddFunc(cronstrBd, func() { GetBdData() }); err != nil {
		log.Println("register baidu cron job", cronstrBd, "failed:", err)
	}
	if err := c.AddFunc(cronstrPa, func() { GetPaData() }); err != nil {
		log.Println("register pingan cron job", cronstrPa, "failed:", err)
	}
	c.Start()
}

// GetBdData iterates over the Baidu documents whose down_time is greater
// than LastTime and hands each one to findEnt.
func GetBdData() {
	count := 0
	lastid := ""
	sess := MgoBd.GetMgoConn()
	defer MgoBd.DestoryMongoConn(sess)

	fields := map[string]interface{}{"data": 1, "down_time": 1}
	q := bson.M{"down_time": bson.M{"$gt": LastTime}}
	query := sess.DB(Dbname_bd).C(CollBd).Find(q).Select(fields).Iter()
	for {
		// A fresh map per document so that keys of a previous document can
		// never leak into the next one.
		tmp := make(map[string]interface{})
		if !query.Next(&tmp) {
			break
		}
		lastid = mongodb.BsonIdToSId(tmp["_id"])
		if count%1000 == 0 {
			util.Debug("baidu ----current----", count, lastid)
		}
		findEnt(tmp)
		count++
	}
	util.Debug("baidu 处理", count, "条数据")
}

// GetPaData iterates over every document of the Ping An collection and
// queues an upsert into CollSave for each company that is not skipped.
//
// Documents whose company_type contains "个体" are skipped. Documents that
// carry a non-empty "changes" list are queued with that list; documents
// without one are turned into a single synthetic "新设立公司" (newly
// established company) change record, but only when their create_time lies
// within the last 17 days.
func GetPaData() {
	count := 0
	lastid := ""
	sess := MgoMix.GetMgoConn()
	defer MgoMix.DestoryMongoConn(sess)

	fields := map[string]interface{}{"changes": 1, "company_id": 1, "company_name": 1, "company_type": 1, "establish_date": 1, "create_time": 1}
	query := sess.DB(Dbname_pa).C(CollPa).Find(nil).Select(fields).Iter()
	c := MgoMix.Count(CollPa, nil)
	util.Debug("ping an count ------", c)
	for {
		// The decoded map is sent through MgoSaveCache and written later by
		// another goroutine, so every document needs its own map. Reusing a
		// single map would let the next Next call overwrite data that is
		// still queued.
		tmp := make(map[string]interface{})
		if !query.Next(&tmp) {
			break
		}
		lastid = mongodb.BsonIdToSId(tmp["company_id"])
		if count%1000 == 0 {
			util.Debug("ping an ----current-----", count, lastid)
		}
		if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") {
			continue
		}
		currentTime := time.Now().Unix()

		if tmp["changes"] != nil && len(tmp["changes"].([]interface{})) > 0 {
			delete(tmp, "establish_date")
			q := bson.M{"company_name": tmp["company_name"]}
			changeEnt, _ := MgoMix.FindOne(CollSave, q)
			if changeEnt != nil && len(*changeEnt) > 0 {
				// The company is already stored: replace its change list
				// only when the source has more entries than the stored,
				// de-duplicated list.
				tmpList := tmp["changes"].([]interface{})
				changeList := clearRepeat((*changeEnt)["changes"].([]interface{}))
				if len(tmpList) > len(changeList) {
					tmp["changes"] = markedChanges(tmpList)
					tmp["updatetime"] = currentTime
				}
				// NOTE(review): the upsert below is keyed by tmp["_id"] (the
				// _id of the source document), not by (*changeEnt)["_id"];
				// confirm this really targets the stored record.
				// NOTE(review): the update is queued even when no new
				// changes were found, in which case the raw, unmarked
				// source list is written; confirm this is intended.
			} else {
				// First time this company is seen: store the de-duplicated,
				// marked list. The original computed this list but never
				// assigned it, so duplicates were written.
				tmp["changes"] = markedChanges(tmp["changes"].([]interface{}))
				tmp["_id"] = primitive.NewObjectID()
				tmp["createtime"] = currentTime
				tmp["updatetime"] = currentTime
			}
			tmp["datasource"] = "pingan"
			queueUpsert(tmp)
			count++
			continue
		}

		// No change list: synthesise a "newly established company" record.
		// Sample of the upstream record shape kept from the original note:
		// {
		//   "change_code": "100000",
		//   "change_name": "新设立公司",
		//   "change_push": true,
		//   "change_info": "新设立公司",
		//   "change_keyword": ["新设立"]
		// }
		setupData := paDateString(tmp["establish_date"])
		createData := paDateString(tmp["create_time"])
		// Only companies created within the last 17 days are kept. A
		// create_time that is missing or cannot be parsed is skipped as well.
		tm2, err := time.Parse("2006-01-02", createData)
		if err != nil || tm2.Unix() < time.Now().Unix()-17*60*60*24 {
			continue
		}
		delete(tmp, "establish_date")
		delete(tmp, "create_time")
		changeInfo := map[string]interface{}{
			"change_field":    "新设立公司",
			"change_name_new": "新设立公司",
			"content_before":  "",
			"content_after":   "新设立公司",
			"change_date":     setupData,
		}
		tmp["changes"] = []map[string]interface{}{changeInfo}
		tmp["_id"] = primitive.NewObjectID()
		tmp["createtime"] = currentTime
		tmp["updatetime"] = currentTime
		tmp["datasource"] = "pingan"
		queueUpsert(tmp)
		count++
	}
	util.Debug("pingan 处理", count, "条数据")
}

// markedChanges de-duplicates list with clearRepeat and runs setMark on every
// remaining entry. Entries are marked in place; the de-duplicated list is
// returned.
func markedChanges(list []interface{}) []interface{} {
	infoList := clearRepeat(list)
	for _, item := range infoList {
		setMark(item.(map[string]interface{}))
	}
	return infoList
}

// paDateString renders a date field as a short date string. A
// primitive.DateTime is formatted with util.Date_Short_Layout, a string is
// reduced to the first match of timeReg, and anything else yields "".
func paDateString(v interface{}) string {
	switch d := v.(type) {
	case primitive.DateTime:
		t := d.Time()
		return util.FormatDate(&t, util.Date_Short_Layout)
	case string:
		return timeReg.FindString(d)
	}
	return ""
}

// queueUpsert sends doc to MgoSaveCache as a {filter, update} pair: the
// filter selects doc["_id"] and the update is {"$set": doc}.
func queueUpsert(doc map[string]interface{}) {
	MgoSaveCache <- []map[string]interface{}{
		{"_id": doc["_id"]},
		{"$set": doc},
	}
}

// findEnt merges the change records of one Baidu document into the stored
// company data.
//
// It advances LastTime to the document's down_time, then looks the company
// up by name:
//  1. found in CollSave with a change list: refresh updatetime and replace
//     the list when the document carries more records than are stored;
//  2. not found in CollSave: look it up in CollQy;
//  3. found in CollQy: queue a new record that combines the document's
//     records with the ones stored in CollQy;
//  4. not found in CollQy either: save the record to CollBack.
func findEnt(tmp map[string]interface{}) {
	if downTime := util.Int64All(tmp["down_time"]); LastTime < downTime {
		LastTime = downTime
	}

	// Guard against documents that lack the expected sections; the pointers
	// returned by util.ObjToMap are dereferenced below.
	data := util.ObjToMap(tmp["data"])
	if data == nil {
		log.Println("findEnt: document has no usable data section, skipped")
		return
	}
	ent := util.ObjToMap((*data)["basicData"])
	changeData := util.ObjToMap((*data)["changeRecordData"])
	if ent == nil || changeData == nil {
		log.Println("findEnt: basicData or changeRecordData missing, skipped")
		return
	}
	infoList, ok := (*changeData)["list"].([]interface{})
	if !ok {
		log.Println("findEnt: changeRecordData.list missing or not a list, skipped")
		return
	}

	currentTime := time.Now().Unix()
	q := bson.M{"company_name": (*ent)["entName"]}
	changeEnt, _ := MgoMix.FindOne(CollSave, q)
	if changeEnt != nil && len(*changeEnt) > 0 {
		// 1. The change collection already has this company.
		if (*changeEnt)["changes"] == nil {
			return
		}
		(*changeEnt)["updatetime"] = currentTime
		if len(infoList) > len((*changeEnt)["changes"].([]interface{})) {
			// setChangeInfo already applies setMark to every record.
			(*changeEnt)["changes"] = setChangeInfo(infoList)
		}
		queueUpsert(*changeEnt)
		return
	}

	// 2. The change collection does not have this company.
	paEnt, _ := MgoMix.FindOne(CollQy, q)
	saveEnt := map[string]interface{}{}
	// The original tested saveEnt (never nil) instead of paEnt, so a missing
	// company dereferenced a nil pointer.
	if paEnt != nil && len(*paEnt) > 0 {
		// 3. The company collection has this company.
		saveEnt["datasource"] = "baidu"
		saveEnt["_id"] = primitive.NewObjectID()
		saveEnt["company_id"] = (*paEnt)["company_id"]
		saveEnt["company_name"] = (*ent)["entName"]
		saveEnt["createtime"] = currentTime
		saveEnt["updatetime"] = currentTime
		mapArr := setChangeInfo(infoList)
		if (*paEnt)["changes"] != nil {
			changeArr := (*paEnt)["changes"].([]interface{})
			for _, v := range util.ObjArrToMapArr(changeArr) {
				setMark(v)
				mapArr = append(mapArr, v)
			}
		}
		saveEnt["changes"] = mapArr
		queueUpsert(saveEnt)
		return
	}

	// 4. The company collection does not have this company either.
	saveEnt["company_name"] = (*ent)["entName"]
	saveEnt["createtime"] = currentTime
	saveEnt["changes"] = setChangeInfo(infoList)
	MgoMix.Save(CollBack, saveEnt)
}

// setChangeInfo converts Baidu change records into the stored record shape,
// copying date, oldValue, newValue and fieldName into change_date,
// content_before, content_after and change_field, and applies setMark to
// each result. An empty input yields a nil slice.
func setChangeInfo(list []interface{}) []map[string]interface{} {
	var arr []map[string]interface{}
	for _, item := range list {
		src := item.(map[string]interface{})
		rec := map[string]interface{}{
			"change_date":    src["date"],
			"content_before": src["oldValue"],
			"content_after":  src["newValue"],
			"change_field":   src["fieldName"],
		}
		setMark(rec)
		arr = append(arr, rec)
	}
	return arr
}

// setMark sets tmp["change_name_new"] to the change_name of the first
// ChangeMap entry that has a change_key_reg pattern matching
// tmp["change_field"]. tmp is left untouched when nothing matches.
func setMark(tmp map[string]interface{}) {
	// The field does not change while scanning, so read it once.
	str := util.ObjToString(tmp["change_field"])
	for _, v := range ChangeMap {
		regArr := v["change_key_reg"].([]string)
		for _, pattern := range regArr {
			// A pattern that fails to compile reports matched == false and
			// is therefore treated as a non-match.
			matched, _ := regexp.MatchString(pattern, str)
			if matched {
				tmp["change_name_new"] = v["change_name"]
				return
			}
		}
	}
}

// clearRepeat drops every element whose fmt.Sprint form equals that of the
// element directly after it, so only adjacent duplicates are collapsed and
// the last one of each run is kept. Lists with fewer than two elements are
// returned as they are.
func clearRepeat(list []interface{}) []interface{} {
	if len(list) <= 1 {
		return list
	}
	last := len(list) - 1
	out := make([]interface{}, 0, len(list))
	for i, v := range list {
		if i == last || fmt.Sprint(v) != fmt.Sprint(list[i+1]) {
			out = append(out, v)
		}
	}
	return out
}

var (
	// MgoSaveCache queues {filter, update} pairs that SaveData writes to
	// CollSave in bulk.
	MgoSaveCache = make(chan []map[string]interface{}, 2000)
	// SP limits the number of bulk writes running at the same time to 5.
	SP = make(chan bool, 5)
)

// SaveData drains MgoSaveCache forever and writes the queued upserts to
// CollSave in batches. A batch is flushed as soon as it holds 200 entries,
// or when no new entry has arrived for one second and the batch is not
// empty. Each flush runs in its own goroutine, throttled by SP.
func SaveData() {
	log.Println("Mgo Save...")
	const batchSize = 200
	arru := make([][]map[string]interface{}, batchSize)
	indexu := 0

	// flush hands the filled part of the buffer to a writer goroutine and
	// starts a new buffer, so the writer never shares memory with the loop.
	flush := func() {
		batch := arru[:indexu]
		SP <- true
		go func() {
			defer func() {
				<-SP
			}()
			MgoMix.UpSertBulk(CollSave, batch...)
		}()
		arru = make([][]map[string]interface{}, batchSize)
		indexu = 0
	}

	for {
		select {
		case v := <-MgoSaveCache:
			arru[indexu] = v
			indexu++
			if indexu == batchSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			if indexu > 0 {
				flush()
			}
		}
	}
}