package main import ( "encoding/json" "fmt" "jy/mongodbutil" "log" mu "mfw/util" "net" "net/rpc" qu "qfw/util" "strings" "gopkg.in/mgo.v2/bson" ) var udpclient mu.UdpClient //udp对象 var Sysconfig map[string]interface{} var MgoIP, MgoDB, MgoC, MgoFileFiled string var ChanB chan bool func init() { qu.ReadConfig(&Sysconfig) MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"]) MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"]) MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"]) MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo") if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" { log.Println("获取配置文件参数失败", Sysconfig) return } mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB) ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5)) } func main() { log.Println(Sysconfig) udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024} udpclient.Listen(processUdpMsg) log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"]) b := make(chan bool, 1) <-b } // "file2text": "192.168.3.207:1234", func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer qu.Catch() switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { log.Println("json err :", err, string(data)) return } gid := strings.TrimSpace(mapInfo["gtid"].(string)) lid := strings.TrimSpace(mapInfo["lteid"].(string)) if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) { if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{ "_id": bson.M{ "$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid), }, MgoFileFiled: bson.M{ "$ne": nil, }, }, //if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}, nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b { log.Println("查询数据失败 :", string(data)) } else { fmt.Println(len(*findAll)) if len(*findAll) <= 0 { log.Println("查询数据为空 :", string(data)) return } for _, v := range *findAll { qmap := *qu.ObjToMap(v) mid := qmap["_id"] if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok { log.Println(mid, "mgo 转换异常", MgoFileFiled) continue } else { switch v["attachments"].(type) { case map[string]interface{}: att := v["attachments"].(map[string]interface{}) for _, vaatt := range att { if fileinfo, ok := vaatt.(map[string]interface{}); !ok { log.Println(mid, "mgo 结构体转换失败", vaatt) continue } else { ChanB <- true go save(mid, qmap, fileinfo) } } } } //fileMap := *qu.ObjToMap(qmap["projectinfo"]) //fmt.Println(fileMap["attachments"]) } } } else { log.Println("开始id或结束id参数错误:", string(data)) } case mu.OP_NOOP: //下个节点回应 log.Println("接收成功", string(data)) } } func save(mid interface{}, qmap, fileinfo map[string]interface{}) { defer qu.Catch() defer func() { <-ChanB }() type FileData struct { Fid string Name string Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls Content string //识别内容 } client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"])) if err != nil { log.Println(mid, "rpc err :", err) return } defer client.Close() var reply []byte //bs, _ := ioutil.ReadFile("1.docx") fileData := &FileData{ Name: qu.ObjToString(fileinfo["filename"]), Fid: qu.ObjToString(fileinfo["fid"]), //附件id Type: qu.ObjToString(fileinfo["ftype"]), } log.Println(mid, fileData) err = client.Call("FileToText.FileToContext", fileData, &reply) if err != nil { log.Println(mid, "call ocr error:", err) return } //fileinfo["ftype"] = "doc" //reply = []byte("jdsfkldasjflkj") //fileinfo["ftype"] = "zip" //testfiles := []map[string]interface { //}{ // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"}, // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"}, // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"}, //} //reply, _ = json.Marshal(testfiles) if len(reply) == 0{ log.Println(mid, "rpc返回数据为空:", string(reply)) return } log.Println(mid, string(reply)) rdata := make(map[string]interface{}) if err := json.Unmarshal(reply, &rdata); err != nil { log.Println(mid, "rpc返回数据解析失败:", err) return } if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" { if qu.ObjToString(fileinfo["ftype"]) == "rar" || qu.ObjToString(fileinfo["ftype"]) == "zip" { fileinfo["content"] = rdata["contextc"] } else { fileinfo["content"] = rdata["context"] } if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{ "$set": bson.M{ MgoFileFiled: qmap[MgoFileFiled], }, }) { log.Println(mid, "mongo更新数据失败") } else { log.Println(mid, "mongo更新数据成功") } } else { log.Println(mid, "调用rpc服务解析异常:", rdata["err"]) } //if qu.ObjToString(fileinfo["ftype"]) == "zip" || qu.ObjToString(fileinfo["ftype"]) == "rar" { // fileDatas := make([]map[string]interface{}, 0) // if err := json.Unmarshal(reply, &fileDatas); err != nil { // log.Println("json转换错误", mid, err) // return // } // fileinfo["content"] = fileDatas //} else { // fileinfo["content"] = string(reply) //} //if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{ // "$set": bson.M{ // MgoFileFiled: qmap[MgoFileFiled], // }, //}) { // log.Println(mid, "更新数据失败") //} else { // log.Println(mid, "更新数据成功") //} }