tidbTask.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "fmt"
  8. "go.uber.org/zap"
  9. "math/rand"
  10. "proposed_project/config"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "unicode/utf8"
  16. )
  17. var (
  18. saveBasePool = make(chan map[string]interface{}, 5000)
  19. saveBaseSp = make(chan bool, 1)
  20. saveRcPool = make(chan map[string]interface{}, 5000)
  21. saveRcSp = make(chan bool, 1)
  22. saveCtPool = make(chan map[string]interface{}, 5000)
  23. saveCtSp = make(chan bool, 1)
  24. saveCyPool = make(chan map[string]interface{}, 5000)
  25. saveCySp = make(chan bool, 1)
  26. BaseField = []string{"lasttime", "firsttime", "proposed_number", "proposed_id", "follow_num", "title", "projectname", "approvecode",
  27. "approvenumber", "project_stage_code", "total_investment", "funds", "owner", "name_id", "ownerclass_code", "projecttype_code", "projectaddr",
  28. "projectperiod", "project_startdate", "project_completedate", "industry_code", "approvestatus", "project_scale",
  29. "category_code", "nature_code", "construction_area", "floor_area", "area_code", "city_code", "createtime"}
  30. RecordField = []string{"proposed_id", "infoid", "follow_num", "project_stage_code", "title", "project_scale", "publishtime", "jybxhref", "createtime"}
  31. ContactField = []string{"proposed_id", "infoid", "follow_num", "name_id", "name", "contact_name", "contact_tel", "contact_addr", "createtime"}
  32. CategoryField = []string{"proposed_id", "labelcode", "labelvalues", "labelweight", "createtime"}
  33. AreaCode = make(map[string]string, 5000)
  34. )
  35. func InitArea() {
  36. info := MysqlTool.Find("d_area_code_back", nil, "", "", -1, -1)
  37. for _, m := range *info {
  38. var key string
  39. for i, v := range []string{"area", "city", "district"} {
  40. if i == 0 && util.ObjToString(m[v]) != "" {
  41. key = util.ObjToString(m[v])
  42. } else if util.ObjToString(m[v]) != "" {
  43. key += "," + util.ObjToString(m[v])
  44. }
  45. }
  46. AreaCode[key] = util.ObjToString(m["code"])
  47. }
  48. log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
  49. }
  50. func taskTidb(q map[string]interface{}) {
  51. sess := MgoPro.GetMgoConn()
  52. defer MgoPro.DestoryMongoConn(sess)
  53. ch := make(chan bool, config.Conf.Serve.Thread)
  54. wg := &sync.WaitGroup{}
  55. var query *mongodb.MgoIter
  56. if q != nil && len(q) > 0 {
  57. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(SelectF).Iter()
  58. } else {
  59. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(nil).Select(SelectF).Iter()
  60. }
  61. count := 0
  62. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  63. if count%20000 == 0 {
  64. log.Info(fmt.Sprintf("current --- %d", count))
  65. }
  66. if t := util.Int64All(tmp["pici"]); t > pici {
  67. pici = t
  68. }
  69. ch <- true
  70. wg.Add(1)
  71. go func(tmp map[string]interface{}) {
  72. defer func() {
  73. <-ch
  74. wg.Done()
  75. }()
  76. //saveM := make(map[string]interface{})
  77. //for _, f := range BaseField {
  78. // if f == "lasttime" || f == "firsttime" {
  79. // if t := util.Int64All(tmp[f]); t > 0 {
  80. // saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  81. // }
  82. // } else if f == "proposed_id" {
  83. // saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  84. // } else if f == "area_code" {
  85. // if tmp["area"] != nil {
  86. // saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  87. // }
  88. // } else if f == "city_code" {
  89. // if tmp["area"] != nil && tmp["city"] != nil {
  90. // c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  91. // saveM[f] = AreaCode[c]
  92. // }
  93. // } else if f == "owner" {
  94. // if v := util.ObjToString(tmp[f]); v != "" {
  95. // if utf8.RuneCountInString(v) < 100 {
  96. // saveM[f] = v
  97. // }
  98. // }
  99. // } else if f == "name_id" {
  100. // if b := util.ObjToString(tmp["owner"]); b != "" {
  101. // if eid := redis.GetStr("ent_id", b); eid != "" {
  102. // saveM["name_id"] = strings.Split(eid, "_")[0]
  103. // }
  104. // }
  105. // } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  106. // if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  107. // t := util.Int64All(tmp[f])
  108. // saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  109. // }
  110. // } else if f == "createtime" {
  111. // saveM[f] = time.Now().Format(util.Date_Full_Layout)
  112. // } else if f == "total_investment" {
  113. // text := util.ObjToString(tmp[f])
  114. // capital := ObjToMoney(text)
  115. // capital = capital / 10000
  116. // if capital != 0 {
  117. // capital, _ = util.FormatFloat(capital, 6)
  118. // saveM[f] = capital
  119. // }
  120. // } else if f == "approvestatus" {
  121. // if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  122. // saveM[f] = tmp[f]
  123. // }
  124. // } else if f == "proposed_number" {
  125. // if tmp[f] == nil {
  126. // now := time.Now().Unix()
  127. // st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  128. // parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  129. // rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  130. // saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  131. // } else {
  132. // saveM[f] = tmp[f]
  133. // }
  134. // } else if f == "approvecode" {
  135. // if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  136. // saveM[f] = tmp[f]
  137. // }
  138. // } else if f == "floor_area" {
  139. // if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  140. // saveM[f] = tmp[f]
  141. // }
  142. // } else {
  143. // if tmp[f] != nil {
  144. // saveM[f] = tmp[f]
  145. // }
  146. // }
  147. //}
  148. //saveBasePool <- saveM
  149. saveCy := make(map[string]interface{})
  150. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  151. saveCy["labelcode"] = "category_code"
  152. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  153. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  154. saveCyPool <- saveCy
  155. for _, v := range tmp["list"].([]interface{}) {
  156. saveRc := make(map[string]interface{})
  157. v1 := v.(map[string]interface{})
  158. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  159. infoid := util.ObjToString(v1["infoid"])
  160. saveRc["infoid"] = infoid
  161. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  162. saveRc["follow_num"] = v1["follow_num"]
  163. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  164. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  165. saveRc["title"] = util.ObjToString(v1["title"])
  166. if t := util.Int64All(v1["publishtime"]); t > 0 {
  167. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  168. }
  169. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  170. saveRcPool <- saveRc
  171. //if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  172. // saveCt := make(map[string]interface{})
  173. // saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  174. // saveCt["infoid"] = infoid
  175. // saveCt["follow_num"] = tmp["follow_num"]
  176. // if b := util.ObjToString(tmp["owner"]); b != "" {
  177. // saveCt["name"] = util.ObjToString(tmp["owner"])
  178. // if eid := redis.GetStr("ent_id", b); eid != "" {
  179. // saveCt["name_id"] = strings.Split(eid, "_")[0]
  180. // }
  181. // }
  182. // if p := util.ObjToString(v1["project_person"]); p != "" {
  183. // saveCt["contact_name"] = p
  184. // }
  185. // if p := util.ObjToString(v1["project_phone"]); p != "" {
  186. // saveCt["contact_tel"] = p
  187. // }
  188. // if p := util.ObjToString(v1["projectaddr"]); p != "" {
  189. // saveCt["contact_addr"] = p
  190. // }
  191. // saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  192. // saveCtPool <- saveCt
  193. //}
  194. }
  195. }(tmp)
  196. tmp = make(map[string]interface{})
  197. }
  198. wg.Wait()
  199. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  200. }
  201. func taskTidb_add(q map[string]interface{}) {
  202. sess := MgoPro.GetMgoConn()
  203. defer MgoPro.DestoryMongoConn(sess)
  204. ch := make(chan bool, config.Conf.Serve.Thread)
  205. wg := &sync.WaitGroup{}
  206. var query *mongodb.MgoIter
  207. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(SelectF).Iter()
  208. count := 0
  209. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  210. if count%200 == 0 {
  211. log.Info(fmt.Sprintf("current --- %d", count))
  212. }
  213. if t := util.Int64All(tmp["pici"]); t > pici {
  214. pici = t
  215. }
  216. ch <- true
  217. wg.Add(1)
  218. go func(tmp map[string]interface{}) {
  219. defer func() {
  220. <-ch
  221. wg.Done()
  222. }()
  223. taskB(tmp)
  224. taskE(tmp)
  225. }(tmp)
  226. tmp = make(map[string]interface{})
  227. }
  228. wg.Wait()
  229. TaskSingle = true
  230. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  231. }
  232. func taskB(tmp map[string]interface{}) {
  233. saveM := make(map[string]interface{})
  234. for _, f := range BaseField {
  235. if f == "lasttime" || f == "firsttime" {
  236. if t := util.Int64All(tmp[f]); t > 0 {
  237. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  238. }
  239. } else if f == "proposed_id" {
  240. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  241. } else if f == "area_code" {
  242. if tmp["area"] != nil {
  243. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  244. }
  245. } else if f == "city_code" {
  246. if tmp["area"] != nil && tmp["city"] != nil {
  247. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  248. saveM[f] = AreaCode[c]
  249. }
  250. } else if f == "owner" {
  251. if v := util.ObjToString(tmp[f]); v != "" {
  252. if utf8.RuneCountInString(v) < 100 {
  253. saveM[f] = v
  254. }
  255. }
  256. } else if f == "name_id" {
  257. if b := util.ObjToString(tmp["owner"]); b != "" {
  258. if eid := redis.GetStr("ent_id", b); eid != "" {
  259. saveM["name_id"] = strings.Split(eid, "_")[0]
  260. }
  261. }
  262. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  263. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  264. t := util.Int64All(tmp[f])
  265. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  266. }
  267. } else if f == "createtime" {
  268. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  269. } else if f == "total_investment" {
  270. text := util.ObjToString(tmp[f])
  271. capital := ObjToMoney(text)
  272. capital = capital / 10000
  273. if capital != 0 {
  274. capital, _ = util.FormatFloat(capital, 6)
  275. saveM[f] = capital
  276. }
  277. } else if f == "approvestatus" {
  278. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  279. saveM[f] = tmp[f]
  280. }
  281. } else if f == "proposed_number" {
  282. if tmp[f] == nil {
  283. now := time.Now().Unix()
  284. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  285. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  286. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  287. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  288. } else {
  289. saveM[f] = tmp[f]
  290. }
  291. } else if f == "approvecode" {
  292. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  293. saveM[f] = tmp[f]
  294. }
  295. } else if f == "floor_area" {
  296. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  297. saveM[f] = tmp[f]
  298. }
  299. } else {
  300. if tmp[f] != nil {
  301. saveM[f] = tmp[f]
  302. }
  303. }
  304. }
  305. info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  306. if info != nil && len(*info) > 0 {
  307. saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  308. MysqlTool.Update("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, saveM)
  309. } else {
  310. MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM)
  311. }
  312. info1 := MysqlTool.FindOne("dwd_f_nzj_category_tags", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  313. if info1 != nil && len(*info1) > 0 {
  314. } else {
  315. saveCy := make(map[string]interface{})
  316. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  317. saveCy["labelcode"] = "category_code"
  318. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  319. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  320. MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy)
  321. }
  322. }
  323. func taskE(tmp map[string]interface{}) {
  324. for _, v := range tmp["list"].([]interface{}) {
  325. v1 := v.(map[string]interface{})
  326. infoid := util.ObjToString(v1["infoid"])
  327. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  328. if info == nil || len(*info) == 0 {
  329. saveRc := make(map[string]interface{})
  330. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  331. saveRc["infoid"] = infoid
  332. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  333. saveRc["follow_num"] = v1["follow_num"]
  334. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  335. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  336. saveRc["title"] = util.ObjToString(v1["title"])
  337. if t := util.Int64All(v1["publishtime"]); t > 0 {
  338. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  339. }
  340. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  341. MysqlTool.Insert("dwd_f_nzj_follw_record", saveRc)
  342. }
  343. info1 := MysqlTool.FindOne("dwd_f_nzj_contact", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  344. if info1 == nil || len(*info1) == 0 {
  345. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  346. saveCt := make(map[string]interface{})
  347. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  348. saveCt["infoid"] = infoid
  349. saveCt["follow_num"] = tmp["follow_num"]
  350. if b := util.ObjToString(tmp["owner"]); b != "" {
  351. saveCt["name"] = util.ObjToString(tmp["owner"])
  352. if eid := redis.GetStr("ent_id", b); eid != "" {
  353. saveCt["name_id"] = strings.Split(eid, "_")[0]
  354. }
  355. }
  356. if p := util.ObjToString(v1["project_person"]); p != "" {
  357. saveCt["contact_name"] = p
  358. }
  359. if p := util.ObjToString(v1["project_phone"]); p != "" {
  360. saveCt["contact_tel"] = p
  361. }
  362. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  363. saveCt["contact_addr"] = p
  364. }
  365. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  366. MysqlTool.Insert("dwd_f_nzj_contact", saveCt)
  367. }
  368. }
  369. }
  370. }
  371. func SaveFunc(table string, arr []string) {
  372. arru := make([]map[string]interface{}, saveSize)
  373. indexu := 0
  374. for {
  375. select {
  376. case v := <-saveBasePool:
  377. arru[indexu] = v
  378. indexu++
  379. if indexu == saveSize {
  380. saveBaseSp <- true
  381. go func(arru []map[string]interface{}) {
  382. defer func() {
  383. <-saveBaseSp
  384. }()
  385. MysqlTool.InsertBulk(table, arr, arru...)
  386. }(arru)
  387. arru = make([]map[string]interface{}, saveSize)
  388. indexu = 0
  389. }
  390. case <-time.After(1000 * time.Millisecond):
  391. if indexu > 0 {
  392. saveBaseSp <- true
  393. go func(arru []map[string]interface{}) {
  394. defer func() {
  395. <-saveBaseSp
  396. }()
  397. MysqlTool.InsertBulk(table, arr, arru...)
  398. }(arru[:indexu])
  399. arru = make([]map[string]interface{}, saveSize)
  400. indexu = 0
  401. }
  402. }
  403. }
  404. }
  405. func SaveRFunc(table string, arr []string) {
  406. arru := make([]map[string]interface{}, saveSize)
  407. indexu := 0
  408. for {
  409. select {
  410. case v := <-saveRcPool:
  411. arru[indexu] = v
  412. indexu++
  413. if indexu == saveSize {
  414. saveRcSp <- true
  415. go func(arru []map[string]interface{}) {
  416. defer func() {
  417. <-saveRcSp
  418. }()
  419. MysqlTool.InsertBulk(table, arr, arru...)
  420. }(arru)
  421. arru = make([]map[string]interface{}, saveSize)
  422. indexu = 0
  423. }
  424. case <-time.After(1000 * time.Millisecond):
  425. if indexu > 0 {
  426. saveRcSp <- true
  427. go func(arru []map[string]interface{}) {
  428. defer func() {
  429. <-saveRcSp
  430. }()
  431. MysqlTool.InsertBulk(table, arr, arru...)
  432. }(arru[:indexu])
  433. arru = make([]map[string]interface{}, saveSize)
  434. indexu = 0
  435. }
  436. }
  437. }
  438. }
  439. func SaveCFunc(table string, arr []string) {
  440. arru := make([]map[string]interface{}, saveSize)
  441. indexu := 0
  442. for {
  443. select {
  444. case v := <-saveCtPool:
  445. arru[indexu] = v
  446. indexu++
  447. if indexu == saveSize {
  448. saveCtSp <- true
  449. go func(arru []map[string]interface{}) {
  450. defer func() {
  451. <-saveCtSp
  452. }()
  453. MysqlTool.InsertBulk(table, arr, arru...)
  454. }(arru)
  455. arru = make([]map[string]interface{}, saveSize)
  456. indexu = 0
  457. }
  458. case <-time.After(1000 * time.Millisecond):
  459. if indexu > 0 {
  460. saveCtSp <- true
  461. go func(arru []map[string]interface{}) {
  462. defer func() {
  463. <-saveCtSp
  464. }()
  465. MysqlTool.InsertBulk(table, arr, arru...)
  466. }(arru[:indexu])
  467. arru = make([]map[string]interface{}, saveSize)
  468. indexu = 0
  469. }
  470. }
  471. }
  472. }
  473. func SaveCyFunc(table string, arr []string) {
  474. arru := make([]map[string]interface{}, saveSize)
  475. indexu := 0
  476. for {
  477. select {
  478. case v := <-saveCyPool:
  479. arru[indexu] = v
  480. indexu++
  481. if indexu == saveSize {
  482. saveCySp <- true
  483. go func(arru []map[string]interface{}) {
  484. defer func() {
  485. <-saveCySp
  486. }()
  487. MysqlTool.InsertBulk(table, arr, arru...)
  488. }(arru)
  489. arru = make([]map[string]interface{}, saveSize)
  490. indexu = 0
  491. }
  492. case <-time.After(1000 * time.Millisecond):
  493. if indexu > 0 {
  494. saveCySp <- true
  495. go func(arru []map[string]interface{}) {
  496. defer func() {
  497. <-saveCySp
  498. }()
  499. MysqlTool.InsertBulk(table, arr, arru...)
  500. }(arru[:indexu])
  501. arru = make([]map[string]interface{}, saveSize)
  502. indexu = 0
  503. }
  504. }
  505. }
  506. }