Browse Source

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

fengweiqiang 5 years ago
parent
commit
1fe589fefb

+ 9 - 0
udpcreateindex/src/config.json

@@ -42,11 +42,20 @@
         "multiIndex": ""
     },
     "project": {
+		"addr": "172.17.4.189:27082",
+        "size": 2,
         "db": "extract_kf",
         "collect": "huawei_project",
         "index": "projectset_v1",
         "type": "projectset"
     },
+    "project2": {
+
+        "db": "extract_kf",
+        "collect": "huawei_project",
+        "index": "project_v2",
+        "type": "project"
+    },
     "standard": {
  		"addr": "172.17.145.163:27082",
         "size": 10,

+ 11 - 3
udpcreateindex/src/main.go

@@ -16,6 +16,7 @@ var (
 	Sysconfig          map[string]interface{} //配置文件
 	mgo                *mongodb.MongodbSim    //mongodb操作对象
 	extractmgo         *mongodb.MongodbSim    //mongodb操作对象
+	project2db         *mongodb.MongodbSim    //mongodb操作对象
 	mgostandard        *mongodb.MongodbSim    //mongodb操作对象
 	udpclient          mu.UdpClient           //udp对象
 	updport            string
@@ -25,7 +26,7 @@ var (
 	multiIndex         []string
 	BulkSize           = 400
 
-	winner, bidding, biddingback, project, buyer, standard map[string]interface{}
+	winner, bidding, biddingback, project, project2, buyer, standard map[string]interface{}
 )
 
 func init() {
@@ -39,6 +40,7 @@ func init() {
 	bidding, _ = Sysconfig["bidding"].(map[string]interface{})
 	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
 	project, _ = Sysconfig["project"].(map[string]interface{})
+	project2, _ = Sysconfig["project2"].(map[string]interface{})
 	mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
 	mgo = &mongodb.MongodbSim{
 		MongodbAddr: mconf["addr"].(string),
@@ -46,7 +48,12 @@ func init() {
 		DbName:      mconf["db"].(string),
 	}
 	mgo.InitPool()
-
+	project2db = &mongodb.MongodbSim{
+		MongodbAddr: project2["addr"].(string),
+		Size:        util.IntAllDef(project2["pool"], 5),
+		DbName:      project2["db"].(string),
+	}
+	project2db.InitPool()
 	savedb, _ := Sysconfig["savedb"].(map[string]interface{})
 	if savedb == nil {
 		log.Println("未设置保存数据库,默认使用招标库")
@@ -91,6 +98,7 @@ func init() {
 }
 
 func main() {
+	go task_projects()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
@@ -140,7 +148,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					defer func() {
 						<-pool
 					}()
-					projectTask(data, mapInfo)
+					projectTask(data, project, mapInfo)
 				}()
 			case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
 				pool <- true

+ 10 - 3
udpcreateindex/src/projectindex.go

@@ -9,10 +9,11 @@ import (
 	"qfw/util"
 	elastic "qfw/util/elastic"
 
+	mgov "gopkg.in/mgo.v2"
 	"gopkg.in/mgo.v2/bson"
 )
 
-func projectTask(data []byte, mapInfo map[string]interface{}) {
+func projectTask(data []byte, project, mapInfo map[string]interface{}) {
 	defer util.Catch()
 	q, _ := mapInfo["query"].(map[string]interface{})
 	if q == nil {
@@ -23,8 +24,14 @@ func projectTask(data []byte, mapInfo map[string]interface{}) {
 			},
 		}
 	}
-	session := extractmgo.GetMgoConn(3600)
-	defer extractmgo.DestoryMongoConn(session)
+	var session *mgov.Session
+	if project["addr"] != nil {
+		session = project2db.GetMgoConn(3600)
+		defer project2db.DestoryMongoConn(session)
+	} else {
+		session = extractmgo.GetMgoConn(3600)
+		defer extractmgo.DestoryMongoConn(session)
+	}
 	c, _ := project["collect"].(string)
 	db, _ := project["db"].(string)
 	index, _ := project["index"].(string)

+ 28 - 0
udpcreateindex/src/task.go

@@ -0,0 +1,28 @@
+// task定时执行项目索引
+package main
+
+import (
+	"log"
+	"time"
+
+	"github.com/cron"
+)
+
+func task_projects() {
+	c := cron.New()
+	_ = c.AddFunc("20 30 5 * * *", func() {
+		t := time.Now()
+		pici := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local).Unix()
+		log.Println(pici)
+		mapInfo := map[string]interface{}{
+			"query": map[string]interface{}{
+				"pici": map[string]interface{}{
+					"$gte": pici - 86400,
+					"$lte": pici,
+				},
+			},
+		}
+		projectTask([]byte{}, project2, mapInfo)
+	})
+	c.Start()
+}