job.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package job
  2. import (
  3. uuid2 "app.yhyue.com/moapp/jybase/go-xweb/uuid"
  4. "context"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "github.com/gogf/gf/v2/util/gutil"
  8. "github.com/pkg/errors"
  9. "regexp"
  10. "strings"
  11. "time"
  12. "workTasks/common/match"
  13. "workTasks/urlStatic/clickIterSource"
  14. )
  15. type (
  16. UserId string
  17. UUid string
  18. MatchObj struct {
  19. urlMapping map[string][]*matchItem
  20. result map[UUid]map[UserId]int64 //统计计算次数
  21. uuidCodeMapping map[UUid]string
  22. matchObj *match.TrieNode
  23. }
  24. matchItem struct {
  25. UUid UUid
  26. Name string
  27. Code string
  28. Rule map[string]interface{}
  29. }
  30. dbStruct struct {
  31. UserId string `json:"userId"`
  32. Code string `json:"code"`
  33. Num int64 `json:"num"`
  34. Date time.Time `json:"date"`
  35. }
  36. )
  var (
  	// re finds "[key]" placeholders inside a configured code: a '[' followed by
  	// any run of non-']' characters and a closing ']'. The key between the
  	// brackets names the rule entry whose values are substituted in.
  	re = regexp.MustCompile(`\[[^\]]*\]`)
  )
  38. func NewMatchObj(ctx context.Context) *MatchObj {
  39. var (
  40. urlMapping = map[string][]*matchItem{}
  41. treeMatch = &match.TrieNode{}
  42. uuidCodeMapping = map[UUid]string{}
  43. result = map[UUid]map[UserId]int64{}
  44. )
  45. for _, mObj := range g.Cfg().MustGet(ctx, "match").Maps() {
  46. var (
  47. name = gconv.String(mObj["name"])
  48. code = gconv.String(mObj["code"])
  49. rule = gconv.Map(mObj["rule"])
  50. urlArr []string
  51. newMatchItem []*matchItem
  52. )
  53. for _, s := range gconv.Strings(rule["url"]) {
  54. urlArr = append(urlArr, s)
  55. }
  56. arr := re.FindAllStringSubmatch(code, -1)
  57. if len(arr) > 0 {
  58. for _, str := range arr {
  59. var (
  60. rep = str[0]
  61. key = rep[1 : len(rep)-1]
  62. repVal = gconv.Strings(rule[key])
  63. )
  64. for _, val := range repVal {
  65. ruleItem := gconv.Map(gutil.Copy(rule))
  66. ruleItem[key] = val
  67. var (
  68. uuid = UUid(uuid2.New())
  69. finalCode = strings.ReplaceAll(code, rep, val)
  70. )
  71. uuidCodeMapping[uuid] = finalCode
  72. newMatchItem = append(newMatchItem, &matchItem{
  73. UUid: uuid,
  74. Name: name,
  75. Code: strings.ReplaceAll(code, rep, val),
  76. Rule: ruleItem,
  77. })
  78. }
  79. }
  80. } else {
  81. uuid := UUid(uuid2.New())
  82. uuidCodeMapping[uuid] = code
  83. newMatchItem = append(newMatchItem, &matchItem{
  84. UUid: uuid,
  85. Name: name,
  86. Code: code,
  87. Rule: rule,
  88. })
  89. }
  90. if len(urlArr) > 0 {
  91. for _, uStr := range gconv.Strings(rule["url"]) {
  92. if uStr != "" {
  93. treeMatch.Insert(uStr)
  94. }
  95. urlMapping[uStr] = newMatchItem
  96. }
  97. } else {
  98. urlMapping[""] = newMatchItem
  99. }
  100. }
  101. for uid, _ := range uuidCodeMapping {
  102. result[uid] = make(map[UserId]int64)
  103. }
  104. return &MatchObj{
  105. matchObj: treeMatch,
  106. urlMapping: urlMapping,
  107. result: result,
  108. uuidCodeMapping: uuidCodeMapping,
  109. }
  110. }
  111. func (o *MatchObj) Match(ctx context.Context, start, end time.Time) error {
  112. rows, err := clickIterSource.Ch_analysis.Query(ctx, "SELECT date,url,user_id,mini_program_code,action_id,breaker_name,page_id FROM dwd_f_personnel_behavior WHERE user_id!='' AND date>=? and date<? ", start.Format(time.DateTime), end.Format(time.DateTime))
  113. if err != nil {
  114. return errors.Wrap(err, "加载日志数据异常")
  115. }
  116. defer rows.Close()
  117. for rows.Next() {
  118. var (
  119. date time.Time
  120. user_id, url, mini_program_code, action_id, breaker_name, page_id string
  121. )
  122. if err := rows.Scan(
  123. &date, &url, &user_id, &mini_program_code, &action_id, &breaker_name, &page_id,
  124. ); err != nil {
  125. g.Log().Errorf(ctx, "字段映射异常 %v", err)
  126. }
  127. for _, str := range append(o.matchObj.FindWords(url), "") {
  128. for _, item := range o.urlMapping[str] {
  129. var isValid = func() bool {
  130. for key, val := range item.Rule {
  131. if key == "url" {
  132. continue
  133. }
  134. switch key {
  135. case "mini_program_code":
  136. if !hasPrefix(mini_program_code, val) {
  137. return false
  138. }
  139. case "action_id":
  140. if !compareValue(action_id, val) {
  141. return false
  142. }
  143. case "breaker_name":
  144. if !compareValue(breaker_name, val) {
  145. return false
  146. }
  147. case "page_id":
  148. if !compareValue(page_id, val) {
  149. return false
  150. }
  151. }
  152. }
  153. return true
  154. }()
  155. if isValid {
  156. if _, ok := o.result[item.UUid][UserId(user_id)]; ok {
  157. o.result[item.UUid][UserId(user_id)]++
  158. } else {
  159. o.result[item.UUid][UserId(user_id)] = 1
  160. }
  161. }
  162. }
  163. }
  164. }
  165. return nil
  166. }
  // settingStrings normalizes a rule value into the list of strings it allows.
  //   - A string yields itself.
  //   - A []string is returned as-is.
  //   - A []interface{} yields its string elements. Decoded config arrays
  //     typically have this shape — TODO confirm for this config source.
  //     Non-string elements are skipped.
  //   - Any other type yields nil, so it matches nothing.
  func settingStrings(setting interface{}) []string {
  	switch v := setting.(type) {
  	case string:
  		return []string{v}
  	case []string:
  		return v
  	case []interface{}:
  		out := make([]string, 0, len(v))
  		for _, e := range v {
  			if s, ok := e.(string); ok {
  				out = append(out, s)
  			}
  		}
  		return out
  	}
  	return nil
  }

  // compareValue reports whether val equals setting, or any element of it when
  // setting is a list. Unsupported setting types never match.
  func compareValue(val string, setting interface{}) bool {
  	for _, s := range settingStrings(setting) {
  		if s == val {
  			return true
  		}
  	}
  	return false
  }

  // hasPrefix reports whether val starts with setting, or with any element of it
  // when setting is a list. Every listed prefix is checked, not just the first.
  // Unsupported setting types and empty lists never match.
  func hasPrefix(val string, setting interface{}) bool {
  	for _, s := range settingStrings(setting) {
  		if strings.HasPrefix(val, s) {
  			return true
  		}
  	}
  	return false
  }
  189. func (o *MatchObj) Save(ctx context.Context, data time.Time) error {
  190. var (
  191. r []*dbStruct
  192. total int
  193. )
  194. for uuid, mData := range o.result {
  195. for userId, num := range mData {
  196. r = append(r, &dbStruct{
  197. UserId: string(userId),
  198. Code: o.uuidCodeMapping[uuid],
  199. Num: num,
  200. Date: data,
  201. })
  202. }
  203. if len(r) >= 2000 {
  204. var t = r
  205. if _, err := g.DB().Save(ctx, "dwd_d_visit", t, 500); err != nil {
  206. g.Log().Errorf(ctx, "save errr %v", err)
  207. }
  208. total += 500
  209. r = []*dbStruct{}
  210. }
  211. }
  212. if len(r) > 0 {
  213. if _, err := g.DB().Save(ctx, "dwd_d_visit", r, 500); err != nil {
  214. g.Log().Errorf(ctx, "save errr %v", err)
  215. }
  216. total += len(r)
  217. }
  218. if total > 0 {
  219. g.Log().Infof(ctx, "任务[%s] 共%d条数据", data, total)
  220. }
  221. return nil
  222. }