123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package main
- import (
- "encoding/json"
- "gopkg.in/mgo.v2/bson"
- "jy/mongodbutil"
- "log"
- mu "mfw/util"
- "net"
- qu "qfw/util"
- "strings"
- "sync"
- )
- var udpclient mu.UdpClient //udp对象
- var Sysconfig map[string]interface{}
- var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
- var sys sync.RWMutex
- func init() {
- qu.ReadConfig(&Sysconfig)
- MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
- MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
- MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
- SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
- EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
- MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
- if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
- log.Println("获取配置文件参数失败", Sysconfig)
- return
- }
- mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
- log.Println(mongodbutil.Mgo.Get().Ping())
- }
- func main() {
- udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
- b := make(chan bool, 1)
- <-b
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer qu.Catch()
- switch act {
- case mu.OP_TYPE_DATA: //保存服务
- go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
- tmp := make(map[string]interface{})
- err := json.Unmarshal(data, &tmp)
- if err != nil {
- go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
- return
- }
- tmp["start"] = tmp[qu.ObjToString(SidField)]
- bytes, _ := json.Marshal(tmp)
- b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
- log.Println("保存id:",b)
- case mu.OP_NOOP: //其他节点回应消息打印
- log.Println("接收成功", string(data))
- case mu.OP_GET_DOWNLOADERCODE: //分发任务
- sys.Lock()
- datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
- if len(*datas) == 0 {
- go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
- sys.Unlock()
- return
- }
- tmp := (*datas)[0]
- ObjectId := tmp["_id"]
- sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
- eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
- rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
- "$gt": bson.ObjectIdHex(sid),
- }}, `{"_id":1,"`+MgoFileFiled+`":1}`)
- //log.Println(rdata)
- if len((*rdata)) == 0 {
- go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
- sys.Unlock()
- return
- }
- newId := (*rdata)["_id"]
- if newId.(bson.ObjectId).Hex() >= eid {
- go udpclient.WriteUdp([]byte(`{"id":"`+tmp["start"].(bson.ObjectId).Hex()+`"}`), mu.OP_TYPE_DATA, ra)//起始位置
- go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`"}`), mu.OP_TYPE_DATA, ra)//分发任务
- totmp := make(map[string]string)
- totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
- totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
- tobyte, _ := json.Marshal(totmp)
- go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
- Port: qu.IntAll(Sysconfig["toudpport"]),
- })
- log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
- mongodbutil.Mgo.Del("ocr_task", `{"_id":`+ObjectId.(bson.ObjectId)+`}`)
- sys.Unlock()
- return
- }
- go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`"}`), mu.OP_TYPE_DATA, ra)//分发任务
- //log.Println(newId.(bson.ObjectId).Hex())
- tmp[SidField] = newId.(bson.ObjectId).Hex()
- mongodbutil.Mgo.Update("ocr_task",
- bson.M{"_id":ObjectId.(bson.ObjectId)},tmp,false,false)
- sys.Unlock()
- }
- }
|