tag.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. package main
  2. import (
  3. "app.yhyue.com/moapp/jybase/common"
  4. "context"
  5. "fmt"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "log"
  9. "regexp"
  10. "strings"
  11. "time"
  12. )
  13. func tagAllSync() {
  14. //dwd_f_userbase_baseinfo -->l_registedate 注册日期 -->userid
  15. //dwd_f_userbase_visit_info -->date 访问时间 -->userid
  16. //dwd_f_userbase_search_info -->search_area去重 jianyu_subjectdb_test.d_area_code -->userid
  17. //dwd_f_userbase_search_info -->search_word去重 -->userid
  18. //dwd_f_userbase_subscribe_info -->subscribe_areas去重 jianyu_subjectdb_test.d_area_code -->userid
  19. //dwd_f_userbase_subscribe_info -->subscribe_keywords去重 -->userid
  20. //dwd_f_userbase_order_info -->product_type去重 -->userid
  21. //dwd_f_userbase_order_info -->product_type=VIP订阅 vip_endtime 一周内到期、一月内到期 -->userid 暂时不要了
  22. //dwd_f_userbase_event_info -->createtime查昨天有没有数据, 有数据-昨日浏览过 没数据-昨日未浏览 -->userid
  23. log.Println("用户标签定时任务开始")
  24. // TiDb.ExecBySql(`SET session max_execution_time=86400`)
  25. count := 0
  26. now := time.Now()
  27. var allUser []map[string]interface{}
  28. startOfDay := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  29. TiDb.SelectByBath(100, func(l *[]map[string]interface{}) bool {
  30. for _, v := range *l {
  31. allUser = append(allUser, v)
  32. }
  33. return true
  34. }, fmt.Sprintf(`select userid from dwd_f_userbase_visit_info where createtime>"%s" order by createtime asc`, startOfDay.Format(time.DateTime)))
  35. TiDb.Update("dwd_f_crm_attribute_label", map[string]interface{}{}, map[string]interface{}{
  36. "members_info": "昨日未浏览",
  37. "updatetime": time.Now().Format("2006-01-02 15:04:05"),
  38. })
  39. for _, v := range allUser {
  40. count++
  41. //查询用户信息
  42. userId := common.ObjToString(v["userid"])
  43. userData := TiDb.SelectBySql("select l_registedate,userid,uid,base_user_id from dwd_f_userbase_baseinfo where userid=?", userId)
  44. for _, vv := range *userData {
  45. FormatTag(vv, count)
  46. }
  47. log.Println(count)
  48. }
  49. log.Println("用户标签定时任务结束")
  50. }
  51. func FormatTag(data map[string]interface{}, count int) {
  52. registedate := common.ObjToString(data["l_registedate"])
  53. userId := common.ObjToString(data["userid"])
  54. uId := common.ObjToString(data["uid"])
  55. base_user_id := common.Int64All(data["base_user_id"])
  56. log.Println("第", count, "条:", uId, registedate, userId)
  57. if registedate == "" || uId == "" || userId == "" {
  58. log.Println("缺少信息 ", registedate, uId, userId)
  59. return
  60. }
  61. date, search_areass, search_wordss, product_types, subscribe_keywords, subscribe_areas := "", "", "", "", "", ""
  62. //访问数据
  63. visitData := TiDb.FindOne("dwd_f_userbase_visit_info", map[string]interface{}{"userid": userId}, "", "date desc")
  64. if visitData != nil && len(*visitData) > 0 {
  65. date = common.ObjToString((*visitData)["date"])
  66. }
  67. //搜索数据
  68. searchData := TiDb.Find("dwd_f_userbase_search_info", map[string]interface{}{"userid": userId}, "", "", -1, -1)
  69. if searchData != nil && len(*searchData) > 0 {
  70. search_areas, search_words := "", ""
  71. for k, v := range *searchData {
  72. search_area := common.ObjToString(v["search_area"])
  73. search_word := common.ObjToString(v["search_word"])
  74. if k == len(*searchData)-1 && search_area != "" {
  75. search_areas += search_area
  76. } else if search_area != "" {
  77. search_areas += search_area + ","
  78. }
  79. if k == len(*searchData)-1 && search_word != "" {
  80. search_words += search_word
  81. } else if search_word != "" {
  82. search_words += search_word + ","
  83. }
  84. }
  85. search_areas_arr, search_words_arr := []string{}, []string{}
  86. for _, v := range strings.Split(search_areas, ",") {
  87. isOk := false
  88. for _, vv := range search_areas_arr {
  89. if vv == v {
  90. isOk = true
  91. }
  92. }
  93. if !isOk && v != "" {
  94. search_areas_arr = append(search_areas_arr, v)
  95. }
  96. }
  97. for _, v := range strings.Split(search_words, ",") {
  98. isOk := false
  99. for _, vv := range search_words_arr {
  100. if vv == v {
  101. isOk = true
  102. }
  103. }
  104. if !isOk && v != "" {
  105. search_words_arr = append(search_words_arr, v)
  106. }
  107. }
  108. search_areass = strings.Join(search_areas_arr, ",")
  109. search_wordss = strings.Join(search_words_arr, ",")
  110. }
  111. //订单购买的服务
  112. orderData := TiDb.Find("dwd_f_userbase_order_info", map[string]interface{}{"userid": userId, "order_status": 1, "delete_status": 0}, "", "", -1, -1)
  113. //orderData := TiDb.SelectBySql("select * from dwd_f_userbase_order_info where userid=? and order_status=1 and delete_status=0 and payable_money > 0", userId)
  114. if orderData != nil && len(*orderData) > 0 {
  115. product_type_arr, product_type_arrs := []string{}, []string{}
  116. for _, v := range *orderData {
  117. payable_money := gconv.Int64(v["payable_money"])
  118. if payable_money <= 0 {
  119. continue
  120. }
  121. product_type := common.ObjToString(v["product_type"])
  122. if product_type != "" {
  123. product_type_arr = append(product_type_arr, product_type)
  124. }
  125. }
  126. for _, v := range product_type_arr {
  127. isOk := false
  128. for _, vv := range product_type_arrs {
  129. if vv == v {
  130. isOk = true
  131. }
  132. }
  133. if !isOk {
  134. product_type_arrs = append(product_type_arrs, v)
  135. }
  136. }
  137. product_types = strings.Join(product_type_arrs, ",")
  138. }
  139. //订阅数据
  140. subscribeData := TiDb.FindOne("dwd_f_userbase_subscribe_info", map[string]interface{}{"userid": userId}, "", "updatetime desc")
  141. if subscribeData != nil && len(*subscribeData) > 0 {
  142. subscribe_keywords = common.ObjToString((*subscribeData)["subscribe_keywords"])
  143. subscribe_areas = common.ObjToString((*subscribeData)["subscribe_areas"])
  144. }
  145. //会员介绍页面
  146. start := time.Now().AddDate(0, 0, -1).Format("2006-01-02") + " 00:00:00"
  147. end := time.Now().Format("2006-01-02") + " 00:00:00"
  148. eventStr := "昨日未浏览"
  149. eventCount := TiDb.SelectBySql(`select * from dwd_f_userbase_event_info where userid = "` + userId + `" and eventtype = "会员介绍页面" and createtime >= "` + start + `" and createtime <= "` + end + `"`)
  150. if eventCount != nil && len(*eventCount) > 0 {
  151. eventStr = "昨日浏览过"
  152. }
  153. nowTime := time.Now().Format("2006-01-02 15:04:05")
  154. //聊天记录标签
  155. keyStr := ""
  156. phoneRegexp := regexp.MustCompile(`^1[0-9]{10}$`)
  157. keyMap := map[string]bool{}
  158. matchArr := Base.SelectBySql(`select aa.name from socialize_keyword aa where aa.mold=0 and aa.state=0 and FIND_IN_SET(2,aa.group) `)
  159. sqls := fmt.Sprintf(`select b.content from socialize_message_mailbox a LEFT JOIN socialize_message b on b.id = a.messag_id where a.type in (4,5,7) and a.own_type = 2 and a.own_id = %d and b.title = "文本" and a.send_user_type = 2 and a.create_time >= "%s"`, base_user_id, time.Now().AddDate(0, 0, -30).Format("2006-01-02 15:04:05"))
  160. mData := Base.SelectBySql(sqls)
  161. if mData != nil && *mData != nil && len(*mData) > 0 {
  162. for _, vv := range *mData {
  163. content := common.ObjToString(vv["content"])
  164. if phoneRegexp.MatchString(content) {
  165. keyMap[content] = true
  166. }
  167. for _, vvv := range *matchArr {
  168. key := gconv.String(vvv["name"])
  169. if strings.Contains(strings.ToUpper(content), strings.ToUpper(key)) {
  170. keyMap[key] = true
  171. break
  172. }
  173. }
  174. }
  175. }
  176. if len(keyMap) > 0 {
  177. keyArr := []string{}
  178. for k := range keyMap {
  179. keyArr = append(keyArr, k)
  180. }
  181. keyStr = strings.Join(keyArr, ",")
  182. }
  183. if TiDb.Count("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uId}) > 0 {
  184. TiDb.Update("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uId}, map[string]interface{}{
  185. "last_login_time": common.If(date != "", date, nil),
  186. "search_areas": search_areass,
  187. "search_words": search_wordss,
  188. "subscribe_areas": subscribe_areas,
  189. "subscribe_keywords": subscribe_keywords,
  190. "product_type": product_types,
  191. "updatetime": nowTime,
  192. "members_info": eventStr,
  193. "messagekey": common.If(keyStr == "", nil, keyStr),
  194. })
  195. } else {
  196. TiDb.Insert("dwd_f_crm_attribute_label", map[string]interface{}{
  197. "uid": uId,
  198. "registedate": registedate,
  199. "last_login_time": common.If(date != "", date, nil),
  200. "search_areas": search_areass,
  201. "search_words": search_wordss,
  202. "subscribe_areas": subscribe_areas,
  203. "subscribe_keywords": subscribe_keywords,
  204. "product_type": product_types,
  205. "updatetime": nowTime,
  206. "members_info": eventStr,
  207. "messagekey": common.If(keyStr == "", nil, keyStr),
  208. })
  209. }
  210. }
  211. func tagAddSync() {
  212. log.Println("注册日期、订单增量定时任务开始")
  213. userData := TiDb.SelectBySql(`select l_registedate,createtime,uid,userid from dwd_f_userbase_baseinfo where createtime >= "` + cfg.LastUserTime + `" order by createtime asc`)
  214. if userData != nil && len(*userData) > 0 {
  215. for k, v := range *userData {
  216. nowTime := time.Now().Format("2006-01-02 15:04:05")
  217. registedate := common.ObjToString(v["l_registedate"])
  218. createtime := common.ObjToString(v["createtime"])
  219. userId := common.ObjToString(v["userid"])
  220. uId := common.ObjToString(v["uid"])
  221. if registedate == "" || uId == "" || userId == "" {
  222. log.Println("缺少信息", uId, userId)
  223. continue
  224. } else {
  225. log.Println("新注册", uId, userId)
  226. }
  227. if k == len(*userData)-1 {
  228. cfg.LastUserTime = createtime
  229. }
  230. if TiDb.Count("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uId}) > 0 {
  231. TiDb.Update("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uId}, map[string]interface{}{
  232. "registedate": registedate,
  233. "updatetime": nowTime,
  234. })
  235. } else {
  236. TiDb.Insert("dwd_f_crm_attribute_label", map[string]interface{}{
  237. "uid": uId,
  238. "registedate": registedate,
  239. "updatetime": nowTime,
  240. })
  241. }
  242. }
  243. }
  244. log.Println("注册日期定时任务结束")
  245. orderData := TiDb.SelectBySql(`select product_type,autoUpdate,uid from dwd_f_userbase_order_info where order_status = 1 and autoUpdate >= "` + cfg.LastOrderTime + `" order by autoUpdate desc`)
  246. if orderData != nil && len(*orderData) > 0 {
  247. for k, order := range *orderData {
  248. nowTime := time.Now().Format("2006-01-02 15:04:05")
  249. autoUpdate := common.ObjToString(order["autoUpdate"])
  250. uId := common.ObjToString(order["uid"])
  251. if uId == "" {
  252. log.Println("缺少信息")
  253. continue
  254. }
  255. product_types := ""
  256. if k == 0 {
  257. cfg.LastOrderTime = autoUpdate
  258. }
  259. orderDatas := TiDb.Find("dwd_f_userbase_order_info", map[string]interface{}{"uid": uId, "order_status": 1, "delete_status": 0}, "", "", -1, -1)
  260. if orderDatas != nil && len(*orderDatas) > 0 {
  261. product_type_arr, product_type_arrs := []string{}, []string{}
  262. for _, v := range *orderDatas {
  263. payable_money := gconv.Int64(v["payable_money"])
  264. if payable_money <= 0 {
  265. continue
  266. }
  267. product_type := common.ObjToString(v["product_type"])
  268. log.Println("product_type ", product_type)
  269. if product_type != "" {
  270. product_type_arr = append(product_type_arr, product_type)
  271. }
  272. }
  273. log.Println("product_type_arr ", product_type_arr)
  274. for _, v := range product_type_arr {
  275. isOk := false
  276. for _, vv := range product_type_arrs {
  277. if vv == v {
  278. isOk = true
  279. }
  280. }
  281. if !isOk {
  282. product_type_arrs = append(product_type_arrs, v)
  283. }
  284. }
  285. product_types = strings.Join(product_type_arrs, ",")
  286. }
  287. log.Println("product_types ", uId, product_types)
  288. if TiDb.Count("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uId}) > 0 {
  289. TiDb.Update("dwd_f_crm_attribute_label", map[string]interface{}{"uid": uId}, map[string]interface{}{
  290. "product_type": product_types,
  291. "updatetime": nowTime,
  292. })
  293. } else {
  294. TiDb.Insert("dwd_f_crm_attribute_label", map[string]interface{}{
  295. "uid": uId,
  296. "product_type": product_types,
  297. "updatetime": nowTime,
  298. })
  299. }
  300. }
  301. }
  302. common.WriteSysConfig(&cfg)
  303. log.Println("注册日期、订单增量定时任务结束")
  304. }
  305. // 在线客服聊天记录30分钟一次
  306. func messageSync() {
  307. log.Println("在线客服聊天记录定时任务开始")
  308. phoneRegexp := regexp.MustCompile(`^1[0-9]{10}$`)
  309. sql := fmt.Sprintf(`select * from socialize_message where title = "文本" and create_time > "%s"`, cfg.LastMessageTime)
  310. data := Base.SelectBySql(sql)
  311. if data != nil && *data != nil && len(*data) > 0 {
  312. for _, v := range *data {
  313. isOk := false
  314. content := common.ObjToString(v["content"])
  315. matchArr := Base.SelectBySql(`select aa.name from socialize_keyword aa where aa.mold=0 and aa.state=0 and FIND_IN_SET(2,aa.group) `)
  316. for _, vv := range *matchArr {
  317. key := gconv.String(vv["name"])
  318. if strings.Contains(strings.ToUpper(content), strings.ToUpper(key)) || phoneRegexp.MatchString(content) {
  319. isOk = true
  320. }
  321. }
  322. if isOk {
  323. messag_id := common.Int64All(v["id"])
  324. mData := Base.FindOne("socialize_message_mailbox", map[string]interface{}{"messag_id": messag_id, "own_type": 2}, "", "")
  325. if mData != nil && len(*mData) > 0 {
  326. own_id := common.Int64All((*mData)["own_id"])
  327. if own_id > 0 {
  328. ok1, ok2, _ := FormatData(*mData, "message")
  329. if !ok1 {
  330. log.Println("线索卡点", "message", mData, messag_id)
  331. } else {
  332. if !ok2 {
  333. log.Println("用户分配已达上限", "message", mData, messag_id)
  334. }
  335. }
  336. }
  337. }
  338. }
  339. cfg.LastMessageTime = common.ObjToString(v["create_time"])
  340. }
  341. }
  342. common.WriteSysConfig(&cfg)
  343. log.Println("在线客服聊天记录定时任务结束")
  344. }
  345. // 客服按钮处理
  346. func customerButton() {
  347. log.Println("在线客服按钮定时任务开始")
  348. sql := fmt.Sprintf(`select * from socialize_message where title = "点击按钮" and create_time > "%s"`, cfg.LastMessageButtonTime)
  349. data := Base.SelectBySql(sql)
  350. if data != nil && *data != nil && len(*data) > 0 {
  351. for _, v := range *data {
  352. isOk := false
  353. content := common.ObjToString(v["content"])
  354. matchArr := Base.SelectBySql(`select aa.name from socialize_keyword aa where aa.mold=1 and aa.state=1 and FIND_IN_SET(2,aa.group) `)
  355. for _, vv := range *matchArr {
  356. key := gconv.String(vv["name"])
  357. if strings.Contains(strings.ToUpper(content), strings.ToUpper(key)) {
  358. isOk = true
  359. }
  360. }
  361. if isOk {
  362. messag_id := common.Int64All(v["id"])
  363. mData := Base.FindOne("socialize_message_mailbox", map[string]interface{}{"messag_id": messag_id, "own_type": 2}, "", "")
  364. if mData != nil && len(*mData) > 0 {
  365. own_id := common.Int64All((*mData)["own_id"])
  366. if own_id > 0 {
  367. ok1, ok2, _ := FormatData(*mData, "message")
  368. if !ok1 {
  369. log.Println("线索卡点", "message", mData, messag_id)
  370. } else {
  371. if !ok2 {
  372. log.Println("用户分配已达上限", "message", mData, messag_id)
  373. }
  374. }
  375. }
  376. }
  377. }
  378. cfg.LastMessageButtonTime = common.ObjToString(v["create_time"])
  379. }
  380. }
  381. common.WriteSysConfig(&cfg)
  382. log.Println("在线客服按钮定时任务结束")
  383. }
  384. func LabelToClue() {
  385. log.Println("735需求 标签进线索开始")
  386. for _, v := range lableJson.ConditionConfig {
  387. log.Println("735标签用户sql:", v.SubName)
  388. dataArr := getBitmapIntersection(v.ConditionArr)
  389. log.Println("735标签用户sql:", v.SubName, len(dataArr))
  390. if len(dataArr) == 0 {
  391. continue
  392. }
  393. for _, baseUserId := range dataArr {
  394. log.Println("735标签用户:", baseUserId)
  395. if baseUserId == 0 {
  396. continue
  397. }
  398. FormatData(map[string]interface{}{
  399. "baseUserId": baseUserId,
  400. "topName": v.TopName,
  401. "subName": v.SubName,
  402. }, "tag")
  403. }
  404. }
  405. log.Println("735需求 标签进线索结束")
  406. }
  407. type AggStruct struct {
  408. Intersection []int `ch:"intersection"`
  409. }
  410. func getBitmapIntersection(ids []Condition) []int {
  411. // 构建查询的 SELECT 和 JOIN 部分
  412. joinParts := []string{}
  413. for i := len(ids); i > 0; i-- {
  414. joinParts = append(joinParts, fmt.Sprintf(`
  415. SELECT bitobj AS bit%d
  416. FROM dwd_d_tag
  417. WHERE code = '%v'`, i, ids[i-1].Code))
  418. }
  419. selectParts := ""
  420. for i := 0; i < len(ids); i++ {
  421. if i == 0 {
  422. if gconv.Bool(ids[i+1].Fool) {
  423. selectParts = fmt.Sprintf("bitmapAnd( bit%d,bit%d)", i+2, i+1)
  424. } else {
  425. selectParts = fmt.Sprintf("bitmapAndnot( bit%d,bit%d)", i+1, i+2)
  426. }
  427. i++
  428. } else {
  429. if gconv.Bool(ids[i].Fool) {
  430. selectParts = fmt.Sprintf("bitmapAnd( bit%d,%s)", i+1, selectParts)
  431. } else {
  432. selectParts = fmt.Sprintf("bitmapAndnot( %s,bit%d)", selectParts, i+1)
  433. }
  434. }
  435. }
  436. // 构建 SQL 查询
  437. query := fmt.Sprintf(`
  438. SELECT bitmapToArray(%s) AS intersection
  439. FROM (%s) AS t1`,
  440. selectParts, // 连接 SELECT 部分
  441. strings.Join(joinParts, ") AS t%d CROSS JOIN (")) // 连接 JOIN 部分
  442. // 修正 CROSS JOIN 部分
  443. for i := 1; i < len(ids); i++ {
  444. query = strings.Replace(query, "AS t%d", fmt.Sprintf("AS t%d", i+1), 1)
  445. }
  446. log.Println("p735标签接口:", query)
  447. rows, _ := ClickhouseConn.Query(context.TODO(), query)
  448. ass := []int{}
  449. for rows.Next() {
  450. as := AggStruct{}
  451. if err := rows.ScanStruct(&as); err != nil {
  452. logx.Error(err)
  453. continue
  454. }
  455. ass = as.Intersection
  456. }
  457. rows.Close()
  458. return ass
  459. }