wangchuanjin 8 mēneši atpakaļ
vecāks
revīzija
b6b07dac52

+ 2 - 0
data_analysis/entity/entity.go

@@ -35,6 +35,7 @@ func sync_add(fromTable string, saveField []string, id int64, f func(fv map[stri
 	} else {
 		q = fmt.Sprintf(`select * from %s where id>%d order by id`, fromTable, id)
 	}
+	Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
 	Mysql_Main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
 		for _, v := range *l {
 			index++
@@ -66,6 +67,7 @@ func sync_update(fromTable, start_layout, end_layout string, saveFields []string
 	log.Println("开始同步更新", toTable, "表 。。。")
 	index := 0
 	array := [][]interface{}{}
+	Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
 	Mysql_Main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
 		for _, v := range *l {
 			index++

+ 65 - 51
data_analysis/entity/finaljob.go

@@ -55,6 +55,7 @@ func (f *FinalJob) LoadDatas(toTable string) {
 	f.All = map[int64]string{}
 	reTry := 0
 	for {
+		Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
 		Mysql_Main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
 			for _, v := range *l {
 				index++
@@ -109,63 +110,76 @@ func (f *FinalJob) Run(start_layout, end_layout string) {
 	if f.All == nil || len(f.All) == 0 {
 		f.LoadDatas(toTable)
 	}
-	index := 0
-	var addIndex int64
-	var updateIndex int64
-	pool := make(chan bool, Config.UpdatePool)
-	wait := &sync.WaitGroup{}
-	lock := &sync.Mutex{}
 	query := `select ` + strings.Join(f.SaveFields(), ",") + ` from ` + f.TableName()
 	if start_layout != "" && end_layout != "" && f.IncField != "" {
 		query += ` where ` + f.IncField + `>='` + start_layout + `' and ` + f.IncField + `<'` + end_layout + `'`
 	}
-	Mysql_Main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
-		for _, v := range *l {
-			index++
-			var result strings.Builder
-			array := []interface{}{}
-			for _, field := range f.SaveFields() {
-				result.WriteString(fmt.Sprintf("%v", v[field]))
-				array = append(array, v[field])
-			}
-			resultMd5 := GetMd5String(result.String())
-			id := Int64All(v["id"])
-			pool <- true
-			wait.Add(1)
-			go func(delInsertId int64, delInsertArr []interface{}) {
-				defer func() {
-					<-pool
-					wait.Done()
-				}()
-				lock.Lock()
-				allVal := f.All[id]
-				lock.Unlock()
-				if allVal == "" {
-					r1, r2 := Mysql_Main.InsertBatch(toTable, f.SaveFields(), delInsertArr)
-					if r1 > 0 && r2 > 0 {
-						lock.Lock()
-						f.All[id] = resultMd5
-						lock.Unlock()
-						atomic.AddInt64(&addIndex, 1)
-					}
-				} else if allVal != resultMd5 {
-					if Mysql_Main.ExecTx("", func(tx *sql.Tx) bool {
-						r3 := Mysql_Main.UpdateOrDeleteBySqlByTx(tx, `delete from `+toTable+" where id=?", delInsertId)
-						r1, r2 := Mysql_Main.InsertBatchByTx(tx, toTable, f.SaveFields(), delInsertArr)
-						return r1 > 0 && r2 > 0 && r3 > 0
-					}) {
-						lock.Lock()
-						f.All[id] = resultMd5
-						lock.Unlock()
-						atomic.AddInt64(&updateIndex, 1)
+	thisIndex := 0
+	for {
+		thisIndex++
+		index := 0
+		var addSuccessIndex, addFailIndex, updateSuccessIndex, updateFailIndex int64
+		pool := make(chan bool, Config.UpdatePool)
+		wait := &sync.WaitGroup{}
+		lock := &sync.Mutex{}
+		Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
+		Mysql_Main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
+			for _, v := range *l {
+				index++
+				var result strings.Builder
+				array := []interface{}{}
+				for _, field := range f.SaveFields() {
+					result.WriteString(fmt.Sprintf("%v", v[field]))
+					array = append(array, v[field])
+				}
+				resultMd5 := GetMd5String(result.String())
+				id := Int64All(v["id"])
+				pool <- true
+				wait.Add(1)
+				go func(delInsertId int64, delInsertArr []interface{}) {
+					defer func() {
+						<-pool
+						wait.Done()
+					}()
+					lock.Lock()
+					allVal := f.All[id]
+					lock.Unlock()
+					if allVal == "" {
+						r1, r2 := Mysql_Main.InsertBatch(toTable, f.SaveFields(), delInsertArr)
+						if r1 > 0 && r2 > 0 {
+							lock.Lock()
+							f.All[id] = resultMd5
+							lock.Unlock()
+							atomic.AddInt64(&addSuccessIndex, 1)
+						} else {
+							atomic.AddInt64(&addFailIndex, 1)
+						}
+					} else if allVal != resultMd5 {
+						if Mysql_Main.ExecTx("", func(tx *sql.Tx) bool {
+							r3 := Mysql_Main.UpdateOrDeleteBySqlByTx(tx, `delete from `+toTable+" where id=?", delInsertId)
+							r1, r2 := Mysql_Main.InsertBatchByTx(tx, toTable, f.SaveFields(), delInsertArr)
+							return r1 > 0 && r2 > 0 && r3 > 0
+						}) {
+							lock.Lock()
+							f.All[id] = resultMd5
+							lock.Unlock()
+							atomic.AddInt64(&updateSuccessIndex, 1)
+						} else {
+							atomic.AddInt64(&updateFailIndex, 1)
+						}
 					}
+				}(id, array)
+				if index%500 == 0 {
+					log.Println("对比公网", fromTableName, "数据结束", index)
 				}
-			}(id, array)
-			if index%500 == 0 {
-				log.Println("对比公网", fromTableName, "数据结束", index)
 			}
+		}, query)
+		wait.Wait()
+		log.Println("开始对比公网", fromTableName, "数据结束。。。", index, "新增成功", addSuccessIndex, "新增失败", addFailIndex, "更新成功", updateSuccessIndex, "更新失败", updateFailIndex)
+		notEqCount := Mysql_Main.CountBySql(`SELECT COUNT(1) FROM `+fromTableName+` a INNER JOIN `+toTable+` b ON (a.`+f.IncField+`>=? and a.`+f.IncField+`<? and a.id=b.id AND a.autoUpdate!=b.autoUpdate)`, start_layout, end_layout)
+		if notEqCount == 0 || thisIndex == 10 {
+			break
 		}
-	}, query)
-	wait.Wait()
-	log.Println("开始对比公网", fromTableName, "数据结束。。。", index, "新增", addIndex, "更新", updateIndex)
+		SendMail(fmt.Sprintf("%s表第%d次同步结束后还有%d条数据未同步完,休眠10s后重试", toTable, thisIndex, notEqCount))
+	}
 }

+ 2 - 8
data_analysis/main.go

@@ -81,7 +81,6 @@ func start() {
 			log.Println("Mysql_Main Ping", err)
 			Mysql_Main.Init()
 		}
-		Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
 		incSyncRun(incSyncMap)
 		fullSyncRun(fullSyncMap)
 		isRuning = false
@@ -89,20 +88,15 @@ func start() {
 	now := time.Now()
 	next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 	next = next.Add(time.Minute * Config.Duration)
-	time.AfterFunc(next.Sub(now), func() {
-		start()
-	})
+	time.AfterFunc(next.Sub(now), start)
 }
 func startNew() {
 	if err := Mysql_Main.DB.Ping(); err != nil {
 		log.Println("Mysql_Main Ping", err)
 		Mysql_Main.Init()
 	}
-	Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
 	incNewSyncRun(incNewSyncMap)
-	time.AfterFunc(time.Minute*Config.IncTask.Duration, func() {
-		incNewSyncRun(incNewSyncMap)
-	})
+	time.AfterFunc(time.Minute*Config.IncTask.Duration, startNew)
 }
 
 func fullSyncRun(es map[string]Entity) {