projectindex.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  2. import (
  3. "strconv"
  4. "strings"
  5. //"fmt"
  6. "log"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. mgov "gopkg.in/mgo.v2"
  10. "gopkg.in/mgo.v2/bson"
  11. )
  12. func projectTask(data []byte, project, mapInfo map[string]interface{}) {
  13. defer util.Catch()
  14. q, _ := mapInfo["query"].(map[string]interface{})
  15. if q == nil {
  16. q = map[string]interface{}{
  17. "_id": bson.M{
  18. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. }
  23. var session *mgov.Session
  24. if project["addr"] != nil {
  25. session = project2db.GetMgoConn(3600)
  26. defer project2db.DestoryMongoConn(session)
  27. } else {
  28. session = extractmgo.GetMgoConn(3600)
  29. defer extractmgo.DestoryMongoConn(session)
  30. }
  31. c, _ := project["collect"].(string)
  32. db, _ := project["db"].(string)
  33. index, _ := project["index"].(string)
  34. itype, _ := project["type"].(string)
  35. count, _ := session.DB(db).C(c).Find(&q).Count()
  36. savepool := make(chan bool, 10)
  37. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  38. query := session.DB(db).C(c).Find(q).Iter()
  39. arr := make([]map[string]interface{}, savesizei)
  40. var n int
  41. i := 0
  42. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  43. pp := map[string]map[string]interface{}{}
  44. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  45. for _, pks := range packages {
  46. if pk, ok := pks.([]interface{}); ok {
  47. for _, v := range pk {
  48. if p, ok := v.(map[string]interface{}); ok {
  49. winner := util.ObjToString(p["winner"])
  50. bidamount := util.Float64All((p["bidamount"]))
  51. if len(winner) > 4 && bidamount > 0 {
  52. p := map[string]interface{}{
  53. "winner": winner,
  54. "bidamount": bidamount,
  55. }
  56. pp[winner] = p
  57. }
  58. }
  59. }
  60. }
  61. }
  62. } else {
  63. winner := util.ObjToString(tmp["winner"])
  64. bidamount := util.Float64All(tmp["bidamount"])
  65. if len(winner) > 4 && bidamount > 0 {
  66. p := map[string]interface{}{
  67. "winner": winner,
  68. "bidamount": bidamount,
  69. }
  70. pp[winner] = p
  71. }
  72. }
  73. pk1 := []map[string]interface{}{}
  74. for _, v := range pp {
  75. pk1 = append(pk1, v)
  76. }
  77. if len(pk1) > 0 {
  78. tmp["package1"] = pk1
  79. }
  80. budget := util.Float64All(tmp["budget"])
  81. bidamount := util.Float64All(tmp["bidamount"])
  82. if float64(budget) > 0 && float64(bidamount) > 0 {
  83. rate := float64(1) - float64(bidamount)/float64(budget)
  84. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 2, 64), 64)
  85. tmp["project_rate"] = f
  86. }
  87. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  88. tc := []string{}
  89. for _, v := range topscopeclass {
  90. str := util.ObjToString(v)
  91. str = strings.ReplaceAll(str, "t", "")
  92. str = strings.ReplaceAll(str, "d", "")
  93. str = strings.ReplaceAll(str, "p", "")
  94. tc = append(tc, str)
  95. }
  96. tmp["topscopeclass"] = tc
  97. }
  98. //不生索引字段
  99. delete(tmp, "package")
  100. delete(tmp, "winnerorder")
  101. delete(tmp, "infofield")
  102. delete(tmp, "budgettag")
  103. delete(tmp, "bidamounttag")
  104. list := tmp["list"].([]interface{})
  105. for _, m := range list {
  106. tmpM := m.(map[string]interface{})
  107. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  108. tmpB := util.Float64All(tmpM["bidamount"])
  109. tmpM["bidamount"] = tmpB
  110. }
  111. //projectscope截断
  112. listProjectscopeRune := []rune(util.ObjToString(tmpM["projectscope"]))
  113. if len(listProjectscopeRune) > 1000 {
  114. tmpM["projectscope"] = string(listProjectscopeRune[:1000])
  115. }
  116. }
  117. //projectscope截断
  118. projectscopeRune := []rune(util.ObjToString(tmp["projectscope"]))
  119. if len(projectscopeRune) > 1000 {
  120. tmp["projectscope"] = string(projectscopeRune[:1000])
  121. }
  122. // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  123. // tmp["budget"] = nil
  124. // }
  125. // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  126. // tmp["bidamount"] = nil
  127. // }
  128. //go IS.Add("project")
  129. arr[i] = tmp
  130. n++
  131. if i == savesizei-1 {
  132. savepool <- true
  133. tmps := arr
  134. go func(tmpn *[]map[string]interface{}) {
  135. defer func() {
  136. <-savepool
  137. }()
  138. elastic.BulkSave(index, itype, tmpn, true)
  139. }(&tmps)
  140. i = 0
  141. arr = make([]map[string]interface{}, savesizei)
  142. }
  143. if n%savesizei == 0 {
  144. log.Println("当前:", n)
  145. }
  146. tmp = make(map[string]interface{})
  147. }
  148. if i > 0 {
  149. elastic.BulkSave(index, itype, &arr, true)
  150. }
  151. log.Println(mapInfo, "create project index...over", n)
  152. }