瀏覽代碼

取消 附件文本长度限制;优化采购单位数据

wcc 2 年之前
父節點
當前提交
6ec3114db9
共有 5 個文件被更改,包括 152 次插入和 111 次删除
  1. 22 26
      createEsIndex/bidding_es.go
  2. 78 38
      createEsIndex/buyertask.go
  3. 2 2
      createEsIndex/common.toml
  4. 11 7
      createEsIndex/init.go
  5. 39 38
      createEsIndex/main.go

+ 22 - 26
createEsIndex/bidding_es.go

@@ -18,7 +18,6 @@ import (
 	"strings"
 	"sync"
 	"time"
-	"unicode/utf8"
 )
 
 var (
@@ -362,7 +361,7 @@ func biddingTaskById(mapInfo map[string]interface{}) {
 func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{}, map[string]interface{}) {
 	newTmp := make(map[string]interface{})
 	update := make(map[string]interface{}) // bidding 修改字段
-	//saveErr := make(map[string]interface{})
+	saveErr := make(map[string]interface{})
 	//for field, ftype := range config.Conf.DB.Es.FieldEs {
 	for field, ftype := range BiddingField {
 		if tmp[field] != nil { //
@@ -517,14 +516,14 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 			}
 		}
 	}
+	// 附件内容长度不做限制,大于20万字符,只做记录
 	filetext := getFileText(tmp)
 	if len([]rune(filetext)) > 10 {
 		newTmp["filetext"] = filetext
-		// 新版不再需要记录
-		//if len(filetext) > pscopeLength {
-		//	saveErr["filetext"] = filetext
-		//	saveErr["filetext_length"] = len(filetext)
-		//}
+		if len([]rune(filetext)) > fileLength {
+			saveErr["filetext"] = filetext
+			saveErr["filetext_length"] = len([]rune(filetext))
+		}
 	}
 	YuceEndtime(newTmp) // 预测结果时间
 	if stype == "bidding" || stype == "bidding_history" || stype == "index-by-id" {
@@ -533,10 +532,11 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 		update["pici"] = time.Now().Unix()
 	}
 
-	//if len(saveErr) > 0 {
-	//	saveErr["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
-	//	saveErrBidPool <- saveErr
-	//}
+	if len(saveErr) > 0 {
+		saveErr["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
+		saveErr["time"] = time.Now().Unix()
+		saveErrBidPool <- saveErr
+	}
 	return newTmp, update
 }
 
@@ -589,26 +589,22 @@ func getFileText(tmp map[string]interface{}) (filetext string) {
 					if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
 						if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
 							bs := oss.OssGetObject(attach_url, mongodb.BsonIdToSId(tmp["_id"])) //oss读数据
-							//正式环境
-							if utf8.RuneCountInString(filetext+bs) < fileLength {
+							if len(bs) > 0 {
 								filetext += bs + "\n"
-							} else {
-								if utf8.RuneCountInString(bs) > fileLength {
-									filetext = bs[0:fileLength]
-								} else {
-									filetext = bs
-								}
-								break
 							}
-							//测试环境
-							//if len(filetext) > 500000 {
-							//	filetext = filetext[0:500000]
-							//	break
+
+							//正式环境
+							//if utf8.RuneCountInString(filetext+bs) < fileLength {
+							//	filetext += bs + "\n"
 							//} else {
-							//	if len(bs) <= 500000 {
-							//		filetext += bs + "\n"
+							//	if utf8.RuneCountInString(bs) > fileLength {
+							//		filetext = bs[0:fileLength]
+							//	} else {
+							//		filetext = bs
 							//	}
+							//	break
 							//}
+
 						}
 					}
 				}

+ 78 - 38
createEsIndex/buyertask.go

@@ -21,27 +21,32 @@ func buyerOnce() {
 		//now := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
 		now := time.Now()
 		curTime := now.Format("2006-01-02")
-		insertquery := fmt.Sprintf(`
-            SELECT
-                b.name, 
-                t.id,
-                t.name_id, 
-                c.area, 
-                c.city, 
-                class.name AS buyerclass 
-               
-            FROM 
-                dws_f_ent_tags AS t 
-                LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
-                LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
-                LEFT JOIN code_area AS c ON b.city_code = c.code 
+		query := fmt.Sprintf(`
+           SELECT
+               b.name, 
+               t.id,
+               t.name_id, 
+               t.createtime,
+               t.updatetime,
+               c.area, 
+               c.city, 
+               class.name AS buyerclass 
+              
+           FROM 
+               dws_f_ent_tags AS t 
+               LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
+               LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
+               LEFT JOIN code_area AS c ON b.city_code = c.code 
 
 			WHERE  t.createtime > '%v' OR t.updatetime > '%v'
-            ORDER BY t.id ASC
-            LIMIT %d, %d;
-        `, curTime, curTime, offset, rowsPerPage)
+           ORDER BY t.createtime ASC
+           LIMIT %d, %d;
+       `, curTime, curTime, offset, rowsPerPage)
 
-		result := MysqlB.SelectBySql(insertquery)
+		result := MysqlB.SelectBySql(query)
+		if result == nil {
+			break
+		}
 
 		if len(*result) > 0 {
 			for _, re := range *result {
@@ -52,6 +57,20 @@ func buyerOnce() {
 				tmp["province"] = re["area"]
 				tmp["city"] = re["city"]
 				tmp["buyerclass"] = re["buyerclass"]
+
+				if re["createtime"] != nil {
+					if createtime, ok := re["createtime"].(time.Time); ok {
+						tmp["createtime"] = createtime.Unix()
+						if re["updatetime"] != nil {
+							if updatetime, ok := re["updatetime"].(time.Time); ok {
+								tmp["updatetime"] = updatetime.Unix()
+							}
+						} else {
+							tmp["updatetime"] = createtime.Unix()
+						}
+					}
+				}
+
 				sql := fmt.Sprintf(`select * from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
 
 				counts := MysqlB.SelectBySql(sql)
@@ -64,7 +83,10 @@ func buyerOnce() {
 			}
 			total = total + len(arrEs)
 
-			Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
+			err := Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
+			if err != nil {
+				log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
+			}
 		}
 
 		if len(arrEs) < rowsPerPage {
@@ -79,7 +101,7 @@ func buyerOnce() {
 
 //buyerall 全量数据
 func buyerall() {
-	rowsPerPage := 10000
+	rowsPerPage := 5000
 	currentPage := 1
 	total := 0
 
@@ -88,27 +110,33 @@ func buyerall() {
 		arrEs := make([]map[string]interface{}, 0)
 		offset := (currentPage - 1) * rowsPerPage
 		query := fmt.Sprintf(`
-             SELECT
-                b.name, 
-                t.id,
-                t.name_id, 
-                c.area, 
-                c.city, 
-                class.name AS buyerclass
-               
-            FROM 
-                dws_f_ent_tags AS t 
-                LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
-                LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
-                LEFT JOIN code_area AS c ON b.city_code = c.code
-
-            ORDER BY t.id ASC
+            SELECT
+               b.name, 
+               t.id,
+               t.name_id, 
+               t.createtime,
+               t.updatetime,
+               c.area, 
+               c.city, 
+               class.name AS buyerclass
+              
+           FROM 
+               dws_f_ent_tags AS t 
+               LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
+               LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
+               LEFT JOIN code_area AS c ON b.city_code = c.code
+
+           ORDER BY t.createtime ASC
 			
-            LIMIT %d, %d;
-        `, offset, rowsPerPage)
+           LIMIT %d, %d;
+       `, offset, rowsPerPage)
 
 		result := MysqlB.SelectBySql(query)
 
+		if result == nil {
+			break
+		}
+
 		if len(*result) > 0 {
 			for _, re := range *result {
 				tmp := make(map[string]interface{}, 0)
@@ -119,8 +147,20 @@ func buyerall() {
 				tmp["province"] = re["area"]
 				tmp["city"] = re["city"]
 				tmp["buyerclass"] = re["buyerclass"]
+				if re["createtime"] != nil {
+					if createtime, ok := re["createtime"].(time.Time); ok {
+						tmp["createtime"] = createtime.Unix()
+						if re["updatetime"] != nil {
+							if updatetime, ok := re["updatetime"].(time.Time); ok {
+								tmp["updatetime"] = updatetime.Unix()
+							}
+						} else {
+							tmp["updatetime"] = createtime.Unix()
+						}
+					}
+				}
 
-				sql := fmt.Sprintf(`select * from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
+				sql := fmt.Sprintf(`select id from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
 
 				counts := MysqlB.SelectBySql(sql)
 				if len(*counts) > 0 {

+ 2 - 2
createEsIndex/common.toml

@@ -45,7 +45,7 @@
     bucketname = "topjy"
 [db.es]
     addr = "http://192.168.3.149:9200"      ## 正常bidding 链接
-    addrp = "http://172.17.145.178:9200" ## 采集使用的单机版地址
+    addrp = "http://192.168.3.149:9200"   ## 采集使用的单机版地址
     username = "es_all"
     password = "TopJkO2E_d1x"
     size = 5
@@ -53,7 +53,7 @@
     indextmp = "bidding_temporary"       ## 临时索引,其他程序需要
     indexp = "projectset_v1"
     indexwinner = "winner"
-    indexbuyer = "buyer_v1"
+    indexbuyer = "buyer_v2"
 detailfilter = ["(招标网|千里马|采招网|招标采购导航网|招标与采购网|中国招投标网|中国采购与招标网|中国采购与招标|优质采)[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}(qianlima|zhaobiao|okcis|zbytb|infobidding|bidcenter|youzhicai|chinabidding|Chinabidding|CHINABIDDING)[a-z0-9.\\/\\/]{0,40}",
     "招标网[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}zhaobiao[a-z0-9.\\/\\/]{0,40}",
     "千里马[\\w\\W]{0,15}[a-z0-9:\\/\\/.]{0,20}qianlima[a-z0-9.\\/\\/]{0,10}",

+ 11 - 7
createEsIndex/init.go

@@ -43,7 +43,7 @@ func InitLog() {
 		os.Exit(1)
 	}
 
-	log.Info("InitLog", zap.Any("duration", time.Since(now)))
+	log.Info("InitLog", zap.Any("duration", time.Since(now).Seconds()))
 }
 
 func InitMgo() {
@@ -56,6 +56,7 @@ func InitMgo() {
 		Password:    config.Conf.DB.MongoB.Password,
 	}
 	MgoB.InitPool()
+	log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds()))
 	MgoP = &mongodb.MongodbSim{
 		MongodbAddr: config.Conf.DB.MongoP.Addr,
 		DbName:      config.Conf.DB.MongoP.Dbname,
@@ -64,6 +65,8 @@ func InitMgo() {
 		Password:    config.Conf.DB.MongoP.Password,
 	}
 	MgoP.InitPool()
+	log.Info("InitMgo", zap.Any("MgoP duration", time.Since(now).Seconds()))
+
 	MgoQ = &mongodb.MongodbSim{
 		MongodbAddr: config.Conf.DB.MongoQ.Addr,
 		DbName:      config.Conf.DB.MongoQ.Dbname,
@@ -72,7 +75,7 @@ func InitMgo() {
 		Password:    config.Conf.DB.MongoQ.Password,
 	}
 	MgoQ.InitPool()
-
+	log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds()))
 	MysqlB = &mysqldb.Mysql{
 		Address:  config.Conf.DB.MysqlB.Addr,
 		DBName:   config.Conf.DB.MysqlB.Dbname,
@@ -81,7 +84,7 @@ func InitMgo() {
 	}
 	MysqlB.Init()
 
-	log.Info("InitMgo", zap.Any("duration", time.Since(now)))
+	log.Info("InitMgo", zap.Any("MysqlB duration", time.Since(now).Seconds()))
 }
 
 func InitEs() {
@@ -101,7 +104,8 @@ func InitEs() {
 		Password: config.Conf.DB.Es.Password,
 	}
 	Es1.InitElasticSize()
-	log.Info("InitEs", zap.Any("duration", time.Since(now)))
+
+	log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
 }
 
 func InitField() {
@@ -117,7 +121,7 @@ func InitField() {
 		}
 	}
 	log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
-	log.Info("InitField", zap.Any("duration", time.Since(now)))
+	log.Info("InitField", zap.Any("duration", time.Since(now).Seconds()))
 }
 
 //InitEsBiddingField 初始化 bidding 索引字段
@@ -141,7 +145,7 @@ func InitEsBiddingField() {
 	}
 	log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
 	log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
-	log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now)))
+	log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
 }
 
 //verifyESFields 验证es 定义字段类型和 MongoDB 数据字段
@@ -230,7 +234,7 @@ func verifyESFields() {
 	} else {
 		log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
 	}
-	log.Info("verifyESFields", zap.Any("duration", time.Since(now)))
+	log.Info("verifyESFields", zap.Any("duration", time.Since(now).Seconds()))
 }
 
 func GetIndexName(client *es7.Client, name string) (string, error) {

+ 39 - 38
createEsIndex/main.go

@@ -42,11 +42,11 @@ var (
 	saveEsAllPool     = make(chan map[string]interface{}, 5000) //存储单机版es,爬虫采集判重使用
 	saveEsAllSp       = make(chan bool, 5)
 
-	//saveErrBidPool = make(chan map[string]interface{}, 5000)
-	//saveBidSp      = make(chan bool, 5)
+	saveErrBidPool = make(chan map[string]interface{}, 5000)
+	saveBidSp      = make(chan bool, 5)
 
 	//detailLength = 50000 // es保存detail长度
-	fileLength = 50000 // es保存附件文本长度
+	fileLength = 200000 // es保存附件文本长度,大于20万 时做一个日志记录
 	//pscopeLength = 32766 // projectscope长度
 
 )
@@ -79,7 +79,7 @@ func main() {
 	go SaveAllEsMethod()
 	go SaveProjectEs()
 
-	//go SaveBidErr()
+	go SaveBidErr()
 
 	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
 	UdpClient.Listen(processUdpMsg)
@@ -267,40 +267,41 @@ func UpdateBidding() {
 	}
 }
 
-//func SaveBidErr() {
-//	arru := make([]map[string]interface{}, 200)
-//	indexu := 0
-//	for {
-//		select {
-//		case v := <-saveErrBidPool:
-//			arru[indexu] = v
-//			indexu++
-//			if indexu == 200 {
-//				saveBidSp <- true
-//				go func(arru []map[string]interface{}) {
-//					defer func() {
-//						<-saveBidSp
-//					}()
-//					MgoB.SaveBulk("bidding_es_err_record", arru...)
-//				}(arru)
-//				arru = make([]map[string]interface{}, 200)
-//				indexu = 0
-//			}
-//		case <-time.After(1000 * time.Millisecond):
-//			if indexu > 0 {
-//				saveBidSp <- true
-//				go func(arru []map[string]interface{}) {
-//					defer func() {
-//						<-saveBidSp
-//					}()
-//					MgoB.SaveBulk("bidding_es_err_record", arru...)
-//				}(arru[:indexu])
-//				arru = make([]map[string]interface{}, 200)
-//				indexu = 0
-//			}
-//		}
-//	}
-//}
+// SaveBidErr drains saveErrBidPool and bulk-saves the records into the "bidding_es_err_record" collection via MgoB.SaveBulk, flushing when 200 records are buffered or when no record arrives for one second; per the original note it currently only records over-long attachment text.
+func SaveBidErr() {
+	arru := make([]map[string]interface{}, 200) // fixed-size batch buffer of 200 slots
+	indexu := 0 // number of slots in arru filled so far
+	for {
+		select {
+		case v := <-saveErrBidPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 { // buffer full: flush the whole batch
+				saveBidSp <- true // take a writer slot; blocks while the saveBidSp buffer is full
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBidSp // release the writer slot when the save returns
+					}()
+					MgoB.SaveBulk("bidding_es_err_record", arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, 200) // fresh buffer; the goroutine above keeps the old one
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond): // a new 1s timer is created on every loop iteration, so this fires only after 1s without a record
+			if indexu > 0 { // flush a partially filled batch
+				saveBidSp <- true // take a writer slot; blocks while the saveBidSp buffer is full
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBidSp // release the writer slot when the save returns
+					}()
+					MgoB.SaveBulk("bidding_es_err_record", arru...)
+				}(arru[:indexu]) // pass only the filled prefix so unfilled nil slots are not saved
+				arru = make([]map[string]interface{}, 200) // fresh buffer; the goroutine above keeps the old one
+				indexu = 0
+			}
+		}
+	}
+}
 
 //SaveEsMethod 保存到es
 func SaveEsMethod() {