소스 검색

winner索引修改

maxiaoshan 4 년 전
부모
커밋
d3f4af8b08
3개의 변경된 파일49개의 추가작업 그리고 36개의 파일을 삭제
  1. 2 2
      udpcreateindex/src/buyertask.go
  2. 3 3
      udpcreateindex/src/config.json
  3. 44 31
      udpcreateindex/src/winnertask.go

+ 2 - 2
udpcreateindex/src/buyertask.go

@@ -25,8 +25,8 @@ func buyerEsTaskOnce() {
 	task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
 	log.Println("buyer 区间id:", task_sid, task_eid)
-	task_sid = "5e6611b7aec95406dccf7151"
-	task_eid = "5f7249164bdc0447a6c90fa5"
+	// task_sid = "5e6611b7aec95406dccf7151"
+	// task_eid = "5f7249164bdc0447a6c90fa5"
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{

+ 3 - 3
udpcreateindex/src/config.json

@@ -80,11 +80,11 @@
     "standard": {
  		"addr": "192.168.3.207:27092",
         "pool": 10,
-        "db": "mxs",
+        "db": "mixdata",
     	"winnerent":{
 			"collect1": "winner_enterprise",
 			"collect2": "winner_err",
-        	"index": "winner",
+        	"index": "winner_v3",
         	"type": "winner"
 		},
         "buyerent":{
@@ -100,7 +100,7 @@
 		}
     },
     "elastic": {
-        "addr": "http://192.168.3.11:9800",
+        "addr": "http://192.168.3.128:9800",
         "index": "bidding",
         "itype": "bidding",
         "pool": 12

+ 44 - 31
udpcreateindex/src/winnertask.go

@@ -22,6 +22,8 @@ func winnerEsTaskOnce() {
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 	task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
+	// task_sid = "5e6598f82c27dc56292158da"
+	// task_eid = "5f80c8f89a0c261af872294c"
 	log.Println("winner 区间id:", task_sid, task_eid)
 	//区间id
 	q := map[string]interface{}{
@@ -33,7 +35,7 @@ func winnerEsTaskOnce() {
 	//参数
 	winnerent, _ := standard["winnerent"].(map[string]interface{})
 	win_ent := qu.ObjToString(winnerent["collect1"])
-	win_enterr := qu.ObjToString(winnerent["collect2"])
+	//win_enterr := qu.ObjToString(winnerent["collect2"])
 	index, _ := winnerent["index"].(string)
 	itype, _ := winnerent["type"].(string)
 	//mongo
@@ -58,38 +60,17 @@ func winnerEsTaskOnce() {
 			tmp_id := qu.BsonIdToSId(tmp["_id"])
 			savetmp["_id"] = tmp_id
 			savetmp["name"] = tmp["company_name"]
+			savetmp["winner_name"] = tmp["company_name"]
 			savetmp["pici"] = tmp["updatetime"]
-			winerEsLock.Lock()
-			arrEs = append(arrEs, savetmp)
-			if len(arrEs) >= BulkSize {
-				tmps := arrEs
-				elastic.BulkSave(index, itype, &tmps, true)
-				arrEs = []map[string]interface{}{}
+			if province := qu.ObjToString(tmp["province"]); province != "" {
+				savetmp["province"] = province
+
+			}
+			if city := qu.ObjToString(tmp["city"]); city != "" {
+				savetmp["city"] = city
+
 			}
-			winerEsLock.Unlock()
-		}(tmp)
-		tmp = make(map[string]interface{})
-	}
 
-	log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_enterr)
-	it_2 := sess.DB(mgostandard.DbName).C(win_enterr).Find(&q).Sort("_id").Iter()
-	num_2 := 0
-	for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
-		if num_2%100 == 0 && num_2 > 0 {
-			log.Println("当前表:", win_enterr, "数量:", num_2)
-		}
-		pool <- true
-		wg.Add(1)
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-			savetmp := map[string]interface{}{}
-			tmp_id := qu.BsonIdToSId(tmp["_id"])
-			savetmp["_id"] = tmp_id
-			savetmp["name"] = tmp["name"]
-			savetmp["pici"] = tmp["updatetime"]
 			winerEsLock.Lock()
 			arrEs = append(arrEs, savetmp)
 			if len(arrEs) >= BulkSize {
@@ -102,6 +83,38 @@ func winnerEsTaskOnce() {
 		tmp = make(map[string]interface{})
 	}
 
+	// log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_enterr)
+	// it_2 := sess.DB(mgostandard.DbName).C(win_enterr).Find(&q).Sort("_id").Iter()
+	// num_2 := 0
+	// for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
+	// 	if num_2%100 == 0 && num_2 > 0 {
+	// 		log.Println("当前表:", win_enterr, "数量:", num_2)
+	// 	}
+	// 	pool <- true
+	// 	wg.Add(1)
+	// 	go func(tmp map[string]interface{}) {
+	// 		defer func() {
+	// 			<-pool
+	// 			wg.Done()
+	// 		}()
+	// 		savetmp := map[string]interface{}{}
+	// 		tmp_id := qu.BsonIdToSId(tmp["_id"])
+	// 		savetmp["_id"] = tmp_id
+	// 		savetmp["name"] = tmp["name"]
+	//      savetmp["winner_name"] = tmp["name"]
+	// 		savetmp["pici"] = tmp["updatetime"]
+	// 		winerEsLock.Lock()
+	// 		arrEs = append(arrEs, savetmp)
+	// 		if len(arrEs) >= BulkSize {
+	// 			tmps := arrEs
+	// 			elastic.BulkSave(index, itype, &tmps, true)
+	// 			arrEs = []map[string]interface{}{}
+	// 		}
+	// 		winerEsLock.Unlock()
+	// 	}(tmp)
+	// 	tmp = make(map[string]interface{})
+	// }
+
 	wg.Wait()
 	winerEsLock.Lock()
 	if len(arrEs) > 0 {
@@ -110,5 +123,5 @@ func winnerEsTaskOnce() {
 		arrEs = []map[string]interface{}{}
 	}
 	winerEsLock.Unlock()
-	log.Println("winnerextract  索引完毕!  总计:", num_1+num_2)
+	log.Println("winnerextract  索引完毕!  总计:", num_1)
 }