Browse Source

数据锁

zhengkun 3 years ago
parent
commit
8b5abd22c1
4 changed files with 118 additions and 122 deletions
  1. 31 29
      src/fullDataRepeat.go
  2. 3 2
      src/historyRepeat.go
  3. 5 5
      src/increaseRepeat.go
  4. 79 86
      src/main.go

+ 31 - 29
src/fullDataRepeat.go

@@ -7,30 +7,32 @@ import (
 	"sync"
 	"time"
 )
+
 var timeLayout = "2006-01-02"
+
 //var timeLayout = "2006-01-02 15:04:05"
 
 //划分时间段落
 func initModelArr() []map[string]interface{} {
-	modelArr := make([]map[string]interface{},0)
+	modelArr := make([]map[string]interface{}, 0)
 	start := time.Date(2021, 12, 15, 0, 0, 0, 0, time.Local).Unix()
 	end := time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local).Unix()
 	gte_time := start
-	lt_time := start+86400
+	lt_time := start + 86400
 	log.Println("开始构建数据池...一周...")
-	FullDM = TimedTaskDatamap(dupdays, start,1)
+	FullDM = TimedTaskDatamap(dupdays, start, 1)
 	log.Println("......")
-	log.Println("开启...全量判重...",start,"~",end)
+	log.Println("开启...全量判重...", start, "~", end)
 	for {
 		modelArr = append(modelArr, map[string]interface{}{
 			"publishtime": map[string]interface{}{
 				"$gte": gte_time,
-				"$lt": lt_time,
+				"$lt":  lt_time,
 			},
 		})
 		gte_time = lt_time
-		lt_time = gte_time+86400
-		if lt_time>end {
+		lt_time = gte_time + 86400
+		if lt_time > end {
 			break
 		}
 	}
@@ -40,22 +42,22 @@ func initModelArr() []map[string]interface{} {
 // fullDataRepeat runs the full-volume duplicate check: it walks the 86400-second publishtime windows built by initModelArr and hands each one, in order, to dealWithfullData.
 func fullDataRepeat() {
 	modelArr := initModelArr()
-	for _,query := range modelArr {
+	for _, query := range modelArr {
 		pt := *qu.ObjToMap(query["publishtime"])
 		time_str := time.Unix(qu.Int64All(pt["$gte"]), 0).Format(timeLayout) // log label: the window's "$gte" bound formatted with timeLayout
-		dealWithfullData(query,time_str)
+		dealWithfullData(query, time_str)
 	}
 }
 
 //多线程~处理数据
-func dealWithfullData (query map[string]interface{},time_str string) {
+func dealWithfullData(query map[string]interface{}, time_str string) {
 
-	log.Println("开始处理~",time_str,"~",query)
+	log.Println("开始处理~", time_str, "~", query)
 	sess := data_mgo.GetMgoConn()
 	defer data_mgo.DestoryMongoConn(sess)
 	it := sess.DB(data_mgo.DbName).C(extract).Find(&query).Sort("publishtime").Iter()
-	total, isok ,repeatN:= 0,0,0
-	dataAllDict := make(map[string][]map[string]interface{},0)
+	total, isok, repeatN := 0, 0, 0
+	dataAllDict := make(map[string][]map[string]interface{}, 0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if qu.IntAll(tmp["repeat"]) == 1 || qu.IntAll(tmp["repeat"]) == -1 {
 			tmp = make(map[string]interface{})
@@ -63,21 +65,21 @@ func dealWithfullData (query map[string]interface{},time_str string) {
 		}
 		isok++
 		subtype := qu.ObjToString(tmp["subtype"])
-		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
-			subtype=="竞谈"||subtype=="竞价" {
+		if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
+			subtype == "竞谈" || subtype == "竞价" {
 			subtype = "招标"
 		}
 		dataArr := dataAllDict[subtype]
-		if dataArr==nil {
+		if dataArr == nil {
 			dataArr = []map[string]interface{}{}
 		}
-		dataArr = append(dataArr,tmp)
+		dataArr = append(dataArr, tmp)
 		dataAllDict[subtype] = dataArr
 		tmp = make(map[string]interface{})
 	}
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
-	for _,dataArr := range dataAllDict {
+	for _, dataArr := range dataAllDict {
 		fmt.Print("...")
 		pool <- true
 		wg.Add(1)
@@ -87,24 +89,24 @@ func dealWithfullData (query map[string]interface{},time_str string) {
 				wg.Done()
 			}()
 			num := 0
-			for _,tmp := range dataArr{
+			for _, tmp := range dataArr {
 				info := NewInfo(tmp)
-				b,source,reason := FullDM.check(info)
+				b, source, reason := FullDM.check(info)
 				if b {
 					num++
 					AddGroupPool.pool <- map[string]interface{}{
-						"_id":StringTOBsonId(info.id),
-						"repeat_id" : source.id,
-						"reason" : reason,
-						"update_time" : qu.Int64All(time.Now().Unix()),
+						"_id":         StringTOBsonId(info.id),
+						"repeat_id":   source.id,
+						"reason":      reason,
+						"update_time": qu.Int64All(time.Now().Unix()),
 					}
 				}
 			}
-			numberlock.Lock()
-			repeatN+=num
-			numberlock.Unlock()
+			numlock.Lock()
+			repeatN += num
+			numlock.Unlock()
 		}(dataArr)
 	}
 	wg.Wait()
-	log.Println("处理结束~",time_str,"总计需判重~",isok,"~重复量",repeatN)
-}
+	log.Println("处理结束~", time_str, "总计需判重~", isok, "~重复量", repeatN)
+}

+ 3 - 2
src/historyRepeat.go

@@ -161,8 +161,8 @@ func historyRepeat() {
 					b, source, reason := curTM.check(info)
 					if b { //有重复,更新
 						repeateN++
-						updatelock.Lock()
 						if judgeIsReplaceInfo(source.href, info.href) {
+							datalock.Lock()
 							temp_source_id := source.id
 							temp_info_id := info.id
 							temp_source := info
@@ -207,6 +207,8 @@ func historyRepeat() {
 							} else {
 								log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
 							}
+
+							datalock.Unlock()
 						} else {
 							Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 								map[string]interface{}{
@@ -223,7 +225,6 @@ func historyRepeat() {
 								},
 							}
 						}
-						updatelock.Unlock()
 					} else {
 						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 							map[string]interface{}{

+ 5 - 5
src/increaseRepeat.go

@@ -37,7 +37,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 			tmp = make(map[string]interface{})
 			continue
 		}
-		if qu.IntAll(tmp["dataging"]) == 1 {
+		if qu.IntAll(tmp["dataging"]) == 1 && !IsFull {
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -89,7 +89,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 						num++
 						//判断是否为~替换数据~模式
 						if judgeIsReplaceInfo(source.href, info.href) {
-							updatelock.Lock()
+							datalock.Lock()
 							temp_source_id := source.id
 							temp_info_id := info.id
 							temp_source := info
@@ -125,7 +125,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 							} else {
 								log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
 							}
-							updatelock.Unlock()
+							datalock.Unlock()
 						} else {
 							//更新池~更新
 							Update.updatePool <- []map[string]interface{}{ //重复数据打标签
@@ -144,9 +144,9 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 					}
 				}
 			}
-			numberlock.Lock()
+			numlock.Lock()
 			repeatN += num
-			numberlock.Unlock()
+			numlock.Unlock()
 		}(dataArr)
 	}
 	wg.Wait()

+ 79 - 86
src/main.go

@@ -20,50 +20,34 @@ import (
 
 var (
 	Sysconfig                          map[string]interface{} // configuration, filled by qu.ReadConfig in init
-	mconf                              map[string]interface{} //mongodb配置信息
-	data_mgo                           *MongodbSim            //mongodb操作对象
-	task_mgo                           *MongodbSim            //mongodb操作对象
+	mconf                              map[string]interface{} // MongoDB configuration section
+	data_mgo, task_mgo                 *MongodbSim            // MongoDB access objects
 	task_collName, task_bidding        string
 	extract, extract_back, extract_log string
-	udpclient                          mu.UdpClient             //udp对象
-	nextNode                           []map[string]interface{} //下节点数组
-	dupdays                            = 7                      //初始化判重范围
-	DM                                 *datamap                 //
+	udpclient                          mu.UdpClient             // UDP client
+	nextNode                           []map[string]interface{} // downstream node list
+	dupdays                            = 7                      // dedup window in days; reassigned from Sysconfig["dupdays"] at startup
+	DM, FullDM                         *datamap                 // dedup data pools; FullDM is the temporary pool used by the full-volume run
 	Update                             *updateInfo
 	AddGroupPool                       *addGroupInfo
-	FullDM                             *datamap //\临时全量数据池
 	// regular-expression filters; the "^_$" values below are placeholders that startup code recompiles from Sysconfig
-	FilterRegTitle                    = regexp.MustCompile("^_$")
-	FilterRegTitle_0                  = regexp.MustCompile("^_$")
-	FilterRegTitle_1                  = regexp.MustCompile("^_$")
-	FilterRegTitle_2                  = regexp.MustCompile("^_$")
-	threadNum                         int                               //线程数量
-	SiteMap                           map[string]map[string]interface{} //站点map
-	LowHeavy                          bool                              //低质量数据判重
-	TimingTask                        bool                              //是否定时任务
-	timingSpanDay                     int64                             //时间跨度
-	timingPubScope                    int64                             //发布时间周期
-	gtid, lastid, sec_gtid, sec_lteid string                            //命令输入
-	lteid                             string                            //历史增量属性
-	IsFull                            bool                              //是否全量
-	updatelock                        sync.Mutex                        //锁4
-	numberlock                        sync.Mutex                        //锁4
-	userName, passWord                string                            //mongo -用户密码
-	jyfb_data                         map[string]string                 //任务池
-	taskList                          []map[string]interface{}          //任务池
-	isUpdateSite                      bool
-	MP                                *nsqdata.Producer
+	FilterRegTitle                             = regexp.MustCompile("^_$")
+	FilterRegTitle_0                           = regexp.MustCompile("^_$")
+	FilterRegTitle_1                           = regexp.MustCompile("^_$")
+	FilterRegTitle_2                           = regexp.MustCompile("^_$")
+	threadNum                                  int                               // number of worker threads; used as the capacity of the goroutine-limiting channel
+	SiteMap                                    map[string]map[string]interface{} // site map, built by initSite
+	LowHeavy, TimingTask, IsFull, isUpdateSite bool                              // low-quality dedup / timed-task mode / full-volume mode / site-refresh flag set by the cron job
+	timingSpanDay, timingPubScope              int64                             // time span / publish-time period
+	gtid, lastid, sec_gtid, sec_lteid, lteid   string                            // id bounds from command-line flags; lteid is the historical-increment bound and has no flag
+	updatelock, datalock, numlock              sync.Mutex                        // datalock wraps replace-mode handling, numlock wraps repeat-counter updates; NOTE(review): updatelock has no visible use left — confirm
+	userName, passWord                         string                            // MongoDB credentials
+	jyfb_data                                  map[string]string                 // NOTE(review): previously labelled "task pool", same as taskList — verify actual role
+	taskList                                   []map[string]interface{}          // task pool
+	MP                                         *nsqdata.Producer                 // NSQ producer, created in initOther
 )
 
-//初始化加载
-func init() {
-	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
-	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")   //历史
-	flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
-	flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
-	flag.Parse()
-
-	qu.ReadConfig(&Sysconfig)
+func initMgo() {
 	userName = qu.ObjToString(Sysconfig["userName"])
 	passWord = qu.ObjToString(Sysconfig["passWord"])
 	log.Println("集群用户密码:", userName, passWord)
@@ -99,13 +83,6 @@ func init() {
 	extract_back = mconf["extract_back"].(string)
 	extract_log = mconf["extract_log"].(string)
 
-	dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5)
-	//加载数据
-	DM = NewDatamap(dupdays, lastid)
-	//更新池
-	Update = newUpdatePool()
-	go Update.updateData()
-
 	FilterRegTitle = regexp.MustCompile(qu.ObjToString(Sysconfig["specialwords"]))
 	FilterRegTitle_0 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_0"]))
 	FilterRegTitle_1 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_1"]))
@@ -115,22 +92,25 @@ func init() {
 	TimingTask = Sysconfig["timingTask"].(bool)
 	timingSpanDay = qu.Int64All(Sysconfig["timingSpanDay"])
 	timingPubScope = qu.Int64All(Sysconfig["timingPubScope"])
+}
+func initOther() { // initOther sets up the dedup pool, the update worker, the NSQ producer and the cron job that raises isUpdateSite.
+	dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5) // NOTE(review): 5 is presumably the fallback, yet dupdays is declared as 7 — confirm which default is intended
+	DM = NewDatamap(dupdays, lastid)
+	Update = newUpdatePool()
+	go Update.updateData() // background goroutine; nothing in this function stops or waits for it
 
 	var err error
 	MP, err = nsqdata.NewProducer("192.168.3.166:4150", "testnsq", true) // NOTE(review): hard-coded endpoint and "testnsq" name — consider reading them from Sysconfig
 	if err != nil {
 		log.Fatal("通道配置异常~", err)
 	}
+
 	c := cron.New()
 	c.AddFunc("0 0 1 ? * WED", func() { // NOTE(review): presumably 01:00:00 every Wednesday if the cron library uses a seconds-first layout — confirm
 		isUpdateSite = true // plain write from the cron callback; no lock is taken in this block
 	})
 	c.Start()
-	//站点配置
-	initSite()
 }
-
-//初始化站点信息
 func initSite() {
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
@@ -150,6 +130,57 @@ func initSite() {
 	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
 }
 
+// init parses the command-line flags, loads Sysconfig via qu.ReadConfig, then runs initMgo, initOther and initSite in that order.
+func init() {
+	flag.StringVar(&lastid, "id", "", "增量加载的lastid") // incremental mode
+	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")   // historical mode
+	flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
+	flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
+	flag.Parse() // NOTE(review): parsing inside init also runs under `go test`, where the testing flags are not yet registered — consider moving to main
+
+	qu.ReadConfig(&Sysconfig)
+	initMgo()
+	initOther() // reads Sysconfig and the lastid flag parsed above
+	initSite() // reads mconf["site"]
+}
+
+func mainT() { // mainT sets IsFull and then blocks; it looks like a manual debugging entry point (no caller is visible here) and every run below is commented out.
+	IsFull = true
+	//AddGroupPool = newAddGroupPool()
+	//go AddGroupPool.addGroupData()
+	//fullDataRepeat() // full-volume dedup
+
+	//increaseRepeat(map[string]interface{}{
+	//	"gtid":  "12ec61170ae152a3c2310f02",
+	//	"lteid": "92ec61170ae152a3c2310f02",
+	//})
+
+	//gtid = "62ec2dd00ae152a3c230c1a1"
+	//lteid = "62ec2dd00ae152a3c230c1e1"
+	//historyRepeat()
+
+	time.Sleep(99999 * time.Hour) // parks this goroutine so anything started above keeps running
+}
+
+// main starts checkMapJob and the UDP listener, then launches historyRepeat when TimingTask is set, or getRepeatTask when neither TimingTask nor IsFull is set, and finally blocks.
+func main() {
+	go checkMapJob()
+	updport := Sysconfig["udpport"].(string) // NOTE(review): unchecked assertion panics if "udpport" is missing or not a string
+	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", updport)
+	if TimingTask {
+		log.Println("正常历史部署")
+		go historyRepeat()
+	} else {
+		if !IsFull { // normal incremental mode
+			log.Println("正常增量部署,监听任务")
+			go getRepeatTask()
+		}
+	}
+	time.Sleep(99999 * time.Hour) // parks main so the background goroutines keep running
+}
+
 //udp接收
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
@@ -200,41 +231,3 @@ func getRepeatTask() {
 		}
 	}
 }
-
-func mainT() {
-	IsFull = true
-
-	//AddGroupPool = newAddGroupPool()
-	//go AddGroupPool.addGroupData()
-	//fullDataRepeat() //全量判重
-
-	//increaseRepeat(map[string]interface{}{
-	//	"gtid":  "12ec61170ae152a3c2310f02",
-	//	"lteid": "92ec61170ae152a3c2310f02",
-	//})
-
-	//gtid = "62ec2dd00ae152a3c230c1a1"
-	//lteid = "62ec2dd00ae152a3c230c1e1"
-	//historyRepeat()
-
-	time.Sleep(99999 * time.Hour)
-}
-
-//主函数
-func main() {
-	go checkMapJob()
-	updport := Sysconfig["udpport"].(string)
-	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	log.Println("Udp服务监听", updport)
-	if TimingTask {
-		log.Println("正常历史部署")
-		go historyRepeat()
-	} else {
-		if !IsFull { //正常增量
-			log.Println("正常增量部署,监听任务")
-			go getRepeatTask()
-		}
-	}
-	time.Sleep(99999 * time.Hour)
-}