// Package service scores bidding records against configurable rules and
// elects the preferred record inside each group of duplicated records.
package service

import (
	"log"
	"strings"
	"sync"
	"time"

	. "app.yhyue.com/moapp/jybase/common"
	. "app.yhyue.com/moapp/jybase/mongodb"
	"dataPrefer/db"
	"dataPrefer/entity"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gcfg"
	"github.com/gogf/gf/v2/os/gctx"
	"github.com/gogf/gf/v2/util/gconv"
	"golang.org/x/net/context"
)

// Collection names used by this package.
const (
	Result_20220219 = "result_20220219"
	Result_20220218 = "result_20220218"
	Bidding         = "bidding"
	Bidding_master  = "bidding_master"
)

var (
	// allSite maps a site name to its profile, loaded from editor.site.
	allSite = map[string]*entity.SiteInfo{}
	// loginSite holds the site names found in editor.site_login.
	loginSite = map[string]bool{}
	// delaySite holds the site names found in spider.spider_compete with compete=true.
	delaySite = map[string]bool{}
	// ruleConf is the handle on rule.yaml, set by reLoadConf.
	ruleConf *gcfg.Config
	// competitorSite holds the site names listed under the competitor-site config key.
	competitorSite = map[string]bool{}
	// gatherRules and structRules are scanned from rule.yaml by reLoadConf.
	gatherRules, structRules map[string][]*entity.Rule
	// allExistsFields and partExistsFields map a record field name to the
	// corresponding key inside the "ai_zhipu" sub-document.
	allExistsFields  = map[string]string{}
	partExistsFields = map[string]string{}
	// govSiteYes maps a site subtype to its score; the "默认" entry is the fallback.
	govSiteYes map[string]*g.Var

	// extractSelectFields is the projection used when reading the extract collections.
	extractSelectFields = map[string]interface{}{
		"_id":             1,
		"agency":          1,
		"area":            1,
		"bidamount":       1,
		"bidendtime":      1,
		"bidopentime":     1,
		"budget":          1,
		"buyer":           1,
		"buyertel":        1,
		"city":            1,
		"detail":          1,
		"isValidFile":     1,
		"procurementlist": 1,
		"projectcode":     1,
		"projectname":     1,
		"projectperiod":   1,
		"purchasinglist":  1,
		"title":           1,
		"winner":          1,
		"s_winner":        1,
		"winnertel":       1,
		"prefer_score":    1,
		"repeat":          1,
		"site":            1,
		"toptype":         1,
		"repeat_id":       1,
		"ai_zhipu":        1,
		"ext_ai_record":   1,
		"dataging":        1,
	}
	// biddingSelectFields is the projection used when reading the bidding collection.
	biddingSelectFields = map[string]interface{}{
		"_id":             1,
		"detail":          1,
		"procurementlist": 1,
		"purchasinglist":  1,
		"isValidFile":     1,
		"pici":            1,
	}
)

// configPoolSize reads an integer worker-pool size from the default config.
// Values below 1 (including a missing key) are clamped to 1: a zero-capacity
// channel would block the producer before any worker exists, and a negative
// capacity makes make() panic.
func configPoolSize(ctx context.Context, key string) int {
	if size := g.Config().MustGet(ctx, key).Int(); size > 0 {
		return size
	}
	return 1
}

// loadFieldPairs reads a list of "field:aiField" strings from ruleConf under
// key and stores them into dst. Entries without a ':' separator are logged and
// skipped instead of panicking with an index-out-of-range.
func loadFieldPairs(ctx context.Context, key string, dst map[string]string) {
	for _, v := range ruleConf.MustGet(ctx, key).Strings() {
		vs := strings.Split(v, ":")
		if len(vs) < 2 {
			log.Println("配置项格式错误,跳过", key, v)
			continue
		}
		dst[vs[0]] = vs[1]
	}
}

// reLoadConf (re)reads rule.yaml into the package-level rule variables.
// A scan failure of the rule sections terminates the process via log.Fatalln.
func reLoadConf() {
	ctx := gctx.New()
	ruleConf = g.Config("rule.yaml")
	for _, v := range ruleConf.MustGet(ctx, "站点画像.竞品网站.list").Strings() {
		competitorSite[v] = true
	}
	if err := ruleConf.MustGet(ctx, "采集字段").Scan(&gatherRules); err != nil {
		log.Fatalln("采集字段 scan error", err)
	}
	if err := ruleConf.MustGet(ctx, "结构化字段").Scan(&structRules); err != nil {
		log.Fatalln("结构化字段 scan error", err)
	}
	loadFieldPairs(ctx, "对比大模型.allExistsAndSame.fields", allExistsFields)
	loadFieldPairs(ctx, "对比大模型.partExistsAndSame.fields", partExistsFields)
	govSiteYes = ruleConf.MustGet(ctx, "站点画像.政府采购网站.yes").MapStrVar()
}

// eachSiteDoc walks every document matched by query in dbName.collection on
// the site connection and hands each one to visit. query is passed through
// untouched so that a literal nil stays an untyped nil.
func eachSiteDoc(dbName, collection string, query interface{}, fields map[string]interface{}, visit func(doc map[string]interface{})) {
	sess := db.Mgo_Site.GetMgoConn()
	defer db.Mgo_Site.DestoryMongoConn(sess)
	it := sess.DB(dbName).C(collection).Find(query).Select(fields).Iter()
	// A fresh map per document: the visited map may be retained by visit.
	for doc := make(map[string]interface{}); it.Next(&doc); doc = make(map[string]interface{}) {
		visit(doc)
	}
}

// initSite loads the three site lookup tables concurrently and returns once
// all of them are filled. Each goroutine writes to its own map.
func initSite() {
	log.Println("开始初始化网站数据。。。")
	wait := &sync.WaitGroup{}
	siteOnly := func() map[string]interface{} {
		return map[string]interface{}{"site": 1}
	}

	wait.Add(1)
	go func() {
		defer wait.Done()
		eachSiteDoc("editor", "site", nil, map[string]interface{}{
			"site":         1,
			"site_subtype": 1,
			"site_toptype": 1,
			"type_plate":   1,
		}, func(doc map[string]interface{}) {
			allSite[gconv.String(doc["site"])] = &entity.SiteInfo{
				Subtype:   gconv.String(doc["site_subtype"]),
				Toptype:   gconv.String(doc["site_toptype"]),
				Site:      gconv.String(doc["site"]),
				TypePlate: gconv.String(doc["type_plate"]),
			}
		})
		log.Println("site表数据初始化结束。。。", len(allSite))
	}()

	wait.Add(1)
	go func() {
		defer wait.Done()
		eachSiteDoc("editor", "site_login", nil, siteOnly(), func(doc map[string]interface{}) {
			loginSite[gconv.String(doc["site"])] = true
		})
		log.Println("site_login表数据初始化结束。。。", len(loginSite))
	}()

	wait.Add(1)
	go func() {
		defer wait.Done()
		eachSiteDoc("spider", "spider_compete", map[string]interface{}{"compete": true}, siteOnly(), func(doc map[string]interface{}) {
			delaySite[gconv.String(doc["site"])] = true
		})
		log.Println("spider_compete表数据初始化结束。。。", len(delaySite))
	}()

	wait.Wait()
}

// IncDataByTime runs IncData over a hard-coded comeintime window
// [1747756800, 1747843200).
func IncDataByTime() {
	//{comeintime:{$gt:1747886400,$lt:1747888200}}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": 1747756800,
			"$lt":  1747843200,
		},
	}
	// Alternative queries kept for manual runs:
	//query := map[string]interface{}{
	//	"$or": []map[string]interface{}{
	//		map[string]interface{}{
	//			"_id": StringTOBsonId("682bf3325f834436f023b091"),
	//		},
	//		map[string]interface{}{
	//			"repeat_id": "682bf3325f834436f023b091",
	//		},
	//	},
	//}
	//query := map[string]interface{}{
	//	"_id": StringTOBsonId("682cd75d5f834436f02824ee"),
	//}
	IncData(query)
}

// IncDataById runs IncData over the inclusive _id range [sid, eid]. When both
// ids are equal an exact-match query is used instead of a range.
func IncDataById(sid, eid string) {
	query := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gte": StringTOBsonId(sid),
			"$lte": StringTOBsonId(eid),
		},
	}
	if sid == eid {
		query = map[string]interface{}{
			"_id": StringTOBsonId(sid),
		}
	}
	IncData(query)
}

// IncData reloads the configuration and site tables, then iterates the
// bidding collection (sorted by _id) for the given query and scores every
// record in a bounded pool of goroutines. It returns once all workers finish.
func IncData(query map[string]interface{}) {
	defer Catch()
	log.Println("start。。。", query)
	reLoadConf()
	initSite()

	sess := db.Mgo_Main.GetMgoConn()
	defer db.Mgo_Main.DestoryMongoConn(sess)
	ctx := gctx.New()
	it := sess.DB(db.Mgo_Main.DbName).C(Bidding).Find(query).Select(biddingSelectFields).Sort("_id").Iter()
	index := 0
	// pool is a counting semaphore limiting the number of in-flight workers.
	pool := make(chan bool, configPoolSize(ctx, "scoringPoolSize"))
	wait := &sync.WaitGroup{}
	// A fresh map per record: the previous one is owned by its worker.
	for m := make(map[string]interface{}); it.Next(&m); m = make(map[string]interface{}) {
		index++
		if index%5000 == 0 {
			log.Println("处理数据", index)
		}
		pool <- true
		wait.Add(1)
		go func(nm map[string]interface{}) {
			defer func() {
				<-pool
				wait.Done()
			}()
			processBidding(ctx, nm)
		}(m)
	}
	wait.Wait()
	log.Println("over。。。", index)
}

// processBidding enriches one bidding record with its extract-table fields,
// scores it and runs the preferred-record election for its duplicate group.
// Records that are missing from both extract collections, flagged dataging=1,
// or marked repeat=1 without a repeat_id are skipped.
func processBidding(ctx context.Context, nm map[string]interface{}) {
	bidding, ok := db.Mgo_Extract.FindOneByField(Result_20220219, map[string]interface{}{"_id": nm["_id"]}, extractSelectFields)
	if !ok || bidding == nil || len(*bidding) == 0 {
		bidding, ok = db.Mgo_Extract.FindOneByField(Result_20220218, map[string]interface{}{"_id": nm["_id"]}, extractSelectFields)
	}
	id := BsonIdToSId(nm["_id"])
	switch {
	case !ok || bidding == nil || len(*bidding) == 0:
		log.Println("抽取表中没有找到数据", id)
		return
	case gconv.Int((*bidding)["dataging"]) == 1:
		// dataging=1 in the extract table: skip.
		log.Println("抽取表中dataging是1跳过", id)
		return
	case gconv.Int((*bidding)["repeat"]) == 1 && gconv.String((*bidding)["repeat_id"]) == "":
		log.Println("repeat=1并且repeat_id不存在,过滤掉", id)
		return
	}
	// Values already present on the bidding record win over extract values.
	for k, v := range *bidding {
		if nm[k] == nil {
			nm[k] = v
		}
	}
	score := MakeScore(ctx, nm)
	if score == -1 {
		return
	}
	Preferred(ctx, nm, score)
}

// MakeScore scores nm, stores the result in nm["prefer_score"], persists it on
// the bidding document and returns it.
func MakeScore(ctx context.Context, nm map[string]interface{}) int {
	totalScore := Scoring(ctx, nm)
	log.Println("标讯", BsonIdToSId(nm["_id"]), "分数", totalScore)
	nm["prefer_score"] = totalScore
	db.Mgo_Main.UpdateById(Bidding, nm["_id"], map[string]interface{}{
		"$set": map[string]interface{}{
			"prefer_score": totalScore,
		},
	})
	return totalScore
}

// ruleScore evaluates a single rule against data and returns the score
// contribution (0 when the rule does not fire). field is the record field the
// rule applies to; "exists" rules ignore it and use rule.Value as field list.
func ruleScore(id string, data map[string]interface{}, field string, rule *entity.Rule) int {
	var total int
	switch rule.Type {
	case "exists":
		// The score is applied once per listed field that is absent.
		for _, name := range gconv.Strings(rule.Value) {
			if data[name] != nil {
				continue
			}
			total += rule.Score
			log.Println(id, name, "字段不存在", rule.Score)
		}
	case "length":
		valueLen := len([]rune(gconv.String(data[field])))
		if rule.Min >= 0 && rule.Max > 0 {
			if valueLen >= rule.Min && valueLen <= rule.Max {
				total += rule.Score
				log.Println(id, field, "字符串长度大于", rule.Min, "小于", rule.Max, rule.Score)
			}
		} else if rule.Min >= 0 {
			if valueLen > rule.Min {
				total += rule.Score
				log.Println(id, field, "字符串长度大于", rule.Min, rule.Score)
			}
		}
	case "equal":
		var isDeduct bool
		// Compare in the numeric/bool domain of the configured value so that
		// differently typed but equal values still match.
		switch value := rule.Value.(type) {
		case int, int32, int64:
			isDeduct = gconv.Int64(value) == gconv.Int64(data[field])
		case float32, float64:
			isDeduct = gconv.Float64(value) == gconv.Float64(data[field])
		case bool:
			isDeduct = gconv.Bool(value) == gconv.Bool(data[field])
		default:
			isDeduct = rule.Value == data[field]
		}
		if isDeduct {
			total += rule.Score
			log.Println(id, field, "值等于", rule.Value, rule.Score)
		}
	}
	return total
}

// siteScore returns the site-profile score contribution for site: the
// government-procurement score (only for sites known in allSite) plus the
// login-required, competitor and delayed-crawl scores where they apply.
func siteScore(ctx context.Context, id, site string) int {
	total := 0
	if info := allSite[site]; info != nil {
		score := 0
		topType := ruleConf.MustGet(ctx, "站点画像.政府采购网站.topType").String()
		if info.Toptype == topType {
			if govSiteYes[info.Subtype].IsNil() {
				score = govSiteYes["默认"].Int()
				log.Println(id, "地方政府网站", score)
			} else {
				score = govSiteYes[info.Subtype].Int()
				log.Println(id, info.Subtype, "类网站", score)
			}
		} else {
			score = ruleConf.MustGet(ctx, "站点画像.政府采购网站.not.score").Int()
			log.Println(id, "非", topType, score)
		}
		total += score
	}
	if loginSite[site] {
		score := ruleConf.MustGet(ctx, "站点画像.需登录网站.score").Int()
		total += score
		log.Println(id, "需登录网站", score)
	}
	if competitorSite[site] {
		score := ruleConf.MustGet(ctx, "站点画像.竞品网站.score").Int()
		total += score
		log.Println(id, "竞品网站", score)
	}
	if delaySite[site] {
		score := ruleConf.MustGet(ctx, "站点画像.延时采集网站.score").Int()
		total += score
		log.Println(id, "延时采集网站", score)
	}
	return total
}

// aiCompareScore compares record fields with the "ai_zhipu" sub-document and
// returns the score contribution for every mismatching field. It returns 0
// when the record carries no "ai_zhipu" value.
func aiCompareScore(ctx context.Context, id string, data map[string]interface{}) int {
	if data["ai_zhipu"] == nil {
		return 0
	}
	// A failed assertion leaves a nil map; reads on a nil map are safe and
	// simply report every key as absent.
	aiZhipu, _ := data["ai_zhipu"].(map[string]interface{})
	extAiRecord, _ := data["ext_ai_record"].(map[string]interface{})

	// Values recorded in ext_ai_record take precedence over the record itself.
	compare := map[string]interface{}{}
	for _, fields := range []map[string]string{allExistsFields, partExistsFields} {
		for k := range fields {
			if extAiRecord[k] != nil {
				compare[k] = extAiRecord[k]
			} else if data[k] != nil {
				compare[k] = data[k]
			}
		}
	}

	total := 0

	// All fields present on both sides, but values differ.
	allFieldExists := 0
	for k, v := range allExistsFields {
		if compare[k] == nil || aiZhipu[v] == nil {
			break
		}
		allFieldExists++
	}
	if allFieldExists > 0 && allFieldExists == len(allExistsFields) {
		score := ruleConf.MustGet(ctx, "对比大模型.allExistsAndSame.score").Int()
		for k, v := range allExistsFields {
			if gconv.String(compare[k]) == gconv.String(aiZhipu[v]) {
				continue
			}
			total += score
			log.Println(id, "对比大模型,字段都存在,", v, "的值不一致", score)
		}
	}

	// At least one field present on both sides, and values differ.
	partFieldExists := false
	for k, v := range partExistsFields {
		if compare[k] != nil && aiZhipu[v] != nil {
			partFieldExists = true
			break
		}
	}
	if partFieldExists {
		score := ruleConf.MustGet(ctx, "对比大模型.partExistsAndSame.score").Int()
		for k, v := range partExistsFields {
			if compare[k] == nil || aiZhipu[v] == nil || gconv.String(compare[k]) == gconv.String(aiZhipu[v]) {
				continue
			}
			total += score
			log.Println(id, "对比大模型,部分字段存在,", v, "的值不一致", score)
		}
	}
	return total
}

// Scoring computes the score of one record. It starts from 100 and adds the
// contributions of the structured-field rules (selected by the record's
// "toptype", falling back to "其他"), the gather-field rules (first firing rule
// per field), the site profile and the AI comparison.
func Scoring(ctx context.Context, data map[string]interface{}) int {
	_id := BsonIdToSId(data["_id"])
	totalScore := 100

	sts := structRules[gconv.String(data["toptype"])]
	if sts == nil {
		sts = structRules["其他"]
	}
	// Structured fields.
	for _, rule := range sts {
		totalScore += ruleScore(_id, data, "", rule)
	}
	// Gathered fields: only the first rule that fires counts for each field.
	for field, rules := range gatherRules {
		for _, rule := range rules {
			if thisScore := ruleScore(_id, data, field, rule); thisScore != 0 {
				totalScore += thisScore
				break
			}
		}
	}

	totalScore += siteScore(ctx, _id, gconv.String(data["site"]))
	totalScore += aiCompareScore(ctx, _id, data)
	return totalScore
}

// findRepeatGroup loads, from both extract collections concurrently, the
// records whose repeat_id points at the group head, plus the head itself when
// the current record is a duplicate. The head id is rid for a duplicate
// (isRepeat == 1) and id otherwise. A duplicate without rid yields nil.
// The result may contain the same _id more than once.
func findRepeatGroup(id, rid string, isRepeat int) []map[string]interface{} {
	if isRepeat == 1 && rid == "" {
		return nil
	}
	findID := id
	if isRepeat == 1 {
		findID = rid
	}

	var (
		group []map[string]interface{}
		lock  sync.Mutex
		wait  sync.WaitGroup
	)
	for _, v := range []string{Result_20220219, Result_20220218} {
		wait.Add(1)
		go func(collection string) {
			defer wait.Done()
			datas, ok := db.Mgo_Extract.Find(collection, map[string]interface{}{"repeat_id": findID}, nil, extractSelectFields, false, -1, -1)
			if ok && datas != nil && len(*datas) > 0 {
				lock.Lock()
				group = append(group, *datas...)
				lock.Unlock()
			}
		}(v)
		if isRepeat != 1 {
			continue
		}
		// The current record is a duplicate: also fetch the record it points at.
		wait.Add(1)
		go func(collection string) {
			defer wait.Done()
			data, ok := db.Mgo_Extract.FindById(collection, rid, extractSelectFields)
			if ok && data != nil && len(*data) > 0 {
				lock.Lock()
				group = append(group, *data)
				lock.Unlock()
			}
		}(v)
	}
	wait.Wait()
	return group
}

// Preferred elects the best record of mainData's duplicate group and updates
// the isprefer/extracttype/pici markers on the bidding collection.
//
// score is the already computed score of mainData. Every other group member
// is re-scored; the highest score wins and ties go to the greater id string.
// When the winner differs from the previous one, a row is written to
// bidding_prefer_log. All non-winning members are then un-preferred.
func Preferred(ctx context.Context, mainData map[string]interface{}, score int) {
	_id := BsonIdToSId(mainData["_id"])
	rid := gconv.String(mainData["repeat_id"])
	isRepeat := gconv.Int(mainData["repeat"])
	var miniPici, repeatPici int64
	bestId := _id
	bestScore := score

	// Find the records of the duplicate group.
	allDatas := findRepeatGroup(_id, rid, isRepeat)

	wait := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	pool := make(chan bool, configPoolSize(ctx, "preferPoolSize"))
	repeatMap := map[string]bool{}
	findIds := []interface{}{StringTOBsonId(_id)}
	for _, vv := range allDatas {
		thisId := BsonIdToSId(vv["_id"])
		if thisId == _id || repeatMap[thisId] {
			continue
		}
		repeatMap[thisId] = true
		pool <- true
		wait.Add(1)
		go func(vm map[string]interface{}) {
			defer func() {
				<-pool
				wait.Done()
			}()
			bidObj, _ := db.Mgo_Main.FindOneByField(Bidding, map[string]interface{}{"_id": vm["_id"]}, biddingSelectFields)
			thisId := BsonIdToSId(vm["_id"])
			if bidObj == nil || len(*bidObj) == 0 {
				log.Println("bidding中没有找到", thisId)
				return
			}
			for k, v := range *bidObj {
				vm[k] = v
			}
			pici := gconv.Int64(vm["pici"])
			preferScore := MakeScore(ctx, vm)

			// All state shared between workers is touched under the lock,
			// including the pici bookkeeping.
			lock.Lock()
			defer lock.Unlock()
			if pici > 0 {
				if miniPici == 0 || miniPici > pici {
					miniPici = pici
				}
				if thisId == rid {
					repeatPici = pici
				}
			}
			findIds = append(findIds, vm["_id"])
			if preferScore > bestScore || (preferScore == bestScore && thisId > bestId) {
				bestId = thisId
				bestScore = preferScore
			}
		}(vv)
	}
	wait.Wait()

	// Kept non-nil on purpose: the slice is used as a "$in" operand below.
	updateIds := make([]interface{}, 0, len(findIds))
	for _, v := range findIds {
		if BsonIdToSId(v) == bestId {
			continue
		}
		updateIds = append(updateIds, v)
	}

	prevBests, ok := db.Mgo_Main.Find(Bidding, map[string]interface{}{
		"_id": map[string]interface{}{
			"$in": findIds,
		},
		"isprefer": 1,
	}, `{"prefer_score":-1,"_id":1}`, `{"_id":1,"pici":1}`, false, 0, 1)
	if !ok {
		log.Println("查询上次优选id失败", findIds)
		return
	}
	prevBestId := ""
	var prevPici int64
	if prevBests != nil && len(*prevBests) == 1 {
		prevBestId = BsonIdToSId((*prevBests)[0]["_id"])
		prevPici = gconv.Int64((*prevBests)[0]["pici"])
	}
	// Fallbacks: the record pointed at by repeat_id, then the smallest pici seen.
	if prevBestId == "" {
		prevBestId = rid
	}
	if prevPici == 0 {
		prevPici = repeatPici
	}
	if prevPici == 0 {
		prevPici = miniPici
	}

	// Mark the winner as preferred.
	if prevBestId == "" || prevBestId != bestId {
		set := map[string]interface{}{
			"isprefer":    1,
			"prefertime":  time.Now().Unix(),
			"extracttype": 1,
		}
		if prevPici > 0 {
			set["pici"] = prevPici
		}
		if prevBestId != "" && prevBestId != bestId {
			set["old_preferid"] = prevBestId
		}
		db.Mgo_Main.UpdateById(Bidding, bestId, map[string]interface{}{
			"$set": set,
		})
		db.Mgo_Main.Save("bidding_prefer_log", map[string]interface{}{
			"create_time":  time.Now().Unix(),
			"old_preferid": prevBestId,
			"preferid":     bestId,
			"ids":          findIds,
		})
	}

	// Revoke the preferred flag on every other member that still carries it.
	db.Mgo_Main.Update(Bidding, map[string]interface{}{
		"_id": map[string]interface{}{
			"$in": updateIds,
		},
		"isprefer": 1,
	}, map[string]interface{}{
		"$set": map[string]interface{}{
			"isprefer":    -1,
			"extracttype": -1,
			"prefertime":  time.Now().Unix(),
		},
		"$unset": map[string]interface{}{"old_preferid": ""},
	}, false, true)
	// Reset extracttype on other members that have extracttype=1.
	db.Mgo_Main.Update(Bidding, map[string]interface{}{
		"_id": map[string]interface{}{
			"$in": updateIds,
		},
		"extracttype": 1,
	}, map[string]interface{}{
		"$set": map[string]interface{}{
			"extracttype": -1,
		},
	}, false, true)
}

// PreferredTest is a variant of Preferred that, instead of flagging records
// in the bidding collection, upserts the winning record (without "detail",
// "contenthtml" and "master_id") into the bidding_master collection,
// annotated with the site toptype and, when the winner is itself a duplicate,
// the id/toptype/score of the record it points at.
func PreferredTest(ctx context.Context, mainData map[string]interface{}, score int) {
	_id := BsonIdToSId(mainData["_id"])
	rid := gconv.String(mainData["repeat_id"])
	isRepeat := gconv.Int(mainData["repeat"])
	bestId := _id
	bestScore := score

	// Find the records of the duplicate group.
	allDatas := findRepeatGroup(_id, rid, isRepeat)

	wait := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	pool := make(chan bool, configPoolSize(ctx, "preferPoolSize"))
	repeatMap := map[string]bool{}
	findIds := []interface{}{StringTOBsonId(_id)}
	groupMap := map[string]map[string]interface{}{_id: mainData}
	for _, vv := range allDatas {
		thisId := BsonIdToSId(vv["_id"])
		if thisId == _id || repeatMap[thisId] {
			continue
		}
		repeatMap[thisId] = true
		pool <- true
		wait.Add(1)
		go func(vm map[string]interface{}) {
			defer func() {
				<-pool
				wait.Done()
			}()
			preferScore := MakeScore(ctx, vm)
			lock.Lock()
			defer lock.Unlock()
			thisId := BsonIdToSId(vm["_id"])
			groupMap[thisId] = vm
			findIds = append(findIds, vm["_id"])
			if preferScore > bestScore || (preferScore == bestScore && thisId > bestId) {
				bestId = thisId
				bestScore = preferScore
			}
		}(vv)
	}
	wait.Wait()

	// Disabled: look up master_id.
	//datas, ok := db.Mgo_Main.Find(Bidding, map[string]interface{}{
	//	"_id": map[string]interface{}{
	//		"$in": findIds,
	//	},
	//}, map[string]interface{}{"prefer_score": -1, "_id": 1}, map[string]interface{}{"_id": 0, "master_id": 1}, false, -1, -1)
	//if !ok {
	//	log.Println("bidding中找master_id出错", findIds)
	//	return
	//}
	// updateIds and existsMasterIds only feed the disabled master_id logic
	// and are not read afterwards.
	updateIds := []interface{}{}
	existsMasterIds := map[string]bool{}
	//if datas != nil && len(*datas) > 0 {
	//	for _, v := range *datas {
	//		if v["master_id"] == nil {
	//			continue
	//		}
	//		if masterId == "" {
	//			masterId = gconv.String(v["master_id"])
	//		}
	//		existsMasterIds[BsonIdToSId(v["_id"])] = true
	//	}
	//}
	for _, v := range findIds {
		if existsMasterIds[BsonIdToSId(v)] {
			continue
		}
		updateIds = append(updateIds, v)
	}

	// Save the best record.
	data, ok := db.Mgo_Main.FindById(Bidding, bestId, nil)
	if !ok || data == nil || len(*data) == 0 {
		return
	}
	delete(*data, "detail")
	delete(*data, "contenthtml")
	delete(*data, "master_id")
	if oldObj := groupMap[bestId]; oldObj != nil {
		if repeatId := gconv.String(oldObj["repeat_id"]); repeatId != "" {
			(*data)["old_id"] = repeatId
			if oldObj = groupMap[repeatId]; oldObj != nil {
				if siteObj := allSite[gconv.String(oldObj["site"])]; siteObj != nil {
					(*data)["old_site_toptype"] = siteObj.Toptype
				}
				(*data)["old_prefer_score"] = oldObj["prefer_score"]
			}
		}
	}
	if siteObj := allSite[gconv.String((*data)["site"])]; siteObj != nil {
		(*data)["site_toptype"] = siteObj.Toptype
	}
	(*data)["id"] = bestId
	db.Mgo_Main.Update(Bidding_master, map[string]interface{}{"_id": (*data)["_id"]}, map[string]interface{}{"$set": *data}, true, false)
}