// extractudp package extract import ( "encoding/json" "fmt" "io/ioutil" "jy/cluster" db "jy/mongodbutil" ju "jy/util" log2 "log" mu "mfw/util" "net" "net/http" qu "qfw/util" "strings" "sync" log "github.com/donnie4w/go-logger/logger" "gopkg.in/mgo.v2/bson" ) var Udpclient mu.UdpClient //udp对象 var nextNodes []map[string]interface{} //udp通知抽取 func ExtractUdp() { nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{})) Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024} Udpclient.Listen(processUdpMsg) } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Debug(err) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) stype, _ := rep["stype"].(string) if sid == "" || eid == "" { log.Debug("err", "sid=", sid, ",eid=", eid) } else { if stype == "distributed" { //分布式抽取分支 go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra) log.Debug("分布式抽取id段", sid, " ", eid) InstanceId := qu.ObjToString(rep["InstanceId"]) db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "running", }, }, true, false) ExtractByUdp(sid, eid, ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"])) db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "ok", }, }, true, false) //<-time.NewTimer(time.Minute * time.Duration(qu.IntAll(ju.Config["DeleteInstanceTimeMinute"]))).C //cluster.DeleteInstance("instanceId[0]") log.Debug("分布式抽取完成", sid, " ", eid, "释放esc实例", qu.ObjToString(rep["ip"])) } else { udpinfo, _ := rep["key"].(string) if udpinfo == "" { udpinfo = "udpok" } go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra) log.Debug("udp通知抽取id段", sid, " ", eid) ExtractByUdp(sid, eid, ra) log.Debug("udp通知抽取完成,eid=", eid) for _, m := range nextNodes { by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": qu.ObjToString(m["stype"]), }) err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: qu.IntAll(m["port"]), }) if err != nil { log.Debug(err) } } } } } case mu.OP_NOOP: //下个节点回应 log.Debug(string(data)) log2.Println(string(data)) case mu.OP_SEND_EMAIL: log.Debug("实例抽取完成,发送邮件:", string(data), ra.IP) log2.Println("实例抽取完成,发送邮件:", string(data), ra.IP) rep := make(map[string]interface{}) err := json.Unmarshal(data, &rep) if err != nil { log.Debug(err) log2.Println(string(data), ra.IP) } else { tmpstr := "" for k, v := range rep { switch k { case "desc": tmpstr += fmt.Sprint(v) + "," case "count": tmpstr += "实际抽取数据量" + fmt.Sprint(v) + "," case "index": tmpstr += "区间数据量为" + fmt.Sprint(v) + "," case "instanceId": tmpstr += "实例" + fmt.Sprint(v) + "," } } tmpstr = strings.TrimRight(tmpstr, ",") sendMail(tmpstr) cluster.ModifyInstanceAutoReleaseTime(qu.ObjToString(rep["instanceId"]), qu.IntAll(ju.Config["deleteInstanceTimeHour"])) } } } func sendMail(content string) { log2.Println(ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content) res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content)) defer res.Body.Close() if err == nil { read, err := ioutil.ReadAll(res.Body) log2.Println("邮件发送:", string(read), err) } log2.Println("api email:", err) } var ext *ExtractTask //根据id区间抽取 func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) { defer qu.Catch() if ext == nil { ext = &ExtractTask{} ext.Id = qu.ObjToString(ju.Config["udptaskid"]) ext.InitTaskInfo() ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB) ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB) ext.InitRulePres() ext.InitRuleBacks() ext.InitRuleCore() ext.InitBlockRule() ext.InitTag() ext.InitClearFn() if ext.IsExtractCity { //版本上控制是否开始城市抽取 //初始化城市DFA信息 //ext.InitCityDFA() ext.InitCityInfo() ext.InitAreaCode() ext.InitPostCode() } //质量审核 ext.InitAuditFields() ext.InitAuditRule() ext.InitAuditClass() ext.InitAuditRecogField() //品牌抽取是否开启 ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool) ext.ResultSave(true) ext.BidSave(true) ext.IsRun = true } else { ext.BidTotal = 0 } index := 0 if len(instanceId) > 0 { //分布式抽取进度 query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}} count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query) count := count1 + count2 pageNum := (count + PageSize - 1) / PageSize limit := PageSize if count < PageSize { limit = count } fmt.Printf("count=%d,pageNum=%d,query=%v\n", count, pageNum, query) startI := 0 //接着上次任务执行 sidback := sid esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`) startI = qu.IntAll((*esc)["pagecurrent"]) if qu.ObjToString((*esc)["lastId"]) != "" { sid = qu.ObjToString((*esc)["lastId"]) } if qu.ObjToString((*esc)["lastIdback"]) != "" { sidback = qu.ObjToString((*esc)["lastIdback"]) } go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功,count=%d,pageNum=%d,query=%v\n", instanceId[1], count, pageNum, query)), mu.OP_NOOP, ra) for i := startI; i < pageNum; i++ { query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}} fmt.Printf("page=%d,query=%v\n", i+1, query) if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 { list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit) for _, v := range *list { if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据 continue } _id := qu.BsonIdToSId(v["_id"]) var j, jf *ju.Job if ext.IsFileField && v["projectinfo"] != nil { v["isextFile"] = true j, jf = ext.PreInfo(v) } else { j, _ = ext.PreInfo(v) } ext.TaskInfo.ProcessPool <- true go ext.ExtractProcess(j, jf) sid = _id index++ } db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, map[string]interface{}{"$set": map[string]interface{}{ "lastId": sid, }}, true, false) } queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}} fmt.Printf("page=%d,queryback=%v\n", i+1, queryback) if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 { list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit) for _, v := range *list2 { if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据 continue } _id := qu.BsonIdToSId(v["_id"]) var j, jf *ju.Job if ext.IsFileField && v["projectinfo"] != nil { v["isextFile"] = true j, jf = ext.PreInfo(v) } else { j, _ = ext.PreInfo(v) } ext.TaskInfo.ProcessPool <- true go ext.ExtractProcess(j, jf) sidback = _id index++ } db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, map[string]interface{}{"$set": map[string]interface{}{ "lastIdback": sidback, }}, true, false) } db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, map[string]interface{}{"$set": map[string]interface{}{ "pagetotal": pageNum, "pagecurrent": i + 1, }}, true, false) } des := make(map[string]interface{}) des["desc"] = "分布式抽取完成,一小时后释放" des["count"] = count des["index"] = index des["instanceId"] = instanceId[0] des["instanceIP"] = instanceId[1] udpbytes, _ := json.Marshal(des) go Udpclient.WriteUdp(udpbytes, mu.OP_SEND_EMAIL, ra) log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal) } else { //普通抽取 query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}} count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) log.Debug("查询条件为:", query, "查询条数:", count) pageNum := (count + PageSize - 1) / PageSize limit := PageSize if count < PageSize { limit = count } wg := sync.WaitGroup{} for i := 0; i < pageNum; i++ { query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}} fmt.Printf("page=%d,query=%v\n", i+1, query) list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit) for _, v := range *list { if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据 log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据") continue } _id := qu.BsonIdToSId(v["_id"]) var j, jf *ju.Job if ext.IsFileField && v["projectinfo"] != nil { v["isextFile"] = true j, jf = ext.PreInfo(v) } else { j, _ = ext.PreInfo(v) } ext.TaskInfo.ProcessPool <- true wg.Add(1) go func(wg *sync.WaitGroup, j, jf *ju.Job) { defer wg.Done() //log.Debug(index,j.SourceMid,) ext.ExtractProcess(j, jf) }(&wg, j, jf) index++ if index%1000 == 0 { log.Debug("index:", index, ",页码:", i+1, ",_id:", _id) } sid = _id if sid >= eid { break } } } wg.Wait() ext.BidSave(false) log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid) } }