Procházet zdrojové kódy

增量替换数据~

zhengkun před 3 roky
rodič
revize
2a0a62a156
9 změnil soubory, kde provedl 535 přidání a 929 odebrání
  1. 2 0
      src/config.json
  2. 133 99
      src/dataMethod.go
  3. 0 416
      src/dataMethodMerge.go
  4. 113 135
      src/datamap.go
  5. 2 2
      src/fullDataRepeat.go
  6. 0 111
      src/fullMgoRepeat.go
  7. 101 55
      src/increaseRepeat.go
  8. 66 106
      src/main.go
  9. 118 5
      src/mgo.go

+ 2 - 0
src/config.json

@@ -7,6 +7,7 @@
         "db": "zhengkun",
         "extract": "repeat_test",
         "extract_back": "repeat_test",
+        "extract_log": "result_replace_log",
         "site": {
             "dbname": "zhengkun",
             "coll": "site"
@@ -16,6 +17,7 @@
         "task_addrName": "127.0.0.1:27017",
         "task_dbName": "zhengkun",
         "task_collName": "repeat_test",
+        "task_bidding": "bidding",
         "pool": 10
     },
     "jkmail": {

+ 133 - 99
src/dataMethod.go

@@ -7,62 +7,62 @@ import (
 	"strings"
 )
 
-
 //完善判重数据检测-前置条件
 func convertArabicNumeralsAndLetters(data string) string {
-	newData :=data
-	res1, _ := regexp.Compile("[a-zA-Z]+");
+	newData := data
+	res1, _ := regexp.Compile("[a-zA-Z]+")
 	if res1.MatchString(data) {
-		newData = res1.ReplaceAllStringFunc(data, strings.ToUpper);
+		newData = res1.ReplaceAllStringFunc(data, strings.ToUpper)
 	}
-	res2, _ := regexp.Compile("[0-9]+");
+	res2, _ := regexp.Compile("[0-9]+")
 	if res2.MatchString(newData) {
-		arr1:=[]string {"0","1","2","3","4","5","6","7","8","9"}
-		arr2:=[]string {"零","一","二","三","四","五","六","七","八","九"}
-		for i:=0 ;i<len(arr1) ;i++  {
-			resTemp ,_:=regexp.Compile(arr1[i])
-			newData= resTemp.ReplaceAllString(newData, arr2[i]);
+		arr1 := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}
+		arr2 := []string{"零", "一", "二", "三", "四", "五", "六", "七", "八", "九"}
+		for i := 0; i < len(arr1); i++ {
+			resTemp, _ := regexp.Compile(arr1[i])
+			newData = resTemp.ReplaceAllString(newData, arr2[i])
 		}
 	}
 	return newData
 }
 
-func dealWithSpecialPhrases(str1 string,str2 string) (string,string) {
-	newStr1:=str1
-	newStr2:=str2
-	res, _ := regexp.Compile("重新招标");
+func dealWithSpecialPhrases(str1 string, str2 string) (string, string) {
+	newStr1 := str1
+	newStr2 := str2
+	res, _ := regexp.Compile("重新招标")
 	if res.MatchString(newStr1) {
-		newStr1 = res.ReplaceAllString(newStr1,"重招");
+		newStr1 = res.ReplaceAllString(newStr1, "重招")
 	}
 	if res.MatchString(newStr2) {
-		newStr2 = res.ReplaceAllString(newStr2,"重招");
+		newStr2 = res.ReplaceAllString(newStr2, "重招")
 	}
-	return newStr1,newStr2
+	return newStr1, newStr2
 }
+
 //关键词数量v
-func dealWithSpecialWordNumber(info*Info,v*Info) int {
-	okNum:=0
-	if  info.titleSpecialWord || info.specialWord {
+func dealWithSpecialWordNumber(info *Info, v *Info) int {
+	okNum := 0
+	if info.titleSpecialWord || info.specialWord {
 		okNum++
 	}
-	if  v.titleSpecialWord || v.specialWord {
+	if v.titleSpecialWord || v.specialWord {
 		okNum++
 	}
 	return okNum
 }
 
 //关键词再次判断
-func againRepeat(v *Info, info *Info ,site bool) bool {
-	if isPublishtimeInterval(info.publishtime,v.publishtime) && site {
+func againRepeat(v *Info, info *Info, site bool) bool {
+	if isPublishtimeInterval(info.publishtime, v.publishtime) && site {
 		return true
 	}
-	if isBidopentimeInterval(info.bidopentime,v.bidopentime) {
+	if isBidopentimeInterval(info.bidopentime, v.bidopentime) {
 		return true
 	}
 	if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
 		return true
 	}
-	if isBidWinningAmount(v.bidamount,info.bidamount) && v.bidamount != 0 && info.bidamount != 0{
+	if isBidWinningAmount(v.bidamount, info.bidamount) && v.bidamount != 0 && info.bidamount != 0 {
 		return true
 	}
 	if deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) && v.winner != "" && info.winner != "" {
@@ -74,12 +74,12 @@ func againRepeat(v *Info, info *Info ,site bool) bool {
 	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
 		return true
 	}
-	if v.title != info.title && v.title != "" && info.title != ""{
-		if v.projectname != info.projectname && v.projectname != "" && info.projectname != ""{
+	if v.title != info.title && v.title != "" && info.title != "" {
+		if v.projectname != info.projectname && v.projectname != "" && info.projectname != "" {
 			return true
 		}
 	}
-	if v.projectname != info.projectname && v.projectname != "" && info.projectname != ""{
+	if v.projectname != info.projectname && v.projectname != "" && info.projectname != "" {
 		return true
 	}
 
@@ -87,15 +87,15 @@ func againRepeat(v *Info, info *Info ,site bool) bool {
 }
 
 //均含有关键词再次判断
-func againContainSpecialWord (v *Info, info *Info) bool {
+func againContainSpecialWord(v *Info, info *Info) bool {
 
-	if isBidopentimeInterval(info.bidopentime,v.bidopentime) {
+	if isBidopentimeInterval(info.bidopentime, v.bidopentime) {
 		return true
 	}
 	if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
 		return true
 	}
-	if isBidWinningAmount(v.bidamount,info.bidamount) && v.bidamount != 0 && info.bidamount != 0{
+	if isBidWinningAmount(v.bidamount, info.bidamount) && v.bidamount != 0 && info.bidamount != 0 {
 		return true
 	}
 	if deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) && v.winner != "" && info.winner != "" {
@@ -108,7 +108,7 @@ func againContainSpecialWord (v *Info, info *Info) bool {
 		return true
 	}
 	//提取标题-标段号处理
-	if dealTitleSpecial(v.title,info.title) {
+	if dealTitleSpecial(v.title, info.title) {
 		return true
 	}
 
@@ -116,64 +116,63 @@ func againContainSpecialWord (v *Info, info *Info) bool {
 }
 
 //提取标题-标段号处理
-func dealTitleSpecial(title1 string,title2 string) bool{
+func dealTitleSpecial(title1 string, title2 string) bool {
 
 	regular1 := "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789]+[))]?"
 	regular2 := "[0-9a-zA-Z一二三四五六七八九十零123456789]+(包|标段|标包)"
-	regx1_1,_ := regexp.Compile(regular1)
-	str1:=regx1_1.FindString(title1)
-	if str1!="" {
+	regx1_1, _ := regexp.Compile(regular1)
+	str1 := regx1_1.FindString(title1)
+	if str1 != "" {
 		//log.Println("标题1,规则一提取:",str1)
-	}else {
-		regx1_2,_ := regexp.Compile(regular2)
-		str1=regx1_2.FindString(title1)
-		if str1!="" {
+	} else {
+		regx1_2, _ := regexp.Compile(regular2)
+		str1 = regx1_2.FindString(title1)
+		if str1 != "" {
 			//log.Println("标题1,规则二提取:",str1)
 		}
 	}
 
-	regx2_1,_ := regexp.Compile(regular1)
-	str2:=regx2_1.FindString(title2)
-	if str2!="" {
+	regx2_1, _ := regexp.Compile(regular1)
+	str2 := regx2_1.FindString(title2)
+	if str2 != "" {
 		//log.Println("标题2,规则一提取:",str2)
-	}else {
-		regx2_2,_ := regexp.Compile(regular2)
-		str2=regx2_2.FindString(title2)
-		if str2!="" {
+	} else {
+		regx2_2, _ := regexp.Compile(regular2)
+		str2 = regx2_2.FindString(title2)
+		if str2 != "" {
 			//log.Println("标题2,规则二提取:",str2)
 		}
 	}
 
 	//根据提取的结果,在进行清洗
-	if str1!="" {
+	if str1 != "" {
 		str1 = deleteExtraSpace(str1)
-		str1= strings.Replace(str1, "(", "", -1)
-		str1= strings.Replace(str1, "(", "", -1)
-		str1= strings.Replace(str1, ")", "", -1)
-		str1= strings.Replace(str1, ")", "", -1)
+		str1 = strings.Replace(str1, "(", "", -1)
+		str1 = strings.Replace(str1, "(", "", -1)
+		str1 = strings.Replace(str1, ")", "", -1)
+		str1 = strings.Replace(str1, ")", "", -1)
 		str1 = convertArabicNumeralsAndLetters(str1)
 	}
 
-	if str2!="" {
+	if str2 != "" {
 		str2 = deleteExtraSpace(str2)
-		str2= strings.Replace(str2, "(", "", -1)
-		str2= strings.Replace(str2, "(", "", -1)
-		str2= strings.Replace(str2, ")", "", -1)
-		str2= strings.Replace(str2, ")", "", -1)
+		str2 = strings.Replace(str2, "(", "", -1)
+		str2 = strings.Replace(str2, "(", "", -1)
+		str2 = strings.Replace(str2, ")", "", -1)
+		str2 = strings.Replace(str2, ")", "", -1)
 		str2 = convertArabicNumeralsAndLetters(str2)
 	}
 
 	//log.Println("最终:",str1,str2)
-	if str1!=str2 {
+	if str1 != str2 {
 		//log.Println("不一致")
 		return true
-	}else {
+	} else {
 		//log.Println("一致")
 		return false
 	}
 }
 
-
 //删除中标单位字符串中多余的空格(含tab)
 func deleteExtraSpace(s string) string {
 	//删除字符串中的多余空格,有多个空格时,仅保留一个空格
@@ -191,77 +190,75 @@ func deleteExtraSpace(s string) string {
 }
 
 //中标金额倍率:10000
-func isBidWinningAmount(f1 float64 ,f2 float64) bool {
+func isBidWinningAmount(f1 float64, f2 float64) bool {
 
-	if f1==f2||f1*10000==f2||f2*10000==f1 {
+	if f1 == f2 || f1*10000 == f2 || f2*10000 == f1 {
 		return false
 	}
 	return true
 }
 
-
 //时间间隔周期
-func isTimeIntervalPeriod(i1 int64 ,i2 int64) bool {
+func isTimeIntervalPeriod(i1 int64, i2 int64) bool {
 
 	if math.Abs(float64(i1-i2)) < 172800.0 {
 		return true
-	}else {
+	} else {
 		return false //大于48小时
 	}
 }
 
-
 //开标时间区间为一天
-func isBidopentimeInterval(i1 int64 ,i2 int64) bool {
-	if i1==0||i2==0 {
+func isBidopentimeInterval(i1 int64, i2 int64) bool {
+	if i1 == 0 || i2 == 0 {
 		return false
 	}
 	//不在同一天-或者同一天间隔超过六小时,属于不相等返回true
-	timeOne,timeTwo:=i1,i2
+	timeOne, timeTwo := i1, i2
 	day1 := qutil.FormatDateByInt64(&timeOne, qutil.Date_yyyyMMdd)
 	day2 := qutil.FormatDateByInt64(&timeTwo, qutil.Date_yyyyMMdd)
-	if day1==day2 {
+	if day1 == day2 {
 		//是否间隔超过十二小时
-		if math.Abs(float64(i1-i2)) >43200.0 {
+		if math.Abs(float64(i1-i2)) > 43200.0 {
 			return true
-		}else {
+		} else {
 			return false
 		}
-	}else {
+	} else {
 		return true
 	}
 }
 
 //发布时间区间为一天
-func isPublishtimeInterval(i1 int64 ,i2 int64) bool {
-	if i1==0||i2==0 {
+func isPublishtimeInterval(i1 int64, i2 int64) bool {
+	if i1 == 0 || i2 == 0 {
 		return false
 	}
 	//不在同一天-或者同一天间隔超过12小时,属于不相等返回true
-	timeOne,timeTwo:=i1,i2
+	timeOne, timeTwo := i1, i2
 	day1 := qutil.FormatDateByInt64(&timeOne, qutil.Date_yyyyMMdd)
 	day2 := qutil.FormatDateByInt64(&timeTwo, qutil.Date_yyyyMMdd)
-	if day1==day2 {
+	if day1 == day2 {
 		//是否间隔超过十二小时
-		if math.Abs(float64(i1-i2)) >=43200.0 {
+		if math.Abs(float64(i1-i2)) >= 43200.0 {
 			return true
-		}else {
+		} else {
 			return false
 		}
-	}else {
+	} else {
 		return true
 	}
 }
 
 //开标时间区间为一天
-func isTheSameDay(i1 int64 ,i2 int64) bool {
-	if i1==0||i2==0 {
+func isTheSameDay(i1 int64, i2 int64) bool {
+	if i1 == 0 || i2 == 0 {
 		return false
 	}
-	timeOne,timeTwo:=i1,i2
+	timeOne, timeTwo := i1, i2
 	day1 := qutil.FormatDateByInt64(&timeOne, qutil.Date_yyyyMMdd)
 	day2 := qutil.FormatDateByInt64(&timeTwo, qutil.Date_yyyyMMdd)
-	if day1==day2 {
+	if day1 == day2 {
 		return true
 	}
 	//if math.Abs(float64(i1-i2)) <=86400.0 {
@@ -270,12 +267,10 @@ func isTheSameDay(i1 int64 ,i2 int64) bool {
 	return false
 }
 
-
-
 //前置0 五要素均相等认为重复
 func leadingElementSame(v *Info, info *Info) bool {
 
-	isok:= 0
+	isok := 0
 	if info.projectname != "" && v.projectname == info.projectname {
 		isok++
 	}
@@ -286,7 +281,7 @@ func leadingElementSame(v *Info, info *Info) bool {
 		if info.contractnumber != "" && v.contractnumber == info.contractnumber {
 			isok++
 		}
-	}else {
+	} else {
 		if info.projectcode != "" && v.projectcode == info.projectcode {
 			isok++
 		}
@@ -297,30 +292,28 @@ func leadingElementSame(v *Info, info *Info) bool {
 	if v.agency == info.agency {
 		isok++
 	}
-	if v.winner == info.winner&&info.winner != "" {
+	if v.winner == info.winner && info.winner != "" {
 		isok++
 	}
 
-	if isok>=5 {
+	if isok >= 5 {
 		return true
 	}
 
-
-
 	return false
 }
 
 //buyer的优先级
 func buyerIsContinue(v *Info, info *Info) bool {
-	if !isTheSameDay(info.publishtime,v.publishtime) {
+	if !isTheSameDay(info.publishtime, v.publishtime) {
 		return true
 	}
-	if v.title != info.title && v.title != "" && info.title != ""{
-		if v.projectname != info.projectname && v.projectname != "" && info.projectname != ""{
+	if v.title != info.title && v.title != "" && info.title != "" {
+		if v.projectname != info.projectname && v.projectname != "" && info.projectname != "" {
 			return true
 		}
 	}
-	if v.projectname != info.projectname && v.projectname != "" && info.projectname != ""{
+	if v.projectname != info.projectname && v.projectname != "" && info.projectname != "" {
 		return true
 	}
 	//if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
@@ -342,8 +335,6 @@ func buyerIsContinue(v *Info, info *Info) bool {
 	return false
 }
 
-
-
 //无效数据
 func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
 	var n int
@@ -363,4 +354,47 @@ func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
 		return true
 	}
 	return false
-}
+}
+
+//判断~是否需要替换数据相关
+func judgeIsReplaceInfo(s_href string, i_href string) bool {
+	if strings.Contains(s_href, "https://www.jianyu360.cn") &&
+		!strings.Contains(i_href, "https://www.jianyu360.cn") {
+		return true
+	}
+	return false
+}
+
+//查询抽取表数据
+func confrimExtractData(source_id string, info_id string) (bool, map[string]interface{}, map[string]interface{}) {
+	source_data := map[string]interface{}{}
+	info_data := map[string]interface{}{}
+	isvalid := false
+	source_data = data_mgo.FindById(extract, source_id)
+	info_data = data_mgo.FindById(extract, info_id)
+	if len(source_data) > 2 && len(info_data) > 2 {
+		isvalid = true
+		ts_id := source_data["_id"]
+		ti_id := info_data["_id"]
+		source_data["_id"] = ti_id
+		info_data["_id"] = ts_id
+	}
+	return isvalid, info_data, source_data
+}
+
+//查询bidding表数据
+func confrimBiddingData(source_id string, info_id string) (bool, map[string]interface{}, map[string]interface{}) {
+	source_data := map[string]interface{}{}
+	info_data := map[string]interface{}{}
+	isvalid := false
+	source_data = task_mgo.FindById(task_bidding, source_id)
+	info_data = task_mgo.FindById(task_bidding, info_id)
+	if len(source_data) > 2 && len(info_data) > 2 {
+		isvalid = true
+		ts_id := source_data["_id"]
+		ti_id := info_data["_id"]
+		source_data["_id"] = ti_id
+		info_data["_id"] = ts_id
+	}
+	return isvalid, info_data, source_data
+}

+ 0 - 416
src/dataMethodMerge.go

@@ -1,416 +0,0 @@
-package main
-
-import "qfw/util"
-
-
-func mergeDataFields(source *Info, info *Info) (*Info,map[string]interface{} ,bool) {
-	update_map := map[string]interface{}{
-		"$set": map[string]interface{}{},
-	}
-	mergeMap :=source.mergemap
-	isReplace:=false
-	//项目名称
-	if source.projectname == "" && info.projectname != "" {
-		mergeMap["projectname"] = map[string]interface{}{
-			"projectname":info.projectname,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["projectname"] = info.projectname
-		source.projectname = info.projectname
-		isReplace = true
-	}
-
-	//项目编号
-	if source.projectcode == "" && info.projectcode != "" {
-		mergeMap["projectcode"] = map[string]interface{}{
-			"projectcode":info.projectcode,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["projectcode"] = info.projectcode
-		source.projectcode = info.projectcode
-		isReplace = true
-	}
-
-	//采购单位
-	if source.buyer == "" && info.buyer != "" {
-		mergeMap["buyer"] = map[string]interface{}{
-			"buyer":info.buyer,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["buyer"] = info.buyer
-		source.buyer = info.buyer
-		isReplace = true
-	}
-
-	//预算
-	if source.budget == 0 && info.budget != 0 {
-		mergeMap["budget"] = map[string]interface{}{
-			"budget":info.budget,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["budget"] = info.budget
-		source.budget = info.budget
-		isReplace = true
-	}
-
-	//中标单位
-	if source.winner == "" && info.winner != "" {
-		mergeMap["winner"] = map[string]interface{}{
-			"winner":info.winner,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["winner"] = info.winner
-		source.winner = info.winner
-		isReplace = true
-	}
-
-	//中标金额
-	if source.bidamount == 0 && info.bidamount != 0 {
-		mergeMap["bidamount"] = map[string]interface{}{
-			"bidamount":info.bidamount,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["bidamount"] = info.bidamount
-		source.bidamount = info.bidamount
-		isReplace = true
-	}
-
-	//开标时间
-	if source.bidopentime == 0 && info.bidopentime != 0 {
-		mergeMap["bidopentime"] = map[string]interface{}{
-			"bidopentime":info.bidopentime,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["bidopentime"] = info.bidopentime
-		source.bidopentime = info.bidopentime
-		isReplace = true
-	}
-
-	//合同编号
-	if source.contractnumber == "" && info.contractnumber != "" {
-		mergeMap["contractnumber"] = map[string]interface{}{
-			"contractnumber":info.contractnumber,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["contractnumber"] = info.contractnumber
-		source.contractnumber = info.contractnumber
-		isReplace = true
-	}
-
-	//代理机构
-	if source.agency == "" && info.agency != "" {
-		mergeMap["agency"] = map[string]interface{}{
-			"agency":info.agency,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["agency"] = info.agency
-		source.agency = info.agency
-		isReplace = true
-	}
-
-	source.mergemap = mergeMap
-	update_map["$set"].(map[string]interface{})["merge"] = mergeMap
-
-	return source,update_map,isReplace
-}
-
-
-//合并字段-并更新merge字段的值-
-func mergeDataFieldsArr(source *Info, info *Info) (*Info, []int64, bool) {
-
-	merge_recordMap := make(map[string]interface{}, 0)
-	mergeArr := make([]int64, 0)
-	//是否替换数据了-记录原始的数据
-	is_replace := false
-	//1、城市
-	if source.area == "" || source.area == "全国" {
-		//为空
-		if info.area != "全国" && info.area != "" {
-			merge_recordMap["area"] = source.area
-			merge_recordMap["city"] = source.city
-			source.area = info.area
-			source.city = info.city
-			mergeArr = append(mergeArr, 1)
-			is_replace = true
-		}
-	} else {
-		//不为空-查看站点相关-有值必替换
-		if source.is_site {
-			//是站点替换的城市
-			merge_recordMap["site_area"] = source.area
-			merge_recordMap["site_city"] = source.city
-			mergeArr = append(mergeArr, 0)
-			is_replace = true
-			source.is_site = false
-
-		}
-	}
-	//2、项目名称
-	if source.projectname == "" && info.projectname != "" {
-		merge_recordMap["projectname"] = source.projectname
-		source.projectname = info.projectname
-		mergeArr = append(mergeArr, 2)
-		is_replace = true
-	}
-	//3、项目编号
-	if source.projectcode == "" && info.projectcode != "" {
-		merge_recordMap["projectcode"] = source.projectcode
-		source.projectcode = info.projectcode
-		mergeArr = append(mergeArr, 3)
-		is_replace = true
-	}
-	//4、采购单位
-	if source.buyer == "" && info.buyer != "" {
-		merge_recordMap["buyer"] = source.buyer
-		source.buyer = info.buyer
-		mergeArr = append(mergeArr, 4)
-		is_replace = true
-	}
-	//5、预算
-	if source.budget == 0 && info.budget != 0 {
-		merge_recordMap["budget"] = source.budget
-		source.budget = info.budget
-		mergeArr = append(mergeArr, 5)
-		is_replace = true
-	}
-	//6、中标单位
-	if source.winner == "" && info.winner != "" {
-		merge_recordMap["winner"] = source.winner
-		source.winner = info.winner
-		mergeArr = append(mergeArr, 6)
-		is_replace = true
-	}
-
-	//7、中标金额
-	if source.bidamount == 0 && info.bidamount != 0 {
-		merge_recordMap["bidamount"] = source.bidamount
-		source.bidamount = info.bidamount
-		mergeArr = append(mergeArr, 7)
-		is_replace = true
-	}
-	//8、开标时间-地点
-	if source.bidopentime == 0 && info.bidopentime != 0 {
-		merge_recordMap["bidopentime"] = source.bidopentime
-		source.bidopentime = info.bidopentime
-		mergeArr = append(mergeArr, 8)
-		is_replace = true
-	}
-
-	//9、合同编号
-	if source.contractnumber == "" && info.contractnumber != "" {
-		merge_recordMap["contractnumber"] = source.contractnumber
-		source.contractnumber = info.contractnumber
-		mergeArr = append(mergeArr, 9)
-		is_replace = true
-	}
-
-	//10、发布时间
-	if source.publishtime == 0 && info.publishtime != 0 {
-		merge_recordMap["publishtime"] = source.publishtime
-		source.publishtime = info.publishtime
-		mergeArr = append(mergeArr, 10)
-		is_replace = true
-	}
-	//11、代理机构
-	if source.agency == "" && info.agency != "" {
-		merge_recordMap["agency"] = source.agency
-		source.agency = info.agency
-		mergeArr = append(mergeArr, 11)
-		is_replace = true
-	}
-
-	if is_replace { //有过替换更新
-		//总次数+1
-		source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
-		merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
-		//和哪一个数据id进行非空替换的-记录
-		key := info.id
-		source.mergemap[key] = merge_recordMap
-	}
-
-	//待进一步优化
-	return source, mergeArr, is_replace
-}
-
-//权重评估
-func basicDataScore(v *Info, info *Info) bool {
-
-	/*
-	  权重评估
-	  网站优先级判定规则:
-	  1、国家>省级>市级>县区
-	  2、政府采购>公共资源>官方网站|政府门户>社会公共招标平台|企业招标平台
-	  3、同sitetype-分析weight
-	  4、要素打分-分析
-	*/
-	v_score, info_score := -1, -1
-	dict_v := SiteMap[v.site]
-	dict_info := SiteMap[info.site]
-	//先判断level
-	if dict_v != nil {
-		v_level := util.ObjToString(dict_v["level"])
-		if v_level == "国家" {
-			v_score = 4
-		} else if v_level == "省级" {
-			v_score = 3
-		} else if v_level == "市级" {
-			v_score = 2
-		} else if v_level == "县区" {
-			v_score = 1
-		} else if v_level == "" {
-		} else {
-			v_score = 0
-		}
-	}
-
-	if dict_info != nil {
-		info_level := util.ObjToString(dict_info["level"])
-		if info_level == "国家" {
-			info_score = 4
-		} else if info_level == "省级" {
-			info_score = 3
-		} else if info_level == "市级" {
-			info_score = 2
-		} else if info_level == "县区" {
-			info_score = 1
-		} else if info_level == "" {
-
-		} else {
-			v_score = 0
-		}
-	}
-
-	if v_score > info_score {
-		return true
-	}
-	if v_score < info_score {
-		return false
-	}
-
-	//判断sitetype
-	if dict_v != nil {
-		v_sitetype := util.ObjToString(dict_v["sitetype"])
-		if v_sitetype == "政府采购" {
-			v_score = 4
-		} else if v_sitetype == "公共资源" {
-			v_score = 3
-		} else if v_sitetype == "官方网站"|| v_sitetype == "政府门户" {
-			v_score = 2
-		} else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
-			v_score = 1
-		} else if v_sitetype == "" {
-		} else {
-			v_score = 0
-		}
-	}
-
-	if dict_info != nil {
-		info_sitetype := util.ObjToString(dict_info["sitetype"])
-		if info_sitetype == "政府采购" {
-			info_score = 4
-		} else if info_sitetype == "公共资源" {
-			info_score = 3
-		} else if info_sitetype == "官方网站"|| info_sitetype == "政府门户" {
-			info_score = 2
-		} else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
-			info_score = 1
-		} else if info_sitetype == "" {
-		} else {
-			info_score = 0
-		}
-	}
-
-	if v_score > info_score {
-		return true
-	}
-	if v_score < info_score {
-		return false
-	}
-
-	if v_score == info_score {//同sitetype 情况下   分析weight
-		v_weight := util.IntAll(dict_v["weight"])
-		info_weight := util.IntAll(dict_info["weight"])
-		if v_weight>info_weight {
-			return true
-		}
-		if info_weight>v_weight {
-			return false
-		}
-	}
-
-	//网站评估
-	m, n := 0, 0
-	if v.projectname != "" {
-		m++
-	}
-	if v.buyer != "" {
-		m++
-	}
-	if v.projectcode != "" || v.contractnumber != "" {
-		m++
-	}
-	if v.budget != 0 {
-		m++
-	}
-	if v.bidamount != 0 {
-		m++
-	}
-	if v.winner != "" {
-		m++
-	}
-	if v.bidopentime != 0 {
-		m++
-	}
-	if v.bidopenaddress != "" {
-		m++
-	}
-	if v.agency != "" {
-		m = m + 2
-	}
-	if v.city != "" {
-		m = m + 2
-	}
-
-	if info.projectname != "" {
-		n++
-	}
-	if info.buyer != "" {
-		n++
-	}
-	if info.projectcode != "" || info.contractnumber != "" {
-		n++
-	}
-	if info.budget != 0 {
-		n++
-	}
-	if info.bidamount != 0 {
-		n++
-	}
-	if info.winner != "" {
-		n++
-	}
-	if info.bidopentime != 0 {
-		n++
-	}
-	if info.bidopenaddress != "" {
-		n++
-	}
-	if info.agency != "" {
-		n = n + 2
-	}
-	if info.city != "" {
-		n = n + 2
-	}
-
-	if m > n {
-		return true
-	} else if m == n {
-		if v.publishtime >= info.publishtime {
-			return true
-		} else {
-			return false
-		}
-	} else {
-		return false
-	}
-}

+ 113 - 135
src/datamap.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"go.mongodb.org/mongo-driver/bson/primitive"
 	"log"
 	qutil "qfw/util"
 	"reflect"
@@ -12,32 +13,31 @@ import (
 )
 
 type Info struct {
-	id    string //id
-	title string //标题
-	spidercode		string //爬虫代码
-	area           string  //省份
-	city           string  //城市
-	subtype        string  //信息类型
-	buyer          string  //采购单位
-	agency         string  //代理机构
-	winner         string  //中标单位
-	budget         float64 //预算金额
-	bidamount      float64 //中标金额
-	projectname    string  //项目名称
-	projectcode    string  //项目编号
-	contractnumber string  //合同编号
-	publishtime    int64   //发布时间
-	comeintime     int64   //入库时间
-	bidopentime    int64   //开标时间
-	bidopenaddress string  //开标地点
-	site 		   string //站点
-	href 		     string //正文的url
-	repeatid         string                 //重复id
-	titleSpecialWord bool                   //标题特殊词
-	specialWord      bool                   //再次判断的特殊词
-	mergemap         map[string]interface{} //合并记录
-	is_site          bool                   //是否站点城市
-	repeat_ids        []string               //记录所有重复id
+	id               string   //id
+	title            string   //标题
+	spidercode       string   //爬虫代码
+	area             string   //省份
+	city             string   //城市
+	subtype          string   //信息类型
+	buyer            string   //采购单位
+	agency           string   //代理机构
+	winner           string   //中标单位
+	budget           float64  //预算金额
+	bidamount        float64  //中标金额
+	projectname      string   //项目名称
+	projectcode      string   //项目编号
+	contractnumber   string   //合同编号
+	publishtime      int64    //发布时间
+	comeintime       int64    //入库时间
+	bidopentime      int64    //开标时间
+	bidopenaddress   string   //开标地点
+	site             string   //站点
+	href             string   //正文的url
+	repeatid         string   //重复id
+	titleSpecialWord bool     //标题特殊词
+	specialWord      bool     //再次判断的特殊词
+	is_site          bool     //是否站点城市
+	repeat_ids       []string //记录所有重复id
 
 }
 
@@ -46,19 +46,19 @@ var sitelock sync.Mutex         //锁
 
 //一般数据判重
 type datamap struct {
-	lock   sync.Mutex //锁
-	days   int        //保留几天数据
-	data   map[string][]*Info
-	keymap []string
+	lock     sync.Mutex //锁
+	days     int        //保留几天数据
+	data     map[string][]*Info
+	keymap   []string
 	areakeys []string
-	keys   map[string]bool
+	keys     map[string]bool
 }
 
 //历史~存量
-func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
+func TimedTaskDatamap(days int, lasttime int64, numIndex int) *datamap {
 	datelimit = qutil.Float64All(days * 86400)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
-	if lasttime <0 {
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{}, map[string]bool{}}
+	if lasttime < 0 {
 		log.Println("数据池空数据")
 		return dm
 	}
@@ -73,13 +73,13 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if n%10000 == 0 {
-			log.Println("当前 n:", n,"数量:" ,continuSum,tmp["_id"],tmp["publishtime"])
+			log.Println("当前 n:", n, "数量:", continuSum, tmp["_id"], tmp["publishtime"])
 		}
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
 			qutil.IntAll(tmp["dataging"]) == 1 {
 
 		} else {
-			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" {
+			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"])) == "string" {
 				continue
 			}
 			pt := tmp["publishtime"]
@@ -101,15 +101,15 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 				dm.data[k] = data
 				dm.keys[dkey] = true
 				//添加省
-				isAreaExist :=false
-				for _,v:= range dm.areakeys {
-					if v==info.area {
+				isAreaExist := false
+				for _, v := range dm.areakeys {
+					if v == info.area {
 						isAreaExist = true
 					}
 				}
 				if !isAreaExist {
 					areaArr := dm.areakeys
-					areaArr = append(areaArr,info.area)
+					areaArr = append(areaArr, info.area)
 					dm.areakeys = areaArr
 				}
 			} else {
@@ -120,7 +120,7 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 		tmp = make(map[string]interface{})
 	}
 
-	log.Printf("第%d组:数据池构建完成:%d秒,%d个\n",numIndex ,int(time.Now().Unix())-start, n)
+	log.Printf("第%d组:数据池构建完成:%d秒,%d个\n", numIndex, int(time.Now().Unix())-start, n)
 
 	return dm
 }
@@ -128,7 +128,7 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 //增量
 func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400 * 2)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{}, map[string]bool{}}
 	if lastid == "" {
 		log.Println("不构建数据池")
 		return dm
@@ -141,7 +141,7 @@ func NewDatamap(days int, lastid string) *datamap {
 	}}
 	log.Println("query", query)
 	it := sess.DB(data_mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
-	nowTime := time.Now().Unix()//当前时间的时间戳
+	nowTime := time.Now().Unix() //当前时间的时间戳
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 
@@ -149,14 +149,13 @@ func NewDatamap(days int, lastid string) *datamap {
 		//if util.IntAll((*source)["sourcewebsite"]) == 1 {
 		//	continue
 		//}
-
-		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1{
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 {
 
 		} else {
-			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" {
+			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"])) == "string" {
 				continue
 			}
-			pt:= tmp["publishtime"]
+			pt := tmp["publishtime"]
 			pt_time := qutil.Int64All(pt)
 			if pt_time > time.Now().Unix() {
 				continue
@@ -174,15 +173,15 @@ func NewDatamap(days int, lastid string) *datamap {
 				dm.data[k] = data
 				dm.keys[dkey] = true
 				//添加省
-				isAreaExist :=false
-				for _,v:= range dm.areakeys {
-					if v==info.area {
+				isAreaExist := false
+				for _, v := range dm.areakeys {
+					if v == info.area {
 						isAreaExist = true
 					}
 				}
 				if !isAreaExist {
 					areaArr := dm.areakeys
-					areaArr = append(areaArr,info.area)
+					areaArr = append(areaArr, info.area)
 					dm.areakeys = areaArr
 				}
 			} else {
@@ -190,19 +189,19 @@ func NewDatamap(days int, lastid string) *datamap {
 			}
 		}
 		if n%10000 == 0 {
-			log.Println("当前 n:", n,"数量:" ,continuSum,tmp["_id"])
+			log.Println("当前 n:", n, "数量:", continuSum, tmp["_id"])
 		}
 		tmp = make(map[string]interface{})
 	}
-	log.Println("load data:", n,"总数:",continuSum)
+	log.Println("load data:", n, "总数:", continuSum)
 	return dm
 }
 
 //数据构建
 func NewInfo(tmp map[string]interface{}) *Info {
 	subtype := qutil.ObjToString(tmp["subtype"])
-	if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
-		subtype=="竞谈"||subtype=="竞价" {
+	if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
+		subtype == "竞谈" || subtype == "竞价" {
 		subtype = "招标"
 	}
 
@@ -233,16 +232,17 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	info.href = qutil.ObjToString(tmp["href"])
 	info.repeatid = qutil.ObjToString(tmp["repeatid"])
 	info.specialWord = FilterRegTitle.MatchString(info.title)
-	info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) ||FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title)
-	info.mergemap = *qutil.ObjToMap(tmp["merge"])
-	if info.mergemap == nil {
-		info.mergemap = make(map[string]interface{}, 0)
+	info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) || FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title)
+
+	//加载repeat_ids数据
+	repeat_ids := []string{}
+	if ids_1, ok := tmp["repeat_ids"].([]interface{}); ok {
+		repeat_ids = qutil.ObjArrToStringArr(ids_1)
 	}
-	if info.repeat_ids == nil {
-		info.repeat_ids = make([]string, 0)
+	if ids_2, ok := tmp["repeat_ids"].(primitive.A); ok {
+		repeat_ids = qutil.ObjArrToStringArr(ids_2)
 	}
-
-
+	info.repeat_ids = repeat_ids
 
 	info.is_site = false
 
@@ -258,11 +258,11 @@ func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
 	keys := []string{}
 	d.lock.Lock()
 	for k, _ := range d.keys { //不同时间段
-		if info.area=="全国" {//匹配所有省
-			for _,v := range d.areakeys{
+		if info.area == "全国" { //匹配所有省
+			for _, v := range d.areakeys {
 				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
 			}
-		}else {//匹配指定省
+		} else { //匹配指定省
 			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
 		}
 		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
@@ -281,21 +281,21 @@ L:
 					return false, v, ""
 				}
 				//buyer 优先级高,有值且不相等过滤
-				if info.buyer!=""&&v.buyer!=""&&info.buyer!=v.buyer {
+				if info.buyer != "" && v.buyer != "" && info.buyer != v.buyer {
 					if v.title != info.title && v.title != "" && info.title != "" {
 						isTestLog = true
 					}
-					if buyerIsContinue(v,info) {
+					if buyerIsContinue(v, info) {
 						continue
 					}
 				}
-				if info.site != "" {//站点临时赋值
+				if info.site != "" { //站点临时赋值
 					sitelock.Lock()
 					dict := SiteMap[info.site]
 					sitelock.Unlock()
 					if dict != nil {
-						if (info.area == "全国" && dict["area"] != "")||
-							(info.city == "" && dict["city"] != ""){
+						if (info.area == "全国" && dict["area"] != "") ||
+							(info.city == "" && dict["city"] != "") {
 							info.is_site = true
 							info.area = qutil.ObjToString(dict["area"])
 							info.city = qutil.ObjToString(dict["city"])
@@ -304,7 +304,7 @@ L:
 				}
 
 				//前置条件-五要素均相等
-				if leadingElementSame(v,info) {
+				if leadingElementSame(v, info) {
 					reason = "五要素-相同-满足"
 					b = true
 					source = v
@@ -322,64 +322,62 @@ L:
 						break L
 					}
 					//相同发布时间-标题无包含关系 - 项目名称不等
-					if isTheSameDay(info.publishtime,v.publishtime) {
-						if !isTheSimilarName(info.title,v.title){
+					if isTheSameDay(info.publishtime, v.publishtime) {
+						if !isTheSimilarName(info.title, v.title) {
 							continue
 						}
 					}
 					//
 
-
-
 					//不同href
 					if info.href != "" && info.href != v.href {
-						if v.title==info.title{
-							if !againRepeat(v, info,true)   {//进行同站点二次判断
+						if v.title == info.title {
+							if !againRepeat(v, info, true) { //进行同站点二次判断
 								reason = "同站点-href不同-标题相同等"
 								b = true
 								source = v
 								reasons = reason
 								break L
-							}else {
+							} else {
 								continue
 							}
-						}else {
-							if againRepeat(v, info,true)  {
+						} else {
+							if againRepeat(v, info, true) {
 								continue
 							}
 						}
 					}
 				}
 				//特殊词处理
-				specialNum:= dealWithSpecialWordNumber(info,v)
+				specialNum := dealWithSpecialWordNumber(info, v)
 				//前置条件 - 标题相关,有且一个关键词
-				if specialNum==1 {
-					if againRepeat(v, info,false) {
+				if specialNum == 1 {
+					if againRepeat(v, info, false) {
 						continue
 					}
 				}
 				//前置条件3 - 标题相关,均含有关键词
-				if specialNum==2 {
+				if specialNum == 2 {
 					if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
 						v.title != "" && info.title != "" {
-						letter1,letter2:=v.title,info.title
-						res, _ := regexp.Compile("[0-9a-zA-Z]+");
-						if res.MatchString(letter1)||res.MatchString(letter2) {
-							letter1=convertArabicNumeralsAndLetters(letter1)
-							letter2=convertArabicNumeralsAndLetters(letter2)
+						letter1, letter2 := v.title, info.title
+						res, _ := regexp.Compile("[0-9a-zA-Z]+")
+						if res.MatchString(letter1) || res.MatchString(letter2) {
+							letter1 = convertArabicNumeralsAndLetters(letter1)
+							letter2 = convertArabicNumeralsAndLetters(letter2)
 						}
-						if strings.Contains(letter1,"重新招标")|| strings.Contains(letter2,"重新招标"){
-							letter1,letter2=dealWithSpecialPhrases(letter1,letter2)
+						if strings.Contains(letter1, "重新招标") || strings.Contains(letter2, "重新招标") {
+							letter1, letter2 = dealWithSpecialPhrases(letter1, letter2)
 						}
-						if letter1==letter2 {
+						if letter1 == letter2 {
 							reason = reason + "标题关键词相等关系"
-							if !againRepeat(v, info,false) {//进行二级金额判断
+							if !againRepeat(v, info, false) { //进行二级金额判断
 								b = true
 								source = v
 								reasons = reason
 								break L
 							}
-						}else {
+						} else {
 							if !(strings.Contains(letter1, letter2) || strings.Contains(letter2, letter1)) {
 								//无包含关系-即不相等
 								if againContainSpecialWord(v, info) {
@@ -390,7 +388,6 @@ L:
 					}
 				}
 
-
 				//新增快速数据过少判重
 				if LowHeavy {
 					repeat := false
@@ -459,15 +456,15 @@ L:
 		}
 
 		//添加省
-		isAreaExist :=false
-		for _,v:= range d.areakeys {
-			if v==info.area {
+		isAreaExist := false
+		for _, v := range d.areakeys {
+			if v == info.area {
 				isAreaExist = true
 			}
 		}
 		if !isAreaExist {
 			areaArr := d.areakeys
-			areaArr = append(areaArr,info.area)
+			areaArr = append(areaArr, info.area)
 			d.areakeys = areaArr
 		}
 
@@ -475,7 +472,7 @@ L:
 	}
 
 	if isTestLog {
-		reasons = reasons+"-新修改"
+		reasons = reasons + "-新修改"
 	}
 	return
 }
@@ -484,10 +481,10 @@ func (d *datamap) update(t int64) {
 
 	if TimingTask {
 
-	}else {
+	} else {
 		if IsFull {
-			d.keymap = d.GetLatelyFiveDay(t)//全量
-		}else {
+			d.keymap = d.GetLatelyFiveDay(t) //全量
+		} else {
 			d.keymap = d.GetLatelyFiveDayDouble(t) //增量
 		}
 		m := map[string]bool{}
@@ -508,7 +505,7 @@ func (d *datamap) update(t int64) {
 
 }
 
-func (d *datamap) GetLatelyFiveDay(t int64) []string  {
+func (d *datamap) GetLatelyFiveDay(t int64) []string {
 	array := make([]string, d.days)
 	now := time.Unix(t, 0)
 	for i := 0; i < d.days; i++ {
@@ -518,7 +515,7 @@ func (d *datamap) GetLatelyFiveDay(t int64) []string  {
 	return array
 }
 
-func (d *datamap) GetLatelyFiveDayDouble(t int64) []string  {//增量-两倍
+func (d *datamap) GetLatelyFiveDayDouble(t int64) []string { //增量-两倍
 	array := make([]string, d.days*2)
 	now := time.Now()
 	for i := 0; i < d.days*2; i++ {
@@ -528,8 +525,6 @@ func (d *datamap) GetLatelyFiveDayDouble(t int64) []string  {//增量-两倍
 	return array
 }
 
-
-
 //替换原始数据池-更新
 func (d *datamap) replacePoolData(newData *Info) {
 	d.lock.Lock()
@@ -538,7 +533,7 @@ func (d *datamap) replacePoolData(newData *Info) {
 	k := fmt.Sprintf("%s_%s_%s", dkey, newData.subtype, newData.area)
 	data := d.data[k]
 	for k, v := range data {
-		if v.id == newData.id {//替换
+		if v.id == newData.id { //替换
 			data[k] = newData
 			break
 		}
@@ -547,16 +542,6 @@ func (d *datamap) replacePoolData(newData *Info) {
 	d.lock.Unlock()
 }
 
-
-
-
-
-
-
-
-
-
-
 //相互替换数据池-暂时弃用
 func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 	//删除数据池的老数据
@@ -565,7 +550,7 @@ func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 	k_old := fmt.Sprintf("%s_%s_%s", dkey_old, oldData.subtype, oldData.area)
 	data_old := d.data[k_old]
 	for k, v := range data_old {
-		if v.id == oldData.id {//删除对应当前的老数据
+		if v.id == oldData.id { //删除对应当前的老数据
 			data_old = append(data_old[:k], data_old[k+1:]...)
 			break
 		}
@@ -590,33 +575,26 @@ func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 		d.data[k] = data
 	}
 	//添加省
-	isAreaExist :=false
-	for _,v:= range d.areakeys {
-		if v==newData.area {
+	isAreaExist := false
+	for _, v := range d.areakeys {
+		if v == newData.area {
 			isAreaExist = true
 		}
 	}
 	if !isAreaExist {
 		areaArr := d.areakeys
-		areaArr = append(areaArr,newData.area)
+		areaArr = append(areaArr, newData.area)
 		d.areakeys = areaArr
 	}
 
 	d.lock.Unlock()
 }
+
 //总计条数-暂时弃用
 func (d *datamap) currentTotalCount() int {
-	num:=qutil.IntAll(0)
-	for _,v:=range d.data {
-		num = num+qutil.IntAll(len(v))
+	num := qutil.IntAll(0)
+	for _, v := range d.data {
+		num = num + qutil.IntAll(len(v))
 	}
 	return num
 }
-
-
-
-
-
-
-
-

+ 2 - 2
src/fullDataRepeat.go

@@ -13,8 +13,8 @@ var timeLayout = "2006-01-02"
 //划分时间段落
 func initModelArr() []map[string]interface{} {
 	modelArr := make([]map[string]interface{},0)
-	start := time.Date(2021, 1, 1, 0, 0, 0, 0, time.Local).Unix()
-	end := time.Date(2021, 1, 5, 0, 0, 0, 0, time.Local).Unix()
+	start := time.Date(2021, 12, 15, 0, 0, 0, 0, time.Local).Unix()
+	end := time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local).Unix()
 	gte_time := start
 	lt_time := start+86400
 	log.Println("开始构建数据池...一周...")

+ 0 - 111
src/fullMgoRepeat.go

@@ -1,111 +0,0 @@
-package main
-
-import (
-	"log"
-	"qfw/common/src/qfw/util"
-	qu "qfw/util"
-	"sync"
-	"time"
-)
-
-//开始全量判重程序
-func fullMgoRepeat(sid,eid string) {
-	defer qu.Catch()
-	//区间id-是否分段
-	if IsFull && sec_gtid!="" && sec_lteid!=""{
-		sid = sec_gtid
-		eid = sec_lteid
-	}
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt":  StringTOBsonId(sid),
-			"$lte": StringTOBsonId(eid),
-		},
-	}
-	log.Println("开始全量数据判重~查询条件:",data_mgo.DbName, extract, q)
-	sess := data_mgo.GetMgoConn()
-	defer data_mgo.DestoryMongoConn(sess)
-	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	n, isok ,repeatN:= 0,0,0
-	dataAllDict := make(map[string][]map[string]interface{},0)
-	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
-		if n%1000 == 0 {
-			log.Println("index: ", n, isok)
-		}
-		if util.IntAll(tmp["repeat"]) == 1 {
-			repeatN++
-			tmp = make(map[string]interface{})
-			continue
-		}
-		if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
-			tmp = make(map[string]interface{})
-			continue
-		}
-		//优化空间-相同天-划分一组(在分类别)
-
-
-
-
-
-
-		isok++
-		//数据分组-按照类别分组
-		subtype := qu.ObjToString(tmp["subtype"])
-		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
-			subtype=="竞谈"||subtype=="竞价" {
-			subtype = "招标"
-		}
-		dataArr := dataAllDict[subtype]
-		if dataArr==nil {
-			dataArr = []map[string]interface{}{}
-		}
-		dataArr = append(dataArr,tmp)
-		dataAllDict[subtype] = dataArr
-		tmp = make(map[string]interface{})
-	}
-	log.Println("类别组划分完毕:",len(dataAllDict),"组","~","需要判重:",isok,"条")
-	pool := make(chan bool, threadNum)
-	wg := &sync.WaitGroup{}
-	for _,dataArr := range dataAllDict {
-		pool <- true
-		wg.Add(1)
-		go func(dataArr []map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-			num := 0
-			for _,tmp := range dataArr{
-				info := NewInfo(tmp)
-				b, source, _ := DM.check(info)
-				if b {
-					num++
-					var updateID = map[string]interface{}{} //记录更新判重的
-					updateID["_id"] = StringTOBsonId(info.id)
-					repeat_ids:=source.repeat_ids
-					repeat_ids =  append(repeat_ids,info.id)
-					source.repeat_ids = repeat_ids
-					DM.replacePoolData(source)//替换数据池-更新
-					//Update.updatePool <- []map[string]interface{}{//重复数据打标签
-					//	updateID,
-					//	map[string]interface{}{
-					//		"$set": map[string]interface{}{
-					//			"repeat":        1,
-					//			"repeat_reason": reason,
-					//			"repeat_id":     source.id,
-					//			"dataging":		 0,
-					//			"updatetime_repeat" :util.Int64All(time.Now().Unix()),
-					//		},
-					//	},
-					//}
-				}
-			}
-			numberlock.Lock()
-			repeatN+=num
-			numberlock.Unlock()
-		}(dataArr)
-	}
-	wg.Wait()
-	log.Println("this full data is over.", n, "repeateN:", repeatN)
-	time.Sleep(15 * time.Second)
-}

+ 101 - 55
src/increaseRepeat.go

@@ -22,12 +22,12 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 		},
 	}
 	log.Println("~~~~~~")
-	log.Println("开始增量数据判重~查询条件:",data_mgo.DbName, extract, q)
+	log.Println("开始增量数据判重~查询条件:", data_mgo.DbName, extract, q)
 	sess := data_mgo.GetMgoConn()
 	defer data_mgo.DestoryMongoConn(sess)
 	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	total, isok ,repeatN:= 0,0,0
-	dataAllDict := make(map[string][]map[string]interface{},0)
+	total, isok, repeatN := 0, 0, 0
+	dataAllDict := make(map[string][]map[string]interface{}, 0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if total%1000 == 0 {
 			log.Println("current index : ", total, isok)
@@ -37,29 +37,29 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 			tmp = make(map[string]interface{})
 			continue
 		}
-		if qu.IntAll(tmp["dataging"]) == 1 && !IsFull{
+		if qu.IntAll(tmp["dataging"]) == 1 && !IsFull {
 			tmp = make(map[string]interface{})
 			continue
 		}
 		//数据分组-按照类别分组
 		isok++
 		subtype := qu.ObjToString(tmp["subtype"])
-		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
-			subtype=="竞谈"||subtype=="竞价" {
+		if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
+			subtype == "竞谈" || subtype == "竞价" {
 			subtype = "招标"
 		}
 		dataArr := dataAllDict[subtype]
-		if dataArr==nil {
+		if dataArr == nil {
 			dataArr = []map[string]interface{}{}
 		}
-		dataArr = append(dataArr,tmp)
+		dataArr = append(dataArr, tmp)
 		dataAllDict[subtype] = dataArr
 		tmp = make(map[string]interface{})
 	}
-	log.Println("类别组:",len(dataAllDict),"组","~","总计:",total,"~","需判重:",isok)
+	log.Println("类别组:", len(dataAllDict), "组", "~", "总计:", total, "~", "需判重:", isok)
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
-	for _,dataArr := range dataAllDict {
+	for _, dataArr := range dataAllDict {
 		fmt.Print("...")
 		pool <- true
 		wg.Add(1)
@@ -69,13 +69,13 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 				wg.Done()
 			}()
 			num := 0
-			for _,tmp := range dataArr{
+			for _, tmp := range dataArr {
 				info := NewInfo(tmp)
-				b,source,reason := DM.check(info)
+				b, source, reason := DM.check(info)
 				if b {
 					//判断信息是否为-指定剑鱼发布数据
-					if jyfb_data[info.spidercode]!="" { //伪判重标记
-						Update.updatePool <- []map[string]interface{}{//原始数据打标签
+					if jyfb_data[info.spidercode] != "" { //伪判重标记
+						Update.updatePool <- []map[string]interface{}{ //原始数据打标签
 							map[string]interface{}{
 								"_id": StringTOBsonId(info.id),
 							},
@@ -85,43 +85,88 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 								},
 							},
 						}
-					} else { //真实重复~~~
+					} else {
 						num++
-						var updateID = map[string]interface{}{} //记录更新判重的
-						updateID["_id"] = StringTOBsonId(info.id)
-						repeat_ids:=source.repeat_ids
-						repeat_ids =  append(repeat_ids,info.id)
-						source.repeat_ids = repeat_ids
-						DM.replacePoolData(source)//替换数据池-更新
+						//判断是否为~替换数据~模式
+						if judgeIsReplaceInfo(source.href, info.href) {
+							temp_source_id := source.id
+							temp_info_id := info.id
+							temp_source := info
+							temp_source.id = temp_source_id
+							repeat_ids := source.repeat_ids
+							repeat_ids = append(repeat_ids, temp_info_id)
+							temp_source.repeat_ids = repeat_ids
+							DM.replacePoolData(temp_source)
+							//替换抽取表数据
+							is_ext, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id)
+							if is_ext {
+								//标记加上~
+								ext_s_data["repeat"] = 0
+								ext_s_data["repeat_ids"] = repeat_ids
 
-						Update.updatePool <- []map[string]interface{}{//原始数据打标签
-							map[string]interface{}{
-								"_id": StringTOBsonId(source.id),
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat_ids": repeat_ids,
-								},
-							},
-						}
+								ext_i_data["repeat"] = 1
+								ext_i_data["repeat_id"] = source.id
+								ext_i_data["dataging"] = 0
+								ext_i_data["repeat_reason"] = reason
+								ext_i_data["updatetime_repeat"] = qu.Int64All(time.Now().Unix())
 
-						Update.updatePool <- []map[string]interface{}{//重复数据打标签
-							updateID,
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat":        1,
-									"repeat_reason": reason,
-									"repeat_id":     source.id,
-									"dataging":		 0,
-									"updatetime_repeat" :qu.Int64All(time.Now().Unix()),
+								data_mgo.DeleteById(extract, temp_source_id)
+								data_mgo.Save(extract, ext_s_data)
+								data_mgo.DeleteById(extract, temp_info_id)
+								data_mgo.Save(extract, ext_i_data)
+							} else {
+								log.Println("抽取表~未查询到数据~", temp_source_id, "~", temp_info_id)
+							}
+							i_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
+							if i_bid {
+								task_mgo.DeleteById(task_bidding, temp_source_id)
+								task_mgo.Save(task_bidding, bid_s_data)
+								task_mgo.DeleteById(task_bidding, temp_info_id)
+								task_mgo.Save(task_bidding, bid_i_data)
+							} else {
+								log.Println("原始表~未查询到数据~", temp_source_id, "~", temp_info_id)
+							}
+							//日志记录
+							data_mgo.Save(extract_log, map[string]interface{}{
+								"_id":        StringTOBsonId(temp_info_id),
+								"replace_id": temp_source_id,
+							})
+
+						} else {
+							var updateID = map[string]interface{}{} //记录更新判重的
+							updateID["_id"] = StringTOBsonId(info.id)
+							repeat_ids := source.repeat_ids
+							repeat_ids = append(repeat_ids, info.id)
+							source.repeat_ids = repeat_ids
+							DM.replacePoolData(source) //替换数据池-更新
+							Update.updatePool <- []map[string]interface{}{ //原始数据打标签
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
 								},
-							},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							}
+							Update.updatePool <- []map[string]interface{}{ //重复数据打标签
+								updateID,
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat":            1,
+										"repeat_reason":     reason,
+										"repeat_id":         source.id,
+										"dataging":          0,
+										"updatetime_repeat": qu.Int64All(time.Now().Unix()),
+									},
+								},
+							}
 						}
 					}
 				}
 			}
 			numberlock.Lock()
-			repeatN+=num
+			repeatN += num
 			numberlock.Unlock()
 		}(dataArr)
 	}
@@ -152,42 +197,43 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
 	}
 }
+
 //更新ocr表
-func updateOcrFileData(cur_lteid string)  {
+func updateOcrFileData(cur_lteid string) {
 	//更新ocr 分类表-判重的状态
-	log.Println("开始更新Ocr表-标记",cur_lteid)
+	log.Println("开始更新Ocr表-标记", cur_lteid)
 	task_sess := task_mgo.GetMgoConn()
 	defer task_mgo.DestoryMongoConn(task_sess)
-	q_task:=map[string]interface{}{}
+	q_task := map[string]interface{}{}
 	it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
-	isUpdateOcr:=false
-	updateOcrFile:=[][]map[string]interface{}{}
+	isUpdateOcr := false
+	updateOcrFile := [][]map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
 		cur_id := BsonTOStringId(tmp["_id"])
-		lteid:=qu.ObjToString(tmp["lteid"])
-		if (lteid==cur_lteid) { //需要更新
-			log.Println("找到该lteid数据",cur_lteid,cur_id)
+		lteid := qu.ObjToString(tmp["lteid"])
+		if lteid == cur_lteid { //需要更新
+			log.Println("找到该lteid数据", cur_lteid, cur_id)
 			isUpdateOcr = true
-			updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
+			updateOcrFile = append(updateOcrFile, []map[string]interface{}{ //重复数据打标签
 				map[string]interface{}{
 					"_id": tmp["_id"],
 				},
 				map[string]interface{}{
 					"$set": map[string]interface{}{
 						"is_repeat_status": 1,
-						"is_repeat_time" : qu.Int64All(time.Now().Unix()),
+						"is_repeat_time":   qu.Int64All(time.Now().Unix()),
 					},
 				},
 			})
 			tmp = make(map[string]interface{})
 			break
-		}else {
+		} else {
 			tmp = make(map[string]interface{})
 		}
 	}
 	if !isUpdateOcr {
-		log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
-	}else {
+		log.Println("出现异常问题,查询不到ocr的lteid", cur_lteid)
+	} else {
 		if len(updateOcrFile) > 0 {
 			task_mgo.UpSertBulk(task_collName, updateOcrFile...)
 		}

+ 66 - 106
src/main.go

@@ -18,75 +18,71 @@ import (
 )
 
 var (
-	Sysconfig    map[string]interface{} 	//配置文件
-	mconf        map[string]interface{} 	//mongodb配置信息
-	data_mgo          *MongodbSim            	//mongodb操作对象
-	task_mgo     *MongodbSim            	//mongodb操作对象
-	task_collName	string
-	extract      string
-	extract_back string
-	udpclient    mu.UdpClient             	//udp对象
-	nextNode     []map[string]interface{} 	//下节点数组
-	dupdays      = 7                      	//初始化判重范围
-	DM           *datamap                 	//
-	Update		 *updateInfo
-	AddGroupPool *addGroupInfo
-	FullDM       *datamap                 	//\临时全量数据池
+	Sysconfig                          map[string]interface{} //配置文件
+	mconf                              map[string]interface{} //mongodb配置信息
+	data_mgo                           *MongodbSim            //mongodb操作对象
+	task_mgo                           *MongodbSim            //mongodb操作对象
+	task_collName, task_bidding        string
+	extract, extract_back, extract_log string
+	udpclient                          mu.UdpClient             //udp对象
+	nextNode                           []map[string]interface{} //下节点数组
+	dupdays                            = 7                      //初始化判重范围
+	DM                                 *datamap                 //
+	Update                             *updateInfo
+	AddGroupPool                       *addGroupInfo
+	FullDM                             *datamap //\临时全量数据池
 	//正则筛选相关
-	FilterRegTitle   = regexp.MustCompile("^_$")
-	FilterRegTitle_0 = regexp.MustCompile("^_$")
-	FilterRegTitle_1 = regexp.MustCompile("^_$")
-	FilterRegTitle_2 = regexp.MustCompile("^_$")
-	threadNum      int                               //线程数量
-	SiteMap        map[string]map[string]interface{} //站点map
-	LowHeavy       bool                              //低质量数据判重
-	TimingTask     bool                              //是否定时任务
-	timingSpanDay  int64                             //时间跨度
-	timingPubScope int64                             //发布时间周期
-	gtid,lastid,sec_gtid,sec_lteid string					 //命令输入
-	lteid	string									 //历史增量属性
-	IsFull		   bool								 //是否全量
-	updatelock 		sync.Mutex         				 //锁4
-	numberlock 		sync.Mutex         				 //锁4
-	userName,passWord 	string						 //mongo -用户密码
-	jyfb_data		map[string]string		 		//任务池
-	taskList		[]map[string]interface{}		 //任务池
-	isUpdateSite	bool
+	FilterRegTitle                    = regexp.MustCompile("^_$")
+	FilterRegTitle_0                  = regexp.MustCompile("^_$")
+	FilterRegTitle_1                  = regexp.MustCompile("^_$")
+	FilterRegTitle_2                  = regexp.MustCompile("^_$")
+	threadNum                         int                               //线程数量
+	SiteMap                           map[string]map[string]interface{} //站点map
+	LowHeavy                          bool                              //低质量数据判重
+	TimingTask                        bool                              //是否定时任务
+	timingSpanDay                     int64                             //时间跨度
+	timingPubScope                    int64                             //发布时间周期
+	gtid, lastid, sec_gtid, sec_lteid string                            //命令输入
+	lteid                             string                            //历史增量属性
+	IsFull                            bool                              //是否全量
+	updatelock                        sync.Mutex                        //锁4
+	numberlock                        sync.Mutex                        //锁4
+	userName, passWord                string                            //mongo -用户密码
+	jyfb_data                         map[string]string                 //任务池
+	taskList                          []map[string]interface{}          //任务池
+	isUpdateSite                      bool
 )
+
 //初始化加载
 func init() {
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
-	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
+	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")   //历史
 	flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
 	flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
-
 	flag.Parse()
 
 	qu.ReadConfig(&Sysconfig)
-
 	userName = qu.ObjToString(Sysconfig["userName"])
 	passWord = qu.ObjToString(Sysconfig["passWord"])
-
-	log.Println("集群用户密码:",userName,passWord)
-
+	log.Println("集群用户密码:", userName, passWord)
 	jyfb_arr := qu.ObjArrToStringArr(Sysconfig["jyfb_data"].([]interface{}))
-	jyfb_data = make(map[string]string,0)
-	for _,v := range jyfb_arr{
+	jyfb_data = make(map[string]string, 0)
+	for _, v := range jyfb_arr {
 		jyfb_data[v] = v
 	}
-
-	log.Println("伪判重~",jyfb_data)
+	log.Println("伪判重~", jyfb_data)
 
 	task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
 	task_mgo = &MongodbSim{
 		MongodbAddr: task_mconf["task_addrName"].(string),
 		DbName:      task_mconf["task_dbName"].(string),
 		Size:        qu.IntAllDef(task_mconf["task_pool"], 10),
-		UserName:	 userName,
-		Password:	 passWord,
+		UserName:    userName,
+		Password:    passWord,
 	}
 	task_mgo.InitPool()
 	task_collName = task_mconf["task_collName"].(string)
+	task_bidding = task_mconf["task_bidding"].(string)
 
 	nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
 	mconf = Sysconfig["mongodb"].(map[string]interface{})
@@ -99,6 +95,7 @@ func init() {
 
 	extract = mconf["extract"].(string)
 	extract_back = mconf["extract_back"].(string)
+	extract_log = mconf["extract_log"].(string)
 
 	dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5)
 	//加载数据
@@ -125,8 +122,9 @@ func init() {
 	//站点配置
 	initSite()
 }
+
 //初始化站点信息
-func initSite(){
+func initSite() {
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
 	start := int(time.Now().Unix())
@@ -143,8 +141,8 @@ func initSite(){
 	}
 	isUpdateSite = false
 	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
-
 }
+
 //udp接收
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
@@ -165,8 +163,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 			//插入任务-判断任务-是否存在
 			updatelock.Lock()
-			taskList = append(taskList,mapInfo)
-			log.Println("udp收到任务...数量:",len(taskList),"具体任务:",taskList)
+			taskList = append(taskList, mapInfo)
+			log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
 			updatelock.Unlock()
 		}
 	case mu.OP_NOOP: //下个节点回应
@@ -177,31 +175,37 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	}
 }
+
 //监听-获取-分发判重任务
-func getRepeatTask()  {
-	for  {
-		if len(taskList)>0 {
+func getRepeatTask() {
+	for {
+		if len(taskList) > 0 {
 			updatelock.Lock()
 			mapInfo := taskList[0]
-			if mapInfo != nil  {
+			if mapInfo != nil {
 				increaseRepeat(mapInfo) //判重方法
 			}
 			taskList = taskList[1:]
-			log.Println("此段落结束当前任务池...",len(taskList),taskList)
+			log.Println("此段落结束当前任务池...", len(taskList), taskList)
 			updatelock.Unlock()
-		}else {
+		} else {
 			time.Sleep(15 * time.Second)
 		}
 	}
 }
 
-
 func main() {
-
 	IsFull = true
-	AddGroupPool = newAddGroupPool()
-	go AddGroupPool.addGroupData()
-	fullDataRepeat() //全量判重
+
+	//AddGroupPool = newAddGroupPool()
+	//go AddGroupPool.addGroupData()
+	//fullDataRepeat() //全量判重
+
+	increaseRepeat(map[string]interface{}{
+		"gtid":  "62ec61170ae152a3c2310f02",
+		"lteid": "62ec61170ae152a3c2310f02",
+	})
+
 	time.Sleep(99999 * time.Hour)
 }
 
@@ -215,55 +219,11 @@ func mainT() {
 	if TimingTask {
 		log.Println("正常历史部署")
 		go historyRepeat()
-	}else {
-		if !IsFull {//正常增量
+	} else {
+		if !IsFull { //正常增量
 			log.Println("正常增量部署,监听任务")
 			go getRepeatTask()
-		}else {
-			fullMgoRepeat("","")
 		}
 	}
 	time.Sleep(99999 * time.Hour)
 }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-

+ 118 - 5
src/mgo.go

@@ -2,7 +2,13 @@ package main
 
 import (
 	"context"
+	"encoding/json"
+	"fmt"
 	"log"
+	"math/big"
+	"runtime"
+	"strconv"
+	"strings"
 	"time"
 
 	"go.mongodb.org/mongo-driver/bson"
@@ -149,16 +155,14 @@ func (m *MongodbSim) InitPool() {
 	opts.SetMaxPoolSize(uint64(m.Size))
 	m.pool = make(chan bool, m.Size)
 
-	if m.UserName !="" && m.Password !="" {
+	if m.UserName != "" && m.Password != "" {
 		cre := options.Credential{
-			Username:m.UserName,
-			Password:m.Password,
+			Username: m.UserName,
+			Password: m.Password,
 		}
 		opts.SetAuth(cre)
 	}
 
-
-
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
 	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
@@ -236,6 +240,29 @@ func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
 	return r.InsertedID
 }
 
+//按条件更新
+func (m *MongodbSim) Update(c string, q, u interface{}, upsert bool, multi bool) bool {
+	defer catch()
+	m.Open()
+	defer m.Close()
+	ct := options.Update()
+	if upsert {
+		ct.SetUpsert(true)
+	}
+	coll := m.C.Database(m.DbName).Collection(c)
+	var err error
+	if multi {
+		_, err = coll.UpdateMany(m.Ctx, ObjToM(q), ObjToM(u), ct)
+	} else {
+		_, err = coll.UpdateOne(m.Ctx, ObjToM(q), ObjToM(u), ct)
+	}
+	if err != nil {
+		log.Println("删除错误", err.Error())
+		return false
+	}
+	return true
+}
+
 //更新by Id
 func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
 	m.Open()
@@ -313,6 +340,78 @@ func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields i
 	return results, nil
 }
 
+func ObjToOth(query interface{}) *bson.M {
+	return ObjToMQ(query, false)
+}
+func ObjToM(query interface{}) *bson.M {
+	return ObjToMQ(query, true)
+}
+
+//obj(string,M)转M,查询用到
+func ObjToMQ(query interface{}, isQuery bool) *bson.M {
+	data := make(bson.M)
+	defer catch()
+	if s2, ok2 := query.(*map[string]interface{}); ok2 {
+		data = bson.M(*s2)
+	} else if s3, ok3 := query.(*bson.M); ok3 {
+		return s3
+	} else if s3, ok3 := query.(*primitive.M); ok3 {
+		return s3
+	} else if s, ok := query.(string); ok {
+		json.Unmarshal([]byte(strings.Replace(s, "'", "\"", -1)), &data)
+		if ss, oks := data["_id"]; oks && isQuery {
+			switch ss.(type) {
+			case string:
+				data["_id"], _ = primitive.ObjectIDFromHex(ss.(string))
+			case map[string]interface{}:
+				tmp := ss.(map[string]interface{})
+				for k, v := range tmp {
+					tmp[k], _ = primitive.ObjectIDFromHex(v.(string))
+				}
+				data["_id"] = tmp
+			}
+
+		}
+	} else if s1, ok1 := query.(map[string]interface{}); ok1 {
+		data = s1
+	} else if s4, ok4 := query.(bson.M); ok4 {
+		data = s4
+	} else if s4, ok4 := query.(primitive.M); ok4 {
+		data = s4
+	} else {
+		data = nil
+	}
+	return &data
+}
+func intAllDef(num interface{}, defaultNum int) int {
+	if i, ok := num.(int); ok {
+		return int(i)
+	} else if i0, ok0 := num.(int32); ok0 {
+		return int(i0)
+	} else if i1, ok1 := num.(float64); ok1 {
+		return int(i1)
+	} else if i2, ok2 := num.(int64); ok2 {
+		return int(i2)
+	} else if i3, ok3 := num.(float32); ok3 {
+		return int(i3)
+	} else if i4, ok4 := num.(string); ok4 {
+		in, _ := strconv.Atoi(i4)
+		return int(in)
+	} else if i5, ok5 := num.(int16); ok5 {
+		return int(i5)
+	} else if i6, ok6 := num.(int8); ok6 {
+		return int(i6)
+	} else if i7, ok7 := num.(*big.Int); ok7 {
+		in, _ := strconv.Atoi(fmt.Sprint(i7))
+		return int(in)
+	} else if i8, ok8 := num.(*big.Float); ok8 {
+		in, _ := strconv.Atoi(fmt.Sprint(i8))
+		return int(in)
+	} else {
+		return defaultNum
+	}
+}
+
 //创建_id
 func NewObjectId() primitive.ObjectID {
 	return primitive.NewObjectID()
@@ -326,3 +425,17 @@ func StringTOBsonId(id string) primitive.ObjectID {
 func BsonTOStringId(id interface{}) string {
 	return id.(primitive.ObjectID).Hex()
 }
+
+//出错拦截
+func catch() {
+	if r := recover(); r != nil {
+		log.Println(r)
+		for skip := 0; ; skip++ {
+			_, file, line, ok := runtime.Caller(skip)
+			if !ok {
+				break
+			}
+			go log.Printf("%v,%v\n", file, line)
+		}
+	}
+}