Browse Source

Merge branch 'dev3.3' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.3

fengweiqiang 5 years ago
parent
commit
3a3616da44

+ 2 - 1
fullproject/src/config.json

@@ -1,5 +1,6 @@
 {
-    "mongodbServers": "192.168.3.207:27082",
+    "loadStart":-1,
+	"mongodbServers": "192.168.3.207:27082",
     "mongodbPoolSize": 10,
     "mongodbName": "cesuo",
     "extractColl": "key1_biddingall",

+ 1 - 0
fullproject/src/init.go

@@ -60,6 +60,7 @@ func init() {
 		DbName:      Sysconfig["mongodbName"].(string),
 	}
 	MongoTool.InitPool()
+
 	ExtractColl = Sysconfig["extractColl"].(string)
 	ProjectColl = Sysconfig["projectColl"].(string)
 	NextNode = Sysconfig["nextNode"].([]interface{})

+ 1 - 0
fullproject/src/load_data.go

@@ -8,6 +8,7 @@ import (
 //初始加载数据,默认加载最近6个月的数据
 
 func (p *ProjectTask) loadData(starttime int64) {
+	log.Println("load start..", starttime)
 	p.findLock.Lock()
 	defer p.findLock.Unlock()
 	p.AllIdsMapLock.Lock()

+ 9 - 1
fullproject/src/main.go

@@ -5,6 +5,7 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
+	"qfw/util"
 	"time"
 )
 
@@ -20,7 +21,12 @@ func main() {
 	//udp强制合并  信息id1,id2,id3 [项目id] 不存在时新建  qzhb
 	//udp强制拆分  项目id,信息id1,id2          qzcf
 	//udp重新合并  信息id1,id2,id3             cxhb
-	P_QL.loadData(0)
+	if Sysconfig["loadStart"] != nil {
+		loadStart := util.Int64All(Sysconfig["loadStart"])
+		if loadStart > -1 {
+			P_QL.loadData(loadStart)
+		}
+	}
 	time.Sleep(99999 * time.Hour)
 }
 
@@ -49,6 +55,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 						<-SingleThread
 					}()
 					P_QL.currentType = tasktype
+					P_QL.pici = time.Now().Unix()
 					P_QL.taskQl(mapInfo)
 				}()
 			case "project": //增量合并,未抽取到项目名称或项目编号的不合并  bidding中mergestatus 1已合并 2字段问题不合并 3历史待合并
@@ -58,6 +65,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 						<-SingleThread
 					}()
 					P_QL.currentType = tasktype
+					P_QL.pici = time.Now().Unix()
 					P_QL.taskZl(mapInfo)
 				}()
 

+ 21 - 9
fullproject/src/project.go

@@ -393,7 +393,7 @@ var FIELDS = []string{
 	"winner",
 	"budget",
 	"bidamount",
-	"bidstatus",
+	//"bidstatus",
 	"agency",
 	"projectscope",
 	"bidopentime",
@@ -412,6 +412,13 @@ var bidtype = map[string]string{
 	"竞价": "竞价",
 }
 
+var bidstatus = map[string]string{ // whitelist of accepted bidstatus values (each key maps to itself); NewProject copies tmp["bidstatus"] into the project only when it is a key here
+	"中标": "中标",
+	"成交": "成交",
+	"废标": "废标",
+	"流标": "流标",
+}
+
 //招标时间zbtime、中标时间jgtime、项目状态bidstatus、招标类型bidtype、最后发布时间lasttime、首次发布时间firsttime
 
 func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (string, *ProjectInfo) {
@@ -446,6 +453,10 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 		bt = "招标"
 	}
 	set["bidtype"] = bt
+	bs, _ := tmp["bidstatus"].(string)
+	if bidstatus[bs] != "" {
+		set["bidstatus"] = bs
+	}
 	if set["bidstatus"] == nil && thisinfo.TopType == "结果" {
 		set["bidstatus"] = thisinfo.SubType
 	}
@@ -461,14 +472,15 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	set["list"] = []bson.M{
 		push,
 	}
-	p.updatePool <- []map[string]interface{}{
-		map[string]interface{}{
-			"_id": pId,
-		},
-		map[string]interface{}{
-			"$set": set,
-		},
-	}
+	p.savePool <- set
+	//	[]map[string]interface{}{
+	//		map[string]interface{}{
+	//			"_id": pId,
+	//		},
+	//		map[string]interface{}{
+	//			"$set": ,
+	//		},
+	//	}
 	return pId.Hex(), &p1
 }
 

+ 117 - 37
fullproject/src/task.go

@@ -36,7 +36,9 @@ type ProjectTask struct {
 	//采购单位、项目名称、项目编号
 	mapPb, mapPn, mapPc map[string]*Key
 	//更新或新增通道
-	updatePool chan []map[string]interface{}
+	updatePool           chan []map[string]interface{}
+	savePool             chan map[string]interface{}
+	saveSign, updateSign chan bool
 	//表名
 	coll string
 	//当前状态是全量还是增量
@@ -55,13 +57,16 @@ func NewPT() *ProjectTask {
 		InitMinTime: int64(1325347200),
 		name:        "全/增量对象",
 		thread:      4,
-		updatePool:  make(chan []map[string]interface{}, 2000),
+		updatePool:  make(chan []map[string]interface{}, 1000),
+		savePool:    make(chan map[string]interface{}, 2000),
 		wg:          sync.WaitGroup{},
-		AllIdsMap:   make(map[string]*ID, 10000000),
-		mapPb:       make(map[string]*Key, 3000000),
-		mapPn:       make(map[string]*Key, 10000000),
-		mapPc:       make(map[string]*Key, 10000000),
+		AllIdsMap:   make(map[string]*ID, 100000),
+		mapPb:       make(map[string]*Key, 1000000),
+		mapPn:       make(map[string]*Key, 1000000),
+		mapPc:       make(map[string]*Key, 1500000),
 		saveSize:    200,
+		saveSign:    make(chan bool, 1),
+		updateSign:  make(chan bool, 1),
 		coll:        ProjectColl,
 	}
 }
@@ -71,30 +76,65 @@ var P_QL *ProjectTask
 //初始化全量合并对象
 func init() { // creates the shared ProjectTask and starts its background goroutines
 	P_QL = NewPT()
+	go P_QL.saveQueue() // batches new documents received on savePool
 	go P_QL.updateQueue() // batches updates received on updatePool
 	go P_QL.clearMem()
+}
 
+// saveQueue batches new project documents received on savePool and writes
+// them to p.coll with MongoTool.SaveBulk. A batch is written when it reaches
+// p.saveSize entries, when no channel activity is seen for 100ms, or when
+// updateQueue requests a flush through saveSign.
+//
+// On a saveSign request every document still buffered in savePool is pulled
+// in before flushing. select chooses at random among ready cases, so without
+// this drain the flush could be acknowledged while documents queued earlier
+// were still sitting in the channel, and the UpdateBulk that follows in
+// updateQueue would run before they were saved. updateSign is sent only
+// after the flush call has returned.
+//
+// The loop never returns; it is meant to run in its own goroutine.
+func (p *ProjectTask) saveQueue() {
+	batch := make([]map[string]interface{}, 0, p.saveSize)
+	// flush writes the pending batch, if any. A fresh slice is allocated
+	// afterwards in case SaveBulk keeps a reference to the one it was given.
+	flush := func() {
+		if len(batch) == 0 {
+			return
+		}
+		MongoTool.SaveBulk(p.coll, batch...)
+		batch = make([]map[string]interface{}, 0, p.saveSize)
+	}
+	// add queues one document and flushes as soon as the batch is full.
+	add := func(doc map[string]interface{}) {
+		batch = append(batch, doc)
+		if len(batch) >= p.saveSize {
+			flush()
+		}
+	}
+	for {
+		select {
+		case <-p.saveSign:
+			// Non-blocking drain: take everything already queued so it is
+			// saved before the requester is allowed to continue.
+			for pending := true; pending; {
+				select {
+				case doc := <-p.savePool:
+					add(doc)
+				default:
+					pending = false
+				}
+			}
+			flush()
+			p.updateSign <- true
+		case doc := <-p.savePool:
+			add(doc)
+		case <-time.After(100 * time.Millisecond):
+			// Idle timeout: do not let a partial batch wait indefinitely.
+			flush()
+		}
+	}
 }
 
 //项目保存和更新通道
 func (p *ProjectTask) updateQueue() {
-	arr := make([][]map[string]interface{}, p.saveSize)
-	index := 0
+	arru := make([][]map[string]interface{}, p.saveSize)
+	indexu := 0
 	for {
 		select {
 		case v := <-p.updatePool:
-			arr[index] = v
-			index++
-			if index == p.saveSize {
-				MongoTool.UpSertBulk(p.coll, arr...)
-				arr = make([][]map[string]interface{}, p.saveSize)
-				index = 0
+			arru[indexu] = v
+			indexu++
+			if indexu == p.saveSize {
+				//更新之前先保存
+				p.saveSign <- true
+				<-p.updateSign
+				MongoTool.UpdateBulk(p.coll, arru...)
+				arru = make([][]map[string]interface{}, p.saveSize)
+				indexu = 0
 			}
-		case <-time.After(2 * time.Second):
-			if index > 0 {
-				MongoTool.UpSertBulk(p.coll, arr[:index]...)
-				arr = make([][]map[string]interface{}, p.saveSize)
-				index = 0
+		case <-time.After(100 * time.Millisecond):
+			if indexu > 0 {
+				p.saveSign <- true
+				<-p.updateSign
+				MongoTool.UpdateBulk(p.coll, arru[:indexu]...)
+				arru = make([][]map[string]interface{}, p.saveSize)
+				indexu = 0
 			}
 		}
 	}
@@ -106,8 +146,8 @@ func (p *ProjectTask) clearMem() {
 	//在内存中保留最近6个月的信息
 	validTime := int64(6 * 30 * 86400)
 	//跑全量时每10分钟跑一次,跑增量时600分钟跑一次
-	c.AddFunc("50 0/4 * * * *", func() {
-		if p.currentType == "ql" || p.clearContimes >= 100 {
+	c.AddFunc("50 0/10 * * * *", func() {
+		if p.currentType == "ql" || p.clearContimes >= 60 {
 			//跳过的次数清零
 			p.clearContimes = 0
 			//信息进入查找对比全局锁
@@ -224,6 +264,7 @@ func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
 		}
 	}
 	//生成查询语句执行
+	log.Println("查询语句:", q)
 	p.enter(db, coll, q)
 
 }
@@ -286,35 +327,74 @@ func nextNode(gtid, lteid, stype string, pici int64) {
 
 func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 	defer util.Catch()
+	count, taskcount := 0, 0
+	//var lastid interface{}
+	//	for {
+	//		if lastid != nil {
+	//			if q == nil {
+	//				q = map[string]interface{}{}
+	//			}
+	//			if q["_id"] == nil {
+	//				q["_id"] = map[string]interface{}{}
+	//			}
+	//			q["_id"].(map[string]interface{})["$gt"] = lastid
+	//		}
+	pool := make(chan bool, p.thread)
+	log.Println("start project", q)
 	sess := MongoTool.GetMgoConn()
 	defer MongoTool.DestoryMongoConn(sess)
-	query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
-	pool := make(chan bool, p.thread)
-	count := 0
+	query := sess.DB(db).C(coll).Find(q).Sort("_id").Select(map[string]interface{}{
+		"blocks":   0,
+		"fieldall": 0,
+	}).Iter()
+	//over := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
-		info := ParseInfo(tmp)
-		if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
-			pool <- true
-			go func(info *Info, tmp map[string]interface{}) {
-				defer func() {
-					p.currentTime = info.Publishtime
-					<-pool
-				}()
-				p.startProjectMerge(info, tmp)
-			}(info, tmp)
-		} else {
-			//信息错误,进行更新
+		if util.IntAll(tmp["repeat"]) == 0 {
+			info := ParseInfo(tmp)
+			if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
+				pool <- true
+				taskcount++
+				go func(info *Info, tmp map[string]interface{}) {
+					defer func() {
+						p.currentTime = info.Publishtime
+						<-pool
+					}()
+					p.startProjectMerge(info, tmp)
+				}(info, tmp)
+			} else {
+				//信息错误,进行更新
+			}
 		}
 		if count%1000 == 0 {
 			log.Println("current", count)
 		}
+		//			if taskcount > 0 && taskcount%50000 == 0 { //歇歇
+		//				log.Println("pause start..", taskcount)
+		//				for n := 0; n < p.thread; n++ {
+		//					pool <- true
+		//				}
+		//				for n := 0; n < p.thread; n++ {
+		//					<-pool
+		//				}
+		//				log.Println("pause over..")
+		//			}
+		//lastid = tmp["_id"]
 		tmp = make(map[string]interface{})
+		//		if count > 40000 {
+		//			query.Close()
+		//			break
+		//		}
+		//over++
 	}
 	//阻塞
 	for n := 0; n < p.thread; n++ {
 		pool <- true
 	}
-	log.Println("所有线程执行完成...", count)
+	//		if over == 0 {
+	//			break
+	//		}
+	//}
+	log.Println("所有线程执行完成...", count, taskcount)
 
 }