소스 검색

Merge branch 'dev3.1.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.1.2

unknown 6년 전
부모
커밋
ab29725a8c
4개의 변경된 파일, 136개의 추가 그리고 210개의 삭제
  1. 1 1
      src/jy/cluster/aliecs.go
  2. 81 78
      src/jy/extract/extpackage.go
  3. 4 7
      src/udpfileserver/config.json
  4. 50 124
      src/udpfileserver/main.go

+ 1 - 1
src/jy/cluster/aliecs.go

@@ -44,7 +44,7 @@ func RunInstances(TaskName string, num, hours int) {
 				[]string{"InternetMaxBandwidthOut", "0"},
 				[]string{"InstanceChargeType", "PostPaid"},
 				[]string{"SpotStrategy", "SpotWithPriceLimit"},
-				[]string{"SpotPriceLimit", "1.99"},
+				[]string{"SpotPriceLimit", "4.99"},
 				[]string{"InstanceName", "extract"},
 				[]string{"UniqueSuffix", "true"},
 				[]string{"Password", Password},

+ 81 - 78
src/jy/extract/extpackage.go

@@ -23,98 +23,101 @@ func PackageDetail(j *ju.Job, e *ExtractTask) {
 				sonJobResult["origin"] = pkg.Origin
 				sonJobResult["type"] = pkg.Type
 				sonJobResult["winnerorder"] = pkg.WinnerOrder
-				for k, tags := range e.Tag {
-				L:
-					for _, tag := range tags {
-						if pkg.TableKV != nil {
-							for key, val := range pkg.TableKV.Kv {
-								if tag.Key == key {
-									clearmap[k] = false
-									var tmpval interface{}
-									if len(e.ClearFn[k]) > 0 {
-										data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
-										tmpval = data[0]
-									} else {
-										tmpval = val
-									}
-									sonJobResult[k] = tmpval
-									if packagenum == 1 {
-										field := &ju.ExtField{
-											Field:     k,
-											Code:      "package",
-											RuleText:  "package",
-											Type:      "table",
-											MatchType: "tag_string",
-											ExtFrom:   "package",
-											Value:     tmpval,
-											Score:     0,
+				//分包结果暂时不用
+				/*
+					for k, tags := range e.Tag {
+					L:
+						for _, tag := range tags {
+							if pkg.TableKV != nil {
+								for key, val := range pkg.TableKV.Kv {
+									if tag.Key == key {
+										clearmap[k] = false
+										var tmpval interface{}
+										if len(e.ClearFn[k]) > 0 {
+											data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
+											tmpval = data[0]
+										} else {
+											tmpval = val
+										}
+										sonJobResult[k] = tmpval
+										if packagenum == 1 {
+											field := &ju.ExtField{
+												Field:     k,
+												Code:      "package",
+												RuleText:  "package",
+												Type:      "table",
+												MatchType: "tag_string",
+												ExtFrom:   "package",
+												Value:     tmpval,
+												Score:     0,
+											}
+											j.Result[k] = append(j.Result[k], field)
 										}
-										j.Result[k] = append(j.Result[k], field)
+										break L
 									}
-									break L
 								}
 							}
-						}
-						if pkg.ColonKV != nil {
-							for key, val := range pkg.ColonKV.Kv {
-								if tag.Key == key {
-									clearmap[k] = true
-									var tmpval interface{}
-									if len(e.ClearFn[k]) > 0 {
-										data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
-										tmpval = data[0]
-									} else {
-										tmpval = val
-									}
-									sonJobResult[k] = tmpval
-									if packagenum == 1 {
-										field := &ju.ExtField{
-											Field:     k,
-											Code:      "package",
-											RuleText:  "package",
-											Type:      "colon",
-											MatchType: "tag_string",
-											ExtFrom:   "package",
-											Value:     tmpval,
-											Score:     0,
+							if pkg.ColonKV != nil {
+								for key, val := range pkg.ColonKV.Kv {
+									if tag.Key == key {
+										clearmap[k] = true
+										var tmpval interface{}
+										if len(e.ClearFn[k]) > 0 {
+											data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
+											tmpval = data[0]
+										} else {
+											tmpval = val
+										}
+										sonJobResult[k] = tmpval
+										if packagenum == 1 {
+											field := &ju.ExtField{
+												Field:     k,
+												Code:      "package",
+												RuleText:  "package",
+												Type:      "colon",
+												MatchType: "tag_string",
+												ExtFrom:   "package",
+												Value:     tmpval,
+												Score:     0,
+											}
+											j.Result[k] = append(j.Result[k], field)
 										}
-										j.Result[k] = append(j.Result[k], field)
+										break L
 									}
-									break L
 								}
 							}
-						}
-						if pkg.SpaceKV != nil {
-							for key, val := range pkg.SpaceKV.Kv {
-								if tag.Key == key {
-									clearmap[k] = true
-									var tmpval interface{}
-									if len(e.ClearFn[k]) > 0 {
-										data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
-										tmpval = data[0]
-									} else {
-										tmpval = val
-									}
-									sonJobResult[k] = tmpval
-									if packagenum == 1 {
-										field := &ju.ExtField{
-											Field:     k,
-											Code:      "package",
-											RuleText:  "package",
-											Type:      "space",
-											MatchType: "tag_string",
-											ExtFrom:   "package",
-											Value:     tmpval,
-											Score:     0,
+							if pkg.SpaceKV != nil {
+								for key, val := range pkg.SpaceKV.Kv {
+									if tag.Key == key {
+										clearmap[k] = true
+										var tmpval interface{}
+										if len(e.ClearFn[k]) > 0 {
+											data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
+											tmpval = data[0]
+										} else {
+											tmpval = val
 										}
-										j.Result[k] = append(j.Result[k], field)
+										sonJobResult[k] = tmpval
+										if packagenum == 1 {
+											field := &ju.ExtField{
+												Field:     k,
+												Code:      "package",
+												RuleText:  "package",
+												Type:      "space",
+												MatchType: "tag_string",
+												ExtFrom:   "package",
+												Value:     tmpval,
+												Score:     0,
+											}
+											j.Result[k] = append(j.Result[k], field)
+										}
+										break L
 									}
-									break L
 								}
 							}
 						}
 					}
-				}
+				*/
 				//如果有中标候选人排序,优先用第一中标候选人的中标单位和中标金额覆盖该包里面相应的字段的值
 				if pkg.WinnerOrder != nil && len(pkg.WinnerOrder) > 0 {
 					firstWinnerOrder := pkg.WinnerOrder[0]

+ 4 - 7
src/udpfileserver/config.json

@@ -1,15 +1,12 @@
 {
   "udpip": "127.0.0.1",
   "udpport": "8888",
-  "channelsize": "5",
+  "channelsize": "1",
   "dbsize": "5",
   "mongodb_one_ip": "127.0.0.1:27017",
   "mongodb_one_db": "spider",
-  "mongodb_one_c": "data_bak",
+  "mongodb_one_c": "bidding_file",
   "mongodb_one_filefiled": "projectinfo",
-  "mongodb_two_ip": "192.168.3.207:27082",
-  "mongodb_two_db": "spider",
-  "mongodb_two_c": "data_bak",
-  "mongodb_two_filefiled": "projectinfo",
-  "file2text": "192.168.3.207:1234"
+  "file2text": "192.168.3.207:1234",
+  "PageSize":5000
 }

+ 50 - 124
src/udpfileserver/main.go

@@ -2,7 +2,7 @@ package main
 
 import (
 	"encoding/json"
-	"gopkg.in/mgo.v2"
+	"gopkg.in/mgo.v2/bson"
 	"jy/mongodbutil"
 	"log"
 	mu "mfw/util"
@@ -11,26 +11,28 @@ import (
 	"path"
 	qu "qfw/util"
 	"strings"
-
-	"gopkg.in/mgo.v2/bson"
+	"time"
 )
 
 var udpclient mu.UdpClient //udp对象
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled string
 var ChanB chan bool
+var PageSize int
 
 func init() {
 	qu.ReadConfig(&Sysconfig)
 	MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
 	MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
 	MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
+	PageSize = qu.IntAllDef(Sysconfig["PageSize"],2000)
 	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
-	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
+	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" ||PageSize <=0{
 		log.Println("获取配置文件参数失败", Sysconfig)
 		return
 	}
 	mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
+	log.Println(mongodbutil.Mgo.Get().Ping())
 	ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
 }
 
@@ -54,53 +56,35 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			return
 		}
 		log.Println(mapInfo)
+		stime :=time.Now()
 		gid := strings.TrimSpace(mapInfo["gtid"].(string))
 		lid := strings.TrimSpace(mapInfo["lteid"].(string))
 		if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
-			MgoSession, err := mgo.Dial(MgoIP)
-			defer MgoSession.Close()
-			if err != nil {
-				log.Println("mongo err:",err)
-				return
+			var jsq int64
+			query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
+			log.Println("query---:", query)
+			sum :=mongodbutil.Mgo.Count(MgoC,query)
+			log.Println("sum:", sum)
+			pageNum := (sum + PageSize - 1) / PageSize
+			limit := PageSize
+			if sum < PageSize {
+				limit = sum
 			}
+			for i := 0; i < pageNum; i++ {
+				query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid)}}
+				log.Println("page=", i+1,"query=", query)
+				list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit)
+				if !b{
+					log.Println("查询失败")
+					continue
+				}
 
-			iter := MgoSession.DB(MgoDB).C(MgoC).Find(
-				bson.M{
-					"_id": bson.M{
-						"$gte": bson.ObjectIdHex(gid),
-						"$lte": bson.ObjectIdHex(lid),
-					},
-					MgoFileFiled: bson.M{
-						"$ne": nil,
-					},
-				},).Select(bson.M{"_id": 1,MgoFileFiled:1}).Iter()
-
-			//if findAll, b := mongodbutil.Mgo.Find(MgoC,
-			//	bson.M{
-			//		"_id": bson.M{
-			//			"$gte": bson.ObjectIdHex(gid),
-			//			"$lte": bson.ObjectIdHex(lid),
-			//		},
-			//		MgoFileFiled: bson.M{
-			//			"$ne": nil,
-			//		},
-			//	},
-			//	//if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
-			//	nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
-			//	log.Println("查询数据失败 :", string(data))
-			//} else {
-			var result *map[string]interface{}
-			log.Println("处理查询数据...")
-			for iter.Next(&result){
-				//for _, v := range *result {
-					qmap := qu.ObjToMap(result)
+				for _,v:=range *list {
+					updateNum :=0
+					qmap := qu.ObjToMap(v)
 					mid := (*qmap)["_id"]
 					if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok {
-						mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-							"$set": bson.M{
-								"updatefileErr": 1,
-							},})
-						//log.Println(mid, "mgo 转换异常", MgoFileFiled)
+						//log.Println(mid, "mgo 没有字段", MgoFileFiled)
 						continue
 					} else {
 						switch v["attachments"].(type) {
@@ -108,57 +92,26 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 							att := v["attachments"].(map[string]interface{})
 							for attk, vaatt := range att {
 								if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
-									mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-										"$set": bson.M{
-											"updatefileErr": 1,
-										},})
 									//log.Println(mid, "mgo 结构体转换失败", vaatt)
 									continue
 								} else {
 									ChanB <- true
-									save(mid,attk, qmap, &fileinfo)
+									if qu.ObjToString(fileinfo["fid"]) ==""{
+										<-ChanB
+										log.Println(mid, "mgo ", MgoFileFiled,"没有fid ",fileinfo)
+										continue
+									}
+									save(mid,attk, qmap, &fileinfo,&updateNum)
 									<-ChanB
 								}
 							}
 						}
 					}
-					//fileMap := *qu.ObjToMap(qmap["projectinfo"])
-					//fmt.Println(fileMap["attachments"])
+					gid = qu.BsonIdToSId(v["_id"])
+					jsq++
 				}
-			//}
-			defer iter.Close()
-			log.Println("处理查询数据结束...")
-			//fmt.Println(len(*findAll))
-				//if len(*findAll) <= 0 {
-				//	log.Println("查询数据为空 :", string(data))
-				//	return
-				//}
-				//for _, v := range *findAll {
-				//	qmap := *qu.ObjToMap(v)
-				//	mid := qmap["_id"]
-				//	if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
-				//		log.Println(mid, "mgo 转换异常", MgoFileFiled)
-				//		continue
-				//	} else {
-				//		switch v["attachments"].(type) {
-				//		case map[string]interface{}:
-				//			att := v["attachments"].(map[string]interface{})
-				//			for _, vaatt := range att {
-				//				if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
-				//					log.Println(mid, "mgo 结构体转换失败", vaatt)
-				//					continue
-				//				} else {
-				//					ChanB <- true
-				//					go save(mid, qmap, fileinfo)
-				//
-				//				}
-				//			}
-				//		}
-				//	}
-				//	//fileMap := *qu.ObjToMap(qmap["projectinfo"])
-				//	//fmt.Println(fileMap["attachments"])
-				//}
-			//}
+			}
+			log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime))
 		} else {
 			log.Println("开始id或结束id参数错误:", string(data))
 		}
@@ -169,7 +122,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 
 }
-func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{}) {
+func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},updatenum *int) {
 	defer qu.Catch()
 	type FileData struct {
 		OrgUrl  string //源下载地址
@@ -180,10 +133,6 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{}) {
 	}
 	client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
 	if err != nil {
-		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-			"$set": bson.M{
-				"updatefileErr": 1,
-			},})
 		log.Println(mid, "rpc err :", err)
 		return
 	}
@@ -206,10 +155,6 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{}) {
 	//log.Println(mid, fileData)
 	err = client.Call("FileToText.FileToContext", fileData, &reply)
 	if err != nil {
-		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-			"$set": bson.M{
-				"updatefileErr": 1,
-			},})
 		log.Println(mid, "call ocr error:", err)
 		return
 	}
@@ -224,21 +169,13 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{}) {
 	//}
 	//reply, _ = json.Marshal(testfiles)
 	if len(reply) == 0{
-		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-			"$set": bson.M{
-				"updatefileErr": 1,
-			},})
-		log.Println(mid, "rpc返回数据为空:", string(reply))
+		log.Println(mid, "rpc返回数据为空:",qu.ObjToString((*fileinfo)["fid"]), string(reply))
 		return
 	}
-	log.Println(mid, string(reply)[:23])
+	//log.Println(mid, string(reply))
 	rdata := make(map[string]interface{})
 	if err := json.Unmarshal(reply, &rdata); err != nil {
-		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-			"$set": bson.M{
-				"updatefileErr": 1,
-			},})
-		log.Println(mid, "rpc返回数据解析失败:", err)
+		log.Println(mid, "rpc返回数据解析失败:",qu.ObjToString((*fileinfo)["fid"]), err)
 		return
 	}
 	if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
@@ -249,40 +186,29 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{}) {
 		}
 		//log.Println((*fileinfo))
 
-		asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
-		qwer := asdf["attachments"].(map[string]interface{})
-		qwer[attk] =*fileinfo
+		(*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo
+		//asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
+		//qwer := asdf["attachments"].(map[string]interface{})
+		//qwer[attk] =*fileinfo
 		//log.Println((*qmap)[MgoFileFiled])
 
 		updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
 			"$set": bson.M{
 				MgoFileFiled: (*qmap)[MgoFileFiled],
-				//MgoFileFiled: bson.M{
-				//	"attachments":bson.M{
-				//		attk:(*fileinfo),
-				//	},
-				//},
 			},
 		})
 		if updateBool{
+			*updatenum++
 			mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
 				"$set": bson.M{
-					"updatefileErr": 0,
+					"updatefileNum": &updatenum,
 				},})
 			log.Println(mid, "mongo更新数据成功")
 		}else {
-			mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-				"$set": bson.M{
-					"updatefileErr": 1,
-				},})
-			log.Println(mid, "mongo更新数据失败")
+			log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"]))
 		}
 	} else {
-		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-			"$set": bson.M{
-				"updatefileErr": 1,
-			},})
-		log.Println(mid, "调用rpc服务解析异常:", rdata["err"])
+		log.Println(mid, "调用rpc服务解析异常:",qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
 	}
 
 }