/** 分布式抽取 **/ package cluster import ( "encoding/json" "fmt" "jy/extract" db "jy/mongodbutil" ju "jy/util" "log" mu "mfw/util" "net" qu "qfw/util" "time" "gopkg.in/mgo.v2/bson" ) var EscIds map[string][]string //id区间 //根据esc数量实例数量id划段 func IdsRange(table, endate string) int { start := time.Date(2015, 11, 3, 0, 0, 0, 0, time.Local) end, _ := time.ParseInLocation(qu.Date_Short_Layout, endate, time.Local) EscIds = map[string][]string{} list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1) ids := RangeIdsByDate(len(*list), start, end) for k, v := range *list { db.Mgo.UpdateById("ecs", qu.BsonIdToSId(v["_id"]), map[string]interface{}{ "$set": map[string]interface{}{ "extask": []string{ ids[fmt.Sprint(k)][0], ids[fmt.Sprint(k)][1], ids[fmt.Sprint(k)][2], qu.ObjToString(v["InstanceId"]), }, }, }) } return len(*list) } //启动任务 func RunEcsTask() int { list, _ := db.Mgo.Find("ecs", `{"extstatus":"deploy"}`, nil, nil, false, -1, -1) num := 0 for _, v := range *list { if extask, ok := v["extask"].([]interface{}); ok { ip := qu.ObjToString(v["ip_nw"]) by, _ := json.Marshal(map[string]interface{}{ "ip": ip, "gtid": extask[0], "lteid": extask[1], "InstanceId": extask[3], "stype": "distributed", }) err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(ip), Port: qu.IntAll(ju.Config["udpport"]), }) if err != nil { log.Println(err) } else { num++ time.Sleep(2 * time.Second) log.Println("分发任务", string(by)) } } } return num } //id分段 func RangeIdsByDate(escnum int, start, edate time.Time) map[string][]string { ids := map[string][]string{} task, _ := db.Mgo.FindById("task", qu.ObjToString(ju.Config["udptaskid"]), nil) log.Println(qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"])) DB := db.MgoFactory(2, 3, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"])) total := DB.Count("bidding", `{"comeintime":{"$lt":`+fmt.Sprint(edate.Unix())+`}}`) total_back := DB.Count("bidding_back", `{}`) total += total_back pagesize := (total + escnum - 1) / escnum log.Printf("total:%d total_back:%d pagesize:%d escnum:%d", total, total_back, pagesize, escnum) nums := 0 for i := 0; i < escnum; i++ { log.Println("escnum", i) sid := bson.NewObjectIdWithTime(start) var eid bson.ObjectId var idsnum = 0 table := "bidding_back" for { tmpsid := bson.NewObjectIdWithTime(start) end := start.Add(4 * time.Hour) if end.Unix() > edate.Unix() { eid = bson.NewObjectIdWithTime(edate) } else { eid = bson.NewObjectIdWithTime(end) } start = end query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}} count := DB.Count(table, query) log.Println(count, table, query) if count < 1 { //校验是否切换table tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(end.Add(24 * 10 * time.Hour) /*连续10天无数据*/)}}) if tmpnum < 1 && table != "bidding" { table = "bidding" start = start.Add(-4 * time.Hour) continue } } else { idsnum += count } log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout)) if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { //测试数据count > 5000000 break } } nums += idsnum ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum)} log.Println("nums", nums) } return ids }