// Command main runs a cron-scheduled job that reads project documents from
// Elasticsearch (selected by their "pici" field), matches them against the
// "usermail" MongoDB collection, and copies the matching bidding documents
// into the collection named by cfg.Db.ColName.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"regexp"
	"sync"
	"sync/atomic"
	"time"

	elastic "es"
	"mongodb"
	common "qfw/util"

	es "github.com/olivere/elastic"
	"github.com/robfig/cron"
)

var (
	// Mgo is the MongoDB handle that holds "usermail" and the target collection.
	Mgo *mongodb.MongodbSim
	// Bidding is the MongoDB handle that holds the "bidding" collection.
	Bidding *mongodb.MongodbSim
	// Es is the Elasticsearch handle used for simple (non-scroll) queries.
	Es elastic.Es
	// cfg is the job configuration; it is loaded in init and rewritten by
	// runJob after every successful run.
	cfg = new(Config)
	// SEXhs is a SimpleEncrypt with a fixed key. It is not referenced in this
	// file.
	SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"}
	// ClearHtml matches anything that looks like an HTML tag.
	ClearHtml = regexp.MustCompile("<[^>]*>")
)

// init loads the configuration and creates the MongoDB and Elasticsearch
// handles from it.
func init() {
	common.ReadConfig(&cfg)
	Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
	Bidding = mongodb.NewMgoWithUser(cfg.Bidding.Address, cfg.Bidding.DbName, cfg.Bidding.UserName, cfg.Bidding.Password, cfg.Bidding.DbSize)
	Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
}

// runJob performs one pass of the job: it determines the newest "pici" value
// at or after cfg.LastTime, loads the projects in [cfg.LastTime, newest),
// migrates the matching records and then persists the new cfg.LastTime.
func runJob() {
	log.Println("项目匹配定时任务开始------")
	// NOTE(review): cfg carries the Bidding and Es passwords, so this line
	// writes them to the log — confirm that is acceptable.
	log.Println("Cfg: ", cfg)
	NowTime, isOk := getRange(cfg.LastTime)
	if isOk {
		EsData, esCount := getEsData(cfg.LastTime, NowTime)
		log.Println("本次查询到项目数据: ", esCount, " 条")
		count := FindData(EsData)
		log.Println("本次迁移数据条数: ", count)
		// NOTE(review): LastTime is advanced even when getEsData hit an
		// error, because getEsData does not report failures to its caller.
		cfg.LastTime = NowTime
		common.WriteSysConfig(cfg)
	}
	log.Println("项目匹配定时任务结束------", NowTime)
}

// getRange returns the largest "pici" value that is >= LastTime together with
// true. When the query does not yield exactly one document it returns
// LastTime and false.
func getRange(LastTime int64) (int64, bool) {
	esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
	idQuery := fmt.Sprintf(esquery, LastTime)
	res := Es.Get(cfg.Es.Index, cfg.Es.IType, idQuery)
	if res == nil || *res == nil || len(*res) != 1 {
		log.Println("本次任务未查找到数据...", idQuery)
		return LastTime, false
	}
	return common.Int64All((*res)[0]["pici"]), true
}

// MySource adapts a raw JSON search body so that its "query" member can be
// handed to the Elasticsearch client as a query.
type MySource struct {
	Querys string
}

// Source decodes Querys as a JSON object and returns its "query" member.
// A decoding failure is returned to the caller instead of being swallowed.
func (m *MySource) Source() (interface{}, error) {
	mp := make(map[string]interface{})
	if err := json.Unmarshal([]byte(m.Querys), &mp); err != nil {
		return nil, err
	}
	return mp["query"], nil
}

// getEsData scrolls over every document of cfg.Es.Index whose "pici" lies in
// [firstTime, LastTime). It returns the decoded documents keyed by their "id"
// field and the number of hits that were decoded successfully. Errors are
// logged; whatever was collected up to that point is returned.
func getEsData(firstTime, LastTime int64) (map[string]map[string]interface{}, int) {
	esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d","lt":"%d"}}}}}}`
	esquery = fmt.Sprintf(esquery, firstTime, LastTime)
	cc := &MySource{Querys: esquery}
	dataMap, numDocs := map[string]map[string]interface{}{}, 0

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel() // release the timer as soon as we are done

	esCon, ok := elastic.VarEs.(*elastic.EsV7)
	if !ok {
		// Avoid panicking when a different client version is configured.
		log.Println("Es Search Data Error", fmt.Sprintf("unsupported es client %T", elastic.VarEs))
		return dataMap, numDocs
	}
	client := esCon.GetEsConn()
	defer esCon.DestoryEsConn(client)

	// The first request opens the scroll cursor and already carries a page.
	page, err := client.Scroll(cfg.Es.Index).Query(cc).Size(200).Do(ctx)
	if err != nil && err != io.EOF {
		log.Println("Es Search Data Error", err)
		return dataMap, numDocs
	}
	scrollID := ""
	if page != nil {
		scrollID = page.ScrollId
	}

	for first := true; err == nil; first = false {
		if scrollID == "" {
			log.Println("ScrollId Is Error")
			break
		}
		if !first {
			// Fetch the next page of the open cursor.
			var next *es.SearchResult
			next, err = client.Scroll(cfg.Es.Index).Size(200).ScrollId(scrollID).Do(ctx)
			if err != nil {
				break
			}
			page = next
		}
		log.Println("此次处理条数 ", len(page.Hits.Hits))
		for _, hit := range page.Hits.Hits {
			tmp := make(map[string]interface{})
			if uerr := json.Unmarshal(hit.Source, &tmp); uerr != nil {
				log.Println("序列化失败!", uerr)
				continue
			}
			id := common.ObjToString(tmp["id"])
			log.Println("id", id)
			dataMap[id] = tmp
			numDocs++
		}
		scrollID = page.ScrollId
	}

	switch {
	case err == io.EOF:
		// The scroll service reports exhaustion with io.EOF.
		log.Println("Es Search Data Over:", err)
	case err != nil:
		log.Println("Es Search Data Error:", err)
	}

	if scrollID != "" {
		// Release the server-side cursor.
		if _, cerr := client.ClearScroll().ScrollId(scrollID).Do(ctx); cerr != nil {
			log.Println("Es Clear Scroll Error:", cerr)
		}
	}
	log.Println("Result Data Count:", numDocs)
	return dataMap, numDocs
}

// FindData matches every "usermail" record of appid jyKClXQQQCAQ5cS0lMIyVC
// against data (projects keyed by project id) and stores the matching bidding
// documents. It returns the number of project list entries whose "toptype"
// equals cfg.Rule.
func FindData(data map[string]map[string]interface{}) int {
	query := map[string]interface{}{"appid": "jyKClXQQQCAQ5cS0lMIyVC"}
	return migrateUserMail(query, data)
}

// LtData does the same as FindData for appid jyGQ1XQQsEAwNeSENOFR9D, limited
// to "usermail" records whose createtime lies in the seven days that end at
// the most recent local midnight.
func LtData(data map[string]map[string]interface{}) int {
	stime := time.Now().AddDate(0, 0, -7)
	BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix()
	query := map[string]interface{}{
		"appid": "jyGQ1XQQsEAwNeSENOFR9D",
		"createtime": map[string]interface{}{
			"$gt":  BidStartTime,
			"$lte": BidStartTime + 86400*7,
		},
	}
	return migrateUserMail(query, data)
}

// migrateUserMail iterates the "usermail" records selected by query in _id
// order. Records without a projectId get one resolved through the
// "projectset" index and written back. Every record whose project is present
// in data is then processed by saveMatchedBidding, at most ten at a time.
// It returns the total number of matches reported by saveMatchedBidding.
func migrateUserMail(query map[string]interface{}, data map[string]map[string]interface{}) int {
	session := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(session)

	var matched int64 // updated atomically by the worker goroutines
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 10) // bounds the number of concurrent workers
	iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Sort("_id").Iter()
	thisData := map[string]interface{}{}
	for iter.Next(&thisData) {
		id := mongodb.BsonIdToSId(thisData["_id"])
		infoID := mongodb.BsonIdToSId(thisData["id"])
		appid := mongodb.BsonIdToSId(thisData["appid"])
		projectID := common.ObjToString(thisData["projectId"])
		if projectID == "" {
			// NOTE(review): the lookup uses the usermail _id, not the
			// record's "id" field — confirm this is the intended key.
			querystr := `{"query": {"bool": {"must": [{"term": {"projectset.ids": "%s"}}],"must_not": [],"should": []}}}`
			querystrs := fmt.Sprintf(querystr, id)
			datas := Es.Get("projectset", "projectset", querystrs)
			if datas != nil && *datas != nil && len(*datas) > 0 {
				projectID = common.ObjToString((*datas)[0]["_id"])
				Mgo.UpdateById("usermail", id, map[string]interface{}{"$set": map[string]interface{}{"projectId": projectID}})
			}
		}
		projectData := data[projectID]
		if projectData == nil {
			continue
		}
		wg.Add(1)
		ch <- true
		// Only per-record values are handed to the worker; thisData itself is
		// reused by the iterator and must not be shared.
		go func(id, infoID, appid, projectID string, projectData map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			atomic.AddInt64(&matched, saveMatchedBidding(id, infoID, appid, projectID, projectData))
		}(id, infoID, appid, projectID, projectData)
	}
	wg.Wait()
	if err := iter.Close(); err != nil {
		log.Println("usermail iterate error:", err)
	}
	return int(atomic.LoadInt64(&matched))
}

// saveMatchedBidding walks projectData["list"] and, for every entry whose
// "toptype" equals cfg.Rule, loads the bidding document named by the entry's
// "infoid", enriches it and saves it to cfg.Db.ColName unless a document with
// the same "id" already exists there. It returns the number of entries that
// matched cfg.Rule, whether or not they were saved.
func saveMatchedBidding(id, infoID, appid, projectID string, projectData map[string]interface{}) int64 {
	list, ok := projectData["list"].([]interface{})
	if !ok {
		return 0
	}
	var matched int64
	for _, v := range list {
		item, ok := v.(map[string]interface{})
		if !ok {
			continue
		}
		infoid := common.ObjToString(item["infoid"])
		if common.ObjToString(item["toptype"]) != cfg.Rule {
			continue
		}
		log.Println("匹配到项目结果---", id, "-", projectID)
		matched++

		esData, _ := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(infoid)})
		if esData == nil || len(*esData) == 0 {
			continue
		}
		(*esData)["projectId"] = projectID
		(*esData)["sourceId"] = infoID
		(*esData)["id"] = infoid
		(*esData)["appid"] = appid
		(*esData)["createtime"] = time.Now().Unix()
		// Keep the raw text under "details" and a tag-stripped copy under
		// "detail".
		details := common.ObjToString((*esData)["detail"])
		(*esData)["details"] = details
		(*esData)["detail"] = ClearHtml.ReplaceAllString(details, "")

		// NOTE(review): count-then-save is not atomic, so two workers seeing
		// the same infoid can both insert; a unique index on "id" would
		// close that gap.
		if Mgo.Count(cfg.Db.ColName, map[string]interface{}{"id": infoid}) >= 1 {
			continue
		}
		// Deleting the processed record from cfg.Db.TemporaryColName after a
		// successful save used to live here and is currently disabled.
		if mgoId := Mgo.Save(cfg.Db.ColName, *esData); mgoId != "" {
			log.Println("保存到项目接口成功---", id, "-", projectID, "-", mgoId)
		} else {
			log.Println("保存到项目接口失败!!!", id, "-", projectID)
		}
	}
	return matched
}

// getDetails returns the "detail" field of the bidding document with the
// given id, or "" when the document cannot be found.
func getDetails(id string) string {
	info, ok := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
	details := ""
	if ok && info != nil && len(*info) > 0 {
		details = common.ObjToString((*info)["detail"])
	}
	return details
}

// main runs the job once immediately, then schedules it with the cron
// expression cfg.CornExp and blocks forever.
func main() {
	runJob()
	c := cron.New()
	// NOTE(review): the result of AddFunc is ignored, so an invalid
	// cfg.CornExp leaves the process idle without any diagnostic.
	c.AddFunc(cfg.CornExp, func() {
		runJob()
	})
	c.Start()
	select {}
}