package main

import (
	"encoding/json"
	"fmt"
	"log"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	qu "qfw/util"

	"github.com/cron"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

var (
	// Han matches a single Han (Chinese) character. It is used to reject
	// phone numbers and e-mail addresses that contain Chinese text.
	Han = regexp.MustCompile("[\\p{Han}]")

	// FieldListMap describes list-valued fields of which only some
	// sub-fields are kept. The outer key is the list field, the inner key is
	// the sub-field; every listed sub-field is written to the Mongo record,
	// and those flagged true are additionally written to the ES record.
	FieldListMap = map[string]map[string]bool{
		"partners":  {"stock_type": true, "stock_name": true, "stock_capital": false, "stock_realcapital": false, "identify_type": true, "identify_no": true},
		"employees": {"employee_name": false, "position": false},
	}

	// AllFieldListMap names list-valued fields whose elements are copied to
	// the Mongo record with every sub-field (stringified).
	AllFieldListMap = []string{"punishes", "operations", "illegals"}

	// AreaFiled lists, in priority order, the fields from which the
	// administrative-area code is derived.
	AreaFiled = []string{"credit_no", "company_code", "area_code"}

	// AnnualReportsArr lists the annual-report fields that are kept. Group 0
	// holds fields stored as strings, group 1 holds monetary fields that are
	// converted with ObjToMoney and divided by 10000.
	AnnualReportsArr = [][]string{
		{"report_year", "company_phone", "zip_code", "company_email", "employee_no", "operator_name"},
		{"total_assets", "total_equity", "total_sales", "total_profit", "main_business_income", "profit_amount", "total_tax", "total_liability"},
	}
)

// var AllFieldListMap = map[string]string{
// 	"punishes":   "punish_size",
// 	"operations": "operation_size",
// 	"illegals":   "illegal_size",
// }

// Fields that are not indexed:
// var NotEsField = []string{"cancel_reason", "revoke_reason", "cancels"} // cancel_size

// City is one administrative-area entry loaded by InitAddress.
type City struct {
	Code     string `json:"code"`
	Province string `json:"province"`
	City     string `json:"city"`
	District string `json:"district"`
}

// TimeTask registers StartTask with the cron scheduler and starts it.
func TimeTask() {
	// StartTask()
	c := cron.New()
	// Assuming the six-field, seconds-first cron format, this fires at
	// 15:00:00 every Tuesday. NOTE(review): if Monday runs are also wanted
	// the day-of-week field would have to name both days.
	cronstr := "0 0 15 ? * Tue"
	// Alternative: run once every TaskTime hours.
	// cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?"
	c.AddFunc(cronstr, func() {
		StartTask()
	})
	c.Start()
}

// StartTask runs one standardization pass over the source collection.
func StartTask() {
	log.Println("Start Task...")
	// query := map[string]interface{}{
	// 	"updatetime": map[string]interface{}{
	// 		"$gt": Updatetime,
	// 	},
	// }
	QyxyStandard()
	// run := QyxyStandard()
	// if run {
	// 	time.Sleep(5 * time.Minute)
	// 	if Mgo.DelColl(Dbcoll) {
	// 		log.Println("Delete Coll ", Dbcoll, "Success")
	// 	} else {
	// 		log.Println("Delete Coll ", Dbcoll, "Fail")
	// 	}
	// }
}

// QyxyStandard standardizes every document of Dbcoll, queues the result for
// indexing and upserts it into Savecoll in batches.
//
// Each document is processed by one of at most 20 concurrent workers. Every
// record is sent to EsSaveAllCache; records that pass qyxyEsEligible are also
// sent to EsSaveCache. It returns false when the source collection is empty
// (or could not be counted) and true otherwise.
func QyxyStandard() bool {
	defer qu.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)
	pool := make(chan bool, 20) // bounds the number of concurrent workers
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr
	arr := [][]map[string]interface{}{}
	count, err := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
	if err != nil {
		log.Println("count", Dbcoll, "failed:", err)
	}
	log.Println("共查询:", count, "条")
	if count == 0 {
		return false
	}
	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
	sum := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
		if sum%100 == 0 {
			log.Println("current:", sum)
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// The qu.Catch deferred by QyxyStandard itself cannot intercept
			// a panic raised in this goroutine, so each worker installs its
			// own (qu.Catch is presumably a recover helper — confirm).
			defer qu.Catch()

			id, esMap, mgoMap, ok := qyxyBuildRecord(tmp)
			if !ok {
				log.Println("skip record: _id is not an embedded document:", tmp["_id"])
				return
			}
			// Merge the ES view into the Mongo record; the ES form of
			// "partners" is skipped so the fuller Mongo form is kept.
			for k, v := range esMap {
				if k == "partners" {
					continue
				}
				mgoMap[k] = v
			}
			if qyxyEsEligible(esMap) {
				EsSaveCache <- esMap // filtered data
			}
			EsSaveAllCache <- esMap // all data

			update := []map[string]interface{}{
				{"_id": id},
				{"$set": mgoMap},
			}
			lock.Lock()
			defer lock.Unlock()
			arr = append(arr, update)
			if len(arr) > 500 {
				Mgo.UpSertBulk(Savecoll, arr...)
				arr = [][]map[string]interface{}{}
			}
		}(tmp)
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	// All workers have finished, so arr can be read without the lock.
	if len(arr) > 0 {
		Mgo.UpSertBulk(Savecoll, arr...)
	}
	log.Println("Run Over...Count:", sum)
	return true
}

// HistoryQyxyStandard standardizes every document of Dbcoll and queues the
// ES form of each one on EsSaveCache, without filtering and without writing
// anything back to Mongo. It always returns true.
func HistoryQyxyStandard() bool {
	qu.Debug("--------History--------")
	defer qu.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)
	pool := make(chan bool, 20) // bounds the number of concurrent workers
	wg := &sync.WaitGroup{}
	// count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
	// log.Println("共查询:", count, "条")
	// if count == 0 {
	// 	return false
	// }
	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
	sum := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
		if sum%10000 == 0 {
			log.Println("current:", sum)
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// See QyxyStandard: a worker needs its own recover handler.
			defer qu.Catch()

			_, esMap, _, ok := qyxyBuildRecord(tmp)
			if !ok {
				log.Println("skip record: _id is not an embedded document:", tmp["_id"])
				return
			}
			// Channel sends are safe for concurrent use; no lock is needed.
			EsSaveCache <- esMap
		}(tmp)
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	log.Println("Run Over...Count:", sum)
	return true
}

// qyxyBuildRecord converts one raw document into its ES and Mongo forms.
// ok is false when tmp["_id"] is not an embedded document, in which case the
// other results are zero values.
func qyxyBuildRecord(tmp map[string]interface{}) (id string, esMap, mgoMap map[string]interface{}, ok bool) {
	idMap, isMap := tmp["_id"].(map[string]interface{})
	if !isMap {
		return "", nil, nil, false
	}
	id = qu.ObjToString(idMap["_id"])
	esMap = map[string]interface{}{
		"_id":        id,
		"updatetime": time.Now().Unix(),
	}
	mgoMap = map[string]interface{}{}

	// The order matters: later steps may overwrite keys set by earlier ones.
	qyxyFillArea(tmp, esMap)
	qyxyFillEsFields(tmp, esMap)
	qyxyFillMgoFields(tmp, mgoMap)
	qyxyFillLists(tmp, esMap, mgoMap)
	qyxyFillAnnualReports(tmp, esMap, mgoMap)
	return id, esMap, mgoMap, true
}

// qyxyFillArea copies the AreaFiled values into esMap and resolves
// province/city/district from the first of them that yields a province.
func qyxyFillArea(tmp, esMap map[string]interface{}) {
	hadArea := false // set once a province has been resolved
	for i, field := range AreaFiled {
		if tmp[field] == nil {
			continue
		}
		code := fmt.Sprint(tmp[field])
		if code == "" {
			continue
		}
		esMap[field] = code // the full, untruncated value is indexed
		if hadArea {
			continue
		}
		switch {
		case i == 0 && len(code) >= 8: // credit_no: bytes 2..7 are used as the area code
			code = code[2:8]
		case i == 1 && len(code) >= 6: // company_code: the first 6 bytes are used
			code = code[:6]
		}
		city := AddressMap[code] // current codes first
		if city == nil {
			city = AddressOldMap[code] // then codes marked as abolished
		}
		if city != nil {
			if city.Province != "" {
				esMap["company_area"] = city.Province
			}
			if city.City != "" {
				esMap["company_city"] = city.City
			}
			if city.District != "" {
				esMap["company_district"] = city.District
			}
		}
		if esMap["company_area"] != nil {
			hadArea = true
		}
	}
}

// qyxyNormalizeParens applies the parenthesis replacement used before the
// company type / status lookups.
//
// NOTE(review): as written, each call replaces an ASCII parenthesis with the
// same ASCII parenthesis, i.e. it is a no-op. Presumably the first argument
// was meant to be the full-width form — confirm against the keys stored in
// QyStypeMap / CompanyStatusMap before changing it.
func qyxyNormalizeParens(text string) string {
	text = strings.ReplaceAll(text, "(", "(")
	text = strings.ReplaceAll(text, ")", ")")
	return text
}

// qyxyFillEsFields copies the fields named in EsFields into esMap, applying
// the per-field normalization (capital, company type/status, dates).
func qyxyFillEsFields(tmp, esMap map[string]interface{}) {
	for _, field := range EsFields {
		raw := tmp[field]
		if raw == nil {
			continue
		}
		switch {
		case field == "capital": // registered capital
			text := qu.ObjToString(raw)
			if currency := GetCurrency(text); currency != "" {
				esMap["currency"] = currency
			}
			if capital := ObjToMoney(text) / 10000; capital != 0 {
				esMap[field] = capital
			}
		case field == "company_type":
			text := qu.ObjToString(raw)
			if text == "" {
				break
			}
			esMap["company_type_old"] = text // keep the original value
			if strings.Contains(text, "个体") {
				esMap[field] = "个体工商户"
			} else if stype := QyStypeMap[qyxyNormalizeParens(text)]; stype != "" {
				esMap[field] = stype
			} else {
				esMap[field] = "其他"
			}
		case field == "company_status":
			text := qu.ObjToString(raw)
			if text == "" {
				break
			}
			if status := CompanyStatusMap[qyxyNormalizeParens(text)]; status != "" {
				esMap[field] = status
			} else {
				esMap[field] = "其他"
			}
		case strings.Contains(field, "date") || strings.Contains(field, "time"):
			switch v := raw.(type) {
			case primitive.DateTime:
				t := v.Time()
				esMap[field] = qu.FormatDate(&t, qu.Date_Short_Layout)
			case string:
				if v == "" {
					break
				}
				if t := timeReg.FindString(v); t != "" {
					esMap[field] = t
				}
			}
		default:
			if text := qu.ObjToString(raw); text != "" {
				if field == "company_name" {
					esMap["name"] = text
				}
				esMap[field] = text
			}
		}
	}
}

// qyxyFillMgoFields copies the non-indexed fields named in Fields into
// mgoMap; only non-empty strings and int32 values are kept.
func qyxyFillMgoFields(tmp, mgoMap map[string]interface{}) {
	for _, field := range Fields {
		switch v := tmp[field].(type) {
		case string:
			if v != "" {
				mgoMap[field] = v
			}
		case int32:
			mgoMap[field] = v
		}
	}
}

// qyxyFillLists copies the list-valued fields described by FieldListMap and
// AllFieldListMap, and joins all stock_name values into esMap["stock_name"].
// List elements that are not documents are skipped.
func qyxyFillLists(tmp, esMap, mgoMap map[string]interface{}) {
	var stockNames []string
	for field, fieldMap := range FieldListMap {
		list, ok := tmp[field].([]interface{})
		if !ok || len(list) == 0 {
			continue
		}
		if len(list) > 500 { // cap oversized lists
			list = list[:500]
		}
		mgoItems := []map[string]interface{}{}
		esItems := []map[string]interface{}{}
		for _, l := range list {
			m, ok := l.(map[string]interface{})
			if !ok {
				continue
			}
			mgoItem := map[string]interface{}{}
			esItem := map[string]interface{}{}
			for f, toEs := range fieldMap {
				text := qu.ObjToString(m[f])
				if text == "" {
					continue
				}
				mgoItem[f] = text
				if f == "stock_name" {
					stockNames = append(stockNames, text)
				}
				if toEs {
					esItem[f] = text
				}
			}
			if len(esItem) > 0 {
				esItems = append(esItems, esItem)
			}
			if len(mgoItem) > 0 {
				mgoItems = append(mgoItems, mgoItem)
			}
		}
		if len(esItems) > 0 {
			esMap[field] = esItems
		}
		if len(mgoItems) > 0 {
			mgoMap[field] = mgoItems
		}
	}
	if len(stockNames) > 0 {
		esMap["stock_name"] = strings.Join(stockNames, ",")
	}

	for _, field := range AllFieldListMap {
		list, ok := tmp[field].([]interface{})
		if !ok || len(list) == 0 {
			continue
		}
		mgoItems := []map[string]interface{}{}
		for _, l := range list {
			m, ok := l.(map[string]interface{})
			if !ok {
				continue
			}
			mgoItem := map[string]interface{}{}
			for k, v := range m {
				if v == nil {
					continue
				}
				if s := fmt.Sprint(v); s != "" {
					mgoItem[k] = s
				}
			}
			if len(mgoItem) > 0 {
				mgoItems = append(mgoItems, mgoItem)
			}
		}
		if len(mgoItems) > 0 {
			mgoMap[field] = mgoItems
		}
	}
}

// qyxyFillAnnualReports normalizes tmp["annual_reports"] into mgoMap and
// copies the valid company_phone / company_email of the latest report year
// (by string order) into esMap. Reports that are not documents are skipped.
func qyxyFillAnnualReports(tmp, esMap, mgoMap map[string]interface{}) {
	reports, ok := tmp["annual_reports"].([]interface{})
	if !ok || len(reports) == 0 {
		return
	}
	var years []string                               // every report_year seen
	contactByYear := map[string]map[string]interface{}{} // year -> phone/e-mail of that report
	mgoReports := []map[string]interface{}{}
	for _, report := range reports {
		m, ok := report.(map[string]interface{})
		if !ok {
			continue
		}
		mgoReport := map[string]interface{}{} // normalized report for Mongo
		contact := map[string]interface{}{}   // only company_phone / company_email
		reportYear := ""
		for i, group := range AnnualReportsArr {
			for _, f := range group {
				raw := m[f]
				if raw == nil {
					continue
				}
				s := fmt.Sprint(raw)
				if s == "" {
					continue
				}
				switch {
				case f == "report_year":
					reportYear = s
					years = append(years, s)
				case f == "company_phone" && !Han.MatchString(s) && len(s) >= 7,
					f == "company_email" && !Han.MatchString(s) && len(s) >= 4:
					contact[f] = s
					mgoReport[f] = s
				}
				if i == 0 { // string fields
					// Phone and e-mail are stored above, and only when valid.
					if f != "company_phone" && f != "company_email" {
						mgoReport[f] = s
					}
				} else if i == 1 { // monetary fields
					mgoReport[f] = ObjToMoney(s) / 10000
				}
			}
		}
		// stock_nameArr := []string{}
		// if i_partners, ok := m["report_partners"].([]interface{}); ok && len(i_partners) > 0 { // shareholders
		// 	for _, par := range i_partners {
		// 		m := par.(map[string]interface{})
		// 		if stock_name, ok := m["stock_name"].(string); ok && stock_name != "" {
		// 			stock_nameArr = append(stock_nameArr, stock_name)
		// 		}
		// 	}
		// }
		// if len(stock_nameArr) > 0 {
		// 	stockname := strings.Join(stock_nameArr, ",")
		// 	tmpMap["stock_name"] = stockname
		// }
		contactByYear[reportYear] = contact
		if len(mgoReport) > 0 {
			mgoReports = append(mgoReports, mgoReport)
		}
	}
	if len(mgoReports) > 0 {
		mgoMap["annual_reports"] = mgoReports
	}
	if len(years) == 0 {
		return
	}
	sort.Strings(years)
	for k, v := range contactByYear[years[len(years)-1]] {
		esMap[k] = v
	}
}

// qyxyEsEligible reports whether a record goes to the filtered index: its
// company_name has at least 8 runes, its company_type is set and is not
// "个体工商户", and it has a non-blank credit_no or company_code.
func qyxyEsEligible(esMap map[string]interface{}) bool {
	if len([]rune(qu.ObjToString(esMap["company_name"]))) < 8 {
		return false
	}
	companyType := qu.ObjToString(esMap["company_type"])
	if companyType == "" || companyType == "个体工商户" {
		return false
	}
	creditNo := strings.TrimSpace(qu.ObjToString(esMap["credit_no"]))
	companyCode := strings.TrimSpace(qu.ObjToString(esMap["company_code"]))
	return creditNo != "" || companyCode != ""
}

// SaveEs drains EsSaveCache forever and bulk-saves the records to
// Index/Itype. A batch is flushed when it reaches 500 records or when no
// record has arrived for one second. SP bounds the concurrent bulk saves.
func SaveEs() {
	log.Println("Es Save...")
	batch := make([]map[string]interface{}, 0, 500)
	flush := func() {
		SP <- true
		go func(docs []map[string]interface{}) {
			defer func() {
				<-SP
			}()
			Es.BulkSave(Index, Itype, &docs, true)
		}(batch)
		// The saver goroutine owns the old slice; start a fresh one.
		batch = make([]map[string]interface{}, 0, 500)
	}
	for {
		select {
		case v := <-EsSaveCache:
			batch = append(batch, v)
			if len(batch) == 500 {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			if len(batch) > 0 {
				flush()
			}
		}
	}
}

// SaveAllEs drains EsSaveAllCache forever and bulk-saves the records to
// OtherIndex/OtherItype. A batch is flushed when it reaches 500 records or
// when no record has arrived for one second. SPAll bounds the concurrent
// bulk saves.
func SaveAllEs() {
	log.Println("Es SaveAll...")
	batch := make([]map[string]interface{}, 0, 500)
	flush := func() {
		SPAll <- true
		go func(docs []map[string]interface{}) {
			defer func() {
				<-SPAll
			}()
			Es.BulkSave(OtherIndex, OtherItype, &docs, true)
		}(batch)
		// The saver goroutine owns the old slice; start a fresh one.
		batch = make([]map[string]interface{}, 0, 500)
	}
	for {
		select {
		case v := <-EsSaveAllCache:
			batch = append(batch, v)
			if len(batch) == 500 {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			if len(batch) > 0 {
				flush()
			}
		}
	}
}

// InitAddress loads the "address" collection into AddressMap, or into
// AddressOldMap for entries whose "Remarks" value is "已作废", keyed by code.
func InitAddress() {
	defer qu.Catch()
	log.Println("Init Address...")
	AddressMap = map[string]*City{}
	AddressOldMap = map[string]*City{}
	address, _ := Mgo.Find("address", nil, nil, nil, false, -1, -1)
	if address == nil {
		log.Println("Init Address: no data returned")
		return
	}
	for _, tmp := range *address {
		code := qu.ObjToString(tmp["code"])
		remark := fmt.Sprint(tmp["Remarks"])
		// Round-trip through JSON to fill City from the document's keys.
		// An entry is still registered when the conversion fails, so the
		// code stays known (with empty area names).
		city := &City{}
		if tmpjson, err := json.Marshal(tmp); err != nil {
			log.Println("Init Address: marshal", code, "failed:", err)
		} else if err := json.Unmarshal(tmpjson, city); err != nil {
			log.Println("Init Address: unmarshal", code, "failed:", err)
		}
		if remark == "已作废" {
			AddressOldMap[code] = city
		} else {
			AddressMap[code] = city
		}
	}
}

// InitQyStype loads the "qystype" collection into QyStypeMap, mapping each
// document's "name" to its "prename".
func InitQyStype() {
	defer qu.Catch()
	log.Println("Init QyStype...")
	QyStypeMap = map[string]string{}
	qystype, _ := Mgo.Find("qystype", nil, nil, nil, false, -1, -1)
	if qystype == nil {
		log.Println("Init QyStype: no data returned")
		return
	}
	for _, tmp := range *qystype {
		name := qu.ObjToString(tmp["name"])
		prename := qu.ObjToString(tmp["prename"])
		QyStypeMap[name] = prename
	}
}

// InitCompanyStatus loads the "company_status" collection into
// CompanyStatusMap, mapping each document's "old" value to its "new" value.
func InitCompanyStatus() {
	defer qu.Catch()
	log.Println("Init CompanyStatus...")
	CompanyStatusMap = map[string]string{}
	status, _ := Mgo.Find("company_status", nil, nil, nil, false, -1, -1)
	if status == nil {
		log.Println("Init CompanyStatus: no data returned")
		return
	}
	for _, tmp := range *status {
		oldStatus := qu.ObjToString(tmp["old"])
		newStatus := qu.ObjToString(tmp["new"])
		CompanyStatusMap[oldStatus] = newStatus
	}
}