123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package main
- import (
- "fmt"
- "log"
- "time"
- elastic "es"
- "flag"
- "mongodb"
- common "qfw/util"
- "github.com/robfig/cron"
- )
- var (
- Mgo *mongodb.MongodbSim
- Es elastic.Es
- cfg = new(Config)
- SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"}
- mode = flag.Int("m", 1, "")
- )
- func init() {
- common.ReadConfig(&cfg)
- Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
- Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
- }
- func FindData() {
- session := Mgo.GetMgoConn()
- query := map[string]interface{}{
- "createtime": map[string]interface{}{
- "$gte": time.Now().Unix() - 86400,
- },
- "$or": []map[string]interface{}{
- map[string]interface{}{
- "projectId": "",
- },
- map[string]interface{}{
- "projectId": map[string]interface{}{
- "$exists": 0,
- },
- },
- }}
- defer Mgo.DestoryMongoConn(session)
- iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Iter()
- thisData, count := map[string]interface{}{}, 0
- for {
- if !iter.Next(&thisData) {
- break
- }
- count++
- id := common.ObjToString(thisData["id"])
- query := `{"query": {"bool": {"must": [{"term": {"ids": "%s"}}],"must_not": [],"should": []}}}`
- querys := fmt.Sprintf(query, id)
- projectId := ""
- data := Es.Get("projectset", "projectset", querys)
- if data != nil && *data != nil && len(*data) > 0 {
- projectId = common.ObjToString((*data)[0]["_id"])
- log.Println("projectId", projectId, count)
- } else {
- log.Println("ES未查到项目id", id, count)
- }
- //
- if projectId != "" {
- ok := Mgo.UpdateById("usermail", thisData["_id"], map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
- if ok {
- log.Println("项目id更新成功 ", id, projectId)
- } else {
- log.Println("项目id更新失败! ", id, projectId)
- }
- }
- thisData = map[string]interface{}{}
- }
- }
- func main() {
- flag.Parse()
- if *mode == 2 {
- FindHistory()
- } else {
- FindData()
- c := cron.New()
- c.AddFunc(cfg.CornExp, func() {
- FindData()
- })
- c.Start()
- select {}
- }
- }
- func FindHistory() {
- session := Mgo.GetMgoConn()
- query := map[string]interface{}{
- "$or": []map[string]interface{}{
- map[string]interface{}{
- "projectId": "",
- },
- map[string]interface{}{
- "projectId": map[string]interface{}{
- "$exists": 0,
- },
- },
- }}
- defer Mgo.DestoryMongoConn(session)
- iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Iter()
- thisData := map[string]interface{}{}
- for {
- if !iter.Next(&thisData) {
- break
- }
- id := common.ObjToString(thisData["id"])
- query := `{"query": {"bool": {"must": [{"term": {"ids": "%s"}}],"must_not": [],"should": []}}}`
- querys := fmt.Sprintf(query, id)
- projectId := ""
- data := Es.Get("projectset", "projectset", querys)
- if data != nil && *data != nil && len(*data) > 0 {
- projectId = common.ObjToString((*data)[0]["_id"])
- log.Println("projectId", projectId)
- } else {
- log.Println("ES未查到项目id", id)
- }
- //
- if projectId != "" {
- ok := Mgo.UpdateById("usermail", thisData["_id"], map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
- if ok {
- log.Println("项目id更新成功 ", id, projectId)
- } else {
- log.Println("项目id更新失败! ", id, projectId)
- }
- }
- thisData = map[string]interface{}{}
- }
- }
|