projectall.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/viper"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "math"
  10. "reflect"
  11. "strconv"
  12. "sync"
  13. )
  14. //projectAllData 处理配置文件的project存量数据
  15. func projectAllData() {
  16. type Biddingall struct {
  17. Coll string
  18. Gtid string
  19. Lteid string
  20. }
  21. type RoutinesConf struct {
  22. Num int
  23. }
  24. type AllConf struct {
  25. All map[string]Biddingall
  26. Routines RoutinesConf
  27. }
  28. var all AllConf
  29. viper.SetConfigFile("projectall.toml")
  30. viper.SetConfigName("projectall") // 配置文件名称(无扩展名)
  31. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  32. viper.AddConfigPath("./")
  33. err := viper.ReadInConfig() // 查找并读取配置文件
  34. if err != nil { // 处理读取配置文件的错误
  35. fmt.Println("ReadInConfig err =>", err)
  36. return
  37. }
  38. err = viper.Unmarshal(&all)
  39. if err != nil {
  40. fmt.Println("biddingAllDataTask Unmarshal err =>", err)
  41. return
  42. }
  43. for k, conf := range all.All {
  44. go dealProject(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num)
  45. }
  46. }
  47. func dealProject(coll, gtid, lteid, kword string, routines int) {
  48. ch := make(chan bool, routines)
  49. wg := &sync.WaitGroup{}
  50. q := map[string]interface{}{
  51. "_id": map[string]interface{}{
  52. "$gt": mongodb.StringTOBsonId(gtid),
  53. "$lte": mongodb.StringTOBsonId(lteid),
  54. },
  55. }
  56. conn := MgoP.GetMgoConn()
  57. defer MgoP.DestoryMongoConn(conn)
  58. count, _ := conn.DB(MgoP.DbName).C(coll).Find(&q).Count()
  59. log.Info("dealProject", zap.Int64(kword, count))
  60. query := conn.DB(MgoP.DbName).C(coll).Find(q).Iter()
  61. c1, index := 0, 0
  62. for tmp := make(map[string]interface{}); query.Next(tmp); c1++ {
  63. if c1%20000 == 0 {
  64. log.Info(kword, zap.Int("current:", c1))
  65. log.Info(kword, zap.Any("current:_id =>", tmp["_id"]))
  66. }
  67. ch <- true
  68. wg.Add(1)
  69. go func(tmp map[string]interface{}) {
  70. defer func() {
  71. <-ch
  72. wg.Done()
  73. }()
  74. newTmp := make(map[string]interface{})
  75. newTmp["s_projectname"] = tmp["projectname"]
  76. for f, ftype := range ProjectField {
  77. if tmp[f] != nil {
  78. if f == "package" {
  79. pp := map[string]map[string]interface{}{}
  80. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  81. for _, pks := range packages {
  82. if pk, ok := pks.([]interface{}); ok {
  83. for _, v := range pk {
  84. if p, ok := v.(map[string]interface{}); ok {
  85. winner := util.ObjToString(p["winner"])
  86. bidamount := util.Float64All((p["bidamount"]))
  87. if len(winner) > 4 && bidamount > 0 {
  88. p := map[string]interface{}{
  89. "winner": winner,
  90. "bidamount": bidamount,
  91. }
  92. pp[winner] = p
  93. }
  94. }
  95. }
  96. }
  97. }
  98. } else {
  99. winner := util.ObjToString(tmp["winner"])
  100. bidamount := util.Float64All(tmp["bidamount"])
  101. if len(winner) > 4 && bidamount > 0 {
  102. p := map[string]interface{}{
  103. "winner": winner,
  104. "bidamount": bidamount,
  105. }
  106. pp[winner] = p
  107. }
  108. }
  109. pk1 := []map[string]interface{}{}
  110. for _, v := range pp {
  111. pk1 = append(pk1, v)
  112. }
  113. if len(pk1) > 0 {
  114. newTmp["package1"] = pk1
  115. }
  116. } else if f == "topscopeclass" {
  117. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  118. tc := []string{}
  119. m2 := map[string]bool{}
  120. for _, v := range topscopeclass {
  121. str := util.ObjToString(v)
  122. str = regLetter.ReplaceAllString(str, "") // 去除字母
  123. if !m2[str] {
  124. m2[str] = true
  125. tc = append(tc, str)
  126. }
  127. }
  128. newTmp["topscopeclass"] = tc
  129. }
  130. } else if f == "list" {
  131. if list, ok := tmp[f].([]interface{}); ok {
  132. var newList []map[string]interface{}
  133. for _, item := range list {
  134. item1 := item.(map[string]interface{})
  135. listm := make(map[string]interface{})
  136. for f1, ftype1 := range ProjectListF {
  137. if item1[f1] != nil {
  138. if f == "topscopeclass" || f == "subscopeclass" {
  139. listm[f] = item1[f1]
  140. } else {
  141. if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 {
  142. continue
  143. } else {
  144. if fieldval != "" {
  145. listm[f1] = fieldval
  146. }
  147. }
  148. }
  149. }
  150. }
  151. newList = append(newList, listm)
  152. }
  153. newTmp[f] = newList
  154. }
  155. } else if f == "budget" || f == "bidamount" || f == "sortprice" {
  156. if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 {
  157. newTmp[f] = tmp[f]
  158. }
  159. } else if f == "projectscope" {
  160. projectscopeRune := []rune(util.ObjToString(tmp[f]))
  161. if len(projectscopeRune) > 1000 {
  162. newTmp[f] = util.ObjToString(tmp[f])[:1000]
  163. } else {
  164. newTmp[f] = tmp[f]
  165. }
  166. } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" ||
  167. f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" {
  168. newTmp[f] = tmp[f]
  169. } else if f == "_id" {
  170. newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
  171. newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
  172. } else {
  173. if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype {
  174. continue
  175. } else {
  176. if fieldval != "" {
  177. newTmp[f] = fieldval
  178. }
  179. }
  180. }
  181. }
  182. }
  183. budget := util.Float64All(newTmp["budget"])
  184. bidamount := util.Float64All(newTmp["bidamount"])
  185. if float64(budget) > 0 && float64(bidamount) > 0 {
  186. rate := float64(1) - float64(bidamount)/float64(budget)
  187. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  188. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  189. if f < 0 || f > 0.6 {
  190. delete(newTmp, "bidamount")
  191. newTmp["prate_flag"] = 1
  192. } else {
  193. newTmp["project_rate"] = f
  194. }
  195. }
  196. bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
  197. fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
  198. bidcycle_flag := false //判断是否已计算出标书表编制周期
  199. list := tmp["list"].([]interface{})
  200. for _, m := range list {
  201. tmpM := m.(map[string]interface{})
  202. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  203. tmpB := util.Float64All(tmpM["bidamount"])
  204. tmpM["bidamount"] = tmpB
  205. }
  206. //计算bidcycle标书表编制周期字段
  207. if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
  208. if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
  209. zb_bidopentime := util.Int64All(tmpM["bidopentime"])
  210. zb_publishtime := util.Int64All(tmpM["publishtime"])
  211. if zb_publishtime > 0 {
  212. if zb_bidopentime > 0 {
  213. if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
  214. f_day := float64(tmpTime) / float64(86400)
  215. day := math.Ceil(f_day)
  216. tmp["bidcycle"] = int(day)
  217. bidcycle_flag = true
  218. }
  219. }
  220. if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
  221. fzb_publishtime = zb_publishtime
  222. }
  223. }
  224. }
  225. }
  226. }
  227. //计算bidcycle标书表编制周期字段
  228. //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
  229. if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
  230. if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
  231. f_day := float64(tmpTime) / float64(86400)
  232. day := math.Ceil(f_day)
  233. newTmp["bidcycle"] = int(day)
  234. }
  235. }
  236. saveProjectEsPool <- newTmp
  237. tmp = make(map[string]interface{})
  238. }(tmp)
  239. tmp = map[string]interface{}{}
  240. }
  241. wg.Wait()
  242. log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index))
  243. }