marketanalysis.go 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632
  1. package marketanalysis
  2. import (
  3. "app.yhyue.com/moapp/jybase/common"
  4. "app.yhyue.com/moapp/jybase/encrypt"
  5. elastic "app.yhyue.com/moapp/jybase/es"
  6. "app.yhyue.com/moapp/jybase/mongodb"
  7. "app.yhyue.com/moapp/jybase/mysql"
  8. "app.yhyue.com/moapp/jybase/redis"
  9. "app.yhyue.com/moapp/jypkg/common/src/qfw/util/jy"
  10. "encoding/json"
  11. "errors"
  12. "fmt"
  13. "log"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. MAPool chan bool
  20. MATimeout int
  21. MAProjectNumLimit int
  22. MAKeyWordsCount int
  23. MinKeyWordsCount int //关键词数量最低值 小于此数量,默认走线上
  24. MaxPCount int //项目最大限制数量
  25. ProjectCount int
  26. PtIndex = "projectset"
  27. PtType = "projectset"
  28. FieldsDetail = `"purchasing","projectname.pname"`
  29. Fields = `"projectname.pname"`
  30. BWCount = Top1000
  31. BWDCount = BWDistribution
  32. )
  33. func MAInit(limit, timeOut, projectNumLimit, keyWordsCount, pCount, minKCount, maxPCount, bwc, bwdc int, ptIndex, ptType string, fields []string) {
  34. if limit == 0 {
  35. limit = 5
  36. }
  37. MAPool = make(chan bool, limit)
  38. for i := 0; i < limit; i++ {
  39. MAPool <- true
  40. }
  41. if timeOut <= 0 {
  42. timeOut = 20
  43. }
  44. MATimeout = timeOut
  45. if projectNumLimit <= 0 {
  46. projectNumLimit = 600000
  47. }
  48. MAProjectNumLimit = projectNumLimit
  49. if keyWordsCount <= 0 {
  50. keyWordsCount = 300
  51. }
  52. MAKeyWordsCount = keyWordsCount
  53. if minKCount <= 0 {
  54. minKCount = 100
  55. }
  56. MinKeyWordsCount = minKCount
  57. if maxPCount <= 0 {
  58. maxPCount = 5000000
  59. }
  60. MaxPCount = maxPCount
  61. //项目数量
  62. ProjectCount = pCount
  63. //es 索引
  64. if ptIndex != "" {
  65. PtIndex = ptIndex
  66. PtType = ptType
  67. }
  68. //
  69. if len(fields) > 1 {
  70. Fields = fmt.Sprintf(`"%s"`, fields[0])
  71. FieldsDetail = fmt.Sprintf(`"%s"`, strings.Join(fields, `","`))
  72. }
  73. //
  74. if bwc > 0 {
  75. BWCount = bwc
  76. }
  77. if bwdc > 0 {
  78. BWDCount = bwdc
  79. }
  80. }
  81. // AnalysisRequestParam 接口原请求参数
  82. type AnalysisRequestParam struct {
  83. KeysItemsStr string //分析内容【字符串】结构和o_member_jy.a_items保持一致
  84. RangeTime string //时间【字符串】 时间戳开始-结束时间戳
  85. RangeTimeExtra string //时间【字符串】前段回显使用
  86. Area string //省份【对象字符串】
  87. Industry string //行业【对象字符串】
  88. BuyerClass string //采购单位类型【字符串】多个采购单位类型用逗号拼接
  89. Buyer string //采购单位
  90. Winner string //中标单位
  91. Sort int //排序:默认0:成交时间倒序;1:项目金额倒序
  92. PageSize int //默认每页10条
  93. PageNum int //默认当前第一页
  94. IsDetail bool //是否是项目明细请求
  95. MatchingMode string //匹配方式 title:标题 content:项目名称/标的物
  96. }
  97. type viewKeyWord struct {
  98. Keyword []string `json:"key"` //关键词
  99. Appended []string `json:"appendkey"` //附加词
  100. Exclude []string `json:"notkey"` //排除词
  101. MatchWay int `json:"matchway"` //匹配模式
  102. }
  103. // keyWordGroup 订阅词结构体
  104. type keyWordGroup struct {
  105. A_Key []viewKeyWord `json:"a_key"`
  106. ItemName string `json:"s_item"`
  107. UpdateTime int64 `json:"updatetime"`
  108. }
  109. // AnalysisRequestFormat 格式化后参数
  110. type AnalysisRequestFormat struct {
  111. KeysItems []keyWordGroup
  112. Area, City []string //省份城市
  113. STime, ETime int64 //开始结束时间
  114. Industry []string //行业
  115. BuyerClass []string //采购单位类型
  116. Buyer string //采购单位
  117. Winner string //中标单位
  118. Sort int //排序:默认0:成交时间倒序;1:项目金额倒序
  119. PageSize int //默认每页10条
  120. PageNum int //默认当前第一页
  121. }
  122. type AnalysisEntity struct {
  123. MgoRecordId string
  124. BaseParam AnalysisRequestParam
  125. FormatParam AnalysisRequestFormat
  126. UId, Pid string
  127. ProjectInfo projectInfo
  128. Offline int // 1-离线 2-实时
  129. State int // 状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  130. MgoUserId string
  131. Phone string // 手机号
  132. PositionId int64
  133. OriginalTotal int64 // 数据总数
  134. KeysTotal int64 //关键词数量
  135. PositionType int
  136. EntId int
  137. EntUserId int
  138. Source string
  139. Mgo *mongodb.MongodbSim
  140. MySql *mysql.Mysql
  141. Rs int //分析来源
  142. }
  143. type projectInfo struct {
  144. Count int64
  145. List []ProjectList
  146. }
  147. type ProjectList struct {
  148. Name string `json:"name"` //项目名称
  149. Id string `json:"id"` //项目id
  150. Area string `json:"area"` //地区
  151. DealTime int64 `json:"dealTime"` //成交时间
  152. BidStatus string `json:"bidStatus"` //项目类型
  153. BuyerClass string `json:"buyerClass"` //采购单位类型
  154. Winner []string `json:"winner"` //中标单位
  155. WinnerId []string `json:"winnerId"` //中标单位id
  156. Buyer string `json:"buyer"` //采购单位
  157. BidAmount float64 `json:"bidAmount"` //中标金额
  158. Budget float64 `json:"budget"` //预算
  159. }
  160. // ForMatData 获取格式化请求参数
  161. func (a *AnalysisEntity) ForMatData() error {
  162. //格式化订阅词
  163. if err := json.Unmarshal([]byte(a.BaseParam.KeysItemsStr), &a.FormatParam.KeysItems); err != nil {
  164. return fmt.Errorf("关键词组格式异常")
  165. }
  166. if a.FormatParam.KeysItems == nil || len(a.FormatParam.KeysItems) == 0 {
  167. return fmt.Errorf("请选择关键词组")
  168. }
  169. var flag bool
  170. if a.Rs == 0 { //普通分析(非pdf)
  171. // 判断关键词是不是为空
  172. for i := 0; i < len(a.FormatParam.KeysItems); i++ {
  173. items := a.FormatParam.KeysItems[i]
  174. for j := 0; j < len(items.A_Key); j++ {
  175. AKey := items.A_Key[j]
  176. if len(AKey.Keyword) > 0 {
  177. var keys []string
  178. for _, kv := range AKey.Keyword {
  179. if strings.TrimSpace(kv) != "" {
  180. keys = append(keys, kv)
  181. }
  182. }
  183. AKey.Keyword = []string{}
  184. if len(keys) > 0 {
  185. AKey.Keyword = keys
  186. flag = true
  187. break
  188. }
  189. }
  190. if len(AKey.Keyword) == 0 {
  191. items.A_Key = append(items.A_Key[:j], items.A_Key[j+1:]...)
  192. }
  193. }
  194. if flag {
  195. break
  196. }
  197. a.FormatParam.KeysItems = append(a.FormatParam.KeysItems[:i], a.FormatParam.KeysItems[i+1:]...)
  198. }
  199. if !flag {
  200. return fmt.Errorf("请选择关键词组")
  201. }
  202. }
  203. //格式化时间段
  204. if timeArr := strings.Split(a.BaseParam.RangeTime, "-"); len(timeArr) == 2 {
  205. a.FormatParam.STime = common.Int64All(timeArr[0])
  206. a.FormatParam.ETime = common.Int64All(timeArr[1])
  207. if a.FormatParam.STime == 0 || a.FormatParam.ETime == 0 {
  208. return fmt.Errorf("开始时间和结束时间不能为空")
  209. }
  210. } else {
  211. return fmt.Errorf("时间戳格式异常")
  212. }
  213. //格式化省份、城市
  214. if areaStr := strings.TrimSpace(a.BaseParam.Area); areaStr != "" {
  215. imap := map[string][]string{}
  216. if err := json.Unmarshal([]byte(a.BaseParam.Area), &imap); err != nil {
  217. return fmt.Errorf("非法地区信息")
  218. }
  219. var city, area []string
  220. for name, v := range imap {
  221. if len(v) == 0 {
  222. area = append(area, name)
  223. } else {
  224. for _, vv := range v {
  225. city = append(city, vv)
  226. }
  227. }
  228. }
  229. a.FormatParam.Area = area
  230. a.FormatParam.City = city
  231. }
  232. //格式化行业
  233. if industryStr := strings.TrimSpace(a.BaseParam.Industry); industryStr != "" {
  234. imap := map[string][]string{}
  235. if err := json.Unmarshal([]byte(industryStr), &imap); err != nil {
  236. return fmt.Errorf("非法行业信息")
  237. }
  238. var farr []string
  239. for name, v := range imap {
  240. for _, vv := range v {
  241. farr = append(farr, fmt.Sprintf("%s_%s", name, vv))
  242. }
  243. }
  244. if len(farr) > 0 {
  245. //P510 行业:其它
  246. if qt := jy.IndustryHandle(strings.Join(farr, ",")); len(qt) > 0 {
  247. farr = append(farr, qt...)
  248. }
  249. }
  250. a.FormatParam.Industry = farr
  251. }
  252. //格式化类型
  253. if buyerClassStr := strings.TrimSpace(a.BaseParam.BuyerClass); buyerClassStr != "" {
  254. a.FormatParam.BuyerClass = strings.Split(buyerClassStr, ",")
  255. }
  256. //中标企业
  257. a.FormatParam.Winner = a.BaseParam.Winner
  258. //采购单位
  259. a.FormatParam.Buyer = a.BaseParam.Buyer
  260. //排序
  261. a.FormatParam.Sort = common.If(a.BaseParam.Sort != 0 && a.BaseParam.Sort != 1, 0, a.BaseParam.Sort).(int)
  262. if a.BaseParam.PageNum*a.BaseParam.PageSize > ProjectCount {
  263. a.BaseParam.PageNum = ProjectCount / a.BaseParam.PageSize
  264. }
  265. //当前页码
  266. a.FormatParam.PageNum = common.If(a.BaseParam.PageNum < 1 || a.BaseParam.PageNum > 1000, 1, a.BaseParam.PageNum).(int)
  267. //默认每页10条
  268. a.FormatParam.PageSize = common.If(a.BaseParam.PageSize < 1 || a.BaseParam.PageSize > 100, 50, a.BaseParam.PageSize).(int)
  269. return nil
  270. }
  271. // ForMatData 获取格式化请求参数
  272. func (a *AnalysisEntity) ForMatDataPdf() (string, error) {
  273. //格式化订阅词
  274. if err := json.Unmarshal([]byte(a.BaseParam.KeysItemsStr), &a.FormatParam.KeysItems); err != nil {
  275. return "", fmt.Errorf("关键词组格式异常")
  276. }
  277. //格式化时间段
  278. if timeArr := strings.Split(a.BaseParam.RangeTime, "-"); len(timeArr) == 2 {
  279. a.FormatParam.STime = common.Int64All(timeArr[0])
  280. a.FormatParam.ETime = common.Int64All(timeArr[1])
  281. if a.FormatParam.STime == 0 || a.FormatParam.ETime == 0 {
  282. return "", fmt.Errorf("开始时间和结束时间不能为空")
  283. }
  284. } else {
  285. return "", fmt.Errorf("时间戳格式异常")
  286. }
  287. //格式化省份、城市
  288. if areaStr := strings.TrimSpace(a.BaseParam.Area); areaStr != "" {
  289. imap := map[string][]string{}
  290. if err := json.Unmarshal([]byte(a.BaseParam.Area), &imap); err != nil {
  291. return "", fmt.Errorf("非法地区信息")
  292. }
  293. var city, area []string
  294. for name, v := range imap {
  295. if len(v) == 0 {
  296. area = append(area, name)
  297. } else {
  298. for _, vv := range v {
  299. city = append(city, vv)
  300. }
  301. }
  302. }
  303. a.FormatParam.Area = area
  304. a.FormatParam.City = city
  305. }
  306. //格式化行业
  307. if industryStr := strings.TrimSpace(a.BaseParam.Industry); industryStr != "" {
  308. imap := map[string][]string{}
  309. if err := json.Unmarshal([]byte(industryStr), &imap); err != nil {
  310. return "", fmt.Errorf("非法行业信息")
  311. }
  312. var farr []string
  313. for name, v := range imap {
  314. for _, vv := range v {
  315. farr = append(farr, fmt.Sprintf("%s_%s", name, vv))
  316. }
  317. }
  318. a.FormatParam.Industry = farr
  319. }
  320. //格式化类型
  321. if buyerClassStr := strings.TrimSpace(a.BaseParam.BuyerClass); buyerClassStr != "" {
  322. a.FormatParam.BuyerClass = strings.Split(buyerClassStr, ",")
  323. }
  324. //中标企业
  325. a.FormatParam.Winner = a.BaseParam.Winner
  326. //采购单位
  327. a.FormatParam.Buyer = a.BaseParam.Buyer
  328. //排序
  329. a.FormatParam.Sort = common.If(a.BaseParam.Sort != 0 && a.BaseParam.Sort != 1, 0, a.BaseParam.Sort).(int)
  330. if a.BaseParam.PageNum*a.BaseParam.PageSize > ProjectCount {
  331. a.BaseParam.PageNum = ProjectCount / a.BaseParam.PageSize
  332. }
  333. //当前页码
  334. a.FormatParam.PageNum = common.If(a.BaseParam.PageNum < 1 || a.BaseParam.PageNum > 1000, 1, a.BaseParam.PageNum).(int)
  335. //默认每页10条
  336. a.FormatParam.PageSize = common.If(a.BaseParam.PageSize < 1 || a.BaseParam.PageSize > 100, 50, a.BaseParam.PageSize).(int)
  337. data := map[string]interface{}{
  338. "s_keysItems": a.BaseParam.KeysItemsStr,
  339. "s_rangeTime": a.BaseParam.RangeTime,
  340. "s_rangeTimeExtra": a.BaseParam.RangeTimeExtra,
  341. "s_area": a.BaseParam.Area,
  342. "s_industry": a.BaseParam.Industry,
  343. "s_buyerClass": a.BaseParam.BuyerClass,
  344. "s_matchingMode": a.BaseParam.MatchingMode,
  345. "s_userId": a.UId,
  346. "s_parentId": a.Pid,
  347. "s_mgoUserId": a.MgoUserId,
  348. "i_positionId": a.PositionId,
  349. "s_phone": a.Phone,
  350. "audit": 1,
  351. }
  352. if a.OriginalTotal > 0 {
  353. data["l_originalTotal"] = a.OriginalTotal
  354. }
  355. if a.Source != "" {
  356. data["source"] = a.Source
  357. }
  358. rs, b := a.Mgo.FindOne(ReportHistoryTable, data)
  359. if b && rs != nil && len(*rs) > 0 {
  360. return common.InterfaceToStr((*rs)["_id"]), errors.New("当期分析已存在")
  361. }
  362. return "", nil
  363. }
  364. // GetProjectInfoList 项目明细
  365. func (a *AnalysisEntity) GetProjectInfoList() error {
  366. var (
  367. queryDefault = `,"sort": [{%s}],"from": %d,"size": %d`
  368. start = (a.FormatParam.PageNum - 1) * a.FormatParam.PageSize
  369. sort = `"jgtime": "desc"`
  370. )
  371. if a.FormatParam.Sort > 0 {
  372. sort = `"bidamount": "desc","budget": "desc"`
  373. }
  374. countSql := fmt.Sprintf(a.GetCommonQuerySql(), "")
  375. queryDefault = fmt.Sprintf(queryDefault, sort, start, a.FormatParam.PageSize)
  376. finalSql := fmt.Sprintf(a.GetCommonQuerySql(), queryDefault)
  377. log.Println("finalSql:", finalSql)
  378. count, hits := elastic.GetWithCount(PtIndex, PtType, countSql, finalSql)
  379. //hits, count := elastic.GetOA(PtIndex, PtType, finalSql)
  380. if count > 0 {
  381. a.ProjectInfo.Count = count
  382. source := *hits
  383. for _, v := range source {
  384. var winnerIdArr []string
  385. if common.ObjToString(v["s_winner"]) != "" && v["entidlist"] != nil {
  386. idObjs, _ := v["entidlist"].([]interface{})
  387. for _, v := range common.ObjArrToStringArr(idObjs) {
  388. if v != "" && v != "-" {
  389. v = encrypt.EncodeArticleId2ByCheck(v)
  390. }
  391. winnerIdArr = append(winnerIdArr, v)
  392. }
  393. }
  394. a.ProjectInfo.List = append(a.ProjectInfo.List, ProjectList{
  395. Name: common.ObjToString(v["projectname"]),
  396. Id: encrypt.EncodeArticleId2ByCheck(common.ObjToString(v["id"])),
  397. Area: common.ObjToString(v["area"]),
  398. DealTime: common.Int64All(v["jgtime"]), //截止时间
  399. BidStatus: common.ObjToString(v["bidstatus"]),
  400. BuyerClass: common.ObjToString(v["buyerclass"]),
  401. Winner: strings.Split(common.ObjToString(v["s_winner"]), ","),
  402. WinnerId: winnerIdArr,
  403. Buyer: common.ObjToString(v["buyer"]),
  404. BidAmount: common.Float64All(v["bidamount"]), //中标金额
  405. Budget: common.Float64All(v["budget"]), //预算
  406. })
  407. }
  408. }
  409. return nil
  410. }
  411. // SaveAnalysisRecord 保存分析记录
  412. func (a *AnalysisEntity) SaveAnalysisRecord() error {
  413. if a.Offline == ValueRealTime {
  414. a.State = ReportStateGenerated
  415. } else {
  416. a.State = ReportStateGenerating
  417. }
  418. data := map[string]interface{}{
  419. "s_keysItems": a.BaseParam.KeysItemsStr,
  420. "s_rangeTime": a.BaseParam.RangeTime,
  421. "s_rangeTimeExtra": a.BaseParam.RangeTimeExtra,
  422. "s_area": a.BaseParam.Area,
  423. "s_industry": a.BaseParam.Industry,
  424. "s_buyerClass": a.BaseParam.BuyerClass,
  425. "s_matchingMode": a.BaseParam.MatchingMode,
  426. "s_userId": a.UId,
  427. "s_parentId": a.Pid,
  428. "i_state": common.If(a.Source == "analysisPDF", 1, a.State), //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  429. "l_updateTime": time.Now().Unix(), //生成时间 or 取消时间
  430. "l_createTime": time.Now().Unix(),
  431. "i_offline": common.If(a.Source == "analysisPDF", 2, a.Offline),
  432. "s_mgoUserId": a.MgoUserId,
  433. "i_positionId": a.PositionId,
  434. "s_phone": a.Phone,
  435. }
  436. if a.OriginalTotal > 0 {
  437. data["l_originalTotal"] = a.OriginalTotal
  438. }
  439. if a.KeysTotal > 0 {
  440. data["l_keysTotal"] = a.KeysTotal
  441. }
  442. if a.Source != "" {
  443. data["source"] = a.Source
  444. }
  445. a.MgoRecordId = a.Mgo.Save(ReportHistoryTable, data)
  446. if a.MgoRecordId == "" {
  447. return fmt.Errorf("分析创建异常")
  448. }
  449. return nil
  450. }
  451. // GetAnalysisFromMgoDb 从数据库中获取分析记录
  452. func (a *AnalysisEntity) GetAnalysisFromMgoDb() error {
  453. if a.MgoRecordId == "" {
  454. return fmt.Errorf("缺少参数")
  455. }
  456. queryMap := map[string]interface{}{
  457. "_id": mongodb.StringTOBsonId(a.MgoRecordId),
  458. "i_del": map[string]interface{}{"$ne": 1},
  459. }
  460. if a.UId == a.Pid { //主账号
  461. queryMap["s_parentId"] = a.Pid
  462. } else {
  463. queryMap["s_userId"] = a.UId
  464. }
  465. res, _ := a.Mgo.FindOne(ReportHistoryTable, queryMap)
  466. if res == nil || len(*res) == 0 {
  467. return fmt.Errorf("未查询到相关数据")
  468. }
  469. a.Offline = common.IntAll((*res)["i_offline"])
  470. a.State = common.IntAll((*res)["i_state"])
  471. a.BaseParam.KeysItemsStr, _ = (*res)["s_keysItems"].(string)
  472. a.BaseParam.RangeTime, _ = (*res)["s_rangeTime"].(string)
  473. a.BaseParam.RangeTimeExtra, _ = (*res)["s_rangeTimeExtra"].(string)
  474. a.BaseParam.Area, _ = (*res)["s_area"].(string)
  475. a.BaseParam.Industry, _ = (*res)["s_industry"].(string)
  476. a.BaseParam.BuyerClass, _ = (*res)["s_buyerClass"].(string)
  477. a.BaseParam.MatchingMode, _ = (*res)["s_matchingMode"].(string)
  478. return nil
  479. }
  480. // removeEmptyRecord 删除空报告记录
  481. func (a *AnalysisEntity) removeEmptyRecord() {
  482. queryMap := map[string]interface{}{
  483. "_id": mongodb.StringTOBsonId(a.MgoRecordId),
  484. }
  485. if a.UId == a.Pid { //主账号
  486. queryMap["s_parentId"] = a.Pid
  487. } else {
  488. queryMap["s_userId"] = a.UId
  489. }
  490. a.Mgo.Update(ReportHistoryTable, queryMap, map[string]interface{}{
  491. "$set": map[string]interface{}{"i_del": 1},
  492. }, false, false)
  493. //log.Println("删除空报告", queryMap)
  494. }
  495. // GetRecordList 获取分析记录
  496. func (a *AnalysisEntity) GetRecordList(pageNum, PageSize int, positionType int, entId, entUserId int) (total int, list []map[string]interface{}) {
  497. queryMap := map[string]interface{}{
  498. "i_del": map[string]interface{}{"$ne": 1},
  499. }
  500. if a.UId == a.Pid { //主账号
  501. queryMap["s_parentId"] = a.Pid
  502. } else {
  503. queryMap["s_userId"] = a.UId
  504. }
  505. if pageNum == 1 {
  506. total = a.Mgo.Count(ReportHistoryTable, queryMap)
  507. if total == 0 {
  508. return
  509. }
  510. } else {
  511. total = -1
  512. }
  513. res, _ := a.Mgo.Find(ReportHistoryTable, queryMap, `{"l_createTime":-1}`, nil, false, (pageNum-1)*PageSize, PageSize)
  514. if res == nil || len(*res) == 0 {
  515. return
  516. }
  517. var ids []string
  518. for _, m := range *res {
  519. ids = append(ids, common.InterfaceToStr(m["_id"]))
  520. }
  521. idMap := make(map[string]map[string]interface{})
  522. tName, _ := GetMongoColl(MarketScaleMain)
  523. mids, _ := a.Mgo.Find(tName, map[string]interface{}{
  524. "s_m_id": map[string]interface{}{
  525. "$in": ids,
  526. },
  527. }, "", `{"market_profile":1,"s_m_id":1}`, false, -1, -1)
  528. if mids != nil && len(*mids) > 0 {
  529. for _, m := range *mids {
  530. marketProfile, _ := m["market_profile"].(map[string]interface{})
  531. if common.IntAll(marketProfile["project_count"]) > 0 {
  532. idMap[common.InterfaceToStr(m["s_m_id"])] = marketProfile
  533. }
  534. }
  535. }
  536. //用户消息开关
  537. open := GetMsgOpen(a.Mgo, a.MgoUserId, positionType, entId, entUserId)
  538. for _, row := range *res {
  539. var status int
  540. if row["i_state"] != nil {
  541. status = common.IntAll(row["i_state"])
  542. if status == ReportStateFailed {
  543. status = ReportStateGenerating // 生成失败对外还是展示为生成中
  544. }
  545. }
  546. var (
  547. isDownload bool
  548. )
  549. marketProfile := make(map[string]interface{})
  550. if marketProfile = idMap[mongodb.BsonIdToSId(row["_id"])]; marketProfile != nil && common.IntAll(marketProfile["project_count"]) > 0 && status == 1 {
  551. isDownload = true
  552. }
  553. //P510 回显 移除 其它
  554. industry := common.ObjToString(row["s_industry"])
  555. if industry != "" {
  556. industry = strings.ReplaceAll(industry, "\"其它\"", "")
  557. }
  558. data := map[string]interface{}{
  559. "id": encodeId(mongodb.BsonIdToSId(row["_id"])),
  560. "keysItems": common.ObjToString(row["s_keysItems"]),
  561. "area": common.ObjToString(row["s_area"]),
  562. "industry": industry,
  563. "buyerclass": common.ObjToString(row["s_buyerClass"]),
  564. "rangeTime": common.ObjToString(row["s_rangeTime"]),
  565. "s_rangeTimeExtra": common.ObjToString(row["s_rangeTimeExtra"]),
  566. "createTime": common.Int64All(row["l_createTime"]),
  567. "matchingMode": common.ObjToString(row["s_matchingMode"]), //项目匹配方式
  568. "state": common.If(row["i_state"] == nil, nil, status),
  569. "updateTime": common.Int64All(row["l_updateTime"]),
  570. "msgOpen": open,
  571. "isDownload": isDownload,
  572. "marketProfile": marketProfile,
  573. }
  574. list = append(list, data)
  575. }
  576. return
  577. }
  578. func (a *AnalysisEntity) GetRecordPdfList(pageNum, PageSize int) (total int, list []map[string]interface{}) {
  579. queryMap := map[string]interface{}{
  580. "i_del": map[string]interface{}{"$ne": 1},
  581. "source": "analysisPDF",
  582. }
  583. if a.UId == a.Pid { //主账号
  584. queryMap["s_parentId"] = a.Pid
  585. } else {
  586. queryMap["s_userId"] = a.UId
  587. }
  588. if pageNum == 1 {
  589. total = a.Mgo.Count(ReportHistoryTable, queryMap)
  590. if total == 0 {
  591. return
  592. }
  593. } else {
  594. total = -1
  595. }
  596. res, _ := a.Mgo.Find(ReportHistoryTable, queryMap, `{"l_createTime":-1}`, nil, false, (pageNum-1)*PageSize, PageSize)
  597. if res == nil || len(*res) == 0 {
  598. return
  599. }
  600. for _, row := range *res {
  601. data := map[string]interface{}{
  602. "id": encodeId(mongodb.BsonIdToSId(row["_id"])),
  603. "keysItems": common.ObjToString(row["s_keysItems"]),
  604. "area": common.ObjToString(row["s_area"]),
  605. "industry": common.ObjToString(row["s_industry"]),
  606. "buyerclass": common.ObjToString(row["s_buyerClass"]),
  607. "rangeTime": common.ObjToString(row["s_rangeTime"]),
  608. "s_rangeTimeExtra": common.ObjToString(row["s_rangeTimeExtra"]),
  609. "createTime": common.Int64All(row["l_createTime"]),
  610. "matchingMode": common.ObjToString(row["s_matchingMode"]), //项目匹配方式
  611. "updateTime": common.Int64All(row["l_updateTime"]),
  612. "audit": common.Int64All(row["audit"]),
  613. }
  614. list = append(list, data)
  615. }
  616. return
  617. }
  618. // GetQueryItem 获取查询条件,前端回显使用
  619. func (a *AnalysisEntity) getQueryItem() (map[string]interface{}, error) {
  620. return map[string]interface{}{
  621. "keysItems": a.BaseParam.KeysItemsStr,
  622. "area": a.BaseParam.Area,
  623. "industry": a.BaseParam.Industry,
  624. "buyerclass": a.BaseParam.BuyerClass,
  625. "rangeTime": a.BaseParam.RangeTime,
  626. "s_rangeTimeExtra": a.BaseParam.RangeTimeExtra,
  627. "matchingMode": a.BaseParam.MatchingMode,
  628. }, nil
  629. }
  630. // GetPartResult 分块儿获取报告内容
  631. func (a *AnalysisEntity) GetPartResult(flag int) (map[string]interface{}, error) {
  632. defer common.Catch()
  633. if flag == MarketQueryItem { //返回查询内容
  634. return a.getQueryItem()
  635. }
  636. thisCacheKey := fmt.Sprintf(ReportCacheKey, a.MgoRecordId, flag)
  637. if cacheData := redis.Get(ReportCacheDB, thisCacheKey); cacheData != nil {
  638. if cacheMap, ok := cacheData.(map[string]interface{}); ok && len(cacheMap) > 0 {
  639. return cacheMap, nil
  640. }
  641. }
  642. rData, err := func() (map[string]interface{}, error) {
  643. //控制并发&&超时返回超时异常
  644. select {
  645. case <-time.After(time.Duration(MATimeout) * time.Second * 20):
  646. return nil, fmt.Errorf("查询超时,请稍后重试")
  647. case <-MAPool:
  648. }
  649. start := time.Now()
  650. defer func() {
  651. MAPool <- true
  652. log.Printf("report %s[%d] speed %d ms\n", a.MgoRecordId, flag, time.Now().Sub(start).Milliseconds())
  653. }()
  654. //校验报告是否合法
  655. //if flag != MarketScaleMain {
  656. // if exists, _ := redis.Exists(ReportCacheDB, fmt.Sprintf(ReportCacheKey, a.MgoRecordId, 1)); !exists {
  657. // return nil, fmt.Errorf("报告异常请求,请刷新重试")
  658. // }
  659. //}
  660. // 1. 查mongo
  661. var rData map[string]interface{}
  662. var err error
  663. rData, err = a.GetMongoData(flag)
  664. // 查到数据则直接返回
  665. if err == nil && rData != nil {
  666. return rData, nil
  667. }
  668. // 2.没有查询到数据 判断是不是离线的
  669. // 正常情况下正在离线生的不会走到这里
  670. // 离线的应该生成完报告之后才会调用获取结果接口 这里处理是防止直接调接口传正在离线生的报告
  671. if a.Offline == ValueOffline || a.Offline == 0 {
  672. return nil, err
  673. }
  674. // 3. 实时则查es
  675. rData, err = a.RealTimeQuery(flag)
  676. if err == nil && len(rData) > 0 {
  677. // 4.存库
  678. a.SaveMongoReport(rData, flag)
  679. return rData, err
  680. }
  681. return nil, err
  682. }()
  683. if err == nil && rData != nil && len(rData) > 0 {
  684. delete(rData, "s_m_id")
  685. delete(rData, "_id")
  686. redis.Put(ReportCacheDB, thisCacheKey, rData, ReportCacheTime)
  687. }
  688. return rData, err
  689. }
  690. // 离线 获取采购单位和中部单位数据
  691. func (a *AnalysisEntity) BWData() (buyers []string, winners []string) {
  692. sql := `"buyer_terms": {"terms": {"field": "buyer","size": 50000}},"winner_terms": {"terms": {"field": "s_winner","size": 50000}}`
  693. finalSql := fmt.Sprintf(a.GetCommonQuerySqlWithAggs(), sql)
  694. res := GetAggs(PtIndex, PtType, finalSql)
  695. if res == nil || len(res) == 0 {
  696. return
  697. }
  698. var bw = BWBuckets{}
  699. for name, object := range res {
  700. bArr, err := object.MarshalJSON()
  701. if len(bArr) == 0 || err != nil {
  702. continue
  703. }
  704. if name == "buyer_terms" {
  705. if json.Unmarshal(bArr, &bw.Buyerterms) != nil {
  706. continue
  707. }
  708. } else if name == "winner_terms" {
  709. if json.Unmarshal(bArr, &bw.Winnerterms) != nil {
  710. continue
  711. }
  712. }
  713. }
  714. for _, bv := range bw.Buyerterms.Buckets {
  715. buyers = append(buyers, bv.Key)
  716. }
  717. for _, wv := range bw.Winnerterms.Buckets {
  718. winners = append(winners, wv.Key)
  719. }
  720. return
  721. }
  722. // 实时查询
  723. func (a *AnalysisEntity) RealTimeQuery(flag int) (map[string]interface{}, error) {
  724. switch flag {
  725. case MarketScaleMain:
  726. rData, err := a.MarketTime()
  727. //非离线
  728. if a.Offline != ValueOffline && err != nil { //若无报告内容,删除报告记录
  729. go a.removeEmptyRecord()
  730. }
  731. if rData != nil && a.Offline == ValueOffline {
  732. //采购单位和中部单位 数据
  733. rData["buyers"], rData["winners"] = a.BWData()
  734. }
  735. return rData, err
  736. case MarketTopProject:
  737. return a.ProjectTop10()
  738. case MarketProjectAllData:
  739. return a.AllData()
  740. case MarketScaleRefine:
  741. return a.MarketScaleRefineQuery()
  742. case MarketBuyerAndWinner:
  743. return a.BuyerWinnerAnalysis(), nil
  744. }
  745. return nil, fmt.Errorf("未知请求")
  746. }
  747. // GetMongoData 从mongo库查询数据
  748. func (a *AnalysisEntity) GetMongoData(flag int) (map[string]interface{}, error) {
  749. collName, err := GetMongoColl(flag)
  750. if err != nil || collName == "" {
  751. return nil, fmt.Errorf("未知请求")
  752. }
  753. // 查询
  754. query := map[string]interface{}{
  755. "s_m_id": a.MgoRecordId,
  756. }
  757. data, b := a.Mgo.FindOne(collName, query)
  758. if !b || data == nil || len(*data) == 0 {
  759. log.Println("没有查询到数据", b, query, collName)
  760. return nil, nil
  761. }
  762. delete(*data, "s_m_id")
  763. delete(*data, "_id")
  764. return *data, nil
  765. }
  766. // GetMongoColl 获取mgo库对应的Coll
  767. func GetMongoColl(flag int) (string, error) {
  768. switch flag {
  769. case MarketScaleMain:
  770. return CollMarketScaleMain, nil
  771. case MarketTopProject:
  772. return CollMarketTopProject, nil
  773. case MarketProjectAllData:
  774. return CollMarketProjectAllData, nil
  775. case MarketScaleRefine:
  776. return CollMarketScaleRefine, nil
  777. case MarketBuyerAndWinner:
  778. return CollMarketBuyerAndWinner, nil
  779. }
  780. return "", fmt.Errorf("未知类型")
  781. }
  782. // SaveMongoReport 数据存mongo库
  783. func (a *AnalysisEntity) SaveMongoReport(rData map[string]interface{}, flag int) {
  784. collName, _ := GetMongoColl(flag)
  785. rData["s_m_id"] = a.MgoRecordId
  786. b, err := json.Marshal(rData)
  787. if err != nil {
  788. log.Println("JSON marshal error:", err)
  789. } else {
  790. var saveMap map[string]interface{}
  791. err = json.Unmarshal(b, &saveMap)
  792. if err != nil {
  793. log.Println("JSON Unmarshal error:", err)
  794. a.Mgo.Save(collName, rData)
  795. } else {
  796. a.Mgo.Save(collName, saveMap)
  797. }
  798. }
  799. }
  800. // GetAnalyzingReport 是否有正在分析的离线报告
  801. func (a *AnalysisEntity) GetAnalyzingReport() string {
  802. query := map[string]interface{}{
  803. "s_userId": a.UId,
  804. "i_state": map[string]interface{}{"$in": []int{ReportStateGenerating, ReportStateFailed}}, //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  805. "i_del": map[string]interface{}{"$ne": 1},
  806. }
  807. //log.Println("query:", query)
  808. rs, b := a.Mgo.FindOne(ReportHistoryTable, query)
  809. //log.Println("rs,b:", rs, b)
  810. if b && rs != nil {
  811. return mongodb.BsonIdToSId((*rs)["_id"])
  812. }
  813. return ""
  814. }
  815. // IsOffline 判断是否符合在线分析的条件
  816. func (a *AnalysisEntity) IsOffline() {
  817. //离线生成:订阅词(关键词+排除词)超过300个(数量支持配置),或单次分析数据超过60万条,则离线生成;
  818. // 查询数据量
  819. countSql := fmt.Sprintf(a.GetCommonQuerySql(), "")
  820. log.Println(a.Phone, "IsOffline count SQL:", countSql)
  821. now := time.Now()
  822. dataCount := elastic.Count(PtIndex, PtType, countSql)
  823. log.Println(a.Phone, "--IsOffline 数据量:", dataCount, "---", MaxPCount)
  824. if dataCount > int64(MaxPCount) { //提示用户 数据量过大,修改生成报告条件
  825. a.Offline = ValueError
  826. return
  827. }
  828. a.OriginalTotal = dataCount
  829. keyCount := 0
  830. for i := 0; i < len(a.FormatParam.KeysItems); i++ {
  831. items := a.FormatParam.KeysItems[i]
  832. for j := 0; j < len(items.A_Key); j++ {
  833. AKey := items.A_Key[j]
  834. for k := 0; k < len(AKey.Keyword); k++ {
  835. keyCount++
  836. }
  837. for k := 0; k < len(AKey.Appended); k++ {
  838. keyCount++
  839. }
  840. for k := 0; k < len(AKey.Exclude); k++ {
  841. keyCount++
  842. }
  843. }
  844. }
  845. a.KeysTotal = int64(keyCount)
  846. //关键词数量 < 100 ,默认走线上
  847. if keyCount < MinKeyWordsCount {
  848. a.Offline = ValueRealTime
  849. return
  850. }
  851. if keyCount > MAKeyWordsCount {
  852. // 查询配置
  853. if mac := a.getMarUserAccount(); mac != nil {
  854. if keyCount >= mac.Threshold {
  855. a.Offline = ValueOffline
  856. return
  857. }
  858. } else {
  859. a.Offline = ValueOffline
  860. return
  861. }
  862. }
  863. log.Println("IsOffline 统计数据量耗时:", time.Since(now))
  864. if int(dataCount) > MAProjectNumLimit {
  865. a.Offline = ValueOffline
  866. return
  867. }
  868. a.Offline = ValueRealTime
  869. return
  870. }
// MarUserAccount carries the offline-report setting of one position, as read
// by getMarUserAccount.
type MarUserAccount struct {
	Threshold int // keyword count used as the offline-generation threshold
}
  874. // 获取离线市场报告分析关键词标准信息
  875. func (a *AnalysisEntity) getMarUserAccount() *MarUserAccount {
  876. rs := a.MySql.SelectBySql(fmt.Sprintf("SELECT threshold FROM %s where position_id=? and state = 0;", TablejianyuMarUserAccount), a.PositionId)
  877. if rs != nil && len(*rs) > 0 {
  878. return &MarUserAccount{Threshold: common.IntAll((*rs)[0]["threshold"])}
  879. }
  880. return nil
  881. }
  882. func (a *AnalysisEntity) GetReportState() (generated bool, needUpdate bool, err error) {
  883. // 查库
  884. err = a.GetAnalysisFromMgoDb()
  885. if err != nil {
  886. return
  887. }
  888. // 已经生成的可以直接返回
  889. if a.State == ReportStateGenerated {
  890. generated = true
  891. return
  892. }
  893. // 该字段没有值 说明是还没有被更新的历史数据 需要更新
  894. // 且判断之后后续需要更新i_state和i_offline字段
  895. if a.Offline == 0 {
  896. needUpdate = true
  897. }
  898. // 如果没有值的话 说明这是历史数据 需要接着走下面的流程进行判断
  899. // 格式化数据 用于后面校验个数
  900. if err = a.ForMatData(); err != nil {
  901. return
  902. }
  903. return
  904. }
  905. // Cancel 取消正在分析中的报告
  906. func (a *AnalysisEntity) Cancel() (bool, error) {
  907. // 取消报告
  908. queryMap := map[string]interface{}{
  909. "_id": a.MgoRecordId,
  910. "i_state": map[string]interface{}{"$in": []int{ReportStateGenerating, ReportStateFailed}}, //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
  911. "i_del": map[string]interface{}{"$ne": 1},
  912. }
  913. if a.UId == a.Pid { //主账号
  914. queryMap["s_parentId"] = a.Pid
  915. } else {
  916. queryMap["s_userId"] = a.UId
  917. }
  918. //验证
  919. rs, b := a.Mgo.FindOne(ReportHistoryTable, queryMap)
  920. //
  921. if !b || rs == nil {
  922. return false, fmt.Errorf("未查询到该记录")
  923. }
  924. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, map[string]interface{}{"$set": map[string]interface{}{
  925. "l_updateTime": time.Now().Unix(),
  926. "i_state": ReportStateCanceled,
  927. }})
  928. if !update {
  929. log.Println("分析报告取消失败:", a.MgoRecordId)
  930. return false, fmt.Errorf("取消失败")
  931. }
  932. // redis里面放取消标识
  933. redis.Put(ReportCacheDB, fmt.Sprintf(ReportCanceledKey, a.MgoRecordId), 1, ReportCanceledTime)
  934. return update, nil
  935. }
  936. // UpdateOffline 更新报告是否是离线报告
  937. func (a *AnalysisEntity) UpdateOffline(offline bool) bool {
  938. set := map[string]interface{}{
  939. "i_state": common.If(offline, ReportStateGenerating, ReportStateGenerated),
  940. "i_offline": common.If(offline, ValueOffline, ValueRealTime),
  941. "l_updateTime": time.Now().Unix(),
  942. "s_mgoUserId": a.MgoUserId, // 这里更新这些字段是因为这几个字段是p437 版本新加上的 历史数据没有这些字段
  943. "s_positionId": a.PositionId,
  944. "s_phone": a.Phone,
  945. }
  946. if a.OriginalTotal > 0 {
  947. set["l_originalTotal"] = a.OriginalTotal
  948. }
  949. data := map[string]interface{}{
  950. "$set": set,
  951. }
  952. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, data)
  953. if !update {
  954. log.Println("UpdateOffline 更新报告状态失败:", data, a.MgoRecordId)
  955. }
  956. return update
  957. }
  958. // UpdateState 更新报告生成状态
  959. func (a *AnalysisEntity) UpdateState(state int) bool {
  960. setMap := map[string]interface{}{
  961. "i_state": state,
  962. "l_updateTime": time.Now().Unix(),
  963. }
  964. if state == ReportStateGenerated {
  965. setMap["l_finishTime"] = time.Now().Unix()
  966. }
  967. data := map[string]interface{}{
  968. "$set": setMap,
  969. }
  970. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, data)
  971. if !update {
  972. log.Println("UpdateState 更新报告生成状态失败:", data, a.MgoRecordId)
  973. }
  974. return update
  975. }
  976. // Delete 删除正在分析中的报告
  977. func (a *AnalysisEntity) Delete() (bool, error) {
  978. // 删除报告
  979. queryMap := map[string]interface{}{
  980. "_id": a.MgoRecordId,
  981. "i_del": map[string]interface{}{"$ne": 1},
  982. }
  983. if a.UId == a.Pid { //主账号
  984. queryMap["s_parentId"] = a.Pid
  985. } else {
  986. queryMap["s_userId"] = a.UId
  987. }
  988. //验证
  989. rs, b := a.Mgo.FindOne(ReportHistoryTable, queryMap)
  990. //
  991. if !b || rs == nil {
  992. return false, fmt.Errorf("未查询到该记录")
  993. }
  994. update := a.Mgo.UpdateById(ReportHistoryTable, a.MgoRecordId, map[string]interface{}{"$set": map[string]interface{}{
  995. "l_updateTime": time.Now().Unix(),
  996. "i_del": 1,
  997. }})
  998. if !update {
  999. log.Println("分析报告删除失败:", a.MgoRecordId)
  1000. return false, fmt.Errorf("删除失败")
  1001. }
  1002. // redis里面放取消标识
  1003. redis.Put(ReportCacheDB, fmt.Sprintf(ReportCanceledKey, a.MgoRecordId), 1, ReportCanceledTime)
  1004. return update, nil
  1005. }
  1006. // GetCommonQuerySql 公共筛选
  1007. func (a *AnalysisEntity) GetCommonQuerySql() string {
  1008. var musts, bools []string
  1009. //时间
  1010. musts = append(musts, fmt.Sprintf(`{"range":{"jgtime":{"gte":%d,"lte":%d}}}`, a.FormatParam.STime, a.FormatParam.ETime))
  1011. //地区
  1012. if len(a.FormatParam.Area) > 0 || len(a.FormatParam.City) > 0 {
  1013. var areaCity []string
  1014. if len(a.FormatParam.Area) > 0 {
  1015. areaCity = append(areaCity, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(a.FormatParam.Area, `","`)))
  1016. }
  1017. if len(a.FormatParam.City) > 0 {
  1018. areaCity = append(areaCity, fmt.Sprintf(`{"terms":{"city":["%s"]}}`, strings.Join(a.FormatParam.City, `","`)))
  1019. }
  1020. musts = append(musts, fmt.Sprintf(`{"bool":{"should":[%s],"minimum_should_match": 1}}`, strings.Join(areaCity, ",")))
  1021. }
  1022. //行业
  1023. if len(a.FormatParam.Industry) > 0 {
  1024. musts = append(musts, fmt.Sprintf(`{"terms":{"subscopeclass":["%s"]}}`, strings.Join(a.FormatParam.Industry, `","`)))
  1025. }
  1026. //类型
  1027. if len(a.FormatParam.BuyerClass) > 0 {
  1028. musts = append(musts, fmt.Sprintf(`{"terms":{"buyerclass":["%s"]}}`, strings.Join(a.FormatParam.BuyerClass, `","`)))
  1029. }
  1030. //分析报告中标状态限制
  1031. musts = append(musts, fmt.Sprintf(queryBoolMust, pSearchDecMust))
  1032. //订阅词
  1033. for _, v := range getAllKeywordArr(a.FormatParam.KeysItems) {
  1034. if sql := getKeyWordSql(v, a.BaseParam.MatchingMode); sql != "" {
  1035. bools = append(bools, sql)
  1036. }
  1037. }
  1038. //中标企业
  1039. if a.FormatParam.Winner != "" {
  1040. var winnerId string
  1041. if mongodb.IsObjectIdHex(a.FormatParam.Winner) {
  1042. winnerId = a.FormatParam.Winner
  1043. } else {
  1044. rData := elastic.Get("qyxy", "qyxy", fmt.Sprintf(`{"query": {"bool": {"must": [{"term": {"company_name": "%s"}}]}},"_source":["_id"],"size": 1}`, a.FormatParam.Winner))
  1045. if rData != nil && len(*rData) > 0 {
  1046. winnerId = common.InterfaceToStr((*rData)[0]["_id"])
  1047. }
  1048. }
  1049. if winnerId != "" {
  1050. musts = append(musts, fmt.Sprintf(`{"term":{"entidlist":"%s"}}`, winnerId))
  1051. } else {
  1052. musts = append(musts, fmt.Sprintf(`{"term":{"s_winner":"%s"}}`, a.FormatParam.Winner))
  1053. }
  1054. }
  1055. //采购单位
  1056. if a.FormatParam.Buyer != "" {
  1057. musts = append(musts, fmt.Sprintf(`{"term":{"buyer":"%s"}}`, a.FormatParam.Buyer))
  1058. }
  1059. return fmt.Sprintf(`{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match": %d}}%s}`, strings.Join(musts, ","), strings.Join(bools, ","), common.If(len(bools) > 0, 1, 0).(int), "%s")
  1060. }
  1061. // GetCommonQuerySqlWithAggs 此方法用于聚合查询
  1062. func (a *AnalysisEntity) GetCommonQuerySqlWithAggs() string {
  1063. return fmt.Sprintf(a.GetCommonQuerySql(), `,"aggs":{%s},"size":0`)
  1064. }
  1065. // 市场概况+时间分布
  1066. func (a *AnalysisEntity) MarketTime() (map[string]interface{}, error) {
  1067. var (
  1068. sql []string
  1069. monthB, yearB bool
  1070. MonthRange, YearRange string
  1071. isOffline = a.Offline == 1
  1072. )
  1073. sql = append(sql, fmt.Sprintf(aggsMarketAnalysis, "thismarket", fmt.Sprintf(`{"key":"%s","from":%d,"to":%d}`, "market", a.FormatParam.STime, a.FormatParam.ETime)))
  1074. n_stime := a.FormatParam.STime
  1075. var n_mae AnalysisEntity
  1076. n_mae.FormatParam = a.FormatParam
  1077. n_mae.BaseParam = a.BaseParam
  1078. n_mae.MgoRecordId = a.MgoRecordId
  1079. n_mae.FormatParam.ETime = a.FormatParam.ETime
  1080. if time.Unix(a.FormatParam.STime, 0).AddDate(1, 0, 0).Unix() >= time.Unix(a.FormatParam.ETime, 0).Unix() {
  1081. n_stime = getPreviousMarket(time.Unix(a.FormatParam.STime, 0), time.Unix(a.FormatParam.ETime, 0))
  1082. sql = append(sql, fmt.Sprintf(aggsMarketAnalysis, "oldmarket", fmt.Sprintf(`{"key":"%s","from":%d,"to":%d}`, "market", n_stime, a.FormatParam.STime)))
  1083. }
  1084. if isOffline || time.Unix(a.FormatParam.STime, 0).AddDate(0, 1, 0).Unix() < time.Unix(a.FormatParam.ETime, 0).Unix() {
  1085. var mon_time, year_time int64
  1086. stime, etime := time.Unix(a.FormatParam.STime, 0), time.Unix(a.FormatParam.ETime, 0)
  1087. monthB, MonthRange = GetMonthData(isOffline, stime, etime)
  1088. sql = append(sql, fmt.Sprintf(projectTimeDistribution, "monthtime", MonthRange))
  1089. if monthB {
  1090. mon_time = stime.AddDate(0, -1, 0).Unix()
  1091. } else {
  1092. mon_time = a.FormatParam.STime
  1093. }
  1094. //年度数据
  1095. yearB, YearRange = GetYearData(isOffline, stime, etime)
  1096. sql = append(sql, fmt.Sprintf(projectTimeDistribution, "yeartime", YearRange))
  1097. if yearB {
  1098. year_time = stime.AddDate(-1, 0, 0).Unix()
  1099. } else {
  1100. year_time = a.FormatParam.STime
  1101. }
  1102. if n_stime > mon_time {
  1103. n_stime = mon_time
  1104. }
  1105. if n_stime > year_time {
  1106. n_stime = year_time
  1107. }
  1108. }
  1109. //非离线
  1110. if !isOffline {
  1111. n_mae.FormatParam.STime = n_stime
  1112. }
  1113. finalSql := fmt.Sprintf(n_mae.GetCommonQuerySqlWithAggs(), strings.Join(sql, ","))
  1114. log.Printf("final MarketScaleRefineQuery sql: %s", finalSql)
  1115. rMap := make(map[string]interface{})
  1116. rMapData := make(map[string]interface{})
  1117. thisRow := marketTime{}
  1118. res := GetAggs(PtIndex, PtType, finalSql)
  1119. if res == nil || len(res) == 0 {
  1120. return nil, fmt.Errorf("未查询到项目")
  1121. }
  1122. Rest(res, &thisRow)
  1123. if thisRow.Thismarket.Buckets != nil && len(thisRow.Thismarket.Buckets) != 0 {
  1124. Projectmarket := thisRow.Thismarket.Buckets[0]
  1125. if !isOffline && Projectmarket.ProjectCount.DocCount == 0 {
  1126. return nil, fmt.Errorf("未查询到项目数据")
  1127. }
  1128. // 实时分析数据量 如果关键词少于100,项目数据量可能大于MAProjectNumLimit(600000)
  1129. //if Projectmarket.ProjectCount.DocCount > MAProjectNumLimit {
  1130. // return nil, fmt.Errorf("项目数量超出上限")
  1131. //}
  1132. rMapData["project_count"] = Projectmarket.ProjectCount.DocCount
  1133. rMapData["projctamout"] = Projectmarket.ProjectAmount.Value
  1134. rMapData["projectavgmoney"] = Projectmarket.ProjectAvgMoney.Value
  1135. rMapData["buyercount"] = Projectmarket.BuyerCount.Value
  1136. rMapData["winnercount"] = Projectmarket.WinnerCount.Value
  1137. if thisRow.Oldmarket.Buckets != nil && len(thisRow.Oldmarket.Buckets) != 0 {
  1138. oldProjectmarket := thisRow.Oldmarket.Buckets[0]
  1139. //环比数据
  1140. rMapData["projctamount_ratio"] = sequential(Projectmarket.ProjectAmount.Value, oldProjectmarket.ProjectAmount.Value)
  1141. rMapData["project_count_ratio"] = sequential(float64(Projectmarket.ProjectCount.DocCount), float64(oldProjectmarket.ProjectCount.DocCount))
  1142. rMapData["projectavgmoney_ratio"] = sequential(Projectmarket.ProjectAvgMoney.Value, oldProjectmarket.ProjectAvgMoney.Value)
  1143. rMapData["buyercount_ratio"] = sequential(float64(Projectmarket.BuyerCount.Value), float64(oldProjectmarket.BuyerCount.Value))
  1144. rMapData["winnercount_ratio"] = sequential(float64(Projectmarket.WinnerCount.Value), float64(oldProjectmarket.WinnerCount.Value))
  1145. }
  1146. }
  1147. rMap["market_profile"] = rMapData
  1148. if thisRow.Monthtime.Buckets != nil && len(thisRow.Monthtime.Buckets) != 0 {
  1149. rMap["month_distribution"] = n_mae.TimeData(monthB, thisRow.Monthtime.Buckets)
  1150. }
  1151. if thisRow.Yeartime.Buckets != nil && len(thisRow.Yeartime.Buckets) != 0 {
  1152. rMap["year_distribution"] = n_mae.TimeData(yearB, thisRow.Yeartime.Buckets)
  1153. }
  1154. return rMap, nil
  1155. }
  1156. // 时间分布月,年通用数据处理
  1157. func (a *AnalysisEntity) TimeData(_b bool, thisRow []Buckets) map[string]interface{} {
  1158. var count_ss, amout_ss []map[string]interface{}
  1159. for k, v := range thisRow {
  1160. //环比多取一期数据
  1161. if _b && k == 0 {
  1162. continue
  1163. }
  1164. count := make(map[string]interface{})
  1165. amount := make(map[string]interface{})
  1166. count["minth"] = v.Key
  1167. count["value"] = v.ScaleTotal.DocCount
  1168. amount["minth"] = v.Key
  1169. amount["value"] = v.ScaleAmount.Value
  1170. //整月,年统计环比
  1171. if _b {
  1172. doccount := thisRow[k-1].ScaleTotal.DocCount
  1173. amountvalue := thisRow[k-1].ScaleAmount.Value
  1174. count["ratio"] = sequential(float64(v.ScaleTotal.DocCount), float64(doccount))
  1175. amount["ratio"] = sequential(v.ScaleAmount.Value, amountvalue)
  1176. }
  1177. count_ss = append(count_ss, count)
  1178. amout_ss = append(amout_ss, amount)
  1179. }
  1180. rMapData := make(map[string]interface{})
  1181. rMapData["project_count"] = count_ss
  1182. rMapData["project_amount"] = amout_ss
  1183. return rMapData
  1184. }
  1185. // top10
  1186. func (a *AnalysisEntity) ProjectTop10() (rMap map[string]interface{}, err error) {
  1187. finalSql := fmt.Sprintf(a.GetCommonQuerySql(), queryTop10)
  1188. log.Println("ProjectTop10:", finalSql)
  1189. hits := elastic.Get(PtIndex, PtType, finalSql)
  1190. if hits == nil || len(*hits) == 0 {
  1191. return
  1192. }
  1193. rMap = map[string]interface{}{}
  1194. bArr := []map[string]interface{}{}
  1195. source := *hits
  1196. for _, v := range source {
  1197. bA := map[string]interface{}{}
  1198. if v["ids"] != nil && len(InterToSliceString(v["ids"])) > 0 {
  1199. idss := InterToSliceString(v["ids"])
  1200. bA["_id"] = encodeId(idss[len(idss)-1])
  1201. } else {
  1202. bA["_id"] = nil
  1203. }
  1204. if v["area"] == "" {
  1205. bA["city"] = nil
  1206. } else {
  1207. bA["area"] = v["area"]
  1208. }
  1209. if v["city"] == "" {
  1210. bA["city"] = nil
  1211. } else {
  1212. bA["city"] = v["city"]
  1213. }
  1214. bA["jgtime"] = v["jgtime"]
  1215. bA["projectname"] = v["s_projectname"]
  1216. bA["sortprice"] = v["sortprice"]
  1217. ids := v["entidlist"]
  1218. var id_s []string
  1219. var winner_s []string
  1220. for _, i := range InterToSliceString(ids) {
  1221. iid := encodeId(i)
  1222. id_s = append(id_s, iid)
  1223. }
  1224. bA["eidlist"] = id_s
  1225. bA["winner_s"] = nil
  1226. if common.ObjToString(v["s_winner"]) != "" {
  1227. w_s := strings.Split(common.ObjToString(v["s_winner"]), ",")
  1228. for _, vv := range w_s {
  1229. winner_s = append(winner_s, vv)
  1230. }
  1231. bA["winner_s"] = winner_s
  1232. }
  1233. bArr = append(bArr, bA)
  1234. }
  1235. rMap["ProjectTop10"] = bArr
  1236. return
  1237. }
  1238. func (a *AnalysisEntity) AllData() (rMap map[string]interface{}, err error) {
  1239. rMap = map[string]interface{}{}
  1240. var aggs []string
  1241. area := a.FormatParam.Area
  1242. city := a.FormatParam.City
  1243. buyclass := a.FormatParam.BuyerClass
  1244. aggs = append(aggs, aggsAllCM, fmt.Sprintf(queryAggsSortprice, sortpriceStr), aggsArea)
  1245. if (len(area) + len(city)) != 1 {
  1246. aaat3 := fmt.Sprintf(aggsAreaAmounttop3, Top30)
  1247. aact3 := fmt.Sprintf(aggsAreaCounttop3, Top30)
  1248. if a.Offline == ValueOffline {
  1249. aaat3 = fmt.Sprintf(aggsAreaAmounttop3, Top30)
  1250. aact3 = fmt.Sprintf(aggsAreaCounttop3, Top30)
  1251. }
  1252. aggs = append(aggs, aaat3, aact3)
  1253. }
  1254. if len(buyclass) != 1 {
  1255. abat3 := fmt.Sprintf(aggsBuyerclassAmounttop3, Top30)
  1256. abct3 := fmt.Sprintf(aggsBuyerclassCounttop3, Top30)
  1257. if a.Offline == ValueOffline {
  1258. abat3 = fmt.Sprintf(aggsBuyerclassAmounttop3, Top30)
  1259. abct3 = fmt.Sprintf(aggsBuyerclassCounttop3, Top30)
  1260. }
  1261. //aggs = append(aggs, aggs_buyerclass, aggs_buyerclass_other, aggs_buyerclass_amounttop3, aggs_buyerclass_counttop3)
  1262. aggs = append(aggs, aggsBuyerclass, abat3, abct3)
  1263. }
  1264. finalSql := fmt.Sprintf(a.GetCommonQuerySqlWithAggs(), strings.Join(aggs, ","))
  1265. log.Println("allData sql:", finalSql)
  1266. res := GetAggs(PtIndex, PtType, finalSql)
  1267. if res == nil || len(res) == 0 {
  1268. return
  1269. }
  1270. thisRow := AreaCTop{}
  1271. for name, object := range res {
  1272. bArr, err := object.MarshalJSON()
  1273. if len(bArr) == 0 || err != nil {
  1274. continue
  1275. }
  1276. if name == "project_amount" {
  1277. if json.Unmarshal(bArr, &thisRow.Amount) != nil {
  1278. continue
  1279. }
  1280. } else if name == "project_count" {
  1281. if json.Unmarshal(bArr, &thisRow.ProjectCount) != nil {
  1282. continue
  1283. }
  1284. //} else if name == "buyerclass_scale_other" {
  1285. // if json.Unmarshal(bArr, &thisRow.BuyerclassScaleOther) != nil {
  1286. // continue
  1287. // }
  1288. } else if name == "project_count_not0" {
  1289. if json.Unmarshal(bArr, &thisRow.CountNot0) != nil {
  1290. continue
  1291. }
  1292. } else if name == "sortprice_ranges" {
  1293. if json.Unmarshal(bArr, &thisRow.SortpriceRanges) != nil {
  1294. continue
  1295. }
  1296. } else if name == "area_distribution" {
  1297. if json.Unmarshal(bArr, &thisRow.AreaDistribution) != nil {
  1298. continue
  1299. }
  1300. } else if name == "buyerclass_scale" {
  1301. if json.Unmarshal(bArr, &thisRow.BuyerclassScale) != nil {
  1302. continue
  1303. }
  1304. } else if name == "area_amount_top3" {
  1305. if json.Unmarshal(bArr, &thisRow.AreaAmountTop3) != nil {
  1306. continue
  1307. }
  1308. } else if name == "area_count_top3" {
  1309. if json.Unmarshal(bArr, &thisRow.AreaCountTop3) != nil {
  1310. continue
  1311. }
  1312. } else if name == "buyerclass_amount_top3" {
  1313. if json.Unmarshal(bArr, &thisRow.BuyclassAmountTop3) != nil {
  1314. continue
  1315. }
  1316. } else if name == "buyerclass_count_top3" {
  1317. if json.Unmarshal(bArr, &thisRow.BuyclassCountTop3) != nil {
  1318. continue
  1319. }
  1320. }
  1321. }
  1322. thisRow.Total = int64(thisRow.ProjectCount.DocCount)
  1323. data := ProjectScale(thisRow)
  1324. area_data := AreaDistribute(thisRow)
  1325. customerData, customerOther := CustomerDistribute(thisRow)
  1326. //customerOther, customerData := CustomerDistributeDetails(thisRow, customerDetails)
  1327. var ids []string
  1328. for _, v := range thisRow.AreaAmountTop3.Buckets {
  1329. for _, va := range v.WinnerTop.Buckets {
  1330. ids = append(ids, va.Winner)
  1331. }
  1332. }
  1333. for _, v := range thisRow.BuyclassAmountTop3.Buckets {
  1334. for _, va := range v.WinnerTop.Buckets {
  1335. ids = append(ids, va.Winner)
  1336. }
  1337. }
  1338. for _, v := range thisRow.AreaCountTop3.Buckets {
  1339. for _, va := range v.WinnerTop.Buckets {
  1340. ids = append(ids, va.Winner)
  1341. }
  1342. }
  1343. for _, v := range thisRow.BuyclassCountTop3.Buckets {
  1344. for _, va := range v.BidcountTop.Buckets {
  1345. ids = append(ids, va.Winner)
  1346. }
  1347. }
  1348. eid := IDToName(ids)
  1349. rMap["projectScale"] = data
  1350. rMap["area_infos"] = area_data
  1351. rMap["customer_scale"] = customerData
  1352. rMap["customer_scale_other"] = customerOther //客户分布(其它)
  1353. rMap["scaleAreaAmountTop"] = AmountCompute(thisRow, "area", eid, a.Offline == ValueOffline)
  1354. rMap["scaleBuyclassAmountTop"] = AmountCompute(thisRow, "buyclass", eid, a.Offline == ValueOffline)
  1355. rMap["scaleAreaCountTop"] = CountCompute(thisRow, "area", eid, a.Offline == ValueOffline)
  1356. rMap["scaleBuyclassCountTop"] = CountCompute(thisRow, "buyclass", eid, a.Offline == ValueOffline)
  1357. if a.Offline == ValueOffline {
  1358. rMap["count_not_0"] = thisRow.CountNot0.Count
  1359. }
  1360. return
  1361. }
  1362. // MarketScaleRefineQuery 细化聚合
  1363. func (a *AnalysisEntity) MarketScaleRefineQuery() (rMap map[string]interface{}, err error) {
  1364. //关键词分组聚合
  1365. var aggsGroup []string
  1366. //获取总金额及总数
  1367. aggsGroup = append(aggsGroup, `"projectAmount":{"sum":{"field":"sortprice"}}`)
  1368. aggsGroup = append(aggsGroup, `"projectTotal":{"filter":{"match_all":{}}}`)
  1369. itemDataMap := map[string]int64{}
  1370. for _, group := range a.FormatParam.KeysItems {
  1371. var bools []string
  1372. for _, v := range getGroupKeywordArr(group.A_Key) {
  1373. if sql := getKeyWordSql(v, a.BaseParam.MatchingMode); sql != "" {
  1374. bools = append(bools, sql)
  1375. }
  1376. }
  1377. if len(bools) > 0 {
  1378. //临时调整 进行数据验证 Top3
  1379. rwt := Top30
  1380. rwa := Top30
  1381. if a.Offline == ValueOffline { //离线
  1382. rwt = Top30
  1383. rwa = Top30
  1384. }
  1385. aggsGroup = append(aggsGroup, fmt.Sprintf(`"%s":{"filter":{"bool":{"should":[%s],"minimum_should_match": 1}},"aggs":{"project_count":{"filter":{"match_all":{}}},"project_amount":{"sum":{"field":"sortprice"}},"winner_total_top":{"terms":{"field":"entidlist","exclude":["-"],"order":[{"refine_winner_total":"desc"}],"size":%d},"aggs":{"refine_winner_total":{"filter":{"match_all":{}}}}},"winner_amount_top":{"terms":{"field":"entidlist","exclude":["-"],"order":[{"refine_winner_amount":"desc"}],"size":%d},"aggs":{"refine_winner_amount":{"sum":{"field":"sortprice"}}}}}}`, group.ItemName, strings.Join(bools, ","), rwt, rwa))
  1386. }
  1387. itemDataMap[group.ItemName] = group.UpdateTime
  1388. }
  1389. finalSql := fmt.Sprintf(a.GetCommonQuerySqlWithAggs(), strings.Join(aggsGroup, ","))
  1390. fmt.Println("finalSql-----4", finalSql)
  1391. rMap = map[string]interface{}{}
  1392. res := GetAggs(PtIndex, PtType, finalSql)
  1393. if res == nil || len(res) == 0 {
  1394. return
  1395. }
  1396. scale := scaleRefineData{IdSwitch: map[string]string{}, IsOffline: a.Offline == ValueOffline}
  1397. for name, object := range res {
  1398. bArr, err := object.MarshalJSON()
  1399. if len(bArr) == 0 || err != nil {
  1400. continue
  1401. }
  1402. if name == "projectTotal" {
  1403. st := simpleTotal{}
  1404. if err := json.Unmarshal(bArr, &st); err == nil {
  1405. scale.Total = st
  1406. }
  1407. continue
  1408. } else if name == "projectAmount" {
  1409. ss := simpleSum{}
  1410. if err := json.Unmarshal(bArr, &ss); err == nil {
  1411. scale.Amount = ss
  1412. }
  1413. continue
  1414. }
  1415. thisRow := scaleRefineRow{}
  1416. if err := json.Unmarshal(bArr, &thisRow); err != nil {
  1417. continue
  1418. }
  1419. thisRow.ItemName = name
  1420. thisRow.UpdateTime = itemDataMap[name]
  1421. scale.Data = append(scale.Data, &thisRow)
  1422. }
  1423. scale.formatData() //获取所需数据
  1424. scale.doIdSwitch() //补充企业名称
  1425. return map[string]interface{}{
  1426. "scaleRefineAll": scale.ReturnData.Overall,
  1427. "scaleRefineTotalTop": scale.ReturnData.TotalTop,
  1428. "scaleRefineAmountTop": scale.ReturnData.AmountTop,
  1429. }, nil
  1430. }
  1431. // 采购单位分析
  1432. func (a *AnalysisEntity) BuyerWinnerAnalysis() map[string]interface{} {
  1433. var datas []string
  1434. //采购单位-采购规模分布
  1435. //buyer_scalefmt := fmt.Sprintf(buyer_procurement_scale, sortprice_str)
  1436. //采购单位-项目数量 采购单位-项目金额
  1437. bc := fmt.Sprintf(buyerCount, Top50, Top30)
  1438. bs := fmt.Sprintf(buyerSortprice, Top50, Top30)
  1439. if a.Offline == ValueOffline {
  1440. bc = fmt.Sprintf(buyerCount, BWCount, Top30)
  1441. bs = fmt.Sprintf(buyerSortprice, BWCount, Top30)
  1442. }
  1443. bps := fmt.Sprintf(buyerProcurementScale, BWDCount)
  1444. datas = append(datas, bps, bc, bs)
  1445. //中标单位-规模分布
  1446. //winner_scalefmt := fmt.Sprintf(winner_procurement_scale, sortprice_str)
  1447. //中标单位-项目数量 中标单位-项目金额
  1448. wc := fmt.Sprintf(winnerCount, Top50, Top30) //正常需求30-3;目前是50-30,多获取数据 分析后进行数据损失补充,尽量减少 实时分析和离线分析的误差
  1449. ws := fmt.Sprintf(winnerSortprice, Top50, Top30)
  1450. if a.Offline == ValueOffline {
  1451. wc = fmt.Sprintf(winnerCount, BWCount, Top30)
  1452. ws = fmt.Sprintf(winnerSortprice, BWCount, Top30)
  1453. }
  1454. wps := fmt.Sprintf(winnerProcurementScale, BWDCount)
  1455. datas = append(datas, wps, wc, ws)
  1456. finalSql := fmt.Sprintf(a.GetCommonQuerySqlWithAggs(), strings.Join(datas, ","))
  1457. log.Printf("final PurchasingAnalysiseQuery sql: %s", finalSql)
  1458. t := time.Now()
  1459. res := GetAggs(PtIndex, PtType, finalSql)
  1460. if res == nil || len(res) == 0 {
  1461. return nil
  1462. }
  1463. //计算entidlist大于1的中标单位金额
  1464. entArrMap := make(map[string]float64)
  1465. winnerKeyMap := make(map[string]int)
  1466. newSql := strings.ReplaceAll(fmt.Sprintf(a.GetCommonQuerySql(), `,"sort": [{"sortprice": {"order": "desc"}}], "_source": ["entidlist","sortprice"],"size":10000`), `],"should":[`, fmt.Sprintf(`%s],"should":[`, `,{"script": {"script": {"source": "doc['entidlist'].length > 1"}}}`))
  1467. log.Println("winner new sql", newSql)
  1468. multiBid := elastic.Get(PtIndex, PtType, newSql)
  1469. if multiBid != nil && len(*multiBid) > 0 {
  1470. for _, m := range *multiBid {
  1471. entidlist, _ := m["entidlist"].([]interface{})
  1472. entArr := common.ObjArrToStringArr(entidlist)
  1473. sortprice := common.Float64All(m["sortprice"])
  1474. entLen := len(entArr)
  1475. switch entLen {
  1476. case 1:
  1477. entArrMap[entArr[0]] = sortprice
  1478. //winnerKeyMap[entArr[0]]++
  1479. case 0:
  1480. default:
  1481. assessedAmount := sortprice / common.Float64All(entLen)
  1482. for _, m2 := range entArr {
  1483. //winnerKeyMap[m2]++
  1484. entArrMap[m2] = assessedAmount
  1485. }
  1486. }
  1487. }
  1488. }
  1489. log.Println("采购单位-中标单位分析报告es查询耗时===", time.Since(t))
  1490. var thisBuyerWinnerRow BuyerWinnerRow
  1491. for name, object := range res {
  1492. bArr, err := object.MarshalJSON()
  1493. if len(bArr) == 0 || err != nil {
  1494. continue
  1495. }
  1496. if name == "buyer_amount_distribution" {
  1497. if json.Unmarshal(bArr, &thisBuyerWinnerRow.BuyerAmountDistribution) != nil {
  1498. continue
  1499. }
  1500. } else if name == "buyer_count_top3" {
  1501. if json.Unmarshal(bArr, &thisBuyerWinnerRow.BuyerCountTop3) != nil {
  1502. continue
  1503. }
  1504. } else if name == "buyer_amount_top3" {
  1505. if json.Unmarshal(bArr, &thisBuyerWinnerRow.BuyerAmountTop3) != nil {
  1506. continue
  1507. }
  1508. } else if name == "winner_amount_distribution" {
  1509. if json.Unmarshal(bArr, &thisBuyerWinnerRow.WinnerAmountDistribution) != nil {
  1510. continue
  1511. }
  1512. } else if name == "winner_count_top3" {
  1513. if json.Unmarshal(bArr, &thisBuyerWinnerRow.WinnerCountTop3) != nil {
  1514. continue
  1515. }
  1516. } else if name == "winner_amount_top3" {
  1517. if json.Unmarshal(bArr, &thisBuyerWinnerRow.WinnerAmountTop3) != nil {
  1518. continue
  1519. }
  1520. }
  1521. }
  1522. var winnerKeys []string
  1523. for _, v := range thisBuyerWinnerRow.BuyerCountTop3.Buckets {
  1524. for _, v1 := range v.SWinnerTop.Buckets {
  1525. winnerKeyMap[v1.Key]++
  1526. }
  1527. }
  1528. for _, v := range thisBuyerWinnerRow.BuyerAmountTop3.Buckets {
  1529. for _, v1 := range v.SWinnerTop.Buckets {
  1530. winnerKeyMap[v1.Key]++
  1531. }
  1532. }
  1533. for _, v := range thisBuyerWinnerRow.WinnerCountTop3.SWinnerCount {
  1534. winnerKeyMap[v.Key]++
  1535. }
  1536. for _, v := range thisBuyerWinnerRow.WinnerAmountTop3.SWinnerAmount {
  1537. winnerKeyMap[v.Key]++
  1538. }
  1539. for k := range winnerKeyMap {
  1540. winnerKeys = append(winnerKeys, k)
  1541. }
  1542. winnerName := GetEntNameByIds(winnerKeys)
  1543. //rMap := make(map[string]interface{})
  1544. var rMap = sync.Map{}
  1545. sy := sync.WaitGroup{}
  1546. sy.Add(2)
  1547. go BuyerAnalysis(thisBuyerWinnerRow, &rMap, winnerName, &sy, a.Offline == ValueOffline)
  1548. go WinningAnalysis(thisBuyerWinnerRow, &rMap, entArrMap, winnerName, &sy, a.Offline == ValueOffline)
  1549. sy.Wait()
  1550. log.Println("采购单位-中标单位分析报告程序计算耗时===", time.Since(t))
  1551. rMaps := make(map[string]interface{})
  1552. rMap.Range(func(key, value interface{}) bool {
  1553. rMaps[common.InterfaceToStr(key)] = value
  1554. return true
  1555. })
  1556. return rMaps
  1557. }
  1558. func (a *AnalysisEntity) GetPdfPageApi() (finalDate map[string]interface{}, err error) {
  1559. var (
  1560. wg = sync.WaitGroup{}
  1561. lock = &sync.Mutex{}
  1562. )
  1563. finalDate = make(map[string]interface{})
  1564. for i := 1; i <= 5; i++ {
  1565. wg.Add(1)
  1566. go func(flag int) {
  1567. defer wg.Done()
  1568. mgoData, _ := a.GetMongoData(flag)
  1569. lock.Lock()
  1570. for s, i2 := range mgoData {
  1571. finalDate[s] = i2
  1572. }
  1573. lock.Unlock()
  1574. }(i)
  1575. }
  1576. wg.Wait()
  1577. return
  1578. }