main.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "app.yhyue.com/data_processing/common_utils/mysqldb"
  8. "fieldproject_common/config"
  9. "fmt"
  10. "github.com/spf13/cobra"
  11. "go.uber.org/zap"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. MongoTool, MongoTool1, MongoTool2 *mongodb.MongodbSim
  17. MysqlB, MysqlM *mysqldb.Mysql
  18. Es *elastic.Elastic
  19. ChEs chan bool
  20. saveSize int
  21. savePool chan map[string]interface{}
  22. saveSp chan bool
  23. EsSaveCache chan map[string]interface{}
  24. SP chan bool
  25. updateEsPool chan []map[string]interface{}
  26. updateEsSp chan bool
  27. )
  28. func init() {
  29. config.Init("./common.toml")
  30. InitLog()
  31. InitMgo()
  32. InitMysql()
  33. InitEs()
  34. ChEs = make(chan bool, 10)
  35. saveSize = 200
  36. savePool = make(chan map[string]interface{}, 5000)
  37. saveSp = make(chan bool, 2)
  38. EsSaveCache = make(chan map[string]interface{}, 1000)
  39. SP = make(chan bool, 5)
  40. updateEsPool = make(chan []map[string]interface{}, 5000)
  41. updateEsSp = make(chan bool, 1)
  42. log.Info("init success")
  43. }
  44. func main() {
  45. //task()
  46. //taskBiddingData()
  47. //taskCompanyData()
  48. //taskMedicalData()
  49. rootCmd := &cobra.Command{Use: "my cmd"}
  50. rootCmd.AddCommand(institution())
  51. rootCmd.AddCommand(product())
  52. rootCmd.AddCommand(bidding())
  53. if err := rootCmd.Execute(); err != nil {
  54. fmt.Println("rootCmd.Execute failed", err.Error())
  55. }
  56. c := make(chan bool, 1)
  57. <-c
  58. }
  59. func task() {
  60. sess := MongoTool.GetMgoConn()
  61. defer MongoTool.DestoryMongoConn(sess)
  62. ch := make(chan bool, 2)
  63. wg := &sync.WaitGroup{}
  64. log.Info(fmt.Sprintf("%d", MongoTool.Count("qyxy_0824", nil)))
  65. field := map[string]interface{}{"use_flag": 0, "province_short": 0, "create_time": 0, "update_time": 0}
  66. query := sess.DB(config.Conf.DB.Mongo.Dbname).C("qyxy_0824").Find(nil).Select(field).Iter()
  67. count := 0
  68. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  69. if count%2000 == 0 {
  70. log.Info(fmt.Sprintf("current --- %d", count))
  71. }
  72. ch <- true
  73. wg.Add(1)
  74. go func(tmp map[string]interface{}) {
  75. defer func() {
  76. <-ch
  77. wg.Done()
  78. }()
  79. delete(tmp, "_id")
  80. m := make(map[string]interface{})
  81. if util.ObjToString(tmp["company_district"]) != "" {
  82. m["district"] = tmp["company_district"]
  83. } else if util.ObjToString(tmp["city"]) != "" {
  84. m["city"] = tmp["company_city"]
  85. } else {
  86. m["area"] = tmp["company_area"]
  87. }
  88. if len(m) > 0 {
  89. info := MysqlB.FindOne("code_area", m, "", "")
  90. if info != nil && len(*info) > 0 {
  91. tmp["area_code"] = (*info)["code"]
  92. } else {
  93. tmp["area_code"] = "000000"
  94. }
  95. } else {
  96. tmp["area_code"] = "000000"
  97. }
  98. delete(tmp, "company_area")
  99. delete(tmp, "company_city")
  100. delete(tmp, "company_district")
  101. tmp["comeintime"] = time.Now()
  102. tmp["updatetime"] = time.Now()
  103. tmp["sourcetype"] = 1
  104. MysqlB.Insert("company_business_model", map[string]interface{}{"company_id": tmp["company_id"],
  105. "business_model": util.IntAll(tmp["business_type"]), "company_field_code": "0101", "comeintime": time.Now()})
  106. delete(tmp, "business_type")
  107. MysqlB.Insert("company_baseinfo", tmp)
  108. }(tmp)
  109. tmp = make(map[string]interface{})
  110. }
  111. wg.Wait()
  112. log.Info(fmt.Sprintf("over --- %d", count))
  113. }
  114. // @Description mysql迭代
  115. // @Author J 2022/8/9 17:32
  116. func taskMedicalData() {
  117. pool := make(chan bool, 10) //控制线程数
  118. wg := &sync.WaitGroup{}
  119. finalId := 0
  120. lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where mark_id = 4 ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
  121. //lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where id=58830 ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
  122. if len(*lastInfo) > 0 {
  123. finalId = util.IntAll((*lastInfo)[0]["id"])
  124. }
  125. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  126. lastid, count := 0, 0
  127. for {
  128. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  129. q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id > %d AND mark_id = 4 ORDER BY id ASC limit 1000000", "institution_baseinfo", lastid)
  130. //q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id=58830 ORDER BY id ASC limit 1000000", "institution_baseinfo")
  131. rows, err := MysqlM.DB.Query(q)
  132. if err != nil {
  133. log.Error("mysql query err ", zap.Error(err))
  134. }
  135. columns, err := rows.Columns()
  136. if finalId == lastid {
  137. log.Info("----finish-----", zap.Int("count: ", count))
  138. break
  139. }
  140. for rows.Next() {
  141. scanArgs := make([]interface{}, len(columns))
  142. values := make([]interface{}, len(columns))
  143. ret := make(map[string]interface{})
  144. for k := range values {
  145. scanArgs[k] = &values[k]
  146. }
  147. err = rows.Scan(scanArgs...)
  148. if err != nil {
  149. log.Error("mysql scan err ", zap.Error(err))
  150. break
  151. }
  152. for i, col := range values {
  153. if v, ok := col.([]uint8); ok {
  154. ret[columns[i]] = string(v)
  155. } else {
  156. ret[columns[i]] = col
  157. }
  158. }
  159. lastid = util.IntAll(ret["id"])
  160. count++
  161. if count%2000 == 0 {
  162. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  163. }
  164. pool <- true
  165. wg.Add(1)
  166. go func(tmp map[string]interface{}) {
  167. defer func() {
  168. <-pool
  169. wg.Done()
  170. }()
  171. // mark_id = 1
  172. //taskB(util.ObjToString(tmp["company_id"]))
  173. // mark_id = 4
  174. taskB_1(util.ObjToString(tmp["company_id"]))
  175. }(ret)
  176. ret = make(map[string]interface{})
  177. }
  178. _ = rows.Close()
  179. wg.Wait()
  180. }
  181. }
  182. func SaveMethod() {
  183. arru := make([]map[string]interface{}, saveSize)
  184. indexu := 0
  185. for {
  186. select {
  187. case v := <-savePool:
  188. arru[indexu] = v
  189. indexu++
  190. if indexu == saveSize {
  191. saveSp <- true
  192. go func(arru []map[string]interface{}) {
  193. defer func() {
  194. <-saveSp
  195. }()
  196. MongoTool.SaveBulk("bidding_p_list_0907", arru...)
  197. }(arru)
  198. arru = make([]map[string]interface{}, saveSize)
  199. indexu = 0
  200. }
  201. case <-time.After(1000 * time.Millisecond):
  202. if indexu > 0 {
  203. saveSp <- true
  204. go func(arru []map[string]interface{}) {
  205. defer func() {
  206. <-saveSp
  207. }()
  208. MongoTool.SaveBulk("bidding_p_list_0907", arru...)
  209. }(arru[:indexu])
  210. arru = make([]map[string]interface{}, saveSize)
  211. indexu = 0
  212. }
  213. }
  214. }
  215. }