main.go 24 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "go.uber.org/zap"
  8. "gorm.io/gorm"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. Mgo *mongodb.MongodbSim
  17. MgoP *mongodb.MongodbSim
  18. MgoSpider *mongodb.MongodbSim
  19. AnalysisDB *gorm.DB
  20. JianyuDB *gorm.DB
  21. JianyuSubjectDB *gorm.DB
  22. ClickhouseDB *gorm.DB
  23. endTime string //取数据,时间范围的截止时间;如果为空,就取当前时间
  24. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  25. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  26. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  27. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  28. )
  29. func main() {
  30. local, _ := time.LoadLocation("Asia/Shanghai")
  31. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  32. eid, err := c.AddFunc(GF.Cron.Spec, dealData) //tidb 数据库
  33. //eid, err := c.AddFunc(GF.Cron.Spec, dealDataForClickhouse) //Clickhouse数据库
  34. if err != nil {
  35. log.Info("main", zap.Any("AddFunc err", err))
  36. }
  37. log.Info("main", zap.Any("eid", eid))
  38. c.Start()
  39. defer c.Stop()
  40. select {}
  41. }
  42. // dealData 处理订单相关数据
  43. func dealData() {
  44. //1.清空数据表
  45. endTime = time.Now().Format("2006-01-02 15:04:05")
  46. if GF.Cron.Url != "" {
  47. // 请求头
  48. headers := map[string]string{
  49. "Content-Type": "application/json",
  50. }
  51. // 请求体
  52. body := []byte(`{
  53. "msgtype": "text",
  54. "text": {
  55. "content": "开始清零订单回款数据",
  56. "mentioned_list":["@all"]
  57. }
  58. }`)
  59. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  60. if err != nil {
  61. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  62. }
  63. }
  64. truncateTables() //清空数据表
  65. log.Info("配置取值时间范围为空", zap.String("默认取值截止时间为:", endTime))
  66. //重新打开通道
  67. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  68. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  69. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  70. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  71. go saveData() //保存订单数据
  72. //go saveAccountOrder() //保存订单数据
  73. go dealAllDataAccountOrder2()
  74. //dealAllDataAccountOrder() //1.处理归集后-存量剑鱼订单表-dwd_f_account_order
  75. go dealAllDataAccountOrderChangeRecord() //2.处理归集后-存量业绩表更表-dwd_f_account_order_change
  76. go dealAllDataAccountReturn() //3.处理归集后-存量剑鱼回款表-dwd_f_account_return
  77. go dealAllDataAccountReturnChange() //4.处理归集后-剑鱼回款变更表-dwd_f_account_return_change
  78. }
  79. func dealDataForClickhouse() {
  80. //1.清空数据表
  81. endTime = time.Now().Format("2006-01-02 15:04:05")
  82. if GF.Cron.Url != "" {
  83. // 请求头
  84. headers := map[string]string{
  85. "Content-Type": "application/json",
  86. }
  87. // 请求体
  88. body := []byte(`{
  89. "msgtype": "text",
  90. "text": {
  91. "content": "开始清空 Clickhouse 订单回款相关数据",
  92. "mentioned_list":["@all"]
  93. }
  94. }`)
  95. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  96. if err != nil {
  97. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  98. }
  99. }
  100. //clickhouse 数据清空
  101. truncateClickhouse()
  102. log.Info("配置取值时间范围为空", zap.String("默认取值截止时间为:", endTime))
  103. //重新打开通道
  104. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  105. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  106. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  107. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  108. go saveDataClickhouse() //保存订单数据
  109. go dealAllDataAccountOrder2()
  110. go dealAllDataAccountOrderChangeRecord() //2.处理归集后-存量业绩表更表-dwd_f_account_order_change
  111. go dealAllDataAccountReturn() //3.处理归集后-存量剑鱼回款表-dwd_f_account_return
  112. go dealAllDataAccountReturnChange() //4.处理归集后-剑鱼回款变更表-dwd_f_account_return_change
  113. }
  114. // dealAllDataAccountOrder2 处理归集后-存量剑鱼订单表-dwd_f_account_order
  115. func dealAllDataAccountOrder2() {
  116. defer func() {
  117. if r := recover(); r != nil {
  118. log.Info("dealAllDataAccountOrder2 Panic recovered", zap.Any("reason", r))
  119. }
  120. log.Info("dealAllDataAccountOrder2 over")
  121. }()
  122. now := time.Now()
  123. var total int64
  124. AnalysisDB.Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).Count(&total)
  125. log.Info("dealAllDataAccountOrder", zap.Any("总数是", total))
  126. rowsPerPage := 100 // 每页的数量
  127. totalPages := (int(total) / rowsPerPage) + 1 //总页数
  128. // 控制并发数的 channel,限制最大并发数量为 10
  129. concurrencyLimit := 5
  130. sem := make(chan struct{}, concurrencyLimit)
  131. var wg sync.WaitGroup // 等待所有协程完成
  132. // 启动多协程处理
  133. for i := 0; i < totalPages; i++ {
  134. wg.Add(1) // 增加一个 WaitGroup 计数
  135. go func(page int) {
  136. defer wg.Done() // 完成时减去一个计数
  137. // 获取并发信号,限制同时处理的协程数
  138. sem <- struct{}{} // 向 channel 中发送一个空结构体,占用一个槽位
  139. defer func() { <-sem }() // 在协程结束时从 channel 中取出,释放一个槽位
  140. offset := page * rowsPerPage
  141. rows, err := AnalysisDB.Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).
  142. Order("id desc").Offset(offset).Limit(rowsPerPage).Rows()
  143. if err != nil {
  144. log.Info("dealAllDataAccountOrder, Rows Error", zap.Error(err))
  145. return
  146. }
  147. defer func() {
  148. err = rows.Close()
  149. if err != nil {
  150. log.Info("Err rows.Close", zap.Error(err))
  151. }
  152. }()
  153. log.Info("dealAllDataAccountOrder ", zap.Int("current page", page))
  154. // 处理每一行数据
  155. for rows.Next() {
  156. var dataExOrder DataexportOrder //原来的订单表
  157. var accountOrder DwdFAccountOrder //归集后的订单表
  158. var user1 DwdFUserbaseBaseinfo //个人用户表
  159. var returnRecord ReturnMoneyRecord //原来的回款表
  160. var returnRecords []ReturnMoneyRecord //原来的回款表
  161. var contract Contract //合同表
  162. // ScanRows 方法用于将一行记录扫描至结构体
  163. err = AnalysisDB.ScanRows(rows, &dataExOrder)
  164. if err != nil {
  165. log.Info("dealAllDataAccountOrder,ScanRows err ", zap.Error(err))
  166. }
  167. filter := dataExOrder.Filter
  168. filterMap := make(map[string]interface{})
  169. if filter != "" {
  170. err = json.Unmarshal([]byte(filter), &filterMap)
  171. if err != nil {
  172. log.Info("dealAllDataAccountOrder, filter.json.Unmarshal", zap.Error(err))
  173. }
  174. }
  175. //订单编号不为空
  176. if dataExOrder.OrderCode != "" {
  177. // 业务逻辑...
  178. accountOrder.OrderCode = dataExOrder.OrderCode
  179. var osr OrderSaleRecord //原来的业绩变更表
  180. err = JianyuDB.Order("id asc").Where("ordercode = ? ", dataExOrder.OrderCode).First(&osr).Error
  181. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  182. log.Info("dealAllDataAccountOrder", zap.Error(err))
  183. }
  184. //有业绩表更表记录
  185. if osr.ID > 0 {
  186. if osr.SalerName == "-" {
  187. accountOrder.SalerName = "运营"
  188. } else {
  189. accountOrder.SalerName = osr.SalerName
  190. }
  191. accountOrder.SalerDept = osr.SalerDept
  192. }
  193. accountOrder.CompanyName = dataExOrder.CompanyName
  194. if len(dataExOrder.UserID) > 20 { //个人身份
  195. err = JianyuSubjectDB.Where("userid = ? ", dataExOrder.UserID).First(&user1).Error
  196. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  197. log.Info("dealAllDataAccountOrder", zap.Error(err))
  198. }
  199. accountOrder.UserRegtime = user1.LRegistedate
  200. } else {
  201. //企业身份
  202. rs := new(struct {
  203. PositionId int `gorm:"column:position_id"`
  204. LRegistedate *time.Time `gorm:"column:l_registedate"`
  205. })
  206. if dataExOrder.UserID != "" {
  207. sql := fmt.Sprintf(`SELECT im.position_id,ub.l_registedate FROM
  208. (SELECT userid,position_id FROM Jianyu_subjectdb.dwd_f_userbase_id_mapping WHERE position_id = %s) im
  209. LEFT JOIN Jianyu_subjectdb.dwd_f_userbase_baseinfo ub
  210. ON im.userid=ub.userid`, dataExOrder.UserID)
  211. err = JianyuSubjectDB.Raw(sql).Scan(&rs).Error
  212. if err != nil {
  213. log.Info("处理用户订单表数据", zap.String("查询企业用户注册时间失败", dataExOrder.UserID))
  214. }
  215. accountOrder.UserRegtime = rs.LRegistedate
  216. }
  217. }
  218. accountOrder.CreateTime = dataExOrder.CreateTime
  219. if dataExOrder.IsBackstageOrder == 0 { //线上订单
  220. accountOrder.ReturnTime = dataExOrder.PayTime
  221. accountOrder.TotalReceived = util.IntAll(filterMap["originalAmount"]) //累计已收
  222. } else if dataExOrder.IsBackstageOrder == 1 {
  223. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Where("state = 1").Order("return_time asc").First(&returnRecord).Error
  224. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  225. log.Info("dealAllDataAccountOrder", zap.Error(err))
  226. }
  227. accountOrder.ReturnTime = returnRecord.ReturnTime
  228. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&returnRecords).Error
  229. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  230. log.Info("dealAllDataAccountOrder", zap.Error(err))
  231. }
  232. totalMoney := 0
  233. if len(returnRecords) > 0 {
  234. for _, v := range returnRecords {
  235. totalMoney += v.ReturnMoney
  236. }
  237. }
  238. accountOrder.TotalReceived = totalMoney
  239. }
  240. accountOrder.SaleTime = dataExOrder.SaleTime
  241. accountOrder.OriginalPrice = dataExOrder.OriginalPrice
  242. //合同金额
  243. if util.IntAll(filterMap["originalAmount"]) != 0 {
  244. accountOrder.ContractMoney = util.IntAll(filterMap["originalAmount"])
  245. } else {
  246. if dataExOrder.IsBackstageOrder == 0 { //线上订单
  247. accountOrder.ContractMoney = dataExOrder.OrderMoney
  248. } else {
  249. accountOrder.ContractMoney = dataExOrder.PayMoney
  250. }
  251. }
  252. accountOrder.Commission = dataExOrder.Commission
  253. accountOrder.ProceduresMoney = dataExOrder.ProceduresMoney
  254. accountOrder.ReceivableAmount = accountOrder.ContractMoney - accountOrder.Commission - accountOrder.ProceduresMoney
  255. accountOrder.ProductType = dataExOrder.ProductType
  256. if dataExOrder.ProductType == "大会员" || dataExOrder.ProductType == "大会员-子账号" || dataExOrder.ProductType == "大会员-补充包" {
  257. switch util.IntAll(filterMap["level"]) {
  258. case 1:
  259. accountOrder.DataSpec = "专家版"
  260. case 2:
  261. accountOrder.DataSpec = "智慧版"
  262. case 3:
  263. accountOrder.DataSpec = "商机版"
  264. case 4:
  265. accountOrder.DataSpec = "试用版"
  266. case 5:
  267. accountOrder.DataSpec = "定制版"
  268. case 6:
  269. accountOrder.DataSpec = "商机版2.0"
  270. case 7:
  271. accountOrder.DataSpec = "专家版2.0"
  272. }
  273. //付费类型 VipType
  274. switch util.IntAll(filterMap["recordPayType"]) {
  275. case 1:
  276. accountOrder.VipType = "购买"
  277. case 2:
  278. accountOrder.VipType = "续费"
  279. case 3:
  280. accountOrder.VipType = "升级"
  281. case 4:
  282. accountOrder.VipType = "试用"
  283. }
  284. } else {
  285. accountOrder.DataSpec = dataExOrder.DataSpec
  286. ////付费类型 VipType
  287. switch dataExOrder.VipType {
  288. case 0:
  289. accountOrder.VipType = "购买"
  290. case 1:
  291. accountOrder.VipType = "续费"
  292. case 2:
  293. accountOrder.VipType = "升级"
  294. case 3:
  295. accountOrder.VipType = "试用"
  296. }
  297. }
  298. //OrderStatus 订单状态
  299. if dataExOrder.RefundStatus == 1 || dataExOrder.RefundStatus == 2 {
  300. accountOrder.OrderStatus = "已完成"
  301. } else {
  302. switch dataExOrder.OrderStatus {
  303. case 1:
  304. accountOrder.OrderStatus = "已完成"
  305. case -1:
  306. accountOrder.OrderStatus = "逻辑删除"
  307. case -2:
  308. accountOrder.OrderStatus = "已取消"
  309. case -3:
  310. accountOrder.OrderStatus = "已取消"
  311. case 0:
  312. accountOrder.OrderStatus = "未完成"
  313. }
  314. }
  315. //回款状态
  316. if dataExOrder.IsBackstageOrder == 0 {
  317. if accountOrder.OrderStatus == "已完成" {
  318. accountOrder.ReturnStatus = "全额回款"
  319. } else {
  320. accountOrder.ReturnStatus = "未回款"
  321. }
  322. } else if dataExOrder.IsBackstageOrder == 1 {
  323. // 回款状态
  324. switch dataExOrder.ReturnStatus {
  325. case 1:
  326. accountOrder.ReturnStatus = "全额回款"
  327. case 2:
  328. accountOrder.ReturnStatus = "部分回款"
  329. case 0:
  330. accountOrder.ReturnStatus = "未回款"
  331. }
  332. }
  333. //refund_status 退款状态
  334. switch dataExOrder.RefundStatus {
  335. case 0:
  336. accountOrder.RefundStatus = "未退款"
  337. case 1:
  338. accountOrder.RefundStatus = "全额退款"
  339. case 2:
  340. accountOrder.RefundStatus = "部分退款"
  341. }
  342. //
  343. accountOrder.UserPhone = dataExOrder.UserPhone
  344. accountOrder.UserID = dataExOrder.UserID
  345. if dataExOrder.VipStarttime == nil || dataExOrder.VipStarttime.Year() > 1000 {
  346. accountOrder.VipStarttime = dataExOrder.VipStarttime
  347. accountOrder.VipEndtime = dataExOrder.VipEndtime
  348. }
  349. // 合同状态
  350. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&contract).Error
  351. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  352. log.Info("dealAllDataAccountOrder", zap.Error(err))
  353. }
  354. if contract.ContractStatus == 1 {
  355. accountOrder.ContractStatus = "已签协议"
  356. } else if contract.ContractStatus == 0 {
  357. accountOrder.ContractStatus = "未签协议"
  358. }
  359. accountOrder.ContractCode = contract.ContractCode
  360. accountOrder.ContractTime = contract.ContractTime
  361. if dataExOrder.SigningSubject == "h01" {
  362. accountOrder.SigningSubject = "北京剑鱼信息技术有限公司"
  363. } else if dataExOrder.SigningSubject == "h02" {
  364. accountOrder.SigningSubject = "北京拓普丰联信息科技股份有限公司"
  365. }
  366. accountOrder.OrderChannel = dataExOrder.OrderChannel
  367. accountOrder.DistributionChannel = osr.DistributionChannel
  368. if dataExOrder.IsBackstageOrder == 1 {
  369. accountOrder.IsBackstageOrder = "是"
  370. } else {
  371. accountOrder.IsBackstageOrder = "否"
  372. }
  373. switch dataExOrder.PayWay {
  374. case "ali", "ali_app", "ali_pc": //ali、ali_app、ali_pc 处理为“支付宝”
  375. accountOrder.PayWay = "支付宝"
  376. case "wx", "wx_app", "wx_js", "wx_pc": // wx、wx_app、wx_js、wx_pc 处理为“微信”
  377. accountOrder.PayWay = "微信"
  378. case "transferAccounts":
  379. accountOrder.PayWay = "对公转账"
  380. default:
  381. accountOrder.PayWay = dataExOrder.PayWay
  382. }
  383. //生成订单数据
  384. accountOrderPool <- accountOrder
  385. //err = AnalysisDB.Create(&accountOrder).Error
  386. //if err != nil {
  387. // log.Info("dealAllDataAccountOrder Create ", zap.Error(err))
  388. //}
  389. }
  390. }
  391. }(i)
  392. }
  393. // 等待所有协程处理完成
  394. wg.Wait()
  395. log.Info("dealAllDataAccountOrder 迭代结束", zap.Int64("数据总量", total))
  396. close(accountOrderPool)
  397. log.Info("dealAllDataAccountOrder", zap.Any("处理时长", time.Since(now).Minutes()))
  398. }
  399. // saveData 保存数据
  400. func saveData() {
  401. var wg sync.WaitGroup
  402. // 启动保存每个通道数据的goroutine
  403. wg.Add(4) // 因为有4个通道
  404. //1.DwdFAccountOrder
  405. go func() {
  406. defer wg.Done() // 完成时减少计数器
  407. batchSize := 100 // 设置批量插入大小
  408. var batch []DwdFAccountOrder
  409. for accountOrder := range accountOrderPool {
  410. batch = append(batch, accountOrder)
  411. if len(batch) >= batchSize {
  412. err := AnalysisDB.Model(DwdFAccountOrder{}).CreateInBatches(batch, batchSize).Error
  413. if err != nil {
  414. log.Info("saveData DwdFAccountOrder", zap.Error(err))
  415. }
  416. batch = nil // 清空当前批次
  417. }
  418. }
  419. // 插入剩余的批次
  420. if len(batch) > 0 {
  421. err := AnalysisDB.Model(DwdFAccountOrder{}).CreateInBatches(batch, len(batch)).Error
  422. if err != nil {
  423. log.Info("saveData DwdFAccountOrder", zap.Error(err))
  424. }
  425. }
  426. }()
  427. //2.DwdFAccountOrderChange
  428. go func() {
  429. defer wg.Done()
  430. batchSize := 100 // 设置批量插入大小
  431. var batch []DwdFAccountOrderChange
  432. for accountOrderChange := range accountOrderChangePool {
  433. batch = append(batch, accountOrderChange)
  434. if len(batch) >= batchSize {
  435. err := AnalysisDB.Model(DwdFAccountOrderChange{}).CreateInBatches(batch, batchSize).Error
  436. if err != nil {
  437. log.Info("saveData DwdFAccountOrderChange", zap.Error(err))
  438. }
  439. batch = nil // 清空当前批次
  440. }
  441. }
  442. // 插入剩余的批次
  443. if len(batch) > 0 {
  444. err := AnalysisDB.Model(DwdFAccountOrderChange{}).CreateInBatches(batch, len(batch)).Error
  445. if err != nil {
  446. log.Info("saveData DwdFAccountOrderChange", zap.Error(err))
  447. }
  448. }
  449. }()
  450. //3.DwdFAccountReturn
  451. go func() {
  452. defer wg.Done()
  453. batchSize := 100 // 设置批量插入大小
  454. var batch []DwdFAccountReturn
  455. for accountReturn := range accountReturnPool {
  456. batch = append(batch, accountReturn)
  457. if len(batch) >= batchSize {
  458. err := AnalysisDB.Model(DwdFAccountReturn{}).CreateInBatches(batch, batchSize).Error
  459. if err != nil {
  460. log.Info("saveData DwdFAccountReturn", zap.Error(err))
  461. }
  462. batch = nil // 清空当前批次
  463. }
  464. }
  465. // 插入剩余的批次
  466. if len(batch) > 0 {
  467. err := AnalysisDB.Model(DwdFAccountReturn{}).CreateInBatches(batch, len(batch)).Error
  468. if err != nil {
  469. log.Info("saveData DwdFAccountReturn", zap.Error(err))
  470. }
  471. }
  472. }()
  473. //4.DwdFAccountReturnChange
  474. go func() {
  475. defer wg.Done()
  476. batchSize := 100 // 设置批量插入大小
  477. var batch []DwdFAccountReturnChange
  478. for accountReturnChange := range accountReturnChangePool {
  479. batch = append(batch, accountReturnChange)
  480. if len(batch) >= batchSize {
  481. err := AnalysisDB.Model(DwdFAccountReturnChange{}).CreateInBatches(batch, batchSize).Error
  482. if err != nil {
  483. log.Info("saveData DwdFAccountReturnChange", zap.Error(err))
  484. }
  485. batch = nil // 清空当前批次
  486. }
  487. }
  488. // 插入剩余的批次
  489. if len(batch) > 0 {
  490. err := AnalysisDB.Model(DwdFAccountReturnChange{}).CreateInBatches(batch, len(batch)).Error
  491. if err != nil {
  492. log.Info("saveData DwdFAccountReturnChange", zap.Error(err))
  493. }
  494. }
  495. }()
  496. // 等待所有goroutine完成
  497. wg.Wait()
  498. // 所有通道的数据都处理完毕后打印日志
  499. log.Info("All account data has been processed and saved successfully.")
  500. if GF.Cron.Url != "" {
  501. // 请求头
  502. headers := map[string]string{
  503. "Content-Type": "application/json",
  504. }
  505. // 请求体
  506. body := []byte(`{
  507. "msgtype": "text",
  508. "text": {
  509. "content": "订单回款数据处理完毕",
  510. "mentioned_list":["@all"]
  511. }
  512. }`)
  513. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  514. if err != nil {
  515. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  516. }
  517. }
  518. }
  519. // saveDataClickhouse 保存数据,Clickhouse
  520. func saveDataClickhouse() {
  521. var wg sync.WaitGroup
  522. //// 启动保存每个通道数据的goroutine
  523. wg.Add(4) // 因为有4个通道
  524. //1.DwdFAccountOrder
  525. go func() {
  526. defer wg.Done()
  527. batchSize := 100 // 设置批量插入大小
  528. var batch []DwdFAccountOrder
  529. for accountOrder := range accountOrderPool {
  530. accountOrder.SaleTime = validateAndFormatDate(accountOrder.SaleTime)
  531. accountOrder.ReturnTime = validateAndFormatDate(accountOrder.ReturnTime)
  532. accountOrder.RefundTime = validateAndFormatDate(accountOrder.RefundTime)
  533. batch = append(batch, accountOrder)
  534. if len(batch) >= batchSize {
  535. err := ClickhouseDB.Model(DwdFOrder{}).CreateInBatches(batch, batchSize).Error
  536. if err != nil {
  537. log.Info("saveDataClickhouse DwdFOrder", zap.Error(err))
  538. }
  539. batch = nil // 清空当前批次
  540. }
  541. }
  542. // 插入剩余的批次
  543. if len(batch) > 0 {
  544. err := ClickhouseDB.Model(DwdFOrder{}).CreateInBatches(batch, len(batch)).Error
  545. if err != nil {
  546. log.Info("saveDataClickhouse DwdFOrder", zap.Error(err))
  547. }
  548. }
  549. }()
  550. //
  551. //2.DwdFAccountOrderChange
  552. go func() {
  553. defer wg.Done()
  554. batchSize := 100 // 设置批量插入大小
  555. var batch []DwdFAccountOrderChange
  556. for accountOrderChange := range accountOrderChangePool {
  557. accountOrderChange.SaleTime = validateAndFormatDate(accountOrderChange.SaleTime)
  558. accountOrderChange.ContractTime = validateAndFormatDate(accountOrderChange.ContractTime)
  559. batch = append(batch, accountOrderChange)
  560. if len(batch) >= batchSize {
  561. err := ClickhouseDB.Model(DwdFOrderChange{}).CreateInBatches(batch, batchSize).Error
  562. if err != nil {
  563. log.Info("saveDataClickhouse DwdFOrderChange", zap.Error(err))
  564. }
  565. batch = nil // 清空当前批次
  566. }
  567. }
  568. // 插入剩余的批次
  569. if len(batch) > 0 {
  570. err := ClickhouseDB.Model(DwdFOrderChange{}).CreateInBatches(batch, len(batch)).Error
  571. if err != nil {
  572. log.Info("saveDataClickhouse DwdFOrderChange", zap.Error(err))
  573. }
  574. }
  575. }()
  576. //3.DwdFAccountReturn
  577. go func() {
  578. defer wg.Done()
  579. batchSize := 100 // 设置批量插入大小
  580. var batch []DwdFAccountReturn
  581. for accountReturn := range accountReturnPool {
  582. accountReturn.SaleTime = validateAndFormatDate(accountReturn.SaleTime)
  583. accountReturn.ReturnTime = validateAndFormatDate(accountReturn.ReturnTime)
  584. batch = append(batch, accountReturn)
  585. if len(batch) >= batchSize {
  586. err := ClickhouseDB.Model(DwdFReturn{}).CreateInBatches(batch, batchSize).Error
  587. if err != nil {
  588. log.Info("saveDataClickhouse DwdFReturn", zap.Error(err))
  589. }
  590. batch = nil // 清空当前批次
  591. }
  592. }
  593. // 插入剩余的批次
  594. if len(batch) > 0 {
  595. err := ClickhouseDB.Model(DwdFReturn{}).CreateInBatches(batch, len(batch)).Error
  596. if err != nil {
  597. log.Info("saveDataClickhouse DwdFReturn", zap.Error(err))
  598. }
  599. }
  600. }()
  601. //4.DwdFAccountReturnChange
  602. go func() {
  603. defer wg.Done()
  604. batchSize := 100 // 设置批量插入大小
  605. var batch []DwdFAccountReturnChange
  606. for accountReturnChange := range accountReturnChangePool {
  607. accountReturnChange.SaleTime = validateAndFormatDate(accountReturnChange.SaleTime)
  608. accountReturnChange.ReturnTime = validateAndFormatDate(accountReturnChange.ReturnTime)
  609. batch = append(batch, accountReturnChange)
  610. if len(batch) >= batchSize {
  611. err := ClickhouseDB.Model(DwdFReturnChange{}).CreateInBatches(batch, batchSize).Error
  612. if err != nil {
  613. log.Info("saveDataClickhouse DwdFReturnChange", zap.Error(err))
  614. }
  615. batch = nil // 清空当前批次
  616. }
  617. }
  618. // 插入剩余的批次
  619. if len(batch) > 0 {
  620. err := ClickhouseDB.Model(DwdFReturnChange{}).CreateInBatches(batch, len(batch)).Error
  621. if err != nil {
  622. log.Info("saveDataClickhouse DwdFReturnChange", zap.Error(err))
  623. }
  624. }
  625. }()
  626. // 等待所有goroutine完成
  627. wg.Wait()
  628. // 所有通道的数据都处理完毕后打印日志
  629. log.Info("All clickhouse data has been processed and saved successfully.")
  630. if GF.Cron.Url != "" {
  631. // 请求头
  632. headers := map[string]string{
  633. "Content-Type": "application/json",
  634. }
  635. // 请求体
  636. body := []byte(`{
  637. "msgtype": "text",
  638. "text": {
  639. "content": "Clickhouse 订单回款数据处理完毕",
  640. "mentioned_list":["@all"]
  641. }
  642. }`)
  643. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  644. if err != nil {
  645. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  646. }
  647. }
  648. }
  649. // truncateTables 清空订单相关数据表
  650. func truncateTables() {
  651. var (
  652. account_order DwdFAccountOrder
  653. account_order_change DwdFAccountOrderChange
  654. account_return DwdFAccountReturn
  655. account_return_change DwdFAccountReturnChange
  656. )
  657. table1 := account_order.TableName()
  658. table2 := account_order_change.TableName()
  659. table3 := account_return.TableName()
  660. table4 := account_return_change.TableName()
  661. // 清空表 1
  662. db, err := AnalysisDB.DB()
  663. if err != nil {
  664. panic("获取数据库连接对象失败:" + err.Error())
  665. }
  666. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table1))
  667. if err != nil {
  668. log.Info("清空失败", zap.String("数据表", table1))
  669. }
  670. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table2))
  671. if err != nil {
  672. log.Info("清空失败", zap.String("数据表", table2))
  673. }
  674. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table3))
  675. if err != nil {
  676. log.Info("清空失败", zap.String("数据表", table3))
  677. }
  678. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table4))
  679. if err != nil {
  680. log.Info("清空失败", zap.String("数据表", table4))
  681. }
  682. log.Info("所有数据表清空完毕", zap.String("数据表是:", fmt.Sprintf("%s,%s,%s,%s", table1, table2, table3, table4)))
  683. }