qyxy.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.mongodb.org/mongo-driver/bson/primitive"
  9. "go.mongodb.org/mongo-driver/mongo"
  10. "go.mongodb.org/mongo-driver/mongo/options"
  11. "io"
  12. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  14. "log"
  15. "net/url"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "unicode/utf8"
  20. )
  21. func countReportYear22() {
  22. ctx := context.Background()
  23. username := "SJZY_RWbid_ES"
  24. password := "SJZY@B4i4D5e6S"
  25. hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"}
  26. uri, err := BuildMongoURI(username, password, hosts, nil)
  27. if err != nil {
  28. panic(err)
  29. }
  30. clientOptions := options.Client().ApplyURI(uri)
  31. client, err := mongo.Connect(ctx, clientOptions)
  32. if err != nil {
  33. log.Fatal(err)
  34. }
  35. defer client.Disconnect(ctx)
  36. collection := client.Database("mixdata").Collection("qyxy_std")
  37. // 分段配置
  38. start := int64(1630914780)
  39. end := int64(1751238647)
  40. segment := (end - start) / 10
  41. type SegmentResult struct {
  42. Num int
  43. Year2023 int
  44. Year2024 int
  45. }
  46. resultChan := make(chan SegmentResult, 10)
  47. var outerWg sync.WaitGroup
  48. for i := 0; i < 10; i++ {
  49. segStart := start + int64(i)*segment
  50. segEnd := segStart + segment
  51. if i == 9 {
  52. segEnd = end // 最后一段到结尾
  53. }
  54. outerWg.Add(1)
  55. go func(segIdx int, segStart, segEnd int64) {
  56. defer outerWg.Done()
  57. // filter: 按段过滤
  58. filter := bson.M{
  59. "company_type": bson.M{"$ne": "个体工商户"},
  60. "updatetime": bson.M{
  61. "$gte": segStart,
  62. "$lt": segEnd,
  63. },
  64. }
  65. batchSize := int32(500)
  66. cursor, err := collection.Find(
  67. ctx,
  68. filter,
  69. options.Find().
  70. SetBatchSize(batchSize).
  71. SetSort(bson.D{{Key: "_id", Value: -1}}),
  72. )
  73. if err != nil {
  74. log.Printf("[segment %d] Find error: %v\n", segIdx, err)
  75. return
  76. }
  77. defer cursor.Close(ctx)
  78. // worker pool
  79. workerCount := 8
  80. docChan := make(chan map[string]interface{}, 1000)
  81. var wg sync.WaitGroup
  82. // 本段统计
  83. localCount := map[int]int{
  84. 2023: 0,
  85. 2024: 0,
  86. }
  87. num := 0
  88. var mu sync.Mutex
  89. for w := 0; w < workerCount; w++ {
  90. wg.Add(1)
  91. go func() {
  92. defer wg.Done()
  93. for doc := range docChan {
  94. CompanyStatus := util.ObjToString(doc["company_status"])
  95. CompanyName := util.ObjToString(doc["company_name"])
  96. UseFlag := util.IntAll(doc["use_flag"])
  97. if UseFlag > 0 ||
  98. strings.Contains(CompanyStatus, "注销") ||
  99. strings.Contains(CompanyStatus, "吊销") ||
  100. CompanyName == "" ||
  101. strings.Contains(CompanyName, "已除名") ||
  102. utf8.RuneCountInString(CompanyName) < 5 {
  103. continue
  104. }
  105. annualReportsRaw, ok := doc["annual_reports"]
  106. if !ok {
  107. continue
  108. }
  109. annualReports := make([]interface{}, 0)
  110. switch v := annualReportsRaw.(type) {
  111. case []interface{}:
  112. annualReports = v
  113. case primitive.A:
  114. annualReports = v
  115. default:
  116. continue
  117. }
  118. yearHasReport := map[int]bool{
  119. 2023: false,
  120. 2024: false,
  121. }
  122. for _, r := range annualReports {
  123. report, ok := r.(map[string]interface{})
  124. if !ok {
  125. continue
  126. }
  127. yearRaw, exists := report["report_year"]
  128. if !exists {
  129. continue
  130. }
  131. var yearInt int
  132. switch v := yearRaw.(type) {
  133. case string:
  134. y, err := strconv.Atoi(v)
  135. if err != nil {
  136. continue
  137. }
  138. yearInt = y
  139. case float64:
  140. yearInt = int(v)
  141. case int32:
  142. yearInt = int(v)
  143. case int64:
  144. yearInt = int(v)
  145. default:
  146. continue
  147. }
  148. if yearInt == 2023 {
  149. yearHasReport[2023] = true
  150. } else if yearInt == 2024 {
  151. yearHasReport[2024] = true
  152. }
  153. }
  154. mu.Lock()
  155. if yearHasReport[2023] {
  156. localCount[2023]++
  157. }
  158. if yearHasReport[2024] {
  159. localCount[2024]++
  160. }
  161. mu.Unlock()
  162. }
  163. }()
  164. }
  165. // 遍历 cursor
  166. for cursor.Next(ctx) {
  167. var doc map[string]interface{}
  168. if err := cursor.Decode(&doc); err != nil {
  169. continue
  170. }
  171. num++
  172. docChan <- doc
  173. if num%10000 == 0 {
  174. mu.Lock()
  175. log.Printf("[segment %d] processed: %d, 2023: %d, 2024: %d", segIdx, num, localCount[2023], localCount[2024])
  176. mu.Unlock()
  177. }
  178. }
  179. close(docChan)
  180. wg.Wait()
  181. resultChan <- SegmentResult{
  182. Num: num,
  183. Year2023: localCount[2023],
  184. Year2024: localCount[2024],
  185. }
  186. }(i, segStart, segEnd)
  187. }
  188. // 等待所有段结束
  189. go func() {
  190. outerWg.Wait()
  191. close(resultChan)
  192. }()
  193. // 汇总
  194. totalDocs := 0
  195. finalCount := map[int]int{
  196. 2023: 0,
  197. 2024: 0,
  198. }
  199. for segRes := range resultChan {
  200. totalDocs += segRes.Num
  201. finalCount[2023] += segRes.Year2023
  202. finalCount[2024] += segRes.Year2024
  203. }
  204. fmt.Printf("总处理文档数: %d\n", totalDocs)
  205. fmt.Printf("2023 年有年报的企业数: %d\n", finalCount[2023])
  206. fmt.Printf("2024 年有年报的企业数: %d\n", finalCount[2024])
  207. }
  208. // countReportYear 统计企业年报
  209. func countReportYear() {
  210. ctx := context.Background()
  211. username := "SJZY_RWbid_ES"
  212. password := "SJZY@B4i4D5e6S"
  213. hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"}
  214. //hosts := []string{"127.0.0.1:27083"}
  215. // 构造 URI
  216. uri, err := BuildMongoURI(username, password, hosts, nil)
  217. if err != nil {
  218. panic(err)
  219. }
  220. // 连接 MongoDB
  221. clientOptions := options.Client().ApplyURI(uri)
  222. //clientOptions.SetDirect(true)
  223. client, err := mongo.Connect(ctx, clientOptions)
  224. if err != nil {
  225. log.Fatal(err)
  226. }
  227. defer client.Disconnect(ctx)
  228. collection := client.Database("mixdata").Collection("qyxy_std")
  229. // 查询条件:company_type != 个体工商户
  230. filter := bson.M{"company_type": bson.M{"$ne": "个体工商户"}}
  231. // 批量大小
  232. batchSize := int32(500)
  233. //cursor, err := collection.Find(ctx, filter, options.Find().SetBatchSize(batchSize))
  234. cursor, err := collection.Find(
  235. ctx,
  236. filter,
  237. options.Find().
  238. SetBatchSize(batchSize).
  239. SetSort(bson.D{{Key: "_id", Value: -1}}),
  240. )
  241. if err != nil {
  242. log.Fatal(err)
  243. }
  244. defer cursor.Close(ctx)
  245. // 定义统计变量
  246. finalCount := map[int]int{
  247. 2023: 0,
  248. 2024: 0,
  249. }
  250. var mu sync.Mutex
  251. // worker 并发数
  252. workerCount := 8
  253. docChan := make(chan map[string]interface{}, 1000)
  254. var wg sync.WaitGroup
  255. // 启动 worker
  256. for i := 0; i < workerCount; i++ {
  257. wg.Add(1)
  258. go func() {
  259. defer wg.Done()
  260. for doc := range docChan {
  261. // 过滤条件
  262. CompanyStatus := util.ObjToString(doc["company_status"])
  263. CompanyName := util.ObjToString(doc["company_name"])
  264. UseFlag := util.IntAll(doc["use_flag"])
  265. if UseFlag > 0 ||
  266. strings.Contains(CompanyStatus, "注销") ||
  267. strings.Contains(CompanyStatus, "吊销") ||
  268. CompanyName == "" ||
  269. strings.Contains(CompanyName, "已除名") ||
  270. utf8.RuneCountInString(CompanyName) < 5 {
  271. continue
  272. }
  273. // annual_reports
  274. annualReportsRaw, ok := doc["annual_reports"]
  275. if !ok {
  276. continue
  277. }
  278. annualReports := make([]interface{}, 0)
  279. switch v := annualReportsRaw.(type) {
  280. case []interface{}:
  281. annualReports = v
  282. case primitive.A:
  283. annualReports = v
  284. default:
  285. log.Printf("annual_reports unexpected type: %T\n", v)
  286. continue
  287. }
  288. //annualReports, ok := annualReportsRaw.([]interface{})
  289. //if !ok {
  290. // continue
  291. //}
  292. // 检测当前企业是否在 2023/2024 有年报,只算一次
  293. yearHasReport := map[int]bool{
  294. 2023: false,
  295. 2024: false,
  296. }
  297. for _, r := range annualReports {
  298. report, ok := r.(map[string]interface{})
  299. if !ok {
  300. continue
  301. }
  302. yearRaw, exists := report["report_year"]
  303. if !exists {
  304. continue
  305. }
  306. var yearInt int
  307. switch v := yearRaw.(type) {
  308. case string:
  309. y, err := strconv.Atoi(v)
  310. if err != nil {
  311. continue
  312. }
  313. yearInt = y
  314. case float64:
  315. yearInt = int(v)
  316. case int32:
  317. yearInt = int(v)
  318. case int64:
  319. yearInt = int(v)
  320. default:
  321. continue
  322. }
  323. if yearInt == 2023 {
  324. yearHasReport[2023] = true
  325. } else if yearInt == 2024 {
  326. yearHasReport[2024] = true
  327. }
  328. }
  329. // 有就+1
  330. mu.Lock()
  331. if yearHasReport[2023] {
  332. finalCount[2023]++
  333. }
  334. if yearHasReport[2024] {
  335. finalCount[2024]++
  336. }
  337. mu.Unlock()
  338. }
  339. }()
  340. }
  341. // 主 goroutine 遍历 cursor,实时打印进度
  342. num := 0
  343. for cursor.Next(ctx) {
  344. var doc map[string]interface{}
  345. if err := cursor.Decode(&doc); err != nil {
  346. log.Println("decode error:", err)
  347. continue
  348. }
  349. num++
  350. docChan <- doc
  351. if num%10000 == 0 {
  352. mu.Lock()
  353. log.Printf("current: %d docs processed, 2023年企业数: %d, 2024年企业数: %d\n", num, finalCount[2023], finalCount[2024])
  354. mu.Unlock()
  355. }
  356. }
  357. close(docChan)
  358. // 等待所有 worker 完成
  359. wg.Wait()
  360. // 输出统计结果
  361. fmt.Printf("总处理文档数: %d\n", num)
  362. fmt.Printf("2023 年有年报的企业数: %d\n", finalCount[2023])
  363. fmt.Printf("2024 年有年报的企业数: %d\n", finalCount[2024])
  364. }
  365. // BuildMongoURI 构造 MongoDB 连接 URI
  366. func BuildMongoURI(username, password string, hosts []string, options map[string]string) (string, error) {
  367. if len(hosts) == 0 {
  368. return "", fmt.Errorf("hosts cannot be empty")
  369. }
  370. hostList := strings.Join(hosts, ",")
  371. var authPart string
  372. if username != "" {
  373. escapedUsername := url.QueryEscape(username)
  374. escapedPassword := url.QueryEscape(password)
  375. authPart = fmt.Sprintf("%s:%s@", escapedUsername, escapedPassword)
  376. // 如果密码为空,也会拼成 username:@host ,MongoDB URI 是支持的,可以保留
  377. }
  378. var optionStr string
  379. if len(options) > 0 {
  380. query := url.Values{}
  381. for k, v := range options {
  382. query.Set(k, v)
  383. }
  384. optionStr = "?" + query.Encode()
  385. }
  386. return fmt.Sprintf("mongodb://%s%s%s", authPart, hostList, optionStr), nil
  387. }
  388. func fixQyxy() {
  389. // 连接 ES
  390. url := "http://172.17.4.184:19908"
  391. //url := "http://127.0.0.1:19908"
  392. username := "jybid"
  393. password := "Top2023_JEB01i@31"
  394. client, err := elastic.NewClient(
  395. elastic.SetURL(url),
  396. elastic.SetBasicAuth(username, password),
  397. elastic.SetSniff(false),
  398. )
  399. if err != nil {
  400. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  401. }
  402. sess := MgoQy.GetMgoConn()
  403. defer MgoQy.DestoryMongoConn(sess)
  404. where := map[string]interface{}{
  405. "use_flag": 10,
  406. }
  407. queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter()
  408. count := 0
  409. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  410. if count%1000 == 0 {
  411. log.Println("current:", count, tmp["_id"])
  412. }
  413. id := util.ObjToString(tmp["_id"])
  414. company_name := util.ObjToString(tmp["company_name"])
  415. if company_name == "无" || company_name == "|" || company_name == "" {
  416. continue
  417. }
  418. where2 := map[string]interface{}{
  419. "company_name": company_name,
  420. "use_flag": 0,
  421. }
  422. std, _ := MgoQy.FindOne("qyxy_std", where2)
  423. var newID string
  424. if len(*std) > 0 {
  425. newID = util.ObjToString(tmp["_id"])
  426. }
  427. query := elastic.NewBoolQuery().
  428. Must(
  429. elastic.NewTermQuery("entidlist", id), // 模糊匹配 projectname
  430. )
  431. ctx := context.Background()
  432. //开始滚动搜索
  433. scrollID := ""
  434. scroll := "10m"
  435. searchSource := elastic.NewSearchSource().
  436. Query(query).
  437. Size(10000).
  438. Sort("_doc", true) //升序排序
  439. //Sort("_doc", false) //降序排序
  440. searchService := client.Scroll("projectset").
  441. Size(10000).
  442. Scroll(scroll).
  443. SearchSource(searchSource)
  444. res, err := searchService.Do(ctx)
  445. if err != nil {
  446. if err == io.EOF {
  447. log.Println("没有数据")
  448. } else {
  449. log.Println(err)
  450. continue
  451. }
  452. }
  453. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  454. fmt.Println("总数是:", res.TotalHits())
  455. total := 0
  456. //1.处理更新es 数据
  457. for len(res.Hits.Hits) > 0 {
  458. for _, hit := range res.Hits.Hits {
  459. var doc map[string]interface{}
  460. err := json.Unmarshal(hit.Source, &doc)
  461. if err != nil {
  462. log.Printf("解析文档失败:%s", err)
  463. continue
  464. }
  465. esID := util.ObjToString(doc["id"])
  466. newEntidlist := make([]string, 0)
  467. //存入新表
  468. if entidlist, ok := doc["entidlist"].([]interface{}); ok && len(entidlist) > 0 {
  469. for _, v := range entidlist {
  470. list_id := util.ObjToString(v)
  471. if list_id != id && list_id != "-" {
  472. newEntidlist = append(newEntidlist, list_id)
  473. }
  474. }
  475. if newID != "" {
  476. newEntidlist = append(newEntidlist, newID)
  477. }
  478. //更新es
  479. esUpdate := map[string]interface{}{
  480. "entidlist": newEntidlist,
  481. }
  482. // 更新Es 数据
  483. updateEsPool <- []map[string]interface{}{
  484. {"_id": esID},
  485. esUpdate,
  486. }
  487. log.Println("aaaaaaaa", esID, esUpdate)
  488. //更新项目MongoDB
  489. MgoP.UpdateById("projectset_20230904", esID, map[string]interface{}{"$set": esUpdate})
  490. }
  491. }
  492. total = total + len(res.Hits.Hits)
  493. scrollID = res.ScrollId
  494. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  495. log.Println("current count:", total)
  496. if err != nil {
  497. if err == io.EOF {
  498. // 滚动到最后一批数据,退出循环
  499. break
  500. }
  501. log.Println("滚动搜索失败:", err, res)
  502. break // 处理错误时退出循环
  503. }
  504. }
  505. //2.删除MongoDB
  506. whereDel := map[string]interface{}{
  507. "_id": id,
  508. }
  509. MgoQy.Delete("qyxy_std", whereDel)
  510. MgoQy.SaveByOriID("wcc_qyxy_std_delete", tmp)
  511. //在循环外调用 ClearScroll
  512. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  513. if err != nil {
  514. log.Printf("清理滚动搜索失败:%s", err)
  515. }
  516. _, err = client.Scroll().ScrollId(scrollID).Scroll("2m").Do(ctx)
  517. if err != nil {
  518. if elasticErr, ok := err.(*elastic.Error); ok && elasticErr.Status == 400 {
  519. log.Println("滚动查询失败,可能是 Scroll ID 失效,直接退出")
  520. return
  521. }
  522. log.Println("滚动查询失败:", err)
  523. }
  524. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  525. if err != nil {
  526. log.Printf("清理滚动搜索失败:%s", err)
  527. }
  528. }
  529. log.Println("数据处理完毕")
  530. }
  531. // findData 找出企业信息中,company_type 不等于合体工商户的企业数据,然后写入一个临时表
  532. func findData() {
  533. Mgo := &mongodb.MongodbSim{
  534. MongodbAddr: "172.17.189.140:27080",
  535. //MongodbAddr: "127.0.0.1:27083",
  536. Size: 10,
  537. DbName: "mixdata",
  538. UserName: "SJZY_RWbid_ES",
  539. Password: "SJZY@B4i4D5e6S",
  540. //Direct: true,
  541. }
  542. Mgo.InitPool()
  543. sess := Mgo.GetMgoConn()
  544. defer Mgo.DestoryMongoConn(sess)
  545. //where := map[string]interface{}{
  546. // "company_type": map[string]interface{}{
  547. // "$ne": "个体工商户",
  548. // },
  549. //}
  550. query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(map[string]interface{}{"company_name": 1, "company_type": 1, "company_status": 1, "use_flag": 1}).Iter()
  551. count := 0
  552. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  553. if count%10000 == 0 {
  554. log.Println("current:", count)
  555. }
  556. companyType := util.ObjToString(tmp["company_type"])
  557. if companyType == "个体工商户" {
  558. continue
  559. }
  560. company_name := util.ObjToString(tmp["company_name"])
  561. whereN := map[string]interface{}{
  562. "company_name": company_name,
  563. }
  564. num := Mgo.Count("qyxy_std", whereN)
  565. if num > 1 {
  566. Mgo.Save("wcc_qyxy_20240311", tmp)
  567. }
  568. tmp = make(map[string]interface{})
  569. }
  570. log.Println("结束")
  571. }
  572. // getCompanyName
  573. func getCompanyName() {
  574. // 找出wcc_qyxy_20240311表中,公司名称有多个,并且一个use_flag=0,一个use_flag=10的数据
  575. Mgo := &mongodb.MongodbSim{
  576. MongodbAddr: "172.17.189.140:27080",
  577. //MongodbAddr: "127.0.0.1:27083",
  578. Size: 10,
  579. DbName: "mixdata",
  580. UserName: "SJZY_RWbid_ES",
  581. Password: "SJZY@B4i4D5e6S",
  582. //Direct: true,
  583. }
  584. Mgo.InitPool()
  585. companyMap := make(map[string]bool)
  586. sess := Mgo.GetMgoConn()
  587. defer Mgo.DestoryMongoConn(sess)
  588. query := sess.DB("mixdata").C("wcc_qyxy_20240311").Find(nil).Select(map[string]interface{}{"company_name": 1, "use_flag": 1}).Iter()
  589. count := 0
  590. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  591. if count%10000 == 0 {
  592. log.Println("current:", count)
  593. }
  594. name := util.ObjToString(tmp["company_name"])
  595. if companyMap[name] {
  596. continue
  597. } else {
  598. companyMap[name] = true
  599. }
  600. where := map[string]interface{}{
  601. "company_name": name,
  602. }
  603. res, _ := Mgo.Find("wcc_qyxy_20240311", where, nil, nil, false, -1, -1)
  604. flaga := false
  605. flagb := false
  606. for _, v := range *res {
  607. if util.Int64All(v["use_flag"]) == 0 {
  608. flaga = true
  609. } else if util.Int64All(v["use_flag"]) == 10 {
  610. flagb = true
  611. }
  612. }
  613. // 存在0和10 二个状态
  614. if flaga && flagb {
  615. Mgo.Save("wcc_qyxy_name_0325", map[string]interface{}{"company_name": name})
  616. }
  617. tmp = make(map[string]interface{})
  618. }
  619. log.Println("结束")
  620. }
  621. // SpecialData 处理特殊企业数据,更新 qyxy_std 表use_flag
  622. func SpecialData() {
  623. // 处理 特殊企业 数据,
  624. tables := []string{"special_enterprise", "special_foundation", "special_law_office", "special_social_organ", "special_trade_union"}
  625. //1.先处理 special_enterprise 表数据,循环数据,根据company_name 去查询 qyxy_std 表,如果
  626. // 数据只有一条,并且 special_enterprise.company_id 等于 qyxy_std._id;就更新 qyxy_std表的 use_flag 字段
  627. // qyxy_std 表
  628. Mgo := &mongodb.MongodbSim{
  629. MongodbAddr: "172.17.189.140:27080",
  630. //MongodbAddr: "127.0.0.1:27083",
  631. Size: 10,
  632. DbName: "mixdata",
  633. UserName: "SJZY_RWbid_ES",
  634. Password: "SJZY@B4i4D5e6S",
  635. //Direct: true,
  636. }
  637. Mgo.InitPool()
  638. // 181 凭安库
  639. Mgo2 := &mongodb.MongodbSim{
  640. MongodbAddr: "172.17.4.181:27001",
  641. //MongodbAddr: "127.0.0.1:27001",
  642. DbName: "mixdata",
  643. Size: 10,
  644. UserName: "",
  645. Password: "",
  646. //Direct: true,
  647. }
  648. Mgo2.InitPool()
  649. sess := Mgo2.GetMgoConn()
  650. defer Mgo2.DestoryMongoConn(sess)
  651. for _, v := range tables {
  652. query := sess.DB("mixdata").C(v).Find(nil).Select(map[string]interface{}{"company_name": 1, "use_flag": 1, "company_id": 1}).Iter()
  653. count := 0
  654. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  655. if count%10000 == 0 {
  656. log.Println("current:", count, "table - ", v, tmp["company_name"])
  657. }
  658. if _, ok := tmp["company_id"]; !ok {
  659. continue
  660. }
  661. where := map[string]interface{}{
  662. "_id": tmp["company_id"],
  663. }
  664. //update := map[string]interface{}{
  665. // "use_flag": tmp["use_flag"],
  666. //}
  667. //Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
  668. //name := util.ObjToString(tmp["company_name"])
  669. //where := map[string]interface{}{
  670. // "company_name": name,
  671. //}
  672. // 如果 ID 相同,std 表use_flag =10,更新
  673. datas, _ := Mgo.FindOne("qyxy_std", where)
  674. if len(*datas) == 0 {
  675. continue
  676. }
  677. //if util.Int64All((*datas)["use_flag"]) == util.Int64All(tmp["use_flag"]) {
  678. // continue
  679. //}
  680. //Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
  681. data := *datas
  682. data["use_flag"] = tmp["use_flag"]
  683. Mgo.SaveByOriID("wcc_std_0411", data)
  684. }
  685. }
  686. log.Println("结束")
  687. }
  688. func StdData() {
  689. Mgo := &mongodb.MongodbSim{
  690. MongodbAddr: "172.17.189.140:27080",
  691. //MongodbAddr: "127.0.0.1:27083",
  692. Size: 10,
  693. DbName: "mixdata",
  694. UserName: "SJZY_RWbid_ES",
  695. Password: "SJZY@B4i4D5e6S",
  696. //Direct: true,
  697. }
  698. Mgo.InitPool()
  699. sess := Mgo.GetMgoConn()
  700. defer Mgo.DestoryMongoConn(sess)
  701. query := sess.DB("mixdata").C("wcc_std_0410").Find(nil).Select(nil).Iter()
  702. count := 0
  703. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  704. if count%10000 == 0 {
  705. log.Println("current:", count)
  706. }
  707. autoid := util.Int64All(tmp["autoid"])
  708. if autoid == 0 {
  709. continue
  710. }
  711. where := map[string]interface{}{
  712. "autoid": autoid,
  713. }
  714. datas, _ := Mgo.FindOne("qyxy_std", where)
  715. if len(*datas) == 0 {
  716. continue
  717. }
  718. Mgo.SaveByOriID("wcc_std_0411", datas)
  719. }
  720. log.Println("over")
  721. }