Selaa lähdekoodia

新增_排序_配置 false 不排序 true 排序

apple 5 vuotta sitten
vanhempi
commit
d19da3a23c

+ 4 - 3
udpfilterdup/src/config.json

@@ -3,9 +3,9 @@
     "dupdays": 5,
     "mongodb": {
         "addr": "192.168.3.207:27092",
-        "pool": 5,
+        "pool": 10,
         "db": "extract_kf",
-        "extract": "demo_data3.2",
+        "extract": "zk",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -17,7 +17,8 @@
     },
     "nextNode": [],
     "isMerger": false,
-    "threads": 4,
+    "threads": 1,
+    "isSort":true,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
     "specialtitle_2": "项目([0-9a-zA-Z一二三四五六七八九十零123456789])",

+ 26 - 7
udpfilterdup/src/datamap.go

@@ -78,7 +78,10 @@ func NewDatamap(days int, lastid string) *datamap {
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 {
 			continuSum++
 		} else {
-			pt := tmp["publishtime"]
+			pt := tmp["comeintime"]
+			if Is_Sort {
+				pt = tmp["publishtime"]
+			}
 			pt_time := qutil.Int64All(pt)
 			if pt_time <= 0 {
 				continue
@@ -124,7 +127,10 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("-_id").Iter()
 	m, n := 0, 0
 	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); {
-		pt_s := tmp_start["publishtime"]
+		pt_s := tmp_start["comeintime"]
+		if Is_Sort {
+			pt_s = tmp_start["publishtime"]
+		}
 		pt_time := qutil.Int64All(pt_s)
 		if pt_time <= 0 {
 			continue
@@ -155,7 +161,10 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("_id").Iter()
 
 	for tmp_last := make(map[string]interface{}); it_last.Next(&tmp_last); {
-		pt_l := tmp_last["publishtime"]
+		pt_l := tmp_last["comeintime"]
+		if Is_Sort {
+			pt_l = tmp_last["publishtime"]
+		}
 		pt_time := qutil.Int64All(pt_l)
 		if pt_time <= 0 {
 			continue
@@ -342,8 +351,10 @@ L:
 
 	//往预存数据 d 添加
 	if !b {
-		//ct := info.publishtime
 		ct := info.comeintime
+		if Is_Sort {
+			ct = info.publishtime
+		}
 		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
 		d.lock.Lock()
@@ -497,8 +508,10 @@ L:
 	}
 	//往预存数据 d 添加
 	if !b {
-		//ct := info.publishtime
 		ct := info.comeintime
+		if Is_Sort {
+			ct = info.publishtime
+		}
 		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
 		data := h.data[k]
@@ -519,7 +532,10 @@ L:
 
 //替换原始数据池
 func (d *datamap) replaceSourceData(replaceData *Info, replaceId string) {
-	ct := replaceData.publishtime
+	ct := replaceData.comeintime
+	if Is_Sort {
+		ct = replaceData.publishtime
+	}
 	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 	k := fmt.Sprintf("%s_%s_%s", dkey, replaceData.subtype, replaceData.area)
 	d.lock.Lock()
@@ -544,7 +560,10 @@ func (d *datamap) replaceSourceData(replaceData *Info, replaceId string) {
 }
 
 func (h *historymap) replaceSourceData(replaceData *Info, replaceId string) {
-	ct := replaceData.publishtime
+	ct := replaceData.comeintime
+	if Is_Sort {
+		ct = replaceData.publishtime
+	}
 	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 	k := fmt.Sprintf("%s_%s_%s", dkey, replaceData.subtype, replaceData.area)
 	h.lock.Lock()

+ 0 - 205
udpfilterdup/src/datamap.go.bak

@@ -1,205 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"log"
-	"math"
-	qutil "qfw/util"
-	"strings"
-	"sync"
-	"time"
-)
-
-type Info struct {
-	id                 string
-	title              string
-	area               string
-	city               string
-	subtype            string
-	buyer              string
-	agency             string //代理机构
-	winner             string //中标单位
-	projectname        string
-	projectcode        string
-	publishtime        int64
-	ContainSpecialWord bool
-}
-
-var datelimit = float64(432000)
-
-type datamap struct {
-	lock   sync.Mutex //锁
-	days   int        //保留几天数据
-	data   map[string][]*Info
-	keymap []string
-}
-
-func NewDatamap(days int) *datamap {
-	datelimit = qutil.Float64All(days * 86400)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}}
-	dm.keymap = dm.GetLatelyFiveDay()
-	//初始化加载数据
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it := sess.DB(mgo.DbName).C(extract).Find(nil).Sort("-_id").Iter()
-	now1 := time.Now().Unix()
-	n, continuSum := 0, 0
-	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
-		//
-		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.ObjToString(tmp["subtype"]) == "变更" {
-			continuSum++
-		} else {
-			cm := tmp["comeintime"]
-			comeintime := qutil.Int64All(cm)
-			if qutil.Float64All(now1-comeintime) < datelimit {
-				info := NewInfo(tmp)
-				k := fmt.Sprintf("%s_%s_%s", qutil.FormatDateWithObj(&cm, qutil.Date_yyyyMMdd), info.subtype, info.area)
-				data := dm.data[k]
-				if data == nil {
-					data = []*Info{}
-					//log.Println(k)
-				}
-				data = append(data, info)
-				dm.data[k] = data
-			} else {
-				break
-			}
-		}
-		if n%5000 == 0 {
-			log.Println("current n:", n, continuSum)
-		}
-		tmp = make(map[string]interface{})
-	}
-	log.Println("load data:", n)
-	//启动定时任务
-	now := time.Now()
-	t2 := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.Local)
-	go time.AfterFunc(time.Duration(int64(t2.Unix()-now.Unix()))*time.Second, func() {
-		//go time.AfterFunc(time.Duration(10)*time.Second, func() {
-		dm.update()
-	})
-	return dm
-}
-func NewInfo(tmp map[string]interface{}) *Info {
-	subtype := qutil.ObjToString(tmp["subtype"])
-	area := qutil.ObjToString(tmp["area"])
-	if area == "A" {
-		area = "全国"
-	}
-	info := &Info{}
-	info.id = qutil.BsonIdToSId(tmp["_id"])
-	info.title = qutil.ObjToString(tmp["title"])
-	info.area = area
-	info.subtype = subtype
-	info.buyer = qutil.ObjToString(tmp["buyer"])
-	info.projectname = qutil.ObjToString(tmp["projectname"])
-	info.ContainSpecialWord = FilterRegexp.MatchString(info.projectname) || FilterRegexp.MatchString(info.title)
-	info.projectcode = qutil.ObjToString(tmp["projectcode"])
-	info.city = qutil.ObjToString(tmp["city"])
-	info.agency = qutil.ObjToString(tmp["agency"])
-	//info.winner = qutil.ObjToString(tmp["winner"])
-	info.publishtime = qutil.Int64All(tmp["publishtime"])
-	return info
-}
-
-func (d *datamap) check(info *Info) (b bool, id string) {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-	keys := []string{}
-	for _, k := range d.keymap {
-		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
-		if info.area != "全国" { //这个后续可以不要
-			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
-		}
-	}
-L:
-	for _, k := range keys {
-		data := d.data[k]
-		if len(data) > 1 { //对比
-			for _, v := range data {
-				if math.Abs(qutil.Float64All(v.publishtime-info.publishtime)) > datelimit {
-					continue
-				}
-				if v.agency != "" && info.agency != "" && v.agency != info.agency {
-					continue
-				}
-				n := 0
-				if v.buyer != "" && v.buyer == info.buyer {
-					n++
-				}
-				if v.projectname != "" && v.projectname == info.projectname {
-					n++
-				}
-				if !info.ContainSpecialWord && n > 1 {
-					b = true
-					id = v.id
-					break L
-				} else if v.projectcode != "" && v.projectcode == info.projectcode {
-					n++
-				}
-				if !info.ContainSpecialWord && n > 1 || n > 2 {
-					b = true
-					id = v.id
-					break L
-				}
-				//标题长度大于10且相等即为重复
-				//				if len([]rune(info.title)) > 10 && v.title == info.title {
-				//					b = true
-				//					id = v.id
-				//					break L
-				//				}
-				//标题长度大于10且包含关系+buyer/projectname/projectcode/city(全国/A的只判断包含关系即可)相等即为重复
-				if len([]rune(info.title)) > 10 && len([]rune(v.title)) > 10 && (strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
-					if info.area == "全国" || n > 0 || info.city == v.city {
-						b = true
-						id = v.id
-						break L
-					}
-				}
-			}
-		}
-	}
-	if !b {
-		k := fmt.Sprintf("%s_%s_%s", time.Now().Format(qutil.Date_yyyyMMdd), info.subtype, info.area)
-		data := d.data[k]
-		if data == nil {
-			data = []*Info{info}
-		} else {
-			data = append(data, info)
-		}
-		d.data[k] = data
-	}
-	return
-}
-
-func (d *datamap) update() {
-	//每天0点清除历史数据
-	d.lock.Lock()
-	now, now1 := time.Now(), time.Now()
-	t2 := time.Date(now1.Year(), now1.Month(), now1.Day()+1, 0, 0, 0, 0, time.Local)
-	date := now.AddDate(0, 0, -d.days).Format(qutil.Date_yyyyMMdd)
-	all, all1 := 0, 0
-	for k, v := range d.data {
-		all += len(v)
-		if strings.HasPrefix(k, date) {
-			delete(d.data, k)
-		}
-	}
-	for _, v := range d.data {
-		all1 += len(v)
-	}
-	log.Println("更新前后数据:", all, all1)
-	d.keymap = d.GetLatelyFiveDay()
-	d.lock.Unlock()
-	time.AfterFunc(time.Duration(int64(t2.Unix()-now1.Unix()))*time.Second, d.update)
-}
-
-func (d *datamap) GetLatelyFiveDay() []string {
-	array := make([]string, d.days)
-	now := time.Now()
-	for i := 0; i < d.days; i++ {
-		array[i] = now.Format(qutil.Date_yyyyMMdd)
-		now = now.AddDate(0, 0, -1)
-	}
-	return array
-}

+ 28 - 16
udpfilterdup/src/main.go

@@ -29,15 +29,14 @@ var (
 	DM        *datamap                 //
 	HM        *historymap              //判重数据
 	lastid    = ""
-	/*
-		5da3f31aa5cb26b9b798d3aa
-	*/
+
 	//正则筛选相关
 	FilterRegTitle   = regexp.MustCompile("^_$")
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
 	isMerger         bool                              //是否合并
+	Is_Sort          bool                              //是否排序
 	threadNum        int                               //线程数量
 	SiteMap          map[string]map[string]interface{} //站点map
 	idtype, sid, eid string                            //测试人员判重使用
@@ -60,7 +59,6 @@ func init() {
 	}
 	mgo.InitPool()
 	extract = mconf["extract"].(string)
-
 	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
 	//加载数据
 	DM = NewDatamap(dupdays, lastid)
@@ -68,6 +66,7 @@ func init() {
 	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
 	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
 	isMerger = Sysconfig["isMerger"].(bool)
+	Is_Sort = Sysconfig["isSort"].(bool)
 	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
 
 	//站点配置
@@ -107,9 +106,9 @@ func mainT() {
 		ObjectId("5df5071ce9d1f601e495fa54")
 		ObjectId("5e09c05f0cf41612e0626abc")
 	*/
-	sid = "5df5071ce9d1f601e495fa54"
-	eid = "5e09c05f0cf41612e0626abc"
-
+	log.Println("测试开始")
+	sid = "5da3f31aa5cb26b9b798d3aa"
+	eid = "5da418c4a5cb26b9b7e3e9a6"
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
 		log.Println("sid,eid参数不能为空")
@@ -118,7 +117,7 @@ func mainT() {
 	mapinfo["gtid"] = sid
 	mapinfo["lteid"] = eid
 	mapinfo["stop"] = "true"
-	task([]byte{}, mapinfo)
+	historyTask([]byte{}, mapinfo)
 	time.Sleep(10 * time.Second)
 }
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
@@ -183,12 +182,16 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println(mgo.DbName, extract, q)
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
-	//it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+
+	//是否排序
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	if Is_Sort {
+		it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	}
+	//it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
 	updateExtract := [][]map[string]interface{}{}
 	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)
-
 	wg := &sync.WaitGroup{}
 	//mapLock := &sync.Mutex{}
 	n, repeateN := 0, 0
@@ -397,11 +400,15 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	minTime, maxTime := int64(0), int64(0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); {
 		//取出最大最小时间
-		if minTime == 0 || maxTime == 0 && util.Int64All(tmp["publishtime"]) != 0 {
-			minTime = util.Int64All(tmp["publishtime"])
-			maxTime = util.Int64All(tmp["publishtime"])
+		info_time:=tmp["comeintime"]
+		if Is_Sort {
+			info_time = tmp["publishtime"]
+		}
+		if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
+			minTime = util.Int64All(info_time)
+			maxTime = util.Int64All(info_time)
 		} else {
-			t := util.Int64All(tmp["publishtime"])
+			t := util.Int64All(info_time)
 			if t < minTime && t != 0 {
 				minTime = t
 			}
@@ -412,7 +419,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	}
 	//时间不正确时
 	if minTime == 0 && maxTime == 0 {
-		log.Println("段数据区间 publishtime不符合")
+		log.Println("段数据区间 不符合")
 		return
 	}
 	fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
@@ -442,7 +449,12 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 		}
 	}
 	log.Println(mgo.DbName, extract, q_history)
-	it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
+
+	//是否排序
+	it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
+	if Is_Sort {
+		it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
+	}
 	updateExtract := [][]map[string]interface{}{}
 	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)

+ 1 - 1
udpfilterdup/src/mgo.go

@@ -144,7 +144,7 @@ func (m *MongodbSim) InitPool() {
 	opts := options.Client()
 	opts.SetConnectTimeout(3 * time.Second)
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
-	opts.SetMaxPoolSize(uint64(m.Size))
+	opts.SetMaxPoolSize(uint16(m.Size))
 	m.pool = make(chan bool, m.Size)
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)