all.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/RoaringBitmap/roaring"
  12. "go.uber.org/zap"
  13. "gorm.io/gorm"
  14. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  15. jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  16. )
  17. // dealAllFromCompanyBase 从company_base 处理惬意数据存量
  18. func dealAllFromCompanyBase() {
  19. jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据"))
  20. defer util.Catch()
  21. sess := MgoQY.GetMgoConn()
  22. defer MgoQY.DestoryMongoConn(sess)
  23. where := map[string]interface{}{
  24. "company_type": map[string]interface{}{
  25. "$ne": "个体工商户",
  26. },
  27. }
  28. count := 0
  29. batchSize := 100
  30. ents := make([]EntInfo, 0, batchSize)
  31. it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Select(nil).Iter()
  32. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  33. if count%1000 == 0 {
  34. jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"]))
  35. }
  36. company_status := util.ObjToString(tmp["company_status"])
  37. if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
  38. continue
  39. }
  40. if util.IntAll(tmp["use_flag"]) > 0 {
  41. continue
  42. }
  43. if util.ObjToString(tmp["company_type"]) == "事业单位" {
  44. continue
  45. }
  46. var ent EntInfo
  47. ent.CompanyID = util.ObjToString(tmp["company_id"])
  48. ent.CompanyName = util.ObjToString(tmp["company_name"])
  49. ent.CompanyCode = util.ObjToString(tmp["company_code"])
  50. ent.CreditNo = util.ObjToString(tmp["credit_no"])
  51. ent.OrgCode = util.ObjToString(tmp["org_code"])
  52. ent.TaxCode = util.ObjToString(tmp["tax_code"])
  53. ent.EstablishDate = util.ObjToString(tmp["establish_date"])
  54. ent.LegalPerson = util.ObjToString(tmp["legal_person"])
  55. ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"])
  56. ent.CompanyStatus = util.ObjToString(tmp["company_status"])
  57. ent.CompanyType = util.ObjToString(tmp["company_type"])
  58. ent.Authority = util.ObjToString(tmp["authority"])
  59. ent.IssueDate = util.ObjToString(tmp["issue_date"])
  60. ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"])
  61. ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"])
  62. ent.Capital = util.ObjToString(tmp["capital"])
  63. ent.CompanyAddress = util.ObjToString(tmp["company_address"])
  64. ent.BusinessScope = util.ObjToString(tmp["business_scope"])
  65. ent.ComeInTime = time.Now().Unix()
  66. ent.UpdateTime = time.Now().Unix()
  67. ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"]))
  68. ent.RealCapital = util.ObjToString(tmp["real_capital"])
  69. ent.EnName = util.ObjToString(tmp["en_name"])
  70. ent.ListCode = util.ObjToString(tmp["list_code"])
  71. //annual_reports
  72. std := getQyxyStd(util.ObjToString(tmp["company_name"]))
  73. if std != nil && len(std) > 0 {
  74. // 取出 annual_reports 字段
  75. reports, ok := std["annual_reports"].([]interface{})
  76. if ok {
  77. var maxYear float64
  78. var employeeNo string
  79. // 遍历 annual_reports 数组
  80. for i, r := range reports {
  81. if reportMap, ok := r.(map[string]interface{}); ok {
  82. year := util.Float64All(reportMap["report_year"])
  83. emp := util.ObjToString(reportMap["employee_no"])
  84. if i == 0 || year > maxYear {
  85. maxYear = year
  86. employeeNo = emp
  87. }
  88. }
  89. }
  90. if maxYear > 0 {
  91. ent.EmployeeNo = util.IntAll(employeeNo)
  92. }
  93. }
  94. }
  95. //
  96. ent.Website = util.ObjToString(tmp["website_url"])
  97. ent.CompanyPhone = util.ObjToString(tmp["company_phone"])
  98. ent.CompanyEmail = util.ObjToString(tmp["company_email"])
  99. //company_industry_tags
  100. whereIndustry := map[string]interface{}{
  101. "company_id": util.ObjToString(tmp["company_id"]),
  102. }
  103. indus, _ := MgoQY.FindOne("company_industry", whereIndustry)
  104. ent.CompanyIndustryTags = "{}" // 先给个默认值
  105. if indus != nil && len(*indus) > 0 {
  106. name_path := make([]string, 0)
  107. name_code := make([]string, 0)
  108. name_path = append(name_path, util.ObjToString((*indus)["industry_l1_name"]))
  109. name_path = append(name_path, util.ObjToString((*indus)["industry_l2_name"]))
  110. name_path = append(name_path, util.ObjToString((*indus)["industry_l3_name"]))
  111. name_path = append(name_path, util.ObjToString((*indus)["industry_l4_name"]))
  112. //
  113. name_code = append(name_code, util.ObjToString((*indus)["industry_l1_code"]))
  114. name_code = append(name_code, util.ObjToString((*indus)["industry_l2_code"]))
  115. name_code = append(name_code, util.ObjToString((*indus)["industry_l3_code"]))
  116. name_code = append(name_code, util.ObjToString((*indus)["industry_l4_code"]))
  117. industry := map[string]interface{}{
  118. "name_path": name_path,
  119. "code_path": name_code,
  120. }
  121. // map 转 JSON
  122. jsonBytes, _ := json.Marshal(industry)
  123. ent.CompanyIndustryTags = string(jsonBytes)
  124. }
  125. //
  126. area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"])
  127. area_code, city_code, district_code := CalculateRegionCode(area, city, district)
  128. ent.JYAreaCode = area_code
  129. ent.JYCityCode = city_code
  130. ent.JYDistrictCode = district_code
  131. //
  132. query := `
  133. SELECT bitmapToArray(company_label)
  134. FROM ent_info
  135. WHERE company_id = ?
  136. `
  137. var oldLabels = make([]uint64, 0)
  138. row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID)
  139. err := row.Scan(&oldLabels)
  140. if err != nil {
  141. if errors.Is(err, sql.ErrNoRows) {
  142. //jlog.Info("dealIncEntInfo: 没查到数据", zap.String("company_id", ent.CompanyID))
  143. } else {
  144. jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err))
  145. }
  146. }
  147. // 转 RoaringBitmap
  148. rbm := roaring.NewBitmap()
  149. for _, v := range oldLabels {
  150. rbm.Add(uint32(v))
  151. }
  152. bin, _ := rbm.ToBytes()
  153. ent.JYCompanyLabel = bin
  154. ent.JYOrgTopType = "企业"
  155. company_type := util.ObjToString(tmp["company_type"])
  156. if info, ok := nameNorm[company_type]; ok {
  157. ent.JYCompanyTypeOriginCode = info.Code
  158. ent.JYCompanyTypeIsLeaf = 1
  159. ent.JYCompanyTypeLeafCode = info.Code
  160. ent.JYCompanyTypeLeafName = info.Name
  161. ent.JYCompanyTypeLeafTag = info.Tag
  162. ent.JYOrgPropertyOneTag = "工商"
  163. ent.JYOrgPropertyTwoTag = "企业"
  164. }
  165. //保存tidb
  166. //if err := MysqlDB.Create(&ent).Error; err != nil {
  167. // jlog.Info("insert failed: %v", zap.Error(err))
  168. //}
  169. ents = append(ents, ent)
  170. if len(ents) >= batchSize {
  171. if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
  172. jlog.Error("批量插入失败", zap.Error(err))
  173. }
  174. ents = ents[:0] // 清空 slice
  175. }
  176. }
  177. // 循环结束后如果还有数据
  178. if len(ents) > 0 {
  179. if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
  180. jlog.Error("批量插入失败", zap.Error(err))
  181. }
  182. }
  183. }
  184. // dealAllFromCompanyBase2 多协程批量数据
  185. func dealAllFromCompanyBase2() {
  186. jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据"))
  187. defer util.Catch()
  188. sess := MgoQY.GetMgoConn()
  189. defer MgoQY.DestoryMongoConn(sess)
  190. where := map[string]interface{}{
  191. "company_type": map[string]interface{}{
  192. "$ne": "个体工商户",
  193. },
  194. }
  195. // channel 作为队列
  196. jobCh := make(chan map[string]interface{}, 1000) // 缓冲队列
  197. entCh := make(chan EntInfo, 1000) // 结果队列
  198. ctx, cancel := context.WithCancel(context.Background())
  199. defer cancel()
  200. // 启动 worker 处理数据
  201. workerNum := 6 // 并发度可调
  202. var wg sync.WaitGroup
  203. for i := 0; i < workerNum; i++ {
  204. wg.Add(1)
  205. go func() {
  206. defer wg.Done()
  207. for tmp := range jobCh {
  208. ent, ok := processCompany(tmp)
  209. if ok {
  210. entCh <- ent
  211. }
  212. }
  213. }()
  214. }
  215. // 启动一个写入 goroutine,专门负责批量写 DB
  216. go func() {
  217. batchSize := 100
  218. ents := make([]EntInfo, 0, batchSize)
  219. for ent := range entCh {
  220. ents = append(ents, ent)
  221. if len(ents) >= batchSize {
  222. if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
  223. jlog.Error("批量插入失败", zap.Error(err))
  224. }
  225. ents = ents[:0]
  226. }
  227. }
  228. // flush
  229. if len(ents) > 0 {
  230. if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
  231. jlog.Error("批量插入失败", zap.Error(err))
  232. }
  233. }
  234. }()
  235. // 主协程负责读 Mongo
  236. it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Sort("_id").Iter()
  237. count := 0
  238. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  239. if count%1000 == 0 {
  240. jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"]))
  241. }
  242. select {
  243. case jobCh <- tmp:
  244. case <-ctx.Done():
  245. break
  246. }
  247. }
  248. close(jobCh) // 生产完毕
  249. wg.Wait() // 等所有 worker 结束
  250. close(entCh) // 再关掉结果通道,通知写入 goroutine flush 完成
  251. }
  252. // processCompany 处理单条 company_base 数据,生成 EntInfo
  253. func processCompany(tmp map[string]interface{}) (EntInfo, bool) {
  254. // 过滤条件
  255. company_status := util.ObjToString(tmp["company_status"])
  256. if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
  257. return EntInfo{}, false
  258. }
  259. if util.IntAll(tmp["use_flag"]) > 0 {
  260. return EntInfo{}, false
  261. }
  262. if util.ObjToString(tmp["company_type"]) == "事业单位" {
  263. return EntInfo{}, false
  264. }
  265. var ent EntInfo
  266. ent.CompanyID = util.ObjToString(tmp["company_id"])
  267. ent.CompanyName = util.ObjToString(tmp["company_name"])
  268. ent.CompanyCode = util.ObjToString(tmp["company_code"])
  269. ent.CreditNo = util.ObjToString(tmp["credit_no"])
  270. ent.OrgCode = util.ObjToString(tmp["org_code"])
  271. ent.TaxCode = util.ObjToString(tmp["tax_code"])
  272. ent.EstablishDate = util.ObjToString(tmp["establish_date"])
  273. ent.LegalPerson = util.ObjToString(tmp["legal_person"])
  274. ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"])
  275. ent.CompanyStatus = company_status
  276. ent.CompanyType = util.ObjToString(tmp["company_type"])
  277. ent.Authority = util.ObjToString(tmp["authority"])
  278. ent.IssueDate = util.ObjToString(tmp["issue_date"])
  279. ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"])
  280. ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"])
  281. ent.Capital = util.ObjToString(tmp["capital"])
  282. ent.CompanyAddress = util.ObjToString(tmp["company_address"])
  283. ent.BusinessScope = util.ObjToString(tmp["business_scope"])
  284. ent.ComeInTime = time.Now().Unix()
  285. ent.UpdateTime = time.Now().Unix()
  286. ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"]))
  287. ent.RealCapital = util.ObjToString(tmp["real_capital"])
  288. ent.EnName = util.ObjToString(tmp["en_name"])
  289. ent.ListCode = util.ObjToString(tmp["list_code"])
  290. // annual_reports
  291. std := getQyxyStd(util.ObjToString(tmp["company_name"]))
  292. if std != nil && len(std) > 0 {
  293. reports, ok := std["annual_reports"].([]interface{})
  294. if ok {
  295. var maxYear float64
  296. var employeeNo string
  297. for i, r := range reports {
  298. if reportMap, ok := r.(map[string]interface{}); ok {
  299. year := util.Float64All(reportMap["report_year"])
  300. emp := util.ObjToString(reportMap["employee_no"])
  301. if i == 0 || year > maxYear {
  302. maxYear = year
  303. employeeNo = emp
  304. }
  305. }
  306. }
  307. if maxYear > 0 {
  308. ent.EmployeeNo = util.IntAll(employeeNo)
  309. }
  310. }
  311. }
  312. ent.Website = util.ObjToString(tmp["website_url"])
  313. ent.CompanyPhone = util.ObjToString(tmp["company_phone"])
  314. ent.CompanyEmail = util.ObjToString(tmp["company_email"])
  315. // company_industry_tags
  316. whereIndustry := map[string]interface{}{
  317. "company_id": util.ObjToString(tmp["company_id"]),
  318. }
  319. indus, _ := MgoQY.FindOne("company_industry", whereIndustry)
  320. ent.CompanyIndustryTags = "{}"
  321. if indus != nil && len(*indus) > 0 {
  322. name_path := []string{
  323. util.ObjToString((*indus)["industry_l1_name"]),
  324. util.ObjToString((*indus)["industry_l2_name"]),
  325. util.ObjToString((*indus)["industry_l3_name"]),
  326. util.ObjToString((*indus)["industry_l4_name"]),
  327. }
  328. name_code := []string{
  329. util.ObjToString((*indus)["industry_l1_code"]),
  330. util.ObjToString((*indus)["industry_l2_code"]),
  331. util.ObjToString((*indus)["industry_l3_code"]),
  332. util.ObjToString((*indus)["industry_l4_code"]),
  333. }
  334. industry := map[string]interface{}{
  335. "name_path": name_path,
  336. "code_path": name_code,
  337. }
  338. jsonBytes, _ := json.Marshal(industry)
  339. ent.CompanyIndustryTags = string(jsonBytes)
  340. }
  341. // 行政区划编码
  342. area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"])
  343. area_code, city_code, district_code := CalculateRegionCode(area, city, district)
  344. ent.JYAreaCode = area_code
  345. ent.JYCityCode = city_code
  346. ent.JYDistrictCode = district_code
  347. // ClickHouse 查询历史标签
  348. query := `SELECT bitmapToArray(company_label) FROM ent_info WHERE company_id = ?`
  349. var oldLabels = make([]uint64, 0)
  350. row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID)
  351. err := row.Scan(&oldLabels)
  352. if err != nil && !errors.Is(err, sql.ErrNoRows) {
  353. jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err))
  354. }
  355. rbm := roaring.NewBitmap()
  356. for _, v := range oldLabels {
  357. rbm.Add(uint32(v))
  358. }
  359. bin, _ := rbm.ToBytes()
  360. ent.JYCompanyLabel = bin
  361. ent.JYOrgTopType = "企业"
  362. company_type := util.ObjToString(tmp["company_type"])
  363. if info, ok := nameNorm[company_type]; ok {
  364. ent.JYCompanyTypeOriginCode = info.Code
  365. ent.JYCompanyTypeIsLeaf = 1
  366. ent.JYCompanyTypeLeafCode = info.Code
  367. ent.JYCompanyTypeLeafName = info.Name
  368. ent.JYCompanyTypeLeafTag = info.Tag
  369. ent.JYOrgPropertyOneTag = "工商"
  370. ent.JYOrgPropertyTwoTag = "企业"
  371. ent.JYOrgPropertyThreeTag = info.Tag2
  372. }
  373. return ent, true
  374. }
  375. // dealLeaf 处理存量非叶子节点的企业数据标签
  376. func dealLeaf() {
  377. const batchSize = 50
  378. lastID := uint64(0)
  379. for {
  380. var companies []EntInfo
  381. // 分批查询
  382. if err := MysqlDB.Model(&EntInfo{}).
  383. Select("id, company_name, credit_no, company_type, jy_company_type_is_leaf").
  384. Where("jy_company_type_is_leaf = ?", 0).
  385. Order("id ASC").
  386. Limit(batchSize).
  387. Find(&companies).Error; err != nil {
  388. panic(err)
  389. }
  390. if len(companies) == 0 {
  391. fmt.Println("处理完成 ✅")
  392. break
  393. }
  394. if lastID%1000 == 0 {
  395. jlog.Info("dealLeaf", zap.Any("lastID", lastID), zap.Any("id", companies[0].ID))
  396. }
  397. // 只存储有变化的公司
  398. updates := make(map[uint64]map[string]interface{})
  399. for i := range companies {
  400. if companies[i].JYCompanyTypeIsLeaf == 1 {
  401. continue
  402. }
  403. company_name := util.ObjToString(companies[i].CompanyName)
  404. top_names := getTopNames(company_name)
  405. for _, top_name := range top_names {
  406. topwhere := map[string]interface{}{
  407. "use_flag": 0,
  408. "company_status": map[string]interface{}{
  409. "$nin": []string{"注销", "吊销", "吊销,已注销"},
  410. },
  411. "company_name": top_name,
  412. }
  413. top_bases, _ := MgoQY.FindOne("company_base", topwhere)
  414. if top_bases != nil && len(*top_bases) > 0 {
  415. //获取上级企业类型
  416. top_company_type := util.ObjToString((*top_bases)["company_type"])
  417. if norm_info, ok := nameNorm[top_company_type]; ok {
  418. // 这里判断:如果已有字段不一样,才算变更
  419. if companies[i].JYCompanyTypeLeafCode != norm_info.Code ||
  420. companies[i].JYCompanyTypeLeafName != norm_info.Name ||
  421. companies[i].JYCompanyTypeLeafTag != norm_info.Tag ||
  422. companies[i].JYOrgPropertyThreeTag != norm_info.Tag2 {
  423. updates[companies[i].ID] = map[string]interface{}{
  424. "jy_company_type_leaf_code": norm_info.Code,
  425. "jy_company_type_leaf_name": norm_info.Name,
  426. "jy_company_type_leaf_tag": norm_info.Tag,
  427. "jy_org_property_three_tag": norm_info.Tag2,
  428. }
  429. }
  430. break
  431. }
  432. } else {
  433. // 去其他库查
  434. where2 := map[string]interface{}{"company_name": top_name}
  435. enterprise, _ := MgoQY.FindOne("special_enterprise", where2)
  436. if enterprise != nil && len(*enterprise) > 0 {
  437. if companies[i].JYOrgPropertyThreeTag != "国企" {
  438. updates[companies[i].ID] = map[string]interface{}{
  439. "jy_org_property_three_tag": "国企",
  440. }
  441. }
  442. break
  443. } else {
  444. gov, _ := MgoQY.FindOne("special_gov_unit", where2)
  445. if gov != nil && len(*gov) > 0 {
  446. if companies[i].JYOrgPropertyThreeTag != "国企" {
  447. updates[companies[i].ID] = map[string]interface{}{
  448. "jy_org_property_three_tag": "国企",
  449. }
  450. }
  451. break
  452. }
  453. }
  454. }
  455. }
  456. }
  457. // 批量更新 (只更新有变化的)
  458. if len(updates) > 0 {
  459. if err := batchUpdateFields(MysqlDB, (EntInfo{}).TableName(), updates); err != nil {
  460. panic(err)
  461. }
  462. }
  463. // 更新游标
  464. lastID = companies[len(companies)-1].ID
  465. }
  466. }
// allowedColumns is the whitelist of column names that batchUpdateFields is
// willing to write. Column names are spliced into the SQL text rather than
// bound as parameters, so any update key that is not listed here is ignored;
// this is the guard against SQL injection through column names.
var allowedColumns = map[string]struct{}{
	"jy_company_type_leaf_code": {},
	"jy_company_type_leaf_name": {},
	"jy_company_type_leaf_tag": {},
	"jy_org_property_three_tag": {},
	// Add further columns here if they need to be batch-updatable.
}
  475. // batchUpdateFields 批量更新
  476. func batchUpdateFields(db *gorm.DB, tableName string, updates map[uint64]map[string]interface{}) error {
  477. if len(updates) == 0 {
  478. return nil
  479. }
  480. // 1) 收集字段(按白名单过滤)
  481. fieldSet := make(map[string]struct{})
  482. for _, m := range updates {
  483. for col := range m {
  484. if _, ok := allowedColumns[col]; ok {
  485. fieldSet[col] = struct{}{}
  486. }
  487. }
  488. }
  489. if len(fieldSet) == 0 {
  490. return nil
  491. }
  492. fields := make([]string, 0, len(fieldSet))
  493. for col := range fieldSet {
  494. fields = append(fields, col)
  495. }
  496. // 2) 构造 CASE 语句和参数
  497. cases := make([]string, 0, len(fields))
  498. args := make([]interface{}, 0, len(updates)*len(fields)*2)
  499. idSet := make(map[uint64]struct{}, len(updates))
  500. for _, field := range fields {
  501. var sb strings.Builder
  502. sb.WriteString(field)
  503. sb.WriteString(" = CASE id ")
  504. hasWhen := false
  505. for id, m := range updates {
  506. if val, ok := m[field]; ok {
  507. sb.WriteString("WHEN ? THEN ? ")
  508. args = append(args, id, val)
  509. idSet[id] = struct{}{}
  510. hasWhen = true
  511. }
  512. }
  513. // 如果这个字段对所有 id 都没有需要更新的值,就跳过它
  514. if !hasWhen {
  515. continue
  516. }
  517. sb.WriteString("ELSE ")
  518. sb.WriteString(field)
  519. sb.WriteString(" END")
  520. cases = append(cases, sb.String())
  521. }
  522. // 如果所有字段都被跳过了(比如全被白名单过滤),直接返回
  523. if len(cases) == 0 {
  524. return nil
  525. }
  526. // 3) WHERE IN 使用占位符
  527. ids := make([]uint64, 0, len(idSet))
  528. for id := range idSet {
  529. ids = append(ids, id)
  530. }
  531. placeholders := make([]string, 0, len(ids))
  532. for range ids {
  533. placeholders = append(placeholders, "?")
  534. }
  535. for _, id := range ids {
  536. args = append(args, id)
  537. }
  538. // 4) 组装最终 SQL
  539. sql := fmt.Sprintf(
  540. "UPDATE %s SET %s WHERE id IN (%s)",
  541. tableName,
  542. strings.Join(cases, ", "),
  543. strings.Join(placeholders, ","),
  544. )
  545. // 5) 建议放在事务里执行
  546. return db.Transaction(func(tx *gorm.DB) error {
  547. return tx.Exec(sql, args...).Error
  548. })
  549. }
  550. // get 通过companyID 获取法人库数据
  551. func get() {
  552. // 2. 查询一条数据
  553. var ent EntInfo
  554. if err := MysqlDB.Where("company_id = ?", "001c2e9882ae982abf6e1e9ed06e2654").First(&ent).Error; err != nil {
  555. panic(err)
  556. }
  557. // 3. 反序列化 RoaringBitmap
  558. rbm := roaring.NewBitmap()
  559. if len(ent.JYCompanyLabel) > 0 {
  560. if err := rbm.UnmarshalBinary(ent.JYCompanyLabel); err != nil {
  561. panic(err)
  562. }
  563. }
  564. // 4. 转成 []uint64
  565. ids := make([]uint64, 0, rbm.GetCardinality())
  566. it := rbm.Iterator()
  567. for it.HasNext() {
  568. ids = append(ids, uint64(it.Next()))
  569. }
  570. fmt.Println("CompanyID:", ent.CompanyID)
  571. fmt.Println("标签ID集合:", ids)
  572. }