maxiaoshan 4 lat temu
rodzic
commit
1f0b269335
1 zmienionych plików z 95 dodań i 1 usunięć
  1. 95 1
      udpcreateindex/src/buyerindex.go

+ 95 - 1
udpcreateindex/src/buyerindex.go

@@ -71,7 +71,8 @@ func buyerTask(data []byte, mapInfo map[string]interface{}) {
 }
 */
 
-func buyerTask(data []byte, mapInfo map[string]interface{}) {
+//buyer_err
+func buyerTask_err(data []byte, mapInfo map[string]interface{}) {
 	defer util.Catch()
 	q, _ := mapInfo["query"].(map[string]interface{})
 	if q == nil {
@@ -158,3 +159,96 @@ func buyerTask(data []byte, mapInfo map[string]interface{}) {
 	buyerEsLock.Unlock()
 	log.Println(mapInfo, "create buyer index...over", i)
 }
+
+//buyer_enterprise
+func buyerTask(data []byte, mapInfo map[string]interface{}) {
+	defer util.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": bson.M{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+	//mongo
+	sess := mgostandard.GetMgoConn()
+	defer mgostandard.DestoryMongoConn(sess)
+	c, _ := buyer["collect"].(string)
+	db, _ := buyer["db"].(string)
+	index, _ := buyer["index"].(string)
+	itype, _ := buyer["type"].(string)
+
+	count, _ := sess.DB(db).C(c).Find(&q).Count()
+	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
+	it := sess.DB(db).C(c).Find(&q).Select(map[string]interface{}{
+		"buyer_name":        1,
+		"institute_type":    1,
+		"buyerclass":        1,
+		"fixedphone":        1,
+		"mobilephone":       1,
+		"latestfixedphone":  1,
+		"latestmobilephone": 1,
+		"province":          1,
+		"city":              1,
+	}).Sort("_id").Iter()
+
+	arrEs := []map[string]interface{}{}
+	buyerEsLock := &sync.Mutex{}
+	pool := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+	i := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 {
+		if i%1000 == 0 {
+			log.Println("current:", i)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			savetmp := map[string]interface{}{}
+			_id := util.BsonIdToSId(tmp["_id"])
+			if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 {
+				for _, v := range util.ObjArrToStringArr(buyerclass) {
+					if len(buyerclass) >= 2 && v != "其它" {
+						savetmp["buyerclass"] = v
+						break
+					} else if len(buyerclass) == 1 {
+						savetmp["buyerclass"] = v
+						break
+					}
+				}
+			}
+			savetmp["_id"] = _id
+			savetmp["name"] = tmp["buyer_name"]
+			savetmp["buyer_name"] = tmp["buyer_name"]
+			for _, f := range fieldArr {
+				if val := util.ObjToString(tmp[f]); val != "" {
+					savetmp[f] = val
+				}
+			}
+			buyerEsLock.Lock()
+			arrEs = append(arrEs, savetmp)
+			if len(arrEs) >= BulkSize {
+				tmps := arrEs
+				elastic.BulkSave(index, itype, &tmps, true)
+				arrEs = []map[string]interface{}{}
+			}
+			buyerEsLock.Unlock()
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	buyerEsLock.Lock()
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		elastic.BulkSave(index, itype, &tmps, true)
+		arrEs = []map[string]interface{}{}
+	}
+	buyerEsLock.Unlock()
+	log.Println(mapInfo, "create buyer index...over", i)
+}