123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- package main
- import (
- "esindex/config"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "math"
- "reflect"
- "regexp"
- "strconv"
- "strings"
- "sync"
- )
// regLetter matches runs of lowercase ASCII letters. The project tasks use it
// with an empty replacement to strip letters from topscopeclass values before
// de-duplicating them.
var regLetter = regexp.MustCompile("[a-z]*")
- func projectTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- } else {
- if q["pici"] == nil {
- idMap, _ := q["_id"].(map[string]interface{})
- if idMap != nil {
- tmpQ := map[string]interface{}{}
- for c, id := range idMap {
- if idStr, ok := id.(string); ok && id != "" {
- tmpQ[c] = mongodb.StringTOBsonId(idStr)
- }
- }
- q["_id"] = tmpQ
- }
- }
- }
- conn := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(conn)
- count, _ := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(&q).Count()
- log.Info("projectTask", zap.String("coll", config.Conf.DB.MongoP.Coll), zap.Any("查询语句:", q), zap.Int64("同步总数:", count))
- query := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(q).Iter()
- n := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if n%2000 == 0 {
- log.Info("current", zap.Int("count", n))
- log.Info("current", zap.Any("_id", tmp["_id"]))
- }
- newTmp := make(map[string]interface{})
- newTmp["s_projectname"] = tmp["projectname"]
- for f, ftype := range ProjectField {
- if tmp[f] != nil {
- if f == "package" {
- pp := map[string]map[string]interface{}{}
- if packages, ok := tmp["package"].(map[string]interface{}); ok {
- for _, pks := range packages {
- if pk, ok := pks.([]interface{}); ok {
- for _, v := range pk {
- if p, ok := v.(map[string]interface{}); ok {
- winner := util.ObjToString(p["winner"])
- bidamount := util.Float64All((p["bidamount"]))
- if len(winner) > 4 && bidamount > 0 {
- p := map[string]interface{}{
- "winner": winner,
- "bidamount": bidamount,
- }
- pp[winner] = p
- }
- }
- }
- }
- }
- } else {
- winner := util.ObjToString(tmp["winner"])
- bidamount := util.Float64All(tmp["bidamount"])
- if len(winner) > 4 && bidamount > 0 {
- p := map[string]interface{}{
- "winner": winner,
- "bidamount": bidamount,
- }
- pp[winner] = p
- }
- }
- pk1 := []map[string]interface{}{}
- for _, v := range pp {
- pk1 = append(pk1, v)
- }
- if len(pk1) > 0 {
- newTmp["package1"] = pk1
- }
- } else if f == "topscopeclass" {
- if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
- tc := []string{}
- m2 := map[string]bool{}
- for _, v := range topscopeclass {
- str := util.ObjToString(v)
- str = regLetter.ReplaceAllString(str, "") // 去除字母
- if !m2[str] {
- m2[str] = true
- tc = append(tc, str)
- }
- }
- newTmp["topscopeclass"] = tc
- }
- } else if f == "list" {
- if list, ok := tmp[f].([]interface{}); ok {
- var newList []map[string]interface{}
- for _, item := range list {
- item1 := item.(map[string]interface{})
- listm := make(map[string]interface{})
- for f1, ftype1 := range ProjectListF {
- if item1[f1] != nil {
- if f == "topscopeclass" || f == "subscopeclass" {
- listm[f] = item1[f1]
- } else {
- if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 {
- continue
- } else {
- if fieldval != "" {
- listm[f1] = fieldval
- }
- }
- }
- }
- }
- newList = append(newList, listm)
- }
- newTmp[f] = newList
- }
- } else if f == "budget" || f == "bidamount" || f == "sortprice" {
- if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 {
- newTmp[f] = tmp[f]
- }
- } else if f == "projectscope" {
- projectscopeRune := []rune(util.ObjToString(tmp[f]))
- if len(projectscopeRune) > 1000 {
- newTmp[f] = util.ObjToString(tmp[f])[:1000]
- } else {
- newTmp[f] = tmp[f]
- }
- } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" ||
- f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" || f == "jgtime" {
- newTmp[f] = tmp[f]
- } else if f == "_id" {
- newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
- newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
- } else {
- if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" {
- continue
- } else {
- if fieldval != "" {
- newTmp[f] = fieldval
- }
- }
- }
- }
- }
- budget := util.Float64All(newTmp["budget"])
- bidamount := util.Float64All(newTmp["bidamount"])
- if float64(budget) > 0 && float64(bidamount) > 0 {
- rate := float64(1) - float64(bidamount)/float64(budget)
- f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
- //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
- if f < 0 || f > 0.6 {
- delete(newTmp, "bidamount")
- newTmp["prate_flag"] = 1
- } else {
- newTmp["project_rate"] = f
- }
- }
- bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
- fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
- bidcycle_flag := false //判断是否已计算出标书表编制周期
- list := tmp["list"].([]interface{})
- for _, m := range list {
- tmpM := m.(map[string]interface{})
- if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
- tmpB := util.Float64All(tmpM["bidamount"])
- tmpM["bidamount"] = tmpB
- }
- //计算bidcycle标书表编制周期字段
- if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
- if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
- zb_bidopentime := util.Int64All(tmpM["bidopentime"])
- zb_publishtime := util.Int64All(tmpM["publishtime"])
- if zb_publishtime > 0 {
- if zb_bidopentime > 0 {
- if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
- f_day := float64(tmpTime) / float64(86400)
- day := math.Ceil(f_day)
- tmp["bidcycle"] = int(day)
- bidcycle_flag = true
- }
- }
- if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
- fzb_publishtime = zb_publishtime
- }
- }
- }
- }
- }
- //计算bidcycle标书表编制周期字段
- //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
- if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
- if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
- f_day := float64(tmpTime) / float64(86400)
- day := math.Ceil(f_day)
- newTmp["bidcycle"] = int(day)
- }
- }
- //项目名称副标题
- subtitleProjectname := util.ObjToString(tmp["subtitle_projectname"])
- if subtitleProjectname != "" {
- newTmp["subtitle_projectname"] = subtitleProjectname
- if !cache.Contains(uint32(hash(subtitleProjectname))) {
- cache.Add(uint32(hash(subtitleProjectname)))
- cacheModify = true
- }
- } else {
- name := getNewName(tmp)
- if name != "" {
- newTmp["subtitle_projectname"] = name
- update := make(map[string]interface{})
- update["subtitle_projectname"] = name
- res := MgoP.UpdateById(config.Conf.DB.MongoP.Coll, mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
- if !res {
- log.Info("项目数据", zap.Any(mongodb.BsonIdToSId(tmp["_id"]), "项目名称副标题更新失败"))
- }
- }
- }
- saveProjectEsPool <- newTmp
- tmp = make(map[string]interface{})
- }
- log.Info("create project index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
- }
- // projectDetailTask 项目索引,添加详情字段
- func projectDetailTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- } else {
- if q["pici"] == nil {
- idMap, _ := q["_id"].(map[string]interface{})
- if idMap != nil {
- tmpQ := map[string]interface{}{}
- for c, id := range idMap {
- if idStr, ok := id.(string); ok && id != "" {
- tmpQ[c] = mongodb.StringTOBsonId(idStr)
- }
- }
- q["_id"] = tmpQ
- }
- }
- }
- conn := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(conn)
- count, _ := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(&q).Count()
- log.Info("projectDetailTask", zap.String("coll", config.Conf.DB.MongoP.Coll), zap.Any("查询语句:", q), zap.Int64("同步总数:", count))
- query := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(q).Iter()
- n := 0
- //
- ch := make(chan bool, 10)
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if n%2000 == 0 {
- log.Info("current", zap.Int("count", n))
- log.Info("current", zap.Any("_id", tmp["_id"]))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- newTmp := make(map[string]interface{})
- newTmp["s_projectname"] = tmp["projectname"]
- for f, ftype := range ProjectField {
- if tmp[f] != nil {
- if f == "package" {
- pp := map[string]map[string]interface{}{}
- if packages, ok := tmp["package"].(map[string]interface{}); ok {
- for _, pks := range packages {
- if pk, ok := pks.([]interface{}); ok {
- for _, v := range pk {
- if p, ok := v.(map[string]interface{}); ok {
- winner := util.ObjToString(p["winner"])
- bidamount := util.Float64All((p["bidamount"]))
- if len(winner) > 4 && bidamount > 0 {
- p := map[string]interface{}{
- "winner": winner,
- "bidamount": bidamount,
- }
- pp[winner] = p
- }
- }
- }
- }
- }
- } else {
- winner := util.ObjToString(tmp["winner"])
- bidamount := util.Float64All(tmp["bidamount"])
- if len(winner) > 4 && bidamount > 0 {
- p := map[string]interface{}{
- "winner": winner,
- "bidamount": bidamount,
- }
- pp[winner] = p
- }
- }
- pk1 := []map[string]interface{}{}
- for _, v := range pp {
- pk1 = append(pk1, v)
- }
- if len(pk1) > 0 {
- newTmp["package1"] = pk1
- }
- } else if f == "topscopeclass" {
- if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
- tc := []string{}
- m2 := map[string]bool{}
- for _, v := range topscopeclass {
- str := util.ObjToString(v)
- str = regLetter.ReplaceAllString(str, "") // 去除字母
- if !m2[str] {
- m2[str] = true
- tc = append(tc, str)
- }
- }
- newTmp["topscopeclass"] = tc
- }
- } else if f == "list" {
- if list, ok := tmp[f].([]interface{}); ok {
- var newList []map[string]interface{}
- for _, item := range list {
- item1 := item.(map[string]interface{})
- listm := make(map[string]interface{})
- for f1, ftype1 := range ProjectListF {
- if item1[f1] != nil {
- if f == "topscopeclass" || f == "subscopeclass" {
- listm[f] = item1[f1]
- } else {
- if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 {
- continue
- } else {
- if fieldval != "" {
- listm[f1] = fieldval
- }
- }
- }
- }
- }
- newList = append(newList, listm)
- }
- newTmp[f] = newList
- }
- } else if f == "budget" || f == "bidamount" || f == "sortprice" {
- if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 {
- newTmp[f] = tmp[f]
- }
- } else if f == "projectscope" {
- projectscopeRune := []rune(util.ObjToString(tmp[f]))
- if len(projectscopeRune) > 1000 {
- newTmp[f] = util.ObjToString(tmp[f])[:1000]
- } else {
- newTmp[f] = tmp[f]
- }
- } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" ||
- f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" || f == "jgtime" {
- newTmp[f] = tmp[f]
- } else if f == "_id" {
- newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
- newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
- } else {
- if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" {
- continue
- } else {
- if fieldval != "" {
- newTmp[f] = fieldval
- }
- }
- }
- }
- }
- budget := util.Float64All(newTmp["budget"])
- bidamount := util.Float64All(newTmp["bidamount"])
- if float64(budget) > 0 && float64(bidamount) > 0 {
- rate := float64(1) - float64(bidamount)/float64(budget)
- f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
- //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
- if f < 0 || f > 0.6 {
- delete(newTmp, "bidamount")
- newTmp["prate_flag"] = 1
- } else {
- newTmp["project_rate"] = f
- }
- }
- bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
- fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
- bidcycle_flag := false //判断是否已计算出标书表编制周期
- //项目详情,辅助字段,处理过的list里面id,英文逗号拼接
- detail_ids := util.ObjToString(tmp["detail_ids"]) //1,2,3
- detailIds := make([]string, 0)
- if detail_ids != "" {
- detailIds = strings.Split(detail_ids, ",") //[1,2,3]
- }
- detail := make([]string, 0) //最终的详情字段
- //todo 回去原索引 详情字段
- projectID := mongodb.BsonIdToSId(tmp["_id"])
- _, oldProject := Es.GetById(config.Conf.DB.Es.IndexPD, projectID)
- if oldProject != nil {
- oldDetail := util.ObjToString(oldProject["detail"])
- if oldDetail != "" {
- old_details := strings.Split(oldDetail, " ")
- detail = append(detail, old_details...)
- }
- }
- //统计现有字符串长度
- totalNumCount := CountChineseCharacters(detail)
- list := tmp["list"].([]interface{})
- for _, m := range list {
- tmpM := m.(map[string]interface{})
- if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
- tmpB := util.Float64All(tmpM["bidamount"])
- tmpM["bidamount"] = tmpB
- }
- //计算bidcycle标书表编制周期字段
- if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
- if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
- zb_bidopentime := util.Int64All(tmpM["bidopentime"])
- zb_publishtime := util.Int64All(tmpM["publishtime"])
- if zb_publishtime > 0 {
- if zb_bidopentime > 0 {
- if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
- f_day := float64(tmpTime) / float64(86400)
- day := math.Ceil(f_day)
- tmp["bidcycle"] = int(day)
- bidcycle_flag = true
- }
- }
- if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
- fzb_publishtime = zb_publishtime
- }
- }
- }
- }
- //todo 处理项目详情 新字段;获取es 已有数据,判断是否需要更新detail
- infoid := util.ObjToString(tmpM["infoid"])
- if infoid != "" && !IsInStringArray(infoid, detailIds) && (totalNumCount < config.Conf.DB.Es.DetailCount) {
- detailIds = append(detailIds, infoid)
- if infoid > "5a862e7040d2d9bbe88e3b1f" {
- biddingData, _ := MgoB.FindById("bidding", infoid, nil)
- biddingDetail := util.ObjToString((*biddingData)["detail"])
- da, _ := CleanHTMLTags(biddingDetail)
- characterArray := SplitTextByChinesePunctuation(da)
- detail = append(detail, RemoveDuplicates(characterArray)...)
- } else {
- biddingData, _ := MgoB.FindById("bidding_back", infoid, nil)
- biddingDetail := util.ObjToString((*biddingData)["detail"])
- da, _ := CleanHTMLTags(biddingDetail)
- characterArray := SplitTextByChinesePunctuation(da)
- detail = append(detail, RemoveDuplicates(characterArray)...)
- }
- }
- }
- if len(detail) > 0 {
- detailNew := RemoveDuplicates(detail)
- newTmp["detail"] = strings.Join(detailNew, " ")
- }
- //计算bidcycle标书表编制周期字段
- //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
- if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
- if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
- f_day := float64(tmpTime) / float64(86400)
- day := math.Ceil(f_day)
- newTmp["bidcycle"] = int(day)
- }
- }
- //todo 这里和上面正常项目索引做了区别,不在单独处理,直接使用数据库数据
- //项目名称副标题
- subtitleProjectname := util.ObjToString(tmp["subtitle_projectname"])
- if subtitleProjectname != "" {
- newTmp["subtitle_projectname"] = subtitleProjectname
- }
- //更新项目表,已经处理过的 标讯id
- if len(detailIds) > 0 {
- new_bidding_ids := strings.Join(detailIds, ",")
- update := map[string]interface{}{
- "detail_ids": new_bidding_ids,
- }
- MgoP.UpdateById(config.Conf.DB.MongoP.Coll, mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
- }
- saveProjectDetailEsPool <- newTmp
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info("create projectDetailTask index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
- }
|