zhangjinkun 5 vuotta sitten
vanhempi
commit
b95c56981e
3 muutettua tiedostoa jossa 58 lisäystä ja 59 poistoa
  1. 10 26
      udpfilterdup/src/config.json
  2. 1 11
      udpfilterdup/src/datamap.go
  3. 47 22
      udpfilterdup/src/main.go

+ 10 - 26
udpfilterdup/src/config.json

@@ -3,39 +3,23 @@
     "dupdays": 5,
     "mongodb": {
         "addr": "192.168.3.207:27092",
-        "pool": 15,
-        "db": "zhaolongyue",
-        "extract": "kedaxunfei_zhengfa_gnq",
-        "extract_copy": "a_testbidding",
-        "bidding": "bidding_126"
+        "pool": 5,
+        "db": "data_Xinxihua",
+        "extract": "20200103_fupin_data",
+        "site": {
+            "dbname": "zhaolongyue",
+            "coll": "site"
+        }
     },
     "jkmail": {
-        "to": "renzheng@topnet.net.cn",
+        "to": "zhangjinkun@topnet.net.cn",
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
-    "nextNode": [
-        {
-            "addr": "127.0.0.11",
-            "port": 1482,
-            "stype": "project",
-            "memo": "合并项目"
-        },
-        {
-            "addr": "127.0.0.1",
-            "port": 1483,
-            "stype": "bidding",
-            "memo": "创建招标数据索引"
-        }
-    ],
-    "isMerger":false,
+    "nextNode": [],
+    "isMerger": false,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包)",
     "specialtitle_2": "项目([0-9a-zA-Z一二三四五六七八九十零123456789])",
-
-
     "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
-
-
-
 }
 

+ 1 - 11
udpfilterdup/src/datamap.go

@@ -41,8 +41,6 @@ type Info struct {
 
 var datelimit = float64(432000) //五天
 var sitelock sync.Mutex         //锁
-var reason = ""
-
 
 //判重数据
 type datamap struct {
@@ -228,8 +226,7 @@ func NewInfo(tmp map[string]interface{}) *Info {
 }
 
 //判重方法
-func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
-	reason = ""
+func (d *datamap) check(info *Info) (b bool, source *Info, reason string) {
 	keys := []string{}
 	//不同时间段
 	d.lock.Lock()
@@ -280,7 +277,6 @@ L:
 							reason = "href相同"
 							b = true
 							source = v
-							reasons = reason
 							break L
 						}
 						if info.href != "" && info.href != v.href {
@@ -304,7 +300,6 @@ L:
 							reason = "标题关键词且包含关系"
 							b = true
 							source = v
-							reasons = reason
 							break L
 						}
 					}
@@ -315,7 +310,6 @@ L:
 						if quickHeavyMethodTwo(v, info, reason) {
 							b = true
 							source = v
-							reasons = reason
 							break
 						}
 					} else {
@@ -325,7 +319,6 @@ L:
 							if quickHeavyMethodTwo(v, info, reason) {
 								b = true
 								source = v
-								reasons = reason
 								break
 							}
 						} else {
@@ -333,7 +326,6 @@ L:
 							if quickHeavyMethodOne(v, info, reason) {
 								b = true
 								source = v
-								reasons = reason
 								break
 							}
 						}
@@ -368,8 +360,6 @@ L:
 }
 
 func (h *historymap) checkHistory(info *Info) (b bool, source *Info, reasons string) {
-	reason = ""
-
 	h.lock.Lock()
 	defer h.lock.Unlock()
 	keys := []string{}

+ 47 - 22
udpfilterdup/src/main.go

@@ -11,6 +11,7 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
+	"os"
 	"qfw/util"
 	"qfw/util/mongodb"
 	"regexp"
@@ -23,15 +24,13 @@ var (
 	mconf     map[string]interface{} //mongodb配置信息
 	mgo       *mongodb.MongodbSim    //mongodb操作对象
 	//siteMgo      *mongodb.MongodbSim
-	extract      string
-	extract_copy string
-	bidding      string
-	udpclient    mu.UdpClient             //udp对象
-	nextNode     []map[string]interface{} //下节点数组
-	dupdays      = 5                      //初始化判重范围
-	DM           *datamap                 //
-	HM           *historymap              //判重数据
-	lastid       = ""
+	extract   string
+	udpclient mu.UdpClient             //udp对象
+	nextNode  []map[string]interface{} //下节点数组
+	dupdays   = 5                      //初始化判重范围
+	DM        *datamap                 //
+	HM        *historymap              //判重数据
+	lastid    = ""
 	/*
 		5da3f2c5a5cb26b9b79847fc
 	*/
@@ -42,10 +41,15 @@ var (
 
 	isMerger bool                              //是否合并
 	SiteMap  map[string]map[string]interface{} //站点map
+
+	idtype, sid, eid string //测试人员判重使用
 )
 
 func init() {
 	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
+	flag.StringVar(&sid, "sid", "", "开始id")
+	flag.StringVar(&eid, "eid", "", "结束id")
+	flag.StringVar(&idtype, "idtype", "", "id类型,默认ObjectId:0,String:1")
 	flag.Parse()
 	//172.17.145.163:27080
 	util.ReadConfig(&Sysconfig)
@@ -57,7 +61,6 @@ func init() {
 		Size:        util.IntAllDef(mconf["pool"], 10),
 	}
 	extract = mconf["extract"].(string)
-	extract_copy = mconf["extract_copy"].(string)
 	mgo.InitPool()
 
 	//测试可以临时注释
@@ -69,13 +72,13 @@ func init() {
 	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
 	isMerger = Sysconfig["isMerger"].(bool)
 
-	//配置站点Map
+	//站点配置
+	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
 	start := int(time.Now().Unix())
-	//站点配置
 	sess_site := mgo.GetMgoConn()
 	defer sess_site.Close()
-	res_site := sess_site.DB("zhaolongyue").C("site").Find(nil).Sort("_id").Iter()
+	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(nil).Sort("_id").Iter()
 	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
 		data_map := map[string]interface{}{
 			"area":     util.ObjToString(site_dict["area"]),
@@ -90,10 +93,8 @@ func init() {
 
 }
 
-func main() {
-
+func mainT() {
 	go checkMapJob()
-
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
@@ -102,6 +103,21 @@ func main() {
 	time.Sleep(99999 * time.Hour)
 }
 
+//测试组人员使用
+func main() {
+	//568551000000000000000000,5e0f65000000000000000000
+	mapinfo := map[string]interface{}{}
+	if sid == "" || eid == "" {
+		log.Println("sid,eid参数不能为空")
+		os.Exit(0)
+	}
+	mapinfo["gtid"] = sid
+	mapinfo["lteid"] = eid
+	mapinfo["stop"] = "true"
+	task([]byte{}, mapinfo)
+	time.Sleep(5 * time.Second)
+}
+
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 
 	fmt.Println("接受的段数据")
@@ -150,13 +166,22 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	//区间id
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
-			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
-		},
+	var q map[string]interface{}
+	if idtype == "1" {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mapInfo["gtid"].(string),
+				"$lte": mapInfo["lteid"].(string),
+			},
+		}
+	} else {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
 	}
-
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
 	pool := make(chan bool, 16)