package main import ( "github.com/cron" "go.mongodb.org/mongo-driver/bson" "log" "mongodb" "qfw/util" "reflect" "sync" "time" ) //定时任务 func TimeTask() { go SaveAdd() c := cron.New() cronstr := "0 0 2 * * ?" //每天2点执行 //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次 err := c.AddFunc(cronstr, func() { SaveAdd() }) if err != nil { util.Debug(err) return } c.Start() } // SaveAdd 增量数据 func SaveAdd() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) pool := make(chan bool, 5) wg := &sync.WaitGroup{} if UpdateId == "" { util.Debug("update id err...") return } q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(UpdateId)}} util.Debug("q ---", q) it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%5000 == 0 { log.Println("current:", count) } if UpdateId < mongodb.BsonIdToSId(tmp["_id"]) { UpdateId = mongodb.BsonIdToSId(tmp["_id"]) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() esMap := map[string]interface{}{} //生索引字段处理 for _, field := range EsFields { if tmp[field] == nil { continue } if field == "buyerclass" { if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" { esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0] } else { esMap["buyerclass"] = tmp["buyerclass"] } } else { esMap[field] = tmp[field] } } // 处理result if mp, ok := tmp["results"].([]interface{}); ok { var mpArr []map[string]interface{} for _, v := range mp { v1 := v.(map[string]interface{}) if v1["purchasing"] != nil { mpArr = append(mpArr, map[string]interface{}{"purchasing": v1["purchasing"]}) } } if len(mpArr) > 0 { esMap["results"] = mpArr } } EsSaveCache <- esMap }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("Run Over...Count:", count) } // SaveAll 存量数据生es func SaveAll() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) pool := make(chan bool, 10) wg := &sync.WaitGroup{} //q := bson.M{"_id": mongodb.StringTOBsonId("6194a3c105180be8dae19cbb")} it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%20000 == 0 { log.Println("current:", count, tmp["_id"]) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() esMap := map[string]interface{}{} //生索引字段处理 for _, field := range EsFields { if tmp[field] == nil { continue } if field == "buyerclass" { if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" { esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0] } else { esMap["buyerclass"] = tmp["buyerclass"] } } else { esMap[field] = tmp[field] } } // 处理result if mp, ok := tmp["results"].([]interface{}); ok { var mpArr []map[string]interface{} for _, v := range mp { v1 := v.(map[string]interface{}) if v1["purchasing"] != nil { mpArr = append(mpArr, map[string]interface{}{"purchasing": v1["purchasing"]}) } } if len(mpArr) > 0 { esMap["results"] = mpArr } } EsSaveCache <- esMap }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("Run Over...Count:", count) } // SaveEs 过滤后数据存库 func SaveEs() { log.Println("Es Save...") arru := make([]map[string]interface{}, 100) indexu := 0 for { select { case v := <-EsSaveCache: arru[indexu] = v indexu++ if indexu == 100 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(Index, Itype, &arru, true) }(arru) arru = make([]map[string]interface{}, 100) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(Index, Itype, &arru, true) }(arru[:indexu]) arru = make([]map[string]interface{}, 100) indexu = 0 } } } }