package main import ( "fmt" "github.com/spf13/viper" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "strconv" "strings" "sync" "time" ) var ( partner = []string{"identify_no", "stock_type", "stock_name", "identify_type", "stock_capital", "stock_realcapital"} employee = []string{"employee_name", "position"} TypeMap = map[string]string{ "采购单位": "1", "中标单位": "2", // 投标企业 "代理机构": "3", "厂商": "4", } TypeMap1 = map[string]string{ "固定电话": "1", "手机号": "2", "邮箱": "3", "不存在": "4", } ) // StdAdd 增量数据 func StdAdd(q interface{}) { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) pool := make(chan bool, 18) wg := &sync.WaitGroup{} //q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"} //q := bson.M{"updatetime": bson.M{"$gt": Updatetime}} log.Info("StdAdd", zap.Any("q", q)) it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%20000 == 0 { //log.Println("current:", count) log.Info("StdAdd", zap.Int("current:", count)) log.Info("StdAdd", zap.Any("q", q), zap.Any("updatetime", tmp["updatetime"])) } if util.IntAll(tmp["use_flag"]) > 5 { continue } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() esMap := map[string]interface{}{} //生索引字段处理 for _, field := range EsFields { if tmp[field] == nil { continue } if field == "company_name" { esMap[field] = tmp["company_name"] esMap["name"] = tmp["company_name"] } else if field == "history_name" { var nameArr []string names := util.ObjToString(tmp["history_name"]) if strings.Contains(names, ",") { nameArr = append(nameArr, strings.Split(names, ",")...) } if len(nameArr) > 0 { esMap["history_name"] = nameArr } } else if field == "establish_date" { // 成立日期修改成时间戳 location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local) if err != nil { util.Debug(err) } else { esMap["establish_date"] = location.Unix() } } else if field == "cancel_date" { esMap["cancel_date"] = tmp["cancel_date"] location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local) if err != nil { util.Debug(err) } else { esMap["cancel_date_unix"] = location.Unix() } } else if field == "lastupdatetime" { esMap["lastupdatetime"] = tmp["update_time_msql"] } else if field == "bid_projectname" { if pname, ok := tmp["bid_projectname"].([]interface{}); ok { p1 := util.ObjArrToStringArr(pname) esMap["bid_projectname"] = strings.Join(p1, ",") } } else if field == "bid_purchasing" { if pur, ok := tmp["bid_purchasing"].([]interface{}); ok { p1 := util.ObjArrToStringArr(pur) esMap["bid_purchasing"] = strings.Join(p1, ",") } } else if field == "bid_area" { if areas, ok := tmp["bid_area"].([]interface{}); ok { p1 := util.ObjArrToStringArr(areas) esMap["bid_area"] = strings.Join(p1, ",") } } else if field == "partners" { if ps, ok := tmp["partners"].([]interface{}); ok { var parr []map[string]interface{} for _, v := range ps { p := make(map[string]interface{}) v1 := v.(map[string]interface{}) for _, field := range partner { if v1[field] == nil { continue } if field == "stock_capital" || field == "stock_realcapital" { if v, err := strconv.ParseFloat(util.ObjToString(v1[field]), 64); err == nil { v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位 p[field] = v } } else { p[field] = v1[field] } } if len(p) > 0 { parr = append(parr, p) } } if len(parr) > 0 { esMap[field] = parr } } } else if field == "employees" { if ps, ok := tmp["employees"].([]interface{}); ok { var parr []map[string]interface{} for _, v := range ps { p := make(map[string]interface{}) v1 := v.(map[string]interface{}) for _, field := range employee { if v1[field] == nil { continue } else { p[field] = v1[field] } } if len(p) > 0 { parr = append(parr, p) } } if len(parr) > 0 { esMap[field] = parr } } } else if field == "bid_unittype" { if t2, ok := tmp["bid_unittype"].([]interface{}); ok { var arr []string for _, v := range util.ObjArrToStringArr(t2) { arr = append(arr, TypeMap[v]) } esMap["bid_unittype"] = strings.Join(arr, ",") } } else if field == "bid_contracttype" { if t2, ok := tmp["bid_contracttype"].([]interface{}); ok { var arr []string for _, v := range util.ObjArrToStringArr(t2) { arr = append(arr, TypeMap1[v]) } esMap["bid_contracttype"] = strings.Join(arr, ",") } } else if field == "_id" { esMap["_id"] = tmp["_id"] esMap["id"] = tmp["_id"] } else { esMap[field] = tmp[field] } } company_type := util.ObjToString(tmp["company_type"]) company_name := util.ObjToString(tmp["company_name"]) if company_type == "个体工商户" { if len([]rune(company_name)) >= 5 { esMap["company_type_int"] = 31 } else { esMap["company_type_int"] = 32 } } else if company_type == "其他" || company_type == "" { if len([]rune(company_name)) >= 4 { esMap["company_type_int"] = 21 } else { esMap["company_type_int"] = 22 } } else { if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" { esMap["company_type_int"] = 12 } else if len([]rune(company_name)) >= 4 { esMap["company_type_int"] = 11 } else { esMap["company_type_int"] = 13 } } //国企、央企、其他 own_type := getCompanyType(company_name, company_type) if own_type != "" { esMap["ownership_type"] = own_type } // 添加企业的国标行业分类 companyID := util.ObjToString(tmp["_id"]) updateMgo := make(map[string]interface{}, 0) if own_type != "" { updateMgo["ownership_type"] = own_type } where := map[string]interface{}{ "company_id": companyID, } industry, _ := Mgo181.FindOne("company_industry", where) if industry != nil && len(*industry) > 0 { if util.ObjToString((*industry)["industry_l1_name"]) != "" { esMap["national_top"] = (*industry)["industry_l1_name"] updateMgo["national_top"] = (*industry)["industry_l1_name"] } if util.ObjToString((*industry)["industry_l2_name"]) != "" { esMap["national_sub"] = (*industry)["industry_l2_name"] updateMgo["national_sub"] = (*industry)["industry_l2_name"] } if util.ObjToString((*industry)["industry_l3_name"]) != "" { esMap["national_subsub"] = (*industry)["industry_l3_name"] updateMgo["national_subsub"] = (*industry)["industry_l3_name"] } } if len(updateMgo) > 0 { updatePool <- []map[string]interface{}{ {"_id": companyID}, {"$set": updateMgo}, } } EsSaveCache <- esMap // 保存es }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info("StdAdd", zap.Any("q", q), zap.Int("Run Over...Count::", count)) } // StdAll 分段处理存量数据 func StdAll() { type Biddingall struct { Coll string Gtime int64 Ltime int64 } type RoutinesConf struct { Num int64 } type AllConf struct { All map[string]Biddingall Routines RoutinesConf } var all AllConf viper.SetConfigFile("stdall.toml") viper.SetConfigName("stdall") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") err := viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 fmt.Println("ReadInConfig err =>", err) return } err = viper.Unmarshal(&all) if err != nil { fmt.Println("biddingAllDataTask Unmarshal err =>", err) return } //fmt.Println("all", all) for k, conf := range all.All { go dealAll(conf.Coll, k, conf.Gtime, conf.Ltime, all.Routines.Num) } } func dealAll(coll, kword string, gtime, ltime, routines int64) { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "updatetime": map[string]interface{}{ "$gt": gtime, "$lte": ltime, }, } log.Info("dealAll", zap.Any("q", q)) pool := make(chan bool, routines) wg := &sync.WaitGroup{} it := sess.DB(Dbname).C(coll).Find(q).Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%20000 == 0 { log.Info(kword, zap.Int("current:", count)) log.Info(kword, zap.Any("updatetime =>", tmp["updatetime"])) } pool <- true if util.IntAll(tmp["use_flag"]) > 5 { continue } wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() esMap := map[string]interface{}{} //生索引字段处理 for _, field := range EsFields { if tmp[field] == nil { continue } if field == "company_name" { esMap[field] = tmp["company_name"] esMap["name"] = tmp["company_name"] } else if field == "history_name" { var nameArr []string for _, v := range strings.Split(util.ObjToString(tmp["history_name"]), ",") { if v != "" { nameArr = append(nameArr, v) } } if len(nameArr) > 0 { esMap["history_name"] = nameArr } } else if field == "establish_date" { // 成立日期修改成时间戳 location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local) if err != nil { util.Debug(err) } else { esMap["establish_date"] = location.Unix() } } else if field == "lastupdatetime" { esMap["lastupdatetime"] = tmp["update_time_msql"] } else if field == "cancel_date" { esMap["cancel_date"] = tmp["cancel_date"] location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local) if err != nil { util.Debug(err) } else { esMap["cancel_date_unix"] = location.Unix() } } else if field == "bid_projectname" { if pname, ok := tmp["bid_projectname"].([]interface{}); ok { p1 := util.ObjArrToStringArr(pname) esMap["bid_projectname"] = strings.Join(p1, ",") } } else if field == "bid_purchasing" { if pur, ok := tmp["bid_purchasing"].([]interface{}); ok { p1 := util.ObjArrToStringArr(pur) esMap["bid_purchasing"] = strings.Join(p1, ",") } } else if field == "bid_area" { if areas, ok := tmp["bid_area"].([]interface{}); ok { p1 := util.ObjArrToStringArr(areas) esMap["bid_area"] = strings.Join(p1, ",") } } else if field == "partners" { if ps, ok := tmp["partners"].([]interface{}); ok { var parr []map[string]interface{} for _, v := range ps { p := make(map[string]interface{}) v1 := v.(map[string]interface{}) for _, field := range partner { if v1[field] == nil { continue } if field == "stock_capital" || field == "stock_realcapital" { text := util.ObjToString(v1[field]) if strings.Contains(text, "万元") { text = strings.Replace(text, "万元", "", -1) } if v, err := strconv.ParseFloat(text, 64); err == nil { v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位 p[field] = v } } else { p[field] = v1[field] } } if len(p) > 0 { parr = append(parr, p) } } if len(parr) > 0 { esMap[field] = parr } } } else if field == "employees" { if ps, ok := tmp["employees"].([]interface{}); ok { var parr []map[string]interface{} for _, v := range ps { p := make(map[string]interface{}) v1 := v.(map[string]interface{}) for _, field := range employee { if v1[field] == nil { continue } else { p[field] = v1[field] } } if len(p) > 0 { parr = append(parr, p) } } if len(parr) > 0 { esMap[field] = parr } } } else if field == "bid_unittype" { if t2, ok := tmp["bid_unittype"].([]interface{}); ok { var arr []string for _, v := range util.ObjArrToStringArr(t2) { arr = append(arr, TypeMap[v]) } esMap["bid_unittype"] = strings.Join(arr, ",") } } else if field == "bid_contracttype" { if t2, ok := tmp["bid_contracttype"].([]interface{}); ok { var arr []string for _, v := range util.ObjArrToStringArr(t2) { arr = append(arr, TypeMap1[v]) } esMap["bid_contracttype"] = strings.Join(arr, ",") } } else if field == "_id" { esMap["_id"] = tmp["_id"] esMap["id"] = tmp["_id"] } else { esMap[field] = tmp[field] } } company_type := util.ObjToString(tmp["company_type"]) company_name := util.ObjToString(tmp["company_name"]) if company_type == "个体工商户" { if len([]rune(company_name)) >= 5 { esMap["company_type_int"] = 31 } else { esMap["company_type_int"] = 32 } } else if company_type == "其他" || company_type == "" { if len([]rune(company_name)) >= 4 { esMap["company_type_int"] = 21 } else { esMap["company_type_int"] = 22 } if strings.HasSuffix(company_name, "公司") || strings.HasSuffix(company_name, "集团") { esMap["company_type_int"] = 12 } } else { if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" { esMap["company_type_int"] = 12 } else if len([]rune(company_name)) >= 4 { esMap["company_type_int"] = 11 } else { esMap["company_type_int"] = 13 } } EsSaveCache <- esMap // 保存es }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(kword, zap.Any("Run Over...Count", count)) } // SaveEs 过滤后数据存库 func SaveEs() { arru := make([]map[string]interface{}, 100) indexu := 0 for { select { case v := <-EsSaveCache: arru[indexu] = v indexu++ if indexu == 100 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(Index, arru) // 存在第二个集群 if Es2 != nil && Es2.S_esurl != "" { Es2.BulkSave(Index, arru) } }(arru) arru = make([]map[string]interface{}, 100) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(Index, arru) // 存在第二个集群 if Es2 != nil && Es2.S_esurl != "" { Es2.BulkSave(Index, arru) } }(arru[:indexu]) arru = make([]map[string]interface{}, 100) indexu = 0 } } } }