123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517 |
- package main
- import (
- "encoding/json"
- "errors"
- "fmt"
- "github.com/robfig/cron/v3"
- "go.uber.org/zap"
- "gorm.io/gorm"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "sync"
- "time"
- )
- var (
- Mgo *mongodb.MongodbSim
- MgoP *mongodb.MongodbSim
- MgoSpider *mongodb.MongodbSim
- AnalysisDB *gorm.DB
- JianyuDB *gorm.DB
- JianyuSubjectDB *gorm.DB
- endTime string //取数据,时间范围的截止时间;如果为空,就取当前时间
- accountOrderPool = make(chan DwdFAccountOrder, 5000)
- accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
- accountReturnPool = make(chan DwdFAccountReturn, 5000)
- accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
- )
- // var endTime = "2024-11-02"
- func main() {
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- eid, err := c.AddFunc(GF.Cron.Spec, dealData)
- if err != nil {
- log.Info("main", zap.Any("AddFunc err", err))
- }
- log.Info("main", zap.Any("eid", eid))
- c.Start()
- defer c.Stop()
- select {}
- }
- // dealData 处理订单相关数据
- func dealData() {
- //1.清空数据表
- endTime = time.Now().Format("2006-01-02 15:04:05")
- if GF.Cron.Url != "" {
- // 请求头
- headers := map[string]string{
- "Content-Type": "application/json",
- }
- // 请求体
- body := []byte(`{
- "msgtype": "text",
- "text": {
- "content": "开始清零订单回款数据",
- "mentioned_list":["@all"]
- }
- }`)
- _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
- if err != nil {
- log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
- }
- }
- truncateTables() //清空数据表
- log.Info("配置取值时间范围为空", zap.String("默认取值截止时间为:", endTime))
- //重新打开通道
- accountOrderPool = make(chan DwdFAccountOrder, 5000)
- accountOrderChangePool = make(chan DwdFAccountOrderChange, 5000)
- accountReturnPool = make(chan DwdFAccountReturn, 5000)
- accountReturnChangePool = make(chan DwdFAccountReturnChange, 5000)
- go saveData() //保存订单数据
- //go saveAccountOrder() //保存订单数据
- go dealAllDataAccountOrder2()
- //dealAllDataAccountOrder() //1.处理归集后-存量剑鱼订单表-dwd_f_account_order
- go dealAllDataAccountOrderChangeRecord() //2.处理归集后-存量业绩表更表-dwd_f_account_order_change
- go dealAllDataAccountReturn() //3.处理归集后-存量剑鱼回款表-dwd_f_account_return
- go dealAllDataAccountReturnChange() //4.处理归集后-剑鱼回款变更表-dwd_f_account_return_change
- }
- // dealAllDataAccountOrder2 处理归集后-存量剑鱼订单表-dwd_f_account_order
- func dealAllDataAccountOrder2() {
- defer func() {
- if r := recover(); r != nil {
- log.Info("dealAllDataAccountOrder2 Panic recovered", zap.Any("reason", r))
- }
- log.Info("dealAllDataAccountOrder2 over")
- }()
- now := time.Now()
- var total int64
- AnalysisDB.Debug().Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).Count(&total)
- log.Info("dealAllDataAccountOrder", zap.Any("总数是", total))
- rowsPerPage := 100 // 每页的数量
- totalPages := (int(total) / rowsPerPage) + 1 //总页数
- // 控制并发数的 channel,限制最大并发数量为 10
- concurrencyLimit := 5
- sem := make(chan struct{}, concurrencyLimit)
- var wg sync.WaitGroup // 等待所有协程完成
- // 启动多协程处理
- for i := 0; i < totalPages; i++ {
- wg.Add(1) // 增加一个 WaitGroup 计数
- go func(page int) {
- defer wg.Done() // 完成时减去一个计数
- // 获取并发信号,限制同时处理的协程数
- sem <- struct{}{} // 向 channel 中发送一个空结构体,占用一个槽位
- defer func() { <-sem }() // 在协程结束时从 channel 中取出,释放一个槽位
- offset := page * rowsPerPage
- rows, err := AnalysisDB.Debug().Model(&DataexportOrder{}).Where("autoUpdate < ?", endTime).
- Order("id desc").Offset(offset).Limit(rowsPerPage).Rows()
- if err != nil {
- log.Info("dealAllDataAccountOrder, Rows Error", zap.Error(err))
- return
- }
- defer func() {
- err = rows.Close()
- if err != nil {
- log.Info("Err rows.Close", zap.Error(err))
- }
- }()
- log.Info("dealAllDataAccountOrder ", zap.Int("current page", page))
- // 处理每一行数据
- for rows.Next() {
- var dataExOrder DataexportOrder //原来的订单表
- var accountOrder DwdFAccountOrder //归集后的订单表
- var user1 DwdFUserbaseBaseinfo //个人用户表
- var returnRecord ReturnMoneyRecord //原来的回款表
- var returnRecords []ReturnMoneyRecord //原来的回款表
- var contract Contract //合同表
- // ScanRows 方法用于将一行记录扫描至结构体
- err = AnalysisDB.ScanRows(rows, &dataExOrder)
- if err != nil {
- log.Info("dealAllDataAccountOrder,ScanRows err ", zap.Error(err))
- }
- filter := dataExOrder.Filter
- filterMap := make(map[string]interface{})
- err := json.Unmarshal([]byte(filter), &filterMap)
- if err != nil {
- log.Info("dealAllDataAccountOrder, filter.json.Unmarshal", zap.Error(err))
- }
- //订单编号不为空
- if dataExOrder.OrderCode != "" {
- // 业务逻辑...
- accountOrder.OrderCode = dataExOrder.OrderCode
- var osr OrderSaleRecord //原来的业绩变更表
- err = JianyuDB.Order("id asc").Where("ordercode = ? ", dataExOrder.OrderCode).First(&osr).Error
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
- log.Info("dealAllDataAccountOrder", zap.Error(err))
- }
- //有业绩表更表记录
- if osr.ID > 0 {
- if osr.SalerName == "-" {
- accountOrder.SalerName = "运营"
- } else {
- accountOrder.SalerName = osr.SalerName
- }
- accountOrder.SalerDept = osr.SalerDept
- }
- accountOrder.CompanyName = dataExOrder.CompanyName
- if len(dataExOrder.UserID) > 20 { //个人身份
- err = JianyuSubjectDB.Where("userid = ? ", dataExOrder.UserID).First(&user1).Error
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
- log.Info("dealAllDataAccountOrder", zap.Error(err))
- }
- accountOrder.UserRegtime = user1.LRegistedate
- } else {
- //企业身份
- rs := new(struct {
- PositionId int `gorm:"column:position_id"`
- LRegistedate *time.Time `gorm:"column:l_registedate"`
- })
- if dataExOrder.UserID != "" {
- sql := fmt.Sprintf(`SELECT im.position_id,ub.l_registedate FROM
- (SELECT userid,position_id FROM Jianyu_subjectdb.dwd_f_userbase_id_mapping WHERE position_id = %s) im
- LEFT JOIN Jianyu_subjectdb.dwd_f_userbase_baseinfo ub
- ON im.userid=ub.userid`, dataExOrder.UserID)
- err = JianyuSubjectDB.Raw(sql).Scan(&rs).Error
- if err != nil {
- log.Info("处理用户订单表数据", zap.String("查询企业用户注册时间失败", dataExOrder.UserID))
- }
- accountOrder.UserRegtime = rs.LRegistedate
- }
- }
- accountOrder.CreateTime = dataExOrder.CreateTime
- if dataExOrder.IsBackstageOrder == 0 { //线上订单
- accountOrder.ReturnTime = dataExOrder.PayTime
- accountOrder.TotalReceived = util.IntAll(filterMap["originalAmount"]) //累计已收
- } else if dataExOrder.IsBackstageOrder == 1 {
- err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Where("state = 1").Order("return_time asc").First(&returnRecord).Error
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
- log.Info("dealAllDataAccountOrder", zap.Error(err))
- }
- accountOrder.ReturnTime = returnRecord.ReturnTime
- err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&returnRecords).Error
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
- log.Info("dealAllDataAccountOrder", zap.Error(err))
- }
- totalMoney := 0
- if len(returnRecords) > 0 {
- for _, v := range returnRecords {
- totalMoney += v.ReturnMoney
- }
- }
- accountOrder.TotalReceived = totalMoney
- }
- accountOrder.SaleTime = dataExOrder.SaleTime
- accountOrder.OriginalPrice = dataExOrder.OriginalPrice
- //合同金额
- if dataExOrder.IsBackstageOrder == 0 { //线上订单
- if dataExOrder.OrderStatus == 1 {
- accountOrder.ContractMoney = util.IntAll(filterMap["originalAmount"])
- } else {
- accountOrder.ContractMoney = dataExOrder.OrderMoney
- }
- } else { //线下订单
- accountOrder.ContractMoney = dataExOrder.PayMoney
- }
- accountOrder.Commission = dataExOrder.Commission
- accountOrder.ProceduresMoney = dataExOrder.ProceduresMoney
- accountOrder.ReceivableAmount = accountOrder.ContractMoney - accountOrder.Commission - accountOrder.ProceduresMoney
- accountOrder.ProductType = dataExOrder.ProductType
- if dataExOrder.ProductType == "大会员" || dataExOrder.ProductType == "大会员-子账号" || dataExOrder.ProductType == "大会员-补充包" {
- switch util.IntAll(filterMap["level"]) {
- case 1:
- accountOrder.DataSpec = "专家版"
- case 2:
- accountOrder.DataSpec = "智慧版"
- case 3:
- accountOrder.DataSpec = "商机版"
- case 4:
- accountOrder.DataSpec = "试用版"
- case 5:
- accountOrder.DataSpec = "定制版"
- case 6:
- accountOrder.DataSpec = "商机版2.0"
- case 7:
- accountOrder.DataSpec = "专家版2.0"
- }
- //付费类型 VipType
- switch util.IntAll(filterMap["recordPayType"]) {
- case 1:
- accountOrder.VipType = "购买"
- case 2:
- accountOrder.VipType = "续费"
- case 3:
- accountOrder.VipType = "升级"
- case 4:
- accountOrder.VipType = "试用"
- }
- } else {
- accountOrder.DataSpec = dataExOrder.DataSpec
- ////付费类型 VipType
- switch dataExOrder.VipType {
- case 0:
- accountOrder.VipType = "购买"
- case 1:
- accountOrder.VipType = "续费"
- case 2:
- accountOrder.VipType = "升级"
- case 3:
- accountOrder.VipType = "试用"
- }
- }
- //OrderStatus 订单状态
- if dataExOrder.RefundStatus == 1 || dataExOrder.RefundStatus == 2 {
- accountOrder.OrderStatus = "已完成"
- } else {
- switch dataExOrder.OrderStatus {
- case 1:
- accountOrder.OrderStatus = "已完成"
- case -1:
- accountOrder.OrderStatus = "逻辑删除"
- case -2:
- accountOrder.OrderStatus = "已取消"
- case -3:
- accountOrder.OrderStatus = "已取消"
- case 0:
- accountOrder.OrderStatus = "未完成"
- }
- }
- //回款状态
- if dataExOrder.IsBackstageOrder == 0 {
- if accountOrder.OrderStatus == "已支付" {
- accountOrder.ReturnStatus = "全额回款"
- } else {
- accountOrder.ReturnStatus = "未支付"
- }
- } else if dataExOrder.IsBackstageOrder == 1 {
- switch dataExOrder.ReturnStatus {
- case 1:
- accountOrder.ReturnStatus = "全额回款"
- case 2:
- accountOrder.ReturnStatus = "部分回款"
- case 0:
- accountOrder.ReturnStatus = "未回款"
- }
- }
- //refund_status 回款状态
- switch dataExOrder.RefundStatus {
- case 0:
- accountOrder.RefundStatus = "未退款"
- case 1:
- accountOrder.RefundStatus = "全额退款"
- case 2:
- accountOrder.RefundStatus = "部分退款"
- }
- //
- accountOrder.UserPhone = dataExOrder.UserPhone
- accountOrder.UserID = dataExOrder.UserID
- if dataExOrder.VipStarttime == nil || dataExOrder.VipStarttime.Year() > 1000 {
- accountOrder.VipStarttime = dataExOrder.VipStarttime
- accountOrder.VipEndtime = dataExOrder.VipEndtime
- }
- // 合同状态
- err = AnalysisDB.Where("order_code = ? ", dataExOrder.OrderCode).Find(&contract).Error
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
- log.Info("dealAllDataAccountOrder", zap.Error(err))
- }
- if contract.ContractStatus == 1 {
- accountOrder.ContractStatus = "已签协议"
- } else if contract.ContractStatus == 0 {
- accountOrder.ContractStatus = "未签协议"
- }
- accountOrder.ContractCode = contract.ContractCode
- accountOrder.ContractTime = contract.ContractTime
- if dataExOrder.SigningSubject == "h01" {
- accountOrder.SigningSubject = "北京剑鱼信息技术有限公司"
- } else if dataExOrder.SigningSubject == "h02" {
- accountOrder.SigningSubject = "北京拓普丰联信息科技股份有限公司"
- }
- accountOrder.OrderChannel = dataExOrder.OrderChannel
- accountOrder.DistributionChannel = dataExOrder.DistributionChannel
- if dataExOrder.IsBackstageOrder == 1 {
- accountOrder.IsBackstageOrder = "是"
- } else {
- accountOrder.IsBackstageOrder = "否"
- }
- switch dataExOrder.PayWay {
- case "ali", "ali_app", "ali_pc": //ali、ali_app、ali_pc 处理为“支付宝”
- accountOrder.PayWay = "支付宝"
- case "wx", "wx_app", "wx_js", "wx_pc": // wx、wx_app、wx_js、wx_pc 处理为“微信”
- accountOrder.PayWay = "微信"
- case "transferAccounts":
- accountOrder.PayWay = "对公转账"
- default:
- accountOrder.PayWay = dataExOrder.PayWay
- }
- //生成订单数据
- accountOrderPool <- accountOrder
- //err = AnalysisDB.Create(&accountOrder).Error
- //if err != nil {
- // log.Info("dealAllDataAccountOrder Create ", zap.Error(err))
- //}
- }
- }
- }(i)
- }
- // 等待所有协程处理完成
- wg.Wait()
- log.Info("dealAllDataAccountOrder 迭代结束", zap.Int64("数据总量", total))
- close(accountOrderPool)
- log.Info("dealAllDataAccountOrder", zap.Any("处理时长", time.Since(now).Minutes()))
- }
- // saveAccountOrder 保存归集订单表
- func saveAccountOrder() {
- defer func() {
- if r := recover(); r != nil {
- log.Info("saveAccountOrder Panic recovered", zap.Any("reason", r))
- }
- log.Info("saveAccountOrder over")
- }()
- for {
- select {
- case accountOrder, ok := <-accountOrderPool:
- if !ok {
- // 通道已关闭,退出循环
- return
- }
- err := AnalysisDB.Create(&accountOrder).Error
- if err != nil {
- log.Info("dealAllDataAccountOrder Create ", zap.Error(err), zap.String("order_code", accountOrder.OrderCode))
- }
- }
- }
- }
- // saveData 保存数据
- func saveData() {
- var wg sync.WaitGroup
- // 启动保存每个通道数据的goroutine
- wg.Add(4) // 因为有4个通道
- go func() {
- defer wg.Done() // 完成时减少计数器
- for accountOrder := range accountOrderPool {
- err := AnalysisDB.Create(&accountOrder).Error
- if err != nil {
- log.Info("dealAllDataAccountOrder Create", zap.Error(err), zap.String("order_code", accountOrder.OrderCode))
- }
- }
- }()
- go func() {
- defer wg.Done()
- for accountOrderChange := range accountOrderChangePool {
- err := AnalysisDB.Create(&accountOrderChange).Error
- if err != nil {
- log.Info("dealAllDataAccountOrderChange Create", zap.Error(err), zap.String("order_code", accountOrderChange.OrderCode))
- }
- }
- }()
- go func() {
- defer wg.Done()
- for accountReturn := range accountReturnPool {
- err := AnalysisDB.Create(&accountReturn).Error
- if err != nil {
- log.Info("dealAllDataAccountReturn Create", zap.Error(err), zap.String("order_code", accountReturn.OrderCode))
- }
- }
- }()
- go func() {
- defer wg.Done()
- for accountReturnChange := range accountReturnChangePool {
- err := AnalysisDB.Create(&accountReturnChange).Error
- if err != nil {
- log.Info("dealAllDataAccountReturnChange Create", zap.Error(err), zap.String("order_code", accountReturnChange.OrderCode))
- }
- }
- }()
- // 等待所有goroutine完成
- wg.Wait()
- // 所有通道的数据都处理完毕后打印日志
- log.Info("All account data has been processed and saved successfully.")
- if GF.Cron.Url != "" {
- // 请求头
- headers := map[string]string{
- "Content-Type": "application/json",
- }
- // 请求体
- body := []byte(`{
- "msgtype": "text",
- "text": {
- "content": "订单回款数据处理完毕",
- "mentioned_list":["@all"]
- }
- }`)
- _, err := HTTPRequest("POST", GF.Cron.Url, headers, body)
- if err != nil {
- log.Info("HTTPRequest ", zap.Error(err), zap.String("body", string(body)))
- }
- }
- }
- // truncateTables 清空订单相关数据表
- func truncateTables() {
- var (
- account_order DwdFAccountOrder
- account_order_change DwdFAccountOrderChange
- account_return DwdFAccountReturn
- account_return_change DwdFAccountReturnChange
- )
- table1 := account_order.TableName()
- table2 := account_order_change.TableName()
- table3 := account_return.TableName()
- table4 := account_return_change.TableName()
- // 清空表 1
- db, err := AnalysisDB.DB()
- if err != nil {
- panic("获取数据库连接对象失败:" + err.Error())
- }
- _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table1))
- if err != nil {
- log.Info("清空失败", zap.String("数据表", table1))
- }
- _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table2))
- if err != nil {
- log.Info("清空失败", zap.String("数据表", table2))
- }
- _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table3))
- if err != nil {
- log.Info("清空失败", zap.String("数据表", table3))
- }
- _, err = db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", table4))
- if err != nil {
- log.Info("清空失败", zap.String("数据表", table4))
- }
- log.Info("所有数据表清空完毕", zap.String("数据表是:", fmt.Sprintf("%s,%s,%s,%s", table1, table2, table3, table4)))
- }
|