package main

import (
	"mongodb"
	"qfw/util"
	"strings"
	"sync"
	"time"
)

var (
	// MongoTool is the shared connection pool used for reading and writing
	// the mixdata database.
	MongoTool *mongodb.MongodbSim
	// StatusMap rewrites raw company_status values to their normalized form.
	// Statuses that are not keys of this map are left unchanged.
	StatusMap map[string]string
	// WordsArr lists business_scope keywords (R&D, manufacturing, processing,
	// planting, ...) whose presence marks a company as "厂商" (manufacturer).
	WordsArr []string
	// updatePool carries one [filter, update] pair per document from the
	// workers in main to updateMethod, which groups them into bulk writes.
	updatePool chan []map[string]interface{}
	// updateSp is a counting semaphore; its capacity bounds how many
	// UpSertBulk calls updateMethod runs concurrently.
	updateSp chan bool
)

func init() {
	// NOTE(review): the address and credentials are hard-coded in source;
	// consider loading them from configuration or the environment instead.
	MongoTool = &mongodb.MongodbSim{
		MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
		Size:        10,
		DbName:      "mixdata",
		UserName:    "SJZY_RWESBid_Other",
		Password:    "SJZY@O17t8herB3B",
	}
	MongoTool.InitPool()

	updatePool = make(chan []map[string]interface{}, 2000)
	updateSp = make(chan bool, 4)

	StatusMap = map[string]string{
		"正常":    "存续",
		"其他":    "存续",
		"未注销":   "吊销",
		"个体转企业": "存续",
	}
	WordsArr = []string{"研发", "研制", "开发", "生产", "制造", "制作", "加工", "种植"}
}

// main scans every document of mixdata.qyxy_std (newest updatetime first),
// derives the normalized fields computed by buildUpdate, and hands one
// [filter, $set] pair per document to updateMethod for batched bulk writes.
// It returns only after all documents have been processed and every pending
// write has been issued and has finished.
func main() {
	writerDone := make(chan struct{})
	go func() {
		updateMethod()
		close(writerDone)
	}()

	sess := MongoTool.GetMgoConn()
	defer MongoTool.DestoryMongoConn(sess)

	// sem bounds the number of documents being transformed at once.
	sem := make(chan struct{}, 5)
	var wg sync.WaitGroup

	field := map[string]interface{}{
		"company_status":   1,
		"company_type":     1,
		"company_phone":    1,
		"company_email":    1,
		"business_scope":   1,
		"employees":        1,
		"annual_reports":   1,
		"company_name":     1,
		"company_type_old": 1,
	}
	query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(field).Sort("-updatetime").Iter()
	count := 0
	// A fresh map is allocated per document because the worker goroutine
	// keeps using the previous one after Next decodes the following doc.
	for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
		if count%5000 == 0 {
			util.Debug("current ---", count)
		}
		sem <- struct{}{}
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-sem
				wg.Done()
			}()
			updatePool <- []map[string]interface{}{
				{"_id": doc["_id"]},
				{"$set": buildUpdate(doc)},
			}
		}(tmp)
		tmp = make(map[string]interface{})
	}
	// Without this check a cursor failure would silently end the scan early.
	if err := query.Close(); err != nil {
		util.Debug("query qyxy_std error ---", err)
	}

	wg.Wait()
	// No more producers: let updateMethod flush its last batch and return.
	close(updatePool)
	<-writerDone
	util.Debug("over ---", count)
}

// buildUpdate derives the $set payload for one qyxy_std document read with
// the projection used in main. bid_contracttype is always set; the other
// fields are set only when they can be derived from the document.
func buildUpdate(tmp map[string]interface{}) map[string]interface{} {
	updata := make(map[string]interface{})

	// company_status: rewrite only the raw values StatusMap knows, so other
	// statuses are not overwritten with an empty string.
	if text := util.ObjToString(tmp["company_status"]); text != "" {
		if std, ok := StatusMap[text]; ok {
			updata["company_status"] = std
		}
	}

	// employee_name: comma-joined list of all non-empty employee names.
	if names := employeeNames(tmp["employees"]); len(names) > 0 {
		updata["employee_name"] = strings.Join(names, ",")
	}

	// bid_unittype: "厂商" (manufacturer) when the business scope mentions
	// any keyword from WordsArr.
	if isManufacturer(util.ObjToString(tmp["business_scope"])) {
		updata["bid_unittype"] = []string{"厂商"}
	}

	// bid_contracttype: which contact channels the company exposes.
	updata["bid_contracttype"] = contactTypes(
		util.ObjToString(tmp["company_phone"]),
		util.ObjToString(tmp["company_email"]),
	)

	// website_url: first website listed in any annual report.
	if url := firstWebsiteURL(tmp["annual_reports"]); url != "" {
		updata["website_url"] = url
	}

	// search_type: coarse legal-form classification.
	if st := searchType(tmp); st != "" {
		updata["search_type"] = st
	}
	return updata
}

// employeeNames returns the non-empty employee_name values of an employees
// array in order; entries that are not documents are skipped.
func employeeNames(raw interface{}) []string {
	emps, ok := raw.([]interface{})
	if !ok {
		return nil
	}
	var names []string
	for _, e := range emps {
		emp, ok := e.(map[string]interface{})
		if !ok {
			continue
		}
		if name := util.ObjToString(emp["employee_name"]); name != "" {
			names = append(names, name)
		}
	}
	return names
}

// isManufacturer reports whether scope contains any keyword from WordsArr.
func isManufacturer(scope string) bool {
	for _, w := range WordsArr {
		if strings.Contains(scope, w) {
			return true
		}
	}
	return false
}

// contactTypes classifies the available contact channels. An 11-byte phone
// string counts as "手机号" (mobile), any other non-empty phone as "固定电话"
// (landline), and a non-empty email adds "邮箱". "不存在" (none) is returned
// when neither is present, so the result is never empty.
func contactTypes(phone, email string) []string {
	var types []string
	switch {
	case phone == "":
	case len(phone) == 11:
		types = append(types, "手机号")
	default:
		types = append(types, "固定电话")
	}
	if email != "" {
		types = append(types, "邮箱")
	}
	if len(types) == 0 {
		types = append(types, "不存在")
	}
	return types
}

// firstWebsiteURL returns the first non-empty website_url found in the
// report_websites array of any annual report, or "" when there is none.
// Malformed entries are skipped rather than causing a panic.
func firstWebsiteURL(raw interface{}) string {
	reports, ok := raw.([]interface{})
	if !ok {
		return ""
	}
	for _, r := range reports {
		report, ok := r.(map[string]interface{})
		if !ok {
			continue
		}
		// report_websites lives on each annual report entry, not on the
		// top-level company document.
		sites, ok := report["report_websites"].([]interface{})
		if !ok {
			continue
		}
		for _, s := range sites {
			site, ok := s.(map[string]interface{})
			if !ok {
				continue
			}
			if url := util.ObjToString(site["website_url"]); url != "" {
				return url
			}
		}
	}
	return ""
}

// searchType maps company_type / company_type_old / company_name to a
// legal-form label. It returns "" for empty, "个体工商户" (individual
// business) and "其他" (other) company types, which are not classified.
func searchType(tmp map[string]interface{}) string {
	t := util.ObjToString(tmp["company_type"])
	if t == "" || t == "个体工商户" || t == "其他" {
		return ""
	}
	old := util.ObjToString(tmp["company_type_old"])
	name := util.ObjToString(tmp["company_name"])
	switch {
	case strings.Contains(old, "有限合伙"):
		return "有限合伙"
	case strings.Contains(old, "合伙"):
		return "普通合伙"
	case strings.Contains(name, "股份") ||
		(strings.Contains(old, "上市") && !strings.Contains(old, "非上市")):
		return "股份有限公司"
	default:
		return "有限责任公司"
	}
}

// updateMethod drains updatePool, groups the [filter, update] pairs into
// batches of up to 1000 and writes each batch to qyxy_std with
// MongoTool.UpSertBulk. A full batch is written immediately; a partial batch
// is written on the next one-second tick. At most cap(updateSp) writes run at
// once. It returns after updatePool is closed, the final partial batch has
// been issued, and every in-flight write has completed.
func updateMethod() {
	const batchSize = 1000
	var inflight sync.WaitGroup

	write := func(batch [][]map[string]interface{}) {
		updateSp <- true // blocks while cap(updateSp) writes are running
		inflight.Add(1)
		go func() {
			defer func() {
				<-updateSp
				inflight.Done()
			}()
			MongoTool.UpSertBulk("qyxy_std", batch...)
		}()
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	batch := make([][]map[string]interface{}, 0, batchSize)
	for {
		select {
		case v, ok := <-updatePool:
			if !ok {
				if len(batch) > 0 {
					write(batch)
				}
				inflight.Wait()
				return
			}
			batch = append(batch, v)
			if len(batch) == batchSize {
				write(batch)
				// Fresh slice: the writer goroutine still owns the old one.
				batch = make([][]map[string]interface{}, 0, batchSize)
			}
		case <-ticker.C:
			if len(batch) > 0 {
				write(batch)
				batch = make([][]map[string]interface{}, 0, batchSize)
			}
		}
	}
}