package main import ( "fmt" "gorm.io/driver/clickhouse" "gorm.io/gorm" "gorm.io/gorm/logger" "strings" "time" "unicode/utf8" //_ "github.com/gogf/gf/contrib/drivers/clickhouse/v2" //"github.com/gogf/gf/v2/frame/g" //"github.com/gogf/gf/v2/os/gctx" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "log" "net/url" ) // investData 投资关系存量数据处理 func investData() { where := map[string]interface{}{ //"stock_type": "企业法人", "stock_type": map[string]interface{}{ "$ne": "自然人股东", }, //"use_flag": map[string]interface{}{ // "$lt": 5, //}, } //{stock_type:{$ne:"自然人股东"}} /** 找 company_partner ,stock_type不是自然人股东的数据,循环数据,判断stock_name 长度大于3 根据stack_name 到 qyxy_std 找 company_name 想等数据存在,存在就写入;增量数据直接查询法人库表 */ //Mgo := &mongodb.MongodbSim{ // MongodbAddr: "172.17.4.181:27001", // //MongodbAddr: "127.0.0.1:27001", // DbName: "mixdata", // Size: 10, //} // //Mgo.InitPool() sess := MgoPA.GetMgoConn() defer MgoPA.DestoryMongoConn(sess) //正式环境 username := GF.Clickhouse.Username password := GF.Clickhouse.Password host := GF.Clickhouse.Host //测试环境 //username := "jytop" //password := "pwdTopJy123" //host := "192.168.3.207:19000" // 本地环境 //username := "wcc" //password := "123" //host := "localhost:9090" encodedPassword := url.QueryEscape(password) dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host) ////dsn := "clickhouse://jybi_admin:Da#jy20230825@127.0.0.1:8123/default?dial_timeout=10s&read_timeout=20s" ////dsn := "clickhouse://jytop:pwdTopJy123@192.168.3.207:18123/information?dial_timeout=10s&read_timeout=20s" ////dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s" //dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s" //clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s //dsn := "clickhouse://jybi_admin:Da#jy20230825@localhost:18123/test?dial_timeout=10s&read_timeout=20s" db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { log.Fatal("打开数据库失败:", err) } else { log.Println("连接数据库成功", db.Name()) } //res := make(map[string]interface{}) //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413") //log.Println(res) query := sess.DB("mixdata").C("company_partner").Find(where).Select(nil).Sort("update_time").Iter() count := 0 //datas := make([]EntMapCode, 0) for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%100 == 0 { log.Println("current ---", count, tmp["stock_name"], tmp["update_time"]) } if util.ObjToString(tmp["stock_type"]) == "自然人股东" { continue } if util.IntAll(tmp["use_flag"]) > 5 { continue } //历史数据,作废 if util.IntAll(tmp["is_history"]) != 0 { continue } //股东名称 长度小于5,认为不是企业 if utf8.RuneCountInString(util.ObjToString(tmp["stock_name"])) < 5 { continue } // 公司名称为空 if util.ObjToString(tmp["stock_name"]) == "" || util.ObjToString(tmp["company_name"]) == "" { continue } //投资关系-0201,交易关系-0301, //管辖关系-0101,直属关系-0102,组成关系-0103 data := EntMapCode{ AName: util.ObjToString(tmp["stock_name"]), BName: util.ObjToString(tmp["company_name"]), Code: "0201", InvestRatio: util.ObjToString(tmp["stock_proportion"]), InvestPrice: util.ObjToString(tmp["stock_capital"]), CreateTime: time.Now().Unix(), UpdateTime: time.Now().Unix(), } if err = db.Create(data).Error; err != nil { log.Println("create err", err, data.AName) } } log.Printf("over") } // biddingAllData 交易关系数据处理 func biddingAllData() { /** 查询bidding 数据中,采购单位和中标单位,就是交易关系 */ sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) //正式环境 username := GF.Clickhouse.Username password := GF.Clickhouse.Password host := GF.Clickhouse.Host //测试环境 //username := "jytop" //password := "pwdTopJy123" //host := "192.168.3.207:19000" // 本地环境 //username := "wcc" //password := "123" //host := "localhost:9090" encodedPassword := url.QueryEscape(password) dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host) db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { log.Fatal("打开数据库失败:", err) } else { log.Println("连接数据库成功", db.Name()) } //db.Logger.LogMode(gor) //res := make(map[string]interface{}) //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413") //log.Println(res) query := sess.DB(GF.MongoB.DB).C("bidding").Find(nil).Select(nil).Sort("_id").Iter() count := 0 //datas := make([]EntMapCode, 0) for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Println("current ---", count, tmp["_id"]) } if util.IntAll(tmp["extracttype"]) == -1 { continue } rsa, rsb := false, false // 1.判断规则打物业标签 if tag_topinformation, ok := tmp["tag_topinformation"]; !ok { rsa = false } else { if tags, ok := tag_topinformation.([]interface{}); ok { for _, tag := range tags { tg := util.ObjToString(tag) if tg == "情报_物业" { rsa = true break } } } } // 2.判断人工智能打物业标签 if tag_topinformation_ai, ok := tmp["tag_topinformation_ai"]; !ok { rsb = false } else { if tags, ok := tag_topinformation_ai.([]interface{}); ok { for _, tag := range tags { tg := util.ObjToString(tag) if tg == "情报_物业" { rsb = true break } } } } // 规则和人工智能,都没打上物业标签 if !rsb && !rsa { continue } if util.ObjToString(tmp["buyer"]) == "" { continue } // 没有中标单位,直接跳过 if sWinner, ok := tmp["s_winner"]; !ok { continue } else { //投资关系-0201,交易关系-0301, //管辖关系-0101,直属关系-0102,组成关系-0103 winners := util.ObjToString(sWinner) wins := strings.Split(winners, ",") for _, winer := range wins { exist := EntMapCode{} db.Where(&EntMapCode{AName: util.ObjToString(tmp["buyer"]), BName: winer}).First(&exist) if exist.CreateTime == 0 { data := EntMapCode{ AName: util.ObjToString(tmp["buyer"]), BName: winer, Code: "0301", CreateTime: time.Now().Unix(), UpdateTime: time.Now().Unix(), } if err = db.Create(data).Error; err != nil { log.Println("create err", err, data.AName, data.BName) } } } } } log.Printf("over") } // organizeData 管辖关系数据;special_enterprise func organizeData() { sess := MgoPA.GetMgoConn() defer MgoPA.DestoryMongoConn(sess) //正式环境 username := GF.Clickhouse.Username password := GF.Clickhouse.Password host := GF.Clickhouse.Host encodedPassword := url.QueryEscape(password) dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host) db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { log.Fatal("打开数据库失败:", err) } else { log.Println("连接数据库成功", db.Name()) } query := sess.DB("mixdata").C("special_enterprise").Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current ---", count, tmp["company_name"], tmp["organizer"]) } if util.ObjToString(tmp["company_status"]) != "正常" { continue } if util.IntAll(tmp["use_flag"]) != 0 { continue } // 公司名称为空 if util.ObjToString(tmp["organizer"]) == "" || util.ObjToString(tmp["company_name"]) == "" { continue } //投资关系-0201,交易关系-0301, //管辖关系-0101,直属关系-0102,组成关系-0103 data := EntMapCode{ AName: util.ObjToString(tmp["organizer"]), BName: util.ObjToString(tmp["company_name"]), Code: "0101", CreateTime: time.Now().Unix(), UpdateTime: time.Now().Unix(), } if err = db.Create(data).Error; err != nil { log.Println("create err", err, data.AName) } } log.Printf("over") } // areaData 处理政府机关省市区单位 func areaData() { //f, err := excelize.OpenFile("./管辖关系.xlsx") //if err != nil { // fmt.Println(err) // return //} //defer func() { // if err := f.Close(); err != nil { // fmt.Println(err) // } //}() // //rows, err := f.GetRows("区") //if err != nil { // fmt.Println(err) // return //} ////================================// //f2, err := excelize.OpenFile("./area.xlsx") //if err != nil { // fmt.Println(err) // return //} //defer func() { // if err := f2.Close(); err != nil { // fmt.Println(err) // } //}() // //rows2, err := f2.GetRows("区") //if err != nil { // fmt.Println(err) // return //} // //for i := 1; i < len(rows); i++ { // aname := rows[i][1] // bname := rows[i][2] // //a := strings.Replace(aname, "河南省", "", -1) // //a := strings.ReplaceAll(aname, "郑州市", "") // //b := strings.ReplaceAll(bname, "郑州市", "") // fmt.Println(aname, bname) // for j := 1; j < len(rows2); j++ { // //var newNamea, newNameb string // //if rows2[j][0] == "北京" || rows2[j][0] == "天津" || rows2[j][0] == "上海" || rows2[j][0] == "重庆" { // // //newNamea = rows2[j][0] + "市" + a // // //newNameb = rows2[j][0] + "市" + b // //} else { // // newNamea = rows2[j][0] + "省" + a // // newNameb = rows2[j][0] + "省" + b // //} // if strings.Contains(rows2[j][2], "县") { // continue // } // // newNamea := strings.ReplaceAll(aname, "郑州市金水区", rows2[j][1]+rows2[j][2]) // newNameb := strings.ReplaceAll(bname, "郑州市金水区", rows2[j][1]+rows2[j][2]) // // insert := map[string]interface{}{ // "aname": newNamea, // "bname": newNameb, // "code": rows[i][4], // } // MgoB.Save("wcc_zhengfujigou_5", insert) // } //} sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) //正式环境 username := GF.Clickhouse.Username password := GF.Clickhouse.Password host := GF.Clickhouse.Host encodedPassword := url.QueryEscape(password) dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host) db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { log.Fatal("打开数据库失败:", err) } else { log.Println("连接数据库成功", db.Name()) } query := sess.DB(GF.MongoB.DB).C("wcc_zhengfujigou").Find(nil).Select(nil).Sort("_id").Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Println("current ---", count, tmp["aname"], tmp["bname"]) } //判断重复数据 exist := EntMapCode{} db.Where(&EntMapCode{AName: util.ObjToString(tmp["aname"]), BName: util.ObjToString(tmp["bname"])}).First(&exist) if exist.CreateTime > 0 { continue } data := EntMapCode{ AName: util.ObjToString(tmp["aname"]), BName: util.ObjToString(tmp["bname"]), CreateTime: time.Now().Unix(), UpdateTime: time.Now().Unix(), } //管辖关系-0101,直属关系-0102,组成关系-0103 if util.ObjToString(tmp["code"]) == "管辖关系" { data.Code = "0101" } else if util.ObjToString(tmp["code"]) == "直属关系" { data.Code = "0102" } else if util.ObjToString(tmp["code"]) == "组成关系" { data.Code = "0103" } if err = db.Create(data).Error; err != nil { log.Println("create err", err, data.AName, data.BName) } } log.Println("over") } // updateInfoId 更新法人库表ID,更新映射表对应的法人库ID func updateInfoId() { username := GF.Clickhouse.Username password := GF.Clickhouse.Password host := GF.Clickhouse.Host encodedPassword := url.QueryEscape(password) dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host) db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { log.Fatal("打开数据库失败:", err) } else { log.Println("连接数据库成功", db.Name()) } //=======// // 执行原始 SQL 查询 batchSize := 100 page := 1 for { log.Println("current page ", page) var entMapCodes []EntMapCode if err := db.Raw("SELECT * FROM ent_map_code where a_id = '' or b_id = '' ORDER BY create_time LIMIT ? OFFSET ? ", batchSize, (page-1)*batchSize).Scan(&entMapCodes).Error; err != nil { if err == gorm.ErrRecordNotFound { break } log.Fatal("failed to fetch data:", err) } // 处理查询到的数据 for _, entMapCode := range entMapCodes { if entMapCode.AId == "" { AInfo := EntInfo{} db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.AName).Select("company_name", "id").First(&AInfo) if AInfo.ID != "" { update1 := map[string]interface{}{ "a_id": AInfo.ID, } db.Model(&entMapCode).Where(&EntMapCode{AName: entMapCode.AName}).Updates(update1) } } if entMapCode.BId == "" { BInfo := EntInfo{} db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.BName).Select("company_name", "id").First(&BInfo) if BInfo.ID != "" { update2 := map[string]interface{}{ "b_id": BInfo.ID, } db.Model(&entMapCode).Where(&EntMapCode{BName: entMapCode.BName}).Updates(update2) } } } // 如果查询结果为空,则退出循环 if len(entMapCodes) < batchSize { break } page++ } log.Println("迭代结束") }