main.go 7.9 KB


  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/viper"
  5. "go.uber.org/zap"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "math/rand"
  11. "strconv"
  12. "strings"
  13. "time"
  14. )
  15. var (
  16. MongoBase *mongodb.MongodbSim
  17. MongoStd *mongodb.MongodbSim
  18. GF GlobalConf
  19. Es *elastic.Elastic
  20. //更新es
  21. updateEsPool = make(chan []map[string]interface{}, 5000)
  22. updateEsSp = make(chan bool, 2) //保存协程
  23. // 更新mongo
  24. updatePool = make(chan []map[string]interface{}, 5000)
  25. updateSp = make(chan bool, 5)
  26. )
  27. func InitConfig() (err error) {
  28. viper.SetConfigFile("config.toml") // 指定配置文件路径
  29. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  30. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  31. viper.AddConfigPath("./")
  32. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  33. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  34. err = viper.ReadInConfig() // 查找并读取配置文件
  35. if err != nil { // 处理读取配置文件的错误
  36. return
  37. }
  38. err = viper.Unmarshal(&GF)
  39. return err
  40. }
  41. func InitES() {
  42. Es = &elastic.Elastic{
  43. //S_esurl: "http://127.0.0.1:19805",
  44. S_esurl: "http://172.17.4.184:19805",
  45. I_size: 5,
  46. Username: "es_all",
  47. Password: "TopJkO2E_d1x",
  48. }
  49. Es.InitElasticSize()
  50. }
  51. func InitMgo() {
  52. MongoStd = &mongodb.MongodbSim{
  53. //MongodbAddr: "127.0.0.1:27083",
  54. MongodbAddr: GF.MongoStd.Host,
  55. Size: 10,
  56. DbName: GF.MongoStd.DB,
  57. UserName: GF.MongoStd.Username,
  58. Password: GF.MongoStd.Password,
  59. Direct: GF.MongoStd.Direct,
  60. }
  61. MongoStd.InitPool()
  62. MongoBase = &mongodb.MongodbSim{
  63. //MongodbAddr: "127.0.0.1:27001",
  64. MongodbAddr: GF.MongoBase.Host,
  65. Size: 10,
  66. DbName: GF.MongoBase.DB,
  67. UserName: GF.MongoBase.Username,
  68. Password: GF.MongoBase.Password,
  69. Direct: GF.MongoBase.Direct,
  70. }
  71. MongoBase.InitPool()
  72. }
  73. func InitLog() {
  74. err := log.InitLog(
  75. log.Path("./logs/log.out"),
  76. //log.Path(""),
  77. log.Level("info"),
  78. log.Compress(true),
  79. log.MaxSize(10),
  80. log.MaxBackups(10),
  81. log.MaxAge(7),
  82. log.Format("json"),
  83. )
  84. if err != nil {
  85. fmt.Printf("InitLog failed: %v\n", err)
  86. }
  87. }
  88. func main() {
  89. InitConfig()
  90. InitMgo()
  91. InitLog()
  92. InitES()
  93. go updateEsMethod()
  94. go updateMethod()
  95. sess := MongoBase.GetMgoConn()
  96. defer MongoBase.DestoryMongoConn(sess)
  97. query := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Iter()
  98. count := 0
  99. //ch := make(chan bool, 20)
  100. //wg := &sync.WaitGroup{}
  101. var secondTmp int = 10000
  102. //var lock sync.Mutex
  103. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  104. //if count%1000 == 0 {
  105. // log.Info("main", zap.Any("current ", count), zap.Any("_id", tmp["_id"]), zap.Int("pre secondTmp", secondTmp))
  106. //}
  107. companyId := util.ObjToString(tmp["company_id"])
  108. stdUpdate := make(map[string]interface{}, 0)
  109. //lock.Lock()
  110. secondTmp += getRandom()
  111. //lock.Unlock()
  112. second := strconv.Itoa(secondTmp)
  113. var dates []string
  114. var first string
  115. // 注册时间不为空
  116. if util.ObjToString(tmp["establish_date"]) != "" {
  117. dates = strings.Split(util.ObjToString(tmp["establish_date"]), "-")
  118. } else {
  119. dates = strings.Split(strings.Split(util.ObjToString(tmp["create_time"]), " ")[0], "-")
  120. }
  121. first = strings.Join(dates, "")
  122. first = first[2:]
  123. seo_id := first + second
  124. stdUpdate["seo_id"] = seo_id
  125. stdUpdate["autoid"] = tmp["_id"]
  126. //stdUpdateData["$set"] = stdUpdate
  127. if count%1000 == 0 {
  128. log.Info("main", zap.Any("current ", count), zap.Any("_id", tmp["_id"]), zap.Int(" secondTmp", secondTmp), zap.String("seo_id", seo_id))
  129. }
  130. //MongoStd.Update("qyxy_std", map[string]interface{}{"_id": companyId}, stdUpdateData, true, false)
  131. //更新MongoDB
  132. updatePool <- []map[string]interface{}{
  133. {"_id": companyId},
  134. {"$set": stdUpdate},
  135. }
  136. // 更新es
  137. updateEsPool <- []map[string]interface{}{
  138. {"_id": companyId},
  139. stdUpdate,
  140. }
  141. //ch <- true
  142. //wg.Add(1)
  143. //go func(tmp map[string]interface{}) {
  144. // defer func() {
  145. // <-ch
  146. // wg.Done()
  147. // }()
  148. //
  149. // companyId := util.ObjToString(tmp["company_id"])
  150. // stdUpdate := make(map[string]interface{}, 0)
  151. // //stdUpdateData := make(map[string]interface{}, 0)
  152. //
  153. // lock.Lock()
  154. // secondTmp += getRandom()
  155. // lock.Unlock()
  156. // second := strconv.Itoa(secondTmp)
  157. //
  158. //
  159. //
  160. // //if count%1000 == 0 {
  161. // // log.Info("main", zap.Any("current ", count), zap.Any("_id", tmp["_id"]), zap.Int("secondTmp", secondTmp))
  162. // //}
  163. //
  164. // var dates []string
  165. // var first string
  166. // // 注册时间不为空
  167. // if util.ObjToString(tmp["establish_date"]) != "" {
  168. // dates = strings.Split(util.ObjToString(tmp["establish_date"]), "-")
  169. // } else {
  170. // dates = strings.Split(strings.Split(util.ObjToString(tmp["create_time"]), " ")[0], "-")
  171. // }
  172. //
  173. // first = strings.Join(dates, "")
  174. // first = first[2:]
  175. // seo_id := first + second
  176. // stdUpdate["seo_id"] = seo_id
  177. // stdUpdate["autoid"] = tmp["_id"]
  178. // //stdUpdateData["$set"] = stdUpdate
  179. //
  180. // //MongoStd.Update("qyxy_std", map[string]interface{}{"_id": companyId}, stdUpdateData, true, false)
  181. //
  182. // //更新MongoDB
  183. // updatePool <- []map[string]interface{}{
  184. // {"_id": companyId},
  185. // {"$set": stdUpdate},
  186. // }
  187. //
  188. // // 更新es
  189. // updateEsPool <- []map[string]interface{}{
  190. // {"_id": companyId},
  191. // stdUpdate,
  192. // }
  193. //
  194. //}(tmp)
  195. tmp = make(map[string]interface{})
  196. }
  197. //wg.Wait()
  198. log.Info("main", zap.String("deal", "over"), zap.Int("count", count), zap.Int("secondTmp", secondTmp))
  199. select {}
  200. }
  201. func formatNumber(num, length int) string {
  202. numStr := strconv.Itoa(num) // 将数字转换为字符串
  203. numLen := len(numStr) // 获取数字字符串的长度
  204. // 计算需要添加的前导零的个数
  205. leadingZeros := length - numLen
  206. // 生成前导零的字符串
  207. leadingZerosStr := ""
  208. for i := 0; i < leadingZeros; i++ {
  209. leadingZerosStr += "0"
  210. }
  211. // 将前导零字符串附加到数字字符串前面
  212. formattedStr := leadingZerosStr + numStr
  213. return formattedStr
  214. }
  215. //getRandom getRandom
  216. func getRandom() int {
  217. // 设置随机种子
  218. rand.Seed(time.Now().UnixNano())
  219. // 生成1到20之间的随机数
  220. randomNumber := rand.Intn(200) + 1
  221. return randomNumber
  222. }
  223. //updateEsMethod 更新es
  224. func updateEsMethod() {
  225. arru := make([][]map[string]interface{}, 200)
  226. indexu := 0
  227. for {
  228. select {
  229. case v := <-updateEsPool:
  230. arru[indexu] = v
  231. indexu++
  232. if indexu == 200 {
  233. updateEsSp <- true
  234. go func(arru [][]map[string]interface{}) {
  235. defer func() {
  236. <-updateEsSp
  237. }()
  238. Es.UpdateBulk("qyxy", arru...)
  239. }(arru)
  240. arru = make([][]map[string]interface{}, 200)
  241. indexu = 0
  242. }
  243. case <-time.After(1000 * time.Millisecond):
  244. if indexu > 0 {
  245. updateEsSp <- true
  246. go func(arru [][]map[string]interface{}) {
  247. defer func() {
  248. <-updateEsSp
  249. }()
  250. Es.UpdateBulk("qyxy", arru...)
  251. }(arru[:indexu])
  252. arru = make([][]map[string]interface{}, 200)
  253. indexu = 0
  254. }
  255. }
  256. }
  257. }
  258. //updateMethod 更新MongoDB
  259. func updateMethod() {
  260. arru := make([][]map[string]interface{}, 200)
  261. indexu := 0
  262. for {
  263. select {
  264. case v := <-updatePool:
  265. arru[indexu] = v
  266. indexu++
  267. if indexu == 200 {
  268. updateSp <- true
  269. go func(arru [][]map[string]interface{}) {
  270. defer func() {
  271. <-updateSp
  272. }()
  273. MongoStd.UpdateBulk("qyxy_std", arru...)
  274. }(arru)
  275. arru = make([][]map[string]interface{}, 200)
  276. indexu = 0
  277. }
  278. case <-time.After(1000 * time.Millisecond):
  279. if indexu > 0 {
  280. updateSp <- true
  281. go func(arru [][]map[string]interface{}) {
  282. defer func() {
  283. <-updateSp
  284. }()
  285. MongoStd.UpdateBulk("qyxy_std", arru...)
  286. }(arru[:indexu])
  287. arru = make([][]map[string]interface{}, 200)
  288. indexu = 0
  289. }
  290. }
  291. }
  292. }