Преглед на файлове

全量处理方式~分两种

zhengkun преди 3 години
родител
ревизия
d839e7c41b
променени са 9 файла, в които са добавени 215 реда и са изтрити 48 реда
  1. 1 1
      src/dataMethod.go
  2. 12 4
      src/dataMethodHeavy.go
  3. 12 4
      src/datamap.go
  4. 110 0
      src/fullDataRepeat.go
  5. 1 1
      src/fullMgoRepeat.go
  6. 12 11
      src/increaseRepeat.go
  7. 11 15
      src/main.go
  8. 8 1
      src/udptaskmap.go
  9. 48 11
      src/updateMethod.go

+ 1 - 1
src/dataMethod.go

@@ -294,7 +294,7 @@ func leadingElementSame(v *Info, info *Info) bool {
 	if info.title != "" && v.title == info.title {
 		isok++
 	}
-	if v.agency == info.agency &&info.agency != "" {
+	if v.agency == info.agency {
 		isok++
 	}
 	if v.winner == info.winner&&info.winner != "" {

+ 12 - 4
src/dataMethodHeavy.go

@@ -191,7 +191,7 @@ func tenderRepeat_A(v *Info, info *Info, reason string) (bool, string) {
 	}
 
 	if  (p1 && p2 && p3) || (p1 && p2 && p4) || (p1 && p2 && p9) ||
-		(p1 && p2 && p10) || (p1 && p2 && p11) || (p1 && p3 && p9) || (p1 && p3 && p10) ||
+		(p1 && p2 && p10) || (p1 && p2 && p11) || (p1 && p3 && p9) || (p1 && p3 && p10) || (p1 && p3 && p4) ||
 		(p1 && p4 && p9) || (p1 && p4 && p10) || (p2 && p3 && p4) ||
 		(p2 && p3 && p9) || (p2 && p3 && p10) || (p2 && p3 && p11) ||
 		(p2 && p4 && p9) || (p2 && p4 && p10) || (p2 && p4 && p11) ||
@@ -413,7 +413,13 @@ func contractRepeat_C(v *Info, info *Info) bool {
 
 
 
-
+// isTheSimilarName reports whether either name contains the other as a substring; an empty name is contained in every string, so it matches anything.
+func isTheSimilarName(name1 string,name2 string) bool {
+	if strings.Contains(name1,name2) || strings.Contains(name2,name1) { // containment is tested in both directions
+		return true
+	}
+	return false
+}
 
 
 //快速低质量数据判重
@@ -430,7 +436,9 @@ func fastLowQualityHeavy(v *Info, info *Info, reason string) (bool, string) {
 
 	//首先判定是否为低质量数据    info目标数据
 	if info.title!=""&&(info.agency==""||v.agency=="")&&
-		info.title==v.title&&info.projectcode==""&&info.contractnumber==""&&info.buyer=="" {
+		(info.title==v.title)&&
+		(info.projectcode==""||info.projectcode==v.projectcode)&&
+		info.contractnumber==""&&info.buyer=="" {
 		isValue:=0//五要素判断
 		if info.projectname != "" {//项目名称
 			isValue++
@@ -473,7 +481,7 @@ func fastLowQualityHeavy(v *Info, info *Info, reason string) (bool, string) {
 
 //类别细节原因记录
 func judgeLowQualityData(v *Info, info *Info, reason string) (bool, string) {
-	if info.projectname!="" && info.projectname == v.projectname{//项目名称
+	if info.projectname!="" && isTheSimilarName(info.projectname,v.projectname) {
 		reason = reason + "---项目名称"
 		return true,reason
 	}

+ 12 - 4
src/datamap.go

@@ -54,7 +54,7 @@ type datamap struct {
 	keys   map[string]bool
 }
 
-//历史
+//历史~存量
 func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
@@ -72,6 +72,9 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 	it := sess.DB(data_mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%10000 == 0 {
+			log.Println("当前 n:", n,"数量:" ,continuSum,tmp["_id"],tmp["publishtime"])
+		}
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
 			qutil.IntAll(tmp["dataging"]) == 1 {
 
@@ -319,10 +322,15 @@ L:
 						break L
 					}
 					//相同发布时间-标题无包含关系 - 项目名称不等
-					if isTheSameDay(info.publishtime,v.publishtime) &&
-						!(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
-						continue
+					if isTheSameDay(info.publishtime,v.publishtime) {
+						if !isTheSimilarName(info.title,v.title){
+							continue
+						}
 					}
+					//
+
+
+
 					//不同href
 					if info.href != "" && info.href != v.href {
 						if v.title==info.title{

+ 110 - 0
src/fullDataRepeat.go

@@ -0,0 +1,110 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	qu "qfw/util"
+	"sync"
+	"time"
+)
+var timeLayout = "2006-01-02"
+//var timeLayout = "2006-01-02 15:04:05"
+
+// initModelArr assigns the package-level FullDM pool and returns one publishtime range query ($gte/$lt, 86400-second windows) per step between the hard-coded start and end.
+func initModelArr() []map[string]interface{} {
+	modelArr := make([]map[string]interface{},0)
+	start := time.Date(2021, 1, 1, 0, 0, 0, 0, time.Local).Unix() // hard-coded range start: 2021-01-01 00:00 local time, as Unix seconds
+	end := time.Date(2021, 1, 5, 0, 0, 0, 0, time.Local).Unix() // hard-coded range end: 2021-01-05 00:00 local time, as Unix seconds
+	gte_time := start // inclusive lower bound of the current window
+	lt_time := start+86400 // exclusive upper bound: 86400 seconds after the lower bound
+	log.Println("开始构建数据池...一周...")
+	FullDM = TimedTaskDatamap(dupdays, start,1) // builds the pool from dupdays, start and the literal 1; presumably preloads history before start — TODO confirm
+	log.Println("......")
+	log.Println("开启...全量判重...",start,"~",end)
+	for { // the first window is always appended before the end bound is tested
+		modelArr = append(modelArr, map[string]interface{}{
+			"publishtime": map[string]interface{}{
+				"$gte": gte_time,
+				"$lt": lt_time,
+			},
+		})
+		gte_time = lt_time // slide the window forward: the old upper bound becomes the new lower bound
+		lt_time = gte_time+86400
+		if lt_time>end { // stop once the next window would extend past end
+			break
+		}
+	}
+	return modelArr
+}
+
+// fullDataRepeat runs dealWithfullData once, in order, for every range query returned by initModelArr.
+func fullDataRepeat() {
+	modelArr := initModelArr()
+	for _,query := range modelArr {
+		pt := *qu.ObjToMap(query["publishtime"]) // the nested {"$gte", "$lt"} map built by initModelArr
+		time_str := time.Unix(qu.Int64All(pt["$gte"]), 0).Format(timeLayout) // window start formatted with timeLayout, passed on as a label
+		dealWithfullData(query,time_str)
+	}
+}
+
+// dealWithfullData loads the documents matching query, groups them by subtype, checks each group against FullDM in its own goroutine, and sends every detected duplicate to AddGroupPool.pool; time_str is only used in log output.
+func dealWithfullData (query map[string]interface{},time_str string) {
+
+	log.Println("开始处理~",time_str,"~",query)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	it := sess.DB(data_mgo.DbName).C(extract).Find(&query).Sort("publishtime").Iter() // NOTE(review): this function never closes the iterator or checks an iteration error — confirm whether that is needed
+	total, isok ,repeatN:= 0,0,0 // total: documents iterated (not reported below); isok: documents kept for checking; repeatN: duplicates found
+	dataAllDict := make(map[string][]map[string]interface{},0) // documents grouped by (normalized) subtype
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if qu.IntAll(tmp["repeat"]) == 1 || qu.IntAll(tmp["repeat"]) == -1 { // skip documents whose repeat field is already 1 or -1
+			tmp = make(map[string]interface{})
+			continue
+		}
+		isok++
+		subtype := qu.ObjToString(tmp["subtype"])
+		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
+			subtype=="竞谈"||subtype=="竞价" {
+			subtype = "招标" // these five subtype values share the single group key "招标"
+		}
+		dataArr := dataAllDict[subtype]
+		if dataArr==nil {
+			dataArr = []map[string]interface{}{}
+		}
+		dataArr = append(dataArr,tmp)
+		dataAllDict[subtype] = dataArr
+		tmp = make(map[string]interface{}) // tmp was stored in dataArr above, so the next it.Next gets a fresh map
+	}
+	pool := make(chan bool, threadNum) // at most threadNum group goroutines hold a slot at once
+	wg := &sync.WaitGroup{}
+	for _,dataArr := range dataAllDict { // one goroutine per subtype group
+		fmt.Print("...")
+		pool <- true // blocks while threadNum goroutines are already running
+		wg.Add(1)
+		go func(dataArr []map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			num := 0 // duplicates found within this group
+			for _,tmp := range dataArr{
+				info := NewInfo(tmp)
+				b,source,reason := FullDM.check(info) // presumably b means info duplicates source — verify against datamap.check
+				if b {
+					num++
+					AddGroupPool.pool <- map[string]interface{}{
+						"_id":StringTOBsonId(info.id),
+						"repeat_id" : source.id,
+						"reason" : reason,
+						"update_time" : qu.Int64All(time.Now().Unix()),
+					}
+				}
+			}
+			numberlock.Lock() // repeatN is shared by all group goroutines
+			repeatN+=num
+			numberlock.Unlock()
+		}(dataArr)
+	}
+	wg.Wait() // repeatN is read only after every group goroutine has finished
+	log.Println("处理结束~",time_str,"总计需判重~",isok,"~重复量",repeatN)
+}

+ 1 - 1
src/fullRepeat.go → src/fullMgoRepeat.go

@@ -9,7 +9,7 @@ import (
 )
 
 //开始全量判重程序
-func fullRepeat(sid,eid string) {
+func fullMgoRepeat(sid,eid string) {
 	defer qu.Catch()
 	//区间id-是否分段
 	if IsFull && sec_gtid!="" && sec_lteid!=""{

+ 12 - 11
src/increaseRepeat.go

@@ -2,10 +2,10 @@ package main
 
 import (
 	"encoding/json"
+	"fmt"
 	"log"
 	mu "mfw/util"
 	"net"
-	"qfw/common/src/qfw/util"
 	qu "qfw/util"
 	"sync"
 	"time"
@@ -32,12 +32,12 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 		if total%1000 == 0 {
 			log.Println("current index : ", total, isok)
 		}
-		if util.IntAll(tmp["repeat"]) == 1 {
+		if qu.IntAll(tmp["repeat"]) == 1 {
 			repeatN++
 			tmp = make(map[string]interface{})
 			continue
 		}
-		if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
+		if qu.IntAll(tmp["dataging"]) == 1 && !IsFull{
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -60,7 +60,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	for _,dataArr := range dataAllDict {
-		log.Println("处理中...","当前重复量~", repeatN)
+		fmt.Print("...")
 		pool <- true
 		wg.Add(1)
 		go func(dataArr []map[string]interface{}) {
@@ -113,7 +113,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 									"repeat_reason": reason,
 									"repeat_id":     source.id,
 									"dataging":		 0,
-									"updatetime_repeat" :util.Int64All(time.Now().Unix()),
+									"updatetime_repeat" :qu.Int64All(time.Now().Unix()),
 								},
 							},
 						}
@@ -126,7 +126,8 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 		}(dataArr)
 	}
 	wg.Wait()
-	log.Println("this cur task over.", total, "repeateN:", repeatN)
+	log.Println("")
+	log.Println("当前~判重~结束~", total, "重复~", repeatN)
 	//更新Ocr的标记
 	updateOcrFileData(mapInfo["lteid"].(string))
 	time.Sleep(15 * time.Second)
@@ -135,16 +136,16 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 	for _, to := range nextNode {
 		sid, _ := mapInfo["gtid"].(string)
 		eid, _ := mapInfo["lteid"].(string)
-		key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+		key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
 		by, _ := json.Marshal(map[string]interface{}{
 			"gtid":  sid,
 			"lteid": eid,
-			"stype": util.ObjToString(to["stype"]),
+			"stype": qu.ObjToString(to["stype"]),
 			"key":   key,
 		})
 		addr := &net.UDPAddr{
 			IP:   net.ParseIP(to["addr"].(string)),
-			Port: util.IntAll(to["port"]),
+			Port: qu.IntAll(to["port"]),
 		}
 		node := &udpNode{by, addr, time.Now().Unix(), 0}
 		udptaskmap.Store(key, node)
@@ -163,7 +164,7 @@ func updateOcrFileData(cur_lteid string)  {
 	updateOcrFile:=[][]map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
 		cur_id := BsonTOStringId(tmp["_id"])
-		lteid:=util.ObjToString(tmp["lteid"])
+		lteid:=qu.ObjToString(tmp["lteid"])
 		if (lteid==cur_lteid) { //需要更新
 			log.Println("找到该lteid数据",cur_lteid,cur_id)
 			isUpdateOcr = true
@@ -174,7 +175,7 @@ func updateOcrFileData(cur_lteid string)  {
 				map[string]interface{}{
 					"$set": map[string]interface{}{
 						"is_repeat_status": 1,
-						"is_repeat_time" : util.Int64All(time.Now().Unix()),
+						"is_repeat_time" : qu.Int64All(time.Now().Unix()),
 					},
 				},
 			})

+ 11 - 15
src/main.go

@@ -30,6 +30,8 @@ var (
 	dupdays      = 7                      	//初始化判重范围
 	DM           *datamap                 	//
 	Update		 *updateInfo
+	AddGroupPool *addGroupInfo
+	FullDM       *datamap                 	//\临时全量数据池
 	//正则筛选相关
 	FilterRegTitle   = regexp.MustCompile("^_$")
 	FilterRegTitle_0 = regexp.MustCompile("^_$")
@@ -51,7 +53,7 @@ var (
 	taskList		[]map[string]interface{}		 //任务池
 	isUpdateSite	bool
 )
-
+//初始化加载
 func init() {
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
@@ -98,7 +100,7 @@ func init() {
 	extract = mconf["extract"].(string)
 	extract_back = mconf["extract_back"].(string)
 
-	dupdays = qu.IntAllDef(Sysconfig["dupdays"], 3)
+	dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5)
 	//加载数据
 	DM = NewDatamap(dupdays, lastid)
 	//更新池
@@ -123,7 +125,7 @@ func init() {
 	//站点配置
 	initSite()
 }
-
+//初始化站点信息
 func initSite(){
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
@@ -143,7 +145,6 @@ func initSite(){
 	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
 
 }
-
 //udp接收
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
@@ -176,7 +177,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	}
 }
-
 //监听-获取-分发判重任务
 func getRepeatTask()  {
 	for  {
@@ -197,14 +197,12 @@ func getRepeatTask()  {
 
 
 func main() {
-	//log.Println("模拟增量判重...")
+
 	IsFull = true
-	sid := "124ed2324f7bde5444f1e973"
-	eid := "924ed35e4f7bde5444f1ec1d"
-	increaseRepeat(map[string]interface{}{
-		"gtid":sid,
-		"lteid":eid,
-	})
+	AddGroupPool = newAddGroupPool()
+	go AddGroupPool.addGroupData()
+	fullDataRepeat() //全量判重
+	time.Sleep(99999 * time.Hour)
 }
 
 //主函数
@@ -222,9 +220,7 @@ func mainT() {
 			log.Println("正常增量部署,监听任务")
 			go getRepeatTask()
 		}else {
-			sid := "1fffffffffffffffffffffff"
-			eid := "9fffffffffffffffffffffff"
-			fullRepeat(sid,eid)
+			fullMgoRepeat("","")
 		}
 	}
 	time.Sleep(99999 * time.Hour)

+ 8 - 1
src/udptaskmap.go

@@ -6,6 +6,7 @@ import (
 	"log"
 	"net"
 	"net/http"
+	"strings"
 	"sync"
 	"time"
 )
@@ -35,7 +36,13 @@ func checkMapJob() {
 			node, _ := v.(*udpNode)
 			if now-node.timestamp > 120 {
 				udptaskmap.Delete(k)
-				res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "增量判重程序~严重警告", fmt.Sprintf("下节点索引~未响应~相关人员检查~%s",k.(string))))
+				info_str := ""
+				if strings.Contains(k.(string),"project") {
+					info_str = fmt.Sprintf("下节点~项目合并~未响应~相关人员检查~%s",k.(string))
+				}else {
+					info_str = fmt.Sprintf("下节点~索引~未响应~相关人员检查~%s",k.(string))
+				}
+				res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "增量判重程序~严重警告",info_str))
 				if err == nil {
 					defer res.Body.Close()
 					read, err := ioutil.ReadAll(res.Body)

+ 48 - 11
src/updateMethod.go

@@ -5,26 +5,27 @@ import (
 	"time"
 )
 
-type updateInfo struct {
-
-	//更新或新增通道
+var sp = make(chan bool, 5)
+type updateInfo struct {//更新或新增通道
 	updatePool chan []map[string]interface{}
-	//数量
 	saveSize   	int
-
 }
-
-
-
-
-var sp = make(chan bool, 5)
-
 func newUpdatePool() *updateInfo {
 	update:=&updateInfo{make(chan []map[string]interface{}, 50000),200}
 	return update
 }
 
+// addGroupInfo holds the channel of records awaiting insertion and the batch size used by addGroupData (original comment marks it as temporary).
+type addGroupInfo struct {
+	pool chan map[string]interface{} // records waiting to be saved
+	saveSize   	int // number of records collected before a bulk save is started
+}
+func newAddGroupPool() *addGroupInfo { // newAddGroupPool returns an addGroupInfo with a 50000-slot channel and a batch size of 200
+	info:=&addGroupInfo{make(chan map[string]interface{}, 50000),200}
+	return info
+}
 
+//监听更新
 func (update *updateInfo) updateData() {
 	log.Println("开始不断监听--待更新数据")
 	tmpArr := make([][]map[string]interface{}, update.saveSize)
@@ -59,4 +60,40 @@ func (update *updateInfo) updateData() {
 			}
 		}
 	}
+}
+
+// addGroupData loops forever, batching records from info.pool and bulk-saving them to "zktes_full_repeat" when a batch is full or after 10 seconds without a new record.
+func (info *addGroupInfo) addGroupData() {
+	tmpArr := make([]map[string]interface{}, info.saveSize) // current batch buffer
+	tmpIndex := 0 // number of records currently held in tmpArr
+	for {
+		select {
+		case value := <-info.pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == info.saveSize { // batch is full: hand it to a saver goroutine
+				sp <- true // blocks while the sp channel is full, limiting concurrent savers
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					data_mgo.SaveBulk("zktes_full_repeat", dataArr...) // NOTE(review): any result of SaveBulk is discarded here — confirm failures are handled elsewhere
+				}(tmpArr)
+				tmpArr = make([]map[string]interface{}, info.saveSize) // the goroutine keeps the old slice, so a new buffer is allocated
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second):// a new timer is created on each pass, so this fires only after 10 seconds with no record received
+			if tmpIndex > 0 { // flush the partial batch
+				sp <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					data_mgo.SaveBulk("zktes_full_repeat", dataArr...)
+				}(tmpArr[:tmpIndex]) // only the filled prefix of the buffer is saved
+				tmpArr = make([]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
 }