package main import ( "bytes" "context" "encoding/json" "errors" "fmt" "go.mongodb.org/mongo-driver/bson" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "net/http" "strconv" "strings" "sync" "time" ) func getBidding2233() { MgoB = &mongodb.MongodbSim{ MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoB.InitPool() sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) where := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": 1753113600, "$lt": time.Now().Unix(), }, } query := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Println("current:", count) } spicode := util.ObjToString(tmp["spidercode"]) if spicode == "sdxzbiddingsjzypc" { MgoB.SaveByOriID("wcc_sdxzbiddingsjzypc", tmp) } } log.Println("count:", count) } // exportBidding exportBidding func exportBidding() { MgoB = &mongodb.MongodbSim{ MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoB.InitPool() sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) where := map[string]interface{}{ "subtype": "开标记录", } query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "channel": 1, "detail": 1, "comeintime": 1, "extracttype": 1, "infoformat": 1}).Iter() count := 0 ch := make(chan bool, 20) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Println("current:", count) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() MgoB.SaveByOriID("wcc_bidding_kaibiao", tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("处理完毕") } func getBidding0311() { // 3. 构造 MongoDB 查询 filter := bson.M{ "$and": []bson.M{ { "$or": []bson.M{ {"subtype": "合同"}, {"toptype": "结果"}, {"toptype": "招标"}, }, }, {"publishtime": bson.M{"$gte": 1714492800, "$lt": 1719763200}}, }, } //where := map[string]interface{}{ // "publishtime": map[string]interface{}{ // "$gte": 1704038400, // "$lte": 1735660800, // }, //} sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) query := sess.DB("qfw").C("bidding").Find(filter).Select(map[string]interface{}{"buyer": 1, "toptype": 1, "subtype": 1}).Iter() count := 0 ch := make(chan bool, 5) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current:", count, tmp["_id"]) } if util.IntAll(tmp["extracttype"]) == -1 { continue } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() MgoB.SaveByOriID("wcc_bidding_20250311_1148", tmp) }(tmp) //if util.ObjToString(tmp["toptype"]) == "招标" || util.ObjToString(tmp["toptype"]) == "结果" || util.ObjToString(tmp["subtype"]) == "合同" { // MgoB.SaveByOriID("wcc_bidding_20250311_1148-2", tmp) //} tmp = make(map[string]interface{}) } wg.Wait() } // getBidding2 获取bidding 1.3日无二级分类数据 func getBidding2() { //2024-1-3日数据 where := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": 1704211200, "$lte": 1704297600, }, "subtype": map[string]interface{}{ "$exists": 1, }, } sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) //texts := make([]string, 0) query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "subtype": 1, "href": 1, "detail": 1, "channel": 1}).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current:", count) } if util.IntAll(tmp["extracttype"]) == -1 { continue } id := mongodb.BsonIdToSId(tmp["_id"]) tmp["jyhref"] = GetJyURLByID(id) MgoB.SaveByOriID("wcc_bidding_20240103_subtype_exists", tmp) tmp = make(map[string]interface{}) } log.Println("over") } // callAi 调用大模型 func callAi() { sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) //where := map[string]interface{}{ // "subtype_a": map[string]interface{}{ // "$exists": 0, // }, //} query := sess.DB("qfw_data").C("wcc_bidding_20240103_subtype_exists").Find(nil).Select(nil).Iter() count := 0 //ch := make(chan bool, 1) //wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("-------- current:", count, tmp["_id"], " ---------") } //ch <- true //wg.Add(1) //go func(tmp map[string]interface{}) { // defer func() { // <-ch // wg.Done() // }() id := mongodb.BsonIdToSId(tmp["_id"]) title := util.ObjToString(tmp["title"]) detail := util.ObjToString(tmp["detail"]) data := map[string]interface{}{ "title": title, "detail": detail, } reqData := map[string]interface{}{ "texts": []interface{}{data}, } now := time.Now() res := send(reqData) log.Println(time.Since(now).Seconds(), tmp["_id"]) subtype := res["result"].([]interface{}) result := subtype[0] types := strings.Split(util.ObjToString(result), "-") update := make(map[string]interface{}) if len(types) == 2 { update["toptype_ai"] = types[0] update["subtype_ai"] = types[1] //没有内容 if detail == "" { update["data_type"] = 1 } else { update["data_type"] = 0 } MgoB.UpdateById("wcc_bidding_20240103_subtype_exists", id, map[string]interface{}{"$set": update}) } //}(tmp) tmp = make(map[string]interface{}) } //wg.Wait() log.Println("over") } // getBidding 调用分类大模型 func getBidding() { //2024-1-3日数据 where := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": 1704211200, "$lte": 1704297600, }, "subtype": map[string]interface{}{ "$exists": 0, }, } sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) //texts := make([]string, 0) query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "href": 1, "detail": 1}).Iter() count := 0 ch := make(chan bool, 10) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current:", count) } if util.IntAll(tmp["extracttype"]) == -1 { continue } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() id := mongodb.BsonIdToSId(tmp["_id"]) title := util.ObjToString(tmp["title"]) detail := util.ObjToString(tmp["detail"]) tmp["bidding_id"] = id data := map[string]interface{}{ "title": title, "detail": detail, } reqData := map[string]interface{}{ "texts": []interface{}{data}, } res := SendAi(reqData) subtype := res["result"].([]interface{}) result := subtype[0] types := strings.Split(util.ObjToString(result), "-") if len(types) == 2 { tmp["new_toptype"] = types[0] tmp["new_subtype"] = types[1] } tmp["jyhref"] = GetJyURLByID(id) //没有内容 if detail == "" { tmp["data_type"] = 1 } else { tmp["data_type"] = 0 } MgoB.Save("wcc_20240103-2", tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("over") } func send(data map[string]interface{}) (res map[string]interface{}) { url := "http://192.168.3.109:16688" jsonData, err := json.Marshal(data) if err != nil { fmt.Println("JSON marshal error:", err) return } req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { fmt.Println("Request error:", err) return } req.Header.Set("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { fmt.Println("Request error:", err) return } defer resp.Body.Close() err = json.NewDecoder(resp.Body).Decode(&res) if err != nil { fmt.Println("Response decoding error:", err) return } return } // SendAi 调用大模型招标分类 func SendAi(data map[string]interface{}) (res map[string]interface{}) { // 设置 2 秒的超时 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() url := "http://192.168.3.109:16688" jsonData, err := json.Marshal(data) if err != nil { fmt.Println("JSON marshal error:", err) return } req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { fmt.Println("Request error:", err) return } req.Header.Set("Content-Type", "application/json") // 将请求与上下文关联 req = req.WithContext(ctx) client := &http.Client{} resp, err := client.Do(req) if err != nil { // 使用 errors.Is 检查错误是否是超时错误 if errors.Is(err, context.DeadlineExceeded) { fmt.Println("Request timed out") return } fmt.Println("Request error:", err) return } defer resp.Body.Close() err = json.NewDecoder(resp.Body).Decode(&res) if err != nil { fmt.Println("Response decoding error:", err) return } return } // getCount 北京中科闻歌科技股份有限公司 对指定数据源网站内的中标公告的数据量的查询需求 func getCount() { //startTime := int64(1672502400) //2023-01-01 //endTime := int64(1680278400) //2023-4-01 //endTime2 := int64(1688140800) //2023-7-01 //endTime3 := int64(1696089600) //2023-10-01 endTime4 := int64(1704038400) //2024-1-01 endTime5 := int64(1714492800) //2024-5-01 where := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(strconv.FormatInt(endTime4, 16) + "0000000000000000"), "$lte": mongodb.StringTOBsonId(strconv.FormatInt(endTime5, 16) + "0000000000000000"), }, } sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) log.Println(where) query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"site": 1, "subtype": 1, "attach_text": 1}).Iter() count := 0 count1 := 0 //中国南方电网 标讯数据 count2 := 0 //中国南方电网 标讯成交数据 count3 := 0 //中国南方电网 标讯有附件数据 count4 := 0 //中国南方电网 标讯成交有附件数据 count5 := 0 //国家电网公司电子商务平台 标讯数据 count6 := 0 //国家电网公司电子商务平台 标讯成交数据 count7 := 0 //国家电网公司电子商务平台 标讯有附件数据 count8 := 0 //国家电网公司电子商务平台 标讯成交有附件数据 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Println("current:", count, count1, count2, count3, count4, count5, count6, count7, count8) } site := util.ObjToString(tmp["site"]) subtype := util.ObjToString(tmp["subtype"]) attachText, _ := tmp["attach_text"].(map[string]interface{}) if site == "中国南方电网" { count1++ if subtype == "中标" || subtype == "单一" || subtype == "成交" || subtype == "合同" { count2++ if attachText != nil { count4++ } } if attachText != nil { count3++ } } if site == "国家电网公司电子商务平台" { count5++ if subtype == "中标" || subtype == "单一" || subtype == "成交" || subtype == "合同" { count6++ if attachText != nil { count8++ } } if attachText != nil { count7++ } } //if util.ObjToString(tmp["site"]) != "中国南方电网" { // count1++ // if util.ObjToString(tmp["subtype"]) == "中标" || util.ObjToString(tmp["subtype"]) == "单一" || util.ObjToString(tmp["subtype"]) == "成交" || util.ObjToString(tmp["subtype"]) == "合同" { // count2++ // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok { // count4++ // } // } // // 附件 // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok { // count3++ // } // //} // //if util.ObjToString(tmp["site"]) != "国家电网公司电子商务平台" { // count5++ // if util.ObjToString(tmp["subtype"]) == "中标" || util.ObjToString(tmp["subtype"]) == "单一" || util.ObjToString(tmp["subtype"]) == "成交" || util.ObjToString(tmp["subtype"]) == "合同" { // count6++ // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok { // count8++ // } // } // //右附件 // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok { // count7++ // } //} tmp = make(map[string]interface{}) } log.Println("count1", count1) log.Println("count2", count2) log.Println("count3", count3) log.Println("count4", count4) log.Println("count5", count5) log.Println("count6", count6) log.Println("count7", count7) log.Println("count8", count8) log.Println("over") } func deleteEs() { Mgo := &mongodb.MongodbSim{ MongodbAddr: "172.17.189.140:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } Mgo.InitPool() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) //es Es := &elastic.Elastic{ //S_esurl: "http://127.0.0.1:19908", S_esurl: "http://172.17.4.184:19908", I_size: 5, Username: "jybid", Password: "Top2023_JEB01i@31", } Es.InitElasticSize() //es 新集群 EsNew := &elastic.Elastic{ //S_esurl: "http://127.0.0.1:19905", S_esurl: "http://172.17.4.184:19905", I_size: 5, Username: "jybid", Password: "Top2023_JEB01i@31", } EsNew.InitElasticSize() where := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId("673458dbb25c3e1deb2fef58"), "$lte": mongodb.StringTOBsonId("6734638db25c3e1deb303821"), }, "extracttype": -1, } query := sess.DB("qfw").C("bidding").Find(&where).Select(nil).Sort("-_id").Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%100 == 0 { log.Println("current:", count) } id := mongodb.BsonIdToSId(tmp["_id"]) err := Es.DeleteByID("bidding", id) if err != nil { log.Println("es bidding", id) } err = EsNew.DeleteByID("bidding", id) if err != nil { log.Println("es new bidding", id) } err = EsNew.DeleteByID("bidding_year", id) if err != nil { log.Println("es new bidding_year", id) } err = EsNew.DeleteByID("bidding_free", id) if err != nil { log.Println("es new bidding_free", id) } } }