main.go 24 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "go.uber.org/zap"
  8. "gorm.io/gorm"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. Mgo *mongodb.MongodbSim
  17. MgoP *mongodb.MongodbSim
  18. MgoSpider *mongodb.MongodbSim
  19. AnalysisDB *gorm.DB
  20. JianyuDB *gorm.DB
  21. JianyuSubjectDB *gorm.DB
  22. ClickhouseDB *gorm.DB
  23. endTime string //取数据,时间范围的截止时间;如果为空,就取当前时间
  24. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  25. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  26. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  27. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  28. )
  29. func main() {
  30. local, _ := time.LoadLocation("Asia/Shanghai")
  31. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  32. //eid, err := c.AddFunc(GF.Cron.Spec, dealData) //tidb 数据库
  33. eid, err := c.AddFunc(GF.Cron.Spec, dealDataForClickhouse) //Clickhouse数据库
  34. if err != nil {
  35. log.Info("main", zap.Any("AddFunc err", err))
  36. }
  37. log.Info("main", zap.Any("eid", eid))
  38. c.Start()
  39. defer c.Stop()
  40. select {}
  41. }
  42. // dealData 处理订单相关数据
  43. func dealData() {
  44. //1.清空数据表
  45. endTime = time.Now().Format("2006-01-02 15:04:05")
  46. if GF.Cron.Url != "" {
  47. // 请求头
  48. headers := map[string]string{
  49. "Content-Type": "application/json",
  50. }
  51. // 请求体
  52. body := []byte(`{
  53. "msgtype": "text",
  54. "text": {
  55. "content": "开始清零订单回款数据",
  56. "mentioned_list":["@all"]
  57. }
  58. }`)
  59. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  60. if err != nil {
  61. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  62. }
  63. }
  64. truncateTables() //清空数据表
  65. log.Info("配置取值时间范围为空", zap.String("默认取值截止时间为:", endTime))
  66. //重新打开通道
  67. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  68. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  69. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  70. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  71. go saveData() //保存订单数据
  72. //go saveAccountOrder() //保存订单数据
  73. go dealAllDataAccountOrder2()
  74. //dealAllDataAccountOrder() //1.处理归集后-存量剑鱼订单表-dwd_f_account_order
  75. go dealAllDataAccountOrderChangeRecord() //2.处理归集后-存量业绩表更表-dwd_f_account_order_change
  76. go dealAllDataAccountReturn() //3.处理归集后-存量剑鱼回款表-dwd_f_account_return
  77. go dealAllDataAccountReturnChange() //4.处理归集后-剑鱼回款变更表-dwd_f_account_return_change
  78. }
  79. func dealDataForClickhouse() {
  80. //1.清空数据表
  81. endTime = time.Now().Format("2006-01-02 15:04:05")
  82. if GF.Cron.Url != "" {
  83. // 请求头
  84. headers := map[string]string{
  85. "Content-Type": "application/json",
  86. }
  87. // 请求体
  88. body := []byte(`{
  89. "msgtype": "text",
  90. "text": {
  91. "content": "开始清空 Clickhouse 订单回款相关数据",
  92. "mentioned_list":["@all"]
  93. }
  94. }`)
  95. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  96. if err != nil {
  97. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  98. }
  99. }
  100. //clickhouse 数据清空
  101. truncateClickhouse()
  102. log.Info("配置取值时间范围为空", zap.String("默认取值截止时间为:", endTime))
  103. //重新打开通道
  104. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  105. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  106. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  107. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  108. go saveDataClickhouse() //保存订单数据
  109. go dealAllDataAccountOrder2()
  110. go dealAllDataAccountOrderChangeRecord() //2.处理归集后-存量业绩表更表-dwd_f_account_order_change
  111. go dealAllDataAccountReturn() //3.处理归集后-存量剑鱼回款表-dwd_f_account_return
  112. go dealAllDataAccountReturnChange() //4.处理归集后-剑鱼回款变更表-dwd_f_account_return_change
  113. }
  114. // dealAllDataAccountOrder2 处理归集后-存量剑鱼订单表-dwd_f_account_order
  115. func dealAllDataAccountOrder2() {
  116. defer func() {
  117. if r := recover(); r != nil {
  118. log.Info("dealAllDataAccountOrder2 Panic recovered", zap.Any("reason", r))
  119. }
  120. log.Info("dealAllDataAccountOrder2 over")
  121. }()
  122. now := time.Now()
  123. var total int64
  124. AnalysisDB.Debug().Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).Count(&total)
  125. log.Info("dealAllDataAccountOrder", zap.Any("总数是", total))
  126. rowsPerPage := 100 // 每页的数量
  127. totalPages := (int(total) / rowsPerPage) + 1 //总页数
  128. // 控制并发数的 channel,限制最大并发数量为 10
  129. concurrencyLimit := 5
  130. sem := make(chan struct{}, concurrencyLimit)
  131. var wg sync.WaitGroup // 等待所有协程完成
  132. // 启动多协程处理
  133. for i := 0; i < totalPages; i++ {
  134. wg.Add(1) // 增加一个 WaitGroup 计数
  135. go func(page int) {
  136. defer wg.Done() // 完成时减去一个计数
  137. // 获取并发信号,限制同时处理的协程数
  138. sem <- struct{}{} // 向 channel 中发送一个空结构体,占用一个槽位
  139. defer func() { <-sem }() // 在协程结束时从 channel 中取出,释放一个槽位
  140. offset := page * rowsPerPage
  141. rows, err := AnalysisDB.Debug().Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).
  142. Order("id desc").Offset(offset).Limit(rowsPerPage).Rows()
  143. if err != nil {
  144. log.Info("dealAllDataAccountOrder, Rows Error", zap.Error(err))
  145. return
  146. }
  147. defer func() {
  148. err = rows.Close()
  149. if err != nil {
  150. log.Info("Err rows.Close", zap.Error(err))
  151. }
  152. }()
  153. log.Info("dealAllDataAccountOrder ", zap.Int("current page", page))
  154. // 处理每一行数据
  155. for rows.Next() {
  156. var dataExOrder DataexportOrder //原来的订单表
  157. var accountOrder DwdFAccountOrder //归集后的订单表
  158. var user1 DwdFUserbaseBaseinfo //个人用户表
  159. var returnRecord ReturnMoneyRecord //原来的回款表
  160. var returnRecords []ReturnMoneyRecord //原来的回款表
  161. var contract Contract //合同表
  162. // ScanRows 方法用于将一行记录扫描至结构体
  163. err = AnalysisDB.ScanRows(rows, &dataExOrder)
  164. if err != nil {
  165. log.Info("dealAllDataAccountOrder,ScanRows err ", zap.Error(err))
  166. }
  167. filter := dataExOrder.Filter
  168. filterMap := make(map[string]interface{})
  169. err := json.Unmarshal([]byte(filter), &filterMap)
  170. if err != nil {
  171. log.Info("dealAllDataAccountOrder, filter.json.Unmarshal", zap.Error(err))
  172. }
  173. //订单编号不为空
  174. if dataExOrder.OrderCode != "" {
  175. // 业务逻辑...
  176. accountOrder.OrderCode = dataExOrder.OrderCode
  177. var osr OrderSaleRecord //原来的业绩变更表
  178. err = JianyuDB.Order("id asc").Where("ordercode = ? ", dataExOrder.OrderCode).First(&osr).Error
  179. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  180. log.Info("dealAllDataAccountOrder", zap.Error(err))
  181. }
  182. //有业绩表更表记录
  183. if osr.ID > 0 {
  184. if osr.SalerName == "-" {
  185. accountOrder.SalerName = "运营"
  186. } else {
  187. accountOrder.SalerName = osr.SalerName
  188. }
  189. accountOrder.SalerDept = osr.SalerDept
  190. }
  191. accountOrder.CompanyName = dataExOrder.CompanyName
  192. if len(dataExOrder.UserID) > 20 { //个人身份
  193. err = JianyuSubjectDB.Where("userid = ? ", dataExOrder.UserID).First(&user1).Error
  194. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  195. log.Info("dealAllDataAccountOrder", zap.Error(err))
  196. }
  197. accountOrder.UserRegtime = user1.LRegistedate
  198. } else {
  199. //企业身份
  200. rs := new(struct {
  201. PositionId int `gorm:"column:position_id"`
  202. LRegistedate *time.Time `gorm:"column:l_registedate"`
  203. })
  204. if dataExOrder.UserID != "" {
  205. sql := fmt.Sprintf(`SELECT im.position_id,ub.l_registedate FROM
  206. (SELECT userid,position_id FROM Jianyu_subjectdb.dwd_f_userbase_id_mapping WHERE position_id = %s) im
  207. LEFT JOIN Jianyu_subjectdb.dwd_f_userbase_baseinfo ub
  208. ON im.userid=ub.userid`, dataExOrder.UserID)
  209. err = JianyuSubjectDB.Raw(sql).Scan(&rs).Error
  210. if err != nil {
  211. log.Info("处理用户订单表数据", zap.String("查询企业用户注册时间失败", dataExOrder.UserID))
  212. }
  213. accountOrder.UserRegtime = rs.LRegistedate
  214. }
  215. }
  216. accountOrder.CreateTime = dataExOrder.CreateTime
  217. if dataExOrder.IsBackstageOrder == 0 { //线上订单
  218. accountOrder.ReturnTime = dataExOrder.PayTime
  219. accountOrder.TotalReceived = util.IntAll(filterMap["originalAmount"]) //累计已收
  220. } else if dataExOrder.IsBackstageOrder == 1 {
  221. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Where("state = 1").Order("return_time asc").First(&returnRecord).Error
  222. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  223. log.Info("dealAllDataAccountOrder", zap.Error(err))
  224. }
  225. accountOrder.ReturnTime = returnRecord.ReturnTime
  226. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&returnRecords).Error
  227. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  228. log.Info("dealAllDataAccountOrder", zap.Error(err))
  229. }
  230. totalMoney := 0
  231. if len(returnRecords) > 0 {
  232. for _, v := range returnRecords {
  233. totalMoney += v.ReturnMoney
  234. }
  235. }
  236. accountOrder.TotalReceived = totalMoney
  237. }
  238. accountOrder.SaleTime = dataExOrder.SaleTime
  239. accountOrder.OriginalPrice = dataExOrder.OriginalPrice
  240. //合同金额
  241. if dataExOrder.IsBackstageOrder == 0 { //线上订单
  242. if dataExOrder.OrderStatus == 1 {
  243. accountOrder.ContractMoney = util.IntAll(filterMap["originalAmount"])
  244. } else {
  245. accountOrder.ContractMoney = dataExOrder.OrderMoney
  246. }
  247. } else { //线下订单
  248. accountOrder.ContractMoney = dataExOrder.PayMoney
  249. }
  250. accountOrder.Commission = dataExOrder.Commission
  251. accountOrder.ProceduresMoney = dataExOrder.ProceduresMoney
  252. accountOrder.ReceivableAmount = accountOrder.ContractMoney - accountOrder.Commission - accountOrder.ProceduresMoney
  253. accountOrder.ProductType = dataExOrder.ProductType
  254. if dataExOrder.ProductType == "大会员" || dataExOrder.ProductType == "大会员-子账号" || dataExOrder.ProductType == "大会员-补充包" {
  255. switch util.IntAll(filterMap["level"]) {
  256. case 1:
  257. accountOrder.DataSpec = "专家版"
  258. case 2:
  259. accountOrder.DataSpec = "智慧版"
  260. case 3:
  261. accountOrder.DataSpec = "商机版"
  262. case 4:
  263. accountOrder.DataSpec = "试用版"
  264. case 5:
  265. accountOrder.DataSpec = "定制版"
  266. case 6:
  267. accountOrder.DataSpec = "商机版2.0"
  268. case 7:
  269. accountOrder.DataSpec = "专家版2.0"
  270. }
  271. //付费类型 VipType
  272. switch util.IntAll(filterMap["recordPayType"]) {
  273. case 1:
  274. accountOrder.VipType = "购买"
  275. case 2:
  276. accountOrder.VipType = "续费"
  277. case 3:
  278. accountOrder.VipType = "升级"
  279. case 4:
  280. accountOrder.VipType = "试用"
  281. }
  282. } else {
  283. accountOrder.DataSpec = dataExOrder.DataSpec
  284. ////付费类型 VipType
  285. switch dataExOrder.VipType {
  286. case 0:
  287. accountOrder.VipType = "购买"
  288. case 1:
  289. accountOrder.VipType = "续费"
  290. case 2:
  291. accountOrder.VipType = "升级"
  292. case 3:
  293. accountOrder.VipType = "试用"
  294. }
  295. }
  296. //OrderStatus 订单状态
  297. if dataExOrder.RefundStatus == 1 || dataExOrder.RefundStatus == 2 {
  298. accountOrder.OrderStatus = "已完成"
  299. } else {
  300. switch dataExOrder.OrderStatus {
  301. case 1:
  302. accountOrder.OrderStatus = "已完成"
  303. case -1:
  304. accountOrder.OrderStatus = "逻辑删除"
  305. case -2:
  306. accountOrder.OrderStatus = "已取消"
  307. case -3:
  308. accountOrder.OrderStatus = "已取消"
  309. case 0:
  310. accountOrder.OrderStatus = "未完成"
  311. }
  312. }
  313. //回款状态
  314. if dataExOrder.IsBackstageOrder == 0 {
  315. if accountOrder.OrderStatus == "已支付" {
  316. accountOrder.ReturnStatus = "全额回款"
  317. } else {
  318. accountOrder.ReturnStatus = "未支付"
  319. }
  320. } else if dataExOrder.IsBackstageOrder == 1 {
  321. switch dataExOrder.ReturnStatus {
  322. case 1:
  323. accountOrder.ReturnStatus = "全额回款"
  324. case 2:
  325. accountOrder.ReturnStatus = "部分回款"
  326. case 0:
  327. accountOrder.ReturnStatus = "未回款"
  328. }
  329. }
  330. //refund_status 回款状态
  331. switch dataExOrder.RefundStatus {
  332. case 0:
  333. accountOrder.RefundStatus = "未退款"
  334. case 1:
  335. accountOrder.RefundStatus = "全额退款"
  336. case 2:
  337. accountOrder.RefundStatus = "部分退款"
  338. }
  339. //
  340. accountOrder.UserPhone = dataExOrder.UserPhone
  341. accountOrder.UserID = dataExOrder.UserID
  342. if dataExOrder.VipStarttime == nil || dataExOrder.VipStarttime.Year() > 1000 {
  343. accountOrder.VipStarttime = dataExOrder.VipStarttime
  344. accountOrder.VipEndtime = dataExOrder.VipEndtime
  345. }
  346. // 合同状态
  347. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&contract).Error
  348. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  349. log.Info("dealAllDataAccountOrder", zap.Error(err))
  350. }
  351. if contract.ContractStatus == 1 {
  352. accountOrder.ContractStatus = "已签协议"
  353. } else if contract.ContractStatus == 0 {
  354. accountOrder.ContractStatus = "未签协议"
  355. }
  356. accountOrder.ContractCode = contract.ContractCode
  357. accountOrder.ContractTime = contract.ContractTime
  358. if dataExOrder.SigningSubject == "h01" {
  359. accountOrder.SigningSubject = "北京剑鱼信息技术有限公司"
  360. } else if dataExOrder.SigningSubject == "h02" {
  361. accountOrder.SigningSubject = "北京拓普丰联信息科技股份有限公司"
  362. }
  363. accountOrder.OrderChannel = dataExOrder.OrderChannel
  364. accountOrder.DistributionChannel = dataExOrder.DistributionChannel
  365. if dataExOrder.IsBackstageOrder == 1 {
  366. accountOrder.IsBackstageOrder = "是"
  367. } else {
  368. accountOrder.IsBackstageOrder = "否"
  369. }
  370. switch dataExOrder.PayWay {
  371. case "ali", "ali_app", "ali_pc": //ali、ali_app、ali_pc 处理为“支付宝”
  372. accountOrder.PayWay = "支付宝"
  373. case "wx", "wx_app", "wx_js", "wx_pc": // wx、wx_app、wx_js、wx_pc 处理为“微信”
  374. accountOrder.PayWay = "微信"
  375. case "transferAccounts":
  376. accountOrder.PayWay = "对公转账"
  377. default:
  378. accountOrder.PayWay = dataExOrder.PayWay
  379. }
  380. //生成订单数据
  381. accountOrderPool <- accountOrder
  382. //err = AnalysisDB.Create(&accountOrder).Error
  383. //if err != nil {
  384. // log.Info("dealAllDataAccountOrder Create ", zap.Error(err))
  385. //}
  386. }
  387. }
  388. }(i)
  389. }
  390. // 等待所有协程处理完成
  391. wg.Wait()
  392. log.Info("dealAllDataAccountOrder 迭代结束", zap.Int64("数据总量", total))
  393. close(accountOrderPool)
  394. log.Info("dealAllDataAccountOrder", zap.Any("处理时长", time.Since(now).Minutes()))
  395. }
  396. // saveData 保存数据
  397. func saveData() {
  398. var wg sync.WaitGroup
  399. // 启动保存每个通道数据的goroutine
  400. wg.Add(4) // 因为有4个通道
  401. //1.DwdFAccountOrder
  402. go func() {
  403. defer wg.Done() // 完成时减少计数器
  404. batchSize := 100 // 设置批量插入大小
  405. var batch []DwdFAccountOrder
  406. for accountOrder := range accountOrderPool {
  407. batch = append(batch, accountOrder)
  408. if len(batch) >= batchSize {
  409. err := AnalysisDB.Model(DwdFAccountOrder{}).CreateInBatches(batch, batchSize).Error
  410. if err != nil {
  411. log.Info("saveData DwdFAccountOrder", zap.Error(err))
  412. }
  413. batch = nil // 清空当前批次
  414. }
  415. }
  416. // 插入剩余的批次
  417. if len(batch) > 0 {
  418. err := AnalysisDB.Model(DwdFAccountOrder{}).CreateInBatches(batch, len(batch)).Error
  419. if err != nil {
  420. log.Info("saveData DwdFAccountOrder", zap.Error(err))
  421. }
  422. }
  423. }()
  424. //2.DwdFAccountOrderChange
  425. go func() {
  426. defer wg.Done()
  427. batchSize := 100 // 设置批量插入大小
  428. var batch []DwdFAccountOrderChange
  429. for accountOrderChange := range accountOrderChangePool {
  430. batch = append(batch, accountOrderChange)
  431. if len(batch) >= batchSize {
  432. err := AnalysisDB.Model(DwdFAccountOrderChange{}).CreateInBatches(batch, batchSize).Error
  433. if err != nil {
  434. log.Info("saveData DwdFAccountOrderChange", zap.Error(err))
  435. }
  436. batch = nil // 清空当前批次
  437. }
  438. }
  439. // 插入剩余的批次
  440. if len(batch) > 0 {
  441. err := AnalysisDB.Model(DwdFAccountOrderChange{}).CreateInBatches(batch, len(batch)).Error
  442. if err != nil {
  443. log.Info("saveData DwdFAccountOrderChange", zap.Error(err))
  444. }
  445. }
  446. }()
  447. //3.DwdFAccountReturn
  448. go func() {
  449. defer wg.Done()
  450. batchSize := 100 // 设置批量插入大小
  451. var batch []DwdFAccountReturn
  452. for accountReturn := range accountReturnPool {
  453. batch = append(batch, accountReturn)
  454. if len(batch) >= batchSize {
  455. err := AnalysisDB.Model(DwdFAccountReturn{}).CreateInBatches(batch, batchSize).Error
  456. if err != nil {
  457. log.Info("saveData DwdFAccountReturn", zap.Error(err))
  458. }
  459. batch = nil // 清空当前批次
  460. }
  461. }
  462. // 插入剩余的批次
  463. if len(batch) > 0 {
  464. err := AnalysisDB.Model(DwdFAccountReturn{}).CreateInBatches(batch, len(batch)).Error
  465. if err != nil {
  466. log.Info("saveData DwdFAccountReturn", zap.Error(err))
  467. }
  468. }
  469. }()
  470. //4.DwdFAccountReturnChange
  471. go func() {
  472. defer wg.Done()
  473. batchSize := 100 // 设置批量插入大小
  474. var batch []DwdFAccountReturnChange
  475. for accountReturnChange := range accountReturnChangePool {
  476. batch = append(batch, accountReturnChange)
  477. if len(batch) >= batchSize {
  478. err := AnalysisDB.Model(DwdFAccountReturnChange{}).CreateInBatches(batch, batchSize).Error
  479. if err != nil {
  480. log.Info("saveData DwdFAccountReturnChange", zap.Error(err))
  481. }
  482. batch = nil // 清空当前批次
  483. }
  484. }
  485. // 插入剩余的批次
  486. if len(batch) > 0 {
  487. err := AnalysisDB.Model(DwdFAccountReturnChange{}).CreateInBatches(batch, len(batch)).Error
  488. if err != nil {
  489. log.Info("saveData DwdFAccountReturnChange", zap.Error(err))
  490. }
  491. }
  492. }()
  493. // 等待所有goroutine完成
  494. wg.Wait()
  495. // 所有通道的数据都处理完毕后打印日志
  496. log.Info("All account data has been processed and saved successfully.")
  497. if GF.Cron.Url != "" {
  498. // 请求头
  499. headers := map[string]string{
  500. "Content-Type": "application/json",
  501. }
  502. // 请求体
  503. body := []byte(`{
  504. "msgtype": "text",
  505. "text": {
  506. "content": "订单回款数据处理完毕",
  507. "mentioned_list":["@all"]
  508. }
  509. }`)
  510. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  511. if err != nil {
  512. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  513. }
  514. }
  515. }
  516. // saveDataClickhouse 保存数据,Clickhouse
  517. func saveDataClickhouse() {
  518. var wg sync.WaitGroup
  519. //// 启动保存每个通道数据的goroutine
  520. wg.Add(4) // 因为有4个通道
  521. //1.DwdFAccountOrder
  522. go func() {
  523. defer wg.Done()
  524. batchSize := 100 // 设置批量插入大小
  525. var batch []DwdFAccountOrder
  526. for accountOrder := range accountOrderPool {
  527. batch = append(batch, accountOrder)
  528. if len(batch) >= batchSize {
  529. err := ClickhouseDB.Model(DwdFOrder{}).CreateInBatches(batch, batchSize).Error
  530. if err != nil {
  531. log.Info("saveDataClickhouse DwdFOrder", zap.Error(err))
  532. }
  533. batch = nil // 清空当前批次
  534. }
  535. }
  536. // 插入剩余的批次
  537. if len(batch) > 0 {
  538. err := ClickhouseDB.Model(DwdFOrder{}).CreateInBatches(batch, len(batch)).Error
  539. if err != nil {
  540. log.Info("saveDataClickhouse DwdFOrder", zap.Error(err))
  541. }
  542. }
  543. }()
  544. //
  545. //2.DwdFAccountOrderChange
  546. go func() {
  547. defer wg.Done()
  548. batchSize := 100 // 设置批量插入大小
  549. var batch []DwdFAccountOrderChange
  550. for accountOrderChange := range accountOrderChangePool {
  551. batch = append(batch, accountOrderChange)
  552. if len(batch) >= batchSize {
  553. err := ClickhouseDB.Model(DwdFOrderChange{}).CreateInBatches(batch, batchSize).Error
  554. if err != nil {
  555. log.Info("saveDataClickhouse DwdFOrderChange", zap.Error(err))
  556. }
  557. batch = nil // 清空当前批次
  558. }
  559. }
  560. // 插入剩余的批次
  561. if len(batch) > 0 {
  562. err := ClickhouseDB.Model(DwdFOrderChange{}).CreateInBatches(batch, len(batch)).Error
  563. if err != nil {
  564. log.Info("saveDataClickhouse DwdFOrderChange", zap.Error(err))
  565. }
  566. }
  567. }()
  568. //3.DwdFAccountReturn
  569. go func() {
  570. defer wg.Done()
  571. batchSize := 100 // 设置批量插入大小
  572. var batch []DwdFAccountReturn
  573. for accountReturn := range accountReturnPool {
  574. batch = append(batch, accountReturn)
  575. if len(batch) >= batchSize {
  576. err := ClickhouseDB.Model(DwdFReturn{}).CreateInBatches(batch, batchSize).Error
  577. if err != nil {
  578. log.Info("saveDataClickhouse DwdFReturn", zap.Error(err))
  579. }
  580. batch = nil // 清空当前批次
  581. }
  582. }
  583. // 插入剩余的批次
  584. if len(batch) > 0 {
  585. err := ClickhouseDB.Model(DwdFReturn{}).CreateInBatches(batch, len(batch)).Error
  586. if err != nil {
  587. log.Info("saveDataClickhouse DwdFReturn", zap.Error(err))
  588. }
  589. }
  590. }()
  591. //4.DwdFAccountReturnChange
  592. go func() {
  593. defer wg.Done()
  594. batchSize := 100 // 设置批量插入大小
  595. var batch []DwdFAccountReturnChange
  596. for accountReturnChange := range accountReturnChangePool {
  597. batch = append(batch, accountReturnChange)
  598. if len(batch) >= batchSize {
  599. err := ClickhouseDB.Model(DwdFReturnChange{}).CreateInBatches(batch, batchSize).Error
  600. if err != nil {
  601. log.Info("saveDataClickhouse DwdFReturnChange", zap.Error(err))
  602. }
  603. batch = nil // 清空当前批次
  604. }
  605. }
  606. // 插入剩余的批次
  607. if len(batch) > 0 {
  608. err := ClickhouseDB.Model(DwdFReturnChange{}).CreateInBatches(batch, len(batch)).Error
  609. if err != nil {
  610. log.Info("saveDataClickhouse DwdFReturnChange", zap.Error(err))
  611. }
  612. }
  613. }()
  614. // 等待所有goroutine完成
  615. wg.Wait()
  616. // 所有通道的数据都处理完毕后打印日志
  617. log.Info("All clickhouse data has been processed and saved successfully.")
  618. if GF.Cron.Url != "" {
  619. // 请求头
  620. headers := map[string]string{
  621. "Content-Type": "application/json",
  622. }
  623. // 请求体
  624. body := []byte(`{
  625. "msgtype": "text",
  626. "text": {
  627. "content": "Clickhouse 订单回款数据处理完毕",
  628. "mentioned_list":["@all"]
  629. }
  630. }`)
  631. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  632. if err != nil {
  633. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  634. }
  635. }
  636. }
  637. // truncateTables 清空订单相关数据表
  638. func truncateTables() {
  639. var (
  640. account_order DwdFAccountOrder
  641. account_order_change DwdFAccountOrderChange
  642. account_return DwdFAccountReturn
  643. account_return_change DwdFAccountReturnChange
  644. )
  645. table1 := account_order.TableName()
  646. table2 := account_order_change.TableName()
  647. table3 := account_return.TableName()
  648. table4 := account_return_change.TableName()
  649. // 清空表 1
  650. db, err := AnalysisDB.DB()
  651. if err != nil {
  652. panic("获取数据库连接对象失败:" + err.Error())
  653. }
  654. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table1))
  655. if err != nil {
  656. log.Info("清空失败", zap.String("数据表", table1))
  657. }
  658. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table2))
  659. if err != nil {
  660. log.Info("清空失败", zap.String("数据表", table2))
  661. }
  662. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table3))
  663. if err != nil {
  664. log.Info("清空失败", zap.String("数据表", table3))
  665. }
  666. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table4))
  667. if err != nil {
  668. log.Info("清空失败", zap.String("数据表", table4))
  669. }
  670. log.Info("所有数据表清空完毕", zap.String("数据表是:", fmt.Sprintf("%s,%s,%s,%s", table1, table2, table3, table4)))
  671. }