projectindex.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package main
  2. import (
  3. "strconv"
  4. //"fmt"
  5. "log"
  6. "qfw/util"
  7. elastic "qfw/util/elastic"
  8. mgov "gopkg.in/mgo.v2"
  9. "gopkg.in/mgo.v2/bson"
  10. )
  11. func projectTask(data []byte, project, mapInfo map[string]interface{}) {
  12. defer util.Catch()
  13. q, _ := mapInfo["query"].(map[string]interface{})
  14. if q == nil {
  15. q = map[string]interface{}{
  16. "_id": bson.M{
  17. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  18. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  19. },
  20. }
  21. } else {
  22. if q["pici"] == nil {
  23. idMap, _ := q["_id"].(map[string]interface{})
  24. if idMap != nil {
  25. tmpQ := map[string]interface{}{}
  26. for c, id := range idMap {
  27. if idStr, ok := id.(string); ok && id != "" {
  28. tmpQ[c] = util.StringTOBsonId(idStr)
  29. }
  30. }
  31. q["_id"] = tmpQ
  32. }
  33. }
  34. }
  35. var session *mgov.Session
  36. if project["addr"] != nil {
  37. session = project2db.GetMgoConn(3600)
  38. defer project2db.DestoryMongoConn(session)
  39. } else {
  40. session = extractmgo.GetMgoConn(3600)
  41. defer extractmgo.DestoryMongoConn(session)
  42. }
  43. c, _ := project["collect"].(string)
  44. db, _ := project["db"].(string)
  45. index, _ := project["index"].(string)
  46. itype, _ := project["type"].(string)
  47. count, _ := session.DB(db).C(c).Find(&q).Count()
  48. savepool := make(chan bool, 10)
  49. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  50. query := session.DB(db).C(c).Find(q).Iter()
  51. arr := make([]map[string]interface{}, savesizei)
  52. var n int
  53. i := 0
  54. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  55. pp := map[string]map[string]interface{}{}
  56. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  57. for _, pks := range packages {
  58. if pk, ok := pks.([]interface{}); ok {
  59. for _, v := range pk {
  60. if p, ok := v.(map[string]interface{}); ok {
  61. winner := util.ObjToString(p["winner"])
  62. bidamount := util.Float64All((p["bidamount"]))
  63. if len(winner) > 4 && bidamount > 0 {
  64. p := map[string]interface{}{
  65. "winner": winner,
  66. "bidamount": bidamount,
  67. }
  68. pp[winner] = p
  69. }
  70. }
  71. }
  72. }
  73. }
  74. } else {
  75. winner := util.ObjToString(tmp["winner"])
  76. bidamount := util.Float64All(tmp["bidamount"])
  77. if len(winner) > 4 && bidamount > 0 {
  78. p := map[string]interface{}{
  79. "winner": winner,
  80. "bidamount": bidamount,
  81. }
  82. pp[winner] = p
  83. }
  84. }
  85. pk1 := []map[string]interface{}{}
  86. for _, v := range pp {
  87. pk1 = append(pk1, v)
  88. }
  89. if len(pk1) > 0 {
  90. tmp["package1"] = pk1
  91. }
  92. budget := util.Float64All(tmp["budget"])
  93. bidamount := util.Float64All(tmp["bidamount"])
  94. if float64(budget) > 0 && float64(bidamount) > 0 {
  95. rate := float64(1) - float64(bidamount)/float64(budget)
  96. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  97. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  98. if f < 0 || f > 0.6 {
  99. delete(tmp, "bidamount")
  100. tmp["prate_flag"] = 1
  101. } else {
  102. tmp["project_rate"] = f
  103. }
  104. }
  105. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  106. tc := []string{}
  107. m2 := map[string]bool{}
  108. for _, v := range topscopeclass {
  109. str := util.ObjToString(v)
  110. str = reg_letter.ReplaceAllString(str, "") // 去除字母
  111. if !m2[str] {
  112. m2[str] = true
  113. tc = append(tc, str)
  114. }
  115. }
  116. tmp["topscopeclass"] = tc
  117. }
  118. //不生索引字段
  119. delete(tmp, "package")
  120. delete(tmp, "infofield")
  121. list := tmp["list"].([]interface{})
  122. for _, m := range list {
  123. tmpM := m.(map[string]interface{})
  124. //删除purchasing,review_experts
  125. delete(tmpM, "purchasing")
  126. delete(tmpM, "review_experts")
  127. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  128. tmpB := util.Float64All(tmpM["bidamount"])
  129. tmpM["bidamount"] = tmpB
  130. }
  131. //projectscope截断
  132. listProjectscopeRune := []rune(util.ObjToString(tmpM["projectscope"]))
  133. if len(listProjectscopeRune) > 1000 {
  134. tmpM["projectscope"] = string(listProjectscopeRune[:1000])
  135. }
  136. }
  137. //projectscope截断
  138. projectscopeRune := []rune(util.ObjToString(tmp["projectscope"]))
  139. if len(projectscopeRune) > 1000 {
  140. tmp["projectscope"] = string(projectscopeRune[:1000])
  141. }
  142. // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  143. // tmp["budget"] = nil
  144. // }
  145. // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  146. // tmp["bidamount"] = nil
  147. // }
  148. //go IS.Add("project")
  149. arr[i] = tmp
  150. n++
  151. if i == savesizei-1 {
  152. savepool <- true
  153. tmps := arr
  154. go func(tmpn *[]map[string]interface{}) {
  155. defer func() {
  156. <-savepool
  157. }()
  158. elastic.BulkSave(index, itype, tmpn, true)
  159. }(&tmps)
  160. i = 0
  161. arr = make([]map[string]interface{}, savesizei)
  162. }
  163. if n%savesizei == 0 {
  164. log.Println("当前:", n)
  165. }
  166. tmp = make(map[string]interface{})
  167. }
  168. if i > 0 {
  169. util.Debug(arr)
  170. elastic.BulkSave(index, itype, &arr, true)
  171. }
  172. log.Println(mapInfo, "create project index...over", n)
  173. }