init.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package ent_util
  2. import (
  3. "context"
  4. "runtime"
  5. "time"
  6. "log"
  7. "github.com/ClickHouse/clickhouse-go/v2"
  8. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  9. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. )
  12. var (
  13. SourceMgo, QyxyMgo *MongodbSim
  14. SpiMgo, ExtMgo *MongodbSim
  15. MysqlGlobalTool *Mysql
  16. SysConfig map[string]interface{}
  17. IsLocal bool
  18. ClickHouseConn driver.Conn
  19. EsClinet *elastic.Elastic
  20. BuyerClassData = map[string]string{}
  21. RegionCodeData = map[string]string{}
  22. TimeLayout = "2006年01月02日"
  23. TimeLayout_New = "2006-01-02 15:04:05"
  24. Url = "https://www.jianyu360.cn/article/content/%s.html"
  25. )
  26. const (
  27. G_Units_Baseinfo = "dws_f_ent_baseinfo"
  28. G_Units_Contact = "dws_f_ent_contact"
  29. )
  30. func InitGlobalVar() {
  31. IsLocal = true
  32. qu.ReadConfig(&SysConfig) //加载配置文件
  33. initMgo()
  34. initMysql()
  35. initVCode()
  36. initClickHouse()
  37. initEs()
  38. }
  39. // 初始化mgo
  40. func initMgo() {
  41. mgo := *qu.ObjToMap(SysConfig["mongo"])
  42. b_mgo := *qu.ObjToMap(mgo["bid_mgo"])
  43. q_mgo := *qu.ObjToMap(mgo["qy_mgo"])
  44. s_mgo := *qu.ObjToMap(mgo["spi_mgo"])
  45. e_mgo := *qu.ObjToMap(mgo["ext_mgo"])
  46. SourceMgo = &MongodbSim{
  47. MongodbAddr: qu.ObjToString(b_mgo["addr"]),
  48. DbName: qu.ObjToString(b_mgo["dbname"]),
  49. Size: 10,
  50. UserName: qu.ObjToString(b_mgo["username"]),
  51. Password: qu.ObjToString(b_mgo["password"]),
  52. }
  53. if IsLocal {
  54. SourceMgo.InitPoolDirect()
  55. } else {
  56. SourceMgo.InitPool()
  57. }
  58. QyxyMgo = &MongodbSim{
  59. MongodbAddr: qu.ObjToString(q_mgo["addr"]),
  60. DbName: qu.ObjToString(q_mgo["dbname"]),
  61. Size: 10,
  62. UserName: qu.ObjToString(q_mgo["username"]),
  63. Password: qu.ObjToString(q_mgo["password"]),
  64. }
  65. if IsLocal {
  66. QyxyMgo.InitPoolDirect()
  67. } else {
  68. QyxyMgo.InitPool()
  69. }
  70. SpiMgo = &MongodbSim{
  71. MongodbAddr: qu.ObjToString(s_mgo["addr"]),
  72. DbName: qu.ObjToString(s_mgo["dbname"]),
  73. Size: 10,
  74. UserName: qu.ObjToString(s_mgo["username"]),
  75. Password: qu.ObjToString(s_mgo["password"]),
  76. }
  77. if IsLocal {
  78. SpiMgo.InitPoolDirect()
  79. } else {
  80. SpiMgo.InitPool()
  81. }
  82. ExtMgo = &MongodbSim{
  83. MongodbAddr: qu.ObjToString(e_mgo["addr"]),
  84. DbName: qu.ObjToString(e_mgo["dbname"]),
  85. Size: 10,
  86. UserName: qu.ObjToString(e_mgo["username"]),
  87. Password: qu.ObjToString(e_mgo["password"]),
  88. }
  89. if IsLocal {
  90. ExtMgo.InitPoolDirect()
  91. } else {
  92. ExtMgo.InitPool()
  93. }
  94. }
  95. func initMysql() {
  96. mysql := *qu.ObjToMap(SysConfig["mysql"])
  97. MysqlGlobalTool = &Mysql{
  98. Address: qu.ObjToString(mysql["addr"]),
  99. UserName: qu.ObjToString(mysql["username"]),
  100. PassWord: qu.ObjToString(mysql["password"]),
  101. DBName: qu.ObjToString(mysql["dbname"]),
  102. }
  103. MysqlGlobalTool.Init()
  104. }
  105. // 加载代码表
  106. func initVCode() {
  107. data_types := MysqlGlobalTool.Find("code_buyerclass", nil, "", "", -1, -1)
  108. //先构建所有一级数据
  109. for _, v := range *data_types {
  110. name := qu.ObjToString(v["name"])
  111. code := qu.ObjToString(v["code"])
  112. BuyerClassData[name] = code
  113. }
  114. BuyerClassData["其它"] = "00"
  115. log.Println("招标行业分类表~", len(BuyerClassData))
  116. data_regions := MysqlGlobalTool.Find("code_area", nil, "", "", -1, -1)
  117. //先构建所有一级数据
  118. for _, v := range *data_regions {
  119. area := qu.ObjToString(v["area"])
  120. city := qu.ObjToString(v["city"])
  121. district := qu.ObjToString(v["district"])
  122. code := qu.ObjToString(v["code"])
  123. key := area + "~" + city + "~" + district + "~"
  124. RegionCodeData[key] = code
  125. }
  126. log.Println("地域信息数量~", len(RegionCodeData))
  127. }
  128. // 创建clickhouse连接
  129. func initClickHouse() {
  130. ClickHouseConn, _ = connectClickhouse()
  131. }
  132. func connectClickhouse() (driver.Conn, error) {
  133. ck := *qu.ObjToMap(SysConfig["clickhouse"])
  134. var (
  135. ctx = context.Background()
  136. conn, err = clickhouse.Open(&clickhouse.Options{
  137. Addr: []string{qu.ObjToString(ck["addr"])},
  138. DialTimeout: 10 * time.Second,
  139. MaxIdleConns: 3,
  140. MaxOpenConns: 30,
  141. Auth: clickhouse.Auth{
  142. Database: qu.ObjToString(ck["dbname"]),
  143. Username: qu.ObjToString(ck["username"]),
  144. Password: qu.ObjToString(ck["password"]),
  145. },
  146. Debugf: func(format string, v ...interface{}) {
  147. log.Println(format, v)
  148. },
  149. })
  150. )
  151. if err != nil {
  152. return nil, err
  153. }
  154. if err := conn.Ping(ctx); err != nil {
  155. if exception, ok := err.(*clickhouse.Exception); ok {
  156. log.Println("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
  157. }
  158. return nil, err
  159. }
  160. return conn, nil
  161. }
  162. func initEs() {
  163. es := *qu.ObjToMap(SysConfig["es"])
  164. EsClinet = &elastic.Elastic{
  165. S_esurl: qu.ObjToString(es["addr"]),
  166. I_size: 10,
  167. Username: qu.ObjToString(es["username"]),
  168. Password: qu.ObjToString(es["password"]),
  169. }
  170. EsClinet.InitElasticSize()
  171. }
  172. // ...
  173. func GetNewInfo(index, query string) map[string]interface{} {
  174. //log.Println("query -- ", query)
  175. client := EsClinet.GetEsConn()
  176. defer EsClinet.DestoryEsConn(client)
  177. res := map[string]interface{}{}
  178. if client != nil {
  179. defer func() {
  180. if r := recover(); r != nil {
  181. log.Println("[E]", r)
  182. for skip := 1; ; skip++ {
  183. _, file, line, ok := runtime.Caller(skip)
  184. if !ok {
  185. break
  186. }
  187. go log.Println("%v,%v\n", file, line)
  188. }
  189. }
  190. }()
  191. searchResult, err := client.Search().Index(index).Source(query).Do(context.TODO())
  192. if err != nil {
  193. log.Println("从ES查询出错", err.Error())
  194. return nil
  195. }
  196. if searchResult.Hits != nil {
  197. resNum := searchResult.Hits.TotalHits.Value
  198. res["total"] = resNum
  199. }
  200. if searchResult.Aggregations != nil {
  201. bidamount_sum, _ := searchResult.Aggregations.Sum("bidamount_sum")
  202. res["bidamount_sum"] = *bidamount_sum.Value
  203. }
  204. }
  205. return res
  206. }