tidbTask.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "fmt"
  8. "go.uber.org/zap"
  9. "math/rand"
  10. "proposed_project/config"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "unicode/utf8"
  16. )
  // Row queues, one per kind of exported row (base info, follow record,
  // contact, category tag, ent). Every queue is paired with a one-slot channel
  // that the matching Save*Func fills before it starts a bulk insert and drains
  // when the insert returns, so at most one insert per queue is in flight.
  var (
  	saveBasePool = make(chan map[string]interface{}, 5000)
  	saveBaseSp   = make(chan bool, 1)

  	saveRcPool = make(chan map[string]interface{}, 5000)
  	saveRcSp   = make(chan bool, 1)

  	saveCtPool = make(chan map[string]interface{}, 5000)
  	saveCtSp   = make(chan bool, 1)

  	saveCyPool = make(chan map[string]interface{}, 5000)
  	saveCySp   = make(chan bool, 1)

  	saveEntPool = make(chan map[string]interface{}, 5000)
  	saveEntSp   = make(chan bool, 1)
  )

  // Field lists for the exported rows. BaseField also drives the per-field
  // mapping loop in taskB.
  var (
  	BaseField = []string{
  		"lasttime", "firsttime",
  		"proposed_number", "proposed_id", "follow_num",
  		"title", "projectname",
  		"approvecode", "approvenumber",
  		"project_stage_code", "total_investment", "funds",
  		"owner", "name_id", "ownerclass_code",
  		"projecttype_code", "projectaddr", "projectperiod",
  		"project_startdate", "project_completedate",
  		"industry_code", "approvestatus", "project_scale",
  		"category_code", "nature_code",
  		"construction_area", "floor_area",
  		"area_code", "city_code",
  		"createtime",
  	}
  	RecordField = []string{
  		"proposed_id", "infoid", "follow_num",
  		"project_stage_code", "title", "project_scale",
  		"publishtime", "jybxhref", "createtime",
  	}
  	ContactField = []string{
  		"proposed_id", "infoid", "follow_num",
  		"name_id", "name",
  		"contact_name", "contact_tel", "contact_addr",
  		"createtime",
  	}
  	CategoryField = []string{
  		"proposed_id", "labelcode", "labelvalues", "labelweight", "createtime",
  	}
  	EntField = []string{
  		"proposed_id", "name_id", "name", "area_code", "city_code", "createtime",
  	}
  )

  // AreaCode maps a comma-joined "area[,city[,district]]" name to its code.
  // It is filled by InitArea.
  var AreaCode = make(map[string]string, 5000)
  38. func InitArea() {
  39. info := MysqlTool.Find("d_area_code_back", nil, "", "", -1, -1)
  40. for _, m := range *info {
  41. var key string
  42. for i, v := range []string{"area", "city", "district"} {
  43. if i == 0 && util.ObjToString(m[v]) != "" {
  44. key = util.ObjToString(m[v])
  45. } else if util.ObjToString(m[v]) != "" {
  46. key += "," + util.ObjToString(m[v])
  47. }
  48. }
  49. AreaCode[key] = util.ObjToString(m["code"])
  50. }
  51. log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
  52. }
  53. func taskTidb(q map[string]interface{}) {
  54. sess := MgoPro.GetMgoConn()
  55. defer MgoPro.DestoryMongoConn(sess)
  56. ch := make(chan bool, config.Conf.Serve.Thread)
  57. wg := &sync.WaitGroup{}
  58. var query *mongodb.MgoIter
  59. if q != nil && len(q) > 0 {
  60. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(SelectF).Iter()
  61. } else {
  62. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(nil).Select(SelectF).Iter()
  63. }
  64. count := 0
  65. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  66. if count%20000 == 0 {
  67. log.Info(fmt.Sprintf("current --- %d", count))
  68. }
  69. if t := util.Int64All(tmp["pici"]); t > pici {
  70. pici = t
  71. }
  72. ch <- true
  73. wg.Add(1)
  74. go func(tmp map[string]interface{}) {
  75. defer func() {
  76. <-ch
  77. wg.Done()
  78. }()
  79. //saveM := make(map[string]interface{})
  80. //for _, f := range BaseField {
  81. // if f == "lasttime" || f == "firsttime" {
  82. // if t := util.Int64All(tmp[f]); t > 0 {
  83. // saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  84. // }
  85. // } else if f == "proposed_id" {
  86. // saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  87. // } else if f == "area_code" {
  88. // if tmp["area"] != nil {
  89. // saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  90. // }
  91. // } else if f == "city_code" {
  92. // if tmp["area"] != nil && tmp["city"] != nil {
  93. // c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  94. // saveM[f] = AreaCode[c]
  95. // }
  96. // } else if f == "owner" {
  97. // if v := util.ObjToString(tmp[f]); v != "" {
  98. // if utf8.RuneCountInString(v) < 100 {
  99. // saveM[f] = v
  100. // }
  101. // }
  102. // } else if f == "name_id" {
  103. // if b := util.ObjToString(tmp["owner"]); b != "" {
  104. // if eid := redis.GetStr("ent_id", b); eid != "" {
  105. // saveM["name_id"] = strings.Split(eid, "_")[0]
  106. // }
  107. // }
  108. // } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  109. // if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  110. // t := util.Int64All(tmp[f])
  111. // saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  112. // }
  113. // } else if f == "createtime" {
  114. // saveM[f] = time.Now().Format(util.Date_Full_Layout)
  115. // } else if f == "total_investment" {
  116. // text := util.ObjToString(tmp[f])
  117. // capital := ObjToMoney(text)
  118. // capital = capital / 10000
  119. // if capital != 0 {
  120. // capital, _ = util.FormatFloat(capital, 6)
  121. // saveM[f] = capital
  122. // }
  123. // } else if f == "approvestatus" {
  124. // if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  125. // saveM[f] = tmp[f]
  126. // }
  127. // } else if f == "proposed_number" {
  128. // if tmp[f] == nil {
  129. // now := time.Now().Unix()
  130. // st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  131. // parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  132. // rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  133. // saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  134. // } else {
  135. // saveM[f] = tmp[f]
  136. // }
  137. // } else if f == "approvecode" {
  138. // if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  139. // saveM[f] = tmp[f]
  140. // }
  141. // } else if f == "floor_area" {
  142. // if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  143. // saveM[f] = tmp[f]
  144. // }
  145. // } else {
  146. // if tmp[f] != nil {
  147. // saveM[f] = tmp[f]
  148. // }
  149. // }
  150. //}
  151. //saveBasePool <- saveM
  152. //saveCy := make(map[string]interface{})
  153. //saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  154. //saveCy["labelcode"] = "category_code"
  155. //saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  156. //saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  157. //saveCyPool <- saveCy
  158. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  159. saveEnt := make(map[string]interface{})
  160. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  161. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  162. saveEnt["name"] = ow
  163. if eid := redis.GetStr("ent_id", ow); eid != "" {
  164. arr := strings.Split(eid, "_")
  165. saveEnt["name_id"] = arr[0]
  166. if len(arr) == 2 {
  167. saveEnt["area_code"] = arr[1]
  168. } else if len(arr) == 3 {
  169. saveEnt["city_code"] = arr[2]
  170. }
  171. }
  172. saveEnt["identify_type"] = 1
  173. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  174. saveEntPool <- saveEnt
  175. }
  176. //for _, v := range tmp["list"].([]interface{}) {
  177. // saveRc := make(map[string]interface{})
  178. // v1 := v.(map[string]interface{})
  179. // saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  180. // infoid := util.ObjToString(v1["infoid"])
  181. // saveRc["infoid"] = infoid
  182. // saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  183. // saveRc["follow_num"] = v1["follow_num"]
  184. // saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  185. // saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  186. // saveRc["title"] = util.ObjToString(v1["title"])
  187. // if t := util.Int64All(v1["publishtime"]); t > 0 {
  188. // saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  189. // }
  190. // saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  191. // saveRcPool <- saveRc
  192. //
  193. // //if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  194. // // saveCt := make(map[string]interface{})
  195. // // saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  196. // // saveCt["infoid"] = infoid
  197. // // saveCt["follow_num"] = tmp["follow_num"]
  198. // // if b := util.ObjToString(tmp["owner"]); b != "" {
  199. // // saveCt["name"] = util.ObjToString(tmp["owner"])
  200. // // if eid := redis.GetStr("ent_id", b); eid != "" {
  201. // // saveCt["name_id"] = strings.Split(eid, "_")[0]
  202. // // }
  203. // // }
  204. // // if p := util.ObjToString(v1["project_person"]); p != "" {
  205. // // saveCt["contact_name"] = p
  206. // // }
  207. // // if p := util.ObjToString(v1["project_phone"]); p != "" {
  208. // // saveCt["contact_tel"] = p
  209. // // }
  210. // // if p := util.ObjToString(v1["projectaddr"]); p != "" {
  211. // // saveCt["contact_addr"] = p
  212. // // }
  213. // // saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  214. // // saveCtPool <- saveCt
  215. // //}
  216. //}
  217. }(tmp)
  218. tmp = make(map[string]interface{})
  219. }
  220. wg.Wait()
  221. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  222. }
  223. func taskTidb_add(q map[string]interface{}) {
  224. sess := MgoPro.GetMgoConn()
  225. defer MgoPro.DestoryMongoConn(sess)
  226. ch := make(chan bool, config.Conf.Serve.Thread)
  227. wg := &sync.WaitGroup{}
  228. var query *mongodb.MgoIter
  229. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(SelectF).Iter()
  230. count := 0
  231. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  232. if count%200 == 0 {
  233. log.Info(fmt.Sprintf("current --- %d", count))
  234. }
  235. if t := util.Int64All(tmp["pici"]); t > pici {
  236. pici = t
  237. }
  238. ch <- true
  239. wg.Add(1)
  240. go func(tmp map[string]interface{}) {
  241. defer func() {
  242. <-ch
  243. wg.Done()
  244. }()
  245. taskB(tmp)
  246. taskE(tmp)
  247. }(tmp)
  248. tmp = make(map[string]interface{})
  249. }
  250. wg.Wait()
  251. TaskSingle = true
  252. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  253. }
  254. func taskB(tmp map[string]interface{}) {
  255. saveM := make(map[string]interface{})
  256. for _, f := range BaseField {
  257. if f == "lasttime" || f == "firsttime" {
  258. if t := util.Int64All(tmp[f]); t > 0 {
  259. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  260. }
  261. } else if f == "proposed_id" {
  262. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  263. } else if f == "area_code" {
  264. if tmp["area"] != nil {
  265. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  266. }
  267. } else if f == "city_code" {
  268. if tmp["area"] != nil && tmp["city"] != nil {
  269. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  270. saveM[f] = AreaCode[c]
  271. }
  272. } else if f == "owner" {
  273. if v := util.ObjToString(tmp[f]); v != "" {
  274. if utf8.RuneCountInString(v) < 100 {
  275. saveM[f] = v
  276. }
  277. }
  278. } else if f == "name_id" {
  279. if b := util.ObjToString(tmp["owner"]); b != "" {
  280. if eid := redis.GetStr("ent_id", b); eid != "" {
  281. saveM["name_id"] = strings.Split(eid, "_")[0]
  282. }
  283. }
  284. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  285. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  286. t := util.Int64All(tmp[f])
  287. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  288. }
  289. } else if f == "createtime" {
  290. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  291. } else if f == "total_investment" {
  292. text := util.ObjToString(tmp[f])
  293. capital := ObjToMoney(text)
  294. capital = capital / 10000
  295. if capital != 0 {
  296. capital, _ = util.FormatFloat(capital, 6)
  297. saveM[f] = capital
  298. }
  299. } else if f == "approvestatus" {
  300. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  301. saveM[f] = tmp[f]
  302. }
  303. } else if f == "proposed_number" {
  304. if tmp[f] == nil {
  305. now := time.Now().Unix()
  306. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  307. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  308. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  309. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  310. } else {
  311. saveM[f] = tmp[f]
  312. }
  313. } else if f == "approvecode" {
  314. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  315. saveM[f] = tmp[f]
  316. }
  317. } else if f == "floor_area" {
  318. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  319. saveM[f] = tmp[f]
  320. }
  321. } else {
  322. if tmp[f] != nil {
  323. saveM[f] = tmp[f]
  324. }
  325. }
  326. }
  327. info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  328. if info != nil && len(*info) > 0 {
  329. saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  330. MysqlTool.Update("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, saveM)
  331. } else {
  332. MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM)
  333. }
  334. info1 := MysqlTool.FindOne("dwd_f_nzj_category_tags", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  335. if info1 != nil && len(*info1) > 0 {
  336. } else {
  337. saveCy := make(map[string]interface{})
  338. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  339. saveCy["labelcode"] = "category_code"
  340. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  341. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  342. MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy)
  343. }
  344. info2 := MysqlTool.FindOne("dwd_f_nzj_ent", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  345. if info2 != nil && len(*info2) > 0 {
  346. } else {
  347. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  348. saveEnt := make(map[string]interface{})
  349. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  350. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  351. saveEnt["name"] = ow
  352. if eid := redis.GetStr("ent_id", ow); eid != "" {
  353. arr := strings.Split(eid, "_")
  354. saveEnt["name_id"] = arr[0]
  355. if len(arr) == 2 {
  356. saveEnt["area_code"] = arr[1]
  357. } else if len(arr) == 3 {
  358. saveEnt["city_code"] = arr[2]
  359. }
  360. }
  361. saveEnt["identify_type"] = 1
  362. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  363. MysqlTool.Insert("dwd_f_nzj_ent", saveEnt)
  364. }
  365. }
  366. }
  367. func taskE(tmp map[string]interface{}) {
  368. for _, v := range tmp["list"].([]interface{}) {
  369. v1 := v.(map[string]interface{})
  370. infoid := util.ObjToString(v1["infoid"])
  371. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  372. if info == nil || len(*info) == 0 {
  373. saveRc := make(map[string]interface{})
  374. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  375. saveRc["infoid"] = infoid
  376. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  377. saveRc["follow_num"] = v1["follow_num"]
  378. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  379. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  380. saveRc["title"] = util.ObjToString(v1["title"])
  381. if t := util.Int64All(v1["publishtime"]); t > 0 {
  382. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  383. }
  384. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  385. MysqlTool.Insert("dwd_f_nzj_follw_record", saveRc)
  386. }
  387. info1 := MysqlTool.FindOne("dwd_f_nzj_contact", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  388. if info1 == nil || len(*info1) == 0 {
  389. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  390. saveCt := make(map[string]interface{})
  391. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  392. saveCt["infoid"] = infoid
  393. saveCt["follow_num"] = tmp["follow_num"]
  394. if b := util.ObjToString(tmp["owner"]); b != "" {
  395. saveCt["name"] = util.ObjToString(tmp["owner"])
  396. if eid := redis.GetStr("ent_id", b); eid != "" {
  397. saveCt["name_id"] = strings.Split(eid, "_")[0]
  398. }
  399. }
  400. if p := util.ObjToString(v1["project_person"]); p != "" {
  401. saveCt["contact_name"] = p
  402. }
  403. if p := util.ObjToString(v1["project_phone"]); p != "" {
  404. saveCt["contact_tel"] = p
  405. }
  406. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  407. saveCt["contact_addr"] = p
  408. }
  409. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  410. MysqlTool.Insert("dwd_f_nzj_contact", saveCt)
  411. }
  412. }
  413. }
  414. }
  415. func SaveFunc(table string, arr []string) {
  416. arru := make([]map[string]interface{}, saveSize)
  417. indexu := 0
  418. for {
  419. select {
  420. case v := <-saveBasePool:
  421. arru[indexu] = v
  422. indexu++
  423. if indexu == saveSize {
  424. saveBaseSp <- true
  425. go func(arru []map[string]interface{}) {
  426. defer func() {
  427. <-saveBaseSp
  428. }()
  429. MysqlTool.InsertBulk(table, arr, arru...)
  430. }(arru)
  431. arru = make([]map[string]interface{}, saveSize)
  432. indexu = 0
  433. }
  434. case <-time.After(1000 * time.Millisecond):
  435. if indexu > 0 {
  436. saveBaseSp <- true
  437. go func(arru []map[string]interface{}) {
  438. defer func() {
  439. <-saveBaseSp
  440. }()
  441. MysqlTool.InsertBulk(table, arr, arru...)
  442. }(arru[:indexu])
  443. arru = make([]map[string]interface{}, saveSize)
  444. indexu = 0
  445. }
  446. }
  447. }
  448. }
  449. func SaveRFunc(table string, arr []string) {
  450. arru := make([]map[string]interface{}, saveSize)
  451. indexu := 0
  452. for {
  453. select {
  454. case v := <-saveRcPool:
  455. arru[indexu] = v
  456. indexu++
  457. if indexu == saveSize {
  458. saveRcSp <- true
  459. go func(arru []map[string]interface{}) {
  460. defer func() {
  461. <-saveRcSp
  462. }()
  463. MysqlTool.InsertBulk(table, arr, arru...)
  464. }(arru)
  465. arru = make([]map[string]interface{}, saveSize)
  466. indexu = 0
  467. }
  468. case <-time.After(1000 * time.Millisecond):
  469. if indexu > 0 {
  470. saveRcSp <- true
  471. go func(arru []map[string]interface{}) {
  472. defer func() {
  473. <-saveRcSp
  474. }()
  475. MysqlTool.InsertBulk(table, arr, arru...)
  476. }(arru[:indexu])
  477. arru = make([]map[string]interface{}, saveSize)
  478. indexu = 0
  479. }
  480. }
  481. }
  482. }
  483. func SaveCFunc(table string, arr []string) {
  484. arru := make([]map[string]interface{}, saveSize)
  485. indexu := 0
  486. for {
  487. select {
  488. case v := <-saveCtPool:
  489. arru[indexu] = v
  490. indexu++
  491. if indexu == saveSize {
  492. saveCtSp <- true
  493. go func(arru []map[string]interface{}) {
  494. defer func() {
  495. <-saveCtSp
  496. }()
  497. MysqlTool.InsertBulk(table, arr, arru...)
  498. }(arru)
  499. arru = make([]map[string]interface{}, saveSize)
  500. indexu = 0
  501. }
  502. case <-time.After(1000 * time.Millisecond):
  503. if indexu > 0 {
  504. saveCtSp <- true
  505. go func(arru []map[string]interface{}) {
  506. defer func() {
  507. <-saveCtSp
  508. }()
  509. MysqlTool.InsertBulk(table, arr, arru...)
  510. }(arru[:indexu])
  511. arru = make([]map[string]interface{}, saveSize)
  512. indexu = 0
  513. }
  514. }
  515. }
  516. }
  517. func SaveCyFunc(table string, arr []string) {
  518. arru := make([]map[string]interface{}, saveSize)
  519. indexu := 0
  520. for {
  521. select {
  522. case v := <-saveCyPool:
  523. arru[indexu] = v
  524. indexu++
  525. if indexu == saveSize {
  526. saveCySp <- true
  527. go func(arru []map[string]interface{}) {
  528. defer func() {
  529. <-saveCySp
  530. }()
  531. MysqlTool.InsertBulk(table, arr, arru...)
  532. }(arru)
  533. arru = make([]map[string]interface{}, saveSize)
  534. indexu = 0
  535. }
  536. case <-time.After(1000 * time.Millisecond):
  537. if indexu > 0 {
  538. saveCySp <- true
  539. go func(arru []map[string]interface{}) {
  540. defer func() {
  541. <-saveCySp
  542. }()
  543. MysqlTool.InsertBulk(table, arr, arru...)
  544. }(arru[:indexu])
  545. arru = make([]map[string]interface{}, saveSize)
  546. indexu = 0
  547. }
  548. }
  549. }
  550. }
  551. func SaveEntFunc(table string, arr []string) {
  552. arru := make([]map[string]interface{}, saveSize)
  553. indexu := 0
  554. for {
  555. select {
  556. case v := <-saveEntPool:
  557. arru[indexu] = v
  558. indexu++
  559. if indexu == saveSize {
  560. saveEntSp <- true
  561. go func(arru []map[string]interface{}) {
  562. defer func() {
  563. <-saveEntSp
  564. }()
  565. MysqlTool.InsertBulk(table, arr, arru...)
  566. }(arru)
  567. arru = make([]map[string]interface{}, saveSize)
  568. indexu = 0
  569. }
  570. case <-time.After(1000 * time.Millisecond):
  571. if indexu > 0 {
  572. saveEntSp <- true
  573. go func(arru []map[string]interface{}) {
  574. defer func() {
  575. <-saveEntSp
  576. }()
  577. MysqlTool.InsertBulk(table, arr, arru...)
  578. }(arru[:indexu])
  579. arru = make([]map[string]interface{}, saveSize)
  580. indexu = 0
  581. }
  582. }
  583. }
  584. }