package main import ( "fmt" "log" qutil "qfw/util" "reflect" "regexp" "strings" "sync" "time" ) type Info struct { id string //id title string //标题 spidercode string //爬虫代码 area string //省份 city string //城市 subtype string //信息类型 buyer string //采购单位 agency string //代理机构 winner string //中标单位 budget float64 //预算金额 bidamount float64 //中标金额 projectname string //项目名称 projectcode string //项目编号 contractnumber string //合同编号 publishtime int64 //发布时间 comeintime int64 //入库时间 bidopentime int64 //开标时间 bidopenaddress string //开标地点 site string //站点 href string //正文的url repeatid string //重复id titleSpecialWord bool //标题特殊词 specialWord bool //再次判断的特殊词 mergemap map[string]interface{} //合并记录 is_site bool //是否站点城市 repeat_ids []string //记录所有重复id } var datelimit = float64(432000) //五天 var sitelock sync.Mutex //锁 //一般数据判重 type datamap struct { lock sync.Mutex //锁 days int //保留几天数据 data map[string][]*Info keymap []string areakeys []string keys map[string]bool } //历史 func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap { datelimit = qutil.Float64All(days * 86400) dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}} if lasttime <0 { log.Println("数据池空数据") return dm } start := int(time.Now().Unix()) sess := data_mgo.GetMgoConn() defer data_mgo.DestoryMongoConn(sess) query := map[string]interface{}{"publishtime": map[string]interface{}{ "$lt": lasttime, }} log.Println("query", query) it := sess.DB(data_mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter() n, continuSum := 0, 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 || qutil.IntAll(tmp["dataging"]) == 1 { } else { if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" { continue } pt := tmp["publishtime"] pt_time := qutil.Int64All(pt) if pt_time > time.Now().Unix() { continue } if qutil.Float64All(lasttime-pt_time) < datelimit { continuSum++ info := NewInfo(tmp) dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd) k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area) data := dm.data[k] if data == nil { data = []*Info{} } data = append(data, info) dm.data[k] = data dm.keys[dkey] = true //添加省 isAreaExist :=false for _,v:= range dm.areakeys { if v==info.area { isAreaExist = true } } if !isAreaExist { areaArr := dm.areakeys areaArr = append(areaArr,info.area) dm.areakeys = areaArr } } else { break } } tmp = make(map[string]interface{}) } log.Printf("第%d组:数据池构建完成:%d秒,%d个\n",numIndex ,int(time.Now().Unix())-start, n) return dm } //增量 func NewDatamap(days int, lastid string) *datamap { datelimit = qutil.Float64All(days * 86400 * 2) dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}} if lastid == "" { log.Println("不构建数据池") return dm } //初始化加载数据 sess := data_mgo.GetMgoConn() defer data_mgo.DestoryMongoConn(sess) query := map[string]interface{}{"_id": map[string]interface{}{ "$lte": StringTOBsonId(lastid), }} log.Println("query", query) it := sess.DB(data_mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter() nowTime := time.Now().Unix()//当前时间的时间戳 n, continuSum := 0, 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { //source := util.ObjToMap(tmp["jsondata"]) //修复临时添加 //if util.IntAll((*source)["sourcewebsite"]) == 1 { // continue //} if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1{ } else { if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" { continue } pt:= tmp["publishtime"] pt_time := qutil.Int64All(pt) if pt_time > time.Now().Unix() { continue } if qutil.Float64All(nowTime-pt_time) <= datelimit { continuSum++ info := NewInfo(tmp) dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd) k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area) data := dm.data[k] if data == nil { data = []*Info{} } data = append(data, info) dm.data[k] = data dm.keys[dkey] = true //添加省 isAreaExist :=false for _,v:= range dm.areakeys { if v==info.area { isAreaExist = true } } if !isAreaExist { areaArr := dm.areakeys areaArr = append(areaArr,info.area) dm.areakeys = areaArr } } else { break } } if n%10000 == 0 { log.Println("当前 n:", n,"数量:" ,continuSum,tmp["_id"]) } tmp = make(map[string]interface{}) } log.Println("load data:", n,"总数:",continuSum) return dm } //数据构建 func NewInfo(tmp map[string]interface{}) *Info { subtype := qutil.ObjToString(tmp["subtype"]) if subtype=="招标"||subtype=="邀标"||subtype=="询价"|| subtype=="竞谈"||subtype=="竞价" { subtype = "招标" } area := qutil.ObjToString(tmp["area"]) if area == "A" { area = "全国" } info := &Info{} info.id = BsonTOStringId(tmp["_id"]) info.title = qutil.ObjToString(tmp["title"]) info.area = area info.subtype = subtype info.spidercode = qutil.ObjToString(tmp["spidercode"]) info.buyer = qutil.ObjToString(tmp["buyer"]) info.projectname = qutil.ObjToString(tmp["projectname"]) info.contractnumber = qutil.ObjToString(tmp["contractnumber"]) info.projectcode = qutil.ObjToString(tmp["projectcode"]) info.city = qutil.ObjToString(tmp["city"]) info.agency = qutil.ObjToString(tmp["agency"]) info.winner = qutil.ObjToString(tmp["winner"]) info.budget = qutil.Float64All(tmp["budget"]) info.bidamount = qutil.Float64All(tmp["bidamount"]) info.publishtime = qutil.Int64All(tmp["publishtime"]) info.comeintime = qutil.Int64All(tmp["comeintime"]) info.bidopentime = qutil.Int64All(tmp["bidopentime"]) info.bidopenaddress = qutil.ObjToString(tmp["bidopenaddress"]) info.site = qutil.ObjToString(tmp["site"]) info.href = qutil.ObjToString(tmp["href"]) info.repeatid = qutil.ObjToString(tmp["repeatid"]) info.specialWord = FilterRegTitle.MatchString(info.title) info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) ||FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title) info.mergemap = *qutil.ObjToMap(tmp["merge"]) if info.mergemap == nil { info.mergemap = make(map[string]interface{}, 0) } if info.repeat_ids == nil { info.repeat_ids = make([]string, 0) } info.is_site = false return info } //判重方法 //判重方法 //判重方法 func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) { reason := "" isTestLog := false keys := []string{} d.lock.Lock() for k, _ := range d.keys { //不同时间段 if info.area=="全国" {//匹配所有省 for _,v := range d.areakeys{ keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v)) } }else {//匹配指定省 keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area)) } keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国")) } d.lock.Unlock() L: for _, k := range keys { d.lock.Lock() data := d.data[k] d.lock.Unlock() if len(data) > 0 { //对比v 找到同类型,同省或全国的数据作对比 for _, v := range data { reason = "" isTestLog = false if v.id == info.id { //正常重复 return false, v, "" } //buyer 优先级高,有值且不相等过滤 if info.buyer!=""&&v.buyer!=""&&info.buyer!=v.buyer { if v.title != info.title && v.title != "" && info.title != "" { isTestLog = true } if buyerIsContinue(v,info) { continue } } if info.site != "" {//站点临时赋值 sitelock.Lock() dict := SiteMap[info.site] sitelock.Unlock() if dict != nil { if (info.area == "全国" && dict["area"] != "")|| (info.city == "" && dict["city"] != ""){ info.is_site = true info.area = qutil.ObjToString(dict["area"]) info.city = qutil.ObjToString(dict["city"]) } } } //前置条件-五要素均相等 if leadingElementSame(v,info) { reason = "五要素-相同-满足" b = true source = v reasons = reason break L } //前置条件 - 站点相关 if info.site != "" && info.site == v.site { if info.href != "" && info.href == v.href { reason = "同站点-href相同" b = true source = v reasons = reason break L } //相同发布时间-标题无包含关系 - 项目名称不等 if isTheSameDay(info.publishtime,v.publishtime) && !(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) { continue } //不同href if info.href != "" && info.href != v.href { if v.title==info.title{ if !againRepeat(v, info,true) {//进行同站点二次判断 reason = "同站点-href不同-标题相同等" b = true source = v reasons = reason break L }else { continue } }else { if againRepeat(v, info,true) { continue } } } } //特殊词处理 specialNum:= dealWithSpecialWordNumber(info,v) //前置条件 - 标题相关,有且一个关键词 if specialNum==1 { if againRepeat(v, info,false) { continue } } //前置条件3 - 标题相关,均含有关键词 if specialNum==2 { if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 && v.title != "" && info.title != "" { letter1,letter2:=v.title,info.title res, _ := regexp.Compile("[0-9a-zA-Z]+"); if res.MatchString(letter1)||res.MatchString(letter2) { letter1=convertArabicNumeralsAndLetters(letter1) letter2=convertArabicNumeralsAndLetters(letter2) } if strings.Contains(letter1,"重新招标")|| strings.Contains(letter2,"重新招标"){ letter1,letter2=dealWithSpecialPhrases(letter1,letter2) } if letter1==letter2 { reason = reason + "标题关键词相等关系" if !againRepeat(v, info,false) {//进行二级金额判断 b = true source = v reasons = reason break L } }else { if !(strings.Contains(letter1, letter2) || strings.Contains(letter2, letter1)) { //无包含关系-即不相等 if againContainSpecialWord(v, info) { continue } } } } } //新增快速数据过少判重 if LowHeavy { repeat := false if repeat, reason = fastLowQualityHeavy(v, info, reason); repeat { b = true source = v reasons = reason break L } } //代理机构相同-非空相等 if v.agency != "" && info.agency != "" && v.agency == info.agency { reason = reason + "同机构-" repeat := false if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat { b = true source = v reasons = reason break L } } else { reason = reason + "非同机构-" if info.city != "" && info.city == v.city { reason = reason + "同城-" repeat := false if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat { b = true source = v reasons = reason break L } } else { reason = reason + "不同城-" repeat := false if repeat, reason = quickHeavyMethodOne(v, info, reason); repeat { b = true source = v reasons = reason break L } } } } } } //往预存数据 d 添加 if !b { ct := info.publishtime dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd) k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area) d.lock.Lock() data := d.data[k] if data == nil { data = []*Info{info} d.data[k] = data if !d.keys[dkey] { d.keys[dkey] = true d.update(ct) } } else { data = append(data, info) d.data[k] = data } //添加省 isAreaExist :=false for _,v:= range d.areakeys { if v==info.area { isAreaExist = true } } if !isAreaExist { areaArr := d.areakeys areaArr = append(areaArr,info.area) d.areakeys = areaArr } d.lock.Unlock() } if isTestLog { reasons = reasons+"-新修改" } return } func (d *datamap) update(t int64) { if TimingTask { }else { if IsFull { d.keymap = d.GetLatelyFiveDay(t)//全量 }else { d.keymap = d.GetLatelyFiveDayDouble(t) //增量 } m := map[string]bool{} for _, v := range d.keymap { m[v] = true } for k, _ := range d.data { if !m[k[:8]] { delete(d.data, k) } } for k, _ := range d.keys { if !m[k] { delete(d.keys, k) } } } } func (d *datamap) GetLatelyFiveDay(t int64) []string { array := make([]string, d.days) now := time.Unix(t, 0) for i := 0; i < d.days; i++ { array[i] = now.Format(qutil.Date_yyyyMMdd) now = now.AddDate(0, 0, -1) } return array } func (d *datamap) GetLatelyFiveDayDouble(t int64) []string {//增量-两倍 array := make([]string, d.days*2) now := time.Now() for i := 0; i < d.days*2; i++ { array[i] = now.Format(qutil.Date_yyyyMMdd) now = now.AddDate(0, 0, -1) } return array } //替换原始数据池-更新 func (d *datamap) replacePoolData(newData *Info) { d.lock.Lock() ct := newData.publishtime dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd) k := fmt.Sprintf("%s_%s_%s", dkey, newData.subtype, newData.area) data := d.data[k] for k, v := range data { if v.id == newData.id {//替换 data[k] = newData break } } d.data[k] = data d.lock.Unlock() } //相互替换数据池-暂时弃用 func (d *datamap) replaceSourceData(newData *Info, oldData *Info) { //删除数据池的老数据 ct_old := oldData.publishtime dkey_old := qutil.FormatDateByInt64(&ct_old, qutil.Date_yyyyMMdd) k_old := fmt.Sprintf("%s_%s_%s", dkey_old, oldData.subtype, oldData.area) data_old := d.data[k_old] for k, v := range data_old { if v.id == oldData.id {//删除对应当前的老数据 data_old = append(data_old[:k], data_old[k+1:]...) break } } d.data[k_old] = data_old //添加新的 ct := newData.publishtime dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd) k := fmt.Sprintf("%s_%s_%s", dkey, newData.subtype, newData.area) d.lock.Lock() data := d.data[k] if data == nil { data = []*Info{newData} d.data[k] = data if !d.keys[dkey] { d.keys[dkey] = true d.update(ct) } } else { data = append(data, newData) d.data[k] = data } //添加省 isAreaExist :=false for _,v:= range d.areakeys { if v==newData.area { isAreaExist = true } } if !isAreaExist { areaArr := d.areakeys areaArr = append(areaArr,newData.area) d.areakeys = areaArr } d.lock.Unlock() } //总计条数-暂时弃用 func (d *datamap) currentTotalCount() int { num:=qutil.IntAll(0) for _,v:=range d.data { num = num+qutil.IntAll(len(v)) } return num }