// projectindex.go (1.8 KB)
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "qfw/util"
  6. elastic "qfw/util/elastic"
  7. "gopkg.in/mgo.v2/bson"
  8. )
  9. func projectTask(data []byte, mapInfo map[string]interface{}) {
  10. defer util.Catch()
  11. q, _ := mapInfo["query"].(map[string]interface{})
  12. if q == nil {
  13. q = map[string]interface{}{
  14. "_id": bson.M{
  15. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  16. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  17. },
  18. }
  19. }
  20. session := extractmgo.GetMgoConn(3600)
  21. defer extractmgo.DestoryMongoConn(session)
  22. c, _ := project["collect"].(string)
  23. db, _ := project["db"].(string)
  24. index, _ := project["index"].(string)
  25. itype, _ := project["type"].(string)
  26. count, _ := session.DB(db).C(c).Find(&q).Count()
  27. savepool := make(chan bool, 10)
  28. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  29. query := session.DB(db).C(c).Find(q).Iter()
  30. arr := make([]map[string]interface{}, savesizei)
  31. var n int
  32. i := 0
  33. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  34. delete(tmp, "package")
  35. if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  36. tmp["budget"] = nil
  37. }
  38. if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  39. tmp["bidamount"] = nil
  40. }
  41. go IS.Add("project")
  42. arr[i] = tmp
  43. n++
  44. if i == savesizei-1 {
  45. savepool <- true
  46. tmps := arr
  47. go func(tmpn *[]map[string]interface{}) {
  48. defer func() {
  49. <-savepool
  50. }()
  51. elastic.BulkSave(index, itype, tmpn, true)
  52. }(&tmps)
  53. i = 0
  54. arr = make([]map[string]interface{}, savesizei)
  55. }
  56. if n%savesizei == 0 {
  57. log.Println("当前:", n)
  58. }
  59. tmp = make(map[string]interface{})
  60. }
  61. if i > 0 {
  62. elastic.BulkSave(index, itype, &arr, true)
  63. }
  64. log.Println(mapInfo, "create project index...over", n)
  65. }