main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package main
  import (
  	"context"
  	"fmt"
  	"strings"
  	"sync"
  	"time"

  	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  	"github.com/robfig/cron/v3"
  	"go.uber.org/zap"
  	"gorm.io/gorm"
  	es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  	jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  )
  15. var (
  16. GF GlobalConf
  17. MysqlDB *gorm.DB
  18. MgoQY *mongodb.MongodbSim //181凭安
  19. Mgo *mongodb.MongodbSim //标讯地址-凭安库mixdata
  20. Esa *es.Elastic
  21. Esb *es.Elastic
  22. ClickHouseConn driver.Conn
  23. entLabelMap = make(map[uint64]string)
  24. nameBitMap = make(map[string]uint64) //和上面k-v 反过来
  25. //更新es
  26. updateEsPool = make(chan []map[string]interface{}, 5000)
  27. updateEsSp = make(chan bool, 5) //保存协程
  28. RegionCodeData = map[string]string{}
  29. //保存到es
  30. saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
  31. saveEsSp = make(chan bool, 5)
  32. EsBulkSize = 50
  33. )
  34. func main() {
  35. //定时任务
  36. local, _ := time.LoadLocation("Asia/Shanghai")
  37. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  38. // 每天执行一次更新法人库信息
  39. _, err := c.AddFunc(GF.Env.Spec, dealIncDataEntInfo)
  40. if err != nil {
  41. jlog.Info("AddFunc dealIncData err", zap.Error(err))
  42. }
  43. //每周执行一次凭安数据
  44. _, err = c.AddFunc(GF.Env.Spec2, dealIncDataQY)
  45. if err != nil {
  46. jlog.Info("AddFunc dealIncDataQY err", zap.Error(err))
  47. }
  48. c.Start()
  49. defer c.Stop()
  50. //dealIncEntInfo()
  51. //dealIncData() //处理增量数据
  52. //dealAllData()// 处理存量数据
  53. select {}
  54. }
  55. // dealIncDataQY 处理凭安增量数据
  56. func dealIncDataQY() {
  57. go SaveEsMethod()
  58. go dealIncQyData() //处理凭安增量数据
  59. }
  60. // dealIncDataEntInfo 处理法人库增量数据
  61. func dealIncDataEntInfo() {
  62. go updateEsMethod()
  63. go dealIncEntInfo()
  64. }
  65. // dealAllData 处理法人库存量数据
  66. func dealAllData() {
  67. go updateEsMethod()
  68. go allUpdateBitmap2() //单个更新
  69. }
  70. // 处理存量法人库数据,单个更新bitmap;
  71. func allUpdateBitmap2() {
  72. jlog.Info("allUpdateBitmap2", zap.String("开始处理标签", "vvvvvvvvvvvvvv"))
  73. ctx := context.Background()
  74. // 批量处理数据
  75. batchSize := 100 // 每批处理的数据量
  76. offset := 0
  77. count := 0
  78. for {
  79. query := fmt.Sprintf(`
  80. SELECT id,company_name,credit_no,org_tags,bitmapToArray(company_label) labels FROM ent_info LIMIT %d OFFSET %d
  81. `, batchSize, offset)
  82. rows, err := ClickHouseConn.Query(ctx, query)
  83. if err != nil {
  84. jlog.Info("allUpdateBitmap2", zap.Error(err))
  85. }
  86. if !rows.Next() {
  87. break
  88. }
  89. for rows.Next() {
  90. count++
  91. var id, companyName, creditNo, orgTags string
  92. var addLabels = make([]uint64, 0)
  93. var oldLabels = make([]uint64, 0)
  94. if err = rows.Scan(&id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
  95. jlog.Info("allUpdateBitmap2", zap.Error(err))
  96. }
  97. if count%10000 == 0 {
  98. jlog.Info("allUpdateBitmap2:", zap.Int("current", offset), zap.String("id", id), zap.String("companyName", companyName))
  99. }
  100. //1 dealCompanyNo
  101. tags1 := dealCompanyNo(creditNo)
  102. if len(tags1) > 0 {
  103. addLabels = append(addLabels, tags1...)
  104. }
  105. //2.处理 org_tags
  106. tags2 := dealOrgTags(orgTags)
  107. if len(tags2) > 0 {
  108. addLabels = append(addLabels, tags2...)
  109. }
  110. if len(addLabels) > 0 {
  111. // 构建 toUInt64 数组字符串
  112. toUInt64Array := buildToUInt64Array(addLabels)
  113. // SQL 动态生成
  114. sql := fmt.Sprintf(`
  115. ALTER TABLE information.ent_info
  116. UPDATE company_label = bitmapOr(company_label, bitmapBuild(%s))
  117. WHERE company_name = ?
  118. `, toUInt64Array)
  119. err = ClickHouseConn.Exec(context.Background(), sql, companyName)
  120. if err != nil {
  121. jlog.Info("allUpdateBitmap2", zap.Error(err))
  122. }
  123. }
  124. //2.更新es
  125. var labelNames = make([]string, 0)
  126. oldLabels = append(oldLabels, addLabels...)
  127. for _, v := range oldLabels {
  128. if name, ok := entLabelMap[v]; ok {
  129. labelNames = append(labelNames, name)
  130. }
  131. }
  132. if len(labelNames) > 0 {
  133. labelNames = removeDuplicates(labelNames) //去重
  134. esUpdate := map[string]interface{}{
  135. "company_label": strings.Join(labelNames, ","),
  136. }
  137. updateEsPool <- []map[string]interface{}{
  138. {"_id": id},
  139. esUpdate,
  140. }
  141. }
  142. }
  143. offset += batchSize
  144. }
  145. jlog.Info("allUpdateBitmap2", zap.Int("数据迭代完毕", offset))
  146. }
  // creditCodePrefixTags maps the first two characters of a unified social
  // credit code to the label id assigned to that prefix. Built once instead of
  // on every dealCompanyNo call.
  var creditCodePrefixTags = map[string]uint64{
  	"11": 151, "12": 152, "13": 153, "19": 154,
  	"21": 155, "29": 156, "31": 157, "32": 158,
  	"33": 159, "34": 160, "35": 161, "39": 162,
  	"41": 163, "49": 164, "51": 165, "52": 166,
  	"53": 167, "59": 168, "61": 169, "62": 170,
  	"69": 171, "71": 172, "72": 173, "79": 174,
  	"81": 175, "89": 176, "91": 177, "92": 178,
  	"93": 179, "A1": 180, "A2": 181, "N1": 182,
  	"N2": 183, "N3": 184, "N9": 185, "Y1": 186,
  }

  // dealCompanyNo returns the label id associated with the two-character prefix
  // of a unified social credit code.
  //
  // The result holds exactly one id when the prefix is known and is nil
  // otherwise (including for inputs shorter than two bytes). The input is used
  // as-is: it is not trimmed or case-folded.
  func dealCompanyNo(companyNo string) (newTags []uint64) {
  	// Every known prefix is exactly two bytes long, so a direct lookup on the
  	// first two bytes is equivalent to testing each prefix with HasPrefix.
  	if len(companyNo) < 2 {
  		return nil
  	}
  	if tag, ok := creditCodePrefixTags[companyNo[:2]]; ok {
  		newTags = append(newTags, tag)
  	}
  	return newTags
  }
  169. // dealOrgTags 处理国民经济行业分类
  170. func dealOrgTags(org_tags string) (newTags []uint64) {
  171. var categoryMap = map[string]uint64{
  172. "外交": 2193,
  173. "发展和改革": 2194,
  174. "科学技术/科技": 2195,
  175. "民族事务": 2196,
  176. "保密局": 2197,
  177. "国安局": 2198,
  178. "司法": 2199,
  179. "法院": 2200,
  180. "检察院": 2201,
  181. "人力资源和社会保障": 2202,
  182. "生态环境": 2203,
  183. "交通运输": 2204,
  184. "农业农村": 2205,
  185. "退役军人事务": 2206,
  186. "人民银行": 2207,
  187. "国防": 2208,
  188. "教育": 2209,
  189. "党校": 2210,
  190. "工业和信息化": 2211,
  191. "公安": 2212,
  192. "民政": 2213,
  193. "财政": 2214,
  194. "自然资源(包含规划)": 2215,
  195. "住建": 2216,
  196. "水利": 2217,
  197. "商务": 2218,
  198. "卫生健康": 2219,
  199. "应急管理": 2220,
  200. "审计": 2221,
  201. "国有资产监督管理": 2222,
  202. "海关": 2223,
  203. "市场监督": 2224,
  204. "证券监督管理": 2225,
  205. "体育": 2226,
  206. "统计": 2227,
  207. "国际发展合作": 2228,
  208. "税务": 2229,
  209. "金融": 2230,
  210. "广播电视": 2231,
  211. "信访": 2232,
  212. "知识产权": 2233,
  213. "医疗保障": 2234,
  214. "新华通讯社": 2235,
  215. "气象": 2236,
  216. "科学院": 2237,
  217. "工程院": 2238,
  218. "粮食和物资储备": 2239,
  219. "数据": 2240,
  220. "烟草专卖": 2241,
  221. "林业和草原": 2242,
  222. "民用航空": 2243,
  223. "文物": 2244,
  224. "疾病预防控制": 2245,
  225. "消防救援": 2246,
  226. "药品监督": 2247,
  227. "能源": 2248,
  228. "移民": 2249,
  229. "铁路": 2250,
  230. "邮政": 2251,
  231. "中医药": 2252,
  232. "外汇": 2253,
  233. "供销合作社": 2254,
  234. "公共资源交易中心": 2255,
  235. "监狱": 2256,
  236. "城乡建设": 2257,
  237. "文旅": 2258,
  238. "人民防空": 2259,
  239. "园林": 2260,
  240. "物流口岸": 2261,
  241. "大数据": 2262,
  242. "政务服务": 2263,
  243. "地方史志": 2264,
  244. "住房公积金管理中心": 2265,
  245. "仲裁": 2266,
  246. "招商": 2267,
  247. "社保中心": 2268,
  248. "管委会": 2269,
  249. "人民政府": 2270,
  250. "工商联": 2271,
  251. "残联": 2272,
  252. "妇联": 2273,
  253. "艺术联": 2274,
  254. "侨联": 2275,
  255. "台联": 2276,
  256. "城管": 2277,
  257. "编办": 2278,
  258. "政协": 2279,
  259. "民主党派": 2280,
  260. "党委": 2281,
  261. "团委": 2282,
  262. "人大": 2283,
  263. "档案局": 2284,
  264. "武装": 2285,
  265. "医院": 2286,
  266. "渔业": 2287,
  267. "学校": 2288,
  268. }
  269. orgTags := strings.TrimSpace(org_tags)
  270. if strings.Contains(orgTags, "-") {
  271. classifications := readFile() //读取国标行业分类
  272. if len(classifications) > 0 {
  273. ss := findEntCodesByLabel(orgTags, classifications)
  274. newTags = append(newTags, ss...)
  275. }
  276. } else {
  277. //其它单独标签
  278. for prefix, tag := range categoryMap {
  279. if orgTags == prefix {
  280. newTags = append(newTags, tag)
  281. }
  282. }
  283. }
  284. return
  285. }
  286. // 根据label返回对应的EntCode数组
  287. func findEntCodesByLabel(label string, classifications []IndustryClassification) []uint64 {
  288. // 将label分割成多个层级
  289. labels := strings.Split(label, "-")
  290. var result []uint64
  291. if len(labels) > 0 {
  292. for k, v := range labels {
  293. rs := findIndustryClassification(k+1, v, classifications)
  294. if rs != nil {
  295. result = append(result, rs.EntCode)
  296. }
  297. }
  298. }
  299. return result
  300. }
  301. // 根据level和name查找对应的IndustryClassification
  302. func findIndustryClassification(level int, name string, classifications []IndustryClassification) *IndustryClassification {
  303. // 遍历每个分类
  304. for _, classification := range classifications {
  305. // 如果当前分类的level和name匹配,返回当前分类
  306. if classification.Level == level && classification.Name == name {
  307. return &classification
  308. }
  309. // 如果当前分类有子分类,则递归查找子分类
  310. if len(classification.Children) > 0 {
  311. if result := findIndustryClassification(level, name, classification.Children); result != nil {
  312. return result
  313. }
  314. }
  315. }
  316. // 如果没有找到匹配的分类,返回nil
  317. return nil
  318. }
  319. // updateEsMethod 更新es
  320. func updateEsMethod() {
  321. arru := make([][]map[string]interface{}, 200)
  322. indexu := 0
  323. for {
  324. select {
  325. case v := <-updateEsPool:
  326. arru[indexu] = v
  327. indexu++
  328. if indexu == 200 {
  329. updateEsSp <- true
  330. go func(arru [][]map[string]interface{}) {
  331. defer func() {
  332. <-updateEsSp
  333. }()
  334. Esa.UpdateBulk(GF.Env.Esindex, arru...)
  335. }(arru)
  336. arru = make([][]map[string]interface{}, 200)
  337. indexu = 0
  338. }
  339. case <-time.After(1000 * time.Millisecond):
  340. if indexu > 0 {
  341. updateEsSp <- true
  342. go func(arru [][]map[string]interface{}) {
  343. defer func() {
  344. <-updateEsSp
  345. }()
  346. Esa.UpdateBulk(GF.Env.Esindex, arru...)
  347. }(arru[:indexu])
  348. arru = make([][]map[string]interface{}, 200)
  349. indexu = 0
  350. }
  351. }
  352. }
  353. }
  354. // SaveEsMethod 保存es
  355. func SaveEsMethod() {
  356. arru := make([]map[string]interface{}, EsBulkSize)
  357. indexu := 0
  358. for {
  359. select {
  360. case v := <-saveEsPool:
  361. arru[indexu] = v
  362. indexu++
  363. if indexu == EsBulkSize {
  364. saveEsSp <- true
  365. go func(arru []map[string]interface{}) {
  366. defer func() {
  367. <-saveEsSp
  368. }()
  369. Esa.BulkSave(GF.Env.Esindex, arru)
  370. }(arru)
  371. arru = make([]map[string]interface{}, EsBulkSize)
  372. indexu = 0
  373. }
  374. case <-time.After(1000 * time.Millisecond):
  375. if indexu > 0 {
  376. saveEsSp <- true
  377. go func(arru []map[string]interface{}) {
  378. defer func() {
  379. <-saveEsSp
  380. }()
  381. Esa.BulkSave(GF.Env.Esindex, arru)
  382. }(arru[:indexu])
  383. arru = make([]map[string]interface{}, EsBulkSize)
  384. indexu = 0
  385. }
  386. }
  387. }
  388. }