123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- package main
import (
	"fmt"
	"log"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/cron"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"mongodb"
	"qfw/util"
)
- func TimeTask() {
- //GetPaData()
- c := cron.New()
- cronstrBd := "0 0 */" + fmt.Sprint(BdTaskTime) + " * * ?" //每TaskTime小时执行一次
- //cronstr := "0 0 " + fmt.Sprint(TaskTime) + " * * ?" //每天TaskTime跑一次
- cronstrPa := "0 0 15 ? * " + fmt.Sprint(PaTaskTime) //凭安增量数据每周二跑一次
- _ = c.AddFunc(cronstrBd, func() { GetBdData() })
- _ = c.AddFunc(cronstrPa, func() { GetPaData() })
- c.Start()
- }
- func GetBdData() {
- count := 0
- lastid := ""
- sess := MgoBd.GetMgoConn()
- defer MgoBd.DestoryMongoConn(sess)
- fields := map[string]interface{}{"data": 1, "down_time": 1}
- q := bson.M{"down_time": bson.M{"$gt": LastTime}}
- query := sess.DB(Dbname_bd).C(CollBd).Find(q).Select(fields).Iter()
- tmp := make(map[string]interface{})
- for query.Next(&tmp) {
- lastid = mongodb.BsonIdToSId(tmp["_id"])
- if count%1000 == 0 {
- util.Debug("baidu ----current----", count, lastid)
- }
- findEnt(tmp)
- count++
- }
- util.Debug("baidu 处理", count, "条数据", ",lasttime---", LastTime)
- }
- func GetPaData() {
- count := 0
- lastid := ""
- sess := MgoMix.GetMgoConn()
- defer MgoMix.DestoryMongoConn(sess)
- fields := map[string]interface{}{"changes": 1, "company_id": 1, "company_name": 1, "company_type": 1, "establish_date": 1, "create_time": 1}
- query := sess.DB(Dbname_pa).C(CollPa).Find(nil).Select(fields).Iter()
- c := MgoMix.Count(CollPa, nil)
- util.Debug("ping an count ------", c)
- tmp := make(map[string]interface{})
- for query.Next(&tmp) {
- lastid = mongodb.BsonIdToSId(tmp["company_id"])
- if count%1000 == 0 {
- util.Debug("ping an ----current-----", count, lastid)
- }
- if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") {
- continue
- }
- currentTime := time.Now().Unix()
- if tmp["changes"] != nil && len(tmp["changes"].([]interface{})) > 0 {
- delete(tmp, "establish_date")
- q := bson.M{"company_name": tmp["company_name"]}
- changeEnt, _ := MgoMix.FindOne(CollSave, q)
- if changeEnt != nil && len(*changeEnt) > 0 {
- tmpList := tmp["changes"].([]interface{})
- changeList := clearRepeat((*changeEnt)["changes"].([]interface{}))
- if len(tmpList) > len(changeList) {
- infoList := clearRepeat(tmp["changes"].([]interface{}))
- for _, item := range infoList {
- item1 := item.(map[string]interface{})
- setMark(item1)
- }
- tmp["changes"] = infoList
- tmp["updatetime"] = currentTime
- }
- }else {
- infoList := clearRepeat(tmp["changes"].([]interface{}))
- for _, item := range infoList {
- item1 := item.(map[string]interface{})
- setMark(item1)
- }
- tmp["_id"] = primitive.NewObjectID()
- tmp["createtime"] = currentTime
- tmp["updatetime"] = currentTime
- }
- update := make(map[string]interface{})
- tmp["datasource"] = "pingan"
- update["$set"] = tmp
- updateInfo := []map[string]interface{}{
- {
- "_id": tmp["_id"],
- },
- update,
- }
- MgoSaveCache <- updateInfo
- count++
- }else {
- //{
- // "change_code": "100000",
- // "change_name": "新设立公司",
- // "change_push": true,
- // "change_info": "新设立公司",
- // "change_keyword": ["新设立"]
- //},
- setupData := ""
- if tmp["establish_date"] != nil {
- if timeTmp, ok := tmp["establish_date"].(primitive.DateTime); ok {
- t := timeTmp.Time()
- setupData = util.FormatDate(&t, util.Date_Short_Layout)
- } else if timeTmp, ok := tmp["establish_date"].(string); ok && timeTmp != "" {
- t := timeReg.FindString(timeTmp)
- if t != "" {
- setupData = t
- }
- }
- }
- createData := ""
- if tmp["create_time"] != nil {
- if timeTmp, ok := tmp["create_time"].(primitive.DateTime); ok {
- t := timeTmp.Time()
- createData = util.FormatDate(&t, util.Date_Short_Layout)
- } else if timeTmp, ok := tmp["create_time"].(string); ok && timeTmp != "" {
- t := timeReg.FindString(timeTmp)
- if t != "" {
- createData = t
- }
- }
- }
- tm2, _ := time.Parse("2006-01-02", createData)
- //当前时间17天内
- if tm2.Unix() < (time.Now().Unix() - 17 * 60 * 60 * 24) {
- continue
- }
- delete(tmp, "establish_date")
- delete(tmp, "create_time")
- changeInfo := make(map[string]interface{})
- changeInfo["change_field"] = "新设立公司"
- changeInfo["change_name_new"] = "新设立公司"
- changeInfo["content_before"] = ""
- changeInfo["content_after"] = "新设立公司"
- changeInfo["change_date"] = setupData
- tmp["changes"] = []map[string]interface{}{changeInfo}
- tmp["_id"] = primitive.NewObjectID()
- tmp["createtime"] = currentTime
- tmp["updatetime"] = currentTime
- tmp["datasource"] = "pingan"
- update := make(map[string]interface{})
- update["$set"] = tmp
- updateInfo := []map[string]interface{}{
- {
- "_id": tmp["_id"],
- },
- update,
- }
- MgoSaveCache <- updateInfo
- count++
- }
- }
- util.Debug("pingan 处理", count, "条数据")
- }
- func findEnt(tmp map[string]interface{}) {
- if LastTime < util.Int64All(tmp["down_time"]) {
- LastTime = util.Int64All(tmp["down_time"])
- }
- data := util.ObjToMap(tmp["data"])
- ent := util.ObjToMap((*data)["basicData"])
- changeData := util.ObjToMap((*data)["changeRecordData"])
- infoList := (*changeData)["list"].([]interface{})
- currentTime := time.Now().Unix()
- m := util.ObjToString((*ent)["entName"])
- m = strings.ReplaceAll(m, "(", "(")
- m = strings.ReplaceAll(m, ")", ")")
- q := bson.M{"company_name": m}
- changeEnt, _ := MgoMix.FindOne(CollSave, q)
- update := map[string]interface{}{}
- if changeEnt != nil && len(*changeEnt) > 0 {
- //1、企业变更库有该企业信息
- if (*changeEnt)["changes"] != nil{
- (*changeEnt)["updatetime"] = currentTime
- if len(infoList) > len((*changeEnt)["changes"].([]interface{})) {
- mapArr := setChangeInfo(infoList)
- for _, v := range mapArr{
- setMark(v)
- }
- (*changeEnt)["changes"] = mapArr
- }
- update["$set"] = *changeEnt
- updateInfo := []map[string]interface{}{
- {
- "_id": (*changeEnt)["_id"],
- },
- update,
- }
- MgoSaveCache <- updateInfo
- }
- } else {
- //2、企业变更库没有该企业信息
- paEnt, _ := MgoMix.FindOne(CollQy, q)
- saveEnt := map[string]interface{}{}
- if saveEnt != nil && len(*paEnt) > 0 {
- //3、企业库有该企业信息
- saveEnt["datasource"] = "baidu"
- saveEnt["_id"] = primitive.NewObjectID()
- saveEnt["company_id"] = (*paEnt)["company_id"]
- saveEnt["company_name"] = (*ent)["entName"]
- saveEnt["createtime"] = currentTime
- saveEnt["updatetime"] = currentTime
- if (*paEnt)["changes"] != nil{
- changeArr := (*paEnt)["changes"].([]interface{})
- mapArr := setChangeInfo(infoList)
- for _, v := range util.ObjArrToMapArr(changeArr){
- setMark(v)
- mapArr = append(mapArr, v)
- }
- saveEnt["changes"] = mapArr
- }else {
- saveEnt["changes"] = setChangeInfo(infoList)
- }
- update["$set"] = saveEnt
- updateInfo := []map[string]interface{}{
- {
- "_id": saveEnt["_id"],
- },
- update,
- }
- MgoSaveCache <- updateInfo
- } else {
- //4、企业库没有该企业信息
- saveEnt["company_name"] = (*ent)["entName"]
- saveEnt["createtime"] = currentTime
- saveEnt["changes"] = setChangeInfo(infoList)
- MgoMix.Save(CollBack, saveEnt)
- }
- }
- }
- func setChangeInfo(list []interface{}) []map[string]interface{} {
- var arr []map[string]interface{}
- for _, item := range list {
- tmp := map[string]interface{}{}
- item1 := item.(map[string]interface{})
- tmp["change_date"] = item1["date"]
- tmp["content_before"] = item1["oldValue"]
- tmp["content_after"] = item1["newValue"]
- tmp["change_field"] = item1["fieldName"]
- setMark(tmp)
- arr = append(arr, tmp)
- }
- return arr
- }
- func setMark(tmp map[string]interface{}) {
- for _, v := range ChangeMap {
- str := util.ObjToString(tmp["change_field"])
- regArr := v["change_key_reg"].([]string)
- for _, v1 := range regArr {
- matched, _ := regexp.MatchString(v1, str)
- if matched {
- tmp["change_name_new"] = v["change_name"]
- return
- }
- }
- }
- }
// clearRepeat collapses runs of adjacent duplicates in list and keeps the LAST
// element of each run. Elements are compared by their fmt.Sprint rendering.
// Duplicates that are not adjacent are kept.
//
// A list of length 0 or 1 is returned as-is (the same slice); otherwise a new
// slice is returned.
func clearRepeat(list []interface{}) []interface{} {
	if len(list) < 2 {
		return list
	}
	var deduped []interface{}
	last := len(list) - 1
	for i := 0; i < last; i++ {
		if fmt.Sprint(list[i]) == fmt.Sprint(list[i+1]) {
			continue // the next, equal element stands in for this one
		}
		deduped = append(deduped, list[i])
	}
	// The final element has no successor to compare with, so it is always kept.
	return append(deduped, list[last])
}
// Write pipeline shared by the collectors and SaveData.
var (
	// MgoSaveCache queues [filter, update] pairs destined for CollSave.
	// SaveData drains it and writes the pairs in bulk.
	MgoSaveCache = make(chan []map[string]interface{}, 2000)
	// SP is a counting semaphore that limits SaveData to 5 bulk writes in flight.
	SP = make(chan bool, 5)
)
- func SaveData() {
- log.Println("Mgo Save...")
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-MgoSaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- SP <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MgoMix.UpSertBulk(CollSave, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MgoMix.UpSertBulk(CollSave, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|