Jianghan 3 年之前
父节点
当前提交
1ec480cd64

+ 7 - 7
fullproject/src_v1/config.json

@@ -4,11 +4,11 @@
     "statusdays": 15,
 	"mongodbServers": "192.168.3.207:27092",
     "mongodbPoolSize": 10,
-    "mongodbName": "wjh",
-	"hints":"publishtime_1",
-    "extractColl": "extract",
+    "mongodbName": "zhaolongyue",
+	"hints":"_id_1_publishtime_1",
+    "extractColl": "extract_test_new",
     "extractColl1": "extract",
-    "projectColl": "projectset",
+    "projectColl": "projectset-test1",
     "backupFlag": true,
     "siteColl": "site",
     "thread": 1,
@@ -22,19 +22,19 @@
         "db": ""
     },
     "bidding": {
-        "addr": "192.168.3.207:27092",
+        "addr": "127.0.0.1:27092",
         "dbname": "qfw",
         "dbsize": 5,
         "uname": "dataAnyWrite",
         "upwd": "data@dataAnyWrite"
     },
     "spider": {
-        "addr": "192.168.3.207:27092",
+        "addr": "127.0.0.1:27092",
         "dbname": "editor",
         "dbsize": 2
     },
     "es": {
-        "addr": "http://192.168.3.11:9800",
+        "addr": "http://127.0.0.1:9801",
         "index": "projectset",
         "itype": "projectset",
         "pool": 10

+ 4 - 5
fullproject/src_v1/main.go

@@ -80,7 +80,7 @@ func DealSign() {
 	}
 }
 
-func main() {
+func mainT() {
 	//udp跑增量  id段   project
 	//udp跑全量			qlT
 	//udp跑历史数据  信息id1,id2/或id段  ls
@@ -99,10 +99,9 @@ func main() {
 	time.Sleep(99999 * time.Hour)
 }
 
-//测试组人员使用
-func mainT() {
-	sid = "61ace36c45a326c6c325093e"
-	eid = "61b1738445a326c6c32c29e8"
+func main() {
+	sid = "61d3b61345a326c6c35080f8"
+	eid = "629496011cd2d8ec2137ed5b"
 	//flag.StringVar(&sid, "sid", "", "开始id")
 	//flag.StringVar(&eid, "eid", "", "结束id")
 	//flag.Parse()

+ 12 - 13
fullproject/src_v1/project.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"encoding/json"
-	"jy/util"
 	"log"
 	"math"
 	"mongodb"
@@ -98,14 +97,15 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 	p.findLock.Lock()
 	defer p.findLock.Unlock()
 	// 3.18 isfow=0数据不参与项目合并		(1表示正常数据招标流程)
-	code := strings.ReplaceAll(qu.ObjToString(tmp["spidercode"]), " ", "")
-	p.mapSpiderLock.Lock()
-	isflow := p.mapSpider[code]
-	p.mapSpiderLock.Unlock()
-	if isflow == 0 {
-		p.NewProject(tmp, info)
-		return
-	}
+	//code := strings.ReplaceAll(qu.ObjToString(tmp["spidercode"]), " ", "")
+	//p.mapSpiderLock.Lock()
+	//isflow := p.mapSpider[code]
+	//p.mapSpiderLock.Unlock()
+	//if isflow == 0 {
+	//	qu.Debug("isflow 新建项目" + code)
+	//	p.NewProject(tmp, info)
+	//	return
+	//}
 
 	//只有或没有采购单位的无法合并
 	//bpn, bpc, bptc, bpb 是否查找到,并标识位置。-1代表未查找到。
@@ -152,7 +152,6 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 
 			resVal, pjVal := Select(compareStr, info, compareProject)
 			//---------------------------------------
-			//log.Println(resVal, pjVal, compareProject)
 			if resVal > 0 {
 				compareBuyer, compareCity, compareTime, compareAgency, compareBudget, compareBidmount, score2 := p.compareBCTABB(info, compareProject, diffTime, score)
 
@@ -601,7 +600,7 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	}
 	// first_cooperation
 	if p1.Buyer != "" && len(thisinfo.Winners) > 0 {
-		FirstCooperation(set, p1.Buyer, thisinfo.Winners, thisinfo.EntIdList)
+		//FirstCooperation(set, p1.Buyer, thisinfo.Winners, thisinfo.EntIdList)
 	}
 
 	p1.InfoFiled = make(map[string]InfoField)
@@ -989,7 +988,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	}
 	// first_cooperation
 	if pInfo.Buyer != "" && len(pInfo.Winners) > 0 {
-		FirstCooperation(set, pInfo.Buyer, pInfo.Winners, pInfo.EntIdList)
+		//FirstCooperation(set, pInfo.Buyer, pInfo.Winners, pInfo.EntIdList)
 	}
 
 	//项目规模
@@ -1473,7 +1472,7 @@ func FirstCooperation(set map[string]interface{}, b string, winns, entidlist []s
 	defer func() {
 		// 处理数组越界异常
 		if r := recover(); r != nil {
-			util.Debug("recover...:", r)
+			//qu.Debug("recover...:", r)
 		}
 	}()
 	pid := mongodb.BsonIdToSId(set["_id"])

+ 1 - 1
fullproject/src_v1/project_tool.go

@@ -1,6 +1,6 @@
 package main
 
-var DateTimeSelect = []string{"bidopentime", "signaturedate", "project_completedate", "comeintime"}
+var DateTimeSelect = []string{"bidopentime", "bidendtime", "signaturedate", "comeintime"}
 
 //项目中的字段
 var FIELDS = []string{

+ 7 - 3
fullproject/src_v1/task.go

@@ -502,9 +502,9 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 		util.Debug("共查询:", c, "条")
 	}
 	ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
-	if Sysconfig["hints"] != nil {
-		ms.Hint(Sysconfig["hints"])
-	}
+	//if Sysconfig["hints"] != nil {
+	//	ms.Hint(Sysconfig["hints"])
+	//}
 	query := ms.Iter()
 	var lastid interface{}
 L:
@@ -604,6 +604,10 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 		return nil
 	}
 
+	if tmp["pt_modify"] != nil {
+		thisinfo.Publishtime = util.Int64All(tmp["pt_modify"])
+		tmp["publishtime"] = tmp["pt_modify"]
+	}
 	// 处理publishtime为空
 	if thisinfo.Publishtime <= 0 {
 		for _, d := range DateTimeSelect {

+ 12 - 14
udpcreateindex/src/biddingall.go

@@ -434,8 +434,6 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 							if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 								newTmp[field] = qutil.Int64All(tmp[field])
 							}
-						} else if field == "s" {
-							newTmp[field] = tmp[field]
 						} else { //其它字段判断数据类型,不正确舍弃
 							if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 								continue
@@ -470,12 +468,12 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 					time.Sleep(time.Second * 10)
 				}
 				elastic.BulkSave(index, itype, &tmps, true)
-				if len(multiIndex) == 2 {
-					elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-				}
-				if other_index != "" && other_itype != "" { //备份库同时生索引
-					bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
-				}
+				//if len(multiIndex) == 2 {
+				//	elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+				//}
+				//if other_index != "" && other_itype != "" { //备份库同时生索引
+				//	bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+				//}
 				arrEs = []map[string]interface{}{}
 			}
 			UpdatesLock.Unlock()
@@ -499,12 +497,12 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 			time.Sleep(time.Second * 10)
 		}
 		elastic.BulkSave(index, itype, &tmps, true)
-		if len(multiIndex) == 2 {
-			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-		}
-		if other_index != "" && other_itype != "" { //备份库同时生索引
-			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
-		}
+		//if len(multiIndex) == 2 {
+		//	elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+		//}
+		//if other_index != "" && other_itype != "" { //备份库同时生索引
+		//	bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+		//}
 	}
 	UpdatesLock.Unlock()
 	log.Println(mapInfo, "create bidding index...over", n)

+ 9 - 0
udpcreateindex/src/biddingdata.go

@@ -24,6 +24,15 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
 			},
 		}
+	} else {
+		idMap := q["_id"].(map[string]interface{})
+		tmpQ := map[string]interface{}{}
+		for c, id := range idMap {
+			if idStr, ok := id.(string); ok && id != "" {
+				tmpQ[c] = mongodb.StringTOBsonId(idStr)
+			}
+		}
+		q["_id"] = tmpQ
 	}
 	//bidding库
 	session := mgo.GetMgoConn()

+ 31 - 3
udpcreateindex/src/biddingindex.go

@@ -96,7 +96,11 @@ func biddingTask(data []byte, mapInfo map[string]interface{}, tasktype string) {
 		mgo.DestoryMongoConn(session)
 	}
 	log.Println(mapInfo, "create bidding index...over", "all:", count, "bidding size:", n1, ",es size:", n2)
-
+	if tasktype == "bidding_history" {
+		qutil.Debug(tasktype)
+		// 历史判重id段结束之后 生全量数据索引
+		biddingDataTask(data, mapInfo)
+	}
 }
 
 func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey, tasktype string) (int, int) {
@@ -112,7 +116,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	log.Println("开始迭代..")
 	for n, tmp := range infos {
 		n1++
-		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive == "测试" || sensitive == "异常" { //bidding中有敏感词,不生索引
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -405,6 +409,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 						if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 							newTmp[field] = qutil.Int64All(tmp[field])
 						}
+
 					} else if field == "review_experts" {
 						// 评审专家
 						if arr, ok := tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
@@ -438,6 +443,10 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			}
 			YuceEndtime(newTmp)                      // 预测结果时间
 			newTmp["createtime"] = time.Now().Unix() // es库数据创建时间,只有增量数据有
+			if qutil.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
+				// 剑鱼信息发布数据 通过udp通知信息发布程序
+				go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
+			}
 			arrEs = append(arrEs, newTmp)
 		}
 		if len(update) > 0 {
@@ -459,7 +468,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			}
 			elastic.BulkSave(index, itype, &tmps, true)
 			if other_index != "" && other_itype != "" {
-				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+				elastic.BulkSave(other_index, other_itype, &tmps, true)
 			}
 			if len(multiIndex) == 2 {
 				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
@@ -699,3 +708,22 @@ func FormatDateStr(ds string) int64 {
 		return location.Unix()
 	}
 }
+
+type Request struct { // Request wraps a single info id. NOTE(review): not referenced in the visible hunks — confirm it is still needed.
+	InfoId string
+}
+type Response struct { // Response wraps a list of generic maps. NOTE(review): not referenced in the visible hunks — confirm it is still needed.
+	Rep []map[string]interface{}
+}
+
+// UdpMethod notifies the info-publishing program by sending a UDP packet (not an RPC call) to JyUdpAddr; the payload is the JSON object {"infoid": id, "stype": "jyfb_data_over"}.
+// @Author J 2022/4/13 9:13 AM
+func UdpMethod(id string) {
+	mapinfo := map[string]interface{}{
+		"infoid": id,
+		"stype":  "jyfb_data_over",
+	}
+	datas, _ := json.Marshal(mapinfo) // marshal error is discarded
+	qutil.Debug(JyUdpAddr, string(datas)) // debug-log the target address and the payload before sending
+	_ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, JyUdpAddr) // result is discarded: a failed send is not reported to the caller
+}

+ 8 - 9
udpcreateindex/src/biddingindexback.go

@@ -212,20 +212,21 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 					} else if field == "bidopentime" {
 						if tmp[field] != nil && tmp["bidendtime"] == nil {
 							newTmp["bidendtime"] = tmp[field]
-						}
-						if tmp[field] == nil && tmp["bidendtime"] != nil {
+							newTmp[field] = tmp[field]
+						} else if tmp[field] == nil && tmp["bidendtime"] != nil {
+							newTmp["bidendtime"] = tmp[field]
 							newTmp[field] = tmp["bidendtime"]
+						} else {
+							if tmp["bidopentime"] != nil {
+								newTmp[field] = tmp["bidopentime"]
+							}
 						}
 					} else if field == "detail" { //过滤
 						detail, _ := tmp[field].(string)
 						if len([]rune(detail)) > detailLength {
 							detail = detail[:detailLength]
 						}
-						if strings.Contains(detail, qutil.ObjToString(tmp["title"])) {
-							newTmp[field] = FilterDetail(detail)
-						} else {
-							newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail)
-						}
+						newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail)
 					} else if field == "_id" || field == "topscopeclass" { //不做处理
 						newTmp[field] = tmp[field]
 					} else if field == "publishtime" || field == "comeintime" {
@@ -233,8 +234,6 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 						if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 							newTmp[field] = qutil.Int64All(tmp[field])
 						}
-					} else if field == "s" {
-						newTmp[field] = tmp[field]
 					} else { //其它字段判断数据类型,不正确舍弃
 						if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 							continue

+ 2 - 3
udpcreateindex/src/buyertask.go

@@ -8,7 +8,6 @@ import (
 	elastic "qfw/util/elastic"
 	"sync"
 	"time"
-
 )
 
 var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"}
@@ -25,9 +24,9 @@ func buyerEsTaskOnce() {
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 	task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
 	task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
+	//task_sid = "5e6611b7aec95406dccf714f"
+	//task_eid = "625c79bf799a3acc48890f48"
 	log.Println("buyer 区间id:", task_sid, task_eid)
-	// task_sid = "5e6611b7aec95406dccf7151"
-	// task_eid = "5f7249164bdc0447a6c90fa5"
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{

+ 14 - 10
udpcreateindex/src/config.json

@@ -1,12 +1,12 @@
 {
   "udpport": ":1483",
   "msg_server": "10.171.112.160:7070",
-  "uname": "root",
-  "upwd": "root",
+  "uname": "",
+  "upwd": "",
   "mongodb": {
-    "addr": "192.168.3.207:27001",
+    "addr": "192.168.3.207:27092",
     "pool": 10,
-    "db": "qfw_data"
+    "db": "wjh"
   },
   "savedb": {
     "addr": "192.168.3.207:27092",
@@ -32,15 +32,16 @@
   "biddingback": {
     "db": "qfw_data",
     "collect": "bidding",
-    "index": "bidding_v2",
+    "index": "bidding_v1",
     "type": "bidding"
   },
   "bidding": {
     "db": "qfw_data",
     "collect": "bidding",
-    "index": "bidding_v2",
+    "index": "bidding_v1",
     "type": "bidding",
-    "extractdb": "wjh",
+    "multiIndex": "",
+    "extractdb": "qfw_data",
     "extractcollect": "extract",
     "indexfields": [
       "buyerzipcode", "winnertel", "winnerperson", "contractcode", "winneraddr", "agencyaddr", "buyeraddr", "signaturedate", "projectperiod", "projectaddr", "agencytel", "agencyperson",
@@ -74,8 +75,7 @@
     "winnerorder": "sort,sortstr,entname",
     "winnerordermap": {
       "sort": "int", "sortstr": "string", "entname": "string"
-    },
-    "multiIndex": ""
+    }
   },
   "filelength": 50000,
   "detaillength": 50000,
@@ -122,10 +122,14 @@
     }
   },
   "elastic": {
-    "addr": "http://192.168.3.206:9800",
+    "addr": "http://192.168.3.11:9800",
     "pool": 12,
     "node": "4q7v7e6mQ5aeCwjUgM6HcA"
   },
+  "jyfb_udp": {
+    "addr": "127.0.0.1",
+    "port": 11118
+  },
   "filter-keyword": ["(招标网|千里马|采招网|招标采购导航网|招标与采购网|中国招投标网|中国采购与招标网|中国采购与招标|优质采)[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}(qianlima|zhaobiao|okcis|zbytb|infobidding|bidcenter|youzhicai|chinabidding|Chinabidding|CHINABIDDING)[a-z0-9.\\/\\/]{0,40}",
     "招标网[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}zhaobiao[a-z0-9.\\/\\/]{0,40}",
     "千里马[\\w\\W]{0,15}[a-z0-9:\\/\\/.]{0,20}qianlima[a-z0-9.\\/\\/]{0,10}",

+ 13 - 6
udpcreateindex/src/main.go

@@ -12,7 +12,6 @@ import (
 	_ "net/http/pprof"
 	"qfw/util"
 	elastic "qfw/util/elastic"
-	"qfw/util/redis"
 	"strings"
 	"time"
 	u "util"
@@ -58,6 +57,8 @@ var (
 	other_index      string
 	other_itype      string
 
+	JyUdpAddr *net.UDPAddr
+
 	esAddr string
 	esNode string
 
@@ -78,12 +79,13 @@ var StopFlag = false // 程序生索引停止标志
 func init() {
 	util.ReadConfig(&Sysconfig)
 	// company_id
-	redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
-	inits()
+	//redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
+	//inits()
 	//go checkMapJob()
 	detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
 	fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
 	updport, _ = Sysconfig["updport"].(string)
+
 	winner, _ = Sysconfig["winner"].(map[string]interface{})
 	standard, _ = Sysconfig["standard"].(map[string]interface{})
 	buyer, _ = Sysconfig["buyer"].(map[string]interface{})
@@ -216,22 +218,27 @@ func init() {
 	log.Println(projectinfoFields)
 	log.Println(purchasinglistFields)
 
-	initCheckCity()
-
 	FilterKeyword = util.ObjArrToStringArr(Sysconfig["filter-keyword"].([]interface{}))
+	initCheckCity()
 	//初始化oss
 	u.InitOss()
 
+	m := Sysconfig["jyfb_udp"].(map[string]interface{})
+	JyUdpAddr = &net.UDPAddr{
+		IP:   net.ParseIP(m["addr"].(string)),
+		Port: util.IntAll(m["port"]),
+	}
 }
 
 func main() {
 	//go inspectQuery()
 	//go task_index()
-	go UpdateExtract() //抽取表中新增entidlist字段
+	//go UpdateExtract() //抽取表中新增entidlist字段
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
+
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 1 - 1
udpcreateindex/src/projectindex.go

@@ -50,7 +50,7 @@ func projectTask(data []byte, project, mapInfo map[string]interface{}) {
 	count, _ := session.DB(db).C(c).Find(&q).Count()
 	savepool := make(chan bool, 10)
 
-	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index, q["pici"])
 	query := session.DB(db).C(c).Find(q).Iter()
 	arr := make([]map[string]interface{}, savesizei)
 	var n int

+ 3 - 3
udpcreateindex/src/task.go

@@ -17,9 +17,9 @@ func task_index() {
 	//c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
 	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 
-	_ = c.AddFunc("0 0 0 * * ?", func() { task_winneres() })   //每天凌晨执行一次winner生索引
-	_ = c.AddFunc("0 0 1 * * ?", func() { task_buyeres() })    //每天1点执行一次buyer生索引
-	_ = c.AddFunc("0 0 2 * * ?", func() { task_biddingAll() }) //每天2点执行 前一天的所有招标数据
+	_ = c.AddFunc("0 0 0 * * ?", func() { task_winneres() }) //每天凌晨执行一次winner生索引
+	_ = c.AddFunc("0 0 1 * * ?", func() { task_buyeres() })  //每天1点执行一次buyer生索引
+	//_ = c.AddFunc("0 0 2 * * ?", func() { task_biddingAll() }) //每天2点执行 前一天的所有招标数据
 	c.Start()
 }
 func task_winneres() {

+ 2 - 3
udpcreateindex/src/util/ossclient.go

@@ -3,15 +3,14 @@ package util
 
 import (
 	"fmt"
+	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 	"io/ioutil"
 	"os"
 	"qfw/util"
-
-	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 )
 
 var (
-	ossEndpoint        = "oss-cn-beijing.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
+	ossEndpoint        = "oss-cn-beijing-internal.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
 	ossAccessKeyId     = "LTAI4G5x9aoZx8dDamQ7vfZi"
 	ossAccessKeySecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
 	ossBucketName      = "topjy"