瀏覽代碼

新增附件下载程序

maxiaoshan 2 年之前
父節點
當前提交
850f5cd1ff
共有 4 個文件被更改,包括 192 次插入77 次删除
  1. 2 2
      download_file/src/config.json
  2. 132 63
      download_file/src/task.go
  3. 5 12
      download_file/src/udp.go
  4. 53 0
      download_file/src/util/util.go

+ 2 - 2
download_file/src/config.json

@@ -8,8 +8,8 @@
   "udpport": ":1782",
   "nextudp": {
     "addr": "127.0.0.1",
-    "port": 1781,
-    "stype": "extract"
+    "port": 1782,
+    "stype": "update"
   },
   "mail": {
     "to": "maxiaoshan@topnet.net.cn",

+ 132 - 63
download_file/src/task.go

@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"crypto/tls"
+	"encoding/base64"
 	"fmt"
 	"github.com/PuerkitoBio/goquery"
 	"io"
@@ -50,11 +51,12 @@ var (
 )
 
 type Data struct {
-	Url      string
-	Text     string
-	Ok       bool
-	By       string
-	FileType string
+	Url        string
+	Text       string
+	Ok         bool
+	By         string
+	FileType   string
+	Base64Type bool
 }
 
 // DownloadFile supplements attachments that were not downloaded earlier.
@@ -63,13 +65,15 @@ func DownloadFile() bool {
 	if gtid == "" || lteid == "" {
 		return false
 	}
-	GetDataAndDownload(gtid, lteid) //下载
-	SendUdp(gtid, lteid, NextStype, NextAddr, NextPort)
+	getdata := GetDataAndDownload(gtid, lteid) //下载
+	if getdata {
+		SendUdp(gtid, lteid, NextStype, NextAddr, NextPort)
+	}
 	return true
 }
 
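 // GetDataAndDownload supplements attachments that were not downloaded earlier; getdata reports whether any attachment text was obtained.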
-func GetDataAndDownload(gtid, lteid string) {
+func GetDataAndDownload(gtid, lteid string) (getdata bool) {
 	defer qu.Catch()
 	//查询数据
 	sess := MgoB.GetMgoConn()
@@ -83,19 +87,11 @@ func GetDataAndDownload(gtid, lteid string) {
 			"$lte": mongodb.StringTOBsonId(lteid),
 		},
 	}
-	//field := map[string]interface{}{
-	//	"contenthtml": 1,
-	//	"spidercode":  1,
-	//	"href":        1,
-	//	"site":        1,
-	//	"channel":     1,
-	//	"title":       1,
-	//	"competehref": 1,
-	//	"projectinfo": 1,
+	//query = map[string]interface{}{
+	//	"_id": mongodb.StringTOBsonId("64a3fa52b44bf087514687b3"), //64a216f2b44bf0875142bc1e
 	//}
-	query = map[string]interface{}{
-		"_id": mongodb.StringTOBsonId("64a216f2b44bf0875142bc1e"),
-	}
+	count := MgoB.Count("bidding", query)
+	qu.Debug("数据量:", count, "	query:", query)
 	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Iter()
 	n := 0
 	arr := []map[string]interface{}{}
@@ -137,6 +133,7 @@ func GetDataAndDownload(gtid, lteid string) {
 					tmp["projectinfo"] = map[string]interface{}{"attachments": attachments}
 				}
 				if len(attchText) > 0 {
+					getdata = true
 					tmp["attach_text"] = attchText
 				}
 				lock.Lock()
@@ -159,6 +156,7 @@ func GetDataAndDownload(gtid, lteid string) {
 		arr = []map[string]interface{}{}
 	}
 	qu.Debug("当前轮执行完毕:", gtid, lteid)
+	return
 }
 
 // FilterAndDownload 筛选有效数据并下载对应附件
@@ -231,6 +229,9 @@ func DealAndDownload(tmp []*Data, href string) (result []*Data, attachments, att
 		if !strings.HasPrefix(url, "https") && !strings.HasPrefix(url, "http") { //异常链接
 			if strings.HasPrefix(url, "data:image/") { //base64图片
 				//待处理TODO
+				data.Base64Type = true
+				result = append(result, data)
+				data.Url = ""
 			} else {
 				url = reg_repair_href1.ReplaceAllString(url, "") //处理../ ./ /
 				//获取href域名
@@ -253,53 +254,88 @@ func DealAndDownload(tmp []*Data, href string) (result []*Data, attachments, att
 	if len(result) > 0 {
 		index := 0
 		for _, data := range result {
-			contentType, ret := Download(data.Url) //下载
-			fileType := data.FileType              //从url或者text提取的附件类型
-			if fileType == "" {
-				fileType = GetType(contentType, ret) //获取附件类型
-				data.FileType = fileType
-			}
-			if fileType != "" {
-				fileName := "附件" + fmt.Sprint(index+1) + "." + fileType
-				fid := sp.GetHashKey(ret) + sp.TypeByExt(fileName)
-				bs := bytes.NewReader(ret)
-				size := qu.ConvertFileSize(bs.Len())
-				b, _ := sp.OssPutObject(fid, io.MultiReader(bs)) //附件上传
-				//qu.Debug("oss", fileName, size, fileType, fid)
-				data.Ok = b
-				if b {
-					attachments[fmt.Sprint(index+1)] = map[string]interface{}{
-						"fid":      fid,
-						"filename": fileName,
-						"ftype":    fileType,
-						"org_url":  data.Url,
-						"size":     size,
-						"url":      "oss",
+			if data.Base64Type {
+				fileName := "附件" + fmt.Sprint(index+1) + ".jpg"
+				i := strings.Index(data.Url, ",")
+				dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(data.Url[i+1:]))
+				ret, err := io.ReadAll(dec)
+				if err == nil && len(ret) >= 1024*3 {
+					fid := sp.GetHashKey(ret) + sp.TypeByExt(fileName)
+					bs := bytes.NewReader(ret)
+					size := qu.ConvertFileSize(bs.Len())
+					data.Ok, err = sp.OssPutObject(fid, io.MultiReader(bs)) //附件上传
+					if data.Ok {                                            //上传成功,解析附件
+						GetAttachText(fid, fileName, "jpg", "", size, index, ret, attachments, attachText)
+						index++
 					}
-					//附件解析
-					conn, err := serviced.GetOcrServerConn() //链接ocr服务治理中心
-					if err == nil {
-						resp := GetFileText(conn, fileName, fid, fileType, ret)
-						if resp != nil {
-							tmap := map[string]interface{}{}
-							for i, r := range resp.Result {
-								rmap := map[string]interface{}{
-									"file_name":  r.FileName,
-									"attach_url": r.TextUrl,
-									"state":      r.ErrorState,
-								}
-								tmap[fmt.Sprint(i)] = rmap
-							}
-							if len(tmap) > 0 {
-								attachText[fmt.Sprint(index)] = tmap
-							}
-						}
-					} else {
-						qu.Debug("附件解析服务连接失败:", err)
+				}
+			} else {
+				contentType, ret := Download(data.Url) //下载
+				fileType := data.FileType              //从url或者text提取的附件类型
+				if fileType == "" {
+					fileType = GetType(contentType, ret) //获取附件类型
+					data.FileType = fileType
+				}
+				if fileType != "" && len(ret) >= 1024*3 {
+					fileName := "附件" + fmt.Sprint(index+1) + "." + fileType
+					fid := sp.GetHashKey(ret) + sp.TypeByExt(fileName)
+					bs := bytes.NewReader(ret)
+					size := qu.ConvertFileSize(bs.Len())
+					data.Ok, _ = sp.OssPutObject(fid, io.MultiReader(bs)) //附件上传
+					if data.Ok {                                          //上传成功,解析附件
+						GetAttachText(fid, fileName, fileType, data.Url, size, index, ret, attachments, attachText)
+						index++
 					}
-					index++
 				}
 			}
+
+			//contentType, ret := Download(data.Url) //下载
+			//fileType := data.FileType              //从url或者text提取的附件类型
+			//if fileType == "" {
+			//	fileType = GetType(contentType, ret) //获取附件类型
+			//	data.FileType = fileType
+			//}
+			//if fileType != "" {
+			//	fileName := "附件" + fmt.Sprint(index+1) + "." + fileType
+			//	fid := sp.GetHashKey(ret) + sp.TypeByExt(fileName)
+			//	bs := bytes.NewReader(ret)
+			//	size := qu.ConvertFileSize(bs.Len())
+			//	b, _ := sp.OssPutObject(fid, io.MultiReader(bs)) //附件上传
+			//	//qu.Debug("oss", fileName, size, fileType, fid)
+			//	data.Ok = b
+			//	if b {
+			//		attachments[fmt.Sprint(index+1)] = map[string]interface{}{
+			//			"fid":      fid,
+			//			"filename": fileName,
+			//			"ftype":    fileType,
+			//			"org_url":  data.Url,
+			//			"size":     size,
+			//			"url":      "oss",
+			//		}
+			//		//附件解析
+			//		conn, err := serviced.GetOcrServerConn() //链接ocr服务治理中心
+			//		if err == nil {
+			//			resp := GetFileText(conn, fileName, fid, fileType, ret)
+			//			if resp != nil {
+			//				tmap := map[string]interface{}{}
+			//				for i, r := range resp.Result {
+			//					rmap := map[string]interface{}{
+			//						"file_name":  r.FileName,
+			//						"attach_url": r.TextUrl,
+			//						"state":      r.ErrorState,
+			//					}
+			//					tmap[fmt.Sprint(i)] = rmap
+			//				}
+			//				if len(tmap) > 0 {
+			//					attachText[fmt.Sprint(index)] = tmap
+			//				}
+			//			}
+			//		} else {
+			//			qu.Debug("附件解析服务连接失败:", err)
+			//		}
+			//		index++
+			//	}
+			//}
 		}
 	}
 	return
@@ -354,3 +390,36 @@ func GetType(contentType string, ret []byte) string {
 	}
 	return ""
 }
+// GetAttachText stores the uploaded file's metadata in attachments under key index+1, then calls GetFileText over an OCR service connection and stores the per-result info in attachText under key index.
+func GetAttachText(fid, fileName, fileType, url, size string, index int, ret []byte, attachments, attachText map[string]interface{}) {
+	defer qu.Catch()
+	attachments[fmt.Sprint(index+1)] = map[string]interface{}{
+		"fid":      fid,
+		"filename": fileName,
+		"ftype":    fileType,
+		"org_url":  url,
+		"size":     size,
+		"url":      "oss",
+	}
+	// Parse the attachment.
+	conn, err := serviced.GetOcrServerConn() // connect to the OCR service governance center
+	if err == nil {
+		resp := GetFileText(conn, fileName, fid, fileType, ret)
+		if resp != nil {
+			tmap := map[string]interface{}{}
+			for i, r := range resp.Result {
+				rmap := map[string]interface{}{
+					"file_name":  r.FileName,
+					"attach_url": r.TextUrl,
+					"state":      r.ErrorState,
+				}
+				tmap[fmt.Sprint(i)] = rmap
+			}
+			if len(tmap) > 0 {
+				attachText[fmt.Sprint(index)] = tmap // NOTE(review): keyed by index while attachments uses index+1 — confirm the offset is intended
+			}
+		}
+	} else {
+		qu.Debug("附件解析服务连接失败:", err)
+	}
+}

+ 5 - 12
download_file/src/udp.go

@@ -45,18 +45,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			gtid := qu.ObjToString(mapInfo["gtid"])
 			lteid := qu.ObjToString(mapInfo["lteid"])
 			Udpclient.WriteUdp([]byte(gtid+"-"+lteid+"-"+stype), mu.OP_NOOP, ra)
-			//if stype == "repeat" { //历史判重发送的udp
-			//	DataChannel <- map[string]string{
-			//		"gtid":  gtid,
-			//		"lteid": lteid,
-			//	}
-			//	//准备数据
-			//	//GetBiddingDada(gtid, lteid)
-			//} else if stype == "merge" { //将extract_redownload抽取表的数据合并到bidding_redownload
-			//	UpdateBiddingData(gtid, lteid, TmpBid, TmpExt, stype)
-			//} else if stype == "redownload" { //bidding_redownload表重新下载
-			//	Redownload(gtid, lteid, TmpBid, TmpBid, "redownload")
-			//}
+			if gtid == "" || lteid == "" {
+				qu.Debug("id段错误")
+			} else if stype == "update" { //更新bidding
+				UpdateBiddingData(gtid, lteid)
+			}
 		}
 	case mu.OP_NOOP: //下个节点回应
 		ok := string(data)

+ 53 - 0
download_file/src/util/util.go

@@ -52,3 +52,56 @@ func GetIdInterval(id string) (gtid, lteid string) {
 	}
 	return "", ""
 }
+// UpdateBiddingData iterates bidding_downloadfile_log documents with gtid < _id <= lteid (file_add_log excluded) and passes a {_id} selector plus {$set: doc} pair for each to MgoB.UpdateBulk on "bidding".
+func UpdateBiddingData(gtid, lteid string) {
+	defer qu.Catch()
+	// Query the data.
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 10) // buffered to 10; each goroutine holds one slot until it finishes
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{} // guards arr, which the goroutines append to and flush
+	query := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  mongodb.StringTOBsonId(gtid),
+			"$lte": mongodb.StringTOBsonId(lteid),
+		},
+	}
+	fields := map[string]interface{}{
+		"file_add_log": 0, // projection value 0: exclude this field
+	}
+	it := sess.DB(MgoB.DbName).C("bidding_downloadfile_log").Find(&query).Select(&fields).Iter()
+	n := 0
+	arr := [][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			update := []map[string]interface{}{
+				{"_id": tmp["_id"]},
+				{"$set": tmp}, // NOTE(review): tmp still contains _id, so $set includes it — confirm UpdateBulk/MongoDB accepts this
+			}
+			lock.Lock()
+			arr = append(arr, update)
+			if len(arr) > 100 { // flush once more than 100 updates have accumulated
+				MgoB.UpdateBulk("bidding", arr...)
+				arr = [][]map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{} // fresh map for the next document; the goroutine keeps the previous one
+	}
+	wg.Wait()
+	if len(arr) > 0 { // flush whatever is left after all goroutines finished
+		MgoB.UpdateBulk("bidding", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	qu.Debug("更新数据完毕:", gtid, lteid)
+}