|
- package main
- import (
- "encoding/json"
- "fmt"
- "log"
- "regexp"
- "sync"
- "time"
- "context"
- elastic "es"
- "mongodb"
- common "qfw/util"
- es "github.com/olivere/elastic"
- "github.com/robfig/cron"
- )
- var (
- Mgo *mongodb.MongodbSim
- Bidding *mongodb.MongodbSim
- Es elastic.Es
- cfg = new(Config)
- SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"}
- ClearHtml = regexp.MustCompile("<[^>]*>")
- )
- func init() {
- common.ReadConfig(&cfg)
- Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
- Bidding = mongodb.NewMgoWithUser(cfg.Bidding.Address, cfg.Bidding.DbName, cfg.Bidding.UserName, cfg.Bidding.Password, cfg.Bidding.DbSize)
- // Es = &elastic.Elastic{
- // S_esurl: cfg.Es.Address,
- // I_size: cfg.Es.DbSize,
- // }
- // Es.InitElasticSize()
- Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
- }
- func runJob() {
- log.Println("项目匹配定时任务开始------")
- log.Println("Cfg: ", cfg)
- NowTime, isOk := getRange(cfg.LastTime)
- if isOk {
- EsData, esCount := getEsData(cfg.LastTime, NowTime)
- log.Println("本次查询到项目数据: ", esCount, " 条")
- count := FindData(EsData)
- log.Println("本次迁移数据条数: ", count)
- cfg.LastTime = NowTime
- common.WriteSysConfig(cfg)
- }
- log.Println("项目匹配定时任务结束------", NowTime)
- }
- func getRange(LastTime int64) (int64, bool) {
- endTime, isOk := int64(0), true
- esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
- idQuery := fmt.Sprintf(esquery, LastTime)
- res := Es.Get(cfg.Es.Index, cfg.Es.IType, idQuery)
- if res != nil && *res != nil && len(*res) == 1 {
- endTime = common.Int64All((*res)[0]["pici"])
- } else {
- endTime = LastTime
- isOk = false
- log.Println("本次任务未查找到数据...", idQuery)
- }
- return endTime, isOk
- }
- type MySource struct {
- Querys string
- }
- func (m *MySource) Source() (interface{}, error) {
- mp := make(map[string]interface{})
- json.Unmarshal([]byte(m.Querys), &mp)
- return mp["query"], nil
- }
- func getEsData(firstTime, LastTime int64) (map[string]map[string]interface{}, int) {
- esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d","lt":"%d"}}}}}}`
- esquery = fmt.Sprintf(esquery, firstTime, LastTime)
- //查询条件类型转换
- // var q es.Query
- // tmpQuery := es.BoolQuery{}
- // tmpQuery.QueryStrings = esquery
- // q = tmpQuery
- cc := &MySource{
- Querys: esquery,
- }
- dataMap, numDocs := map[string]map[string]interface{}{}, 0
- ctx, _ := context.WithTimeout(context.Background(), 5*time.Minute)
- esCon := elastic.VarEs.(*elastic.EsV7)
- client := esCon.GetEsConn()
- defer esCon.DestoryEsConn(client)
- res, err := client.Scroll(cfg.Es.Index).Query(cc).Size(200).Do(ctx) //查询一条获取游标
- if err == nil {
- scrollId := res.ScrollId
- count := 1
- for {
- if scrollId == "" {
- log.Println("ScrollId Is Error")
- break
- }
- var searchResult *es.SearchResult
- var err error
- if count == 1 {
- searchResult = res
- } else {
- searchResult, err = client.Scroll(cfg.Es.Index).Size(200).ScrollId(scrollId).Do(ctx) //查询
- if err != nil {
- if err.Error() == "EOS" { //迭代完毕
- log.Println("Es Search Data Over:", err)
- } else {
- log.Println("Es Search Data Error:", err)
- }
- break
- }
- }
- log.Println("此次处理条数 ", len(searchResult.Hits.Hits))
- for _, hit := range searchResult.Hits.Hits {
- tmp := make(map[string]interface{})
- if json.Unmarshal(hit.Source, &tmp) == nil {
- id := common.ObjToString(tmp["id"])
- log.Println("id", id)
- dataMap[id] = tmp
- numDocs += 1
- } else {
- log.Println("序列化失败!")
- }
- }
- scrollId = searchResult.ScrollId
- count++
- }
- client.ClearScroll().ScrollId(scrollId).Do(ctx) //清理游标
- log.Println("Result Data Count:", numDocs)
- } else {
- log.Println("Es Search Data Error", err)
- }
- return dataMap, numDocs
- }
- func FindData(data map[string]map[string]interface{}) int {
- query, count, session := map[string]interface{}{"appid": "jyKClXQQQCAQ5cS0lMIyVC"}, 0, Mgo.GetMgoConn()
- defer func() {
- Mgo.DestoryMongoConn(session)
- }()
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 10)
- iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Sort("_id").Iter()
- thisData := map[string]interface{}{}
- for {
- if !iter.Next(&thisData) {
- break
- }
- id := mongodb.BsonIdToSId(thisData["_id"])
- info_id := mongodb.BsonIdToSId(thisData["id"])
- appid := mongodb.BsonIdToSId(thisData["appid"])
- projectId := common.ObjToString(thisData["projectId"])
- if projectId == "" {
- querystr := `{"query": {"bool": {"must": [{"term": {"projectset.ids": "%s"}}],"must_not": [],"should": []}}}`
- querystrs := fmt.Sprintf(querystr, id)
- datas := Es.Get("projectset", "projectset", querystrs)
- if datas != nil && *datas != nil && len(*datas) > 0 {
- projectId = common.ObjToString((*datas)[0]["_id"])
- Mgo.UpdateById("usermail", id, map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
- }
- }
- if data[projectId] != nil {
- projectData := data[projectId]
- wg.Add(1)
- ch <- true
- go func(thisData, projectData map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if projectDataInList, ok := projectData["list"].([]interface{}); ok {
- for _, v := range projectDataInList {
- if v_map, oks := v.(map[string]interface{}); oks {
- infoid := common.ObjToString(v_map["infoid"])
- topType := common.ObjToString(v_map["toptype"])
- if topType == cfg.Rule {
- log.Println("匹配到项目结果---", id, "-", projectId)
- count++
- // esData := Es.GetByIdField("bidding", "bidding", infoid, "")
- esData, _ := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(infoid)})
- if esData != nil && len(*esData) > 0 {
- (*esData)["projectId"] = projectId
- (*esData)["sourceId"] = info_id
- (*esData)["id"] = infoid
- (*esData)["appid"] = appid
- (*esData)["createtime"] = time.Now().Unix()
- details := common.ObjToString((*esData)["detail"])
- (*esData)["details"] = details
- (*esData)["detail"] = ClearHtml.ReplaceAllString(details, "")
- if Mgo.Count(cfg.Db.ColName, map[string]interface{}{"id": infoid}) < 1 {
- mgoId := Mgo.Save(cfg.Db.ColName, *esData)
- if mgoId != "" {
- // delok := Mgo.Del(cfg.Db.TemporaryColName, map[string]interface{}{"_id": thisData["_id"]})
- // if delok {
- // log.Println("新华三定时数据删除成功---", id, "-", projectId, "-", mgoId)
- // } else {
- // log.Println("新华三定时数据删除失败!!!", id, "-", projectId, "-", mgoId)
- // }
- log.Println("保存到项目接口成功---", id, "-", projectId, "-", mgoId)
- } else {
- log.Println("保存到项目接口失败!!!", id, "-", projectId)
- }
- }
- }
- }
- }
- }
- }
- }(thisData, projectData)
- }
- //
- thisData = map[string]interface{}{}
- }
- wg.Wait()
- return count
- }
- func LtData(data map[string]map[string]interface{}) int {
- count, session := 0, Mgo.GetMgoConn()
- stime := time.Now().AddDate(0, 0, -7)
- BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix()
- query := map[string]interface{}{
- "appid": "jyGQ1XQQsEAwNeSENOFR9D",
- "createtime": map[string]interface{}{
- "$gt": BidStartTime,
- "$lte": BidStartTime + 86400*7,
- },
- }
- defer func() {
- Mgo.DestoryMongoConn(session)
- }()
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 10)
- iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Sort("_id").Iter()
- thisData := map[string]interface{}{}
- for {
- if !iter.Next(&thisData) {
- break
- }
- id := mongodb.BsonIdToSId(thisData["_id"])
- info_id := mongodb.BsonIdToSId(thisData["id"])
- appid := mongodb.BsonIdToSId(thisData["appid"])
- projectId := common.ObjToString(thisData["projectId"])
- if projectId == "" {
- querystr := `{"query": {"bool": {"must": [{"term": {"projectset.ids": "%s"}}],"must_not": [],"should": []}}}`
- querystrs := fmt.Sprintf(querystr, id)
- datas := Es.Get("projectset", "projectset", querystrs)
- if datas != nil && *datas != nil && len(*datas) > 0 {
- projectId = common.ObjToString((*datas)[0]["_id"])
- Mgo.UpdateById("usermail", id, map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
- }
- }
- if data[projectId] != nil {
- projectData := data[projectId]
- wg.Add(1)
- ch <- true
- go func(thisData, projectData map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if projectDataInList, ok := projectData["list"].([]interface{}); ok {
- for _, v := range projectDataInList {
- if v_map, oks := v.(map[string]interface{}); oks {
- infoid := common.ObjToString(v_map["infoid"])
- topType := common.ObjToString(v_map["toptype"])
- if topType == cfg.Rule {
- log.Println("匹配到项目结果---", id, "-", projectId)
- count++
- // esData := Es.GetByIdField("bidding", "bidding", infoid, "")
- esData, _ := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(infoid)})
- if esData != nil && len(*esData) > 0 {
- (*esData)["projectId"] = projectId
- (*esData)["sourceId"] = info_id
- (*esData)["id"] = infoid
- (*esData)["appid"] = appid
- (*esData)["createtime"] = time.Now().Unix()
- details := common.ObjToString((*esData)["detail"])
- (*esData)["details"] = details
- (*esData)["detail"] = ClearHtml.ReplaceAllString(details, "")
- if Mgo.Count(cfg.Db.ColName, map[string]interface{}{"id": infoid}) < 1 {
- mgoId := Mgo.Save(cfg.Db.ColName, *esData)
- if mgoId != "" {
- // delok := Mgo.Del(cfg.Db.TemporaryColName, map[string]interface{}{"_id": thisData["_id"]})
- // if delok {
- // log.Println("新华三定时数据删除成功---", id, "-", projectId, "-", mgoId)
- // } else {
- // log.Println("新华三定时数据删除失败!!!", id, "-", projectId, "-", mgoId)
- // }
- log.Println("保存到项目接口成功---", id, "-", projectId, "-", mgoId)
- } else {
- log.Println("保存到项目接口失败!!!", id, "-", projectId)
- }
- }
- }
- }
- }
- }
- }
- }(thisData, projectData)
- }
- //
- thisData = map[string]interface{}{}
- }
- wg.Wait()
- return count
- }
- func getDetails(id string) string {
- info, ok := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
- details := ""
- if ok && info != nil && len(*info) > 0 {
- details = common.ObjToString((*info)["detail"])
- }
- return details
- }
- func main() {
- runJob()
- c := cron.New()
- c.AddFunc(cfg.CornExp, func() {
- runJob()
- })
- c.Start()
- select {}
- }
|