wkyuer 2 veckor sedan
förälder
incheckning
ac4acb7bd6

+ 110 - 0
sendBidToSftp/cfile/createFile.go

@@ -0,0 +1,110 @@
+package cfile
+
+import (
+	"context"
+	"encoding/csv"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"os"
+	"path"
+)
+
+var (
+	headers = []string{
+		"bid",
+		"area",
+		"city",
+		"projectId",
+		"title",
+		"topType",
+		"detail",
+		"publishTime",
+		"href",
+		"jyHref",
+		"projectName",
+		"projectCode",
+		"projectScope",
+		"budget",
+		"bidAmount",
+		"buyer",
+		"buyerCreditNo",
+		"buyerPerson",
+		"buyerTel",
+		"agency",
+		"agencyPerson",
+		"agencyTel",
+		"bidOpenTime",
+		"bidEndTime",
+		"topScopeClass",
+		"winner",
+		"winnerPerson",
+		"winnerTel",
+		"winnersCreditNo",
+		"winnersEntPerson",
+		"winnersEntTel",
+		"winnersEntMail",
+	}
+	mapping = map[string]string{
+		"topScopeClass": "s_topscopeclass",
+		"winner":        "s_winner",
+		"winnerTel":     "winnertel",
+		"winnerPerson":  "winnerperson",
+		"bidEndTime":    "bidendtime",
+		"bidOpenTime":   "bidopentime",
+		"agencyTel":     "agencytel",
+		"agencyPerson":  "agencyperson",
+		"buyerTel":      "buyertel",
+		"buyerPerson":   "buyerperson",
+		"bidAmount":     "bidamount",
+		"publishTime":   "publishtime",
+		"topType":       "toptype",
+	}
+)
+
+func CreateFile(ctx context.Context, data []g.Map, out string) error {
+	dir := path.Dir(out)
+	if err := os.Mkdir(dir, 0755); err != nil {
+		if !os.IsExist(err) {
+			return err
+		}
+	}
+
+	//生成文件
+	file, err := os.Create(out)
+	if err != nil {
+		g.Log().Error(ctx, err)
+		return err
+	}
+	defer file.Close()
+
+	// 创建CSV写入器
+	writer := csv.NewWriter(file)
+	defer writer.Flush()
+	// 写入标题行
+	if err := writer.Write(headers); err != nil {
+		g.Log().Errorf(ctx, "Failed to write headers: %v", err)
+	}
+	// 写入数据行
+	c := splitData(data)
+	for _, record := range c {
+		if err := writer.Write(record); err != nil {
+			g.Log().Errorf(ctx, "Failed to write record: %v", err)
+		}
+	}
+	return nil
+}
+
+func splitData(arr []g.Map) (r [][]string) {
+	for _, m := range arr {
+		var t []string
+		for _, filed := range headers {
+			c, ok := mapping[filed]
+			if ok && c != "" {
+				filed = c
+			}
+			t = append(t, gconv.String(m[filed]))
+		}
+		r = append(r, t)
+	}
+	return r
+}

+ 35 - 0
sendBidToSftp/config.yaml

@@ -0,0 +1,35 @@
+runCron:
+  "# 0 * * * *"
+
+fileDataCut: 5000
+esPoolSize: 5
+
+runOnce:
+  isRun: true
+  startDate: "2021-08-03 00:00:00"
+  endDate: "2021-08-03 00:00:01"
+
+mongodb:
+  bidding:
+    address: "172.20.45.129:27002,172.20.45.130:27080"
+    size: 5
+    dbName: qfw_data
+    replSet: ""
+    userName: ""
+    password: ""
+
+elasticsearch:
+  default:
+    "address": "http://172.20.45.129:9206,http://172.20.45.130:9306"
+    "size": 10
+    "version": "v7"
+    "userName": ""
+    "password": ""
+
+
+sftp:
+  addr: sftp.alipay.com:22
+  user: qx-jy-purchase-data
+  pwd: GFYDBO9Y7HOJUOAT
+
+qwRobotNotice: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=e7792e7a-159d-4419-b1ed-27ea19b6ea54"

+ 264 - 0
sendBidToSftp/data/bidding.go

@@ -0,0 +1,264 @@
+package data
+
+import (
+	. "app.yhyue.com/moapp/jybase/encrypt"
+	elastic "app.yhyue.com/moapp/jybase/es"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"context"
+	"fmt"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"strings"
+	"sync"
+	"time"
+	"workTasks/common"
+)
+
+type (
+	ProjectMsg struct {
+		Id    string `json:"id" doc:"项目ID"`
+		Name  string `json:"name" doc:"项目名字"`
+		Code  string `json:"code" doc:"项目编号"`
+		Scope string `json:"scope" doc:"项目范围"`
+	}
+	EntMsg struct {
+		Person   string `json:"person" doc:"联系人"`
+		Tel      string `json:"tel" doc:"电话"`
+		CreditNo string `json:"creditNo" doc:"社会统一信用代码"`
+		Mail     string `json:"mail" doc:"邮箱"`
+	}
+)
+
+func RangeBidding(ctx context.Context, st, ed time.Time, callBack func([]g.Map)) (int64, error) {
+	sess := common.MG.DB("bidding").GetMgoConn()
+	defer common.MG.DB("bidding").DestoryMongoConn(sess)
+
+	var (
+		selectMap = map[string]interface{}{}
+		cut       = g.Cfg().MustGet(ctx, "fileDataCut").Int()
+	)
+	for _, key := range []string{"_id", "area", "city", "title", "toptype", "detail", "publishtime", "href", "budget", "bidamount", "bidopentime", "buyer", "buyerperson", "buyertel", "agency", "agencyperson", "agencytel", "bidopentime", "bidendtime", "s_topscopeclass", "s_winner", "winnerperson", "winnertel"} {
+		selectMap[key] = 1
+	}
+	it := sess.DB(common.MG.DB("bidding").DbName).C("bidding").Find(g.Map{
+		"pici": g.Map{
+			"$gte": st.Unix(),
+			"$lt":  ed.Unix(),
+		},
+	}).Select(selectMap).Iter()
+	var (
+		index  int64
+		bidSet []g.Map
+	)
+	for m := make(map[string]interface{}); it.Next(&m); {
+		index++
+		if index%10e4 == 0 {
+			g.Log().Infof(context.TODO(), "%s load %d数据", st.Format(time.DateOnly), index)
+		}
+		bidSet = append(bidSet, m)
+
+		if len(bidSet)%cut == 0 {
+			callBack(formatData(ctx, bidSet))
+			bidSet = []g.Map{}
+		}
+	}
+	if len(bidSet) > 0 {
+		callBack(formatData(ctx, bidSet))
+	}
+	return index, nil
+}
+
+// formatData 补充其他字段
+func formatData(ctx context.Context, bidList []g.Map) []g.Map {
+	var (
+		projectMsgMapping = map[string]*ProjectMsg{}
+		pLock             sync.RWMutex
+		entMsgMapping     = map[string]*EntMsg{}
+		eLock             sync.RWMutex
+
+		ids []string
+		ent []string
+	)
+	for _, m := range bidList {
+		var (
+			bid      = mongodb.BsonIdToSId(m["_id"])
+			buyer    = mongodb.BsonIdToSId(m["buyer"])
+			s_winner = mongodb.BsonIdToSId(m["s_winner"])
+		)
+		ids = append(ids, bid)
+		if buyer != "" {
+			ent = append(ent, buyer)
+		}
+		if strings.Index(s_winner, ",") > -1 {
+			for _, s := range strings.Split(s_winner, ",") {
+				if name := strings.TrimSpace(s); name != "" {
+					ent = append(ent, name)
+				}
+			}
+		} else {
+			if s_winner != "" {
+				ent = append(ent, s_winner)
+			}
+		}
+	}
+	//查询项目数据
+	batchQuery(ctx, ids, "projectset", "projectset", `{"query":{"bool":{"must":[{"terms":{"ids":["%s"]}}],"must_not":[],"should":[]}},"size":%d,"_source":["projectcode","projectname","projectscope","id","ids"]}`, func(maps *[]map[string]interface{}) error {
+		if maps != nil && len(*maps) > 0 {
+			pLock.Lock()
+			defer pLock.Unlock()
+			fmt.Println("projectset", len(*maps))
+			for _, m := range *maps {
+				var (
+					projectcode  = gconv.String(m["projectcode"])
+					projectname  = gconv.String(m["projectname"])
+					projectscope = gconv.String(m["projectscope"])
+					projectId    = gconv.String(m["id"])
+					ids          = gconv.Strings(m["ids"])
+				)
+				d := ProjectMsg{
+					Id:    projectId,
+					Name:  projectname,
+					Code:  projectcode,
+					Scope: projectscope,
+				}
+				for _, bidId := range ids {
+					projectMsgMapping[bidId] = &d
+				}
+			}
+		}
+		return nil
+	})
+
+	//查询企业数据
+	batchQuery(ctx, ent, "qyxy", "qyxy", `{"query":{"bool":{"must":[{"terms":{"name.ent_name":["%s"]}}]}},"size":%d,"_source":["credit_no","legal_person","company_email","company_phone","company_name"]}`, func(maps *[]map[string]interface{}) error {
+		if maps != nil && len(*maps) > 0 {
+			eLock.Lock()
+			defer eLock.Unlock()
+			fmt.Println("qyxy", len(*maps))
+			for _, m := range *maps {
+				var (
+					credit_no     = gconv.String(m["credit_no"])
+					legal_person  = gconv.String(m["legal_person"])
+					company_email = gconv.String(m["company_email"])
+					company_phone = gconv.String(m["company_phone"])
+					company_name  = gconv.String(m["company_name"])
+				)
+				entMsgMapping[company_name] = &EntMsg{
+					Person:   legal_person,
+					Tel:      company_phone,
+					CreditNo: credit_no,
+					Mail:     company_email,
+				}
+			}
+		}
+		return nil
+	})
+
+	//填充数据
+	for i := 0; i < len(bidList); i++ {
+		var (
+			id       = mongodb.BsonIdToSId(bidList[i]["_id"])
+			buyer    = gconv.String(bidList[i]["buyer"])
+			s_winner = gconv.String(bidList[i]["s_winner"])
+
+			buyerCreditNo                                             string //社会统一信用代码
+			winnersCreditNo, winnersPhone, winnersMail, winnersPerson string
+			projectName, projectScope, projectCode, projectId         string
+		)
+
+		if buyer != "" {
+			if buyerMsg := entMsgMapping[buyer]; buyerMsg != nil {
+				buyerCreditNo = buyerMsg.CreditNo
+			}
+		}
+		if projectMsg := projectMsgMapping[id]; projectMsg != nil {
+			projectName = projectMsg.Name
+			projectScope = projectMsg.Scope
+			projectCode = projectMsg.Code
+			projectId = projectMsg.Id
+		}
+
+		//拼接企业信用代码
+		if s_winner != "" {
+			if strings.Index(s_winner, ",") > -1 {
+				for i, s := range strings.Split(s_winner, ",") {
+					if name := strings.TrimSpace(s); name != "" {
+						if i != 0 {
+							winnersCreditNo += ","
+							winnersPhone += ","
+							winnersMail += ","
+							winnersPerson += ","
+						}
+						if winnerMsg := entMsgMapping[name]; winnerMsg != nil {
+							winnersCreditNo += winnerMsg.CreditNo
+							winnersPhone += winnerMsg.Tel
+							winnersMail += winnerMsg.Mail
+							winnersPerson += winnerMsg.Person
+						}
+					}
+				}
+			} else {
+				if winnerMsg := entMsgMapping[s_winner]; winnerMsg != nil {
+					winnersCreditNo = winnerMsg.CreditNo
+				}
+			}
+		}
+
+		bidList[i]["bid"] = id
+		bidList[i]["buyerCreditNo"] = buyerCreditNo
+		bidList[i]["winnersCreditNo"] = winnersCreditNo
+		bidList[i]["winnersEntTel"] = winnersPhone
+		bidList[i]["winnersEntMail"] = winnersMail
+		bidList[i]["winnersEntPerson"] = winnersPerson
+		bidList[i]["projectName"] = projectName
+		bidList[i]["projectScope"] = projectScope
+		bidList[i]["projectCode"] = projectCode
+		bidList[i]["projectId"] = projectId
+		if id != "" {
+			encodeId := CommonEncodeArticle("content", id)
+			bidList[i]["jyHref"] = "https://www.jianyu360.cn/article/content/" + encodeId + ".html"
+		}
+	}
+	g.Dump("cccc", len(entMsgMapping), len(projectMsgMapping))
+	return bidList
+}
+
+func batchQuery(ctx context.Context, fullData []string, index, itype, query string, callBack func(*[]map[string]interface{}) error) {
+	var (
+		wg       sync.WaitGroup
+		batchNum = 100
+		ll       = len(fullData)
+	)
+	count := ll / batchNum
+	if ll%batchNum != 0 {
+		count = count + 1
+	}
+	jobs := make(chan []string, count)
+	for i := 0; i < g.Cfg().MustGet(ctx, "esPoolSize", 5).Int(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			//查询企业数据
+			for siteInfo := range jobs {
+				query := fmt.Sprintf(query, strings.Join(siteInfo, "\",\""), batchNum)
+				fmt.Println(query)
+				if err := callBack(elastic.Get(index, itype, query)); err != nil {
+					g.Log().Errorf(ctx, "batchQuery 查询异常%s", err.Error())
+				}
+			}
+		}()
+	}
+
+	for i := 0; i < count; i++ {
+		var (
+			s = i * batchNum
+			e = s + batchNum
+		)
+		if e > ll {
+			e = ll
+		}
+		jobs <- fullData[s : e-1]
+	}
+	close(jobs)
+	wg.Wait()
+}

+ 104 - 0
sendBidToSftp/main.go

@@ -0,0 +1,104 @@
+package main
+
+import (
+	elastic "app.yhyue.com/moapp/jybase/es"
+	"context"
+	"fmt"
+	_ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gcron"
+	"time"
+	"workTasks/sendBidToSftp/cfile"
+	"workTasks/sendBidToSftp/data"
+	"workTasks/sendBidToSftp/send"
+)
+
+var (
+	runJob = func(ctx context.Context, st, et time.Time) {
+		//查询当日标讯数据
+		var (
+			fileNum  int
+			dataShow = st.Format(time.DateOnly)
+			dir      = fmt.Sprintf("out/%s", dataShow)
+		)
+		count, err := data.RangeBidding(ctx, st, et, func(maps []g.Map) {
+			//生成csv文件
+			if len(maps) <= 0 {
+				return
+			}
+			err := cfile.CreateFile(ctx, maps, fmt.Sprintf("%s/%s_%03d.csv", dir, dataShow, fileNum+1))
+			if err != nil {
+				g.Log().Errorf(ctx, "生成文件异常 %v", err.Error())
+				return
+			}
+			fileNum++
+		})
+		if err != nil {
+			g.Log().Errorf(ctx, "获取 %s-%s 数据异常 %v", st.Format(time.DateTime), et.Format(time.DateTime), err)
+			return
+		}
+		g.Log().Infof(ctx, "%s共有增量数据%d条 %d个文件", dataShow, count, fileNum)
+
+		var (
+			successNum int
+			failFiles  []string
+		)
+		if count > 0 && fileNum > 0 {
+			successNum, failFiles = send.UpLoadFilesToSftp(ctx, dir)
+		}
+		g.Log().Infof(ctx, "%s上传文件 成功%d个 失败%d个", dataShow, successNum, len(failFiles))
+
+		if qwRobotNotice := g.Cfg().MustGet(ctx, "qwRobotNotice").String(); qwRobotNotice != "" {
+			var content string
+			content = fmt.Sprintf("任务[%s]\n共%d条标讯数据\n生成%d个文件", dataShow, count, fileNum)
+			if len(failFiles) > 0 {
+				content += fmt.Sprintf("\n成功上传%d个文件,失败%d个文件\n失败文件列表:%v", successNum, len(failFiles), failFiles)
+			} else {
+				content += "\b全部上传完成"
+			}
+			g.Log().Infof(ctx, content)
+			if err := send.SendSimpleMsg2ChatBot(content); err != nil {
+				g.Log().Errorf(ctx, "发送企业微信消息异常 %s", err.Error())
+			}
+		}
+	}
+)
+
+func main() {
+	mainCtx := context.Background()
+	fmt.Println(elastic.Count("bidding", "bidding", "{}"))
+	//执行一次任务
+	if g.Cfg().MustGet(mainCtx, "runOnce.isRun", false).Bool() {
+		g.Log().Infof(mainCtx, "开始执行一次性任务")
+		var (
+			startDateStr = g.Cfg().MustGet(mainCtx, "runOnce.startDate").String()
+			endDateStr   = g.Cfg().MustGet(mainCtx, "runOnce.endDate").String()
+		)
+		st, st_err := time.ParseInLocation(time.DateTime, startDateStr, time.Local)
+		et, et_err := time.ParseInLocation(time.DateTime, endDateStr, time.Local)
+		if st_err != nil || et_err != nil {
+			g.Log().Panicf(mainCtx, "开始执行时间异常%v %v", st_err, et_err)
+		}
+
+		runJob(mainCtx, st, et)
+		g.Log().Infof(mainCtx, "一次性任务执行完成")
+		return
+	}
+
+	//定时执行
+	_, cronErr := gcron.AddSingleton(mainCtx, g.Cfg().MustGet(mainCtx, "runCron", "# # * * * *").String(), func(ctx context.Context) {
+		g.Log().Infof(mainCtx, "开始执行定时任务")
+		var (
+			now = time.Now()
+
+			timeEnd   = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
+			timeStart = timeEnd.AddDate(0, 0, -1)
+		)
+		runJob(ctx, timeStart, timeEnd)
+		g.Log().Infof(mainCtx, "定时任务执行完成")
+	})
+	if cronErr != nil {
+		g.Log().Panicf(mainCtx, "创建定时任务异常 %v", cronErr)
+	}
+	select {}
+}

+ 40 - 0
sendBidToSftp/send/qywx.go

@@ -0,0 +1,40 @@
+package send
+
+import (
+	"context"
+	"github.com/gogf/gf/v2/errors/gerror"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+)
+
+type qywxCommonRes struct {
+	Errcode int    `json:"errcode"`
+	Errmsg  string `json:"errmsg"`
+}
+
+func SendSimpleMsg2ChatBot(content string, atList ...string) error {
+	var qwRobotNotice = g.Cfg().MustGet(context.Background(), "qwRobotNotice").String()
+	if qwRobotNotice == "" {
+		return nil
+	}
+	res, err := g.Client().Header(g.MapStrStr{"Content-Type": "application/json"}).
+		Post(context.TODO(), qwRobotNotice,
+			g.Map{
+				"msgtype": "text",
+				"text": g.Map{
+					"content":        content,
+					"mentioned_list": atList,
+				},
+			})
+	if err != nil {
+		return err
+	}
+	var commonRes qywxCommonRes
+	if err := gconv.Struct(res.ReadAll(), &commonRes); err != nil {
+		return gerror.Wrap(err, "读取企业微信消息异常")
+	}
+	if commonRes.Errcode == 0 {
+		return nil
+	}
+	return gerror.New(commonRes.Errmsg)
+}

+ 104 - 0
sendBidToSftp/send/send.go

@@ -0,0 +1,104 @@
+package send
+
+import (
+	"context"
+	"fmt"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/pkg/sftp"
+	"golang.org/x/crypto/ssh"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"time"
+)
+
+func UpLoadFilesToSftp(ctx context.Context, dir string) (int, []string) {
+	// SFTP 连接配置
+	var (
+		config = &ssh.ClientConfig{
+			User: g.Cfg().MustGet(ctx, "sftp.user").String(), // 替换为你的用户名
+			Auth: []ssh.AuthMethod{
+				ssh.Password(g.Cfg().MustGet(ctx, "sftp.pwd").String()), // 替换为你的密码
+			},
+			HostKeyCallback: ssh.InsecureIgnoreHostKey(), // 生产环境应使用 ssh.FixedHostKey
+			Timeout:         30 * time.Second,
+		}
+
+		successNum int
+		failPath   []string
+	)
+
+	// 使用 fs.WalkDir
+	fs.WalkDir(os.DirFS(dir), ".", func(path string, d fs.DirEntry, err error) error {
+		if err != nil {
+			g.Log().Infof(ctx, "访问路径 %q 失败: %v", path, err)
+			return nil
+		}
+		if !d.IsDir() {
+			// 获取文件信息
+			info, err := d.Info()
+			if err != nil {
+				g.Log().Infof(ctx, "获取文件信息失败: %v", err)
+				return nil
+			}
+			fullPath := filepath.Join(dir, path)
+			g.Log().Infof(ctx, "文件: %s (大小: %d bytes)", fullPath, info.Size())
+			// 上传文件
+			uploadErr := uploadToSFTP(config, g.Cfg().MustGet(ctx, "sftp.addr").String(), fullPath, fullPath)
+			if uploadErr != nil {
+				failPath = append(failPath, fullPath)
+				g.Log().Errorf(ctx, "%s 文件上传失败: %s", fullPath, uploadErr.Error())
+			} else {
+				successNum++
+				g.Log().Infof(ctx, "%s 文件上传成功!", fullPath)
+			}
+		}
+		return nil
+	})
+	return successNum, failPath
+}
+
+// 上传文件到SFTP服务器
+func uploadToSFTP(config *ssh.ClientConfig, addr, localPath, remotePath string) error {
+	// 1. 建立SSH连接
+	sshClient, err := ssh.Dial("tcp", addr, config)
+	if err != nil {
+		return fmt.Errorf("SSH连接失败: %w", err)
+	}
+	defer sshClient.Close()
+
+	// 2. 创建SFTP客户端
+	sftpClient, err := sftp.NewClient(sshClient)
+	if err != nil {
+		return fmt.Errorf("创建SFTP客户端失败: %w", err)
+	}
+	defer sftpClient.Close()
+
+	// 3. 打开本地文件
+	localFile, err := os.Open(localPath)
+	if err != nil {
+		return fmt.Errorf("打开本地文件失败: %w", err)
+	}
+	defer localFile.Close()
+
+	// 4. 确保远程目录存在
+	remoteDir := filepath.Dir(remotePath)
+	if err := sftpClient.MkdirAll(remoteDir); err != nil {
+		return fmt.Errorf("创建远程目录失败: %w", err)
+	}
+
+	// 5. 创建远程文件
+	remoteFile, err := sftpClient.Create(remotePath)
+	if err != nil {
+		return fmt.Errorf("创建远程文件失败: %w", err)
+	}
+	defer remoteFile.Close()
+
+	// 6. 复制文件内容
+	if _, err := io.Copy(remoteFile, localFile); err != nil {
+		return fmt.Errorf("复制文件内容失败: %w", err)
+	}
+
+	return nil
+}