main.go 21 KB


  1. package main
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "encoding/json"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "io"
  9. "io/ioutil"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  13. "net"
  14. "os"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. //CurrentColl string //当前表名
  21. //SkipCollName string
  22. //sendMsg string
  23. //collCount int // 当前表数据量
  24. //insertCount int // insert数据
  25. //updateCount int // update数据
  26. //saveLog = make(map[string]interface{})
  27. savelog sync.Map
  28. //saveArr [][]map[string]interface{}
  29. UdpClient udp.UdpClient
  30. qyxyEsAddr *net.UDPAddr
  31. localPort string // 本地监听端口
  32. startTime int64 //std 程序开始执行的时间
  33. readPath string //
  34. )
  35. func main() {
  36. localPort = GF.Env.Localport //udp 本地监听地址
  37. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  38. readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据
  39. qyxyEsAddr = &net.UDPAddr{
  40. Port: GF.Env.Esport,
  41. IP: net.ParseIP(GF.Env.Targetip),
  42. }
  43. log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
  44. startTime = time.Now().Unix()
  45. UdpClient.Listen(processUdpMsg)
  46. log.Info("main", zap.String("Udp服务监听本地端口", localPort))
  47. ch := make(chan bool, 1)
  48. <-ch
  49. }
  50. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  51. switch act {
  52. case udp.OP_TYPE_DATA:
  53. var mapInfo map[string]interface{}
  54. err := json.Unmarshal(data, &mapInfo)
  55. log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
  56. if err != nil {
  57. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  58. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  59. } else if mapInfo != nil {
  60. key, _ := mapInfo["key"].(string)
  61. if key == "" {
  62. key = "udpok"
  63. }
  64. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  65. //拿到同步信号,开始同步数据
  66. if _, ok := mapInfo["start"]; ok {
  67. if _, okk := mapInfo["path"]; okk {
  68. path := util.ObjToString(mapInfo["path"])
  69. //没有指定配置文件的指定目录,就使用udp 传递目录
  70. if path != "" {
  71. readPath = path
  72. }
  73. }
  74. // 开始执行
  75. log.Info("processUdpMsg", zap.String("readPath", readPath))
  76. if readPath != "" {
  77. go dealPath(readPath)
  78. }
  79. }
  80. }
  81. default:
  82. log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========"))
  83. }
  84. }
  85. func dealPath(path string) {
  86. if !strings.HasSuffix(path, "/") {
  87. path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
  88. }
  89. var wg sync.WaitGroup
  90. //std 程序只需要关注6个表
  91. for _, c := range CollArr {
  92. subPath := path + c + "/"
  93. //判断文件夹存在
  94. _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
  95. if err != nil {
  96. log.Info("dealPath", zap.Any("os.Stat err", subPath))
  97. continue
  98. }
  99. wg.Add(1)
  100. go dealSubPath(c, subPath, &wg)
  101. }
  102. wg.Wait()
  103. //更新 nseo_id
  104. updateStd()
  105. //最终记录
  106. var saveRes = make(map[string]interface{})
  107. for _, c := range CollArr {
  108. data, ok := savelog.Load(c)
  109. if ok {
  110. saveRes[c] = data
  111. }
  112. }
  113. MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveRes})
  114. //执行完毕通知es 程序
  115. data := map[string]interface{}{
  116. "start": true,
  117. "start_time": startTime,
  118. "end_time": time.Now().Unix(),
  119. }
  120. log.Info("dealPath", zap.String(path, "数据同步结束"))
  121. SendUdpMsg(data, qyxyEsAddr)
  122. }
  123. //dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  124. //c 当前表;
  125. func dealSubPath(c, subPath string, wg *sync.WaitGroup) {
  126. defer wg.Done()
  127. log.Info("dealSubPath", zap.String("开始处理path:", subPath))
  128. start := time.Now()
  129. var fileWg sync.WaitGroup
  130. var linesMap sync.Map
  131. subFiles, _ := ioutil.ReadDir(subPath)
  132. for _, s := range subFiles { //七天的文件夹名
  133. if s.IsDir() {
  134. fileWg.Add(1)
  135. ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  136. go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
  137. }
  138. }
  139. fileWg.Wait()
  140. cCount, _ := linesMap.Load(c)
  141. duration := time.Since(start)
  142. result := map[string]interface{}{
  143. "count": cCount,
  144. "duration": fmt.Sprintf("%v min", duration.Minutes()), //运行时长
  145. }
  146. savelog.Store(c, result)
  147. }
  148. func dealinfo(c, path string, wg *sync.WaitGroup, linesMap *sync.Map) {
  149. defer wg.Done()
  150. count := 0
  151. file := path + "/split.json.gz"
  152. log.Info("dealinfo", zap.Any("current date", file))
  153. _, err := os.Stat(file)
  154. if err == nil {
  155. // 打开本地gz格式压缩包
  156. fr, err := os.Open(file)
  157. if err != nil {
  158. panic(err)
  159. } else {
  160. println("open file success!")
  161. }
  162. // defer: 在函数退出时,执行关闭文件
  163. defer fr.Close()
  164. // 创建gzip文件读取对象
  165. gr, err := gzip.NewReader(fr)
  166. if err != nil {
  167. panic(err)
  168. }
  169. // defer: 在函数退出时,执行关闭gzip对象
  170. defer gr.Close()
  171. bfRd := bufio.NewReader(gr)
  172. for {
  173. line, err := bfRd.ReadBytes('\n')
  174. if err != nil {
  175. if err == io.EOF {
  176. log.Info("dealinfo", zap.String(fmt.Sprintf("%v/split.json.gz", path), "read gzip data finish!"))
  177. fmt.Println("read gzip data finish! ")
  178. break
  179. } else {
  180. log.Error("dealinfo", zap.Any(fmt.Sprintf("%v/split.json.gz;read gzip err", path), err))
  181. }
  182. }
  183. if len(line) > 0 {
  184. count++
  185. hookfn(c, line)
  186. linesMap.Store(c, count)
  187. }
  188. if count%1000 == 0 {
  189. log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
  190. }
  191. }
  192. }
  193. }
  194. //hookfn 处理拿到的每行数据
  195. func hookfn(c string, line []byte) {
  196. tmp := make(map[string]interface{})
  197. err := json.Unmarshal(line, &tmp)
  198. if err != nil {
  199. log.Error("hookfn", zap.Any("Unmarshal", err))
  200. }
  201. if _, ok := tmp["company_id"]; ok {
  202. //针对数据表 不同处理
  203. switch c {
  204. case "company_base":
  205. dealCompanyBase(tmp)
  206. case "company_employee":
  207. dealCompanyEmployee(tmp)
  208. case "company_history_name":
  209. dealHistoryName(tmp)
  210. case "company_partner":
  211. dealCompanyPartner(tmp)
  212. case "annual_report_base":
  213. dealAnnualReportBase(tmp)
  214. case "annual_report_website":
  215. dealAnnualReportWebsite(tmp)
  216. default:
  217. fmt.Println("CurrentColl =>", c)
  218. }
  219. }
  220. }
  221. //dealCompanyBase company_base数据表
  222. func dealCompanyBase(data map[string]interface{}) {
  223. update := make(map[string]interface{})
  224. save := make(map[string]interface{})
  225. for _, v := range company_base {
  226. if data[v] == nil {
  227. continue
  228. }
  229. // company_type 公司类型处理
  230. if v == "company_type" {
  231. save["company_type_old"] = data[v]
  232. if text := util.ObjToString(data["company_type"]); text != "" {
  233. if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
  234. save["company_type"] = "个体工商户"
  235. } else {
  236. text = strings.ReplaceAll(text, "(", "(")
  237. text = strings.ReplaceAll(text, ")", ")")
  238. if stype := QyStypeMap[text]; stype != "" {
  239. save["company_type"] = stype
  240. } else {
  241. save["company_type"] = "其他"
  242. }
  243. }
  244. }
  245. } else if v == "company_status" {
  246. save["company_status_old"] = data[v]
  247. if text := util.ObjToString(data["company_status"]); text != "" {
  248. text = strings.ReplaceAll(text, "(", "(")
  249. text = strings.ReplaceAll(text, ")", ")")
  250. if status := CompanyStatusMap[text]; status != "" {
  251. save["company_status"] = status
  252. } else {
  253. save["company_status"] = "其他"
  254. }
  255. }
  256. } else if v == "capital" {
  257. // capital/currency
  258. text := util.ObjToString(data[v])
  259. if currency := GetCurrency(text); currency != "" {
  260. save["currency"] = currency //币种
  261. }
  262. capital := ObjToMoney(text)
  263. capital = capital / 10000
  264. if capital != 0 {
  265. save[v] = capital
  266. }
  267. } else if v == "use_flag" {
  268. save[v] = util.IntAll(data[v])
  269. } else {
  270. save[v] = data[v]
  271. }
  272. }
  273. // mysql create_time/update_time
  274. save["create_time_msql"] = data["create_time"]
  275. save["update_time_msql"] = data["update_time"]
  276. save["_id"] = data["company_id"]
  277. save["autoid"] = util.Int64All(data["id"])
  278. save["createtime"] = time.Now().Unix()
  279. save["updatetime"] = time.Now().Unix()
  280. // company_area/company_city/company_district
  281. pshort := util.ObjToString(data["province_short"])
  282. save["company_area"] = province_map[pshort]
  283. //company_city,company_district
  284. for i, field := range AreaFiled {
  285. if data[field] == nil {
  286. continue
  287. }
  288. if code := fmt.Sprint(data[field]); code != "" {
  289. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  290. code = code[2:8]
  291. } else if i == 1 && len(code) >= 6 { //company_code注册号
  292. code = code[:6]
  293. }
  294. if city := AddressMap[code]; city != nil { //未作废中取
  295. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  296. if city.City != "" {
  297. save["company_city"] = city.City //市
  298. }
  299. if city.District != "" {
  300. save["company_district"] = city.District //县
  301. }
  302. break
  303. }
  304. } else { //作废中取
  305. if city := AddressOldMap[code]; city != nil {
  306. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  307. if city.City != "" {
  308. save["company_city"] = city.City //市
  309. }
  310. if city.District != "" {
  311. save["company_district"] = city.District //县
  312. }
  313. }
  314. break
  315. }
  316. }
  317. }
  318. }
  319. // search_type
  320. if t := util.ObjToString(save["company_type"]); t != "" {
  321. if t != "个体工商户" && t != "其他" {
  322. t1 := util.ObjToString(save["company_type_old"])
  323. name := util.ObjToString(save["company_name"])
  324. if strings.Contains(t1, "有限合伙") {
  325. save["search_type"] = "有限合伙"
  326. } else if strings.Contains(t1, "合伙") {
  327. save["search_type"] = "普通合伙"
  328. } else if strings.Contains(name, "股份") ||
  329. (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
  330. save["search_type"] = "股份有限公司"
  331. } else {
  332. save["search_type"] = "有限责任公司"
  333. }
  334. }
  335. }
  336. // company_shortname
  337. if m := getStName(util.ObjToString(save["company_name"])); m != "" {
  338. save["company_shortname"] = m
  339. }
  340. // bid_unittype
  341. flag := false
  342. for _, v := range WordsArr {
  343. if strings.Contains(util.ObjToString(save["business_scope"]), v) {
  344. flag = true
  345. break
  346. }
  347. }
  348. if flag {
  349. if save["bid_unittype"] != nil {
  350. save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
  351. } else {
  352. save["bid_unittype"] = []string{"厂商"}
  353. }
  354. }
  355. update["$set"] = save
  356. updataInfo := []map[string]interface{}{
  357. {"_id": save["_id"]},
  358. update,
  359. }
  360. MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
  361. //saveArr = append(saveArr, updataInfo)
  362. //
  363. ////500 条处理一次,打印一次记录
  364. //if len(saveArr) >= 500 {
  365. // tmps := saveArr
  366. // res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  367. // if !res {
  368. // log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
  369. // }
  370. // saveArr = [][]map[string]interface{}{}
  371. //}
  372. }
  373. //dealCompanyEmployee company_employee
  374. func dealCompanyEmployee(data map[string]interface{}) {
  375. save := make(map[string]interface{})
  376. save["_id"] = data["company_id"]
  377. save["updatetime"] = time.Now().Unix()
  378. oldTmp := &map[string]interface{}{}
  379. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  380. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  381. var names []string
  382. var arr []map[string]interface{}
  383. if (*oldTmp)["employees"] != nil {
  384. arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{}))
  385. } else {
  386. arr = make([]map[string]interface{}, 0)
  387. }
  388. ep := make(map[string]interface{})
  389. if util.ObjToString(data["_operation_type"]) == "insert" {
  390. ep["employee_name"] = data["employee_name"]
  391. ep["position"] = data["position"]
  392. ep["is_history"] = data["is_history"]
  393. ep["_id"] = util.IntAll(data["id"])
  394. arr = append(arr, ep)
  395. names = append(names, util.ObjToString(data["employee_name"]))
  396. } else {
  397. eq_flag := true
  398. for _, m := range arr {
  399. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  400. eq_flag = false
  401. m["employee_name"] = data["employee_name"]
  402. m["position"] = data["position"]
  403. m["is_history"] = data["is_history"]
  404. break
  405. }
  406. }
  407. if eq_flag {
  408. ep := make(map[string]interface{})
  409. ep["employee_name"] = data["employee_name"]
  410. ep["position"] = data["position"]
  411. ep["is_history"] = data["is_history"]
  412. ep["_id"] = util.IntAll(data["id"])
  413. arr = append(arr, ep)
  414. names = append(names, util.ObjToString(data["employee_name"]))
  415. }
  416. }
  417. save["employees"] = arr
  418. save["employee_name"] = strings.Join(names, ",")
  419. saveInfo := []map[string]interface{}{
  420. {"_id": data["company_id"]},
  421. {"$set": save},
  422. }
  423. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  424. //saveArr = append(saveArr, saveInfo)
  425. ////500 条处理一次,打印一次记录
  426. //if len(saveArr) >= 500 {
  427. // tmps := saveArr
  428. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  429. // saveArr = [][]map[string]interface{}{}
  430. //}
  431. }
  432. //dealCompanyPartner
  433. func dealCompanyPartner(data map[string]interface{}) {
  434. save := make(map[string]interface{})
  435. save["_id"] = data["company_id"]
  436. save["updatetime"] = time.Now().Unix()
  437. oldTmp := &map[string]interface{}{}
  438. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  439. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  440. var names []string
  441. var arr []map[string]interface{}
  442. if (*oldTmp)["partners"] != nil {
  443. arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{}))
  444. } else {
  445. arr = make([]map[string]interface{}, 0)
  446. }
  447. if util.ObjToString(data["_operation_type"]) == "insert" {
  448. exp := make(map[string]interface{})
  449. exp["stock_capital"] = data["stock_capital"]
  450. exp["stock_name"] = data["stock_name"]
  451. exp["identify_no"] = data["identify_no"]
  452. exp["stock_realcapital"] = data["stock_realcapital"]
  453. exp["is_history"] = data["is_history"]
  454. exp["is_personal"] = data["is_personal"]
  455. exp["stock_type"] = data["stock_type"]
  456. exp["identify_type"] = data["identify_type"]
  457. exp["_id"] = util.IntAll(data["id"])
  458. arr = append(arr, exp)
  459. names = append(names, util.ObjToString(data["stock_name"]))
  460. } else {
  461. eqFlag := true
  462. for _, m := range arr {
  463. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  464. eqFlag = false
  465. m["stock_capital"] = data["stock_capital"]
  466. m["stock_name"] = data["stock_name"]
  467. m["identify_no"] = data["identify_no"]
  468. m["stock_realcapital"] = data["stock_realcapital"]
  469. m["is_history"] = data["is_history"]
  470. m["is_personal"] = data["is_personal"]
  471. m["stock_type"] = data["stock_type"]
  472. m["identify_type"] = data["identify_type"]
  473. break
  474. }
  475. }
  476. if eqFlag {
  477. exp := make(map[string]interface{})
  478. exp["stock_capital"] = data["stock_capital"]
  479. exp["stock_name"] = data["stock_name"]
  480. exp["identify_no"] = data["identify_no"]
  481. exp["stock_realcapital"] = data["stock_realcapital"]
  482. exp["is_history"] = data["is_history"]
  483. exp["is_personal"] = data["is_personal"]
  484. exp["stock_type"] = data["stock_type"]
  485. exp["identify_type"] = data["identify_type"]
  486. exp["_id"] = util.IntAll(data["id"])
  487. arr = append(arr, exp)
  488. names = append(names, util.ObjToString(data["stock_name"]))
  489. }
  490. }
  491. save["partners"] = arr
  492. save["stock_name"] = strings.Join(names, ",")
  493. saveInfo := []map[string]interface{}{
  494. {"_id": data["company_id"]},
  495. {"$set": save},
  496. }
  497. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  498. //saveArr = append(saveArr, saveInfo)
  499. ////500 条处理一次,打印一次记录
  500. //if len(saveArr) >= 500 {
  501. // tmps := saveArr
  502. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  503. // saveArr = [][]map[string]interface{}{}
  504. //}
  505. }
  506. //dealAnnualReportBase annual_report_base
  507. func dealAnnualReportBase(data map[string]interface{}) {
  508. save := make(map[string]interface{})
  509. save["_id"] = data["company_id"]
  510. save["updatetime"] = time.Now().Unix()
  511. oldTmp := &map[string]interface{}{}
  512. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  513. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  514. var arr []map[string]interface{}
  515. if (*oldTmp)["annual_reports"] != nil {
  516. arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{}))
  517. } else {
  518. arr = make([]map[string]interface{}, 0)
  519. }
  520. year := 0
  521. phone, email := "", ""
  522. employeeNum := 0
  523. if util.ObjToString(data["_operation_type"]) == "insert" {
  524. exp := make(map[string]interface{})
  525. exp["operator_name"] = data["operator_name"]
  526. exp["report_year"] = data["report_year"]
  527. exp["zip_code"] = data["zip_code"]
  528. exp["employee_no"] = data["employee_no"]
  529. exp["member_no"] = data["member_no"]
  530. exp["company_phone"] = data["company_phone"]
  531. exp["company_email"] = data["company_email"]
  532. exp["_id"] = util.IntAll(data["id"])
  533. arr = append(arr, exp)
  534. if year < util.IntAll(data["report_year"]) {
  535. year = util.IntAll(data["report_year"])
  536. phone = util.ObjToString(data["company_phone"])
  537. email = util.ObjToString(data["company_email"])
  538. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  539. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  540. if employeeNo > 0 {
  541. employeeNum = employeeNo
  542. } else if memberNo > 0 {
  543. employeeNum = memberNo
  544. }
  545. }
  546. } else {
  547. eqFlag := true
  548. for _, m := range arr {
  549. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  550. eqFlag = false
  551. m["operator_name"] = data["operator_name"]
  552. m["report_year"] = data["report_year"]
  553. m["zip_code"] = data["zip_code"]
  554. m["employee_no"] = data["employee_no"]
  555. m["member_no"] = data["member_no"]
  556. m["company_phone"] = data["company_phone"]
  557. m["company_email"] = data["company_email"]
  558. break
  559. }
  560. }
  561. if eqFlag {
  562. exp := make(map[string]interface{})
  563. exp["operator_name"] = data["operator_name"]
  564. exp["report_year"] = data["report_year"]
  565. exp["zip_code"] = data["zip_code"]
  566. exp["employee_no"] = data["employee_no"]
  567. exp["member_no"] = data["member_no"]
  568. exp["company_phone"] = data["company_phone"]
  569. exp["company_email"] = data["company_email"]
  570. exp["_id"] = util.IntAll(data["id"])
  571. arr = append(arr, exp)
  572. if year < util.IntAll(data["report_year"]) {
  573. year = util.IntAll(data["report_year"])
  574. phone = util.ObjToString(data["company_phone"])
  575. email = util.ObjToString(data["company_email"])
  576. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  577. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  578. if employeeNo > 0 {
  579. employeeNum = employeeNo
  580. } else if memberNo > 0 {
  581. employeeNum = memberNo
  582. }
  583. }
  584. }
  585. }
  586. save["annual_reports"] = arr
  587. if year != 0 {
  588. save["company_phone"] = phone
  589. save["company_email"] = email
  590. save["employee_num"] = employeeNum
  591. }
  592. saveInfo := []map[string]interface{}{
  593. {"_id": data["company_id"]},
  594. {"$set": save},
  595. }
  596. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  597. //
  598. //saveArr = append(saveArr, saveInfo)
  599. ////500 条处理一次,打印一次记录
  600. //if len(saveArr) >= 500 {
  601. // tmps := saveArr
  602. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  603. // saveArr = [][]map[string]interface{}{}
  604. //}
  605. }
  606. //dealHistoryName company_history_name
  607. func dealHistoryName(data map[string]interface{}) {
  608. save := make(map[string]interface{})
  609. save["_id"] = data["company_id"]
  610. save["updatetime"] = time.Now().Unix()
  611. var names []string
  612. if data["history_name"] != nil {
  613. name := data["history_name"].(string)
  614. names = append(names, name)
  615. save["history_name"] = strings.Join(names, ",")
  616. }
  617. saveInfo := []map[string]interface{}{
  618. {"_id": data["company_id"]},
  619. {"$set": save},
  620. }
  621. //saveArr = append(saveArr, saveInfo)
  622. addSet := []map[string]interface{}{
  623. {"_id": data["company_id"]},
  624. {"$addToSet": map[string]interface{}{
  625. "history_names": map[string]interface{}{
  626. "$each": names}}},
  627. }
  628. //单独对每条的历史名称追加数组
  629. MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
  630. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  631. ////500 条处理一次,打印一次记录
  632. //if len(saveArr) >= 500 {
  633. // tmps := saveArr
  634. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  635. // saveArr = [][]map[string]interface{}{}
  636. //}
  637. }
  638. //dealAnnualReportWebsite annual_report_website
  639. func dealAnnualReportWebsite(data map[string]interface{}) {
  640. save := make(map[string]interface{})
  641. save["_id"] = data["company_id"]
  642. save["updatetime"] = time.Now().Unix()
  643. year := 0
  644. web := ""
  645. if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 {
  646. year = util.IntAll(data["report_year"])
  647. web = util.ObjToString(data["website_url"])
  648. }
  649. if year != 0 {
  650. save["website_url"] = web
  651. }
  652. saveInfo := []map[string]interface{}{
  653. {"_id": data["company_id"]},
  654. {"$set": save},
  655. }
  656. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  657. //saveArr = append(saveArr, saveInfo)
  658. //500 条处理一次,打印一次记录
  659. //if len(saveArr) >= 500 {
  660. // tmps := saveArr
  661. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  662. // saveArr = [][]map[string]interface{}{}
  663. //}
  664. }