|
@@ -0,0 +1,285 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
+ "log"
|
|
|
+ "qfw/mongodb"
|
|
|
+ qu "qfw/util"
|
|
|
+ "qfw/util/mysql"
|
|
|
+ "qfw/util/redis"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ Sysconfig map[string]interface{}
|
|
|
+ JyMysql *mysql.Mysql
|
|
|
+ JyzhshiMysql *mysql.Mysql
|
|
|
+ JypushzhshiMysql *mysql.Mysql
|
|
|
+ Mgo *mongodb.MongodbSim
|
|
|
+ BuryClassType map[string]interface{}
|
|
|
+ ProvinceMap map[string]interface{}
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ qu.ReadConfig(&Sysconfig)
|
|
|
+ //jyMysql := qu.ObjToMap(Sysconfig["mysql"])
|
|
|
+ //JyMysql = &mysql.Mysql{
|
|
|
+ // Address: (*jyMysql)["address"].(string),
|
|
|
+ // UserName: (*jyMysql)["username"].(string),
|
|
|
+ // PassWord: (*jyMysql)["password"].(string),
|
|
|
+ // DBName: (*jyMysql)["dbName"].(string),
|
|
|
+ //}
|
|
|
+ //JyMysql.Init()
|
|
|
+ //jyzhshiMysql := qu.ObjToMap(Sysconfig["jyzhengshimysql"])
|
|
|
+ //JyzhshiMysql = &mysql.Mysql{
|
|
|
+ // Address: (*jyzhshiMysql)["address"].(string),
|
|
|
+ // UserName: (*jyzhshiMysql)["username"].(string),
|
|
|
+ // PassWord: (*jyzhshiMysql)["password"].(string),
|
|
|
+ // DBName: (*jyzhshiMysql)["dbName"].(string),
|
|
|
+ //}
|
|
|
+ //JyzhshiMysql.Init()
|
|
|
+ //jypushzhshiMysql := qu.ObjToMap(Sysconfig["pushzhengshimysql"])
|
|
|
+ //JypushzhshiMysql = &mysql.Mysql{
|
|
|
+ // Address: (*jypushzhshiMysql)["address"].(string),
|
|
|
+ // UserName: (*jypushzhshiMysql)["username"].(string),
|
|
|
+ // PassWord: (*jypushzhshiMysql)["password"].(string),
|
|
|
+ // DBName: (*jypushzhshiMysql)["dbName"].(string),
|
|
|
+ //}
|
|
|
+ //JypushzhshiMysql.Init()
|
|
|
+ mogConfig := Sysconfig["mgo"].(map[string]interface{})
|
|
|
+ Mgo = &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: qu.ObjToString(mogConfig["addr"]),
|
|
|
+ DbName: qu.ObjToString(mogConfig["db"]),
|
|
|
+ Size: qu.IntAllDef(mogConfig["size"], 15),
|
|
|
+ }
|
|
|
+ Mgo.InitPool()
|
|
|
+
|
|
|
+ redis.InitRedis(qu.ObjToString(Sysconfig["redis_addrs"]))
|
|
|
+ //
|
|
|
+ //initBuryClassType()
|
|
|
+ //initProvince()
|
|
|
+}
|
|
|
+func pushtj() {
|
|
|
+
|
|
|
+ // 查剑鱼主库用户
|
|
|
+ userRs := JyzhshiMysql.SelectBySql("select id,`name`,phone from entniche_user WHERE ent_id=10511")
|
|
|
+ //idRs :=JyzhshiMysql.SelectBySql("select id from entniche_user WHERE ent_id=10511")
|
|
|
+ log.Println(userRs)
|
|
|
+ idStrList := []string{}
|
|
|
+ for _, user := range *userRs {
|
|
|
+ //log.Println( user["id"],qu.ObjToString( user["id"]))
|
|
|
+ idStrList =append(idStrList,strconv.FormatInt(user["id"].(int64), 10))
|
|
|
+ }
|
|
|
+ idStr :=strings.Join(idStrList,",")
|
|
|
+ log.Println(idStr)
|
|
|
+ pushtjSql:="SELECT userid,COUNT(*) count FROM pushentniche WHERE userid in (" + idStr + ") and date <1625068800 GROUP BY userid "
|
|
|
+ // 聚合推送库信息
|
|
|
+ tjRs := JypushzhshiMysql.SelectBySql(pushtjSql)
|
|
|
+ tjMap := map[int64]interface{}{}
|
|
|
+ for _,v := range *tjRs{
|
|
|
+ tjMap[v["userid"].(int64)]=v["count"]
|
|
|
+ }
|
|
|
+ log.Println(tjMap)
|
|
|
+
|
|
|
+ for _,user := range *userRs{
|
|
|
+ count := tjMap[user["id"].(int64)]
|
|
|
+ if count == nil{
|
|
|
+ count =0
|
|
|
+ }
|
|
|
+ user["count"] = count
|
|
|
+ //fmt.Println(user["name"])
|
|
|
+ //fmt.Println(user["phone"],)
|
|
|
+ //fmt.Println(user["count"])
|
|
|
+ //if count !=0{
|
|
|
+ fmt.Println(user["name"],user["phone"],user["count"])
|
|
|
+
|
|
|
+ //}
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+func main() {
|
|
|
+ //dtafilter()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // 线上线下去重统计
|
|
|
+ dataImport()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ var res []bson.M
|
|
|
+ //线上去重统计
|
|
|
+ err := sess.DB(Mgo.DbName).C("20210701kdxfentdataexport").Pipe([]bson.M{
|
|
|
+ //bson.M{"$match": bson.M{"createtime": bson.M{ "$lt":1619280000}}},
|
|
|
+ bson.M{"$group": bson.M{"_id": bson.M{"id": "$infoid"}, "count": bson.M{"$sum": 1}}},
|
|
|
+ }).All(&res)
|
|
|
+ fmt.Println(err)
|
|
|
+ fmt.Println("线上数据去重后数据量:", len(res))
|
|
|
+ //pushtj()
|
|
|
+ // 推送数据统计
|
|
|
+}
|
|
|
+
|
|
|
+func dtafilter() {
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "entid": 10511,
|
|
|
+ }
|
|
|
+ fields := map[string]int{
|
|
|
+ "_id": 1,
|
|
|
+ "infoid": 1,
|
|
|
+ }
|
|
|
+ it := sess.DB("jyqyfw_historyData").C("20201222Kdxf_entexportdata").Find(query).Select(fields).Sort("_id").Iter()
|
|
|
+ var count = 0
|
|
|
+ var ccc = 0
|
|
|
+ kdxfData := map[string]bool{}
|
|
|
+ for m := make(map[string]interface{}); it.Next(&m); {
|
|
|
+ ccc++
|
|
|
+ if kdxfData[qu.ObjToString(m["infoid"])] {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ kdxfData[qu.ObjToString(m["infoid"])] = true
|
|
|
+ count++
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("加载20201222Kdxf_entexportdata数据完成", len(kdxfData), count, ccc)
|
|
|
+}
|
|
|
+
|
|
|
+func dataImport() {
|
|
|
+ entid := qu.IntAll(Sysconfig["entid"])
|
|
|
+ query := map[string]interface{}{}
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ //fields := Sysconfig["Fields"].(map[string]int)
|
|
|
+ fields := map[string]int{
|
|
|
+ "_id": 1,
|
|
|
+ "area": 1,
|
|
|
+ "buyerclass": 1,
|
|
|
+ "city": 1,
|
|
|
+ "subtype": 1,
|
|
|
+ "toptype": 1,
|
|
|
+ "matchkey": 1,
|
|
|
+ "id": 1,
|
|
|
+ }
|
|
|
+ query["appid"] = "jyHDhXQQIAAgdZQEBLERV2"
|
|
|
+ //线下导出数据内部去重
|
|
|
+ it := sess.DB("jyqyfw_historyData").C("20201126Kdxf").Find(map[string]interface{}{"createtime": map[string]interface{}{"$lt": 1625068800}}).Select(fields).Sort("_id").Iter()
|
|
|
+ var filterCount = 0
|
|
|
+ var saveCount = 0
|
|
|
+ var count = 0
|
|
|
+ kdxfData := map[string]bool{}
|
|
|
+ for m := make(map[string]interface{}); it.Next(&m); {
|
|
|
+ //先进行redis去重
|
|
|
+ /*isExist, err := redis.Exists("other", "entexportdata_"+qu.ObjToString(m["id"])+"_"+fmt.Sprintln(entid))
|
|
|
+ if err != nil {
|
|
|
+ log.Println("企业订阅数据导出redis判重失败")
|
|
|
+ continue
|
|
|
+ } else if isExist {
|
|
|
+ filterCount++
|
|
|
+ log.Println("数据重复,id ", qu.ObjToString(m["id"]), "entid ", entid)
|
|
|
+ continue
|
|
|
+ }*/
|
|
|
+ if kdxfData[qu.ObjToString(m["id"])] {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ kdxfData[qu.ObjToString(m["id"])] = true
|
|
|
+ count++
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("加载20201126Kdxf完成", len(kdxfData), count)
|
|
|
+ log.Println("加载20201126Kdxf完成", len(kdxfData), count)
|
|
|
+ //线下数据跟线上导出数据去重
|
|
|
+ //todo 赵珑月 导过数据后改表名 表名会变 建索引
|
|
|
+ for k, _ := range kdxfData {
|
|
|
+ //log.Println(k)
|
|
|
+ cm := Mgo.Count("20210701kdxfentdataexport", map[string]interface{}{"infoid": qu.ObjToString(k)})
|
|
|
+ if cm > 0 {
|
|
|
+ log.Println("20210701kdxfentdataexport,id ", qu.ObjToString(k), "entid ", entid)
|
|
|
+ filterCount++
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ /*cy := JyMysql.Count("pushentniche_test", map[string]interface{}{"infoid": qu.ObjToString(k)})
|
|
|
+ if cy > 0 {
|
|
|
+ log.Println("pushentniche_test数据重复,id ", qu.ObjToString(k), "entid ", entid)
|
|
|
+ filterCount++
|
|
|
+ continue
|
|
|
+ }*/
|
|
|
+ saveCount++
|
|
|
+ log.Println("重复数据量:", filterCount, "线上线下去重后数据量:", saveCount)
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("数据处理完成。。。。")
|
|
|
+ log.Println("线上线下重复数据量:", filterCount, "线上线下去重后数据量:", saveCount, "线下数据内部去重后数据量:", count)
|
|
|
+}
|
|
|
+
|
|
|
+func initBuryClassType() {
|
|
|
+ infoType := JyMysql.SelectBySql("SELECT id,name FROM infotype")
|
|
|
+ BuryClassType = make(map[string]interface{})
|
|
|
+ for _, val := range *infoType {
|
|
|
+ k := qu.ObjToString(val["name"])
|
|
|
+ BuryClassType[k] = val["id"]
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func initProvince() {
|
|
|
+ proArr := JyMysql.SelectBySql("SELECT id,name FROM province")
|
|
|
+ ProvinceMap = make(map[string]interface{})
|
|
|
+ for _, val := range *proArr {
|
|
|
+ k := qu.ObjToString(val["name"])
|
|
|
+ ProvinceMap[k] = val["id"]
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//历史数据存到mysql pushentniche推送表中
|
|
|
+func saveMysqlPush(tmp map[string]interface{}, entId int) bool {
|
|
|
+ saveMap := map[string]interface{}{}
|
|
|
+ saveMap["entid"] = entId
|
|
|
+ infoid := qu.ObjToString(tmp["id"])
|
|
|
+ saveMap["infoid"] = infoid
|
|
|
+ /*matchkeys := qu.ObjToString(tmp["matchkey"])
|
|
|
+ if matchkeys != "" {
|
|
|
+ matchkeysStr := ""
|
|
|
+ matchkeysArr := strings.Split(matchkeys, ",")
|
|
|
+ for i, val := range matchkeysArr {
|
|
|
+ if i < len(matchkeysArr)-1 {
|
|
|
+ matchkeysStr += val + " "
|
|
|
+ } else {
|
|
|
+ matchkeysStr += val
|
|
|
+ }
|
|
|
+ }
|
|
|
+ saveMap["matchkeys"] = matchkeysStr
|
|
|
+ }
|
|
|
+ area := qu.ObjToString(tmp["area"])
|
|
|
+ city := qu.ObjToString(tmp["city"])
|
|
|
+ buyerClass := qu.ObjToString(tmp["buyerclass"])
|
|
|
+ toptype := qu.ObjToString(tmp["toptype"])
|
|
|
+ subtype := qu.ObjToString(tmp["subtype"])
|
|
|
+ saveMap["deptid"] = 0
|
|
|
+ saveMap["userid"] = 0
|
|
|
+ saveMap["date"] = time.Now().Unix()
|
|
|
+ if area != "" {
|
|
|
+ saveMap["area"] = qu.IntAll(ProvinceMap[area])
|
|
|
+ }
|
|
|
+ if city != "" {
|
|
|
+ saveMap["city"] = qu.IntAll(ProvinceMap[city])
|
|
|
+ }
|
|
|
+ if buyerClass != "" {
|
|
|
+ saveMap["buyerclass"] = qu.IntAll(BuryClassType[buyerClass])
|
|
|
+ }
|
|
|
+ if toptype != "" {
|
|
|
+ saveMap["toptype"] = qu.IntAll(BuryClassType[toptype])
|
|
|
+ }
|
|
|
+ if subtype != "" {
|
|
|
+ saveMap["subtype"] = qu.IntAll(BuryClassType[subtype])
|
|
|
+ }*/
|
|
|
+ b := JyMysql.Insert("pushentniche_test", saveMap)
|
|
|
+ if b <= 0 {
|
|
|
+ log.Println("数据插入pushentniche_test表失败,id ", qu.ObjToString(tmp["id"]), "entid ", entId)
|
|
|
+ return false
|
|
|
+ } /*else {
|
|
|
+ redis.Put("other", "entexportdata_"+infoid+"_"+fmt.Sprintln(entId), 1, -1)
|
|
|
+ }*/
|
|
|
+ return true
|
|
|
+}
|