Răsfoiți Sursa

企业信用index

maxiaoshan 5 ani în urmă
părinte
comite
aab739c3e1

+ 1 - 1
udpcreateindex/src/bidingpurchasing.go

@@ -14,7 +14,7 @@ import (
 //定时查询bidding中extract_state为2的数据生成索引
 func biddingPurchaingTask(q map[string]interface{}) {
 	defer util.Catch()
-	//线程池
+	//
 	SaveUpdageLock := sync.Mutex{}
 	//连接参数
 	c, _ := bidding["collect"].(string)   //bidding表

+ 8 - 0
udpcreateindex/src/config.json

@@ -58,6 +58,14 @@
         "index": "project_v2",
         "type": "project"
     },
+    "qyxy_ent": {
+		"addr": "172.17.145.163:27082",
+		"pool": 5,
+        "db": "ent2020",
+        "collect": "qyxy_ent",
+        "index": "qyxy",
+        "type": "qyxy"
+    },
     "standard": {
  		"addr": "192.168.3.207:27092",
         "size": 10,

+ 13 - 2
udpcreateindex/src/main.go

@@ -20,6 +20,7 @@ var (
 	extractmgo         *mongodb.MongodbSim    //mongodb操作对象
 	project2db         *mongodb.MongodbSim    //mongodb操作对象
 	mgostandard        *mongodb.MongodbSim    //mongodb操作对象
+	qyxydb             *mongodb.MongodbSim    //mongodb操作对象
 	udpclient          mu.UdpClient           //udp对象
 	updport            string
 	savesizei          = 500
@@ -28,7 +29,7 @@ var (
 	multiIndex         []string
 	BulkSize           = 400
 
-	winner, bidding, biddingback, project, project2, buyer, standard map[string]interface{}
+	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
 )
 
 func init() {
@@ -43,6 +44,7 @@ func init() {
 	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
 	project, _ = Sysconfig["project"].(map[string]interface{})
 	project2, _ = Sysconfig["project2"].(map[string]interface{})
+	qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
 	mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
 	mgo = &mongodb.MongodbSim{ //mongodb为binding连接
 		MongodbAddr: mconf["addr"].(string),
@@ -50,12 +52,22 @@ func init() {
 		DbName:      mconf["db"].(string),
 	}
 	mgo.InitPool()
+
 	project2db = &mongodb.MongodbSim{
 		MongodbAddr: project2["addr"].(string),
 		Size:        util.IntAllDef(project2["pool"], 5),
 		DbName:      project2["db"].(string),
 	}
 	project2db.InitPool()
+
+	//企业信用
+	qyxydb = &mongodb.MongodbSim{
+		MongodbAddr: qyxy_ent["addr"].(string),
+		Size:        util.IntAllDef(qyxy_ent["pool"], 5),
+		DbName:      qyxy_ent["db"].(string),
+	}
+	qyxydb.InitPool()
+
 	savedb, _ := Sysconfig["savedb"].(map[string]interface{})
 	if savedb == nil {
 		log.Println("未设置保存数据库,默认使用招标库")
@@ -102,7 +114,6 @@ func init() {
 }
 
 func main() {
-	//go task_biddingfile()
 	go task_index()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}

+ 52 - 0
udpcreateindex/src/qyxyindex.go

@@ -0,0 +1,52 @@
+package main
+
+import (
+	"log"
+	"qfw/util"
+	elastic "qfw/util/elastic"
+)
+
+func qyxyTask(q map[string]interface{}) {
+	defer util.Catch()
+	//连接
+	session := qyxydb.GetMgoConn(86400)
+	defer qyxydb.DestoryMongoConn(session)
+	//
+	c, _ := qyxy_ent["collect"].(string)
+	db, _ := qyxy_ent["db"].(string)
+	index, _ := qyxy_ent["index"].(string)
+	itype, _ := qyxy_ent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println("企业信用索引	查询语句:", q, "同步总数:", count, "elastic库:", index)
+	query := session.DB(db).C(c).Find(q).Iter()
+
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create qyxy index...over", n)
+}

+ 8 - 0
udpcreateindex/src/task.go

@@ -13,6 +13,7 @@ func task_index() {
 	c := cron.New()
 	c.AddFunc("20 30 5 * * *", func() { task_projects() })
 	c.AddFunc("0 15 * * * *", func() { task_biddingfile() }) //每两小时执行一次
+	c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 	c.Start()
 }
 
@@ -42,3 +43,10 @@ func task_projects() {
 	}
 	projectTask([]byte{}, project2, mapInfo)
 }
+
+//企业信用信息
+func task_qyxyindex() {
+	defer qutil.Catch()
+	q := map[string]interface{}{}
+	qyxyTask(q)
+}