123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- package main
- import (
- "context"
- "fmt"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
- log "github.com/donnie4w/go-logger/logger"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "runtime"
- "time"
- )
- var (
- SourceMgo, QyxyMgo *MongodbSim
- SpiMgo, ExtMgo *MongodbSim
- MysqlGlobalTool *Mysql
- IsLocal, IsDev bool
- ClickHouseConn driver.Conn
- EsClinet *elastic.Elastic
- TimeLayout = "2006年01月02日"
- TimeLayout_New = "2006-01-02 15:04:05"
- Url = "https://www.jianyu360.cn/article/content/%s.html"
- )
- func InitGlobalVar() {
- IsLocal = true
- initMgo()
- //initMysql()
- //initClickHouse()
- //initEs()
- }
- func initEs() {
- url := "http://127.0.0.1:13003"
- if !IsLocal {
- url = "http://172.17.4.184:19805"
- }
- EsClinet = &elastic.Elastic{
- S_esurl: url,
- I_size: 10,
- Username: "es_all",
- Password: "TopJkO2E_d1x",
- }
- EsClinet.InitElasticSize()
- }
- func initClickHouse() {
- ClickHouseConn, _ = connectClickhouse()
- }
- // 初始化mgo
- // 初始化mgo
- func initMgo() {
- if IsLocal {
- SourceMgo = &MongodbSim{
- MongodbAddr: "127.0.0.1:12005",
- DbName: "qfw",
- Size: 10,
- UserName: "zhengkun",
- Password: "zk@123123",
- }
- SourceMgo.InitPoolDirect()
- QyxyMgo = &MongodbSim{
- MongodbAddr: "127.0.0.1:12005",
- DbName: "mixdata",
- Size: 10,
- UserName: "zhengkun",
- Password: "zk@123123",
- }
- QyxyMgo.InitPoolDirect()
- SpiMgo = &MongodbSim{
- MongodbAddr: "127.0.0.1:12004",
- DbName: "zhengkun",
- Size: 10,
- UserName: "",
- Password: "",
- }
- SpiMgo.InitPoolDirect()
- ExtMgo = &MongodbSim{
- MongodbAddr: "127.0.0.1:12001",
- DbName: "qfw",
- Size: 10,
- UserName: "",
- Password: "",
- }
- ExtMgo.InitPoolDirect()
- } else {
- SourceMgo = &MongodbSim{
- MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
- DbName: "qfw",
- Size: 10,
- UserName: "zhengkun",
- Password: "zk@123123",
- }
- SourceMgo.InitPool()
- QyxyMgo = &MongodbSim{
- MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
- DbName: "mixdata",
- Size: 10,
- UserName: "zhengkun",
- Password: "zk@123123",
- }
- QyxyMgo.InitPool()
- SpiMgo = &MongodbSim{
- MongodbAddr: "172.17.4.181:27001",
- DbName: "mixdata",
- Size: 10,
- UserName: "",
- Password: "",
- }
- SpiMgo.InitPool()
- ExtMgo = &MongodbSim{
- MongodbAddr: "172.17.4.85:27080",
- DbName: "qfw",
- Size: 10,
- UserName: "",
- Password: "",
- }
- ExtMgo.InitPool()
- }
- }
- func initMysql() {
- username, password := "zhengkun", "Zk#20220824"
- address := "127.0.0.1:15001"
- if !IsLocal {
- address = "172.17.4.242:4000"
- }
- if IsDev {
- username, password = "root", "=PDT49#80Z!RVv52_z"
- address = "192.168.3.217:4000"
- }
- MysqlGlobalTool = &Mysql{
- Address: address,
- UserName: username,
- PassWord: password,
- DBName: "global_common_data",
- }
- MysqlGlobalTool.Init()
- }
- // 创建clickhouse连接
- func connectClickhouse() (driver.Conn, error) {
- var (
- ctx = context.Background()
- /*
- 外网地址:cc-2ze9tv451wov14w9e.public.clickhouse.ads.aliyuncs.com:9000
- 内网地址:cc-2ze9tv451wov14w9e.clickhouse.ads.aliyuncs.com:9000
- */
- conn, err = clickhouse.Open(&clickhouse.Options{
- Addr: []string{"cc-2ze9tv451wov14w9e.clickhouse.ads.aliyuncs.com:9000"},
- DialTimeout: 10 * time.Second,
- MaxIdleConns: 3,
- MaxOpenConns: 30,
- Auth: clickhouse.Auth{
- Database: "information",
- Username: "biservice",
- Password: "Bi_top95215#",
- },
- Debugf: func(format string, v ...interface{}) {
- fmt.Printf(format, v)
- },
- })
- )
- if err != nil {
- return nil, err
- }
- if err := conn.Ping(ctx); err != nil {
- if exception, ok := err.(*clickhouse.Exception); ok {
- fmt.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
- }
- return nil, err
- }
- return conn, nil
- }
- // 暂时无用...
- func GetNewInfo(index, query string) map[string]interface{} {
- //log.Println("query -- ", query)
- client := EsClinet.GetEsConn()
- defer EsClinet.DestoryEsConn(client)
- res := map[string]interface{}{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Debug("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Debug("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Source(query).Do(context.TODO())
- if err != nil {
- log.Debug("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := searchResult.Hits.TotalHits.Value
- res["total"] = resNum
- }
- if searchResult.Aggregations != nil {
- bidamount_sum, _ := searchResult.Aggregations.Sum("bidamount_sum")
- res["bidamount_sum"] = *bidamount_sum.Value
- }
- }
- return res
- }
|