123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package main
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"log"
	"os"
	"strings"
	"time"

	// "app.yhyue.com/moapp/jybase/common"
	elastic "app.yhyue.com/moapp/jybase/es"
	esV7 "github.com/olivere/elastic/v7"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)
- var (
- Es elastic.Es
- Mgo *mongodb.MongodbSim
- )
- func init() {
- Es = elastic.NewEs("v7", "http://172.17.4.184:19905", 20, "jybid", "Top2023_JEB01i@31")
- //Mgo = &mongodb.MongodbSim{
- // //MongodbAddr: "127.0.0.1:27080",
- // MongodbAddr: "172.17.4.85:27080",
- // DbName: "top",
- // Size: 10,
- // //Direct: true,
- //}
- //Mgo.InitPool()
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- }
// MySource adapts a raw JSON search body to the query interface expected by
// the elastic client's Query(...) (a Source() (interface{}, error) method).
// Querys must hold a JSON object with a top-level "query" key, for example
// `{"query": {"match_all": {}}}`.
type MySource struct {
	Querys string
}

// Source decodes m.Querys and returns the value of its top-level "query" key,
// which the client embeds as the "query" clause of the search request.
//
// It returns an error when Querys is not a valid JSON object or has no
// "query" key. Returning a nil query instead would silently send a broken
// request to Elasticsearch.
func (m *MySource) Source() (interface{}, error) {
	var body map[string]interface{}
	if err := json.Unmarshal([]byte(m.Querys), &body); err != nil {
		return nil, err
	}
	q, ok := body["query"]
	if !ok {
		return nil, errors.New(`search body has no top-level "query" key`)
	}
	return q, nil
}
- func main() {
- //filePath := "./file2.xlsx"
- // SE := qu.SimpleEncrypt{Key: "topJYBX2019"}
- // log.Println(SE.DecodeString("QltHc2AmagsIUFwVV0dybyZpAwECX0YK"))
- // return
- //xlFile, _ := xlsx.OpenFile(filePath)
- //获取行数
- // length := len(xlFile.Sheets[0].Rows)
- //开辟除表头外的行数的数组内存
- // resourceArr := make([]map[string]interface{}, length-1)
- //遍历sheet
- //for _, sheet := range xlFile.Sheets {
- //遍历每一行
- //for rowIndex, row := range sheet.Rows {
- //跳过第一行表头信息
- //if rowIndex == 0 {
- // continue
- //}
- //s_winner := row.Cells[0].Value
- counts := 0
- esCon := elastic.VarEs.(*elastic.EsV7)
- client := esCon.GetEsConn()
- defer esCon.DestoryEsConn(client)
- cc := &MySource{
- Querys: `{
- "query": {
- "bool": {
- "must": [
- {
- "terms": {
- "subtype": ["中标", "单一", "成交", "合同"]
- }
- },
- {
- "terms": {
- "area": ["北京", "上海", "江苏", "浙江", "广东"]
- }
- },
- {
- "range": {
- "comeintime": {
- "gte": 1640966400,
- "lt": 1703952000
- }
- }
- }
- ]
- }
- }
- }`,
- }
- ctx, _ := context.WithTimeout(context.Background(), 5*time.Hour)
- //游标查询,index不支持别名,只能写索引库的名称
- res, err := client.Scroll("bidding").Query(cc).Size(500).Do(ctx) //查询一条获取游标
- if err == nil {
- numDocs := 0
- scrollId := res.ScrollId
- count := 1
- for {
- if scrollId == "" {
- log.Println("ScrollId Is Error")
- break
- }
- var searchResult *esV7.SearchResult
- var err error
- if count == 1 {
- searchResult = res
- } else {
- searchResult, err = client.Scroll("projectset").Size(500).ScrollId(scrollId).Do(ctx) //查询
- if err != nil {
- if err.Error() == "EOS" { //迭代完毕
- log.Println("Es Search Data Over:", err)
- } else {
- log.Println("Es Search Data Error:", err)
- }
- break
- }
- }
- log.Println("此次处理条数 ", len(searchResult.Hits.Hits))
- for _, hit := range searchResult.Hits.Hits {
- //开始处理数据
- doc := make(map[string]interface{})
- if json.Unmarshal(hit.Source, &doc) == nil {
- delete(doc, "filetext")
- delete(doc, "detail")
- counts++
- log.Println("当前数量 ", counts)
- projectName := util.ObjToString(doc["projectname"])
- if strings.Contains(projectName, "非政府") {
- continue
- }
- buyerclass := util.ObjToString(doc["buyerclass"])
- if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
- continue
- }
- err := Mgo.InsertOrUpdate("qfw", "wcc_bank_poc2", doc)
- if err != nil {
- log.Println("error", doc["id"])
- }
- }
- }
- scrollId = searchResult.ScrollId
- count++
- }
- client.ClearScroll().ScrollId(scrollId).Do(ctx) //清理游标
- log.Println("Result Data Count:", numDocs)
- } else {
- log.Println("查询失败 ", err)
- }
- }
|