// Command main is a one-off export job: it scrolls through an Elasticsearch
// index with a fixed query and upserts the matching documents (minus a few
// bulky fields and some filtered-out records) into a MongoDB collection.
package main

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"log"
	"strings"
	"time"

	// "app.yhyue.com/moapp/jybase/common"
	elastic "app.yhyue.com/moapp/jybase/es"
	esV7 "github.com/olivere/elastic/v7"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// Es is the Elasticsearch handle created in init.
	// NOTE(review): main reads elastic.VarEs rather than this variable;
	// presumably elastic.NewEs also populates VarEs — confirm.
	Es elastic.Es
	// Mgo is the MongoDB connection pool used as the export destination.
	Mgo *mongodb.MongodbSim
)

// init connects to Elasticsearch and MongoDB.
//
// NOTE(review): addresses and credentials are hard-coded in source. They are
// left unchanged here, but should be moved to configuration/environment and
// rotated if this file has been shared.
func init() {
	Es = elastic.NewEs("v7", "http://172.17.4.184:19905", 20, "jybid", "Top2023_JEB01i@31")

	// Alternative MongoDB target, kept for reference:
	//Mgo = &mongodb.MongodbSim{
	//	//MongodbAddr: "127.0.0.1:27080",
	//	MongodbAddr: "172.17.4.85:27080",
	//	DbName:      "top",
	//	Size:        10,
	//	//Direct: true,
	//}
	//Mgo.InitPool()

	Mgo = &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   "qfw",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	Mgo.InitPool()
}

// MySource adapts a raw JSON search body so it can be passed to the
// Elasticsearch client as a query.
type MySource struct {
	// Querys is a JSON document of the form {"query": {...}}.
	Querys string
}

// Source decodes Querys and returns the value stored under its top-level
// "query" key. It returns an error when Querys is not valid JSON; the
// returned value is nil when the "query" key is absent.
func (m *MySource) Source() (interface{}, error) {
	mp := make(map[string]interface{})
	if err := json.Unmarshal([]byte(m.Querys), &mp); err != nil {
		return nil, err
	}
	return mp["query"], nil
}

// main scrolls the "bidding" index and stores every accepted document in the
// "wcc_bank_poc2" collection of the "qfw" database.
func main() {
	// Earlier spreadsheet-driven variant, kept for reference:
	//filePath := "./file2.xlsx"
	// SE := qu.SimpleEncrypt{Key: "topJYBX2019"}
	// log.Println(SE.DecodeString("QltHc2AmagsIUFwVV0dybyZpAwECX0YK"))
	// return
	//xlFile, _ := xlsx.OpenFile(filePath)
	// get the number of rows
	// length := len(xlFile.Sheets[0].Rows)
	// allocate a slice for every row except the header
	// resourceArr := make([]map[string]interface{}, length-1)
	// iterate over the sheets
	//for _, sheet := range xlFile.Sheets {
	// iterate over each row
	//for rowIndex, row := range sheet.Rows {
	// skip the header row
	//if rowIndex == 0 {
	//	continue
	//}
	//s_winner := row.Cells[0].Value

	const (
		// index must be a concrete index name; per the original author's
		// note, scroll queries here do not work with an alias.
		index    = "bidding"
		pageSize = 500
	)

	esCon := elastic.VarEs.(*elastic.EsV7)
	client := esCon.GetEsConn()
	defer esCon.DestoryEsConn(client)

	// NOTE(review): the comeintime bounds are Unix seconds; they appear to be
	// 2022-01-01 and 2023-12-31 00:00 in UTC+8 — confirm the upper bound is
	// meant to exclude Dec 31.
	cc := &MySource{
		Querys: `{
			"query": {
				"bool": {
					"must": [
						{ "terms": { "subtype": ["中标", "单一", "成交", "合同"] } },
						{ "terms": { "area": ["北京", "上海", "江苏", "浙江", "广东"] } },
						{ "range": { "comeintime": { "gte": 1640966400, "lt": 1703952000 } } }
					]
				}
			}
		}`,
	}

	// Keep the cancel func so the timeout's resources are released on exit.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Hour)
	defer cancel()

	// The first request opens the scroll cursor and returns the first page.
	res, err := client.Scroll(index).Query(cc).Size(pageSize).Do(ctx)
	if err != nil {
		if errors.Is(err, io.EOF) {
			// The scroll service reports "no hits" as io.EOF.
			log.Println("Es Search Data Over: no documents matched the query")
		} else {
			log.Println("查询失败 ", err)
		}
		return
	}

	counts := 0
	scrollID := res.ScrollId
	for page := 1; ; page++ {
		if scrollID == "" {
			log.Println("ScrollId Is Error")
			break
		}

		var searchResult *esV7.SearchResult
		if page == 1 {
			searchResult = res
		} else {
			searchResult, err = client.Scroll(index).Size(pageSize).ScrollId(scrollID).Do(ctx)
			if err != nil {
				// The scroll service signals exhaustion with io.EOF.
				if errors.Is(err, io.EOF) {
					log.Println("Es Search Data Over:", err)
				} else {
					log.Println("Es Search Data Error:", err)
				}
				break
			}
		}

		log.Println("此次处理条数 ", len(searchResult.Hits.Hits))
		for _, hit := range searchResult.Hits.Hits {
			doc := make(map[string]interface{})
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Println("decode hit error", hit.Id, err)
				continue
			}
			// Drop the bulky text fields before storing.
			delete(doc, "filetext")
			delete(doc, "detail")

			counts++
			log.Println("当前数量 ", counts)

			projectName := util.ObjToString(doc["projectname"])
			if strings.Contains(projectName, "非政府") {
				continue
			}
			switch util.ObjToString(doc["buyerclass"]) {
			case "批发零售", "住宿餐饮", "信息技术":
				continue
			}

			err := Mgo.InsertOrUpdate("qfw", "wcc_bank_poc2", doc)
			if err != nil {
				log.Println("error", doc["id"], err)
			}
		}

		scrollID = searchResult.ScrollId
	}

	// Release the server-side cursor; there is nothing to clear without an id.
	if scrollID != "" {
		if _, err := client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
			log.Println("clear scroll error", err)
		}
	}
	log.Println("Result Data Count:", counts)
}