12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package main
- import (
- "encoding/json"
- "fmt"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "log"
- "net"
- "time"
- )
- // udpDatas 通过udp;补充数据 生 索引
- func udpDatas() {
- UdpClient = udp.UdpClient{Local: ":18888", BufSize: 1024}
- UdpClient.Listen(processUdpMsg)
- biddingDataAddr = &net.UDPAddr{
- Port: util.IntAll(1783),
- IP: net.ParseIP("127.0.0.1"),
- }
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoB.InitPool()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gt": 1753113600,
- "$lt": time.Now().Unix(),
- },
- }
- query := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- //id := mongodb.BsonIdToSId(tmp["_id"])
- //data := map[string]interface{}{
- // "stype": "index-by-id",
- // "_id": id,
- //}
- }
- log.Println("count:", count)
- }
- // processUdpMsg 处理udp
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- //print().Println("processUdpMsg", zap.Any("mapInfo:", mapInfo))
- util.Debug("processUdpMsg :=>", mapInfo)
- if err != nil {
- fmt.Println(err)
- }
- if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- }
- default:
- fmt.Println("qyxy_listen : processUdpMsg =====")
- }
- }
- // SendUdpMsg 通知处理企业新增数据
- func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
- bytes, _ := json.Marshal(data)
- UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
- util.Debug("SendUdpMsg:=>", data)
- util.Debug("target :=>", target.IP, target.Port)
- }
|