package main

import (
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cron"
	"go.mongodb.org/mongo-driver/bson"

	"qfw/util"
)

var (
	// partner lists the keys copied from each element of a record's
	// "partners" array into the indexed document.
	partner = []string{"identify_no", "stock_type", "stock_name", "identify_type", "stock_capital", "stock_realcapital"}

	// employee lists the keys copied from each element of a record's
	// "employees" array into the indexed document.
	employee = []string{"employee_name", "position"}

	// TypeMap translates "bid_unittype" labels into the codes that are indexed.
	TypeMap = map[string]string{
		"采购单位": "1", // purchasing unit
		"投标企业": "2", // bidding enterprise
		"代理机构": "3", // agency
		"厂商":   "4", // manufacturer
	}

	// TypeMap1 translates "bid_contracttype" labels into the codes that are indexed.
	TypeMap1 = map[string]string{
		"固定电话": "1", // landline
		"手机号":  "2", // mobile number
		"邮箱":   "3", // e-mail
		"不存在":  "4", // does not exist
	}
)

// TimeTask registers StdAdd as a cron job and starts the scheduler.
// If the job cannot be registered the error is logged through util.Debug and
// the scheduler is not started.
func TimeTask() {
	c := cron.New()
	cronstr := "0 0 15 ? * Tue" // every Tuesday at 15:00
	//cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" // run once every TaskTime hours
	if err := c.AddFunc(cronstr, StdAdd); err != nil {
		util.Debug(err)
		return
	}
	c.Start()
}

// StdAdd indexes incremental data: every record whose "updatetime" is greater
// than Updatetime is converted with buildEsDoc and pushed onto EsSaveCache.
// Records whose "use_flag" is greater than 5 are skipped. At most 5 records
// are converted concurrently; the function returns once all workers finish.
func StdAdd() {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	//q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
	q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
	util.Debug(q)
	it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()

	pool := make(chan bool, 5) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}
	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%5000 == 0 {
			log.Println("current:", count)
		}
		if util.IntAll(tmp["use_flag"]) > 5 {
			continue
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// The function-level util.Catch cannot see a panic raised in this
			// goroutine, so install one here; a bad record is then logged and
			// skipped instead of terminating the process.
			defer util.Catch()
			// NOTE(review): incremental data splits history_name on "," and
			// does not strip "万元" from capital amounts, unlike StdAll —
			// confirm both differences are intentional.
			EsSaveCache <- buildEsDoc(tmp, splitHistoryByComma, false)
		}(tmp)
		// Hand the worker its own map; decode the next record into a new one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	log.Println("Run Over...Count:", count)
}

// StdAll indexes the full existing data set: every record in the collection
// is converted with buildEsDoc and pushed onto EsSaveCache. At most 10 records
// are converted concurrently; the function returns once all workers finish.
func StdAll() {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	//q := bson.M{"_id": "f9ad04e5529023e8af0b2ad8b49bf227"}
	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()

	pool := make(chan bool, 10) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}
	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%20000 == 0 {
			log.Println("current:", count, tmp["_id"])
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// The function-level util.Catch cannot see a panic raised in this
			// goroutine, so install one here; a bad record is then logged and
			// skipped instead of terminating the process.
			defer util.Catch()
			EsSaveCache <- buildEsDoc(tmp, splitHistoryBySemicolon, true)
		}(tmp)
		// Hand the worker its own map; decode the next record into a new one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	log.Println("Run Over...Count:", count)
}

// splitHistoryByComma is the history_name rule used by StdAdd: the value is
// split on "," only when it contains one; otherwise nothing is returned, so a
// value without a comma is not indexed.
// NOTE(review): this drops a single history name — confirm that is intended.
func splitHistoryByComma(names string) []string {
	if !strings.Contains(names, ",") {
		return nil
	}
	return strings.Split(names, ",")
}

// splitHistoryBySemicolon is the history_name rule used by StdAll: the value
// is split on ";" and empty pieces are discarded.
func splitHistoryBySemicolon(names string) []string {
	var out []string
	for _, name := range strings.Split(names, ";") {
		if name != "" {
			out = append(out, name)
		}
	}
	return out
}

// buildEsDoc converts one source record into the document that is indexed.
//
// Only the keys listed in EsFields are considered, and keys whose value is nil
// are skipped. splitHistory turns the "history_name" string into a list of
// names. stripCapitalUnit controls whether "万元" is removed from partner
// capital amounts before they are parsed. "company_type_int" is always set.
func buildEsDoc(doc map[string]interface{}, splitHistory func(string) []string, stripCapitalUnit bool) map[string]interface{} {
	esMap := map[string]interface{}{}
	for _, field := range EsFields {
		if doc[field] == nil {
			continue
		}
		switch field {
		case "company_name":
			// The company name is indexed under both "company_name" and "name".
			esMap[field] = doc["company_name"]
			esMap["name"] = doc["company_name"]
		case "history_name":
			if names := splitHistory(util.ObjToString(doc["history_name"])); len(names) > 0 {
				esMap["history_name"] = names
			}
		case "establish_date":
			// The establishment date is stored as a Unix timestamp; values
			// that do not parse are logged and left out.
			t, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(doc["establish_date"]), time.Local)
			if err != nil {
				util.Debug(err)
			} else {
				esMap["establish_date"] = t.Unix()
			}
		case "lastupdatetime":
			esMap["lastupdatetime"] = doc["update_time_msql"]
		case "bid_projectname", "bid_purchasing", "bid_area":
			// Arrays are flattened to one comma-separated string.
			if arr, ok := doc[field].([]interface{}); ok {
				esMap[field] = strings.Join(util.ObjArrToStringArr(arr), ",")
			}
		case "partners":
			if arr, ok := doc["partners"].([]interface{}); ok {
				if list := collectPartners(arr, stripCapitalUnit); len(list) > 0 {
					esMap[field] = list
				}
			}
		case "employees":
			if arr, ok := doc["employees"].([]interface{}); ok {
				if list := collectEmployees(arr); len(list) > 0 {
					esMap[field] = list
				}
			}
		case "bid_unittype":
			if arr, ok := doc["bid_unittype"].([]interface{}); ok {
				esMap["bid_unittype"] = joinTypeCodes(arr, TypeMap)
			}
		case "bid_contracttype":
			if arr, ok := doc["bid_contracttype"].([]interface{}); ok {
				esMap["bid_contracttype"] = joinTypeCodes(arr, TypeMap1)
			}
		default:
			esMap[field] = doc[field]
		}
	}
	esMap["company_type_int"] = companyTypeInt(
		util.ObjToString(doc["company_type"]),
		util.ObjToString(doc["company_name"]),
	)
	return esMap
}

// collectPartners keeps, for each element of a "partners" array, only the keys
// listed in partner. Capital amounts are parsed as floats and rounded to two
// decimals; amounts that do not parse are dropped. Elements that are not maps
// and elements that end up empty are skipped.
func collectPartners(items []interface{}, stripCapitalUnit bool) []map[string]interface{} {
	var out []map[string]interface{}
	for _, item := range items {
		src, ok := item.(map[string]interface{})
		if !ok {
			// Skip malformed elements rather than panicking inside a worker.
			continue
		}
		p := make(map[string]interface{})
		for _, key := range partner {
			if src[key] == nil {
				continue
			}
			if key != "stock_capital" && key != "stock_realcapital" {
				p[key] = src[key]
				continue
			}
			text := util.ObjToString(src[key])
			if stripCapitalUnit {
				text = strings.Replace(text, "万元", "", -1)
			}
			amount, err := strconv.ParseFloat(text, 64)
			if err != nil {
				continue
			}
			// Keep two decimal places. The error is ignored because the input
			// is the formatted form of a float that already parsed.
			amount, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", amount), 64)
			p[key] = amount
		}
		if len(p) > 0 {
			out = append(out, p)
		}
	}
	return out
}

// collectEmployees keeps, for each element of an "employees" array, only the
// keys listed in employee. Elements that are not maps and elements that end up
// empty are skipped.
func collectEmployees(items []interface{}) []map[string]interface{} {
	var out []map[string]interface{}
	for _, item := range items {
		src, ok := item.(map[string]interface{})
		if !ok {
			// Skip malformed elements rather than panicking inside a worker.
			continue
		}
		p := make(map[string]interface{})
		for _, key := range employee {
			if src[key] != nil {
				p[key] = src[key]
			}
		}
		if len(p) > 0 {
			out = append(out, p)
		}
	}
	return out
}

// joinTypeCodes maps every label in items through codes and joins the results
// with ",". A label missing from codes contributes an empty string.
func joinTypeCodes(items []interface{}, codes map[string]string) string {
	var mapped []string
	for _, label := range util.ObjArrToStringArr(items) {
		mapped = append(mapped, codes[label])
	}
	return strings.Join(mapped, ",")
}

// companyTypeInt derives the "company_type_int" value from the company type
// and the length of the company name in runes:
//
//	"个体工商户":      31 when the name has at least 5 runes, otherwise 32
//	"其他" or empty:  21 when the name has at least 4 runes, otherwise 22
//	"内资分公司":      12
//	anything else:   11 when the name has at least 4 runes, otherwise 13
func companyTypeInt(companyType, companyName string) int {
	nameLen := len([]rune(companyName))
	switch companyType {
	case "个体工商户":
		if nameLen >= 5 {
			return 31
		}
		return 32
	case "其他", "":
		if nameLen >= 4 {
			return 21
		}
		return 22
	case "内资分公司":
		return 12
	}
	if nameLen >= 4 {
		return 11
	}
	return 13
}

// SaveEs drains EsSaveCache forever and bulk-saves the documents.
// A batch is handed to Es.BulkSave as soon as it holds 100 documents, or when
// no document has arrived for one second and the batch is non-empty. Each save
// runs in its own goroutine; SP limits how many saves run at once.
func SaveEs() {
	log.Println("Es Save...")
	const batchSize = 100
	batch := make([]map[string]interface{}, 0, batchSize)

	// flush hands the current batch to a saver goroutine and starts a new one.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		SP <- true
		go func(docs []map[string]interface{}) {
			defer func() {
				<-SP
			}()
			Es.BulkSave(Index, Itype, &docs, true)
		}(batch)
		// The saver owns the old slice now; never reuse its backing array.
		batch = make([]map[string]interface{}, 0, batchSize)
	}

	for {
		select {
		case v := <-EsSaveCache:
			batch = append(batch, v)
			if len(batch) == batchSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			flush()
		}
	}
}