maxiaoshan 5 years ago
parent
commit
ff4016a767

+ 2 - 2
udpcreateindex/src/biddingall.go

@@ -83,7 +83,7 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 				if cid == tid {
 					bnil = false
 					//更新bidding表,生成索引
-					for _, k := range fields {
+					for _, k := range fields { //fields更新到mongo的字段
 						v1 := compare[k]
 						v2 := tmp[k]
 						if v2 == nil && v1 != nil {
@@ -206,7 +206,7 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 			UpdatesLock.Lock()
 			if qutil.IntAll(update["extracttype"]) != -1 {
 				newTmp := map[string]interface{}{}
-				for _, v := range biddingIndexFields {
+				for _, v := range biddingIndexFields { //
 					if tmp[v] != nil {
 						if "projectinfo" == v {
 							mp, _ := tmp[v].(map[string]interface{})

+ 5 - 5
udpcreateindex/src/biddingindex.go

@@ -96,7 +96,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	log.Println("开始迭代..")
 	for n, tmp := range infos {
 		n1++
-		update := map[string]interface{}{}
+		update := map[string]interface{}{} //要更新的mongo数据
 		//对比方法----------------
 		tid := qutil.BsonIdToSId(tmp["_id"])
 		if eMap[tid] != nil {
@@ -104,8 +104,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			delete(eMap, tid)
 			//更新bidding表,生成索引
 			for _, k := range fields {
-				v1 := compare[k]
-				v2 := tmp[k]
+				v1 := compare[k] //extract
+				v2 := tmp[k]     //bidding
 				if v2 == nil && v1 != nil {
 					update[k] = v1
 				} else if v2 != nil && v1 != nil {
@@ -206,8 +206,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		//		}
 		go IS.Add("bidding")
 		if qutil.IntAll(update["extracttype"]) != -1 {
-			newTmp := map[string]interface{}{}
-			for _, v := range biddingIndexFields {
+			newTmp := map[string]interface{}{}     //最终生索引的数据
+			for _, v := range biddingIndexFields { //索引字段
 				//			if tmp[v] != nil {
 				//				newTmp[v] = tmp[v]
 				//			}

+ 13 - 36
udpcreateindex/src/config.json

@@ -1,8 +1,8 @@
 {
     "udpport": ":1483",
-    "msg_server": "123.56.236.148:7070",
+    "msg_server": "192.168.3.207:27092",
 	"savedb": {
-        "addr": "172.17.145.163:27080",
+        "addr": "192.168.3.207:27092",
         "size": 6,
         "db": "extract_v3"
     },
@@ -16,6 +16,14 @@
         "index": "winner",
         "type": "winner"
     },
+	"winnerenterprise":{
+		"addr":"172.17.145.163:27082",
+		"db":"extract_v3",
+		"collect":"winner_enterprise",
+		"size":6,
+		"index":"winner2",
+		"type":"winner_enterprise"
+	},
     "buyer": {
         "db": "qfw",
         "collect": "buyer",
@@ -36,40 +44,9 @@
         "extractdb": "extract_v3",
         "extractcollect": "result_v3",
         "indexfields": [
-            "_id",
-            "s_winner",
-            "winner",
-            "buyerclass",
-            "title",
-            "detail",
-            "area",
-            "site",
-            "bidopendate",
-            "bidopentime",
-            "buyer",
-            "city",
-            "comeintime",
-            "href",
-            "infoformat",
-            "projectcode",
-            "projectname",
-            "publishtime",
-            "s_sha",
-            "spidercode",
-            "subtype",
-            "toptype",
-            "agency",
-            "budget",
-            "bidamount",
-            "s_subscopeclass",
-            "projectscope",
-            "bidstatus",
-            "projectinfo",
-            "buyertel",
-            "buyerperson",
-            "projectid"
+            "_id","district","topscopeclass","s_winner","winner","buyerclass","title","detail","area","site","bidopendate","bidopentime","buyer","city","comeintime","href","infoformat","projectcode","projectname","publishtime","s_sha","spidercode","subtype","toptype","agency","budget","bidamount","s_subscopeclass","projectscope","bidstatus","projectinfo","buyertel","buyerperson","buyeraddr","buyerzipcode","winnertel","winnerperson","projectid"
         ],
-        "fields": "buyerclass,projectname,projectcode,bidamount,budget,agency,amount,winner,buyer,bidopendate,bidopentime,bidstatus,projectscope,buyertel,buyerperson,city,area,district,topscopeclass",
+        "fields": "buyerclass,projectname,projectcode,bidamount,budget,agency,amount,winner,buyer,bidopendate,bidopentime,bidstatus,projectscope,buyertel,buyerperson,buyeraddr,buyerzipcode,city,area,district,topscopeclass,winnertel,winnerperson",
         "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,industry",
         "multiIndex": ""
     },
@@ -80,7 +57,7 @@
         "type": "projectset"
     },
     "mongodb": {
-        "addr": "10.172.242.243:27080",
+        "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "qfw"
     },

+ 27 - 11
udpcreateindex/src/main.go

@@ -13,17 +13,18 @@ import (
 )
 
 var (
-	Sysconfig                                    map[string]interface{} //配置文件
-	mgo                                          *mongodb.MongodbSim    //mongodb操作对象
-	extractmgo                                   *mongodb.MongodbSim    //mongodb操作对象
-	udpclient                                    mu.UdpClient           //udp对象
-	updport                                      string
-	winner, bidding, biddingback, project, buyer map[string]interface{}
-	savesizei                                    = 500
-	biddingIndexFields                           = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
-	projectinfoFields                            []string
-	multiIndex                                   []string
-	BulkSize                                     = 400
+	Sysconfig                                                      map[string]interface{} //配置文件
+	mgo                                                            *mongodb.MongodbSim    //mongodb操作对象
+	extractmgo                                                     *mongodb.MongodbSim    //mongodb操作对象
+	winnerentermgo                                                 *mongodb.MongodbSim    //mongodb操作对象
+	udpclient                                                      mu.UdpClient           //udp对象
+	updport                                                        string
+	winner, winnerenterprise, bidding, biddingback, project, buyer map[string]interface{}
+	savesizei                                                      = 500
+	biddingIndexFields                                             = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
+	projectinfoFields                                              []string
+	multiIndex                                                     []string
+	BulkSize                                                       = 400
 )
 
 func init() {
@@ -32,6 +33,7 @@ func init() {
 	go checkMapJob()
 	updport, _ = Sysconfig["updport"].(string)
 	winner, _ = Sysconfig["winner"].(map[string]interface{})
+	winnerenterprise, _ = Sysconfig["winnerenterprise"].(map[string]interface{})
 	buyer, _ = Sysconfig["buyer"].(map[string]interface{})
 	bidding, _ = Sysconfig["bidding"].(map[string]interface{})
 	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
@@ -60,6 +62,12 @@ func init() {
 		extractmgo.InitPool()
 	}
 
+	winnerentermgo = &mongodb.MongodbSim{
+		MongodbAddr: winnerenterprise["addr"].(string),
+		Size:        util.IntAllDef(winnerenterprise["size"], 5),
+		DbName:      winnerenterprise["db"].(string),
+	}
+	winnerentermgo.InitPool()
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
 	if bidding["indexfields"] != nil {
@@ -116,6 +124,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					winnerTask(data, mapInfo)
 				}()
+			case "winner_enterprise":
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					winnerEnterPriseTask(data, mapInfo)
+				}()
 			case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
 				pool <- true
 				go func() {

+ 74 - 0
udpcreateindex/src/winnerenterpriseindex.go

@@ -0,0 +1,74 @@
+package main
+
+import (
+	"log"
+	"qfw/util"
+	elastic "qfw/util/elastic"
+	"sync"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+// winnerEnterPriseTask reads documents from the configured winner_enterprise
+// MongoDB collection and bulk-saves them into the configured Elasticsearch
+// index/type.
+//
+// data is not read by this function. mapInfo selects the documents: when
+// mapInfo["query"] is a map it is used as the Mongo filter as-is; otherwise
+// mapInfo["gtid"] and mapInfo["lteid"] must be strings and are turned into an
+// _id range filter ($gt gtid, $lte lteid). A missing or non-string
+// gtid/lteid makes the type assertion panic.
+//
+// The function returns only after every asynchronous bulk save it started
+// has finished, so the final "over" log line means the batches were handed
+// to elastic.BulkSave and those calls returned.
+func winnerEnterPriseTask(data []byte, mapInfo map[string]interface{}) {
+	// NOTE(review): presumably util.Catch recovers panics (e.g. from the
+	// unchecked gtid/lteid assertions below) — confirm.
+	defer util.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": bson.M{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+	log.Println("++++++++++++++++++++++")
+	session := winnerentermgo.GetMgoConn(1800)
+	defer winnerentermgo.DestoryMongoConn(session)
+	c, _ := winnerenterprise["collect"].(string)
+	db, _ := winnerenterprise["db"].(string)
+	index, _ := winnerenterprise["index"].(string)
+	itype, _ := winnerenterprise["type"].(string)
+	log.Println("index===", index, "itype===", itype)
+	// The count is only used for the log line below, so a failure is reported
+	// but does not abort the sync.
+	count, err := session.DB(db).C(c).Find(q).Count()
+	if err != nil {
+		log.Println("winner_enterprise count error:", err)
+	}
+	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
+	// The alias and tmp_id fields are excluded from what gets indexed.
+	iter := session.DB(db).C(c).Find(q).Select(bson.M{"alias": 0, "tmp_id": 0}).Iter()
+
+	// NOTE(review): this writes a {"test": "test"} document into the index on
+	// every call. It looks like a debugging leftover; kept because removing it
+	// changes what ends up in the index — confirm and delete.
+	probe := []map[string]interface{}{
+		{"test": "test"},
+	}
+	elastic.BulkSave(index, itype, &probe, true)
+
+	savepool := make(chan bool, 10) // bounds the number of concurrent bulk saves
+	var saving sync.WaitGroup       // tracks bulk saves still in flight
+	// batch is only touched by this goroutine; a full batch is handed to the
+	// saver goroutine and replaced by a fresh slice, so no lock is needed.
+	batch := []map[string]interface{}{}
+	var n int
+	for doc := make(map[string]interface{}); iter.Next(doc); n++ {
+		// NOTE(review): logs every document in full; probably debug output.
+		log.Println("tmp=========", doc)
+		batch = append(batch, doc)
+		if len(batch) > savesizei {
+			savepool <- true
+			saving.Add(1)
+			go func(tmpn []map[string]interface{}) {
+				defer func() {
+					<-savepool
+					saving.Done()
+				}()
+				elastic.BulkSave(index, itype, &tmpn, true)
+			}(batch)
+			batch = []map[string]interface{}{}
+		}
+		if n%1000 == 0 {
+			log.Println("current:", n, util.BsonIdToSId(doc["_id"]))
+		}
+		// Next decodes into the map it is given, so every document needs its
+		// own map; otherwise queued batch entries would be overwritten.
+		doc = make(map[string]interface{})
+	}
+	// Close releases the cursor and reports any error that ended iteration
+	// early; without this a failed cursor looks like a normal end of data.
+	if err := iter.Close(); err != nil {
+		log.Println("winner_enterprise iterate error:", err)
+	}
+	if len(batch) > 0 {
+		elastic.BulkSave(index, itype, &batch, true)
+	}
+	// Wait for the asynchronous saves so "over" is not logged (and the caller's
+	// worker slot is not released) while batches are still being written.
+	saving.Wait()
+	log.Println(mapInfo, "create winner_enterprise index...over", n)
+}