maxiaoshan před 5 roky
rodič
revize
aa8ebd9073

+ 37 - 42
udpcreateindex/src/biddingindex.go

@@ -83,52 +83,47 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	}
 	log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2)
 
-	go delEs(mapInfo, index, itype, db, c) //删除索引
+	//go delEs(mapInfo, index, itype, db, c) //删除索引
 }
 
 //删除索引
-func delEs(mapInfo map[string]interface{}, index, itype, db, c string) {
-	defer qutil.Catch()
-	other_delete := false
-	if other_index != "" && other_itype != "" {
-		other_delete = true
-	}
-	ids := qutil.ObjToString(mapInfo["ids"])
-	idsarr := strings.Split(ids, ",")
-	log.Println("delete ids count:", len(idsarr))
-	n1 := 0
-	update := [][]map[string]interface{}{} //将bidding表中的extracttype改为-1
-	set := map[string]interface{}{
-		"$set": map[string]interface{}{"extracttype": -1},
-	}
-	for _, id := range idsarr {
-		if id != "" {
-			update = append(update, []map[string]interface{}{ //更新
-				map[string]interface{}{
-					"_id": qutil.StringTOBsonId(id),
-				},
-				set,
-			})
-			if elastic.DelById(index, itype, id) {
-				n1++
-			}
-			if other_delete {
-				bidding_other_es.DelById(other_index, other_itype, id)
-			}
-			// go func(id string) { //删除备份库的数据
-			// 	if other_delete {
-			// 		bidding_other_es.DelById(other_index, other_itype, id)
-			// 	}
-			// }(id)
-		}
-	}
-	//更新
-	if len(update) > 0 {
-		mgo.UpdateBulkAll(db, c, update...)
-	}
+// func delEs(mapInfo map[string]interface{}, index, itype, db, c string) {
+// 	defer qutil.Catch()
+// 	other_delete := false
+// 	if other_index != "" && other_itype != "" {
+// 		other_delete = true
+// 	}
+// 	ids := qutil.ObjToString(mapInfo["ids"])
+// 	idsarr := strings.Split(ids, ",")
+// 	log.Println("delete ids count:", len(idsarr))
+// 	n1 := 0
+// 	update := [][]map[string]interface{}{} //将bidding表中的extracttype改为-1
+// 	set := map[string]interface{}{
+// 		"$set": map[string]interface{}{"extracttype": -1},
+// 	}
+// 	for _, id := range idsarr {
+// 		if id != "" {
+// 			update = append(update, []map[string]interface{}{ //更新
+// 				map[string]interface{}{
+// 					"_id": qutil.StringTOBsonId(id),
+// 				},
+// 				set,
+// 			})
+// 			if elastic.DelById(index, itype, id) {
+// 				n1++
+// 			}
+// 			if other_delete {
+// 				bidding_other_es.DelById(other_index, other_itype, id)
+// 			}
+// 		}
+// 	}
+// 	//更新
+// 	if len(update) > 0 {
+// 		mgo.UpdateBulkAll(db, c, update...)
+// 	}
 
-	log.Println("result delete bidding index...over", "all:", n1)
-}
+// 	log.Println("result delete bidding index...over", "all:", n1)
+// }
 
 func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
 	n1, n2 := 0, 0

+ 6 - 17
udpcreateindex/src/config.json

@@ -36,7 +36,7 @@
         "extractdb": "mxs",
         "extractcollect": "extract",
         "indexfields":[ 
-        "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo","purchasing","purchasinglist","filetext"
+        "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo","purchasing","purchasinglist","filetext","channel"
         ],
         "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,s_topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging",
         "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,approvedept,approvenumber,projecttype,approvecity",
@@ -70,11 +70,12 @@
     "standard": {
  		"addr": "192.168.3.207:27092",
         "size": 10,
-        "db": "qfw",
+        "db": "mxs",
     	"winnerent":{
-			"collect": "winner_enterprise",
-        	"index": "winnerent_v1",
-        	"type": "winnerent"
+			"collect1": "winner_enterprise",
+			"collect2": "winner_err",
+        	"index": "winner",
+        	"type": "winner"
 		},
         "buyerent":{
 			"collect": "buyer_enterprise",
@@ -95,17 +96,5 @@
     "elastic": {
         "addr": "http://192.168.3.128:9800",
         "pool": 12
-    },
-    "winnerextract": {
-      "threadnum": 3,
-      "db_addr": "192.168.3.207:27092",
-      "db_name": "extract_kf",
-      "db_pool": 10,
-      "db_c1": "zk_test1",
-      "db_c2": "zk_test2",
-      "es_addr": "http://192.168.3.11:9800",
-      "es_size": 10,
-      "es_index": "zktest_v1",
-      "es_type": "zktest"
     }
 }

+ 2 - 20
udpcreateindex/src/main.go

@@ -17,7 +17,6 @@ var (
 	Sysconfig            map[string]interface{} //配置文件
 	mgo                  *mongodb.MongodbSim    //mongodb操作对象
 	extractmgo           *mongodb.MongodbSim    //mongodb操作对象
-	winnermgo			 *mongodb.MongodbSim	//163的winner
 	project2db           *mongodb.MongodbSim    //mongodb操作对象
 	mgostandard          *mongodb.MongodbSim    //mongodb操作对象
 	qyxydb               *mongodb.MongodbSim    //mongodb操作对象
@@ -36,8 +35,7 @@ var (
 	other_index      string
 	other_itype      string
 
-	winner_es		 *elastic.Elastic //winner_v1
-	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent,winner_extract map[string]interface{}
+	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
 )
 
 func init() {
@@ -56,7 +54,7 @@ func init() {
 	project2, _ = Sysconfig["project2"].(map[string]interface{})
 	qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
 	mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
-	winner_extract,_=Sysconfig["winnerextract"].(map[string]interface{})
+
 	mgo = &mongodb.MongodbSim{ //mongodb为binding连接
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
@@ -64,15 +62,6 @@ func init() {
 	}
 	mgo.InitPool()
 
-
-	winnermgo = &mongodb.MongodbSim{
-		MongodbAddr: winner_extract["db_addr"].(string),
-		Size:        util.IntAllDef(winner_extract["db_pool"], 5),
-		DbName:      winner_extract["db_name"].(string),
-	}
-	winnermgo.InitPool()
-
-
 	project2db = &mongodb.MongodbSim{
 		MongodbAddr: project2["addr"].(string),
 		Size:        util.IntAllDef(project2["pool"], 5),
@@ -125,13 +114,6 @@ func init() {
 		bidding_other_es.InitElasticSize()
 	}
 
-	//winner_es
-	winner_es = &elastic.Elastic{
-		S_esurl: winner_extract["es_addr"].(string),
-		I_size:  util.IntAllDef(winner_extract["es_size"], 5),
-	}
-	winner_es.InitElasticSize()
-
 	//
 	if bidding["indexfields"] != nil {
 		biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))

+ 0 - 2
udpcreateindex/src/projectindex.go

@@ -105,8 +105,6 @@ func projectTask(data []byte, project, mapInfo map[string]interface{}) {
 		delete(tmp, "package")
 		delete(tmp, "winnerorder")
 		delete(tmp, "infofield")
-		delete(tmp, "budgettag")
-		delete(tmp, "bidamounttag")
 		list := tmp["list"].([]interface{})
 		for _, m := range list {
 			tmpM := m.(map[string]interface{})

+ 1 - 5
udpcreateindex/src/task.go

@@ -15,9 +15,7 @@ func task_index() {
 	//c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
 	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 
-	c.AddFunc("0 0 0 * * ?", func() { task_winnerextract() })//每天凌晨执行一次生索引
-
-
+	c.AddFunc("0 0 0 * * ?", func() { task_winnerextract() }) //每天凌晨执行一次生索引
 	c.Start()
 }
 func task_winnerextract() {
@@ -25,8 +23,6 @@ func task_winnerextract() {
 	winnerEsTaskOnce()
 }
 
-
-
 //招标附件、标的物,临时用
 func task_biddingfile() {
 	defer qutil.Catch()

+ 58 - 40
udpcreateindex/src/winnerextract.go

@@ -1,50 +1,53 @@
 package main
 
 import (
-	"gopkg.in/mgo.v2/bson"
 	"log"
 	qu "qfw/util"
+	elastic "qfw/util/elastic"
 	"sync"
 	"time"
-)
 
+	"gopkg.in/mgo.v2/bson"
+)
 
-func winnerEsTaskOnce()  {
+func winnerEsTaskOnce() {
 	defer qu.Catch()
+	arrEs := []map[string]interface{}{}
+	winerEsLock := &sync.Mutex{}
+	pool := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+
 	now := time.Now()
 	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 	task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
-	log.Println("区间id:",task_sid,task_eid)
+	log.Println("区间id:", task_sid, task_eid)
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gte": qu.StringTOBsonId(task_sid),
-			"$lt": qu.StringTOBsonId(task_eid),
+			"$lt":  qu.StringTOBsonId(task_eid),
 		},
 	}
 	//参数
-	threadnum:=qu.IntAll(winner_extract["threadnum"])
-	db_c1:=qu.ObjToString(winner_extract["db_c1"])
-	db_c2:=qu.ObjToString(winner_extract["db_c2"])
-	es_index:=qu.ObjToString(winner_extract["es_index"])
-	es_type:=qu.ObjToString(winner_extract["es_type"])
-
+	winnerent, _ := standard["winnerent"].(map[string]interface{})
+	win_ent := qu.ObjToString(winnerent["collect1"])
+	win_enterr := qu.ObjToString(winnerent["collect2"])
+	index, _ := winnerent["index"].(string)
+	itype, _ := winnerent["type"].(string)
 	//mongo
-	sess := winnermgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := mgostandard.GetMgoConn()
+	defer mgostandard.DestoryMongoConn(sess)
 	//es
-	EsConn := winner_es.GetEsConn()
-	defer winner_es.DestoryEsConn(EsConn)
-
-	it_1 := sess.DB(winnermgo.DbName).C(db_c1).Find(&q).Sort("_id").Iter()
-	num_1:=0
-	pool := make(chan bool, threadnum)
-	wg := &sync.WaitGroup{}
-	for tmp := make(map[string]interface{}); it_1.Next(&tmp);num_1++{
-		if num_1%100 == 0 && num_1>0{
-			log.Println("当前表:",db_c1,"数量:",num_1)
+	// EsConn := winner_es.GetEsConn()
+	// defer winner_es.DestoryEsConn(EsConn)
+	log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_ent)
+	it_1 := sess.DB(mgostandard.DbName).C(win_ent).Find(&q).Sort("_id").Iter()
+	num_1 := 0
+	for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
+		if num_1%100 == 0 && num_1 > 0 {
+			log.Println("当前表:", win_ent, "数量:", num_1)
 		}
 		pool <- true
 		wg.Add(1)
@@ -53,25 +56,28 @@ func winnerEsTaskOnce()  {
 				<-pool
 				wg.Done()
 			}()
-			savetmp := make(map[string]interface{}, 0)
+			savetmp := map[string]interface{}{}
 			tmp_id := qu.BsonIdToSId(tmp["_id"])
 			savetmp["_id"] = tmp_id
 			savetmp["name"] = tmp["company_name"]
 			savetmp["pici"] = tmp["updatetime"]
-			if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+			winerEsLock.Lock()
+			arrEs = append(arrEs, savetmp)
+			if len(arrEs) >= BulkSize {
+				tmps := arrEs
+				elastic.BulkSave(index, itype, &tmps, true)
+				arrEs = []map[string]interface{}{}
 			}
+			winerEsLock.Unlock()
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
-
-
-
-	it_2 := sess.DB(winnermgo.DbName).C(db_c2).Find(&q).Sort("_id").Iter()
-	num_2:=0
-	for tmp := make(map[string]interface{}); it_2.Next(&tmp);num_2++{
-		if num_2%100 == 0 && num_2>0 {
-			log.Println("当前表:",db_c2,"数量:",num_1)
+	log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_enterr)
+	it_2 := sess.DB(mgostandard.DbName).C(win_enterr).Find(&q).Sort("_id").Iter()
+	num_2 := 0
+	for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
+		if num_2%100 == 0 && num_2 > 0 {
+			log.Println("当前表:", win_enterr, "数量:", num_1)
 		}
 		pool <- true
 		wg.Add(1)
@@ -80,18 +86,30 @@ func winnerEsTaskOnce()  {
 				<-pool
 				wg.Done()
 			}()
-			savetmp := make(map[string]interface{}, 0)
+			savetmp := map[string]interface{}{}
 			tmp_id := qu.BsonIdToSId(tmp["_id"])
 			savetmp["_id"] = tmp_id
 			savetmp["name"] = tmp["name"]
 			savetmp["pici"] = tmp["updatetime"]
-			if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+			winerEsLock.Lock()
+			arrEs = append(arrEs, savetmp)
+			if len(arrEs) >= BulkSize {
+				tmps := arrEs
+				elastic.BulkSave(index, itype, &tmps, true)
+				arrEs = []map[string]interface{}{}
 			}
+			winerEsLock.Unlock()
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
 
-	log.Println("总计:",num_1+num_2)
-
-}
+	wg.Wait()
+	winerEsLock.Lock()
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		elastic.BulkSave(index, itype, &tmps, true)
+		arrEs = []map[string]interface{}{}
+	}
+	winerEsLock.Unlock()
+	log.Println("总计:", num_1+num_2)
+}