123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- "go.mongodb.org/mongo-driver/bson"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "log"
- "net"
- "sync"
- "time"
- )
- var (
- UdpClient udp.UdpClient
- nextAddr *net.UDPAddr
- )
- func findID() {
- UdpClient = udp.UdpClient{Local: ":1199", BufSize: 1024}
- nextAddr = &net.UDPAddr{
- Port: util.IntAll(17833),
- IP: net.ParseIP("127.0.0.1"),
- }
- UdpClient.Listen(processUdpMsg)
- Mgo := &mongodb.MongodbSim{
- //MongodbAddr: "127.0.0.1:27083",
- MongodbAddr: "172.17.189.140:27080",
- DbName: "qfw",
- Size: 10,
- Direct: true,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- Mgo.InitPool()
- start := -1
- end := 0
- st := util.GetDayStartSecond(start) //
- et := util.GetDayStartSecond(end) //
- startID := fmt.Sprintf("%x0000000000000000", st) //开始ID
- endID := fmt.Sprintf("%x0000000000000000", et) // 结束ID
- //urla := "http://127.0.0.1:19805"
- urla := "http://172.17.4.184:19805"
- usernamea := "es_all"
- passworda := "TopJkO2E_d1x"
- //创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(urla),
- elastic.SetBasicAuth(usernamea, passworda),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- id1 := mongodb.StringTOBsonId(startID)
- id2 := mongodb.StringTOBsonId(endID)
- mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
- query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- var ids = make([]string, 0)
- var lock sync.Mutex
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, len(ids))
- }
- if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- id := mongodb.BsonIdToSId(tmp["_id"])
- exist, _ := documentExists(client, "bidding", id)
- if !exist {
- lock.Lock()
- ids = append(ids, id)
- lock.Unlock()
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- }
- wg.Wait()
- for _, v := range ids {
- data := map[string]interface{}{
- "stype": "index_by_id",
- "infoid": v,
- }
- SendUdpMsg(data, nextAddr)
- time.Sleep(time.Second)
- }
- log.Println("over", len(ids))
- }
- func findIDHWY() {
- UdpClient = udp.UdpClient{Local: ":1199", BufSize: 1024}
- nextAddr = &net.UDPAddr{
- Port: util.IntAll(17833),
- IP: net.ParseIP("127.0.0.1"),
- }
- UdpClient.Listen(processUdpMsg)
- Mgo := &mongodb.MongodbSim{
- //MongodbAddr: "127.0.0.1:27083",
- MongodbAddr: "172.17.189.140:27080",
- DbName: "qfw",
- Size: 10,
- Direct: true,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- Mgo.InitPool()
- start := -1
- end := 0
- st := util.GetDayStartSecond(start) //
- et := util.GetDayStartSecond(end) //
- startID := fmt.Sprintf("%x0000000000000000", st) //开始ID
- endID := fmt.Sprintf("%x0000000000000000", et) // 结束ID
- //urla := "http://127.0.0.1:19805"
- urla := "http://172.17.4.184:19905"
- usernamea := "jybid"
- passworda := "Top2023_JEB01i@31"
- //创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(urla),
- elastic.SetBasicAuth(usernamea, passworda),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- id1 := mongodb.StringTOBsonId(startID)
- id2 := mongodb.StringTOBsonId(endID)
- mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
- query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- var ids = make([]string, 0)
- var lock sync.Mutex
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, len(ids))
- }
- if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- id := mongodb.BsonIdToSId(tmp["_id"])
- exist, _ := documentExists(client, "bidding", id)
- if !exist {
- lock.Lock()
- ids = append(ids, id)
- lock.Unlock()
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- }
- wg.Wait()
- for _, v := range ids {
- data := map[string]interface{}{
- "stype": "index_by_id",
- "infoid": v,
- }
- SendUdpMsg(data, nextAddr)
- time.Sleep(time.Second)
- }
- log.Println("over", len(ids))
- }
- //processUdpMsg 处理udp
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- case udp.OP_NOOP:
- da := string(data)
- log.Println("收到回复数据", da)
- default:
- fmt.Println("current_listen : processUdpMsg =====", act)
- }
- }
- //SendUdpMsg 通知处理企业新增数据
- func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
- bytes, _ := json.Marshal(data)
- err := UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
- if err != nil {
- log.Println(err)
- }
- log.Println(data)
- log.Println(target)
- }
- // documentExists 检查指定 ID 是否存在于 Elasticsearch 中
- func documentExists(client *elastic.Client, indexName, documentID string) (bool, error) {
- exists, err := client.Exists().
- Index(indexName).
- Id(documentID).
- Do(context.Background())
- if err != nil {
- return false, err
- }
- return exists, nil
- }
|