123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- // extractudp
- package extract
- import (
- "encoding/json"
- db "jy/mongodbutil"
- ju "jy/util"
- "log"
- mu "mfw/util"
- "net"
- qu "qfw/util"
- "gopkg.in/mgo.v2/bson"
- )
- var Udpclient mu.UdpClient //udp对象
- var nextNodes []map[string]interface{}
- //udp通知抽取
- func ExtractUdp() {
- nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
- Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
- Udpclient.Listen(processUdpMsg)
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- if err != nil {
- log.Println(err)
- } else {
- sid, _ := rep["gtid"].(string)
- eid, _ := rep["lteid"].(string)
- stype, _ := rep["stype"].(string)
- if stype == "distributed" { //分布式抽取分支
- log.Println("分布式抽取id段", sid, eid)
- InstanceId := qu.ObjToString(rep["InstanceId"])
- db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "extstatus": "running",
- },
- }, true, false)
- ExtractByUdp(sid, eid, qu.ObjToString(rep["InstanceId"]))
- db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "extstatus": "ok",
- },
- }, true, false)
- log.Println("分布式抽取完成", sid, eid, "释放esc实例", qu.ObjToString(rep["ip"]))
- } else {
- log.Println("udp通知抽取id段", sid, eid)
- ExtractByUdp(sid, eid)
- log.Println("udp通知抽取完成,eid=", eid)
- for _, m := range nextNodes {
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": qu.ObjToString(m["stype"]),
- })
- err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(m["addr"].(string)),
- Port: qu.IntAll(m["port"]),
- })
- if err != nil {
- log.Println(err)
- }
- }
- }
- }
- case mu.OP_NOOP: //下个节点回应
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- if err != nil {
- log.Println(err)
- } else {
- log.Println(rep)
- }
- }
- }
- //根据id区间抽取
- func ExtractByUdp(sid, eid string, instanceId ...string) {
- ext := &ExtractTask{}
- ext.Id = qu.ObjToString(ju.Config["udptaskid"])
- ext.InitTaskInfo()
- ext.TaskInfo.DB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
- ext.InitRulePres()
- ext.InitRuleBacks()
- ext.InitRuleCore()
- ext.InitTag()
- ext.InitClearFn()
- if ext.IsExtractCity { //版本上控制是否开始城市抽取
- //初始化城市DFA信息
- ext.InitDFA()
- }
- //质量审核
- ext.InitAuditRule()
- ext.InitAuditClass()
- ext.InitAuditRecogField()
- go ext.ResultSave()
- go ext.BidSave()
- ext.IsRun = true
- if len(instanceId) > 0 { //分布式抽取进度
- query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
- count1 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
- count2 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", query)
- count := count1 + count2
- pageNum := (count + PageSize - 1) / PageSize
- limit := PageSize
- if count < PageSize {
- limit = count
- }
- log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
- startI := 0 //接着上次任务执行
- sidback := sid
- esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
- startI = qu.IntAll((*esc)["pagecurrent"])
- if qu.ObjToString((*esc)["lastId"]) != "" {
- sid = qu.ObjToString((*esc)["lastId"])
- }
- if qu.ObjToString((*esc)["lastIdback"]) != "" {
- sidback = qu.ObjToString((*esc)["lastIdback"])
- }
- for i := startI; i < pageNum; i++ {
- query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
- log.Printf("page=%d,query=%v", i+1, query)
- if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 {
- list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
- for _, v := range *list {
- //log.Println(v["_id"])
- j := PreInfo(v)
- ext.TaskInfo.ProcessPool <- true
- go ext.ExtractProcess(j)
- sid = qu.BsonIdToSId(v["_id"])
- }
- db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
- map[string]interface{}{"$set": map[string]interface{}{
- "lastId": sid,
- }}, true, false)
- }
- queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
- log.Printf("page=%d,queryback=%v", i+1, queryback)
- if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
- list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
- for _, v := range *list2 {
- //log.Println(v["_id"])
- j := PreInfo(v)
- ext.TaskInfo.ProcessPool <- true
- go ext.ExtractProcess(j)
- sidback = qu.BsonIdToSId(v["_id"])
- }
- db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
- map[string]interface{}{"$set": map[string]interface{}{
- "lastIdback": sidback,
- }}, true, false)
- }
- db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
- map[string]interface{}{"$set": map[string]interface{}{
- "pagetotal": pageNum,
- "pagecurrent": i + 1,
- }}, true, false)
- }
- } else { //普通抽取
- query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
- list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
- for _, v := range *list {
- //log.Println(v["_id"])
- j := PreInfo(v)
- ext.TaskInfo.ProcessPool <- true
- go ext.ExtractProcess(j)
- }
- }
- }
|