main.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "bufio"
  5. "compress/gzip"
  6. "encoding/json"
  7. "fmt"
  8. "go.uber.org/zap"
  9. "io"
  10. "io/ioutil"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  13. "net"
  14. "os"
  15. "strings"
  16. "time"
  17. )
  18. var (
  19. CurrentColl string //当前表名
  20. //SkipCollName string
  21. //sendMsg string
  22. collCount int // 当前表数据量
  23. insertCount int // insert数据
  24. updateCount int // update数据
  25. saveLog = make(map[string]interface{})
  26. saveArr [][]map[string]interface{}
  27. UdpClient udp.UdpClient
  28. qyxyEsAddr *net.UDPAddr
  29. localPort string // 本地监听端口
  30. startTime int64 //std 程序开始执行的时间
  31. readPath string //
  32. )
  33. func main() {
  34. localPort = GF.Env.Localport //udp 本地监听地址
  35. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  36. readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据
  37. qyxyEsAddr = &net.UDPAddr{
  38. Port: GF.Env.Esport,
  39. IP: net.ParseIP(GF.Env.Targetip),
  40. }
  41. log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
  42. startTime = time.Now().Unix()
  43. UdpClient.Listen(processUdpMsg)
  44. log.Info("main", zap.String("Udp服务监听本地端口", localPort))
  45. ch := make(chan bool, 1)
  46. <-ch
  47. }
  48. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  49. switch act {
  50. case udp.OP_TYPE_DATA:
  51. var mapInfo map[string]interface{}
  52. err := json.Unmarshal(data, &mapInfo)
  53. log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
  54. if err != nil {
  55. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  56. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  57. } else if mapInfo != nil {
  58. key, _ := mapInfo["key"].(string)
  59. if key == "" {
  60. key = "udpok"
  61. }
  62. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  63. //拿到同步信号,开始同步数据
  64. if _, ok := mapInfo["start"]; ok {
  65. if _, okk := mapInfo["path"]; okk {
  66. path := util.ObjToString(mapInfo["path"])
  67. //没有指定配置文件的指定目录,就使用udp 传递目录
  68. if path != "" {
  69. readPath = path
  70. }
  71. }
  72. // 开始执行
  73. log.Info("processUdpMsg", zap.String("readPath", readPath))
  74. if readPath != "" {
  75. go dealPath(readPath)
  76. }
  77. }
  78. }
  79. default:
  80. log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========"))
  81. }
  82. }
  83. func dealPath(path string) {
  84. if !strings.HasSuffix(path, "/") {
  85. path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
  86. }
  87. //std 程序只需要关注6个表
  88. for _, c := range CollArr {
  89. start := time.Now()
  90. collCount = 0
  91. insertCount = 0
  92. updateCount = 0
  93. CurrentColl = c
  94. subPath := path + c + "/"
  95. //判断文件夹存在
  96. _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
  97. if err != nil {
  98. log.Info("dealPath", zap.Any("os.Stat err", subPath))
  99. continue
  100. }
  101. subFiles, _ := ioutil.ReadDir(subPath)
  102. for _, s := range subFiles { //七天的文件夹名
  103. if s.IsDir() {
  104. ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  105. dealinfo(subPath + s.Name())
  106. }
  107. }
  108. // 判断最后的数据不足500条时 执行
  109. if len(saveArr) > 0 {
  110. tmps := saveArr
  111. MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  112. saveArr = [][]map[string]interface{}{}
  113. }
  114. duration := time.Since(start)
  115. result := map[string]interface{}{
  116. "count": collCount,
  117. "duration": duration.Minutes(), //运行时长
  118. "insert": insertCount, //插入数量
  119. "update": updateCount, //更新数量
  120. }
  121. saveLog[c] = result
  122. }
  123. MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
  124. //执行完毕通知es 程序
  125. data := map[string]interface{}{
  126. "start": true,
  127. "start_time": startTime,
  128. }
  129. SendUdpMsg(data, qyxyEsAddr)
  130. }
  131. func dealinfo(path string) {
  132. count := 0
  133. file := path + "/split.json.gz"
  134. log.Info("dealinfo", zap.Any("current date", file))
  135. _, err := os.Stat(file)
  136. if err == nil {
  137. // 打开本地gz格式压缩包
  138. fr, err := os.Open(file)
  139. if err != nil {
  140. panic(err)
  141. } else {
  142. println("open file success!")
  143. }
  144. // defer: 在函数退出时,执行关闭文件
  145. defer fr.Close()
  146. // 创建gzip文件读取对象
  147. gr, err := gzip.NewReader(fr)
  148. if err != nil {
  149. panic(err)
  150. }
  151. // defer: 在函数退出时,执行关闭gzip对象
  152. defer gr.Close()
  153. bfRd := bufio.NewReader(gr)
  154. for {
  155. line, err := bfRd.ReadBytes('\n')
  156. if err != nil {
  157. if err == io.EOF {
  158. fmt.Println("read gzip data finish! ")
  159. break
  160. } else {
  161. fmt.Println("[read gzip data err]: ", err)
  162. }
  163. }
  164. if len(line) > 0 {
  165. count = hookfn(line, count)
  166. }
  167. if count%1000 == 0 {
  168. log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
  169. }
  170. }
  171. }
  172. }
  173. //hookfn 处理拿到的每行数据
  174. func hookfn(line []byte, count int) int {
  175. tmp := make(map[string]interface{})
  176. err := json.Unmarshal(line, &tmp)
  177. if err != nil {
  178. log.Error("hookfn", zap.Any("Unmarshal", err))
  179. }
  180. count++
  181. collCount++
  182. if _, ok := tmp["company_id"]; ok {
  183. //统计插入、更新数量
  184. if util.ObjToString(tmp["_operation_type"]) == "insert" {
  185. insertCount++
  186. } else {
  187. updateCount++
  188. }
  189. //针对数据表 不同处理
  190. switch CurrentColl {
  191. case "company_base":
  192. dealCompanyBase(tmp)
  193. case "company_employee":
  194. dealCompanyEmployee(tmp)
  195. case "company_history_name":
  196. dealHistoryName(tmp)
  197. case "company_partner":
  198. dealCompanyPartner(tmp)
  199. case "annual_report_base":
  200. dealAnnualReportBase(tmp)
  201. case "annual_report_website":
  202. dealAnnualReportWebsite(tmp)
  203. default:
  204. fmt.Println("CurrentColl =>", CurrentColl)
  205. }
  206. }
  207. return count
  208. }
  209. //dealCompanyBase company_base数据表
  210. func dealCompanyBase(data map[string]interface{}) {
  211. update := make(map[string]interface{})
  212. save := make(map[string]interface{})
  213. for _, v := range company_base {
  214. if data[v] == nil {
  215. continue
  216. }
  217. // company_type 公司类型处理
  218. if v == "company_type" {
  219. save["company_type_old"] = data[v]
  220. if text := util.ObjToString(data["company_type"]); text != "" {
  221. if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
  222. save["company_type"] = "个体工商户"
  223. } else {
  224. text = strings.ReplaceAll(text, "(", "(")
  225. text = strings.ReplaceAll(text, ")", ")")
  226. if stype := QyStypeMap[text]; stype != "" {
  227. save["company_type"] = stype
  228. } else {
  229. save["company_type"] = "其他"
  230. }
  231. }
  232. }
  233. } else if v == "company_status" {
  234. save["company_status_old"] = data[v]
  235. if text := util.ObjToString(data["company_status"]); text != "" {
  236. text = strings.ReplaceAll(text, "(", "(")
  237. text = strings.ReplaceAll(text, ")", ")")
  238. if status := CompanyStatusMap[text]; status != "" {
  239. save["company_status"] = status
  240. } else {
  241. save["company_status"] = "其他"
  242. }
  243. }
  244. } else if v == "capital" {
  245. // capital/currency
  246. text := util.ObjToString(data[v])
  247. if currency := GetCurrency(text); currency != "" {
  248. save["currency"] = currency //币种
  249. }
  250. capital := ObjToMoney(text)
  251. capital = capital / 10000
  252. if capital != 0 {
  253. save[v] = capital
  254. }
  255. } else if v == "use_flag" {
  256. save[v] = util.IntAll(data[v])
  257. } else {
  258. save[v] = data[v]
  259. }
  260. }
  261. // mysql create_time/update_time
  262. save["create_time_msql"] = data["create_time"]
  263. save["update_time_msql"] = data["update_time"]
  264. save["_id"] = data["company_id"]
  265. save["createtime"] = time.Now().Unix()
  266. save["updatetime"] = time.Now().Unix()
  267. // company_area/company_city/company_district
  268. pshort := util.ObjToString(data["province_short"])
  269. save["company_area"] = province_map[pshort]
  270. //company_city,company_district
  271. for i, field := range AreaFiled {
  272. if data[field] == nil {
  273. continue
  274. }
  275. if code := fmt.Sprint(data[field]); code != "" {
  276. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  277. code = code[2:8]
  278. } else if i == 1 && len(code) >= 6 { //company_code注册号
  279. code = code[:6]
  280. }
  281. if city := AddressMap[code]; city != nil { //未作废中取
  282. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  283. if city.City != "" {
  284. save["company_city"] = city.City //市
  285. }
  286. if city.District != "" {
  287. save["company_district"] = city.District //县
  288. }
  289. break
  290. }
  291. } else { //作废中取
  292. if city := AddressOldMap[code]; city != nil {
  293. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  294. if city.City != "" {
  295. save["company_city"] = city.City //市
  296. }
  297. if city.District != "" {
  298. save["company_district"] = city.District //县
  299. }
  300. }
  301. break
  302. }
  303. }
  304. }
  305. }
  306. // search_type
  307. if t := util.ObjToString(save["company_type"]); t != "" {
  308. if t != "个体工商户" && t != "其他" {
  309. t1 := util.ObjToString(save["company_type_old"])
  310. name := util.ObjToString(save["company_name"])
  311. if strings.Contains(t1, "有限合伙") {
  312. save["search_type"] = "有限合伙"
  313. } else if strings.Contains(t1, "合伙") {
  314. save["search_type"] = "普通合伙"
  315. } else if strings.Contains(name, "股份") ||
  316. (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
  317. save["search_type"] = "股份有限公司"
  318. } else {
  319. save["search_type"] = "有限责任公司"
  320. }
  321. }
  322. }
  323. // company_shortname
  324. if m := getStName(util.ObjToString(save["company_name"])); m != "" {
  325. save["company_shortname"] = m
  326. }
  327. // bid_unittype
  328. flag := false
  329. for _, v := range WordsArr {
  330. if strings.Contains(util.ObjToString(save["business_scope"]), v) {
  331. flag = true
  332. break
  333. }
  334. }
  335. if flag {
  336. if save["bid_unittype"] != nil {
  337. save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
  338. } else {
  339. save["bid_unittype"] = []string{"厂商"}
  340. }
  341. }
  342. update["$set"] = save
  343. updataInfo := []map[string]interface{}{
  344. {"_id": save["_id"]},
  345. update,
  346. }
  347. saveArr = append(saveArr, updataInfo)
  348. //500 条处理一次,打印一次记录
  349. if len(saveArr) >= 500 {
  350. tmps := saveArr
  351. res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  352. if !res {
  353. log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
  354. }
  355. saveArr = [][]map[string]interface{}{}
  356. }
  357. }
  358. //dealCompanyEmployee company_employee
  359. func dealCompanyEmployee(data map[string]interface{}) {
  360. save := make(map[string]interface{})
  361. save["_id"] = data["company_id"]
  362. save["updatetime"] = time.Now().Unix()
  363. oldTmp := &map[string]interface{}{}
  364. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  365. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  366. var names []string
  367. var arr []map[string]interface{}
  368. if (*oldTmp)["employees"] != nil {
  369. arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{}))
  370. } else {
  371. arr = make([]map[string]interface{}, 0)
  372. }
  373. ep := make(map[string]interface{})
  374. if util.ObjToString(data["_operation_type"]) == "insert" {
  375. ep["employee_name"] = data["employee_name"]
  376. ep["position"] = data["position"]
  377. ep["is_history"] = data["is_history"]
  378. ep["_id"] = util.IntAll(data["id"])
  379. arr = append(arr, ep)
  380. names = append(names, util.ObjToString(data["employee_name"]))
  381. } else {
  382. eq_flag := true
  383. for _, m := range arr {
  384. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  385. eq_flag = false
  386. m["employee_name"] = data["employee_name"]
  387. m["position"] = data["position"]
  388. m["is_history"] = data["is_history"]
  389. break
  390. }
  391. }
  392. if eq_flag {
  393. ep := make(map[string]interface{})
  394. ep["employee_name"] = data["employee_name"]
  395. ep["position"] = data["position"]
  396. ep["is_history"] = data["is_history"]
  397. ep["_id"] = util.IntAll(data["id"])
  398. arr = append(arr, ep)
  399. names = append(names, util.ObjToString(data["employee_name"]))
  400. }
  401. }
  402. save["employees"] = arr
  403. save["employee_name"] = strings.Join(names, ",")
  404. saveInfo := []map[string]interface{}{
  405. {"_id": data["company_id"]},
  406. {"$set": save},
  407. }
  408. saveArr = append(saveArr, saveInfo)
  409. //500 条处理一次,打印一次记录
  410. if len(saveArr) >= 500 {
  411. tmps := saveArr
  412. MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  413. saveArr = [][]map[string]interface{}{}
  414. }
  415. }
  416. //dealCompanyPartner
  417. func dealCompanyPartner(data map[string]interface{}) {
  418. save := make(map[string]interface{})
  419. save["_id"] = data["company_id"]
  420. save["updatetime"] = time.Now().Unix()
  421. oldTmp := &map[string]interface{}{}
  422. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  423. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  424. var names []string
  425. var arr []map[string]interface{}
  426. if (*oldTmp)["partners"] != nil {
  427. arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{}))
  428. } else {
  429. arr = make([]map[string]interface{}, 0)
  430. }
  431. if util.ObjToString(data["_operation_type"]) == "insert" {
  432. exp := make(map[string]interface{})
  433. exp["stock_capital"] = data["stock_capital"]
  434. exp["stock_name"] = data["stock_name"]
  435. exp["identify_no"] = data["identify_no"]
  436. exp["stock_realcapital"] = data["stock_realcapital"]
  437. exp["is_history"] = data["is_history"]
  438. exp["is_personal"] = data["is_personal"]
  439. exp["stock_type"] = data["stock_type"]
  440. exp["identify_type"] = data["identify_type"]
  441. exp["_id"] = util.IntAll(data["id"])
  442. arr = append(arr, exp)
  443. names = append(names, util.ObjToString(data["stock_name"]))
  444. } else {
  445. eqFlag := true
  446. for _, m := range arr {
  447. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  448. eqFlag = false
  449. m["stock_capital"] = data["stock_capital"]
  450. m["stock_name"] = data["stock_name"]
  451. m["identify_no"] = data["identify_no"]
  452. m["stock_realcapital"] = data["stock_realcapital"]
  453. m["is_history"] = data["is_history"]
  454. m["is_personal"] = data["is_personal"]
  455. m["stock_type"] = data["stock_type"]
  456. m["identify_type"] = data["identify_type"]
  457. break
  458. }
  459. }
  460. if eqFlag {
  461. exp := make(map[string]interface{})
  462. exp["stock_capital"] = data["stock_capital"]
  463. exp["stock_name"] = data["stock_name"]
  464. exp["identify_no"] = data["identify_no"]
  465. exp["stock_realcapital"] = data["stock_realcapital"]
  466. exp["is_history"] = data["is_history"]
  467. exp["is_personal"] = data["is_personal"]
  468. exp["stock_type"] = data["stock_type"]
  469. exp["identify_type"] = data["identify_type"]
  470. exp["_id"] = util.IntAll(data["id"])
  471. arr = append(arr, exp)
  472. names = append(names, util.ObjToString(data["stock_name"]))
  473. }
  474. }
  475. save["partners"] = arr
  476. save["stock_name"] = strings.Join(names, ",")
  477. saveInfo := []map[string]interface{}{
  478. {"_id": data["company_id"]},
  479. {"$set": save},
  480. }
  481. saveArr = append(saveArr, saveInfo)
  482. //500 条处理一次,打印一次记录
  483. if len(saveArr) >= 500 {
  484. tmps := saveArr
  485. MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  486. saveArr = [][]map[string]interface{}{}
  487. }
  488. }
  489. //dealAnnualReportBase annual_report_base
  490. func dealAnnualReportBase(data map[string]interface{}) {
  491. save := make(map[string]interface{})
  492. save["_id"] = data["company_id"]
  493. save["updatetime"] = time.Now().Unix()
  494. oldTmp := &map[string]interface{}{}
  495. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  496. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  497. var arr []map[string]interface{}
  498. if (*oldTmp)["annual_reports"] != nil {
  499. arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{}))
  500. } else {
  501. arr = make([]map[string]interface{}, 0)
  502. }
  503. year := 0
  504. phone, email := "", ""
  505. employeeNum := 0
  506. if util.ObjToString(data["_operation_type"]) == "insert" {
  507. exp := make(map[string]interface{})
  508. exp["operator_name"] = data["operator_name"]
  509. exp["report_year"] = data["report_year"]
  510. exp["zip_code"] = data["zip_code"]
  511. exp["employee_no"] = data["employee_no"]
  512. exp["member_no"] = data["member_no"]
  513. exp["company_phone"] = data["company_phone"]
  514. exp["company_email"] = data["company_email"]
  515. exp["_id"] = util.IntAll(data["id"])
  516. arr = append(arr, exp)
  517. if year < util.IntAll(data["report_year"]) {
  518. year = util.IntAll(data["report_year"])
  519. phone = util.ObjToString(data["company_phone"])
  520. email = util.ObjToString(data["company_email"])
  521. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  522. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  523. if employeeNo > 0 {
  524. employeeNum = employeeNo
  525. } else if memberNo > 0 {
  526. employeeNum = memberNo
  527. }
  528. }
  529. } else {
  530. eqFlag := true
  531. for _, m := range arr {
  532. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  533. eqFlag = false
  534. m["operator_name"] = data["operator_name"]
  535. m["report_year"] = data["report_year"]
  536. m["zip_code"] = data["zip_code"]
  537. m["employee_no"] = data["employee_no"]
  538. m["member_no"] = data["member_no"]
  539. m["company_phone"] = data["company_phone"]
  540. m["company_email"] = data["company_email"]
  541. break
  542. }
  543. }
  544. if eqFlag {
  545. exp := make(map[string]interface{})
  546. exp["operator_name"] = data["operator_name"]
  547. exp["report_year"] = data["report_year"]
  548. exp["zip_code"] = data["zip_code"]
  549. exp["employee_no"] = data["employee_no"]
  550. exp["member_no"] = data["member_no"]
  551. exp["company_phone"] = data["company_phone"]
  552. exp["company_email"] = data["company_email"]
  553. exp["_id"] = util.IntAll(data["id"])
  554. arr = append(arr, exp)
  555. if year < util.IntAll(data["report_year"]) {
  556. year = util.IntAll(data["report_year"])
  557. phone = util.ObjToString(data["company_phone"])
  558. email = util.ObjToString(data["company_email"])
  559. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  560. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  561. if employeeNo > 0 {
  562. employeeNum = employeeNo
  563. } else if memberNo > 0 {
  564. employeeNum = memberNo
  565. }
  566. }
  567. }
  568. }
  569. save["annual_reports"] = arr
  570. if year != 0 {
  571. save["company_phone"] = phone
  572. save["company_email"] = email
  573. save["employee_num"] = employeeNum
  574. }
  575. saveInfo := []map[string]interface{}{
  576. {"_id": data["company_id"]},
  577. {"$set": save},
  578. }
  579. saveArr = append(saveArr, saveInfo)
  580. //500 条处理一次,打印一次记录
  581. if len(saveArr) >= 500 {
  582. tmps := saveArr
  583. MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  584. saveArr = [][]map[string]interface{}{}
  585. }
  586. }
  587. //dealHistoryName company_history_name
  588. func dealHistoryName(data map[string]interface{}) {
  589. save := make(map[string]interface{})
  590. save["_id"] = data["company_id"]
  591. save["updatetime"] = time.Now().Unix()
  592. var names []string
  593. name := data["history_name"].(string)
  594. names = append(names, name)
  595. save["history_name"] = strings.Join(names, ",")
  596. saveInfo := []map[string]interface{}{
  597. {"_id": data["company_id"]},
  598. {"$set": save},
  599. }
  600. saveArr = append(saveArr, saveInfo)
  601. addSet := []map[string]interface{}{
  602. {"_id": data["company_id"]},
  603. {"$addToSet": map[string]interface{}{
  604. "history_names": map[string]interface{}{
  605. "$each": names}}},
  606. }
  607. //单独对每条的历史名称追加数组
  608. MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
  609. //500 条处理一次,打印一次记录
  610. if len(saveArr) >= 500 {
  611. tmps := saveArr
  612. MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  613. saveArr = [][]map[string]interface{}{}
  614. }
  615. }
  616. //dealAnnualReportWebsite annual_report_website
  617. func dealAnnualReportWebsite(data map[string]interface{}) {
  618. save := make(map[string]interface{})
  619. save["_id"] = data["company_id"]
  620. save["updatetime"] = time.Now().Unix()
  621. year := 0
  622. web := ""
  623. if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 {
  624. year = util.IntAll(data["report_year"])
  625. web = util.ObjToString(data["website_url"])
  626. }
  627. if year != 0 {
  628. save["website_url"] = web
  629. }
  630. saveInfo := []map[string]interface{}{
  631. {"_id": data["company_id"]},
  632. {"$set": save},
  633. }
  634. saveArr = append(saveArr, saveInfo)
  635. //500 条处理一次,打印一次记录
  636. if len(saveArr) >= 500 {
  637. tmps := saveArr
  638. MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  639. saveArr = [][]map[string]interface{}{}
  640. }
  641. }