main.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "qfw/util"
  7. "sync"
  8. "time"
  9. )
  10. func main() {
  11. go updateMethod()
  12. //go SaveMethod()
  13. go TaskFun()
  14. c := make(chan bool, 1)
  15. <-c
  16. }
  17. var coll string
  18. func main1() {
  19. flag.StringVar(&coll, "coll", "", "表名")
  20. flag.Parse()
  21. if coll == "" {
  22. flag.PrintDefaults()
  23. log.Println("参数错误.")
  24. return
  25. }
  26. //go updateMethod()
  27. go SaveMethod()
  28. task()
  29. ch := make(chan bool, 1)
  30. <-ch
  31. }
  32. func task() {
  33. pool := make(chan bool, 10) //控制线程数
  34. wg := &sync.WaitGroup{}
  35. finalId := 0
  36. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", coll))
  37. if len(*lastInfo) > 0 {
  38. finalId = util.IntAll((*lastInfo)[0]["id"])
  39. }
  40. util.Debug("finally id---", finalId)
  41. lastid, count := 0, 0
  42. for {
  43. util.Debug("重新查询,lastid---", lastid)
  44. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", coll, lastid)
  45. //q := "SELECT * FROM company_base WHERE company_id='282a0fd3080deffd317b686081df4e66'"
  46. rows, err := MysqlTool.DB.Query(q)
  47. if err != nil{
  48. log.Println(err)
  49. }
  50. columns, err := rows.Columns()
  51. if finalId == lastid {
  52. util.Debug("----finish----------", count)
  53. break
  54. }
  55. //if lastid >= 186177635 {
  56. // util.Debug("--finish--lastid--", lastid)
  57. // break
  58. //}
  59. for rows.Next(){
  60. scanArgs := make([]interface{}, len(columns))
  61. values := make([]interface{}, len(columns))
  62. ret := make(map[string]interface{})
  63. for k := range values {
  64. scanArgs[k] = &values[k]
  65. }
  66. err = rows.Scan(scanArgs...)
  67. if err != nil {
  68. log.Println(err)
  69. break
  70. }
  71. for i, col := range values {
  72. if v, ok := col.([]uint8); ok {
  73. ret[columns[i]] = string(v)
  74. } else {
  75. ret[columns[i]] = col
  76. }
  77. }
  78. lastid = util.IntAll(ret["id"])
  79. count++
  80. if count % 20000 == 0 {
  81. util.Debug("current-------", count, lastid)
  82. }
  83. //if strings.Contains(util.ObjToString(ret["company_type"]), "个体") {
  84. // continue
  85. //}
  86. pool <- true
  87. wg.Add(1)
  88. go func(tmp map[string]interface{}) {
  89. defer func() {
  90. <-pool
  91. wg.Done()
  92. }()
  93. //saveInfo := []map[string]interface{}{
  94. // {"_id": util.IntAll(tmp["id"])},
  95. // {"$set": tmp},
  96. //}
  97. //updatePool <- saveInfo
  98. tmp["_id"] = util.IntAll(tmp["id"])
  99. savePool <- tmp
  100. }(ret)
  101. ret = make(map[string]interface{})
  102. }
  103. _ = rows.Close()
  104. wg.Wait()
  105. }
  106. }
  107. func updateMethod() {
  108. arru := make([][]map[string]interface{}, saveSize)
  109. indexu := 0
  110. for {
  111. select {
  112. case v := <-updatePool:
  113. arru[indexu] = v
  114. indexu++
  115. if indexu == saveSize {
  116. updateSp <- true
  117. go func(arru [][]map[string]interface{}) {
  118. defer func() {
  119. <-updateSp
  120. }()
  121. MongoTool.UpSertBulk(DbSave, arru...)
  122. }(arru)
  123. arru = make([][]map[string]interface{}, saveSize)
  124. indexu = 0
  125. }
  126. case <-time.After(1000 * time.Millisecond):
  127. if indexu > 0 {
  128. updateSp <- true
  129. go func(arru [][]map[string]interface{}) {
  130. defer func() {
  131. <-updateSp
  132. }()
  133. MongoTool.UpSertBulk(DbSave, arru...)
  134. }(arru[:indexu])
  135. arru = make([][]map[string]interface{}, saveSize)
  136. indexu = 0
  137. }
  138. }
  139. }
  140. }
  141. func SaveMethod() {
  142. log.Println("Mgo Save...")
  143. arru := make([]map[string]interface{}, saveSize)
  144. indexu := 0
  145. for {
  146. select {
  147. case v := <-savePool:
  148. arru[indexu] = v
  149. indexu++
  150. if indexu == saveSize {
  151. saveSp <- true
  152. go func(arru []map[string]interface{}) {
  153. defer func() {
  154. <-saveSp
  155. }()
  156. MongoTool1.SaveBulk(coll, arru...)
  157. }(arru)
  158. arru = make([]map[string]interface{}, saveSize)
  159. indexu = 0
  160. }
  161. case <-time.After(1000 * time.Millisecond):
  162. if indexu > 0 {
  163. saveSp <- true
  164. go func(arru []map[string]interface{}) {
  165. defer func() {
  166. <-saveSp
  167. }()
  168. MongoTool1.SaveBulk(coll, arru...)
  169. }(arru[:indexu])
  170. arru = make([]map[string]interface{}, saveSize)
  171. indexu = 0
  172. }
  173. }
  174. }
  175. }