123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "bufio"
- "compress/gzip"
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- "io"
- "io/ioutil"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "os"
- "strings"
- "time"
- )
- var (
- CurrentColl string //当前表名
- //SkipCollName string
- //sendMsg string
- collCount int // 当前表数据量
- insertCount int // insert数据
- updateCount int // update数据
- saveLog = make(map[string]interface{})
- saveArr [][]map[string]interface{}
- UdpClient udp.UdpClient
- qyxyEsAddr *net.UDPAddr
- localPort string // 本地监听端口
- startTime int64 //std 程序开始执行的时间
- readPath string //
- )
- func main() {
- localPort = GF.Env.Localport //udp 本地监听地址
- UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
- readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据
- qyxyEsAddr = &net.UDPAddr{
- Port: GF.Env.Esport,
- IP: net.ParseIP(GF.Env.Targetip),
- }
- log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
- startTime = time.Now().Unix()
- UdpClient.Listen(processUdpMsg)
- log.Info("main", zap.String("Udp服务监听本地端口", localPort))
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
- if err != nil {
- log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
- UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- //拿到同步信号,开始同步数据
- if _, ok := mapInfo["start"]; ok {
- if _, okk := mapInfo["path"]; okk {
- path := util.ObjToString(mapInfo["path"])
- //没有指定配置文件的指定目录,就使用udp 传递目录
- if path != "" {
- readPath = path
- }
- }
- // 开始执行
- log.Info("processUdpMsg", zap.String("readPath", readPath))
- if readPath != "" {
- go dealPath(readPath)
- }
- }
- }
- default:
- log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========"))
- }
- }
- func dealPath(path string) {
- if !strings.HasSuffix(path, "/") {
- path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
- }
- //std 程序只需要关注6个表
- for _, c := range CollArr {
- start := time.Now()
- collCount = 0
- insertCount = 0
- updateCount = 0
- CurrentColl = c
- subPath := path + c + "/"
- //判断文件夹存在
- _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
- if err != nil {
- log.Info("dealPath", zap.Any("os.Stat err", subPath))
- continue
- }
- subFiles, _ := ioutil.ReadDir(subPath)
- for _, s := range subFiles { //七天的文件夹名
- if s.IsDir() {
- ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
- dealinfo(subPath + s.Name())
- }
- }
- // 判断最后的数据不足500条时 执行
- if len(saveArr) > 0 {
- tmps := saveArr
- MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- duration := time.Since(start)
- result := map[string]interface{}{
- "count": collCount,
- "duration": duration.Minutes(), //运行时长
- "insert": insertCount, //插入数量
- "update": updateCount, //更新数量
- }
- saveLog[c] = result
- }
- MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
- //执行完毕通知es 程序
- data := map[string]interface{}{
- "start": true,
- "start_time": startTime,
- }
- SendUdpMsg(data, qyxyEsAddr)
- }
- func dealinfo(path string) {
- count := 0
- file := path + "/split.json.gz"
- log.Info("dealinfo", zap.Any("current date", file))
- _, err := os.Stat(file)
- if err == nil {
- // 打开本地gz格式压缩包
- fr, err := os.Open(file)
- if err != nil {
- panic(err)
- } else {
- println("open file success!")
- }
- // defer: 在函数退出时,执行关闭文件
- defer fr.Close()
- // 创建gzip文件读取对象
- gr, err := gzip.NewReader(fr)
- if err != nil {
- panic(err)
- }
- // defer: 在函数退出时,执行关闭gzip对象
- defer gr.Close()
- bfRd := bufio.NewReader(gr)
- for {
- line, err := bfRd.ReadBytes('\n')
- if err != nil {
- if err == io.EOF {
- fmt.Println("read gzip data finish! ")
- break
- } else {
- fmt.Println("[read gzip data err]: ", err)
- }
- }
- if len(line) > 0 {
- count = hookfn(line, count)
- }
- if count%1000 == 0 {
- log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
- }
- }
- }
- }
- //hookfn 处理拿到的每行数据
- func hookfn(line []byte, count int) int {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(line, &tmp)
- if err != nil {
- log.Error("hookfn", zap.Any("Unmarshal", err))
- }
- count++
- collCount++
- if _, ok := tmp["company_id"]; ok {
- //统计插入、更新数量
- if util.ObjToString(tmp["_operation_type"]) == "insert" {
- insertCount++
- } else {
- updateCount++
- }
- //针对数据表 不同处理
- switch CurrentColl {
- case "company_base":
- dealCompanyBase(tmp)
- case "company_employee":
- dealCompanyEmployee(tmp)
- case "company_history_name":
- dealHistoryName(tmp)
- case "company_partner":
- dealCompanyPartner(tmp)
- case "annual_report_base":
- dealAnnualReportBase(tmp)
- case "annual_report_website":
- dealAnnualReportWebsite(tmp)
- default:
- fmt.Println("CurrentColl =>", CurrentColl)
- }
- }
- return count
- }
- //dealCompanyBase company_base数据表
- func dealCompanyBase(data map[string]interface{}) {
- update := make(map[string]interface{})
- save := make(map[string]interface{})
- for _, v := range company_base {
- if data[v] == nil {
- continue
- }
- // company_type 公司类型处理
- if v == "company_type" {
- save["company_type_old"] = data[v]
- if text := util.ObjToString(data["company_type"]); text != "" {
- if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
- save["company_type"] = "个体工商户"
- } else {
- text = strings.ReplaceAll(text, "(", "(")
- text = strings.ReplaceAll(text, ")", ")")
- if stype := QyStypeMap[text]; stype != "" {
- save["company_type"] = stype
- } else {
- save["company_type"] = "其他"
- }
- }
- }
- } else if v == "company_status" {
- save["company_status_old"] = data[v]
- if text := util.ObjToString(data["company_status"]); text != "" {
- text = strings.ReplaceAll(text, "(", "(")
- text = strings.ReplaceAll(text, ")", ")")
- if status := CompanyStatusMap[text]; status != "" {
- save["company_status"] = status
- } else {
- save["company_status"] = "其他"
- }
- }
- } else if v == "capital" {
- // capital/currency
- text := util.ObjToString(data[v])
- if currency := GetCurrency(text); currency != "" {
- save["currency"] = currency //币种
- }
- capital := ObjToMoney(text)
- capital = capital / 10000
- if capital != 0 {
- save[v] = capital
- }
- } else if v == "use_flag" {
- save[v] = util.IntAll(data[v])
- } else {
- save[v] = data[v]
- }
- }
- // mysql create_time/update_time
- save["create_time_msql"] = data["create_time"]
- save["update_time_msql"] = data["update_time"]
- save["_id"] = data["company_id"]
- save["createtime"] = time.Now().Unix()
- save["updatetime"] = time.Now().Unix()
- // company_area/company_city/company_district
- pshort := util.ObjToString(data["province_short"])
- save["company_area"] = province_map[pshort]
- //company_city,company_district
- for i, field := range AreaFiled {
- if data[field] == nil {
- continue
- }
- if code := fmt.Sprint(data[field]); code != "" {
- if i == 0 && len(code) >= 8 { //credit_no企业信用代码
- code = code[2:8]
- } else if i == 1 && len(code) >= 6 { //company_code注册号
- code = code[:6]
- }
- if city := AddressMap[code]; city != nil { //未作废中取
- if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
- if city.City != "" {
- save["company_city"] = city.City //市
- }
- if city.District != "" {
- save["company_district"] = city.District //县
- }
- break
- }
- } else { //作废中取
- if city := AddressOldMap[code]; city != nil {
- if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) {
- if city.City != "" {
- save["company_city"] = city.City //市
- }
- if city.District != "" {
- save["company_district"] = city.District //县
- }
- }
- break
- }
- }
- }
- }
- // search_type
- if t := util.ObjToString(save["company_type"]); t != "" {
- if t != "个体工商户" && t != "其他" {
- t1 := util.ObjToString(save["company_type_old"])
- name := util.ObjToString(save["company_name"])
- if strings.Contains(t1, "有限合伙") {
- save["search_type"] = "有限合伙"
- } else if strings.Contains(t1, "合伙") {
- save["search_type"] = "普通合伙"
- } else if strings.Contains(name, "股份") ||
- (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
- save["search_type"] = "股份有限公司"
- } else {
- save["search_type"] = "有限责任公司"
- }
- }
- }
- // company_shortname
- if m := getStName(util.ObjToString(save["company_name"])); m != "" {
- save["company_shortname"] = m
- }
- // bid_unittype
- flag := false
- for _, v := range WordsArr {
- if strings.Contains(util.ObjToString(save["business_scope"]), v) {
- flag = true
- break
- }
- }
- if flag {
- if save["bid_unittype"] != nil {
- save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
- } else {
- save["bid_unittype"] = []string{"厂商"}
- }
- }
- update["$set"] = save
- updataInfo := []map[string]interface{}{
- {"_id": save["_id"]},
- update,
- }
- saveArr = append(saveArr, updataInfo)
- //500 条处理一次,打印一次记录
- if len(saveArr) >= 500 {
- tmps := saveArr
- res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- if !res {
- log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
- }
- saveArr = [][]map[string]interface{}{}
- }
- }
- //dealCompanyEmployee company_employee
- func dealCompanyEmployee(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- oldTmp := &map[string]interface{}{}
- fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
- oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
- var names []string
- var arr []map[string]interface{}
- if (*oldTmp)["employees"] != nil {
- arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{}))
- } else {
- arr = make([]map[string]interface{}, 0)
- }
- ep := make(map[string]interface{})
- if util.ObjToString(data["_operation_type"]) == "insert" {
- ep["employee_name"] = data["employee_name"]
- ep["position"] = data["position"]
- ep["is_history"] = data["is_history"]
- ep["_id"] = util.IntAll(data["id"])
- arr = append(arr, ep)
- names = append(names, util.ObjToString(data["employee_name"]))
- } else {
- eq_flag := true
- for _, m := range arr {
- if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
- eq_flag = false
- m["employee_name"] = data["employee_name"]
- m["position"] = data["position"]
- m["is_history"] = data["is_history"]
- break
- }
- }
- if eq_flag {
- ep := make(map[string]interface{})
- ep["employee_name"] = data["employee_name"]
- ep["position"] = data["position"]
- ep["is_history"] = data["is_history"]
- ep["_id"] = util.IntAll(data["id"])
- arr = append(arr, ep)
- names = append(names, util.ObjToString(data["employee_name"]))
- }
- }
- save["employees"] = arr
- save["employee_name"] = strings.Join(names, ",")
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- saveArr = append(saveArr, saveInfo)
- //500 条处理一次,打印一次记录
- if len(saveArr) >= 500 {
- tmps := saveArr
- MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- }
- //dealCompanyPartner
- func dealCompanyPartner(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- oldTmp := &map[string]interface{}{}
- fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
- oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
- var names []string
- var arr []map[string]interface{}
- if (*oldTmp)["partners"] != nil {
- arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{}))
- } else {
- arr = make([]map[string]interface{}, 0)
- }
- if util.ObjToString(data["_operation_type"]) == "insert" {
- exp := make(map[string]interface{})
- exp["stock_capital"] = data["stock_capital"]
- exp["stock_name"] = data["stock_name"]
- exp["identify_no"] = data["identify_no"]
- exp["stock_realcapital"] = data["stock_realcapital"]
- exp["is_history"] = data["is_history"]
- exp["is_personal"] = data["is_personal"]
- exp["stock_type"] = data["stock_type"]
- exp["identify_type"] = data["identify_type"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- names = append(names, util.ObjToString(data["stock_name"]))
- } else {
- eqFlag := true
- for _, m := range arr {
- if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
- eqFlag = false
- m["stock_capital"] = data["stock_capital"]
- m["stock_name"] = data["stock_name"]
- m["identify_no"] = data["identify_no"]
- m["stock_realcapital"] = data["stock_realcapital"]
- m["is_history"] = data["is_history"]
- m["is_personal"] = data["is_personal"]
- m["stock_type"] = data["stock_type"]
- m["identify_type"] = data["identify_type"]
- break
- }
- }
- if eqFlag {
- exp := make(map[string]interface{})
- exp["stock_capital"] = data["stock_capital"]
- exp["stock_name"] = data["stock_name"]
- exp["identify_no"] = data["identify_no"]
- exp["stock_realcapital"] = data["stock_realcapital"]
- exp["is_history"] = data["is_history"]
- exp["is_personal"] = data["is_personal"]
- exp["stock_type"] = data["stock_type"]
- exp["identify_type"] = data["identify_type"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- names = append(names, util.ObjToString(data["stock_name"]))
- }
- }
- save["partners"] = arr
- save["stock_name"] = strings.Join(names, ",")
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- saveArr = append(saveArr, saveInfo)
- //500 条处理一次,打印一次记录
- if len(saveArr) >= 500 {
- tmps := saveArr
- MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- }
- //dealAnnualReportBase annual_report_base
- func dealAnnualReportBase(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- oldTmp := &map[string]interface{}{}
- fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1}
- oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields)
- var arr []map[string]interface{}
- if (*oldTmp)["annual_reports"] != nil {
- arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{}))
- } else {
- arr = make([]map[string]interface{}, 0)
- }
- year := 0
- phone, email := "", ""
- employeeNum := 0
- if util.ObjToString(data["_operation_type"]) == "insert" {
- exp := make(map[string]interface{})
- exp["operator_name"] = data["operator_name"]
- exp["report_year"] = data["report_year"]
- exp["zip_code"] = data["zip_code"]
- exp["employee_no"] = data["employee_no"]
- exp["member_no"] = data["member_no"]
- exp["company_phone"] = data["company_phone"]
- exp["company_email"] = data["company_email"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- if year < util.IntAll(data["report_year"]) {
- year = util.IntAll(data["report_year"])
- phone = util.ObjToString(data["company_phone"])
- email = util.ObjToString(data["company_email"])
- employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
- memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
- if employeeNo > 0 {
- employeeNum = employeeNo
- } else if memberNo > 0 {
- employeeNum = memberNo
- }
- }
- } else {
- eqFlag := true
- for _, m := range arr {
- if util.IntAll(data["id"]) == util.IntAll(m["_id"]) {
- eqFlag = false
- m["operator_name"] = data["operator_name"]
- m["report_year"] = data["report_year"]
- m["zip_code"] = data["zip_code"]
- m["employee_no"] = data["employee_no"]
- m["member_no"] = data["member_no"]
- m["company_phone"] = data["company_phone"]
- m["company_email"] = data["company_email"]
- break
- }
- }
- if eqFlag {
- exp := make(map[string]interface{})
- exp["operator_name"] = data["operator_name"]
- exp["report_year"] = data["report_year"]
- exp["zip_code"] = data["zip_code"]
- exp["employee_no"] = data["employee_no"]
- exp["member_no"] = data["member_no"]
- exp["company_phone"] = data["company_phone"]
- exp["company_email"] = data["company_email"]
- exp["_id"] = util.IntAll(data["id"])
- arr = append(arr, exp)
- if year < util.IntAll(data["report_year"]) {
- year = util.IntAll(data["report_year"])
- phone = util.ObjToString(data["company_phone"])
- email = util.ObjToString(data["company_email"])
- employeeNo := DealMemberNo(util.ObjToString(data["employee_no"]))
- memberNo := DealMemberNo(util.ObjToString(data["member_no"]))
- if employeeNo > 0 {
- employeeNum = employeeNo
- } else if memberNo > 0 {
- employeeNum = memberNo
- }
- }
- }
- }
- save["annual_reports"] = arr
- if year != 0 {
- save["company_phone"] = phone
- save["company_email"] = email
- save["employee_num"] = employeeNum
- }
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- saveArr = append(saveArr, saveInfo)
- //500 条处理一次,打印一次记录
- if len(saveArr) >= 500 {
- tmps := saveArr
- MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- }
- //dealHistoryName company_history_name
- func dealHistoryName(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- var names []string
- name := data["history_name"].(string)
- names = append(names, name)
- save["history_name"] = strings.Join(names, ",")
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- saveArr = append(saveArr, saveInfo)
- addSet := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$addToSet": map[string]interface{}{
- "history_names": map[string]interface{}{
- "$each": names}}},
- }
- //单独对每条的历史名称追加数组
- MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
- //500 条处理一次,打印一次记录
- if len(saveArr) >= 500 {
- tmps := saveArr
- MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- }
- //dealAnnualReportWebsite annual_report_website
- func dealAnnualReportWebsite(data map[string]interface{}) {
- save := make(map[string]interface{})
- save["_id"] = data["company_id"]
- save["updatetime"] = time.Now().Unix()
- year := 0
- web := ""
- if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 {
- year = util.IntAll(data["report_year"])
- web = util.ObjToString(data["website_url"])
- }
- if year != 0 {
- save["website_url"] = web
- }
- saveInfo := []map[string]interface{}{
- {"_id": data["company_id"]},
- {"$set": save},
- }
- saveArr = append(saveArr, saveInfo)
- //500 条处理一次,打印一次记录
- if len(saveArr) >= 500 {
- tmps := saveArr
- MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- }
|