package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/elastic" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "app.yhyue.com/data_processing/common_utils/mysqldb" "fieldproject_common/config" "fmt" "github.com/spf13/cobra" "go.uber.org/zap" "sync" "time" ) var ( MongoTool, MongoTool1, MongoTool2 *mongodb.MongodbSim MysqlB, MysqlM *mysqldb.Mysql Es *elastic.Elastic ChEs chan bool saveSize int savePool chan map[string]interface{} saveSp chan bool EsSaveCache chan map[string]interface{} SP chan bool updateEsPool chan []map[string]interface{} updateEsSp chan bool ) func init() { config.Init("./common.toml") InitLog() InitMgo() InitMysql() InitEs() ChEs = make(chan bool, 10) saveSize = 200 savePool = make(chan map[string]interface{}, 5000) saveSp = make(chan bool, 2) EsSaveCache = make(chan map[string]interface{}, 1000) SP = make(chan bool, 5) updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 1) log.Info("init success") } func main() { //task() //taskBiddingData() //taskCompanyData() //taskMedicalData() rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(institution()) rootCmd.AddCommand(product()) rootCmd.AddCommand(bidding()) if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } c := make(chan bool, 1) <-c } func task() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, 2) wg := &sync.WaitGroup{} log.Info(fmt.Sprintf("%d", MongoTool.Count("qyxy_0824", nil))) field := map[string]interface{}{"use_flag": 0, "province_short": 0, "create_time": 0, "update_time": 0} query := sess.DB(config.Conf.DB.Mongo.Dbname).C("qyxy_0824").Find(nil).Select(field).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() delete(tmp, "_id") m := make(map[string]interface{}) if util.ObjToString(tmp["company_district"]) != "" { m["district"] = tmp["company_district"] } else if util.ObjToString(tmp["city"]) != "" { m["city"] = tmp["company_city"] } else { m["area"] = tmp["company_area"] } if len(m) > 0 { info := MysqlB.FindOne("code_area", m, "", "") if info != nil && len(*info) > 0 { tmp["area_code"] = (*info)["code"] } else { tmp["area_code"] = "000000" } } else { tmp["area_code"] = "000000" } delete(tmp, "company_area") delete(tmp, "company_city") delete(tmp, "company_district") tmp["comeintime"] = time.Now() tmp["updatetime"] = time.Now() tmp["sourcetype"] = 1 MysqlB.Insert("company_business_model", map[string]interface{}{"company_id": tmp["company_id"], "business_model": util.IntAll(tmp["business_type"]), "company_field_code": "0101", "comeintime": time.Now()}) delete(tmp, "business_type") MysqlB.Insert("company_baseinfo", tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } // @Description mysql迭代 // @Author J 2022/8/9 17:32 func taskMedicalData() { pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where mark_id = 4 ORDER BY id DESC LIMIT 1", "institution_baseinfo")) //lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where id=58830 ORDER BY id DESC LIMIT 1", "institution_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("查询最后id---", zap.Int("finally id: ", finalId)) lastid, count := 0, 0 for { log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid)) q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id > %d AND mark_id = 4 ORDER BY id ASC limit 1000000", "institution_baseinfo", lastid) //q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id=58830 ORDER BY id ASC limit 1000000", "institution_baseinfo") rows, err := MysqlM.DB.Query(q) if err != nil { log.Error("mysql query err ", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { log.Info("----finish-----", zap.Int("count: ", count)) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("mysql scan err ", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%2000 == 0 { log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid)) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() // mark_id = 1 //taskB(util.ObjToString(tmp["company_id"])) // mark_id = 4 taskB_1(util.ObjToString(tmp["company_id"])) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func SaveMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool.SaveBulk("bidding_p_list_0907", arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool.SaveBulk("bidding_p_list_0907", arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } }