package main

import (
	"fmt"
	"log"
	"sync"

	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// updateProject queues an Elasticsearch update for every project document
// that carries a tag_topinformation field.
//
// It opens its own MongoDB pool, scans qfw.projectset_20230904 ordered by
// pici, and for each matching document sends a two-element slice on
// updateEsPool: the {"_id": ...} selector followed by the fields to set.
func updateProject() {
	defer util.Catch()

	// Project data source. The original also carried commented-out
	// alternatives: MongodbAddr "127.0.0.1:27080" and Direct: true.
	projectMgo := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.85:27080",
		Size:        10,
		DbName:      "qfw",
	}
	projectMgo.InitPool()

	sess := projectMgo.GetMgoConn()
	defer projectMgo.DestoryMongoConn(sess)

	selected := map[string]interface{}{"projectname": 1, "tag_topinformation": 1, "pici": 1}
	it := sess.DB("qfw").C("projectset_20230904").Find(nil).Select(selected).Sort("pici").Iter()
	fmt.Println("updateProject 开始")

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%10000 == 0 {
			log.Println("current", count, tmp["projectname"])
		}

		topInfo, ok := tmp["tag_topinformation"]
		if !ok {
			continue
		}

		projectID := mongodb.BsonIdToSId(tmp["_id"])
		// Queue the Elasticsearch update: selector first, then the new fields.
		updateEsPool <- []map[string]interface{}{
			{"_id": projectID},
			{"tag_topinformation": topInfo},
		}
		log.Println("aaaaaaaaaaaa", projectID, tmp["projectname"])
	}
	log.Println("结束~~~~~~~~~~~~~")
}

// updateProjectDetail rebuilds the "detail" field of project documents and
// queues the result for Elasticsearch.
//
// For every document matched by the pici filter it looks up each bidding
// notice referenced from the document's "list" entries, splits the cleaned
// notice text into fragments, de-duplicates them, and sends
// [{"_id": id}, {"detail": fragments}] on updateEsPool. At most 15 documents
// are processed concurrently.
func updateProjectDetail() {
	defer util.Catch()

	// Project data source.
	sess := MgoR.GetMgoConn()
	defer MgoR.DestoryMongoConn(sess)

	// NOTE(review): as written this range is empty (pici > 1111 AND
	// pici <= 222); the bounds look like placeholders — set real values
	// before running.
	where := map[string]interface{}{
		"pici": map[string]interface{}{
			"$gt":  1111,
			"$lte": 222,
		},
	}
	it := sess.DB("qfw").C("projectset_20230904").Find(where).Select(nil).Iter()
	fmt.Println("updateProject 开始")

	sem := make(chan struct{}, 15) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}
	for count := 0; ; count++ {
		// Decode into a fresh map for every document: the map is handed to
		// a worker goroutine, so it must not be refilled by the next
		// it.Next call while that worker is still reading it.
		tmp := make(map[string]interface{})
		if !it.Next(&tmp) {
			break
		}
		if count%2000 == 0 {
			log.Println("current", count, tmp["pici"], tmp["_id"])
		}

		sem <- struct{}{}
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-sem
				wg.Done()
			}()
			// The function-level Catch only covers the calling goroutine;
			// without this a panic in a worker would take the process down.
			defer util.Catch()

			projectID := mongodb.BsonIdToSId(doc["_id"])
			list, ok := doc["list"].([]interface{})
			if !ok {
				log.Println(projectID, "list field missing or not an array, skipped")
				return
			}

			detail := collectProjectDetail(projectID, list)
			if len(detail) == 0 {
				return
			}

			// Queue the Elasticsearch update: selector first, then the new fields.
			updateEsPool <- []map[string]interface{}{
				{"_id": projectID},
				{"detail": RemoveDuplicates(detail)},
			}
		}(tmp)
	}

	wg.Wait()
	log.Println("数据处理完毕")
}

// collectProjectDetail returns the text fragments of every bidding notice
// referenced by the given project list entries. Fragments are de-duplicated
// per notice; the caller de-duplicates across notices. Entries that are not
// maps, have no infoid, or whose notice lookup returns nil are skipped.
//
// TODO: process the new project-detail fields; fetch the data already in
// Elasticsearch to decide whether detail needs updating. (The list of
// processed infoids is not collected yet.)
func collectProjectDetail(projectID string, list []interface{}) []string {
	var detail []string
	for _, m := range list {
		entry, ok := m.(map[string]interface{})
		if !ok {
			log.Println(projectID, "list entry is not a map, skipped")
			continue
		}
		infoid := util.ObjToString(entry["infoid"])
		if infoid == "" {
			continue
		}

		// NOTE(review): the second return value of FindById is discarded, as
		// in the original; its type is not visible from this file.
		biddingData, _ := MgoB.FindById(biddingCollectionFor(infoid), infoid, nil)
		if biddingData == nil {
			log.Println(projectID, infoid, "bidding lookup returned nil, skipped")
			continue
		}

		biddingDetail := util.ObjToString((*biddingData)["detail"])
		// NOTE(review): the second return value of CleanHTMLTags is discarded,
		// as in the original — confirm whether it should be checked.
		cleaned, _ := CleanHTMLTags(biddingDetail)
		fragments := SplitTextByChinesePunctuation(cleaned)
		detail = append(detail, RemoveDuplicates(fragments)...)
	}
	return detail
}

// biddingCollectionFor returns the collection that is queried for the given
// bidding ID: "bidding" for IDs that sort after the cut-over ID,
// "bidding_back" otherwise.
func biddingCollectionFor(infoid string) string {
	// NOTE(review): presumably the ID at which older notices were moved to
	// bidding_back — confirm. IDs are compared as plain strings.
	const cutoverID = "5a862e7040d2d9bbe88e3b1f"
	if infoid > cutoverID {
		return "bidding"
	}
	return "bidding_back"
}