123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package ent_util
- import (
- "context"
- "log"
- "runtime"
- "time"
- elastic "app.yhyue.com/moapp/jybase/es"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- )
- var (
- SourceMgo, QyxyMgo *MongodbSim
- SpiMgo, ExtMgo *MongodbSim
- MysqlGlobalTool *Mysql
- SysConfig map[string]interface{}
- IsLocal bool
- ClickHouseConn driver.Conn
- EsClinet elastic.Es
- Es elastic.Es
- BuyerClassData = map[string]string{}
- RegionCodeData = map[string]string{}
- BitMapCode1 = map[string]int{}
- BitMapCode2 = map[string]int{}
- BitMapCode3 = map[string]int{}
- TimeLayout = "2006年01月02日"
- TimeLayout_New = "2006-01-02 15:04:05"
- Url = "https://www.jianyu360.cn/article/content/%s.html"
- )
// G_Units_Baseinfo holds the identifier "dws_f_ent_baseinfo".
const G_Units_Baseinfo = "dws_f_ent_baseinfo"

// G_Units_Contact holds the identifier "dws_f_ent_contact".
const G_Units_Contact = "dws_f_ent_contact"
- func InitGlobalVar() {
- IsLocal = false
- qu.ReadConfig(&SysConfig) //加载配置文件
- initMgo()
- initMysql()
- initClickHouse()
- initVCode()
- initEs()
- }
- // 初始化mgo
- func initMgo() {
- mgo := *qu.ObjToMap(SysConfig["mongo"])
- b_mgo := *qu.ObjToMap(mgo["bid_mgo"])
- q_mgo := *qu.ObjToMap(mgo["qy_mgo"])
- s_mgo := *qu.ObjToMap(mgo["spi_mgo"])
- e_mgo := *qu.ObjToMap(mgo["ext_mgo"])
- SourceMgo = &MongodbSim{
- MongodbAddr: qu.ObjToString(b_mgo["addr"]),
- DbName: qu.ObjToString(b_mgo["dbname"]),
- Size: 20,
- UserName: qu.ObjToString(b_mgo["username"]),
- Password: qu.ObjToString(b_mgo["password"]),
- }
- if IsLocal {
- SourceMgo.InitPoolDirect()
- } else {
- SourceMgo.InitPool()
- }
- QyxyMgo = &MongodbSim{
- MongodbAddr: qu.ObjToString(q_mgo["addr"]),
- DbName: qu.ObjToString(q_mgo["dbname"]),
- Size: 20,
- UserName: qu.ObjToString(q_mgo["username"]),
- Password: qu.ObjToString(q_mgo["password"]),
- }
- if IsLocal {
- QyxyMgo.InitPoolDirect()
- } else {
- QyxyMgo.InitPool()
- }
- SpiMgo = &MongodbSim{
- MongodbAddr: qu.ObjToString(s_mgo["addr"]),
- DbName: qu.ObjToString(s_mgo["dbname"]),
- Size: 20,
- UserName: qu.ObjToString(s_mgo["username"]),
- Password: qu.ObjToString(s_mgo["password"]),
- }
- if IsLocal {
- SpiMgo.InitPoolDirect()
- } else {
- SpiMgo.InitPool()
- }
- ExtMgo = &MongodbSim{
- MongodbAddr: qu.ObjToString(e_mgo["addr"]),
- DbName: qu.ObjToString(e_mgo["dbname"]),
- Size: 20,
- UserName: qu.ObjToString(e_mgo["username"]),
- Password: qu.ObjToString(e_mgo["password"]),
- }
- if IsLocal {
- ExtMgo.InitPoolDirect()
- } else {
- ExtMgo.InitPool()
- }
- }
- func initMysql() {
- mysql := *qu.ObjToMap(SysConfig["mysql"])
- MysqlGlobalTool = &Mysql{
- Address: qu.ObjToString(mysql["addr"]),
- UserName: qu.ObjToString(mysql["username"]),
- PassWord: qu.ObjToString(mysql["password"]),
- DBName: qu.ObjToString(mysql["dbname"]),
- }
- MysqlGlobalTool.Init()
- }
- // 加载代码表
- func initVCode() {
- data_types := MysqlGlobalTool.Find("code_buyerclass", nil, "", "", -1, -1)
- //先构建所有一级数据
- for _, v := range *data_types {
- name := qu.ObjToString(v["name"])
- code := qu.ObjToString(v["code"])
- BuyerClassData[name] = code
- }
- BuyerClassData["其它"] = "00"
- log.Println("招标行业分类表~", len(BuyerClassData))
- data_regions := MysqlGlobalTool.Find("code_area", nil, "", "", -1, -1)
- //先构建所有一级数据
- for _, v := range *data_regions {
- area := qu.ObjToString(v["area"])
- city := qu.ObjToString(v["city"])
- district := qu.ObjToString(v["district"])
- code := qu.ObjToString(v["code"])
- key := area + "~" + city + "~" + district + "~"
- RegionCodeData[key] = code
- }
- log.Println("地域信息数量~", len(RegionCodeData))
- }
- // 创建clickhouse连接
- func initClickHouse() {
- ClickHouseConn, _ = connectClickhouse()
- getClickHouseCode()
- }
- func connectClickhouse() (driver.Conn, error) {
- ck := *qu.ObjToMap(SysConfig["clickhouse"])
- var (
- ctx = context.Background()
- conn, err = clickhouse.Open(&clickhouse.Options{
- Addr: []string{qu.ObjToString(ck["addr"])},
- DialTimeout: 10 * time.Second,
- MaxIdleConns: 3,
- MaxOpenConns: 30,
- Auth: clickhouse.Auth{
- Database: qu.ObjToString(ck["dbname"]),
- Username: qu.ObjToString(ck["username"]),
- Password: qu.ObjToString(ck["password"]),
- },
- Debugf: func(format string, v ...interface{}) {
- log.Println(format, v)
- },
- })
- )
- if err != nil {
- return nil, err
- }
- if err := conn.Ping(ctx); err != nil {
- if exception, ok := err.(*clickhouse.Exception); ok {
- log.Println("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
- }
- return nil, err
- }
- return conn, nil
- }
- func getClickHouseCode() {
- query := `SELECT label_type,label_name,bitmap_num FROM information.ent_label`
- rows, err := ClickHouseConn.Query(context.Background(), query)
- if err != nil {
- log.Println(err)
- }
- var (
- label_type int8
- label_name string
- bitmap_num int8
- )
- for rows.Next() {
- if err := rows.Scan(
- &label_type,
- &label_name,
- &bitmap_num,
- ); err != nil {
- log.Println(err)
- } else {
- if label_type == 1 {
- BitMapCode1[label_name] = qu.IntAll(bitmap_num)
- } else if label_type == 2 {
- BitMapCode2[label_name] = qu.IntAll(bitmap_num)
- } else if label_type == 3 {
- BitMapCode3[label_name] = qu.IntAll(bitmap_num)
- }
- }
- }
- log.Println("bitmap占位代码表数量~", len(BitMapCode1), len(BitMapCode2), len(BitMapCode3))
- }
- func initEs() {
- es := *qu.ObjToMap(SysConfig["es"])
- EsClinet = elastic.NewEs("07", qu.ObjToString(es["addr"]), 100, qu.ObjToString(es["username"]), qu.ObjToString(es["password"]))
- Es = elastic.NewEs("07", qu.ObjToString(es["addr"]), 100, qu.ObjToString(es["username"]), qu.ObjToString(es["password"]))
- }
- // ...
- func GetNewInfo(index, query string) map[string]interface{} {
- //log.Println("query -- ", query)
- esCon := elastic.VarEs.(*elastic.EsV7)
- client := esCon.GetEsConn()
- defer esCon.DestoryEsConn(client)
- res := map[string]interface{}{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Println("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Source(query).Do(context.TODO())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := searchResult.Hits.TotalHits.Value
- res["total"] = resNum
- }
- if searchResult.Aggregations != nil {
- bidamount_sum, _ := searchResult.Aggregations.Sum("bidamount_sum")
- res["bidamount_sum"] = *bidamount_sum.Value
- }
- }
- return res
- }
|