Bladeren bron

Merge branch 'dev4.5.1.2' into release

luwenna 4 jaren geleden
bovenliggende
commit
03661e4df8

+ 2 - 0
README.md

@@ -3,3 +3,5 @@
 weixin sdk https://github.com/wizjin/weixin
 web用xweb框架
 
+v4.5.1.2
+数据判重

+ 2 - 1
src/config.json

@@ -295,5 +295,6 @@
     "phoneFilterFlag":true,
     "pcindexCacheTime":7200,
     "pcIndexHotCacheTime":7200,
-    "pcindexCacheKey":"jypcindex2"
+    "pcindexCacheKey":"jypcindex2",
+    "dedupUrl":  "http://127.0.0.1:8888/data/deduplication"
 }

+ 2 - 1
src/jfw/front/dataExport.go

@@ -811,7 +811,8 @@ func (d *DataExport) EntDataExport(_id string) error {
 	data := &[]map[string]interface{}{}
 	current := GetCurrentCount(entId)
 	log.Println("企业总条数", current)
-	count, newCount, data = public.GetEntDataExportCount(id, entId, entUserId, remain_nums, current, isFirst, util.ObjToString(config.Sysconfig["webdomain"]))
+	url := config.Sysconfig["dedupUrl"].(string)
+	count, newCount, data = public.GetEntDataExportCount(id, entId, entUserId, remain_nums, current, isFirst, util.ObjToString(config.Sysconfig["webdomain"]), url)
 	isExport := true
 	isEntExport := true
 	if newCount > current {

+ 90 - 19
src/jfw/modules/common/src/qfw/util/jy/entnichepush.go

@@ -3,9 +3,11 @@ package jy
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"math"
 	mg "mongodb"
+	"net/http"
 	. "qfw/util"
 	"qfw/util/elastic"
 	"qfw/util/mysql"
@@ -217,11 +219,11 @@ func (e *entnichePush) GetCache(code, key string) ([]*SubPushList, error) {
 	return p, nil
 }
 
-func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum int, selectTime, area, buyerclass string, isEnt bool) (hasNextPage bool, result []map[string]interface{}, allCount int) {
+func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum int, selectTime, area, buyerclass string, isEnt bool,url string) (hasNextPage bool, result []map[string]interface{}, allCount int) {
 	if pageNum < 1 {
 		pageNum = 1
 	}
-	result, allCount = e.newGetDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, pageNum, pageSizes, selectTime, area, buyerclass, true, isEnt)
+	result, allCount = e.newGetDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, pageNum, pageSizes, selectTime, area, buyerclass, true, isEnt,url)
 	if result == nil {
 		result = []map[string]interface{}{}
 	}
@@ -229,11 +231,11 @@ func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back
 	return
 }
 
-func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, selectTime, area, buyerclass string, isEnt bool, maxCount int) (result []map[string]interface{}, allCount, secondCount int) {
+func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, selectTime, area, buyerclass string, isEnt bool, maxCount int,url string) (result []map[string]interface{}, allCount, secondCount int) {
 	if selectTime == "" && area == "" {
-		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount)
+		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount,url)
 	} else {
-		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount)
+		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount,url)
 	}
 	if result == nil {
 		result = []map[string]interface{}{}
@@ -241,7 +243,7 @@ func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, biddin
 	return
 }
 
-func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool) (result []map[string]interface{}, count int) {
+func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool,url string) (result []map[string]interface{}, count int) {
 	findSQL := ""
 	if isEnt {
 		findSQL = "select id,date,infoid,buyerclass,isvisit,matchkeys,type,1 as isvip from pushentniche where entid=" + fmt.Sprint(entId)
@@ -300,7 +302,7 @@ func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding,
 	list := PushMysql.SelectBySql(findSQL)
 	counts := len(*list)
 	if counts > 0 {
-		result, _ = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, false, entId)
+		result, _ = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, false, entId,url)
 	}
 	if counts == 0 || *list == nil || list == nil {
 		result = []map[string]interface{}{}
@@ -308,7 +310,7 @@ func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding,
 	return
 }
 
-func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool, maxCount int) (result []map[string]interface{}, counts, secondCount int) {
+func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool, maxCount int,url string) (result []map[string]interface{}, counts, secondCount int) {
 	findSQL := ""
 	countSQL := ""
 	if isEnt {
@@ -379,7 +381,7 @@ func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bid
 			log.Println("findSQL ", findSQL)
 			list = PushMysql.SelectBySql(findSQL)
 		}
-		result, secondCount = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, true, entId)
+		result, secondCount = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, true, entId,url)
 	}
 	if counts == 0 || len(*list) == 0 || list == nil {
 		result = []map[string]interface{}{}
@@ -388,27 +390,72 @@ func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bid
 }
 
 //根据id取内容
-func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, pushCas []map[string]interface{}, isInfoId bool, entId int) ([]map[string]interface{}, int) {
+func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, pushCas []map[string]interface{}, isInfoId bool, entId int,url string) ([]map[string]interface{}, int) {
 	array := []map[string]interface{}{}
-	var newsDatalen = make(chan bool, 1000000)
+	//var newsDatalen = make(chan bool, 1000000)
+	newCount :=0  // 新导出数量
+	infoIdList := []string{} // 临时infoid数组
+	//log.Println(pushCas,"啦啦啦")
 	if len(pushCas) == 0 {
 		return array, 0
 	}
 	m := map[string]bool{}
 	ids := []string{}
+	// 20210716  判重由redis 改为请求判重中台接口  每一千个请求一次
 	for _, v := range pushCas {
 		infoid := ObjToString(v["infoid"])
-		isExist, err := redis.Exists("other", "entexportdata_"+infoid+"_"+fmt.Sprintln(entId))
-		if err != nil {
-			log.Println("企业搜索数据导出redis判重失败")
-		} else if !isExist {
-			newsDatalen <- true
-		}
 		if m[infoid] {
 			continue
 		}
 		m[infoid] = true
 		ids = append(ids, infoid)
+		// 20210716  判重由redis 改为请求判重中台接口  每一千个请求一次
+		if len(infoIdList) > 1000 {
+			//	 调接口
+			rs, err5 := Post(url, map[string]string{
+				"personId": "0",  // 这个参数没有用
+				"infoId":   strings.Join(infoIdList, ","),
+				"entId":    fmt.Sprintf("%d", entId),
+				"isInsert": "false",  // 是否插入数据
+				"isEnt":    "true", // 是否根据企业id判重
+			})
+			if err5 != nil || IntAll(rs["code"])!=0{
+				log.Println("企业订阅数据导出接口判重失败", err5)
+			} else {
+				log.Println("企业订阅数据导出", rs)
+				// 置空
+				infoIdList = []string{}
+				// 本次数据累计
+				returnData := rs["data"].(map[string]interface{})
+				//log.Println(newCount,"加之前121")
+				newCount+=int(returnData["newCount"].(float64))
+				//log.Println(newCount,"加之后121")
+			}
+
+		}
+		infoIdList = append(infoIdList,infoid)
+	}
+	//
+	if len(infoIdList)>0{
+		log.Println(entId)
+		rs,err5 := Post(url, map[string]string{
+			"personId":"0",
+			"infoId":strings.Join(infoIdList,","),
+			"entId":fmt.Sprintf("%d", entId),
+			"isInsert":"false",
+			"isEnt":"true",
+
+		})
+		if err5 !=nil|| IntAll(rs["code"])!=0{
+			log.Println("判重失败===",err5)
+		}else {
+			// 置空
+			infoIdList = []string{}
+			// 本次数据累计
+			returnData := rs["data"].(map[string]interface{})
+			newCount+=int(returnData["newCount"].(float64))
+		}
+
 	}
 	infos := map[string]map[string]interface{}{}
 	//elasticsearch
@@ -492,6 +539,30 @@ func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bi
 		info = InfoFormats(info, v)
 		array = append(array, info)
 	}
-	log.Println("newsDatalen", len(newsDatalen))
-	return array, len(newsDatalen)
+	log.Println("newsDatalen", newCount)
+	log.Println(len(array))
+	return array, newCount
+}
+// Post sends form to url as an application/x-www-form-urlencoded POST request
+// and decodes the JSON response body into a generic map.
+//
+// Parameters:
+//   - url:  address of the endpoint to call.
+//   - form: key/value pairs written into the request body as k=v joined by "&".
+//
+// Returns the decoded JSON object, or a nil map and the error when the request
+// fails, the body cannot be read, or the body is not valid JSON.
+//
+// NOTE(review): keys and values are written verbatim, without URL escaping.
+// The callers visible in this change only pass numeric strings, "true"/"false"
+// and comma-joined ids; escape the values before reusing this helper with
+// arbitrary text.
+func Post(url string, form map[string]string) (data map[string]interface{}, err error) {
+	// Collect the pairs and join them once: no leading "&" and no repeated
+	// string reallocation while the body grows.
+	pairs := make([]string, 0, len(form))
+	for k, v := range form {
+		pairs = append(pairs, k+"="+v)
+	}
+	res, err := http.Post(url, "application/x-www-form-urlencoded", strings.NewReader(strings.Join(pairs, "&")))
+	if err != nil {
+		log.Println("post err:", err.Error())
+		return nil, err
+	}
+	// On a nil error the response body is always non-nil and must be closed.
+	defer res.Body.Close()
+	// Log only the status line; dumping the whole response struct is noise.
+	log.Println("post status:", res.Status)
+	bs, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		// Report the read failure itself instead of letting a truncated body
+		// show up later as a confusing JSON syntax error.
+		return nil, err
+	}
+	if err = json.Unmarshal(bs, &data); err != nil {
+		return nil, err
+	}
+	return data, nil
 }

+ 4 - 2
src/jfw/public/dataexport.go

@@ -148,8 +148,10 @@ func (this *BidSearchExport) PassBidSearchExport() (returnData map[string]interf
 		starttime := fmt.Sprint(time.Date(now.Year(), now.Month(), now.Day()-30, 0, 0, 0, 0, time.Local).Unix())
 		publishtimeSave = fmt.Sprintf("%s_%d", starttime, now.Unix())
 	} else if this.Publishtime == "thisyear" { //去年
-		starttime := fmt.Sprint(time.Date(now.Year()-1, 1, 1, 0, 0, 0, 0, time.Local).Unix())
-		endtime := fmt.Sprint(time.Date(now.Year()-1, 12, 31, 23, 59, 59, 0, time.Local).Unix())
+		// starttime := fmt.Sprint(time.Date(now.Year()-1, 1, 1, 0, 0, 0, 0, time.Local).Unix())
+		// endtime := fmt.Sprint(time.Date(now.Year()-1, 12, 31, 23, 59, 59, 0, time.Local).Unix())
+		starttime := fmt.Sprint(time.Date(now.Year()-1, now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), 0, time.Local).Unix())
+		endtime := fmt.Sprint(now.Unix())
 		publishtimeSave = fmt.Sprintf("%s_%s", starttime, endtime)
 	}
 	returnData = map[string]interface{}{

+ 98 - 29
src/jfw/public/entdataexport.go

@@ -1,25 +1,27 @@
 package public
 
 import (
+	"encoding/json"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
+	"io/ioutil"
 	"log"
+	"net/http"
 	"qfw/util"
 	"qfw/util/dataexport"
 	"qfw/util/elastic"
-	"qfw/util/redis"
 	"strings"
 	"sync"
 	"time"
 )
 
-func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int, isFirst bool, webdomain string) (count, newCount int, data *[]map[string]interface{}) {
+func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int, isFirst bool, webdomain string, url string) (count, newCount int, data *[]map[string]interface{}) {
 	defer util.Catch()
 	var (
 		searchsWaitGroup = &sync.WaitGroup{}
-		searchsPool      = make(chan bool, 20)
+		//searchsPool      = make(chan bool, 20)
 		// res              = &[]map[string]interface{}{}
-		newCountPool = make(chan bool, 20000)
+		//newCountPool = make(chan bool, 20000)
 	)
 	//count = GetDataExportSearchCountUseId(_id)
 	count = dataexport.GetDataExportSearchCountByScdId(MQFW, DbConf.Elasticsearch.Main.Address, _id)
@@ -35,33 +37,49 @@ func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int,
 		log.Println("企业数据导出错误 ", err)
 		return 0, 0, nil
 	}
-	// secondCount := 0
-
-	// isOK := true
-	// if secondCount > current {
-	// 	isOK = false
-	// }
+	//  20210716 由原来的redis判重改为调用判重中台接口进行判重
+	m := map[string]bool{}
+	infoIdList := []string{}
+	insertFlag := "false"
+	if !isFirst {
+		insertFlag = "true"
+	}
 	for _, v := range *res {
-		searchsWaitGroup.Add(1)
-		searchsPool <- true
 		id := util.ObjToString(v["_id"])
-		go func(id string) {
-			defer func() {
-				searchsWaitGroup.Done()
-				<-searchsPool
-			}()
-			isExist, err := redis.Exists("other", "entexportdata_"+id+"_"+fmt.Sprintln(entId))
-			if err != nil {
-				log.Println("企业搜索数据导出redis判重失败")
-			} else if isExist {
-				log.Println("数据重复,id ", id, "entid ", entId, "userid ", entUserId)
-				return
-			}
-			newCountPool <- true
-			if !isFirst {
-				redis.Put("other", "entexportdata_"+id+"_"+fmt.Sprintln(entId), 1, -1)
+		if m[id] {
+			continue
+		}
+		m[id] = true
+		//  20210716  redis判重调整为调用判重中台接口  每一千个调用一次
+		infoIdList = append(infoIdList, id)
+		if len(infoIdList) > 1000 {
+			//	 调接口
+			rs, err5 := Post(url, map[string]string{
+				"personId": "0", // 没有使用这个参数
+				"infoId":   strings.Join(infoIdList, ","),
+				"entId":    fmt.Sprintf("%d", entId),
+				"isInsert": insertFlag,
+				"isEnt":    "true",
+			})
+			log.Println("响应结果:",rs)
+			if err5 != nil|| util.IntAll(rs["code"])!=0 {
+				log.Println("企业订阅数据导出接口判重失败", err5)
+				log.Println("企业订阅数据导出接口判重失败rs:",rs)
+				log.Println("企业订阅数据导出接口判重失败rs[code]:",rs["code"])
+				log.Println("企业订阅数据导出接口判重失败code是否为0",util.IntAll(rs["code"])!=0)
+				log.Println("企业订阅数据导出接口判重失败", err5,"rs:",rs," rs[code]:",rs["code"]," ",util.IntAll(rs["code"]),"code是否为0",util.IntAll(rs["code"])!=0)
+			} else {
+				log.Println("企业订阅数据导出")
+				// 置空
+				infoIdList = []string{}
+				// 本次数据累计
+				returnData := rs["data"].(map[string]interface{})
+				log.Println(newCount, "加之前")
+				newCount += int(returnData["newCount"].(float64))
+				log.Println(newCount, "加之后")
 			}
-		}(id)
+
+		}
 		if !isFirst {
 			delete(v, "_id")
 			v["entid"] = entId
@@ -70,9 +88,36 @@ func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int,
 			v["createtime"] = time.Now().Unix()
 		}
 	}
+	if len(infoIdList) > 0 {
+		rs, err5 := Post(url, map[string]string{
+			"personId": "0", // 没有使用这个参数
+			"infoId":   strings.Join(infoIdList, ","),
+			"entId":    fmt.Sprintf("%d", entId),
+			"isInsert": insertFlag,
+			"isEnt":    "true",
+		})
+		log.Println(rs)
+		if err5 != nil|| util.IntAll(rs["code"])!=0{
+			log.Println("企业订阅数据导出接口判重失败", err5)
+			log.Println("企业订阅数据导出接口判重失败rs:",rs)
+			log.Println("企业订阅数据导出接口判重失败rs[code]:",rs["code"])
+			log.Println("企业订阅数据导出接口判重失败code是否为0",util.IntAll(rs["code"])!=0)
+			log.Println("企业订阅数据导出接口判重失败", err5,"rs:",rs," rs[code]:",rs["code"]," ",util.IntAll(rs["code"]),"code是否为0",util.IntAll(rs["code"])!=0)
+		} else {
+			log.Println("企业订阅数据导出")
+			// 置空
+			infoIdList = []string{}
+			// 本次数据累计
+			returnData := rs["data"].(map[string]interface{})
+			log.Println(newCount, "加之前")
+			newCount += int(returnData["newCount"].(float64))
+			log.Println(newCount, "加之后")
+
+		}
+	}
 	searchsWaitGroup.Wait()
 	log.Println("企业数据导出--数据遍历完成")
-	newCount = len(newCountPool)
+	//newCount = len(newCountPool)
 	log.Println("new", newCount)
 	data = res
 	return
@@ -278,3 +323,27 @@ func FormatExportDatas(data *[]map[string]interface{}, webdomain string, dataTyp
 //	}
 //	return &res, nil
 //}
+
+// Post sends form to url as an application/x-www-form-urlencoded POST request
+// and decodes the JSON response body into a generic map.
+//
+// Parameters:
+//   - url:  address of the endpoint to call.
+//   - form: key/value pairs written into the request body as k=v joined by "&".
+//
+// Returns the decoded JSON object, or a nil map and the error when the request
+// fails, the body cannot be read, or the body is not valid JSON.
+//
+// NOTE(review): keys and values are written verbatim, without URL escaping.
+// The callers visible in this change only pass numeric strings, "true"/"false"
+// and comma-joined ids; escape the values before reusing this helper with
+// arbitrary text.
+func Post(url string, form map[string]string) (data map[string]interface{}, err error) {
+	// Collect the pairs and join them once: no leading "&" and no repeated
+	// string reallocation while the body grows.
+	pairs := make([]string, 0, len(form))
+	for k, v := range form {
+		pairs = append(pairs, k+"="+v)
+	}
+	res, err := http.Post(url, "application/x-www-form-urlencoded", strings.NewReader(strings.Join(pairs, "&")))
+	if err != nil {
+		log.Println("post err:", err.Error())
+		return nil, err
+	}
+	// On a nil error the response body is always non-nil and must be closed.
+	defer res.Body.Close()
+	// Log only the status line; dumping the whole response struct is noise.
+	log.Println("post status:", res.Status)
+	bs, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		// Report the read failure itself instead of letting a truncated body
+		// show up later as a confusing JSON syntax error.
+		return nil, err
+	}
+	if err = json.Unmarshal(bs, &data); err != nil {
+		return nil, err
+	}
+	return data, nil
+}