package main import ( "encoding/json" "fmt" "github.com/xuri/excelize/v2" "go.uber.org/zap" utils "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "os" "strings" "time" ) var ( Sysconfig map[string]interface{} //配置文件 Mgo *mongodb.MongodbSim Mgo181 *mongodb.MongodbSim Dbname string Dbcoll string Es *elastic.Elastic Es2 *elastic.Elastic Index string //Itype string EsFields []string yangMap = make(map[string]bool) //存储98家央企 yangChildMap = make(map[string]bool) //存储央企 下属子公司 //Updatetime int64 localPort string // 本地监听端口 UdpClient udp.UdpClient updatePool = make(chan []map[string]interface{}, 20000) // 更新qyxy_std 国标行业分类 ) var EsSaveCache = make(chan map[string]interface{}, 5000) var SP = make(chan bool, 5) func init() { utils.ReadConfig(&Sysconfig) //utils.ReadConfig("test.json", &Sysconfig) Dbname = Sysconfig["dbname"].(string) // Dbcoll = Sysconfig["dbcoll"].(string) //qyxy_std //qyxy_std Mgo = &mongodb.MongodbSim{ MongodbAddr: Sysconfig["mgodb"].(string), Size: utils.IntAllDef(Sysconfig["dbsize"], 5), DbName: Dbname, UserName: Sysconfig["uname"].(string), Password: Sysconfig["upwd"].(string), } Mgo.InitPool() //181 if utils.ObjToString(Sysconfig["mgo181"]) != "" { Mgo181 = &mongodb.MongodbSim{ MongodbAddr: utils.ObjToString(Sysconfig["mgo181"]), //MongodbAddr: "127.0.0.1:27001", DbName: "mixdata", Size: 10, UserName: "", Password: "", //Direct: true, } Mgo181.InitPool() } //es econf := Sysconfig["elastic"].(map[string]interface{}) Index = econf["index"].(string) //Itype = econf["itype"].(string) Es = &elastic.Elastic{ S_esurl: econf["addr"].(string), I_size: utils.IntAllDef(econf["pool"], 12), Username: econf["username"].(string), Password: econf["password"].(string), } Es.InitElasticSize() //集群2 if utils.ObjToString(econf["addr2"]) != "" { Es2 = &elastic.Elastic{ S_esurl: econf["addr2"].(string), I_size: utils.IntAllDef(econf["pool"], 12), Username: econf["username2"].(string), Password: econf["password2"].(string), } Es2.InitElasticSize() } EsFields = utils.ObjArrToStringArr(econf["esfields"].([]interface{})) //Updatetime = utils.Int64All(Sysconfig["updatetime"]) localPort = Sysconfig["local_port"].(string) //udp 本地监听地址 UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024} InitLog() readXlsx() } func InitLog() { err := log.InitLog( log.Path("./logs/log.out"), log.Level("info"), log.Compress(true), log.MaxSize(10), log.MaxBackups(10), log.MaxAge(7), log.Format("json"), ) if err != nil { fmt.Printf("InitLog failed: %v\n", err) os.Exit(1) } } func main() { UdpClient.Listen(processUdpMsg) log.Info("main", zap.String("Udp服务监听", localPort)) //go StdAll() go SaveEs() go updateMethod() ch := make(chan bool, 1) <-ch } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { log.Info("processUdpMsg", zap.Any("Unmarshal err", err)) } log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo)) if mapInfo != nil { //相应UDP回答 key := utils.ObjToString(mapInfo["key"]) if key == "" { key = "udpok" } go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) } if tasktype, ok := mapInfo["stype"].(string); ok { switch tasktype { case "stdall": go StdAll() default: fmt.Println("tasktype", tasktype) } } else { //拿到同步信号,开始同步数据 if _, ok := mapInfo["start"]; ok { var start_time, end_time int64 if _, ok2 := mapInfo["start_time"]; ok2 { start_time = utils.Int64All(mapInfo["start_time"]) end_time = utils.Int64All(mapInfo["end_time"]) } var q map[string]interface{} if start_time > 0 { if end_time > 0 { q = map[string]interface{}{ "updatetime": map[string]interface{}{ "$gte": start_time, "$lte": end_time, }, } } else { q = map[string]interface{}{ "updatetime": map[string]interface{}{ "$gte": start_time, }, } } go StdAdd(q) //读取qyxy_std 数据,放入es 数组 } else { fmt.Println("参数 start_time 为0") } } } default: log.Info("processUdpMsg", zap.String("mapinfo", string(data))) } } // updateMethod 更新MongoDB func updateMethod() { updateSp := make(chan bool, 10) arru := make([][]map[string]interface{}, 500) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 500 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("qyxy_std", arru...) }(arru) arru = make([][]map[string]interface{}, 500) indexu = 0 } case <-time.After(100 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("qyxy_std", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 500) indexu = 0 } } } } // readXlsx 读取央企 func readXlsx() { filePath := "央企.xlsx" // 1. 读取 Excel(获取 A 列数据) f, err := excelize.OpenFile(filePath) if err != nil { log.Fatal("❌ 无法打开 Excel 文件:", zap.Error(err)) } defer f.Close() //读取央企 rows, err := f.GetRows("Sheet1") if err != nil { log.Fatal("❌ 无法读取 Sheet1:", zap.Error(err)) } for i := 1; i < len(rows); i++ { name := rows[i][0] if name != "" { yangMap[name] = true } } // 央企下属 rows2, err := f.GetRows("Sheet2") if err != nil { log.Fatal("❌ 无法读取 Sheet2:", zap.Error(err)) } for i := 1; i < len(rows2); i++ { name := rows2[i][1] if name != "" { yangChildMap[name] = true } } } // getCompanyType 获取公司类型;央企、国企、央企下属、事业单位、民企 func getCompanyType(name, ctype string) (company_type string) { if name == "" { return } if yangMap[name] { company_type = "央企" return } if yangChildMap[name] { company_type = "央企" return } if strings.Contains(ctype, "国有独资") || strings.Contains(ctype, "国有控股") || ctype == "全民所有制" || ctype == "集体所有制" || ctype == "全民所有制分支机构(非法人)" || ctype == "集体分支机构(非法人)" { company_type = "国企" return } company_type = "其他" return }