init.go 6.6 KB


  1. package ent_util
  2. import (
  3. "context"
  4. "log"
  5. "runtime"
  6. "time"
  7. elastic "app.yhyue.com/moapp/jybase/es"
  8. "github.com/ClickHouse/clickhouse-go/v2"
  9. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  10. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. )
  12. var (
  13. SourceMgo, QyxyMgo *MongodbSim
  14. SpiMgo, ExtMgo *MongodbSim
  15. MysqlGlobalTool *Mysql
  16. SysConfig map[string]interface{}
  17. IsLocal bool
  18. ClickHouseConn driver.Conn
  19. EsClinet elastic.Es
  20. Es elastic.Es
  21. BuyerClassData = map[string]string{}
  22. RegionCodeData = map[string]string{}
  23. BitMapCode1 = map[string]int{}
  24. BitMapCode2 = map[string]int{}
  25. BitMapCode3 = map[string]int{}
  26. TimeLayout = "2006年01月02日"
  27. TimeLayout_New = "2006-01-02 15:04:05"
  28. Url = "https://www.jianyu360.cn/article/content/%s.html"
  29. )
  30. const (
  31. G_Units_Baseinfo = "dws_f_ent_baseinfo"
  32. G_Units_Contact = "dws_f_ent_contact"
  33. )
  34. func InitGlobalVar() {
  35. IsLocal = false
  36. qu.ReadConfig(&SysConfig) //加载配置文件
  37. initMgo()
  38. initMysql()
  39. initClickHouse()
  40. initVCode()
  41. initEs()
  42. }
  43. // 初始化mgo
  44. func initMgo() {
  45. mgo := *qu.ObjToMap(SysConfig["mongo"])
  46. b_mgo := *qu.ObjToMap(mgo["bid_mgo"])
  47. q_mgo := *qu.ObjToMap(mgo["qy_mgo"])
  48. s_mgo := *qu.ObjToMap(mgo["spi_mgo"])
  49. e_mgo := *qu.ObjToMap(mgo["ext_mgo"])
  50. SourceMgo = &MongodbSim{
  51. MongodbAddr: qu.ObjToString(b_mgo["addr"]),
  52. DbName: qu.ObjToString(b_mgo["dbname"]),
  53. Size: 20,
  54. UserName: qu.ObjToString(b_mgo["username"]),
  55. Password: qu.ObjToString(b_mgo["password"]),
  56. }
  57. if IsLocal {
  58. SourceMgo.InitPoolDirect()
  59. } else {
  60. SourceMgo.InitPool()
  61. }
  62. QyxyMgo = &MongodbSim{
  63. MongodbAddr: qu.ObjToString(q_mgo["addr"]),
  64. DbName: qu.ObjToString(q_mgo["dbname"]),
  65. Size: 20,
  66. UserName: qu.ObjToString(q_mgo["username"]),
  67. Password: qu.ObjToString(q_mgo["password"]),
  68. }
  69. if IsLocal {
  70. QyxyMgo.InitPoolDirect()
  71. } else {
  72. QyxyMgo.InitPool()
  73. }
  74. SpiMgo = &MongodbSim{
  75. MongodbAddr: qu.ObjToString(s_mgo["addr"]),
  76. DbName: qu.ObjToString(s_mgo["dbname"]),
  77. Size: 20,
  78. UserName: qu.ObjToString(s_mgo["username"]),
  79. Password: qu.ObjToString(s_mgo["password"]),
  80. }
  81. if IsLocal {
  82. SpiMgo.InitPoolDirect()
  83. } else {
  84. SpiMgo.InitPool()
  85. }
  86. ExtMgo = &MongodbSim{
  87. MongodbAddr: qu.ObjToString(e_mgo["addr"]),
  88. DbName: qu.ObjToString(e_mgo["dbname"]),
  89. Size: 20,
  90. UserName: qu.ObjToString(e_mgo["username"]),
  91. Password: qu.ObjToString(e_mgo["password"]),
  92. }
  93. if IsLocal {
  94. ExtMgo.InitPoolDirect()
  95. } else {
  96. ExtMgo.InitPool()
  97. }
  98. }
  99. func initMysql() {
  100. mysql := *qu.ObjToMap(SysConfig["mysql"])
  101. MysqlGlobalTool = &Mysql{
  102. Address: qu.ObjToString(mysql["addr"]),
  103. UserName: qu.ObjToString(mysql["username"]),
  104. PassWord: qu.ObjToString(mysql["password"]),
  105. DBName: qu.ObjToString(mysql["dbname"]),
  106. }
  107. MysqlGlobalTool.Init()
  108. }
  109. // 加载代码表
  110. func initVCode() {
  111. data_types := MysqlGlobalTool.Find("code_buyerclass", nil, "", "", -1, -1)
  112. //先构建所有一级数据
  113. for _, v := range *data_types {
  114. name := qu.ObjToString(v["name"])
  115. code := qu.ObjToString(v["code"])
  116. BuyerClassData[name] = code
  117. }
  118. BuyerClassData["其它"] = "00"
  119. log.Println("招标行业分类表~", len(BuyerClassData))
  120. data_regions := MysqlGlobalTool.Find("code_area", nil, "", "", -1, -1)
  121. //先构建所有一级数据
  122. for _, v := range *data_regions {
  123. area := qu.ObjToString(v["area"])
  124. city := qu.ObjToString(v["city"])
  125. district := qu.ObjToString(v["district"])
  126. code := qu.ObjToString(v["code"])
  127. key := area + "~" + city + "~" + district + "~"
  128. RegionCodeData[key] = code
  129. }
  130. log.Println("地域信息数量~", len(RegionCodeData))
  131. }
  132. // 创建clickhouse连接
  133. func initClickHouse() {
  134. ClickHouseConn, _ = connectClickhouse()
  135. getClickHouseCode()
  136. }
  137. func connectClickhouse() (driver.Conn, error) {
  138. ck := *qu.ObjToMap(SysConfig["clickhouse"])
  139. var (
  140. ctx = context.Background()
  141. conn, err = clickhouse.Open(&clickhouse.Options{
  142. Addr: []string{qu.ObjToString(ck["addr"])},
  143. DialTimeout: 10 * time.Second,
  144. MaxIdleConns: 3,
  145. MaxOpenConns: 30,
  146. Auth: clickhouse.Auth{
  147. Database: qu.ObjToString(ck["dbname"]),
  148. Username: qu.ObjToString(ck["username"]),
  149. Password: qu.ObjToString(ck["password"]),
  150. },
  151. Debugf: func(format string, v ...interface{}) {
  152. log.Println(format, v)
  153. },
  154. })
  155. )
  156. if err != nil {
  157. return nil, err
  158. }
  159. if err := conn.Ping(ctx); err != nil {
  160. if exception, ok := err.(*clickhouse.Exception); ok {
  161. log.Println("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
  162. }
  163. return nil, err
  164. }
  165. return conn, nil
  166. }
  167. func getClickHouseCode() {
  168. query := `SELECT label_type,label_name,bitmap_num FROM information.ent_label`
  169. rows, err := ClickHouseConn.Query(context.Background(), query)
  170. if err != nil {
  171. log.Println(err)
  172. }
  173. var (
  174. label_type int8
  175. label_name string
  176. bitmap_num int8
  177. )
  178. for rows.Next() {
  179. if err := rows.Scan(
  180. &label_type,
  181. &label_name,
  182. &bitmap_num,
  183. ); err != nil {
  184. log.Println(err)
  185. } else {
  186. if label_type == 1 {
  187. BitMapCode1[label_name] = qu.IntAll(bitmap_num)
  188. } else if label_type == 2 {
  189. BitMapCode2[label_name] = qu.IntAll(bitmap_num)
  190. } else if label_type == 3 {
  191. BitMapCode3[label_name] = qu.IntAll(bitmap_num)
  192. }
  193. }
  194. }
  195. log.Println("bitmap占位代码表数量~", len(BitMapCode1), len(BitMapCode2), len(BitMapCode3))
  196. }
  197. func initEs() {
  198. es := *qu.ObjToMap(SysConfig["es"])
  199. EsClinet = elastic.NewEs("07", qu.ObjToString(es["addr"]), 100, qu.ObjToString(es["username"]), qu.ObjToString(es["password"]))
  200. Es = elastic.NewEs("07", qu.ObjToString(es["addr"]), 100, qu.ObjToString(es["username"]), qu.ObjToString(es["password"]))
  201. }
  202. // ...
  203. func GetNewInfo(index, query string) map[string]interface{} {
  204. //log.Println("query -- ", query)
  205. esCon := elastic.VarEs.(*elastic.EsV7)
  206. client := esCon.GetEsConn()
  207. defer esCon.DestoryEsConn(client)
  208. res := map[string]interface{}{}
  209. if client != nil {
  210. defer func() {
  211. if r := recover(); r != nil {
  212. log.Println("[E]", r)
  213. for skip := 1; ; skip++ {
  214. _, file, line, ok := runtime.Caller(skip)
  215. if !ok {
  216. break
  217. }
  218. go log.Println("%v,%v\n", file, line)
  219. }
  220. }
  221. }()
  222. searchResult, err := client.Search().Index(index).Source(query).Do(context.TODO())
  223. if err != nil {
  224. log.Println("从ES查询出错", err.Error())
  225. return nil
  226. }
  227. if searchResult.Hits != nil {
  228. resNum := searchResult.Hits.TotalHits.Value
  229. res["total"] = resNum
  230. }
  231. if searchResult.Aggregations != nil {
  232. bidamount_sum, _ := searchResult.Aggregations.Sum("bidamount_sum")
  233. res["bidamount_sum"] = *bidamount_sum.Value
  234. }
  235. }
  236. return res
  237. }