id_find.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  11. "log"
  12. "net"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. UdpClient udp.UdpClient
  18. nextAddr *net.UDPAddr
  19. )
  20. func findID() {
  21. UdpClient = udp.UdpClient{Local: ":1199", BufSize: 1024}
  22. nextAddr = &net.UDPAddr{
  23. Port: util.IntAll(17833),
  24. IP: net.ParseIP("127.0.0.1"),
  25. }
  26. UdpClient.Listen(processUdpMsg)
  27. Mgo := &mongodb.MongodbSim{
  28. //MongodbAddr: "127.0.0.1:27083",
  29. MongodbAddr: "172.17.189.140:27080",
  30. DbName: "qfw",
  31. Size: 10,
  32. Direct: true,
  33. UserName: "SJZY_RWbid_ES",
  34. Password: "SJZY@B4i4D5e6S",
  35. }
  36. Mgo.InitPool()
  37. start := -1
  38. end := 0
  39. st := util.GetDayStartSecond(start) //
  40. et := util.GetDayStartSecond(end) //
  41. startID := fmt.Sprintf("%x0000000000000000", st) //开始ID
  42. endID := fmt.Sprintf("%x0000000000000000", et) // 结束ID
  43. //urla := "http://127.0.0.1:19805"
  44. urla := "http://172.17.4.184:19805"
  45. usernamea := "es_all"
  46. passworda := "TopJkO2E_d1x"
  47. //创建 Elasticsearch 客户端
  48. client, err := elastic.NewClient(
  49. elastic.SetURL(urla),
  50. elastic.SetBasicAuth(usernamea, passworda),
  51. elastic.SetSniff(false),
  52. )
  53. if err != nil {
  54. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  55. }
  56. id1 := mongodb.StringTOBsonId(startID)
  57. id2 := mongodb.StringTOBsonId(endID)
  58. mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
  59. sess := Mgo.GetMgoConn()
  60. defer Mgo.DestoryMongoConn(sess)
  61. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  62. query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
  63. count := 0
  64. ch := make(chan bool, 15)
  65. wg := &sync.WaitGroup{}
  66. var ids = make([]string, 0)
  67. var lock sync.Mutex
  68. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  69. if count%10000 == 0 {
  70. log.Println("current:", count, len(ids))
  71. }
  72. if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
  73. ch <- true
  74. wg.Add(1)
  75. go func(tmp map[string]interface{}) {
  76. defer func() {
  77. <-ch
  78. wg.Done()
  79. }()
  80. id := mongodb.BsonIdToSId(tmp["_id"])
  81. exist, _ := documentExists(client, "bidding", id)
  82. if !exist {
  83. lock.Lock()
  84. ids = append(ids, id)
  85. lock.Unlock()
  86. }
  87. }(tmp)
  88. tmp = map[string]interface{}{}
  89. }
  90. }
  91. wg.Wait()
  92. for _, v := range ids {
  93. data := map[string]interface{}{
  94. "stype": "index_by_id",
  95. "infoid": v,
  96. }
  97. SendUdpMsg(data, nextAddr)
  98. time.Sleep(time.Second)
  99. }
  100. log.Println("over", len(ids))
  101. }
  102. func findIDHWY() {
  103. UdpClient = udp.UdpClient{Local: ":1199", BufSize: 1024}
  104. nextAddr = &net.UDPAddr{
  105. Port: util.IntAll(17833),
  106. IP: net.ParseIP("127.0.0.1"),
  107. }
  108. UdpClient.Listen(processUdpMsg)
  109. Mgo := &mongodb.MongodbSim{
  110. //MongodbAddr: "127.0.0.1:27083",
  111. MongodbAddr: "172.17.189.140:27080",
  112. DbName: "qfw",
  113. Size: 10,
  114. Direct: true,
  115. UserName: "SJZY_RWbid_ES",
  116. Password: "SJZY@B4i4D5e6S",
  117. }
  118. Mgo.InitPool()
  119. start := -1
  120. end := 0
  121. st := util.GetDayStartSecond(start) //
  122. et := util.GetDayStartSecond(end) //
  123. startID := fmt.Sprintf("%x0000000000000000", st) //开始ID
  124. endID := fmt.Sprintf("%x0000000000000000", et) // 结束ID
  125. //urla := "http://127.0.0.1:19805"
  126. urla := "http://172.17.4.184:19905"
  127. usernamea := "jybid"
  128. passworda := "Top2023_JEB01i@31"
  129. //创建 Elasticsearch 客户端
  130. client, err := elastic.NewClient(
  131. elastic.SetURL(urla),
  132. elastic.SetBasicAuth(usernamea, passworda),
  133. elastic.SetSniff(false),
  134. )
  135. if err != nil {
  136. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  137. }
  138. id1 := mongodb.StringTOBsonId(startID)
  139. id2 := mongodb.StringTOBsonId(endID)
  140. mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
  141. sess := Mgo.GetMgoConn()
  142. defer Mgo.DestoryMongoConn(sess)
  143. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  144. query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
  145. count := 0
  146. ch := make(chan bool, 15)
  147. wg := &sync.WaitGroup{}
  148. var ids = make([]string, 0)
  149. var lock sync.Mutex
  150. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  151. if count%10000 == 0 {
  152. log.Println("current:", count, len(ids))
  153. }
  154. if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
  155. ch <- true
  156. wg.Add(1)
  157. go func(tmp map[string]interface{}) {
  158. defer func() {
  159. <-ch
  160. wg.Done()
  161. }()
  162. id := mongodb.BsonIdToSId(tmp["_id"])
  163. exist, _ := documentExists(client, "bidding", id)
  164. if !exist {
  165. lock.Lock()
  166. ids = append(ids, id)
  167. lock.Unlock()
  168. }
  169. }(tmp)
  170. tmp = map[string]interface{}{}
  171. }
  172. }
  173. wg.Wait()
  174. for _, v := range ids {
  175. data := map[string]interface{}{
  176. "stype": "index_by_id",
  177. "infoid": v,
  178. }
  179. SendUdpMsg(data, nextAddr)
  180. time.Sleep(time.Second)
  181. }
  182. log.Println("over", len(ids))
  183. }
  184. //processUdpMsg 处理udp
  185. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  186. switch act {
  187. case udp.OP_TYPE_DATA:
  188. case udp.OP_NOOP:
  189. da := string(data)
  190. log.Println("收到回复数据", da)
  191. default:
  192. fmt.Println("current_listen : processUdpMsg =====", act)
  193. }
  194. }
  195. //SendUdpMsg 通知处理企业新增数据
  196. func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
  197. bytes, _ := json.Marshal(data)
  198. err := UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  199. if err != nil {
  200. log.Println(err)
  201. }
  202. log.Println(data)
  203. log.Println(target)
  204. }
  205. // documentExists 检查指定 ID 是否存在于 Elasticsearch 中
  206. func documentExists(client *elastic.Client, indexName, documentID string) (bool, error) {
  207. exists, err := client.Exists().
  208. Index(indexName).
  209. Id(documentID).
  210. Do(context.Background())
  211. if err != nil {
  212. return false, err
  213. }
  214. return exists, nil
  215. }