package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"github.com/olivere/elastic/v7"
	"go.mongodb.org/mongo-driver/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

var (
	// UdpClient is the UDP endpoint used to send messages and to receive
	// replies (see processUdpMsg).
	UdpClient udp.UdpClient
	// nextAddr is the UDP address that "index_by_id" messages are sent to.
	nextAddr *net.UDPAddr
)

// findID looks for bidding documents that exist in MongoDB but not in the
// Elasticsearch instance at 172.17.4.184:19805, and requests a re-index of
// each missing document over UDP.
//
// NOTE(review): credentials are hard-coded here; consider moving them to
// configuration.
func findID() {
	backfillMissingBidding("http://172.17.4.184:19805", "es_all", "TopJkO2E_d1x")
}

// findIDHWY is the same check as findID, run against the Elasticsearch
// instance at 172.17.4.184:19905.
//
// NOTE(review): credentials are hard-coded here; consider moving them to
// configuration.
func findIDHWY() {
	backfillMissingBidding("http://172.17.4.184:19905", "jybid", "Top2023_JEB01i@31")
}

// backfillMissingBidding is the shared implementation behind findID and
// findIDHWY. It:
//
//  1. starts the UDP listener and sets the package-level UdpClient/nextAddr,
//  2. scans the "bidding" collection for documents whose _id falls between
//     GetDayStartSecond(-1) and GetDayStartSecond(0) (presumably the previous
//     day — confirm against util.GetDayStartSecond),
//  3. checks, with at most 15 concurrent requests, whether each eligible
//     document exists in the "bidding" index of the given Elasticsearch,
//  4. sends one "index_by_id" UDP message per missing document, one per second.
//
// esURL, esUser and esPassword select the Elasticsearch instance to compare
// against. The function terminates the process via log.Fatalf if the
// Elasticsearch client cannot be created.
func backfillMissingBidding(esURL, esUser, esPassword string) {
	UdpClient = udp.UdpClient{Local: ":1199", BufSize: 1024}
	nextAddr = &net.UDPAddr{
		Port: 17833,
		IP:   net.ParseIP("127.0.0.1"),
	}
	UdpClient.Listen(processUdpMsg)

	Mgo := &mongodb.MongodbSim{
		//MongodbAddr: "127.0.0.1:27083",
		MongodbAddr: "172.17.189.140:27080",
		DbName:      "qfw",
		Size:        10,
		Direct:      true,
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
	}
	Mgo.InitPool()

	// Build the ObjectID bounds of the scan window: the hex timestamp followed
	// by sixteen zero digits forms a 24-character ObjectID string.
	st := util.GetDayStartSecond(-1)
	et := util.GetDayStartSecond(0)
	startID := fmt.Sprintf("%x0000000000000000", st) // inclusive lower bound
	endID := fmt.Sprintf("%x0000000000000000", et)   // exclusive upper bound

	// Create the Elasticsearch client.
	client, err := elastic.NewClient(
		elastic.SetURL(esURL),
		elastic.SetBasicAuth(esUser, esPassword),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}

	// _id range covering the scan window.
	mq := bson.M{"_id": bson.M{
		"$gte": mongodb.StringTOBsonId(startID),
		"$lt":  mongodb.StringTOBsonId(endID),
	}}

	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
	it := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()

	var (
		mu  sync.Mutex // guards ids
		ids []string   // IDs not found in Elasticsearch
		wg  sync.WaitGroup
	)
	sem := make(chan struct{}, 15) // bounds concurrent existence checks

	for count := 0; ; count++ {
		// A fresh map per document: nothing from a previous document can
		// linger, and no map is shared with worker goroutines.
		doc := make(map[string]interface{})
		if !it.Next(doc) {
			break
		}
		if count%10000 == 0 {
			// Workers append to ids concurrently, so read its length under
			// the lock.
			mu.Lock()
			missing := len(ids)
			mu.Unlock()
			log.Println("current:", count, missing)
		}
		if !shouldBeIndexed(doc) {
			continue
		}

		id := mongodb.BsonIdToSId(doc["_id"])
		sem <- struct{}{}
		wg.Add(1)
		go func(id string) {
			defer func() {
				<-sem
				wg.Done()
			}()
			exist, err := documentExists(client, "bidding", id)
			if err != nil {
				// A failed lookup is still treated as "missing" so the
				// document gets re-indexed, but the cause is no longer hidden.
				log.Println("check elasticsearch document failed:", id, err)
			}
			if !exist {
				mu.Lock()
				ids = append(ids, id)
				mu.Unlock()
			}
		}(id)
	}
	wg.Wait()

	for _, v := range ids {
		data := map[string]interface{}{
			"stype":  "index_by_id",
			"infoid": v,
		}
		SendUdpMsg(data, nextAddr)
		time.Sleep(time.Second) // one message per second
	}
	log.Println("over", len(ids))
}

// shouldBeIndexed reports whether a bidding document is expected to be present
// in Elasticsearch: extracttype is not -1, sensitive is not "测试", dataging is
// not 1 and infoformat is not 3.
func shouldBeIndexed(doc map[string]interface{}) bool {
	return util.IntAll(doc["extracttype"]) != -1 &&
		util.ObjToString(doc["sensitive"]) != "测试" &&
		util.IntAll(doc["dataging"]) != 1 &&
		util.Float64All(doc["infoformat"]) != 3
}

// processUdpMsg handles incoming UDP packets. OP_TYPE_DATA packets are
// ignored, OP_NOOP packets are logged as replies, and any other action byte is
// printed to stdout.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case udp.OP_TYPE_DATA:
		// Nothing to do for data packets.
	case udp.OP_NOOP:
		da := string(data)
		log.Println("收到回复数据", da)
	default:
		fmt.Println("current_listen : processUdpMsg =====", act)
	}
}

// SendUdpMsg JSON-encodes data and sends it to target as an OP_TYPE_DATA
// packet through UdpClient. Encoding and write failures are logged; when
// encoding fails nothing is sent.
func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
	payload, err := json.Marshal(data)
	if err != nil {
		log.Println("marshal udp message failed:", err)
		return
	}
	if err := UdpClient.WriteUdp(payload, udp.OP_TYPE_DATA, target); err != nil {
		log.Println(err)
	}
	log.Println(data)
	log.Println(target)
}

// documentExists reports whether a document with the given ID exists in the
// given Elasticsearch index. On a request error it returns false together with
// that error.
func documentExists(client *elastic.Client, indexName, documentID string) (bool, error) {
	exists, err := client.Exists().
		Index(indexName).
		Id(documentID).
		Do(context.Background())
	if err != nil {
		return false, err
	}
	return exists, nil
}