package main import ( "fmt" "github.com/spf13/viper" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "math" "reflect" "strconv" "sync" ) //projectAllData 处理配置文件的project存量数据 func projectAllData() { type Biddingall struct { Coll string Gtid string Lteid string } type RoutinesConf struct { Num int } type AllConf struct { All map[string]Biddingall Routines RoutinesConf } var all AllConf viper.SetConfigFile("projectall.toml") viper.SetConfigName("projectall") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") err := viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 fmt.Println("ReadInConfig err =>", err) return } err = viper.Unmarshal(&all) if err != nil { fmt.Println("biddingAllDataTask Unmarshal err =>", err) return } for k, conf := range all.All { go dealProject(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num) } } func dealProject(coll, gtid, lteid, kword string, routines int) { ch := make(chan bool, routines) wg := &sync.WaitGroup{} q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid), }, } conn := MgoP.GetMgoConn() defer MgoP.DestoryMongoConn(conn) count, _ := conn.DB(MgoP.DbName).C(coll).Find(&q).Count() log.Info("dealProject", zap.Int64(kword, count)) query := conn.DB(MgoP.DbName).C(coll).Find(q).Iter() c1, index := 0, 0 for tmp := make(map[string]interface{}); query.Next(tmp); c1++ { if c1%20000 == 0 { log.Info(kword, zap.Int("current:", c1)) log.Info(kword, zap.Any("current:_id =>", tmp["_id"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() newTmp := make(map[string]interface{}) newTmp["s_projectname"] = tmp["projectname"] for f, ftype := range ProjectField { if tmp[f] != nil { if f == "package" { pp := map[string]map[string]interface{}{} if packages, ok := tmp["package"].(map[string]interface{}); ok { for _, pks := range packages { if pk, ok := pks.([]interface{}); ok { for _, v := range pk { if p, ok := v.(map[string]interface{}); ok { winner := util.ObjToString(p["winner"]) bidamount := util.Float64All((p["bidamount"])) if len(winner) > 4 && bidamount > 0 { p := map[string]interface{}{ "winner": winner, "bidamount": bidamount, } pp[winner] = p } } } } } } else { winner := util.ObjToString(tmp["winner"]) bidamount := util.Float64All(tmp["bidamount"]) if len(winner) > 4 && bidamount > 0 { p := map[string]interface{}{ "winner": winner, "bidamount": bidamount, } pp[winner] = p } } pk1 := []map[string]interface{}{} for _, v := range pp { pk1 = append(pk1, v) } if len(pk1) > 0 { newTmp["package1"] = pk1 } } else if f == "topscopeclass" { if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok { tc := []string{} m2 := map[string]bool{} for _, v := range topscopeclass { str := util.ObjToString(v) str = regLetter.ReplaceAllString(str, "") // 去除字母 if !m2[str] { m2[str] = true tc = append(tc, str) } } newTmp["topscopeclass"] = tc } } else if f == "list" { if list, ok := tmp[f].([]interface{}); ok { var newList []map[string]interface{} for _, item := range list { item1 := item.(map[string]interface{}) listm := make(map[string]interface{}) for f1, ftype1 := range ProjectListF { if item1[f1] != nil { if f == "topscopeclass" || f == "subscopeclass" { listm[f] = item1[f1] } else { if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 { continue } else { if fieldval != "" { listm[f1] = fieldval } } } } } newList = append(newList, listm) } newTmp[f] = newList } } else if f == "budget" || f == "bidamount" || f == "sortprice" { if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 { newTmp[f] = tmp[f] } } else if f == "projectscope" { projectscopeRune := []rune(util.ObjToString(tmp[f])) if len(projectscopeRune) > 1000 { newTmp[f] = util.ObjToString(tmp[f])[:1000] } else { newTmp[f] = tmp[f] } } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" || f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" { newTmp[f] = tmp[f] } else if f == "_id" { newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"]) newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"]) } else { if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype { continue } else { if fieldval != "" { newTmp[f] = fieldval } } } } } budget := util.Float64All(newTmp["budget"]) bidamount := util.Float64All(newTmp["bidamount"]) if float64(budget) > 0 && float64(bidamount) > 0 { rate := float64(1) - float64(bidamount)/float64(budget) f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64) //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识 if f < 0 || f > 0.6 { delete(newTmp, "bidamount") newTmp["prate_flag"] = 1 } else { newTmp["project_rate"] = f } } bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期 fzb_publishtime := int64(0) //记录第一个招标信息的publishtime bidcycle_flag := false //判断是否已计算出标书表编制周期 list := tmp["list"].([]interface{}) for _, m := range list { tmpM := m.(map[string]interface{}) if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float tmpB := util.Float64All(tmpM["bidamount"]) tmpM["bidamount"] = tmpB } //计算bidcycle标书表编制周期字段 if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" { zb_bidopentime := util.Int64All(tmpM["bidopentime"]) zb_publishtime := util.Int64All(tmpM["publishtime"]) if zb_publishtime > 0 { if zb_bidopentime > 0 { if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 { f_day := float64(tmpTime) / float64(86400) day := math.Ceil(f_day) tmp["bidcycle"] = int(day) bidcycle_flag = true } } if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime fzb_publishtime = zb_publishtime } } } } } //计算bidcycle标书表编制周期字段 //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算 if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 { if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 { f_day := float64(tmpTime) / float64(86400) day := math.Ceil(f_day) newTmp["bidcycle"] = int(day) } } saveProjectEsPool <- newTmp tmp = make(map[string]interface{}) }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index)) }