package service import ( "app.yhyue.com/moapp/dataDeduplication/entity" "app.yhyue.com/moapp/dataDeduplication/rpc/deduplication" "crypto/md5" "fmt" "github.com/go-xorm/xorm" "io" "log" "strconv" "strings" ) //定义orm引擎 var Engine *xorm.Engine type DeduplicationService struct{} var PREFIX = "qc" //数据判重 func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.Request) (*deduplication.Info, string) { log.Println("开始=====") orm := Engine.NewSession() defer orm.Close() // 模运算取企业id number, _ := strconv.Atoi(data.EntId) tableName := PREFIX + fmt.Sprintf("%03d", number%100) // 查询 var rs []*entity.Deduplication var tmpList []string var valueList []interface{} var selectSql string if data.IsEnt { valueList = append(valueList, data.EntId) } else { valueList = append(valueList, data.PersonId, data.EntId) } for _, v := range strings.Split(data.InfoId, ",") { tmpList = append(tmpList, "?") valueList = append(valueList, v) } if data.IsEnt { selectSql = fmt.Sprintf("ent_id=? and info_id in (%s)", strings.Join(tmpList, ",")) } else { selectSql = fmt.Sprintf("person_id = ? and ent_id=? and info_id in (%s)", strings.Join(tmpList, ",")) } log.Println(selectSql) infoIdList := strings.Split(data.InfoId, ",") totalInfoCount := len(infoIdList) err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs) totalExist := len(rs) log.Println(totalExist, "已存在") if err != nil { log.Println(err, "判重查询失败") return &deduplication.Info{ TotalCount: 0, ExistCount: 0, NewCount: 0, IsInsert: false, }, "判重查询失败" } if data.IsInsert { existIdMap := map[string]bool{} for _, v := range rs { existIdMap[v.InfoId] = true } // 开启事务 orm.Begin() // 新增 var insertList []entity.Deduplication for _, id := range infoIdList { if existIdMap[id] { log.Println("id已存在", id) continue } log.Println("新增", id) temData := entity.Deduplication{ InfoId: id, EntId: data.EntId, PersonId: data.PersonId, } insertList = append(insertList, temData) if len(insertList) > 100 { _, err3 := orm.Table(tableName).Insert(insertList) insertList = []entity.Deduplication{} if err3 != nil { orm.Rollback() log.Println(err3, "新增数据失败") return &deduplication.Info{ TotalCount: int64(totalInfoCount), ExistCount: int64(totalExist), NewCount: int64(totalInfoCount - totalExist), IsInsert: false, }, "新增数据失败" } } } if len(insertList) > 0 { _, err3 := orm.Table(tableName).Insert(insertList) if err3 != nil { orm.Rollback() log.Println(err3, "新增数据失败") return &deduplication.Info{ TotalCount: int64(totalInfoCount), ExistCount: int64(totalExist), NewCount: int64(totalInfoCount - totalExist), IsInsert: false, }, "新增数据失败" } } err := orm.Commit() if err != nil { log.Println("提交失败") return &deduplication.Info{ TotalCount: int64(totalInfoCount), ExistCount: int64(totalExist), NewCount: int64(totalInfoCount - totalExist), IsInsert: false, }, "提交失败" } else { log.Println("提交成功") return &deduplication.Info{ TotalCount: int64(totalInfoCount), ExistCount: int64(totalExist), NewCount: int64(totalInfoCount - totalExist), IsInsert: true, }, "" } } return &deduplication.Info{ TotalCount: int64(totalInfoCount), ExistCount: int64(totalExist), NewCount: int64(totalInfoCount - totalExist), IsInsert: false, }, "" } // 根据账户id进行判重 func (service *DeduplicationService) DataDeduplicateByAccountId(data *deduplication.ByAccountRequest) (*deduplication.Info, string) { log.Println("开始=====") orm := Engine.NewSession() defer orm.Close() // 模运算取企业id todo 需要区分是int类型还是mongodb objectid类型 看一下咋转 //var num01 int number, errConv := strconv.Atoi(data.AccountId) log.Println(55555,number,errConv) if errConv!=nil{ log.Println("不是int类型的,hash后再取模寻表") b := a(data.AccountId) ss:=b[len(b)-2:] bt,_:=strconv.ParseInt(ss,16,64) number =int(bt) log.Println("哈希取模后的表序号",number) } tableName := PREFIX + fmt.Sprintf("%03d", number%100) // 查询 //var rs entity.Deduplication var tmpList []string var valueList []interface{} var selectSql string valueList = append(valueList, data.AccountId) valueList = append(valueList, data.DataDesc) for _, v := range strings.Split(data.InfoId, ",") { if strings.TrimSpace(v)!=""{ tmpList = append(tmpList, "?") valueList = append(valueList, v) } } var rs []*entity.Deduplication selectSql = fmt.Sprintf("account_id=? and data_desc=? and info_id in (%s)", strings.Join(tmpList, ",")) infoIdList := strings.Split(data.InfoId, ",") totalInfoCount := len(infoIdList) err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs) if err != nil { log.Println(err, "判重查询失败") return &deduplication.Info{ TotalCount: 0, ExistCount: 0, NewCount: 0, IsInsert: false, }, "判重查询失败" } existInfoIdMap := map[string]bool{} existIdList := []string{} for _, v := range rs { if existInfoIdMap[v.InfoId] { continue }else { existIdList = append(existIdList,v.InfoId) } } count := int64(len(existIdList)) return &deduplication.Info{ TotalCount: int64(totalInfoCount), ExistCount: count, NewCount: int64(totalInfoCount) - count, IsInsert: false, }, "" } // 根据账户id进行判重并存入数据 func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.ByAccountRequest) (*deduplication.Info, string) { log.Println("开始=====") orm := Engine.NewSession() defer orm.Close() // 模运算取企业id 企业id是int 类型的直接对100取模 // objectid类型的哈希后取模 这里使用md5后取后两位数字转10进制 然后对100取模 number, errConv := strconv.Atoi(data.AccountId) log.Println(55555,number,errConv) if errConv!=nil{ log.Println("不是int类型的,hash后再取模寻表") b := a(data.AccountId) ss:=b[len(b)-2:] bt,_:=strconv.ParseInt(ss,16,64) number =int(bt) log.Println(bt) } tableName := PREFIX + fmt.Sprintf("%03d", number%100) // 查询 var rs []*entity.Deduplication var tmpList []string var valueList []interface{} var selectSql string valueList = append(valueList, data.AccountId) valueList = append(valueList, data.DataDesc) for _, v := range strings.Split(data.InfoId, ",") { if strings.TrimSpace(v)!=""{ tmpList = append(tmpList, "?") valueList = append(valueList, v) } } selectSql = fmt.Sprintf("account_id=? and data_desc=? and info_id in (%s)", strings.Join(tmpList, ",")) infoIdList := strings.Split(data.InfoId, ",") totalInfoCount := len(infoIdList) err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs) existInfoIdMap := map[string]bool{} existIdList := []string{} for _, v := range rs { if existInfoIdMap[v.InfoId] { continue }else { existIdList = append(existIdList,v.InfoId) } } count := len(existIdList) log.Println(count, "已存在数据量") if err != nil { log.Println(err, "判重查询失败") return &deduplication.Info{ TotalCount: 0, ExistCount: 0, NewCount: 0, }, "判重查询失败" } existIdMap := map[string]bool{} for _, v := range rs { existIdMap[v.InfoId] = true } // 新增 var insertList []entity.Deduplication for _, id := range infoIdList { if existIdMap[id] { log.Println("id已存在", id) continue } log.Println("新增", id) temData := entity.Deduplication{ InfoId: id, EntId: "0", PersonId: data.PersonId, AccountId: data.AccountId, DataDesc: data.DataDesc, } insertList = append(insertList, temData) } go SaveMysql(tableName,insertList) return &deduplication.Info{ TotalCount: int64(totalInfoCount), ExistCount: int64(count), NewCount: int64(totalInfoCount - count), }, "" } func (service *DeduplicationService) EntCount(data *deduplication.GetEntCountRequest) (int64, string) { log.Println("开始=====") orm := Engine.NewSession() defer orm.Close() // 模运算取企业id number, _ := strconv.Atoi(data.EntId) tableName := PREFIX + fmt.Sprintf("%03d", number%100) // 查询 var rs entity.Deduplication count, err := orm.Table(tableName).Where("ent_id=?", data.EntId).Count(rs) if err != nil { return 0, "查询失败" } log.Println("count=====", count) return count, "" } func a(data string) string { t := md5.New() io.WriteString(t, data) return fmt.Sprintf("%x", t.Sum(nil)) } func SaveMysql(tableName string,saveList []entity.Deduplication) { // log.Println("保存数据开始") orm := Engine.NewSession() defer orm.Close() orm.Begin() insertList := []entity.Deduplication{} for _, saveData := range saveList { insertList = append(insertList, saveData) if len(insertList) > 500 { _, err3 := orm.Table(tableName).Insert(insertList) insertList = []entity.Deduplication{} if err3 != nil { orm.Rollback() log.Println(err3, "新增数据失败") } } } if len(insertList) > 0 { _, err3 := orm.Table(tableName).Insert(insertList) if err3 != nil { orm.Rollback() log.Println(err3, "新增数据失败") } } err2 := orm.Commit() log.Println("err2", err2) log.Println("保存数据结束") }