package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/elastic" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "app.yhyue.com/data_processing/common_utils/mysqldb" "fieldproject_common/config" "fmt" "github.com/spf13/cobra" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" "sync" "time" ) var ( MongoTool, MongoTool1, MongoTool2 *mongodb.MongodbSim MysqlB, MysqlM *mysqldb.Mysql Es *elastic.Elastic ChEs chan bool saveSize int savePool chan map[string]interface{} saveSp chan bool updatePool chan []map[string]interface{} updateSp chan bool EsSaveCache chan map[string]interface{} SP chan bool updateEsPool chan []map[string]interface{} updateEsSp chan bool saveBasePool chan map[string]interface{} saveBaseSp chan bool saveBasePool1 chan map[string]interface{} saveBaseSp1 chan bool saveRcPool chan map[string]interface{} saveRcSp chan bool ) func init() { config.Init("./common.toml") InitLog() InitMgo() InitMysql() InitEs() ChEs = make(chan bool, 10) saveSize = 200 savePool = make(chan map[string]interface{}, 5000) saveSp = make(chan bool, 2) updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 2) EsSaveCache = make(chan map[string]interface{}, 1000) SP = make(chan bool, 5) updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 1) saveBasePool = make(chan map[string]interface{}, 5000) saveBaseSp = make(chan bool, 1) saveBasePool1 = make(chan map[string]interface{}, 5000) saveBaseSp1 = make(chan bool, 1) saveRcPool = make(chan map[string]interface{}, 5000) saveRcSp = make(chan bool, 1) InitField() log.Info("init success") } func main() { //go UpdateMethod() //task() //taskBiddingData() //taskCompanyData() //taskMedicalData() rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(institution()) rootCmd.AddCommand(bidding()) rootCmd.AddCommand(dealer()) // 经销商 rootCmd.AddCommand(dealerEs()) rootCmd.AddCommand(ent()) // 法人 rootCmd.AddCommand(register()) // 许可备案 rootCmd.AddCommand(product()) // 产品信息 rootCmd.AddCommand(project()) // 标的物信息宽表、中标信息 if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } c := make(chan bool, 1) <-c } func task() { sess := MongoTool2.GetMgoConn() defer MongoTool2.DestoryMongoConn(sess) ch := make(chan bool, 2) wg := &sync.WaitGroup{} //log.Info(fmt.Sprintf("%d", MongoTool.Count("qyxy_0824", nil))) field := map[string]interface{}{"company_name": 1, "company_id": 1} query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Select(field).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() update := make(map[string]interface{}) name := util.ObjToString(tmp["company_name"]) info := MysqlB.FindOne("dws_f_ent_baseinfo", bson.M{"name": name}, "name_id", "") if info != nil && len(*info) > 0 { update["name_id"] = (*info)["name_id"] } else { info = MysqlM.FindOne("dws_d_name_id_record", bson.M{"name": name}, "name_id", "") if info != nil && len(*info) > 0 { update["name_id"] = (*info)["name_id"] } } if len(update) > 0 { updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": update}, } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } // @Description mysql迭代 // @Author J 2022/8/9 17:32 func taskMedicalData() { pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where mark_id = 4 ORDER BY id DESC LIMIT 1", "institution_baseinfo")) //lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where id=58830 ORDER BY id DESC LIMIT 1", "institution_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("查询最后id---", zap.Int("finally id: ", finalId)) lastid, count := 0, 0 for { log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid)) q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id > %d AND mark_id = 4 ORDER BY id ASC limit 1000000", "institution_baseinfo", lastid) //q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id=58830 ORDER BY id ASC limit 1000000", "institution_baseinfo") rows, err := MysqlM.DB.Query(q) if err != nil { log.Error("mysql query err ", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { log.Info("----finish-----", zap.Int("count: ", count)) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("mysql scan err ", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%2000 == 0 { log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid)) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() // mark_id = 1 //taskB(util.ObjToString(tmp["company_id"])) // mark_id = 4 taskB_1(util.ObjToString(tmp["company_id"])) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func SaveMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool.SaveBulk("bidding_p_list_0907", arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool.SaveBulk("bidding_p_list_0907", arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveFunc(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveBasePool: arru[indexu] = v indexu++ if indexu == saveSize { saveBaseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp }() MysqlM.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveBaseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp }() MysqlM.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveFunc1(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveBasePool1: arru[indexu] = v indexu++ if indexu == saveSize { saveBaseSp1 <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp1 }() MysqlM.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveBaseSp1 <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp1 }() MysqlM.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveFuncRc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveRcPool: arru[indexu] = v indexu++ if indexu == saveSize { saveRcSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveRcSp }() MysqlM.InsertBulk("dws_d_name_id_record", RecordField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveRcSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveRcSp }() MysqlM.InsertBulk("dws_d_name_id_record", RecordField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func UpdateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool2.UpdateBulk("zktest_mysql_company_info", arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool2.UpdateBulk("zktest_mysql_company_info", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } }