Browse Source

定时生索引

apple 5 years ago
parent
commit
343d971be2

+ 12 - 0
udpcreateindex/src/config.json

@@ -95,5 +95,17 @@
     "elastic": {
         "addr": "http://192.168.3.128:9800",
         "pool": 12
+    },
+    "winnerextract": {
+      "threadnum": 3,
+      "db_addr": "192.168.3.207:27092",
+      "db_name": "extract_kf",
+      "db_pool": 10,
+      "db_c1": "zk_test1",
+      "db_c2": "zk_test2",
+      "es_addr": "http://192.168.3.11:9800",
+      "es_size": 10,
+      "es_index": "zktest_v1",
+      "es_type": "zktest"
     }
 }

+ 22 - 1
udpcreateindex/src/main.go

@@ -17,6 +17,7 @@ var (
 	Sysconfig            map[string]interface{} //配置文件
 	mgo                  *mongodb.MongodbSim    //mongodb操作对象
 	extractmgo           *mongodb.MongodbSim    //mongodb操作对象
+	winnermgo			 *mongodb.MongodbSim	//163的winner
 	project2db           *mongodb.MongodbSim    //mongodb操作对象
 	mgostandard          *mongodb.MongodbSim    //mongodb操作对象
 	qyxydb               *mongodb.MongodbSim    //mongodb操作对象
@@ -35,7 +36,8 @@ var (
 	other_index      string
 	other_itype      string
 
-	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
+	winner_es		 *elastic.Elastic //winner_v1
+	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent,winner_extract map[string]interface{}
 )
 
 func init() {
@@ -54,6 +56,7 @@ func init() {
 	project2, _ = Sysconfig["project2"].(map[string]interface{})
 	qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
 	mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
+	winner_extract,_=Sysconfig["winnerextract"].(map[string]interface{})
 	mgo = &mongodb.MongodbSim{ //mongodb为binding连接
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
@@ -61,6 +64,15 @@ func init() {
 	}
 	mgo.InitPool()
 
+
+	winnermgo = &mongodb.MongodbSim{
+		MongodbAddr: winner_extract["db_addr"].(string),
+		Size:        util.IntAllDef(winner_extract["db_pool"], 5),
+		DbName:      winner_extract["db_name"].(string),
+	}
+	winnermgo.InitPool()
+
+
 	project2db = &mongodb.MongodbSim{
 		MongodbAddr: project2["addr"].(string),
 		Size:        util.IntAllDef(project2["pool"], 5),
@@ -112,6 +124,14 @@ func init() {
 		}
 		bidding_other_es.InitElasticSize()
 	}
+
+	//winner_es
+	winner_es = &elastic.Elastic{
+		S_esurl: winner_extract["es_addr"].(string),
+		I_size:  util.IntAllDef(winner_extract["es_size"], 5),
+	}
+	winner_es.InitElasticSize()
+
 	//
 	if bidding["indexfields"] != nil {
 		biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
@@ -148,6 +168,7 @@ func main() {
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
 	time.Sleep(99999 * time.Hour)
+
 }
 
 var pool = make(chan bool, 20)

+ 10 - 0
udpcreateindex/src/task.go

@@ -14,8 +14,18 @@ func task_index() {
 	c.AddFunc("20 30 5 * * *", func() { task_projects() })
 	//c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
 	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
+
+	c.AddFunc("0 0 0 * * ?", func() { task_winnerextract() })//每天凌晨执行一次生索引
+
+
 	c.Start()
 }
+func task_winnerextract() {
+	log.Println("开始执行一次定时任务,winnerextract")
+	winnerEsTaskOnce()
+}
+
+
 
 //招标附件、标的物,临时用
 func task_biddingfile() {

+ 97 - 0
udpcreateindex/src/winnerextract.go

@@ -0,0 +1,97 @@
+package main
+
+import (
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	qu "qfw/util"
+	"sync"
+	"time"
+)
+
+
+func winnerEsTaskOnce()  {
+	defer qu.Catch()
+	now := time.Now()
+	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
+	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
+	task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
+	task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
+	log.Println("区间id:",task_sid,task_eid)
+	//区间id
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": qu.StringTOBsonId(task_sid),
+			"$lt": qu.StringTOBsonId(task_eid),
+		},
+	}
+	//参数
+	threadnum:=qu.IntAll(winner_extract["threadnum"])
+	db_c1:=qu.ObjToString(winner_extract["db_c1"])
+	db_c2:=qu.ObjToString(winner_extract["db_c2"])
+	es_index:=qu.ObjToString(winner_extract["es_index"])
+	es_type:=qu.ObjToString(winner_extract["es_type"])
+
+	//mongo
+	sess := winnermgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	//es
+	EsConn := winner_es.GetEsConn()
+	defer winner_es.DestoryEsConn(EsConn)
+
+	it_1 := sess.DB(winnermgo.DbName).C(db_c1).Find(&q).Sort("_id").Iter()
+	num_1:=0
+	pool := make(chan bool, threadnum)
+	wg := &sync.WaitGroup{}
+	for tmp := make(map[string]interface{}); it_1.Next(&tmp);num_1++{
+		if num_1%100 == 0 && num_1>0{
+			log.Println("当前表:",db_c1,"数量:",num_1)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			savetmp := make(map[string]interface{}, 0)
+			tmp_id := qu.BsonIdToSId(tmp["_id"])
+			savetmp["_id"] = tmp_id
+			savetmp["name"] = tmp["company_name"]
+			savetmp["pici"] = tmp["updatetime"]
+			if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+
+
+
+	it_2 := sess.DB(winnermgo.DbName).C(db_c2).Find(&q).Sort("_id").Iter()
+	num_2:=0
+	for tmp := make(map[string]interface{}); it_2.Next(&tmp);num_2++{
+		if num_2%100 == 0 && num_2>0 {
+			log.Println("当前表:",db_c2,"数量:",num_1)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			savetmp := make(map[string]interface{}, 0)
+			tmp_id := qu.BsonIdToSId(tmp["_id"])
+			savetmp["_id"] = tmp_id
+			savetmp["name"] = tmp["name"]
+			savetmp["pici"] = tmp["updatetime"]
+			if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+
+	log.Println("总计:",num_1+num_2)
+
+}

+ 100 - 1
udpfilterdup/src/README.md

@@ -221,4 +221,103 @@ if isMerger { //合并相关
 							merge_map,
 						})
 					}
-				}					
+				}	
+				
+				
+				
+				
+				
+				
+"winnerextract": {
+      "db_addr": "172.17.145.163:27082",
+      "db_name": "",
+      "db_pool": 5,
+      "db_c1": "winner_enterprise",
+      "db_c2": "winner_err",
+      "es_addr": "http://172.17.145.170:9800",
+      "es_index": "winner_v1",
+      "es_type": "winner"
+    }
+    
+    
+    func testWinnerExtract() {
+    	//查询表 - 生ES - 全量
+    
+    	Mgo = &MongodbSim{
+    		MongodbAddr: "172.17.145.163:27082",
+    		DbName:      "extract_v3",
+    		Size:        10,
+    	}
+    	Mgo.InitPool()
+    	sess := Mgo.GetMgoConn()
+    	defer mgo.DestoryMongoConn(sess)
+    
+    
+    	//初始化es
+    	es.InitElasticSize("http://172.17.145.170:9800",10)
+    	EsConn := es.GetEsConn()
+    	defer es.DestoryEsConn(EsConn)
+    
+    
+    	it := sess.DB(Mgo.DbName).C("winner_enterprise").Find(map[string]interface{}{}).Sort("_id").Iter()
+    	num :=0
+    
+    
+    	pool := make(chan bool, 5)
+    	wg := &sync.WaitGroup{}
+    	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+    		if num%10000 == 0 {
+    			log.Println("遍历数量:",num)
+    		}
+    
+    		pool <- true
+    		wg.Add(1)
+    		go func(tmp map[string]interface{}) {
+    			defer func() {
+    				<-pool
+    				wg.Done()
+    			}()
+    			savetmp := make(map[string]interface{}, 0)
+    			tmp_id := tmp["_id"].(primitive.ObjectID).Hex()
+    			savetmp["_id"] = tmp_id
+    			savetmp["name"] = tmp["company_name"]
+    			savetmp["pici"] = tmp["updatetime"]
+    
+    			if _, err := EsConn.Index().Index("winner_v1").Type("winner").Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+    				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+    			}
+    		}(tmp)
+    		tmp = make(map[string]interface{})
+    	}
+    
+    	it_err := sess.DB(Mgo.DbName).C("winner_err").Find(map[string]interface{}{}).Sort("_id").Iter()
+    	num1 :=0
+    	pool_err := make(chan bool, 5)
+    	wg_err := &sync.WaitGroup{}
+    
+    	for tmp := make(map[string]interface{}); it_err.Next(&tmp); num++ {
+    		if num1%10000 == 0 {
+    			log.Println("遍历数量:",num1)
+    		}
+    		pool_err <- true
+    		wg_err.Add(1)
+    		go func(tmp map[string]interface{}) {
+    			defer func() {
+    				<-pool_err
+    				wg_err.Done()
+    			}()
+    			savetmp := make(map[string]interface{}, 0)
+    			tmp_id := tmp["_id"].(primitive.ObjectID).Hex()
+    			savetmp["_id"] = tmp_id
+    			savetmp["name"] = tmp["name"]
+    			savetmp["pici"] = tmp["updatetime"]
+    			if _, err := EsConn.Index().Index("winner_v1").Type("winner").Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+    				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+    			}
+    
+    		}(tmp)
+    		tmp = make(map[string]interface{})
+    	}
+    	log.Println("num+num1==",num+num1)
+    
+    }				

+ 2 - 2
udpfilterdup/src/datamap.go

@@ -457,8 +457,8 @@ func (d *datamap) update(t int64) {
 	if TimingTask {
 		d.keymap = d.GetLatelyFiveDay(t)
 	}else {
-		//d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
-		d.keymap = d.GetLatelyFiveDayDouble(t)
+		d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
+		//d.keymap = d.GetLatelyFiveDayDouble(t)
 	}
 	m := map[string]bool{}
 	for _, v := range d.keymap {

+ 2 - 2
udpfilterdup/src/main.go

@@ -128,8 +128,8 @@ func mainT() {
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
-		sid = "5da3f3a2a5cb26b9b799aa65"
-		eid = "5da40b26a5cb26b9b7bde10b"
+		sid = "5f0141f6801f744d046c6691"
+		eid = "5f017266801f744d046c7a17"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}