123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/xuri/excelize/v2"
- "go.uber.org/zap"
- utils "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "os"
- "strings"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- Mgo *mongodb.MongodbSim
- Mgo181 *mongodb.MongodbSim
- Dbname string
- Dbcoll string
- Es *elastic.Elastic
- Es2 *elastic.Elastic
- Index string
- //Itype string
- EsFields []string
- yangMap = make(map[string]bool) //存储98家央企
- yangChildMap = make(map[string]bool) //存储央企 下属子公司
- //Updatetime int64
- localPort string // 本地监听端口
- UdpClient udp.UdpClient
- updatePool = make(chan []map[string]interface{}, 20000) // 更新qyxy_std 国标行业分类
- )
- var EsSaveCache = make(chan map[string]interface{}, 5000)
- var SP = make(chan bool, 5)
- func init() {
- utils.ReadConfig(&Sysconfig)
- //utils.ReadConfig("test.json", &Sysconfig)
- Dbname = Sysconfig["dbname"].(string) //
- Dbcoll = Sysconfig["dbcoll"].(string) //qyxy_std
- //qyxy_std
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: Sysconfig["mgodb"].(string),
- Size: utils.IntAllDef(Sysconfig["dbsize"], 5),
- DbName: Dbname,
- UserName: Sysconfig["uname"].(string),
- Password: Sysconfig["upwd"].(string),
- }
- Mgo.InitPool()
- //181
- if utils.ObjToString(Sysconfig["mgo181"]) != "" {
- Mgo181 = &mongodb.MongodbSim{
- MongodbAddr: utils.ObjToString(Sysconfig["mgo181"]),
- //MongodbAddr: "127.0.0.1:27001",
- DbName: "mixdata",
- Size: 10,
- UserName: "",
- Password: "",
- //Direct: true,
- }
- Mgo181.InitPool()
- }
- //es
- econf := Sysconfig["elastic"].(map[string]interface{})
- Index = econf["index"].(string)
- //Itype = econf["itype"].(string)
- Es = &elastic.Elastic{
- S_esurl: econf["addr"].(string),
- I_size: utils.IntAllDef(econf["pool"], 12),
- Username: econf["username"].(string),
- Password: econf["password"].(string),
- }
- Es.InitElasticSize()
- //集群2
- if utils.ObjToString(econf["addr2"]) != "" {
- Es2 = &elastic.Elastic{
- S_esurl: econf["addr2"].(string),
- I_size: utils.IntAllDef(econf["pool"], 12),
- Username: econf["username2"].(string),
- Password: econf["password2"].(string),
- }
- Es2.InitElasticSize()
- }
- EsFields = utils.ObjArrToStringArr(econf["esfields"].([]interface{}))
- //Updatetime = utils.Int64All(Sysconfig["updatetime"])
- localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
- UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
- InitLog()
- readXlsx()
- }
- func InitLog() {
- err := log.InitLog(
- log.Path("./logs/log.out"),
- log.Level("info"),
- log.Compress(true),
- log.MaxSize(10),
- log.MaxBackups(10),
- log.MaxAge(7),
- log.Format("json"),
- )
- if err != nil {
- fmt.Printf("InitLog failed: %v\n", err)
- os.Exit(1)
- }
- }
- func main() {
- UdpClient.Listen(processUdpMsg)
- log.Info("main", zap.String("Udp服务监听", localPort))
- //go StdAll()
- go SaveEs()
- go updateMethod()
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
- }
- log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
- if mapInfo != nil {
- //相应UDP回答
- key := utils.ObjToString(mapInfo["key"])
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- }
- if tasktype, ok := mapInfo["stype"].(string); ok {
- switch tasktype {
- case "stdall":
- go StdAll()
- default:
- fmt.Println("tasktype", tasktype)
- }
- } else {
- //拿到同步信号,开始同步数据
- if _, ok := mapInfo["start"]; ok {
- var start_time, end_time int64
- if _, ok2 := mapInfo["start_time"]; ok2 {
- start_time = utils.Int64All(mapInfo["start_time"])
- end_time = utils.Int64All(mapInfo["end_time"])
- }
- var q map[string]interface{}
- if start_time > 0 {
- if end_time > 0 {
- q = map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$gte": start_time,
- "$lte": end_time,
- },
- }
- } else {
- q = map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$gte": start_time,
- },
- }
- }
- go StdAdd(q) //读取qyxy_std 数据,放入es 数组
- } else {
- fmt.Println("参数 start_time 为0")
- }
- }
- }
- default:
- log.Info("processUdpMsg", zap.String("mapinfo", string(data)))
- }
- }
- // updateMethod 更新MongoDB
- func updateMethod() {
- updateSp := make(chan bool, 10)
- arru := make([][]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("qyxy_std", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(100 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("qyxy_std", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- // readXlsx 读取央企
- func readXlsx() {
- filePath := "央企.xlsx"
- // 1. 读取 Excel(获取 A 列数据)
- f, err := excelize.OpenFile(filePath)
- if err != nil {
- log.Fatal("❌ 无法打开 Excel 文件:", zap.Error(err))
- }
- defer f.Close()
- //读取央企
- rows, err := f.GetRows("Sheet1")
- if err != nil {
- log.Fatal("❌ 无法读取 Sheet1:", zap.Error(err))
- }
- for i := 1; i < len(rows); i++ {
- name := rows[i][0]
- if name != "" {
- yangMap[name] = true
- }
- }
- // 央企下属
- rows2, err := f.GetRows("Sheet2")
- if err != nil {
- log.Fatal("❌ 无法读取 Sheet2:", zap.Error(err))
- }
- for i := 1; i < len(rows2); i++ {
- name := rows2[i][1]
- if name != "" {
- yangChildMap[name] = true
- }
- }
- }
- // getCompanyType 获取公司类型;央企、国企、央企下属、事业单位、民企
- func getCompanyType(name, ctype string) (company_type string) {
- if name == "" {
- return
- }
- if yangMap[name] {
- company_type = "央企"
- return
- }
- if yangChildMap[name] {
- company_type = "央企"
- return
- }
- if strings.Contains(ctype, "国有独资") || strings.Contains(ctype, "国有控股") ||
- ctype == "全民所有制" || ctype == "集体所有制" || ctype == "全民所有制分支机构(非法人)" ||
- ctype == "集体分支机构(非法人)" {
- company_type = "国企"
- return
- }
- company_type = "其他"
- return
- }
|