Procházet zdrojové kódy

更新升级 es7查询企业库

zhengkun před 2 roky
rodič
revize
440762cecc

+ 1 - 1
sensitive/src/config.json

@@ -24,7 +24,7 @@
   "client_es": "http://ela.spdata.jianyu360.com",
   "es_type": "unique_qy",
   "es_index": "unique_qy",
-  "qyxy_client_es":"http://127.0.0.1:12003",
+  "qyxy_client_es":"http://127.0.0.1:13002",
   "qyxy_es_type":"qyxy",
   "qyxy_es_index":"qyxy"
 }

+ 3 - 2
sensitive/src/main.go

@@ -10,9 +10,10 @@ func init() {
 	ul.InitC()
 }
 func main() {
-	ul.TestData()
 	go ul.AddTaskSensitiveWordsData() //增量-新增
-	ul.SentiveUdp()                   //数据流程-增量
+	//ul.SentiveUdp()                   //数据流程-增量
+
+	//ul.TestData() //临时测试数据
 
 	lock := make(chan bool)
 	<-lock

+ 24 - 25
sensitive/src/util/increasedata.go

@@ -11,7 +11,7 @@ import (
 //定时增量数据处理
 func AddTaskSensitiveWordsData() {
 	log.Println("部署定时新增...")
-	runStartOnce()
+	//runStartOnce()
 	for {
 		tick := time.Tick(time.Hour * 24 * 7) //查询七天前
 		ctime := <-tick
@@ -19,91 +19,90 @@ func AddTaskSensitiveWordsData() {
 		cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, 0, 0, 0, 0, time.Local)
 		q := map[string]interface{}{
 			"updatetime": map[string]interface{}{
-				"$gte":  cronData.Unix(),
+				"$gte": cronData.Unix(),
 			},
 		}
 		sess := Qfw_Mgo.GetMgoConn()
 		defer Qfw_Mgo.DestoryMongoConn(sess)
 		iter := sess.DB(Qfw_Mgo.DbName).C(QfwCollName).Find(&q).Select(map[string]interface{}{
 			"company_name": 1,
-			"updatetime": 1,
+			"updatetime":   1,
 			"company_type": 1,
 		}).Iter()
-		total,isok:=0,0
-		for tmp := map[string]interface{}{}; iter.Next(&tmp); total++{
-			if total%10000==0 {
-				log.Println("cur index ",total,isok)
+		total, isok := 0, 0
+		for tmp := map[string]interface{}{}; iter.Next(&tmp); total++ {
+			if total%10000 == 0 {
+				log.Println("cur index ", total, isok)
 			}
 			if company_name, ok := tmp["company_name"].(string); ok {
 				if reglen.MatchString(company_name) || !unstart_strReg.MatchString(company_name) ||
 					con_strReg.MatchString(company_name) {
 					continue
 				}
-				company_type:= qu.ObjToString(tmp["company_type"])
-				if strings.Contains(company_type,"个人")||strings.Contains(company_type,"个体"){
+				company_type := qu.ObjToString(tmp["company_type"])
+				if strings.Contains(company_type, "个人") || strings.Contains(company_type, "个体") {
 					continue
 				}
 				//存mgo
-				new_id :=Qfw_Mgo.Save("unique_qyxy", map[string]interface{}{
+				new_id := Qfw_Mgo.Save("unique_qyxy", map[string]interface{}{
 					"qy_name": company_name,
 				})
-				if new_id!=nil {
+				if new_id != nil {
 					isok++
 					dealWithEsData(company_name, BsonTOStringId(new_id))
 				}
 			}
 			tmp = map[string]interface{}{}
 		}
-		log.Println("is over ",total)
+		log.Println("is over ", total)
 	}
 }
 
 //执行一次
-func runStartOnce()  {
+func runStartOnce() {
 	log.Println("立即执行一次...")
 	nowtime := time.Now()
 	cronData := time.Date(nowtime.Year(), nowtime.Month(), nowtime.Day()-7, 0, 0, 0, 0, time.Local)
 	q := map[string]interface{}{
 		"updatetime": map[string]interface{}{
-			"$gte":  cronData.Unix(),
+			"$gte": cronData.Unix(),
 		},
 	}
 	sess := Qfw_Mgo.GetMgoConn()
 	defer Qfw_Mgo.DestoryMongoConn(sess)
 	iter := sess.DB(Qfw_Mgo.DbName).C(QfwCollName).Find(&q).Select(map[string]interface{}{
 		"company_name": 1,
-		"updatetime": 1,
+		"updatetime":   1,
 		"company_type": 1,
 	}).Iter()
-	total,isok:=0,0
-	for tmp := map[string]interface{}{}; iter.Next(&tmp); total++{
-		if total%10000==0 {
-			log.Println("cur index ",total,isok)
+	total, isok := 0, 0
+	for tmp := map[string]interface{}{}; iter.Next(&tmp); total++ {
+		if total%10000 == 0 {
+			log.Println("cur index ", total, isok)
 		}
 		if company_name, ok := tmp["company_name"].(string); ok {
 			if reglen.MatchString(company_name) || !unstart_strReg.MatchString(company_name) ||
 				con_strReg.MatchString(company_name) {
 				continue
 			}
-			company_type:= qu.ObjToString(tmp["company_type"])
-			if strings.Contains(company_type,"个人")||strings.Contains(company_type,"个体"){
+			company_type := qu.ObjToString(tmp["company_type"])
+			if strings.Contains(company_type, "个人") || strings.Contains(company_type, "个体") {
 				continue
 			}
 			//存mgo
-			new_id :=Qfw_Mgo.Save("unique_qyxy", map[string]interface{}{
+			new_id := Qfw_Mgo.Save("unique_qyxy", map[string]interface{}{
 				"qy_name": company_name,
 			})
-			if new_id!=nil {
+			if new_id != nil {
 				isok++
 				dealWithEsData(company_name, BsonTOStringId(new_id))
 			}
 		}
 		tmp = map[string]interface{}{}
 	}
-	log.Println("is over ",total)
+	log.Println("is over ", total)
 }
 
-
 //处理是否新增es
 func dealWithEsData(name string, tmpid string) {
 	query := `{"query":{"bool":{"must":[{"term":{"` + Es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"facets":{}}`

+ 12 - 9
sensitive/src/util/init.go

@@ -1,11 +1,12 @@
 package util
 
 import (
-	"elastic.v1"
+	es1 "elastic.v1"
 	"importcjj/sensitive"
 	"log"
 	"net/http"
 	qu "qfw/util"
+	es7 "qfw/util/elastic_v7"
 )
 
 func init() {
@@ -41,22 +42,24 @@ func InitC() {
 
 	Fields = Sysconfig["fields"].(map[string]interface{})
 
-	//初始化es http://172.17.4.84:9800
-	Client_Es, _ = elastic.NewClient(http.DefaultClient, Sysconfig["client_es"].(string))
+	//单独的es未升级
+	Client_Es, _ = es1.NewClient(http.DefaultClient, Sysconfig["client_es"].(string))
 	Es_type, Es_index = Sysconfig["es_type"].(string), Sysconfig["es_index"].(string)
 
 	//初始化Es qyxy
-	Qyxy_Client_Es, _ = elastic.NewClient(http.DefaultClient, Sysconfig["qyxy_client_es"].(string))
+	//Qyxy_Client_Es, _ = elastic.NewClient(http.DefaultClient, Sysconfig["qyxy_client_es"].(string))
+
+	es7.InitElasticSize(Sysconfig["qyxy_client_es"].(string), 10, "es_all", "TopJkO2E_d1x")
 	Qyxy_Es_type, Qyxy_Es_index = Sysconfig["qyxy_es_type"].(string), Sysconfig["qyxy_es_index"].(string)
+
 }
 
 var Sysconfig map[string]interface{}
-var Save_Mgo,Qfw_Mgo  *MongodbSim
+var Save_Mgo, Qfw_Mgo *MongodbSim
 var Fields map[string]interface{}
 var FindBuyerC, FindAgencyC, FindWinnerC string
-var SaveCollName,QfwCollName string
+var SaveCollName, QfwCollName string
 var Es_type, Es_index string
-var Client_Es *elastic.Client
+var Client_Es *es1.Client
 var Qyxy_Es_type, Qyxy_Es_index string
-var Qyxy_Client_Es *elastic.Client
-var BuyerFilter = sensitive.New()
+var BuyerFilter = sensitive.New()

+ 16 - 14
sensitive/src/util/test.go

@@ -8,37 +8,39 @@ import (
 
 //测试方法
 func TestData() {
+	termQuery("唐双发")
+
 	sess := Save_Mgo.GetMgoConn()
 	defer Save_Mgo.DestoryMongoConn(sess)
 	q := map[string]interface{}{
-		"exists":0,
+		"exists": 0,
 	}
 	iter := sess.DB(Save_Mgo.DbName).C(SaveCollName).Find(&q).Iter()
-	total,isok:=0,0
-	for tmp := map[string]interface{}{}; iter.Next(&tmp); total++{
-		if total%100==0 {
-			log.Println("cur index ",total,isok)
+	total, isok := 0, 0
+	for tmp := map[string]interface{}{}; iter.Next(&tmp); total++ {
+		if total%100 == 0 {
+			log.Println("cur index ", total, isok)
 		}
-		if total<12000 {
+		if total < 12000 {
 			tmp = map[string]interface{}{}
 			continue
 		}
-		name:=qu.ObjToString(tmp["name"])
-		rname, b, _ ,_ := dealWithNameScoreRules(name)
-		if strings.Contains(name,"医院") {
-			if !strings.Contains(rname,"医院") {
+		name := qu.ObjToString(tmp["name"])
+		rname, b, _, _ := dealWithNameScoreRules(name)
+		if strings.Contains(name, "医院") {
+			if !strings.Contains(rname, "医院") {
 				b = false
 			}
 		}
 		if b {
 			isok++
-			Save_Mgo.UpdateById(SaveCollName,BsonTOStringId(tmp["_id"]), map[string]interface{}{
-				"$set":map[string]interface{}{
-					"exists":3,
+			Save_Mgo.UpdateById(SaveCollName, BsonTOStringId(tmp["_id"]), map[string]interface{}{
+				"$set": map[string]interface{}{
+					"exists": 3,
 				},
 			})
 		}
 		tmp = map[string]interface{}{}
 	}
-	log.Println("is over ",total,isok)
+	log.Println("is over ", total, isok)
 }

+ 16 - 21
sensitive/src/util/udpdata.go

@@ -2,6 +2,7 @@
 package util
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson/primitive"
@@ -9,6 +10,7 @@ import (
 	mu "mfw/util"
 	"net"
 	qu "qfw/util"
+	es7 "qfw/util/elastic_v7"
 	"regexp"
 	"strings"
 	"sync"
@@ -18,6 +20,7 @@ var task chan struct{} = make(chan struct{}, 1)
 var udpclient mu.UdpClient //udp对象
 var nextNodes []map[string]interface{}
 var specReg *regexp.Regexp = regexp.MustCompile("(,|,)")
+
 //udp通知抽取
 func SentiveUdp() {
 	nextNodes = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
@@ -55,20 +58,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			log.Println("...计划发送udp~统计下一节点...")
 			//
 
-
-
-
-
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Println(string(data))
 	}
 }
 
-
 //处理方法
 func QuerySensitiveWords(sid, eid string) {
-	log.Println("开始处理敏感词匹配:", sid,"~", eid)
+	log.Println("开始处理敏感词匹配:", sid, "~", eid)
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gt":  StringTOBsonId(sid),
@@ -112,11 +110,11 @@ func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
 	//	}
 	//}
 
-	if agency !="" {
+	if agency != "" {
 		if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" {
 			tmp["agency"] = fname
-			up["log"] =map[string]interface{}{
-				"agency":fmt.Sprintf("%s_%s", flog, agency),
+			up["log"] = map[string]interface{}{
+				"agency": fmt.Sprintf("%s_%s", flog, agency),
 			}
 			up["agency"] = fname
 		}
@@ -124,8 +122,8 @@ func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
 	if winner != "" && !specReg.MatchString(winner) {
 		if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" {
 			tmp["winner"] = fname
-			up["log"] =map[string]interface{}{
-				"winner":fmt.Sprintf("%s_%s", flog, winner),
+			up["log"] = map[string]interface{}{
+				"winner": fmt.Sprintf("%s_%s", flog, winner),
 			}
 			up["winner"] = fname
 		}
@@ -134,8 +132,8 @@ func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
 	if s_winner != "" && !specReg.MatchString(s_winner) {
 		if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" {
 			tmp["s_winner"] = fname
-			up["log"] =map[string]interface{}{
-				"s_winner":fmt.Sprintf("%s_%s", flog, s_winner),
+			up["log"] = map[string]interface{}{
+				"s_winner": fmt.Sprintf("%s_%s", flog, s_winner),
 			}
 			up["s_winner"] = fname
 		}
@@ -155,7 +153,7 @@ func cheakname(name string) (up bool, log, rname string) {
 		return true, cheaklog, name
 	}
 
-	rname, isok, _ ,_ := dealWithNameScoreRules(name)
+	rname, isok, _, _ := dealWithNameScoreRules(name)
 	//if len(datas) > 0 {
 	//	for _, v := range datas {
 	//		if qu.Float64All(v["score"]) < 1.0 {
@@ -182,11 +180,12 @@ func cheakname(name string) (up bool, log, rname string) {
 	return false, "", name
 }
 func termQuery(name string) bool {
-
 	query := `{"query":{"bool":{"must":[{"term":{"company_name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"facets":{}}`
 	tmp := make(map[string]interface{})
 	json.Unmarshal([]byte(query), &tmp)
-	searchResult, err := Qyxy_Client_Es.Search().Index(Qyxy_Es_index).Type(Qyxy_Es_type).Source(tmp).Do()
+	Qyxy_Client_Es := es7.GetEsConn()
+	defer es7.DestoryEsConn(Qyxy_Client_Es)
+	searchResult, err := Qyxy_Client_Es.Search().Index(Qyxy_Es_index).Type(Qyxy_Es_type).Source(tmp).Do(context.TODO())
 	if err != nil {
 		log.Println("从ES查询出错", err.Error(), name)
 		return false
@@ -194,7 +193,7 @@ func termQuery(name string) bool {
 		data := make(map[string]interface{}, 1)
 		if searchResult.Hits != nil {
 			for _, hit := range searchResult.Hits.Hits {
-				json.Unmarshal(*hit.Source, &data)
+				json.Unmarshal(hit.Source, &data)
 				if data["name"].(string) == name {
 					return true
 				}
@@ -246,7 +245,3 @@ func handleData(datas []string) string {
 	rstr := strings.Join(rdata, ",")
 	return rstr
 }
-
-
-
-

+ 13 - 5
src/check_city.go

@@ -8,6 +8,7 @@ import (
 )
 
 var cityEndReg *regexp.Regexp = regexp.MustCompile("(区|县|市)$")
+var ErrBuyerReg *regexp.Regexp = regexp.MustCompile("^(成都东部新区)")
 
 func getCheckDataCity(tmp map[string]interface{}, update_check *map[string]interface{}) {
 
@@ -16,6 +17,13 @@ func getCheckDataCity(tmp map[string]interface{}, update_check *map[string]inter
 	district := qu.ObjToString(tmp["district"])
 	buyer := qu.ObjToString(tmp["buyer"])
 
+	if buyer != "" && ErrBuyerReg.MatchString(buyer) && area == "浙江" {
+		(*update_check)["area"] = "四川"
+		(*update_check)["city"] = "成都市"
+		(*update_check)["district"] = ""
+		return
+	}
+
 	if (district != "" && city != "" && area != "" && area != "全国") || buyer == "" {
 		//标准城市-校验
 		rdata := standardCheckCity(area, city, district)
@@ -144,12 +152,12 @@ func standardCheckCity(area string, city string, district string) map[string]str
 	}
 	//第一步:区校验
 	if district != "" {
-		districtArr := DistrictDict[district]
+		districtArr := S_DistrictDict[district]
 		if districtArr == nil { //涉及了 个别别名相关的数据
 			trim_arr := aliasDataDistrict(district) //拆分后缀
 			if len(trim_arr) > 0 {
 				for _, alias_district := range trim_arr {
-					alias_districtArr := DistrictDict[alias_district]
+					alias_districtArr := S_DistrictDict[alias_district]
 					for _, v := range alias_districtArr {
 						if city == v.C_Name && area == v.P_Name {
 							rdata["district"] = alias_district
@@ -184,10 +192,10 @@ func standardCheckCity(area string, city string, district string) map[string]str
 
 	//第二步:区校验-失败   市-校验
 	if city != "" {
-		cityArr := CityDict[city]
+		cityArr := S_CityDict[city]
 		if cityArr == nil {
 			//把市当成区,匹配三级   - 存在优化空间- city:郑州  别名
-			districtArr := DistrictDict[city]
+			districtArr := S_DistrictDict[city]
 			for _, v := range districtArr {
 				if city == v.C_Name && area == v.P_Name {
 					rdata["area"] = districtArr[0].P_Name
@@ -221,7 +229,7 @@ func standardCheckCity(area string, city string, district string) map[string]str
 	}
 
 	//第三步:省份校验
-	if ProvinceDict[area] == nil {
+	if S_ProvinceDict[area] == nil {
 		rdata["area"] = "全国"
 		rdata["city"] = ""
 		rdata["district"] = ""

+ 86 - 31
src/main.go

@@ -14,14 +14,14 @@ import (
 	"time"
 )
 
-type Province struct {
+type S_Province struct {
 	P_Name string
 }
-type City struct {
+type S_City struct {
 	P_Name string
 	C_Name string
 }
-type District struct {
+type S_District struct {
 	P_Name string
 	C_Name string
 	D_Name string
@@ -35,12 +35,12 @@ var (
 	udpclient                             mu.UdpClient             //udp对象
 	nextNode                              []map[string]interface{} //节点信息
 	coll_name, qy_coll_name, jy_coll_name string
-	check_lock                            sync.Mutex            //更新锁
-	check_thread                          int                   //线程数
-	UpdateTask                            *updateInfo           //更新池
-	ProvinceDict                          map[string][]Province //省份-map
-	CityDict                              map[string][]City     //城市-map
-	DistrictDict                          map[string][]District //区县-map
+	check_lock                            sync.Mutex              //更新锁
+	check_thread                          int                     //线程数
+	UpdateTask                            *updateInfo             //更新池
+	S_ProvinceDict                        map[string][]S_Province //省份-map
+	S_CityDict                            map[string][]S_City     //城市-map
+	S_DistrictDict                        map[string][]S_District //区县-map
 
 	//删除字段
 	unset_dict          = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
@@ -51,9 +51,9 @@ var (
 //初始化城市
 func initCheckCity() {
 	//初始化-城市配置
-	ProvinceDict = make(map[string][]Province, 0)
-	CityDict = make(map[string][]City, 0)
-	DistrictDict = make(map[string][]District, 0)
+	S_ProvinceDict = make(map[string][]S_Province, 0)
+	S_CityDict = make(map[string][]S_City, 0)
+	S_DistrictDict = make(map[string][]S_District, 0)
 
 	q := map[string]interface{}{
 		"town_code": map[string]interface{}{
@@ -74,41 +74,41 @@ func initCheckCity() {
 			province := qu.ObjToString(tmp["province"])
 			city := qu.ObjToString(tmp["city"])
 			district := qu.ObjToString(tmp["district"])
-			data := District{province, city, district}
-			if DistrictDict[district] == nil {
-				DistrictDict[district] = []District{data}
+			data := S_District{province, city, district}
+			if S_DistrictDict[district] == nil {
+				S_DistrictDict[district] = []S_District{data}
 			} else {
-				arr := DistrictDict[district]
+				arr := S_DistrictDict[district]
 				arr = append(arr, data)
-				DistrictDict[district] = arr
+				S_DistrictDict[district] = arr
 			}
 		} else {
 			if city_code > 0 {
 				province := qu.ObjToString(tmp["province"])
 				city := qu.ObjToString(tmp["city"])
-				data := City{province, city}
-				if CityDict[city] == nil {
-					CityDict[city] = []City{data}
+				data := S_City{province, city}
+				if S_CityDict[city] == nil {
+					S_CityDict[city] = []S_City{data}
 				} else {
-					arr := CityDict[city]
+					arr := S_CityDict[city]
 					arr = append(arr, data)
-					CityDict[city] = arr
+					S_CityDict[city] = arr
 				}
 			} else {
 				province := qu.ObjToString(tmp["province"])
-				data := Province{province}
-				if ProvinceDict[province] == nil {
-					ProvinceDict[province] = []Province{data}
+				data := S_Province{province}
+				if S_ProvinceDict[province] == nil {
+					S_ProvinceDict[province] = []S_Province{data}
 				} else {
-					arr := ProvinceDict[province]
+					arr := S_ProvinceDict[province]
 					arr = append(arr, data)
-					ProvinceDict[province] = arr
+					S_ProvinceDict[province] = arr
 				}
 			}
 		}
 		tmp = make(map[string]interface{})
 	}
-	log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
+	log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(S_ProvinceDict), len(S_CityDict), len(S_DistrictDict)))
 }
 
 //mgo-配置等
@@ -182,7 +182,7 @@ func main() {
 func mainT() {
 	sid := "12982d658fa2ac55ba96517d"
 	eid := "92982d658fa2ac55ba96517e"
-	startCheckData(sid, eid)
+	testCheckData(sid, eid)
 	time.Sleep(99999 * time.Hour)
 }
 
@@ -215,10 +215,11 @@ func startCheckData(sid, eid string) {
 			}()
 			//更新-
 			update_check := make(map[string]interface{}, 0)
+
 			//审查-城市
-			getCheckDataCity(tmp, &update_check)
+			//getCheckDataCity(tmp, &update_check)
 			//审查-中标金额
-			getCheckDataBidamount(tmp, &update_check)
+			//getCheckDataBidamount(tmp, &update_check)
 			//验证是否修复发布时间 - 对比开标日期,投标截止日期
 			getCheckDataPublishtime(tmp, &update_check)
 			//清洗分类~
@@ -369,3 +370,57 @@ func getRepeatTask() {
 		}
 	}
 }
+
+//测试版
+func testCheckData(sid, eid string) {
+	defer qu.Catch()
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(sid),
+			"$lte": StringTOBsonId(eid),
+		},
+	}
+	check_pool := make(chan bool, check_thread)
+	check_wg := &sync.WaitGroup{}
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
+	total, isRepair := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%10000 == 0 {
+			log.Println("当前数量:", total, isRepair, tmp["_id"])
+		}
+		update_id := map[string]interface{}{"_id": tmp["_id"]}
+		check_pool <- true
+		check_wg.Add(1)
+		go func(tmp map[string]interface{}, update_id map[string]interface{}) {
+			defer func() {
+				<-check_pool
+				check_wg.Done()
+			}()
+			//更新-
+			update_check := make(map[string]interface{}, 0)
+
+			//审查-城市
+			getCheckDataCity(tmp, &update_check)
+			//最终计算是否清洗
+			update_dict := make(map[string]interface{}, 0)
+			if len(update_check) > 0 {
+				update_check["is_standard"] = 1
+				update_dict["$set"] = update_check
+			}
+			if len(update_dict) > 0 {
+				//注意事项~更新key 不能与 删除key  同时存在
+				isRepair++
+				UpdateTask.updatePool <- []map[string]interface{}{
+					update_id,
+					update_dict,
+				}
+			}
+		}(tmp, update_id)
+		tmp = make(map[string]interface{})
+	}
+	check_wg.Wait()
+
+	log.Println("data_clean is over ", total, "~", isRepair)
+}