123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- package main
- import (
- "fmt"
- "gorm.io/driver/clickhouse"
- "gorm.io/gorm"
- "gorm.io/gorm/logger"
- "strings"
- "time"
- "unicode/utf8"
- //_ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
- //"github.com/gogf/gf/v2/frame/g"
- //"github.com/gogf/gf/v2/os/gctx"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "net/url"
- )
- // investData 投资关系存量数据处理
- func investData() {
- where := map[string]interface{}{
- //"stock_type": "企业法人",
- "stock_type": map[string]interface{}{
- "$ne": "自然人股东",
- },
- //"use_flag": map[string]interface{}{
- // "$lt": 5,
- //},
- }
- //{stock_type:{$ne:"自然人股东"}}
- /**
- 找 company_partner ,stock_type不是自然人股东的数据,循环数据,判断stock_name 长度大于3
- 根据stack_name 到 qyxy_std 找 company_name 想等数据存在,存在就写入;增量数据直接查询法人库表
- */
- //Mgo := &mongodb.MongodbSim{
- // MongodbAddr: "172.17.4.181:27001",
- // //MongodbAddr: "127.0.0.1:27001",
- // DbName: "mixdata",
- // Size: 10,
- //}
- //
- //Mgo.InitPool()
- sess := MgoPA.GetMgoConn()
- defer MgoPA.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- //测试环境
- //username := "jytop"
- //password := "pwdTopJy123"
- //host := "192.168.3.207:19000"
- // 本地环境
- //username := "wcc"
- //password := "123"
- //host := "localhost:9090"
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- ////dsn := "clickhouse://jybi_admin:Da#jy20230825@127.0.0.1:8123/default?dial_timeout=10s&read_timeout=20s"
- ////dsn := "clickhouse://jytop:pwdTopJy123@192.168.3.207:18123/information?dial_timeout=10s&read_timeout=20s"
- ////dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
- //dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
- //clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s
- //dsn := "clickhouse://jybi_admin:Da#jy20230825@localhost:18123/test?dial_timeout=10s&read_timeout=20s"
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- //res := make(map[string]interface{})
- //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413")
- //log.Println(res)
- query := sess.DB("mixdata").C("company_partner").Find(where).Select(nil).Sort("update_time").Iter()
- count := 0
- //datas := make([]EntMapCode, 0)
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%100 == 0 {
- log.Println("current ---", count, tmp["stock_name"], tmp["update_time"])
- }
- if util.ObjToString(tmp["stock_type"]) == "自然人股东" {
- continue
- }
- if util.IntAll(tmp["use_flag"]) > 5 {
- continue
- }
- //历史数据,作废
- if util.IntAll(tmp["is_history"]) != 0 {
- continue
- }
- //股东名称 长度小于5,认为不是企业
- if utf8.RuneCountInString(util.ObjToString(tmp["stock_name"])) < 5 {
- continue
- }
- // 公司名称为空
- if util.ObjToString(tmp["stock_name"]) == "" || util.ObjToString(tmp["company_name"]) == "" {
- continue
- }
- //投资关系-0201,交易关系-0301,
- //管辖关系-0101,直属关系-0102,组成关系-0103
- data := EntMapCode{
- AName: util.ObjToString(tmp["stock_name"]),
- BName: util.ObjToString(tmp["company_name"]),
- Code: "0201",
- InvestRatio: util.ObjToString(tmp["stock_proportion"]),
- InvestPrice: util.ObjToString(tmp["stock_capital"]),
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName)
- }
- }
- log.Printf("over")
- }
- // biddingAllData 交易关系数据处理
- func biddingAllData() {
- /**
- 查询bidding 数据中,采购单位和中标单位,就是交易关系
- */
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- //测试环境
- //username := "jytop"
- //password := "pwdTopJy123"
- //host := "192.168.3.207:19000"
- // 本地环境
- //username := "wcc"
- //password := "123"
- //host := "localhost:9090"
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- //db.Logger.LogMode(gor)
- //res := make(map[string]interface{})
- //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413")
- //log.Println(res)
- query := sess.DB(GF.MongoB.DB).C("bidding").Find(nil).Select(nil).Sort("_id").Iter()
- count := 0
- //datas := make([]EntMapCode, 0)
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current ---", count, tmp["_id"])
- }
- if util.IntAll(tmp["extracttype"]) == -1 {
- continue
- }
- rsa, rsb := false, false
- // 1.判断规则打物业标签
- if tag_topinformation, ok := tmp["tag_topinformation"]; !ok {
- rsa = false
- } else {
- if tags, ok := tag_topinformation.([]interface{}); ok {
- for _, tag := range tags {
- tg := util.ObjToString(tag)
- if tg == "情报_物业" {
- rsa = true
- break
- }
- }
- }
- }
- // 2.判断人工智能打物业标签
- if tag_topinformation_ai, ok := tmp["tag_topinformation_ai"]; !ok {
- rsb = false
- } else {
- if tags, ok := tag_topinformation_ai.([]interface{}); ok {
- for _, tag := range tags {
- tg := util.ObjToString(tag)
- if tg == "情报_物业" {
- rsb = true
- break
- }
- }
- }
- }
- // 规则和人工智能,都没打上物业标签
- if !rsb && !rsa {
- continue
- }
- if util.ObjToString(tmp["buyer"]) == "" {
- continue
- }
- // 没有中标单位,直接跳过
- if sWinner, ok := tmp["s_winner"]; !ok {
- continue
- } else {
- //投资关系-0201,交易关系-0301,
- //管辖关系-0101,直属关系-0102,组成关系-0103
- winners := util.ObjToString(sWinner)
- wins := strings.Split(winners, ",")
- for _, winer := range wins {
- exist := EntMapCode{}
- db.Where(&EntMapCode{AName: util.ObjToString(tmp["buyer"]), BName: winer}).First(&exist)
- if exist.CreateTime == 0 {
- data := EntMapCode{
- AName: util.ObjToString(tmp["buyer"]),
- BName: winer,
- Code: "0301",
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName, data.BName)
- }
- }
- }
- }
- }
- log.Printf("over")
- }
- // organizeData 管辖关系数据;special_enterprise
- func organizeData() {
- sess := MgoPA.GetMgoConn()
- defer MgoPA.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- query := sess.DB("mixdata").C("special_enterprise").Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Println("current ---", count, tmp["company_name"], tmp["organizer"])
- }
- if util.ObjToString(tmp["company_status"]) != "正常" {
- continue
- }
- if util.IntAll(tmp["use_flag"]) != 0 {
- continue
- }
- // 公司名称为空
- if util.ObjToString(tmp["organizer"]) == "" || util.ObjToString(tmp["company_name"]) == "" {
- continue
- }
- //投资关系-0201,交易关系-0301,
- //管辖关系-0101,直属关系-0102,组成关系-0103
- data := EntMapCode{
- AName: util.ObjToString(tmp["organizer"]),
- BName: util.ObjToString(tmp["company_name"]),
- Code: "0101",
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName)
- }
- }
- log.Printf("over")
- }
- // areaData 处理政府机关省市区单位
- func areaData() {
- //f, err := excelize.OpenFile("./管辖关系.xlsx")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //defer func() {
- // if err := f.Close(); err != nil {
- // fmt.Println(err)
- // }
- //}()
- //
- //rows, err := f.GetRows("区")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- ////================================//
- //f2, err := excelize.OpenFile("./area.xlsx")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //defer func() {
- // if err := f2.Close(); err != nil {
- // fmt.Println(err)
- // }
- //}()
- //
- //rows2, err := f2.GetRows("区")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //
- //for i := 1; i < len(rows); i++ {
- // aname := rows[i][1]
- // bname := rows[i][2]
- // //a := strings.Replace(aname, "河南省", "", -1)
- // //a := strings.ReplaceAll(aname, "郑州市", "")
- // //b := strings.ReplaceAll(bname, "郑州市", "")
- // fmt.Println(aname, bname)
- // for j := 1; j < len(rows2); j++ {
- // //var newNamea, newNameb string
- // //if rows2[j][0] == "北京" || rows2[j][0] == "天津" || rows2[j][0] == "上海" || rows2[j][0] == "重庆" {
- // // //newNamea = rows2[j][0] + "市" + a
- // // //newNameb = rows2[j][0] + "市" + b
- // //} else {
- // // newNamea = rows2[j][0] + "省" + a
- // // newNameb = rows2[j][0] + "省" + b
- // //}
- // if strings.Contains(rows2[j][2], "县") {
- // continue
- // }
- //
- // newNamea := strings.ReplaceAll(aname, "郑州市金水区", rows2[j][1]+rows2[j][2])
- // newNameb := strings.ReplaceAll(bname, "郑州市金水区", rows2[j][1]+rows2[j][2])
- //
- // insert := map[string]interface{}{
- // "aname": newNamea,
- // "bname": newNameb,
- // "code": rows[i][4],
- // }
- // MgoB.Save("wcc_zhengfujigou_5", insert)
- // }
- //}
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- query := sess.DB(GF.MongoB.DB).C("wcc_zhengfujigou").Find(nil).Select(nil).Sort("_id").Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current ---", count, tmp["aname"], tmp["bname"])
- }
- //判断重复数据
- exist := EntMapCode{}
- db.Where(&EntMapCode{AName: util.ObjToString(tmp["aname"]), BName: util.ObjToString(tmp["bname"])}).First(&exist)
- if exist.CreateTime > 0 {
- continue
- }
- data := EntMapCode{
- AName: util.ObjToString(tmp["aname"]),
- BName: util.ObjToString(tmp["bname"]),
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- //管辖关系-0101,直属关系-0102,组成关系-0103
- if util.ObjToString(tmp["code"]) == "管辖关系" {
- data.Code = "0101"
- } else if util.ObjToString(tmp["code"]) == "直属关系" {
- data.Code = "0102"
- } else if util.ObjToString(tmp["code"]) == "组成关系" {
- data.Code = "0103"
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName, data.BName)
- }
- }
- log.Println("over")
- }
- // updateInfoId 更新法人库表ID,更新映射表对应的法人库ID
- func updateInfoId() {
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- //=======//
- // 执行原始 SQL 查询
- batchSize := 100
- page := 1
- for {
- log.Println("current page ", page)
- var entMapCodes []EntMapCode
- if err := db.Raw("SELECT * FROM ent_map_code where a_id = '' or b_id = '' ORDER BY create_time LIMIT ? OFFSET ? ", batchSize, (page-1)*batchSize).Scan(&entMapCodes).Error; err != nil {
- if err == gorm.ErrRecordNotFound {
- break
- }
- log.Fatal("failed to fetch data:", err)
- }
- // 处理查询到的数据
- for _, entMapCode := range entMapCodes {
- if entMapCode.AId == "" {
- AInfo := EntInfo{}
- db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.AName).Select("company_name", "id").First(&AInfo)
- if AInfo.ID != "" {
- update1 := map[string]interface{}{
- "a_id": AInfo.ID,
- }
- db.Model(&entMapCode).Where(&EntMapCode{AName: entMapCode.AName}).Updates(update1)
- }
- }
- if entMapCode.BId == "" {
- BInfo := EntInfo{}
- db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.BName).Select("company_name", "id").First(&BInfo)
- if BInfo.ID != "" {
- update2 := map[string]interface{}{
- "b_id": BInfo.ID,
- }
- db.Model(&entMapCode).Where(&EntMapCode{BName: entMapCode.BName}).Updates(update2)
- }
- }
- }
- // 如果查询结果为空,则退出循环
- if len(entMapCodes) < batchSize {
- break
- }
- page++
- // 释放查询结果的内存
- entMapCodes = nil
- }
- log.Println("迭代结束")
- }
- // dealErrData 处理 b_name=郑州仲裁委员会办公室 的错误数据。由于没有郑州市,导致拼接错误
- func dealErrData() {
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- batchSize := 10
- page := 1
- for {
- var entMapCodes []EntMapCode
- if err := db.Raw("SELECT * FROM ent_map_code where b_name = '郑州仲裁委员会办公室' ORDER BY create_time LIMIT ? OFFSET ? ", batchSize, (page-1)*batchSize).Scan(&entMapCodes).Error; err != nil {
- if err == gorm.ErrRecordNotFound {
- return
- }
- log.Fatal("failed to fetch data:", err)
- }
- for _, entMapCode := range entMapCodes {
- am := entMapCode.AName
- if am == "郑州市人民政府" {
- continue
- }
- bm := entMapCode.BName
- newBM := strings.ReplaceAll(am, "人民政府", "仲裁委员会办公室")
- newCode := entMapCode
- newCode.BName = newBM
- db.Where(&EntMapCode{AName: am, BName: bm}).Delete(&entMapCode)
- log.Println(am, newBM)
- BInfo := EntInfo{}
- db.Model(&EntInfo{}).Where("company_name = ? ", newBM).Select("company_name", "id").First(&BInfo)
- if BInfo.ID != "" {
- newCode.BId = BInfo.ID
- } else {
- newCode.BId = ""
- }
- db.Create(newCode)
- }
- // 如果查询结果为空,则退出循环
- if len(entMapCodes) < batchSize {
- break
- }
- page++
- // 释放查询结果的内存
- entMapCodes = nil
- }
- log.Println("over")
- }
|