package ent_util

import (
	"context"
	"runtime"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	log "github.com/donnie4w/go-logger/logger"
	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
)

// Package-level handles and lookup tables populated by InitGlobalVar.
var (
	// MongoDB pools, built from the "mongo" section of SysConfig.
	SourceMgo, QyxyMgo *MongodbSim
	SpiMgo, ExtMgo     *MongodbSim

	// MysqlGlobalTool is built from the "mysql" section of SysConfig.
	MysqlGlobalTool *Mysql

	// SysConfig holds the configuration read by qu.ReadConfig.
	SysConfig map[string]interface{}

	// IsLocal selects InitPoolDirect over InitPool for the MongoDB pools.
	IsLocal bool

	// ClickHouseConn is nil when the connection could not be established.
	ClickHouseConn driver.Conn

	// EsClinet is the Elasticsearch pool (identifier spelling kept for callers).
	EsClinet *elastic.Elastic

	// BuyerClassData maps a "name" to its "code" from table code_buyerclass.
	BuyerClassData = map[string]string{}

	// RegionCodeData maps "area~city~district~" to its "code" from table code_area.
	RegionCodeData = map[string]string{}

	TimeLayout     = "2006年01月02日"
	TimeLayout_New = "2006-01-02 15:04:05"
	Url            = "https://www.jianyu360.cn/article/content/%s.html"
)

const (
	G_Units_Baseinfo = "dws_f_ent_baseinfo"
	G_Units_Contact  = "dws_f_ent_contact"
)

// InitGlobalVar reads the configuration into SysConfig and then initialises
// the MongoDB pools, the MySQL tool, the code tables, the ClickHouse
// connection and the Elasticsearch pool, in that order.
func InitGlobalVar() {
	IsLocal = true
	// Load the configuration file.
	qu.ReadConfig(&SysConfig)
	initMgo()
	initMysql()
	initVCode()
	initClickHouse()
	initEs()
}

// initMgo builds the four MongoDB pools from the "bid_mgo", "qy_mgo",
// "spi_mgo" and "ext_mgo" entries of the "mongo" configuration section.
func initMgo() {
	mgo := *qu.ObjToMap(SysConfig["mongo"])
	SourceMgo = initMgoPool(*qu.ObjToMap(mgo["bid_mgo"]))
	QyxyMgo = initMgoPool(*qu.ObjToMap(mgo["qy_mgo"]))
	SpiMgo = initMgoPool(*qu.ObjToMap(mgo["spi_mgo"]))
	ExtMgo = initMgoPool(*qu.ObjToMap(mgo["ext_mgo"]))
}

// initMgoPool creates a MongodbSim of size 10 from the "addr", "dbname",
// "username" and "password" keys of conf and initialises its pool, using
// InitPoolDirect when IsLocal is set and InitPool otherwise.
func initMgoPool(conf map[string]interface{}) *MongodbSim {
	m := &MongodbSim{
		MongodbAddr: qu.ObjToString(conf["addr"]),
		DbName:      qu.ObjToString(conf["dbname"]),
		Size:        10,
		UserName:    qu.ObjToString(conf["username"]),
		Password:    qu.ObjToString(conf["password"]),
	}
	if IsLocal {
		m.InitPoolDirect()
	} else {
		m.InitPool()
	}
	return m
}

// initMysql builds MysqlGlobalTool from the "mysql" configuration section
// and calls its Init method.
func initMysql() {
	conf := *qu.ObjToMap(SysConfig["mysql"])
	MysqlGlobalTool = &Mysql{
		Address:  qu.ObjToString(conf["addr"]),
		UserName: qu.ObjToString(conf["username"]),
		PassWord: qu.ObjToString(conf["password"]),
		DBName:   qu.ObjToString(conf["dbname"]),
	}
	MysqlGlobalTool.Init()
}

// initVCode loads the code tables from MySQL into BuyerClassData and
// RegionCodeData.
func initVCode() {
	// Buyer classification: name -> code.
	dataTypes := MysqlGlobalTool.Find("code_buyerclass", nil, "", "", -1, -1)
	for _, v := range *dataTypes {
		name := qu.ObjToString(v["name"])
		code := qu.ObjToString(v["code"])
		BuyerClassData[name] = code
	}
	BuyerClassData["其它"] = "00"
	log.Debug("招标行业分类表~", len(BuyerClassData))

	// Regions: "area~city~district~" -> code.
	dataRegions := MysqlGlobalTool.Find("code_area", nil, "", "", -1, -1)
	for _, v := range *dataRegions {
		area := qu.ObjToString(v["area"])
		city := qu.ObjToString(v["city"])
		district := qu.ObjToString(v["district"])
		code := qu.ObjToString(v["code"])
		key := area + "~" + city + "~" + district + "~"
		RegionCodeData[key] = code
	}
	log.Debug("地域信息数量~", len(RegionCodeData))
}

// initClickHouse opens the ClickHouse connection and stores it in
// ClickHouseConn. On failure the error is logged and ClickHouseConn is nil.
func initClickHouse() {
	conn, err := connectClickhouse()
	if err != nil {
		// Previously this error was dropped, hiding connection failures.
		log.Debug("connect clickhouse failed:", err.Error())
	}
	ClickHouseConn = conn
}

// connectClickhouse opens a connection using the "clickhouse" configuration
// section and verifies it with a ping. It returns a nil connection together
// with the error when either step fails.
func connectClickhouse() (driver.Conn, error) {
	ck := *qu.ObjToMap(SysConfig["clickhouse"])
	ctx := context.Background()
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr:         []string{qu.ObjToString(ck["addr"])},
		DialTimeout:  10 * time.Second,
		MaxIdleConns: 3,
		MaxOpenConns: 30,
		Auth: clickhouse.Auth{
			Database: qu.ObjToString(ck["dbname"]),
			Username: qu.ObjToString(ck["username"]),
			Password: qu.ObjToString(ck["password"]),
		},
		Debugf: func(format string, v ...interface{}) {
			log.Debug(format, v)
		},
	})
	if err != nil {
		return nil, err
	}
	if err := conn.Ping(ctx); err != nil {
		// Log server-side exception details when the driver exposes them.
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Debug("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		}
		return nil, err
	}
	return conn, nil
}

// initEs builds EsClinet from the "es" configuration section and initialises
// it with InitElasticSize.
func initEs() {
	es := *qu.ObjToMap(SysConfig["es"])
	EsClinet = &elastic.Elastic{
		S_esurl:  qu.ObjToString(es["addr"]),
		I_size:   10,
		Username: qu.ObjToString(es["username"]),
		Password: qu.ObjToString(es["password"]),
	}
	EsClinet.InitElasticSize()
}

// GetNewInfo runs the raw query against index and returns a summary map.
//
// The result holds "total" (the hit count) when the response carries hits,
// and "bidamount_sum" when the response carries a sum aggregation of that
// name with a non-nil value. It returns an empty map when no client could be
// obtained, and nil when the search fails or a panic is recovered.
func GetNewInfo(index, query string) map[string]interface{} {
	//log.Println("query -- ", query)
	client := EsClinet.GetEsConn()
	defer EsClinet.DestoryEsConn(client)
	res := map[string]interface{}{}
	if client == nil {
		return res
	}
	defer func() {
		if r := recover(); r != nil {
			log.Debug("[E]", r)
			// Log the frames synchronously so the trace stays in order.
			for skip := 1; ; skip++ {
				_, file, line, ok := runtime.Caller(skip)
				if !ok {
					break
				}
				log.Debug("%v,%v\n", file, line)
			}
		}
	}()
	searchResult, err := client.Search().Index(index).Source(query).Do(context.TODO())
	if err != nil {
		log.Debug("从ES查询出错", err.Error())
		return nil
	}
	if searchResult.Hits != nil {
		res["total"] = searchResult.Hits.TotalHits.Value
	}
	if searchResult.Aggregations != nil {
		// A missing or null aggregation must not panic and discard "total".
		if sum, found := searchResult.Aggregations.Sum("bidamount_sum"); found && sum != nil && sum.Value != nil {
			res["bidamount_sum"] = *sum.Value
		}
	}
	return res
}