wcc 2 månader sedan
förälder
incheckning
345afc3710

BIN
project_chuan/dealProposed22Concurrent


BIN
project_chuan/dealXlsxData


+ 1 - 0
project_chuan/main.go

@@ -7,6 +7,7 @@ import (
 
 func main() {
 
+	//exportTestData2()
 	//dealXlsxData()
 	dealProposed22Concurrent()
 	//dealProposed()

+ 31 - 65
project_chuan/project.go

@@ -32,7 +32,6 @@ func dealProposed22Concurrent() {
 
 	coll := sess.DB("qfw").C("projectset_proposed")
 	query := map[string]interface{}{
-		//"area": "甘肃",
 		"firsttime": map[string]interface{}{
 			"$gte": 1735660800,
 		},
@@ -40,7 +39,7 @@ func dealProposed22Concurrent() {
 	iter := coll.Find(query).Select(nil).Iter()
 
 	// 3. 并发控制
-	const maxWorkers = 1
+	const maxWorkers = 2
 	taskCh := make(chan map[string]interface{}, 2000)
 	var wg sync.WaitGroup
 
@@ -50,6 +49,9 @@ func dealProposed22Concurrent() {
 		go func() {
 			defer wg.Done()
 			for doc := range taskCh {
+				if len(doc) == 0 {
+					log.Info("aaa", zap.Any("client", client))
+				}
 				processOneProposed(doc, client)
 			}
 		}()
@@ -64,7 +66,7 @@ func dealProposed22Concurrent() {
 			log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]))
 		}
 
-		if util.ObjToString(doc["area"]) != "甘肃" {
+		if util.ObjToString(doc["area"]) == "甘肃" {
 			continue
 		}
 
@@ -86,28 +88,23 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
 	buyer := util.ObjToString(tmp["owner"])
 	proposed_number := util.ObjToString(tmp["proposed_number"])
 
-	log.Info("processOneProposed", zap.String("开始查询es", projectName))
-	results, err := searchES23(client, projectName, buyer, 18, 50)
+	//log.Info("processOneProposed", zap.String("开始查询es", projectName))
+	results, err := searchES23(client, projectName, buyer, 20, 50)
 	if err != nil {
 		log.Warn("searchES22 error", zap.Error(err))
 		return
 	}
-	log.Info("processOneProposed", zap.String("结束查询es", projectName))
-	projectIds := []string{}
-	biddingIds := []string{}
+	//log.Info("processOneProposed", zap.String("结束查询es", projectName))
 	biddings := []map[string]interface{}{}
-	//bidding_id-> bidding map 数据源
-	biddingIdMap := make(map[string]map[string]interface{}, 0)
-	//project_id -> project map
-	projectIdMap := make(map[string]map[string]interface{}, 0)
-	project_bidding_ids := make(map[string][]string, 0) //存储project_id =>[bidding_id]
 
+	// 标讯信息
 	for _, re := range results {
 		biddingID := util.ObjToString(re["id"])
-		biddingIds = append(biddingIds, biddingID)
 		da := map[string]interface{}{
 			"id":          re["id"],
 			"title":       re["title"],
+			"area":        re["area"],
+			"city":        re["city"],
 			"projectname": re["projectname"],
 			"score":       re["score"],
 			"toptype":     re["toptype"],
@@ -122,20 +119,25 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
 			"agency":      re["agency"],
 			"publishtime": re["publishtime"],
 		}
-		biddings = append(biddings, da)
-		biddingIdMap[biddingID] = da
-	}
-
-	for _, bid := range biddingIds {
-		where2 := map[string]interface{}{"ids": bid}
+		//项目信息
+		where2 := map[string]interface{}{"ids": biddingID}
 		projectset, _ := MgoP.FindOne("projectset_20230904", where2)
-		if projectset != nil && len(*projectset) > 0 {
-			pid := mongodb.BsonIdToSId((*projectset)["_id"])
-			projectIds = append(projectIds, pid)
-			p_bidding_ids := project_bidding_ids[pid]
-			p_bidding_ids = append(p_bidding_ids, bid)
-			projectIdMap[pid] = *projectset
+		if projectset != nil && len((*projectset)) > 0 {
+			v3 := map[string]interface{}{
+				"project_id":  mongodb.BsonIdToSId((*projectset)["_id"]),
+				"projectname": (*projectset)["projectname"],
+				"bidamount":   (*projectset)["bidamount"],
+				"area":        (*projectset)["area"],
+				"city":        (*projectset)["city"],
+				"firsttime":   (*projectset)["firsttime"],
+				"bidtype":     (*projectset)["bidtype"],
+				"bidstatus":   (*projectset)["bidstatus"],
+				"sortprice":   (*projectset)["sortprice"],
+				"buyer":       (*projectset)["buyer"],
+			}
+			da["project"] = v3
 		}
+		biddings = append(biddings, da)
 	}
 
 	insert := map[string]interface{}{
@@ -143,13 +145,11 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
 		"stype":           1, //代表从拟建数据-> 匹配在建数据
 		"proposed_number": proposed_number,
 		"buyer":           buyer,
-		"project_name":    tmp["projectname"],
+		"projectname":     tmp["projectname"],
 		"area":            tmp["area"],
 		"city":            tmp["city"],
 		"district":        tmp["district"],
-		//"bidding_ids":     removeDuplicates(biddingIds),
-		//"project_ids":     removeDuplicates(projectIds),
-		//"biddings":        biddings,
+		"bidding":         biddings,
 	}
 
 	if buyer != "" {
@@ -160,41 +160,7 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
 		insert["credit_no"] = (*std)["credit_no"]
 	}
 
-	projects := make([]map[string]interface{}, 0)
-	if len(project_bidding_ids) > 0 {
-		for pid, bidding_ids := range project_bidding_ids {
-			p_project := projectIdMap[pid]
-			p_bs := make([]map[string]interface{}, 0)
-			for _, vv := range bidding_ids {
-				bi := biddingIdMap[vv]
-				p_bs = append(p_bs, bi)
-			}
-
-			project := map[string]interface{}{
-				"project_id":  pid,
-				"projectname": p_project["projectname"],
-				"bidamount":   p_project["bidamount"],
-				"area":        p_project["area"],
-				"city":        p_project["city"],
-				"bidstatus":   p_project["bidstatus"],
-				"buyer":       p_project["buyer"],
-				"firsttime":   p_project["firsttime"],
-				"biddings":    p_bs,
-			}
-
-			projects = append(projects, project)
-		}
-	}
-
-	if len(projects) > 0 {
-		insert["projects"] = projects
-	}
-
-	if len(biddings) > 0 {
-		insert["biddings"] = biddings
-	}
-
-	MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0523", insert)
+	MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0524", insert)
 }
 
 func cloneMap(src map[string]interface{}) map[string]interface{} {

+ 252 - 34
project_chuan/xlsx.go

@@ -6,8 +6,10 @@ import (
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"log"
+	"time"
 )
 
+// dealXlsxData 处理xlsx 给的样例数据,匹配在建项目数据,然后存储MongoDB
 func dealXlsxData() {
 	//url := "http://172.17.4.184:19908"
 	////url := "http://127.0.0.1:19908"
@@ -64,30 +66,41 @@ func dealXlsxData() {
 		row := rows[i]
 		p1_biddings := make([]map[string]interface{}, 0)
 		p1_bidding_ids := make([]string, 0)
+		biddingIdMap := make(map[string]map[string]interface{}, 0)
+		//projectIdMap := make(map[string]map[string]interface{}, 0) //project_id -> project map
 		for i := start; i+3 < len(row); i += 4 {
-			amount := row[i]
-			title := row[i+1]
 			href := row[i+2]
 			score := row[i+3]
 			bid := GetIdByURL(href)
 			p1_bidding_ids = append(p1_bidding_ids, bid)
 			bidding, _ := MgoB.FindById("bidding", bid, nil)
-			data := map[string]interface{}{
-				"bidding_id": bid,
-				"href":       href,
-				"title":      title,
-				"socre":      score,
-				"amount":     amount,
-				"toptype":    (*bidding)["toptype"],
-				"subtype":    (*bidding)["subtype"],
+			da := map[string]interface{}{
+				"id":          bid,
+				"title":       (*bidding)["title"],
+				"area":        (*bidding)["area"],
+				"city":        (*bidding)["city"],
+				"projectname": (*bidding)["projectname"],
+				"score":       score,
+				"toptype":     (*bidding)["toptype"],
+				"subtype":     (*bidding)["subtype"],
+				"buyer":       (*bidding)["buyer"],
+				"budget":      (*bidding)["budget"],
+				"buyerperson": (*bidding)["buyerperson"],
+				"buyertel":    (*bidding)["buyertel"],
+				"s_winner":    (*bidding)["s_winner"],
+				"bidamount":   (*bidding)["bidamount"],
+				"winnertel":   (*bidding)["winnertel"],
+				"agency":      (*bidding)["agency"],
+				"publishtime": (*bidding)["publishtime"],
 			}
+			biddingIdMap[bid] = da
 			//log.Println("aa", insert, amount, title, href, score)
-			p1_biddings = append(p1_biddings, data)
+			p1_biddings = append(p1_biddings, da)
 		}
 
-		insert["biddings"] = p1_biddings
+		//insert["biddings"] = p1_biddings
 		p1_bidding_ids = removeDuplicates(p1_bidding_ids)
-		p1_projects := make([]map[string]interface{}, 0)
+
 		for _, vv := range p1_bidding_ids {
 			where2 := map[string]interface{}{
 				"ids": vv,
@@ -102,26 +115,25 @@ func dealXlsxData() {
 					"area":        (*projectset)["area"],
 					"city":        (*projectset)["city"],
 					"district":    (*projectset)["district"],
+					"bidstatus":   (*projectset)["bidstatus"],
+					"buyer":       (*projectset)["buyer"],
+					"firsttime":   (*projectset)["firsttime"],
+					"bidtype":     (*projectset)["bidtype"],
 				}
-				p1_projects = append(p1_projects, data)
-			}
-		}
-		uniqueProjects := make([]map[string]interface{}, 0)
-		seen := make(map[string]bool)
-
-		for _, project := range p1_projects {
-			pid := project["project_id"].(string)
-			if !seen[pid] {
-				seen[pid] = true
-				uniqueProjects = append(uniqueProjects, project)
+
+				biddData := biddingIdMap[vv]
+				biddData["project"] = data
+				biddingIdMap[vv] = biddData
 			}
 		}
 
-		if len(uniqueProjects) > 0 {
-			insert["projects"] = uniqueProjects
+		var insertBidding = make([]map[string]interface{}, 0)
+		for _, v := range biddingIdMap {
+			insertBidding = append(insertBidding, v)
 		}
 
-		MgoP.InsertOrUpdate("qfw", "wcc_dealXlsxData", insert)
+		insert["bidding"] = insertBidding
+		MgoP.InsertOrUpdate("qfw", "wcc_dealXlsxData_0524", insert)
 	}
 
 	log.Println("数据处理完毕--------")
@@ -129,13 +141,23 @@ func dealXlsxData() {
 
 // exportTestData 导出拓扑统计局 样例数据
 func exportTestData() {
+	// 打开已有模板
+	f, err := excelize.OpenFile("统计局入库项目数据目录树.xlsx")
+	if err != nil {
+		log.Fatal("打开模板失败:", err)
+	}
+
+	sheet := f.GetSheetName(0) // 使用第一个 Sheet,也可以改成具体 Sheet 名
+	startRow := 2              // 假设第 1 行是表头,数据从第 2 行开始
+
+	rowIndex := startRow
+
 	sess := MgoP.GetMgoConn()
 	defer MgoP.DestoryMongoConn(sess)
 
-	queryMgo := sess.DB("qfw").C("wcc_dealXlsxData").Find(nil).Select(nil).Iter()
+	queryMgo := sess.DB("qfw").C("wcc_dealXlsxData_0523").Find(nil).Select(nil).Iter()
 	count := 0
 
-	total_datas := make([][]interface{}, 0)
 	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
 		if count%1000 == 0 {
 			log.Println("current", count)
@@ -155,17 +177,213 @@ func exportTestData() {
 			ppid,
 			tmp["p1_project_code"],
 			tmp["p1_project_name"],
-			tmp["p1_project_name"],
 			tmp["p1_project_owner"],
 			tmp["p1_project_owner_code"],
+			tmp["p1_project_owner_area"],
 		}
 
-		//2.在建项目数据
-		if projects, ok := tmp["projects"].([]interface{}); ok {
-			log.Println(projects)
+		// 遍历招采信息
+		if biddingList, ok := tmp["bidding"].([]interface{}); ok {
+			for _, bidItem := range biddingList {
+				bid := bidItem.(map[string]interface{})
+				project := bid["project"].(map[string]interface{})
+
+				// 在建招采项目信息
+				v2 := []interface{}{
+					project["project_id"],
+					project["projectname"],
+					project["bidamount"],
+					project["area"],
+					project["city"],
+					project["bidstatus"],
+					project["buyer"],
+					time.Unix(util.Int64All(project["firsttime"]), 0).In(time.FixedZone("CST", 8*3600)).Format("2006-01-02 15:04:05"),
+					bid["score"],
+				}
+
+				// 标讯信息
+				v3 := []interface{}{
+					bid["id"],
+					bid["title"],
+					bid["subtype"],
+					bid["area"],
+					bid["city"],
+					bid["buyer"],
+					bid["budget"],
+					bid["buyerperson"],
+					bid["buyertel"],
+					bid["s_winner"],
+					bid["bidamount"],
+					bid["winnertel"],
+					bid["agency"],
+					time.Unix(util.Int64All(bid["publishtime"]), 0).In(time.FixedZone("CST", 8*3600)).Format("2006-01-02 15:04:05"),
+					//bid["publishtime"],
+					bid["score"],
+				}
+
+				// 拼接成一行
+				fullRow := append(append(v1, v2...), v3...)
+
+				cell, _ := excelize.CoordinatesToCellName(1, rowIndex)
+				f.SetSheetRow(sheet, cell, &fullRow)
+				rowIndex++
+				// 写入表格,如写入 Excel
+				//excelWriter.WriteRow(fullRow) // 你需要实现 WriteRow 方法
+			}
+
 		}
+	}
+	// 保存为另一个文件,防止覆盖模板
+	outputFile := "统计局入库项目数据导出.xlsx"
+	if err := f.SaveAs(outputFile); err != nil {
+		log.Fatal("保存 Excel 失败:", err)
+	}
+	log.Println("✅ 数据写入完成,共写入行数:", rowIndex-startRow)
+}
 
-		total_datas = append(total_datas, v1)
+func exportTestData2() {
+	// 1. 连接 MongoDB
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
+
+	// 2. 打开 Excel 模板
+	f, err := excelize.OpenFile("统计局入库项目数据目录树.xlsx")
+	if err != nil {
+		log.Fatalf("无法打开Excel模板: %v", err)
 	}
+	sheet := f.GetSheetName(0)
+	rowIndex := 3 // 假设第一行是表头
 
+	where := map[string]interface{}{
+		"city": "陇南市",
+	}
+	// 3. 查询 Mongo 数据
+	queryMgo := sess.DB("qfw").C("wcc_dealProposed22_0524").Find(where).Iter()
+	count := 0
+
+	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
+		if count%1000 == 0 {
+			log.Println("current", count)
+		}
+
+		//projectname := util.ObjToString(tmp["p1_project_name"])
+		//where := map[string]interface{}{"projectname": projectname}
+		//pro_project, _ := MgoP.FindOne("projectset_proposed", where)
+
+		//ppid := ""
+		//if pro_project != nil {
+		//	ppid = mongodb.BsonIdToSId((*pro_project)["_id"])
+		//}
+
+		v1 := []interface{}{
+			//ppid,
+			tmp["proposed_id"],
+			tmp["proposed_number"],
+			tmp["projectname"],
+			tmp["buyer"],
+			tmp["credit_no"],
+			fmt.Sprintf("%s-%s", tmp["area"], tmp["city"]),
+		}
+
+		// ----------- 核心容错处理逻辑 ------------
+		if biddingListRaw, ok := tmp["bidding"]; ok {
+			biddingList, ok := biddingListRaw.([]interface{})
+			if !ok || len(biddingList) == 0 {
+				writeEmptyRow(f, sheet, rowIndex, v1)
+				rowIndex++
+			} else {
+				for _, bidItem := range biddingList {
+					bidMap, ok := bidItem.(map[string]interface{})
+					if !ok {
+						continue
+					}
+
+					if util.ObjToString(bidMap["toptype"]) == "拟建" {
+						continue
+					}
+					// v2
+					v2 := make([]interface{}, 9)
+					for i := range v2 {
+						v2[i] = ""
+					}
+
+					if projRaw, ok := bidMap["project"]; ok {
+						if project, ok := projRaw.(map[string]interface{}); ok {
+							firstTimeStr := ""
+							if val, ok := project["firsttime"]; ok {
+								if t := util.Int64All(val); t > 0 {
+									firstTimeStr = time.Unix(t, 0).In(time.FixedZone("CST", 8*3600)).Format("2006-01-02 15:04:05")
+								}
+							}
+							v2 = []interface{}{
+								project["project_id"],
+								project["projectname"],
+								project["bidamount"],
+								project["area"],
+								project["city"],
+								project["bidstatus"],
+								project["buyer"],
+								firstTimeStr,
+								bidMap["score"],
+							}
+						}
+					}
+
+					publishTimeStr := ""
+					if val, ok := bidMap["publishtime"]; ok {
+						if t := util.Int64All(val); t > 0 {
+							publishTimeStr = time.Unix(t, 0).In(time.FixedZone("CST", 8*3600)).Format("2006-01-02 15:04:05")
+						}
+					}
+					v3 := []interface{}{
+						bidMap["id"],
+						bidMap["title"],
+						bidMap["subtype"],
+						bidMap["area"],
+						bidMap["city"],
+						bidMap["buyer"],
+						bidMap["budget"],
+						bidMap["buyerperson"],
+						bidMap["buyertel"],
+						bidMap["s_winner"],
+						bidMap["bidamount"],
+						bidMap["winnertel"],
+						bidMap["agency"],
+						publishTimeStr,
+						bidMap["score"],
+					}
+
+					fullRow := append(append(v1, v2...), v3...)
+					cell, _ := excelize.CoordinatesToCellName(1, rowIndex)
+					f.SetSheetRow(sheet, cell, &fullRow)
+					rowIndex++
+				}
+			}
+		} else {
+			// bidding 字段不存在
+			writeEmptyRow(f, sheet, rowIndex, v1)
+			rowIndex++
+		}
+	}
+
+	// 4. 保存结果
+	if err := f.SaveAs("输出结果.xlsx"); err != nil {
+		log.Fatalf("保存文件失败: %v", err)
+	}
+
+	log.Println("导出完成,保存为 输出结果.xlsx")
+}
+
+func writeEmptyRow(f *excelize.File, sheet string, rowIndex int, v1 []interface{}) {
+	emptyV2 := make([]interface{}, 9)
+	emptyV3 := make([]interface{}, 15)
+	for i := range emptyV2 {
+		emptyV2[i] = ""
+	}
+	for i := range emptyV3 {
+		emptyV3[i] = ""
+	}
+	fullRow := append(append(v1, emptyV2...), emptyV3...)
+	cell, _ := excelize.CoordinatesToCellName(1, rowIndex)
+	f.SetSheetRow(sheet, cell, &fullRow)
 }

BIN
project_chuan/统计局入库项目数据目录树.xlsx


BIN
project_chuan/陇南市输出结果.xlsx