Forráskód Böngészése

针对中国政府采购网,添加object_type 字段

wcc 2 éve
szülő
commit
2d7ca7e161
4 módosított fájl, 117 hozzáadás és 13 törlés
  1. 51 1
      createEsIndex/bidding_es.go
  2. 20 3
      createEsIndex/common.toml
  3. 33 0
      createEsIndex/es_test.go
  4. 13 9
      createEsIndex/main.go

+ 51 - 1
createEsIndex/bidding_es.go

@@ -8,6 +8,7 @@ import (
 	"encoding/json"
 	"esindex/config"
 	"esindex/oss"
+	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"reflect"
@@ -61,6 +62,7 @@ func biddingTask(mapInfo map[string]interface{}) {
 	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
 		if c1%1000 == 0 {
 			log.Info("biddingTask", zap.Int("current:", c1))
+			log.Info("biddingAllTask", zap.Any("current:_id =>", tmp["_id"]))
 		}
 		ch <- true
 		wg.Add(1)
@@ -73,12 +75,24 @@ func biddingTask(mapInfo map[string]interface{}) {
 				tmp = make(map[string]interface{})
 				return
 			}
+			//只针对增量数据处理;全量数据 需要用extracttype字段判断
 			if util.IntAll(tmp["dataprocess"]) != 8 {
 				return
 			}
+			//// 增量数据使用上面判断;全量数据使用下面配置
 			//if util.IntAll(tmp["extracttype"]) != 1 {
 			//	return
 			//}
+
+			//针对产权数据,暂时不入es 索引库
+			if util.IntAll(tmp["infoformat"]) == 3 {
+				return
+			}
+
+			/**
+			数据抽取时,有的数据的发布时间是之前的,属于增量历史数据,在判重和同步到bidding表时,会添加history_updatetime
+			字段,所以下面判断才会处理
+			*/
 			if stype == "bidding_history" && tmp["history_updatetime"] == nil {
 				return
 			}
@@ -87,6 +101,15 @@ func biddingTask(mapInfo map[string]interface{}) {
 			indexLock.Unlock()
 			newTmp, update := GetEsField(tmp, stype)
 			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
+
+			//针对中国政府采购网,单独处理
+			if util.ObjToString(tmp["site"]) == "中国政府采购网" {
+				objectType := MatchService(tmp)
+				if objectType != "" {
+					newTmp["object_type"] = objectType
+				}
+			}
+
 			if len(update) > 0 {
 				updateBiddingPool <- []map[string]interface{}{{
 					"_id": tmp["_id"],
@@ -136,12 +159,13 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 	biddingConn := MgoB.GetMgoConn()
 	it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Select(map[string]interface{}{
 		"contenthtml": 0,
-	}).Sort("-_id").Iter()
+	}).Iter()
 	c1, index := 0, 0
 	var indexLock sync.Mutex
 	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
 		if c1%20000 == 0 {
 			log.Info("biddingAllTask", zap.Int("current:", c1))
+			log.Info("biddingAllTask", zap.Any("current:_id =>", tmp["_id"]))
 		}
 		ch <- true
 		wg.Add(1)
@@ -154,9 +178,16 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 				tmp = make(map[string]interface{})
 				return
 			}
+			// 针对17833,需要单独屏蔽这个判断,不需要处理
 			if util.IntAll(tmp["extracttype"]) == -1 {
 				return
 			}
+
+			//针对产权数据,暂时不入es 索引库
+			if util.IntAll(tmp["infoformat"]) == 3 {
+				return
+			}
+
 			indexLock.Lock()
 			index++
 			indexLock.Unlock()
@@ -547,3 +578,22 @@ func UdpMethod(id string) {
 	log.Info("UdpMethod", zap.Any("JyUdpAddr", JyUdpAddr), zap.String("mapinfo", string(datas)))
 	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr)
 }
+
+// MatchService tags a record with its object_type by keyword-matching
+// jsondata.item. It is called for records whose site is "中国政府采购网".
+//
+// It returns the first of "货物" (goods), "服务" (services) or "工程"
+// (engineering works) — checked in that order — that occurs in the item
+// text, or "" when tmp has no jsondata map, jsondata has no item key, or no
+// keyword matches.
+func MatchService(tmp map[string]interface{}) (res string) {
+	jsonData, isMap := tmp["jsondata"].(map[string]interface{})
+	if !isMap {
+		return ""
+	}
+
+	item, found := jsonData["item"]
+	if !found {
+		return ""
+	}
+
+	// Debug trace of the raw item value, written to stdout.
+	fmt.Println("item=>", item)
+
+	itemText := util.ObjToString(item)
+	for _, keyword := range [...]string{"货物", "服务", "工程"} {
+		if strings.Contains(itemText, keyword) {
+			return keyword
+		}
+	}
+	return ""
+}

+ 20 - 3
createEsIndex/common.toml

@@ -1,4 +1,3 @@
-
 [udp]
 locport = ":1783"
 jyaddr = "127.0.0.1"
@@ -33,7 +32,7 @@ password = ""
 addr = "http://192.168.3.206:9800"
 addrp = "http://192.168.3.206:9800"
 size = 5
-indexb = "bidding_v1"
+indexb = "bidding_v2"
 typeb = "bidding"
 indexp = "projectset_v2"
 typep = "projectset"
@@ -117,6 +116,7 @@ format = "text"
 "spidercode" = "string"
 "subtype" = "string"
 "toptype" = "string"
+"projectinfo" = ""
 "purchasing" = "string"
 "purchasinglist" = ""
 "channel" = "string"
@@ -151,6 +151,23 @@ format = "text"
 "docstarttime" = "int64"
 "signendtime" = "int64"
 "signstarttime" = "int64"
+"issue_quota" = "float64"
+"bidopen_shape" = "string"
+"quote_mode" = "string"
+"is_acquire_tender" = "bool"
+"is_payment_deposit" = "bool"
+"is_joint_bidding" = "bool"
+"procurementlist" = ""
+"object_type"="string"  ##针对中国政府采购网,添加字段,区分货物、服务和工程
+[db.es.fieldprojectinfo]
+"approvecode" = "string"
+"approvecontent" = "string"
+"approvestatus" = "string"
+"approvetime" = "string"
+"approvedept" = "string"
+"approvenumber" = "string"
+"projecttype" = "string"
+"approvecity" = "string"
 [db.es.fieldpurchasinglist]
 "itemname" = "string"
 "item" = "string"
@@ -165,7 +182,7 @@ format = "text"
 "buyer" = "string"
 "item" = "string"
 "projectscope" = "string"
-"expurasingtime" = "string"
+"expurasingtime" = "int64"
 "totalprice" = "float64"
 [db.es.fieldwinnerorder]
 "sort" = "int"

+ 33 - 0
createEsIndex/es_test.go

@@ -0,0 +1,33 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"testing"
+)
+
+// TestMatchService checks that MatchService derives the object_type tag from
+// jsondata.item of a decoded bidding record.
+func TestMatchService(t *testing.T) {
+	data := `{
+
+    "_id" : "6422d91e779467cff1a84885",
+    "area" : "全国",
+    "city" : "",
+    "extracttype" : 0,
+    "s_sha" : "d7cc66ac91dc6551991df0a37331b628de4c70973c6844f1ee6ef1c2d4e29e95",
+    "jsondata" : {
+        "area_city_district" : "福建",
+        "buyer" : "莆田市第一医院",
+        "item" : " 货物/医药品/医用材料/其他医用材料",
+        "agency" : "福建省荔卫药械招标服务有限公司"
+    },
+    "channel" : "地方公告"
+}`
+
+	var obj map[string]interface{}
+	if err := json.Unmarshal([]byte(data), &obj); err != nil {
+		// Report through the test framework instead of panicking.
+		t.Fatalf("unmarshal fixture: %v", err)
+	}
+
+	objectType := MatchService(obj)
+	fmt.Println("objectType=>", objectType)
+
+	// The fixture's item text contains "货物", the first keyword checked,
+	// so that is the tag expected back.
+	if want := "货物"; objectType != want {
+		t.Errorf("MatchService() = %q, want %q", objectType, want)
+	}
+}

+ 13 - 9
createEsIndex/main.go

@@ -30,12 +30,12 @@ var (
 	UdpTaskMap = &sync.Map{}
 	JyUdpAddr  *net.UDPAddr
 
-	EsBulkSize        = 100 // es批量保存大小
-	updateBiddingPool = make(chan []map[string]interface{}, 5000)
+	EsBulkSize        = 100                                       // es批量保存大小
+	updateBiddingPool = make(chan []map[string]interface{}, 5000) //更新bidding数据
 	updateBiddingSp   = make(chan bool, 5)
-	saveEsPool        = make(chan map[string]interface{}, 5000)
+	saveEsPool        = make(chan map[string]interface{}, 5000) //保存bidding数据到es
 	saveEsSp          = make(chan bool, 5)
-	saveProjectEsPool = make(chan map[string]interface{}, 5000)
+	saveProjectEsPool = make(chan map[string]interface{}, 5000) //保存project数据到es
 	saveProjectSp     = make(chan bool, 5)
 	saveEsAllPool     = make(chan map[string]interface{}, 5000)
 	saveEsAllSp       = make(chan bool, 5)
@@ -66,10 +66,11 @@ func init() {
 }
 
 func main() {
-	go checkMapJob()
-	go task_index()
 
-	go UpdateBidding()
+	go checkMapJob() //udp 发送邮件
+	go task_index()  //定时同步更新winner_enterprise、buyer_enterprise ES索引;这个功能很少变动,几乎不需要维护
+
+	go UpdateBidding() //更新bidding表数据
 	go SaveEsMethod()
 	go SaveAllEsMethod()
 	go SaveProjectEs()
@@ -102,7 +103,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
 			tasktype, _ := mapInfo["stype"].(string)
 			switch tasktype {
-			case "index-by-id":
+			case "index-by-id": //单个索引
 				pool <- true
 				go func() {
 					defer func() {
@@ -126,6 +127,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					biddingAllTask(mapInfo)
 				}()
+
 			case "bidding_history":
 				pool <- true
 				go func() {
@@ -150,7 +152,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					biddingDataTask(data, mapInfo)
 				}()
-			case "biddingdelbyextracttype": //根据extracttype删除es
+			case "biddingdelbyextracttype": //根据bidding表extracttype=-1,删除es中重复数据
 				pool <- true
 				go func() {
 					defer func() {
@@ -201,6 +203,7 @@ type UdpNode struct {
 	retry     int
 }
 
+//UpdateBidding 更新bidding表数据
 func UpdateBidding() {
 	arru := make([][]map[string]interface{}, 200)
 	indexu := 0
@@ -271,6 +274,7 @@ func SaveBidErr() {
 	}
 }
 
+//SaveEsMethod 保存到es
 func SaveEsMethod() {
 	arru := make([]map[string]interface{}, EsBulkSize)
 	indexu := 0