Ver Fonte

新增定时buyer索引任务

maxiaoshan há 4 anos atrás
pai
commit
65bb5f55ed

+ 143 - 0
udpcreateindex/src/buyertask.go

@@ -0,0 +1,143 @@
+package main
+
+import (
+	"log"
+	qu "qfw/util"
+	elastic "qfw/util/elastic"
+	"sync"
+	"time"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone"}
+
+func buyerEsTaskOnce() {
+	defer qu.Catch()
+	arrEs := []map[string]interface{}{}
+	buyerEsLock := &sync.Mutex{}
+	pool := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+
+	now := time.Now()
+	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
+	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
+	task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
+	task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
+	log.Println("buyer 区间id:", task_sid, task_eid)
+	//区间id
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": qu.StringTOBsonId(task_sid),
+			"$lt":  qu.StringTOBsonId(task_eid),
+		},
+	}
+	//参数
+	buyerent, _ := standard["buyerent"].(map[string]interface{})
+	buyer_ent := qu.ObjToString(buyerent["collect1"])
+	buyer_enterr := qu.ObjToString(buyerent["collect2"])
+	index, _ := buyerent["index"].(string)
+	itype, _ := buyerent["type"].(string)
+	//mongo
+	sess := mgostandard.GetMgoConn()
+	defer mgostandard.DestoryMongoConn(sess)
+
+	log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_ent)
+	it_1 := sess.DB(mgostandard.DbName).C(buyer_ent).Find(&q).Select(map[string]interface{}{
+		"buyer_name":        1,
+		"institute_type":    1,
+		"buyerclass":        1,
+		"fixedphone":        1,
+		"mobilephone":       1,
+		"latestfixedphone":  1,
+		"latestmobilephone": 1,
+	}).Sort("_id").Iter()
+	num_1 := 0
+	for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
+		if num_1%100 == 0 && num_1 > 0 {
+			log.Println("当前表:", buyer_ent, "数量:", num_1)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			savetmp := map[string]interface{}{}
+			_id := qu.BsonIdToSId(tmp["_id"])
+			if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 {
+				for _, v := range qu.ObjArrToStringArr(buyerclass) {
+					if len(buyerclass) >= 2 && v != "其它" {
+						savetmp["buyerclass"] = v
+						break
+					} else if len(buyerclass) == 1 {
+						savetmp["buyerclass"] = v
+						break
+					}
+				}
+			}
+			savetmp["_id"] = _id
+			savetmp["name"] = tmp["buyer_name"]
+			savetmp["buyer_name"] = tmp["buyer_name"]
+			for _, f := range fieldArr {
+				if val := qu.ObjToString(tmp[f]); val != "" {
+					savetmp[f] = val
+				}
+			}
+			buyerEsLock.Lock()
+			arrEs = append(arrEs, savetmp)
+			if len(arrEs) >= BulkSize {
+				tmps := arrEs
+				elastic.BulkSave(index, itype, &tmps, true)
+				arrEs = []map[string]interface{}{}
+			}
+			buyerEsLock.Unlock()
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+
+	log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_enterr)
+	it_2 := sess.DB(mgostandard.DbName).C(buyer_enterr).Find(&q).Sort("_id").Iter()
+	num_2 := 0
+	for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
+		if num_2%100 == 0 && num_2 > 0 {
+			log.Println("当前表:", buyer_enterr, "数量:", num_2)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			savetmp := map[string]interface{}{}
+			tmp_id := qu.BsonIdToSId(tmp["_id"])
+			savetmp["_id"] = tmp_id
+			savetmp["name"] = tmp["name"]
+			savetmp["buyer_name"] = tmp["name"]
+			if tmp["buyerclass"] != nil {
+				savetmp["buyerclass"] = tmp["buyerclass"]
+			}
+			buyerEsLock.Lock()
+			arrEs = append(arrEs, savetmp)
+			if len(arrEs) >= BulkSize {
+				tmps := arrEs
+				elastic.BulkSave(index, itype, &tmps, true)
+				arrEs = []map[string]interface{}{}
+			}
+			buyerEsLock.Unlock()
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+
+	wg.Wait()
+	buyerEsLock.Lock()
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		elastic.BulkSave(index, itype, &tmps, true)
+		arrEs = []map[string]interface{}{}
+	}
+	buyerEsLock.Unlock()
+	log.Println("buyeres  索引完毕!  总计:", num_1+num_2)
+}

+ 5 - 4
udpcreateindex/src/config.json

@@ -88,9 +88,10 @@
         	"type": "winner"
 		},
         "buyerent":{
-			"collect": "buyer_enterprise",
-        	"index": "buyerent_v1",
-        	"type": "buyerent"
+			"collect1": "buyer_enterprise",
+			"collect2": "buyer_err",
+        	"index": "buyer_v2",
+        	"type": "buyer"
 		},
  		"agencyent":{
 			"collect": "agency_enterprise",
@@ -99,7 +100,7 @@
 		}
     },
     "elastic": {
-        "addr": "http://192.168.3.128:9800",
+        "addr": "http://192.168.3.11:9800",
         "pool": 12
     }
 }

+ 8 - 3
udpcreateindex/src/task.go

@@ -15,13 +15,18 @@ func task_index() {
 	//c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
 	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 
-	c.AddFunc("0 0 0 * * ?", func() { task_winnerextract() }) //每天凌晨执行一次生索引
+	c.AddFunc("0 0 0 * * ?", func() { task_winneres() }) //每天凌晨执行一次winner生索引
+	c.AddFunc("0 0 1 * * ?", func() { task_buyeres() })  //每天1点执行一次buyer生索引
 	c.Start()
 }
-func task_winnerextract() {
-	log.Println("开始执行一次定时任务,winnerextract")
+func task_winneres() {
+	log.Println("定时任务,winneres")
 	winnerEsTaskOnce()
 }
+func task_buyeres() {
+	log.Println("定时任务,buyeres")
+	buyerEsTaskOnce()
+}
 
 //招标附件、标的物,临时用
 func task_biddingfile() {

+ 1 - 1
udpcreateindex/src/winnerextract.go → udpcreateindex/src/winnertask.go

@@ -22,7 +22,7 @@ func winnerEsTaskOnce() {
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 	task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
-	log.Println("区间id:", task_sid, task_eid)
+	log.Println("winner 区间id:", task_sid, task_eid)
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{