main.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package main
  2. import (
  3. "fieldproject_common/config"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "utils"
  8. "utils/log"
  9. "utils/mongodb"
  10. "utils/mysqldb"
  11. )
  12. var (
  13. MongoTool *mongodb.MongodbSim
  14. MysqlTool *mysqldb.Mysql
  15. )
  16. func init() {
  17. config.Init("./common.toml")
  18. InitLog()
  19. InitMgo()
  20. InitMysql()
  21. log.Info("init success")
  22. }
  23. func main() {
  24. task()
  25. }
  26. func task() {
  27. sess := MongoTool.GetMgoConn()
  28. defer MongoTool.DestoryMongoConn(sess)
  29. ch := make(chan bool, 2)
  30. wg := &sync.WaitGroup{}
  31. log.Info(fmt.Sprintf("%d", MongoTool.Count("zktest_mysql_company_info", nil)))
  32. field := map[string]interface{}{"use_flag": 0, "province_short": 0, "create_time": 0, "update_time": 0}
  33. query := sess.DB(config.Conf.DB.Mongo.Dbname).C("zktest_mysql_company_info").Find(nil).Select(field).Sort("-_id").Iter()
  34. count := 0
  35. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  36. if count%2000 == 0 {
  37. log.Info(fmt.Sprintf("current --- %d", count))
  38. }
  39. ch <- true
  40. wg.Add(1)
  41. go func(tmp map[string]interface{}) {
  42. defer func() {
  43. <-ch
  44. wg.Done()
  45. }()
  46. delete(tmp, "_id")
  47. m := make(map[string]interface{})
  48. if util.ObjToString(tmp["district"]) != "" {
  49. m["district"] = tmp["district"]
  50. } else if util.ObjToString(tmp["city"]) != "" {
  51. m["city"] = tmp["city"]
  52. } else {
  53. m["area"] = tmp["area"]
  54. }
  55. if len(m) > 0 {
  56. info := MysqlTool.FindOne("code_area", m, "", "")
  57. if info != nil && len(*info) > 0 {
  58. tmp["areacode"] = (*info)["code"]
  59. } else {
  60. tmp["areacode"] = "000000"
  61. }
  62. } else {
  63. tmp["areacode"] = "000000"
  64. }
  65. delete(tmp, "area")
  66. delete(tmp, "city")
  67. delete(tmp, "district")
  68. tmp["comeintime"] = time.Now()
  69. tmp["updatetime"] = time.Now()
  70. tmp["sourcetype"] = 1
  71. MysqlTool.Insert("company_baseinfo", tmp)
  72. }(tmp)
  73. tmp = make(map[string]interface{})
  74. }
  75. wg.Wait()
  76. log.Info(fmt.Sprintf("over --- %d", count))
  77. }