|
@@ -18,7 +18,7 @@ var Engine *xorm.Engine
|
|
|
|
|
|
type DeduplicationService struct{}
|
|
|
|
|
|
-var PREFIX = "qc"
|
|
|
+var PREFIX = "qc_test_"
|
|
|
|
|
|
//数据判重
|
|
|
func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.Request) (*deduplication.Info, string) {
|
|
@@ -67,6 +67,8 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
|
|
|
log.Println(totalInfoCount, "本批次去重后总量")
|
|
|
log.Println(totalInfoCount-totalExist, "newCount")
|
|
|
if data.IsInsert {
|
|
|
+ insertSql := fmt.Sprintf("insert IGNORE into %s (info_id,ent_id,person_id) values ", tableName)
|
|
|
+ parmList := []string{}
|
|
|
existIdMap := map[string]bool{}
|
|
|
for _, v := range rs {
|
|
|
existIdMap[v.InfoId] = true
|
|
@@ -74,22 +76,23 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
|
|
|
// 开启事务
|
|
|
orm.Begin()
|
|
|
// 新增
|
|
|
- var insertList []entity.Deduplication
|
|
|
+ //var insertList []entity.Deduplication
|
|
|
+ var insertList []interface{}
|
|
|
for id, _ := range mapData {
|
|
|
if existIdMap[id] {
|
|
|
log.Println("id已存在", id)
|
|
|
continue
|
|
|
}
|
|
|
log.Println("新增", id)
|
|
|
- temData := entity.Deduplication{
|
|
|
- InfoId: id,
|
|
|
- EntId: data.EntId,
|
|
|
- PersonId: data.PersonId,
|
|
|
- }
|
|
|
- insertList = append(insertList, temData)
|
|
|
- if len(insertList) > 100 {
|
|
|
- _, err3 := orm.Table(tableName).Insert(insertList)
|
|
|
- insertList = []entity.Deduplication{}
|
|
|
+ parmList = append(parmList, "(?,?,?)")
|
|
|
+ insertList = append(insertList, id, data.EntId, data.PersonId)
|
|
|
+ if len(insertList) >= 100 {
|
|
|
+ insertSql = fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
|
|
|
+ tmp := []interface{}{insertSql}
|
|
|
+ tmp = append(tmp, insertList...)
|
|
|
+ _, err3 := orm.Exec(tmp...)
|
|
|
+ insertList = []interface{}{}
|
|
|
+ parmList = []string{}
|
|
|
if err3 != nil {
|
|
|
orm.Rollback()
|
|
|
log.Println(err3, "新增数据失败")
|
|
@@ -103,7 +106,10 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
|
|
|
}
|
|
|
}
|
|
|
if len(insertList) > 0 {
|
|
|
- _, err3 := orm.Table(tableName).Insert(insertList)
|
|
|
+ insertSql = fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
|
|
|
+ tmp := []interface{}{insertSql}
|
|
|
+ tmp = append(tmp, insertList...)
|
|
|
+ _, err3 := orm.Exec(tmp...)
|
|
|
if err3 != nil {
|
|
|
orm.Rollback()
|
|
|
log.Println(err3, "新增数据失败")
|
|
@@ -149,8 +155,7 @@ func (service *DeduplicationService) DataDeduplicateByAccountId(data *deduplicat
|
|
|
log.Println("开始=====")
|
|
|
orm := Engine.NewSession()
|
|
|
defer orm.Close()
|
|
|
- // 模运算取企业id todo 需要区分是int类型还是mongodb objectid类型 看一下咋转
|
|
|
- //var num01 int
|
|
|
+ // 模运算取企业id 需要区分是int类型还是mongodb objectid类型
|
|
|
number, errConv := strconv.Atoi(data.AccountId)
|
|
|
log.Println("账户id转换:", number, errConv)
|
|
|
if errConv != nil {
|
|
@@ -212,7 +217,7 @@ func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.
|
|
|
// 模运算取企业id 企业id是int 类型的直接对100取模
|
|
|
// objectid类型的哈希后取模 这里使用md5后取后两位数字转10进制 然后对100取模
|
|
|
number, errConv := strconv.Atoi(data.AccountId)
|
|
|
- log.Println(55555, number, errConv)
|
|
|
+ log.Println("数据判重:判断是否int类型", number, errConv)
|
|
|
if errConv != nil {
|
|
|
log.Println("不是int类型的,hash后再取模寻表")
|
|
|
b := a(data.AccountId)
|
|
@@ -276,7 +281,6 @@ func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.
|
|
|
insertList = append(insertList, temData)
|
|
|
|
|
|
}
|
|
|
- //log.Println(totalInfoCount, count, "88888888")
|
|
|
log.Println(len(insertList))
|
|
|
go SaveMysql(tableName, insertList)
|
|
|
log.Println("结束=====", time.Now())
|
|
@@ -317,12 +321,17 @@ func SaveMysql(tableName string, saveList []entity.Deduplication) {
|
|
|
orm := Engine.NewSession()
|
|
|
defer orm.Close()
|
|
|
orm.Begin()
|
|
|
- insertList := []entity.Deduplication{}
|
|
|
+ insertList := []interface{}{}
|
|
|
+ insertSql := fmt.Sprintf("insert IGNORE into %s (info_id,ent_id,person_id,account_id,data_desc) values ", tableName)
|
|
|
+ parmList := []string{}
|
|
|
for _, saveData := range saveList {
|
|
|
- insertList = append(insertList, saveData)
|
|
|
+ parmList = append(parmList, "(?,?,?,?,?)")
|
|
|
+ insertList = append(insertList, saveData.InfoId, saveData.EntId, saveData.PersonId, saveData.AccountId, saveData.DataDesc)
|
|
|
if len(insertList) > 500 {
|
|
|
- _, err3 := orm.Table(tableName).Insert(insertList)
|
|
|
- insertList = []entity.Deduplication{}
|
|
|
+ insertSql = fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
|
|
|
+ tmp := []interface{}{insertSql}
|
|
|
+ tmp = append(tmp, insertList...)
|
|
|
+ _, err3 := orm.Exec(tmp...)
|
|
|
if err3 != nil {
|
|
|
orm.Rollback()
|
|
|
log.Println(err3, "新增数据失败")
|
|
@@ -330,11 +339,13 @@ func SaveMysql(tableName string, saveList []entity.Deduplication) {
|
|
|
}
|
|
|
}
|
|
|
if len(insertList) > 0 {
|
|
|
- _, err3 := orm.Table(tableName).Insert(insertList)
|
|
|
+ insertSql = fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
|
|
|
+ tmp := []interface{}{insertSql}
|
|
|
+ tmp = append(tmp, insertList...)
|
|
|
+ _, err3 := orm.Exec(tmp...)
|
|
|
if err3 != nil {
|
|
|
orm.Rollback()
|
|
|
log.Println(err3, "新增数据失败")
|
|
|
-
|
|
|
}
|
|
|
|
|
|
}
|