123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package main
- import (
- "buyer_data/config"
- "fmt"
- "github.com/spf13/cobra"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
- "time"
- )
- func main() {
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(buyerEnt()) // buyer_enterprise
- rootCmd.AddCommand(buyerErr()) // buyer_err
- rootCmd.AddCommand(buyerTidb()) // dws_f_ent_baseinfo
- rootCmd.AddCommand(deduplication()) // 去重
- rootCmd.AddCommand(buyerPy()) // buyer_detail_1019
- rootCmd.AddCommand(buyerTask()) // reliability不存在的数据
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- c := make(chan bool, 1)
- <-c
- }
- func buyerEnt() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "buyer-mongo",
- Short: "Start dispose buyer data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveMethod()
- taskInfo1()
- },
- }
- return cmdClient
- }
- func buyerErr() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "buyer-err",
- Short: "Start dispose buyer data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveMethod()
- taskInfo2()
- },
- }
- return cmdClient
- }
- func buyerTidb() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "buyer-tidb",
- Short: "Start dispose buyer-tidb data",
- Run: func(cmd *cobra.Command, args []string) {
- InitMysql()
- go SaveMethod()
- go updateMethod()
- taskMysql()
- },
- }
- return cmdClient
- }
- func deduplication() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "buyer-dep",
- Short: "Start deduplication data",
- Run: func(cmd *cobra.Command, args []string) {
- redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Code, config.Conf.DB.Redis.Addr), 8)
- taskInfo5()
- },
- }
- return cmdClient
- }
- func buyerPy() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "buyer-py",
- Short: "Start dispose buyer-python data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveMethod()
- go updateMethod()
- taskInfo4()
- },
- }
- return cmdClient
- }
- func buyerTask() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "buyer-err",
- Short: "Start dispose buyer data",
- Run: func(cmd *cobra.Command, args []string) {
- go updateMethod()
- taskInfo3()
- },
- }
- return cmdClient
- }
- func SaveMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk(config.Conf.DB.Mongo.SaveColl, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk(config.Conf.DB.Mongo.SaveColl, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk(config.Conf.DB.Mongo.SaveColl, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1 * time.Second):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk(config.Conf.DB.Mongo.SaveColl, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|