main.go 17 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "go.uber.org/zap"
  8. "gorm.io/gorm"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. Mgo *mongodb.MongodbSim
  17. MgoP *mongodb.MongodbSim
  18. MgoSpider *mongodb.MongodbSim
  19. AnalysisDB *gorm.DB
  20. JianyuDB *gorm.DB
  21. JianyuSubjectDB *gorm.DB
  22. endTime string //取数据,时间范围的截止时间;如果为空,就取当前时间
  23. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  24. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  25. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  26. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  27. )
  28. // var endTime = "2024-11-02"
  29. func main() {
  30. local, _ := time.LoadLocation("Asia/Shanghai")
  31. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  32. eid, err := c.AddFunc(GF.Cron.Spec, dealData)
  33. if err != nil {
  34. log.Info("main", zap.Any("AddFunc err", err))
  35. }
  36. log.Info("main", zap.Any("eid", eid))
  37. c.Start()
  38. defer c.Stop()
  39. select {}
  40. }
  41. // dealData 处理订单相关数据
  42. func dealData() {
  43. //1.清空数据表
  44. endTime = time.Now().Format("2006-01-02 15:04:05")
  45. if GF.Cron.Url != "" {
  46. // 请求头
  47. headers := map[string]string{
  48. "Content-Type": "application/json",
  49. }
  50. // 请求体
  51. body := []byte(`{
  52. "msgtype": "text",
  53. "text": {
  54. "content": "开始清零订单回款数据",
  55. "mentioned_list":["@all"]
  56. }
  57. }`)
  58. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  59. if err != nil {
  60. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  61. }
  62. }
  63. truncateTables() //清空数据表
  64. log.Info("配置取值时间范围为空", zap.String("默认取值截止时间为:", endTime))
  65. //重新打开通道
  66. accountOrderPool = make(chan DwdFAccountOrder, 5000)
  67. accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
  68. accountReturnPool = make(chan DwdFAccountReturn, 5000)
  69. accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
  70. go saveData() //保存订单数据
  71. //go saveAccountOrder() //保存订单数据
  72. go dealAllDataAccountOrder2()
  73. //dealAllDataAccountOrder() //1.处理归集后-存量剑鱼订单表-dwd_f_account_order
  74. go dealAllDataAccountOrderChangeRecord() //2.处理归集后-存量业绩表更表-dwd_f_account_order_change
  75. go dealAllDataAccountReturn() //3.处理归集后-存量剑鱼回款表-dwd_f_account_return
  76. go dealAllDataAccountReturnChange() //4.处理归集后-剑鱼回款变更表-dwd_f_account_return_change
  77. }
  78. // dealAllDataAccountOrder2 处理归集后-存量剑鱼订单表-dwd_f_account_order
  79. func dealAllDataAccountOrder2() {
  80. defer func() {
  81. if r := recover(); r != nil {
  82. log.Info("dealAllDataAccountOrder2 Panic recovered", zap.Any("reason", r))
  83. }
  84. log.Info("dealAllDataAccountOrder2 over")
  85. }()
  86. now := time.Now()
  87. var total int64
  88. AnalysisDB.Debug().Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).Count(&total)
  89. log.Info("dealAllDataAccountOrder", zap.Any("总数是", total))
  90. rowsPerPage := 100 // 每页的数量
  91. totalPages := (int(total) / rowsPerPage) + 1 //总页数
  92. // 控制并发数的 channel,限制最大并发数量为 10
  93. concurrencyLimit := 5
  94. sem := make(chan struct{}, concurrencyLimit)
  95. var wg sync.WaitGroup // 等待所有协程完成
  96. // 启动多协程处理
  97. for i := 0; i < totalPages; i++ {
  98. wg.Add(1) // 增加一个 WaitGroup 计数
  99. go func(page int) {
  100. defer wg.Done() // 完成时减去一个计数
  101. // 获取并发信号,限制同时处理的协程数
  102. sem <- struct{}{} // 向 channel 中发送一个空结构体,占用一个槽位
  103. defer func() { <-sem }() // 在协程结束时从 channel 中取出,释放一个槽位
  104. offset := page * rowsPerPage
  105. rows, err := AnalysisDB.Debug().Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).
  106. Order("id desc").Offset(offset).Limit(rowsPerPage).Rows()
  107. if err != nil {
  108. log.Info("dealAllDataAccountOrder, Rows Error", zap.Error(err))
  109. return
  110. }
  111. defer func() {
  112. err = rows.Close()
  113. if err != nil {
  114. log.Info("Err rows.Close", zap.Error(err))
  115. }
  116. }()
  117. log.Info("dealAllDataAccountOrder ", zap.Int("current page", page))
  118. // 处理每一行数据
  119. for rows.Next() {
  120. var dataExOrder DataexportOrder //原来的订单表
  121. var accountOrder DwdFAccountOrder //归集后的订单表
  122. var user1 DwdFUserbaseBaseinfo //个人用户表
  123. var returnRecord ReturnMoneyRecord //原来的回款表
  124. var returnRecords []ReturnMoneyRecord //原来的回款表
  125. var contract Contract //合同表
  126. // ScanRows 方法用于将一行记录扫描至结构体
  127. err = AnalysisDB.ScanRows(rows, &dataExOrder)
  128. if err != nil {
  129. log.Info("dealAllDataAccountOrder,ScanRows err ", zap.Error(err))
  130. }
  131. filter := dataExOrder.Filter
  132. filterMap := make(map[string]interface{})
  133. err := json.Unmarshal([]byte(filter), &filterMap)
  134. if err != nil {
  135. log.Info("dealAllDataAccountOrder, filter.json.Unmarshal", zap.Error(err))
  136. }
  137. //订单编号不为空
  138. if dataExOrder.OrderCode != "" {
  139. // 业务逻辑...
  140. accountOrder.OrderCode = dataExOrder.OrderCode
  141. var osr OrderSaleRecord //原来的业绩变更表
  142. err = JianyuDB.Order("id asc").Where("ordercode = ? ", dataExOrder.OrderCode).First(&osr).Error
  143. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  144. log.Info("dealAllDataAccountOrder", zap.Error(err))
  145. }
  146. //有业绩表更表记录
  147. if osr.ID > 0 {
  148. if osr.SalerName == "-" {
  149. accountOrder.SalerName = "运营"
  150. } else {
  151. accountOrder.SalerName = osr.SalerName
  152. }
  153. accountOrder.SalerDept = osr.SalerDept
  154. }
  155. accountOrder.CompanyName = dataExOrder.CompanyName
  156. if len(dataExOrder.UserID) > 20 { //个人身份
  157. err = JianyuSubjectDB.Where("userid = ? ", dataExOrder.UserID).First(&user1).Error
  158. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  159. log.Info("dealAllDataAccountOrder", zap.Error(err))
  160. }
  161. accountOrder.UserRegtime = user1.LRegistedate
  162. } else {
  163. //企业身份
  164. rs := new(struct {
  165. PositionId int `gorm:"column:position_id"`
  166. LRegistedate *time.Time `gorm:"column:l_registedate"`
  167. })
  168. if dataExOrder.UserID != "" {
  169. sql := fmt.Sprintf(`SELECT im.position_id,ub.l_registedate FROM
  170. (SELECT userid,position_id FROM Jianyu_subjectdb.dwd_f_userbase_id_mapping WHERE position_id = %s) im
  171. LEFT JOIN Jianyu_subjectdb.dwd_f_userbase_baseinfo ub
  172. ON im.userid=ub.userid`, dataExOrder.UserID)
  173. err = JianyuSubjectDB.Raw(sql).Scan(&rs).Error
  174. if err != nil {
  175. log.Info("处理用户订单表数据", zap.String("查询企业用户注册时间失败", dataExOrder.UserID))
  176. }
  177. accountOrder.UserRegtime = rs.LRegistedate
  178. }
  179. }
  180. accountOrder.CreateTime = dataExOrder.CreateTime
  181. if dataExOrder.IsBackstageOrder == 0 { //线上订单
  182. accountOrder.ReturnTime = dataExOrder.PayTime
  183. accountOrder.TotalReceived = util.IntAll(filterMap["originalAmount"]) //累计已收
  184. } else if dataExOrder.IsBackstageOrder == 1 {
  185. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Where("state = 1").Order("return_time asc").First(&returnRecord).Error
  186. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  187. log.Info("dealAllDataAccountOrder", zap.Error(err))
  188. }
  189. accountOrder.ReturnTime = returnRecord.ReturnTime
  190. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&returnRecords).Error
  191. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  192. log.Info("dealAllDataAccountOrder", zap.Error(err))
  193. }
  194. totalMoney := 0
  195. if len(returnRecords) > 0 {
  196. for _, v := range returnRecords {
  197. totalMoney += v.ReturnMoney
  198. }
  199. }
  200. accountOrder.TotalReceived = totalMoney
  201. }
  202. accountOrder.SaleTime = dataExOrder.SaleTime
  203. accountOrder.OriginalPrice = dataExOrder.OriginalPrice
  204. //合同金额
  205. if dataExOrder.IsBackstageOrder == 0 { //线上订单
  206. if dataExOrder.OrderStatus == 1 {
  207. accountOrder.ContractMoney = util.IntAll(filterMap["originalAmount"])
  208. } else {
  209. accountOrder.ContractMoney = dataExOrder.OrderMoney
  210. }
  211. } else { //线下订单
  212. accountOrder.ContractMoney = dataExOrder.PayMoney
  213. }
  214. accountOrder.Commission = dataExOrder.Commission
  215. accountOrder.ProceduresMoney = dataExOrder.ProceduresMoney
  216. accountOrder.ReceivableAmount = accountOrder.ContractMoney - accountOrder.Commission - accountOrder.ProceduresMoney
  217. accountOrder.ProductType = dataExOrder.ProductType
  218. if dataExOrder.ProductType == "大会员" || dataExOrder.ProductType == "大会员-子账号" || dataExOrder.ProductType == "大会员-补充包" {
  219. switch util.IntAll(filterMap["level"]) {
  220. case 1:
  221. accountOrder.DataSpec = "专家版"
  222. case 2:
  223. accountOrder.DataSpec = "智慧版"
  224. case 3:
  225. accountOrder.DataSpec = "商机版"
  226. case 4:
  227. accountOrder.DataSpec = "试用版"
  228. case 5:
  229. accountOrder.DataSpec = "定制版"
  230. case 6:
  231. accountOrder.DataSpec = "商机版2.0"
  232. case 7:
  233. accountOrder.DataSpec = "专家版2.0"
  234. }
  235. //付费类型 VipType
  236. switch util.IntAll(filterMap["recordPayType"]) {
  237. case 1:
  238. accountOrder.VipType = "购买"
  239. case 2:
  240. accountOrder.VipType = "续费"
  241. case 3:
  242. accountOrder.VipType = "升级"
  243. case 4:
  244. accountOrder.VipType = "试用"
  245. }
  246. } else {
  247. accountOrder.DataSpec = dataExOrder.DataSpec
  248. ////付费类型 VipType
  249. switch dataExOrder.VipType {
  250. case 0:
  251. accountOrder.VipType = "购买"
  252. case 1:
  253. accountOrder.VipType = "续费"
  254. case 2:
  255. accountOrder.VipType = "升级"
  256. case 3:
  257. accountOrder.VipType = "试用"
  258. }
  259. }
  260. //OrderStatus 订单状态
  261. if dataExOrder.RefundStatus == 1 || dataExOrder.RefundStatus == 2 {
  262. accountOrder.OrderStatus = "已完成"
  263. } else {
  264. switch dataExOrder.OrderStatus {
  265. case 1:
  266. accountOrder.OrderStatus = "已完成"
  267. case -1:
  268. accountOrder.OrderStatus = "逻辑删除"
  269. case -2:
  270. accountOrder.OrderStatus = "已取消"
  271. case -3:
  272. accountOrder.OrderStatus = "已取消"
  273. case 0:
  274. accountOrder.OrderStatus = "未完成"
  275. }
  276. }
  277. //回款状态
  278. if dataExOrder.IsBackstageOrder == 0 {
  279. if accountOrder.OrderStatus == "已支付" {
  280. accountOrder.ReturnStatus = "全额回款"
  281. } else {
  282. accountOrder.ReturnStatus = "未支付"
  283. }
  284. } else if dataExOrder.IsBackstageOrder == 1 {
  285. switch dataExOrder.ReturnStatus {
  286. case 1:
  287. accountOrder.ReturnStatus = "全额回款"
  288. case 2:
  289. accountOrder.ReturnStatus = "部分回款"
  290. case 0:
  291. accountOrder.ReturnStatus = "未回款"
  292. }
  293. }
  294. //refund_status 回款状态
  295. switch dataExOrder.RefundStatus {
  296. case 0:
  297. accountOrder.RefundStatus = "未退款"
  298. case 1:
  299. accountOrder.RefundStatus = "全额退款"
  300. case 2:
  301. accountOrder.RefundStatus = "部分退款"
  302. }
  303. //
  304. accountOrder.UserPhone = dataExOrder.UserPhone
  305. accountOrder.UserID = dataExOrder.UserID
  306. if dataExOrder.VipStarttime == nil || dataExOrder.VipStarttime.Year() > 1000 {
  307. accountOrder.VipStarttime = dataExOrder.VipStarttime
  308. accountOrder.VipEndtime = dataExOrder.VipEndtime
  309. }
  310. // 合同状态
  311. err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&contract).Error
  312. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  313. log.Info("dealAllDataAccountOrder", zap.Error(err))
  314. }
  315. if contract.ContractStatus == 1 {
  316. accountOrder.ContractStatus = "已签协议"
  317. } else if contract.ContractStatus == 0 {
  318. accountOrder.ContractStatus = "未签协议"
  319. }
  320. accountOrder.ContractCode = contract.ContractCode
  321. accountOrder.ContractTime = contract.ContractTime
  322. if dataExOrder.SigningSubject == "h01" {
  323. accountOrder.SigningSubject = "北京剑鱼信息技术有限公司"
  324. } else if dataExOrder.SigningSubject == "h02" {
  325. accountOrder.SigningSubject = "北京拓普丰联信息科技股份有限公司"
  326. }
  327. accountOrder.OrderChannel = dataExOrder.OrderChannel
  328. accountOrder.DistributionChannel = dataExOrder.DistributionChannel
  329. if dataExOrder.IsBackstageOrder == 1 {
  330. accountOrder.IsBackstageOrder = "是"
  331. } else {
  332. accountOrder.IsBackstageOrder = "否"
  333. }
  334. switch dataExOrder.PayWay {
  335. case "ali", "ali_app", "ali_pc": //ali、ali_app、ali_pc 处理为“支付宝”
  336. accountOrder.PayWay = "支付宝"
  337. case "wx", "wx_app", "wx_js", "wx_pc": // wx、wx_app、wx_js、wx_pc 处理为“微信”
  338. accountOrder.PayWay = "微信"
  339. case "transferAccounts":
  340. accountOrder.PayWay = "对公转账"
  341. default:
  342. accountOrder.PayWay = dataExOrder.PayWay
  343. }
  344. //生成订单数据
  345. accountOrderPool <- accountOrder
  346. //err = AnalysisDB.Create(&accountOrder).Error
  347. //if err != nil {
  348. // log.Info("dealAllDataAccountOrder Create ", zap.Error(err))
  349. //}
  350. }
  351. }
  352. }(i)
  353. }
  354. // 等待所有协程处理完成
  355. wg.Wait()
  356. log.Info("dealAllDataAccountOrder 迭代结束", zap.Int64("数据总量", total))
  357. close(accountOrderPool)
  358. log.Info("dealAllDataAccountOrder", zap.Any("处理时长", time.Since(now).Minutes()))
  359. }
  360. // saveAccountOrder 保存归集订单表
  361. func saveAccountOrder() {
  362. defer func() {
  363. if r := recover(); r != nil {
  364. log.Info("saveAccountOrder Panic recovered", zap.Any("reason", r))
  365. }
  366. log.Info("saveAccountOrder over")
  367. }()
  368. for {
  369. select {
  370. case accountOrder, ok := <-accountOrderPool:
  371. if !ok {
  372. // 通道已关闭,退出循环
  373. return
  374. }
  375. err := AnalysisDB.Create(&accountOrder).Error
  376. if err != nil {
  377. log.Info("dealAllDataAccountOrder Create ", zap.Error(err), zap.String("order_code", accountOrder.OrderCode))
  378. }
  379. }
  380. }
  381. }
  382. // saveData 保存数据
  383. func saveData() {
  384. var wg sync.WaitGroup
  385. // 启动保存每个通道数据的goroutine
  386. wg.Add(4) // 因为有4个通道
  387. go func() {
  388. defer wg.Done() // 完成时减少计数器
  389. for accountOrder := range accountOrderPool {
  390. err := AnalysisDB.Create(&accountOrder).Error
  391. if err != nil {
  392. log.Info("dealAllDataAccountOrder Create", zap.Error(err), zap.String("order_code", accountOrder.OrderCode))
  393. }
  394. }
  395. }()
  396. go func() {
  397. defer wg.Done()
  398. for accountOrderChange := range accountOrderChangePool {
  399. err := AnalysisDB.Create(&accountOrderChange).Error
  400. if err != nil {
  401. log.Info("dealAllDataAccountOrderChange Create", zap.Error(err), zap.String("order_code", accountOrderChange.OrderCode))
  402. }
  403. }
  404. }()
  405. go func() {
  406. defer wg.Done()
  407. for accountReturn := range accountReturnPool {
  408. err := AnalysisDB.Create(&accountReturn).Error
  409. if err != nil {
  410. log.Info("dealAllDataAccountReturn Create", zap.Error(err), zap.String("order_code", accountReturn.OrderCode))
  411. }
  412. }
  413. }()
  414. go func() {
  415. defer wg.Done()
  416. for accountReturnChange := range accountReturnChangePool {
  417. err := AnalysisDB.Create(&accountReturnChange).Error
  418. if err != nil {
  419. log.Info("dealAllDataAccountReturnChange Create", zap.Error(err), zap.String("order_code", accountReturnChange.OrderCode))
  420. }
  421. }
  422. }()
  423. // 等待所有goroutine完成
  424. wg.Wait()
  425. // 所有通道的数据都处理完毕后打印日志
  426. log.Info("All account data has been processed and saved successfully.")
  427. if GF.Cron.Url != "" {
  428. // 请求头
  429. headers := map[string]string{
  430. "Content-Type": "application/json",
  431. }
  432. // 请求体
  433. body := []byte(`{
  434. "msgtype": "text",
  435. "text": {
  436. "content": "订单回款数据处理完毕",
  437. "mentioned_list":["@all"]
  438. }
  439. }`)
  440. _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
  441. if err != nil {
  442. log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
  443. }
  444. }
  445. }
  446. // truncateTables 清空订单相关数据表
  447. func truncateTables() {
  448. var (
  449. account_order DwdFAccountOrder
  450. account_order_change DwdFAccountOrderChange
  451. account_return DwdFAccountReturn
  452. account_return_change DwdFAccountReturnChange
  453. )
  454. table1 := account_order.TableName()
  455. table2 := account_order_change.TableName()
  456. table3 := account_return.TableName()
  457. table4 := account_return_change.TableName()
  458. // 清空表 1
  459. db, err := AnalysisDB.DB()
  460. if err != nil {
  461. panic("获取数据库连接对象失败:" + err.Error())
  462. }
  463. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table1))
  464. if err != nil {
  465. log.Info("清空失败", zap.String("数据表", table1))
  466. }
  467. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table2))
  468. if err != nil {
  469. log.Info("清空失败", zap.String("数据表", table2))
  470. }
  471. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table3))
  472. if err != nil {
  473. log.Info("清空失败", zap.String("数据表", table3))
  474. }
  475. _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table4))
  476. if err != nil {
  477. log.Info("清空失败", zap.String("数据表", table4))
  478. }
  479. log.Info("所有数据表清空完毕", zap.String("数据表是:", fmt.Sprintf("%s,%s,%s,%s", table1, table2, table3, table4)))
  480. }