// Package ent_util holds the process-wide connections (MongoDB, MySQL,
// ClickHouse, Elasticsearch) and the lookup tables loaded from them.
package ent_util

import (
	"context"
	"log"
	"runtime"
	"time"

	elastic "app.yhyue.com/moapp/jybase/es"
	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)

var (
	// MongoDB pools; assigned by initMgo.
	SourceMgo, QyxyMgo *MongodbSim
	SpiMgo, ExtMgo     *MongodbSim

	// MysqlGlobalTool is the MySQL handle; assigned by initMysql.
	MysqlGlobalTool *Mysql

	// SysConfig is filled by qu.ReadConfig in InitGlobalVar.
	SysConfig map[string]interface{}

	// IsLocal selects InitPoolDirect instead of InitPool for the Mongo pools.
	IsLocal bool

	// ClickHouseConn is assigned by initClickHouse; it stays nil when the
	// connection could not be established.
	ClickHouseConn driver.Conn

	// EsClinet and Es are both created by initEs with identical settings.
	// NOTE(review): "EsClinet" looks like a typo of "EsClient"; the name is
	// exported, so it is kept for compatibility.
	EsClinet elastic.Es
	Es       elastic.Es

	// BuyerClassData maps the "name" column of code_buyerclass to its "code".
	BuyerClassData = map[string]string{}
	// RegionCodeData maps "area~city~district~" to the "code" column of code_area.
	RegionCodeData = map[string]string{}

	// BitMapCode1..3 map label_name to bitmap_num for label_type 1, 2 and 3
	// of information.ent_label.
	BitMapCode1 = map[string]int{}
	BitMapCode2 = map[string]int{}
	BitMapCode3 = map[string]int{}

	// Layouts for time formatting/parsing.
	TimeLayout     = "2006年01月02日"
	TimeLayout_New = "2006-01-02 15:04:05"

	// Url is a format string taking a single %s argument.
	Url = "https://www.jianyu360.cn/article/content/%s.html"
)

// NOTE(review): presumably table names; their use is outside this section.
const (
	G_Units_Baseinfo = "dws_f_ent_baseinfo"
	G_Units_Contact  = "dws_f_ent_contact"
)

// InitGlobalVar loads the configuration file into SysConfig and then
// initializes the MySQL and ClickHouse globals. The Mongo, code-table and
// Elasticsearch initializers are currently disabled.
func InitGlobalVar() {
	IsLocal = false
	qu.ReadConfig(&SysConfig) // load the configuration file
	//initMgo()
	initMysql()
	initClickHouse()
	//initVCode()
	//initEs()
}

// initMgo builds the four MongoDB pools (SourceMgo, QyxyMgo, SpiMgo, ExtMgo)
// from the "mongo" section of SysConfig and starts each of them.
func initMgo() {
	mgo := *qu.ObjToMap(SysConfig["mongo"])

	// build creates a pool descriptor from one sub-section of the mongo config.
	build := func(section string) *MongodbSim {
		cfg := *qu.ObjToMap(mgo[section])
		return &MongodbSim{
			MongodbAddr: qu.ObjToString(cfg["addr"]),
			DbName:      qu.ObjToString(cfg["dbname"]),
			Size:        10,
			UserName:    qu.ObjToString(cfg["username"]),
			Password:    qu.ObjToString(cfg["password"]),
		}
	}
	// start opens the pool, using the direct variant when IsLocal is set.
	start := func(m *MongodbSim) {
		if IsLocal {
			m.InitPoolDirect()
			return
		}
		m.InitPool()
	}

	// Each global is assigned before its pool is started, as before.
	SourceMgo = build("bid_mgo")
	start(SourceMgo)

	QyxyMgo = build("qy_mgo")
	start(QyxyMgo)

	SpiMgo = build("spi_mgo")
	start(SpiMgo)

	ExtMgo = build("ext_mgo")
	start(ExtMgo)
}

// initMysql creates MysqlGlobalTool from the "mysql" section of SysConfig
// and initializes it.
func initMysql() {
	cfg := *qu.ObjToMap(SysConfig["mysql"])
	MysqlGlobalTool = &Mysql{
		Address:  qu.ObjToString(cfg["addr"]),
		UserName: qu.ObjToString(cfg["username"]),
		PassWord: qu.ObjToString(cfg["password"]),
		DBName:   qu.ObjToString(cfg["dbname"]),
	}
	MysqlGlobalTool.Init()
}

// initVCode loads the code tables: code_buyerclass into BuyerClassData and
// code_area into RegionCodeData.
func initVCode() {
	// A nil result from Find is skipped instead of being dereferenced.
	if buyerRows := MysqlGlobalTool.Find("code_buyerclass", nil, "", "", -1, -1); buyerRows != nil {
		for _, row := range *buyerRows {
			BuyerClassData[qu.ObjToString(row["name"])] = qu.ObjToString(row["code"])
		}
	}
	BuyerClassData["其它"] = "00"
	log.Println("招标行业分类表~", len(BuyerClassData))

	if regionRows := MysqlGlobalTool.Find("code_area", nil, "", "", -1, -1); regionRows != nil {
		for _, row := range *regionRows {
			area := qu.ObjToString(row["area"])
			city := qu.ObjToString(row["city"])
			district := qu.ObjToString(row["district"])
			RegionCodeData[area+"~"+city+"~"+district+"~"] = qu.ObjToString(row["code"])
		}
	}
	log.Println("地域信息数量~", len(RegionCodeData))
}

// initClickHouse creates the ClickHouse connection and loads the bitmap code
// tables. On connection failure the error is logged, ClickHouseConn stays nil
// and the code tables are left empty, instead of panicking on a nil
// connection.
func initClickHouse() {
	conn, err := connectClickhouse()
	if err != nil {
		log.Println("connect clickhouse:", err)
		return
	}
	ClickHouseConn = conn
	getClickHouseCode()
}

// connectClickhouse opens a connection using the "clickhouse" section of
// SysConfig and verifies it with a ping. It returns a nil connection together
// with the error when opening or pinging fails.
func connectClickhouse() (driver.Conn, error) {
	ck := *qu.ObjToMap(SysConfig["clickhouse"])
	ctx := context.Background()

	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr:         []string{qu.ObjToString(ck["addr"])},
		DialTimeout:  10 * time.Second,
		MaxIdleConns: 3,
		MaxOpenConns: 30,
		Auth: clickhouse.Auth{
			Database: qu.ObjToString(ck["dbname"]),
			Username: qu.ObjToString(ck["username"]),
			Password: qu.ObjToString(ck["password"]),
		},
		// format carries printf directives, so it must go through Printf
		// with the arguments expanded.
		Debugf: func(format string, v ...interface{}) {
			log.Printf(format, v...)
		},
	})
	if err != nil {
		return nil, err
	}

	if err := conn.Ping(ctx); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		}
		// The connection is unusable; release it rather than leaking it.
		_ = conn.Close()
		return nil, err
	}
	return conn, nil
}

// getClickHouseCode reads information.ent_label and fills BitMapCode1,
// BitMapCode2 and BitMapCode3 according to label_type. Query and scan errors
// are logged; a failed query leaves the maps untouched.
func getClickHouseCode() {
	if ClickHouseConn == nil {
		log.Println("clickhouse connection is not initialized")
		return
	}

	query := `SELECT label_type,label_name,bitmap_num FROM information.ent_label`
	rows, err := ClickHouseConn.Query(context.Background(), query)
	if err != nil {
		log.Println(err)
		return // rows is not usable after a failed query
	}
	defer rows.Close()

	var (
		labelType int8
		labelName string
		bitmapNum int8
	)
	for rows.Next() {
		if err := rows.Scan(&labelType, &labelName, &bitmapNum); err != nil {
			log.Println(err)
			continue
		}
		switch labelType {
		case 1:
			BitMapCode1[labelName] = qu.IntAll(bitmapNum)
		case 2:
			BitMapCode2[labelName] = qu.IntAll(bitmapNum)
		case 3:
			BitMapCode3[labelName] = qu.IntAll(bitmapNum)
		}
	}
	// Report an error that ended the iteration early.
	if err := rows.Err(); err != nil {
		log.Println(err)
	}
	log.Println("bitmap占位代码表数量~", len(BitMapCode1), len(BitMapCode2), len(BitMapCode3))
}

// initEs creates the two Elasticsearch handles, EsClinet and Es, from the
// "es" section of SysConfig. Both are built with the same parameters.
func initEs() {
	es := *qu.ObjToMap(SysConfig["es"])
	addr := qu.ObjToString(es["addr"])
	username := qu.ObjToString(es["username"])
	password := qu.ObjToString(es["password"])

	EsClinet = elastic.NewEs("07", addr, 100, username, password)
	Es = elastic.NewEs("07", addr, 100, username, password)
}

// ...
func GetNewInfo(index, query string) map[string]interface{} { //log.Println("query -- ", query) esCon := elastic.VarEs.(*elastic.EsV7) client := esCon.GetEsConn() defer esCon.DestoryEsConn(client) res := map[string]interface{}{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Println("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Source(query).Do(context.TODO()) if err != nil { log.Println("从ES查询出错", err.Error()) return nil } if searchResult.Hits != nil { resNum := searchResult.Hits.TotalHits.Value res["total"] = resNum } if searchResult.Aggregations != nil { bidamount_sum, _ := searchResult.Aggregations.Sum("bidamount_sum") res["bidamount_sum"] = *bidamount_sum.Value } } return res }