save.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. package main
  2. import "time"
  3. func RunSaveFunc() {
  4. //go SaveFunc()
  5. //go SaveExpandFunc()
  6. //go SaveDetailFunc()
  7. //go SaveAttrFunc()
  8. //go SaveIntentFunc()
  9. //go SaveBidderFunc()
  10. //go SaveGoodsFunc()
  11. //go saveErrMethod()
  12. }
  13. func SaveFunc() {
  14. arru := make([]map[string]interface{}, saveSize)
  15. indexu := 0
  16. for {
  17. select {
  18. case v := <-saveBasePool:
  19. arru[indexu] = v
  20. indexu++
  21. if indexu == saveSize {
  22. saveBaseSp <- true
  23. go func(arru []map[string]interface{}) {
  24. defer func() {
  25. <-saveBaseSp
  26. }()
  27. MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
  28. }(arru)
  29. arru = make([]map[string]interface{}, saveSize)
  30. indexu = 0
  31. }
  32. case <-time.After(1000 * time.Millisecond):
  33. if indexu > 0 {
  34. saveBaseSp <- true
  35. go func(arru []map[string]interface{}) {
  36. defer func() {
  37. <-saveBaseSp
  38. }()
  39. MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
  40. }(arru[:indexu])
  41. arru = make([]map[string]interface{}, saveSize)
  42. indexu = 0
  43. }
  44. }
  45. }
  46. }
  47. func SaveExpandFunc() {
  48. arru := make([]map[string]interface{}, saveSize)
  49. indexu := 0
  50. for {
  51. select {
  52. case v := <-saveExpandPool:
  53. arru[indexu] = v
  54. indexu++
  55. if indexu == saveSize {
  56. saveExpandSp <- true
  57. go func(arru []map[string]interface{}) {
  58. defer func() {
  59. <-saveExpandSp
  60. }()
  61. MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
  62. }(arru)
  63. arru = make([]map[string]interface{}, saveSize)
  64. indexu = 0
  65. }
  66. case <-time.After(1000 * time.Millisecond):
  67. if indexu > 0 {
  68. saveExpandSp <- true
  69. go func(arru []map[string]interface{}) {
  70. defer func() {
  71. <-saveExpandSp
  72. }()
  73. MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
  74. }(arru[:indexu])
  75. arru = make([]map[string]interface{}, saveSize)
  76. indexu = 0
  77. }
  78. }
  79. }
  80. }
  81. func SaveDetailFunc() {
  82. arru := make([]map[string]interface{}, saveSize)
  83. indexu := 0
  84. for {
  85. select {
  86. case v := <-saveDetailPool:
  87. arru[indexu] = v
  88. indexu++
  89. if indexu == saveSize {
  90. saveDetailSp <- true
  91. go func(arru []map[string]interface{}) {
  92. defer func() {
  93. <-saveDetailSp
  94. }()
  95. MysqlTool.InsertBulk("dwd_f_bid_detail", DetailField, arru...)
  96. }(arru)
  97. arru = make([]map[string]interface{}, saveSize)
  98. indexu = 0
  99. }
  100. case <-time.After(1000 * time.Millisecond):
  101. if indexu > 0 {
  102. saveDetailSp <- true
  103. go func(arru []map[string]interface{}) {
  104. defer func() {
  105. <-saveDetailSp
  106. }()
  107. MysqlTool.InsertBulk("dwd_f_bid_detail", DetailField, arru...)
  108. }(arru[:indexu])
  109. arru = make([]map[string]interface{}, saveSize)
  110. indexu = 0
  111. }
  112. }
  113. }
  114. }
  115. func SaveAttrFunc() {
  116. arru := make([]map[string]interface{}, saveSize)
  117. indexu := 0
  118. for {
  119. select {
  120. case v := <-saveAttrPool:
  121. arru[indexu] = v
  122. indexu++
  123. if indexu == saveSize {
  124. saveAttrSp <- true
  125. go func(arru []map[string]interface{}) {
  126. defer func() {
  127. <-saveAttrSp
  128. }()
  129. MysqlTool.InsertBulk("dwd_f_bid_file_text", AttrField, arru...)
  130. }(arru)
  131. arru = make([]map[string]interface{}, saveSize)
  132. indexu = 0
  133. }
  134. case <-time.After(1000 * time.Millisecond):
  135. if indexu > 0 {
  136. saveAttrSp <- true
  137. go func(arru []map[string]interface{}) {
  138. defer func() {
  139. <-saveAttrSp
  140. }()
  141. MysqlTool.InsertBulk("dwd_f_bid_file_text", AttrField, arru...)
  142. }(arru[:indexu])
  143. arru = make([]map[string]interface{}, saveSize)
  144. indexu = 0
  145. }
  146. }
  147. }
  148. }
  149. func SaveIntentFunc() {
  150. arru := make([]map[string]interface{}, saveSize)
  151. indexu := 0
  152. for {
  153. select {
  154. case v := <-saveIntentPool:
  155. arru[indexu] = v
  156. indexu++
  157. if indexu == saveSize {
  158. saveIntentSp <- true
  159. go func(arru []map[string]interface{}) {
  160. defer func() {
  161. <-saveIntentSp
  162. }()
  163. MysqlTool.InsertBulk("dwd_f_bid_intention_baseinfo", IntentField, arru...)
  164. }(arru)
  165. arru = make([]map[string]interface{}, saveSize)
  166. indexu = 0
  167. }
  168. case <-time.After(1000 * time.Millisecond):
  169. if indexu > 0 {
  170. saveIntentSp <- true
  171. go func(arru []map[string]interface{}) {
  172. defer func() {
  173. <-saveIntentSp
  174. }()
  175. MysqlTool.InsertBulk("dwd_f_bid_intention_baseinfo", IntentField, arru...)
  176. }(arru[:indexu])
  177. arru = make([]map[string]interface{}, saveSize)
  178. indexu = 0
  179. }
  180. }
  181. }
  182. }
  183. func SaveBidderFunc() {
  184. arru := make([]map[string]interface{}, saveSize)
  185. indexu := 0
  186. for {
  187. select {
  188. case v := <-saveBidderPool:
  189. arru[indexu] = v
  190. indexu++
  191. if indexu == saveSize {
  192. saveBidderSp <- true
  193. go func(arru []map[string]interface{}) {
  194. defer func() {
  195. <-saveBidderSp
  196. }()
  197. MysqlTool.InsertBulk("dwd_f_bid_package_bidder_baseinfo", BidderField, arru...)
  198. }(arru)
  199. arru = make([]map[string]interface{}, saveSize)
  200. indexu = 0
  201. }
  202. case <-time.After(1000 * time.Millisecond):
  203. if indexu > 0 {
  204. saveBidderSp <- true
  205. go func(arru []map[string]interface{}) {
  206. defer func() {
  207. <-saveBidderSp
  208. }()
  209. MysqlTool.InsertBulk("dwd_f_bid_package_bidder_baseinfo", BidderField, arru...)
  210. }(arru[:indexu])
  211. arru = make([]map[string]interface{}, saveSize)
  212. indexu = 0
  213. }
  214. }
  215. }
  216. }
  217. func SaveGoodsFunc() {
  218. arru := make([]map[string]interface{}, saveSize)
  219. indexu := 0
  220. for {
  221. select {
  222. case v := <-saveGoodsPool:
  223. arru[indexu] = v
  224. indexu++
  225. if indexu == saveSize {
  226. saveGoodsSp <- true
  227. go func(arru []map[string]interface{}) {
  228. defer func() {
  229. <-saveGoodsSp
  230. }()
  231. MysqlTool.InsertBulk("dwd_f_bid_package_goods_baseinfo", GoodsField, arru...)
  232. }(arru)
  233. arru = make([]map[string]interface{}, saveSize)
  234. indexu = 0
  235. }
  236. case <-time.After(1000 * time.Millisecond):
  237. if indexu > 0 {
  238. saveGoodsSp <- true
  239. go func(arru []map[string]interface{}) {
  240. defer func() {
  241. <-saveGoodsSp
  242. }()
  243. MysqlTool.InsertBulk("dwd_f_bid_package_goods_baseinfo", GoodsField, arru...)
  244. }(arru[:indexu])
  245. arru = make([]map[string]interface{}, saveSize)
  246. indexu = 0
  247. }
  248. }
  249. }
  250. }
  251. /*
  252. 以下项目相关涉及
  253. */
  254. func SaveProFunc() {
  255. arru := make([]map[string]interface{}, saveSize)
  256. indexu := 0
  257. for {
  258. select {
  259. case v := <-saveProPool:
  260. arru[indexu] = v
  261. indexu++
  262. if indexu == saveSize {
  263. saveProSp <- true
  264. go func(arru []map[string]interface{}) {
  265. defer func() {
  266. <-saveProSp
  267. }()
  268. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  269. }(arru)
  270. arru = make([]map[string]interface{}, saveSize)
  271. indexu = 0
  272. }
  273. case <-time.After(1000 * time.Millisecond):
  274. if indexu > 0 {
  275. saveProSp <- true
  276. go func(arru []map[string]interface{}) {
  277. defer func() {
  278. <-saveProSp
  279. }()
  280. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  281. }(arru[:indexu])
  282. arru = make([]map[string]interface{}, saveSize)
  283. indexu = 0
  284. }
  285. }
  286. }
  287. }
  288. func SaveProbFunc() {
  289. arru := make([]map[string]interface{}, saveSize)
  290. indexu := 0
  291. for {
  292. select {
  293. case v := <-saveProbPool:
  294. arru[indexu] = v
  295. indexu++
  296. if indexu == saveSize {
  297. saveProbSp <- true
  298. go func(arru []map[string]interface{}) {
  299. defer func() {
  300. <-saveProbSp
  301. }()
  302. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  303. }(arru)
  304. arru = make([]map[string]interface{}, saveSize)
  305. indexu = 0
  306. }
  307. case <-time.After(1000 * time.Millisecond):
  308. if indexu > 0 {
  309. saveProbSp <- true
  310. go func(arru []map[string]interface{}) {
  311. defer func() {
  312. <-saveProbSp
  313. }()
  314. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  315. }(arru[:indexu])
  316. arru = make([]map[string]interface{}, saveSize)
  317. indexu = 0
  318. }
  319. }
  320. }
  321. }
  322. func SaveProTagFunc() {
  323. arru := make([]map[string]interface{}, saveSize)
  324. indexu := 0
  325. for {
  326. select {
  327. case v := <-saveProTagPool:
  328. arru[indexu] = v
  329. indexu++
  330. if indexu == saveSize {
  331. saveProTagSp <- true
  332. go func(arru []map[string]interface{}) {
  333. defer func() {
  334. <-saveProTagSp
  335. }()
  336. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  337. }(arru)
  338. arru = make([]map[string]interface{}, saveSize)
  339. indexu = 0
  340. }
  341. case <-time.After(1000 * time.Millisecond):
  342. if indexu > 0 {
  343. saveProTagSp <- true
  344. go func(arru []map[string]interface{}) {
  345. defer func() {
  346. <-saveProTagSp
  347. }()
  348. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  349. }(arru[:indexu])
  350. arru = make([]map[string]interface{}, saveSize)
  351. indexu = 0
  352. }
  353. }
  354. }
  355. }
  356. func SaveRelationFunc() {
  357. arru := make([]map[string]interface{}, saveSize)
  358. indexu := 0
  359. for {
  360. select {
  361. case v := <-saveRelationPool:
  362. arru[indexu] = v
  363. indexu++
  364. if indexu == saveSize {
  365. saveRelationSp <- true
  366. go func(arru []map[string]interface{}) {
  367. defer func() {
  368. <-saveRelationSp
  369. }()
  370. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  371. }(arru)
  372. arru = make([]map[string]interface{}, saveSize)
  373. indexu = 0
  374. }
  375. case <-time.After(1000 * time.Millisecond):
  376. if indexu > 0 {
  377. saveRelationSp <- true
  378. go func(arru []map[string]interface{}) {
  379. defer func() {
  380. <-saveRelationSp
  381. }()
  382. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  383. }(arru[:indexu])
  384. arru = make([]map[string]interface{}, saveSize)
  385. indexu = 0
  386. }
  387. }
  388. }
  389. }
  390. // 字段错误数据
  391. func saveErrMethod() {
  392. arru := make([]map[string]interface{}, 200)
  393. indexu := 0
  394. for {
  395. select {
  396. case v := <-saveErrPool:
  397. arru[indexu] = v
  398. indexu++
  399. if indexu == saveSize {
  400. saveErrSp <- true
  401. go func(arru []map[string]interface{}) {
  402. defer func() {
  403. <-saveErrSp
  404. }()
  405. MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
  406. }(arru)
  407. arru = make([]map[string]interface{}, saveSize)
  408. indexu = 0
  409. }
  410. case <-time.After(1000 * time.Millisecond):
  411. if indexu > 0 {
  412. saveErrSp <- true
  413. go func(arru []map[string]interface{}) {
  414. defer func() {
  415. <-saveErrSp
  416. }()
  417. MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
  418. }(arru[:indexu])
  419. arru = make([]map[string]interface{}, saveSize)
  420. indexu = 0
  421. }
  422. }
  423. }
  424. }