package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
	"time"

	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

// udpDatas sets up the UDP client and the MongoDB pool, then scans the
// "bidding" collection of the "qfw" database for documents whose comeintime
// lies strictly between 1753113600 and the current Unix time, logging progress
// every 10000 documents and the final total.
//
// Original note: supplement data over UDP and generate the index. The code
// that would build an "index-by-id" message per document is still commented
// out, so at present this function only counts matching documents.
func udpDatas() {
	UdpClient = udp.UdpClient{Local: ":18888", BufSize: 1024}
	UdpClient.Listen(processUdpMsg)
	biddingDataAddr = &net.UDPAddr{
		Port: util.IntAll(1783),
		IP:   net.ParseIP("127.0.0.1"),
	}

	// NOTE(review): the address and credentials below are hard-coded in source;
	// consider loading them from configuration or the environment.
	MgoB = &mongodb.MongodbSim{
		MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   "qfw",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	MgoB.InitPool()

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	where := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gt": 1753113600,
			"$lt": time.Now().Unix(),
		},
	}

	query := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%10000 == 0 {
			log.Println("current:", count)
		}
		//id := mongodb.BsonIdToSId(tmp["_id"])
		//data := map[string]interface{}{
		//	"stype": "index-by-id",
		//	"_id":   id,
		//}
	}
	// Next returning false does not distinguish "no more documents" from a
	// failed iteration, so close the iterator and surface any error; otherwise
	// a truncated scan would be reported as a complete one.
	// NOTE(review): assumes query is an mgo-style iterator whose Close returns
	// the iteration error — confirm against the mongodb wrapper.
	if err := query.Close(); err != nil {
		log.Println("iterate bidding error:", err)
	}
	log.Println("count:", count)
}

// processUdpMsg is the callback registered with UdpClient.Listen.
//
// For udp.OP_TYPE_DATA messages it decodes data as a JSON object and replies
// to ra with an udp.OP_NOOP message carrying the object's "key" string, or
// "udpok" when the key is missing, empty, or not a string. Payloads that fail
// to decode, or that decode to JSON null, get no reply. Any other act value
// is only logged.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case udp.OP_TYPE_DATA:
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		//print().Println("processUdpMsg", zap.Any("mapInfo:", mapInfo))
		util.Debug("processUdpMsg :=>", mapInfo)
		if err != nil {
			fmt.Println(err)
			// Nothing usable was decoded, so there is nothing to acknowledge.
			return
		}
		if mapInfo == nil {
			return
		}

		key, _ := mapInfo["key"].(string)
		if key == "" {
			key = "udpok"
		}
		// The reply is written from a separate goroutine, as in the original code.
		go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
	default:
		fmt.Println("qyxy_listen : processUdpMsg =====")
	}
}

// SendUdpMsg JSON-encodes data and sends it to target as an udp.OP_TYPE_DATA
// message, then debug-logs the payload and the destination.
//
// Original note: used to notify the receiver to process newly added
// enterprise data.
//
// If target is nil or data cannot be encoded, the problem is logged and
// nothing is sent.
func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
	if target == nil {
		log.Println("SendUdpMsg: nil target, message not sent")
		return
	}

	payload, err := json.Marshal(data)
	if err != nil {
		// Sending an empty datagram would be indistinguishable from a real
		// message on the receiving side, so drop it instead.
		log.Println("SendUdpMsg: marshal error:", err)
		return
	}

	UdpClient.WriteUdp(payload, udp.OP_TYPE_DATA, target)
	util.Debug("SendUdpMsg:=>", data)
	util.Debug("target :=>", target.IP, target.Port)
}