increment.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/google/uuid"
  6. "go.uber.org/zap"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "strings"
  10. "time"
  11. )
  12. // dealIncQyData 处理凭安增量数据
  13. func dealIncQyData() {
  14. log.Info("dealIncQyData", zap.String("开始处理", "-------凭安增量数据"))
  15. defer util.Catch()
  16. sess := Mgo.GetMgoConn()
  17. defer Mgo.DestoryMongoConn(sess)
  18. //获取本周一的时间
  19. monday := getMondayOfCurrentWeek()
  20. where := map[string]interface{}{
  21. "updatetime": map[string]interface{}{
  22. "$gt": monday.Unix(),
  23. //"$gt": 1733068800,
  24. },
  25. }
  26. log.Info("dealIncQyData", zap.Any("where", where))
  27. //query := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
  28. //1、处理qyxy_std 凭安增量数据
  29. count := 0
  30. it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Limit(100).Select(nil).Iter()
  31. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  32. if count%1000 == 0 {
  33. log.Info("dealIncQyData", zap.Int("current:", count), zap.Any("company_name", tmp["company_name"]))
  34. }
  35. use_flag := util.IntAll(tmp["use_flag"])
  36. is_history := util.IntAll(tmp["is_history"])
  37. company_type := util.ObjToString(tmp["company_type"])
  38. company_name := util.ObjToString(tmp["company_name"])
  39. //
  40. if use_flag > 5 || is_history != 0 || company_name == "" || company_type == "个体工商户" {
  41. continue
  42. }
  43. ctx := context.Background()
  44. //1.判断是否存在
  45. query := `SELECT id FROM information.ent_info WHERE company_name = ?`
  46. rows, err := ClickHouseConn.Query(ctx, query, company_name)
  47. if err != nil {
  48. log.Info("dealIncQyData", zap.Error(err))
  49. }
  50. count2 := 0
  51. var id string
  52. for rows.Next() {
  53. if err = rows.Scan(
  54. &id,
  55. ); err != nil {
  56. log.Info("dealIncQyData", zap.Error(err))
  57. }
  58. count2++
  59. }
  60. //2.已经存在的法人库,直接诶跳过
  61. if count2 > 0 {
  62. continue
  63. }
  64. id = uuid.New().String()
  65. id = strings.ReplaceAll(id, "-", "")
  66. company_id := util.ObjToString(tmp["_id"])
  67. area := util.ObjToString(tmp["company_area"])
  68. city := util.ObjToString(tmp["company_city"])
  69. district := util.ObjToString(tmp["company_district"])
  70. area_code, city_code, district_code := CalculateRegionCode(area, city, district)
  71. company_address := util.ObjToString(tmp["company_address"])
  72. //获取Bitmap 和对应字符串
  73. company_label, company_label_str := getCompanyLabelBitmap(util.ObjToString(tmp["credit_no"]), company_name)
  74. company_code := util.ObjToString(tmp["company_code"])
  75. credit_no := util.ObjToString(tmp["credit_no"])
  76. org_code := util.ObjToString(tmp["org_code"])
  77. tax_code := util.ObjToString(tmp["tax_code"])
  78. establish_date := util.Int64All(tmp["establish_date"])
  79. legal_person := util.ObjToString(tmp["legal_person"])
  80. legal_person_caption := util.ObjToString(tmp["legal_person_caption"])
  81. company_status := util.ObjToString(tmp["company_status"])
  82. authority := util.ObjToString(tmp["authority"])
  83. issue_date := util.Int64All(tmp["issue_date"])
  84. operation_startdate := util.ObjToString(tmp["operation_startdate"])
  85. operation_enddate := util.ObjToString(tmp["operation_enddate"])
  86. capital := util.ObjToString(tmp["capital"])
  87. business_scope := util.ObjToString(tmp["business_scope"])
  88. comeintime := util.Int64All(tmp["comeintime"])
  89. updatetime := util.Int64All(tmp["updatetime"])
  90. legal_person_type := util.IntAll(tmp["legal_person_type"])
  91. real_capital := util.ObjToString(tmp["real_capital"])
  92. en_name := util.ObjToString(tmp["en_name"])
  93. list_code := util.ObjToString(tmp["list_code"])
  94. employee_no := util.IntAll(tmp["employee_no"])
  95. website := util.ObjToString(tmp["website"])
  96. company_phone := util.ObjToString(tmp["company_phone"])
  97. company_email := util.ObjToString(tmp["company_email"])
  98. //插入Clickhouse的数据
  99. query2 := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
  100. err = ClickHouseConn.Exec(context.Background(), query2, id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email)
  101. if err != nil {
  102. log.Info("dealIncQyData", zap.String("company_name", company_name), zap.Error(err))
  103. } else {
  104. //保存到es
  105. data := map[string]interface{}{
  106. "_id": id,
  107. "id": id,
  108. "company_name": company_name,
  109. "company_id": company_id,
  110. "company_address": company_address,
  111. "area_code": area_code,
  112. "city_code": city_code,
  113. "district_code": district_code,
  114. "company_label": company_label_str,
  115. "company_code": company_code,
  116. "credit_no": credit_no,
  117. "org_code": org_code,
  118. "tax_code": tax_code,
  119. "establish_date": establish_date,
  120. "legal_person": legal_person,
  121. "legal_person_caption": legal_person_caption,
  122. "company_status": company_status,
  123. "company_type": company_type,
  124. "authority": authority,
  125. "issue_date": issue_date,
  126. "operation_startdate": operation_startdate,
  127. "operation_enddate": operation_enddate,
  128. "capital": capital,
  129. "business_scope": business_scope,
  130. "comeintime": comeintime,
  131. "updatetime": updatetime,
  132. "legal_person_type": legal_person_type,
  133. "real_capital": real_capital,
  134. "en_name": en_name,
  135. "list_code": list_code,
  136. "employee_no": employee_no,
  137. "website": website,
  138. "company_phone": company_phone,
  139. "company_email": company_email,
  140. }
  141. saveEsPool <- data
  142. log.Info("dealIncQyData", zap.String("name", company_name), zap.String("company_id", company_id))
  143. }
  144. }
  145. log.Info("dealIncQyData", zap.String("凭安增量数据", "-------处理完毕"))
  146. }
  147. // dealIncEntInfo 处理法人库增量数据
  148. func dealIncEntInfo() {
  149. log.Info("dealIncEntInfo", zap.String("开始处理标签", "--------"))
  150. now := time.Now()
  151. yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
  152. ctx := context.Background()
  153. // 批量处理数据
  154. batchSize := 100 // 每批处理的数据量
  155. offset := 0
  156. count := 0
  157. for {
  158. query := fmt.Sprintf(`
  159. SELECT id,company_name,credit_no,org_tags,bitmapToArray(company_label) labels FROM ent_info where updatetime >= %v LIMIT %d OFFSET %d
  160. `, yesterday.Unix(), batchSize, offset)
  161. rows, err := ClickHouseConn.Query(ctx, query)
  162. if err != nil {
  163. log.Info("dealIncEntInfo", zap.Error(err))
  164. }
  165. if !rows.Next() {
  166. break
  167. }
  168. for rows.Next() {
  169. count++
  170. var id, companyName, creditNo, orgTags string
  171. var addLabels = make([]uint64, 0)
  172. var oldLabels = make([]uint64, 0)
  173. if err = rows.Scan(&id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
  174. log.Info("dealIncEntInfo", zap.Error(err))
  175. }
  176. if count%10000 == 0 {
  177. log.Info("dealIncEntInfo:", zap.Int("current", offset), zap.String("id", id), zap.String("companyName", companyName))
  178. }
  179. //1 dealCompanyNo
  180. tags1 := dealCompanyNo(creditNo)
  181. if len(tags1) > 0 {
  182. addLabels = append(addLabels, tags1...)
  183. }
  184. //2.处理 org_tags
  185. tags2 := dealOrgTags(orgTags)
  186. if len(tags2) > 0 {
  187. addLabels = append(addLabels, tags2...)
  188. }
  189. if len(addLabels) > 0 {
  190. // 构建 toUInt64 数组字符串
  191. toUInt64Array := buildToUInt64Array(addLabels)
  192. // SQL 动态生成
  193. sql := fmt.Sprintf(`
  194. ALTER TABLE information.ent_info
  195. UPDATE company_label = bitmapOr(company_label, bitmapBuild(%s))
  196. WHERE company_name = ?
  197. `, toUInt64Array)
  198. err = ClickHouseConn.Exec(context.Background(), sql, companyName)
  199. if err != nil {
  200. log.Info("dealIncEntInfo", zap.Error(err))
  201. }
  202. }
  203. //2.更新es
  204. var labelNames = make([]string, 0)
  205. oldLabels = append(oldLabels, addLabels...)
  206. for _, v := range oldLabels {
  207. if name, ok := entLabelMap[v]; ok {
  208. labelNames = append(labelNames, name)
  209. }
  210. }
  211. if len(labelNames) > 0 {
  212. labelNames = removeDuplicates(labelNames) //去重
  213. esUpdate := map[string]interface{}{
  214. "company_label": strings.Join(labelNames, ","),
  215. }
  216. updateEsPool <- []map[string]interface{}{
  217. {"_id": id},
  218. esUpdate,
  219. }
  220. }
  221. }
  222. offset += batchSize
  223. }
  224. log.Info("dealIncEntInfo", zap.Int("数据迭代完毕", offset))
  225. }