소스 검색

no message

Jianghan 3 년 전
부모
커밋
0c1efebd66
10개의 변경된 파일78개의 추가작업 그리고 122개의 파일을 삭제
  1. 4 4
      bidding_data/config.json
  2. 46 7
      bidding_data/main.go
  3. 10 3
      bidding_data/task.go
  4. 3 4
      es/biddingindex.go
  5. 1 2
      es/config.json
  6. 2 2
      es/main.go
  7. 1 1
      export_excel/main.go
  8. 7 7
      project_data/main.go
  9. 2 90
      project_data/task.go
  10. 2 2
      qyxy_es/config.json

+ 4 - 4
bidding_data/config.json

@@ -6,12 +6,12 @@
   "qyxyColl": "oprd_qyxy",
   "uname": "SJZY_RWMIX_Other",
   "upwd": "SJZY@M34I6x7D9ata",
-  "tagFile": "/Users/jianghan/Desktop/20210123标签规则.xlsx",
+  "tagFile": "/Users/jianghan/Desktop/20220311标签规则.xlsx",
   "operators": "移动,中移在线,中移系统,中移,铁通,联通,联合网络通信,电信,天翼视讯,天翼电信,广电网络,中国广电,湖北广电",
   "checkDb": {
-    "addr": "192.168.3.166:27082",
-    "dbname": "guangdongyidong",
+    "addr": "192.168.3.207:27092",
+    "dbname": "wjh",
     "dbsize": 5,
-    "dbcoll": "20220127Gdyd_5_13"
+    "dbcoll": "oprd_bidding"
   }
 }

+ 46 - 7
bidding_data/main.go

@@ -14,11 +14,12 @@ var (
 	Dbname, Dbcoll     string
 	collSave, qyxyColl string
 
-	savePool chan map[string]interface{}
-	saveSp   chan bool
-
-	filePath string
-	tagArr   []string
+	savePool   chan map[string]interface{}
+	saveSp     chan bool
+	updatePool chan []map[string]interface{}
+	updateSp   chan bool
+	filePath   string
+	tagArr     []string
 
 	operators    []string
 	TagMatchRule = map[string][]TagMatching{}
@@ -49,6 +50,8 @@ func init() {
 	filePath = util.ObjToString(Sysconfig["tagFile"])
 	savePool = make(chan map[string]interface{}, 5000)
 	saveSp = make(chan bool, 5)
+	updatePool = make(chan []map[string]interface{}, 5000)
+	updateSp = make(chan bool, 5)
 
 	operators = strings.Split(util.ObjToString(Sysconfig["operators"]), ",")
 	initExcel(filePath)
@@ -56,7 +59,8 @@ func init() {
 }
 
 func main() {
-	go saveMethod()
+	//go saveMethod()
+	go updateMethod()
 
 	sess := Mgo1.GetMgoConn()
 	defer Mgo1.DestoryMongoConn(sess)
@@ -64,7 +68,7 @@ func main() {
 	ch := make(chan bool, 3)
 	wg := &sync.WaitGroup{}
 
-	//q := map[string]interface{}{"id": "61bc59f606a9d911e5ba6119"}
+	//q := map[string]interface{}{"id": "61547b9f1a75b8f4469b7f90"}
 	query := sess.DB(Dbname).C(Dbcoll).Find(nil).Select(nil).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
@@ -123,3 +127,38 @@ func saveMethod() {
 		}
 	}
 }
+
+func updateMethod() {
+	arru := make([][]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-updatePool:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				updateSp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-updateSp
+					}()
+					Mgo.UpSertBulk(collSave, arru...)
+				}(arru)
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				updateSp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-updateSp
+					}()
+					Mgo.UpSertBulk(collSave, arru...)
+				}(arru[:indexu])
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}

+ 10 - 3
bidding_data/task.go

@@ -32,7 +32,7 @@ func taskinfo(info map[string]interface{}) {
 					if val := util.ObjToString(info[field]); val != "" {
 						for _, e1 := range matching.excludeKeyReg {
 							if e1.regs.MatchString(val) {
-								break M
+								continue
 							}
 						}
 					}
@@ -169,9 +169,16 @@ func taskinfo(info map[string]interface{}) {
 		//	}
 		//}
 	}
-	info["_id"] = mongodb.StringTOBsonId(id)
+	//info["_id"] = mongodb.StringTOBsonId(id)
+	delete(info, "_id")
 	info["updatetime"] = time.Now().Unix()
-	savePool <- info
+	//savePool <- info
+	update := []map[string]interface{}{{
+		"_id": mongodb.StringTOBsonId(id),
+	},
+		{"$set": info},
+	}
+	updatePool <- update
 }
 
 func periodMethod(str interface{}) int {

+ 3 - 4
es/biddingindex.go

@@ -3,7 +3,6 @@ package main
 import (
 	"go.mongodb.org/mongo-driver/bson"
 	"log"
-	"mongodb"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
 	"reflect"
@@ -32,15 +31,15 @@ func biddingTask() {
 	db := qutil.ObjToString(bidding["db"])
 	coll := qutil.ObjToString(bidding["collect"])
 	//q := map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 1643262000}}
-	q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5db7c324a5cb26b9b78b0f09")}
-	count, _ := session.DB(db).C(coll).Find(&q).Count()
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5db7c324a5cb26b9b78b0f09")}
+	count, _ := session.DB(db).C(coll).Find(nil).Count()
 	index := qutil.ObjToString(bidding["index"])
 	stype := qutil.ObjToString(bidding["type"])
 	//线程池
 	UpdatesLock := sync.Mutex{}
 	qutil.Debug("查询语句:", nil, "同步总数:", count, "elastic库:")
 	//查询招标数据
-	query := session.DB(db).C(coll).Find(&q).Select(bson.M{
+	query := session.DB(db).C(coll).Find(nil).Select(bson.M{
 		"projectinfo.attachment": 0,
 		"contenthtml":            0,
 		"publishdept":            0,

+ 1 - 2
es/config.json

@@ -56,7 +56,6 @@
   },
   "elastic": {
     "addr": "http://192.168.3.206:9800",
-    "pool": 12,
-    "node": "4q7v7e6mQ5aeCwjUgM6HcA"
+    "pool": 12
   }
 }

+ 2 - 2
es/main.go

@@ -109,8 +109,8 @@ func init() {
 
 func main() {
 
-	//biddingTask()
-	projectTask()
+	biddingTask()
+	//projectTask()
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 1 - 1
export_excel/main.go

@@ -71,7 +71,7 @@ func main() {
 					row.AddCell().SetValue("")
 				}
 			} else if v == "jyhref2" {
-				if util.ObjToString(tmp["bidtype"]) == "招标" {
+				if util.ObjToString(tmp["bidtype"]) == "招标" || util.ObjToString(tmp["bidtype"]) == "单一" {
 					list := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
 					id := list[0]
 					row.AddCell().SetValue(fmt.Sprintf(JYHref+"%s.html", util.CommonEncodeArticle("content", id)))

+ 7 - 7
project_data/main.go

@@ -2,18 +2,18 @@ package main
 
 import (
 	"qfw/util"
-	"qfw/util/elastic"
+	"sync"
 	"time"
 )
 
-var (
-	SingleClear = 0
-	Es          *elastic.Elastic
-	Index       string
-	Itype       string
-)
+var wait sync.WaitGroup
 
 func main() {
+	P_QL = NewPT()
+	//wait.Add(1)
+	go P_QL.updateAllQueue()
+	//go P_QL.clearMem()
+
 	if Sysconfig["loadStart"] != nil {
 		loadStart := util.Int64All(Sysconfig["loadStart"])
 		if loadStart > -1 {

+ 2 - 90
project_data/task.go

@@ -3,7 +3,6 @@ package main
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/robfig/cron"
 	"log"
 	"mongodb"
 	"qfw/util"
@@ -72,13 +71,6 @@ func NewPT() *ProjectTask {
 var P_QL *ProjectTask
 var sp = make(chan bool, 5)
 
-//初始化全量合并对象
-func init() {
-	P_QL = NewPT()
-	go P_QL.updateAllQueue()
-	//go P_QL.clearMem()
-}
-
 func (p *ProjectTask) updateAllQueue() {
 	arru := make([][]map[string]interface{}, p.saveSize)
 	indexu := 0
@@ -114,86 +106,6 @@ func (p *ProjectTask) updateAllQueue() {
 	}
 }
 
-//项目合并内存更新
-func (p *ProjectTask) clearMem() {
-	c := cron.New()
-	//在内存中保留最近6个月的信息
-	//跑全量时每5分钟跑一次,跑增量时400分钟跑一次
-	_ = c.AddFunc("50 0/5 * * * *", func() {
-		if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
-			SingleClear = 1
-			//跳过的次数清零
-			p.clearContimes = 0
-			//信息进入查找对比全局锁
-			p.findLock.Lock()
-			//defer p.findLock.Unlock()
-			//合并进行的任务都完成
-			p.wg.Wait()
-			//遍历id
-			//所有内存中的项目信息
-			p.AllIdsMapLock.Lock()
-			log.Println("清除开始")
-			//清除计数
-			clearNum := 0
-			for k, v := range p.AllIdsMap {
-				if p.currentTime-v.P.LastTime > p.validTime {
-					clearNum++
-					//删除id的map
-					delete(p.AllIdsMap, k)
-					//删除pb
-					if v.P.Buyer != "" {
-						ids := p.mapPb[v.P.Buyer]
-						if ids != nil {
-							ids.Lock.Lock()
-							ids.Arr = deleteSlice(ids.Arr, k, "pb")
-							if len(ids.Arr) == 0 {
-								delete(p.mapPb, v.P.Buyer)
-							}
-							ids.Lock.Unlock()
-						}
-					}
-					//删除mapPn
-					for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
-						if vn != "" {
-							ids := p.mapPn[vn]
-							if ids != nil {
-								ids.Lock.Lock()
-								ids.Arr = deleteSlice(ids.Arr, k, "pn")
-								if len(ids.Arr) == 0 {
-									delete(p.mapPn, vn)
-								}
-								ids.Lock.Unlock()
-							}
-						}
-					}
-					//删除mapPc
-					for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
-						if vn != "" {
-							ids := p.mapPc[vn]
-							if ids != nil {
-								ids.Lock.Lock()
-								ids.Arr = deleteSlice(ids.Arr, k, "pc")
-								if len(ids.Arr) == 0 {
-									delete(p.mapPc, vn)
-								}
-								ids.Lock.Unlock()
-							}
-						}
-					}
-					v = nil
-				}
-			}
-			p.AllIdsMapLock.Unlock()
-			p.findLock.Unlock()
-			SingleClear = 0
-			log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb))
-		} else {
-			p.clearContimes++
-		}
-	})
-	c.Start()
-}
-
 //全量合并
 func (p *ProjectTask) taskQl() {
 	defer util.Catch()
@@ -248,13 +160,13 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 	p.Brun = true
 	count := 0
 
-	q = map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 1643262000}}
+	//q = map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 0}}
 	util.Debug("start project", q, p.pici)
 	sess := MongoTool.GetMgoConn()
 	defer MongoTool.DestoryMongoConn(sess)
 
 	fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
-	ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
+	ms := sess.DB(db).C(coll).Find(nil).Select(fields).Sort("publishtime")
 	query := ms.Iter()
 	var lastid interface{}
 

+ 2 - 2
qyxy_es/config.json

@@ -3,8 +3,8 @@
   "dbsize": 12,
   "dbname": "wjh",
   "dbcoll": "oprd_qyxy",
-  "uname": "SJZY_RWMIX_Other",
-  "upwd": "SJZY@M34I6x7D9ata",
+  "uname": "",
+  "upwd": "",
   "tasktime": 0,
   "updatetime": 0,
   "elastic": {