|
@@ -10,6 +10,8 @@ import (
|
|
|
"reflect"
|
|
|
"regexp"
|
|
|
"strconv"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
)
|
|
|
|
|
|
var (
|
|
@@ -243,3 +245,285 @@ func projectTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
log.Info("create project index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
|
|
|
}
|
|
|
+
|
|
|
+// projectDetailTask 项目索引,添加详情字段
|
|
|
+func projectDetailTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+ q, _ := mapInfo["query"].(map[string]interface{})
|
|
|
+ if q == nil {
|
|
|
+ q = map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if q["pici"] == nil {
|
|
|
+ idMap, _ := q["_id"].(map[string]interface{})
|
|
|
+ if idMap != nil {
|
|
|
+ tmpQ := map[string]interface{}{}
|
|
|
+ for c, id := range idMap {
|
|
|
+ if idStr, ok := id.(string); ok && id != "" {
|
|
|
+ tmpQ[c] = mongodb.StringTOBsonId(idStr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ q["_id"] = tmpQ
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ conn := MgoP.GetMgoConn()
|
|
|
+ defer MgoP.DestoryMongoConn(conn)
|
|
|
+ count, _ := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(&q).Count()
|
|
|
+ log.Info("projectDetailTask", zap.String("coll", config.Conf.DB.MongoP.Coll), zap.Any("查询语句:", q), zap.Int64("同步总数:", count))
|
|
|
+ query := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(q).Iter()
|
|
|
+ n := 0
|
|
|
+ //
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
|
|
|
+ if n%2000 == 0 {
|
|
|
+ log.Info("current", zap.Int("count", n))
|
|
|
+ log.Info("current", zap.Any("_id", tmp["_id"]))
|
|
|
+ }
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ newTmp := make(map[string]interface{})
|
|
|
+ newTmp["s_projectname"] = tmp["projectname"]
|
|
|
+ for f, ftype := range ProjectField {
|
|
|
+ if tmp[f] != nil {
|
|
|
+ if f == "package" {
|
|
|
+ pp := map[string]map[string]interface{}{}
|
|
|
+ if packages, ok := tmp["package"].(map[string]interface{}); ok {
|
|
|
+ for _, pks := range packages {
|
|
|
+ if pk, ok := pks.([]interface{}); ok {
|
|
|
+ for _, v := range pk {
|
|
|
+ if p, ok := v.(map[string]interface{}); ok {
|
|
|
+ winner := util.ObjToString(p["winner"])
|
|
|
+ bidamount := util.Float64All((p["bidamount"]))
|
|
|
+ if len(winner) > 4 && bidamount > 0 {
|
|
|
+ p := map[string]interface{}{
|
|
|
+ "winner": winner,
|
|
|
+ "bidamount": bidamount,
|
|
|
+ }
|
|
|
+ pp[winner] = p
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ winner := util.ObjToString(tmp["winner"])
|
|
|
+ bidamount := util.Float64All(tmp["bidamount"])
|
|
|
+ if len(winner) > 4 && bidamount > 0 {
|
|
|
+ p := map[string]interface{}{
|
|
|
+ "winner": winner,
|
|
|
+ "bidamount": bidamount,
|
|
|
+ }
|
|
|
+ pp[winner] = p
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pk1 := []map[string]interface{}{}
|
|
|
+ for _, v := range pp {
|
|
|
+ pk1 = append(pk1, v)
|
|
|
+ }
|
|
|
+ if len(pk1) > 0 {
|
|
|
+ newTmp["package1"] = pk1
|
|
|
+ }
|
|
|
+ } else if f == "topscopeclass" {
|
|
|
+ if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
|
|
|
+ tc := []string{}
|
|
|
+ m2 := map[string]bool{}
|
|
|
+ for _, v := range topscopeclass {
|
|
|
+ str := util.ObjToString(v)
|
|
|
+ str = regLetter.ReplaceAllString(str, "") // 去除字母
|
|
|
+ if !m2[str] {
|
|
|
+ m2[str] = true
|
|
|
+ tc = append(tc, str)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ newTmp["topscopeclass"] = tc
|
|
|
+ }
|
|
|
+ } else if f == "list" {
|
|
|
+ if list, ok := tmp[f].([]interface{}); ok {
|
|
|
+ var newList []map[string]interface{}
|
|
|
+ for _, item := range list {
|
|
|
+ item1 := item.(map[string]interface{})
|
|
|
+ listm := make(map[string]interface{})
|
|
|
+ for f1, ftype1 := range ProjectListF {
|
|
|
+ if item1[f1] != nil {
|
|
|
+ if f == "topscopeclass" || f == "subscopeclass" {
|
|
|
+ listm[f] = item1[f1]
|
|
|
+ } else {
|
|
|
+ if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ if fieldval != "" {
|
|
|
+ listm[f1] = fieldval
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ newList = append(newList, listm)
|
|
|
+ }
|
|
|
+ newTmp[f] = newList
|
|
|
+ }
|
|
|
+ } else if f == "budget" || f == "bidamount" || f == "sortprice" {
|
|
|
+ if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 {
|
|
|
+ newTmp[f] = tmp[f]
|
|
|
+ }
|
|
|
+ } else if f == "projectscope" {
|
|
|
+ projectscopeRune := []rune(util.ObjToString(tmp[f]))
|
|
|
+ if len(projectscopeRune) > 1000 {
|
|
|
+ newTmp[f] = util.ObjToString(tmp[f])[:1000]
|
|
|
+ } else {
|
|
|
+ newTmp[f] = tmp[f]
|
|
|
+ }
|
|
|
+ } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" ||
|
|
|
+ f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" || f == "jgtime" {
|
|
|
+ newTmp[f] = tmp[f]
|
|
|
+ } else if f == "_id" {
|
|
|
+ newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ } else {
|
|
|
+ if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ if fieldval != "" {
|
|
|
+ newTmp[f] = fieldval
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ budget := util.Float64All(newTmp["budget"])
|
|
|
+ bidamount := util.Float64All(newTmp["bidamount"])
|
|
|
+ if float64(budget) > 0 && float64(bidamount) > 0 {
|
|
|
+ rate := float64(1) - float64(bidamount)/float64(budget)
|
|
|
+ f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
|
|
|
+ //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
|
|
|
+ if f < 0 || f > 0.6 {
|
|
|
+ delete(newTmp, "bidamount")
|
|
|
+ newTmp["prate_flag"] = 1
|
|
|
+ } else {
|
|
|
+ newTmp["project_rate"] = f
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
|
|
|
+ fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
|
|
|
+ bidcycle_flag := false //判断是否已计算出标书表编制周期
|
|
|
+
|
|
|
+ //项目详情,辅助字段,处理过的list里面id,英文逗号拼接
|
|
|
+ detail_ids := util.ObjToString(tmp["detail_ids"]) //1,2,3
|
|
|
+ detailIds := make([]string, 0)
|
|
|
+ if detail_ids != "" {
|
|
|
+ detailIds = strings.Split(detail_ids, ",") //[1,2,3]
|
|
|
+ }
|
|
|
+ detail := make([]string, 0) //最终的详情字段
|
|
|
+ //todo 回去原索引 详情字段
|
|
|
+ projectID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ _, oldProject := Es.GetById(config.Conf.DB.Es.IndexPD, projectID)
|
|
|
+ if oldProject != nil {
|
|
|
+ oldDetail := oldProject["detail"]
|
|
|
+ if oddetail, ok := oldDetail.(string); ok {
|
|
|
+ if oddetail != "" {
|
|
|
+ old_details := strings.Split(oddetail, " ")
|
|
|
+ detail = append(detail, old_details...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //统计现有字符串长度
|
|
|
+ totalNumCount := CountChineseCharacters(detail)
|
|
|
+
|
|
|
+ list := tmp["list"].([]interface{})
|
|
|
+ for _, m := range list {
|
|
|
+ tmpM := m.(map[string]interface{})
|
|
|
+ if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
|
|
|
+ tmpB := util.Float64All(tmpM["bidamount"])
|
|
|
+ tmpM["bidamount"] = tmpB
|
|
|
+ }
|
|
|
+ //计算bidcycle标书表编制周期字段
|
|
|
+ if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
|
|
|
+ if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
|
|
|
+ zb_bidopentime := util.Int64All(tmpM["bidopentime"])
|
|
|
+ zb_publishtime := util.Int64All(tmpM["publishtime"])
|
|
|
+ if zb_publishtime > 0 {
|
|
|
+ if zb_bidopentime > 0 {
|
|
|
+ if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
|
|
|
+ f_day := float64(tmpTime) / float64(86400)
|
|
|
+ day := math.Ceil(f_day)
|
|
|
+ tmp["bidcycle"] = int(day)
|
|
|
+ bidcycle_flag = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
|
|
|
+ fzb_publishtime = zb_publishtime
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //todo 处理项目详情 新字段;获取es 已有数据,判断是否需要更新detail
|
|
|
+ infoid := util.ObjToString(tmpM["infoid"])
|
|
|
+ if infoid != "" && !IsInStringArray(infoid, detailIds) && (totalNumCount < config.Conf.DB.Es.DetailCount) {
|
|
|
+ detailIds = append(detailIds, infoid)
|
|
|
+ if infoid > "5a862e7040d2d9bbe88e3b1f" {
|
|
|
+ biddingData, _ := MgoB.FindById("bidding", infoid, nil)
|
|
|
+ biddingDetail := util.ObjToString((*biddingData)["detail"])
|
|
|
+ da, _ := CleanHTMLTags(biddingDetail)
|
|
|
+ characterArray := SplitTextByChinesePunctuation(da)
|
|
|
+ detail = append(detail, RemoveDuplicates(characterArray)...)
|
|
|
+ } else {
|
|
|
+ biddingData, _ := MgoB.FindById("bidding_back", infoid, nil)
|
|
|
+ biddingDetail := util.ObjToString((*biddingData)["detail"])
|
|
|
+ da, _ := CleanHTMLTags(biddingDetail)
|
|
|
+ characterArray := SplitTextByChinesePunctuation(da)
|
|
|
+ detail = append(detail, RemoveDuplicates(characterArray)...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(detail) > 0 {
|
|
|
+ detailNew := RemoveDuplicates(detail)
|
|
|
+ newTmp["detail"] = strings.Join(detailNew, " ")
|
|
|
+ }
|
|
|
+
|
|
|
+ //计算bidcycle标书表编制周期字段
|
|
|
+ //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
|
|
|
+ if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
|
|
|
+ if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
|
|
|
+ f_day := float64(tmpTime) / float64(86400)
|
|
|
+ day := math.Ceil(f_day)
|
|
|
+ newTmp["bidcycle"] = int(day)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //todo 这里和上面正常项目索引做了区别,不在单独处理,直接使用数据库数据
|
|
|
+ //项目名称副标题
|
|
|
+ subtitleProjectname := util.ObjToString(tmp["subtitle_projectname"])
|
|
|
+ if subtitleProjectname != "" {
|
|
|
+ newTmp["subtitle_projectname"] = subtitleProjectname
|
|
|
+ }
|
|
|
+
|
|
|
+ //更新项目表,已经处理过的 标讯id
|
|
|
+ if len(detailIds) > 0 {
|
|
|
+ new_bidding_ids := strings.Join(detailIds, ",")
|
|
|
+ update := map[string]interface{}{
|
|
|
+ "detail_ids": new_bidding_ids,
|
|
|
+ }
|
|
|
+ MgoP.UpdateById(config.Conf.DB.MongoP.Coll, mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
|
|
|
+ }
|
|
|
+
|
|
|
+ saveProjectDetailEsPool <- newTmp
|
|
|
+ }(tmp)
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ log.Info("create projectDetailTask index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
|
|
|
+}
|