package main import ( "fmt" "mongodb" "qfw/util" es "qfw/util/elastic" "strconv" "strings" "sync" "time" ) var ( MongoTool *mongodb.MongodbSim Es *es.Elastic EsSaveCache chan map[string]interface{} SP chan bool EsField = []string{"_id", "company_name", "history_name", "updatetime", "legal_person", "legal_person_certno", "credit_no", "org_code", "tax_code", "company_code", "area_code", "company_area", "company_city", "company_district", "currency", "capital", "company_type", "company_status", "establish_date", "issue_date", "lastupdatetime", "operation_startdate", "operation_enddate", "cancel_date", "revoke_date", "authority", "company_address", "business_scope", "partners", "employees", "stock_name", "company_phone", "company_email", "website_url", "search_type", "employee_name", "company_type_old"} TypeMap = map[string]string{ "采购单位": "1", "投标企业": "2", "代理机构": "3", "厂商": "4", } TypeMap1 = map[string]string{ "固定电话": "1", "手机号": "2", "邮箱": "3", "不存在": "4", } ) func init() { MongoTool = &mongodb.MongodbSim{ MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082", Size: 10, DbName: "mixdata", UserName: "SJZY_RWESBid_Other", Password: "SJZY@O17t8herB3B", //MongodbAddr: "192.168.3.207:27092", //Size: 10, //DbName: "wjh", } MongoTool.InitPool() Es = &es.Elastic{ S_esurl: "http://172.17.145.170:9800", // http://172.17.145.170:9800 I_size: 10, } Es.InitElasticSize() EsSaveCache = make(chan map[string]interface{}, 1000) SP = make(chan bool, 5) } func main() { go SaveEs() sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, 1) wg := &sync.WaitGroup{} query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(nil).Skip(6110000).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%5000 == 0 { util.Debug("current ---", count) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if p1, ok := tmp["partners"].([]interface{}); ok { p2 := p1[0].(map[string]interface{}) if p2["stock_capital"] != "" { taskinfo(tmp) } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() util.Debug("over ---", count) c := make(chan bool, 1) <-c } func taskinfo(tmp map[string]interface{}) { esMap := make(map[string]interface{}) for _, v := range EsField { if tmp[v] != nil { esMap[v] = tmp[v] } } esMap["name"] = tmp["company_name"] company_type := util.ObjToString(tmp["company_type"]) company_name := util.ObjToString(tmp["company_name"]) if company_type == "个体工商户" { if len([]rune(company_name)) >= 5 { esMap["company_type_int"] = 31 } else { esMap["company_type_int"] = 32 } } else if company_type == "其他" || company_type == "" { if len([]rune(company_name)) >= 4 { esMap["company_type_int"] = 21 } else { esMap["company_type_int"] = 22 } } else { if company_type == "内资分公司" { esMap["company_type_int"] = 12 } else if len([]rune(company_name)) >= 4 { esMap["company_type_int"] = 11 } else { esMap["company_type_int"] = 13 } } if pname, ok := tmp["bid_projectname"].([]interface{}); ok { p1 := util.ObjArrToStringArr(pname) esMap["bid_projectname"] = strings.Join(p1, ",") } if pur, ok := tmp["bid_purchasing"].([]interface{}); ok { p1 := util.ObjArrToStringArr(pur) esMap["bid_purchasing"] = strings.Join(p1, ",") } if areas, ok := tmp["bid_area"].([]interface{}); ok { p1 := util.ObjArrToStringArr(areas) esMap["bid_area"] = strings.Join(p1, ",") } if t1, ok := tmp["bid_unittype"].([]interface{}); ok { var arr []string for _, v := range util.ObjArrToStringArr(t1) { arr = append(arr, TypeMap[v]) } esMap["bid_unittype"] = strings.Join(arr, ",") } if t2, ok := tmp["bid_contracttype"].([]interface{}); ok { var arr []string for _, v := range util.ObjArrToStringArr(t2) { arr = append(arr, TypeMap1[v]) } esMap["bid_contracttype"] = strings.Join(arr, ",") } if p1, ok := tmp["partners"].([]interface{}); ok { for _, v := range p1 { v1 := v.(map[string]interface{}) if text := util.ObjToString(v1["stock_capital"]); text != "" { if c1, err := strconv.ParseFloat(text, 64); err == nil { c1, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", c1), 64) //保留小数点两位 v1["stock_capital"] = c1 } else { delete(v1, "stock_capital") } } if text := util.ObjToString(v1["stock_realcapital"]); text != "" { if c2, err := strconv.ParseFloat(text, 64); err == nil { c2, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", c2), 64) //保留小数点两位 v1["stock_realcapital"] = c2 } else { delete(v1, "stock_realcapital") } } } } EsSaveCache <- esMap } func SaveEs() { util.Debug("Es Save...") arru := make([]map[string]interface{}, 500) indexu := 0 for { select { case v := <-EsSaveCache: arru[indexu] = v indexu++ if indexu == 500 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave("qyxy_v2", "qyxy", &arru, false) }(arru) arru = make([]map[string]interface{}, 500) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave("qyxy_v2", "qyxy", &arru, false) }(arru[:indexu]) arru = make([]map[string]interface{}, 500) indexu = 0 } } } }