task.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package entity
  2. import (
  3. util "app.yhyue.com/moapp/jybase/common"
  4. "app.yhyue.com/moapp/jybase/encrypt"
  5. . "app.yhyue.com/moapp/jybase/mongodb"
  6. "context"
  7. "fmt"
  8. "github.com/gogf/gf/v2/util/gconv"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "go.mongodb.org/mongo-driver/bson/primitive"
  11. "regexp"
  12. "sort"
  13. "strconv"
  14. "strings"
  15. "time"
  16. . "userBehaviorTask/config"
  17. )
  // Task is the receiver type for the user-behavior aggregation job. It has no
  // fields; its methods work on the package-level maps declared in this file.
  type Task struct{}
  // Package-level working state shared by the functions in this file. ShuaKu
  // rebuilds these maps at the start of a run and clears them again at the end.
  // Every map is initialised at declaration so that a write made before ShuaKu
  // has run (for example a direct call to InformationHandle) cannot panic on a
  // nil map.
  var (
      // SearchInfo holds the collected search-condition records, keyed by user id.
      SearchInfo = make(map[string][]map[string]interface{})

      // ContentInfo holds the collected detail-page browse records, keyed by user id.
      ContentInfo = make(map[string][]map[string]interface{})

      // positionUser maps a position id to the corresponding enterprise user id.
      positionUser = map[string]string{}

      // biddingInfo caches the bidding fields looked up for a detail page,
      // keyed by info id.
      biddingInfo = map[string]map[string]interface{}{}

      // subInItInfo holds the rows already present in the database, keyed by user id.
      subInItInfo = map[string]map[string]string{}

      // subInfo holds the final merged per-user data that gets written back.
      subInfo = make(map[string]map[string]string)
  )

  // URL patterns whose first capture group is the encoded article id.
  var (
      ArticleId = regexp.MustCompile(".*article/content/(.*)\\.html")
      NologinId = regexp.MustCompile(".*nologin/content/(.*)\\.html")
  )
  36. func (t *Task) Run() {
  37. go util.SimpleCrontab(false, DbConf.StatisticTime, func() {
  38. now := time.Now()
  39. start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  40. end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  41. t.ShuaKu(start.Unix(), end.Unix())
  42. })
  43. }
  44. func (t *Task) ShuaKu(start, end int64) {
  45. //职位标识与雇员标识处理
  46. UserHandle()
  47. //数据库数据初始化
  48. subRecommend()
  49. SearchInfo = make(map[string][]map[string]interface{})
  50. ContentInfo = make(map[string][]map[string]interface{})
  51. subInfo = make(map[string]map[string]string)
  52. searchHandle(start, end)
  53. biddingInfo = map[string]map[string]interface{}{}
  54. ContentHandle(start, end)
  55. //批量处理入库
  56. for userId, v := range SearchInfo {
  57. subInfo[userId] = map[string]string{
  58. "searchfor": gconv.String(v),
  59. }
  60. }
  61. for userId, v := range ContentInfo {
  62. //排序
  63. sort.Slice(v, func(i, j int) bool {
  64. return v[i]["datetimeInt"].(int64) > v[j]["datetimeInt"].(int64)
  65. })
  66. if len(v) > DbConf.InfoCount {
  67. ContentInfo[userId] = v[:DbConf.InfoCount-1]
  68. }
  69. if _, ok := subInfo[userId]; ok {
  70. subInfo[userId]["browse"] = gconv.String(v)
  71. } else {
  72. subInfo[userId] = map[string]string{
  73. "browse": gconv.String(v),
  74. }
  75. }
  76. }
  77. updateTime := time.Now().Format("2006-01-02 15:04:05")
  78. for userId, v := range subInfo {
  79. searchfor := ""
  80. browse := ""
  81. if _, ok := subInItInfo[userId]; ok {
  82. //汇总数据存在
  83. data := subInItInfo[userId]
  84. if v["searchfor"] == "" && data["searchfor"] != "" {
  85. searchfor = data["searchfor"]
  86. } else {
  87. searchfor = v["searchfor"]
  88. }
  89. if v["browse"] == "" && data["browse"] != "" {
  90. browse = data["browse"]
  91. } else {
  92. browse = v["browse"]
  93. }
  94. //修改操作
  95. sql := fmt.Sprintf(`alter table sub_recommend_rule UPDATE browse ='%s' ,searchfor='%s',update_time='%s' where userid = '%s'`, browse, searchfor, updateTime, userId)
  96. err := ClickhouseConn.Exec(context.Background(), sql)
  97. if err != nil {
  98. logx.Error(err)
  99. }
  100. } else {
  101. //需要新增汇总数据
  102. searchfor := v["searchfor"]
  103. browse := v["browse"]
  104. sql := fmt.Sprintf("INSERT INTO sub_recommend_rule (userid, searchfor, browse, update_time) values ('%s','%s','%s','%s')", userId, searchfor, browse, updateTime)
  105. err := ClickhouseConn.Exec(context.Background(), sql)
  106. if err != nil {
  107. logx.Error(err)
  108. }
  109. }
  110. }
  111. SearchInfo = make(map[string][]map[string]interface{})
  112. ContentInfo = make(map[string][]map[string]interface{})
  113. subInfo = make(map[string]map[string]string)
  114. biddingInfo = map[string]map[string]interface{}{}
  115. positionUser = map[string]string{}
  116. subInItInfo = map[string]map[string]string{}
  117. }
  118. func UserHandle() {
  119. positionUser = map[string]string{}
  120. sqlStr := "SELECT a.id, b.phone, a.ent_id FROM base_position a INNER JOIN base_user b ON a.type = 1 AND a.user_id = b.id"
  121. BaseServiceMysql.SelectByBath(10, func(l *[]map[string]interface{}) bool {
  122. for _, value := range *l {
  123. positionId := gconv.String(value["id"])
  124. phone := gconv.String(value["phone"])
  125. ent_id := gconv.Int64(value["ent_id"])
  126. entUser := JianYuMysql.FindOne("entniche_user", map[string]interface{}{
  127. "ent_id": ent_id,
  128. "phone": phone,
  129. }, "id", "")
  130. if entUser == nil {
  131. continue
  132. }
  133. entUserId := gconv.String((*entUser)["id"])
  134. positionUser[positionId] = entUserId
  135. }
  136. return true
  137. }, sqlStr)
  138. }
  139. // 搜索条件查询
  140. func searchHandle(start, end int64) {
  141. /*startTime := primitive.NewObjectIDFromTimestamp(time.Unix(start, 0))
  142. endTime := primitive.NewObjectIDFromTimestamp(time.Unix(end, 0))*/
  143. startTime := strconv.FormatInt(start, 16) + "0000000000000000"
  144. endTime := strconv.FormatInt(end, 16) + "0000000000000000"
  145. startTimeId, _ := primitive.ObjectIDFromHex(startTime)
  146. endTimeId, _ := primitive.ObjectIDFromHex(endTime)
  147. logx.Info("搜索条件start。。。", start, end, startTime, endTime)
  148. sess := MgoLog.GetMgoConn()
  149. defer MgoLog.DestoryMongoConn(sess)
  150. it := sess.DB("qfw").C("jy_search_log").Find(map[string]interface{}{
  151. "_id": map[string]interface{}{
  152. "$gte": startTimeId,
  153. "$lt": endTimeId,
  154. },
  155. }).Sort("createtime").Select(map[string]interface{}{}).Iter()
  156. var numb int64
  157. for m := make(map[string]interface{}); it.Next(&m); {
  158. numb++
  159. if numb%1000 == 0 {
  160. logx.Info("搜索条件跑了", numb)
  161. }
  162. userId := gconv.String(m["s_userid"])
  163. if !IsObjectIdHex(userId) {
  164. //职位标识替换为企业用户表示
  165. if positionUser[userId] == "" {
  166. continue
  167. }
  168. userId = positionUser[userId]
  169. }
  170. if SearchInfo[userId] != nil {
  171. if len(SearchInfo[userId]) >= DbConf.SearchCount {
  172. continue
  173. }
  174. }
  175. wordsMode := gconv.String(m["wordsMode"])
  176. key := []string{}
  177. key1 := gconv.Strings(util.If(gconv.String(m["search_word"]) == "", []string{}, strings.Split(gconv.String(m["search_word"]), " ")))
  178. key2 := gconv.Strings(util.If(gconv.String(m["additionalWords"]) == "", []string{}, strings.Split(gconv.String(m["additionalWords"]), ",")))
  179. if wordsMode == "包含所有" {
  180. key1 = append(key1, key2...)
  181. if len(key1) > 0 {
  182. key = append(key, strings.Join(key1, "+"))
  183. }
  184. } else {
  185. //任意一个
  186. if len(key1) > 0 {
  187. key = append(key, strings.Join(key1, "+"))
  188. }
  189. if len(key2) > 0 {
  190. key = append(key, key2...)
  191. }
  192. }
  193. if userId == "6291ca5e31e4ba4956a74a25" {
  194. logx.Info(111)
  195. }
  196. if len(key) == 0 {
  197. continue
  198. }
  199. searchMap := map[string]interface{}{
  200. "winnerTel": util.If(gconv.String(m["search_winnerTel"]) == "y", 1, 0), //0:不限 1:有中标单位联系方式
  201. "selectType": m["search_selectType"], //搜索范围
  202. "fileExists": gconv.Int64(m["fileExists"]), //0:不限 1:有附件 -1:无附件
  203. "notkeys": strings.Split(gconv.String(m["exclusionWords"]), " "), //排除词
  204. "area": strings.Split(gconv.String(m["search_area"]), ","),
  205. "city": strings.Split(gconv.String(m["search_city"]), ","),
  206. "keys": key, //关键词
  207. "buyerClass": strings.Split(gconv.String(m["search_buyerClass"]), ","), //采购单位行业
  208. "buyerTel": util.If(gconv.String(m["search_buyerTel"]) == "y", 1, 0), //0:不限 1:有采购单位联系方式
  209. "price": m["search_price"], //金额范围,可能没有开始金额,也可能没有结束金额
  210. "topType": strings.Split(gconv.String(m["search_topType"]), ","), //一级信息类型
  211. "subType": strings.Split(gconv.String(m["search_subType"]), ","), //二级信息类型
  212. "industry": strings.Split(gconv.String(m["search_industry"]), ","), //行业分类
  213. "datetime": time.Unix(gconv.Int64(m["createtime"]), 0).Format("2006-01-02 15:04:05"), //搜索时间
  214. }
  215. if _, ok := SearchInfo[userId]; ok {
  216. SearchInfo[userId] = append(SearchInfo[userId], searchMap)
  217. } else {
  218. SearchInfo[userId] = []map[string]interface{}{
  219. searchMap,
  220. }
  221. }
  222. }
  223. }
  224. // 三级页浏览记录查询
  225. func ContentHandle(start, end int64) {
  226. var AppContentInfo = make(map[string][]map[string]interface{})
  227. var numb int64
  228. startTime := strconv.FormatInt(start, 16) + "0000000000000000"
  229. endTime := strconv.FormatInt(end, 16) + "0000000000000000"
  230. startTimeId, _ := primitive.ObjectIDFromHex(startTime)
  231. endTimeId, _ := primitive.ObjectIDFromHex(endTime)
  232. logx.Info("三级页浏览数据start。。。", start, end, startTime, endTime)
  233. sess := MgoLog.GetMgoConn()
  234. defer MgoLog.DestoryMongoConn(sess)
  235. //jylog
  236. it := sess.DB("qfw").C("jy_logs").Find(map[string]interface{}{
  237. "_id": map[string]interface{}{
  238. "$gte": startTimeId,
  239. "$lt": endTimeId,
  240. },
  241. }).Sort("_id").Select(map[string]interface{}{
  242. "date": 1,
  243. "url": 1,
  244. "userid": 1,
  245. }).Iter()
  246. for m := make(map[string]interface{}); it.Next(&m); {
  247. numb++
  248. if numb%1000 == 0 {
  249. logx.Info("pc三级页跑了", numb)
  250. }
  251. InformationHandle(m, ContentInfo)
  252. }
  253. //jyapp
  254. numb = 0
  255. sess1 := MgoLog.GetMgoConn()
  256. defer MgoLog.DestoryMongoConn(sess1)
  257. it1 := sess1.DB("qfw").C("jyapp_logs").Find(map[string]interface{}{
  258. "_id": map[string]interface{}{
  259. "$gte": startTimeId,
  260. "$lt": endTimeId,
  261. },
  262. }).Sort("_id").Select(map[string]interface{}{
  263. "date": 1,
  264. "url": 1,
  265. "userid": 1,
  266. }).Iter()
  267. for m := make(map[string]interface{}); it1.Next(&m); {
  268. numb++
  269. if numb%1000 == 0 {
  270. logx.Info("app三级页记录跑了", numb)
  271. }
  272. InformationHandle(m, AppContentInfo)
  273. }
  274. //两个map合成一个处理
  275. for userId, v := range AppContentInfo {
  276. if ContentInfo[userId] == nil {
  277. ContentInfo[userId] = v
  278. } else {
  279. ContentInfo[userId] = append(ContentInfo[userId], v...)
  280. }
  281. }
  282. }
  283. // 资讯数据处理
  284. func InformationHandle(m map[string]interface{}, data map[string][]map[string]interface{}) {
  285. userId := gconv.String(m["userid"])
  286. if !IsObjectIdHex(userId) {
  287. //职位标识替换为企业用户表示
  288. if positionUser[userId] == "" {
  289. return
  290. }
  291. userId = positionUser[userId]
  292. }
  293. if data[userId] != nil {
  294. if len(data[userId]) >= DbConf.InfoCount {
  295. return
  296. }
  297. }
  298. fu1 := ArticleId.FindStringSubmatch(gconv.String(m["url"]))
  299. fu2 := NologinId.FindStringSubmatch(gconv.String(m["url"]))
  300. infoId := ""
  301. if len(fu1) > 1 {
  302. if len(fu1[1]) > 10 {
  303. infoId = encrypt.DecodeArticleId2ByCheck(fu1[1])[0]
  304. }
  305. } else if len(fu2) > 0 {
  306. if len(fu2[1]) > 10 {
  307. infoId = encrypt.DecodeArticleId2ByCheck(fu2[1])[0]
  308. }
  309. }
  310. if infoId == "" {
  311. return
  312. }
  313. //分类标签,bidding表gov_classify.root,过滤掉只有一级的,取最后一级
  314. rootStr := ""
  315. area := ""
  316. city := ""
  317. district := ""
  318. projectname := ""
  319. if _, ok := biddingInfo[infoId]; !ok {
  320. bidding, _ := MgoBidding.FindById("bidding", infoId, `{"gov_classify":1,"area":1,"projectname":1,"city":1,"district":1}`)
  321. biddingInfo[infoId] = map[string]interface{}{}
  322. if bidding == nil {
  323. return
  324. }
  325. gov_classify := gconv.Map((*bidding)["gov_classify"])
  326. area = gconv.String((*bidding)["area"])
  327. projectname = gconv.String((*bidding)["projectname"])
  328. district = gconv.String((*bidding)["district"])
  329. city = gconv.String((*bidding)["city"])
  330. if gov_classify != nil {
  331. root := strings.Split(gconv.String(gov_classify["root"]), "/")
  332. if len(root) == 0 {
  333. return
  334. } else {
  335. for i := len(root) - 1; i >= 1; i-- {
  336. if root[i] != "" {
  337. rootStr = root[i]
  338. biddingInfo[infoId] = map[string]interface{}{
  339. "area": area,
  340. "city": city,
  341. "projectname": projectname,
  342. "district": district,
  343. "rootStr": rootStr,
  344. }
  345. break
  346. }
  347. }
  348. }
  349. }
  350. } else {
  351. rootStr = gconv.String(biddingInfo[infoId]["rootStr"])
  352. area = gconv.String(biddingInfo[infoId]["area"])
  353. city = gconv.String(biddingInfo[infoId]["city"])
  354. district = gconv.String(biddingInfo[infoId]["district"])
  355. projectname = gconv.String(biddingInfo[infoId]["projectname"])
  356. }
  357. if rootStr == "" {
  358. return
  359. }
  360. searchMap := map[string]interface{}{
  361. "infoid": infoId, //信息id
  362. "classify": rootStr, //分类标签,bidding表gov_classify.root,过滤掉只有一级的,取最后一级
  363. "datetimeInt": gconv.Int64(m["date"]), //访问时间
  364. "datetime": time.Unix(gconv.Int64(m["date"]), 0).Format("2006-01-02 15:04:05"), //搜索时间
  365. "area": area,
  366. "city": city,
  367. "projectname": projectname,
  368. "district": district,
  369. }
  370. if _, ok := data[userId]; ok {
  371. data[userId] = append(data[userId], searchMap)
  372. } else {
  373. data[userId] = []map[string]interface{}{
  374. searchMap,
  375. }
  376. }
  377. }
  // Recommend is the scan target for one row of sub_recommend_rule; the ch tags
  // name the column each field is filled from.
  type Recommend struct {
      // UserId is read from the userid column.
      UserId string `ch:"userid"`
      // SearchFor is read from the searchfor column.
      SearchFor string `ch:"searchfor"`
      // Browse is read from the browse column.
      Browse string `ch:"browse"`
  }
  383. // subRecommend初始化
  384. func subRecommend() {
  385. subInItInfo = map[string]map[string]string{}
  386. rData1, err := ClickhouseConn.Query(context.Background(), `SELECT userid, searchfor, browse FROM sub_recommend_rule order by userid`)
  387. if err != nil {
  388. logx.Error("汇总表查询出错:", err)
  389. return
  390. }
  391. for rData1.Next() {
  392. data := Recommend{}
  393. rData1.ScanStruct(&data)
  394. userid := data.UserId
  395. subInItInfo[userid] = map[string]string{
  396. "searchfor": data.SearchFor,
  397. "browse": data.Browse,
  398. }
  399. }
  400. }