Эх сурвалжийг харах

更新 采购单位 全量同步

wcc 1 жил өмнө
parent
commit
14dff100fc
1 өөрчлөгдсөн, 43 нэмэгдсэн, 30 устгасан
  1. 43 30
      createEsIndex/buyertask.go

+ 43 - 30
createEsIndex/buyertask.go

@@ -331,9 +331,11 @@ func initSpecialNames() {
 
 // buyerAll 采购单位全量数据
 func buyerAll() {
-	if len(specialNames) < 1 {
-		initSpecialNames()
-	}
+	arrEs := make([]map[string]interface{}, 0) //最终生索引数据
+	winerEsLock := &sync.Mutex{}
+	//if len(specialNames) < 1 {
+	//	initSpecialNames()
+	//}
 
 	countSql := fmt.Sprintf(`SELECT	count(id)	FROM dws_f_ent_tags `)
 	dataCounts := Mysql.CountBySql(countSql)
@@ -355,7 +357,7 @@ func buyerAll() {
 	}
 
 	log.Info("buyerAll", zap.Int("finalId", finalId))
-	buyerPool := make(chan bool, 3) //控制线程数
+	buyerPool := make(chan bool, 10) //控制线程数
 	wg := &sync.WaitGroup{}
 
 	lastid, total := 0, 0
@@ -367,6 +369,7 @@ func buyerAll() {
                b.name, 
 			   b.seo_id,
                t.id,
+               t.status,
                t.name_id, 
 			   b.company_id,
                t.createtime,
@@ -381,10 +384,10 @@ func buyerAll() {
                LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
                LEFT JOIN code_area AS c ON b.city_code = c.code 
 
-			WHERE  t.id > %d 
+			WHERE  t.id > %d and t.status > 0
            ORDER BY t.id ASC
           LIMIT %d;
-      `, lastid, 200)
+      `, lastid, 1000)
 
 		ctx := context.Background()
 		rows, err := Mysql.DB.QueryContext(ctx, query)
@@ -427,7 +430,7 @@ func buyerAll() {
 
 			lastid = util.IntAll(ret["id"])
 			total++
-			if total%1000 == 0 {
+			if total%2000 == 0 {
 				log.Info("buyerAll", zap.Int("current total", total), zap.Int("lastid", lastid))
 			}
 
@@ -439,29 +442,29 @@ func buyerAll() {
 					<-buyerPool
 					wg.Done()
 				}()
-				//MySQL 更新
-				update := map[string]interface{}{}
 				name := util.ObjToString(tmp["name"])
+				//MySQL 更新
+				//update := map[string]interface{}{}
 				//company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
-				if util.ObjToString(tmp["company_id"]) != "" || specialNames[name] {
-					realCount++
-					update["status"] = 1
-				} else if ruleBuyer(name) { //不符合条件,排除
-					update["status"] = -1
-				} else { //默认2,认为可信
-					realCount++
-					update["status"] = 2
-				}
-				//1.更新MySQL
-				where := map[string]interface{}{
-					"name_id": tmp["name_id"],
-				}
-				if len(update) > 0 {
-					Mysql.Update("dws_f_ent_tags", where, update)
-				}
+				//if util.ObjToString(tmp["company_id"]) != "" || specialNames[name] {
+				//	realCount++
+				//	update["status"] = 1
+				//} else if ruleBuyer(name) { //不符合条件,排除
+				//	update["status"] = -1
+				//} else { //默认2,认为可信
+				//	realCount++
+				//	update["status"] = 2
+				//}
+				////1.更新MySQL
+				//where := map[string]interface{}{
+				//	"name_id": tmp["name_id"],
+				//}
+				//if len(update) > 0 {
+				//	Mysql.Update("dws_f_ent_tags", where, update)
+				//}
 
 				//2.生索引,status = 1或者2 才生索引
-				if util.IntAll(update["status"]) == 1 || util.IntAll(update["status"]) == 2 {
+				if util.IntAll(tmp["status"]) == 1 || util.IntAll(tmp["status"]) == 2 {
 					data := make(map[string]interface{}, 0)
 					data["name"] = name
 					data["name_id"] = tmp["name_id"]
@@ -493,12 +496,16 @@ func buyerAll() {
 					} else {
 						data["is_contact"] = false
 					}
-					arrEs := make([]map[string]interface{}, 0) //最终生索引数据
+
+					//写入es
+					winerEsLock.Lock()
 					arrEs = append(arrEs, data)
-					err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
-					if err != nil {
-						log.Info("buyerAll", zap.Any("InsertOrUpdate err", err))
+					if len(arrEs) >= EsBulkSize {
+						tmps := arrEs
+						Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
+						arrEs = []map[string]interface{}{}
 					}
+					winerEsLock.Unlock()
 				}
 
 			}(ret)
@@ -517,6 +524,12 @@ func buyerAll() {
 		wg.Wait()
 	}
 
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
+		arrEs = []map[string]interface{}{}
+	}
+
 	log.Info("buyerAll", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
 
 }