// main.go (5.2 KB)
  1. package main
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "encoding/json"
  6. "flag"
  7. "fmt"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "io"
  10. "io/ioutil"
  11. "log"
  12. "mongodb"
  13. "os"
  14. "qfw/util"
  15. "strings"
  16. "time"
  17. )
  18. var (
  19. MongoTool *mongodb.MongodbSim
  20. updatePool chan []map[string]interface{}
  21. updateSp chan bool
  22. savePool chan map[string]interface{}
  23. saveSp chan bool
  24. saveSize int
  25. saveArr [][]map[string]interface{}
  26. CollArr []string
  27. CurrentColl string
  28. )
  29. func init() {
  30. MongoTool = &mongodb.MongodbSim{
  31. MongodbAddr: "192.168.3.207:29098",
  32. Size: 10,
  33. DbName: "qfw",
  34. }
  35. MongoTool.InitPool()
  36. saveSize = 200
  37. updatePool = make(chan []map[string]interface{}, 10000)
  38. updateSp = make(chan bool, 5)
  39. savePool = make(chan map[string]interface{}, 5000)
  40. saveSp = make(chan bool, 5)
  41. CollArr = []string{"company_base", "company_employee", "company_history_name", "company_partner", "annual_report_base", "annual_report_website"}
  42. }
  43. func main1() {
  44. var path string
  45. flag.StringVar(&path, "p", "", "路径 /nas/qyxy/ftp/ftpuser/upload/20210811/")
  46. flag.Parse()
  47. if path == "" {
  48. flag.PrintDefaults()
  49. log.Fatal("参数错误.")
  50. }
  51. go updateMethod()
  52. // /nas/qyxy/ftp/ftpuser/upload/20210905/
  53. task(path)
  54. c := make(chan bool, 1)
  55. <-c
  56. }
  57. func main() {
  58. go updateMethod()
  59. var path string
  60. flag.StringVar(&path, "p", "", "路径 /")
  61. flag.Parse()
  62. if path == "" {
  63. flag.PrintDefaults()
  64. log.Fatal("参数错误.")
  65. }
  66. subFiles, _ := ioutil.ReadDir(path)
  67. for _, s := range subFiles {
  68. count := 0
  69. CurrentColl = strings.Split(s.Name(), ".")[0]
  70. file := path + s.Name()
  71. util.Debug("current date: ", file)
  72. // 打开本地gz格式压缩包
  73. fr, err := os.Open(file)
  74. if err != nil {
  75. panic(err)
  76. } else {
  77. println("open file success!")
  78. }
  79. // defer: 在函数退出时,执行关闭文件
  80. defer fr.Close()
  81. // 创建gzip文件读取对象
  82. gr, err := gzip.NewReader(fr)
  83. if err != nil {
  84. panic(err)
  85. }
  86. // defer: 在函数退出时,执行关闭gzip对象
  87. defer gr.Close()
  88. bfRd := bufio.NewReader(gr)
  89. for {
  90. line, err := bfRd.ReadBytes('\n')
  91. count = hookfn(line, count)
  92. if err != nil {
  93. if err == io.EOF {
  94. fmt.Println("read gzip data finish! ")
  95. if len(saveArr) > 0 {
  96. tmps := saveArr
  97. MongoTool.UpSertBulk(CurrentColl, tmps...)
  98. saveArr = [][]map[string]interface{}{}
  99. }
  100. break
  101. } else {
  102. fmt.Println("[read gzip data err]: ", err)
  103. }
  104. }
  105. if count%5000 == 0 {
  106. util.Debug("current exc---", file, count)
  107. }
  108. }
  109. }
  110. }
  111. func task(path string) {
  112. for _, c := range CollArr {
  113. CurrentColl = c
  114. subPath := path + c + "/"
  115. subFiles, _ := ioutil.ReadDir(subPath)
  116. for _, s := range subFiles {
  117. if s.IsDir() {
  118. taskinfo(subPath + s.Name())
  119. }
  120. }
  121. }
  122. }
  123. func taskinfo(path string) {
  124. count := 0
  125. file := path + "/split.json.gz"
  126. util.Debug("current date: ", file)
  127. // 打开本地gz格式压缩包
  128. fr, err := os.Open(file)
  129. if err != nil {
  130. panic(err)
  131. } else {
  132. println("open file success!")
  133. }
  134. // defer: 在函数退出时,执行关闭文件
  135. defer fr.Close()
  136. // 创建gzip文件读取对象
  137. gr, err := gzip.NewReader(fr)
  138. if err != nil {
  139. panic(err)
  140. }
  141. // defer: 在函数退出时,执行关闭gzip对象
  142. defer gr.Close()
  143. bfRd := bufio.NewReader(gr)
  144. for {
  145. line, err := bfRd.ReadBytes('\n')
  146. count = hookfn(line, count)
  147. if err != nil {
  148. if err == io.EOF {
  149. fmt.Println("read gzip data finish! ")
  150. break
  151. } else {
  152. fmt.Println("[read gzip data err]: ", err)
  153. }
  154. }
  155. if count%5000 == 0 {
  156. util.Debug("current exc---", file, count)
  157. }
  158. }
  159. }
  160. func hookfn1(line []byte, count int) int {
  161. tmp := make(map[string]interface{})
  162. err := json.Unmarshal(line, &tmp)
  163. if err != nil {
  164. util.Debug("err---", err)
  165. }
  166. count++
  167. save := make(map[string]interface{})
  168. save["$push"] = map[string]interface{}{
  169. CurrentColl: tmp,
  170. }
  171. save["$set"] = bson.M{"_id": tmp["company_id"]}
  172. saveInfo := []map[string]interface{}{
  173. {"_id": tmp["company_id"]},
  174. save,
  175. }
  176. updatePool <- saveInfo
  177. return count
  178. }
  179. func hookfn(line []byte, count int) int {
  180. tmp := make(map[string]interface{})
  181. err := json.Unmarshal(line, &tmp)
  182. if err != nil {
  183. util.Debug("err---", err)
  184. }
  185. count++
  186. if len(tmp) > 0 {
  187. id := tmp["_id"].(map[string]interface{})["$oid"]
  188. delete(tmp, "_id")
  189. saveInfo := []map[string]interface{}{
  190. {"_id": mongodb.StringTOBsonId(util.ObjToString(id))},
  191. tmp,
  192. }
  193. saveArr = append(saveArr, saveInfo)
  194. if len(saveArr) > 500 {
  195. tmps := saveArr
  196. MongoTool.UpSertBulk(CurrentColl, tmps...)
  197. saveArr = [][]map[string]interface{}{}
  198. }
  199. }
  200. return count
  201. }
  202. func updateMethod() {
  203. arru := make([][]map[string]interface{}, saveSize)
  204. indexu := 0
  205. for {
  206. select {
  207. case v := <-updatePool:
  208. arru[indexu] = v
  209. indexu++
  210. if indexu == saveSize {
  211. updateSp <- true
  212. go func(arru [][]map[string]interface{}) {
  213. defer func() {
  214. <-updateSp
  215. }()
  216. MongoTool.UpSertBulk("jy_tmp", arru...)
  217. }(arru)
  218. arru = make([][]map[string]interface{}, saveSize)
  219. indexu = 0
  220. }
  221. case <-time.After(1000 * time.Millisecond):
  222. if indexu > 0 {
  223. updateSp <- true
  224. go func(arru [][]map[string]interface{}) {
  225. defer func() {
  226. <-updateSp
  227. }()
  228. MongoTool.UpSertBulk("jy_tmp", arru...)
  229. }(arru[:indexu])
  230. arru = make([][]map[string]interface{}, saveSize)
  231. indexu = 0
  232. }
  233. }
  234. }
  235. }