소스 검색

索引修改、udp扩充

zhangjinkun 5 년 전
부모
커밋
9672e8200a
4개의 변경된 파일304개의 추가작업 그리고 67개의 파일을 삭제
  1. 20 0
      udpcreateindex/src/config.json
  2. 45 12
      udpcreateindex/src/main.go
  3. 178 0
      udpcreateindex/src/standardata.go
  4. 61 55
      udps/main.go

+ 20 - 0
udpcreateindex/src/config.json

@@ -47,6 +47,26 @@
         "index": "projectset_v1",
         "type": "projectset"
     },
+    "standard": {
+ 		"addr": "172.17.145.163:27082",
+        "size": 10,
+        "db": "qfw",
+    	"winnerent":{
+			"collect": "winner_enterprise",
+        	"index": "winnerent_v1",
+        	"type": "winnerent"
+		},
+        "buyerent":{
+			"collect": "buyer_enterprise",
+        	"index": "buyerent_v1",
+        	"type": "buyerent"
+		},
+ 		"agencyent":{
+			"collect": "agency_enterprise",
+       	 	"index": "agencyent_v1",
+       		"type": "agencyent"
+		}
+    },
     "mongodb": {
         "addr": "10.172.242.243:27080,10.30.94.175:27081,10.81.232.246:27082",
         "pool": 10,

+ 45 - 12
udpcreateindex/src/main.go

@@ -13,17 +13,19 @@ import (
 )
 
 var (
-	Sysconfig                                                      map[string]interface{} //配置文件
-	mgo                                                            *mongodb.MongodbSim    //mongodb操作对象
-	extractmgo                                                     *mongodb.MongodbSim    //mongodb操作对象
-	udpclient                                                      mu.UdpClient           //udp对象
-	updport                                                        string
-	winner, winnerenterprise, bidding, biddingback, project, buyer map[string]interface{}
-	savesizei                                                      = 500
-	biddingIndexFields                                             = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
-	projectinfoFields                                              []string
-	multiIndex                                                     []string
-	BulkSize                                                       = 400
+	Sysconfig          map[string]interface{} //配置文件
+	mgo                *mongodb.MongodbSim    //mongodb操作对象
+	extractmgo         *mongodb.MongodbSim    //mongodb操作对象
+	mgostandard        *mongodb.MongodbSim    //mongodb操作对象
+	udpclient          mu.UdpClient           //udp对象
+	updport            string
+	savesizei          = 500
+	biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
+	projectinfoFields  []string
+	multiIndex         []string
+	BulkSize           = 400
+
+	winner, bidding, biddingback, project, buyer, standard map[string]interface{}
 )
 
 func init() {
@@ -32,7 +34,7 @@ func init() {
 	go checkMapJob()
 	updport, _ = Sysconfig["updport"].(string)
 	winner, _ = Sysconfig["winner"].(map[string]interface{})
-	winnerenterprise, _ = Sysconfig["winnerenterprise"].(map[string]interface{})
+	standard, _ = Sysconfig["standard"].(map[string]interface{})
 	buyer, _ = Sysconfig["buyer"].(map[string]interface{})
 	bidding, _ = Sysconfig["bidding"].(map[string]interface{})
 	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
@@ -60,6 +62,13 @@ func init() {
 		}
 		extractmgo.InitPool()
 	}
+	mgostandard = &mongodb.MongodbSim{
+		MongodbAddr: standard["addr"].(string),
+		Size:        util.IntAllDef(standard["pool"], 5),
+		DbName:      standard["db"].(string),
+	}
+	mgostandard.InitPool()
+	log.Println(standard["addr"].(string))
 
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
@@ -173,6 +182,30 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					buyerTask(data, mapInfo)
 				}()
+			case "winnerent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("winnerent", mapInfo)
+				}()
+			case "buyerent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("buyerent", mapInfo)
+				}()
+			case "agencyent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("agencyent", mapInfo)
+				}()
 			default:
 				pool <- true
 				go func() {

+ 178 - 0
udpcreateindex/src/standardata.go

@@ -0,0 +1,178 @@
+package main
+
+import (
+	"log"
+	"qfw/util"
+	elastic "qfw/util/elastic"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+func standardTask(stype string, mapInfo map[string]interface{}) {
+	defer util.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": bson.M{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+	switch stype {
+	case "winnerent":
+		winnerEnt(q)
+	case "buyerent":
+		buyerEnt(q)
+	case "agencyent":
+		agencyEnt(q)
+	}
+}
+
+//winnerent
+func winnerEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	winnerent, _ := standard["winnerent"].(map[string]interface{})
+	c, _ := winnerent["collect"].(string)
+	index, _ := winnerent["index"].(string)
+	itype, _ := winnerent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+		tmp["company"] = tmp["company_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create winnerent index...over", n)
+}
+
+//buyerent
+func buyerEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	buyerent, _ := standard["buyerent"].(map[string]interface{})
+	c, _ := buyerent["collect"].(string)
+	index, _ := buyerent["index"].(string)
+	itype, _ := buyerent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+		tmp["buyer"] = tmp["buyer_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create buyerent index...over", n)
+}
+
+//agencyent
+func agencyEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	agencyent, _ := standard["agencyent"].(map[string]interface{})
+	c, _ := agencyent["collect"].(string)
+	index, _ := agencyent["index"].(string)
+	itype, _ := agencyent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+
+		tmp["agency"] = tmp["agency_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create agencyent index...over", n)
+}

+ 61 - 55
udps/main.go

@@ -6,72 +6,78 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
-	qu "qfw/util"
+	"os"
+	qutil "qfw/util"
+	"qfw/util/mongodb"
 	"time"
 
 	"gopkg.in/mgo.v2/bson"
 )
 
-var udpclient mu.UdpClient //udp对象
-var nextNodes []map[string]interface{}
-
-var startDate, endDate, ip, port, stype, sid, eid string
+var startDate, endDate string
 
 func main() {
-	//2015-11-03,2017-04-01
-	//2017-04-01,2017-06-01
-	//2017-06-01,2018-06-01
-	//2018-06-01,2019-02-20
-	/*
-ObjectId("5da3f31aa5cb26b9b798d3aa")
-ObjectId("5da418c4a5cb26b9b7e3e9a6")
-*/
-
-	flag.StringVar(&sid, "sid", "", "开始id")
-	flag.StringVar(&eid, "eid", "", "结束id")
+	ip, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param := "", 0, 0, "", "", "", "", "", "", ""
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
-	flag.StringVar(&endDate, "end", "2019-11-10", "结束日期2006-01-02")
+	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
-	flag.StringVar(&port, "port", "1488", "dup端口")
-	flag.StringVar(&stype, "stype", "", "stype")
+	flag.IntVar(&p, "p", 0, "端口")
+	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
+	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
+	flag.StringVar(&id1, "gtid", "", "gtid")
+	flag.StringVar(&id2, "lteid", "", "lteid")
+	flag.StringVar(&stype, "stype", "", "stype,传递类型")
+	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")
+	flag.StringVar(&q, "q", "", "q查询语句\"{'':''}\",有q就不要gtid,lteid")
+	flag.StringVar(&param, "param", "", "param,生信息发布或其他索引时用双引号套单引号\"{'mgoaddr':'','d':'','c':'','index':'','type':''}\"")
 	flag.Parse()
-	var startid, endid bson.ObjectId
-	if sid != "" && eid != "" {
-		startid = qu.StringTOBsonId(sid)
-		endid = qu.StringTOBsonId(eid)
-	} else {
-		start, _ := time.ParseInLocation(qu.Date_Short_Layout, startDate, time.Local)
-		end, _ := time.ParseInLocation(qu.Date_Short_Layout, endDate, time.Local)
-		startid = bson.NewObjectIdWithTime(start)
-		endid = bson.NewObjectIdWithTime(end)
+	if startDate != "" || endDate != "" {
+		start, _ := time.ParseInLocation(qutil.Date_Short_Layout, startDate, time.Local)
+		end, _ := time.ParseInLocation(qutil.Date_Short_Layout, endDate, time.Local)
+		id1 = qutil.BsonIdToSId(bson.NewObjectIdWithTime(start))
+		id2 = qutil.BsonIdToSId(bson.NewObjectIdWithTime(end))
+		log.Println(id1, id2)
 	}
-	log.Println(startid, endid, ip, port, stype)
-	udpclient = mu.UdpClient{Local: ":1470", BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	by, _ := json.Marshal(map[string]interface{}{
-		"gtid":  startid,
-		"lteid": endid,
-		"stype": stype,
-	})
-	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-		IP:   net.ParseIP(ip),
-		Port: qu.IntAll(port),
-	})
-	b := make(chan bool, 1)
-	<-b
-}
-
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	switch act {
-	case mu.OP_TYPE_DATA:
-		var mapInfo map[string]interface{}
-		err := json.Unmarshal(data, &mapInfo)
-		if err != nil {
-			log.Println(err)
-		} else {
-			log.Println(mapInfo)
+	if ip != "" && p > 0 && ((id1 != "" && id2 != "") || (q != "" || tmptime > 0)) {
+		toadd := &net.UDPAddr{
+			IP:   net.ParseIP(ip),
+			Port: p,
+		}
+		udp := mu.UdpClient{Local: ":50010", BufSize: 1024}
+		udp.Listen(func(b byte, data []byte, add *net.UDPAddr) {
+			switch b {
+			case mu.OP_NOOP:
+				log.Println(string(data))
+				os.Exit(0)
+			}
+		})
+		m1 := map[string]interface{}{
+			"gtid":  id1,
+			"lteid": id2,
+			"stype": stype,
 		}
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("发送成功", string(data))
+		if bkey != "" {
+			m1["bkey"] = bkey
+		}
+		if q != "" {
+			m1["query"] = mongodb.ObjToMQ(q, true) //qutil.ObjToMap(q)
+		}
+		if tmptime > 0 && tmpkey != "" {
+			m1["query"] = map[string]interface{}{tmpkey: map[string]interface{}{"$gte": tmptime}}
+		}
+		if param != "" {
+			pm := qutil.ObjToMap(param)
+			for k, v := range *pm {
+				m1[k] = v
+			}
+		}
+
+		by, _ := json.Marshal(m1)
+		log.Println(string(by))
+		udp.WriteUdp(by, mu.OP_TYPE_DATA, toadd)
+		time.Sleep(30 * time.Second)
+	} else {
+		flag.PrintDefaults()
+		log.Println("参数错误.")
 	}
 }