|
@@ -28,6 +28,20 @@ const (
|
|
|
ReportCacheDB = "other"
|
|
|
ReportCacheKey = "marketAnalysis_%s_%d"
|
|
|
ReportCacheTime = 60 * 5
|
|
|
+ ReportCanceledKey = "marketAnalysisCanceled_%s" // 取消的报告id
|
|
|
+ ReportCanceledTime = 60 * 60 * 24
|
|
|
+
|
|
|
+ ReportStateGenerating = 0 // 报告生成状态 生成中
|
|
|
+ ReportStateGenerated = 1 // 生成 成功
|
|
|
+ ReportStateCanceled = 2 // 已取消
|
|
|
+ ReportStateFailed = -1 // 生成失败
|
|
|
+ CollMarketScaleMain = "marketanalysisreport_scal" //市场规模 报告模块对应的mongo 集合名称
|
|
|
+ CollMarketTopProject = "marketanalysisreport_top" //项目规模TOP10
|
|
|
+ CollMarketProjectAllData = "marketanalysisreport_all" //项目规模 地区分布 客户分布 地区客户top3
|
|
|
+ CollMarketScaleRefine = "marketanalysisreport_refine" //细化市场
|
|
|
+ CollMarketBuyerAndWinner = "marketanalysisreport_bw" //市场-采购单位&&中标企业
|
|
|
+ ValueOffline = 1 // 离线
|
|
|
+ ValueRealTime = 2 // 实时
|
|
|
)
|
|
|
|
|
|
var MarketAnalysisPool chan bool
|
|
@@ -85,11 +99,17 @@ type AnalysisRequestFormat struct {
|
|
|
}
|
|
|
|
|
|
type MarketAnalysisEntity struct {
|
|
|
- MgoRecordId string
|
|
|
- BaseParam AnalysisRequestParam
|
|
|
- FormatParam AnalysisRequestFormat
|
|
|
- UId, Pid string
|
|
|
- ProjectInfo projectInfo
|
|
|
+ MgoRecordId string
|
|
|
+ BaseParam AnalysisRequestParam
|
|
|
+ FormatParam AnalysisRequestFormat
|
|
|
+ UId, Pid string
|
|
|
+ ProjectInfo projectInfo
|
|
|
+ Offline int // 1-离线 2-实时
|
|
|
+ State int // 状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
|
|
|
+ MgoUserId string
|
|
|
+ Phone string // 手机号
|
|
|
+ PositionId int
|
|
|
+ OriginalTotal int64 // 数据总数
|
|
|
}
|
|
|
|
|
|
type projectInfo struct {
|
|
@@ -233,7 +253,12 @@ func (mae *MarketAnalysisEntity) GetProjectInfoList() error {
|
|
|
|
|
|
// SaveAnalysisRecord 保存分析记录
|
|
|
func (mae *MarketAnalysisEntity) SaveAnalysisRecord() error {
|
|
|
- mae.MgoRecordId = db.Mgo.Save(ReportHistoryTable, map[string]interface{}{
|
|
|
+ if mae.Offline == ValueRealTime {
|
|
|
+ mae.State = ReportStateGenerated
|
|
|
+ } else {
|
|
|
+ mae.State = ReportStateGenerating
|
|
|
+ }
|
|
|
+ data := map[string]interface{}{
|
|
|
"s_keysItems": mae.BaseParam.KeysItemsStr,
|
|
|
"s_rangeTime": mae.BaseParam.RangeTime,
|
|
|
"s_rangeTimeExtra": mae.BaseParam.RangeTimeExtra,
|
|
@@ -243,8 +268,18 @@ func (mae *MarketAnalysisEntity) SaveAnalysisRecord() error {
|
|
|
"s_matchingMode": mae.BaseParam.MatchingMode,
|
|
|
"s_userId": mae.UId,
|
|
|
"s_parentId": mae.Pid,
|
|
|
+ "i_state": mae.State, //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
|
|
|
+ "l_updateTime": time.Now().Unix(), //生成时间 or 取消时间
|
|
|
"l_createTime": time.Now().Unix(),
|
|
|
- })
|
|
|
+ "i_offline": mae.Offline,
|
|
|
+ "s_mgoUserId": mae.MgoUserId,
|
|
|
+ "i_positionId": mae.PositionId,
|
|
|
+ "s_phone": mae.Phone,
|
|
|
+ }
|
|
|
+ if mae.OriginalTotal > 0 {
|
|
|
+ data["l_originalTotal"] = mae.OriginalTotal
|
|
|
+ }
|
|
|
+ mae.MgoRecordId = db.Mgo.Save(ReportHistoryTable, data)
|
|
|
if mae.MgoRecordId == "" {
|
|
|
return fmt.Errorf("分析创建异常")
|
|
|
}
|
|
@@ -269,6 +304,8 @@ func (mae *MarketAnalysisEntity) GetAnalysisFromMgoDb() error {
|
|
|
if res == nil || len(*res) == 0 {
|
|
|
return fmt.Errorf("未查询到相关数据")
|
|
|
}
|
|
|
+ mae.Offline = qutil.IntAll((*res)["i_offline"])
|
|
|
+ mae.State = qutil.IntAll((*res)["i_state"])
|
|
|
mae.BaseParam.KeysItemsStr, _ = (*res)["s_keysItems"].(string)
|
|
|
mae.BaseParam.RangeTime, _ = (*res)["s_rangeTime"].(string)
|
|
|
mae.BaseParam.RangeTimeExtra, _ = (*res)["s_rangeTimeExtra"].(string)
|
|
@@ -317,8 +354,17 @@ func (mae *MarketAnalysisEntity) GetRecordList(pageNum, PageSize int) (total int
|
|
|
if res == nil || len(*res) == 0 {
|
|
|
return
|
|
|
}
|
|
|
+ //用户消息开关
|
|
|
+ open := GetMsgOpen(mae.MgoUserId)
|
|
|
for _, row := range *res {
|
|
|
- list = append(list, map[string]interface{}{
|
|
|
+ var status int
|
|
|
+ if row["i_state"] != nil {
|
|
|
+ status = qutil.IntAll(row["i_state"])
|
|
|
+ if status == ReportStateFailed {
|
|
|
+ status = ReportStateGenerating // 生成失败对外还是展示为生成中
|
|
|
+ }
|
|
|
+ }
|
|
|
+ data := map[string]interface{}{
|
|
|
"id": util.EncodeId(mongodb.BsonIdToSId(row["_id"])),
|
|
|
"keysItems": qutil.ObjToString(row["s_keysItems"]),
|
|
|
"area": qutil.ObjToString(row["s_area"]),
|
|
@@ -328,7 +374,11 @@ func (mae *MarketAnalysisEntity) GetRecordList(pageNum, PageSize int) (total int
|
|
|
"s_rangeTimeExtra": qutil.ObjToString(row["s_rangeTimeExtra"]),
|
|
|
"createTime": qutil.Int64All(row["l_createTime"]),
|
|
|
"matchingMode": qutil.ObjToString(row["s_matchingMode"]), //项目匹配方式
|
|
|
- })
|
|
|
+ "state": qutil.If(row["i_state"] == nil, nil, status),
|
|
|
+ "updateTime": qutil.Int64All(row["l_updateTime"]),
|
|
|
+ "msgOpen": open,
|
|
|
+ }
|
|
|
+ list = append(list, data)
|
|
|
}
|
|
|
return
|
|
|
}
|
|
@@ -376,30 +426,283 @@ func (mae *MarketAnalysisEntity) GetPartResult(flag int) (map[string]interface{}
|
|
|
return nil, fmt.Errorf("报告异常请求,请刷新重试")
|
|
|
}
|
|
|
}
|
|
|
- switch flag {
|
|
|
- case marketScaleMain:
|
|
|
- rData, err := mae.MarketTime()
|
|
|
- if err != nil { //若无报告内容,删除报告记录
|
|
|
- go mae.removeEmptyRecord()
|
|
|
- }
|
|
|
+ // 1. 查mongo
|
|
|
+ var rData map[string]interface{}
|
|
|
+ var err error
|
|
|
+ rData, err = mae.GetMongoData(flag)
|
|
|
+ // 查到数据则直接返回
|
|
|
+ if err == nil && rData != nil {
|
|
|
+ return rData, nil
|
|
|
+ }
|
|
|
+ // 2.没有查询到数据 判断是不是离线的
|
|
|
+ // 正常情况下正在离线生的不会走到这里
|
|
|
+ // 离线的应该生成完报告之后才会调用获取结果接口 这里处理是防止直接调接口传正在离线生的报告
|
|
|
+ if mae.Offline == ValueOffline || mae.Offline == 0 {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ // 3. 实时则查es
|
|
|
+ rData, err = mae.realTimeQuery(flag)
|
|
|
+ if err == nil && len(rData) > 0 {
|
|
|
+ // 4.存库
|
|
|
+ mae.SaveMongoReport(rData, flag)
|
|
|
return rData, err
|
|
|
- case marketTopProject:
|
|
|
- return mae.ProjectTop10()
|
|
|
- case marketProjectAllData:
|
|
|
- return mae.AllData()
|
|
|
- case marketScaleRefine:
|
|
|
- return mae.marketScaleRefineQuery()
|
|
|
- case marketBuyerAndWinner:
|
|
|
- return mae.BuyerWinnerAnalysis(), nil
|
|
|
}
|
|
|
- return nil, fmt.Errorf("未知请求")
|
|
|
+ return nil, err
|
|
|
}()
|
|
|
if err == nil && rData != nil && len(rData) > 0 {
|
|
|
+ delete(rData, "s_m_id")
|
|
|
+ delete(rData, "_id")
|
|
|
redis.Put(ReportCacheDB, thisCacheKey, rData, ReportCacheTime)
|
|
|
}
|
|
|
return rData, err
|
|
|
}
|
|
|
|
|
|
+// 实时查询
|
|
|
+func (mae *MarketAnalysisEntity) realTimeQuery(flag int) (map[string]interface{}, error) {
|
|
|
+ switch flag {
|
|
|
+ case marketScaleMain:
|
|
|
+ rData, err := mae.MarketTime()
|
|
|
+ if err != nil { //若无报告内容,删除报告记录
|
|
|
+ go mae.removeEmptyRecord()
|
|
|
+ }
|
|
|
+ return rData, err
|
|
|
+ case marketTopProject:
|
|
|
+ return mae.ProjectTop10()
|
|
|
+ case marketProjectAllData:
|
|
|
+ return mae.AllData()
|
|
|
+ case marketScaleRefine:
|
|
|
+ return mae.marketScaleRefineQuery()
|
|
|
+ case marketBuyerAndWinner:
|
|
|
+ return mae.BuyerWinnerAnalysis(), nil
|
|
|
+ }
|
|
|
+ return nil, fmt.Errorf("未知请求")
|
|
|
+}
|
|
|
+
|
|
|
+// GetMongoData 从mongo库查询数据
|
|
|
+func (mae *MarketAnalysisEntity) GetMongoData(flag int) (map[string]interface{}, error) {
|
|
|
+ collName, err := GetMongoColl(flag)
|
|
|
+ if err != nil || collName == "" {
|
|
|
+ return nil, fmt.Errorf("未知请求")
|
|
|
+ }
|
|
|
+ // 查询
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "s_m_id": mae.MgoRecordId,
|
|
|
+ }
|
|
|
+ data, b := db.Mgo.FindOne(collName, query)
|
|
|
+ if !b || data == nil || len(*data) == 0 {
|
|
|
+ log.Println("没有查询到数据", b, query, collName)
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ delete(*data, "s_m_id")
|
|
|
+ delete(*data, "_id")
|
|
|
+ return *data, nil
|
|
|
+}
|
|
|
+
|
|
|
+// GetMongoColl 获取mgo库对应的Coll
|
|
|
+func GetMongoColl(flag int) (string, error) {
|
|
|
+ switch flag {
|
|
|
+ case marketScaleMain:
|
|
|
+ return CollMarketScaleMain, nil
|
|
|
+ case marketTopProject:
|
|
|
+ return CollMarketTopProject, nil
|
|
|
+ case marketProjectAllData:
|
|
|
+ return CollMarketProjectAllData, nil
|
|
|
+ case marketScaleRefine:
|
|
|
+ return CollMarketScaleRefine, nil
|
|
|
+ case marketBuyerAndWinner:
|
|
|
+ return CollMarketBuyerAndWinner, nil
|
|
|
+ }
|
|
|
+ return "", fmt.Errorf("未知类型")
|
|
|
+}
|
|
|
+
|
|
|
+// SaveMongoReport 数据存mongo库
|
|
|
+func (mae *MarketAnalysisEntity) SaveMongoReport(rData map[string]interface{}, flag int) {
|
|
|
+ collName, _ := GetMongoColl(flag)
|
|
|
+ rData["s_m_id"] = mae.MgoRecordId
|
|
|
+ db.Mgo.Save(collName, rData)
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// GetAnalyzingReport 是否有正在分析的离线报告
|
|
|
+func (mae *MarketAnalysisEntity) GetAnalyzingReport() string {
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "s_userId": mae.UId,
|
|
|
+ "i_state": map[string]interface{}{"$in": []int{ReportStateGenerating, ReportStateFailed}}, //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
|
|
|
+ "i_del": map[string]interface{}{"$ne": 1},
|
|
|
+ }
|
|
|
+ //log.Println("query:", query)
|
|
|
+ rs, b := db.Mgo.FindOne(ReportHistoryTable, query)
|
|
|
+ //log.Println("rs,b:", rs, b)
|
|
|
+ if b && rs != nil {
|
|
|
+ return mongodb.BsonIdToSId((*rs)["_id"])
|
|
|
+ }
|
|
|
+ return ""
|
|
|
+}
|
|
|
+
|
|
|
+// IsOffline 判断是否符合在线分析的条件
|
|
|
+func (mae *MarketAnalysisEntity) IsOffline() (offline bool) {
|
|
|
+ //离线生成:订阅词(关键词+排除词)超过300个(数量支持配置),或单次分析数据超过60万条,则离线生成;
|
|
|
+ keyCount := 0
|
|
|
+ for i := 0; i < len(mae.FormatParam.KeysItems); i++ {
|
|
|
+ items := mae.FormatParam.KeysItems[i]
|
|
|
+ for j := 0; j < len(items.A_Key); j++ {
|
|
|
+ AKey := items.A_Key[j]
|
|
|
+ for k := 0; k < len(AKey.Keyword); k++ {
|
|
|
+ keyCount++
|
|
|
+ }
|
|
|
+ for k := 0; k < len(AKey.Appended); k++ {
|
|
|
+ keyCount++
|
|
|
+ }
|
|
|
+ for k := 0; k < len(AKey.Exclude); k++ {
|
|
|
+ keyCount++
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if keyCount > config.Config.MarketAnalysisPool.KeyWordsCount {
|
|
|
+ mae.Offline = ValueOffline
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ // 查询数据量
|
|
|
+ countSql := fmt.Sprintf(mae.GetCommonQuerySql(), "")
|
|
|
+ log.Println("IsOffline count SQL:", countSql)
|
|
|
+ a := time.Now()
|
|
|
+ dataCount := elastic.Count("projectset", "projectset", countSql)
|
|
|
+ b := time.Since(a)
|
|
|
+ log.Println("IsOffline 统计数据量耗时:", b)
|
|
|
+ log.Println("IsOffline 数据量:", dataCount)
|
|
|
+ mae.OriginalTotal = dataCount
|
|
|
+ if int(dataCount) > config.Config.MarketAnalysisPool.ProjectNumLimit {
|
|
|
+ mae.Offline = ValueOffline
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ mae.Offline = ValueRealTime
|
|
|
+ return false
|
|
|
+}
|
|
|
+func (mae *MarketAnalysisEntity) GetReportState() (generated bool, needUpdate bool, err error) {
|
|
|
+ // 查库
|
|
|
+ err = mae.GetAnalysisFromMgoDb()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // 已经生成的可以直接返回
|
|
|
+ if mae.State == ReportStateGenerated {
|
|
|
+ generated = true
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // 该字段没有值 说明是还没有被更新的历史数据 需要更新
|
|
|
+ // 且判断之后后续需要更新i_state和i_offline字段
|
|
|
+ if mae.Offline == 0 {
|
|
|
+ needUpdate = true
|
|
|
+ }
|
|
|
+ // 如果没有值的话 说明这是历史数据 需要接着走下面的流程进行判断
|
|
|
+ // 格式化数据 用于后面校验个数
|
|
|
+ if err = mae.ForMatData(); err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// Cancel 取消正在分析中的报告
|
|
|
+func (mae *MarketAnalysisEntity) Cancel() (bool, error) {
|
|
|
+ // 取消报告
|
|
|
+ queryMap := map[string]interface{}{
|
|
|
+ "_id": mae.MgoRecordId,
|
|
|
+ "i_state": map[string]interface{}{"$in": []int{ReportStateGenerating, ReportStateFailed}}, //状态:默认0:生成中;1:已生成;2:已取消;-1:生成失败
|
|
|
+ "i_del": map[string]interface{}{"$ne": 1},
|
|
|
+ }
|
|
|
+ if mae.UId == mae.Pid { //主账号
|
|
|
+ queryMap["s_parentId"] = mae.Pid
|
|
|
+ } else {
|
|
|
+ queryMap["s_userId"] = mae.UId
|
|
|
+ }
|
|
|
+ //验证
|
|
|
+ rs, b := db.Mgo.FindOne(ReportHistoryTable, queryMap)
|
|
|
+ //
|
|
|
+ if !b || rs == nil {
|
|
|
+ return false, fmt.Errorf("未查询到该记录")
|
|
|
+ }
|
|
|
+ update := db.Mgo.UpdateById(ReportHistoryTable, mae.MgoRecordId, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ "l_updateTime": time.Now().Unix(),
|
|
|
+ "i_state": ReportStateCanceled,
|
|
|
+ }})
|
|
|
+ if !update {
|
|
|
+ log.Println("分析报告取消失败:", mae.MgoRecordId)
|
|
|
+ return false, fmt.Errorf("取消失败")
|
|
|
+ }
|
|
|
+ // redis里面放取消标识
|
|
|
+ redis.Put(ReportCacheDB, fmt.Sprintf(ReportCanceledKey, mae.MgoRecordId), 1, ReportCanceledTime)
|
|
|
+ return update, nil
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateOffline 更新报告是否是离线报告
|
|
|
+func (mae *MarketAnalysisEntity) UpdateOffline(offline bool) bool {
|
|
|
+ set := map[string]interface{}{
|
|
|
+ "i_state": qutil.If(offline, ReportStateGenerating, ReportStateGenerated),
|
|
|
+ "i_offline": qutil.If(offline, ValueOffline, ValueRealTime),
|
|
|
+ "l_updateTime": time.Now().Unix(),
|
|
|
+ "s_mgoUserId": mae.MgoUserId, // 这里更新这些字段是因为这几个字段是p437 版本新加上的 历史数据没有这些字段
|
|
|
+ "s_positionId": mae.PositionId,
|
|
|
+ "s_phone": mae.Phone,
|
|
|
+ }
|
|
|
+ if mae.OriginalTotal > 0 {
|
|
|
+ set["l_originalTotal"] = mae.OriginalTotal
|
|
|
+ }
|
|
|
+ data := map[string]interface{}{
|
|
|
+ "$set": set,
|
|
|
+ }
|
|
|
+ update := db.Mgo.UpdateById(ReportHistoryTable, mae.MgoRecordId, data)
|
|
|
+ if !update {
|
|
|
+ log.Println("UpdateOffline 更新报告状态失败:", data, mae.MgoRecordId)
|
|
|
+ }
|
|
|
+ return update
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateState 更新报告生成状态
|
|
|
+func (mae *MarketAnalysisEntity) UpdateState(state int) bool {
|
|
|
+ data := map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "i_state": state,
|
|
|
+ "l_updateTime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ update := db.Mgo.UpdateById(ReportHistoryTable, mae.MgoRecordId, data)
|
|
|
+ if !update {
|
|
|
+ log.Println("UpdateState 更新报告生成状态失败:", data, mae.MgoRecordId)
|
|
|
+ }
|
|
|
+ return update
|
|
|
+}
|
|
|
+
|
|
|
+// Delete 删除正在分析中的报告
|
|
|
+func (mae *MarketAnalysisEntity) Delete() (bool, error) {
|
|
|
+ // 删除报告
|
|
|
+ queryMap := map[string]interface{}{
|
|
|
+ "_id": mae.MgoRecordId,
|
|
|
+ "i_del": map[string]interface{}{"$ne": 1},
|
|
|
+ }
|
|
|
+ if mae.UId == mae.Pid { //主账号
|
|
|
+ queryMap["s_parentId"] = mae.Pid
|
|
|
+ } else {
|
|
|
+ queryMap["s_userId"] = mae.UId
|
|
|
+ }
|
|
|
+ //验证
|
|
|
+ rs, b := db.Mgo.FindOne(ReportHistoryTable, queryMap)
|
|
|
+ //
|
|
|
+ if !b || rs == nil {
|
|
|
+ return false, fmt.Errorf("未查询到该记录")
|
|
|
+ }
|
|
|
+ update := db.Mgo.UpdateById(ReportHistoryTable, mae.MgoRecordId, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ "l_updateTime": time.Now().Unix(),
|
|
|
+ "i_del": 1,
|
|
|
+ }})
|
|
|
+ if !update {
|
|
|
+ log.Println("分析报告删除失败:", mae.MgoRecordId)
|
|
|
+ return false, fmt.Errorf("删除失败")
|
|
|
+ }
|
|
|
+ // redis里面放取消标识
|
|
|
+ redis.Put(ReportCacheDB, fmt.Sprintf(ReportCanceledKey, mae.MgoRecordId), 1, ReportCanceledTime)
|
|
|
+ return update, nil
|
|
|
+}
|
|
|
func GetEntNameByIds(ids []string) (returnMap map[string]string) {
|
|
|
returnMap = map[string]string{}
|
|
|
if len(ids) == 0 {
|
|
@@ -418,3 +721,22 @@ func GetEntNameByIds(ids []string) (returnMap map[string]string) {
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
+// GetMsgOpen 获取用户服务通知开关是否开启
|
|
|
+func GetMsgOpen(mgoUserId string) bool {
|
|
|
+ pushSetMap, _ := db.Mgo.FindById("user", mgoUserId, `{"o_pushset":1,"s_m_openid":1}`)
|
|
|
+ //log.Println(mgoUserId, pushSetMap)
|
|
|
+ if pushSetMap != nil && len(*pushSetMap) > 0 {
|
|
|
+ pushset := qutil.ObjToMap((*pushSetMap)["o_pushset"])
|
|
|
+ if pushset == nil || len(*pushset) == 0 {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ msgServicePushSet := qutil.ObjToMap((*pushset)["o_msg_service"])
|
|
|
+ if msgServicePushSet != nil {
|
|
|
+ if qutil.IntAll((*msgServicePushSet)["i_apppush"]) == 1 || qutil.IntAll((*msgServicePushSet)["i_wxpush"]) == 1 {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|