main.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "time"
  7. "context"
  8. elastic "es"
  9. "mongodb"
  10. common "qfw/util"
  11. es "github.com/olivere/elastic"
  12. "github.com/robfig/cron"
  13. )
  14. var (
  15. Mgo *mongodb.MongodbSim
  16. Bidding *mongodb.MongodbSim
  17. Es elastic.Es
  18. cfg = new(Config)
  19. SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"}
  20. )
  21. func init() {
  22. common.ReadConfig(&cfg)
  23. Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
  24. Bidding = mongodb.NewMgoWithUser(cfg.Bidding.Address, cfg.Bidding.DbName, cfg.Bidding.UserName, cfg.Bidding.Password, cfg.Bidding.DbSize)
  25. // Es = &elastic.Elastic{
  26. // S_esurl: cfg.Es.Address,
  27. // I_size: cfg.Es.DbSize,
  28. // }
  29. // Es.InitElasticSize()
  30. Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
  31. }
  32. func runJob() {
  33. log.Println("项目匹配定时任务开始------")
  34. log.Println("Cfg: ", cfg)
  35. NowTime, isOk := getRange(cfg.LastTime)
  36. if isOk {
  37. EsData, esCount := getEsData(cfg.LastTime, NowTime)
  38. log.Println("本次查询到项目数据: ", esCount, " 条")
  39. count := FindData(EsData)
  40. log.Println("本次迁移数据条数: ", count)
  41. cfg.LastTime = NowTime
  42. common.WriteSysConfig(cfg)
  43. }
  44. log.Println("项目匹配定时任务结束------", NowTime)
  45. }
  46. func getRange(LastTime int64) (int64, bool) {
  47. endTime, isOk := int64(0), true
  48. esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
  49. idQuery := fmt.Sprintf(esquery, LastTime)
  50. res := Es.Get(cfg.Es.Index, cfg.Es.IType, idQuery)
  51. if res != nil && *res != nil && len(*res) == 1 {
  52. endTime = common.Int64All((*res)[0]["pici"])
  53. } else {
  54. endTime = LastTime
  55. isOk = false
  56. log.Println("本次任务未查找到数据...", idQuery)
  57. }
  58. return endTime, isOk
  59. }
  60. type MySource struct {
  61. Querys string
  62. }
  63. func (m *MySource) Source() (interface{}, error) {
  64. mp := make(map[string]interface{})
  65. json.Unmarshal([]byte(m.Querys), &mp)
  66. return mp["query"], nil
  67. }
  68. func getEsData(firstTime, LastTime int64) (map[string]map[string]interface{}, int) {
  69. esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d","lt":"%d"}}}}}}`
  70. esquery = fmt.Sprintf(esquery, firstTime, LastTime)
  71. //查询条件类型转换
  72. // var q es.Query
  73. // tmpQuery := es.BoolQuery{}
  74. // tmpQuery.QueryStrings = esquery
  75. // q = tmpQuery
  76. cc := &MySource{
  77. Querys: esquery,
  78. }
  79. dataMap, numDocs := map[string]map[string]interface{}{}, 0
  80. ctx, _ := context.WithTimeout(context.Background(), 5*time.Minute)
  81. esCon := elastic.VarEs.(*elastic.EsV7)
  82. client := esCon.GetEsConn()
  83. defer esCon.DestoryEsConn(client)
  84. res, err := client.Scroll(cfg.Es.Index).Query(cc).Size(200).Do(ctx) //查询一条获取游标
  85. if err == nil {
  86. scrollId := res.ScrollId
  87. count := 1
  88. for {
  89. if scrollId == "" {
  90. log.Println("ScrollId Is Error")
  91. break
  92. }
  93. var searchResult *es.SearchResult
  94. var err error
  95. if count == 1 {
  96. searchResult = res
  97. } else {
  98. searchResult, err = client.Scroll(cfg.Es.Index).Size(200).ScrollId(scrollId).Do(ctx) //查询
  99. if err != nil {
  100. if err.Error() == "EOS" { //迭代完毕
  101. log.Println("Es Search Data Over:", err)
  102. } else {
  103. log.Println("Es Search Data Error:", err)
  104. }
  105. break
  106. }
  107. }
  108. log.Println("此次处理条数 ", len(searchResult.Hits.Hits))
  109. for _, hit := range searchResult.Hits.Hits {
  110. tmp := make(map[string]interface{})
  111. if json.Unmarshal(hit.Source, &tmp) == nil {
  112. id := common.ObjToString(tmp["id"])
  113. log.Println("id", id)
  114. dataMap[id] = tmp
  115. numDocs += 1
  116. } else {
  117. log.Println("序列化失败!")
  118. }
  119. }
  120. scrollId = searchResult.ScrollId
  121. count++
  122. }
  123. client.ClearScroll().ScrollId(scrollId).Do(ctx) //清理游标
  124. log.Println("Result Data Count:", numDocs)
  125. } else {
  126. log.Println("Es Search Data Error", err)
  127. }
  128. return dataMap, numDocs
  129. }
  130. func FindData(data map[string]map[string]interface{}) int {
  131. query, count, session := map[string]interface{}{"appid": "jyKClXQQQCAQ5cS0lMIyVC"}, 0, Mgo.GetMgoConn()
  132. defer func() {
  133. Mgo.DestoryMongoConn(session)
  134. }()
  135. iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Sort("_id").Iter()
  136. thisData := map[string]interface{}{}
  137. for {
  138. if !iter.Next(&thisData) {
  139. break
  140. }
  141. //
  142. id := mongodb.BsonIdToSId(thisData["_id"])
  143. info_id := mongodb.BsonIdToSId(thisData["id"])
  144. appid := mongodb.BsonIdToSId(thisData["appid"])
  145. projectId := common.ObjToString(thisData["projectId"])
  146. if projectId == "" {
  147. querystr := `{"query": {"bool": {"must": [{"term": {"projectset.ids": "%s"}}],"must_not": [],"should": []}}}`
  148. querystrs := fmt.Sprintf(querystr, id)
  149. data := Es.Get("projectset", "projectset", querystrs)
  150. if data != nil && *data != nil && len(*data) > 0 {
  151. projectId = common.ObjToString((*data)[0]["_id"])
  152. }
  153. }
  154. if data[projectId] != nil {
  155. projectData := data[projectId]
  156. if projectDataInList, ok := projectData["list"].([]interface{}); ok {
  157. for _, v := range projectDataInList {
  158. if v_map, oks := v.(map[string]interface{}); oks {
  159. infoid := common.ObjToString(v_map["infoid"])
  160. topType := common.ObjToString(v_map["toptype"])
  161. if topType == cfg.Rule {
  162. log.Println("匹配到项目结果---", id, "-", projectId)
  163. count++
  164. esData := Es.GetByIdField("bidding", "bidding", infoid, "")
  165. if esData != nil && len(*esData) > 0 {
  166. (*esData)["projectId"] = projectId
  167. (*esData)["sourceId"] = info_id
  168. (*esData)["id"] = infoid
  169. (*esData)["appid"] = appid
  170. (*esData)["createtime"] = time.Now().Unix()
  171. (*esData)["details"] = getDetails(infoid)
  172. mgoId := Mgo.Save(cfg.Db.ColName, *esData)
  173. if mgoId != "" {
  174. // delok := Mgo.Del(cfg.Db.TemporaryColName, map[string]interface{}{"_id": thisData["_id"]})
  175. // if delok {
  176. // log.Println("新华三定时数据删除成功---", id, "-", projectId, "-", mgoId)
  177. // } else {
  178. // log.Println("新华三定时数据删除失败!!!", id, "-", projectId, "-", mgoId)
  179. // }
  180. log.Println("保存到项目接口成功---", id, "-", projectId, "-", mgoId)
  181. } else {
  182. log.Println("保存到项目接口失败!!!", id, "-", projectId)
  183. }
  184. }
  185. }
  186. }
  187. }
  188. }
  189. }
  190. //
  191. thisData = map[string]interface{}{}
  192. }
  193. return count
  194. }
  195. func getDetails(id string) string {
  196. info, ok := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
  197. details := ""
  198. if ok && info != nil && len(*info) > 0 {
  199. details = common.ObjToString((*info)["detail"])
  200. }
  201. return details
  202. }
  203. func main() {
  204. runJob()
  205. c := cron.New()
  206. c.AddFunc(cfg.CornExp, func() {
  207. runJob()
  208. })
  209. c.Start()
  210. select {}
  211. }