package main import ( "encoding/json" "esindex/config" "esindex/oss" "fmt" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "reflect" "regexp" "strconv" "strings" "sync" "time" "unicode/utf8" ) var ( TimeV1 = regexp.MustCompile("(\\d{4})[年.]?$") TimeV2 = regexp.MustCompile("(\\d{4}[年.\\-/]?)(\\d{1,2}[月.\\-/]?$)") TimeClear = regexp.MustCompile("[年|月|/|.|-]") filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]") date1 = regexp.MustCompile("20[0-2][0-9][年|\\-/.][0-9]{1,2}[月|\\-/.][0-9]{1,2}[日]?") HtmlReg = regexp.MustCompile("<[^>]+>") ) func biddingTask(mapInfo map[string]interface{}) { defer util.Catch() stype := util.ObjToString(mapInfo["stype"]) if stype == "bidding" { uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])}, "lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}} MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 8, "updatetime": time.Now().Unix()}}, false, true) } q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } else { //针对gte/lte,单独转换 q = convertToMongoID(q) } ch := make(chan bool, 10) wg := &sync.WaitGroup{} //bidding库 biddingConn := MgoB.GetMgoConn() count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count() log.Info("bidding表", zap.Int64("同步总数:", count)) it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Select(map[string]interface{}{ "contenthtml": 0, }).Iter() c1, index := 0, 0 var indexLock sync.Mutex for tmp := make(map[string]interface{}); it.Next(tmp); c1++ { if c1%1000 == 0 { log.Info("biddingTask", zap.Int("current:", c1)) log.Info("biddingTask", zap.Any("current:_id =>", tmp["_id"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引 tmp = make(map[string]interface{}) return } //只针对增量数据处理;全量数据 需要用extracttype字段判断 //7: 重复数据 //8: 不重复 if util.IntAll(tmp["dataprocess"]) != 8 { return } //// 增量数据使用上面判断;全量数据使用下面配置 //-1:重复 ,1:不重复 ,0:入库 9:分类 //if util.IntAll(tmp["extracttype"]) != 1 { // return //} //针对产权数据,暂时不入es 索引库 if util.IntAll(tmp["infoformat"]) == 3 { return } /** 数据抽取时,有的数据的发布时间是之前的,属于增量历史数据,在判重和同步到bidding表是,会添加history_updatetime 字段,所以下面判断才会处理 */ if stype == "bidding_history" && tmp["history_updatetime"] == nil { return } indexLock.Lock() index++ indexLock.Unlock() newTmp, update := GetEsField(tmp, stype) newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段 //针对中国政府采购网,单独处理 if util.ObjToString(tmp["site"]) == "中国政府采购网" { objectType := MatchService(tmp) if objectType != "" { newTmp["object_type"] = objectType } } if len(update) > 0 { updateBiddingPool <- []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update}, } } if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" { // 剑鱼信息发布数据 通过udp通知信息发布程序 go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"])) } saveEsPool <- newTmp }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("biddingTask over", zap.Int("count", c1), zap.Int("index", index)) ////发送udp,附件补采 才需要 //data := map[string]interface{}{ // "stype": "update", // "gtid": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), // "lteid": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), //} //target := &net.UDPAddr{ // Port: 1782, // IP: net.ParseIP("127.0.0.1"), //} //bytes, _ := json.Marshal(data) //err := UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target) //if err != nil { // log.Info("biddingTask ", zap.Any("WriteUdp err", err), zap.Any("target", target)) //} // //log.Info("biddingTask ", zap.Any("target", target), zap.Any("data", data)) // // //重采平台需要 //mapInfo["stype"] = "" //datas, _ := json.Marshal(mapInfo) //var next = &net.UDPAddr{ // IP: net.ParseIP("127.0.0.1"), // Port: 1910, //} //log.Info("bidding index es over", zap.Any("es", next), zap.String("mapinfo", string(datas))) } //biddingAllTask 补充存量数据 func biddingAllTask(mapInfo map[string]interface{}) { defer util.Catch() stype := util.ObjToString(mapInfo["stype"]) q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } else { //针对gte/lte,单独转换 q = convertToMongoID(q) } ch := make(chan bool, 50) wg := &sync.WaitGroup{} //bidding库 biddingConn := MgoB.GetMgoConn() it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Select(map[string]interface{}{ "contenthtml": 0, }).Iter() c1, index := 0, 0 var indexLock sync.Mutex for tmp := make(map[string]interface{}); it.Next(tmp); c1++ { if c1%20000 == 0 { log.Info("biddingAllTask", zap.Int("current:", c1)) log.Info("biddingAllTask", zap.Any("current:_id =>", tmp["_id"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引 tmp = make(map[string]interface{}) return } // 针对存量数据,重复数据不进索引 if util.IntAll(tmp["extracttype"]) == -1 { return } //针对产权数据,暂时不入es 索引库 if util.IntAll(tmp["infoformat"]) == 3 { return } indexLock.Lock() index++ indexLock.Unlock() newTmp, update := GetEsField(tmp, stype) //针对中国政府采购网,单独处理 if util.ObjToString(tmp["site"]) == "中国政府采购网" { objectType := MatchService(tmp) if objectType != "" { newTmp["object_type"] = objectType } } newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段 if len(update) > 0 { updateBiddingPool <- []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update}, } } saveEsPool <- newTmp }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("biddingAllTask over", zap.Int("count", c1), zap.Int("index", index)) } //biddingAllDataTask 处理配置文件的存量数据 func biddingAllDataTask() { type Biddingall struct { Coll string Gtid string Lteid string } type RoutinesConf struct { Num int } type AllConf struct { All map[string]Biddingall Routines RoutinesConf } var all AllConf viper.SetConfigFile("biddingall.toml") viper.SetConfigName("biddingall") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") err := viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 fmt.Println("ReadInConfig err =>", err) return } err = viper.Unmarshal(&all) if err != nil { fmt.Println("biddingAllDataTask Unmarshal err =>", err) return } for k, conf := range all.All { go dealData(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num) } } func dealData(coll, gtid, lteid, kword string, routines int) { ch := make(chan bool, routines) wg := &sync.WaitGroup{} q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid), }, } biddingConn := MgoB.GetMgoConn() it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(coll).Find(&q).Select(map[string]interface{}{ "contenthtml": 0, }).Iter() c1, index := 0, 0 var indexLock sync.Mutex for tmp := make(map[string]interface{}); it.Next(tmp); c1++ { if c1%20000 == 0 { log.Info(kword, zap.Int("current:", c1)) log.Info(kword, zap.Any("current:_id =>", tmp["_id"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引 tmp = make(map[string]interface{}) return } // 针对存量数据,重复数据不进索引 if util.IntAll(tmp["extracttype"]) == -1 { return } //针对产权数据,暂时不入es 索引库 if util.IntAll(tmp["infoformat"]) == 3 { return } indexLock.Lock() index++ indexLock.Unlock() newTmp, update := GetEsField(tmp, "biddingall") //针对中国政府采购网,单独处理 if util.ObjToString(tmp["site"]) == "中国政府采购网" { objectType := MatchService(tmp) if objectType != "" { newTmp["object_type"] = objectType } } newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段 if len(update) > 0 { updateBiddingPool <- []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update}, } } saveEsPool <- newTmp }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index)) } func biddingTaskById(mapInfo map[string]interface{}) { defer util.Catch() stype := util.ObjToString(mapInfo["stype"]) infoid := util.ObjToString(mapInfo["infoid"]) tmp, _ := MgoB.FindById(config.Conf.DB.MongoB.Coll, infoid, map[string]interface{}{"contenthtml": 0}) if sensitive := util.ObjToString((*tmp)["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引 return } if util.IntAll((*tmp)["extracttype"]) == 1 { newTmp, update := GetEsField(*tmp, stype) newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段 if len(update) > 0 { //updateBiddingPool <- []map[string]interface{}{{ // "_id": mongodb.StringTOBsonId(infoid), //}, // {"$set": update}, //} } saveEsPool <- newTmp } log.Info("biddingTaskById over", zap.Any("mapInfo", mapInfo)) } // GetEsField @Description ES字段 // @Author J 2022/6/7 11:34 AM func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{}, map[string]interface{}) { newTmp := make(map[string]interface{}) update := make(map[string]interface{}) // bidding 修改字段 saveErr := make(map[string]interface{}) //for field, ftype := range config.Conf.DB.Es.FieldEs { for field, ftype := range BiddingField { if tmp[field] != nil { // if field == "purchasinglist" { //标的物处理 purchasinglist_new := []map[string]interface{}{} if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 { for _, ls := range pcl { lsm_new := make(map[string]interface{}) lsm := ls.(map[string]interface{}) for pf, pftype := range BiddingLevelField[field] { lsmv := lsm[pf] if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype { lsm_new[pf] = lsm[pf] } } if lsm_new != nil && len(lsm_new) > 0 { purchasinglist_new = append(purchasinglist_new, lsm_new) } } } if len(purchasinglist_new) > 0 { newTmp[field] = purchasinglist_new } } else if field == "procurementlist" { if tmp["procurementlist"] != nil { var arr []interface{} plist := tmp["procurementlist"].([]interface{}) for _, p := range plist { p1 := p.(map[string]interface{}) p2 := make(map[string]interface{}) for k, v := range BiddingLevelField[field] { if k == "projectname" && util.ObjToString(p1[k]) == "" { p2[k] = util.ObjToString(tmp["projectname"]) } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" { p2[k] = util.ObjToString(tmp["buyer"]) } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" { res := getMethod(util.ObjToString(p1[k])) if res != 0 { p2[k] = res } } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v { p2[k] = p1[k] } } arr = append(arr, p2) } if len(arr) > 0 { newTmp[field] = arr } } } else if field == "projectscope" { ps, _ := tmp["projectscope"].(string) newTmp["projectscope"] = ps //新版本已无需记录长度 //if len(ps) > pscopeLength { // saveErr["projectscope"] = ps // saveErr["projectscope_length"] = len(ps) //} } else if field == "winnerorder" { //中标候选 winnerorder_new := []map[string]interface{}{} if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 { for _, win := range winnerorder { winMap_new := make(map[string]interface{}) winMap := win.(map[string]interface{}) for wf, wftype := range BiddingLevelField[field] { wfv := winMap[wf] if wfv != nil && reflect.TypeOf(wfv).String() == wftype { if wf == "sort" && util.Int64All(wfv) > 100 { continue } winMap_new[wf] = winMap[wf] } } if winMap_new != nil && len(winMap_new) > 0 { winnerorder_new = append(winnerorder_new, winMap_new) } } } if len(winnerorder_new) > 0 { newTmp[field] = winnerorder_new } } else if field == "qualifies" { //项目资质 qs := []string{} if q, _ := tmp[field].([]interface{}); len(q) > 0 { for _, v := range q { v1 := v.(map[string]interface{}) qs = append(qs, util.ObjToString(v1["key"])) } } if len(qs) > 0 { newTmp[field] = strings.Join(qs, ",") } } else if field == "bidopentime" { if tmp[field] != nil && tmp["bidendtime"] == nil { newTmp["bidendtime"] = tmp[field] newTmp[field] = tmp[field] } else if tmp[field] == nil && tmp["bidendtime"] != nil { newTmp["bidendtime"] = tmp[field] newTmp[field] = tmp["bidendtime"] } else { if tmp["bidopentime"] != nil { newTmp[field] = tmp["bidopentime"] } } } else if field == "detail" { //过滤 detail, _ := tmp[field].(string) detail = filterSpace.ReplaceAllString(detail, "") // 不需要再保存记录长度 //if len(detail) > pscopeLength { // saveErr["detail"] = detail // saveErr["detail_length"] = len(detail) //} if tmp["cleartag"] != nil { if tmp["cleartag"].(bool) { text, _ := FilterDetail(detail) newTmp[field] = util.ObjToString(tmp["title"]) + " " + text } else { newTmp[field] = util.ObjToString(tmp["title"]) + " " + detail } } else { text, b := FilterDetail(detail) newTmp[field] = util.ObjToString(tmp["title"]) + " " + text update["cleartag"] = b } } else if field == "topscopeclass" || field == "entidlist" { newTmp[field] = tmp[field] } else if field == "_id" { newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"]) newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"]) } else if field == "publishtime" || field == "comeintime" { //字段类型不正确,特别处理 if tmp[field] != nil && util.Int64All(tmp[field]) > 0 { newTmp[field] = util.Int64All(tmp[field]) } } else if field == "package" { //分包信息处理 packages := dealPackage(tmp) if len(packages) > 0 { newTmp["package"] = packages newTmp["subpackage"] = 1 } } else { //其它字段判断数据类型,不正确舍弃 if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype { continue } else { if fieldval != "" { newTmp[field] = fieldval } } } } } // 附件内容长度不做限制,大于20万字符,只做记录 filetext := getFileText(tmp) if len([]rune(filetext)) > 10 { newTmp["filetext"] = filetext if len([]rune(filetext)) > fileLength { saveErr["filetext"] = filetext saveErr["filetext_length"] = len([]rune(filetext)) } } YuceEndtime(newTmp) // 预测结果时间 if stype == "bidding" || stype == "bidding_history" || stype == "index-by-id" { newTmp["createtime"] = time.Now().Unix() // es库数据创建时间,只有增量数据有 newTmp["pici"] = time.Now().Unix() //createtime跟pici一样,为了剑鱼功能需要,并行存在一段时间,之后可以删掉createtime update["pici"] = time.Now().Unix() } if len(saveErr) > 0 { saveErr["infoid"] = mongodb.BsonIdToSId(tmp["_id"]) saveErr["time"] = time.Now().Unix() saveErrBidPool <- saveErr } return newTmp, update } // @Description 采购意向 预计采购时间处理 // @Author J 2022/6/7 8:04 PM func getMethod(str string) int64 { if TimeV1.MatchString(str) { arr := TimeV1.FindStringSubmatch(str) st := arr[1] + "0000" parseInt, err := strconv.ParseInt(st, 10, 64) if err == nil { return parseInt } } else if TimeV2.MatchString(str) { arr := TimeV2.FindStringSubmatch(str) str1 := arr[2] if len(str1) == 1 { str1 = "0" + str1 } str2 := TimeClear.ReplaceAllString(arr[1], "") + TimeClear.ReplaceAllString(str1, "") + "00" parseInt, err := strconv.ParseInt(str2, 10, 64) if err == nil { return parseInt } } return 0 } func FilterDetail(text string) (string, bool) { b := false // 清理标记 for _, s := range config.Conf.DB.Es.DetailFilter { reg := regexp.MustCompile(s) if reg.MatchString(text) { text = reg.ReplaceAllString(text, "") if !b { b = true } } } return text, b } // @Description 附件内容 // @Author J 2022/6/7 1:54 PM func getFileText(tmp map[string]interface{}) (filetext string) { if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok { for _, tmpData1 := range attchMap { if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok { for _, result := range tmpData2 { if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok { if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" { bs := oss.OssGetObject(attach_url, mongodb.BsonIdToSId(tmp["_id"])) //oss读数据 //附件总长度限制550000,其中最后一个文件长度限制50000 size := config.Conf.DB.Oss.Filesize if size <= 0 { size = 500000 } if utf8.RuneCountInString(filetext+bs) < 50000+size { filetext += bs + "\n" } else { if len(bs) > 50000 { filetext += bs[0:50000] } else { filetext += bs } } //附件总长度限制550000 if utf8.RuneCountInString(filetext) >= 50000+size { return } //正式环境 //if utf8.RuneCountInString(filetext+bs) < fileLength { // filetext += bs + "\n" //} else { // if utf8.RuneCountInString(bs) > fileLength { // filetext = bs[0:fileLength] // } else { // filetext = bs // } // break //} } } } } } } return } // 预测结果时间 func YuceEndtime(tmp map[string]interface{}) { flag := false flag2 := false scope := []string{"信息技术_运维服务", "信息技术_软件开发", "信息技术_系统集成及安全", "信息技术_其他"} titles := []string{"短信服务", "短信发送服务"} details := []string{"短信发送服务", "短信服务平台", "短信服务项目"} subscopeclass := util.ObjToString(tmp["s_subscopeclass"]) //先判断满足 s_subscopeclass 条件 for _, v := range scope { if strings.Contains(subscopeclass, v) { flag = true break } } //满足 s_subscopeclass ,再去判断title detail if flag { title := util.ObjToString(tmp["title"]) for _, v := range titles { if strings.Contains(title, v) { flag2 = true } } if !flag2 { detail := util.ObjToString(tmp["detail"]) for _, v := range details { if strings.Contains(detail, v) { flag2 = true } } } } if !flag2 { return } subtype := util.ObjToString(tmp["subtype"]) if subtype == "成交" || subtype == "合同" { // yucestarttime、yuceendtime yucestarttime, yuceendtime := int64(0), int64(0) // 项目周期中 if util.ObjToString(tmp["projectperiod"]) != "" { dateStr := date1.FindStringSubmatch(util.ObjToString(tmp["projectperiod"])) if len(dateStr) == 2 { sdate := FormatDateStr(dateStr[0]) edate := FormatDateStr(dateStr[1]) if sdate < edate && sdate != 0 && edate != 0 { yucestarttime = sdate yuceendtime = edate } } } if yucestarttime > 0 && yuceendtime > yucestarttime { tmp["yuceendtime"] = yuceendtime return } // 预测开始时间 合同签订日期 if yucestarttime == 0 { if util.IntAll(tmp["signaturedate"]) <= 0 { if util.IntAll(tmp["publishtime"]) <= 0 { return } else { yucestarttime = util.Int64All(tmp["publishtime"]) } } else { yucestarttime = util.Int64All(tmp["signaturedate"]) } } // 预测结束时间 if yucestarttime > 0 && yuceendtime == 0 { if util.IntAll(tmp["project_duration"]) > 0 && util.ObjToString(tmp["project_timeunit"]) != "" { yuceendtime = YcEndTime(yucestarttime, util.IntAll(tmp["project_duration"]), util.ObjToString(tmp["project_timeunit"])) tmp["yuceendtime"] = yuceendtime } } } } func FormatDateStr(ds string) int64 { ds = strings.Replace(ds, "年", "-", -1) ds = strings.Replace(ds, "月", "-", -1) ds = strings.Replace(ds, "日", "", -1) ds = strings.Replace(ds, "/", "-", -1) ds = strings.Replace(ds, ".", "-", -1) location, err := time.ParseInLocation(util.Date_Short_Layout, ds, time.Local) if err != nil { log.Error("FormatDateStr", zap.Error(err)) return 0 } else { return location.Unix() } } func YcEndTime(starttime int64, num int, unit string) int64 { yuceendtime := int64(0) if unit == "日历天" || unit == "天" || unit == "日" { yuceendtime = starttime + int64(num*86400) } else if unit == "周" { yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix() } else if unit == "月" { yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix() } else if unit == "年" { yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix() } else if unit == "工作日" { n := num / 7 * 2 yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix() } return yuceendtime } // UdpMethod @Description rpc调用信息发布程序接口 // @Author J 2022/4/13 9:13 AM func UdpMethod(id string) { mapinfo := map[string]interface{}{ "infoid": id, "stype": "jyfb_data_over", } datas, _ := json.Marshal(mapinfo) log.Info("UdpMethod", zap.Any("JyUdpAddr", JyUdpAddr), zap.String("mapinfo", string(datas))) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr) } //MatchService 针对中国招标网,匹配关键词打标签,object_type,货物、服务、工程,jsondata.item func MatchService(tmp map[string]interface{}) (res string) { if jsondata, ok := tmp["jsondata"]; ok { if da, ok := jsondata.(map[string]interface{}); ok { if item, ok := da["item"]; ok { services := []string{"货物", "服务", "工程"} for _, v := range services { if strings.Contains(util.ObjToString(item), v) { return v } } } } } return } //dealPackage 处理package 字段 func dealPackage(tmp map[string]interface{}) (newpackages []map[string]interface{}) { package1, ok1 := tmp["package"] s_winner, ok2 := tmp["s_winner"] bidamount, ok3 := tmp["bidamount"] var innerWinners = make([]string, 0) var biaoAmounts = make([]float64, 0) // 三个字段都存在 if ok3 && ok2 && ok1 { packageMap, ok := package1.(map[string]interface{}) if ok { if len(packageMap) >= 2 { var packages = make([]map[string]interface{}, 0) //var newTmp = make(map[string]interface{}) winner_amount_count := 0 for _, pack := range packageMap { var newPackage = make(map[string]interface{}) pac, okk := pack.(map[string]interface{}) if okk { _, okk1 := pac["winner"] _, okk2 := pac["bidamount"] _, okk3 := pac["name"] if okk1 { innerWinners = append(innerWinners, util.ObjToString(pac["winner"])) } if okk2 { biaoAmounts = append(biaoAmounts, util.Float64All(pac["bidamount"])) } //winner bidamount 二个字段都存在 if okk1 && okk2 { winner_amount_count++ newPackage["winner"] = pac["winner"] newPackage["bidamount"] = pac["bidamount"] if okk3 { newPackage["name"] = pac["name"] } packages = append(packages, newPackage) } } } //出现次数大于1 if winner_amount_count > 1 { swinner := util.ObjToString(s_winner) swinners := strings.Split(swinner, ",") //判断里外 winner 是否相等 eq := StringSliceValuesEqual(swinners, innerWinners) if eq { //判断金额相等 if Float64Equal1Precision(Float64SliceSum(biaoAmounts), util.Float64All(bidamount)) { newpackages = packages } } } } } } return }