Jianghan 2 жил өмнө
parent
commit
0022dfd288

+ 133 - 55
createEsIndex/bidding_es.go

@@ -8,6 +8,7 @@ import (
 	"encoding/json"
 	"esindex/config"
 	"esindex/oss"
+	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"reflect"
 	"regexp"
@@ -15,7 +16,6 @@ import (
 	"strings"
 	"sync"
 	"time"
-	"unicode/utf8"
 )
 
 var (
@@ -24,12 +24,18 @@ var (
 	TimeClear   = regexp.MustCompile("[年|月|/|.|-]")
 	filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
 	date1       = regexp.MustCompile("20[0-2][0-9][年|\\-/.][0-9]{1,2}[月|\\-/.][0-9]{1,2}[日]?")
+	HtmlReg     = regexp.MustCompile("<[^>]+>")
 )
 
 func biddingTask(mapInfo map[string]interface{}) {
 	defer util.Catch()
 
 	stype := util.ObjToString(mapInfo["stype"])
+	if stype == "bidding" {
+		uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
+			"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
+		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 8, "updatetime": time.Now().Unix()}}, false, false)
+	}
 	q, _ := mapInfo["query"].(map[string]interface{})
 	if q == nil {
 		q = map[string]interface{}{
@@ -53,7 +59,7 @@ func biddingTask(mapInfo map[string]interface{}) {
 	c1, index := 0, 0
 	var indexLock sync.Mutex
 	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
-		if count%1000 == 0 {
+		if c1%1000 == 0 {
 			log.Info("biddingTask", zap.Int("current:", c1))
 		}
 		ch <- true
@@ -67,25 +73,29 @@ func biddingTask(mapInfo map[string]interface{}) {
 				tmp = make(map[string]interface{})
 				return
 			}
-			if util.IntAll(tmp["dataprocess"]) == 8 {
-				indexLock.Lock()
-				index++
-				indexLock.Unlock()
-				newTmp, update := GetEsField(tmp, stype)
-				newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
-				if len(update) > 0 {
-					updateBiddingPool <- []map[string]interface{}{{
-						"_id": tmp["_id"],
-					},
-						{"$set": update},
-					}
-				}
-				if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
-					// 剑鱼信息发布数据 通过udp通知信息发布程序
-					go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
+			if util.IntAll(tmp["dataprocess"]) != 8 {
+				return
+			}
+			if stype == "bidding_history" && tmp["history_updatetime"] == nil {
+				return
+			}
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+			newTmp, update := GetEsField(tmp, stype)
+			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
+			if len(update) > 0 {
+				updateBiddingPool <- []map[string]interface{}{{
+					"_id": tmp["_id"],
+				},
+					{"$set": update},
 				}
-				saveEsPool <- newTmp
 			}
+			if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
+				// 剑鱼信息发布数据 通过udp通知信息发布程序
+				go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
+			}
+			saveEsPool <- newTmp
 		}(tmp)
 		tmp = map[string]interface{}{}
 	}
@@ -93,6 +103,68 @@ func biddingTask(mapInfo map[string]interface{}) {
 	log.Info("biddingTask over", zap.Int("count", c1), zap.Int("index", index))
 }
 
+// biddingAllTask builds ES documents for every bidding record selected by
+// mapInfo and queues them on saveEsPool.
+//
+// Keys read from mapInfo:
+//   - "stype": passed through unchanged to GetEsField.
+//   - "query": optional Mongo filter (map[string]interface{}). When it is
+//     absent the filter defaults to the _id range (gtid, lteid].
+//   - "gtid" / "lteid": ObjectId hex strings, only read when "query" is absent.
+//
+// Records whose "sensitive" field equals "测试" or whose "extracttype" is -1
+// are skipped. Any field corrections returned by GetEsField are queued on
+// updateBiddingPool as an (_id selector, $set document) pair.
+func biddingAllTask(mapInfo map[string]interface{}) {
+	defer util.Catch()
+
+	stype := util.ObjToString(mapInfo["stype"])
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		// NOTE(review): these unchecked assertions panic when gtid/lteid are
+		// missing or not strings; presumably util.Catch recovers — confirm.
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+
+	ch := make(chan bool, 20) // at most 20 records are processed concurrently
+	wg := &sync.WaitGroup{}
+
+	// bidding database
+	biddingConn := MgoB.GetMgoConn()
+	// Release the connection when the task ends, as task() does; the
+	// original never returned it, leaking one connection per call.
+	defer MgoB.DestoryMongoConn(biddingConn)
+
+	// contenthtml is excluded from the projection; newest records come first.
+	it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Select(map[string]interface{}{
+		"contenthtml": 0,
+	}).Sort("-_id").Iter()
+	c1, index := 0, 0
+	var indexLock sync.Mutex // guards index, which the workers increment
+	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+		if c1%20000 == 0 {
+			log.Info("biddingAllTask", zap.Int("current:", c1))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			// Records flagged with the sensitive marker are not indexed.
+			if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" {
+				return
+			}
+			if util.IntAll(tmp["extracttype"]) == -1 {
+				return
+			}
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+			newTmp, update := GetEsField(tmp, stype)
+			newTmp["dataweight"] = 0 // new field on index documents: jy pinned-to-top weight
+			if len(update) > 0 {
+				updateBiddingPool <- []map[string]interface{}{
+					{"_id": tmp["_id"]},
+					{"$set": update},
+				}
+			}
+			saveEsPool <- newTmp
+		}(tmp)
+		// Hand each worker its own map; the iterator decodes into a fresh one.
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	log.Info("biddingAllTask over", zap.Int("count", c1), zap.Int("index", index))
+}
+
 func biddingTaskById(mapInfo map[string]interface{}) {
 	defer util.Catch()
 
@@ -121,23 +193,10 @@ func biddingTaskById(mapInfo map[string]interface{}) {
 func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{}, map[string]interface{}) {
 	newTmp := make(map[string]interface{})
 	update := make(map[string]interface{}) // bidding 修改字段
+	saveErr := make(map[string]interface{})
 	for field, ftype := range config.Conf.DB.Es.FieldEs {
 		if tmp[field] != nil { //
-			if field == "projectinfo" {
-				mp, _ := tmp[field].(map[string]interface{})
-				if mp != nil {
-					newmap := map[string]interface{}{}
-					for k, ktype := range config.Conf.DB.Es.FieldProjectInfo {
-						mpv := mp[k]
-						if mpv != nil && reflect.TypeOf(mpv).String() == ktype {
-							newmap[k] = mp[k]
-						}
-					}
-					if len(newmap) > 0 {
-						newTmp[field] = newmap
-					}
-				}
-			} else if field == "purchasinglist" { //标的物处理
+			if field == "purchasinglist" { //标的物处理
 				purchasinglist_new := []map[string]interface{}{}
 				if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
 					for _, ls := range pcl {
@@ -169,14 +228,16 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 								p2[k] = util.ObjToString(tmp["projectname"])
 							} else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
 								p2[k] = util.ObjToString(tmp["buyer"])
-							} else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
-								res := getMethod(util.ObjToString(p1[k]))
-								if res != 0 {
-									p2[k] = res
-								}
 							} else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
 								p2[k] = p1[k]
 							}
+							//else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
+							//	res := getMethod(util.ObjToString(p1[k]))
+							//	if res != 0 {
+							//		p2[k] = res
+							//	}
+							//}
+
 						}
 						arr = append(arr, p2)
 					}
@@ -186,10 +247,10 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 				}
 			} else if field == "projectscope" {
 				ps, _ := tmp["projectscope"].(string)
+				newTmp["projectscope"] = ps
 				if len(ps) > pscopeLength {
-					newTmp["projectscope"] = string(([]rune(ps))[:pscopeLength])
-				} else {
-					newTmp["projectscope"] = ps
+					saveErr["projectscope"] = ps
+					saveErr["projectscope_length"] = len(ps)
 				}
 			} else if field == "winnerorder" { //中标候选
 				winnerorder_new := []map[string]interface{}{}
@@ -240,10 +301,11 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 				}
 			} else if field == "detail" { //过滤
 				detail, _ := tmp[field].(string)
-				if len([]rune(detail)) > detailLength {
-					detail = detail[:detailLength]
-				}
 				detail = filterSpace.ReplaceAllString(detail, "")
+				if len(detail) > pscopeLength {
+					saveErr["detail"] = detail
+					saveErr["detail_length"] = len(detail)
+				}
 				if tmp["cleartag"] != nil {
 					if tmp["cleartag"].(bool) {
 						text, _ := FilterDetail(detail)
@@ -277,11 +339,20 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 	filetext := getFileText(tmp)
 	if len([]rune(filetext)) > 10 {
 		newTmp["filetext"] = filetext
+		if len(filetext) > pscopeLength {
+			saveErr["filetext"] = filetext
+			saveErr["filetext_length"] = len(filetext)
+		}
 	}
 	YuceEndtime(newTmp) // 预测结果时间
-	if stype == "bidding" {
+	if stype == "bidding" || stype == "bidding_history" {
 		newTmp["createtime"] = time.Now().Unix() // es库数据创建时间,只有增量数据有
 	}
+
+	if len(saveErr) > 0 {
+		saveErr["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
+		saveErrBidPool <- saveErr
+	}
 	return newTmp, update
 }
 
@@ -333,18 +404,25 @@ func getFileText(tmp map[string]interface{}) (filetext string) {
 				for _, result := range tmpData2 {
 					if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
 						if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
-							bs := oss.OssGetObject(attach_url) //oss读数据
-							if utf8.RuneCountInString(filetext+bs) < fileLength {
-								filetext += bs + "\n"
+							bs := oss.OssGetObject(attach_url, mongodb.BsonIdToSId(tmp["_id"])) //oss读数据
+							//if utf8.RuneCountInString(filetext+bs) < fileLength {
+							//	filetext += bs + "\n"
+							//} else {
+							//	if utf8.RuneCountInString(bs) > fileLength {
+							//		filetext = bs[0:fileLength]
+							//	} else {
+							//		filetext = bs
+							//	}
+							//	break
+							//}
+							if len(filetext) > 500000 {
+								filetext = filetext[0:500000]
+								break
 							} else {
-								if utf8.RuneCountInString(bs) > fileLength {
-									filetext = bs[0:fileLength]
-								} else {
-									filetext = bs
+								if len(bs) <= 500000 {
+									filetext += bs + "\n"
 								}
-								break
 							}
-							//filetext += bs + "\n"
 						}
 					}
 				}

+ 1 - 0
createEsIndex/biddingdata.go

@@ -44,6 +44,7 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 		"publishtime": 1,
 		"spidercode":  1,
 		"extracttype": 1,
+		"comeintime":  1,
 	}).Sort("_id").Iter()
 	n := 0
 	//更新数组

+ 8 - 17
createEsIndex/common.toml

@@ -6,7 +6,7 @@ jyport = 11118
 
 [db]
 [db.mongoB]
-addr = "192.168.3.207:27092"
+addr = "127.0.0.1:27092"
 dbname = "wjh"
 coll = "bidding"
 size = 15
@@ -14,7 +14,7 @@ user = ""
 password = ""
 
 [db.mongoP]
-addr = "192.168.3.207:27092"
+addr = "127.0.0.1:27092"
 dbname = "wjh"
 coll = "projectset"
 size = 15
@@ -22,17 +22,18 @@ user = ""
 password = ""
 
 [db.mongoQ]
-addr = "192.168.3.207:27092"
-dbname = "wjh"
-coll = "projectset"
+addr = "127.0.0.1:27092"
+dbname = "mixdata"
+coll = "qyxy_std"
 size = 15
 user = ""
 password = ""
 
 [db.es]
-addr = "http://192.168.3.206:9800"
+addr = "http://127.0.0.1:19800"
+addrp = "http://127.0.0.1:19800"
 size = 5
-indexb = "bidding"
+indexb = "bidding_v1"
 typeb = "bidding"
 indexp = "projectset"
 typep = "projectset"
@@ -116,7 +117,6 @@ format = "text"
 "spidercode" = "string"
 "subtype" = "string"
 "toptype" = "string"
-"projectinfo" = ""
 "purchasing" = "string"
 "purchasinglist" = ""
 "channel" = "string"
@@ -146,15 +146,6 @@ format = "text"
 "multipackage" = "int32"
 "isValidFile" = "bool"
 "bid_field" = "string"
-[db.es.fieldprojectinfo]
-"approvecode" = "string"
-"approvecontent" = "string"
-"approvestatus" = "string"
-"approvetime" = "string"
-"approvedept" = "string"
-"approvenumber" = "string"
-"projecttype" = "string"
-"approvecity" = "string"
 [db.es.fieldpurchasinglist]
 "itemname" = "string"
 "item" = "string"

+ 1 - 1
createEsIndex/config/conf.go

@@ -76,6 +76,7 @@ type mgo struct {
 
 type es struct {
 	Addr                 string
+	AddrP                string
 	Size                 int
 	IndexB               string
 	TypeB                string
@@ -86,7 +87,6 @@ type es struct {
 	IndexBuyer           string
 	TypeBuyer            string
 	FieldEs              map[string]interface{}
-	FieldProjectInfo     map[string]interface{}
 	FieldPurchasingList  map[string]interface{}
 	FieldProcurementList map[string]interface{}
 	FieldWinnerOrder     map[string]interface{}

+ 6 - 0
createEsIndex/init.go

@@ -62,4 +62,10 @@ func InitEs() {
 		I_size:  config.Conf.DB.Es.Size,
 	}
 	Es.InitElasticSize()
+
+	Es1 = &elastic.Elastic{
+		S_esurl: config.Conf.DB.Es.AddrP,
+		I_size:  config.Conf.DB.Es.Size,
+	}
+	Es1.InitElasticSize()
 }

+ 92 - 4
createEsIndex/main.go

@@ -24,13 +24,13 @@ var (
 	MgoP *mongodb.MongodbSim
 	MgoQ *mongodb.MongodbSim
 
-	Es *elastic.Elastic
+	Es, Es1 *elastic.Elastic
 
 	UdpClient  udp.UdpClient
 	UdpTaskMap = &sync.Map{}
 	JyUdpAddr  *net.UDPAddr
 
-	EsBulkSize        = 200 // es批量保存大小
+	EsBulkSize        = 100 // es批量保存大小
 	updateBiddingPool = make(chan []map[string]interface{}, 5000)
 	updateBiddingSp   = make(chan bool, 5)
 	saveEsPool        = make(chan map[string]interface{}, 5000)
@@ -40,9 +40,13 @@ var (
 	saveEsAllPool     = make(chan map[string]interface{}, 5000)
 	saveEsAllSp       = make(chan bool, 5)
 
+	saveErrBidPool = make(chan map[string]interface{}, 5000)
+	saveBidSp      = make(chan bool, 5)
+
 	detailLength = 50000 // es保存detail长度
 	fileLength   = 50000 // es保存附件文本长度
 	pscopeLength = 32766 // projectscope长度
+
 )
 
 func init() {
@@ -70,6 +74,8 @@ func main() {
 	go SaveAllEsMethod()
 	go SaveProjectEs()
 
+	go SaveBidErr()
+
 	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
 	UdpClient.Listen(processUdpMsg)
 	log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
@@ -112,6 +118,22 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					biddingTask(mapInfo)
 				}()
+			case "biddingall":
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingAllTask(mapInfo)
+				}()
+			case "bidding_history":
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingTask(mapInfo)
+				}()
 			case "project":
 				pool <- true
 				go func() {
@@ -214,6 +236,41 @@ func UpdateBidding() {
 	}
 }
 
+func SaveBidErr() {
+	arru := make([]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveErrBidPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				saveBidSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBidSp
+					}()
+					MgoB.SaveBulk("bidding_es_err_record", arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveBidSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBidSp
+					}()
+					MgoB.SaveBulk("bidding_es_err_record", arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}
+
 func SaveEsMethod() {
 	arru := make([]map[string]interface{}, EsBulkSize)
 	indexu := 0
@@ -263,7 +320,7 @@ func SaveAllEsMethod() {
 					defer func() {
 						<-saveEsAllSp
 					}()
-					Es.BulkSave("biddingall", "bidding", &arru, true)
+					Es1.BulkSave("biddingall", "bidding", &arru, true)
 				}(arru)
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -275,7 +332,7 @@ func SaveAllEsMethod() {
 					defer func() {
 						<-saveEsAllSp
 					}()
-					Es.BulkSave("biddingall", "bidding", &arru, true)
+					Es1.BulkSave("biddingall", "bidding", &arru, true)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -349,3 +406,34 @@ func checkMapJob() {
 		}
 	}
 }
+
+func task() {
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+
+	query := sess.DB("qfw").C("bidding_0922").Find(nil).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%1000 == 0 {
+			util.Debug("current ---", count)
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if id := util.ObjToString(tmp["infoid"]); mongodb.IsObjectIdHex(id) {
+				biddingTaskById(map[string]interface{}{"infoid": id, "stype": "bidding"})
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+
+	util.Debug("over ---", count)
+}

+ 6 - 4
createEsIndex/oss/ossclient.go

@@ -2,8 +2,10 @@ package oss
 
 import (
 	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
 	"fmt"
 	"github.com/aliyun/aliyun-oss-go-sdk/oss"
+	"go.uber.org/zap"
 	"io/ioutil"
 	"os"
 )
@@ -25,25 +27,25 @@ func InitOss() {
 	ossclient = client
 }
 
-func OssGetObject(objectName string) string {
+func OssGetObject(objectName, id string) string {
 	util.Catch()
 	// 获取存储空间。
 	bucket, err := ossclient.Bucket(ossBucketName)
 	if err != nil {
-		fmt.Println("Error:", err)
 		return ""
 	}
 
 	// 下载文件到流。
 	body, err := bucket.GetObject(objectName)
 	if err != nil {
-		fmt.Println("Error:", err)
+		//log.Info("OssGetObject", zap.String("key", objectName), zap.String("id", id))
+		//log.Info("OssGetObject", zap.Error(err))
 		return ""
 	}
 	defer body.Close()
 	data, err := ioutil.ReadAll(body)
 	if err != nil {
-		fmt.Println("Error:", err)
+		log.Info("OssGetObject", zap.Error(err))
 		return ""
 	}
 	return string(data)