wcc 2 maanden geleden
bovenliggende
commit
f2edab5b64

BIN
project_chuan/dealProposed22Concurrent


+ 2 - 2
project_chuan/main.go

@@ -6,10 +6,10 @@ import (
 )
 
 func main() {
-
+	//ddd()
 	//exportTestData2()
 	//dealXlsxData()
-	dealProposed22Concurrent()
+	dealProposed22Concurrent() //处理存量,拟建项目数据
 	//dealProposed()
 	log.Info("main", zap.String("aaaa", "数据处理完毕"))
 }

+ 32 - 2
project_chuan/project.go

@@ -9,12 +9,14 @@ import (
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"regexp"
 	"sort"
 	"strings"
 	"sync"
+	"time"
 )
 
-// dealProposed22Concurrent 多协程处理
+// dealProposed22Concurrent 多协程处理,拟建存量数据
 func dealProposed22Concurrent() {
 	// 1. 初始化 ES 客户端
 	client, err := elastic.NewClient(
@@ -34,6 +36,7 @@ func dealProposed22Concurrent() {
 	query := map[string]interface{}{
 		"firsttime": map[string]interface{}{
 			"$gte": 1735660800,
+			"$lte": 1748102400,
 		},
 	}
 	iter := coll.Find(query).Select(nil).Iter()
@@ -76,6 +79,7 @@ func dealProposed22Concurrent() {
 	wg.Wait()
 }
 
+// processOneProposed 处理存量数据
 func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
 	defer func() {
 		if r := recover(); r != nil {
@@ -150,6 +154,11 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
 		"city":            tmp["city"],
 		"district":        tmp["district"],
 		"bidding":         biddings,
+		"updatetime":      time.Now().Unix(),
+	}
+
+	if isValidCodeFormat(util.ObjToString("approvecode")) {
+		insert["approvecode"] = tmp["approvecode"]
 	}
 
 	if buyer != "" {
@@ -160,7 +169,18 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
 		insert["credit_no"] = (*std)["credit_no"]
 	}
 
-	MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0524", insert)
+	whereExist := map[string]interface{}{
+		"proposed_id": proposedID,
+	}
+	exist, _ := MgoP.FindOne("wcc_dealProposed22_0524", whereExist)
+	// 存在就更新
+	if exist != nil && len(*exist) > 0 {
+		exitsid := mongodb.BsonIdToSId((*exist)["_id"])
+		MgoP.UpdateById("wcc_dealProposed22_0524", exitsid, insert)
+	} else {
+		insert["comeintime"] = time.Now().Unix()
+		MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0524", insert)
+	}
 }
 
 func cloneMap(src map[string]interface{}) map[string]interface{} {
@@ -799,3 +819,13 @@ func GetIdByURL(url string) string {
 
 	return ""
 }
+
+// isValidCodeFormat 判断 拟建项目编码
+func isValidCodeFormat(s string) bool {
+	pattern := `^\d{4}-\d{6}-\d{2}-\d{2}-\d{6}$`
+	matched, err := regexp.MatchString(pattern, s)
+	if err != nil {
+		return false
+	}
+	return matched
+}

+ 176 - 3
project_chuan/xlsx.go

@@ -155,7 +155,7 @@ func exportTestData() {
 	sess := MgoP.GetMgoConn()
 	defer MgoP.DestoryMongoConn(sess)
 
-	queryMgo := sess.DB("qfw").C("wcc_dealXlsxData_0523").Find(nil).Select(nil).Iter()
+	queryMgo := sess.DB("qfw").C("wcc_dealXlsxData_0524").Find(nil).Select(nil).Iter()
 	count := 0
 
 	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
@@ -241,6 +241,141 @@ func exportTestData() {
 	log.Println("✅ 数据写入完成,共写入行数:", rowIndex-startRow)
 }
 
+// exportTestData1 导出统计局 拟建数据
+func exportTestData1() {
+	// 1. 连接 MongoDB
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
+
+	// 2. 打开 Excel 模板
+	f, err := excelize.OpenFile("统计局入库项目数据目录树.xlsx")
+	if err != nil {
+		log.Fatalf("无法打开Excel模板: %v", err)
+	}
+	sheet := f.GetSheetName(0)
+	rowIndex := 3 // 假设第一行是表头
+
+	where := map[string]interface{}{
+		"city": "陇南市",
+	}
+	// 3. 查询 Mongo 数据
+	queryMgo := sess.DB("qfw").C("wcc_dealProposed22_0524").Find(where).Iter()
+	count := 0
+
+	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
+		if count%1000 == 0 {
+			log.Println("current", count)
+		}
+
+		//projectname := util.ObjToString(tmp["p1_project_name"])
+		//where := map[string]interface{}{"projectname": projectname}
+		//pro_project, _ := MgoP.FindOne("projectset_proposed", where)
+
+		//ppid := ""
+		//if pro_project != nil {
+		//	ppid = mongodb.BsonIdToSId((*pro_project)["_id"])
+		//}
+
+		v1 := []interface{}{
+			//ppid,
+			tmp["proposed_id"],
+			tmp["approvecode"],
+			tmp["projectname"],
+			tmp["buyer"],
+			tmp["credit_no"],
+			fmt.Sprintf("%s-%s-%s", tmp["area"], tmp["city"], tmp["district"]),
+		}
+
+		// ----------- 核心容错处理逻辑 ------------
+		if biddingListRaw, ok := tmp["bidding"]; ok {
+			biddingList, ok := biddingListRaw.([]interface{})
+			if !ok || len(biddingList) == 0 {
+				writeEmptyRow(f, sheet, rowIndex, v1)
+				rowIndex++
+			} else {
+				for _, bidItem := range biddingList {
+					bidMap, ok := bidItem.(map[string]interface{})
+					if !ok {
+						continue
+					}
+
+					if util.ObjToString(bidMap["toptype"]) == "拟建" {
+						continue
+					}
+					// v2
+					v2 := make([]interface{}, 9)
+					for i := range v2 {
+						v2[i] = ""
+					}
+
+					if projRaw, ok := bidMap["project"]; ok {
+						if project, ok := projRaw.(map[string]interface{}); ok {
+							firstTimeStr := ""
+							if val, ok := project["firsttime"]; ok {
+								if t := util.Int64All(val); t > 0 {
+									firstTimeStr = time.Unix(t, 0).In(time.FixedZone("CST", 8*3600)).Format("2006-01-02 15:04:05")
+								}
+							}
+							v2 = []interface{}{
+								project["project_id"],
+								project["projectname"],
+								project["bidamount"],
+								project["area"],
+								project["city"],
+								project["bidstatus"],
+								project["buyer"],
+								firstTimeStr,
+								bidMap["score"],
+							}
+						}
+					}
+
+					publishTimeStr := ""
+					if val, ok := bidMap["publishtime"]; ok {
+						if t := util.Int64All(val); t > 0 {
+							publishTimeStr = time.Unix(t, 0).In(time.FixedZone("CST", 8*3600)).Format("2006-01-02 15:04:05")
+						}
+					}
+					v3 := []interface{}{
+						bidMap["id"],
+						bidMap["title"],
+						bidMap["subtype"],
+						bidMap["area"],
+						bidMap["city"],
+						bidMap["buyer"],
+						bidMap["budget"],
+						bidMap["buyerperson"],
+						bidMap["buyertel"],
+						bidMap["s_winner"],
+						bidMap["bidamount"],
+						bidMap["winnertel"],
+						bidMap["agency"],
+						publishTimeStr,
+						bidMap["score"],
+					}
+
+					fullRow := append(append(v1, v2...), v3...)
+					cell, _ := excelize.CoordinatesToCellName(1, rowIndex)
+					f.SetSheetRow(sheet, cell, &fullRow)
+					rowIndex++
+				}
+			}
+		} else {
+			// bidding 字段不存在
+			writeEmptyRow(f, sheet, rowIndex, v1)
+			rowIndex++
+		}
+	}
+
+	// 4. 保存结果
+	if err := f.SaveAs("输出结果.xlsx"); err != nil {
+		log.Fatalf("保存文件失败: %v", err)
+	}
+
+	log.Println("导出完成,保存为 输出结果.xlsx")
+}
+
+// exportTestData2 导出 陇南市 拟建数据
 func exportTestData2() {
 	// 1. 连接 MongoDB
 	sess := MgoP.GetMgoConn()
@@ -278,12 +413,23 @@ func exportTestData2() {
 		v1 := []interface{}{
 			//ppid,
 			tmp["proposed_id"],
-			tmp["proposed_number"],
+			tmp["approvecode"],
 			tmp["projectname"],
 			tmp["buyer"],
 			tmp["credit_no"],
-			fmt.Sprintf("%s-%s", tmp["area"], tmp["city"]),
 		}
+		//fmt.Sprintf("%s-%s-%s", tmp["area"], tmp["city"], tmp["district"]),
+		acd := ""
+		if util.ObjToString(tmp["area"]) != "" {
+			acd = util.ObjToString(tmp["area"])
+		}
+		if util.ObjToString(tmp["city"]) != "" {
+			acd = acd + "-" + util.ObjToString(tmp["city"])
+		}
+		if util.ObjToString(tmp["district"]) != "" {
+			acd = acd + "-" + util.ObjToString(tmp["district"])
+		}
+		v1 = append(v1, acd)
 
 		// ----------- 核心容错处理逻辑 ------------
 		if biddingListRaw, ok := tmp["bidding"]; ok {
@@ -387,3 +533,30 @@ func writeEmptyRow(f *excelize.File, sheet string, rowIndex int, v1 []interface{
 	cell, _ := excelize.CoordinatesToCellName(1, rowIndex)
 	f.SetSheetRow(sheet, cell, &fullRow)
 }
+
+// 测试,更新approvecode 字段
+func ddd() {
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
+
+	// 3. 查询 Mongo 数据
+	queryMgo := sess.DB("qfw").C("wcc_dealProposed22_0524").Find(nil).Iter()
+	count := 0
+
+	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
+		if count%1000 == 0 {
+			log.Println("current", count)
+		}
+
+		pro_id := util.ObjToString(tmp["proposed_id"])
+		pro, _ := MgoP.FindById("projectset_proposed", pro_id, nil)
+		if util.ObjToString((*pro)["approvecode"]) != "" {
+			update := map[string]interface{}{
+				"approvecode": (*pro)["approvecode"],
+			}
+
+			id := mongodb.BsonIdToSId(tmp["_id"])
+			MgoP.UpdateById("wcc_dealProposed22_0524", id, map[string]interface{}{"$set": update})
+		}
+	}
+}

BIN
project_chuan/统计局入库项目数据目录树.xlsx


BIN
project_chuan/陇南市输出结果.xlsx