main.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734
  1. package main
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "encoding/json"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "io"
  9. "io/ioutil"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  13. "net"
  14. "os"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. //CurrentColl string //当前表名
  21. //SkipCollName string
  22. //sendMsg string
  23. //collCount int // 当前表数据量
  24. //insertCount int // insert数据
  25. //updateCount int // update数据
  26. //saveLog = make(map[string]interface{})
  27. savelog sync.Map
  28. //saveArr [][]map[string]interface{}
  29. UdpClient udp.UdpClient
  30. qyxyEsAddr *net.UDPAddr
  31. localPort string // 本地监听端口
  32. startTime int64 //std 程序开始执行的时间
  33. readPath string //
  34. )
  35. func main() {
  36. localPort = GF.Env.Localport //udp 本地监听地址
  37. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  38. readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据
  39. qyxyEsAddr = &net.UDPAddr{
  40. Port: GF.Env.Esport,
  41. IP: net.ParseIP(GF.Env.Targetip),
  42. }
  43. log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
  44. startTime = time.Now().Unix()
  45. UdpClient.Listen(processUdpMsg)
  46. log.Info("main", zap.String("Udp服务监听本地端口", localPort))
  47. ch := make(chan bool, 1)
  48. <-ch
  49. }
  50. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  51. switch act {
  52. case udp.OP_TYPE_DATA:
  53. var mapInfo map[string]interface{}
  54. err := json.Unmarshal(data, &mapInfo)
  55. log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
  56. if err != nil {
  57. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  58. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  59. } else if mapInfo != nil {
  60. key, _ := mapInfo["key"].(string)
  61. if key == "" {
  62. key = "udpok"
  63. }
  64. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  65. //拿到同步信号,开始同步数据
  66. if _, ok := mapInfo["start"]; ok {
  67. if _, okk := mapInfo["path"]; okk {
  68. path := util.ObjToString(mapInfo["path"])
  69. //没有指定配置文件的指定目录,就使用udp 传递目录
  70. if path != "" {
  71. readPath = path
  72. }
  73. }
  74. // 开始执行
  75. log.Info("processUdpMsg", zap.String("readPath", readPath))
  76. if readPath != "" {
  77. go dealPath(readPath)
  78. }
  79. }
  80. }
  81. default:
  82. log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========"))
  83. }
  84. }
  85. func dealPath(path string) {
  86. if !strings.HasSuffix(path, "/") {
  87. path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
  88. }
  89. var wg sync.WaitGroup
  90. //std 程序只需要关注6个表
  91. for _, c := range CollArr {
  92. subPath := path + c + "/"
  93. //判断文件夹存在
  94. _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
  95. if err != nil {
  96. log.Info("dealPath", zap.Any("os.Stat err", subPath))
  97. continue
  98. }
  99. wg.Add(1)
  100. go dealSubPath(c, subPath, &wg)
  101. }
  102. wg.Wait()
  103. //最终记录
  104. var saveRes = make(map[string]interface{})
  105. for _, c := range CollArr {
  106. data, ok := savelog.Load(c)
  107. if ok {
  108. saveRes[c] = data
  109. }
  110. }
  111. MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveRes})
  112. //执行完毕通知es 程序
  113. data := map[string]interface{}{
  114. "start": true,
  115. "start_time": startTime,
  116. "end_time": time.Now().Unix(),
  117. }
  118. log.Info("dealPath", zap.String(path, "数据同步结束"))
  119. SendUdpMsg(data, qyxyEsAddr)
  120. }
  121. //dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  122. //c 当前表;
  123. func dealSubPath(c, subPath string, wg *sync.WaitGroup) {
  124. defer wg.Done()
  125. log.Info("dealSubPath", zap.String("开始处理path:", subPath))
  126. start := time.Now()
  127. var fileWg sync.WaitGroup
  128. var linesMap sync.Map
  129. subFiles, _ := ioutil.ReadDir(subPath)
  130. for _, s := range subFiles { //七天的文件夹名
  131. if s.IsDir() {
  132. fileWg.Add(1)
  133. ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
  134. go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
  135. }
  136. }
  137. fileWg.Wait()
  138. cCount, _ := linesMap.Load(c)
  139. duration := time.Since(start)
  140. result := map[string]interface{}{
  141. "count": cCount,
  142. "duration": fmt.Sprintf("%v min", duration.Minutes()), //运行时长
  143. }
  144. savelog.Store(c, result)
  145. }
  146. func dealinfo(c, path string, wg *sync.WaitGroup, linesMap *sync.Map) {
  147. defer wg.Done()
  148. count := 0
  149. file := path + "/split.json.gz"
  150. log.Info("dealinfo", zap.Any("current date", file))
  151. _, err := os.Stat(file)
  152. if err == nil {
  153. // 打开本地gz格式压缩包
  154. fr, err := os.Open(file)
  155. if err != nil {
  156. panic(err)
  157. } else {
  158. println("open file success!")
  159. }
  160. // defer: 在函数退出时,执行关闭文件
  161. defer fr.Close()
  162. // 创建gzip文件读取对象
  163. gr, err := gzip.NewReader(fr)
  164. if err != nil {
  165. panic(err)
  166. }
  167. // defer: 在函数退出时,执行关闭gzip对象
  168. defer gr.Close()
  169. bfRd := bufio.NewReader(gr)
  170. for {
  171. line, err := bfRd.ReadBytes('\n')
  172. if err != nil {
  173. if err == io.EOF {
  174. log.Info("dealinfo", zap.String(fmt.Sprintf("%v/split.json.gz", path), "read gzip data finish!"))
  175. fmt.Println("read gzip data finish! ")
  176. break
  177. } else {
  178. log.Error("dealinfo", zap.Any(fmt.Sprintf("%v/split.json.gz;read gzip err", path), err))
  179. }
  180. }
  181. if len(line) > 0 {
  182. count++
  183. hookfn(c, line)
  184. linesMap.Store(c, count)
  185. }
  186. if count%1000 == 0 {
  187. log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
  188. }
  189. }
  190. }
  191. }
  192. //hookfn 处理拿到的每行数据
  193. func hookfn(c string, line []byte) {
  194. tmp := make(map[string]interface{})
  195. err := json.Unmarshal(line, &tmp)
  196. if err != nil {
  197. log.Error("hookfn", zap.Any("Unmarshal", err))
  198. }
  199. if _, ok := tmp["company_id"]; ok {
  200. //针对数据表 不同处理
  201. switch c {
  202. case "company_base":
  203. dealCompanyBase(tmp)
  204. case "company_employee":
  205. dealCompanyEmployee(tmp)
  206. case "company_history_name":
  207. dealHistoryName(tmp)
  208. case "company_partner":
  209. dealCompanyPartner(tmp)
  210. case "annual_report_base":
  211. dealAnnualReportBase(tmp)
  212. case "annual_report_website":
  213. dealAnnualReportWebsite(tmp)
  214. default:
  215. fmt.Println("CurrentColl =>", c)
  216. }
  217. }
  218. }
  219. //dealCompanyBase company_base数据表
  220. func dealCompanyBase(data map[string]interface{}) {
  221. update := make(map[string]interface{})
  222. save := make(map[string]interface{})
  223. for _, v := range company_base {
  224. if data[v] == nil {
  225. continue
  226. }
  227. // company_type 公司类型处理
  228. if v == "company_type" {
  229. save["company_type_old"] = data[v]
  230. if text := util.ObjToString(data["company_type"]); text != "" {
  231. if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
  232. save["company_type"] = "个体工商户"
  233. } else {
  234. text = strings.ReplaceAll(text, "(", "(")
  235. text = strings.ReplaceAll(text, ")", ")")
  236. if stype := QyStypeMap[text]; stype != "" {
  237. save["company_type"] = stype
  238. } else {
  239. save["company_type"] = "其他"
  240. }
  241. }
  242. }
  243. } else if v == "company_status" {
  244. save["company_status_old"] = data[v]
  245. if text := util.ObjToString(data["company_status"]); text != "" {
  246. text = strings.ReplaceAll(text, "(", "(")
  247. text = strings.ReplaceAll(text, ")", ")")
  248. if status := CompanyStatusMap[text]; status != "" {
  249. save["company_status"] = status
  250. } else {
  251. save["company_status"] = "其他"
  252. }
  253. }
  254. } else if v == "capital" {
  255. // capital/currency
  256. text := util.ObjToString(data[v])
  257. if currency := GetCurrency(text); currency != "" {
  258. save["currency"] = currency //币种
  259. }
  260. capital := ObjToMoney(text)
  261. capital = capital / 10000
  262. if capital != 0 {
  263. save[v] = capital
  264. }
  265. } else if v == "use_flag" {
  266. save[v] = util.IntAll(data[v])
  267. } else {
  268. save[v] = data[v]
  269. }
  270. }
  271. // mysql create_time/update_time
  272. save["create_time_msql"] = data["create_time"]
  273. save["update_time_msql"] = data["update_time"]
  274. save["_id"] = data["company_id"]
  275. save["createtime"] = time.Now().Unix()
  276. save["updatetime"] = time.Now().Unix()
  277. // company_area/company_city/company_district
  278. pshort := util.ObjToString(data["province_short"])
  279. save["company_area"] = province_map[pshort]
  280. //company_city,company_district
  281. for i, field := range AreaFiled {
  282. if data[field] == nil {
  283. continue
  284. }
  285. if code := fmt.Sprint(data[field]); code != "" {
  286. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  287. code = code[2:8]
  288. } else if i == 1 && len(code) >= 6 { //company_code注册号
  289. code = code[:6]
  290. }
  291. if city := AddressMap[code]; city != nil { //未作废中取
  292. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  293. if city.City != "" {
  294. save["company_city"] = city.City //市
  295. }
  296. if city.District != "" {
  297. save["company_district"] = city.District //县
  298. }
  299. break
  300. }
  301. } else { //作废中取
  302. if city := AddressOldMap[code]; city != nil {
  303. if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
  304. if city.City != "" {
  305. save["company_city"] = city.City //市
  306. }
  307. if city.District != "" {
  308. save["company_district"] = city.District //县
  309. }
  310. }
  311. break
  312. }
  313. }
  314. }
  315. }
  316. // search_type
  317. if t := util.ObjToString(save["company_type"]); t != "" {
  318. if t != "个体工商户" && t != "其他" {
  319. t1 := util.ObjToString(save["company_type_old"])
  320. name := util.ObjToString(save["company_name"])
  321. if strings.Contains(t1, "有限合伙") {
  322. save["search_type"] = "有限合伙"
  323. } else if strings.Contains(t1, "合伙") {
  324. save["search_type"] = "普通合伙"
  325. } else if strings.Contains(name, "股份") ||
  326. (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
  327. save["search_type"] = "股份有限公司"
  328. } else {
  329. save["search_type"] = "有限责任公司"
  330. }
  331. }
  332. }
  333. // company_shortname
  334. if m := getStName(util.ObjToString(save["company_name"])); m != "" {
  335. save["company_shortname"] = m
  336. }
  337. // bid_unittype
  338. flag := false
  339. for _, v := range WordsArr {
  340. if strings.Contains(util.ObjToString(save["business_scope"]), v) {
  341. flag = true
  342. break
  343. }
  344. }
  345. if flag {
  346. if save["bid_unittype"] != nil {
  347. save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
  348. } else {
  349. save["bid_unittype"] = []string{"厂商"}
  350. }
  351. }
  352. update["$set"] = save
  353. updataInfo := []map[string]interface{}{
  354. {"_id": save["_id"]},
  355. update,
  356. }
  357. MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
  358. //saveArr = append(saveArr, updataInfo)
  359. //
  360. ////500 条处理一次,打印一次记录
  361. //if len(saveArr) >= 500 {
  362. // tmps := saveArr
  363. // res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  364. // if !res {
  365. // log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
  366. // }
  367. // saveArr = [][]map[string]interface{}{}
  368. //}
  369. }
  370. //dealCompanyEmployee company_employee
  371. func dealCompanyEmployee(data map[string]interface{}) {
  372. save := make(map[string]interface{})
  373. save["_id"] = data["company_id"]
  374. save["updatetime"] = time.Now().Unix()
  375. oldTmp := &map[string]interface{}{}
  376. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  377. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  378. var names []string
  379. var arr []map[string]interface{}
  380. if (*oldTmp)["employees"] != nil {
  381. arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{}))
  382. } else {
  383. arr = make([]map[string]interface{}, 0)
  384. }
  385. ep := make(map[string]interface{})
  386. if util.ObjToString(data["_operation_type"]) == "insert" {
  387. ep["employee_name"] = data["employee_name"]
  388. ep["position"] = data["position"]
  389. ep["is_history"] = data["is_history"]
  390. ep["_id"] = util.IntAll(data["id"])
  391. arr = append(arr, ep)
  392. names = append(names, util.ObjToString(data["employee_name"]))
  393. } else {
  394. eq_flag := true
  395. for _, m := range arr {
  396. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  397. eq_flag = false
  398. m["employee_name"] = data["employee_name"]
  399. m["position"] = data["position"]
  400. m["is_history"] = data["is_history"]
  401. break
  402. }
  403. }
  404. if eq_flag {
  405. ep := make(map[string]interface{})
  406. ep["employee_name"] = data["employee_name"]
  407. ep["position"] = data["position"]
  408. ep["is_history"] = data["is_history"]
  409. ep["_id"] = util.IntAll(data["id"])
  410. arr = append(arr, ep)
  411. names = append(names, util.ObjToString(data["employee_name"]))
  412. }
  413. }
  414. save["employees"] = arr
  415. save["employee_name"] = strings.Join(names, ",")
  416. saveInfo := []map[string]interface{}{
  417. {"_id": data["company_id"]},
  418. {"$set": save},
  419. }
  420. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  421. //saveArr = append(saveArr, saveInfo)
  422. ////500 条处理一次,打印一次记录
  423. //if len(saveArr) >= 500 {
  424. // tmps := saveArr
  425. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  426. // saveArr = [][]map[string]interface{}{}
  427. //}
  428. }
  429. //dealCompanyPartner
  430. func dealCompanyPartner(data map[string]interface{}) {
  431. save := make(map[string]interface{})
  432. save["_id"] = data["company_id"]
  433. save["updatetime"] = time.Now().Unix()
  434. oldTmp := &map[string]interface{}{}
  435. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  436. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  437. var names []string
  438. var arr []map[string]interface{}
  439. if (*oldTmp)["partners"] != nil {
  440. arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{}))
  441. } else {
  442. arr = make([]map[string]interface{}, 0)
  443. }
  444. if util.ObjToString(data["_operation_type"]) == "insert" {
  445. exp := make(map[string]interface{})
  446. exp["stock_capital"] = data["stock_capital"]
  447. exp["stock_name"] = data["stock_name"]
  448. exp["identify_no"] = data["identify_no"]
  449. exp["stock_realcapital"] = data["stock_realcapital"]
  450. exp["is_history"] = data["is_history"]
  451. exp["is_personal"] = data["is_personal"]
  452. exp["stock_type"] = data["stock_type"]
  453. exp["identify_type"] = data["identify_type"]
  454. exp["_id"] = util.IntAll(data["id"])
  455. arr = append(arr, exp)
  456. names = append(names, util.ObjToString(data["stock_name"]))
  457. } else {
  458. eqFlag := true
  459. for _, m := range arr {
  460. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  461. eqFlag = false
  462. m["stock_capital"] = data["stock_capital"]
  463. m["stock_name"] = data["stock_name"]
  464. m["identify_no"] = data["identify_no"]
  465. m["stock_realcapital"] = data["stock_realcapital"]
  466. m["is_history"] = data["is_history"]
  467. m["is_personal"] = data["is_personal"]
  468. m["stock_type"] = data["stock_type"]
  469. m["identify_type"] = data["identify_type"]
  470. break
  471. }
  472. }
  473. if eqFlag {
  474. exp := make(map[string]interface{})
  475. exp["stock_capital"] = data["stock_capital"]
  476. exp["stock_name"] = data["stock_name"]
  477. exp["identify_no"] = data["identify_no"]
  478. exp["stock_realcapital"] = data["stock_realcapital"]
  479. exp["is_history"] = data["is_history"]
  480. exp["is_personal"] = data["is_personal"]
  481. exp["stock_type"] = data["stock_type"]
  482. exp["identify_type"] = data["identify_type"]
  483. exp["_id"] = util.IntAll(data["id"])
  484. arr = append(arr, exp)
  485. names = append(names, util.ObjToString(data["stock_name"]))
  486. }
  487. }
  488. save["partners"] = arr
  489. save["stock_name"] = strings.Join(names, ",")
  490. saveInfo := []map[string]interface{}{
  491. {"_id": data["company_id"]},
  492. {"$set": save},
  493. }
  494. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  495. //saveArr = append(saveArr, saveInfo)
  496. ////500 条处理一次,打印一次记录
  497. //if len(saveArr) >= 500 {
  498. // tmps := saveArr
  499. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  500. // saveArr = [][]map[string]interface{}{}
  501. //}
  502. }
  503. //dealAnnualReportBase annual_report_base
  504. func dealAnnualReportBase(data map[string]interface{}) {
  505. save := make(map[string]interface{})
  506. save["_id"] = data["company_id"]
  507. save["updatetime"] = time.Now().Unix()
  508. oldTmp := &map[string]interface{}{}
  509. fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
  510. oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
  511. var arr []map[string]interface{}
  512. if (*oldTmp)["annual_reports"] != nil {
  513. arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{}))
  514. } else {
  515. arr = make([]map[string]interface{}, 0)
  516. }
  517. year := 0
  518. phone, email := "", ""
  519. employeeNum := 0
  520. if util.ObjToString(data["_operation_type"]) == "insert" {
  521. exp := make(map[string]interface{})
  522. exp["operator_name"] = data["operator_name"]
  523. exp["report_year"] = data["report_year"]
  524. exp["zip_code"] = data["zip_code"]
  525. exp["employee_no"] = data["employee_no"]
  526. exp["member_no"] = data["member_no"]
  527. exp["company_phone"] = data["company_phone"]
  528. exp["company_email"] = data["company_email"]
  529. exp["_id"] = util.IntAll(data["id"])
  530. arr = append(arr, exp)
  531. if year < util.IntAll(data["report_year"]) {
  532. year = util.IntAll(data["report_year"])
  533. phone = util.ObjToString(data["company_phone"])
  534. email = util.ObjToString(data["company_email"])
  535. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  536. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  537. if employeeNo > 0 {
  538. employeeNum = employeeNo
  539. } else if memberNo > 0 {
  540. employeeNum = memberNo
  541. }
  542. }
  543. } else {
  544. eqFlag := true
  545. for _, m := range arr {
  546. if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
  547. eqFlag = false
  548. m["operator_name"] = data["operator_name"]
  549. m["report_year"] = data["report_year"]
  550. m["zip_code"] = data["zip_code"]
  551. m["employee_no"] = data["employee_no"]
  552. m["member_no"] = data["member_no"]
  553. m["company_phone"] = data["company_phone"]
  554. m["company_email"] = data["company_email"]
  555. break
  556. }
  557. }
  558. if eqFlag {
  559. exp := make(map[string]interface{})
  560. exp["operator_name"] = data["operator_name"]
  561. exp["report_year"] = data["report_year"]
  562. exp["zip_code"] = data["zip_code"]
  563. exp["employee_no"] = data["employee_no"]
  564. exp["member_no"] = data["member_no"]
  565. exp["company_phone"] = data["company_phone"]
  566. exp["company_email"] = data["company_email"]
  567. exp["_id"] = util.IntAll(data["id"])
  568. arr = append(arr, exp)
  569. if year < util.IntAll(data["report_year"]) {
  570. year = util.IntAll(data["report_year"])
  571. phone = util.ObjToString(data["company_phone"])
  572. email = util.ObjToString(data["company_email"])
  573. employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
  574. memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
  575. if employeeNo > 0 {
  576. employeeNum = employeeNo
  577. } else if memberNo > 0 {
  578. employeeNum = memberNo
  579. }
  580. }
  581. }
  582. }
  583. save["annual_reports"] = arr
  584. if year != 0 {
  585. save["company_phone"] = phone
  586. save["company_email"] = email
  587. save["employee_num"] = employeeNum
  588. }
  589. saveInfo := []map[string]interface{}{
  590. {"_id": data["company_id"]},
  591. {"$set": save},
  592. }
  593. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  594. //
  595. //saveArr = append(saveArr, saveInfo)
  596. ////500 条处理一次,打印一次记录
  597. //if len(saveArr) >= 500 {
  598. // tmps := saveArr
  599. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  600. // saveArr = [][]map[string]interface{}{}
  601. //}
  602. }
  603. //dealHistoryName company_history_name
  604. func dealHistoryName(data map[string]interface{}) {
  605. save := make(map[string]interface{})
  606. save["_id"] = data["company_id"]
  607. save["updatetime"] = time.Now().Unix()
  608. var names []string
  609. if data["history_name"] != nil {
  610. name := data["history_name"].(string)
  611. names = append(names, name)
  612. save["history_name"] = strings.Join(names, ",")
  613. }
  614. saveInfo := []map[string]interface{}{
  615. {"_id": data["company_id"]},
  616. {"$set": save},
  617. }
  618. //saveArr = append(saveArr, saveInfo)
  619. addSet := []map[string]interface{}{
  620. {"_id": data["company_id"]},
  621. {"$addToSet": map[string]interface{}{
  622. "history_names": map[string]interface{}{
  623. "$each": names}}},
  624. }
  625. //单独对每条的历史名称追加数组
  626. MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
  627. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  628. ////500 条处理一次,打印一次记录
  629. //if len(saveArr) >= 500 {
  630. // tmps := saveArr
  631. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  632. // saveArr = [][]map[string]interface{}{}
  633. //}
  634. }
  635. //dealAnnualReportWebsite annual_report_website
  636. func dealAnnualReportWebsite(data map[string]interface{}) {
  637. save := make(map[string]interface{})
  638. save["_id"] = data["company_id"]
  639. save["updatetime"] = time.Now().Unix()
  640. year := 0
  641. web := ""
  642. if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 {
  643. year = util.IntAll(data["report_year"])
  644. web = util.ObjToString(data["website_url"])
  645. }
  646. if year != 0 {
  647. save["website_url"] = web
  648. }
  649. saveInfo := []map[string]interface{}{
  650. {"_id": data["company_id"]},
  651. {"$set": save},
  652. }
  653. MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
  654. //saveArr = append(saveArr, saveInfo)
  655. //500 条处理一次,打印一次记录
  656. //if len(saveArr) >= 500 {
  657. // tmps := saveArr
  658. // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
  659. // saveArr = [][]map[string]interface{}{}
  660. //}
  661. }