package main import ( "bufio" "compress/gzip" "encoding/json" "fmt" "go.uber.org/zap" "io" "io/ioutil" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "os" "strings" "sync" "time" ) var ( //CurrentColl string //当前表名 //SkipCollName string //sendMsg string //collCount int // 当前表数据量 //insertCount int // insert数据 //updateCount int // update数据 //saveLog = make(map[string]interface{}) savelog sync.Map //saveArr [][]map[string]interface{} UdpClient udp.UdpClient qyxyEsAddr *net.UDPAddr localPort string // 本地监听端口 startTime int64 //std 程序开始执行的时间 readPath string // ) func main() { localPort = GF.Env.Localport //udp 本地监听地址 UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024} readPath = GF.Env.Path //手动指定同步的文件夹名称,否则就使用udp 传递数据 qyxyEsAddr = &net.UDPAddr{ Port: GF.Env.Esport, IP: net.ParseIP(GF.Env.Targetip), } log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr)) startTime = time.Now().Unix() UdpClient.Listen(processUdpMsg) log.Info("main", zap.String("Udp服务监听本地端口", localPort)) ch := make(chan bool, 1) <-ch } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo)) if err != nil { log.Info("processUdpMsg", zap.Any("Unmarshal err", err)) UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) //拿到同步信号,开始同步数据 if _, ok := mapInfo["start"]; ok { if _, okk := mapInfo["path"]; okk { path := util.ObjToString(mapInfo["path"]) //没有指定配置文件的指定目录,就使用udp 传递目录 if path != "" { readPath = path } } // 开始执行 log.Info("processUdpMsg", zap.String("readPath", readPath)) if readPath != "" { go dealPath(readPath) } } } default: log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "========")) } } func dealPath(path string) { if !strings.HasSuffix(path, "/") { path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119 } var wg sync.WaitGroup //std 程序只需要关注6个表 for _, c := range CollArr { subPath := path + c + "/" //判断文件夹存在 _, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/ if err != nil { log.Info("dealPath", zap.Any("os.Stat err", subPath)) continue } wg.Add(1) go dealSubPath(c, subPath, &wg) } wg.Wait() //更新 nseo_id updateStd() //最终记录 var saveRes = make(map[string]interface{}) for _, c := range CollArr { data, ok := savelog.Load(c) if ok { saveRes[c] = data } } MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveRes}) //执行完毕通知es 程序 data := map[string]interface{}{ "start": true, "start_time": startTime, "end_time": time.Now().Unix(), } log.Info("dealPath", zap.String(path, "数据同步结束")) SendUdpMsg(data, qyxyEsAddr) } //dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224 //c 当前表; func dealSubPath(c, subPath string, wg *sync.WaitGroup) { defer wg.Done() log.Info("dealSubPath", zap.String("开始处理path:", subPath)) start := time.Now() var fileWg sync.WaitGroup var linesMap sync.Map subFiles, _ := ioutil.ReadDir(subPath) for _, s := range subFiles { //七天的文件夹名 if s.IsDir() { fileWg.Add(1) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224 go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap) } } fileWg.Wait() cCount, _ := linesMap.Load(c) duration := time.Since(start) result := map[string]interface{}{ "count": cCount, "duration": fmt.Sprintf("%v min", duration.Minutes()), //运行时长 } savelog.Store(c, result) } func dealinfo(c, path string, wg *sync.WaitGroup, linesMap *sync.Map) { defer wg.Done() count := 0 file := path + "/split.json.gz" log.Info("dealinfo", zap.Any("current date", file)) _, err := os.Stat(file) if err == nil { // 打开本地gz格式压缩包 fr, err := os.Open(file) if err != nil { panic(err) } else { println("open file success!") } // defer: 在函数退出时,执行关闭文件 defer fr.Close() // 创建gzip文件读取对象 gr, err := gzip.NewReader(fr) if err != nil { panic(err) } // defer: 在函数退出时,执行关闭gzip对象 defer gr.Close() bfRd := bufio.NewReader(gr) for { line, err := bfRd.ReadBytes('\n') if err != nil { if err == io.EOF { log.Info("dealinfo", zap.String(fmt.Sprintf("%v/split.json.gz", path), "read gzip data finish!")) fmt.Println("read gzip data finish! ") break } else { log.Error("dealinfo", zap.Any(fmt.Sprintf("%v/split.json.gz;read gzip err", path), err)) } } if len(line) > 0 { count++ hookfn(c, line) linesMap.Store(c, count) } if count%1000 == 0 { log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count))) } } } } //hookfn 处理拿到的每行数据 func hookfn(c string, line []byte) { tmp := make(map[string]interface{}) err := json.Unmarshal(line, &tmp) if err != nil { log.Error("hookfn", zap.Any("Unmarshal", err)) } if _, ok := tmp["company_id"]; ok { //针对数据表 不同处理 switch c { case "company_base": dealCompanyBase(tmp) case "company_employee": dealCompanyEmployee(tmp) case "company_history_name": dealHistoryName(tmp) case "company_partner": dealCompanyPartner(tmp) case "annual_report_base": dealAnnualReportBase(tmp) case "annual_report_website": dealAnnualReportWebsite(tmp) default: fmt.Println("CurrentColl =>", c) } } } //dealCompanyBase company_base数据表 func dealCompanyBase(data map[string]interface{}) { update := make(map[string]interface{}) save := make(map[string]interface{}) for _, v := range company_base { if data[v] == nil { continue } // company_type 公司类型处理 if v == "company_type" { save["company_type_old"] = data[v] if text := util.ObjToString(data["company_type"]); text != "" { if strings.Contains(text, "个体") || strings.Contains(text, "非公司") { save["company_type"] = "个体工商户" } else { text = strings.ReplaceAll(text, "(", "(") text = strings.ReplaceAll(text, ")", ")") if stype := QyStypeMap[text]; stype != "" { save["company_type"] = stype } else { save["company_type"] = "其他" } } } } else if v == "company_status" { save["company_status_old"] = data[v] if text := util.ObjToString(data["company_status"]); text != "" { text = strings.ReplaceAll(text, "(", "(") text = strings.ReplaceAll(text, ")", ")") if status := CompanyStatusMap[text]; status != "" { save["company_status"] = status } else { save["company_status"] = "其他" } } } else if v == "capital" { // capital/currency text := util.ObjToString(data[v]) if currency := GetCurrency(text); currency != "" { save["currency"] = currency //币种 } capital := ObjToMoney(text) capital = capital / 10000 if capital != 0 { save[v] = capital } } else if v == "use_flag" { save[v] = util.IntAll(data[v]) } else { save[v] = data[v] } } // mysql create_time/update_time save["create_time_msql"] = data["create_time"] save["update_time_msql"] = data["update_time"] save["_id"] = data["company_id"] save["autoid"] = util.Int64All(data["id"]) save["createtime"] = time.Now().Unix() save["updatetime"] = time.Now().Unix() // company_area/company_city/company_district pshort := util.ObjToString(data["province_short"]) save["company_area"] = province_map[pshort] //company_city,company_district for i, field := range AreaFiled { if data[field] == nil { continue } if code := fmt.Sprint(data[field]); code != "" { if i == 0 && len(code) >= 8 { //credit_no企业信用代码 code = code[2:8] } else if i == 1 && len(code) >= 6 { //company_code注册号 code = code[:6] } if city := AddressMap[code]; city != nil { //未作废中取 if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) { if city.City != "" { save["company_city"] = city.City //市 } if city.District != "" { save["company_district"] = city.District //县 } break } } else { //作废中取 if city := AddressOldMap[code]; city != nil { if city.Province != "" && city.Province == util.ObjToString(save["company_area"]) { if city.City != "" { save["company_city"] = city.City //市 } if city.District != "" { save["company_district"] = city.District //县 } } break } } } } // search_type if t := util.ObjToString(save["company_type"]); t != "" { if t != "个体工商户" && t != "其他" { t1 := util.ObjToString(save["company_type_old"]) name := util.ObjToString(save["company_name"]) if strings.Contains(t1, "有限合伙") { save["search_type"] = "有限合伙" } else if strings.Contains(t1, "合伙") { save["search_type"] = "普通合伙" } else if strings.Contains(name, "股份") || (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) { save["search_type"] = "股份有限公司" } else { save["search_type"] = "有限责任公司" } } } // company_shortname if m := getStName(util.ObjToString(save["company_name"])); m != "" { save["company_shortname"] = m } // bid_unittype flag := false for _, v := range WordsArr { if strings.Contains(util.ObjToString(save["business_scope"]), v) { flag = true break } } if flag { if save["bid_unittype"] != nil { save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商") } else { save["bid_unittype"] = []string{"厂商"} } } update["$set"] = save updataInfo := []map[string]interface{}{ {"_id": save["_id"]}, update, } MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo) //saveArr = append(saveArr, updataInfo) // ////500 条处理一次,打印一次记录 //if len(saveArr) >= 500 { // tmps := saveArr // res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...) // if !res { // log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res)) // } // saveArr = [][]map[string]interface{}{} //} } //dealCompanyEmployee company_employee func dealCompanyEmployee(data map[string]interface{}) { save := make(map[string]interface{}) save["_id"] = data["company_id"] save["updatetime"] = time.Now().Unix() oldTmp := &map[string]interface{}{} fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1} oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields) var names []string var arr []map[string]interface{} if (*oldTmp)["employees"] != nil { arr = util.ObjArrToMapArr((*oldTmp)["employees"].([]interface{})) } else { arr = make([]map[string]interface{}, 0) } ep := make(map[string]interface{}) if util.ObjToString(data["_operation_type"]) == "insert" { ep["employee_name"] = data["employee_name"] ep["position"] = data["position"] ep["is_history"] = data["is_history"] ep["_id"] = util.IntAll(data["id"]) arr = append(arr, ep) names = append(names, util.ObjToString(data["employee_name"])) } else { eq_flag := true for _, m := range arr { if util.IntAll(data["id"]) == util.IntAll(m["_id"]) { eq_flag = false m["employee_name"] = data["employee_name"] m["position"] = data["position"] m["is_history"] = data["is_history"] break } } if eq_flag { ep := make(map[string]interface{}) ep["employee_name"] = data["employee_name"] ep["position"] = data["position"] ep["is_history"] = data["is_history"] ep["_id"] = util.IntAll(data["id"]) arr = append(arr, ep) names = append(names, util.ObjToString(data["employee_name"])) } } save["employees"] = arr save["employee_name"] = strings.Join(names, ",") saveInfo := []map[string]interface{}{ {"_id": data["company_id"]}, {"$set": save}, } MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo) //saveArr = append(saveArr, saveInfo) ////500 条处理一次,打印一次记录 //if len(saveArr) >= 500 { // tmps := saveArr // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...) // saveArr = [][]map[string]interface{}{} //} } //dealCompanyPartner func dealCompanyPartner(data map[string]interface{}) { save := make(map[string]interface{}) save["_id"] = data["company_id"] save["updatetime"] = time.Now().Unix() oldTmp := &map[string]interface{}{} fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1} oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields) var names []string var arr []map[string]interface{} if (*oldTmp)["partners"] != nil { arr = util.ObjArrToMapArr((*oldTmp)["partners"].([]interface{})) } else { arr = make([]map[string]interface{}, 0) } if util.ObjToString(data["_operation_type"]) == "insert" { exp := make(map[string]interface{}) exp["stock_capital"] = data["stock_capital"] exp["stock_name"] = data["stock_name"] exp["identify_no"] = data["identify_no"] exp["stock_realcapital"] = data["stock_realcapital"] exp["is_history"] = data["is_history"] exp["is_personal"] = data["is_personal"] exp["stock_type"] = data["stock_type"] exp["identify_type"] = data["identify_type"] exp["_id"] = util.IntAll(data["id"]) arr = append(arr, exp) names = append(names, util.ObjToString(data["stock_name"])) } else { eqFlag := true for _, m := range arr { if util.IntAll(data["id"]) == util.IntAll(m["_id"]) { eqFlag = false m["stock_capital"] = data["stock_capital"] m["stock_name"] = data["stock_name"] m["identify_no"] = data["identify_no"] m["stock_realcapital"] = data["stock_realcapital"] m["is_history"] = data["is_history"] m["is_personal"] = data["is_personal"] m["stock_type"] = data["stock_type"] m["identify_type"] = data["identify_type"] break } } if eqFlag { exp := make(map[string]interface{}) exp["stock_capital"] = data["stock_capital"] exp["stock_name"] = data["stock_name"] exp["identify_no"] = data["identify_no"] exp["stock_realcapital"] = data["stock_realcapital"] exp["is_history"] = data["is_history"] exp["is_personal"] = data["is_personal"] exp["stock_type"] = data["stock_type"] exp["identify_type"] = data["identify_type"] exp["_id"] = util.IntAll(data["id"]) arr = append(arr, exp) names = append(names, util.ObjToString(data["stock_name"])) } } save["partners"] = arr save["stock_name"] = strings.Join(names, ",") saveInfo := []map[string]interface{}{ {"_id": data["company_id"]}, {"$set": save}, } MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo) //saveArr = append(saveArr, saveInfo) ////500 条处理一次,打印一次记录 //if len(saveArr) >= 500 { // tmps := saveArr // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...) // saveArr = [][]map[string]interface{}{} //} } //dealAnnualReportBase annual_report_base func dealAnnualReportBase(data map[string]interface{}) { save := make(map[string]interface{}) save["_id"] = data["company_id"] save["updatetime"] = time.Now().Unix() oldTmp := &map[string]interface{}{} fields := map[string]interface{}{"employees": 1, "partners": 1, "annual_reports": 1} oldTmp, _ = MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": data["company_id"]}, fields) var arr []map[string]interface{} if (*oldTmp)["annual_reports"] != nil { arr = util.ObjArrToMapArr((*oldTmp)["annual_reports"].([]interface{})) } else { arr = make([]map[string]interface{}, 0) } year := 0 phone, email := "", "" employeeNum := 0 if util.ObjToString(data["_operation_type"]) == "insert" { exp := make(map[string]interface{}) exp["operator_name"] = data["operator_name"] exp["report_year"] = data["report_year"] exp["zip_code"] = data["zip_code"] exp["employee_no"] = data["employee_no"] exp["member_no"] = data["member_no"] exp["company_phone"] = data["company_phone"] exp["company_email"] = data["company_email"] exp["_id"] = util.IntAll(data["id"]) arr = append(arr, exp) if year < util.IntAll(data["report_year"]) { year = util.IntAll(data["report_year"]) phone = util.ObjToString(data["company_phone"]) email = util.ObjToString(data["company_email"]) employeeNo := DealMemberNo(util.ObjToString(data["employee_no"])) memberNo := DealMemberNo(util.ObjToString(data["member_no"])) if employeeNo > 0 { employeeNum = employeeNo } else if memberNo > 0 { employeeNum = memberNo } } } else { eqFlag := true for _, m := range arr { if util.IntAll(data["id"]) == util.IntAll(m["_id"]) { eqFlag = false m["operator_name"] = data["operator_name"] m["report_year"] = data["report_year"] m["zip_code"] = data["zip_code"] m["employee_no"] = data["employee_no"] m["member_no"] = data["member_no"] m["company_phone"] = data["company_phone"] m["company_email"] = data["company_email"] break } } if eqFlag { exp := make(map[string]interface{}) exp["operator_name"] = data["operator_name"] exp["report_year"] = data["report_year"] exp["zip_code"] = data["zip_code"] exp["employee_no"] = data["employee_no"] exp["member_no"] = data["member_no"] exp["company_phone"] = data["company_phone"] exp["company_email"] = data["company_email"] exp["_id"] = util.IntAll(data["id"]) arr = append(arr, exp) if year < util.IntAll(data["report_year"]) { year = util.IntAll(data["report_year"]) phone = util.ObjToString(data["company_phone"]) email = util.ObjToString(data["company_email"]) employeeNo := DealMemberNo(util.ObjToString(data["employee_no"])) memberNo := DealMemberNo(util.ObjToString(data["member_no"])) if employeeNo > 0 { employeeNum = employeeNo } else if memberNo > 0 { employeeNum = memberNo } } } } save["annual_reports"] = arr if year != 0 { save["company_phone"] = phone save["company_email"] = email save["employee_num"] = employeeNum } saveInfo := []map[string]interface{}{ {"_id": data["company_id"]}, {"$set": save}, } MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo) // //saveArr = append(saveArr, saveInfo) ////500 条处理一次,打印一次记录 //if len(saveArr) >= 500 { // tmps := saveArr // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...) // saveArr = [][]map[string]interface{}{} //} } //dealHistoryName company_history_name func dealHistoryName(data map[string]interface{}) { save := make(map[string]interface{}) save["_id"] = data["company_id"] save["updatetime"] = time.Now().Unix() var names []string if data["history_name"] != nil { name := data["history_name"].(string) names = append(names, name) save["history_name"] = strings.Join(names, ",") } saveInfo := []map[string]interface{}{ {"_id": data["company_id"]}, {"$set": save}, } //saveArr = append(saveArr, saveInfo) addSet := []map[string]interface{}{ {"_id": data["company_id"]}, {"$addToSet": map[string]interface{}{ "history_names": map[string]interface{}{ "$each": names}}}, } //单独对每条的历史名称追加数组 MongoTool.UpSertBulk(GF.Env.Dbsave, addSet) MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo) ////500 条处理一次,打印一次记录 //if len(saveArr) >= 500 { // tmps := saveArr // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...) // saveArr = [][]map[string]interface{}{} //} } //dealAnnualReportWebsite annual_report_website func dealAnnualReportWebsite(data map[string]interface{}) { save := make(map[string]interface{}) save["_id"] = data["company_id"] save["updatetime"] = time.Now().Unix() year := 0 web := "" if year < util.IntAll(data["report_year"]) && util.IntAll(data["is_history"]) == 0 { year = util.IntAll(data["report_year"]) web = util.ObjToString(data["website_url"]) } if year != 0 { save["website_url"] = web } saveInfo := []map[string]interface{}{ {"_id": data["company_id"]}, {"$set": save}, } MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo) //saveArr = append(saveArr, saveInfo) //500 条处理一次,打印一次记录 //if len(saveArr) >= 500 { // tmps := saveArr // MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...) // saveArr = [][]map[string]interface{}{} //} }