marketanalysis.go 50 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510
  1. package marketanalysis
  2. import (
  3. "app.yhyue.com/moapp/jybase/common"
  4. "app.yhyue.com/moapp/jybase/encrypt"
  5. elastic "app.yhyue.com/moapp/jybase/es"
  6. "app.yhyue.com/moapp/jybase/mongodb"
  7. "app.yhyue.com/moapp/jybase/mysql"
  8. "app.yhyue.com/moapp/jybase/redis"
  9. "app.yhyue.com/moapp/jypkg/common/src/qfw/util/jy"
  10. "encoding/json"
  11. "errors"
  12. "fmt"
  13. "log"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. MAPool chan bool
  20. MATimeout int
  21. MAProjectNumLimit int
  22. MAKeyWordsCount int
  23. ProjectCount int
  24. PtIndex = "projectset"
  25. PtType = "projectset"
  26. FieldsDetail = `"purchasing","projectname.pname"`
  27. Fields = `"projectname.pname"`
  28. )
  29. func MAInit(limit, timeOut, projectNumLimit, keyWordsCount, pCount int, ptIndex, ptType string, fields []string) {
  30. if limit == 0 {
  31. limit = 5
  32. }
  33. MAPool = make(chan bool, limit)
  34. for i := 0; i < limit; i++ {
  35. MAPool <- true
  36. }
  37. if timeOut <= 0 {
  38. timeOut = 20
  39. }
  40. MATimeout = timeOut
  41. if projectNumLimit <= 0 {
  42. projectNumLimit = 600000
  43. }
  44. MAProjectNumLimit = projectNumLimit
  45. if keyWordsCount <= 0 {
  46. keyWordsCount = 300
  47. }
  48. MAKeyWordsCount = keyWordsCount
  49. //项目数量
  50. ProjectCount = pCount
  51. //es 索引
  52. if ptIndex != "" {
  53. PtIndex = ptIndex
  54. PtType = ptType
  55. }
  56. //
  57. if len(fields) > 1 {
  58. Fields = fmt.Sprintf(`"%s"`, fields[0])
  59. FieldsDetail = fmt.Sprintf(`"%s"`, strings.Join(fields, `","`))
  60. }
  61. }
  62. // AnalysisRequestParam 接口原请求参数
  63. type AnalysisRequestParam struct {
  64. KeysItemsStr string //分析内容【字符串】结构和o_member_jy.a_items保持一致
  65. RangeTime string //时间【字符串】 时间戳开始-结束时间戳
  66. RangeTimeExtra string //时间【字符串】前段回显使用
  67. Area string //省份【对象字符串】
  68. Industry string //行业【对象字符串】
  69. BuyerClass string //采购单位类型【字符串】多个采购单位类型用逗号拼接
  70. Buyer string //采购单位
  71. Winner string //中标单位
  72. Sort int //排序:默认0:成交时间倒序;1:项目金额倒序
  73. PageSize int //默认每页10条
  74. PageNum int //默认当前第一页
  75. IsDetail bool //是否是项目明细请求
  76. MatchingMode string //匹配方式 title:标题 content:项目名称/标的物
  77. }
  78. type viewKeyWord struct {
  79. Keyword []string `json:"key"` //关键词
  80. Appended []string `json:"appendkey"` //附加词
  81. Exclude []string `json:"notkey"` //排除词
  82. MatchWay int `json:"matchway"` //匹配模式
  83. }
  84. // keyWordGroup 订阅词结构体
  85. type keyWordGroup struct {
  86. A_Key []viewKeyWord `json:"a_key"`
  87. ItemName string `json:"s_item"`
  88. UpdateTime int64 `json:"updatetime"`
  89. }
  90. // AnalysisRequestFormat 格式化后参数
  91. type AnalysisRequestFormat struct {
  92. KeysItems []keyWordGroup
  93. Area, City []string //省份城市
  94. STime, ETime int64 //开始结束时间
  95. Industry []string //行业
  96. BuyerClass []string //采购单位类型
  97. Buyer string //采购单位
  98. Winner string //中标单位
  99. Sort int //排序:默认0:成交时间倒序;1:项目金额倒序
  100. PageSize int //默认每页10条
  101. PageNum int //默认当前第一页
  102. }
  103. type AnalysisEntity struct {
  104. MgoRecordId string
  105. BaseParam AnalysisRequestParam
  106. FormatParam AnalysisRequestFormat
  107. UId, Pid string
  108. ProjectInfo projectInfo
  109. Offline int // 1-离线 2-实时
  110. State int // 状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  111. MgoUserId string
  112. Phone string // 手机号
  113. PositionId int64
  114. OriginalTotal int64 // 数据总数
  115. KeysTotal int64 //关键词数量
  116. PositionType int
  117. EntId int
  118. EntUserId int
  119. Source string
  120. Mgo *mongodb.MongodbSim
  121. MySql *mysql.Mysql
  122. }
  123. type projectInfo struct {
  124. Count int64
  125. List []ProjectList
  126. }
  127. type ProjectList struct {
  128. Name string `json:"name"` //项目名称
  129. Id string `json:"id"` //项目id
  130. Area string `json:"area"` //地区
  131. DealTime int64 `json:"dealTime"` //成交时间
  132. BidStatus string `json:"bidStatus"` //项目类型
  133. BuyerClass string `json:"buyerClass"` //采购单位类型
  134. Winner []string `json:"winner"` //中标单位
  135. WinnerId []string `json:"winnerId"` //中标单位id
  136. Buyer string `json:"buyer"` //采购单位
  137. BidAmount float64 `json:"bidAmount"` //中标金额
  138. Budget float64 `json:"budget"` //预算
  139. }
  140. // ForMatData 获取格式化请求参数
  141. func (a *AnalysisEntity) ForMatData() error {
  142. //格式化订阅词
  143. if err := json.Unmarshal([]byte(a.BaseParam.KeysItemsStr), &a.FormatParam.KeysItems); err != nil {
  144. return fmt.Errorf("关键词组格式异常")
  145. }
  146. if a.FormatParam.KeysItems == nil || len(a.FormatParam.KeysItems) == 0 {
  147. return fmt.Errorf("请选择关键词组")
  148. }
  149. var flag bool
  150. // 判断关键词是不是为空
  151. for i := 0; i < len(a.FormatParam.KeysItems); i++ {
  152. items := a.FormatParam.KeysItems[i]
  153. for j := 0; j < len(items.A_Key); j++ {
  154. AKey := items.A_Key[j]
  155. if len(AKey.Keyword) > 0 {
  156. flag = true
  157. break
  158. }
  159. }
  160. if flag {
  161. break
  162. }
  163. }
  164. if !flag {
  165. return fmt.Errorf("请选择关键词组")
  166. }
  167. //格式化时间段
  168. if timeArr := strings.Split(a.BaseParam.RangeTime, "-"); len(timeArr) == 2 {
  169. a.FormatParam.STime = common.Int64All(timeArr[0])
  170. a.FormatParam.ETime = common.Int64All(timeArr[1])
  171. if a.FormatParam.STime == 0 || a.FormatParam.ETime == 0 {
  172. return fmt.Errorf("开始时间和结束时间不能为空")
  173. }
  174. } else {
  175. return fmt.Errorf("时间戳格式异常")
  176. }
  177. //格式化省份、城市
  178. if areaStr := strings.TrimSpace(a.BaseParam.Area); areaStr != "" {
  179. imap := map[string][]string{}
  180. if err := json.Unmarshal([]byte(a.BaseParam.Area), &imap); err != nil {
  181. return fmt.Errorf("非法地区信息")
  182. }
  183. var city, area []string
  184. for name, v := range imap {
  185. if len(v) == 0 {
  186. area = append(area, name)
  187. } else {
  188. for _, vv := range v {
  189. city = append(city, vv)
  190. }
  191. }
  192. }
  193. a.FormatParam.Area = area
  194. a.FormatParam.City = city
  195. }
  196. //格式化行业
  197. if industryStr := strings.TrimSpace(a.BaseParam.Industry); industryStr != "" {
  198. imap := map[string][]string{}
  199. if err := json.Unmarshal([]byte(industryStr), &imap); err != nil {
  200. return fmt.Errorf("非法行业信息")
  201. }
  202. var farr []string
  203. for name, v := range imap {
  204. for _, vv := range v {
  205. farr = append(farr, fmt.Sprintf("%s_%s", name, vv))
  206. }
  207. }
  208. if len(farr) > 0 {
  209. //P510 行业:其它
  210. if qt := jy.IndustryHandle(strings.Join(farr, ",")); len(qt) > 0 {
  211. farr = append(farr, qt...)
  212. }
  213. }
  214. a.FormatParam.Industry = farr
  215. }
  216. //格式化类型
  217. if buyerClassStr := strings.TrimSpace(a.BaseParam.BuyerClass); buyerClassStr != "" {
  218. a.FormatParam.BuyerClass = strings.Split(buyerClassStr, ",")
  219. }
  220. //中标企业
  221. a.FormatParam.Winner = a.BaseParam.Winner
  222. //采购单位
  223. a.FormatParam.Buyer = a.BaseParam.Buyer
  224. //排序
  225. a.FormatParam.Sort = common.If(a.BaseParam.Sort != 0 && a.BaseParam.Sort != 1, 0, a.BaseParam.Sort).(int)
  226. if a.BaseParam.PageNum*a.BaseParam.PageSize > ProjectCount {
  227. a.BaseParam.PageNum = ProjectCount / a.BaseParam.PageSize
  228. }
  229. //当前页码
  230. a.FormatParam.PageNum = common.If(a.BaseParam.PageNum < 1 || a.BaseParam.PageNum > 1000, 1, a.BaseParam.PageNum).(int)
  231. //默认每页10条
  232. a.FormatParam.PageSize = common.If(a.BaseParam.PageSize < 1 || a.BaseParam.PageSize > 100, 50, a.BaseParam.PageSize).(int)
  233. return nil
  234. }
  235. // ForMatData 获取格式化请求参数
  236. func (a *AnalysisEntity) ForMatDataPdf() (string, error) {
  237. //格式化订阅词
  238. if err := json.Unmarshal([]byte(a.BaseParam.KeysItemsStr), &a.FormatParam.KeysItems); err != nil {
  239. return "", fmt.Errorf("关键词组格式异常")
  240. }
  241. //格式化时间段
  242. if timeArr := strings.Split(a.BaseParam.RangeTime, "-"); len(timeArr) == 2 {
  243. a.FormatParam.STime = common.Int64All(timeArr[0])
  244. a.FormatParam.ETime = common.Int64All(timeArr[1])
  245. if a.FormatParam.STime == 0 || a.FormatParam.ETime == 0 {
  246. return "", fmt.Errorf("开始时间和结束时间不能为空")
  247. }
  248. } else {
  249. return "", fmt.Errorf("时间戳格式异常")
  250. }
  251. //格式化省份、城市
  252. if areaStr := strings.TrimSpace(a.BaseParam.Area); areaStr != "" {
  253. imap := map[string][]string{}
  254. if err := json.Unmarshal([]byte(a.BaseParam.Area), &imap); err != nil {
  255. return "", fmt.Errorf("非法地区信息")
  256. }
  257. var city, area []string
  258. for name, v := range imap {
  259. if len(v) == 0 {
  260. area = append(area, name)
  261. } else {
  262. for _, vv := range v {
  263. city = append(city, vv)
  264. }
  265. }
  266. }
  267. a.FormatParam.Area = area
  268. a.FormatParam.City = city
  269. }
  270. //格式化行业
  271. if industryStr := strings.TrimSpace(a.BaseParam.Industry); industryStr != "" {
  272. imap := map[string][]string{}
  273. if err := json.Unmarshal([]byte(industryStr), &imap); err != nil {
  274. return "", fmt.Errorf("非法行业信息")
  275. }
  276. var farr []string
  277. for name, v := range imap {
  278. for _, vv := range v {
  279. farr = append(farr, fmt.Sprintf("%s_%s", name, vv))
  280. }
  281. }
  282. a.FormatParam.Industry = farr
  283. }
  284. //格式化类型
  285. if buyerClassStr := strings.TrimSpace(a.BaseParam.BuyerClass); buyerClassStr != "" {
  286. a.FormatParam.BuyerClass = strings.Split(buyerClassStr, ",")
  287. }
  288. //中标企业
  289. a.FormatParam.Winner = a.BaseParam.Winner
  290. //采购单位
  291. a.FormatParam.Buyer = a.BaseParam.Buyer
  292. //排序
  293. a.FormatParam.Sort = common.If(a.BaseParam.Sort != 0 && a.BaseParam.Sort != 1, 0, a.BaseParam.Sort).(int)
  294. if a.BaseParam.PageNum*a.BaseParam.PageSize > ProjectCount {
  295. a.BaseParam.PageNum = ProjectCount / a.BaseParam.PageSize
  296. }
  297. //当前页码
  298. a.FormatParam.PageNum = common.If(a.BaseParam.PageNum < 1 || a.BaseParam.PageNum > 1000, 1, a.BaseParam.PageNum).(int)
  299. //默认每页10条
  300. a.FormatParam.PageSize = common.If(a.BaseParam.PageSize < 1 || a.BaseParam.PageSize > 100, 50, a.BaseParam.PageSize).(int)
  301. data := map[string]interface{}{
  302. "s_keysItems": a.BaseParam.KeysItemsStr,
  303. "s_rangeTime": a.BaseParam.RangeTime,
  304. "s_rangeTimeExtra": a.BaseParam.RangeTimeExtra,
  305. "s_area": a.BaseParam.Area,
  306. "s_industry": a.BaseParam.Industry,
  307. "s_buyerClass": a.BaseParam.BuyerClass,
  308. "s_matchingMode": a.BaseParam.MatchingMode,
  309. "s_userId": a.UId,
  310. "s_parentId": a.Pid,
  311. "s_mgoUserId": a.MgoUserId,
  312. "i_positionId": a.PositionId,
  313. "s_phone": a.Phone,
  314. "audit": 1,
  315. }
  316. if a.OriginalTotal > 0 {
  317. data["l_originalTotal"] = a.OriginalTotal
  318. }
  319. if a.Source != "" {
  320. data["source"] = a.Source
  321. }
  322. rs, b := a.Mgo.FindOne(ReportHistoryTable, data)
  323. if b && rs != nil && len(*rs) > 0 {
  324. return common.InterfaceToStr((*rs)["_id"]), errors.New("当期分析已存在")
  325. }
  326. return "", nil
  327. }
  328. // GetProjectInfoList 项目明细
  329. func (a *AnalysisEntity) GetProjectInfoList() error {
  330. var (
  331. queryDefault = `,"sort": [{%s}],"from": %d,"size": %d`
  332. start = (a.FormatParam.PageNum - 1) * a.FormatParam.PageSize
  333. sort = `"jgtime": "desc"`
  334. )
  335. if a.FormatParam.Sort > 0 {
  336. sort = `"bidamount": "desc","budget": "desc"`
  337. }
  338. countSql := fmt.Sprintf(a.GetCommonQuerySql(), "")
  339. queryDefault = fmt.Sprintf(queryDefault, sort, start, a.FormatParam.PageSize)
  340. finalSql := fmt.Sprintf(a.GetCommonQuerySql(), queryDefault)
  341. log.Println("finalSql:", finalSql)
  342. count, hits := elastic.GetWithCount(PtIndex, PtType, countSql, finalSql)
  343. //hits, count := elastic.GetOA(PtIndex, PtType, finalSql)
  344. if count > 0 {
  345. a.ProjectInfo.Count = count
  346. source := *hits
  347. for _, v := range source {
  348. var winnerIdArr []string
  349. if common.ObjToString(v["s_winner"]) != "" && v["entidlist"] != nil {
  350. idObjs, _ := v["entidlist"].([]interface{})
  351. for _, v := range common.ObjArrToStringArr(idObjs) {
  352. if v != "" && v != "-" {
  353. v = encrypt.EncodeArticleId2ByCheck(v)
  354. }
  355. winnerIdArr = append(winnerIdArr, v)
  356. }
  357. }
  358. a.ProjectInfo.List = append(a.ProjectInfo.List, ProjectList{
  359. Name: common.ObjToString(v["projectname"]),
  360. Id: encrypt.EncodeArticleId2ByCheck(common.ObjToString(v["id"])),
  361. Area: common.ObjToString(v["area"]),
  362. DealTime: common.Int64All(v["jgtime"]), //截止时间
  363. BidStatus: common.ObjToString(v["bidstatus"]),
  364. BuyerClass: common.ObjToString(v["buyerclass"]),
  365. Winner: strings.Split(common.ObjToString(v["s_winner"]), ","),
  366. WinnerId: winnerIdArr,
  367. Buyer: common.ObjToString(v["buyer"]),
  368. BidAmount: common.Float64All(v["bidamount"]), //中标金额
  369. Budget: common.Float64All(v["budget"]), //预算
  370. })
  371. }
  372. }
  373. return nil
  374. }
  375. // SaveAnalysisRecord 保存分析记录
  376. func (a *AnalysisEntity) SaveAnalysisRecord() error {
  377. if a.Offline == ValueRealTime {
  378. a.State = ReportStateGenerated
  379. } else {
  380. a.State = ReportStateGenerating
  381. }
  382. data := map[string]interface{}{
  383. "s_keysItems": a.BaseParam.KeysItemsStr,
  384. "s_rangeTime": a.BaseParam.RangeTime,
  385. "s_rangeTimeExtra": a.BaseParam.RangeTimeExtra,
  386. "s_area": a.BaseParam.Area,
  387. "s_industry": a.BaseParam.Industry,
  388. "s_buyerClass": a.BaseParam.BuyerClass,
  389. "s_matchingMode": a.BaseParam.MatchingMode,
  390. "s_userId": a.UId,
  391. "s_parentId": a.Pid,
  392. "i_state": common.If(a.Source == "analysisPDF", 1, a.State), //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  393. "l_updateTime": time.Now().Unix(), //生成时间 or 取消时间
  394. "l_createTime": time.Now().Unix(),
  395. "i_offline": common.If(a.Source == "analysisPDF", 2, a.Offline),
  396. "s_mgoUserId": a.MgoUserId,
  397. "i_positionId": a.PositionId,
  398. "s_phone": a.Phone,
  399. }
  400. if a.OriginalTotal > 0 {
  401. data["l_originalTotal"] = a.OriginalTotal
  402. }
  403. if a.KeysTotal > 0 {
  404. data["l_keysTotal"] = a.KeysTotal
  405. }
  406. if a.Source != "" {
  407. data["source"] = a.Source
  408. }
  409. a.MgoRecordId = a.Mgo.Save(ReportHistoryTable, data)
  410. if a.MgoRecordId == "" {
  411. return fmt.Errorf("分析创建异常")
  412. }
  413. return nil
  414. }
  415. // GetAnalysisFromMgoDb 从数据库中获取分析记录
  416. func (a *AnalysisEntity) GetAnalysisFromMgoDb() error {
  417. if a.MgoRecordId == "" {
  418. return fmt.Errorf("缺少参数")
  419. }
  420. queryMap := map[string]interface{}{
  421. "_id": mongodb.StringTOBsonId(a.MgoRecordId),
  422. "i_del": map[string]interface{}{"$ne": 1},
  423. }
  424. if a.UId == a.Pid { //主账号
  425. queryMap["s_parentId"] = a.Pid
  426. } else {
  427. queryMap["s_userId"] = a.UId
  428. }
  429. res, _ := a.Mgo.FindOne(ReportHistoryTable, queryMap)
  430. if res == nil || len(*res) == 0 {
  431. return fmt.Errorf("未查询到相关数据")
  432. }
  433. a.Offline = common.IntAll((*res)["i_offline"])
  434. a.State = common.IntAll((*res)["i_state"])
  435. a.BaseParam.KeysItemsStr, _ = (*res)["s_keysItems"].(string)
  436. a.BaseParam.RangeTime, _ = (*res)["s_rangeTime"].(string)
  437. a.BaseParam.RangeTimeExtra, _ = (*res)["s_rangeTimeExtra"].(string)
  438. a.BaseParam.Area, _ = (*res)["s_area"].(string)
  439. a.BaseParam.Industry, _ = (*res)["s_industry"].(string)
  440. a.BaseParam.BuyerClass, _ = (*res)["s_buyerClass"].(string)
  441. a.BaseParam.MatchingMode, _ = (*res)["s_matchingMode"].(string)
  442. return nil
  443. }
  444. // removeEmptyRecord 删除空报告记录
  445. func (a *AnalysisEntity) removeEmptyRecord() {
  446. queryMap := map[string]interface{}{
  447. "_id": mongodb.StringTOBsonId(a.MgoRecordId),
  448. }
  449. if a.UId == a.Pid { //主账号
  450. queryMap["s_parentId"] = a.Pid
  451. } else {
  452. queryMap["s_userId"] = a.UId
  453. }
  454. a.Mgo.Update(ReportHistoryTable, queryMap, map[string]interface{}{
  455. "$set": map[string]interface{}{"i_del": 1},
  456. }, false, false)
  457. //log.Println("删除空报告", queryMap)
  458. }
  459. // GetRecordList 获取分析记录
  460. func (a *AnalysisEntity) GetRecordList(pageNum, PageSize int, positionType int, entId, entUserId int) (total int, list []map[string]interface{}) {
  461. queryMap := map[string]interface{}{
  462. "i_del": map[string]interface{}{"$ne": 1},
  463. }
  464. if a.UId == a.Pid { //主账号
  465. queryMap["s_parentId"] = a.Pid
  466. } else {
  467. queryMap["s_userId"] = a.UId
  468. }
  469. if pageNum == 1 {
  470. total = a.Mgo.Count(ReportHistoryTable, queryMap)
  471. if total == 0 {
  472. return
  473. }
  474. } else {
  475. total = -1
  476. }
  477. res, _ := a.Mgo.Find(ReportHistoryTable, queryMap, `{"l_createTime":-1}`, nil, false, (pageNum-1)*PageSize, PageSize)
  478. if res == nil || len(*res) == 0 {
  479. return
  480. }
  481. var ids []string
  482. for _, m := range *res {
  483. ids = append(ids, common.InterfaceToStr(m["_id"]))
  484. }
  485. idMap := make(map[string]map[string]interface{})
  486. tName, _ := GetMongoColl(MarketScaleMain)
  487. mids, _ := a.Mgo.Find(tName, map[string]interface{}{
  488. "s_m_id": map[string]interface{}{
  489. "$in": ids,
  490. },
  491. }, "", `{"market_profile":1,"s_m_id":1}`, false, -1, -1)
  492. if mids != nil && len(*mids) > 0 {
  493. for _, m := range *mids {
  494. marketProfile, _ := m["market_profile"].(map[string]interface{})
  495. if common.IntAll(marketProfile["project_count"]) > 0 {
  496. idMap[common.InterfaceToStr(m["s_m_id"])] = marketProfile
  497. }
  498. }
  499. }
  500. //用户消息开关
  501. open := GetMsgOpen(a.Mgo, a.MgoUserId, positionType, entId, entUserId)
  502. for _, row := range *res {
  503. var status int
  504. if row["i_state"] != nil {
  505. status = common.IntAll(row["i_state"])
  506. if status == ReportStateFailed {
  507. status = ReportStateGenerating // 生成失败对外还是展示为生成中
  508. }
  509. }
  510. var (
  511. isDownload bool
  512. )
  513. marketProfile := make(map[string]interface{})
  514. if marketProfile = idMap[mongodb.BsonIdToSId(row["_id"])]; marketProfile != nil && common.IntAll(marketProfile["project_count"]) > 0 && status == 1 {
  515. isDownload = true
  516. }
  517. //P510 回显 移除 其它
  518. industry := common.ObjToString(row["s_industry"])
  519. if industry != "" {
  520. industry = strings.ReplaceAll(industry, "\"其它\"", "")
  521. }
  522. data := map[string]interface{}{
  523. "id": encodeId(mongodb.BsonIdToSId(row["_id"])),
  524. "keysItems": common.ObjToString(row["s_keysItems"]),
  525. "area": common.ObjToString(row["s_area"]),
  526. "industry": industry,
  527. "buyerclass": common.ObjToString(row["s_buyerClass"]),
  528. "rangeTime": common.ObjToString(row["s_rangeTime"]),
  529. "s_rangeTimeExtra": common.ObjToString(row["s_rangeTimeExtra"]),
  530. "createTime": common.Int64All(row["l_createTime"]),
  531. "matchingMode": common.ObjToString(row["s_matchingMode"]), //项目匹配方式
  532. "state": common.If(row["i_state"] == nil, nil, status),
  533. "updateTime": common.Int64All(row["l_updateTime"]),
  534. "msgOpen": open,
  535. "isDownload": isDownload,
  536. "marketProfile": marketProfile,
  537. }
  538. list = append(list, data)
  539. }
  540. return
  541. }
  542. func (a *AnalysisEntity) GetRecordPdfList(pageNum, PageSize int) (total int, list []map[string]interface{}) {
  543. queryMap := map[string]interface{}{
  544. "i_del": map[string]interface{}{"$ne": 1},
  545. "source": "analysisPDF",
  546. }
  547. if a.UId == a.Pid { //主账号
  548. queryMap["s_parentId"] = a.Pid
  549. } else {
  550. queryMap["s_userId"] = a.UId
  551. }
  552. if pageNum == 1 {
  553. total = a.Mgo.Count(ReportHistoryTable, queryMap)
  554. if total == 0 {
  555. return
  556. }
  557. } else {
  558. total = -1
  559. }
  560. res, _ := a.Mgo.Find(ReportHistoryTable, queryMap, `{"l_createTime":-1}`, nil, false, (pageNum-1)*PageSize, PageSize)
  561. if res == nil || len(*res) == 0 {
  562. return
  563. }
  564. for _, row := range *res {
  565. data := map[string]interface{}{
  566. "id": encodeId(mongodb.BsonIdToSId(row["_id"])),
  567. "keysItems": common.ObjToString(row["s_keysItems"]),
  568. "area": common.ObjToString(row["s_area"]),
  569. "industry": common.ObjToString(row["s_industry"]),
  570. "buyerclass": common.ObjToString(row["s_buyerClass"]),
  571. "rangeTime": common.ObjToString(row["s_rangeTime"]),
  572. "s_rangeTimeExtra": common.ObjToString(row["s_rangeTimeExtra"]),
  573. "createTime": common.Int64All(row["l_createTime"]),
  574. "matchingMode": common.ObjToString(row["s_matchingMode"]), //项目匹配方式
  575. "updateTime": common.Int64All(row["l_updateTime"]),
  576. "audit": common.Int64All(row["audit"]),
  577. }
  578. list = append(list, data)
  579. }
  580. return
  581. }
  582. // GetQueryItem 获取查询条件,前端回显使用
  583. func (a *AnalysisEntity) getQueryItem() (map[string]interface{}, error) {
  584. return map[string]interface{}{
  585. "keysItems": a.BaseParam.KeysItemsStr,
  586. "area": a.BaseParam.Area,
  587. "industry": a.BaseParam.Industry,
  588. "buyerclass": a.BaseParam.BuyerClass,
  589. "rangeTime": a.BaseParam.RangeTime,
  590. "s_rangeTimeExtra": a.BaseParam.RangeTimeExtra,
  591. "matchingMode": a.BaseParam.MatchingMode,
  592. }, nil
  593. }
  594. // GetPartResult 分块儿获取报告内容
  595. func (a *AnalysisEntity) GetPartResult(flag int) (map[string]interface{}, error) {
  596. defer common.Catch()
  597. if flag == MarketQueryItem { //返回查询内容
  598. return a.getQueryItem()
  599. }
  600. thisCacheKey := fmt.Sprintf(ReportCacheKey, a.MgoRecordId, flag)
  601. if cacheData := redis.Get(ReportCacheDB, thisCacheKey); cacheData != nil {
  602. if cacheMap, ok := cacheData.(map[string]interface{}); ok && len(cacheMap) > 0 {
  603. return cacheMap, nil
  604. }
  605. }
  606. rData, err := func() (map[string]interface{}, error) {
  607. //控制并发&&超时返回超时异常
  608. select {
  609. case <-time.After(time.Duration(MATimeout) * time.Second * 20):
  610. return nil, fmt.Errorf("查询超时,请稍后重试")
  611. case <-MAPool:
  612. }
  613. start := time.Now()
  614. defer func() {
  615. MAPool <- true
  616. log.Printf("report %s[%d] speed %d ms\n", a.MgoRecordId, flag, time.Now().Sub(start).Milliseconds())
  617. }()
  618. //校验报告是否合法
  619. //if flag != MarketScaleMain {
  620. // if exists, _ := redis.Exists(ReportCacheDB, fmt.Sprintf(ReportCacheKey, a.MgoRecordId, 1)); !exists {
  621. // return nil, fmt.Errorf("报告异常请求,请刷新重试")
  622. // }
  623. //}
  624. // 1. 查mongo
  625. var rData map[string]interface{}
  626. var err error
  627. rData, err = a.GetMongoData(flag)
  628. // 查到数据则直接返回
  629. if err == nil && rData != nil {
  630. return rData, nil
  631. }
  632. // 2.没有查询到数据 判断是不是离线的
  633. // 正常情况下正在离线生的不会走到这里
  634. // 离线的应该生成完报告之后才会调用获取结果接口 这里处理是防止直接调接口传正在离线生的报告
  635. if a.Offline == ValueOffline || a.Offline == 0 {
  636. return nil, err
  637. }
  638. // 3. 实时则查es
  639. rData, err = a.RealTimeQuery(flag)
  640. if err == nil && len(rData) > 0 {
  641. // 4.存库
  642. a.SaveMongoReport(rData, flag)
  643. return rData, err
  644. }
  645. return nil, err
  646. }()
  647. if err == nil && rData != nil && len(rData) > 0 {
  648. delete(rData, "s_m_id")
  649. delete(rData, "_id")
  650. redis.Put(ReportCacheDB, thisCacheKey, rData, ReportCacheTime)
  651. }
  652. return rData, err
  653. }
  654. // 实时查询
  655. func (a *AnalysisEntity) RealTimeQuery(flag int) (map[string]interface{}, error) {
  656. switch flag {
  657. case MarketScaleMain:
  658. rData, err := a.MarketTime()
  659. //非离线
  660. if a.Offline != 1 && err != nil { //若无报告内容,删除报告记录
  661. go a.removeEmptyRecord()
  662. }
  663. return rData, err
  664. case MarketTopProject:
  665. return a.ProjectTop10()
  666. case MarketProjectAllData:
  667. return a.AllData()
  668. case MarketScaleRefine:
  669. return a.MarketScaleRefineQuery()
  670. case MarketBuyerAndWinner:
  671. return a.BuyerWinnerAnalysis(), nil
  672. }
  673. return nil, fmt.Errorf("未知请求")
  674. }
  675. // GetMongoData 从mongo库查询数据
  676. func (a *AnalysisEntity) GetMongoData(flag int) (map[string]interface{}, error) {
  677. collName, err := GetMongoColl(flag)
  678. if err != nil || collName == "" {
  679. return nil, fmt.Errorf("未知请求")
  680. }
  681. // 查询
  682. query := map[string]interface{}{
  683. "s_m_id": a.MgoRecordId,
  684. }
  685. data, b := a.Mgo.FindOne(collName, query)
  686. if !b || data == nil || len(*data) == 0 {
  687. log.Println("没有查询到数据", b, query, collName)
  688. return nil, nil
  689. }
  690. delete(*data, "s_m_id")
  691. delete(*data, "_id")
  692. return *data, nil
  693. }
  694. // GetMongoColl 获取mgo库对应的Coll
  695. func GetMongoColl(flag int) (string, error) {
  696. switch flag {
  697. case MarketScaleMain:
  698. return CollMarketScaleMain, nil
  699. case MarketTopProject:
  700. return CollMarketTopProject, nil
  701. case MarketProjectAllData:
  702. return CollMarketProjectAllData, nil
  703. case MarketScaleRefine:
  704. return CollMarketScaleRefine, nil
  705. case MarketBuyerAndWinner:
  706. return CollMarketBuyerAndWinner, nil
  707. }
  708. return "", fmt.Errorf("未知类型")
  709. }
  710. // SaveMongoReport 数据存mongo库
  711. func (a *AnalysisEntity) SaveMongoReport(rData map[string]interface{}, flag int) {
  712. collName, _ := GetMongoColl(flag)
  713. rData["s_m_id"] = a.MgoRecordId
  714. b, err := json.Marshal(rData)
  715. if err != nil {
  716. log.Println("JSON marshal error:", err)
  717. } else {
  718. var saveMap map[string]interface{}
  719. err = json.Unmarshal(b, &saveMap)
  720. if err != nil {
  721. log.Println("JSON Unmarshal error:", err)
  722. a.Mgo.Save(collName, rData)
  723. } else {
  724. a.Mgo.Save(collName, saveMap)
  725. }
  726. }
  727. }
  728. // GetAnalyzingReport 是否有正在分析的离线报告
  729. func (a *AnalysisEntity) GetAnalyzingReport() string {
  730. query := map[string]interface{}{
  731. "s_userId": a.UId,
  732. "i_state": map[string]interface{}{"$in": []int{ReportStateGenerating, ReportStateFailed}}, //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  733. "i_del": map[string]interface{}{"$ne": 1},
  734. }
  735. //log.Println("query:", query)
  736. rs, b := a.Mgo.FindOne(ReportHistoryTable, query)
  737. //log.Println("rs,b:", rs, b)
  738. if b && rs != nil {
  739. return mongodb.BsonIdToSId((*rs)["_id"])
  740. }
  741. return ""
  742. }
  743. // IsOffline 判断是否符合在线分析的条件
  744. func (a *AnalysisEntity) IsOffline() (offline bool) {
  745. //离线生成:订阅词(关键词+排除词)超过300个(数量支持配置),或单次分析数据超过60万条,则离线生成;
  746. keyCount := 0
  747. for i := 0; i < len(a.FormatParam.KeysItems); i++ {
  748. items := a.FormatParam.KeysItems[i]
  749. for j := 0; j < len(items.A_Key); j++ {
  750. AKey := items.A_Key[j]
  751. for k := 0; k < len(AKey.Keyword); k++ {
  752. keyCount++
  753. }
  754. for k := 0; k < len(AKey.Appended); k++ {
  755. keyCount++
  756. }
  757. for k := 0; k < len(AKey.Exclude); k++ {
  758. keyCount++
  759. }
  760. }
  761. }
  762. if keyCount > MAKeyWordsCount {
  763. // 查询配置
  764. if mac := a.getMarUserAccount(); mac != nil {
  765. if keyCount >= mac.Threshold {
  766. a.Offline = ValueOffline
  767. a.KeysTotal = int64(keyCount)
  768. return true
  769. }
  770. } else {
  771. a.Offline = ValueOffline
  772. a.KeysTotal = int64(keyCount)
  773. return true
  774. }
  775. }
  776. // 查询数据量
  777. countSql := fmt.Sprintf(a.GetCommonQuerySql(), "")
  778. log.Println("IsOffline count SQL:", countSql)
  779. now := time.Now()
  780. dataCount := elastic.Count(PtIndex, PtType, countSql)
  781. b := time.Since(now)
  782. log.Println("IsOffline 统计数据量耗时:", b)
  783. log.Println("IsOffline 数据量:", dataCount)
  784. a.OriginalTotal = dataCount
  785. if int(dataCount) > MAProjectNumLimit {
  786. a.Offline = ValueOffline
  787. return true
  788. }
  789. a.Offline = ValueRealTime
  790. return false
  791. }
  792. type MarUserAccount struct {
  793. Threshold int // 关键词离线标准量
  794. }
  795. // 获取离线市场报告分析关键词标准信息
  796. func (a *AnalysisEntity) getMarUserAccount() *MarUserAccount {
  797. rs := a.MySql.SelectBySql(fmt.Sprintf("SELECT threshold FROM %s where position_id=? and state = 0;", TablejianyuMarUserAccount), a.PositionId)
  798. if rs != nil && len(*rs) > 0 {
  799. return &MarUserAccount{Threshold: common.IntAll((*rs)[0]["threshold"])}
  800. }
  801. return nil
  802. }
  803. func (a *AnalysisEntity) GetReportState() (generated bool, needUpdate bool, err error) {
  804. // 查库
  805. err = a.GetAnalysisFromMgoDb()
  806. if err != nil {
  807. return
  808. }
  809. // 已经生成的可以直接返回
  810. if a.State == ReportStateGenerated {
  811. generated = true
  812. return
  813. }
  814. // 该字段没有值 说明是还没有被更新的历史数据 需要更新
  815. // 且判断之后后续需要更新i_state和i_offline字段
  816. if a.Offline == 0 {
  817. needUpdate = true
  818. }
  819. // 如果没有值的话 说明这是历史数据 需要接着走下面的流程进行判断
  820. // 格式化数据 用于后面校验个数
  821. if err = a.ForMatData(); err != nil {
  822. return
  823. }
  824. return
  825. }
  826. // Cancel 取消正在分析中的报告
  827. func (a *AnalysisEntity) Cancel() (bool, error) {
  828. // 取消报告
  829. queryMap := map[string]interface{}{
  830. "_id": a.MgoRecordId,
  831. "i_state": map[string]interface{}{"$in": []int{ReportStateGenerating, ReportStateFailed}}, //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  832. "i_del": map[string]interface{}{"$ne": 1},
  833. }
  834. if a.UId == a.Pid { //主账号
  835. queryMap["s_parentId"] = a.Pid
  836. } else {
  837. queryMap["s_userId"] = a.UId
  838. }
  839. //验证
  840. rs, b := a.Mgo.FindOne(ReportHistoryTable, queryMap)
  841. //
  842. if !b || rs == nil {
  843. return false, fmt.Errorf("未查询到该记录")
  844. }
  845. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, map[string]interface{}{"$set": map[string]interface{}{
  846. "l_updateTime": time.Now().Unix(),
  847. "i_state": ReportStateCanceled,
  848. }})
  849. if !update {
  850. log.Println("分析报告取消失败:", a.MgoRecordId)
  851. return false, fmt.Errorf("取消失败")
  852. }
  853. // redis里面放取消标识
  854. redis.Put(ReportCacheDB, fmt.Sprintf(ReportCanceledKey, a.MgoRecordId), 1, ReportCanceledTime)
  855. return update, nil
  856. }
  857. // UpdateOffline 更新报告是否是离线报告
  858. func (a *AnalysisEntity) UpdateOffline(offline bool) bool {
  859. set := map[string]interface{}{
  860. "i_state": common.If(offline, ReportStateGenerating, ReportStateGenerated),
  861. "i_offline": common.If(offline, ValueOffline, ValueRealTime),
  862. "l_updateTime": time.Now().Unix(),
  863. "s_mgoUserId": a.MgoUserId, // 这里更新这些字段是因为这几个字段是p437 版本新加上的 历史数据没有这些字段
  864. "s_positionId": a.PositionId,
  865. "s_phone": a.Phone,
  866. }
  867. if a.OriginalTotal > 0 {
  868. set["l_originalTotal"] = a.OriginalTotal
  869. }
  870. data := map[string]interface{}{
  871. "$set": set,
  872. }
  873. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, data)
  874. if !update {
  875. log.Println("UpdateOffline 更新报告状态失败:", data, a.MgoRecordId)
  876. }
  877. return update
  878. }
  879. // UpdateState 更新报告生成状态
  880. func (a *AnalysisEntity) UpdateState(state int) bool {
  881. setMap := map[string]interface{}{
  882. "i_state": state,
  883. "l_updateTime": time.Now().Unix(),
  884. }
  885. if state == ReportStateGenerated {
  886. setMap["l_finishTime"] = time.Now().Unix()
  887. }
  888. data := map[string]interface{}{
  889. "$set": setMap,
  890. }
  891. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, data)
  892. if !update {
  893. log.Println("UpdateState 更新报告生成状态失败:", data, a.MgoRecordId)
  894. }
  895. return update
  896. }
  897. // Delete 删除正在分析中的报告
  898. func (a *AnalysisEntity) Delete() (bool, error) {
  899. // 删除报告
  900. queryMap := map[string]interface{}{
  901. "_id": a.MgoRecordId,
  902. "i_del": map[string]interface{}{"$ne": 1},
  903. }
  904. if a.UId == a.Pid { //主账号
  905. queryMap["s_parentId"] = a.Pid
  906. } else {
  907. queryMap["s_userId"] = a.UId
  908. }
  909. //验证
  910. rs, b := a.Mgo.FindOne(ReportHistoryTable, queryMap)
  911. //
  912. if !b || rs == nil {
  913. return false, fmt.Errorf("未查询到该记录")
  914. }
  915. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, map[string]interface{}{"$set": map[string]interface{}{
  916. "l_updateTime": time.Now().Unix(),
  917. "i_del": 1,
  918. }})
  919. if !update {
  920. log.Println("分析报告删除失败:", a.MgoRecordId)
  921. return false, fmt.Errorf("删除失败")
  922. }
  923. // redis里面放取消标识
  924. redis.Put(ReportCacheDB, fmt.Sprintf(ReportCanceledKey, a.MgoRecordId), 1, ReportCanceledTime)
  925. return update, nil
  926. }
  927. // GetCommonQuerySql 公共筛选
  928. func (a *AnalysisEntity) GetCommonQuerySql() string {
  929. var musts, bools []string
  930. //时间
  931. musts = append(musts, fmt.Sprintf(`{"range":{"jgtime":{"gte":%d,"lte":%d}}}`, a.FormatParam.STime, a.FormatParam.ETime))
  932. //地区
  933. if len(a.FormatParam.Area) > 0 || len(a.FormatParam.City) > 0 {
  934. var areaCity []string
  935. if len(a.FormatParam.Area) > 0 {
  936. areaCity = append(areaCity, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(a.FormatParam.Area, `","`)))
  937. }
  938. if len(a.FormatParam.City) > 0 {
  939. areaCity = append(areaCity, fmt.Sprintf(`{"terms":{"city":["%s"]}}`, strings.Join(a.FormatParam.City, `","`)))
  940. }
  941. musts = append(musts, fmt.Sprintf(`{"bool":{"should":[%s],"minimum_should_match": 1}}`, strings.Join(areaCity, ",")))
  942. }
  943. //行业
  944. if len(a.FormatParam.Industry) > 0 {
  945. musts = append(musts, fmt.Sprintf(`{"terms":{"subscopeclass":["%s"]}}`, strings.Join(a.FormatParam.Industry, `","`)))
  946. }
  947. //类型
  948. if len(a.FormatParam.BuyerClass) > 0 {
  949. musts = append(musts, fmt.Sprintf(`{"terms":{"buyerclass":["%s"]}}`, strings.Join(a.FormatParam.BuyerClass, `","`)))
  950. }
  951. //分析报告中标状态限制
  952. musts = append(musts, fmt.Sprintf(queryBoolMust, pSearchDecMust))
  953. //订阅词
  954. for _, v := range getAllKeywordArr(a.FormatParam.KeysItems) {
  955. if sql := getKeyWordSql(v, a.BaseParam.MatchingMode); sql != "" {
  956. bools = append(bools, sql)
  957. }
  958. }
  959. //中标企业
  960. if a.FormatParam.Winner != "" {
  961. var winnerId string
  962. if mongodb.IsObjectIdHex(a.FormatParam.Winner) {
  963. winnerId = a.FormatParam.Winner
  964. } else {
  965. rData := elastic.Get("qyxy", "qyxy", fmt.Sprintf(`{"query": {"bool": {"must": [{"term": {"company_name": "%s"}}]}},"_source":["_id"],"size": 1}`, a.FormatParam.Winner))
  966. if rData != nil && len(*rData) > 0 {
  967. winnerId = common.InterfaceToStr((*rData)[0]["_id"])
  968. }
  969. }
  970. if winnerId != "" {
  971. musts = append(musts, fmt.Sprintf(`{"term":{"entidlist":"%s"}}`, winnerId))
  972. } else {
  973. musts = append(musts, fmt.Sprintf(`{"term":{"s_winner":"%s"}}`, a.FormatParam.Winner))
  974. }
  975. }
  976. //采购单位
  977. if a.FormatParam.Buyer != "" {
  978. musts = append(musts, fmt.Sprintf(`{"term":{"buyer":"%s"}}`, a.FormatParam.Buyer))
  979. }
  980. return fmt.Sprintf(`{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match": %d}}%s}`, strings.Join(musts, ","), strings.Join(bools, ","), common.If(len(bools) > 0, 1, 0).(int), "%s")
  981. }
  982. // GetCommonQuerySqlWithAggs 此方法用于聚合查询
  983. func (a *AnalysisEntity) GetCommonQuerySqlWithAggs() string {
  984. return fmt.Sprintf(a.GetCommonQuerySql(), `,"aggs":{%s},"size":0`)
  985. }
  986. // 市场概况+时间分布
  987. func (a *AnalysisEntity) MarketTime() (map[string]interface{}, error) {
  988. var (
  989. sql []string
  990. monthB, yearB bool
  991. MonthRange, YearRange string
  992. isOffline = a.Offline == 1
  993. )
  994. sql = append(sql, fmt.Sprintf(aggsMarketAnalysis, "thismarket", fmt.Sprintf(`{"key":"%s","from":%d,"to":%d}`, "market", a.FormatParam.STime, a.FormatParam.ETime)))
  995. n_stime := a.FormatParam.STime
  996. var n_mae AnalysisEntity
  997. n_mae.FormatParam = a.FormatParam
  998. n_mae.BaseParam = a.BaseParam
  999. n_mae.MgoRecordId = a.MgoRecordId
  1000. n_mae.FormatParam.ETime = a.FormatParam.ETime
  1001. if time.Unix(a.FormatParam.STime, 0).AddDate(1, 0, 0).Unix() >= time.Unix(a.FormatParam.ETime, 0).Unix() {
  1002. n_stime = getPreviousMarket(time.Unix(a.FormatParam.STime, 0), time.Unix(a.FormatParam.ETime, 0))
  1003. sql = append(sql, fmt.Sprintf(aggsMarketAnalysis, "oldmarket", fmt.Sprintf(`{"key":"%s","from":%d,"to":%d}`, "market", n_stime, a.FormatParam.STime)))
  1004. }
  1005. if isOffline || time.Unix(a.FormatParam.STime, 0).AddDate(0, 1, 0).Unix() < time.Unix(a.FormatParam.ETime, 0).Unix() {
  1006. var mon_time, year_time int64
  1007. stime, etime := time.Unix(a.FormatParam.STime, 0), time.Unix(a.FormatParam.ETime, 0)
  1008. monthB, MonthRange = GetMonthData(isOffline, stime, etime)
  1009. sql = append(sql, fmt.Sprintf(projectTimeDistribution, "monthtime", MonthRange))
  1010. if monthB {
  1011. mon_time = stime.AddDate(0, -1, 0).Unix()
  1012. } else {
  1013. mon_time = a.FormatParam.STime
  1014. }
  1015. //年度数据
  1016. yearB, YearRange = GetYearData(isOffline, stime, etime)
  1017. sql = append(sql, fmt.Sprintf(projectTimeDistribution, "yeartime", YearRange))
  1018. if yearB {
  1019. year_time = stime.AddDate(-1, 0, 0).Unix()
  1020. } else {
  1021. year_time = a.FormatParam.STime
  1022. }
  1023. if n_stime > mon_time {
  1024. n_stime = mon_time
  1025. }
  1026. if n_stime > year_time {
  1027. n_stime = year_time
  1028. }
  1029. }
  1030. //非离线
  1031. if !isOffline {
  1032. n_mae.FormatParam.STime = n_stime
  1033. }
  1034. finalSql := fmt.Sprintf(n_mae.GetCommonQuerySqlWithAggs(), strings.Join(sql, ","))
  1035. log.Printf("final MarketScaleRefineQuery sql: %s", finalSql)
  1036. rMap := make(map[string]interface{})
  1037. rMapData := make(map[string]interface{})
  1038. thisRow := marketTime{}
  1039. res := GetAggs(PtIndex, PtType, finalSql)
  1040. if res == nil || len(res) == 0 {
  1041. return nil, fmt.Errorf("未查询到项目")
  1042. }
  1043. Rest(res, &thisRow)
  1044. if thisRow.Thismarket.Buckets != nil && len(thisRow.Thismarket.Buckets) != 0 {
  1045. Projectmarket := thisRow.Thismarket.Buckets[0]
  1046. if Projectmarket.ProjectCount.DocCount == 0 {
  1047. return nil, fmt.Errorf("未查询到项目数据")
  1048. }
  1049. if Projectmarket.ProjectCount.DocCount > MAProjectNumLimit {
  1050. return nil, fmt.Errorf("项目数量超出上限")
  1051. }
  1052. rMapData["project_count"] = Projectmarket.ProjectCount.DocCount
  1053. rMapData["projctamout"] = Projectmarket.ProjectAmount.Value
  1054. rMapData["projectavgmoney"] = Projectmarket.ProjectAvgMoney.Value
  1055. rMapData["buyercount"] = Projectmarket.BuyerCount.Value
  1056. rMapData["winnercount"] = Projectmarket.WinnerCount.Value
  1057. if thisRow.Oldmarket.Buckets != nil && len(thisRow.Oldmarket.Buckets) != 0 {
  1058. oldProjectmarket := thisRow.Oldmarket.Buckets[0]
  1059. //环比数据
  1060. rMapData["projctamount_ratio"] = sequential(Projectmarket.ProjectAmount.Value, oldProjectmarket.ProjectAmount.Value)
  1061. rMapData["project_count_ratio"] = sequential(float64(Projectmarket.ProjectCount.DocCount), float64(oldProjectmarket.ProjectCount.DocCount))
  1062. rMapData["projectavgmoney_ratio"] = sequential(Projectmarket.ProjectAvgMoney.Value, oldProjectmarket.ProjectAvgMoney.Value)
  1063. rMapData["buyercount_ratio"] = sequential(float64(Projectmarket.BuyerCount.Value), float64(oldProjectmarket.BuyerCount.Value))
  1064. rMapData["winnercount_ratio"] = sequential(float64(Projectmarket.WinnerCount.Value), float64(oldProjectmarket.WinnerCount.Value))
  1065. }
  1066. }
  1067. rMap["market_profile"] = rMapData
  1068. if thisRow.Monthtime.Buckets != nil && len(thisRow.Monthtime.Buckets) != 0 {
  1069. rMap["month_distribution"] = n_mae.TimeData(monthB, thisRow.Monthtime.Buckets)
  1070. }
  1071. if thisRow.Yeartime.Buckets != nil && len(thisRow.Yeartime.Buckets) != 0 {
  1072. rMap["year_distribution"] = n_mae.TimeData(yearB, thisRow.Yeartime.Buckets)
  1073. }
  1074. return rMap, nil
  1075. }
  1076. // 时间分布月,年通用数据处理
  1077. func (a *AnalysisEntity) TimeData(_b bool, thisRow []Buckets) map[string]interface{} {
  1078. var count_ss, amout_ss []map[string]interface{}
  1079. for k, v := range thisRow {
  1080. //环比多取一期数据
  1081. if _b && k == 0 {
  1082. continue
  1083. }
  1084. count := make(map[string]interface{})
  1085. amount := make(map[string]interface{})
  1086. count["minth"] = v.Key
  1087. count["value"] = v.ScaleTotal.DocCount
  1088. amount["minth"] = v.Key
  1089. amount["value"] = v.ScaleAmount.Value
  1090. //整月,年统计环比
  1091. if _b {
  1092. doccount := thisRow[k-1].ScaleTotal.DocCount
  1093. amountvalue := thisRow[k-1].ScaleAmount.Value
  1094. count["ratio"] = sequential(float64(v.ScaleTotal.DocCount), float64(doccount))
  1095. amount["ratio"] = sequential(v.ScaleAmount.Value, amountvalue)
  1096. }
  1097. count_ss = append(count_ss, count)
  1098. amout_ss = append(amout_ss, amount)
  1099. }
  1100. rMapData := make(map[string]interface{})
  1101. rMapData["project_count"] = count_ss
  1102. rMapData["project_amount"] = amout_ss
  1103. return rMapData
  1104. }
  1105. // top10
  1106. func (a *AnalysisEntity) ProjectTop10() (rMap map[string]interface{}, err error) {
  1107. finalSql := fmt.Sprintf(a.GetCommonQuerySql(), queryTop10)
  1108. log.Println("ProjectTop10:", finalSql)
  1109. hits := elastic.Get(PtIndex, PtType, finalSql)
  1110. rMap = map[string]interface{}{}
  1111. bArr := []map[string]interface{}{}
  1112. source := *hits
  1113. for _, v := range source {
  1114. bA := map[string]interface{}{}
  1115. if v["ids"] != nil && len(InterToSliceString(v["ids"])) > 0 {
  1116. idss := InterToSliceString(v["ids"])
  1117. bA["_id"] = encodeId(idss[len(idss)-1])
  1118. } else {
  1119. bA["_id"] = nil
  1120. }
  1121. if v["area"] == "" {
  1122. bA["city"] = nil
  1123. } else {
  1124. bA["area"] = v["area"]
  1125. }
  1126. if v["city"] == "" {
  1127. bA["city"] = nil
  1128. } else {
  1129. bA["city"] = v["city"]
  1130. }
  1131. bA["jgtime"] = v["jgtime"]
  1132. bA["projectname"] = v["s_projectname"]
  1133. bA["sortprice"] = v["sortprice"]
  1134. ids := v["entidlist"]
  1135. var id_s []string
  1136. var winner_s []string
  1137. for _, i := range InterToSliceString(ids) {
  1138. iid := encodeId(i)
  1139. id_s = append(id_s, iid)
  1140. }
  1141. bA["eidlist"] = id_s
  1142. bA["winner_s"] = nil
  1143. if common.ObjToString(v["s_winner"]) != "" {
  1144. w_s := strings.Split(common.ObjToString(v["s_winner"]), ",")
  1145. for _, vv := range w_s {
  1146. winner_s = append(winner_s, vv)
  1147. }
  1148. bA["winner_s"] = winner_s
  1149. }
  1150. bArr = append(bArr, bA)
  1151. }
  1152. rMap["ProjectTop10"] = bArr
  1153. return
  1154. }
  1155. func (a *AnalysisEntity) AllData() (rMap map[string]interface{}, err error) {
  1156. rMap = map[string]interface{}{}
  1157. var aggs []string
  1158. area := a.FormatParam.Area
  1159. city := a.FormatParam.City
  1160. buyclass := a.FormatParam.BuyerClass
  1161. aggs = append(aggs, aggsAllCM, fmt.Sprintf(queryAggsSortprice, sortpriceStr), aggsArea)
  1162. if (len(area) + len(city)) != 1 {
  1163. aggs = append(aggs, aggsAreaAmounttop3, aggsAreaCounttop3)
  1164. }
  1165. if len(buyclass) != 1 {
  1166. //aggs = append(aggs, aggs_buyerclass, aggs_buyerclass_other, aggs_buyerclass_amounttop3, aggs_buyerclass_counttop3)
  1167. aggs = append(aggs, aggsBuyerclass, aggsBuyerclassAmounttop3, aggsBuyerclassCounttop3)
  1168. }
  1169. finalSql := fmt.Sprintf(a.GetCommonQuerySqlWithAggs(), strings.Join(aggs, ","))
  1170. log.Println("allData sql:", finalSql)
  1171. res := GetAggs(PtIndex, PtType, finalSql)
  1172. if res == nil || len(res) == 0 {
  1173. return
  1174. }
  1175. thisRow := AreaCTop{}
  1176. for name, object := range res {
  1177. bArr, err := object.MarshalJSON()
  1178. if len(bArr) == 0 || err != nil {
  1179. continue
  1180. }
  1181. if name == "project_amount" {
  1182. if json.Unmarshal(bArr, &thisRow.Amount) != nil {
  1183. continue
  1184. }
  1185. } else if name == "project_count" {
  1186. if json.Unmarshal(bArr, &thisRow.ProjectCount) != nil {
  1187. continue
  1188. }
  1189. //} else if name == "buyerclass_scale_other" {
  1190. // if json.Unmarshal(bArr, &thisRow.BuyerclassScaleOther) != nil {
  1191. // continue
  1192. // }
  1193. } else if name == "project_count_not0" {
  1194. if json.Unmarshal(bArr, &thisRow.CountNot0) != nil {
  1195. continue
  1196. }
  1197. } else if name == "sortprice_ranges" {
  1198. if json.Unmarshal(bArr, &thisRow.SortpriceRanges) != nil {
  1199. continue
  1200. }
  1201. } else if name == "area_distribution" {
  1202. if json.Unmarshal(bArr, &thisRow.AreaDistribution) != nil {
  1203. continue
  1204. }
  1205. } else if name == "buyerclass_scale" {
  1206. if json.Unmarshal(bArr, &thisRow.BuyerclassScale) != nil {
  1207. continue
  1208. }
  1209. } else if name == "area_amount_top3" {
  1210. if json.Unmarshal(bArr, &thisRow.AreaAmountTop3) != nil {
  1211. continue
  1212. }
  1213. } else if name == "area_count_top3" {
  1214. if json.Unmarshal(bArr, &thisRow.AreaCountTop3) != nil {
  1215. continue
  1216. }
  1217. } else if name == "buyerclass_amount_top3" {
  1218. if json.Unmarshal(bArr, &thisRow.BuyclassAmountTop3) != nil {
  1219. continue
  1220. }
  1221. } else if name == "buyerclass_count_top3" {
  1222. if json.Unmarshal(bArr, &thisRow.BuyclassCountTop3) != nil {
  1223. continue
  1224. }
  1225. }
  1226. }
  1227. data := ProjectScale(thisRow)
  1228. area_data := AreaDistribute(thisRow)
  1229. customerData, customerOther := CustomerDistribute(thisRow)
  1230. //customerOther, customerData := CustomerDistributeDetails(thisRow, customerDetails)
  1231. var ids []string
  1232. for _, v := range thisRow.AreaAmountTop3.Buckets {
  1233. for _, va := range v.WinnerTop.Buckets {
  1234. ids = append(ids, va.Winner)
  1235. }
  1236. }
  1237. for _, v := range thisRow.BuyclassAmountTop3.Buckets {
  1238. for _, va := range v.WinnerTop.Buckets {
  1239. ids = append(ids, va.Winner)
  1240. }
  1241. }
  1242. for _, v := range thisRow.AreaCountTop3.Buckets {
  1243. for _, va := range v.WinnerTop.Buckets {
  1244. ids = append(ids, va.Winner)
  1245. }
  1246. }
  1247. for _, v := range thisRow.BuyclassCountTop3.Buckets {
  1248. for _, va := range v.BidcountTop.Buckets {
  1249. ids = append(ids, va.Winner)
  1250. }
  1251. }
  1252. eid := IDToName(ids)
  1253. rMap["projectScale"] = data
  1254. rMap["area_infos"] = area_data
  1255. rMap["customer_scale"] = customerData
  1256. rMap["customer_scale_other"] = customerOther //客户分布(其它)
  1257. rMap["scaleAreaAmountTop"] = AmountCompute(thisRow, "area", eid)
  1258. rMap["scaleBuyclassAmountTop"] = AmountCompute(thisRow, "buyclass", eid)
  1259. rMap["scaleAreaCountTop"] = CountCompute(thisRow, "area", eid)
  1260. rMap["scaleBuyclassCountTop"] = CountCompute(thisRow, "buyclass", eid)
  1261. return
  1262. }
  1263. // MarketScaleRefineQuery 细化聚合
  1264. func (a *AnalysisEntity) MarketScaleRefineQuery() (rMap map[string]interface{}, err error) {
  1265. //关键词分组聚合
  1266. var aggsGroup []string
  1267. //获取总金额及总数
  1268. aggsGroup = append(aggsGroup, `"projectAmount":{"sum":{"field":"sortprice"}}`)
  1269. aggsGroup = append(aggsGroup, `"projectTotal":{"filter":{"match_all":{}}}`)
  1270. itemDataMap := map[string]int64{}
  1271. for _, group := range a.FormatParam.KeysItems {
  1272. var bools []string
  1273. for _, v := range getGroupKeywordArr(group.A_Key) {
  1274. if sql := getKeyWordSql(v, a.BaseParam.MatchingMode); sql != "" {
  1275. bools = append(bools, sql)
  1276. }
  1277. }
  1278. if len(bools) > 0 {
  1279. aggsGroup = append(aggsGroup, fmt.Sprintf(`"%s":{"filter":{"bool":{"should":[%s],"minimum_should_match": 1}},"aggs":{"project_count":{"filter":{"match_all":{}}},"project_amount":{"sum":{"field":"sortprice"}},"winner_total_top":{"terms":{"field":"entidlist","exclude":["-"],"order":[{"refine_winner_total":"desc"}],"size":%d},"aggs":{"refine_winner_total":{"filter":{"match_all":{}}}}},"winner_amount_top":{"terms":{"field":"entidlist","exclude":["-"],"order":[{"refine_winner_amount":"desc"}],"size":%d},"aggs":{"refine_winner_amount":{"sum":{"field":"sortprice"}}}}}}`, group.ItemName, strings.Join(bools, ","), topWinnerLimit, topWinnerLimit))
  1280. }
  1281. itemDataMap[group.ItemName] = group.UpdateTime
  1282. }
  1283. finalSql := fmt.Sprintf(a.GetCommonQuerySqlWithAggs(), strings.Join(aggsGroup, ","))
  1284. fmt.Println("finalSql-----4", finalSql)
  1285. rMap = map[string]interface{}{}
  1286. res := GetAggs(PtIndex, PtType, finalSql)
  1287. if res == nil || len(res) == 0 {
  1288. return
  1289. }
  1290. scale := scaleRefineData{IdSwitch: map[string]string{}}
  1291. for name, object := range res {
  1292. bArr, err := object.MarshalJSON()
  1293. if len(bArr) == 0 || err != nil {
  1294. continue
  1295. }
  1296. if name == "projectTotal" {
  1297. st := simpleTotal{}
  1298. if err := json.Unmarshal(bArr, &st); err == nil {
  1299. scale.Total = st
  1300. }
  1301. continue
  1302. } else if name == "projectAmount" {
  1303. ss := simpleSum{}
  1304. if err := json.Unmarshal(bArr, &ss); err == nil {
  1305. scale.Amount = ss
  1306. }
  1307. continue
  1308. }
  1309. thisRow := scaleRefineRow{}
  1310. if err := json.Unmarshal(bArr, &thisRow); err != nil {
  1311. continue
  1312. }
  1313. thisRow.ItemName = name
  1314. thisRow.UpdateTime = itemDataMap[name]
  1315. scale.Data = append(scale.Data, &thisRow)
  1316. }
  1317. scale.formatData() //获取所需数据
  1318. scale.doIdSwitch() //补充企业名称
  1319. return map[string]interface{}{
  1320. "scaleRefineAll": scale.ReturnData.Overall,
  1321. "scaleRefineTotalTop": scale.ReturnData.TotalTop,
  1322. "scaleRefineAmountTop": scale.ReturnData.AmountTop,
  1323. }, nil
  1324. }
  1325. // 采购单位分析
  1326. func (a *AnalysisEntity) BuyerWinnerAnalysis() map[string]interface{} {
  1327. var datas []string
  1328. //采购单位-采购规模分布
  1329. //buyer_scalefmt := fmt.Sprintf(buyer_procurement_scale, sortprice_str)
  1330. //采购单位-项目数量 采购单位-项目金额
  1331. datas = append(datas, buyerProcurementScale, buyerCount, buyerSortprice)
  1332. //中标单位-规模分布
  1333. //winner_scalefmt := fmt.Sprintf(winner_procurement_scale, sortprice_str)
  1334. //中标单位-项目数量 中标单位-项目金额
  1335. datas = append(datas, winnerProcurementScale, winnerCount, winnerSortprice)
  1336. finalSql := fmt.Sprintf(a.GetCommonQuerySqlWithAggs(), strings.Join(datas, ","))
  1337. log.Printf("final PurchasingAnalysiseQuery sql: %s", finalSql)
  1338. t := time.Now()
  1339. res := GetAggs(PtIndex, PtType, finalSql)
  1340. if res == nil || len(res) == 0 {
  1341. return nil
  1342. }
  1343. //计算entidlist大于1的中标单位金额
  1344. entArrMap := make(map[string]float64)
  1345. winnerKeyMap := make(map[string]int)
  1346. newSql := strings.ReplaceAll(fmt.Sprintf(a.GetCommonQuerySql(), `,"sort": [{"sortprice": {"order": "desc"}}], "_source": ["entidlist","sortprice"],"size":10000`), `],"should":[`, fmt.Sprintf(`%s],"should":[`, `,{"script": {"script": {"source": "doc['entidlist'].length > 1"}}}`))
  1347. log.Println("winner new sql", newSql)
  1348. multiBid := elastic.Get(PtIndex, PtType, newSql)
  1349. if multiBid != nil && len(*multiBid) > 0 {
  1350. for _, m := range *multiBid {
  1351. entidlist, _ := m["entidlist"].([]interface{})
  1352. entArr := common.ObjArrToStringArr(entidlist)
  1353. sortprice := common.Float64All(m["sortprice"])
  1354. entLen := len(entArr)
  1355. switch entLen {
  1356. case 1:
  1357. entArrMap[entArr[0]] = sortprice
  1358. //winnerKeyMap[entArr[0]]++
  1359. case 0:
  1360. default:
  1361. assessedAmount := sortprice / common.Float64All(entLen)
  1362. for _, m2 := range entArr {
  1363. //winnerKeyMap[m2]++
  1364. entArrMap[m2] = assessedAmount
  1365. }
  1366. }
  1367. }
  1368. }
  1369. log.Println("采购单位-中标单位分析报告es查询耗时===", time.Since(t))
  1370. var thisBuyerWinnerRow BuyerWinnerRow
  1371. for name, object := range res {
  1372. bArr, err := object.MarshalJSON()
  1373. if len(bArr) == 0 || err != nil {
  1374. continue
  1375. }
  1376. if name == "buyer_amount_distribution" {
  1377. if json.Unmarshal(bArr, &thisBuyerWinnerRow.BuyerAmountDistribution) != nil {
  1378. continue
  1379. }
  1380. } else if name == "buyer_count_top3" {
  1381. if json.Unmarshal(bArr, &thisBuyerWinnerRow.BuyerCountTop3) != nil {
  1382. continue
  1383. }
  1384. } else if name == "buyer_amount_top3" {
  1385. if json.Unmarshal(bArr, &thisBuyerWinnerRow.BuyerAmountTop3) != nil {
  1386. continue
  1387. }
  1388. } else if name == "winner_amount_distribution" {
  1389. if json.Unmarshal(bArr, &thisBuyerWinnerRow.WinnerAmountDistribution) != nil {
  1390. continue
  1391. }
  1392. } else if name == "winner_count_top3" {
  1393. if json.Unmarshal(bArr, &thisBuyerWinnerRow.WinnerCountTop3) != nil {
  1394. continue
  1395. }
  1396. } else if name == "winner_amount_top3" {
  1397. if json.Unmarshal(bArr, &thisBuyerWinnerRow.WinnerAmountTop3) != nil {
  1398. continue
  1399. }
  1400. }
  1401. }
  1402. var winnerKeys []string
  1403. for _, v := range thisBuyerWinnerRow.BuyerCountTop3.Buckets {
  1404. for _, v1 := range v.SWinnerTop.Buckets {
  1405. winnerKeyMap[v1.Key]++
  1406. }
  1407. }
  1408. for _, v := range thisBuyerWinnerRow.BuyerAmountTop3.Buckets {
  1409. for _, v1 := range v.SWinnerTop.Buckets {
  1410. winnerKeyMap[v1.Key]++
  1411. }
  1412. }
  1413. for _, v := range thisBuyerWinnerRow.WinnerCountTop3.SWinnerCount {
  1414. winnerKeyMap[v.Key]++
  1415. }
  1416. for _, v := range thisBuyerWinnerRow.WinnerAmountTop3.SWinnerAmount {
  1417. winnerKeyMap[v.Key]++
  1418. }
  1419. for k := range winnerKeyMap {
  1420. winnerKeys = append(winnerKeys, k)
  1421. }
  1422. winnerName := GetEntNameByIds(winnerKeys)
  1423. //rMap := make(map[string]interface{})
  1424. var rMap = sync.Map{}
  1425. sy := sync.WaitGroup{}
  1426. sy.Add(2)
  1427. go BuyerAnalysis(thisBuyerWinnerRow, &rMap, winnerName, &sy)
  1428. go WinningAnalysis(thisBuyerWinnerRow, &rMap, entArrMap, winnerName, &sy)
  1429. sy.Wait()
  1430. log.Println("采购单位-中标单位分析报告程序计算耗时===", time.Since(t))
  1431. rMaps := make(map[string]interface{})
  1432. rMap.Range(func(key, value interface{}) bool {
  1433. rMaps[common.InterfaceToStr(key)] = value
  1434. return true
  1435. })
  1436. return rMaps
  1437. }
  1438. func (a *AnalysisEntity) GetPdfPageApi() (finalDate map[string]interface{}, err error) {
  1439. var (
  1440. wg = sync.WaitGroup{}
  1441. lock = &sync.Mutex{}
  1442. )
  1443. finalDate = make(map[string]interface{})
  1444. for i := 1; i <= 5; i++ {
  1445. wg.Add(1)
  1446. go func(flag int) {
  1447. defer wg.Done()
  1448. mgoData, _ := a.GetMongoData(flag)
  1449. lock.Lock()
  1450. for s, i2 := range mgoData {
  1451. finalDate[s] = i2
  1452. }
  1453. lock.Unlock()
  1454. }(i)
  1455. }
  1456. wg.Wait()
  1457. return
  1458. }