소스 검색

增量交易信息数据新增项目来源

mxs 1 년 전
부모
커밋
35656f9358
3개의 변경된 파일45개의 추가작업 그리고 8개의 파일을 삭제
  1. 38 0
      data_project_wy/go.mod
  2. 0 1
      data_project_wy/main.go
  3. 7 7
      data_project_wy/task.go

+ 38 - 0
data_project_wy/go.mod

@@ -0,0 +1,38 @@
+module data_project_wy
+
+go 1.21.5
+
+require (
+	github.com/ClickHouse/clickhouse-go/v2 v2.23.0
+	github.com/gogf/gf/v2 v2.7.0
+	github.com/robfig/cron v1.2.0
+	go.mongodb.org/mongo-driver v1.11.4
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240202055658-e2ef72e18b40
+)
+
+require (
+	github.com/ClickHouse/ch-go v0.61.5 // indirect
+	github.com/andybalholm/brotli v1.1.0 // indirect
+	github.com/go-faster/city v1.0.1 // indirect
+	github.com/go-faster/errors v0.7.1 // indirect
+	github.com/golang/snappy v0.0.1 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/klauspost/compress v1.17.7 // indirect
+	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
+	github.com/paulmach/orb v0.11.1 // indirect
+	github.com/pierrec/lz4/v4 v4.1.21 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
+	github.com/segmentio/asm v1.2.0 // indirect
+	github.com/shopspring/decimal v1.3.1 // indirect
+	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+	github.com/xdg-go/scram v1.1.1 // indirect
+	github.com/xdg-go/stringprep v1.0.3 // indirect
+	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
+	go.opentelemetry.io/otel v1.24.0 // indirect
+	go.opentelemetry.io/otel/trace v1.24.0 // indirect
+	golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
+	golang.org/x/sync v0.6.0 // indirect
+	golang.org/x/sys v0.18.0 // indirect
+	golang.org/x/text v0.13.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

+ 0 - 1
data_project_wy/main.go

@@ -12,7 +12,6 @@ func init() {
 }
 
 func main() {
-	//go SaveTransactionData() //保存增量物业信息
 	c := cron.New()
 	//增量
 	c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据

+ 7 - 7
data_project_wy/task.go

@@ -45,8 +45,8 @@ type Transaction struct {
 
 func IncTransactionDataFromBidAndPro() {
 	IncTransactionDataFromBid() //bidding
+	IncTransactionDataFromPro() //project
 	return
-	IncTransactionDataFromPro()  //project
 	IncTransactionDataMgoToCkh() //mongodb迁移至clickhouse
 }
 
@@ -223,8 +223,8 @@ func IncTransactionDataFromPro() {
 		},
 	}
 	fmt.Println("增量项目查询query:", query)
-	sess := MgoB.GetMgoConn()
-	defer MgoB.DestoryMongoConn(sess)
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
 	ch := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
@@ -254,7 +254,7 @@ func IncTransactionDataFromPro() {
 		"tag_topinformation_ai": 1,
 	}
 	arr := [][]map[string]interface{}{}
-	it := sess.DB(MgoB.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
+	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
 	n := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
 		ch <- true
@@ -277,19 +277,19 @@ func IncTransactionDataFromPro() {
 				arr = append(arr, update)
 			}
 			if len(arr) > 50 {
-				MgoPro.UpSertBulk("projectset_wy", arr...)
+				MgoPro.UpSertBulk("projectset_wy_back", arr...)
 				arr = [][]map[string]interface{}{}
 			}
 			lock.Unlock()
 		}(tmp)
-		if n%100 == 0 {
+		if n%1000 == 0 {
 			fmt.Println("current:", n)
 		}
 		tmp = map[string]interface{}{}
 	}
 	wg.Wait()
 	if len(arr) > 0 {
-		MgoPro.UpSertBulk("projectset_wy", arr...)
+		MgoPro.UpSertBulk("projectset_wy_back", arr...)
 		arr = [][]map[string]interface{}{}
 	}
 	fmt.Println("执行增量项目信息完毕", ProStartTime, endTime)