deduplication.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package service
  2. import (
  3. "app.yhyue.com/moapp/dataDeduplication/entity"
  4. "app.yhyue.com/moapp/dataDeduplication/rpc/deduplication"
  5. "crypto/md5"
  6. "fmt"
  7. "github.com/go-xorm/xorm"
  8. "io"
  9. "log"
  10. "strconv"
  11. "strings"
  12. )
  13. //定义orm引擎
  14. var Engine *xorm.Engine
  15. type DeduplicationService struct{}
  16. var PREFIX = "qc"
  17. //数据判重
  18. func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.Request) (*deduplication.Info, string) {
  19. log.Println("开始=====")
  20. orm := Engine.NewSession()
  21. defer orm.Close()
  22. // 模运算取企业id
  23. number, _ := strconv.Atoi(data.EntId)
  24. tableName := PREFIX + fmt.Sprintf("%03d", number%100)
  25. // 查询
  26. var rs []*entity.Deduplication
  27. var tmpList []string
  28. var valueList []interface{}
  29. var selectSql string
  30. if data.IsEnt {
  31. valueList = append(valueList, data.EntId)
  32. } else {
  33. valueList = append(valueList, data.PersonId, data.EntId)
  34. }
  35. for _, v := range strings.Split(data.InfoId, ",") {
  36. tmpList = append(tmpList, "?")
  37. valueList = append(valueList, v)
  38. }
  39. if data.IsEnt {
  40. selectSql = fmt.Sprintf("ent_id=? and info_id in (%s)", strings.Join(tmpList, ","))
  41. } else {
  42. selectSql = fmt.Sprintf("person_id = ? and ent_id=? and info_id in (%s)", strings.Join(tmpList, ","))
  43. }
  44. log.Println(selectSql)
  45. infoIdList := strings.Split(data.InfoId, ",")
  46. totalInfoCount := len(infoIdList)
  47. err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs)
  48. totalExist := len(rs)
  49. log.Println(totalExist, "已存在")
  50. if err != nil {
  51. log.Println(err, "判重查询失败")
  52. return &deduplication.Info{
  53. TotalCount: 0,
  54. ExistCount: 0,
  55. NewCount: 0,
  56. IsInsert: false,
  57. }, "判重查询失败"
  58. }
  59. if data.IsInsert {
  60. existIdMap := map[string]bool{}
  61. for _, v := range rs {
  62. existIdMap[v.InfoId] = true
  63. }
  64. // 开启事务
  65. orm.Begin()
  66. // 新增
  67. var insertList []entity.Deduplication
  68. for _, id := range infoIdList {
  69. if existIdMap[id] {
  70. log.Println("id已存在", id)
  71. continue
  72. }
  73. log.Println("新增", id)
  74. temData := entity.Deduplication{
  75. InfoId: id,
  76. EntId: data.EntId,
  77. PersonId: data.PersonId,
  78. }
  79. insertList = append(insertList, temData)
  80. if len(insertList) > 100 {
  81. _, err3 := orm.Table(tableName).Insert(insertList)
  82. insertList = []entity.Deduplication{}
  83. if err3 != nil {
  84. orm.Rollback()
  85. log.Println(err3, "新增数据失败")
  86. return &deduplication.Info{
  87. TotalCount: int64(totalInfoCount),
  88. ExistCount: int64(totalExist),
  89. NewCount: int64(totalInfoCount - totalExist),
  90. IsInsert: false,
  91. }, "新增数据失败"
  92. }
  93. }
  94. }
  95. if len(insertList) > 0 {
  96. _, err3 := orm.Table(tableName).Insert(insertList)
  97. if err3 != nil {
  98. orm.Rollback()
  99. log.Println(err3, "新增数据失败")
  100. return &deduplication.Info{
  101. TotalCount: int64(totalInfoCount),
  102. ExistCount: int64(totalExist),
  103. NewCount: int64(totalInfoCount - totalExist),
  104. IsInsert: false,
  105. }, "新增数据失败"
  106. }
  107. }
  108. err := orm.Commit()
  109. if err != nil {
  110. log.Println("提交失败")
  111. return &deduplication.Info{
  112. TotalCount: int64(totalInfoCount),
  113. ExistCount: int64(totalExist),
  114. NewCount: int64(totalInfoCount - totalExist),
  115. IsInsert: false,
  116. }, "提交失败"
  117. } else {
  118. log.Println("提交成功")
  119. return &deduplication.Info{
  120. TotalCount: int64(totalInfoCount),
  121. ExistCount: int64(totalExist),
  122. NewCount: int64(totalInfoCount - totalExist),
  123. IsInsert: true,
  124. }, ""
  125. }
  126. }
  127. return &deduplication.Info{
  128. TotalCount: int64(totalInfoCount),
  129. ExistCount: int64(totalExist),
  130. NewCount: int64(totalInfoCount - totalExist),
  131. IsInsert: false,
  132. }, ""
  133. }
  134. // 根据账户id进行判重
  135. func (service *DeduplicationService) DataDeduplicateByAccountId(data *deduplication.ByAccountRequest) (*deduplication.Info, string) {
  136. log.Println("开始=====")
  137. orm := Engine.NewSession()
  138. defer orm.Close()
  139. // 模运算取企业id todo 需要区分是int类型还是mongodb objectid类型 看一下咋转
  140. //var num01 int
  141. number, errConv := strconv.Atoi(data.AccountId)
  142. log.Println(55555,number,errConv)
  143. if errConv!=nil{
  144. log.Println("不是int类型的,hash后再取模寻表")
  145. b := a(data.AccountId)
  146. ss:=b[len(b)-2:]
  147. bt,_:=strconv.ParseInt(ss,16,64)
  148. number =int(bt)
  149. log.Println("哈希取模后的表序号",number)
  150. }
  151. tableName := PREFIX + fmt.Sprintf("%03d", number%100)
  152. // 查询
  153. //var rs entity.Deduplication
  154. var tmpList []string
  155. var valueList []interface{}
  156. var selectSql string
  157. valueList = append(valueList, data.AccountId)
  158. valueList = append(valueList, data.DataDesc)
  159. for _, v := range strings.Split(data.InfoId, ",") {
  160. if strings.TrimSpace(v)!=""{
  161. tmpList = append(tmpList, "?")
  162. valueList = append(valueList, v)
  163. }
  164. }
  165. var rs []*entity.Deduplication
  166. selectSql = fmt.Sprintf("account_id=? and data_desc=? and info_id in (%s)", strings.Join(tmpList, ","))
  167. infoIdList := strings.Split(data.InfoId, ",")
  168. totalInfoCount := len(infoIdList)
  169. err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs)
  170. if err != nil {
  171. log.Println(err, "判重查询失败")
  172. return &deduplication.Info{
  173. TotalCount: 0,
  174. ExistCount: 0,
  175. NewCount: 0,
  176. IsInsert: false,
  177. }, "判重查询失败"
  178. }
  179. existInfoIdMap := map[string]bool{}
  180. existIdList := []string{}
  181. for _, v := range rs {
  182. if existInfoIdMap[v.InfoId] {
  183. continue
  184. }else {
  185. existIdList = append(existIdList,v.InfoId)
  186. }
  187. }
  188. count := int64(len(existIdList))
  189. return &deduplication.Info{
  190. TotalCount: int64(totalInfoCount),
  191. ExistCount: count,
  192. NewCount: int64(totalInfoCount) - count,
  193. IsInsert: false,
  194. }, ""
  195. }
  196. // 根据账户id进行判重并存入数据
  197. func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.ByAccountRequest) (*deduplication.Info, string) {
  198. log.Println("开始=====")
  199. orm := Engine.NewSession()
  200. defer orm.Close()
  201. // 模运算取企业id 企业id是int 类型的直接对100取模
  202. // objectid类型的哈希后取模 这里使用md5后取后两位数字转10进制 然后对100取模
  203. number, errConv := strconv.Atoi(data.AccountId)
  204. log.Println(55555,number,errConv)
  205. if errConv!=nil{
  206. log.Println("不是int类型的,hash后再取模寻表")
  207. b := a(data.AccountId)
  208. ss:=b[len(b)-2:]
  209. bt,_:=strconv.ParseInt(ss,16,64)
  210. number =int(bt)
  211. log.Println(bt)
  212. }
  213. tableName := PREFIX + fmt.Sprintf("%03d", number%100)
  214. // 查询
  215. var rs []*entity.Deduplication
  216. var tmpList []string
  217. var valueList []interface{}
  218. var selectSql string
  219. valueList = append(valueList, data.AccountId)
  220. valueList = append(valueList, data.DataDesc)
  221. for _, v := range strings.Split(data.InfoId, ",") {
  222. if strings.TrimSpace(v)!=""{
  223. tmpList = append(tmpList, "?")
  224. valueList = append(valueList, v)
  225. }
  226. }
  227. selectSql = fmt.Sprintf("account_id=? and data_desc=? and info_id in (%s)", strings.Join(tmpList, ","))
  228. infoIdList := strings.Split(data.InfoId, ",")
  229. totalInfoCount := len(infoIdList)
  230. err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs)
  231. existInfoIdMap := map[string]bool{}
  232. existIdList := []string{}
  233. for _, v := range rs {
  234. if existInfoIdMap[v.InfoId] {
  235. continue
  236. }else {
  237. existIdList = append(existIdList,v.InfoId)
  238. }
  239. }
  240. count := len(existIdList)
  241. log.Println(count, "已存在数据量")
  242. if err != nil {
  243. log.Println(err, "判重查询失败")
  244. return &deduplication.Info{
  245. TotalCount: 0,
  246. ExistCount: 0,
  247. NewCount: 0,
  248. }, "判重查询失败"
  249. }
  250. existIdMap := map[string]bool{}
  251. for _, v := range rs {
  252. existIdMap[v.InfoId] = true
  253. }
  254. // 新增
  255. var insertList []entity.Deduplication
  256. for _, id := range infoIdList {
  257. if existIdMap[id] {
  258. log.Println("id已存在", id)
  259. continue
  260. }
  261. log.Println("新增", id)
  262. temData := entity.Deduplication{
  263. InfoId: id,
  264. EntId: "0",
  265. PersonId: data.PersonId,
  266. AccountId: data.AccountId,
  267. DataDesc: data.DataDesc,
  268. }
  269. insertList = append(insertList, temData)
  270. }
  271. go SaveMysql(tableName,insertList)
  272. return &deduplication.Info{
  273. TotalCount: int64(totalInfoCount),
  274. ExistCount: int64(count),
  275. NewCount: int64(totalInfoCount - count),
  276. }, ""
  277. }
  278. func (service *DeduplicationService) EntCount(data *deduplication.GetEntCountRequest) (int64, string) {
  279. log.Println("开始=====")
  280. orm := Engine.NewSession()
  281. defer orm.Close()
  282. // 模运算取企业id
  283. number, _ := strconv.Atoi(data.EntId)
  284. tableName := PREFIX + fmt.Sprintf("%03d", number%100)
  285. // 查询
  286. var rs entity.Deduplication
  287. count, err := orm.Table(tableName).Where("ent_id=?", data.EntId).Count(rs)
  288. if err != nil {
  289. return 0, "查询失败"
  290. }
  291. log.Println("count=====", count)
  292. return count, ""
  293. }
  // a returns the MD5 digest of data as a 32-character lowercase hex string.
// The account-based lookups use it to map a non-numeric account id onto a
// shard number.
func a(data string) string {
	hasher := md5.New()
	// Writing to a hash.Hash never returns an error.
	_, _ = io.WriteString(hasher, data)
	sum := hasher.Sum(nil)
	return fmt.Sprintf("%x", sum)
}
  299. func SaveMysql(tableName string,saveList []entity.Deduplication) {
  300. //
  301. log.Println("保存数据开始")
  302. orm := Engine.NewSession()
  303. defer orm.Close()
  304. orm.Begin()
  305. insertList := []entity.Deduplication{}
  306. for _, saveData := range saveList {
  307. insertList = append(insertList, saveData)
  308. if len(insertList) > 500 {
  309. _, err3 := orm.Table(tableName).Insert(insertList)
  310. insertList = []entity.Deduplication{}
  311. if err3 != nil {
  312. orm.Rollback()
  313. log.Println(err3, "新增数据失败")
  314. }
  315. }
  316. }
  317. if len(insertList) > 0 {
  318. _, err3 := orm.Table(tableName).Insert(insertList)
  319. if err3 != nil {
  320. orm.Rollback()
  321. log.Println(err3, "新增数据失败")
  322. }
  323. }
  324. err2 := orm.Commit()
  325. log.Println("err2", err2)
  326. log.Println("保存数据结束")
  327. }