projectindex.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package main
  2. import (
  3. "log"
  4. "math"
  5. "qfw/util"
  6. "qfw/util/elastic"
  7. "strconv"
  8. )
  9. func projectTask() {
  10. defer util.Catch()
  11. session := mgo.GetMgoConn()
  12. defer mgo.DestoryMongoConn(session)
  13. c, _ := project["collect"].(string)
  14. db, _ := project["db"].(string)
  15. index, _ := project["index"].(string)
  16. itype, _ := project["type"].(string)
  17. count, _ := session.DB(db).C(c).Find(nil).Count()
  18. savepool := make(chan bool, 5)
  19. log.Println(db, c, "查询语句:", nil, "同步总数:", count, "elastic库:", index)
  20. query := session.DB(db).C(c).Find(nil).Iter()
  21. arr := make([]map[string]interface{}, savesize)
  22. var n int
  23. i := 0
  24. for tmp := make(map[string]interface{}); query.Next(tmp); i++ {
  25. if n%5000 == 0 {
  26. log.Println("current---------", n)
  27. }
  28. pp := map[string]map[string]interface{}{}
  29. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  30. for _, pks := range packages {
  31. if pk, ok := pks.([]interface{}); ok {
  32. for _, v := range pk {
  33. if p, ok := v.(map[string]interface{}); ok {
  34. winner := util.ObjToString(p["winner"])
  35. bidamount := util.Float64All((p["bidamount"]))
  36. if len(winner) > 4 && bidamount > 0 {
  37. p := map[string]interface{}{
  38. "winner": winner,
  39. "bidamount": bidamount,
  40. }
  41. pp[winner] = p
  42. }
  43. }
  44. }
  45. }
  46. }
  47. } else {
  48. winner := util.ObjToString(tmp["winner"])
  49. bidamount := util.Float64All(tmp["bidamount"])
  50. if len(winner) > 4 && bidamount > 0 {
  51. p := map[string]interface{}{
  52. "winner": winner,
  53. "bidamount": bidamount,
  54. }
  55. pp[winner] = p
  56. }
  57. }
  58. pk1 := []map[string]interface{}{}
  59. for _, v := range pp {
  60. pk1 = append(pk1, v)
  61. }
  62. if len(pk1) > 0 {
  63. tmp["package1"] = pk1
  64. }
  65. budget := util.Float64All(tmp["budget"])
  66. bidamount := util.Float64All(tmp["bidamount"])
  67. if float64(budget) > 0 && float64(bidamount) > 0 {
  68. rate := float64(1) - float64(bidamount)/float64(budget)
  69. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  70. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  71. if f < 0 || f > 0.6 {
  72. delete(tmp, "bidamount")
  73. tmp["prate_flag"] = 1
  74. } else {
  75. tmp["project_rate"] = f
  76. }
  77. }
  78. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  79. tc := []string{}
  80. m2 := map[string]bool{}
  81. for _, v := range topscopeclass {
  82. str := util.ObjToString(v)
  83. str = reg_letter.ReplaceAllString(str, "") // 去除字母
  84. if !m2[str] {
  85. m2[str] = true
  86. tc = append(tc, str)
  87. }
  88. }
  89. tmp["topscopeclass"] = tc
  90. }
  91. //不生索引字段
  92. delete(tmp, "package")
  93. delete(tmp, "infofield")
  94. bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
  95. fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
  96. bidcycle_flag := false //判断是否已计算出标书表编制周期
  97. list := tmp["list"].([]interface{})
  98. for _, m := range list {
  99. tmpM := m.(map[string]interface{})
  100. //删除purchasing,review_experts
  101. delete(tmpM, "purchasing")
  102. delete(tmpM, "review_experts")
  103. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  104. tmpB := util.Float64All(tmpM["bidamount"])
  105. tmpM["bidamount"] = tmpB
  106. }
  107. //projectscope截断
  108. listProjectscopeRune := []rune(util.ObjToString(tmpM["projectscope"]))
  109. if len(listProjectscopeRune) > 1000 {
  110. tmpM["projectscope"] = string(listProjectscopeRune[:1000])
  111. }
  112. //计算bidcycle标书表编制周期字段
  113. if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
  114. if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
  115. zb_bidopentime := util.Int64All(tmpM["bidopentime"])
  116. zb_publishtime := util.Int64All(tmpM["publishtime"])
  117. if zb_publishtime > 0 {
  118. if zb_bidopentime > 0 {
  119. if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
  120. f_day := float64(tmpTime) / float64(86400)
  121. day := math.Ceil(f_day)
  122. tmp["bidcycle"] = int(day)
  123. bidcycle_flag = true
  124. }
  125. }
  126. if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
  127. fzb_publishtime = zb_publishtime
  128. }
  129. }
  130. }
  131. }
  132. }
  133. //计算bidcycle标书表编制周期字段
  134. //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
  135. if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
  136. if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
  137. f_day := float64(tmpTime) / float64(86400)
  138. day := math.Ceil(f_day)
  139. tmp["bidcycle"] = int(day)
  140. }
  141. }
  142. //projectscope截断
  143. projectscopeRune := []rune(util.ObjToString(tmp["projectscope"]))
  144. if len(projectscopeRune) > 1000 {
  145. tmp["projectscope"] = string(projectscopeRune[:1000])
  146. }
  147. tmp["s_projectname"] = tmp["projectname"]
  148. arr[i] = tmp
  149. n++
  150. if i == savesize-1 {
  151. savepool <- true
  152. tmps := arr
  153. go func(tmpn *[]map[string]interface{}) {
  154. defer func() {
  155. <-savepool
  156. }()
  157. elastic.BulkSave(index, itype, tmpn, true)
  158. }(&tmps)
  159. i = 0
  160. arr = make([]map[string]interface{}, savesize)
  161. }
  162. if n%savesize == 1000 {
  163. log.Println("当前:", n)
  164. }
  165. tmp = make(map[string]interface{})
  166. }
  167. if i > 0 {
  168. //util.Debug(arr)
  169. elastic.BulkSave(index, itype, &arr, true)
  170. }
  171. log.Println("create project index...over", n)
  172. }