allData.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. package main
import (
	"errors"
	"fmt"
	"log"
	"net/url"
	"strings"
	"time"
	"unicode/utf8"

	"gorm.io/driver/clickhouse"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	//_ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
	//"github.com/gogf/gf/v2/frame/g"
	//"github.com/gogf/gf/v2/os/gctx"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)
  17. // investData 投资关系存量数据处理
  18. func investData() {
  19. where := map[string]interface{}{
  20. //"stock_type": "企业法人",
  21. "stock_type": map[string]interface{}{
  22. "$ne": "自然人股东",
  23. },
  24. //"use_flag": map[string]interface{}{
  25. // "$lt": 5,
  26. //},
  27. }
  28. //{stock_type:{$ne:"自然人股东"}}
  29. /**
  30. 找 company_partner ,stock_type不是自然人股东的数据,循环数据,判断stock_name 长度大于3
  31. 根据stack_name 到 qyxy_std 找 company_name 想等数据存在,存在就写入;增量数据直接查询法人库表
  32. */
  33. //Mgo := &mongodb.MongodbSim{
  34. // MongodbAddr: "172.17.4.181:27001",
  35. // //MongodbAddr: "127.0.0.1:27001",
  36. // DbName: "mixdata",
  37. // Size: 10,
  38. //}
  39. //
  40. //Mgo.InitPool()
  41. sess := MgoPA.GetMgoConn()
  42. defer MgoPA.DestoryMongoConn(sess)
  43. //正式环境
  44. username := GF.Clickhouse.Username
  45. password := GF.Clickhouse.Password
  46. host := GF.Clickhouse.Host
  47. //测试环境
  48. //username := "jytop"
  49. //password := "pwdTopJy123"
  50. //host := "192.168.3.207:19000"
  51. // 本地环境
  52. //username := "wcc"
  53. //password := "123"
  54. //host := "localhost:9090"
  55. encodedPassword := url.QueryEscape(password)
  56. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  57. ////dsn := "clickhouse://jybi_admin:Da#jy20230825@127.0.0.1:8123/default?dial_timeout=10s&read_timeout=20s"
  58. ////dsn := "clickhouse://jytop:pwdTopJy123@192.168.3.207:18123/information?dial_timeout=10s&read_timeout=20s"
  59. ////dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  60. //dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  61. //clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s
  62. //dsn := "clickhouse://jybi_admin:Da#jy20230825@localhost:18123/test?dial_timeout=10s&read_timeout=20s"
  63. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  64. Logger: logger.Default.LogMode(logger.Silent),
  65. })
  66. if err != nil {
  67. log.Fatal("打开数据库失败:", err)
  68. } else {
  69. log.Println("连接数据库成功", db.Name())
  70. }
  71. //res := make(map[string]interface{})
  72. //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413")
  73. //log.Println(res)
  74. query := sess.DB("mixdata").C("company_partner").Find(where).Select(nil).Sort("update_time").Iter()
  75. count := 0
  76. //datas := make([]EntMapCode, 0)
  77. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  78. if count%100 == 0 {
  79. log.Println("current ---", count, tmp["stock_name"], tmp["update_time"])
  80. }
  81. if util.ObjToString(tmp["stock_type"]) == "自然人股东" {
  82. continue
  83. }
  84. if util.IntAll(tmp["use_flag"]) > 5 {
  85. continue
  86. }
  87. //历史数据,作废
  88. if util.IntAll(tmp["is_history"]) != 0 {
  89. continue
  90. }
  91. //股东名称 长度小于5,认为不是企业
  92. if utf8.RuneCountInString(util.ObjToString(tmp["stock_name"])) < 5 {
  93. continue
  94. }
  95. // 公司名称为空
  96. if util.ObjToString(tmp["stock_name"]) == "" || util.ObjToString(tmp["company_name"]) == "" {
  97. continue
  98. }
  99. //投资关系-0201,交易关系-0301,
  100. //管辖关系-0101,直属关系-0102,组成关系-0103
  101. data := EntMapCode{
  102. AName: util.ObjToString(tmp["stock_name"]),
  103. BName: util.ObjToString(tmp["company_name"]),
  104. Code: "0201",
  105. InvestRatio: util.ObjToString(tmp["stock_proportion"]),
  106. InvestPrice: util.ObjToString(tmp["stock_capital"]),
  107. CreateTime: time.Now().Unix(),
  108. UpdateTime: time.Now().Unix(),
  109. }
  110. if err = db.Create(data).Error; err != nil {
  111. log.Println("create err", err, data.AName)
  112. }
  113. }
  114. log.Printf("over")
  115. }
  116. // biddingAllData 交易关系数据处理
  117. func biddingAllData() {
  118. /**
  119. 查询bidding 数据中,采购单位和中标单位,就是交易关系
  120. */
  121. sess := MgoB.GetMgoConn()
  122. defer MgoB.DestoryMongoConn(sess)
  123. //正式环境
  124. username := GF.Clickhouse.Username
  125. password := GF.Clickhouse.Password
  126. host := GF.Clickhouse.Host
  127. //测试环境
  128. //username := "jytop"
  129. //password := "pwdTopJy123"
  130. //host := "192.168.3.207:19000"
  131. // 本地环境
  132. //username := "wcc"
  133. //password := "123"
  134. //host := "localhost:9090"
  135. encodedPassword := url.QueryEscape(password)
  136. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  137. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  138. Logger: logger.Default.LogMode(logger.Silent),
  139. })
  140. if err != nil {
  141. log.Fatal("打开数据库失败:", err)
  142. } else {
  143. log.Println("连接数据库成功", db.Name())
  144. }
  145. //db.Logger.LogMode(gor)
  146. //res := make(map[string]interface{})
  147. //db.Table("information.information").Select([]string{"id", "endtime", "starttime"}).Find(&res, "datajson_id = ? ", "6350a552911e1eb345b55413")
  148. //log.Println(res)
  149. query := sess.DB(GF.MongoB.DB).C("bidding").Find(nil).Select(nil).Sort("_id").Iter()
  150. count := 0
  151. //datas := make([]EntMapCode, 0)
  152. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  153. if count%10000 == 0 {
  154. log.Println("current ---", count, tmp["_id"])
  155. }
  156. if util.IntAll(tmp["extracttype"]) == -1 {
  157. continue
  158. }
  159. rsa, rsb := false, false
  160. // 1.判断规则打物业标签
  161. if tag_topinformation, ok := tmp["tag_topinformation"]; !ok {
  162. rsa = false
  163. } else {
  164. if tags, ok := tag_topinformation.([]interface{}); ok {
  165. for _, tag := range tags {
  166. tg := util.ObjToString(tag)
  167. if tg == "情报_物业" {
  168. rsa = true
  169. break
  170. }
  171. }
  172. }
  173. }
  174. // 2.判断人工智能打物业标签
  175. if tag_topinformation_ai, ok := tmp["tag_topinformation_ai"]; !ok {
  176. rsb = false
  177. } else {
  178. if tags, ok := tag_topinformation_ai.([]interface{}); ok {
  179. for _, tag := range tags {
  180. tg := util.ObjToString(tag)
  181. if tg == "情报_物业" {
  182. rsb = true
  183. break
  184. }
  185. }
  186. }
  187. }
  188. // 规则和人工智能,都没打上物业标签
  189. if !rsb && !rsa {
  190. continue
  191. }
  192. if util.ObjToString(tmp["buyer"]) == "" {
  193. continue
  194. }
  195. // 没有中标单位,直接跳过
  196. if sWinner, ok := tmp["s_winner"]; !ok {
  197. continue
  198. } else {
  199. //投资关系-0201,交易关系-0301,
  200. //管辖关系-0101,直属关系-0102,组成关系-0103
  201. winners := util.ObjToString(sWinner)
  202. wins := strings.Split(winners, ",")
  203. for _, winer := range wins {
  204. exist := EntMapCode{}
  205. db.Where(&EntMapCode{AName: util.ObjToString(tmp["buyer"]), BName: winer}).First(&exist)
  206. if exist.CreateTime == 0 {
  207. data := EntMapCode{
  208. AName: util.ObjToString(tmp["buyer"]),
  209. BName: winer,
  210. Code: "0301",
  211. CreateTime: time.Now().Unix(),
  212. UpdateTime: time.Now().Unix(),
  213. }
  214. if err = db.Create(data).Error; err != nil {
  215. log.Println("create err", err, data.AName, data.BName)
  216. }
  217. }
  218. }
  219. }
  220. }
  221. log.Printf("over")
  222. }
  223. // organizeData 管辖关系数据;special_enterprise
  224. func organizeData() {
  225. sess := MgoPA.GetMgoConn()
  226. defer MgoPA.DestoryMongoConn(sess)
  227. //正式环境
  228. username := GF.Clickhouse.Username
  229. password := GF.Clickhouse.Password
  230. host := GF.Clickhouse.Host
  231. encodedPassword := url.QueryEscape(password)
  232. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  233. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  234. Logger: logger.Default.LogMode(logger.Silent),
  235. })
  236. if err != nil {
  237. log.Fatal("打开数据库失败:", err)
  238. } else {
  239. log.Println("连接数据库成功", db.Name())
  240. }
  241. query := sess.DB("mixdata").C("special_enterprise").Find(nil).Select(nil).Iter()
  242. count := 0
  243. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  244. if count%1000 == 0 {
  245. log.Println("current ---", count, tmp["company_name"], tmp["organizer"])
  246. }
  247. if util.ObjToString(tmp["company_status"]) != "正常" {
  248. continue
  249. }
  250. if util.IntAll(tmp["use_flag"]) != 0 {
  251. continue
  252. }
  253. // 公司名称为空
  254. if util.ObjToString(tmp["organizer"]) == "" || util.ObjToString(tmp["company_name"]) == "" {
  255. continue
  256. }
  257. //投资关系-0201,交易关系-0301,
  258. //管辖关系-0101,直属关系-0102,组成关系-0103
  259. data := EntMapCode{
  260. AName: util.ObjToString(tmp["organizer"]),
  261. BName: util.ObjToString(tmp["company_name"]),
  262. Code: "0101",
  263. CreateTime: time.Now().Unix(),
  264. UpdateTime: time.Now().Unix(),
  265. }
  266. if err = db.Create(data).Error; err != nil {
  267. log.Println("create err", err, data.AName)
  268. }
  269. }
  270. log.Printf("over")
  271. }
  272. // areaData 处理政府机关省市区单位
  273. func areaData() {
  274. //f, err := excelize.OpenFile("./管辖关系.xlsx")
  275. //if err != nil {
  276. // fmt.Println(err)
  277. // return
  278. //}
  279. //defer func() {
  280. // if err := f.Close(); err != nil {
  281. // fmt.Println(err)
  282. // }
  283. //}()
  284. //
  285. //rows, err := f.GetRows("区")
  286. //if err != nil {
  287. // fmt.Println(err)
  288. // return
  289. //}
  290. ////================================//
  291. //f2, err := excelize.OpenFile("./area.xlsx")
  292. //if err != nil {
  293. // fmt.Println(err)
  294. // return
  295. //}
  296. //defer func() {
  297. // if err := f2.Close(); err != nil {
  298. // fmt.Println(err)
  299. // }
  300. //}()
  301. //
  302. //rows2, err := f2.GetRows("区")
  303. //if err != nil {
  304. // fmt.Println(err)
  305. // return
  306. //}
  307. //
  308. //for i := 1; i < len(rows); i++ {
  309. // aname := rows[i][1]
  310. // bname := rows[i][2]
  311. // //a := strings.Replace(aname, "河南省", "", -1)
  312. // //a := strings.ReplaceAll(aname, "郑州市", "")
  313. // //b := strings.ReplaceAll(bname, "郑州市", "")
  314. // fmt.Println(aname, bname)
  315. // for j := 1; j < len(rows2); j++ {
  316. // //var newNamea, newNameb string
  317. // //if rows2[j][0] == "北京" || rows2[j][0] == "天津" || rows2[j][0] == "上海" || rows2[j][0] == "重庆" {
  318. // // //newNamea = rows2[j][0] + "市" + a
  319. // // //newNameb = rows2[j][0] + "市" + b
  320. // //} else {
  321. // // newNamea = rows2[j][0] + "省" + a
  322. // // newNameb = rows2[j][0] + "省" + b
  323. // //}
  324. // if strings.Contains(rows2[j][2], "县") {
  325. // continue
  326. // }
  327. //
  328. // newNamea := strings.ReplaceAll(aname, "郑州市金水区", rows2[j][1]+rows2[j][2])
  329. // newNameb := strings.ReplaceAll(bname, "郑州市金水区", rows2[j][1]+rows2[j][2])
  330. //
  331. // insert := map[string]interface{}{
  332. // "aname": newNamea,
  333. // "bname": newNameb,
  334. // "code": rows[i][4],
  335. // }
  336. // MgoB.Save("wcc_zhengfujigou_5", insert)
  337. // }
  338. //}
  339. sess := MgoB.GetMgoConn()
  340. defer MgoB.DestoryMongoConn(sess)
  341. //正式环境
  342. username := GF.Clickhouse.Username
  343. password := GF.Clickhouse.Password
  344. host := GF.Clickhouse.Host
  345. encodedPassword := url.QueryEscape(password)
  346. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  347. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  348. Logger: logger.Default.LogMode(logger.Silent),
  349. })
  350. if err != nil {
  351. log.Fatal("打开数据库失败:", err)
  352. } else {
  353. log.Println("连接数据库成功", db.Name())
  354. }
  355. query := sess.DB(GF.MongoB.DB).C("wcc_zhengfujigou").Find(nil).Select(nil).Sort("_id").Iter()
  356. count := 0
  357. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  358. if count%10000 == 0 {
  359. log.Println("current ---", count, tmp["aname"], tmp["bname"])
  360. }
  361. //判断重复数据
  362. exist := EntMapCode{}
  363. db.Where(&EntMapCode{AName: util.ObjToString(tmp["aname"]), BName: util.ObjToString(tmp["bname"])}).First(&exist)
  364. if exist.CreateTime > 0 {
  365. continue
  366. }
  367. data := EntMapCode{
  368. AName: util.ObjToString(tmp["aname"]),
  369. BName: util.ObjToString(tmp["bname"]),
  370. CreateTime: time.Now().Unix(),
  371. UpdateTime: time.Now().Unix(),
  372. }
  373. //管辖关系-0101,直属关系-0102,组成关系-0103
  374. if util.ObjToString(tmp["code"]) == "管辖关系" {
  375. data.Code = "0101"
  376. } else if util.ObjToString(tmp["code"]) == "直属关系" {
  377. data.Code = "0102"
  378. } else if util.ObjToString(tmp["code"]) == "组成关系" {
  379. data.Code = "0103"
  380. }
  381. if err = db.Create(data).Error; err != nil {
  382. log.Println("create err", err, data.AName, data.BName)
  383. }
  384. }
  385. log.Println("over")
  386. }
  387. // updateInfoId 更新法人库表ID,更新映射表对应的法人库ID
  388. func updateInfoId() {
  389. username := GF.Clickhouse.Username
  390. password := GF.Clickhouse.Password
  391. host := GF.Clickhouse.Host
  392. encodedPassword := url.QueryEscape(password)
  393. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  394. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  395. Logger: logger.Default.LogMode(logger.Silent),
  396. })
  397. if err != nil {
  398. log.Fatal("打开数据库失败:", err)
  399. } else {
  400. log.Println("连接数据库成功", db.Name())
  401. }
  402. //=======//
  403. // 执行原始 SQL 查询
  404. batchSize := 100
  405. page := 1
  406. for {
  407. log.Println("current page ", page)
  408. var entMapCodes []EntMapCode
  409. if err := db.Raw("SELECT * FROM ent_map_code where a_id = '' or b_id = '' ORDER BY create_time LIMIT ? OFFSET ? ", batchSize, (page-1)*batchSize).Scan(&entMapCodes).Error; err != nil {
  410. if err == gorm.ErrRecordNotFound {
  411. break
  412. }
  413. log.Fatal("failed to fetch data:", err)
  414. }
  415. // 处理查询到的数据
  416. for _, entMapCode := range entMapCodes {
  417. if entMapCode.AId == "" {
  418. AInfo := EntInfo{}
  419. db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.AName).Select("company_name", "id").First(&AInfo)
  420. if AInfo.ID != "" {
  421. update1 := map[string]interface{}{
  422. "a_id": AInfo.ID,
  423. }
  424. db.Model(&entMapCode).Where(&EntMapCode{AName: entMapCode.AName}).Updates(update1)
  425. }
  426. }
  427. if entMapCode.BId == "" {
  428. BInfo := EntInfo{}
  429. db.Model(&EntInfo{}).Where("company_name = ? ", entMapCode.BName).Select("company_name", "id").First(&BInfo)
  430. if BInfo.ID != "" {
  431. update2 := map[string]interface{}{
  432. "b_id": BInfo.ID,
  433. }
  434. db.Model(&entMapCode).Where(&EntMapCode{BName: entMapCode.BName}).Updates(update2)
  435. }
  436. }
  437. }
  438. // 如果查询结果为空,则退出循环
  439. if len(entMapCodes) < batchSize {
  440. break
  441. }
  442. page++
  443. }
  444. log.Println("迭代结束")
  445. }