package main import ( "encoding/json" "log" "net" "reflect" "regexp" "strconv" "strings" "time" "go.mongodb.org/mongo-driver/bson/primitive" elastic "app.yhyue.com/moapp/jybase/es" "github.com/gogf/gf/v2/util/gconv" "github.com/robfig/cron" common "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" ) var ( Bidding, Mgo *MongodbSim Es elastic.Es Es2 elastic.Es UdpClient udp.UdpClient JyUdpAddr *net.UDPAddr cfg = new(Config) ClearHtml = regexp.MustCompile("<[^>]*>") BiddingField = make(map[string]string) BiddingLevelField = make(map[string]map[string]string) TimeV1 = regexp.MustCompile("^(\\d{4})[年.]?$") TimeV2 = regexp.MustCompile("^(\\d{4})[年./-]?(\\d{1,2})[月./-]?$") TimeV3 = regexp.MustCompile("^(\\d{4})[年./-]?(\\d{1,2})[月./-]?(\\d{1,2})[日]?$") TimeClear = regexp.MustCompile("[年|月|日|/|.|-]") ) type UdpNode struct { data []byte addr *net.UDPAddr timestamp int64 retry int } func init() { common.ReadConfig(&cfg) log.Println("配置文件 ", cfg) Mgo = &MongodbSim{ MongodbAddr: cfg.Mgo.Address, DbName: cfg.Mgo.DbName, Size: cfg.Mgo.DbSize, UserName: cfg.Mgo.UserName, Password: cfg.Mgo.Password, } Mgo.InitPool() Bidding = &MongodbSim{ MongodbAddr: cfg.Bidding.Address, DbName: cfg.Bidding.DbName, Size: cfg.Bidding.DbSize, UserName: cfg.Bidding.UserName, Password: cfg.Bidding.Password, } Bidding.InitPool() // Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password) Es = elastic.NewEs("07", "http://172.17.4.184:19908", 50, "jybid", "Top2023_JEB01i@31") log.Println("初始化完成") // Es2 = elastic.NewEs("07", "http://192.168.3.241:9205,http://192.168.3.149:9200", 20, "", "") JyUdpAddr = &net.UDPAddr{ IP: net.ParseIP(cfg.Udp.JyAddr), Port: cfg.Udp.JyPort, } InitEsBiddingField() UdpClient = udp.UdpClient{Local: ":1799", BufSize: 1024} UdpClient.Listen(processUdpMsg) log.Println("Udp服务监听 port:", ":1799") } func main() { run() c := cron.New() c.AddFunc("0 */10 * * * ?", run) c.Start() ch := make(chan bool, 1) <-ch } func run() { log.Println("开始执行阳光采购", cfg.LastId) session := Mgo.GetMgoConn() lastId := cfg.LastId query := map[string]interface{}{} if lastId != "" { query["_id"] = map[string]interface{}{"$gt": StringTOBsonId(lastId)} } log.Println("query :", query) defer Mgo.DestoryMongoConn(session) count := 0 iter := session.DB("qfw").C("bidding_yg").Find(&query).Sort("_id").Iter() thisData := map[string]interface{}{} for { if !iter.Next(&thisData) { break } count++ if count%500 == 0 { log.Println("COUNT ", count) } id := common.ObjToString(thisData["id"]) source := common.ObjToString(thisData["source"]) data := Bidding.FindById("bidding", id) if data != nil && len(data) > 0 { public_type := "平台发布" domain_firsttype, domain_secondtype, domain_thirdtype := "", "", "" if data["gov_classify"] != nil { if gov_classify, ok := data["gov_classify"].(map[string]interface{}); ok { root := common.ObjToString(gov_classify["root"]) if root != "" { rootArr := strings.Split(root, "/") domain_firsttype = rootArr[0] if len(rootArr) > 1 { domain_secondtype = rootArr[1] } if len(rootArr) > 2 { domain_thirdtype = rootArr[2] } } } } newData := GetEsField(data) updateData := map[string]interface{}{ "domain_firsttype": domain_firsttype, "domain_secondtype": domain_secondtype, "domain_thirdtype": domain_thirdtype, } deliver_area, deliver_city, deliver_district := data["area"], data["city"], data["district"] if source == "user" { public_type = "用户发布" deliver_area, deliver_city, deliver_district = data["deliver_area"], data["deliver_city"], data["deliver_district"] } else { updateData["deliver_area"] = deliver_area updateData["deliver_city"] = deliver_city updateData["deliver_district"] = deliver_district } if source == "is_yg_new" { newData["is_yg_new"] = 1 } newData["deliver_area"] = deliver_area newData["deliver_city"] = deliver_city newData["deliver_district"] = deliver_district newData["domain_firsttype"] = domain_firsttype newData["domain_secondtype"] = domain_secondtype newData["domain_thirdtype"] = domain_thirdtype newData["public_type"] = public_type newData["source_id"] = id Bidding.UpdateById("bidding", id, map[string]interface{}{"$set": updateData}) if newData["purchasinglist"] != nil { if purchasinglists, ok := newData["purchasinglist"].(primitive.A); ok { purchasinglist := common.ObjArrToMapArr(purchasinglists) if source == "user" && len(purchasinglist) > 1 { newDatas := newData itemMap := map[string]string{} itemArr := []string{} for _, v := range purchasinglist { itemname := common.ObjToString(v["itemname"]) if itemname != "" { itemMap[itemname] = "1" } } for k, _ := range itemMap { itemArr = append(itemArr, k) } citys := gconv.String(deliver_city) if citys == "" { citys = gconv.String(deliver_area) } newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys mid := primitive.NewObjectID() newDatas["_id"] = mid Mgo.Save("bidding_yg_info", newDatas) newDatas["_id"] = mid.Hex() Es.Save("bidding_yg", "", newDatas) log.Println("保存成功", mid) } else { for _, v := range purchasinglist { newDatas := newData itemname := common.ObjToString(v["itemname"]) if itemname != "" { citys := gconv.String(deliver_city) if citys == "" { citys = gconv.String(deliver_area) } if gconv.String(v["unitname"]) != "" && gconv.String(v["number"]) != "" { newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys } else { newDatas["title"] = itemname + "-" + citys } } mid := primitive.NewObjectID() newDatas["_id"] = mid Mgo.Save("bidding_yg_info", newDatas) newDatas["_id"] = mid.Hex() Es.Save("bidding_yg", "", newDatas) log.Println("保存成功", mid) } } } else if purchasinglists, ok := newData["purchasinglist"].([]map[string]interface{}); ok { purchasinglist := purchasinglists if source == "user" && len(purchasinglist) > 1 { newDatas := newData itemMap := map[string]string{} itemArr := []string{} for _, v := range purchasinglist { itemname := common.ObjToString(v["itemname"]) if itemname != "" { itemMap[itemname] = "1" } } for k, _ := range itemMap { itemArr = append(itemArr, k) } citys := gconv.String(deliver_city) if citys == "" { citys = gconv.String(deliver_area) } newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys mid := primitive.NewObjectID() newDatas["_id"] = mid Mgo.Save("bidding_yg_info", newDatas) newDatas["_id"] = mid.Hex() Es.Save("bidding_yg", "", newDatas) log.Println("保存成功", mid) } else { for _, v := range purchasinglist { newDatas := newData itemname := common.ObjToString(v["itemname"]) if itemname != "" { citys := gconv.String(deliver_city) if citys == "" { citys = gconv.String(deliver_area) } if gconv.String(v["unitname"]) != "" && gconv.String(v["number"]) != "" { newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys } else { newDatas["title"] = itemname + "-" + citys } } mid := primitive.NewObjectID() newDatas["_id"] = mid Mgo.Save("bidding_yg_info", newDatas) newDatas["_id"] = mid.Hex() Es.Save("bidding_yg", "", newDatas) log.Println("保存成功", mid) } } } } else { mid := primitive.NewObjectID() newData["_id"] = mid Mgo.Save("bidding_yg_info", newData) newData["_id"] = mid.Hex() Es.Save("bidding_yg", "", newData) log.Println("保存成功", mid) } if source == "user" { mapinfo := map[string]interface{}{ "infoid": id, "stype": "jyfb_data_over", } datas, _ := json.Marshal(mapinfo) log.Println("信息发布成功", JyUdpAddr, "mapinfo", string(datas)) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr) } } cfg.LastId = BsonTOStringId(thisData["_id"]) thisData = map[string]interface{}{} } common.WriteSysConfig(&cfg) log.Println("阳光采购结束", cfg.LastId) } func GetEsField(tmp map[string]interface{}) map[string]interface{} { newTmp := make(map[string]interface{}) for field, ftype := range BiddingField { if tmp[field] != nil { // if field == "purchasinglist" { //标的物处理 purchasinglist_new := []map[string]interface{}{} if pcl, _ := tmp[field].(primitive.A); len(pcl) > 0 { for _, ls := range pcl { lsm_new := make(map[string]interface{}) lsm := ls.(map[string]interface{}) for pf, pftype := range BiddingLevelField[field] { lsmv := lsm[pf] if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype { lsm_new[pf] = lsm[pf] } } if lsm_new != nil && len(lsm_new) > 0 { purchasinglist_new = append(purchasinglist_new, lsm_new) } } } if len(purchasinglist_new) > 0 { newTmp[field] = purchasinglist_new } } else if field == "procurementlist" { if tmp["procurementlist"] != nil { var arr []interface{} plist := tmp["procurementlist"].(primitive.A) for _, p := range plist { p1 := p.(map[string]interface{}) p2 := make(map[string]interface{}) for k, v := range BiddingLevelField[field] { if k == "projectname" && common.ObjToString(p1[k]) == "" { p2[k] = common.ObjToString(tmp["projectname"]) } else if k == "buyer" && common.ObjToString(p1[k]) == "" && common.ObjToString(tmp["buyer"]) != "" { p2[k] = common.ObjToString(tmp["buyer"]) } else if k == "expurasingtime" && common.ObjToString(p1[k]) != "" { res := getMethod(common.ObjToString(p1[k])) if res != 0 { p2[k] = res } } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v { p2[k] = p1[k] } } arr = append(arr, p2) } if len(arr) > 0 { newTmp[field] = arr } } } else if field == "projectscope" { ps, _ := tmp["projectscope"].(string) newTmp["projectscope"] = ps } else if field == "winnerorder" { //中标候选 winnerorder_new := []map[string]interface{}{} if winnerorder, _ := tmp[field].(primitive.A); len(winnerorder) > 0 { for _, win := range winnerorder { winMap_new := make(map[string]interface{}) winMap := win.(map[string]interface{}) for wf, wftype := range BiddingLevelField[field] { wfv := winMap[wf] if wfv != nil && reflect.TypeOf(wfv).String() == wftype { if wf == "sort" && common.Int64All(wfv) > 100 { continue } winMap_new[wf] = winMap[wf] } } if winMap_new != nil && len(winMap_new) > 0 { winnerorder_new = append(winnerorder_new, winMap_new) } } } if len(winnerorder_new) > 0 { newTmp[field] = winnerorder_new } } else if field == "qualifies" { //项目资质 qs := []string{} if q, _ := tmp[field].(primitive.A); len(q) > 0 { for _, v := range q { v1 := v.(map[string]interface{}) qs = append(qs, common.ObjToString(v1["key"])) } } if len(qs) > 0 { newTmp[field] = strings.Join(qs, ",") } } else if field == "bidopentime" { if tmp[field] != nil && tmp["bidendtime"] == nil { newTmp["bidendtime"] = tmp[field] newTmp[field] = tmp[field] } else if tmp[field] == nil && tmp["bidendtime"] != nil { newTmp["bidendtime"] = tmp[field] newTmp[field] = tmp["bidendtime"] } else { if tmp["bidopentime"] != nil { newTmp[field] = tmp["bidopentime"] } } } else if field == "detail" { //过滤 detail, _ := tmp[field].(string) detail = ClearHtml.ReplaceAllString(detail, "") newTmp[field] = common.ObjToString(tmp["title"]) + " " + detail } else if field == "topscopeclass" || field == "entidlist" { newTmp[field] = tmp[field] } else if field == "_id" { newTmp["_id"] = BsonTOStringId(tmp["_id"]) newTmp["id"] = BsonTOStringId(tmp["_id"]) } else if field == "publishtime" || field == "comeintime" { //字段类型不正确,特别处理 if tmp[field] != nil && common.Int64All(tmp[field]) > 0 { newTmp[field] = common.Int64All(tmp[field]) } } else if field == "package" { delete(newTmp, "package") } else if field == "infoformat" { newTmp[field] = tmp[field] } else { //其它字段判断数据类型,不正确舍弃 if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" { continue } else { if fieldval != "" { newTmp[field] = fieldval } } } } } newTmp["pici"] = time.Now().Unix() return newTmp } func InitEsBiddingField() { info, _ := Bidding.Find("bidding_processing_field", map[string]interface{}{"stype": "bidding"}, nil, nil) if len(info) > 0 { for _, m := range info { if common.IntAll(m["level"]) == 1 { BiddingField[common.ObjToString(m["field"])] = common.ObjToString(m["ftype"]) } else if common.IntAll(m["level"]) == 2 { pfield := common.ObjToString(m["pfield"]) pfieldMap := BiddingLevelField[pfield] if pfieldMap == nil { pfieldMap = make(map[string]string, 0) } pfieldMap[common.ObjToString(m["field"])] = common.ObjToString(m["ftype"]) BiddingLevelField[pfield] = pfieldMap } } } log.Println("BiddingField es 一级字段数量", len(BiddingField)) log.Println("BiddingLevelField es 二级字段数量", len(BiddingLevelField)) } func getMethod(str string) int64 { // Handle "YYYY" format if TimeV1.MatchString(str) { arr := TimeV1.FindStringSubmatch(str) st := arr[1] + "0000" parseInt, err := strconv.ParseInt(st, 10, 64) if err == nil { return parseInt } } // Handle "YYYYMM" or "YYYY/MM" or "YYYY-MM" or "YYYY.MM" format if TimeV2.MatchString(str) { arr := TimeV2.FindStringSubmatch(str) year := arr[1] month := arr[2] if len(month) == 1 { month = "0" + month } str2 := year + month + "00" parseInt, err := strconv.ParseInt(str2, 10, 64) if err == nil { return parseInt } } // Handle "YYYYMMDD" or "YYYY/MM/DD" or "YYYY-MM-DD" or "YYYY.MM.DD" format if TimeV3.MatchString(str) { match := TimeV3.FindStringSubmatch(str) if len(match) >= 4 { year := match[1] month := match[2] day := match[3] if len(month) == 1 { month = "0" + month } if len(day) == 1 { day = "0" + day } dateStr := year + month + day parseInt, err := strconv.ParseInt(dateStr, 10, 64) if err == nil { return parseInt } } } return 0 } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer common.Catch() switch act { case udp.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("processUdpMsg mapInfo:", mapInfo, err) case udp.OP_NOOP: ok := string(data) log.Println("下节点回应:", ok) } }