Bläddra i källkod

更新 catch 保存

wcc 4 dagar sedan
förälder
incheckning
65fe62c61f
4 ändrade filer med 131 tillägg och 11 borttagningar
  1. 12 0
      createEsIndex/bidding_es.go
  2. BIN
      createEsIndex/createindex
  3. 61 1
      createEsIndex/init.go
  4. 58 10
      createEsIndex/utils.go

+ 12 - 0
createEsIndex/bidding_es.go

@@ -548,6 +548,11 @@ func biddingTaskById(mapInfo map[string]interface{}) {
 		(*tmp)["detail"] = val
 	}
 
+	//针对产权数据,暂时不入es 索引库
+	if util.IntAll((*tmp)["infoformat"]) == 3 {
+		return
+	}
+
 	if util.IntAll((*tmp)["extracttype"]) == 1 {
 		newTmp, update := GetEsField(*tmp, stype)
 		newTmp["dataweight"] = 0                        //索引数据新增 jy置顶字段
@@ -559,6 +564,13 @@ func biddingTaskById(mapInfo map[string]interface{}) {
 			//	{"$set": update},
 			//}
 		}
+
+		newTmp["stype"] = stype
+		pici := util.Int64All((*tmp)["pici"])
+		//MongoDB 存在pici 字段
+		if pici > 0 {
+			newTmp["pici"] = pici
+		}
 		saveEsPool <- newTmp
 	}
 	log.Info("biddingTaskById over", zap.Any("mapInfo", mapInfo))

BIN
createEsIndex/createindex


+ 61 - 1
createEsIndex/init.go

@@ -15,6 +15,7 @@ import (
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
 	"os"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -438,7 +439,7 @@ func GetIndexName(client *es7.Client, name string) (string, error) {
 }
 
 // InitBitmap 初始化项目名称副标题 bitmap
-func InitBitmap() {
+func InitBitmap2() {
 	if config.Conf.Env.Dbfile != "" {
 		dbfile = &config.Conf.Env.Dbfile
 	}
@@ -471,6 +472,65 @@ func InitBitmap() {
 	}()
 }
 
+func InitBitmap() {
+	if config.Conf.Env.Dbfile != "" {
+		dbfile = &config.Conf.Env.Dbfile
+	}
+	log.Info("InitBitmap", zap.String("dbfile", *dbfile))
+
+	// 1. 尝试加载主文件
+	if tryLoadBitmapFile(*dbfile) {
+		log.Info("InitBitmap", zap.String("状态", "主文件加载成功"))
+	} else if tryLoadBitmapFile(*dbfile + ".bak") {
+		log.Warn("InitBitmap", zap.String("警告", "主文件加载失败,已回退至备份文件"))
+	} else {
+		// 2. 两个文件都失败,删除旧文件并加载指定备份文件
+		log.Error("InitBitmap", zap.String("错误", "主文件和备份文件都无法加载,使用手动备份恢复"))
+		// 删除损坏的文件
+		os.Remove(*dbfile)
+		os.Remove(*dbfile + ".bak")
+
+		// 3. 加载你指定的备份文件 db_0730
+		backupFile := strings.TrimSuffix(*dbfile, filepath.Ext(*dbfile)) + "_0730"
+		if tryLoadBitmapFile(backupFile) {
+			log.Info("InitBitmap", zap.String("恢复状态", "成功加载手动备份 "+backupFile))
+		} else {
+			log.Error("InitBitmap", zap.String("恢复状态", "手动备份文件也加载失败,将初始化空缓存"))
+			cache = roaring.NewBitmap()
+		}
+	}
+
+	// 后台保存逻辑保持不变
+	go func() {
+		for {
+			time.Sleep(10 * time.Second)
+			if cacheModify {
+				saveDb()
+				cacheModify = false
+			}
+		}
+	}()
+}
+
+// tryLoadBitmapFile 读取并尝试解析到 cache(不 panic,返回是否成功)
+func tryLoadBitmapFile(path string) bool {
+	bs, err := ioutil.ReadFile(path)
+	if err != nil {
+		log.Warn("读取失败", zap.String("文件", path), zap.Error(err))
+		return false
+	}
+	if len(bs) == 0 {
+		log.Warn("文件内容为空", zap.String("文件", path))
+		return false
+	}
+	_, err = cache.FromBuffer(bs)
+	if err != nil {
+		log.Warn("文件解析失败(可能损坏)", zap.String("文件", path), zap.Error(err))
+		return false
+	}
+	return true
+}
+
 // InitRule  初始化移动标签规则
 func InitRule() {
 	//关键词

+ 58 - 10
createEsIndex/utils.go

@@ -505,27 +505,75 @@ func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
 //
 //}
 
+// saveDb 文件写入
+//func saveDb() {
+//	mutex.Lock()
+//	defer mutex.Unlock()
+//	// 如果 cache 为空,则无需执行写入操作
+//	if cache == nil {
+//		log.Error("saveDb", zap.Any("cache", "为空"))
+//	}
+//
+//	if cache.GetCardinality() > 0 {
+//		fo, err := os.OpenFile(*dbfile, os.O_CREATE|os.O_RDWR|os.O_SYNC|os.O_TRUNC, 0777)
+//		if err != nil {
+//			log.Info("saveDb", zap.Error(err))
+//		}
+//
+//		defer fo.Close()
+//		_, err = cache.WriteTo(fo)
+//		if err != nil {
+//			log.Info("saveDb", zap.Any("cache.WriteTo", err))
+//		}
+//	}
+//}
+
 // saveDb 文件写入
 func saveDb() {
 	mutex.Lock()
 	defer mutex.Unlock()
-	// 如果 cache 为空,则无需执行写入操作
+
 	if cache == nil {
-		log.Error("saveDb", zap.Any("cache", "为空"))
+		log.Error("saveDb", zap.String("cache", "为空"))
+		return
+	}
+	if cache.GetCardinality() == 0 {
+		log.Info("saveDb", zap.String("提示", "cache 为空,未保存"))
+		return
 	}
 
-	if cache.GetCardinality() > 0 {
-		fo, err := os.OpenFile(*dbfile, os.O_CREATE|os.O_RDWR|os.O_SYNC|os.O_TRUNC, 0777)
-		if err != nil {
-			log.Info("saveDb", zap.Error(err))
-		}
+	tmpFile := *dbfile + ".tmp"
+	bakFile := *dbfile + ".bak"
+
+	// 1. 写入临时文件
+	tmp, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666)
+	if err != nil {
+		log.Error("saveDb", zap.String("创建 tmp 文件失败", tmpFile), zap.Error(err))
+		return
+	}
+	_, err = cache.WriteTo(tmp)
+	tmp.Close()
+	if err != nil {
+		log.Error("saveDb", zap.String("写入 tmp 文件失败", tmpFile), zap.Error(err))
+		return
+	}
 
-		defer fo.Close()
-		_, err = cache.WriteTo(fo)
+	// 2. 原文件备份(.bak)
+	if _, err := os.Stat(*dbfile); err == nil {
+		err = os.Rename(*dbfile, bakFile)
 		if err != nil {
-			log.Info("saveDb", zap.Any("cache.WriteTo", err))
+			log.Warn("备份失败", zap.String("源文件", *dbfile), zap.Error(err))
 		}
 	}
+
+	// 3. 原子替换新文件
+	err = os.Rename(tmpFile, *dbfile)
+	if err != nil {
+		log.Error("saveDb", zap.String("tmp 替换 dbfile 失败", ""), zap.Error(err))
+		return
+	}
+
+	log.Info("saveDb", zap.String("状态", "写入成功,主文件已更新"))
 }
 
 // getNewName 获取新的不重复名称