瀏覽代碼

新增舆情数据保存表bidding_yq

maxiaoshan 1 年之前
父節點
當前提交
bcc05d404a
共有 5 個文件被更改,包括 66 次插入11 次删除
  1. 11 0
      src/saveServer/httpserver.go
  2. 5 0
      src/saveServer/main.go
  3. 10 2
      src/saveServer/processdata.go
  4. 27 0
      src/saveServer/savedata.go
  5. 13 9
      src/saveServer/src/main.go

+ 11 - 0
src/saveServer/httpserver.go

@@ -35,26 +35,37 @@ func HttpServer(port string) {
 	http.HandleFunc("/save", func(w http.ResponseWriter, r *http.Request) {
 		b, _ := CheckAdmin(r)
 		if b {
+			//yq
+			saveyqlock.Lock()
+			if len(SaveYqCache) > 0 {
+				saveYqMust()
+			}
+			saveyqlock.Unlock()
+			//other
 			saveotherlock.Lock()
 			if len(SaveOtherCache) > 0 {
 				saveOtherMust()
 			}
 			saveotherlock.Unlock()
+			//bidding
 			savelock.Lock()
 			if len(SaveCache) > 0 {
 				saveMust()
 			}
 			savelock.Unlock()
+			//bidding_file
 			savefilelock.Lock()
 			if len(SaveFileCache) > 0 {
 				saveFileMust()
 			}
 			savefilelock.Unlock()
+			//update
 			updatelock.Lock()
 			if len(UpdateCache) > 0 {
 				updateMust()
 			}
 			updatelock.Unlock()
+			//
 			mlock.Lock()
 			if mnum > 0 {
 				saveCommMust()

+ 5 - 0
src/saveServer/main.go

@@ -159,6 +159,11 @@ func runNew() {
 		CanHandleEvents: []int{EVENT_SAVETODB, EVENT_SAVETODB_BIDDING},
 		OnRequestConnect: func() {
 			//立即保存
+			saveyqlock.Lock()
+			if len(SaveYqCache) > 0 {
+				saveYqMust()
+			}
+			saveyqlock.Unlock()
 			saveotherlock.Lock()
 			if len(SaveOtherCache) > 0 {
 				saveOtherMust()

+ 10 - 2
src/saveServer/processdata.go

@@ -106,7 +106,7 @@ func NewSaveBidding(tmp map[string]interface{}) (b bool, res int, mgoid, mgocoll
 	//2、重点字段校验
 	//2.1保存表T校验
 	T := qutil.ObjToString(tmp["T"])
-	if T != SaveColl && T != SaveOtherColl {
+	if T != SaveColl && T != SaveOtherColl && T != SaveYqColl {
 		errorData(LEVEL_ERROR, true, "T", "Save Coll Error", href, title, &warn, tmp)
 		res = 2
 		T = "bidding" //设置默认值
@@ -433,7 +433,15 @@ func saveData(T string, result map[string]interface{}, dfOk, iscompete bool) (st
 			result["detail"] = DetailReg.ReplaceAllString(detail, DETAIL_FILE)
 		}
 	}
-	if T == SaveOtherColl { //临时数据
+	if T == SaveYqColl {
+		savecoll = SaveYqColl
+		saveyqlock.Lock()
+		SaveYqCache = append(SaveYqCache, result)
+		if len(SaveYqCache) > 200 {
+			saveYqMust()
+		}
+		saveyqlock.Unlock()
+	} else if T == SaveOtherColl { //临时数据
 		savecoll = SaveOtherColl
 		saveotherlock.Lock()
 		SaveOtherCache = append(SaveOtherCache, result)

+ 27 - 0
src/saveServer/savedata.go

@@ -20,22 +20,26 @@ var domainClearReg = regexp.MustCompile(`((http|https)[::]//)+`)
 var htmlModelReg = regexp.MustCompile(`{{[a-zA-z.()\d,:]{5,}}}`)
 var siteReg = regexp.MustCompile(`(政府采购|公共资源)`)
 var saveothernum = 0
+var saveyqnum = 0
 var savenum = 0
 var savefilenum = 0
 var tmpsavenum = 0
 var updatenum = 0
 var errnum = 0
 var saveotherlock *sync.Mutex = new(sync.Mutex) //保存临时锁
+var saveyqlock *sync.Mutex = new(sync.Mutex)    //保存临时锁
 var savelock *sync.Mutex = new(sync.Mutex)      //保存锁
 var savefilelock *sync.Mutex = new(sync.Mutex)  //保存附件信息锁
 var updatelock *sync.Mutex = new(sync.Mutex)    //更新锁
 //var errorlock *sync.Mutex = new(sync.Mutex)    //异常数据锁
 var SaveOtherCache = []map[string]interface{}{} //批量保存临时
+var SaveYqCache = []map[string]interface{}{}    //批量保存临时
 var SaveCache = []map[string]interface{}{}      //批量保存
 var SaveFileCache = []map[string]interface{}{}  //批量保存附件信息
 var UpdateCache = [][]map[string]interface{}{}  //批量更新
 //var ErrorCache = []map[string]interface{}{}    //异常数据集
 var SaveOtherLastTime = time.Now().Unix()
+var SaveYqLastTime = time.Now().Unix()
 var SaveLastTime = time.Now().Unix()
 var SaveFileLastTime = time.Now().Unix()
 var UpdateLastTime = time.Now().Unix()
@@ -44,6 +48,17 @@ var SaveColl = "bidding"
 var SaveFileColl = "bidding_file"
 var ErrColl = "spider_warn"
 var SaveOtherColl = "bidding_other" //临时存储不用的数据
+var SaveYqColl = "bidding_yq"       //舆情信息存储表
+
+//批量保存舆情数据
+func saveYqMust() {
+	saveyqnum += len(SaveYqCache)
+	tools.Mgo.SaveBulk(SaveYqColl, SaveYqCache...)
+	go log.Println("saveYqMust:", saveyqnum)
+	time.Sleep(time.Second * 2)
+	SaveYqCache = []map[string]interface{}{}
+	SaveYqLastTime = time.Now().Unix()
+}
 
 //批量保存临时数据
 func saveOtherMust() {
@@ -100,6 +115,18 @@ func updateMust() {
 
 //定时保存
 func TimerSave() {
+	//批量保存bidding_yq
+	go func() {
+		for {
+			now := time.Now().Unix()
+			saveyqlock.Lock()
+			if now-SaveYqLastTime > 60 && len(SaveYqCache) > 0 {
+				saveYqMust() //
+			}
+			saveyqlock.Unlock()
+			time.Sleep(30 * time.Second)
+		}
+	}()
 	//批量保存bidding_other
 	go func() {
 		for {

+ 13 - 9
src/saveServer/src/main.go

@@ -159,17 +159,21 @@ func MoveData(gtid, lteid string) {
 				strid := mongodb.BsonIdToSId(newId)
 				tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
 			}
-			site := qu.ObjToString(tmp["site"]) //解析附件站点
-			IsReplaceDetailSite := OssSite[site]
-			replace, fileOk, filetext := AnalysisFile(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
-			if IsReplaceDetailSite && replace {                                 //
-				tmp["detail"] = filetext
-			}
+			//更新
 			set := map[string]interface{}{"moveok": true}
 			set["biddingid"] = mongodb.BsonIdToSId(newId)
-			if !fileOk { //附件异常
-				set["filerr"] = true
-				set["filetext"] = filetext
+
+			site := qu.ObjToString(tmp["site"]) //解析附件站点
+			IsReplaceDetailSite := OssSite[site]
+			if IsReplaceDetailSite {
+				replace, fileOk, filetext := AnalysisFile(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
+				if replace {
+					tmp["detail"] = filetext //替换正文
+				}
+				if !fileOk { //附件异常
+					set["filerr"] = true
+					//set["filetext"] = filetext//文本过大,导致更新mongo失败
+				}
 			}
 			update = append(update, map[string]interface{}{
 				"$set": set,