123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- package main
- import (
- "bufio"
- "compress/gzip"
- "encoding/json"
- "flag"
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "io"
- "io/ioutil"
- "log"
- "mongodb"
- "os"
- "qfw/util"
- "strings"
- "time"
- )
- var (
- MongoTool *mongodb.MongodbSim
- updatePool chan []map[string]interface{}
- updateSp chan bool
- savePool chan map[string]interface{}
- saveSp chan bool
- saveSize int
- saveArr [][]map[string]interface{}
- CollArr []string
- CurrentColl string
- )
- func init() {
- MongoTool = &mongodb.MongodbSim{
- MongodbAddr: "192.168.3.207:29098",
- Size: 10,
- DbName: "qfw",
- }
- MongoTool.InitPool()
- saveSize = 200
- updatePool = make(chan []map[string]interface{}, 10000)
- updateSp = make(chan bool, 5)
- savePool = make(chan map[string]interface{}, 5000)
- saveSp = make(chan bool, 5)
- CollArr = []string{"company_base", "company_employee", "company_history_name", "company_partner", "annual_report_base", "annual_report_website"}
- }
- func main1() {
- var path string
- flag.StringVar(&path, "p", "", "路径 /nas/qyxy/ftp/ftpuser/upload/20210811/")
- flag.Parse()
- if path == "" {
- flag.PrintDefaults()
- log.Fatal("参数错误.")
- }
- go updateMethod()
- // /nas/qyxy/ftp/ftpuser/upload/20210905/
- task(path)
- c := make(chan bool, 1)
- <-c
- }
- func main() {
- go updateMethod()
- var path string
- flag.StringVar(&path, "p", "", "路径 /")
- flag.Parse()
- if path == "" {
- flag.PrintDefaults()
- log.Fatal("参数错误.")
- }
- subFiles, _ := ioutil.ReadDir(path)
- for _, s := range subFiles {
- count := 0
- CurrentColl = strings.Split(s.Name(), ".")[0]
- file := path + s.Name()
- util.Debug("current date: ", file)
- // 打开本地gz格式压缩包
- fr, err := os.Open(file)
- if err != nil {
- panic(err)
- } else {
- println("open file success!")
- }
- // defer: 在函数退出时,执行关闭文件
- defer fr.Close()
- // 创建gzip文件读取对象
- gr, err := gzip.NewReader(fr)
- if err != nil {
- panic(err)
- }
- // defer: 在函数退出时,执行关闭gzip对象
- defer gr.Close()
- bfRd := bufio.NewReader(gr)
- for {
- line, err := bfRd.ReadBytes('\n')
- count = hookfn(line, count)
- if err != nil {
- if err == io.EOF {
- fmt.Println("read gzip data finish! ")
- if len(saveArr) > 0 {
- tmps := saveArr
- MongoTool.UpSertBulk(CurrentColl, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- break
- } else {
- fmt.Println("[read gzip data err]: ", err)
- }
- }
- if count%5000 == 0 {
- util.Debug("current exc---", file, count)
- }
- }
- }
- }
- func task(path string) {
- for _, c := range CollArr {
- CurrentColl = c
- subPath := path + c + "/"
- subFiles, _ := ioutil.ReadDir(subPath)
- for _, s := range subFiles {
- if s.IsDir() {
- taskinfo(subPath + s.Name())
- }
- }
- }
- }
- func taskinfo(path string) {
- count := 0
- file := path + "/split.json.gz"
- util.Debug("current date: ", file)
- // 打开本地gz格式压缩包
- fr, err := os.Open(file)
- if err != nil {
- panic(err)
- } else {
- println("open file success!")
- }
- // defer: 在函数退出时,执行关闭文件
- defer fr.Close()
- // 创建gzip文件读取对象
- gr, err := gzip.NewReader(fr)
- if err != nil {
- panic(err)
- }
- // defer: 在函数退出时,执行关闭gzip对象
- defer gr.Close()
- bfRd := bufio.NewReader(gr)
- for {
- line, err := bfRd.ReadBytes('\n')
- count = hookfn(line, count)
- if err != nil {
- if err == io.EOF {
- fmt.Println("read gzip data finish! ")
- break
- } else {
- fmt.Println("[read gzip data err]: ", err)
- }
- }
- if count%5000 == 0 {
- util.Debug("current exc---", file, count)
- }
- }
- }
- func hookfn1(line []byte, count int) int {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(line, &tmp)
- if err != nil {
- util.Debug("err---", err)
- }
- count++
- save := make(map[string]interface{})
- save["$push"] = map[string]interface{}{
- CurrentColl: tmp,
- }
- save["$set"] = bson.M{"_id": tmp["company_id"]}
- saveInfo := []map[string]interface{}{
- {"_id": tmp["company_id"]},
- save,
- }
- updatePool <- saveInfo
- return count
- }
- func hookfn(line []byte, count int) int {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(line, &tmp)
- if err != nil {
- util.Debug("err---", err)
- }
- count++
- if len(tmp) > 0 {
- id := tmp["_id"].(map[string]interface{})["$oid"]
- delete(tmp, "_id")
- saveInfo := []map[string]interface{}{
- {"_id": mongodb.StringTOBsonId(util.ObjToString(id))},
- tmp,
- }
- saveArr = append(saveArr, saveInfo)
- if len(saveArr) > 500 {
- tmps := saveArr
- MongoTool.UpSertBulk(CurrentColl, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- }
- return count
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk("jy_tmp", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk("jy_tmp", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|