Selaa lähdekoodia

新闻数据保存

mxs 1 kuukausi sitten
vanhempi
commit
6a83648ac8

+ 8 - 0
src/saveServer/config.json

@@ -10,6 +10,14 @@
 		"coll": "spider_repeatdata",
 		"size": 5
     },
+	"news": {
+		"addr": "172.20.45.130:27017",
+		"db": "qfw",
+		"size": 5,
+		"username": "",
+		"password": ""
+	},
+	"dbrepeat": "http://127.0.0.1:8290",
     "defauldetail": "由于网络故障,信息无法正常显示,请查看来源网站。",
 	"redisserveraddr": "list=172.31.31.204:1579,sha=172.31.31.204:1579",
 	"bloomredisaddrs": "href=172.31.31.204:1579,detail=172.31.31.204:1579",

+ 6 - 0
src/saveServer/httpserver.go

@@ -60,6 +60,12 @@ func HttpServer(port string) {
 				saveFileMust()
 			}
 			savefilelock.Unlock()
+			//bidding_news
+			saveNewsLock.Lock()
+			if len(SaveNewsCache) > 0 {
+				saveNewsMust()
+			}
+			saveNewsLock.Unlock()
 			//update
 			updatelock.Lock()
 			if len(UpdateCache) > 0 {

+ 12 - 45
src/saveServer/main.go

@@ -9,6 +9,8 @@ import (
 	"os"
 	qutil "qfw/util"
 	gm "qfw/util/mail"
+	"saveServer/tools"
+
 	// "qfw/util/mongodb"
 	// "saveServer/tools"
 	sp "spiderutil"
@@ -73,6 +75,8 @@ func inits() {
 	//bloom redis
 	bloomRedisAddrs := qutil.ObjToString(config["bloomredisaddrs"])
 	sp.InitBloomRedisClient(bloomRedisAddrs)
+	//db repeat
+	tools.DbRepeatAddr = qutil.ObjToString(config["dbrepeat"])
 	//初始化邮件信息
 	openmail = config["openmail"].(bool)
 	mail := config["mail"].(map[string]interface{})
@@ -91,51 +95,7 @@ func inits() {
 	go SaveRepeat() //记录重复数据
 	go TimerSave()  //定时保存数据
 	//go TimingTask()    //定时发邮件
-	go TimingSaveNum() //定时检测半小时内保存数据量
-	//更新bidding没有生成sha的数据,并存入redis
-	//后续待更新
-	// func() {
-	// 	sess := tools.Mgo.GetMgoConn()
-	// 	defer tools.Mgo.DestoryMongoConn(sess)
-	// 	q := bson.M{"s_sha": bson.M{"$exists": false}}
-	// 	i := 0
-	// 	log.Println(mongodb.DB, q)
-	// 	coll := "bidding"
-	// 	pool := make(chan bool, 5)
-	// 	log.Println(tools.Mgo.DbName, coll)
-	// 	query := sess.DB(tools.Mgo.DbName).C(coll).Find(&q).Select(map[string]interface{}{
-	// 		"detail":     1,
-	// 		"href":       1,
-	// 		"detail_bak": 1,
-	// 	}).Iter()
-	// 	for tmp := make(map[string]interface{}); query.Next(&tmp); i++ {
-	// 		id := ""
-	// 		if tmp["detail_bak"] != nil {
-	// 			id = tools.Sha(qutil.ObjToString(tmp["detail_bak"]))
-	// 		} else {
-	// 			id = tools.Sha(qutil.ObjToString(tmp["detail"]))
-	// 		}
-	// 		href := qutil.ObjToString(tmp["href"])
-	// 		pool <- true
-	// 		go func(coll, id, href string, tid interface{}) {
-	// 			defer func() {
-	// 				<-pool
-	// 			}()
-	// 			//保存-存储
-	// 			set := map[string]interface{}{
-	// 				"$set": map[string]interface{}{
-	// 					"s_sha": id,
-	// 				},
-	// 			}
-	// 			tools.Mgo.UpdateById(coll, tools.BsonTOStringId(tid), set)
-	// 			// db := tools.HexToBigIntMod(href) //db
-	// 			// hashHref := tools.HexText(href)  //求hash值
-	// 			// bidcheckrepeat.BidAdd(hashHref, id, tools.BsonTOStringId(tid), db)
-	// 		}(coll, id, href, tmp["_id"])
-	// 		tmp = make(map[string]interface{})
-	// 	}
-	// 	log.Println("new-sha-createNum:", i)
-	// }()
+	//go TimingSaveNum() //定时检测半小时内保存数据量
 }
 
 var IS = &InfoStatus{time.Now().Unix(), 0, map[int]int{}, &sync.Mutex{}}
@@ -187,6 +147,12 @@ func runNew() {
 				saveFileMust()
 			}
 			savefilelock.Unlock()
+			//bidding_news
+			saveNewsLock.Lock()
+			if len(SaveNewsCache) > 0 {
+				saveNewsMust()
+			}
+			saveNewsLock.Unlock()
 			//
 			mlock.Lock()
 			saveCommMust()
@@ -199,6 +165,7 @@ func runNew() {
 		WriteBufferSize: 100,
 	}
 	client, _ = util.NewClient(cf)
+	//client.Call("", client.GetMyclient(), util.EVENT_BYE, 1, "", 30)
 }
 func handler(p *util.Packet) {
 	qutil.Try(func() {

+ 78 - 52
src/saveServer/processdata.go

@@ -11,8 +11,9 @@ import (
 )
 
 const (
-	DETAIL_TEXT = "详情请访问原网页!"
-	DETAIL_FILE = "详情请下载附件!"
+	DETAIL_TEXT     = "详情请访问原网页!"
+	DETAIL_FILE     = "详情请下载附件!"
+	NEWS_INFOFORMAT = 5
 )
 
 var DetailReg = regexp.MustCompile(DETAIL_TEXT)
@@ -124,27 +125,29 @@ func NewSaveBidding(tmp map[string]interface{}) (b bool, res int, mgoid, mgocoll
 
 	detail := qutil.ObjToString(tmp["detail"])
 	uuid := sp.Sha(detail) //拟建数据2022-04-26也改为detail判重
+	//2、重点字段校验
+	//2.1数据类型infoformat校验
+	infoformat := qutil.IntAll(tmp["infoformat"])
+	if infoformat == 0 {
+		errorData(LEVEL_ERROR, true, "infoformat", "Field Value Is Null", href, title, &warn, tmp)
+		infoformat = 1
+		tmp["infoformat"] = infoformat //设置默认值
+	}
 	//数据判重
 	if tmp["repeat"] == nil { //判断repeat,为了异常数据重推时不进行redis判重
-		b, res = dataRepeat(tmp, href, hashHref, uuid, downloadFileOk)
+		b, res = dataRepeat(infoformat, tmp, href, hashHref, uuid, downloadFileOk)
 		if b && res == 4 {
 			return
 		}
 	}
-	//2、重点字段校验
-	//2.1保存表T校验
+
+	//2.2保存表T校验
 	T := qutil.ObjToString(tmp["T"])
-	if T != "bidding" && T != SaveOtherColl && T != SaveYqColl {
+	if T != SaveBiddingColl && T != SaveOtherColl && T != SaveYqColl && T != SaveNewsColl {
 		errorData(LEVEL_ERROR, true, "T", "Save Coll Error", href, title, &warn, tmp)
 		res = 2
 		T = SaveColl //设置默认值
 	}
-	//2.2数据类型infoformat校验
-	infoformat := qutil.IntAll(tmp["infoformat"])
-	if infoformat == 0 {
-		errorData(LEVEL_ERROR, true, "infoformat", "Field Value Is Null", href, title, &warn, tmp)
-		tmp["infoformat"] = 1 //设置默认值
-	}
 	//2.3title和s_title替换
 	if s_title := tmp["s_title"]; s_title != nil { //三级页有获取标题
 		stitle := qutil.ObjToString(s_title)
@@ -344,57 +347,72 @@ func NewSaveBidding(tmp map[string]interface{}) (b bool, res int, mgoid, mgocoll
 	mgoid, mgocoll = saveData(T, result, downloadFileOk, iscompete)
 	b = true
 	//5、推送成功的数据修改列表href redis值(用于去除重复采集,推送保存服务的数据,包含lua、python采集的相同数据)
-	sp.RedisSet("list", "list_"+hashHref, "1", 86400*365*2)
+	if infoformat == NEWS_INFOFORMAT {
+		tools.AddStr("list", href)
+	} else {
+		sp.RedisSet("list", "list_"+hashHref, "1", 86400*365*2)
+	}
 	return
 }
 
 // 数据判重
-func dataRepeat(tmp map[string]interface{}, href, hashHref, uuid string, downloadFileOk bool) (b bool, res int) {
+func dataRepeat(infoformat int, tmp map[string]interface{}, href, hashHref, uuid string, downloadFileOk bool) (b bool, res int) {
 	detail := qutil.ObjToString(tmp["detail"])
 	hanArr := reg_han.FindAllString(detail, -1) //获取汉字集合
 	hanArrLen := len(hanArr)                    //汉字个数
 	filterDetail := sp.FilterDetail(detail)     //只保留文本内容
 	//新版数据判重
 	publishtime := qutil.Int64All(tmp["publishtime"])
-	isExist := false                                       //记录判重结果
-	repeatby := ""                                         //判重字段
-	RedisLock.Lock()                                       //redis判重锁
-	defer RedisLock.Unlock()                               //
-	if publishtime < time.Now().AddDate(-1, 0, 0).Unix() { //一年前数据判重或异常发布时间数据
-		isExist, _ = sp.ExistsBloomRedis("href", href)
-		if !isExist {
-			sp.AddBloomRedis("href", href)
-			//链接未判重,进行正文detail判重校验(特殊文本除外)
-			if uuid != ANNEX_DETAIL_SHA && filterDetail != "无" && !DetailExists(filterDetail) {
-				isExist, _ = sp.ExistsBloomRedis("detail", filterDetail)
-			}
-			if isExist { //历史数据被detail全量判重
-				repeatby = "bloom_detail"
-			} else { //未被判重,存入redis
-				sp.AddBloomRedis("detail", filterDetail)
-				sp.RedisSet("sha", uuid, "", 86400*365*2)
+	isExist := false                   //记录判重结果
+	repeatby := ""                     //判重字段
+	if infoformat == NEWS_INFOFORMAT { //新闻数据判重
+		if uuid != ANNEX_DETAIL_SHA && filterDetail != "无" && !DetailExists(filterDetail) {
+			isExist = tools.CheckStr("detail", uuid)
+			if isExist {
+				repeatby = "db_detail"
+			} else {
+				tools.AddStr("detail", uuid)
 			}
-		} else { //历史数据被href全量判重
-			repeatby = "bloom_href"
 		}
-	} else { //一年内数据判重(只进行正文hash判重 问题:detail含有详情请访问原网页等字眼时,不会走正文判重)
-		//增量href判重(为了去除lua、python采集的相同数据)
-		r, err := sp.RedisGet("list", "list_"+hashHref)
-		if err == nil && r != "" { //此条href数据已入库
-			isExist = true
-			repeatby = "list_href"
-		} else {
-			if uuid != ANNEX_DETAIL_SHA && filterDetail != "无" && !DetailExists(filterDetail) {
-				isExist = sp.RedisExist("sha", uuid)
+	} else {
+		RedisLock.Lock()                                       //redis判重锁
+		if publishtime < time.Now().AddDate(-1, 0, 0).Unix() { //一年前数据判重或异常发布时间数据
+			isExist, _ = sp.ExistsBloomRedis("href", href)
+			if !isExist {
+				sp.AddBloomRedis("href", href)
+				//链接未判重,进行正文detail判重校验(特殊文本除外)
+				if uuid != ANNEX_DETAIL_SHA && filterDetail != "无" && !DetailExists(filterDetail) {
+					isExist, _ = sp.ExistsBloomRedis("detail", filterDetail)
+				}
+				if isExist { //历史数据被detail全量判重
+					repeatby = "bloom_detail"
+				} else { //未被判重,存入redis
+					sp.AddBloomRedis("detail", filterDetail)
+					sp.RedisSet("sha", uuid, "", 86400*365*2)
+				}
+			} else { //历史数据被href全量判重
+				repeatby = "bloom_href"
 			}
-			if isExist { //增量detail sha判重
-				repeatby = "sha_detail"
-			} else if qutil.ObjToString(tmp["competehref"]) == "" { //竞品数据sha不存
-				sp.RedisSet("sha", uuid, "", 86400*365*2)
+		} else { //一年内数据判重(只进行正文hash判重 问题:detail含有详情请访问原网页等字眼时,不会走正文判重)
+			//增量href判重(为了去除lua、python采集的相同数据)
+			r, err := sp.RedisGet("list", "list_"+hashHref)
+			if err == nil && r != "" { //此条href数据已入库
+				isExist = true
+				repeatby = "list_href"
+			} else {
+				if uuid != ANNEX_DETAIL_SHA && filterDetail != "无" && !DetailExists(filterDetail) {
+					isExist = sp.RedisExist("sha", uuid)
+				}
+				if isExist { //增量detail sha判重
+					repeatby = "sha_detail"
+				} else if qutil.ObjToString(tmp["competehref"]) == "" { //竞品数据sha不存
+					sp.RedisSet("sha", uuid, "", 86400*365*2)
+				}
+				sp.AddBloomRedis("href", href)
+				sp.AddBloomRedis("detail", filterDetail)
 			}
-			sp.AddBloomRedis("href", href)
-			sp.AddBloomRedis("detail", filterDetail)
 		}
+		RedisLock.Unlock() //
 	}
 	if isExist {
 		//ReplaceFile(site, uuid, tmp)//正文判重,附件替换
@@ -403,8 +421,8 @@ func dataRepeat(tmp map[string]interface{}, href, hashHref, uuid string, downloa
 		tmp["repeatby"] = repeatby
 		tmp["s_sha"] = uuid
 		tmp["hashref"] = hashHref
-		SaveMgoCache <- tmp                                         //记录重复数据
-		if repeatby == "bloom_detail" || repeatby == "sha_detail" { //被正文判重,保留数据,打上标记
+		SaveMgoCache <- tmp                                                                    //记录重复数据
+		if repeatby == "bloom_detail" || repeatby == "sha_detail" || repeatby == "db_detail" { //被正文判重,保留数据,打上标记
 			/*
 				日期:2024-04-26
 				逻辑:含有效附件且正文汉字个数小于100的数据不进行正文判重
@@ -466,8 +484,16 @@ func saveData(T string, result map[string]interface{}, dfOk, iscompete bool) (st
 			saveOtherMust()
 		}
 		saveotherlock.Unlock()
-	} else if dfOk { //附件信息
-		savecoll = "bidding_file"
+	} else if T == SaveNewsColl { //新闻数据
+		savecoll = SaveNewsColl
+		saveNewsLock.Lock()
+		SaveNewsCache = append(SaveNewsCache, result)
+		if len(SaveNewsCache) > 200 {
+			saveNewsMust()
+		}
+		saveNewsLock.Unlock()
+	} else if T == SaveBiddingColl && dfOk { //附件信息
+		savecoll = SaveFileColl
 		savefilelock.Lock()
 		SaveFileCache = append(SaveFileCache, result)
 		if len(SaveFileCache) > 200 {

+ 31 - 2
src/saveServer/savedata.go

@@ -20,8 +20,9 @@ var domainClearReg = regexp.MustCompile(`((http|https)[::]//)+`)
 var htmlModelReg = regexp.MustCompile(`{{[a-zA-z.()\d,:]{5,}}}`)
 var siteReg = regexp.MustCompile(`(政府采购|公共资源)`)
 var (
-	tmpsavenum = 0 //数据量监控标识
-	StopFlag   int //暂停标识
+	tmpsavenum      = 0 //数据量监控标识
+	StopFlag        int //暂停标识
+	SaveBiddingColl     = "bidding"
 	//other
 	SaveOtherColl                 = "bidding_other" //临时存储不用的数据
 	saveothernum                  = 0
@@ -52,6 +53,12 @@ var (
 	updatelock     *sync.Mutex = new(sync.Mutex)              //更新锁
 	UpdateCache                = [][]map[string]interface{}{} //批量更新
 	UpdateLastTime             = time.Now().Unix()
+	//新闻
+	SaveNewsColl                 = "bidding_news"
+	saveNewsNum                  = 0
+	saveNewsLock     *sync.Mutex = new(sync.Mutex)            //保存新闻信息锁
+	SaveNewsCache                = []map[string]interface{}{} //批量保存新闻信息
+	SaveNewsLastTime             = time.Now().Unix()
 )
 
 // 批量保存舆情数据
@@ -95,6 +102,16 @@ func saveFileMust() {
 	SaveFileLastTime = time.Now().Unix()
 }
 
+// saveNewsMust bulk-saves the buffered news records in SaveNewsCache to the SaveNewsColl collection through MgoNews, then resets the buffer; every call site in this diff holds saveNewsLock around it.
+func saveNewsMust() {
+	saveNewsNum += len(SaveNewsCache) // running total of news records handed to SaveBulk
+	MgoNews.SaveBulk(SaveNewsColl, SaveNewsCache...) // NOTE(review): any result from SaveBulk is discarded here — confirm whether failures need handling
+	go log.Println("saveNewsMust:", saveNewsNum)
+	time.Sleep(time.Second * 2) // pauses 2s before resetting; callers hold saveNewsLock for the duration
+	SaveNewsCache = []map[string]interface{}{} // start a fresh, empty batch
+	SaveNewsLastTime = time.Now().Unix() // last-flush timestamp read by the TimerSave loop
+}
+
 // 批量更新
 func updateMust() {
 	//qutil.Debug("---------批量更新--------")
@@ -168,6 +185,18 @@ func TimerSave() {
 			time.Sleep(30 * time.Second)
 		}
 	}()
+	//批量保存新闻信息
+	go func() {
+		for {
+			now := time.Now().Unix()
+			saveNewsLock.Lock()
+			if now-SaveNewsLastTime > 60 && len(SaveNewsCache) > 0 {
+				saveNewsMust() //
+			}
+			saveNewsLock.Unlock()
+			time.Sleep(40 * time.Second)
+		}
+	}()
 
 	//批量更新bidding
 	go func() {

+ 15 - 5
src/saveServer/util.go

@@ -28,6 +28,7 @@ var (
 
 	//Mgo          *tools.MongodbSim
 	MgoS         *tools.MongodbSim
+	MgoNews      *tools.MongodbSim
 	CollS        string
 	SaveMgoCache = make(chan map[string]interface{}, 1000) //更新集合
 	SP           = make(chan bool, 5)
@@ -47,7 +48,6 @@ func InitMgo() {
 	tools.Mgo.InitPool()
 	//spider
 	spider := config["spider"].(map[string]interface{})
-	qutil.Debug(spider)
 	CollS = qutil.ObjToString(spider["coll"])
 	MgoS = &tools.MongodbSim{
 		MongodbAddr: qutil.ObjToString(spider["addr"]),
@@ -55,9 +55,19 @@ func InitMgo() {
 		Size:        qutil.IntAll(spider["size"]),
 	}
 	MgoS.InitPool()
+	//news
+	news := config["news"].(map[string]interface{})
+	MgoNews = &tools.MongodbSim{
+		MongodbAddr: qutil.ObjToString(news["addr"]),
+		DbName:      qutil.ObjToString(news["db"]),
+		Size:        qutil.IntAll(news["size"]),
+		UserName:    qutil.ObjToString(news["username"]),
+		Password:    qutil.ObjToString(news["password"]),
+	}
+	MgoNews.InitPool()
 }
 
-//保存重复数据
+// 保存重复数据
 func SaveRepeat() {
 	defer qutil.Catch()
 	qutil.Debug("Save Repeat Data...")
@@ -95,13 +105,13 @@ func SaveRepeat() {
 	}
 }
 
-//同“详情请访问原网页!”一样,加入了“访问原网页”、“见原网页”等类似正文内容不做判重的处理
+// DetailExists reports whether detail is shorter than 150 runes and matches SpecialTextReg; placeholder bodies in the style of "see the original page for details" are identified this way so they are not used for duplicate checking.
 func DetailExists(detail string) bool {
 	//detail = tools.Reg.ReplaceAllString(tools.Filter.ReplaceAllString(detail, ""), "")
 	return len([]rune(detail)) < 150 && SpecialTextReg.MatchString(detail) // length is counted in runes, not bytes
 }
 
-//保存redis
+// 保存redis
 func RedisSave(hashHref, shaid string, comepete bool) {
 	sp.RedisClusterSet(hashHref, "", -1) //href
 	if !comepete {                       //非竞品数据redis存储shaid
@@ -109,7 +119,7 @@ func RedisSave(hashHref, shaid string, comepete bool) {
 	}
 }
 
-//DFA
+// DFA wraps a string-keyed map, Link, whose values are untyped; presumably the root of a nested keyword-matching tree (deterministic finite automaton) — TODO confirm against its methods.
 type DFA struct {
 	Link map[string]interface{} // NOTE(review): value shape is not visible in this diff
 }