main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "regexp"
  7. "sync"
  8. "time"
  9. "context"
  10. elastic "es"
  11. "mongodb"
  12. common "qfw/util"
  13. es "github.com/olivere/elastic"
  14. "github.com/robfig/cron"
  15. )
  16. var (
  17. Mgo *mongodb.MongodbSim
  18. Bidding *mongodb.MongodbSim
  19. Es elastic.Es
  20. cfg = new(Config)
  21. SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"}
  22. ClearHtml = regexp.MustCompile("<[^>]*>")
  23. )
  24. func init() {
  25. common.ReadConfig(&cfg)
  26. Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
  27. Bidding = mongodb.NewMgoWithUser(cfg.Bidding.Address, cfg.Bidding.DbName, cfg.Bidding.UserName, cfg.Bidding.Password, cfg.Bidding.DbSize)
  28. // Es = &elastic.Elastic{
  29. // S_esurl: cfg.Es.Address,
  30. // I_size: cfg.Es.DbSize,
  31. // }
  32. // Es.InitElasticSize()
  33. Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
  34. }
  35. func runJob() {
  36. log.Println("项目匹配定时任务开始------")
  37. log.Println("Cfg: ", cfg)
  38. NowTime, isOk := getRange(cfg.LastTime)
  39. if isOk {
  40. EsData, esCount := getEsData(cfg.LastTime, NowTime)
  41. log.Println("本次查询到项目数据: ", esCount, " 条")
  42. count := FindData(EsData)
  43. log.Println("本次迁移数据条数: ", count)
  44. cfg.LastTime = NowTime
  45. common.WriteSysConfig(cfg)
  46. }
  47. log.Println("项目匹配定时任务结束------", NowTime)
  48. }
  49. func getRange(LastTime int64) (int64, bool) {
  50. endTime, isOk := int64(0), true
  51. esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
  52. idQuery := fmt.Sprintf(esquery, LastTime)
  53. res := Es.Get(cfg.Es.Index, cfg.Es.IType, idQuery)
  54. if res != nil && *res != nil && len(*res) == 1 {
  55. endTime = common.Int64All((*res)[0]["pici"])
  56. } else {
  57. endTime = LastTime
  58. isOk = false
  59. log.Println("本次任务未查找到数据...", idQuery)
  60. }
  61. return endTime, isOk
  62. }
  63. type MySource struct {
  64. Querys string
  65. }
  66. func (m *MySource) Source() (interface{}, error) {
  67. mp := make(map[string]interface{})
  68. json.Unmarshal([]byte(m.Querys), &mp)
  69. return mp["query"], nil
  70. }
  71. func getEsData(firstTime, LastTime int64) (map[string]map[string]interface{}, int) {
  72. esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d","lt":"%d"}}}}}}`
  73. esquery = fmt.Sprintf(esquery, firstTime, LastTime)
  74. //查询条件类型转换
  75. // var q es.Query
  76. // tmpQuery := es.BoolQuery{}
  77. // tmpQuery.QueryStrings = esquery
  78. // q = tmpQuery
  79. cc := &MySource{
  80. Querys: esquery,
  81. }
  82. dataMap, numDocs := map[string]map[string]interface{}{}, 0
  83. ctx, _ := context.WithTimeout(context.Background(), 5*time.Minute)
  84. esCon := elastic.VarEs.(*elastic.EsV7)
  85. client := esCon.GetEsConn()
  86. defer esCon.DestoryEsConn(client)
  87. res, err := client.Scroll(cfg.Es.Index).Query(cc).Size(200).Do(ctx) //查询一条获取游标
  88. if err == nil {
  89. scrollId := res.ScrollId
  90. count := 1
  91. for {
  92. if scrollId == "" {
  93. log.Println("ScrollId Is Error")
  94. break
  95. }
  96. var searchResult *es.SearchResult
  97. var err error
  98. if count == 1 {
  99. searchResult = res
  100. } else {
  101. searchResult, err = client.Scroll(cfg.Es.Index).Size(200).ScrollId(scrollId).Do(ctx) //查询
  102. if err != nil {
  103. if err.Error() == "EOS" { //迭代完毕
  104. log.Println("Es Search Data Over:", err)
  105. } else {
  106. log.Println("Es Search Data Error:", err)
  107. }
  108. break
  109. }
  110. }
  111. log.Println("此次处理条数 ", len(searchResult.Hits.Hits))
  112. for _, hit := range searchResult.Hits.Hits {
  113. tmp := make(map[string]interface{})
  114. if json.Unmarshal(hit.Source, &tmp) == nil {
  115. id := common.ObjToString(tmp["id"])
  116. log.Println("id", id)
  117. dataMap[id] = tmp
  118. numDocs += 1
  119. } else {
  120. log.Println("序列化失败!")
  121. }
  122. }
  123. scrollId = searchResult.ScrollId
  124. count++
  125. }
  126. client.ClearScroll().ScrollId(scrollId).Do(ctx) //清理游标
  127. log.Println("Result Data Count:", numDocs)
  128. } else {
  129. log.Println("Es Search Data Error", err)
  130. }
  131. return dataMap, numDocs
  132. }
  133. func FindData(data map[string]map[string]interface{}) int {
  134. query, count, session := map[string]interface{}{"appid": "jyKClXQQQCAQ5cS0lMIyVC"}, 0, Mgo.GetMgoConn()
  135. defer func() {
  136. Mgo.DestoryMongoConn(session)
  137. }()
  138. wg := &sync.WaitGroup{}
  139. ch := make(chan bool, 10)
  140. iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Sort("_id").Iter()
  141. thisData := map[string]interface{}{}
  142. for {
  143. if !iter.Next(&thisData) {
  144. break
  145. }
  146. id := mongodb.BsonIdToSId(thisData["_id"])
  147. info_id := mongodb.BsonIdToSId(thisData["id"])
  148. appid := mongodb.BsonIdToSId(thisData["appid"])
  149. projectId := common.ObjToString(thisData["projectId"])
  150. if projectId == "" {
  151. querystr := `{"query": {"bool": {"must": [{"term": {"projectset.ids": "%s"}}],"must_not": [],"should": []}}}`
  152. querystrs := fmt.Sprintf(querystr, id)
  153. datas := Es.Get("projectset", "projectset", querystrs)
  154. if datas != nil && *datas != nil && len(*datas) > 0 {
  155. projectId = common.ObjToString((*datas)[0]["_id"])
  156. Mgo.UpdateById("usermail", id, map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
  157. }
  158. }
  159. if data[projectId] != nil {
  160. projectData := data[projectId]
  161. wg.Add(1)
  162. ch <- true
  163. go func(thisData, projectData map[string]interface{}) {
  164. defer func() {
  165. <-ch
  166. wg.Done()
  167. }()
  168. if projectDataInList, ok := projectData["list"].([]interface{}); ok {
  169. for _, v := range projectDataInList {
  170. if v_map, oks := v.(map[string]interface{}); oks {
  171. infoid := common.ObjToString(v_map["infoid"])
  172. topType := common.ObjToString(v_map["toptype"])
  173. if topType == cfg.Rule {
  174. log.Println("匹配到项目结果---", id, "-", projectId)
  175. count++
  176. // esData := Es.GetByIdField("bidding", "bidding", infoid, "")
  177. esData, _ := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(infoid)})
  178. if esData != nil && len(*esData) > 0 {
  179. (*esData)["projectId"] = projectId
  180. (*esData)["sourceId"] = info_id
  181. (*esData)["id"] = infoid
  182. (*esData)["appid"] = appid
  183. (*esData)["createtime"] = time.Now().Unix()
  184. details := common.ObjToString((*esData)["detail"])
  185. (*esData)["details"] = details
  186. (*esData)["detail"] = ClearHtml.ReplaceAllString(details, "")
  187. if Mgo.Count(cfg.Db.ColName, map[string]interface{}{"id": infoid}) < 1 {
  188. mgoId := Mgo.Save(cfg.Db.ColName, *esData)
  189. if mgoId != "" {
  190. // delok := Mgo.Del(cfg.Db.TemporaryColName, map[string]interface{}{"_id": thisData["_id"]})
  191. // if delok {
  192. // log.Println("新华三定时数据删除成功---", id, "-", projectId, "-", mgoId)
  193. // } else {
  194. // log.Println("新华三定时数据删除失败!!!", id, "-", projectId, "-", mgoId)
  195. // }
  196. log.Println("保存到项目接口成功---", id, "-", projectId, "-", mgoId)
  197. } else {
  198. log.Println("保存到项目接口失败!!!", id, "-", projectId)
  199. }
  200. }
  201. }
  202. }
  203. }
  204. }
  205. }
  206. }(thisData, projectData)
  207. }
  208. //
  209. thisData = map[string]interface{}{}
  210. }
  211. wg.Wait()
  212. return count
  213. }
  214. func LtData(data map[string]map[string]interface{}) int {
  215. count, session := 0, Mgo.GetMgoConn()
  216. stime := time.Now().AddDate(0, 0, -7)
  217. BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix()
  218. query := map[string]interface{}{
  219. "appid": "jyGQ1XQQsEAwNeSENOFR9D",
  220. "createtime": map[string]interface{}{
  221. "$gt": BidStartTime,
  222. "$lte": BidStartTime + 86400*7,
  223. },
  224. }
  225. defer func() {
  226. Mgo.DestoryMongoConn(session)
  227. }()
  228. wg := &sync.WaitGroup{}
  229. ch := make(chan bool, 10)
  230. iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Sort("_id").Iter()
  231. thisData := map[string]interface{}{}
  232. for {
  233. if !iter.Next(&thisData) {
  234. break
  235. }
  236. id := mongodb.BsonIdToSId(thisData["_id"])
  237. info_id := mongodb.BsonIdToSId(thisData["id"])
  238. appid := mongodb.BsonIdToSId(thisData["appid"])
  239. projectId := common.ObjToString(thisData["projectId"])
  240. if projectId == "" {
  241. querystr := `{"query": {"bool": {"must": [{"term": {"projectset.ids": "%s"}}],"must_not": [],"should": []}}}`
  242. querystrs := fmt.Sprintf(querystr, id)
  243. datas := Es.Get("projectset", "projectset", querystrs)
  244. if datas != nil && *datas != nil && len(*datas) > 0 {
  245. projectId = common.ObjToString((*datas)[0]["_id"])
  246. Mgo.UpdateById("usermail", id, map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
  247. }
  248. }
  249. if data[projectId] != nil {
  250. projectData := data[projectId]
  251. wg.Add(1)
  252. ch <- true
  253. go func(thisData, projectData map[string]interface{}) {
  254. defer func() {
  255. <-ch
  256. wg.Done()
  257. }()
  258. if projectDataInList, ok := projectData["list"].([]interface{}); ok {
  259. for _, v := range projectDataInList {
  260. if v_map, oks := v.(map[string]interface{}); oks {
  261. infoid := common.ObjToString(v_map["infoid"])
  262. topType := common.ObjToString(v_map["toptype"])
  263. if topType == cfg.Rule {
  264. log.Println("匹配到项目结果---", id, "-", projectId)
  265. count++
  266. // esData := Es.GetByIdField("bidding", "bidding", infoid, "")
  267. esData, _ := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(infoid)})
  268. if esData != nil && len(*esData) > 0 {
  269. (*esData)["projectId"] = projectId
  270. (*esData)["sourceId"] = info_id
  271. (*esData)["id"] = infoid
  272. (*esData)["appid"] = appid
  273. (*esData)["createtime"] = time.Now().Unix()
  274. details := common.ObjToString((*esData)["detail"])
  275. (*esData)["details"] = details
  276. (*esData)["detail"] = ClearHtml.ReplaceAllString(details, "")
  277. if Mgo.Count(cfg.Db.ColName, map[string]interface{}{"id": infoid}) < 1 {
  278. mgoId := Mgo.Save(cfg.Db.ColName, *esData)
  279. if mgoId != "" {
  280. // delok := Mgo.Del(cfg.Db.TemporaryColName, map[string]interface{}{"_id": thisData["_id"]})
  281. // if delok {
  282. // log.Println("新华三定时数据删除成功---", id, "-", projectId, "-", mgoId)
  283. // } else {
  284. // log.Println("新华三定时数据删除失败!!!", id, "-", projectId, "-", mgoId)
  285. // }
  286. log.Println("保存到项目接口成功---", id, "-", projectId, "-", mgoId)
  287. } else {
  288. log.Println("保存到项目接口失败!!!", id, "-", projectId)
  289. }
  290. }
  291. }
  292. }
  293. }
  294. }
  295. }
  296. }(thisData, projectData)
  297. }
  298. //
  299. thisData = map[string]interface{}{}
  300. }
  301. wg.Wait()
  302. return count
  303. }
  304. func getDetails(id string) string {
  305. info, ok := Bidding.FindOne("bidding", map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
  306. details := ""
  307. if ok && info != nil && len(*info) > 0 {
  308. details = common.ObjToString((*info)["detail"])
  309. }
  310. return details
  311. }
  312. func main() {
  313. runJob()
  314. c := cron.New()
  315. c.AddFunc(cfg.CornExp, func() {
  316. runJob()
  317. })
  318. c.Start()
  319. select {}
  320. }