package main import ( "encoding/json" "gopkg.in/mgo.v2/bson" "jy/mongodbutil" "log" mu "mfw/util" "net" qu "qfw/util" "strings" "sync" ) var udpclient mu.UdpClient //udp对象 var Sysconfig map[string]interface{} var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string var sys sync.RWMutex func init() { qu.ReadConfig(&Sysconfig) MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"]) MgoDB = qu.ObjToString(Sysconfig["mongodb_db"]) MgoC = qu.ObjToString(Sysconfig["mongodb_c"]) SidField = qu.ObjToString(Sysconfig["json_sidfiled"]) EidField = qu.ObjToString(Sysconfig["json_eidfiled"]) MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo") if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" { log.Println("获取配置文件参数失败", Sysconfig) return } mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB) log.Println(mongodbutil.Mgo.Get().Ping()) } func main() { udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024} udpclient.Listen(processUdpMsg) log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"]) if Sysconfig["broadcast"].(bool){//重启的话通知分布式节点 ips := qu.ObjToString(Sysconfig["broadcast_ips"]) ipsArr := strings.Split(ips, ";") for _,v := range ipsArr{ udpclient.WriteUdp([]byte{},mu.OP_NOOP,&net.UDPAddr{ IP: net.ParseIP(v), Port: qu.IntAll(Sysconfig["broadcast_port"]), }) } } b := make(chan bool, 1) <-b } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer qu.Catch() switch act { case mu.OP_TYPE_DATA: //保存服务 go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra) tmp := make(map[string]interface{}) err := json.Unmarshal(data, &tmp) if err != nil { go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra) return } tmp["start"] = tmp[qu.ObjToString(SidField)] bytes, _ := json.Marshal(tmp) b := mongodbutil.Mgo.Save("ocr_task", string(bytes)) log.Println("保存id:",b) case mu.OP_NOOP: //其他节点回应消息打印 log.Println("节点接收成功", string(data),ra.String()) case mu.OP_GET_DOWNLOADERCODE: //分发任务 if `{"permission":"get_ocr_task"}` != string(data){ log.Println("没有权限:",string(data),ra) go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra) return } sys.Lock() datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1) if len(*datas) == 0 { go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra) sys.Unlock() return } tmp := (*datas)[0] ObjectId := tmp["_id"] sid := qu.ObjToString(tmp[qu.ObjToString(SidField)]) eid := qu.ObjToString(tmp[qu.ObjToString(EidField)]) rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{ "$gt": bson.ObjectIdHex(sid), }}, `{"_id":1,"`+MgoFileFiled+`":1}`) //log.Println(rdata) if len((*rdata)) == 0 { go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra) sys.Unlock() return } newId := (*rdata)["_id"] if newId.(bson.ObjectId).Hex() >= eid { go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//起始位置 go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务 totmp := make(map[string]string) totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")]) totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)]) tobyte, _ := json.Marshal(totmp) go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])), Port: qu.IntAll(Sysconfig["toudpport"]), }) log.Println("ocr_task处理完成,发送下个节点", string(tobyte)) mongodbutil.Mgo.Del("ocr_task", bson.M{"_id":ObjectId.(bson.ObjectId)}) sys.Unlock() return } go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务 //log.Println(newId.(bson.ObjectId).Hex()) tmp[SidField] = newId.(bson.ObjectId).Hex() mongodbutil.Mgo.Update("ocr_task", bson.M{"_id":ObjectId.(bson.ObjectId)},tmp,false,false) sys.Unlock() } }