projectindex.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package main
  2. import (
  3. "strconv"
  4. "strings"
  5. //"fmt"
  6. "log"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. mgov "gopkg.in/mgo.v2"
  10. "gopkg.in/mgo.v2/bson"
  11. )
  12. func projectTask(data []byte, project, mapInfo map[string]interface{}) {
  13. defer util.Catch()
  14. q, _ := mapInfo["query"].(map[string]interface{})
  15. if q == nil {
  16. q = map[string]interface{}{
  17. "_id": bson.M{
  18. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. }
  23. var session *mgov.Session
  24. if project["addr"] != nil {
  25. session = project2db.GetMgoConn(3600)
  26. defer project2db.DestoryMongoConn(session)
  27. } else {
  28. session = extractmgo.GetMgoConn(3600)
  29. defer extractmgo.DestoryMongoConn(session)
  30. }
  31. c, _ := project["collect"].(string)
  32. db, _ := project["db"].(string)
  33. index, _ := project["index"].(string)
  34. itype, _ := project["type"].(string)
  35. count, _ := session.DB(db).C(c).Find(&q).Count()
  36. savepool := make(chan bool, 10)
  37. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  38. query := session.DB(db).C(c).Find(q).Iter()
  39. arr := make([]map[string]interface{}, savesizei)
  40. var n int
  41. i := 0
  42. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  43. pp := map[string]map[string]interface{}{}
  44. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  45. for _, pks := range packages {
  46. if pk, ok := pks.([]interface{}); ok {
  47. for _, v := range pk {
  48. if p, ok := v.(map[string]interface{}); ok {
  49. winner := util.ObjToString(p["winner"])
  50. bidamount := util.Float64All((p["bidamount"]))
  51. if len(winner) > 4 && bidamount > 0 {
  52. p := map[string]interface{}{
  53. "winner": winner,
  54. "bidamount": bidamount,
  55. }
  56. pp[winner] = p
  57. }
  58. }
  59. }
  60. }
  61. }
  62. } else {
  63. winner := util.ObjToString(tmp["winner"])
  64. bidamount := util.Float64All(tmp["bidamount"])
  65. if len(winner) > 4 && bidamount > 0 {
  66. p := map[string]interface{}{
  67. "winner": winner,
  68. "bidamount": bidamount,
  69. }
  70. pp[winner] = p
  71. }
  72. }
  73. pk1 := []map[string]interface{}{}
  74. for _, v := range pp {
  75. pk1 = append(pk1, v)
  76. }
  77. if len(pk1) > 0 {
  78. tmp["package1"] = pk1
  79. }
  80. budget := util.Float64All(tmp["budget"])
  81. bidamount := util.Float64All(tmp["bidamount"])
  82. if float64(budget) > 0 && float64(bidamount) > 0 {
  83. rate := float64(1) - float64(bidamount)/float64(budget)
  84. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 2, 64), 64)
  85. tmp["project_rate"] = f
  86. }
  87. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  88. tc := []string{}
  89. for _, v := range topscopeclass {
  90. str := util.ObjToString(v)
  91. str = strings.ReplaceAll(str, "t", "")
  92. str = strings.ReplaceAll(str, "d", "")
  93. str = strings.ReplaceAll(str, "p", "")
  94. tc = append(tc, str)
  95. }
  96. tmp["topscopeclass"] = tc
  97. }
  98. //不生索引字段
  99. delete(tmp, "package")
  100. delete(tmp, "winnerorder")
  101. delete(tmp, "infofield")
  102. list := tmp["list"].([]interface{})
  103. for _, m := range list {
  104. tmpM := m.(map[string]interface{})
  105. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  106. tmpB := util.Float64All(tmpM["bidamount"])
  107. tmpM["bidamount"] = tmpB
  108. }
  109. //projectscope截断
  110. listProjectscopeRune := []rune(util.ObjToString(tmpM["projectscope"]))
  111. if len(listProjectscopeRune) > 1000 {
  112. tmpM["projectscope"] = string(listProjectscopeRune[:1000])
  113. }
  114. }
  115. //projectscope截断
  116. projectscopeRune := []rune(util.ObjToString(tmp["projectscope"]))
  117. if len(projectscopeRune) > 1000 {
  118. tmp["projectscope"] = string(projectscopeRune[:1000])
  119. }
  120. // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  121. // tmp["budget"] = nil
  122. // }
  123. // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  124. // tmp["bidamount"] = nil
  125. // }
  126. //go IS.Add("project")
  127. arr[i] = tmp
  128. n++
  129. if i == savesizei-1 {
  130. savepool <- true
  131. tmps := arr
  132. go func(tmpn *[]map[string]interface{}) {
  133. defer func() {
  134. <-savepool
  135. }()
  136. elastic.BulkSave(index, itype, tmpn, true)
  137. }(&tmps)
  138. i = 0
  139. arr = make([]map[string]interface{}, savesizei)
  140. }
  141. if n%savesizei == 0 {
  142. log.Println("当前:", n)
  143. }
  144. tmp = make(map[string]interface{})
  145. }
  146. if i > 0 {
  147. elastic.BulkSave(index, itype, &arr, true)
  148. }
  149. log.Println(mapInfo, "create project index...over", n)
  150. }