company.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package main
  2. import (
  3. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  4. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  5. "log"
  6. "strings"
  7. "sync"
  8. )
  9. // getCompany 获取公司信息
  10. func getCompany() {
  11. // 181 凭安库
  12. Mgo2 := &mongodb.MongodbSim{
  13. MongodbAddr: "172.17.4.181:27001",
  14. //MongodbAddr: "127.0.0.1:27001",
  15. DbName: "mixdata",
  16. Size: 10,
  17. UserName: "",
  18. Password: "",
  19. Direct: true,
  20. }
  21. Mgo2.InitPool()
  22. //1.获取凭安特企,河南数据
  23. where := map[string]interface{}{
  24. "report_year": 2023,
  25. }
  26. defer util.Catch()
  27. sess := Mgo2.GetMgoConn()
  28. defer Mgo2.DestoryMongoConn(sess)
  29. count := 0
  30. it := sess.DB("mixdata").C("annual_report_asset").Find(where).Select(nil).Iter()
  31. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  32. if count%10000 == 0 {
  33. log.Println("corrent:", count, tmp["company_id"])
  34. }
  35. isa := false
  36. isb := false
  37. //总资产,个体户选择不公示
  38. if tmp["total_amount"] == nil || util.ObjToString(tmp["total_amount"]) == "企业选择不公示" || util.ObjToString(tmp["total_amount"]) == "个体户选择不公示" {
  39. isa = true
  40. }
  41. if tmp["business_income"] == nil || util.ObjToString(tmp["business_income"]) == "企业选择不公示" || util.ObjToString(tmp["business_income"]) == "个体户选择不公示" {
  42. isb = true
  43. }
  44. if isa && isb {
  45. continue
  46. }
  47. insert := map[string]interface{}{
  48. "company_id": tmp["company_id"],
  49. "total_amount": tmp["total_amount"],
  50. "business_income": tmp["business_income"],
  51. "use_flag": tmp["use_flag"],
  52. }
  53. Mgo2.InsertOrUpdate("wcc", "wcc_annual_report_asset", insert)
  54. }
  55. }
  56. // updateReport 更新补充年报信息
  57. func updateReport() {
  58. // 181 凭安库
  59. Mgo2 := &mongodb.MongodbSim{
  60. MongodbAddr: "172.17.4.181:27001",
  61. //MongodbAddr: "127.0.0.1:27001",
  62. DbName: "mixdata",
  63. Size: 10,
  64. UserName: "",
  65. Password: "",
  66. //Direct: true,
  67. }
  68. Mgo2.InitPool()
  69. MgoB := &mongodb.MongodbSim{
  70. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  71. //MongodbAddr: "127.0.0.1:27083",
  72. Size: 10,
  73. DbName: "qfw",
  74. UserName: "SJZY_RWbid_ES",
  75. Password: "SJZY@B4i4D5e6S",
  76. //Direct: true,
  77. }
  78. MgoB.InitPool()
  79. defer util.Catch()
  80. sess := MgoB.GetMgoConn()
  81. defer MgoB.DestoryMongoConn(sess)
  82. count := 0
  83. ch := make(chan bool, 15)
  84. wg := &sync.WaitGroup{}
  85. it := sess.DB("qfw").C("wcc_2025_guangdong_qyxy").Find(nil).Select(nil).Iter()
  86. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  87. if count%10000 == 0 {
  88. log.Println("current:", count, tmp["company_name"])
  89. }
  90. ch <- true
  91. wg.Add(1)
  92. go func(tmp map[string]interface{}) {
  93. defer func() {
  94. <-ch
  95. wg.Done()
  96. }()
  97. where := map[string]interface{}{
  98. "company_id": tmp["id"],
  99. "report_year": 2023,
  100. }
  101. id := mongodb.BsonIdToSId(tmp["_id"])
  102. report, _ := Mgo2.FindOne("annual_report_asset", where)
  103. if (*report) != nil {
  104. update := make(map[string]interface{})
  105. total_amount := util.ObjToString((*report)["total_amount"])
  106. if total_amount != "" && !strings.Contains(total_amount, "不公示") {
  107. update["total_amount"] = (*report)["total_amount"]
  108. update["total_amount2"] = strings.TrimSpace(strings.ReplaceAll(total_amount, "万元", ""))
  109. }
  110. business_income := util.ObjToString((*report)["business_income"])
  111. if business_income != "" && !strings.Contains(business_income, "不公示") {
  112. update["business_income"] = (*report)["business_income"]
  113. update["business_income2"] = strings.TrimSpace(strings.ReplaceAll(business_income, "万元", ""))
  114. }
  115. if len(update) > 0 {
  116. MgoB.UpdateById("wcc_2025_guangdong_qyxy", id, map[string]interface{}{"$set": update})
  117. }
  118. }
  119. }(tmp)
  120. tmp = map[string]interface{}{}
  121. }
  122. wg.Wait()
  123. log.Println("数据处理完毕")
  124. }
  125. // updateCapital 获取更新注册资金
  126. func updateCapital() {
  127. MgoB2 := &mongodb.MongodbSim{
  128. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  129. //MongodbAddr: "127.0.0.1:27083",
  130. Size: 10,
  131. DbName: "mixdata",
  132. UserName: "SJZY_RWbid_ES",
  133. Password: "SJZY@B4i4D5e6S",
  134. //Direct: true,
  135. }
  136. MgoB2.InitPool()
  137. MgoB := &mongodb.MongodbSim{
  138. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  139. //MongodbAddr: "127.0.0.1:27083",
  140. Size: 10,
  141. DbName: "qfw",
  142. UserName: "SJZY_RWbid_ES",
  143. Password: "SJZY@B4i4D5e6S",
  144. //Direct: true,
  145. }
  146. MgoB.InitPool()
  147. defer util.Catch()
  148. sess := MgoB.GetMgoConn()
  149. defer MgoB.DestoryMongoConn(sess)
  150. count := 0
  151. ch := make(chan bool, 15)
  152. wg := &sync.WaitGroup{}
  153. it := sess.DB("qfw").C("wcc_2025_guangdong_qyxy").Find(nil).Select(nil).Iter()
  154. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  155. if count%10000 == 0 {
  156. log.Println("current:", count, tmp["company_name"])
  157. }
  158. ch <- true
  159. wg.Add(1)
  160. go func(tmp map[string]interface{}) {
  161. defer func() {
  162. <-ch
  163. wg.Done()
  164. }()
  165. where := map[string]interface{}{
  166. "_id": tmp["id"],
  167. }
  168. id := tmp["_id"]
  169. report, _ := MgoB2.FindOne("qyxy_std", where)
  170. if (*report) != nil {
  171. update := make(map[string]interface{})
  172. if _, ok := (*report)["capital"]; ok {
  173. update["capital"] = (*report)["capital"]
  174. }
  175. if _, ok := (*report)["real_capital"]; ok {
  176. update["real_capital"] = (*report)["real_capital"]
  177. }
  178. if len(update) > 0 {
  179. MgoB.UpdateById("wcc_2025_guangdong_qyxy", id, map[string]interface{}{"$set": update})
  180. }
  181. }
  182. }(tmp)
  183. tmp = map[string]interface{}{}
  184. }
  185. wg.Wait()
  186. log.Println("数据处理完毕")
  187. }