main.go 22 KB


  1. package main
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "encoding/json"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "io"
  9. "io/ioutil"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  13. "net"
  14. "os"
  15. "runtime"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. //CurrentColl string //当前表名
  22. //SkipCollName string
  23. //sendMsg string
  24. //collCount int // 当前表数据量
  25. //insertCount int // insert数据
  26. //updateCount int // update数据
  27. //saveLog = make(map[string]interface{})
  28. savelog sync.Map
  29. //saveArr [][]map[string]interface{}
  30. UdpClient udp.UdpClient
  31. qyxyEsAddr *net.UDPAddr
  32. localPort string // 本地监听端口
  33. startTime int64 //std 程序开始执行的时间
  34. readPath string //
  35. )
  36. func main() {
  37. localPort = GF.Env.Localport //udp 本地监听地址
  38. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  39. readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据
  40. qyxyEsAddr = &net.UDPAddr{
  41. Port: GF.Env.Esport,
  42. IP: net.ParseIP(GF.Env.Targetip),
  43. }
  44. log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
  45. if int64(GF.Env.Autoid) == 0 || int64(GF.Env.Seoid) == 0 {
  46. log.Error("配置文件错误", zap.String("配置文件", "autoid 或者 seoid 为 0"))
  47. os.Exit(-1)
  48. }
  49. UdpClient.Listen(processUdpMsg)
  50. log.Info("main", zap.String("Udp服务监听本地端口", localPort))
  51. ch := make(chan bool, 1)
  52. <-ch
  53. }
  54. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  55. switch act {
  56. case udp.OP_TYPE_DATA:
  57. var mapInfo map[string]interface{}
  58. err := json.Unmarshal(data, &mapInfo)
  59. log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
  60. if err != nil {
  61. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  62. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  63. } else if mapInfo != nil {
  64. key, _ := mapInfo["key"].(string)
  65. if key == "" {
  66. key = "udpok"
  67. }
  68. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  69. //拿到同步信号,开始同步数据
  70. if _, ok := mapInfo["start"]; ok {
  71. if _, okk := mapInfo["path"]; okk {
  72. path := util.ObjToString(mapInfo["path"])
  73. //没有指定配置文件的指定目录,就使用udp 传递目录
  74. if path != "" {
  75. readPath = path
  76. }
  77. }
  78. // 开始执行
  79. log.Info("processUdpMsg", zap.String("readPath", readPath))
  80. if readPath != "" {
  81. startTime = time.Now().Unix()
  82. go dealPath(readPath)
  83. }
  84. }
  85. }
  86. default:
  87. log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========"))
  88. }
  89. }
  90. func dealPath(path string) {
  91. if !strings.HasSuffix(path, "/") {
  92. path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
  93. }
  94. //var wg sync.WaitGroup
  95. //std 程序只需要关注6个表
  96. for _, c := range CollArr {
  97. subPath := path + c + "/"
  98. //判断文件夹存在
  99. _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
  100. if err != nil {
  101. log.Info("dealPath", zap.Any("os.Stat err", subPath))
  102. continue
  103. }
  104. //wg.Add(1)
  105. dealSubPath(c, subPath)
  106. //go dealSubPath(c, subPath, &wg)
  107. }
  108. //wg.Wait()
  109. //更新 nseo_id
  110. updateStd()
  111. //最终记录
  112. var saveRes = make(map[string]interface{})
  113. for _, c := range CollArr {
  114. data, ok := savelog.Load(c)
  115. if ok {
  116. saveRes[c] = data
  117. }
  118. }
  119. MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveRes})
  120. //执行完毕通知es 程序
  121. data := map[string]interface{}{
  122. "start": true,
  123. "start_time": startTime,
  124. "end_time": time.Now().Unix(),
  125. }
  126. log.Info("dealPath", zap.String(path, "数据同步结束"))
  127. SendUdpMsg(data, qyxyEsAddr)
  128. }
  129. // dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  130. // c 当前表;
  131. func dealSubPath(c, subPath string) {
  132. //defer wg.Done()
  133. log.Info("dealSubPath", zap.String("开始处理path:", subPath))
  134. start := time.Now()
  135. //var fileWg sync.WaitGroup
  136. var linesMap sync.Map
  137. subFiles, _ := ioutil.ReadDir(subPath)
  138. for _, s := range subFiles { //七天的文件夹名
  139. if s.IsDir() {
  140. //fileWg.Add(1)
  141. dealinfo(c, subPath+s.Name(), &linesMap)
  142. ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  143. //go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
  144. }
  145. }
  146. //fileWg.Wait()
  147. cCount, _ := linesMap.Load(c)
  148. duration := time.Since(start)
  149. result := map[string]interface{}{
  150. "count": cCount,
  151. "duration": fmt.Sprintf("%v min", duration.Minutes()), //运行时长
  152. }
  153. savelog.Store(c, result)
  154. }
  155. func dealinfo(c, path string, linesMap *sync.Map) {
  156. //defer wg.Done()
  157. count := 0
  158. file := path + "/split.json.gz"
  159. log.Info("dealinfo", zap.Any("current date", file))
  160. fileInfo, err := os.Stat(file)
  161. if fileInfo.Size() == 0 {
  162. return
  163. }
  164. if err == nil {
  165. // 打开本地gz格式压缩包
  166. fr, err := os.Open(file)
  167. if err != nil {
  168. log.Info("dealinfo", zap.Error(err))
  169. return
  170. } else {
  171. fmt.Println("open file success!", file)
  172. }
  173. // defer: 在函数退出时,执行关闭文件
  174. defer fr.Close()
  175. // 创建gzip文件读取对象
  176. gr, err := gzip.NewReader(fr)
  177. if err != nil {
  178. log.Info("reader "+file, zap.Error(err))
  179. return
  180. }
  181. // defer: 在函数退出时,执行关闭gzip对象
  182. defer gr.Close()
  183. wg := sync.WaitGroup{}
  184. ch := make(chan bool, 3)
  185. bfRd := bufio.NewReader(gr)
  186. for {
  187. line, err := bfRd.ReadBytes('\n')
  188. if err != nil {
  189. if err == io.EOF {
  190. log.Info("dealinfo", zap.String(fmt.Sprintf("%v/split.json.gz", path), "read gzip data finish!"))
  191. fmt.Println("read gzip data finish! ")
  192. break
  193. } else {
  194. log.Error("dealinfo", zap.Any(fmt.Sprintf("%v/split.json.gz;read gzip err", path), err))
  195. }
  196. }
  197. if len(line) > 0 {
  198. count++
  199. linesMap.Store(c, count)
  200. ch <- true
  201. wg.Add(1)
  202. go func(c string, line []byte) {
  203. defer func() {
  204. <-ch
  205. wg.Done()
  206. }()
  207. hookfn(c, line)
  208. }(c, line)
  209. }
  210. if count%5000 == 0 {
  211. printMemoryUsage()
  212. log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
  213. }
  214. }
  215. wg.Wait()
  216. }
  217. }
  218. // hookfn 处理拿到的每行数据
  219. func hookfn(c string, line []byte) {
  220. tmp := make(map[string]interface{})
  221. err := json.Unmarshal(line, &tmp)
  222. if err != nil {
  223. log.Error("hookfn", zap.Any("Unmarshal", err))
  224. }
  225. if _, ok := tmp["company_id"]; ok {
  226. //针对数据表 不同处理
  227. switch c {
  228. case "company_base":
  229. dealCompanyBase(tmp)
  230. case "company_employee":
  231. dealCompanyEmployee(tmp)
  232. case "company_history_name":
  233. dealHistoryName(tmp)
  234. case "company_partner":
  235. dealCompanyPartner(tmp)
  236. case "annual_report_base":
  237. dealAnnualReportBase(tmp)
  238. case "annual_report_website":
  239. dealAnnualReportWebsite(tmp)
  240. default:
  241. fmt.Println("CurrentColl =>", c)
  242. }
  243. }
  244. }
  245. // dealCompanyBase company_base数据表
  246. func dealCompanyBase(data map[string]interface{}) {
  247. update := make(map[string]interface{})
  248. save := make(map[string]interface{})
  249. for _, v := range company_base {
  250. if data[v] == nil {
  251. continue
  252. }
  253. // company_type 公司类型处理
  254. if v == "company_type" {
  255. save["company_type_old"] = data[v]
  256. if text := util.ObjToString(data["company_type"]); text != "" {
  257. if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
  258. save["company_type"] = "个体工商户"
  259. } else {
  260. text = strings.ReplaceAll(text, "(", "(")
  261. text = strings.ReplaceAll(text, ")", ")")
  262. if stype := QyStypeMap[text]; stype != "" {
  263. save["company_type"] = stype
  264. } else {
  265. save["company_type"] = "其他"
  266. }
  267. }
  268. }
  269. } else if v == "company_status" {
  270. save["company_status_old"] = data[v]
  271. if text := util.ObjToString(data["company_status"]); text != "" {
  272. text = strings.ReplaceAll(text, "(", "(")
  273. text = strings.ReplaceAll(text, ")", ")")
  274. if status := CompanyStatusMap[text]; status != "" {
  275. save["company_status"] = status
  276. } else {
  277. save["company_status"] = "其他"
  278. }
  279. }
  280. } else if v == "capital" {
  281. // capital/currency
  282. text := util.ObjToString(data[v])
  283. if currency := GetCurrency(text); currency != "" {
  284. save["currency"] = currency //币种
  285. }
  286. capital := ObjToMoney(text)
  287. capital = capital / 10000
  288. if capital != 0 {
  289. save[v] = capital
  290. }
  291. } else if v == "use_flag" {
  292. save[v] = util.IntAll(data[v])
  293. } else {
  294. save[v] = data[v]
  295. }
  296. }
  297. // mysql create_time/update_time
  298. save["create_time_msql"] = data["create_time"]
  299. save["update_time_msql"] = data["update_time"]
  300. save["_id"] = data["company_id"]
  301. save["autoid"] = util.Int64All(data["id"])
  302. save["createtime"] = time.Now().Unix()
  303. save["updatetime"] = time.Now().Unix()
  304. // company_area/company_city/company_district
  305. pshort := util.ObjToString(data["province_short"])
  306. save["company_area"] = province_map[pshort]
  307. //company_city,company_district
  308. for i, field := range AreaFiled {
  309. if data[field] == nil {
  310. continue
  311. }
  312. if code := fmt.Sprint(data[field]); code != "" {
  313. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  314. code = code[2:8]
  315. } else if i == 1 && len(code) >= 6 { //company_code注册号
  316. code = code[:6]
  317. }
  318. if city := AddressMap[code]; city != nil { //未作废中取
  319. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  320. if city.City != "" {
  321. save["company_city"] = city.City //市
  322. }
  323. if city.District != "" {
  324. save["company_district"] = city.District //县
  325. }
  326. break
  327. }
  328. } else { //作废中取
  329. if city := AddressOldMap[code]; city != nil {
  330. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  331. if city.City != "" {
  332. save["company_city"] = city.City //市
  333. }
  334. if city.District != "" {
  335. save["company_district"] = city.District //县
  336. }
  337. }
  338. break
  339. }
  340. }
  341. }
  342. }
  343. // search_type
  344. if t := util.ObjToString(save["company_type"]); t != "" {
  345. if t != "个体工商户" && t != "其他" {
  346. t1 := util.ObjToString(save["company_type_old"])
  347. name := util.ObjToString(save["company_name"])
  348. if strings.Contains(t1, "有限合伙") {
  349. save["search_type"] = "有限合伙"
  350. } else if strings.Contains(t1, "合伙") {
  351. save["search_type"] = "普通合伙"
  352. } else if strings.Contains(name, "股份") ||
  353. (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
  354. save["search_type"] = "股份有限公司"
  355. } else {
  356. save["search_type"] = "有限责任公司"
  357. }
  358. }
  359. }
  360. // company_shortname
  361. if m := getStName(util.ObjToString(save["company_name"])); m != "" {
  362. save["company_shortname"] = m
  363. }
  364. // bid_unittype
  365. flag := false
  366. for _, v := range WordsArr {
  367. if strings.Contains(util.ObjToString(save["business_scope"]), v) {
  368. flag = true
  369. break
  370. }
  371. }
  372. if flag {
  373. if save["bid_unittype"] != nil {
  374. save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
  375. } else {
  376. save["bid_unittype"] = []string{"厂商"}
  377. }
  378. }
  379. update["$set"] = save
  380. updataInfo := []map[string]interface{}{
  381. {"_id": save["_id"]},
  382. update,
  383. }
  384. MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
  385. //saveArr = append(saveArr, updataInfo)
  386. //
  387. ////500 条处理一次,打印一次记录
  388. //if len(saveArr) >= 500 {
  389. // tmps := saveArr
  390. // res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  391. // if !res {
  392. // log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
  393. // }
  394. // saveArr = [][]map[string]interface{}{}
  395. //}
  396. }
  397. // dealCompanyEmployee company_employee
  398. func dealCompanyEmployee(data map[string]interface{}) {
  399. save := make(map[string]interface{})
  400. save["_id"] = data["company_id"]
  401. save["updatetime"] = time.Now().Unix()
  402. oldTmp := &map[string]interface{}{}
  403. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  404. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  405. var names []string
  406. var arr []map[string]interface{}
  407. if (*oldTmp)["employees"] != nil {
  408. arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{}))
  409. } else {
  410. arr = make([]map[string]interface{}, 0)
  411. }
  412. ep := make(map[string]interface{})
  413. if util.ObjToString(data["_operation_type"]) == "insert" {
  414. ep["employee_name"] = data["employee_name"]
  415. ep["position"] = data["position"]
  416. ep["is_history"] = data["is_history"]
  417. ep["_id"] = util.IntAll(data["id"])
  418. arr = append(arr, ep)
  419. names = append(names, util.ObjToString(data["employee_name"]))
  420. } else {
  421. eq_flag := true
  422. for _, m := range arr {
  423. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  424. eq_flag = false
  425. m["employee_name"] = data["employee_name"]
  426. m["position"] = data["position"]
  427. m["is_history"] = data["is_history"]
  428. break
  429. }
  430. }
  431. if eq_flag {
  432. ep := make(map[string]interface{})
  433. ep["employee_name"] = data["employee_name"]
  434. ep["position"] = data["position"]
  435. ep["is_history"] = data["is_history"]
  436. ep["_id"] = util.IntAll(data["id"])
  437. arr = append(arr, ep)
  438. names = append(names, util.ObjToString(data["employee_name"]))
  439. }
  440. }
  441. save["employees"] = arr
  442. save["employee_name"] = strings.Join(names, ",")
  443. saveInfo := []map[string]interface{}{
  444. {"_id": data["company_id"]},
  445. {"$set": save},
  446. }
  447. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  448. //saveArr = append(saveArr, saveInfo)
  449. ////500 条处理一次,打印一次记录
  450. //if len(saveArr) >= 500 {
  451. // tmps := saveArr
  452. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  453. // saveArr = [][]map[string]interface{}{}
  454. //}
  455. }
  456. // dealCompanyPartner
  457. func dealCompanyPartner(data map[string]interface{}) {
  458. save := make(map[string]interface{})
  459. save["_id"] = data["company_id"]
  460. save["updatetime"] = time.Now().Unix()
  461. oldTmp := &map[string]interface{}{}
  462. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  463. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  464. var names []string
  465. var arr []map[string]interface{}
  466. if (*oldTmp)["partners"] != nil {
  467. arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{}))
  468. } else {
  469. arr = make([]map[string]interface{}, 0)
  470. }
  471. if util.ObjToString(data["_operation_type"]) == "insert" {
  472. exp := make(map[string]interface{})
  473. exp["stock_capital"] = data["stock_capital"]
  474. exp["stock_name"] = data["stock_name"]
  475. exp["identify_no"] = data["identify_no"]
  476. exp["stock_realcapital"] = data["stock_realcapital"]
  477. exp["is_history"] = data["is_history"]
  478. exp["is_personal"] = data["is_personal"]
  479. exp["stock_type"] = data["stock_type"]
  480. exp["identify_type"] = data["identify_type"]
  481. exp["_id"] = util.IntAll(data["id"])
  482. arr = append(arr, exp)
  483. names = append(names, util.ObjToString(data["stock_name"]))
  484. } else {
  485. eqFlag := true
  486. for _, m := range arr {
  487. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  488. eqFlag = false
  489. m["stock_capital"] = data["stock_capital"]
  490. m["stock_name"] = data["stock_name"]
  491. m["identify_no"] = data["identify_no"]
  492. m["stock_realcapital"] = data["stock_realcapital"]
  493. m["is_history"] = data["is_history"]
  494. m["is_personal"] = data["is_personal"]
  495. m["stock_type"] = data["stock_type"]
  496. m["identify_type"] = data["identify_type"]
  497. break
  498. }
  499. }
  500. if eqFlag {
  501. exp := make(map[string]interface{})
  502. exp["stock_capital"] = data["stock_capital"]
  503. exp["stock_name"] = data["stock_name"]
  504. exp["identify_no"] = data["identify_no"]
  505. exp["stock_realcapital"] = data["stock_realcapital"]
  506. exp["is_history"] = data["is_history"]
  507. exp["is_personal"] = data["is_personal"]
  508. exp["stock_type"] = data["stock_type"]
  509. exp["identify_type"] = data["identify_type"]
  510. exp["_id"] = util.IntAll(data["id"])
  511. arr = append(arr, exp)
  512. names = append(names, util.ObjToString(data["stock_name"]))
  513. }
  514. }
  515. save["partners"] = arr
  516. save["stock_name"] = strings.Join(names, ",")
  517. saveInfo := []map[string]interface{}{
  518. {"_id": data["company_id"]},
  519. {"$set": save},
  520. }
  521. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  522. //saveArr = append(saveArr, saveInfo)
  523. ////500 条处理一次,打印一次记录
  524. //if len(saveArr) >= 500 {
  525. // tmps := saveArr
  526. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  527. // saveArr = [][]map[string]interface{}{}
  528. //}
  529. }
  530. // dealAnnualReportBase annual_report_base
  531. func dealAnnualReportBase(data map[string]interface{}) {
  532. save := make(map[string]interface{})
  533. save["_id"] = data["company_id"]
  534. save["updatetime"] = time.Now().Unix()
  535. oldTmp := &map[string]interface{}{}
  536. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  537. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  538. var arr []map[string]interface{}
  539. if (*oldTmp)["annual_reports"] != nil {
  540. arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{}))
  541. } else {
  542. arr = make([]map[string]interface{}, 0)
  543. }
  544. year := 0
  545. phone, email := "", ""
  546. employeeNum := 0
  547. if util.ObjToString(data["_operation_type"]) == "insert" {
  548. exp := make(map[string]interface{})
  549. exp["operator_name"] = data["operator_name"]
  550. exp["report_year"] = data["report_year"]
  551. exp["zip_code"] = data["zip_code"]
  552. exp["employee_no"] = data["employee_no"]
  553. exp["member_no"] = data["member_no"]
  554. exp["company_phone"] = data["company_phone"]
  555. exp["company_email"] = data["company_email"]
  556. exp["_id"] = util.IntAll(data["id"])
  557. arr = append(arr, exp)
  558. if year < util.IntAll(data["report_year"]) {
  559. year = util.IntAll(data["report_year"])
  560. phone = util.ObjToString(data["company_phone"])
  561. email = util.ObjToString(data["company_email"])
  562. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  563. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  564. if employeeNo > 0 {
  565. employeeNum = employeeNo
  566. } else if memberNo > 0 {
  567. employeeNum = memberNo
  568. }
  569. }
  570. } else {
  571. eqFlag := true
  572. for _, m := range arr {
  573. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  574. eqFlag = false
  575. m["operator_name"] = data["operator_name"]
  576. m["report_year"] = data["report_year"]
  577. m["zip_code"] = data["zip_code"]
  578. m["employee_no"] = data["employee_no"]
  579. m["member_no"] = data["member_no"]
  580. m["company_phone"] = data["company_phone"]
  581. m["company_email"] = data["company_email"]
  582. break
  583. }
  584. }
  585. if eqFlag {
  586. exp := make(map[string]interface{})
  587. exp["operator_name"] = data["operator_name"]
  588. exp["report_year"] = data["report_year"]
  589. exp["zip_code"] = data["zip_code"]
  590. exp["employee_no"] = data["employee_no"]
  591. exp["member_no"] = data["member_no"]
  592. exp["company_phone"] = data["company_phone"]
  593. exp["company_email"] = data["company_email"]
  594. exp["_id"] = util.IntAll(data["id"])
  595. arr = append(arr, exp)
  596. if year < util.IntAll(data["report_year"]) {
  597. year = util.IntAll(data["report_year"])
  598. phone = util.ObjToString(data["company_phone"])
  599. email = util.ObjToString(data["company_email"])
  600. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  601. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  602. if employeeNo > 0 {
  603. employeeNum = employeeNo
  604. } else if memberNo > 0 {
  605. employeeNum = memberNo
  606. }
  607. }
  608. }
  609. }
  610. save["annual_reports"] = arr
  611. if year != 0 {
  612. save["company_phone"] = phone
  613. save["company_email"] = email
  614. save["employee_num"] = employeeNum
  615. }
  616. saveInfo := []map[string]interface{}{
  617. {"_id": data["company_id"]},
  618. {"$set": save},
  619. }
  620. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  621. //
  622. //saveArr = append(saveArr, saveInfo)
  623. ////500 条处理一次,打印一次记录
  624. //if len(saveArr) >= 500 {
  625. // tmps := saveArr
  626. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  627. // saveArr = [][]map[string]interface{}{}
  628. //}
  629. }
  630. // dealHistoryName company_history_name
  631. func dealHistoryName(data map[string]interface{}) {
  632. save := make(map[string]interface{})
  633. save["_id"] = data["company_id"]
  634. save["updatetime"] = time.Now().Unix()
  635. var names []string
  636. if data["history_name"] != nil {
  637. name := data["history_name"].(string)
  638. names = append(names, name)
  639. save["history_name"] = strings.Join(names, ",")
  640. }
  641. saveInfo := []map[string]interface{}{
  642. {"_id": data["company_id"]},
  643. {"$set": save},
  644. }
  645. //saveArr = append(saveArr, saveInfo)
  646. addSet := []map[string]interface{}{
  647. {"_id": data["company_id"]},
  648. {"$addToSet": map[string]interface{}{
  649. "history_names": map[string]interface{}{
  650. "$each": names}}},
  651. }
  652. //单独对每条的历史名称追加数组
  653. MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
  654. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  655. ////500 条处理一次,打印一次记录
  656. //if len(saveArr) >= 500 {
  657. // tmps := saveArr
  658. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  659. // saveArr = [][]map[string]interface{}{}
  660. //}
  661. }
  662. // dealAnnualReportWebsite annual_report_website
  663. func dealAnnualReportWebsite(data map[string]interface{}) {
  664. save := make(map[string]interface{})
  665. save["_id"] = data["company_id"]
  666. save["updatetime"] = time.Now().Unix()
  667. year := 0
  668. web := ""
  669. if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 {
  670. year = util.IntAll(data["report_year"])
  671. web = util.ObjToString(data["website_url"])
  672. }
  673. if year != 0 {
  674. save["website_url"] = web
  675. }
  676. saveInfo := []map[string]interface{}{
  677. {"_id": data["company_id"]},
  678. {"$set": save},
  679. }
  680. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  681. //saveArr = append(saveArr, saveInfo)
  682. //500 条处理一次,打印一次记录
  683. //if len(saveArr) >= 500 {
  684. // tmps := saveArr
  685. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  686. // saveArr = [][]map[string]interface{}{}
  687. //}
  688. }
  689. func printMemoryUsage() {
  690. var memStats runtime.MemStats
  691. runtime.ReadMemStats(&memStats)
  692. // 将字节转换为兆字节(MB)
  693. allocatedMB := float64(memStats.Alloc) / 1024 / 1024
  694. totalAllocatedMB := float64(memStats.TotalAlloc) / 1024 / 1024
  695. heapAllocMB := float64(memStats.HeapAlloc) / 1024 / 1024
  696. log.Info("printMemoryUsage", zap.Any("当前程序已分配的内存大小", allocatedMB))
  697. log.Info("printMemoryUsage", zap.Any("程序自启动以来总共分配的内存大小", totalAllocatedMB))
  698. log.Info("printMemoryUsage", zap.Any("堆上当前已分配但尚未释放的内存", heapAllocMB))
  699. log.Info("printMemoryUsage", zap.Any("堆上分配的对象数", memStats.HeapObjects))
  700. }