瀏覽代碼

Merge branch 'release' into dev4.5.5

luwenna 4 年之前
父節點
當前提交
a54c43795d

+ 2 - 1
src/config.json

@@ -295,5 +295,6 @@
     "phoneFilterFlag":true,
     "pcindexCacheTime":7200,
     "pcIndexHotCacheTime":7200,
-    "pcindexCacheKey":"jypcindex2"
+    "pcindexCacheKey":"jypcindex2",
+    "dedupUrl":  "http://127.0.0.1:8888/data/deduplication"
 }

+ 2 - 1
src/jfw/front/dataExport.go

@@ -811,7 +811,8 @@ func (d *DataExport) EntDataExport(_id string) error {
 	data := &[]map[string]interface{}{}
 	current := GetCurrentCount(entId)
 	log.Println("企业总条数", current)
-	count, newCount, data = public.GetEntDataExportCount(id, entId, entUserId, remain_nums, current, isFirst, util.ObjToString(config.Sysconfig["webdomain"]))
+	url := config.Sysconfig["dedupUrl"].(string)
+	count, newCount, data = public.GetEntDataExportCount(id, entId, entUserId, remain_nums, current, isFirst, util.ObjToString(config.Sysconfig["webdomain"]), url)
 	isExport := true
 	isEntExport := true
 	if newCount > current {

+ 90 - 19
src/jfw/modules/common/src/qfw/util/jy/entnichepush.go

@@ -3,9 +3,11 @@ package jy
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"math"
 	mg "mongodb"
+	"net/http"
 	. "qfw/util"
 	"qfw/util/elastic"
 	"qfw/util/mysql"
@@ -217,11 +219,11 @@ func (e *entnichePush) GetCache(code, key string) ([]*SubPushList, error) {
 	return p, nil
 }
 
-func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum int, selectTime, area, buyerclass string, isEnt bool) (hasNextPage bool, result []map[string]interface{}, allCount int) {
+func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum int, selectTime, area, buyerclass string, isEnt bool,url string) (hasNextPage bool, result []map[string]interface{}, allCount int) {
 	if pageNum < 1 {
 		pageNum = 1
 	}
-	result, allCount = e.newGetDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, pageNum, pageSizes, selectTime, area, buyerclass, true, isEnt)
+	result, allCount = e.newGetDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, pageNum, pageSizes, selectTime, area, buyerclass, true, isEnt,url)
 	if result == nil {
 		result = []map[string]interface{}{}
 	}
@@ -229,11 +231,11 @@ func (e *entnichePush) NewDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back
 	return
 }
 
-func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, selectTime, area, buyerclass string, isEnt bool, maxCount int) (result []map[string]interface{}, allCount, secondCount int) {
+func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, selectTime, area, buyerclass string, isEnt bool, maxCount int,url string) (result []map[string]interface{}, allCount, secondCount int) {
 	if selectTime == "" && area == "" {
-		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount)
+		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount,url)
 	} else {
-		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount)
+		result, allCount, secondCount = e.newGetExportDatasFromMysql(Mgo_bidding, bidding, bidding_back, PushMysql, entId, userId, 1, pageSizes, selectTime, area, buyerclass, false, isEnt, maxCount,url)
 	}
 	if result == nil {
 		result = []map[string]interface{}{}
@@ -241,7 +243,7 @@ func (e *entnichePush) NewExportDatas(Mgo_bidding mg.MongodbSim, bidding, biddin
 	return
 }
 
-func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool) (result []map[string]interface{}, count int) {
+func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool,url string) (result []map[string]interface{}, count int) {
 	findSQL := ""
 	if isEnt {
 		findSQL = "select id,date,infoid,buyerclass,isvisit,matchkeys,type,1 as isvip from pushentniche where entid=" + fmt.Sprint(entId)
@@ -300,7 +302,7 @@ func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding,
 	list := PushMysql.SelectBySql(findSQL)
 	counts := len(*list)
 	if counts > 0 {
-		result, _ = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, false, entId)
+		result, _ = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, false, entId,url)
 	}
 	if counts == 0 || *list == nil || list == nil {
 		result = []map[string]interface{}{}
@@ -308,7 +310,7 @@ func (e *entnichePush) newGetDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding,
 	return
 }
 
-func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool, maxCount int) (result []map[string]interface{}, counts, secondCount int) {
+func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, PushMysql *mysql.Mysql, entId, userId int, pageNum, myPageSize int, selectTime, area, buyerclass string, isLimit bool, isEnt bool, maxCount int,url string) (result []map[string]interface{}, counts, secondCount int) {
 	findSQL := ""
 	countSQL := ""
 	if isEnt {
@@ -379,7 +381,7 @@ func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bid
 			log.Println("findSQL ", findSQL)
 			list = PushMysql.SelectBySql(findSQL)
 		}
-		result, secondCount = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, true, entId)
+		result, secondCount = e.GetExportInfoByIds(Mgo_bidding, bidding, bidding_back, *list, true, entId,url)
 	}
 	if counts == 0 || len(*list) == 0 || list == nil {
 		result = []map[string]interface{}{}
@@ -388,27 +390,72 @@ func (e *entnichePush) newGetExportDatasFromMysql(Mgo_bidding mg.MongodbSim, bid
 }
 
 //根据id取内容
-func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, pushCas []map[string]interface{}, isInfoId bool, entId int) ([]map[string]interface{}, int) {
+func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bidding_back string, pushCas []map[string]interface{}, isInfoId bool, entId int,url string) ([]map[string]interface{}, int) {
 	array := []map[string]interface{}{}
-	var newsDatalen = make(chan bool, 1000000)
+	//var newsDatalen = make(chan bool, 1000000)
+	newCount :=0  // 新导出数量
+	infoIdList := []string{} // 临时infoid数组
+	//log.Println(pushCas,"啦啦啦")
 	if len(pushCas) == 0 {
 		return array, 0
 	}
 	m := map[string]bool{}
 	ids := []string{}
+	// 20210716  判重由redis 改为请求判重中台接口  每一千个请求一次
 	for _, v := range pushCas {
 		infoid := ObjToString(v["infoid"])
-		isExist, err := redis.Exists("other", "entexportdata_"+infoid+"_"+fmt.Sprintln(entId))
-		if err != nil {
-			log.Println("企业搜索数据导出redis判重失败")
-		} else if !isExist {
-			newsDatalen <- true
-		}
 		if m[infoid] {
 			continue
 		}
 		m[infoid] = true
 		ids = append(ids, infoid)
+		// 20210716  判重由redis 改为请求判重中台接口  每一千个请求一次
+		if len(infoIdList) > 1000 {
+			//	 调接口
+			rs, err5 := Post(url, map[string]string{
+				"personId": "0",  // 这个参数没有用
+				"infoId":   strings.Join(infoIdList, ","),
+				"entId":    fmt.Sprintf("%d", entId),
+				"isInsert": "false",  // 是否插入数据
+				"isEnt":    "true", // 是否根据企业id判重
+			})
+			if err5 != nil || IntAll(rs["code"])!=0{
+				log.Println("企业订阅数据导出接口判重失败", err5)
+			} else {
+				log.Println("企业订阅数据导出", rs)
+				// 置空
+				infoIdList = []string{}
+				// 本次数据累计
+				returnData := rs["data"].(map[string]interface{})
+				//log.Println(newCount,"加之前121")
+				newCount+=int(returnData["newCount"].(float64))
+				//log.Println(newCount,"加之后121")
+			}
+
+		}
+		infoIdList = append(infoIdList,infoid)
+	}
+	//
+	if len(infoIdList)>0{
+		log.Println(entId)
+		rs,err5 := Post(url, map[string]string{
+			"personId":"0",
+			"infoId":strings.Join(infoIdList,","),
+			"entId":fmt.Sprintf("%d", entId),
+			"isInsert":"false",
+			"isEnt":"true",
+
+		})
+		if err5 !=nil|| IntAll(rs["code"])!=0{
+			log.Println("判重失败===",err5)
+		}else {
+			// 置空
+			infoIdList = []string{}
+			// 本次数据累计
+			returnData := rs["data"].(map[string]interface{})
+			newCount+=int(returnData["newCount"].(float64))
+		}
+
 	}
 	infos := map[string]map[string]interface{}{}
 	//elasticsearch
@@ -492,6 +539,30 @@ func (e *entnichePush) GetExportInfoByIds(Mgo_bidding mg.MongodbSim, bidding, bi
 		info = InfoFormats(info, v)
 		array = append(array, info)
 	}
-	log.Println("newsDatalen", len(newsDatalen))
-	return array, len(newsDatalen)
+	log.Println("newsDatalen", newCount)
+	log.Println(len(array))
+	return array, newCount
+}
+func Post(url string, form map[string]string) (data map[string]interface{}, err error) {
+	str := ""
+	for k, v := range form {
+		str += "&" + k + "=" + v
+	}
+	//log.Println(str)
+	res, err1 := http.Post(url, "application/x-www-form-urlencoded", strings.NewReader(str))
+	log.Println(res)
+	if err1 != nil {
+		log.Println("post err:", err1.Error())
+		return nil, err1
+
+	} else if res.Body != nil {
+		defer res.Body.Close()
+		bs, _ := ioutil.ReadAll(res.Body)
+		err2 := json.Unmarshal(bs, &data)
+		if err2 != nil {
+			return nil, err2
+		}
+
+	}
+	return data, nil
 }

+ 2 - 2
src/jfw/modules/subscribepay/src/config.json

@@ -27,8 +27,8 @@
 		"userName": "admin",
 		"password": "123456"
 	},
-    "redisaddrs": "other=192.168.3.11:1712,session=192.168.3.11:1713,push=192.168.3.206:1712,pushcache_1=192.168.3.206:5000,pushcache_2_a=192.168.3.206:5001",
-    "elasticsearch": "http://192.168.3.206:9800",
+    "redisaddrs": "other=127.0.0.1:1712,session=127.0.0.1:1713,push=127.0.0.1:1712,pushcache_1=127.0.0.1:5000,pushcache_2_a=127.0.0.1:5001",
+    "elasticsearch": "http://127.0.0.1:1800",
     "elasticPoolSize": 30,
     "appid": "wx5b1c6e7cc4dac0e4",
     "appsecret": "b026103ffebd2291b3edb7a269612112",

+ 14 - 1
src/jfw/modules/subscribepay/src/config/config.go

@@ -95,6 +95,7 @@ type messageConfig struct {
 	WxTpl_Expired            *WxTplMsg //已到期
 	WxTpl_Unpaid             *WxTplMsg //未支付订单
 	WxTpl_PaySuccess         *WxTplMsg //支付成功
+	WxTpl_DataReport         *WxTplMsg //数据报告
 }
 
 type WxTplMsg struct {
@@ -238,6 +239,13 @@ var WxPayConf map[string]interface{}
 
 var Wxoauth, Wxoauthinfo string
 var AutoMergeTimeStamp int64
+var (
+	WxTpl_OnTrial_SoonExpire_SceneCode string
+	WxTpl_OnTrial_Expired_SceneCode    string
+	WxTpl_SoonExpire_SceneCode         string
+	WxTpl_Expired_SceneCode            string
+	WxTpl_DataReport_SceneCode         string
+)
 
 func init() {
 	//程序配置文件
@@ -275,5 +283,10 @@ func init() {
 		panic(err)
 	}
 	AutoMergeTimeStamp = time.Unix()
-
+	//
+	WxTpl_OnTrial_SoonExpire_SceneCode = MessageConfig.WxTpl_OnTrial_SoonExpire.First.Value
+	WxTpl_OnTrial_Expired_SceneCode = MessageConfig.WxTpl_OnTrial_Expired.First.Value
+	WxTpl_SoonExpire_SceneCode = MessageConfig.WxTpl_SoonExpire.First.Value
+	WxTpl_Expired_SceneCode = MessageConfig.WxTpl_Expired.First.Value
+	WxTpl_DataReport_SceneCode = MessageConfig.WxTpl_DataReport.First.Value
 }

+ 5 - 1
src/jfw/modules/subscribepay/src/entity/dataReportStruct.go

@@ -170,9 +170,13 @@ func sendtemp(openid, order_coded, product_type, pay_time string) {
 	if openid == "" || order_coded == "" || product_type == "" || pay_time == "" {
 		fmt.Println("数据报告模板参数异常")
 	} else {
+		wxTplMsgErr := (&util.WxTplMsgCustom{}).DataReport()
+		if wxTplMsgErr != nil {
+			log.Println(wxTplMsgErr)
+		}
 		ok, status := frpc.WxPush(config.Config.Weixinrpc, "WeiXinRpc.SendDataReportMsg", &frpc.NotifyMsg{
 			Openid:  openid,
-			Title:   "尊敬的客户,您的订单已支付成功",
+			Title:   config.MessageConfig.WxTpl_DataReport.First.Value,
 			Detail:  product_type, //产品类型
 			Service: order_coded,  //订单编号
 			Date:    pay_time,     //支付时间

+ 11 - 3
src/jfw/modules/subscribepay/src/main.go

@@ -11,13 +11,22 @@ import (
 	"rpcfollow"
 	_ "service"
 	"timetask"
-	_ "util"
+	"util"
 
 	"github.com/go-xweb/xweb"
 )
 
+func init() {
+	wxTplMsgExpiredErr := (&util.WxTplMsgCustom{}).Expired()
+	if wxTplMsgExpiredErr != nil {
+		log.Fatalln(wxTplMsgExpiredErr)
+	}
+	wxTplMsgDataReportErr := (&util.WxTplMsgCustom{}).DataReport()
+	if wxTplMsgDataReportErr != nil {
+		log.Fatalln(wxTplMsgDataReportErr)
+	}
+}
 func main() {
-
 	go func() {
 		frpc := new(rpcfollow.JyPayRpc)
 		rpc.Register(frpc)
@@ -31,7 +40,6 @@ func main() {
 			log.Println("ListenAndServe: ", err)
 		}
 	}()
-
 	go timetask.Run()
 	mux1 := http.NewServeMux()
 	xweb.RunBase(":"+Config.Webport, mux1)

+ 10 - 4
src/jfw/modules/subscribepay/src/message.json

@@ -2,7 +2,7 @@
 	"WxTpl_OnTrial_SoonExpire": {
 		"id": "_2qGuk_KkOQtiO8oV_7ZOCWfjX2FQjs6pUDYBHkpygI",
 		"first":{
-			"value":"您试用的超级订阅服务即将到期,如需获取更多精准招标信息,请及时购买。",
+			"value":"001",
 			"color":"#FE737A"
 		},
 		"remark": {
@@ -12,7 +12,7 @@
 	"WxTpl_OnTrial_Expired": {
 		"id": "_2qGuk_KkOQtiO8oV_7ZOCWfjX2FQjs6pUDYBHkpygI",
 		"first":{
-			"value":"您试用的超级订阅服务已到期,如需获取更多精准招标信息,请及时购买。",
+			"value":"002",
 			"color":"#FE737A"
 		},
 		"remark": {
@@ -22,7 +22,7 @@
 	"WxTpl_SoonExpire": {
 		"id": "3_VPNbD7fmfd8BsdjLW-a7FOP4wIhEGV7Jx-11-9c7g",
 		"first":{
-			"value":"超级订阅服务即将到期,请及时续费",
+			"value":"001",
 			"color":"#FE737A"
 		},
 		"keyword2": {
@@ -38,7 +38,7 @@
 	"WxTpl_Expired": {
 		"id": "3_VPNbD7fmfd8BsdjLW-a7FOP4wIhEGV7Jx-11-9c7g",
 		"first":{
-			"value":"超级订阅服务已到期,点击续费",
+			"value":"002",
 			"color":"#FE737A"
 		},
 		"keyword2": {
@@ -95,5 +95,11 @@
 		"keyword1": {
 			"value":"超级订阅%s"
 		}
+	},
+	"WxTpl_DataReport": {
+		"id": "1p3eqXz28HNlIJxhLoWl8zP7y0l8ZGd8YTsLvTq2J28",
+		"first":{
+			"value":"001"
+		}
 	}
 }

+ 3 - 0
src/jfw/modules/subscribepay/src/timetask/timetask.go

@@ -269,6 +269,9 @@ func expireRemind() {
 	crontab(false, TimeTaskConfig.ExpireRemind, func() {
 		defer qutil.Catch()
 		log.Println("定时任务,到期提醒,开始推送消息")
+		if wxTplMsgErr := (&util.WxTplMsgCustom{}).Expired(); wxTplMsgErr != nil {
+			log.Println(wxTplMsgErr)
+		}
 		sess := util.MQFW.GetMgoConn()
 		for {
 			if sess != nil {

+ 86 - 0
src/jfw/modules/subscribepay/src/util/wxtplmsgcustom.go

@@ -0,0 +1,86 @@
+package util
+
+import (
+	. "config"
+	"errors"
+	"log"
+	. "qfw/util/rpc"
+	"strings"
+)
+
+type WxTplMsgCustom struct {
+}
+
+/* 获取自定义模板消息
+ * @Param tplId 模板消息id
+ * @Param sceneCode 场景代码
+ */
+func (w *WxTplMsgCustom) Get(tplId string, sceneCode ...string) map[string]*TmplItem {
+	param := []interface{}{tplId}
+	wh := []string{}
+	for _, v := range sceneCode {
+		param = append(param, v)
+		wh = append(wh, "?")
+	}
+	list := Mysql.SelectBySql(`select firstdata,sceneCode from scene a 
+	inner join template_message b on (a.templateId=b.id and b.templateId=? and a.sceneCode in (`+strings.Join(wh, ",")+`) and a.state=1 and a.isTest=1)`, param...)
+	m := map[string]*TmplItem{}
+	if list != nil {
+		for _, v := range *list {
+			sceneCode, _ := v["sceneCode"].(string)
+			sceneCode = strings.TrimSpace(sceneCode)
+			//
+			firstdata, _ := v["firstdata"].(string)
+			firstdata = strings.TrimSpace(firstdata)
+			//
+			m[sceneCode] = &TmplItem{
+				Value: firstdata,
+			}
+		}
+	}
+	return m
+}
+
+//超级订阅试用即将到期、超级订阅试用已到期、超级订阅即将到期、超级订阅已到期
+func (w *WxTplMsgCustom) Expired() error {
+	//试用-微信模板消息自定义
+	onTrial_WxTplMsg := w.Get(MessageConfig.WxTpl_OnTrial_SoonExpire.Id, WxTpl_OnTrial_SoonExpire_SceneCode, WxTpl_OnTrial_Expired_SceneCode)
+	//试用-即将到期
+	if onTrial_WxTplMsg[WxTpl_OnTrial_SoonExpire_SceneCode] == nil {
+		return errors.New("试用-即将到期,微信模板消息异常")
+	}
+	MessageConfig.WxTpl_OnTrial_SoonExpire.First.Value = onTrial_WxTplMsg[WxTpl_OnTrial_SoonExpire_SceneCode].Value
+	log.Println("试用-即将到期,微信模板消息first_data", MessageConfig.WxTpl_OnTrial_SoonExpire.First.Value)
+	//试用-已到期
+	if onTrial_WxTplMsg[WxTpl_OnTrial_Expired_SceneCode] == nil {
+		return errors.New("试用-已到期,微信模板消息异常")
+	}
+	MessageConfig.WxTpl_OnTrial_Expired.First.Value = onTrial_WxTplMsg[WxTpl_OnTrial_Expired_SceneCode].Value
+	log.Println("试用-已到期,微信模板消息first_data", MessageConfig.WxTpl_OnTrial_Expired.First.Value)
+	//购买-微信模板消息自定义
+	wxTplMsg := w.Get(MessageConfig.WxTpl_Expired.Id, WxTpl_SoonExpire_SceneCode, WxTpl_Expired_SceneCode)
+	//即将到期
+	if wxTplMsg[WxTpl_SoonExpire_SceneCode] == nil {
+		return errors.New("即将到期,微信模板消息异常")
+	}
+	MessageConfig.WxTpl_SoonExpire.First.Value = wxTplMsg[WxTpl_SoonExpire_SceneCode].Value
+	log.Println("即将到期,微信模板消息first_data", MessageConfig.WxTpl_SoonExpire.First.Value)
+	//已到期
+	if wxTplMsg[WxTpl_Expired_SceneCode] == nil {
+		return errors.New("已到期,微信模板消息异常")
+	}
+	MessageConfig.WxTpl_Expired.First.Value = wxTplMsg[WxTpl_Expired_SceneCode].Value
+	log.Println("已到期,微信模板消息first_data", MessageConfig.WxTpl_Expired.First.Value)
+	return nil
+}
+
+//数据报告
+func (w *WxTplMsgCustom) DataReport() error {
+	wxTplMsg := w.Get(MessageConfig.WxTpl_DataReport.Id, WxTpl_DataReport_SceneCode)
+	if wxTplMsg[WxTpl_DataReport_SceneCode] == nil {
+		return errors.New("数据报告,微信模板消息异常")
+	}
+	MessageConfig.WxTpl_DataReport.First.Value = wxTplMsg[WxTpl_DataReport_SceneCode].Value
+	log.Println("数据报告,微信模板消息first_data", MessageConfig.WxTpl_DataReport.First.Value)
+	return nil
+}

+ 4 - 2
src/jfw/public/dataexport.go

@@ -148,8 +148,10 @@ func (this *BidSearchExport) PassBidSearchExport() (returnData map[string]interf
 		starttime := fmt.Sprint(time.Date(now.Year(), now.Month(), now.Day()-30, 0, 0, 0, 0, time.Local).Unix())
 		publishtimeSave = fmt.Sprintf("%s_%d", starttime, now.Unix())
 	} else if this.Publishtime == "thisyear" { //去年
-		starttime := fmt.Sprint(time.Date(now.Year()-1, 1, 1, 0, 0, 0, 0, time.Local).Unix())
-		endtime := fmt.Sprint(time.Date(now.Year()-1, 12, 31, 23, 59, 59, 0, time.Local).Unix())
+		// starttime := fmt.Sprint(time.Date(now.Year()-1, 1, 1, 0, 0, 0, 0, time.Local).Unix())
+		// endtime := fmt.Sprint(time.Date(now.Year()-1, 12, 31, 23, 59, 59, 0, time.Local).Unix())
+		starttime := fmt.Sprint(time.Date(now.Year()-1, now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), 0, time.Local).Unix())
+		endtime := fmt.Sprint(now.Unix())
 		publishtimeSave = fmt.Sprintf("%s_%s", starttime, endtime)
 	}
 	returnData = map[string]interface{}{

+ 98 - 29
src/jfw/public/entdataexport.go

@@ -1,25 +1,27 @@
 package public
 
 import (
+	"encoding/json"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
+	"io/ioutil"
 	"log"
+	"net/http"
 	"qfw/util"
 	"qfw/util/dataexport"
 	"qfw/util/elastic"
-	"qfw/util/redis"
 	"strings"
 	"sync"
 	"time"
 )
 
-func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int, isFirst bool, webdomain string) (count, newCount int, data *[]map[string]interface{}) {
+func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int, isFirst bool, webdomain string, url string) (count, newCount int, data *[]map[string]interface{}) {
 	defer util.Catch()
 	var (
 		searchsWaitGroup = &sync.WaitGroup{}
-		searchsPool      = make(chan bool, 20)
+		//searchsPool      = make(chan bool, 20)
 		// res              = &[]map[string]interface{}{}
-		newCountPool = make(chan bool, 20000)
+		//newCountPool = make(chan bool, 20000)
 	)
 	//count = GetDataExportSearchCountUseId(_id)
 	count = dataexport.GetDataExportSearchCountByScdId(MQFW, DbConf.Elasticsearch.Main.Address, _id)
@@ -35,33 +37,49 @@ func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int,
 		log.Println("企业数据导出错误 ", err)
 		return 0, 0, nil
 	}
-	// secondCount := 0
-
-	// isOK := true
-	// if secondCount > current {
-	// 	isOK = false
-	// }
+	//  20210716 由原来的redis判重改为调用判重中台接口进行判重
+	m := map[string]bool{}
+	infoIdList := []string{}
+	insertFlag := "false"
+	if !isFirst {
+		insertFlag = "true"
+	}
 	for _, v := range *res {
-		searchsWaitGroup.Add(1)
-		searchsPool <- true
 		id := util.ObjToString(v["_id"])
-		go func(id string) {
-			defer func() {
-				searchsWaitGroup.Done()
-				<-searchsPool
-			}()
-			isExist, err := redis.Exists("other", "entexportdata_"+id+"_"+fmt.Sprintln(entId))
-			if err != nil {
-				log.Println("企业搜索数据导出redis判重失败")
-			} else if isExist {
-				log.Println("数据重复,id ", id, "entid ", entId, "userid ", entUserId)
-				return
-			}
-			newCountPool <- true
-			if !isFirst {
-				redis.Put("other", "entexportdata_"+id+"_"+fmt.Sprintln(entId), 1, -1)
+		if m[id] {
+			continue
+		}
+		m[id] = true
+		//  20210716  redis判重调整为调用判重中台接口  每一千个调用一次
+		infoIdList = append(infoIdList, id)
+		if len(infoIdList) > 1000 {
+			//	 调接口
+			rs, err5 := Post(url, map[string]string{
+				"personId": "0", // 没有使用这个参数
+				"infoId":   strings.Join(infoIdList, ","),
+				"entId":    fmt.Sprintf("%d", entId),
+				"isInsert": insertFlag,
+				"isEnt":    "true",
+			})
+			log.Println("响应结果:",rs)
+			if err5 != nil|| util.IntAll(rs["code"])!=0 {
+				log.Println("企业订阅数据导出接口判重失败", err5)
+				log.Println("企业订阅数据导出接口判重失败rs:",rs)
+				log.Println("企业订阅数据导出接口判重失败rs[code]:",rs["code"])
+				log.Println("企业订阅数据导出接口判重失败code是否为0",util.IntAll(rs["code"])!=0)
+				log.Println("企业订阅数据导出接口判重失败", err5,"rs:",rs," rs[code]:",rs["code"]," ",util.IntAll(rs["code"]),"code是否为0",util.IntAll(rs["code"])!=0)
+			} else {
+				log.Println("企业订阅数据导出")
+				// 置空
+				infoIdList = []string{}
+				// 本次数据累计
+				returnData := rs["data"].(map[string]interface{})
+				log.Println(newCount, "加之前")
+				newCount += int(returnData["newCount"].(float64))
+				log.Println(newCount, "加之后")
 			}
-		}(id)
+
+		}
 		if !isFirst {
 			delete(v, "_id")
 			v["entid"] = entId
@@ -70,9 +88,36 @@ func GetEntDataExportCount(_id string, entId, entUserId, limitNum, current int,
 			v["createtime"] = time.Now().Unix()
 		}
 	}
+	if len(infoIdList) > 0 {
+		rs, err5 := Post(url, map[string]string{
+			"personId": "0", // 没有使用这个参数
+			"infoId":   strings.Join(infoIdList, ","),
+			"entId":    fmt.Sprintf("%d", entId),
+			"isInsert": insertFlag,
+			"isEnt":    "true",
+		})
+		log.Println(rs)
+		if err5 != nil|| util.IntAll(rs["code"])!=0{
+			log.Println("企业订阅数据导出接口判重失败", err5)
+			log.Println("企业订阅数据导出接口判重失败rs:",rs)
+			log.Println("企业订阅数据导出接口判重失败rs[code]:",rs["code"])
+			log.Println("企业订阅数据导出接口判重失败code是否为0",util.IntAll(rs["code"])!=0)
+			log.Println("企业订阅数据导出接口判重失败", err5,"rs:",rs," rs[code]:",rs["code"]," ",util.IntAll(rs["code"]),"code是否为0",util.IntAll(rs["code"])!=0)
+		} else {
+			log.Println("企业订阅数据导出")
+			// 置空
+			infoIdList = []string{}
+			// 本次数据累计
+			returnData := rs["data"].(map[string]interface{})
+			log.Println(newCount, "加之前")
+			newCount += int(returnData["newCount"].(float64))
+			log.Println(newCount, "加之后")
+
+		}
+	}
 	searchsWaitGroup.Wait()
 	log.Println("企业数据导出--数据遍历完成")
-	newCount = len(newCountPool)
+	//newCount = len(newCountPool)
 	log.Println("new", newCount)
 	data = res
 	return
@@ -278,3 +323,27 @@ func FormatExportDatas(data *[]map[string]interface{}, webdomain string, dataTyp
 //	}
 //	return &res, nil
 //}
+
+func Post(url string, form map[string]string) (data map[string]interface{}, err error) {
+	str := ""
+	for k, v := range form {
+		str += "&" + k + "=" + v
+	}
+	//log.Println(str)
+	res, err1 := http.Post(url, "application/x-www-form-urlencoded", strings.NewReader(str))
+	log.Println(res)
+	if err1 != nil {
+		log.Println("post err:", err1.Error())
+		return nil, err1
+
+	} else if res.Body != nil {
+		defer res.Body.Close()
+		bs, _ := ioutil.ReadAll(res.Body)
+		err2 := json.Unmarshal(bs, &data)
+		if err2 != nil {
+			return nil, err2
+		}
+
+	}
+	return data, nil
+}