|
- package main
- import (
- "fmt"
- "gorm.io/driver/clickhouse"
- "gorm.io/gorm"
- "gorm.io/gorm/logger"
- "strings"
- "time"
- "unicode/utf8"
- //_ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
- //"github.com/gogf/gf/v2/frame/g"
- //"github.com/gogf/gf/v2/os/gctx"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "net/url"
- )
- // investData 投资关系存量数据处理
- func investData() {
- where := map[string]interface{}{
- //"stock_type": "企业法人",
- "stock_type": map[string]interface{}{
- "$ne": "自然人股东",
- },
- //"use_flag": map[string]interface{}{
- // "$lt": 5,
- //},
- }
- //{stock_type:{$ne:"自然人股东"}}
- /**
- 找 company_partner ,stock_type不是自然人股东的数据,循环数据,判断stock_name 长度大于3
- 根据stack_name 到 qyxy_std 找 company_name 想等数据存在,存在就写入;增量数据直接查询法人库表
- */
- //Mgo := &mongodb.MongodbSim{
- // MongodbAddr: "172.17.4.181:27001",
- // //MongodbAddr: "127.0.0.1:27001",
- // DbName: "mixdata",
- // Size: 10,
- //}
- //
- //Mgo.InitPool()
- sess := MgoPA.GetMgoConn()
- defer MgoPA.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- //测试环境
- //username := "jytop"
- //password := "pwdTopJy123"
- //host := "192.168.3.207:19000"
- // 本地环境
- //username := "wcc"
- //password := "123"
- //host := "localhost:9090"
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- ////dsn := "clickhouse://jybi_admin:Da#jy20230825@127.0.0.1:8123/default?dial_timeout=10s&read_timeout=20s"
- ////dsn := "clickhouse://jytop:pwdTopJy123@192.168.3.207:18123/information?dial_timeout=10s&read_timeout=20s"
- ////dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
- //dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
- //clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s
- //dsn := "clickhouse://jybi_admin:Da#jy20230825@localhost:18123/test?dial_timeout=10s&read_timeout=20s"
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- //res := make(map[string]interface{})
- //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413")
- //log.Println(res)
- query := sess.DB("mixdata").C("company_partner").Find(where).Select(nil).Sort("update_time").Iter()
- count := 0
- //datas := make([]EntMapCode, 0)
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%100 == 0 {
- log.Println("current ---", count, tmp["stock_name"], tmp["update_time"])
- }
- if util.ObjToString(tmp["stock_type"]) == "自然人股东" {
- continue
- }
- if util.IntAll(tmp["use_flag"]) > 5 {
- continue
- }
- //历史数据,作废
- if util.IntAll(tmp["is_history"]) != 0 {
- continue
- }
- //股东名称 长度小于5,认为不是企业
- if utf8.RuneCountInString(util.ObjToString(tmp["stock_name"])) < 5 {
- continue
- }
- // 公司名称为空
- if util.ObjToString(tmp["stock_name"]) == "" || util.ObjToString(tmp["company_name"]) == "" {
- continue
- }
- //投资关系-0201,交易关系-0301,
- //管辖关系-0101,直属关系-0102,组成关系-0103
- data := EntMapCode{
- AName: util.ObjToString(tmp["stock_name"]),
- BName: util.ObjToString(tmp["company_name"]),
- Code: "0201",
- InvestRatio: util.ObjToString(tmp["stock_proportion"]),
- InvestPrice: util.ObjToString(tmp["stock_capital"]),
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName)
- }
- }
- log.Printf("over")
- }
- // biddingAllData 交易关系数据处理
- func biddingAllData() {
- /**
- 查询bidding 数据中,采购单位和中标单位,就是交易关系
- */
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- //测试环境
- //username := "jytop"
- //password := "pwdTopJy123"
- //host := "192.168.3.207:19000"
- // 本地环境
- //username := "wcc"
- //password := "123"
- //host := "localhost:9090"
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- //db.Logger.LogMode(gor)
- //res := make(map[string]interface{})
- //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413")
- //log.Println(res)
- query := sess.DB(GF.MongoB.DB).C("bidding").Find(nil).Select(nil).Sort("_id").Iter()
- count := 0
- //datas := make([]EntMapCode, 0)
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current ---", count, tmp["_id"])
- }
- if util.IntAll(tmp["extracttype"]) == -1 {
- continue
- }
- rsa, rsb := false, false
- // 1.判断规则打物业标签
- if tag_topinformation, ok := tmp["tag_topinformation"]; !ok {
- rsa = false
- } else {
- if tags, ok := tag_topinformation.([]interface{}); ok {
- for _, tag := range tags {
- tg := util.ObjToString(tag)
- if tg == "情报_物业" {
- rsa = true
- break
- }
- }
- }
- }
- // 2.判断人工智能打物业标签
- if tag_topinformation_ai, ok := tmp["tag_topinformation_ai"]; !ok {
- rsb = false
- } else {
- if tags, ok := tag_topinformation_ai.([]interface{}); ok {
- for _, tag := range tags {
- tg := util.ObjToString(tag)
- if tg == "情报_物业" {
- rsb = true
- break
- }
- }
- }
- }
- // 规则和人工智能,都没打上物业标签
- if !rsb && !rsa {
- continue
- }
- if util.ObjToString(tmp["buyer"]) == "" {
- continue
- }
- // 没有中标单位,直接跳过
- if sWinner, ok := tmp["s_winner"]; !ok {
- continue
- } else {
- //投资关系-0201,交易关系-0301,
- //管辖关系-0101,直属关系-0102,组成关系-0103
- winners := util.ObjToString(sWinner)
- wins := strings.Split(winners, ",")
- for _, winer := range wins {
- exist := EntMapCode{}
- db.Where(&EntMapCode{AName: util.ObjToString(tmp["buyer"]), BName: winer}).First(&exist)
- if exist.CreateTime == 0 {
- data := EntMapCode{
- AName: util.ObjToString(tmp["buyer"]),
- BName: winer,
- Code: "0301",
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName, data.BName)
- }
- }
- }
- }
- }
- log.Printf("over")
- }
- // organizeData 管辖关系数据;special_enterprise
- func organizeData() {
- sess := MgoPA.GetMgoConn()
- defer MgoPA.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- query := sess.DB("mixdata").C("special_enterprise").Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Println("current ---", count, tmp["company_name"], tmp["organizer"])
- }
- if util.ObjToString(tmp["company_status"]) != "正常" {
- continue
- }
- if util.IntAll(tmp["use_flag"]) != 0 {
- continue
- }
- // 公司名称为空
- if util.ObjToString(tmp["organizer"]) == "" || util.ObjToString(tmp["company_name"]) == "" {
- continue
- }
- //投资关系-0201,交易关系-0301,
- //管辖关系-0101,直属关系-0102,组成关系-0103
- data := EntMapCode{
- AName: util.ObjToString(tmp["organizer"]),
- BName: util.ObjToString(tmp["company_name"]),
- Code: "0101",
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName)
- }
- }
- log.Printf("over")
- }
- // areaData 处理政府机关省市区单位
- func areaData() {
- //f, err := excelize.OpenFile("./管辖关系.xlsx")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //defer func() {
- // if err := f.Close(); err != nil {
- // fmt.Println(err)
- // }
- //}()
- //
- //rows, err := f.GetRows("区")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- ////================================//
- //f2, err := excelize.OpenFile("./area.xlsx")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //defer func() {
- // if err := f2.Close(); err != nil {
- // fmt.Println(err)
- // }
- //}()
- //
- //rows2, err := f2.GetRows("区")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //
- //for i := 1; i < len(rows); i++ {
- // aname := rows[i][1]
- // bname := rows[i][2]
- // //a := strings.Replace(aname, "河南省", "", -1)
- // //a := strings.ReplaceAll(aname, "郑州市", "")
- // //b := strings.ReplaceAll(bname, "郑州市", "")
- // fmt.Println(aname, bname)
- // for j := 1; j < len(rows2); j++ {
- // //var newNamea, newNameb string
- // //if rows2[j][0] == "北京" || rows2[j][0] == "天津" || rows2[j][0] == "上海" || rows2[j][0] == "重庆" {
- // // //newNamea = rows2[j][0] + "市" + a
- // // //newNameb = rows2[j][0] + "市" + b
- // //} else {
- // // newNamea = rows2[j][0] + "省" + a
- // // newNameb = rows2[j][0] + "省" + b
- // //}
- // if strings.Contains(rows2[j][2], "县") {
- // continue
- // }
- //
- // newNamea := strings.ReplaceAll(aname, "郑州市金水区", rows2[j][1]+rows2[j][2])
- // newNameb := strings.ReplaceAll(bname, "郑州市金水区", rows2[j][1]+rows2[j][2])
- //
- // insert := map[string]interface{}{
- // "aname": newNamea,
- // "bname": newNameb,
- // "code": rows[i][4],
- // }
- // MgoB.Save("wcc_zhengfujigou_5", insert)
- // }
- //}
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- //正式环境
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- query := sess.DB(GF.MongoB.DB).C("wcc_zhengfujigou").Find(nil).Select(nil).Sort("_id").Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current ---", count, tmp["aname"], tmp["bname"])
- }
- //判断重复数据
- exist := EntMapCode{}
- db.Where(&EntMapCode{AName: util.ObjToString(tmp["aname"]), BName: util.ObjToString(tmp["bname"])}).First(&exist)
- if exist.CreateTime > 0 {
- continue
- }
- data := EntMapCode{
- AName: util.ObjToString(tmp["aname"]),
- BName: util.ObjToString(tmp["bname"]),
- CreateTime: time.Now().Unix(),
- UpdateTime: time.Now().Unix(),
- }
- //管辖关系-0101,直属关系-0102,组成关系-0103
- if util.ObjToString(tmp["code"]) == "管辖关系" {
- data.Code = "0101"
- } else if util.ObjToString(tmp["code"]) == "直属关系" {
- data.Code = "0102"
- } else if util.ObjToString(tmp["code"]) == "组成关系" {
- data.Code = "0103"
- }
- if err = db.Create(data).Error; err != nil {
- log.Println("create err", err, data.AName, data.BName)
- }
- }
- log.Println("over")
- }
- // updateInfoId 更新法人库表ID,更新映射表对应的法人库ID
- func updateInfoId() {
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- //=======//
- // 执行原始 SQL 查询
- batchSize := 100
- page := 1
- for {
- log.Println("current page ", page)
- var entMapCodes []EntMapCode
- if err := db.Raw("SELECT * FROM ent_map_code where a_id = '' or b_id = '' ORDER BY create_time LIMIT ? OFFSET ? ", batchSize, (page-1)*batchSize).Scan(&entMapCodes).Error; err != nil {
- if err == gorm.ErrRecordNotFound {
- break
- }
- log.Fatal("failed to fetch data:", err)
- }
- // 处理查询到的数据
- for _, entMapCode := range entMapCodes {
- if entMapCode.AId == "" {
- AInfo := EntInfo{}
- db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.AName).Select("company_name", "id").First(&AInfo)
- if AInfo.ID != "" {
- update1 := map[string]interface{}{
- "a_id": AInfo.ID,
- }
- db.Model(&entMapCode).Where(&EntMapCode{AName: entMapCode.AName}).Updates(update1)
- }
- }
- if entMapCode.BId == "" {
- BInfo := EntInfo{}
- db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.BName).Select("company_name", "id").First(&BInfo)
- if BInfo.ID != "" {
- update2 := map[string]interface{}{
- "b_id": BInfo.ID,
- }
- db.Model(&entMapCode).Where(&EntMapCode{BName: entMapCode.BName}).Updates(update2)
- }
- }
- }
- // 如果查询结果为空,则退出循环
- if len(entMapCodes) < batchSize {
- break
- }
- page++
- // 释放查询结果的内存
- entMapCodes = nil
- }
- log.Println("迭代结束")
- }
- // dealErrData 处理 b_name=郑州仲裁委员会办公室 的错误数据。由于没有郑州市,导致拼接错误
- func dealErrData() {
- username := GF.Clickhouse.Username
- password := GF.Clickhouse.Password
- host := GF.Clickhouse.Host
- encodedPassword := url.QueryEscape(password)
- dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
- db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
- Logger: logger.Default.LogMode(logger.Silent),
- })
- if err != nil {
- log.Fatal("打开数据库失败:", err)
- } else {
- log.Println("连接数据库成功", db.Name())
- }
- batchSize := 10
- page := 1
- for {
- var entMapCodes []EntMapCode
- if err := db.Raw("SELECT * FROM ent_map_code where b_name = '郑州仲裁委员会办公室' ORDER BY create_time LIMIT ? OFFSET ? ", batchSize, (page-1)*batchSize).Scan(&entMapCodes).Error; err != nil {
- if err == gorm.ErrRecordNotFound {
- return
- }
- log.Fatal("failed to fetch data:", err)
- }
- for _, entMapCode := range entMapCodes {
- am := entMapCode.AName
- if am == "郑州市人民政府" {
- continue
- }
- bm := entMapCode.BName
- newBM := strings.ReplaceAll(am, "人民政府", "仲裁委员会办公室")
- newCode := entMapCode
- newCode.BName = newBM
- db.Where(&EntMapCode{AName: am, BName: bm}).Delete(&entMapCode)
- log.Println(am, newBM)
- BInfo := EntInfo{}
- db.Model(&EntInfo{}).Where("company_name = ? ", newBM).Select("company_name", "id").First(&BInfo)
- if BInfo.ID != "" {
- newCode.BId = BInfo.ID
- } else {
- newCode.BId = ""
- }
- db.Create(newCode)
- }
- // 如果查询结果为空,则退出循环
- if len(entMapCodes) < batchSize {
- break
- }
- page++
- // 释放查询结果的内存
- entMapCodes = nil
- }
- log.Println("over")
- }
|