package main import ( "fmt" "log" "time" elastic "es" "flag" "mongodb" common "qfw/util" "github.com/robfig/cron" ) var ( Mgo *mongodb.MongodbSim Es elastic.Es cfg = new(Config) SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"} mode = flag.Int("m", 1, "") ) func init() { common.ReadConfig(&cfg) Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize) Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password) } func FindData() { session := Mgo.GetMgoConn() query := map[string]interface{}{ "createtime": map[string]interface{}{ "$gte": time.Now().Unix() - 86400, }, "$or": []map[string]interface{}{ map[string]interface{}{ "projectId": "", }, map[string]interface{}{ "projectId": map[string]interface{}{ "$exists": 0, }, }, }} defer Mgo.DestoryMongoConn(session) iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Iter() thisData, count := map[string]interface{}{}, 0 for { if !iter.Next(&thisData) { break } count++ id := common.ObjToString(thisData["id"]) query := `{"query": {"bool": {"must": [{"term": {"ids": "%s"}}],"must_not": [],"should": []}}}` querys := fmt.Sprintf(query, id) projectId := "" data := Es.Get("projectset", "projectset", querys) if data != nil && *data != nil && len(*data) > 0 { projectId = common.ObjToString((*data)[0]["_id"]) log.Println("projectId", projectId, count) } else { log.Println("ES未查到项目id", id, count) } // if projectId != "" { ok := Mgo.UpdateById("usermail", thisData["_id"], map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}}) if ok { log.Println("项目id更新成功 ", id, projectId) } else { log.Println("项目id更新失败! ", id, projectId) } } thisData = map[string]interface{}{} } } func main() { flag.Parse() if *mode == 2 { FindHistory() } else { FindData() c := cron.New() c.AddFunc(cfg.CornExp, func() { FindData() }) c.Start() select {} } } func FindHistory() { session := Mgo.GetMgoConn() query := map[string]interface{}{ "$or": []map[string]interface{}{ map[string]interface{}{ "projectId": "", }, map[string]interface{}{ "projectId": map[string]interface{}{ "$exists": 0, }, }, }} defer Mgo.DestoryMongoConn(session) iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Iter() thisData := map[string]interface{}{} for { if !iter.Next(&thisData) { break } id := common.ObjToString(thisData["id"]) query := `{"query": {"bool": {"must": [{"term": {"ids": "%s"}}],"must_not": [],"should": []}}}` querys := fmt.Sprintf(query, id) projectId := "" data := Es.Get("projectset", "projectset", querys) if data != nil && *data != nil && len(*data) > 0 { projectId = common.ObjToString((*data)[0]["_id"]) log.Println("projectId", projectId) } else { log.Println("ES未查到项目id", id) } // if projectId != "" { ok := Mgo.UpdateById("usermail", thisData["_id"], map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}}) if ok { log.Println("项目id更新成功 ", id, projectId) } else { log.Println("项目id更新失败! ", id, projectId) } } thisData = map[string]interface{}{} } }