|
@@ -3,9 +3,11 @@ package jy
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
+ "io/ioutil"
|
|
|
"log"
|
|
|
"math"
|
|
|
mg "mongodb"
|
|
|
+ "net/http"
|
|
|
. "qfw/util"
|
|
|
"qfw/util/elastic"
|
|
|
"qfw/util/mysql"
|
|
@@ -217,11 +219,11 @@ func (e *entnichePush) GetCache(code, key string) ([]*SubPushList, error) {
|
|
|
return p, nil
|
|
|
}
|
|
|
|
|
|
-func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum int, selectTime, area, buyerclass string, isEnt bool) (hasNextPage bool, result []map[string]interface{}, allCount int) {
|
|
|
+func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum int, selectTime, area, buyerclass string, isEnt bool,url string) (hasNextPage bool, result []map[string]interface{}, allCount int) {
|
|
|
if pageNum < 1 {
|
|
|
pageNum = 1
|
|
|
}
|
|
|
- result, allCount = e.newGetDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, pageNum, pageSizes, selectTime, area, buyerclass, true, isEnt)
|
|
|
+ result, allCount = e.newGetDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, pageNum, pageSizes, selectTime, area, buyerclass, true, isEnt,url)
|
|
|
if result == nil {
|
|
|
result = []map[string]interface{}{}
|
|
|
}
|
|
@@ -229,11 +231,11 @@ func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, selectTime, area, buyerclass string, isEnt bool, maxCount int) (result []map[string]interface{}, allCount, secondCount int) {
|
|
|
+func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, selectTime, area, buyerclass string, isEnt bool, maxCount int,url string) (result []map[string]interface{}, allCount, secondCount int) {
|
|
|
if selectTime == "" && area == "" {
|
|
|
- result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount)
|
|
|
+ result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount,url)
|
|
|
} else {
|
|
|
- result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount)
|
|
|
+ result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount,url)
|
|
|
}
|
|
|
if result == nil {
|
|
|
result = []map[string]interface{}{}
|
|
@@ -241,7 +243,7 @@ func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, biddin
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool) (result []map[string]interface{}, count int) {
|
|
|
+func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool,url string) (result []map[string]interface{}, count int) {
|
|
|
findSQL := ""
|
|
|
if isEnt {
|
|
|
findSQL = "select id,date,infoid,buyerclass,isvisit,matchkeys,type,1 as isvip from pushentniche where entid=" + fmt.Sprint(entId)
|
|
@@ -300,7 +302,7 @@ func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding,
|
|
|
list := PushMysql.SelectBySql(findSQL)
|
|
|
counts := len(*list)
|
|
|
if counts > 0 {
|
|
|
- result, _ = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, false, entId)
|
|
|
+ result, _ = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, false, entId,url)
|
|
|
}
|
|
|
if counts == 0 || *list == nil || list == nil {
|
|
|
result = []map[string]interface{}{}
|
|
@@ -308,7 +310,7 @@ func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding,
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool, maxCount int) (result []map[string]interface{}, counts, secondCount int) {
|
|
|
+func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool, maxCount int,url string) (result []map[string]interface{}, counts, secondCount int) {
|
|
|
findSQL := ""
|
|
|
countSQL := ""
|
|
|
if isEnt {
|
|
@@ -379,7 +381,7 @@ func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bid
|
|
|
log.Println("findSQL ", findSQL)
|
|
|
list = PushMysql.SelectBySql(findSQL)
|
|
|
}
|
|
|
- result, secondCount = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, true, entId)
|
|
|
+ result, secondCount = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, true, entId,url)
|
|
|
}
|
|
|
if counts == 0 || len(*list) == 0 || list == nil {
|
|
|
result = []map[string]interface{}{}
|
|
@@ -388,27 +390,72 @@ func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bid
|
|
|
}
|
|
|
|
|
|
//根据id取内容
|
|
|
-func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, pushCas []map[string]interface{}, isInfoId bool, entId int) ([]map[string]interface{}, int) {
|
|
|
+func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, pushCas []map[string]interface{}, isInfoId bool, entId int,url string) ([]map[string]interface{}, int) {
|
|
|
array := []map[string]interface{}{}
|
|
|
- var newsDatalen = make(chan bool, 1000000)
|
|
|
+ //var newsDatalen = make(chan bool, 1000000)
|
|
|
+ newCount :=0 // 新导出数量
|
|
|
+ infoIdList := []string{} // 临时infoid数组
|
|
|
+ //log.Println(pushCas,"啦啦啦")
|
|
|
if len(pushCas) == 0 {
|
|
|
return array, 0
|
|
|
}
|
|
|
m := map[string]bool{}
|
|
|
ids := []string{}
|
|
|
+ // 20210716 判重由redis 改为请求判重中台接口 每一千个请求一次
|
|
|
for _, v := range pushCas {
|
|
|
infoid := ObjToString(v["infoid"])
|
|
|
- isExist, err := redis.Exists("other", "entexportdata_"+infoid+"_"+fmt.Sprintln(entId))
|
|
|
- if err != nil {
|
|
|
- log.Println("企业搜索数据导出redis判重失败")
|
|
|
- } else if !isExist {
|
|
|
- newsDatalen <- true
|
|
|
- }
|
|
|
if m[infoid] {
|
|
|
continue
|
|
|
}
|
|
|
m[infoid] = true
|
|
|
ids = append(ids, infoid)
|
|
|
+ // 20210716 判重由redis 改为请求判重中台接口 每一千个请求一次
|
|
|
+ if len(infoIdList) > 1000 {
|
|
|
+ // 调接口
|
|
|
+ rs, err5 := Post(url, map[string]string{
|
|
|
+ "personId": "0", // 这个参数没有用
|
|
|
+ "infoId": strings.Join(infoIdList, ","),
|
|
|
+ "entId": fmt.Sprintf("%d", entId),
|
|
|
+ "isInsert": "false", // 是否插入数据
|
|
|
+ "isEnt": "true", // 是否根据企业id判重
|
|
|
+ })
|
|
|
+ if err5 != nil || IntAll(rs["code"])!=0{
|
|
|
+ log.Println("企业订阅数据导出接口判重失败", err5)
|
|
|
+ } else {
|
|
|
+ log.Println("企业订阅数据导出", rs)
|
|
|
+ // 置空
|
|
|
+ infoIdList = []string{}
|
|
|
+ // 本次数据累计
|
|
|
+ returnData := rs["data"].(map[string]interface{})
|
|
|
+ //log.Println(newCount,"加之前121")
|
|
|
+ newCount+=int(returnData["newCount"].(float64))
|
|
|
+ //log.Println(newCount,"加之后121")
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ infoIdList = append(infoIdList,infoid)
|
|
|
+ }
|
|
|
+ //
|
|
|
+ if len(infoIdList)>0{
|
|
|
+ log.Println(entId)
|
|
|
+ rs,err5 := Post(url, map[string]string{
|
|
|
+ "personId":"0",
|
|
|
+ "infoId":strings.Join(infoIdList,","),
|
|
|
+ "entId":fmt.Sprintf("%d", entId),
|
|
|
+ "isInsert":"false",
|
|
|
+ "isEnt":"true",
|
|
|
+
|
|
|
+ })
|
|
|
+ if err5 !=nil|| IntAll(rs["code"])!=0{
|
|
|
+ log.Println("判重失败===",err5)
|
|
|
+ }else {
|
|
|
+ // 置空
|
|
|
+ infoIdList = []string{}
|
|
|
+ // 本次数据累计
|
|
|
+ returnData := rs["data"].(map[string]interface{})
|
|
|
+ newCount+=int(returnData["newCount"].(float64))
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
infos := map[string]map[string]interface{}{}
|
|
|
//elasticsearch
|
|
@@ -492,6 +539,30 @@ func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bi
|
|
|
info = InfoFormats(info, v)
|
|
|
array = append(array, info)
|
|
|
}
|
|
|
- log.Println("newsDatalen", len(newsDatalen))
|
|
|
- return array, len(newsDatalen)
|
|
|
+ log.Println("newsDatalen", newCount)
|
|
|
+ log.Println(len(array))
|
|
|
+ return array, newCount
|
|
|
+}
|
|
|
+func Post(url string, form map[string]string) (data map[string]interface{}, err error) {
|
|
|
+ str := ""
|
|
|
+ for k, v := range form {
|
|
|
+ str += "&" + k + "=" + v
|
|
|
+ }
|
|
|
+ //log.Println(str)
|
|
|
+ res, err1 := http.Post(url, "application/x-www-form-urlencoded", strings.NewReader(str))
|
|
|
+ log.Println(res)
|
|
|
+ if err1 != nil {
|
|
|
+ log.Println("post err:", err1.Error())
|
|
|
+ return nil, err1
|
|
|
+
|
|
|
+ } else if res.Body != nil {
|
|
|
+ defer res.Body.Close()
|
|
|
+ bs, _ := ioutil.ReadAll(res.Body)
|
|
|
+ err2 := json.Unmarshal(bs, &data)
|
|
|
+ if err2 != nil {
|
|
|
+ return nil, err2
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ return data, nil
|
|
|
}
|