123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- package main
- import (
- "fmt"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "sync"
- )
- func updateProject() {
- defer util.Catch()
- // 项目数据
- MgoP := &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.85:27080",
- //MongodbAddr: "127.0.0.1:27080",
- Size: 10,
- DbName: "qfw",
- //Direct: true,
- }
- MgoP.InitPool()
- sess := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(sess)
- selected := map[string]interface{}{"projectname": 1, "tag_topinformation": 1, "pici": 1}
- it := sess.DB("qfw").C("projectset_20230904").Find(nil).Select(selected).Sort("pici").Iter()
- fmt.Println("updateProject 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current", count, tmp["projectname"])
- }
- if _, ok := tmp["tag_topinformation"]; ok {
- update := map[string]interface{}{
- "tag_topinformation": tmp["tag_topinformation"],
- }
- projectID := mongodb.BsonIdToSId(tmp["_id"])
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": projectID},
- //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
- update,
- }
- log.Println("aaaaaaaaaaaa", projectID, tmp["projectname"])
- //err := Es.UpdateDocument("projectset", projectID, update)
- //if err != nil {
- // log.Println(projectID, tmp["projectname"], "更新es 失败")
- //}
- }
- }
- log.Println("结束~~~~~~~~~~~~~")
- }
- // updateProjectDetail 更新项目详情字段
- func updateProjectDetail() {
- defer util.Catch()
- // 项目数据
- sess := MgoR.GetMgoConn()
- defer MgoR.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "pici": map[string]interface{}{
- "$gt": 1111,
- "$lte": 222,
- },
- }
- it := sess.DB("qfw").C("projectset_20230904").Find(where).Select(nil).Iter()
- fmt.Println("updateProject 开始")
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%2000 == 0 {
- log.Println("current", count, tmp["pici"], tmp["_id"])
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- projectID := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- //项目详情,辅助字段,处理过的list里面id,英文逗号拼接
- detailIds := make([]string, 0)
- detail := make([]string, 0) //最终的详情字段
- list := tmp["list"].([]interface{})
- for _, m := range list {
- tmpM := m.(map[string]interface{})
- //todo 处理项目详情 新字段;获取es 已有数据,判断是否需要更新detail
- infoid := util.ObjToString(tmpM["infoid"])
- if infoid != "" {
- detailIds = append(detailIds, infoid)
- if infoid > "5a862e7040d2d9bbe88e3b1f" {
- biddingData, _ := MgoB.FindById("bidding", infoid, nil)
- biddingDetail := util.ObjToString((*biddingData)["detail"])
- da, _ := CleanHTMLTags(biddingDetail)
- characterArray := SplitTextByChinesePunctuation(da)
- detail = append(detail, RemoveDuplicates(characterArray)...)
- } else {
- biddingData, _ := MgoB.FindById("bidding_back", infoid, nil)
- biddingDetail := util.ObjToString((*biddingData)["detail"])
- da, _ := CleanHTMLTags(biddingDetail)
- characterArray := SplitTextByChinesePunctuation(da)
- detail = append(detail, RemoveDuplicates(characterArray)...)
- }
- }
- }
- if len(detail) > 0 {
- detailNew := RemoveDuplicates(detail)
- update["detail"] = detailNew
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": projectID},
- //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
- update,
- }
- //newTmp["detail"] = strings.Join(detailNew, " ")
- }
- //
- //
- }(tmp)
- }
- wg.Wait()
- log.Println("数据处理完毕")
- }
|