main.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "sync"
  6. qu "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/mongodb"
  8. "app.yhyue.com/moapp/jybase/mysql"
  9. "github.com/gogf/gf/v2/util/gconv"
  10. )
  11. var config map[string]interface{}
  12. var Mgo *mongodb.MongodbSim
  13. var TidbCall *mysql.Mysql
  14. var TidbBi *mysql.Mysql
  15. func InitMongo(mgs map[string]interface{}) {
  16. Mgo = &mongodb.MongodbSim{
  17. MongodbAddr: qu.ObjToString(mgs["address"]),
  18. Size: qu.IntAll(mgs["size"]),
  19. DbName: qu.ObjToString(mgs["dbName"]),
  20. ReplSet: qu.ObjToString(mgs["replSet"]),
  21. UserName: qu.ObjToString(mgs["userName"]),
  22. Password: qu.ObjToString(mgs["password"]),
  23. }
  24. Mgo.InitPool()
  25. log.Println("初始化 mongodb")
  26. }
  27. func InitMysql(mys map[string]interface{}) {
  28. TidbBi = &mysql.Mysql{
  29. Address: qu.ObjToString(mys["address"]),
  30. UserName: qu.ObjToString(mys["userName"]),
  31. PassWord: qu.ObjToString(mys["passWord"]),
  32. DBName: qu.ObjToString(mys["dbName"]),
  33. MaxOpenConns: qu.IntAll(mys["maxOpenConns"]),
  34. MaxIdleConns: qu.IntAll(mys["maxIdleConns"]),
  35. }
  36. TidbBi.Init()
  37. log.Println("初始化 mysql")
  38. }
  39. func InitTidb(mys map[string]interface{}) {
  40. TidbCall = &mysql.Mysql{
  41. Address: qu.ObjToString(mys["address"]),
  42. UserName: qu.ObjToString(mys["userName"]),
  43. PassWord: qu.ObjToString(mys["passWord"]),
  44. DBName: qu.ObjToString(mys["dbName"]),
  45. MaxOpenConns: qu.IntAll(mys["maxOpenConns"]),
  46. MaxIdleConns: qu.IntAll(mys["maxIdleConns"]),
  47. }
  48. TidbCall.Init()
  49. log.Println("初始化 Tidb")
  50. }
  51. func init() {
  52. qu.ReadConfig(&config)
  53. //
  54. mgs, _ := config["mongodb"].(map[string]interface{})
  55. InitMongo(mgs)
  56. mys, _ := config["tidb1"].(map[string]interface{})
  57. InitMysql(mys)
  58. tbs, _ := config["tidb2"].(map[string]interface{})
  59. InitTidb(tbs)
  60. }
  61. func main() {
  62. //
  63. log.Println("电销 开始")
  64. // do1()
  65. log.Println("电销 结束")
  66. //
  67. log.Println("合力易捷 开始")
  68. // do2()
  69. log.Println("合力易捷 结束")
  70. }
  71. //电销线索刷库
  72. func do1() {
  73. var (
  74. pool = make(chan bool, 5)
  75. wait = &sync.WaitGroup{}
  76. )
  77. i := 0
  78. for {
  79. count := TidbBi.CountBySql(`SELECT count(1) FROM dwd_f_crm_clue_info where company_nature is null`)
  80. if count == 0 {
  81. log.Println("find no data end")
  82. return
  83. }
  84. TidbBi.SelectByBath(10, func(l *[]map[string]interface{}) bool {
  85. for _, v := range *l {
  86. pool <- true
  87. wait.Add(1)
  88. i++
  89. go func(thisData map[string]interface{}) {
  90. defer func() {
  91. <-pool
  92. wait.Done()
  93. }()
  94. id := gconv.Int64(thisData["id"])
  95. query := map[string]interface{}{
  96. "id": id,
  97. }
  98. cluename := gconv.String(thisData["cluename"])
  99. update := getCompanyType(cluename)
  100. ok := TidbBi.Update("dwd_f_crm_clue_info", query, update)
  101. if !ok {
  102. log.Println("crm clue info update err", query, update)
  103. }
  104. }(v)
  105. }
  106. if i%5000 == 0 {
  107. log.Println(fmt.Sprintf("current --- %d ", i))
  108. }
  109. return true
  110. }, `SELECT * FROM dwd_f_crm_clue_info where company_nature is null limit 500`)
  111. }
  112. wait.Wait()
  113. }
  114. //电销线索刷库 无并发
  115. func do1_bak() {
  116. i := 0
  117. for {
  118. count := TidbBi.CountBySql(`SELECT count(1) FROM dwd_f_crm_clue_info where company_nature is null`)
  119. if count == 0 {
  120. log.Println("find no data end")
  121. return
  122. }
  123. TidbBi.SelectByBath(10, func(l *[]map[string]interface{}) bool {
  124. for _, v := range *l {
  125. i++
  126. id := gconv.Int64(v["id"])
  127. query := map[string]interface{}{
  128. "id": id,
  129. }
  130. cluename := gconv.String(v["cluename"])
  131. update := getCompanyType(cluename)
  132. ok := TidbBi.Update("dwd_f_crm_clue_info", query, update)
  133. if !ok {
  134. log.Println("crm clue info update err", query, update)
  135. }
  136. }
  137. if i%5000 == 0 {
  138. log.Println(fmt.Sprintf("current --- %d ", i))
  139. }
  140. return true
  141. }, `SELECT * FROM dwd_f_crm_clue_info where company_nature is null limit 500`)
  142. }
  143. }
  144. //合力易捷刷库
  145. func do2() {
  146. i := 0
  147. var (
  148. pool = make(chan bool, 5)
  149. wait = &sync.WaitGroup{}
  150. )
  151. for {
  152. count := TidbCall.CountBySql(`SELECT count(1) FROM customer where company_nature is null`)
  153. if count == 0 {
  154. log.Println("find no data end")
  155. return
  156. }
  157. TidbCall.SelectByBath(10, func(l *[]map[string]interface{}) bool {
  158. for _, v := range *l {
  159. pool <- true
  160. wait.Add(1)
  161. i++
  162. go func(thisData map[string]interface{}) {
  163. defer func() {
  164. <-pool
  165. wait.Done()
  166. }()
  167. id := gconv.String(thisData["unique_id"])
  168. query := map[string]interface{}{
  169. "unique_id": id,
  170. }
  171. company := gconv.String(thisData["company"])
  172. update := getCompanyType(company)
  173. ok := TidbCall.Update("customer", query, update)
  174. if !ok {
  175. log.Println("customer info update err", query, update)
  176. }
  177. }(v)
  178. }
  179. if i%5000 == 0 {
  180. log.Println(fmt.Sprintf("current --- %d ", i))
  181. }
  182. return true
  183. }, `SELECT * FROM customer where company_nature is null limit 1000`)
  184. }
  185. wait.Wait()
  186. }
  187. //合力易捷刷库 无并发
  188. func do2_bak() {
  189. i := 0
  190. for {
  191. count := TidbCall.CountBySql(`SELECT count(1) FROM customer where company_nature is null`)
  192. if count == 0 {
  193. log.Println("find no data end")
  194. return
  195. }
  196. TidbCall.SelectByBath(10, func(l *[]map[string]interface{}) bool {
  197. for _, v := range *l {
  198. i++
  199. id := gconv.String(v["unique_id"])
  200. query := map[string]interface{}{
  201. "unique_id": id,
  202. }
  203. company := gconv.String(v["company"])
  204. update := getCompanyType(company)
  205. ok := TidbCall.Update("customer", query, update)
  206. if !ok {
  207. log.Println("customer info update err", query, update)
  208. }
  209. }
  210. if i%5000 == 0 {
  211. log.Println(fmt.Sprintf("current --- %d ", i))
  212. }
  213. return true
  214. }, `SELECT * FROM customer where company_nature is null limit 1000`)
  215. }
  216. }
  217. //公司性质、公司核验
  218. func getCompanyType(name string) map[string]interface{} {
  219. data := map[string]interface{}{
  220. "company_nature": 0,
  221. "company_verification": 0,
  222. }
  223. if c := TidbBi.CountBySql(`select count(1) from group_company_name where company_name=?`, name); c > 0 {
  224. data["company_nature"] = 1
  225. }
  226. if c := Mgo.Count("qyxy_std", map[string]interface{}{"company_name": name}); c > 0 {
  227. data["company_verification"] = 1
  228. }
  229. return data
  230. }