Ver Fonte

更新 bidding_listen

wcc há 1 ano atrás
pai
commit
04deb63648
1 ficheiros alterados com 13 adições e 38 exclusões
  1. 13 38
      bidding_listen/main.go

+ 13 - 38
bidding_listen/main.go

@@ -1,15 +1,14 @@
 package main
 
 import (
-	utils "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"context"
 	"fmt"
 	"github.com/robfig/cron/v3"
 	"go.uber.org/zap"
+	utils "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
-	"sync"
 	"time"
 )
 
@@ -46,7 +45,7 @@ func specData() {
 	log.Info("main", zap.String("结束", "over"))
 }
 
-//deleteData 删除数据
+// deleteData 删除数据
 func deleteData() {
 	defer utils.Catch()
 	now := time.Now()
@@ -180,28 +179,19 @@ func dealBidding() {
 		"contenthtml": 0}).Iter()
 	count := 0
 
-	ch := make(chan bool, 10)
-	wg := &sync.WaitGroup{}
-
-	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+	tmp := make(map[string]interface{})
+	for {
+		//for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if !query.Next(&tmp) {
+			break
+		}
+		count++
 		if count%1000 == 0 {
 			log.Info("dealBidding", zap.Int("current", count))
 		}
-
-		ch <- true
-		wg.Add(1)
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-ch
-				wg.Done()
-			}()
-
-			saveBidding(tmp)
-		}(tmp)
+		saveBidding(tmp)
 		tmp = map[string]interface{}{}
 	}
-
-	wg.Wait()
 	log.Info("dealBidding", zap.Int("over ", count))
 	//没有数据时,发送邮件
 	if count == 0 {
@@ -209,7 +199,7 @@ func dealBidding() {
 	}
 }
 
-//saveBidding 保存bidding数据
+// saveBidding 保存bidding数据
 func saveBidding(tmp map[string]interface{}) {
 	sess := MgoP.GetMgoConn()
 	defer MgoP.DestoryMongoConn(sess)
@@ -344,10 +334,8 @@ func saveBidding(tmp map[string]interface{}) {
 
 }
 
-//dealProject 更新项目信息
+// dealProject 更新项目信息
 func dealProject() {
-	buyerPool := make(chan bool, 5) //控制线程数
-	wg := &sync.WaitGroup{}
 
 	sql := fmt.Sprintf(`select * from %s where project_id = '' and is_repeat = 0 `, GF.Mysql.Table)
 
@@ -385,28 +373,15 @@ func dealProject() {
 			}
 		}
 
-		//
-		buyerPool <- true
-		wg.Add(1)
-
 		if total%100 == 0 {
 			log.Info("dealProject", zap.Int("current", total), zap.Any("bidding_id", ret["bidding_id"]))
 		}
-
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-buyerPool
-				wg.Done()
-			}()
-			updateProject(tmp)
-
-		}(ret)
+		updateProject(ret)
 
 		ret = make(map[string]interface{})
 	}
 
 	rows.Close()
-	wg.Wait()
 
 	log.Info("dealProject", zap.String("deal", "over"))
 }