|
@@ -58,7 +58,7 @@ var (
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
- return
|
|
|
+ //return
|
|
|
flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
|
|
|
flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史
|
|
|
flag.StringVar(>ept, "gtept", "", "全量gte发布时间")//全量区间pt
|
|
@@ -130,7 +130,7 @@ func init() {
|
|
|
}
|
|
|
|
|
|
|
|
|
-func mainT() {
|
|
|
+func main() {
|
|
|
|
|
|
go checkMapJob()
|
|
|
updport := Sysconfig["udpport"].(string)
|
|
@@ -165,35 +165,13 @@ func mainT() {
|
|
|
}
|
|
|
|
|
|
//测试组人员使用
|
|
|
-func main() {
|
|
|
- //dealBuyerAlias() //生成别名
|
|
|
- //dealWithBuyerNameAliasRecord()//处理数据-别名相关
|
|
|
- //dealRepeatAliasBuyerName() //处理多余重复
|
|
|
- //exportXiuFuData()
|
|
|
- //buyerRedisXiuFuData()//更新别名
|
|
|
- //exportAllBuyerAlias() //导出测试数据
|
|
|
-
|
|
|
-
|
|
|
- //dealWithAddressData()//城市
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- exportTestBiddingData()
|
|
|
-
|
|
|
- //testmain()
|
|
|
-
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+func mainT() {
|
|
|
|
|
|
if TimingTask {
|
|
|
go historyTaskDay()
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
} else {
|
|
|
IsFull = true //全量判重
|
|
|
-
|
|
|
sid := "1fffffffffffffffffffffff"
|
|
|
eid := "9fffffffffffffffffffffff"
|
|
|
mapinfo := map[string]interface{}{}
|
|
@@ -211,7 +189,7 @@ func main() {
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+//upd接收
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
fmt.Println("接受的段数据")
|
|
|
switch act {
|
|
@@ -239,8 +217,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
//开始判重程序
|
|
|
func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
log.Println("开始数据判重")
|
|
@@ -277,24 +253,24 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
if n%10000 == 0 {
|
|
|
log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
|
|
|
}
|
|
|
- source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
|
|
|
- if util.IntAll((*source)["sourcewebsite"]) == 1 {
|
|
|
- repeateN++
|
|
|
- Update.updatePool <- []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmp["_id"],
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": 1,
|
|
|
- "dataging":0,
|
|
|
- "repeat_reason": "sourcewebsite为1,重复",
|
|
|
- },
|
|
|
- },
|
|
|
- }
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- continue
|
|
|
- }
|
|
|
+ //source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
|
|
|
+ //if util.IntAll((*source)["sourcewebsite"]) == 1 {
|
|
|
+ // repeateN++
|
|
|
+ // Update.updatePool <- []map[string]interface{}{
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "_id": tmp["_id"],
|
|
|
+ // },
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "$set": map[string]interface{}{
|
|
|
+ // "repeat": 1,
|
|
|
+ // "dataging":0,
|
|
|
+ // "repeat_reason": "sourcewebsite为1,重复",
|
|
|
+ // },
|
|
|
+ // },
|
|
|
+ // }
|
|
|
+ // tmp = make(map[string]interface{})
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
if util.IntAll(tmp["repeat"]) == 1 {
|
|
|
repeateN++
|
|
|
tmp = make(map[string]interface{})
|
|
@@ -381,8 +357,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
+//历史判重
|
|
|
func historyTaskDay() {
|
|
|
defer util.Catch()
|
|
|
|
|
@@ -436,25 +411,25 @@ func historyTaskDay() {
|
|
|
if num%10000 == 0 {
|
|
|
log.Println("正序遍历:", num)
|
|
|
}
|
|
|
- source := util.ObjToMap(tmp["jsondata"])
|
|
|
- if util.IntAll((*source)["sourcewebsite"]) == 1 {
|
|
|
- outnum++
|
|
|
- Update.updatePool <- []map[string]interface{}{//重复数据打标签
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmp["_id"],
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": 1,
|
|
|
- "dataging": 0,
|
|
|
- "history_updatetime":util.Int64All(time.Now().Unix()),
|
|
|
- "repeat_reason": "sourcewebsite为1 重复",
|
|
|
- },
|
|
|
- },
|
|
|
- }
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- continue
|
|
|
- }
|
|
|
+ //source := util.ObjToMap(tmp["jsondata"])
|
|
|
+ //if util.IntAll((*source)["sourcewebsite"]) == 1 {
|
|
|
+ // outnum++
|
|
|
+ // Update.updatePool <- []map[string]interface{}{//重复数据打标签
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "_id": tmp["_id"],
|
|
|
+ // },
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "$set": map[string]interface{}{
|
|
|
+ // "repeat": 1,
|
|
|
+ // "dataging": 0,
|
|
|
+ // "history_updatetime":util.Int64All(time.Now().Unix()),
|
|
|
+ // "repeat_reason": "sourcewebsite为1 重复",
|
|
|
+ // },
|
|
|
+ // },
|
|
|
+ // }
|
|
|
+ // tmp = make(map[string]interface{})
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
|
|
|
//取-符合-发布时间X年内的数据
|
|
|
if util.IntAll(tmp["dataging"]) == 1 {
|
|
@@ -672,7 +647,6 @@ func judgeIsCurIds (gtid string,lteid string,curid string) bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
-
|
|
|
//迁移上一段数据
|
|
|
func moveHistoryData(startid string,endid string) {
|
|
|
sess := mgo.GetMgoConn()
|
|
@@ -699,7 +673,7 @@ func moveHistoryData(startid string,endid string) {
|
|
|
qv := map[string]interface{}{
|
|
|
"comeintime": map[string]interface{}{
|
|
|
"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
|
|
|
- },
|
|
|
+ },
|
|
|
}
|
|
|
delnum := mgo.Delete(extract, qv)
|
|
|
log.Println("remove from ", extract, delnum)
|