123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- /**
- 分布式抽取
- **/
- package cluster
- import (
- "encoding/json"
- "fmt"
- "jy/extract"
- db "jy/mongodbutil"
- ju "jy/util"
- "log"
- mu "mfw/util"
- "net"
- qu "qfw/util"
- "time"
- "gopkg.in/mgo.v2/bson"
- )
var (
	// EscIds holds id ranges (id区间). It starts as a nil map; IdsRange
	// replaces it with an empty map on every call.
	EscIds map[string][]string
)
- //根据esc数量实例数量id划段
- func IdsRange(table, endate string) int {
- start := time.Date(2015, 11, 3, 0, 0, 0, 0, time.Local)
- end, _ := time.ParseInLocation(qu.Date_Short_Layout, endate, time.Local)
- EscIds = map[string][]string{}
- list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1)
- ids := RangeIdsByDate(len(*list), start, end)
- for k, v := range *list {
- db.Mgo.UpdateById("ecs", qu.BsonIdToSId(v["_id"]), map[string]interface{}{
- "$set": map[string]interface{}{
- "extask": []string{
- ids[fmt.Sprint(k)][0],
- ids[fmt.Sprint(k)][1],
- ids[fmt.Sprint(k)][2],
- qu.ObjToString(v["InstanceId"]),
- },
- },
- })
- }
- return len(*list)
- }
- //启动任务
- func RunEcsTask() int {
- list, _ := db.Mgo.Find("ecs", `{"extstatus":"deploy"}`, nil, nil, false, -1, -1)
- num := 0
- for _, v := range *list {
- if extask, ok := v["extask"].([]interface{}); ok {
- ip := qu.ObjToString(v["ip_nw"])
- by, _ := json.Marshal(map[string]interface{}{
- "ip": ip,
- "gtid": extask[0],
- "lteid": extask[1],
- "InstanceId": extask[3],
- "stype": "distributed",
- })
- err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(ip),
- Port: qu.IntAll(ju.Config["udpport"]),
- })
- if err != nil {
- log.Println(err)
- } else {
- num++
- time.Sleep(2 * time.Second)
- log.Println("分发任务", string(by))
- }
- }
- }
- return num
- }
- //id分段
- func RangeIdsByDate(escnum int, start, edate time.Time) map[string][]string {
- ids := map[string][]string{}
- task, _ := db.Mgo.FindById("task", qu.ObjToString(ju.Config["udptaskid"]), nil)
- log.Println(qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
- DB := db.MgoFactory(2, 3, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
- total := DB.Count("bidding", `{"comeintime":{"$lt":`+fmt.Sprint(edate.Unix())+`}}`)
- total_back := DB.Count("bidding_back", `{}`)
- total += total_back
- pagesize := (total + escnum - 1) / escnum
- log.Printf("total:%d total_back:%d pagesize:%d escnum:%d", total, total_back, pagesize, escnum)
- nums := 0
- for i := 0; i < escnum; i++ {
- log.Println("escnum", i)
- sid := bson.NewObjectIdWithTime(start)
- var eid bson.ObjectId
- var idsnum = 0
- table := "bidding_back"
- for {
- tmpsid := bson.NewObjectIdWithTime(start)
- end := start.Add(4 * time.Hour)
- if end.Unix() > edate.Unix() {
- eid = bson.NewObjectIdWithTime(edate)
- } else {
- eid = bson.NewObjectIdWithTime(end)
- }
- start = end
- query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}}
- count := DB.Count(table, query)
- log.Println(count, table, query)
- if count < 1 { //校验是否切换table
- tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(end.Add(24 * 10 * time.Hour) /*连续10天无数据*/)}})
- if tmpnum < 1 && table != "bidding" {
- table = "bidding"
- start = start.Add(-4 * time.Hour)
- continue
- }
- } else {
- idsnum += count
- }
- log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
- if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { //测试数据count > 5000000
- break
- }
- }
- nums += idsnum
- ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum)}
- log.Println("nums", nums)
- }
- return ids
- }
|