main.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971
  1. package main
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "encoding/json"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "io"
  9. "io/ioutil"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  13. "net"
  14. "os"
  15. "runtime"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. //CurrentColl string //当前表名
  22. //SkipCollName string
  23. //sendMsg string
  24. //collCount int // 当前表数据量
  25. //insertCount int // insert数据
  26. //updateCount int // update数据
  27. //saveLog = make(map[string]interface{})
  28. savelog sync.Map
  29. //saveArr [][]map[string]interface{}
  30. UdpClient udp.UdpClient
  31. qyxyEsAddr *net.UDPAddr
  32. localPort string // 本地监听端口
  33. startTime int64 //std 程序开始执行的时间
  34. readPath string //
  35. )
  36. func main() {
  37. Init() //初始化配置
  38. localPort = GF.Env.Localport //udp 本地监听地址
  39. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  40. readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据
  41. qyxyEsAddr = &net.UDPAddr{
  42. Port: GF.Env.Esport,
  43. IP: net.ParseIP(GF.Env.Targetip),
  44. }
  45. log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
  46. UdpClient.Listen(processUdpMsg)
  47. log.Info("main", zap.String("Udp服务监听本地端口", localPort))
  48. ch := make(chan bool, 1)
  49. <-ch
  50. }
  51. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  52. switch act {
  53. case udp.OP_TYPE_DATA:
  54. var mapInfo map[string]interface{}
  55. err := json.Unmarshal(data, &mapInfo)
  56. log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
  57. if err != nil {
  58. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  59. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  60. } else if mapInfo != nil {
  61. key, _ := mapInfo["key"].(string)
  62. if key == "" {
  63. key = "udpok"
  64. }
  65. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  66. //拿到同步信号,开始同步数据
  67. if _, ok := mapInfo["start"]; ok {
  68. if _, okk := mapInfo["path"]; okk {
  69. path := util.ObjToString(mapInfo["path"])
  70. //没有指定配置文件的指定目录,就使用udp 传递目录
  71. if path != "" {
  72. readPath = path
  73. }
  74. }
  75. // 开始执行
  76. log.Info("processUdpMsg", zap.String("readPath", readPath))
  77. if readPath != "" {
  78. startTime = time.Now().Unix()
  79. go dealPath(readPath)
  80. }
  81. }
  82. }
  83. default:
  84. log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========"))
  85. }
  86. }
  87. func dealPath(path string) {
  88. if !strings.HasSuffix(path, "/") {
  89. path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
  90. }
  91. //var wg sync.WaitGroup
  92. //std 程序只需要关注6个表
  93. for _, c := range CollArr {
  94. subPath := path + c + "/"
  95. //判断文件夹存在
  96. _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
  97. if err != nil {
  98. log.Info("dealPath", zap.Any("os.Stat err", subPath))
  99. continue
  100. }
  101. //wg.Add(1)
  102. dealSubPath(c, subPath)
  103. //go dealSubPath(c, subPath, &wg)
  104. }
  105. //wg.Wait()
  106. //更新 nseo_id
  107. updateStd()
  108. //最终记录
  109. var saveRes = make(map[string]interface{})
  110. for _, c := range CollArr {
  111. data, ok := savelog.Load(c)
  112. if ok {
  113. saveRes[c] = data
  114. }
  115. }
  116. MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveRes})
  117. //执行完毕通知es 程序
  118. data := map[string]interface{}{
  119. "start": true,
  120. "start_time": startTime,
  121. "end_time": time.Now().Unix(),
  122. }
  123. log.Info("dealPath", zap.String(path, "数据同步结束"))
  124. SendUdpMsg(data, qyxyEsAddr)
  125. }
  126. // dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  127. // c 当前表;
  128. func dealSubPath(c, subPath string) {
  129. //defer wg.Done()
  130. log.Info("dealSubPath", zap.String("开始处理path:", subPath))
  131. start := time.Now()
  132. //var fileWg sync.WaitGroup
  133. var linesMap sync.Map
  134. subFiles, _ := ioutil.ReadDir(subPath)
  135. for _, s := range subFiles { //七天的文件夹名
  136. if s.IsDir() {
  137. //fileWg.Add(1)
  138. dealinfo(c, subPath+s.Name(), &linesMap)
  139. ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  140. //go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
  141. }
  142. }
  143. //fileWg.Wait()
  144. cCount, _ := linesMap.Load(c)
  145. duration := time.Since(start)
  146. result := map[string]interface{}{
  147. "count": cCount,
  148. "duration": fmt.Sprintf("%v min", duration.Minutes()), //运行时长
  149. }
  150. savelog.Store(c, result)
  151. }
  152. func dealinfo(c, path string, linesMap *sync.Map) {
  153. //defer wg.Done()
  154. count := 0
  155. file := path + "/split.json.gz"
  156. log.Info("dealinfo", zap.Any("current date", file))
  157. fileInfo, err := os.Stat(file)
  158. if fileInfo.Size() == 0 {
  159. return
  160. }
  161. if err == nil {
  162. // 打开本地gz格式压缩包
  163. fr, err := os.Open(file)
  164. if err != nil {
  165. log.Info("dealinfo", zap.Error(err))
  166. return
  167. } else {
  168. fmt.Println("open file success!", file)
  169. }
  170. // defer: 在函数退出时,执行关闭文件
  171. defer fr.Close()
  172. // 创建gzip文件读取对象
  173. gr, err := gzip.NewReader(fr)
  174. if err != nil {
  175. log.Info("reader "+file, zap.Error(err))
  176. return
  177. }
  178. // defer: 在函数退出时,执行关闭gzip对象
  179. defer gr.Close()
  180. wg := sync.WaitGroup{}
  181. ch := make(chan bool, 3)
  182. bfRd := bufio.NewReader(gr)
  183. for {
  184. line, err := bfRd.ReadBytes('\n')
  185. if err != nil {
  186. if err == io.EOF {
  187. log.Info("dealinfo", zap.String(fmt.Sprintf("%v/split.json.gz", path), "read gzip data finish!"))
  188. fmt.Println("read gzip data finish! ")
  189. break
  190. } else {
  191. log.Error("dealinfo", zap.Any(fmt.Sprintf("%v/split.json.gz;read gzip err", path), err))
  192. }
  193. }
  194. if len(line) > 0 {
  195. count++
  196. linesMap.Store(c, count)
  197. ch <- true
  198. wg.Add(1)
  199. go func(c string, line []byte) {
  200. defer func() {
  201. <-ch
  202. wg.Done()
  203. }()
  204. hookfn(c, line)
  205. }(c, line)
  206. }
  207. if count%5000 == 0 {
  208. printMemoryUsage()
  209. log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
  210. }
  211. }
  212. wg.Wait()
  213. }
  214. }
  215. // hookfn 处理拿到的每行数据
  216. func hookfn(c string, line []byte) {
  217. tmp := make(map[string]interface{})
  218. err := json.Unmarshal(line, &tmp)
  219. if err != nil {
  220. log.Error("hookfn", zap.Any("Unmarshal", err))
  221. }
  222. if _, ok := tmp["company_id"]; ok {
  223. //针对数据表 不同处理
  224. switch c {
  225. case "company_base":
  226. dealCompanyBase(tmp)
  227. case "company_employee":
  228. dealCompanyEmployee(tmp)
  229. case "company_history_name":
  230. dealHistoryName(tmp)
  231. case "company_partner":
  232. dealCompanyPartner(tmp)
  233. case "annual_report_base":
  234. dealAnnualReportBase(tmp)
  235. case "annual_report_website":
  236. dealAnnualReportWebsite(tmp)
  237. //处理特使企业信息
  238. case "special_enterprise", "special_foundation", "special_gov_unit", "special_hongkong_company", "special_law_office", "special_social_organ", "special_trade_union":
  239. dealSpecial(c, tmp)
  240. default:
  241. fmt.Println("CurrentColl =>", c)
  242. }
  243. }
  244. }
  245. // dealCompanyBase company_base数据表
  246. func dealCompanyBase(data map[string]interface{}) {
  247. update := make(map[string]interface{})
  248. save := make(map[string]interface{})
  249. for _, v := range company_base {
  250. if data[v] == nil {
  251. continue
  252. }
  253. // company_type 公司类型处理
  254. if v == "company_type" {
  255. save["company_type_old"] = data[v]
  256. if text := util.ObjToString(data["company_type"]); text != "" {
  257. if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
  258. save["company_type"] = "个体工商户"
  259. } else {
  260. text = strings.ReplaceAll(text, "(", "(")
  261. text = strings.ReplaceAll(text, ")", ")")
  262. if stype := QyStypeMap[text]; stype != "" {
  263. save["company_type"] = stype
  264. } else {
  265. save["company_type"] = "其他"
  266. }
  267. }
  268. }
  269. } else if v == "company_status" {
  270. save["company_status_old"] = data[v]
  271. if text := util.ObjToString(data["company_status"]); text != "" {
  272. text = strings.ReplaceAll(text, "(", "(")
  273. text = strings.ReplaceAll(text, ")", ")")
  274. if status := CompanyStatusMap[text]; status != "" {
  275. save["company_status"] = status
  276. } else {
  277. save["company_status"] = "其他"
  278. }
  279. }
  280. } else if v == "capital" {
  281. // capital/currency
  282. text := util.ObjToString(data[v])
  283. if currency := GetCurrency(text); currency != "" {
  284. save["currency"] = currency //币种
  285. }
  286. capital := ObjToMoney(text)
  287. capital = capital / 10000
  288. //if capital != 0 {
  289. save[v] = capital
  290. //}
  291. } else if v == "use_flag" {
  292. save[v] = util.IntAll(data[v])
  293. } else {
  294. save[v] = data[v]
  295. }
  296. }
  297. // mysql create_time/update_time
  298. save["create_time_msql"] = data["create_time"]
  299. save["update_time_msql"] = data["update_time"]
  300. save["_id"] = data["company_id"]
  301. save["autoid"] = util.Int64All(data["id"])
  302. save["createtime"] = time.Now().Unix()
  303. save["updatetime"] = time.Now().Unix()
  304. // company_area/company_city/company_district
  305. pshort := util.ObjToString(data["province_short"])
  306. save["company_area"] = province_map[pshort]
  307. //company_city,company_district
  308. for i, field := range AreaFiled {
  309. if data[field] == nil {
  310. continue
  311. }
  312. if code := fmt.Sprint(data[field]); code != "" {
  313. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  314. code = code[2:8]
  315. } else if i == 1 && len(code) >= 6 { //company_code注册号
  316. code = code[:6]
  317. }
  318. if city := AddressMap[code]; city != nil { //未作废中取
  319. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  320. if city.City != "" {
  321. save["company_city"] = city.City //市
  322. }
  323. if city.District != "" {
  324. save["company_district"] = city.District //县
  325. }
  326. break
  327. }
  328. } else { //作废中取
  329. if city := AddressOldMap[code]; city != nil {
  330. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  331. if city.City != "" {
  332. save["company_city"] = city.City //市
  333. }
  334. if city.District != "" {
  335. save["company_district"] = city.District //县
  336. }
  337. }
  338. break
  339. }
  340. }
  341. }
  342. }
  343. // search_type
  344. if t := util.ObjToString(save["company_type"]); t != "" {
  345. if t != "个体工商户" && t != "其他" {
  346. t1 := util.ObjToString(save["company_type_old"])
  347. name := util.ObjToString(save["company_name"])
  348. if strings.Contains(t1, "有限合伙") {
  349. save["search_type"] = "有限合伙"
  350. } else if strings.Contains(t1, "合伙") {
  351. save["search_type"] = "普通合伙"
  352. } else if strings.Contains(name, "股份") ||
  353. (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
  354. save["search_type"] = "股份有限公司"
  355. } else {
  356. save["search_type"] = "有限责任公司"
  357. }
  358. }
  359. }
  360. // company_shortname
  361. if m := getStName(util.ObjToString(save["company_name"])); m != "" {
  362. save["company_shortname"] = m
  363. }
  364. // bid_unittype
  365. flag := false
  366. for _, v := range WordsArr {
  367. if strings.Contains(util.ObjToString(save["business_scope"]), v) {
  368. flag = true
  369. break
  370. }
  371. }
  372. if flag {
  373. if save["bid_unittype"] != nil {
  374. save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
  375. } else {
  376. save["bid_unittype"] = []string{"厂商"}
  377. }
  378. }
  379. update["$set"] = save
  380. updataInfo := []map[string]interface{}{
  381. {"_id": save["_id"]},
  382. update,
  383. }
  384. MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
  385. //saveArr = append(saveArr, updataInfo)
  386. //
  387. ////500 条处理一次,打印一次记录
  388. //if len(saveArr) >= 500 {
  389. // tmps := saveArr
  390. // res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  391. // if !res {
  392. // log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
  393. // }
  394. // saveArr = [][]map[string]interface{}{}
  395. //}
  396. }
  397. // dealSpecial 处理特殊企业数据
  398. func dealSpecial(coll string, data map[string]interface{}) {
  399. update := make(map[string]interface{})
  400. save := make(map[string]interface{})
  401. companyID := util.ObjToString(data["company_id"])
  402. if companyID == "" {
  403. return
  404. }
  405. for _, v := range company_base {
  406. if data[v] == nil {
  407. continue
  408. }
  409. // company_type 公司类型处理
  410. if v == "company_type" {
  411. save["company_type_old"] = data[v]
  412. if text := util.ObjToString(data["company_type"]); text != "" {
  413. if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
  414. save["company_type"] = "个体工商户"
  415. } else {
  416. text = strings.ReplaceAll(text, "(", "(")
  417. text = strings.ReplaceAll(text, ")", ")")
  418. if stype := QyStypeMap[text]; stype != "" {
  419. save["company_type"] = stype
  420. } else {
  421. save["company_type"] = "其他"
  422. }
  423. }
  424. }
  425. } else if v == "company_status" {
  426. save["company_status_old"] = data[v]
  427. if text := util.ObjToString(data["company_status"]); text != "" {
  428. text = strings.ReplaceAll(text, "(", "(")
  429. text = strings.ReplaceAll(text, ")", ")")
  430. if status := CompanyStatusMap[text]; status != "" {
  431. save["company_status"] = status
  432. } else {
  433. save["company_status"] = "其他"
  434. }
  435. }
  436. } else if v == "capital" {
  437. // capital/currency
  438. text := util.ObjToString(data[v])
  439. if currency := GetCurrency(text); currency != "" {
  440. save["currency"] = currency //币种
  441. }
  442. capital := ObjToMoney(text)
  443. capital = capital / 10000
  444. if capital != 0 {
  445. save[v] = capital
  446. }
  447. } else if v == "use_flag" {
  448. save[v] = util.IntAll(data[v])
  449. } else {
  450. save[v] = data[v]
  451. }
  452. }
  453. // mysql create_time/update_time
  454. save["create_time_msql"] = data["create_time"]
  455. save["update_time_msql"] = data["update_time"]
  456. save["_id"] = data["company_id"]
  457. save["createtime"] = time.Now().Unix()
  458. save["updatetime"] = time.Now().Unix()
  459. // company_area/company_city/company_district
  460. pshort := util.ObjToString(data["province_short"])
  461. save["company_area"] = province_map[pshort]
  462. //company_city,company_district
  463. for i, field := range AreaFiled {
  464. if data[field] == nil {
  465. continue
  466. }
  467. if code := fmt.Sprint(data[field]); code != "" {
  468. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  469. code = code[2:8]
  470. } else if i == 1 && len(code) >= 6 { //company_code注册号
  471. code = code[:6]
  472. }
  473. if city := AddressMap[code]; city != nil { //未作废中取
  474. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  475. if city.City != "" {
  476. save["company_city"] = city.City //市
  477. }
  478. if city.District != "" {
  479. save["company_district"] = city.District //县
  480. }
  481. break
  482. }
  483. } else { //作废中取
  484. if city := AddressOldMap[code]; city != nil {
  485. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  486. if city.City != "" {
  487. save["company_city"] = city.City //市
  488. }
  489. if city.District != "" {
  490. save["company_district"] = city.District //县
  491. }
  492. }
  493. break
  494. }
  495. }
  496. }
  497. }
  498. // search_type
  499. if t := util.ObjToString(save["company_type"]); t != "" {
  500. if t != "个体工商户" && t != "其他" {
  501. t1 := util.ObjToString(save["company_type_old"])
  502. name := util.ObjToString(save["company_name"])
  503. if strings.Contains(t1, "有限合伙") {
  504. save["search_type"] = "有限合伙"
  505. } else if strings.Contains(t1, "合伙") {
  506. save["search_type"] = "普通合伙"
  507. } else if strings.Contains(name, "股份") ||
  508. (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
  509. save["search_type"] = "股份有限公司"
  510. } else {
  511. save["search_type"] = "有限责任公司"
  512. }
  513. }
  514. }
  515. // company_shortname
  516. if m := getStName(util.ObjToString(save["company_name"])); m != "" {
  517. save["company_shortname"] = m
  518. }
  519. // bid_unittype
  520. flag := false
  521. for _, v := range WordsArr {
  522. if strings.Contains(util.ObjToString(save["business_scope"]), v) {
  523. flag = true
  524. break
  525. }
  526. }
  527. if flag {
  528. if save["bid_unittype"] != nil {
  529. save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
  530. } else {
  531. save["bid_unittype"] = []string{"厂商"}
  532. }
  533. }
  534. oldInfo, _ := MongoTool.FindOne(GF.Env.Dbsave, map[string]interface{}{"_id": companyID})
  535. if len(*oldInfo) > 0 && util.Int64All((*oldInfo)["autoid"]) > 0 {
  536. save["autoid"] = (*oldInfo)["autoid"]
  537. } else {
  538. id := util.Int64All(data["id"])
  539. /**
  540. special_enterprise,起始ID 从2000000000 开始+data["id"]
  541. special_foundation,起始ID 从3000000000 开始+data["id"]
  542. special_gov_unit,起始ID 从4000000000 开始+data["id"]
  543. special_hongkong_company,起始ID 从5000000000 开始+data["id"]
  544. special_law_office,起始ID 从6000000000 开始+data["id"]
  545. special_social_organ,起始ID 从7000000000 开始+data["id"]
  546. special_trade_union,起始ID 从8000000000 开始+data["id"]
  547. */
  548. startID := 0
  549. if coll == "special_enterprise" {
  550. startID = 2000000000
  551. } else if coll == "special_foundation" {
  552. startID = 3000000000
  553. } else if coll == "special_gov_unit" {
  554. startID = 4000000000
  555. } else if coll == "special_hongkong_company" {
  556. startID = 5000000000
  557. } else if coll == "special_law_office" {
  558. startID = 6000000000
  559. } else if coll == "special_social_organ" {
  560. startID = 7000000000
  561. } else if coll == "special_trade_union" {
  562. startID = 8000000000
  563. }
  564. autoid := startID + int(id)
  565. save["autoid"] = autoid
  566. }
  567. update["$set"] = save
  568. updataInfo := []map[string]interface{}{
  569. {"_id": save["_id"]},
  570. update,
  571. }
  572. MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
  573. }
  574. // dealCompanyEmployee company_employee
  575. func dealCompanyEmployee(data map[string]interface{}) {
  576. save := make(map[string]interface{})
  577. save["_id"] = data["company_id"]
  578. save["updatetime"] = time.Now().Unix()
  579. oldTmp := &map[string]interface{}{}
  580. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  581. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  582. var names []string
  583. var arr []map[string]interface{}
  584. if (*oldTmp)["employees"] != nil {
  585. arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{}))
  586. } else {
  587. arr = make([]map[string]interface{}, 0)
  588. }
  589. ep := make(map[string]interface{})
  590. if util.ObjToString(data["_operation_type"]) == "insert" {
  591. ep["employee_name"] = data["employee_name"]
  592. ep["position"] = data["position"]
  593. ep["is_history"] = data["is_history"]
  594. ep["_id"] = util.IntAll(data["id"])
  595. arr = append(arr, ep)
  596. names = append(names, util.ObjToString(data["employee_name"]))
  597. } else {
  598. eq_flag := true
  599. for _, m := range arr {
  600. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  601. eq_flag = false
  602. m["employee_name"] = data["employee_name"]
  603. m["position"] = data["position"]
  604. m["is_history"] = data["is_history"]
  605. break
  606. }
  607. }
  608. if eq_flag {
  609. ep := make(map[string]interface{})
  610. ep["employee_name"] = data["employee_name"]
  611. ep["position"] = data["position"]
  612. ep["is_history"] = data["is_history"]
  613. ep["_id"] = util.IntAll(data["id"])
  614. arr = append(arr, ep)
  615. names = append(names, util.ObjToString(data["employee_name"]))
  616. }
  617. }
  618. save["employees"] = arr
  619. save["employee_name"] = strings.Join(names, ",")
  620. saveInfo := []map[string]interface{}{
  621. {"_id": data["company_id"]},
  622. {"$set": save},
  623. }
  624. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  625. //saveArr = append(saveArr, saveInfo)
  626. ////500 条处理一次,打印一次记录
  627. //if len(saveArr) >= 500 {
  628. // tmps := saveArr
  629. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  630. // saveArr = [][]map[string]interface{}{}
  631. //}
  632. }
  633. // dealCompanyPartner
  634. func dealCompanyPartner(data map[string]interface{}) {
  635. save := make(map[string]interface{})
  636. save["_id"] = data["company_id"]
  637. save["updatetime"] = time.Now().Unix()
  638. oldTmp := &map[string]interface{}{}
  639. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  640. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  641. var names []string
  642. var arr []map[string]interface{}
  643. if (*oldTmp)["partners"] != nil {
  644. arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{}))
  645. } else {
  646. arr = make([]map[string]interface{}, 0)
  647. }
  648. if util.ObjToString(data["_operation_type"]) == "insert" {
  649. exp := make(map[string]interface{})
  650. exp["stock_capital"] = data["stock_capital"]
  651. exp["stock_name"] = data["stock_name"]
  652. exp["identify_no"] = data["identify_no"]
  653. exp["stock_realcapital"] = data["stock_realcapital"]
  654. exp["is_history"] = data["is_history"]
  655. exp["is_personal"] = data["is_personal"]
  656. exp["stock_type"] = data["stock_type"]
  657. exp["identify_type"] = data["identify_type"]
  658. exp["_id"] = util.IntAll(data["id"])
  659. arr = append(arr, exp)
  660. names = append(names, util.ObjToString(data["stock_name"]))
  661. } else {
  662. eqFlag := true
  663. for _, m := range arr {
  664. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  665. eqFlag = false
  666. m["stock_capital"] = data["stock_capital"]
  667. m["stock_name"] = data["stock_name"]
  668. m["identify_no"] = data["identify_no"]
  669. m["stock_realcapital"] = data["stock_realcapital"]
  670. m["is_history"] = data["is_history"]
  671. m["is_personal"] = data["is_personal"]
  672. m["stock_type"] = data["stock_type"]
  673. m["identify_type"] = data["identify_type"]
  674. break
  675. }
  676. }
  677. if eqFlag {
  678. exp := make(map[string]interface{})
  679. exp["stock_capital"] = data["stock_capital"]
  680. exp["stock_name"] = data["stock_name"]
  681. exp["identify_no"] = data["identify_no"]
  682. exp["stock_realcapital"] = data["stock_realcapital"]
  683. exp["is_history"] = data["is_history"]
  684. exp["is_personal"] = data["is_personal"]
  685. exp["stock_type"] = data["stock_type"]
  686. exp["identify_type"] = data["identify_type"]
  687. exp["_id"] = util.IntAll(data["id"])
  688. arr = append(arr, exp)
  689. names = append(names, util.ObjToString(data["stock_name"]))
  690. }
  691. }
  692. save["partners"] = arr
  693. save["stock_name"] = strings.Join(names, ",")
  694. saveInfo := []map[string]interface{}{
  695. {"_id": data["company_id"]},
  696. {"$set": save},
  697. }
  698. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  699. //saveArr = append(saveArr, saveInfo)
  700. ////500 条处理一次,打印一次记录
  701. //if len(saveArr) >= 500 {
  702. // tmps := saveArr
  703. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  704. // saveArr = [][]map[string]interface{}{}
  705. //}
  706. }
  707. // dealAnnualReportBase annual_report_base
  708. func dealAnnualReportBase(data map[string]interface{}) {
  709. save := make(map[string]interface{})
  710. save["_id"] = data["company_id"]
  711. save["updatetime"] = time.Now().Unix()
  712. oldTmp := &map[string]interface{}{}
  713. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  714. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  715. var arr []map[string]interface{}
  716. if (*oldTmp)["annual_reports"] != nil {
  717. arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{}))
  718. } else {
  719. arr = make([]map[string]interface{}, 0)
  720. }
  721. year := 0
  722. phone, email := "", ""
  723. employeeNum := 0
  724. if util.ObjToString(data["_operation_type"]) == "insert" {
  725. exp := make(map[string]interface{})
  726. exp["operator_name"] = data["operator_name"]
  727. exp["report_year"] = data["report_year"]
  728. exp["zip_code"] = data["zip_code"]
  729. exp["employee_no"] = data["employee_no"]
  730. exp["member_no"] = data["member_no"]
  731. exp["company_phone"] = data["company_phone"]
  732. exp["company_email"] = data["company_email"]
  733. exp["_id"] = util.IntAll(data["id"])
  734. arr = append(arr, exp)
  735. if year < util.IntAll(data["report_year"]) {
  736. year = util.IntAll(data["report_year"])
  737. phone = util.ObjToString(data["company_phone"])
  738. email = util.ObjToString(data["company_email"])
  739. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  740. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  741. if employeeNo > 0 {
  742. employeeNum = employeeNo
  743. } else if memberNo > 0 {
  744. employeeNum = memberNo
  745. }
  746. }
  747. } else {
  748. eqFlag := true
  749. for _, m := range arr {
  750. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  751. eqFlag = false
  752. m["operator_name"] = data["operator_name"]
  753. m["report_year"] = data["report_year"]
  754. m["zip_code"] = data["zip_code"]
  755. m["employee_no"] = data["employee_no"]
  756. m["member_no"] = data["member_no"]
  757. m["company_phone"] = data["company_phone"]
  758. m["company_email"] = data["company_email"]
  759. break
  760. }
  761. }
  762. if eqFlag {
  763. exp := make(map[string]interface{})
  764. exp["operator_name"] = data["operator_name"]
  765. exp["report_year"] = data["report_year"]
  766. exp["zip_code"] = data["zip_code"]
  767. exp["employee_no"] = data["employee_no"]
  768. exp["member_no"] = data["member_no"]
  769. exp["company_phone"] = data["company_phone"]
  770. exp["company_email"] = data["company_email"]
  771. exp["_id"] = util.IntAll(data["id"])
  772. arr = append(arr, exp)
  773. if year < util.IntAll(data["report_year"]) {
  774. year = util.IntAll(data["report_year"])
  775. phone = util.ObjToString(data["company_phone"])
  776. email = util.ObjToString(data["company_email"])
  777. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  778. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  779. if employeeNo > 0 {
  780. employeeNum = employeeNo
  781. } else if memberNo > 0 {
  782. employeeNum = memberNo
  783. }
  784. }
  785. }
  786. }
  787. save["annual_reports"] = arr
  788. if year != 0 {
  789. save["company_phone"] = phone
  790. save["company_email"] = email
  791. save["employee_num"] = employeeNum
  792. }
  793. saveInfo := []map[string]interface{}{
  794. {"_id": data["company_id"]},
  795. {"$set": save},
  796. }
  797. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  798. //
  799. //saveArr = append(saveArr, saveInfo)
  800. ////500 条处理一次,打印一次记录
  801. //if len(saveArr) >= 500 {
  802. // tmps := saveArr
  803. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  804. // saveArr = [][]map[string]interface{}{}
  805. //}
  806. }
  807. // dealHistoryName company_history_name
  808. func dealHistoryName(data map[string]interface{}) {
  809. save := make(map[string]interface{})
  810. save["_id"] = data["company_id"]
  811. save["updatetime"] = time.Now().Unix()
  812. var names []string
  813. if data["history_name"] != nil {
  814. name := data["history_name"].(string)
  815. names = append(names, name)
  816. save["history_name"] = strings.Join(names, ",")
  817. }
  818. saveInfo := []map[string]interface{}{
  819. {"_id": data["company_id"]},
  820. {"$set": save},
  821. }
  822. //saveArr = append(saveArr, saveInfo)
  823. addSet := []map[string]interface{}{
  824. {"_id": data["company_id"]},
  825. {"$addToSet": map[string]interface{}{
  826. "history_names": map[string]interface{}{
  827. "$each": names}}},
  828. }
  829. //单独对每条的历史名称追加数组
  830. MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
  831. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  832. ////500 条处理一次,打印一次记录
  833. //if len(saveArr) >= 500 {
  834. // tmps := saveArr
  835. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  836. // saveArr = [][]map[string]interface{}{}
  837. //}
  838. }
  839. // dealAnnualReportWebsite annual_report_website
  840. func dealAnnualReportWebsite(data map[string]interface{}) {
  841. save := make(map[string]interface{})
  842. save["_id"] = data["company_id"]
  843. save["updatetime"] = time.Now().Unix()
  844. year := 0
  845. web := ""
  846. if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 {
  847. year = util.IntAll(data["report_year"])
  848. web = util.ObjToString(data["website_url"])
  849. }
  850. if year != 0 {
  851. save["website_url"] = web
  852. }
  853. saveInfo := []map[string]interface{}{
  854. {"_id": data["company_id"]},
  855. {"$set": save},
  856. }
  857. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  858. //saveArr = append(saveArr, saveInfo)
  859. //500 条处理一次,打印一次记录
  860. //if len(saveArr) >= 500 {
  861. // tmps := saveArr
  862. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  863. // saveArr = [][]map[string]interface{}{}
  864. //}
  865. }
  866. func printMemoryUsage() {
  867. var memStats runtime.MemStats
  868. runtime.ReadMemStats(&memStats)
  869. // 将字节转换为兆字节(MB)
  870. allocatedMB := float64(memStats.Alloc) / 1024 / 1024
  871. totalAllocatedMB := float64(memStats.TotalAlloc) / 1024 / 1024
  872. heapAllocMB := float64(memStats.HeapAlloc) / 1024 / 1024
  873. log.Info("printMemoryUsage", zap.Any("当前程序已分配的内存大小", allocatedMB))
  874. log.Info("printMemoryUsage", zap.Any("程序自启动以来总共分配的内存大小", totalAllocatedMB))
  875. log.Info("printMemoryUsage", zap.Any("堆上当前已分配但尚未释放的内存", heapAllocMB))
  876. log.Info("printMemoryUsage", zap.Any("堆上分配的对象数", memStats.HeapObjects))
  877. }