|
@@ -0,0 +1,199 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "mongodb"
|
|
|
+ common "qfw/util"
|
|
|
+ "qfw/util/elastic"
|
|
|
+
|
|
|
+ "github.com/robfig/cron"
|
|
|
+ es "gopkg.in/olivere/elastic.v1"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ Mgo *mongodb.MongodbSim
|
|
|
+ Bidding *mongodb.MongodbSim
|
|
|
+ Es *elastic.Elastic
|
|
|
+ cfg = new(Config)
|
|
|
+ SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"}
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ common.ReadConfig(&cfg)
|
|
|
+ Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
|
|
|
+ Bidding = mongodb.NewMgoWithUser(cfg.Bidding.Address, cfg.Bidding.DbName, cfg.Bidding.UserName, cfg.Bidding.Password, cfg.Bidding.DbSize)
|
|
|
+ Es = &elastic.Elastic{
|
|
|
+ S_esurl: cfg.Es.Address,
|
|
|
+ I_size: cfg.Es.DbSize,
|
|
|
+ }
|
|
|
+ Es.InitElasticSize()
|
|
|
+}
|
|
|
+
|
|
|
+func runJob() {
|
|
|
+ log.Println("项目匹配定时任务开始------")
|
|
|
+ log.Println("Cfg: ", cfg)
|
|
|
+ NowTime, isOk := getRange(cfg.LastTime)
|
|
|
+ if isOk {
|
|
|
+ EsData, esCount := getEsData(cfg.LastTime, NowTime)
|
|
|
+ log.Println("本次查询到项目数据: ", esCount, " 条")
|
|
|
+ count := FindData(EsData)
|
|
|
+ log.Println("本次迁移数据条数: ", count)
|
|
|
+ cfg.LastTime = NowTime
|
|
|
+ common.WriteSysConfig(cfg)
|
|
|
+ }
|
|
|
+ log.Println("项目匹配定时任务结束------", NowTime)
|
|
|
+}
|
|
|
+
|
|
|
+func getRange(LastTime int64) (int64, bool) {
|
|
|
+ endTime, isOk := int64(0), true
|
|
|
+ esquery := `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
|
|
|
+ idQuery := fmt.Sprintf(esquery, LastTime)
|
|
|
+ res := Es.Get(cfg.Es.Index, cfg.Es.IType, idQuery)
|
|
|
+ if res != nil && *res != nil && len(*res) == 1 {
|
|
|
+ endTime = common.Int64All((*res)[0]["pici"])
|
|
|
+ } else {
|
|
|
+ endTime = LastTime
|
|
|
+ isOk = false
|
|
|
+ log.Println("本次任务未查找到数据...", idQuery)
|
|
|
+ }
|
|
|
+ return endTime, isOk
|
|
|
+}
|
|
|
+
|
|
|
+func getEsData(firstTime, LastTime int64) (map[string]map[string]interface{}, int) {
|
|
|
+ esquery := `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"pici":{"gte":"%d","lt":"%d"}}}}}}}}`
|
|
|
+ esquery = fmt.Sprintf(esquery, firstTime, LastTime)
|
|
|
+ //查询条件类型转换
|
|
|
+ var q es.Query
|
|
|
+ tmpQuery := es.BoolQuery{}
|
|
|
+ tmpQuery.QueryStrings = esquery
|
|
|
+ q = tmpQuery
|
|
|
+
|
|
|
+ dataMap, numDocs := map[string]map[string]interface{}{}, 0
|
|
|
+ client := Es.GetEsConn()
|
|
|
+ defer Es.DestoryEsConn(client)
|
|
|
+ res, err := client.Scroll(cfg.Es.Index).Query(q).Size(200).Do() //查询一条获取游标
|
|
|
+ if err == nil {
|
|
|
+ scrollId := res.ScrollId
|
|
|
+ for {
|
|
|
+ if scrollId == "" {
|
|
|
+ log.Println("ScrollId Is Error")
|
|
|
+ break
|
|
|
+ }
|
|
|
+ searchResult, err := client.Scroll(cfg.Es.Index).Size(200).ScrollId(scrollId).Do() //查询
|
|
|
+ if err != nil {
|
|
|
+ if err.Error() == "EOS" { //迭代完毕
|
|
|
+ log.Println("Es Search Data Over:", err)
|
|
|
+ } else {
|
|
|
+ log.Println("Es Search Data Error:", err)
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ for _, hit := range searchResult.Hits.Hits {
|
|
|
+ tmp := make(map[string]interface{})
|
|
|
+ if json.Unmarshal(*hit.Source, &tmp) == nil {
|
|
|
+ id := common.ObjToString(tmp["_id"])
|
|
|
+ log.Println("id", id)
|
|
|
+ dataMap[id] = tmp
|
|
|
+ numDocs += 1
|
|
|
+ } else {
|
|
|
+ log.Println("序列化失败!")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ scrollId = searchResult.ScrollId
|
|
|
+ }
|
|
|
+ client.ClearScroll().ScrollId(scrollId).Do() //清理游标
|
|
|
+ log.Println("Result Data Count:", numDocs)
|
|
|
+ } else {
|
|
|
+ log.Println("Es Search Data Error")
|
|
|
+ }
|
|
|
+ return dataMap, numDocs
|
|
|
+}
|
|
|
+
|
|
|
+func FindData(data map[string]map[string]interface{}) int {
|
|
|
+ query, count, session := map[string]interface{}{"appid": "jyKClXQQQCAQ5cS0lMIyVC"}, 0, Mgo.GetMgoConn()
|
|
|
+ defer func() {
|
|
|
+ Mgo.DestoryMongoConn(session)
|
|
|
+ }()
|
|
|
+ iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Sort("_id").Iter()
|
|
|
+ thisData := map[string]interface{}{}
|
|
|
+ for {
|
|
|
+ if !iter.Next(&thisData) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ //
|
|
|
+ id := mongodb.BsonIdToSId(thisData["_id"])
|
|
|
+ info_id := mongodb.BsonIdToSId(thisData["id"])
|
|
|
+ appid := mongodb.BsonIdToSId(thisData["appid"])
|
|
|
+ projectId := common.ObjToString(thisData["projectId"])
|
|
|
+ if projectId == "" {
|
|
|
+ querystr := `{"query": {"bool": {"must": [{"term": {"projectset.ids": "%s"}}],"must_not": [],"should": []}}}`
|
|
|
+ querystrs := fmt.Sprintf(querystr, id)
|
|
|
+ data := Es.Get("projectset", "projectset", querystrs)
|
|
|
+ if data != nil && *data != nil && len(*data) > 0 {
|
|
|
+ projectId = common.ObjToString((*data)[0]["_id"])
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if data[projectId] != nil {
|
|
|
+ projectData := data[projectId]
|
|
|
+ if projectDataInList, ok := projectData["list"].([]interface{}); ok {
|
|
|
+ for _, v := range projectDataInList {
|
|
|
+ if v_map, oks := v.(map[string]interface{}); oks {
|
|
|
+ infoid := common.ObjToString(v_map["infoid"])
|
|
|
+ topType := common.ObjToString(v_map["toptype"])
|
|
|
+ if topType == cfg.Rule {
|
|
|
+ log.Println("匹配到项目结果---", id, "-", projectId)
|
|
|
+ count++
|
|
|
+ esData := Es.GetByIdField("bidding", "bidding", infoid, "")
|
|
|
+ if esData != nil && len(*esData) > 0 {
|
|
|
+ (*esData)["projectId"] = projectId
|
|
|
+ (*esData)["sourceId"] = info_id
|
|
|
+ (*esData)["id"] = infoid
|
|
|
+ (*esData)["appid"] = appid
|
|
|
+ (*esData)["createtime"] = time.Now().Unix()
|
|
|
+ (*esData)["details"] = getDetails(infoid)
|
|
|
+ mgoId := Mgo.Save(cfg.Db.ColName, *esData)
|
|
|
+ if mgoId != "" {
|
|
|
+ // delok := Mgo.Del(cfg.Db.TemporaryColName, map[string]interface{}{"_id": thisData["_id"]})
|
|
|
+ // if delok {
|
|
|
+ // log.Println("新华三定时数据删除成功---", id, "-", projectId, "-", mgoId)
|
|
|
+ // } else {
|
|
|
+ // log.Println("新华三定时数据删除失败!!!", id, "-", projectId, "-", mgoId)
|
|
|
+ // }
|
|
|
+ log.Println("保存到项目接口成功---", id, "-", projectId, "-", mgoId)
|
|
|
+ } else {
|
|
|
+ log.Println("保存到项目接口失败!!!", id, "-", projectId)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //
|
|
|
+ thisData = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ return count
|
|
|
+}
|
|
|
+
|
|
|
+func getDetails(id string) string {
|
|
|
+ info, ok := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
|
|
|
+ details := ""
|
|
|
+ if ok && info != nil && len(*info) > 0 {
|
|
|
+ details = common.ObjToString((*info)["detail"])
|
|
|
+ }
|
|
|
+ return details
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ runJob()
|
|
|
+ c := cron.New()
|
|
|
+ c.AddFunc(cfg.CornExp, func() {
|
|
|
+ runJob()
|
|
|
+ })
|
|
|
+ c.Start()
|
|
|
+ select {}
|
|
|
+}
|