123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971 |
- package main
- import (
- "bufio"
- "compress/gzip"
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- "io"
- "io/ioutil"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "os"
- "runtime"
- "strings"
- "sync"
- "time"
- )
- var (
- //CurrentColl string //当前表名
- //SkipCollName string
- //sendMsg string
- //collCount int // 当前表数据量
- //insertCount int // insert数据
- //updateCount int // update数据
- //saveLog = make(map[string]interface{})
- savelog sync.Map
- //saveArr [][]map[string]interface{}
- UdpClient udp.UdpClient
- qyxyEsAddr *net.UDPAddr
- localPort string // 本地监听端口
- startTime int64 //std 程序开始执行的时间
- readPath string //
- )
- func main() {
- Init() //初始化配置
- localPort = GF.Env.Localport //udp 本地监听地址
- UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
- readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据
- qyxyEsAddr = &net.UDPAddr{
- Port: GF.Env.Esport,
- IP: net.ParseIP(GF.Env.Targetip),
- }
- log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
- UdpClient.Listen(processUdpMsg)
- log.Info("main", zap.String("Udp服务监听本地端口", localPort))
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
- if err != nil {
- log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
- UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- //拿到同步信号,开始同步数据
- if _, ok := mapInfo["start"]; ok {
- if _, okk := mapInfo["path"]; okk {
- path := util.ObjToString(mapInfo["path"])
- //没有指定配置文件的指定目录,就使用udp 传递目录
- if path != "" {
- readPath = path
- }
- }
- // 开始执行
- log.Info("processUdpMsg", zap.String("readPath", readPath))
- if readPath != "" {
- startTime = time.Now().Unix()
- go dealPath(readPath)
- }
- }
- }
- default:
- log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========"))
- }
- }
- func dealPath(path string) {
- if !strings.HasSuffix(path, "/") {
- path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
- }
- //var wg sync.WaitGroup
- //std 程序只需要关注6个表
- for _, c := range CollArr {
- subPath := path + c + "/"
- //判断文件夹存在
- _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
- if err != nil {
- log.Info("dealPath", zap.Any("os.Stat err", subPath))
- continue
- }
- //wg.Add(1)
- dealSubPath(c, subPath)
- //go dealSubPath(c, subPath, &wg)
- }
- //wg.Wait()
- //更新 nseo_id
- updateStd()
- //最终记录
- var saveRes = make(map[string]interface{})
- for _, c := range CollArr {
- data, ok := savelog.Load(c)
- if ok {
- saveRes[c] = data
- }
- }
- MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveRes})
- //执行完毕通知es 程序
- data := map[string]interface{}{
- "start": true,
- "start_time": startTime,
- "end_time": time.Now().Unix(),
- }
- log.Info("dealPath", zap.String(path, "数据同步结束"))
- SendUdpMsg(data, qyxyEsAddr)
- }
- // dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
- // c 当前表;
- func dealSubPath(c, subPath string) {
- //defer wg.Done()
- log.Info("dealSubPath", zap.String("开始处理path:", subPath))
- start := time.Now()
- //var fileWg sync.WaitGroup
- var linesMap sync.Map
- subFiles, _ := ioutil.ReadDir(subPath)
- for _, s := range subFiles { //七天的文件夹名
- if s.IsDir() {
- //fileWg.Add(1)
- dealinfo(c, subPath+s.Name(), &linesMap)
- ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
- //go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
- }
- }
- //fileWg.Wait()
- cCount, _ := linesMap.Load(c)
- duration := time.Since(start)
- result := map[string]interface{}{
- "count": cCount,
- "duration": fmt.Sprintf("%v min", duration.Minutes()), //运行时长
- }
- savelog.Store(c, result)
- }
- func dealinfo(c, path string, linesMap *sync.Map) {
- //defer wg.Done()
- count := 0
- file := path + "/split.json.gz"
- log.Info("dealinfo", zap.Any("current date", file))
- fileInfo, err := os.Stat(file)
- if fileInfo.Size() == 0 {
- return
- }
- if err == nil {
- // 打开本地gz格式压缩包
- fr, err := os.Open(file)
- if err != nil {
- log.Info("dealinfo", zap.Error(err))
- return
- } else {
- fmt.Println("open file success!", file)
- }
- // defer: 在函数退出时,执行关闭文件
- defer fr.Close()
- // 创建gzip文件读取对象
- gr, err := gzip.NewReader(fr)
- if err != nil {
- log.Info("reader "+file, zap.Error(err))
- return
- }
- // defer: 在函数退出时,执行关闭gzip对象
- defer gr.Close()
- wg := sync.WaitGroup{}
- ch := make(chan bool, 3)
- bfRd := bufio.NewReader(gr)
- for {
- line, err := bfRd.ReadBytes('\n')
- if err != nil {
- if err == io.EOF {
- log.Info("dealinfo", zap.String(fmt.Sprintf("%v/split.json.gz", path), "read gzip data finish!"))
- fmt.Println("read gzip data finish! ")
- break
- } else {
- log.Error("dealinfo", zap.Any(fmt.Sprintf("%v/split.json.gz;read gzip err", path), err))
- }
- }
- if len(line) > 0 {
- count++
- linesMap.Store(c, count)
- ch <- true
- wg.Add(1)
- go func(c string, line []byte) {
- defer func() {
- <-ch
- wg.Done()
- }()
- hookfn(c, line)
- }(c, line)
- }
- if count%5000 == 0 {
- printMemoryUsage()
- log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
- }
- }
- wg.Wait()
- }
- }
- // hookfn 处理拿到的每行数据
- func hookfn(c string, line []byte) {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(line, &tmp)
- if err != nil {
- log.Error("hookfn", zap.Any("Unmarshal", err))
- }
- if _, ok := tmp["company_id"]; ok {
- //针对数据表 不同处理
- switch c {
- case "company_base":
- dealCompanyBase(tmp)
- case "company_employee":
- dealCompanyEmployee(tmp)
- case "company_history_name":
- dealHistoryName(tmp)
- case "company_partner":
- dealCompanyPartner(tmp)
- case "annual_report_base":
- dealAnnualReportBase(tmp)
- case "annual_report_website":
- dealAnnualReportWebsite(tmp)
- //处理特使企业信息
- case "special_enterprise", "special_foundation", "special_gov_unit", "special_hongkong_company", "special_law_office", "special_social_organ", "special_trade_union":
- dealSpecial(c, tmp)
- default:
- fmt.Println("CurrentColl =>", c)
- }
- }
- }
- // dealCompanyBase company_base数据表
- func dealCompanyBase(data map[string]interface{}) {
- update := make(map[string]interface{})
- save := make(map[string]interface{})
- for _, v := range company_base {
- if data[v] == nil {
- continue
- }
- // company_type 公司类型处理
- if v == "company_type" {
- save["company_type_old"] = data[v]
- if text := util.ObjToString(data["company_type"]); text != "" {
- if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
- save["company_type"] = "个体工商户"
- } else {
- text = strings.ReplaceAll(text, "(", "(")
- text = strings.ReplaceAll(text, ")", ")")
- if stype := QyStypeMap[text]; stype != "" {
- save["company_type"] = stype
- } else {
- save["company_type"] = "其他"
- }
- }
- }
- } else if v == "company_status" {
- save["company_status_old"] = data[v]
- if text := util.ObjToString(data["company_status"]); text != "" {
- text = strings.ReplaceAll(text, "(", "(")
- text = strings.ReplaceAll(text, ")", ")")
- if status := CompanyStatusMap[text]; status != "" {
- save["company_status"] = status
- } else {
- save["company_status"] = "其他"
- }
- }
- } else if v == "capital" {
- // capital/currency
- text := util.ObjToString(data[v])
- if currency := GetCurrency(text); currency != "" {
- save["currency"] = currency //币种
- }
- capital := ObjToMoney(text)
- capital = capital / 10000
- //if capital != 0 {
- save[v] = capital
- //}
- } else if v == "use_flag" {
- save[v] = util.IntAll(data[v])
- } else {
- save[v] = data[v]
- }
- }
- // mysql create_time/update_time
- save["create_time_msql"] = data["create_time"]
- save["update_time_msql"] = data["update_time"]
- save["_id"] = data["company_id"]
- save["autoid"] = util.Int64All(data["id"])
- save["createtime"] = time.Now().Unix()
- save["updatetime"] = time.Now().Unix()
- // company_area/company_city/company_district
- pshort := util.ObjToString(data["province_short"])
- save["company_area"] = province_map[pshort]
- //company_city,company_district
- for i, field := range AreaFiled {
- if data[field] == nil {
- continue
- }
- if code := fmt.Sprint(data[field]); code != "" {
- if i == 0 && len(code) >= 8 { //credit_no企业信用代码
- code = code[2:8]
- } else if i == 1 && len(code) >= 6 { //company_code注册号
- code = code[:6]
- }
- if city := AddressMap[code]; city != nil { //未作废中取
- if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
- if city.City != "" {
- save["company_city"] = city.City //市
- }
- if city.District != "" {
- save["company_district"] = city.District //县
- }
- break
- }
- } else { //作废中取
- if city := AddressOldMap[code]; city != nil {
- if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
- if city.City != "" {
- save["company_city"] = city.City //市
- }
- if city.District != "" {
- save["company_district"] = city.District //县
- }
- }
- break
- }
- }
- }
- }
- // search_type
- if t := util.ObjToString(save["company_type"]); t != "" {
- if t != "个体工商户" && t != "其他" {
- t1 := util.ObjToString(save["company_type_old"])
- name := util.ObjToString(save["company_name"])
- if strings.Contains(t1, "有限合伙") {
- save["search_type"] = "有限合伙"
- } else if strings.Contains(t1, "合伙") {
- save["search_type"] = "普通合伙"
- } else if strings.Contains(name, "股份") ||
- (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
- save["search_type"] = "股份有限公司"
- } else {
- save["search_type"] = "有限责任公司"
- }
- }
- }
- // company_shortname
- if m := getStName(util.ObjToString(save["company_name"])); m != "" {
- save["company_shortname"] = m
- }
- // bid_unittype
- flag := false
- for _, v := range WordsArr {
- if strings.Contains(util.ObjToString(save["business_scope"]), v) {
- flag = true
- break
- }
- }
- if flag {
- if save["bid_unittype"] != nil {
- save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
- } else {
- save["bid_unittype"] = []string{"厂商"}
- }
- }
- update["$set"] = save
- updataInfo := []map[string]interface{}{
- {"_id": save["_id"]},
- update,
- }
- MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
- //saveArr = append(saveArr, updataInfo)
- //
- ////500 条处理一次,打印一次记录
- //if len(saveArr) >= 500 {
- // tmps := saveArr
- // res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- // if !res {
- // log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
- // }
- // saveArr = [][]map[string]interface{}{}
- //}
- }
- // dealSpecial 处理特殊企业数据
- func dealSpecial(coll string, data map[string]interface{}) {
- update := make(map[string]interface{})
- save := make(map[string]interface{})
- companyID := util.ObjToString(data["company_id"])
- if companyID == "" {
- return
- }
- for _, v := range company_base {
- if data[v] == nil {
- continue
- }
- // company_type 公司类型处理
- if v == "company_type" {
- save["company_type_old"] = data[v]
- if text := util.ObjToString(data["company_type"]); text != "" {
- if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
- save["company_type"] = "个体工商户"
- } else {
- text = strings.ReplaceAll(text, "(", "(")
- text = strings.ReplaceAll(text, ")", ")")
- if stype := QyStypeMap[text]; stype != "" {
- save["company_type"] = stype
- } else {
- save["company_type"] = "其他"
- }
- }
- }
- } else if v == "company_status" {
- save["company_status_old"] = data[v]
- if text := util.ObjToString(data["company_status"]); text != "" {
- text = strings.ReplaceAll(text, "(", "(")
- text = strings.ReplaceAll(text, ")", ")")
- if status := CompanyStatusMap[text]; status != "" {
- save["company_status"] = status
- } else {
- save["company_status"] = "其他"
- }
- }
- } else if v == "capital" {
- // capital/currency
- text := util.ObjToString(data[v])
- if currency := GetCurrency(text); currency != "" {
- save["currency"] = currency //币种
- }
- capital := ObjToMoney(text)
- capital = capital / 10000
- if capital != 0 {
- save[v] = capital
- }
- } else if v == "use_flag" {
- save[v] = util.IntAll(data[v])
- } else {
- save[v] = data[v]
- }
- }
- // mysql create_time/update_time
- save["create_time_msql"] = data["create_time"]
- save["update_time_msql"] = data["update_time"]
- save["_id"] = data["company_id"]
- save["createtime"] = time.Now().Unix()
- save["updatetime"] = time.Now().Unix()
- // company_area/company_city/company_district
- pshort := util.ObjToString(data["province_short"])
- save["company_area"] = province_map[pshort]
- //company_city,company_district
- for i, field := range AreaFiled {
- if data[field] == nil {
- continue
- }
- if code := fmt.Sprint(data[field]); code != "" {
- if i == 0 && len(code) >= 8 { //credit_no企业信用代码
- code = code[2:8]
- } else if i == 1 && len(code) >= 6 { //company_code注册号
- code = code[:6]
- }
- if city := AddressMap[code]; city != nil { //未作废中取
- if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
- if city.City != "" {
- save["company_city"] = city.City //市
- }
- if city.District != "" {
- save["company_district"] = city.District //县
- }
- break
- }
- } else { //作废中取
- if city := AddressOldMap[code]; city != nil {
- if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
- if city.City != "" {
- save["company_city"] = city.City //市
- }
- if city.District != "" {
- save["company_district"] = city.District //县
- }
- }
- break
- }
- }
- }
- }
- // search_type
- if t := util.ObjToString(save["company_type"]); t != "" {
- if t != "个体工商户" && t != "其他" {
- t1 := util.ObjToString(save["company_type_old"])
- name := util.ObjToString(save["company_name"])
- if strings.Contains(t1, "有限合伙") {
- save["search_type"] = "有限合伙"
- } else if strings.Contains(t1, "合伙") {
- save["search_type"] = "普通合伙"
- } else if strings.Contains(name, "股份") ||
- (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
- save["search_type"] = "股份有限公司"
- } else {
- save["search_type"] = "有限责任公司"
- }
- }
- }
- // company_shortname
- if m := getStName(util.ObjToString(save["company_name"])); m != "" {
- save["company_shortname"] = m
- }
- // bid_unittype
- flag := false
- for _, v := range WordsArr {
- if strings.Contains(util.ObjToString(save["business_scope"]), v) {
- flag = true
- break
- }
- }
- if flag {
- if save["bid_unittype"] != nil {
- save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
- } else {
- save["bid_unittype"] = []string{"厂商"}
- }
- }
- oldInfo, _ := MongoTool.FindOne(GF.Env.Dbsave, map[string]interface{}{"_id": companyID})
- if len(*oldInfo) > 0 && util.Int64All((*oldInfo)["autoid"]) > 0 {
- save["autoid"] = (*oldInfo)["autoid"]
- } else {
- id := util.Int64All(data["id"])
- /**
- special_enterprise,起始ID 从2000000000 开始+data["id"]
- special_foundation,起始ID 从3000000000 开始+data["id"]
- special_gov_unit,起始ID 从4000000000 开始+data["id"]
- special_hongkong_company,起始ID 从5000000000 开始+data["id"]
- special_law_office,起始ID 从6000000000 开始+data["id"]
- special_social_organ,起始ID 从7000000000 开始+data["id"]
- special_trade_union,起始ID 从8000000000 开始+data["id"]
- */
- startID := 0
- if coll == "special_enterprise" {
- startID = 2000000000
- } else if coll == "special_foundation" {
- startID = 3000000000
- } else if coll == "special_gov_unit" {
- startID = 4000000000
- } else if coll == "special_hongkong_company" {
- startID = 5000000000
- } else if coll == "special_law_office" {
- startID = 6000000000
- } else if coll == "special_social_organ" {
- startID = 7000000000
- } else if coll == "special_trade_union" {
- startID = 8000000000
- }
- autoid := startID + int(id)
- save["autoid"] = autoid
- }
- update["$set"] = save
- updataInfo := []map[string]interface{}{
- {"_id": save["_id"]},
- update,
- }
- MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
- }
- // dealCompanyEmployee company_employee
- func dealCompanyEmployee(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- oldTmp := &map[string]interface{}{}
- fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
- oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
- var names []string
- var arr []map[string]interface{}
- if (*oldTmp)["employees"] != nil {
- arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{}))
- } else {
- arr = make([]map[string]interface{}, 0)
- }
- ep := make(map[string]interface{})
- if util.ObjToString(data["_operation_type"]) == "insert" {
- ep["employee_name"] = data["employee_name"]
- ep["position"] = data["position"]
- ep["is_history"] = data["is_history"]
- ep["_id"] = util.IntAll(data["id"])
- arr = append(arr, ep)
- names = append(names, util.ObjToString(data["employee_name"]))
- } else {
- eq_flag := true
- for _, m := range arr {
- if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
- eq_flag = false
- m["employee_name"] = data["employee_name"]
- m["position"] = data["position"]
- m["is_history"] = data["is_history"]
- break
- }
- }
- if eq_flag {
- ep := make(map[string]interface{})
- ep["employee_name"] = data["employee_name"]
- ep["position"] = data["position"]
- ep["is_history"] = data["is_history"]
- ep["_id"] = util.IntAll(data["id"])
- arr = append(arr, ep)
- names = append(names, util.ObjToString(data["employee_name"]))
- }
- }
- save["employees"] = arr
- save["employee_name"] = strings.Join(names, ",")
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
- //saveArr = append(saveArr, saveInfo)
- ////500 条处理一次,打印一次记录
- //if len(saveArr) >= 500 {
- // tmps := saveArr
- // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- // saveArr = [][]map[string]interface{}{}
- //}
- }
- // dealCompanyPartner
- func dealCompanyPartner(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- oldTmp := &map[string]interface{}{}
- fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
- oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
- var names []string
- var arr []map[string]interface{}
- if (*oldTmp)["partners"] != nil {
- arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{}))
- } else {
- arr = make([]map[string]interface{}, 0)
- }
- if util.ObjToString(data["_operation_type"]) == "insert" {
- exp := make(map[string]interface{})
- exp["stock_capital"] = data["stock_capital"]
- exp["stock_name"] = data["stock_name"]
- exp["identify_no"] = data["identify_no"]
- exp["stock_realcapital"] = data["stock_realcapital"]
- exp["is_history"] = data["is_history"]
- exp["is_personal"] = data["is_personal"]
- exp["stock_type"] = data["stock_type"]
- exp["identify_type"] = data["identify_type"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- names = append(names, util.ObjToString(data["stock_name"]))
- } else {
- eqFlag := true
- for _, m := range arr {
- if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
- eqFlag = false
- m["stock_capital"] = data["stock_capital"]
- m["stock_name"] = data["stock_name"]
- m["identify_no"] = data["identify_no"]
- m["stock_realcapital"] = data["stock_realcapital"]
- m["is_history"] = data["is_history"]
- m["is_personal"] = data["is_personal"]
- m["stock_type"] = data["stock_type"]
- m["identify_type"] = data["identify_type"]
- break
- }
- }
- if eqFlag {
- exp := make(map[string]interface{})
- exp["stock_capital"] = data["stock_capital"]
- exp["stock_name"] = data["stock_name"]
- exp["identify_no"] = data["identify_no"]
- exp["stock_realcapital"] = data["stock_realcapital"]
- exp["is_history"] = data["is_history"]
- exp["is_personal"] = data["is_personal"]
- exp["stock_type"] = data["stock_type"]
- exp["identify_type"] = data["identify_type"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- names = append(names, util.ObjToString(data["stock_name"]))
- }
- }
- save["partners"] = arr
- save["stock_name"] = strings.Join(names, ",")
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
- //saveArr = append(saveArr, saveInfo)
- ////500 条处理一次,打印一次记录
- //if len(saveArr) >= 500 {
- // tmps := saveArr
- // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- // saveArr = [][]map[string]interface{}{}
- //}
- }
- // dealAnnualReportBase annual_report_base
- func dealAnnualReportBase(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- oldTmp := &map[string]interface{}{}
- fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
- oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
- var arr []map[string]interface{}
- if (*oldTmp)["annual_reports"] != nil {
- arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{}))
- } else {
- arr = make([]map[string]interface{}, 0)
- }
- year := 0
- phone, email := "", ""
- employeeNum := 0
- if util.ObjToString(data["_operation_type"]) == "insert" {
- exp := make(map[string]interface{})
- exp["operator_name"] = data["operator_name"]
- exp["report_year"] = data["report_year"]
- exp["zip_code"] = data["zip_code"]
- exp["employee_no"] = data["employee_no"]
- exp["member_no"] = data["member_no"]
- exp["company_phone"] = data["company_phone"]
- exp["company_email"] = data["company_email"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- if year < util.IntAll(data["report_year"]) {
- year = util.IntAll(data["report_year"])
- phone = util.ObjToString(data["company_phone"])
- email = util.ObjToString(data["company_email"])
- employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
- memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
- if employeeNo > 0 {
- employeeNum = employeeNo
- } else if memberNo > 0 {
- employeeNum = memberNo
- }
- }
- } else {
- eqFlag := true
- for _, m := range arr {
- if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
- eqFlag = false
- m["operator_name"] = data["operator_name"]
- m["report_year"] = data["report_year"]
- m["zip_code"] = data["zip_code"]
- m["employee_no"] = data["employee_no"]
- m["member_no"] = data["member_no"]
- m["company_phone"] = data["company_phone"]
- m["company_email"] = data["company_email"]
- break
- }
- }
- if eqFlag {
- exp := make(map[string]interface{})
- exp["operator_name"] = data["operator_name"]
- exp["report_year"] = data["report_year"]
- exp["zip_code"] = data["zip_code"]
- exp["employee_no"] = data["employee_no"]
- exp["member_no"] = data["member_no"]
- exp["company_phone"] = data["company_phone"]
- exp["company_email"] = data["company_email"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- if year < util.IntAll(data["report_year"]) {
- year = util.IntAll(data["report_year"])
- phone = util.ObjToString(data["company_phone"])
- email = util.ObjToString(data["company_email"])
- employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
- memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
- if employeeNo > 0 {
- employeeNum = employeeNo
- } else if memberNo > 0 {
- employeeNum = memberNo
- }
- }
- }
- }
- save["annual_reports"] = arr
- if year != 0 {
- save["company_phone"] = phone
- save["company_email"] = email
- save["employee_num"] = employeeNum
- }
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
- //
- //saveArr = append(saveArr, saveInfo)
- ////500 条处理一次,打印一次记录
- //if len(saveArr) >= 500 {
- // tmps := saveArr
- // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- // saveArr = [][]map[string]interface{}{}
- //}
- }
- // dealHistoryName company_history_name
- func dealHistoryName(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- var names []string
- if data["history_name"] != nil {
- name := data["history_name"].(string)
- names = append(names, name)
- save["history_name"] = strings.Join(names, ",")
- }
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- //saveArr = append(saveArr, saveInfo)
- addSet := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$addToSet": map[string]interface{}{
- "history_names": map[string]interface{}{
- "$each": names}}},
- }
- //单独对每条的历史名称追加数组
- MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
- MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
- ////500 条处理一次,打印一次记录
- //if len(saveArr) >= 500 {
- // tmps := saveArr
- // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- // saveArr = [][]map[string]interface{}{}
- //}
- }
- // dealAnnualReportWebsite annual_report_website
- func dealAnnualReportWebsite(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- year := 0
- web := ""
- if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 {
- year = util.IntAll(data["report_year"])
- web = util.ObjToString(data["website_url"])
- }
- if year != 0 {
- save["website_url"] = web
- }
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
- //saveArr = append(saveArr, saveInfo)
- //500 条处理一次,打印一次记录
- //if len(saveArr) >= 500 {
- // tmps := saveArr
- // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- // saveArr = [][]map[string]interface{}{}
- //}
- }
- func printMemoryUsage() {
- var memStats runtime.MemStats
- runtime.ReadMemStats(&memStats)
- // 将字节转换为兆字节(MB)
- allocatedMB := float64(memStats.Alloc) / 1024 / 1024
- totalAllocatedMB := float64(memStats.TotalAlloc) / 1024 / 1024
- heapAllocMB := float64(memStats.HeapAlloc) / 1024 / 1024
- log.Info("printMemoryUsage", zap.Any("当前程序已分配的内存大小", allocatedMB))
- log.Info("printMemoryUsage", zap.Any("程序自启动以来总共分配的内存大小", totalAllocatedMB))
- log.Info("printMemoryUsage", zap.Any("堆上当前已分配但尚未释放的内存", heapAllocMB))
- log.Info("printMemoryUsage", zap.Any("堆上分配的对象数", memStats.HeapObjects))
- }
|