package main import ( "bufio" "compress/gzip" "encoding/json" "flag" "fmt" "go.mongodb.org/mongo-driver/bson" "io" "io/ioutil" "log" "mongodb" "os" "qfw/util" "strings" "time" ) var ( MongoTool *mongodb.MongodbSim updatePool chan []map[string]interface{} updateSp chan bool savePool chan map[string]interface{} saveSp chan bool saveSize int saveArr [][]map[string]interface{} CollArr []string CurrentColl string ) func init() { MongoTool = &mongodb.MongodbSim{ MongodbAddr: "192.168.3.207:29098", Size: 10, DbName: "qfw", } MongoTool.InitPool() saveSize = 200 updatePool = make(chan []map[string]interface{}, 10000) updateSp = make(chan bool, 5) savePool = make(chan map[string]interface{}, 5000) saveSp = make(chan bool, 5) CollArr = []string{"company_base", "company_employee", "company_history_name", "company_partner", "annual_report_base", "annual_report_website"} } func main1() { var path string flag.StringVar(&path, "p", "", "路径 /nas/qyxy/ftp/ftpuser/upload/20210811/") flag.Parse() if path == "" { flag.PrintDefaults() log.Fatal("参数错误.") } go updateMethod() // /nas/qyxy/ftp/ftpuser/upload/20210905/ task(path) c := make(chan bool, 1) <-c } func main() { go updateMethod() var path string flag.StringVar(&path, "p", "", "路径 /") flag.Parse() if path == "" { flag.PrintDefaults() log.Fatal("参数错误.") } subFiles, _ := ioutil.ReadDir(path) for _, s := range subFiles { count := 0 CurrentColl = strings.Split(s.Name(), ".")[0] file := path + s.Name() util.Debug("current date: ", file) // 打开本地gz格式压缩包 fr, err := os.Open(file) if err != nil { panic(err) } else { println("open file success!") } // defer: 在函数退出时,执行关闭文件 defer fr.Close() // 创建gzip文件读取对象 gr, err := gzip.NewReader(fr) if err != nil { panic(err) } // defer: 在函数退出时,执行关闭gzip对象 defer gr.Close() bfRd := bufio.NewReader(gr) for { line, err := bfRd.ReadBytes('\n') count = hookfn(line, count) if err != nil { if err == io.EOF { fmt.Println("read gzip data finish! ") if len(saveArr) > 0 { tmps := saveArr MongoTool.UpSertBulk(CurrentColl, tmps...) saveArr = [][]map[string]interface{}{} } break } else { fmt.Println("[read gzip data err]: ", err) } } if count%5000 == 0 { util.Debug("current exc---", file, count) } } } } func task(path string) { for _, c := range CollArr { CurrentColl = c subPath := path + c + "/" subFiles, _ := ioutil.ReadDir(subPath) for _, s := range subFiles { if s.IsDir() { taskinfo(subPath + s.Name()) } } } } func taskinfo(path string) { count := 0 file := path + "/split.json.gz" util.Debug("current date: ", file) // 打开本地gz格式压缩包 fr, err := os.Open(file) if err != nil { panic(err) } else { println("open file success!") } // defer: 在函数退出时,执行关闭文件 defer fr.Close() // 创建gzip文件读取对象 gr, err := gzip.NewReader(fr) if err != nil { panic(err) } // defer: 在函数退出时,执行关闭gzip对象 defer gr.Close() bfRd := bufio.NewReader(gr) for { line, err := bfRd.ReadBytes('\n') count = hookfn(line, count) if err != nil { if err == io.EOF { fmt.Println("read gzip data finish! ") break } else { fmt.Println("[read gzip data err]: ", err) } } if count%5000 == 0 { util.Debug("current exc---", file, count) } } } func hookfn1(line []byte, count int) int { tmp := make(map[string]interface{}) err := json.Unmarshal(line, &tmp) if err != nil { util.Debug("err---", err) } count++ save := make(map[string]interface{}) save["$push"] = map[string]interface{}{ CurrentColl: tmp, } save["$set"] = bson.M{"_id": tmp["company_id"]} saveInfo := []map[string]interface{}{ {"_id": tmp["company_id"]}, save, } updatePool <- saveInfo return count } func hookfn(line []byte, count int) int { tmp := make(map[string]interface{}) err := json.Unmarshal(line, &tmp) if err != nil { util.Debug("err---", err) } count++ if len(tmp) > 0 { id := tmp["_id"].(map[string]interface{})["$oid"] delete(tmp, "_id") saveInfo := []map[string]interface{}{ {"_id": mongodb.StringTOBsonId(util.ObjToString(id))}, tmp, } saveArr = append(saveArr, saveInfo) if len(saveArr) > 500 { tmps := saveArr MongoTool.UpSertBulk(CurrentColl, tmps...) saveArr = [][]map[string]interface{}{} } } return count } func updateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk("jy_tmp", arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk("jy_tmp", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } }