// extractudp package extract import ( "encoding/json" db "jy/mongodbutil" ju "jy/util" "log" mu "mfw/util" "net" qu "qfw/util" "gopkg.in/mgo.v2/bson" ) var Udpclient mu.UdpClient //udp对象 var nextNodes []map[string]interface{} //udp通知抽取 func ExtractUdp() { nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{})) Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024} Udpclient.Listen(processUdpMsg) } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Println(err) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) stype, _ := rep["stype"].(string) if stype == "distributed" { //分布式抽取分支 log.Println("分布式抽取id段", sid, eid) InstanceId := qu.ObjToString(rep["InstanceId"]) db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "running", }, }, true, false) ExtractByUdp(sid, eid, qu.ObjToString(rep["InstanceId"])) db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "ok", }, }, true, false) log.Println("分布式抽取完成", sid, eid, "释放esc实例", qu.ObjToString(rep["ip"])) } else { log.Println("udp通知抽取id段", sid, eid) ExtractByUdp(sid, eid) log.Println("udp通知抽取完成,eid=", eid) for _, m := range nextNodes { by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": qu.ObjToString(m["stype"]), }) err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: qu.IntAll(m["port"]), }) if err != nil { log.Println(err) } } } } case mu.OP_NOOP: //下个节点回应 var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Println(err) } else { log.Println(rep) } } } //根据id区间抽取 func ExtractByUdp(sid, eid string, instanceId ...string) { ext := &ExtractTask{} ext.Id = qu.ObjToString(ju.Config["udptaskid"]) ext.InitTaskInfo() ext.TaskInfo.DB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB) ext.InitRulePres() ext.InitRuleBacks() ext.InitRuleCore() ext.InitTag() ext.InitClearFn() if ext.IsExtractCity { //版本上控制是否开始城市抽取 //初始化城市DFA信息 ext.InitDFA() } //质量审核 ext.InitAuditRule() ext.InitAuditClass() ext.InitAuditRecogField() go ext.ResultSave() go ext.BidSave() ext.IsRun = true if len(instanceId) > 0 { //分布式抽取进度 query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}} count1 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) count2 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", query) count := count1 + count2 pageNum := (count + PageSize - 1) / PageSize limit := PageSize if count < PageSize { limit = count } log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query) startI := 0 //接着上次任务执行 sidback := sid esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`) startI = qu.IntAll((*esc)["pagecurrent"]) if qu.ObjToString((*esc)["lastId"]) != "" { sid = qu.ObjToString((*esc)["lastId"]) } if qu.ObjToString((*esc)["lastIdback"]) != "" { sidback = qu.ObjToString((*esc)["lastIdback"]) } for i := startI; i < pageNum; i++ { query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}} log.Printf("page=%d,query=%v", i+1, query) if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 { list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit) for _, v := range *list { //log.Println(v["_id"]) j := PreInfo(v) ext.TaskInfo.ProcessPool <- true go ext.ExtractProcess(j) sid = qu.BsonIdToSId(v["_id"]) } db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, map[string]interface{}{"$set": map[string]interface{}{ "lastId": sid, }}, true, false) } queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}} log.Printf("page=%d,queryback=%v", i+1, queryback) if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 { list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit) for _, v := range *list2 { //log.Println(v["_id"]) j := PreInfo(v) ext.TaskInfo.ProcessPool <- true go ext.ExtractProcess(j) sidback = qu.BsonIdToSId(v["_id"]) } db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, map[string]interface{}{"$set": map[string]interface{}{ "lastIdback": sidback, }}, true, false) } db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, map[string]interface{}{"$set": map[string]interface{}{ "pagetotal": pageNum, "pagecurrent": i + 1, }}, true, false) } } else { //普通抽取 query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}} list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1) for _, v := range *list { //log.Println(v["_id"]) j := PreInfo(v) ext.TaskInfo.ProcessPool <- true go ext.ExtractProcess(j) } } }