package main import ( "encoding/json" "gopkg.in/mgo.v2/bson" "jy/mongodbutil" "log" mu "mfw/util" "net" "net/rpc" "path" qu "qfw/util" "strings" "sync/atomic" "time" ) var udpclient mu.UdpClient //udp对象 var Sysconfig map[string]interface{} var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string var ChanA, ChanB = make(chan bool), make(chan bool, 1) var tmpNUM int32 func init() { qu.ReadConfig(&Sysconfig) MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"]) MgoDB = qu.ObjToString(Sysconfig["mongodb_db"]) MgoC = qu.ObjToString(Sysconfig["mongodb_c"]) GetDataIp = qu.ObjToString(Sysconfig["get_data_ip"]) GetDataPort = qu.ObjToString(Sysconfig["get_data_port"]) MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo") if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" { log.Println("获取配置文件参数失败", Sysconfig) return } mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB) log.Println(mongodbutil.Mgo.Get().Ping()) } func main() { log.Println(Sysconfig) udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024} udpclient.Listen(processUdpMsg) log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"]) ticker := time.NewTicker(time.Second * 30) var num int task: for { select { case <-ticker.C: num++ go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, &net.UDPAddr{ IP: net.ParseIP(GetDataIp), Port: qu.IntAll(GetDataPort), }) log.Println("程序启动,开启循环请求数据信息", num) case abc, ok := <-ChanA: log.Println("abc,ok:", abc, ok) if !ok { log.Println("主动循环请求数据已关闭") break task } } } b := make(chan bool, 1) <-b } // "file2text": "192.168.3.207:1234", func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: atomic.AddInt32(&tmpNUM, 1) v := atomic.LoadInt32(&tmpNUM) if v == 1 { ChanA <- true close(ChanA) } log.Println("data:", string(data), ra.String()) var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { if string(data) == "没有新数据" { time.Sleep(time.Second * 30) } else { time.Sleep(time.Second * 30) } go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra) return } if qu.ObjToString(mapInfo["permission"]) != "ocr_task" { log.Println("数据异常 :", string(data), ra.String()) if qu.ObjToString(mapInfo["permission"]) == "stop" { log.Println(mapInfo) panic("释放实例") } time.Sleep(time.Second * 30) go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra) return } ObjectId := qu.ObjToString(mapInfo["id"]) if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) { log.Println("获取数据id错误", mapInfo, ra.String()) time.Sleep(time.Second * 30) go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra) return } log.Println("获取数据成功:", mapInfo, ra.String()) data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1}) if len(*data) == 0 { if qu.ObjToString(mapInfo["is_start"]) == "true" { return } go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra) return } if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok { if qu.ObjToString(mapInfo["is_start"]) == "true" { return } go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra) return } else { switch v["attachments"].(type) { case map[string]interface{}: att := v["attachments"].(map[string]interface{}) updateNum := 0 for attk, vaatt := range att { if fileinfo, ok := vaatt.(map[string]interface{}); !ok { //log.Println(mid, "mgo 结构体转换失败", vaatt) continue } else { ChanB <- true if qu.ObjToString(fileinfo["fid"]) == "" { //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ") <-ChanB continue } save(bson.ObjectIdHex(ObjectId), attk, data, &fileinfo, &updateNum) <-ChanB } } } if qu.ObjToString(mapInfo["is_start"]) == "true" { return } go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra) } case mu.OP_NOOP: //下个节点回应 log.Println("接收成功", string(data)) case mu.OP_DELETE_DOWNLOADERCODES: log.Println(string(data)) udpclient.WriteUdp([]byte(`{"permission":"stop"}`), mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(Sysconfig["udpip"].(string)), Port: qu.IntAll(Sysconfig["udpport"].(string)), }) } } func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) { defer qu.Catch() type FileData struct { ObjId string //Id OrgUrl string //源下载地址 Fid string Name string Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls Content string //识别内容 } client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"])) if err != nil { log.Println(mid, "rpc err :", err) return } defer client.Close() var reply []byte //bs, _ := ioutil.ReadFile("1.docx") var fffpath string fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"])) if strings.TrimSpace(fffpath) == "" { fffpath = qu.ObjToString((*fileinfo)["ftype"]) } else { fffpath = fffpath[1:] } fileData := &FileData{ ObjId: mid.(bson.ObjectId).String(), OrgUrl: qu.ObjToString((*fileinfo)["url"]), Name: qu.ObjToString((*fileinfo)["filename"]), Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id Type: fffpath, } //log.Println(mid, fileData) err = client.Call("FileToText.FileToContext", fileData, &reply) if err != nil { log.Println(mid, "call ocr error:", err) return } //fileinfo["ftype"] = "doc" //reply = []byte("jdsfkldasjflkj") //fileinfo["ftype"] = "zip" //testfiles := []map[string]interface { //}{ // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"}, // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"}, // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"}, //} //reply, _ = json.Marshal(testfiles) if len(reply) == 0 { log.Println(mid, "rpc返回数据为空:", qu.ObjToString((*fileinfo)["fid"]), string(reply)) return } //log.Println(mid, string(reply)) rdata := make(map[string]interface{}) if err := json.Unmarshal(reply, &rdata); err != nil { log.Println(mid, "rpc返回数据解析失败:", qu.ObjToString((*fileinfo)["fid"]), err) return } if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" { if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" { (*fileinfo)["content"] = rdata["contextc"] } else { (*fileinfo)["content"] = rdata["context"] } (*fileinfo)["expend"] = rdata["expend"] delete(*fileinfo, "update") //log.Println((*fileinfo)) (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = *fileinfo //asdf := (*qmap)[MgoFileFiled].(map[string]interface{}) //qwer := asdf["attachments"].(map[string]interface{}) //qwer[attk] =*fileinfo //log.Println((*qmap)[MgoFileFiled]) updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{ "$set": bson.M{ MgoFileFiled: (*qmap)[MgoFileFiled], }, }) if updateBool { *updatenum++ mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{ "$set": bson.M{ "updatefileNum": &updatenum, },}) log.Println(mid, "mongo更新数据成功") } else { log.Println(mid, "mongo更新数据失败", qu.ObjToString((*fileinfo)["fid"])) } //nowHour := time.Now().Hour() //rdlock.Lock() //if nowHour != hourNum { // log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid))) // hourNum = nowHour //} //rdlock.Unlock() } else { log.Println(mid, "调用rpc服务解析异常:", mid, qu.ObjToString((*fileinfo)["fid"]), rdata["err"]) } } // //var hourNum int //var rdlock sync.RWMutex