maxiaoshan 4 年之前
父節點
當前提交
cfceb373e3
共有 4 個文件被更改,包括 149 次插入17 次删除
  1. 12 0
      src/saveServer/config.json
  2. 3 17
      src/saveServer/main.go
  3. 2 0
      src/saveServer/savedata.go
  4. 132 0
      src/saveServer/util.go

+ 12 - 0
src/saveServer/config.json

@@ -2,6 +2,18 @@
     "mongodbServers": "192.168.3.207:27092",
     "mongodbPoolSize": "5",
     "mongodbName": "spider",
+    "spider":{
+    	"addr": "192.168.3.207:27092",
+		"db": "spider",
+		"coll": "spider_repeatdata",
+		"size": 5
+    },
+    "editor":{
+    	"addr": "192.168.3.207:27092",
+		"db": "editor",
+		"coll": "task",
+		"size": 5
+    },
     "defauldetail": "由于网络故障,信息无法正常显示,请查看来源网站。",
     "redisServer": "title_repeat_fulljudgement=192.168.3.207:1579,shaid=192.168.3.207:1479",
     "port": "7999",

+ 3 - 17
src/saveServer/main.go

@@ -30,16 +30,6 @@ const (
 	LEVEL_ERROR            = 2
 )
 
-var client *util.Client
-var config map[string]interface{}
-var port string
-var mustfield []string
-var field []string
-var jsondatafield []string
-var jsondatacheckfield []string
-var randomreg *regexp.Regexp
-var repeattime int
-
 func loadConfig() bool {
 	log.Println("reload config ...")
 	qutil.ReadConfig(&config) //config.json
@@ -72,14 +62,9 @@ func inits() {
 		log.Println("配置错误,系统退出。")
 		os.Exit(0)
 	}
+	//mgo连接
+	InitMgo()
 	port = qutil.ObjToString(config["port"])
-	//初始化mongo
-	tools.Mgo = &tools.MongodbSim{
-		MongodbAddr: qutil.ObjToString(config["mongodbServers"]),
-		Size:        qutil.IntAll(config["mongodbPoolSize"]),
-		DbName:      qutil.ObjToString(config["mongodbName"]),
-	}
-	tools.Mgo.InitPool()
 	//初始化redis
 	tools.InitRedis(qutil.ObjToString(config["redisServer"]))
 	//初始化邮件信息
@@ -110,6 +95,7 @@ func inits() {
 	//		tempinfo.TempSite[v] = true
 	//	}
 	//	go tempinfo.Task() //temp_bidding to bidding
+	go SaveRepeat()    //记录重复数据
 	go TimerSave()     //定时保存数据
 	go TimingTask()    //定时发邮件
 	go TimingSaveNum() //定时检测半小时内保存数据量

+ 2 - 0
src/saveServer/savedata.go

@@ -191,6 +191,7 @@ func NewSaveBidding(tmp map[string]interface{}) (b bool, res int) {
 	//qutil.Debug("href:", href, "redis isexist:", isExist, "dataging:", tmp["dataging"])
 	dataging := qutil.IntAll(tmp["dataging"])
 	if isExist && dataging == 0 { //重复数据
+		SaveMgoCache <- tmp //记录重复数据
 		res = 4
 		b = true
 		//go log.Println("repeat:", href, uuid)
@@ -201,6 +202,7 @@ func NewSaveBidding(tmp map[string]interface{}) (b bool, res int) {
 	//publishtime校验
 	publishtime := qutil.Int64All(tmp["publishtime"])
 	if publishtime <= 0 {
+		NewEditorTask(site, spidercode, href) //编辑器新建任务
 		errorData(LEVEL_WARN, "warn-publishtime", "Publishtime Is Null", site, spidercode, href)
 	} else {
 		now := time.Now().Unix()

+ 132 - 0
src/saveServer/util.go

@@ -0,0 +1,132 @@
+package main
+
+import (
+	"mfw/util"
+	qutil "qfw/util"
+	"regexp"
+	"saveServer/tools"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+var (
+	client             *util.Client
+	config             map[string]interface{}
+	port               string
+	mustfield          []string
+	field              []string
+	jsondatafield      []string
+	jsondatacheckfield []string
+	randomreg          *regexp.Regexp
+	repeattime         int
+
+	Mgo          *tools.MongodbSim
+	MgoS         *tools.MongodbSim
+	MgoE         *tools.MongodbSim
+	CollS        string
+	CollE        string
+	SaveMgoCache = make(chan map[string]interface{}, 1000) //更新集合
+	SP           = make(chan bool, 5)
+)
+
+func InitMgo() {
+	//bidding
+	tools.Mgo = &tools.MongodbSim{
+		MongodbAddr: qutil.ObjToString(config["mongodbServers"]),
+		Size:        qutil.IntAll(config["mongodbPoolSize"]),
+		DbName:      qutil.ObjToString(config["mongodbName"]),
+	}
+	tools.Mgo.InitPool()
+	//spider
+	spider := config["spider"].(map[string]interface{})
+	qutil.Debug(spider)
+	CollS = qutil.ObjToString(spider["coll"])
+	MgoS = &tools.MongodbSim{
+		MongodbAddr: qutil.ObjToString(spider["addr"]),
+		DbName:      qutil.ObjToString(spider["db"]),
+		Size:        qutil.IntAll(spider["size"]),
+	}
+	MgoS.InitPool()
+	//editor
+	editor := config["editor"].(map[string]interface{})
+	qutil.Debug(editor)
+	CollE = qutil.ObjToString(editor["coll"])
+	MgoE = &tools.MongodbSim{
+		MongodbAddr: qutil.ObjToString(editor["addr"]),
+		DbName:      qutil.ObjToString(editor["db"]),
+		Size:        qutil.IntAll(editor["size"]),
+	}
+	MgoE.InitPool()
+}
+
+//保存重复数据
+func SaveRepeat() {
+	defer qutil.Catch()
+	qutil.Debug("Save Repeat Data...")
+	arru := make([]map[string]interface{}, 500)
+	indexu := 0
+	for {
+		select {
+		case v := <-SaveMgoCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 500 {
+				SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					MgoS.SaveBulk(CollS, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, 500)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					MgoS.SaveBulk(CollS, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, 500)
+				indexu = 0
+			}
+		}
+	}
+}
+
+//publishtime <=0编辑器建任务
+func NewEditorTask(site, spidercode, href string) {
+	defer qutil.Catch()
+	lua := MgoE.FindOne("luaconfig", map[string]interface{}{"code": spidercode})
+	if len(lua) > 0 {
+		event := qutil.IntAll(lua["event"])
+		modifyuser := qutil.ObjToString(lua["modifyuser"])
+		modifyuserid := qutil.ObjToString(lua["modifyuserid"])
+		param := lua["param_common"].(primitive.A)
+		channel := ""
+		if len(param) >= 3 {
+			channel = qutil.ObjToString(param[2])
+		}
+		task := map[string]interface{}{
+			"s_site":       site,
+			"s_channel":    channel,
+			"s_code":       spidercode,
+			"i_state":      1,
+			"s_modify":     modifyuser,
+			"s_modifyid":   modifyuserid,
+			"i_urgency":    "4",
+			"event":        event,
+			"l_comeintime": time.Now().Unix(),
+			"l_complete":   time.Now().AddDate(0, 0, 1).Unix(),
+			"s_descript":   "发布时间异常,href:" + href,
+			"type":         "publishtime_err",
+		}
+		MgoE.Save("task", task)
+	} else {
+		qutil.Debug("Find Lua Error:", spidercode)
+	}
+}