action_order.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package entity
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. . "online_datasync/config"
  7. . "online_datasync/db"
  8. "strings"
  9. "time"
  10. . "app.yhyue.com/moapp/jybase/common"
  11. . "app.yhyue.com/moapp/jybase/date"
  12. )
  13. var (
  14. Action_order *action_order
  15. )
  16. type action_order struct {
  17. }
  18. func (a *action_order) TableName() string {
  19. return "action_order"
  20. }
  21. //
  22. func (a *action_order) SaveFields() []string {
  23. return []string{"userid", "product_code", "createtime", "pay_time", "pay_way", "order_status", "order_money", "pay_money", "order_snapshot_id", "bill_status", "order_type", "timestamp"}
  24. }
  25. //
  26. func (a *action_order) Run(start_unix, end_unix int64, start_layout, end_layout string) {
  27. log.Println("开始同步", a.TableName(), "表。。。")
  28. a.add()
  29. if start_unix > 0 {
  30. a.update(start_unix, end_unix, start_layout, end_layout)
  31. }
  32. log.Println("同步", a.TableName(), "表结束。。。")
  33. }
  34. //
  35. func (a *action_order) add() {
  36. log.Println("开始同步新增", a.TableName(), "表。。。")
  37. index := 0
  38. action_order_array, action_order_spec_array, action_product_record_array := []interface{}{}, []interface{}{}, []interface{}{}
  39. Mysql_From_Jianyu.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
  40. for _, v := range *l {
  41. index++
  42. TimeTask.Dataexport_order_id = Int64All(v["id"])
  43. product_code := a.getProductCode(ObjToString(v["product_type"]))
  44. //
  45. action_order_array = append(action_order_array, v["user_id"], product_code, v["create_time"], v["pay_time"], a.getPayWay(ObjToString(v["pay_way"])), v["order_status"], v["order_money"], v["pay_money"], v["id"], v["applybill_status"], a.getOrderType(IntAllDef(v["vip_type"], -1)), NowFormat(Date_Full_Layout))
  46. //
  47. var buyerclass_num interface{}
  48. var area_num interface{}
  49. var city_num interface{}
  50. if v["filter"] != nil {
  51. filter := make(map[string]interface{})
  52. if err := json.Unmarshal([]byte(ObjToString(v["filter"])), &filter); err == nil {
  53. filter_buyerclass, _ := filter["buyerclass"].([]interface{})
  54. buyerclass_num = len(filter_buyerclass)
  55. filter_area, _ := filter["area"].([]interface{})
  56. area_num = len(filter_area)
  57. filter_city, _ := filter["city"].([]interface{})
  58. city_num = len(filter_city)
  59. }
  60. }
  61. action_order_spec_array = append(action_order_spec_array, v["user_id"], v["id"], product_code, v["original_price"], v["discount_price"], v["data_count"], v["order_money"], v["order_money"], v["create_time"], v["vip_starttime"], v["vip_endtime"], v["filter"], buyerclass_num, area_num, city_num)
  62. //
  63. var status interface{}
  64. if v["vip_starttime"] != nil && v["vip_endtime"] != nil {
  65. vip_starttime, start_err := time.ParseInLocation(Date_Full_Layout, ObjToString(v["vip_starttime"]), time.Local)
  66. vip_endtime, end_err := time.ParseInLocation(Date_Full_Layout, ObjToString(v["vip_endtime"]), time.Local)
  67. if start_err == nil && end_err == nil {
  68. now := time.Now()
  69. if now.After(vip_starttime) && now.Before(vip_endtime) {
  70. status = 1
  71. } else {
  72. status = 2
  73. }
  74. }
  75. }
  76. action_product_record_array = append(action_product_record_array, v["user_id"], product_code, v["vip_starttime"], v["vip_endtime"], v["create_time"], v["id"], status, NowFormat(Date_Full_Layout))
  77. //
  78. if index%Config.InsertBathSize == 0 {
  79. log.Println("同步新增", a.TableName(), "表", index)
  80. Mysql_Main.InsertIgnoreBatch(a.TableName(), a.SaveFields(), action_order_array)
  81. action_order_array = []interface{}{}
  82. //
  83. log.Println("同步新增", Action_order_spec.TableName(), "表", index)
  84. Mysql_Main.InsertIgnoreBatch(Action_order_spec.TableName(), Action_order_spec.SaveFields(), action_order_spec_array)
  85. action_order_spec_array = []interface{}{}
  86. //
  87. log.Println("同步新增", Action_product_record.TableName(), "表", index)
  88. Mysql_Main.InsertIgnoreBatch(Action_product_record.TableName(), Action_product_record.SaveFields(), action_product_record_array)
  89. action_product_record_array = []interface{}{}
  90. }
  91. }
  92. }, `select id,user_id,create_time,pay_time,pay_way,order_status,order_money,pay_money,applybill_status,vip_type,product_type,vip_starttime,vip_endtime,original_price,discount_price,data_count,filter from dataexport_order where id>? order by id`, TimeTask.Dataexport_order_id)
  93. if len(action_order_array) > 0 {
  94. Mysql_Main.InsertIgnoreBatch(a.TableName(), a.SaveFields(), action_order_array)
  95. action_order_array = []interface{}{}
  96. //
  97. Mysql_Main.InsertIgnoreBatch(Action_order_spec.TableName(), Action_order_spec.SaveFields(), action_order_spec_array)
  98. action_order_spec_array = []interface{}{}
  99. //
  100. Mysql_Main.InsertIgnoreBatch(Action_product_record.TableName(), Action_product_record.SaveFields(), action_product_record_array)
  101. action_product_record_array = []interface{}{}
  102. }
  103. log.Println("同步新增", a.TableName(), "表结束。。。", index)
  104. return
  105. }
  106. //
  107. func (a *action_order) getOrderType(vip_type int) int {
  108. if vip_type == 0 {
  109. return 1 //试用
  110. } else if vip_type == 1 {
  111. return 3 //续费
  112. } else if vip_type == 2 {
  113. return 4 //升级
  114. }
  115. return 2
  116. }
  117. //
  118. func (a *action_order) getProductCode(product_type string) string {
  119. if strings.ToUpper(product_type) == "VIP订阅" {
  120. return "101"
  121. } else if product_type == "历史数据" {
  122. return "102"
  123. } else if product_type == "招标文件解读" {
  124. return "103"
  125. } else if strings.HasPrefix(product_type, "大会员") {
  126. return "104"
  127. } else if product_type == "剑鱼币" {
  128. return "105"
  129. } else if product_type == "中标必听课" {
  130. return "106"
  131. } else if product_type == "企业商机管理" {
  132. return "107"
  133. } else if product_type == "剑鱼课程" {
  134. return "108"
  135. } else if product_type == "招投标课程" {
  136. return "109"
  137. } else if product_type == "数据报告" {
  138. return "110"
  139. } else if product_type == "线上课程" {
  140. return "111"
  141. }
  142. return ""
  143. }
  144. //
  145. func (a *action_order) getPayWay(pay_way string) string {
  146. if pay_way == "ali_app" || pay_way == "wx_app" {
  147. return "app"
  148. } else if pay_way == "wx_js" {
  149. return "wechat"
  150. } else if pay_way == "ali_pc" || pay_way == "wx_pc" {
  151. return "pc"
  152. }
  153. return ""
  154. }
  155. //
  156. func (a *action_order) update(start_unix, end_unix int64, start_layout, end_layout string) {
  157. log.Println("开始同步更新", a.TableName(), "表 。。。")
  158. index := 0
  159. array := [][]interface{}{}
  160. fields := []string{"order_snapshot_id", "pay_time", "pay_way", "order_status", "pay_money", "timestamp"}
  161. Mysql_From_Jianyu.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
  162. for _, v := range *l {
  163. index++
  164. pay_way := a.getPayWay(ObjToString(v["pay_way"]))
  165. array = append(array, []interface{}{v["id"], v["pay_time"], pay_way, v["order_status"], v["pay_money"], NowFormat(Date_Full_Layout)})
  166. if index%Config.UpdateBathSize == 0 {
  167. log.Println("同步更新", a.TableName(), "表", index)
  168. Mysql_Main.UpdateBath(a.TableName(), fields, array)
  169. array = [][]interface{}{}
  170. }
  171. }
  172. }, `select distinct a.id,a.pay_time,a.pay_way,a.order_status,a.pay_money,a.applybill_status from dataexport_order a
  173. left join apply_invoice b on (a.id=b.order_id)
  174. left join invoice c on (a.order_code=c.order_code)
  175. where (a.pay_time>=? and a.pay_time<?) or (b.apply_date>=? and b.apply_date<?) or (c.create_time>=? and c.create_time<?) or (c.billing_time>=? and c.billing_time<?) or (c.operable_time>=? and c.operable_time<?)`, start_layout, end_layout, fmt.Sprint(start_unix), fmt.Sprint(end_unix), start_unix, end_unix, start_unix, end_unix, start_layout, end_layout)
  176. if len(array) > 0 {
  177. Mysql_Main.UpdateBath(a.TableName(), fields, array)
  178. array = [][]interface{}{}
  179. }
  180. log.Println("同步更新", a.TableName(), "表结束。。。", index)
  181. }