task.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package entity
  2. import (
  3. util "app.yhyue.com/moapp/jybase/common"
  4. "app.yhyue.com/moapp/jybase/encrypt"
  5. . "app.yhyue.com/moapp/jybase/mongodb"
  6. "context"
  7. "fmt"
  8. "github.com/gogf/gf/v2/util/gconv"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "regexp"
  11. "sort"
  12. "strings"
  13. "time"
  14. . "userBehaviorTask/config"
  15. )
  16. type Task struct {
  17. }
  18. // 搜索结果汇总
  19. var SearchInfo = make(map[string][]map[string]interface{})
  20. // 三级页信息汇总
  21. var ContentInfo = make(map[string][]map[string]interface{})
  22. // 职位信息获取
  23. var positionUser = map[string]string{}
  24. // 三级页详情获取
  25. var biddingInfo map[string]map[string]interface{}
  26. // 数据库中数据初始化
  27. var subInItInfo map[string]map[string]string
  28. // 最终整合数据处理
  29. var subInfo = make(map[string]map[string]string)
  30. var (
  31. ArticleId = regexp.MustCompile(".*article/content/(.*)\\.html")
  32. NologinId = regexp.MustCompile(".*nologin/content/(.*)\\.html")
  33. )
  34. func (t *Task) Run() {
  35. go util.SimpleCrontab(false, DbConf.StatisticTime, func() {
  36. now := time.Now()
  37. start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  38. end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  39. t.ShuaKu(start.Unix(), end.Unix())
  40. })
  41. }
  42. func (t *Task) ShuaKu(start, end int64) {
  43. //职位标识与雇员标识处理
  44. UserHandle()
  45. //数据库数据初始化
  46. subRecommend()
  47. SearchInfo = make(map[string][]map[string]interface{})
  48. ContentInfo = make(map[string][]map[string]interface{})
  49. subInfo = make(map[string]map[string]string)
  50. searchHandle(start, end)
  51. biddingInfo = map[string]map[string]interface{}{}
  52. ContentHandle(start, end)
  53. //批量处理入库
  54. for userId, v := range SearchInfo {
  55. subInfo[userId] = map[string]string{
  56. "searchfor": gconv.String(v),
  57. }
  58. }
  59. for userId, v := range ContentInfo {
  60. //排序
  61. sort.Slice(v, func(i, j int) bool {
  62. return v[i]["datetimeInt"].(int64) > v[j]["datetimeInt"].(int64)
  63. })
  64. if len(v) > DbConf.InfoCount {
  65. ContentInfo[userId] = v[:DbConf.InfoCount-1]
  66. }
  67. if _, ok := subInfo[userId]; ok {
  68. subInfo[userId]["browse"] = gconv.String(v)
  69. } else {
  70. subInfo[userId] = map[string]string{
  71. "browse": gconv.String(v),
  72. }
  73. }
  74. }
  75. loc, _ := time.LoadLocation("Asia/Shanghai")
  76. updateTime := time.Now().In(loc).Format("2006-01-02 15:04:05")
  77. for userId, v := range subInfo {
  78. searchfor := ""
  79. browse := ""
  80. if _, ok := subInItInfo[userId]; ok {
  81. //汇总数据存在
  82. data := subInItInfo[userId]
  83. if v["searchfor"] == "" && data["searchfor"] != "" {
  84. searchfor = data["searchfor"]
  85. } else {
  86. searchfor = v["searchfor"]
  87. }
  88. if v["browse"] == "" && data["browse"] != "" {
  89. browse = data["browse"]
  90. } else {
  91. browse = v["browse"]
  92. }
  93. //修改操作
  94. sql := fmt.Sprintf(`alter table sub_recommend_rule UPDATE browse ='%s' ,searchfor='%s',update_time='%s' where userid = '%s'`, browse, searchfor, updateTime, userId)
  95. err := ClickhouseConn.Exec(context.Background(), sql)
  96. if err != nil {
  97. logx.Error(err)
  98. }
  99. } else {
  100. //需要新增汇总数据
  101. searchfor := v["searchfor"]
  102. browse := v["browse"]
  103. sql := fmt.Sprintf("INSERT INTO sub_recommend_rule (userid, searchfor, browse, update_time) values ('%s','%s','%s','%s')", userId, searchfor, browse, updateTime)
  104. err := ClickhouseConn.Exec(context.Background(), sql)
  105. if err != nil {
  106. logx.Error(err)
  107. }
  108. }
  109. }
  110. SearchInfo = make(map[string][]map[string]interface{})
  111. ContentInfo = make(map[string][]map[string]interface{})
  112. subInfo = make(map[string]map[string]string)
  113. biddingInfo = map[string]map[string]interface{}{}
  114. positionUser = map[string]string{}
  115. subInItInfo = map[string]map[string]string{}
  116. }
  117. func UserHandle() {
  118. positionUser = map[string]string{}
  119. sqlStr := "SELECT a.id, b.phone, a.ent_id FROM base_position a INNER JOIN base_user b ON a.type = 1 AND a.user_id = b.id"
  120. BaseServiceMysql.SelectByBath(10, func(l *[]map[string]interface{}) bool {
  121. for _, value := range *l {
  122. positionId := gconv.String(value["id"])
  123. phone := gconv.String(value["phone"])
  124. ent_id := gconv.Int64(value["ent_id"])
  125. entUser := JianYuMysql.FindOne("entniche_user", map[string]interface{}{
  126. "ent_id": ent_id,
  127. "phone": phone,
  128. }, "id", "")
  129. if entUser == nil {
  130. continue
  131. }
  132. entUserId := gconv.String((*entUser)["id"])
  133. positionUser[positionId] = entUserId
  134. }
  135. return true
  136. }, sqlStr)
  137. }
  138. // 搜索条件查询
  139. func searchHandle(start, end int64) {
  140. /*startTime := primitive.NewObjectIDFromTimestamp(time.Unix(start, 0))
  141. endTime := primitive.NewObjectIDFromTimestamp(time.Unix(end, 0))*/
  142. logx.Info("搜索条件start。。。", start, end)
  143. sess := MgoLog.GetMgoConn()
  144. defer MgoLog.DestoryMongoConn(sess)
  145. it := sess.DB("qfw").C("jy_search_log").Find(map[string]interface{}{
  146. "createtime": map[string]interface{}{
  147. "$gte": start,
  148. "$lt": end,
  149. },
  150. }).Sort("-createtime").Select(map[string]interface{}{}).Iter()
  151. var numb int64
  152. for m := make(map[string]interface{}); it.Next(&m); {
  153. numb++
  154. if numb%1000 == 0 {
  155. logx.Info("搜索条件跑了", numb)
  156. }
  157. userId := gconv.String(m["s_userid"])
  158. if !IsObjectIdHex(userId) {
  159. //职位标识替换为企业用户表示
  160. if positionUser[userId] == "" {
  161. continue
  162. }
  163. userId = positionUser[userId]
  164. }
  165. if SearchInfo[userId] != nil {
  166. if len(SearchInfo[userId]) >= DbConf.SearchCount {
  167. continue
  168. }
  169. }
  170. wordsMode := gconv.String(m["wordsMode"])
  171. key := []string{}
  172. key1 := gconv.Strings(util.If(gconv.String(m["search_word"]) == "", []string{}, strings.Split(gconv.String(m["search_word"]), " ")))
  173. key2 := gconv.Strings(util.If(gconv.String(m["additionalWords"]) == "", []string{}, strings.Split(gconv.String(m["additionalWords"]), ",")))
  174. if wordsMode == "包含所有" {
  175. key1 = append(key1, key2...)
  176. if len(key1) > 0 {
  177. key = append(key, strings.Join(key1, "+"))
  178. }
  179. } else {
  180. //任意一个
  181. if len(key1) > 0 {
  182. key = append(key, strings.Join(key1, "+"))
  183. }
  184. if len(key2) > 0 {
  185. key = append(key, key2...)
  186. }
  187. }
  188. if len(key) == 0 {
  189. continue
  190. }
  191. searchMap := map[string]interface{}{
  192. "winnerTel": util.If(gconv.String(m["search_winnerTel"]) == "y", 1, 0), //0:不限 1:有中标单位联系方式
  193. "selectType": m["search_selectType"], //搜索范围
  194. "fileExists": gconv.Int64(m["fileExists"]), //0:不限 1:有附件 -1:无附件
  195. "notkeys": strings.Split(gconv.String(m["exclusionWords"]), " "), //排除词
  196. "area": strings.Split(gconv.String(m["search_area"]), ","),
  197. "city": strings.Split(gconv.String(m["search_city"]), ","),
  198. "keys": key, //关键词
  199. "buyerClass": strings.Split(gconv.String(m["search_buyerClass"]), ","), //采购单位行业
  200. "buyerTel": util.If(gconv.String(m["search_buyerTel"]) == "y", 1, 0), //0:不限 1:有采购单位联系方式
  201. "price": m["search_price"], //金额范围,可能没有开始金额,也可能没有结束金额
  202. "topType": strings.Split(gconv.String(m["search_topType"]), ","), //一级信息类型
  203. "subType": strings.Split(gconv.String(m["search_subType"]), ","), //二级信息类型
  204. "industry": strings.Split(gconv.String(m["search_industry"]), ","), //行业分类
  205. "datetime": time.Unix(gconv.Int64(m["createtime"]), 0).Format("2006-01-02 15:04:05"), //搜索时间
  206. }
  207. if _, ok := SearchInfo[userId]; ok {
  208. SearchInfo[userId] = append(SearchInfo[userId], searchMap)
  209. } else {
  210. SearchInfo[userId] = []map[string]interface{}{
  211. searchMap,
  212. }
  213. }
  214. }
  215. }
  216. // 三级页浏览记录查询
  217. func ContentHandle(start, end int64) {
  218. var AppContentInfo = make(map[string][]map[string]interface{})
  219. var numb int64
  220. logx.Info("三级页浏览数据start。。。", start, end)
  221. sess := MgoLog.GetMgoConn()
  222. defer MgoLog.DestoryMongoConn(sess)
  223. //jylog
  224. it := sess.DB("qfw").C("jy_logs").Find(map[string]interface{}{
  225. "date": map[string]interface{}{
  226. "$gte": start,
  227. "$lt": end,
  228. },
  229. }).Sort("-date").Select(map[string]interface{}{
  230. "date": 1,
  231. "url": 1,
  232. "userid": 1,
  233. }).Iter()
  234. for m := make(map[string]interface{}); it.Next(&m); {
  235. numb++
  236. if numb%1000 == 0 {
  237. logx.Info("pc三级页跑了", numb)
  238. }
  239. InformationHandle(m, ContentInfo)
  240. }
  241. //jyapp
  242. numb = 0
  243. sess1 := MgoLog.GetMgoConn()
  244. defer MgoLog.DestoryMongoConn(sess1)
  245. it1 := sess1.DB("qfw").C("jyapp_logs").Find(map[string]interface{}{
  246. "date": map[string]interface{}{
  247. "$gte": start,
  248. "$lt": end,
  249. },
  250. }).Sort("-date").Select(map[string]interface{}{
  251. "date": 1,
  252. "url": 1,
  253. "userid": 1,
  254. }).Iter()
  255. for m := make(map[string]interface{}); it1.Next(&m); {
  256. numb++
  257. if numb%1000 == 0 {
  258. logx.Info("app三级页记录跑了", numb)
  259. }
  260. InformationHandle(m, AppContentInfo)
  261. }
  262. //两个map合成一个处理
  263. for userId, v := range AppContentInfo {
  264. if ContentInfo[userId] == nil {
  265. ContentInfo[userId] = v
  266. } else {
  267. ContentInfo[userId] = append(ContentInfo[userId], v...)
  268. }
  269. }
  270. }
  271. // 资讯数据处理
  272. func InformationHandle(m map[string]interface{}, data map[string][]map[string]interface{}) {
  273. userId := gconv.String(m["userid"])
  274. if !IsObjectIdHex(userId) {
  275. //职位标识替换为企业用户表示
  276. if positionUser[userId] == "" {
  277. return
  278. }
  279. userId = positionUser[userId]
  280. }
  281. if data[userId] != nil {
  282. if len(data[userId]) >= DbConf.InfoCount {
  283. return
  284. }
  285. }
  286. fu1 := ArticleId.FindStringSubmatch(gconv.String(m["url"]))
  287. fu2 := NologinId.FindStringSubmatch(gconv.String(m["url"]))
  288. infoId := ""
  289. if len(fu1) > 1 {
  290. if len(fu1[1]) > 10 {
  291. infoId = encrypt.DecodeArticleId2ByCheck(fu1[1])[0]
  292. }
  293. } else if len(fu2) > 0 {
  294. if len(fu2[1]) > 10 {
  295. infoId = encrypt.DecodeArticleId2ByCheck(fu2[1])[0]
  296. }
  297. }
  298. if infoId == "" {
  299. return
  300. }
  301. //分类标签,bidding表gov_classify.root,过滤掉只有一级的,取最后一级
  302. rootStr := ""
  303. area := ""
  304. city := ""
  305. district := ""
  306. projectname := ""
  307. if _, ok := biddingInfo[infoId]; !ok {
  308. bidding, _ := MgoBidding.FindById("bidding", infoId, `{"title":1,"gov_classify":1,"area":1,"projectname":1,"city":1,"district":1}`)
  309. biddingInfo[infoId] = map[string]interface{}{}
  310. if bidding == nil {
  311. return
  312. }
  313. gov_classify := gconv.Map((*bidding)["gov_classify"])
  314. area = gconv.String((*bidding)["area"])
  315. projectname = gconv.String((*bidding)["projectname"])
  316. if projectname == "" {
  317. projectname = gconv.String((*bidding)["title"])
  318. }
  319. district = gconv.String((*bidding)["district"])
  320. city = gconv.String((*bidding)["city"])
  321. if gov_classify != nil {
  322. root := strings.Split(gconv.String(gov_classify["root"]), "/")
  323. if len(root) == 0 {
  324. return
  325. } else {
  326. for i := len(root) - 1; i >= 1; i-- {
  327. if root[i] != "" {
  328. rootStr = root[i]
  329. biddingInfo[infoId] = map[string]interface{}{
  330. "area": area,
  331. "city": city,
  332. "projectname": projectname,
  333. "district": district,
  334. "rootStr": rootStr,
  335. }
  336. break
  337. }
  338. }
  339. }
  340. }
  341. } else {
  342. rootStr = gconv.String(biddingInfo[infoId]["rootStr"])
  343. area = gconv.String(biddingInfo[infoId]["area"])
  344. city = gconv.String(biddingInfo[infoId]["city"])
  345. district = gconv.String(biddingInfo[infoId]["district"])
  346. projectname = gconv.String(biddingInfo[infoId]["projectname"])
  347. }
  348. if rootStr == "" {
  349. return
  350. }
  351. searchMap := map[string]interface{}{
  352. "infoid": infoId, //信息id
  353. "classify": rootStr, //分类标签,bidding表gov_classify.root,过滤掉只有一级的,取最后一级
  354. "datetimeInt": gconv.Int64(m["date"]), //访问时间
  355. "datetime": time.Unix(gconv.Int64(m["date"]), 0).Format("2006-01-02 15:04:05"), //搜索时间
  356. "area": area,
  357. "city": city,
  358. "projectname": projectname,
  359. "district": district,
  360. }
  361. if _, ok := data[userId]; ok {
  362. data[userId] = append(data[userId], searchMap)
  363. } else {
  364. data[userId] = []map[string]interface{}{
  365. searchMap,
  366. }
  367. }
  368. }
  369. type Recommend struct {
  370. UserId string `ch:"userid"`
  371. SearchFor string `ch:"searchfor"`
  372. Browse string `ch:"browse"`
  373. }
  374. // subRecommend初始化
  375. func subRecommend() {
  376. subInItInfo = map[string]map[string]string{}
  377. rData1, err := ClickhouseConn.Query(context.Background(), `SELECT userid, searchfor, browse FROM sub_recommend_rule order by userid`)
  378. if err != nil {
  379. logx.Error("汇总表查询出错:", err)
  380. return
  381. }
  382. for rData1.Next() {
  383. data := Recommend{}
  384. rData1.ScanStruct(&data)
  385. userid := data.UserId
  386. subInItInfo[userid] = map[string]string{
  387. "searchfor": data.SearchFor,
  388. "browse": data.Browse,
  389. }
  390. }
  391. }