save.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. package main
  2. import "time"
  3. func SaveFunc() {
  4. arru := make([]map[string]interface{}, saveSize)
  5. indexu := 0
  6. for {
  7. select {
  8. case v := <-saveBasePool:
  9. arru[indexu] = v
  10. indexu++
  11. if indexu == saveSize {
  12. saveBaseSp <- true
  13. go func(arru []map[string]interface{}) {
  14. defer func() {
  15. <-saveBaseSp
  16. }()
  17. MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
  18. }(arru)
  19. arru = make([]map[string]interface{}, saveSize)
  20. indexu = 0
  21. }
  22. case <-time.After(1000 * time.Millisecond):
  23. if indexu > 0 {
  24. saveBaseSp <- true
  25. go func(arru []map[string]interface{}) {
  26. defer func() {
  27. <-saveBaseSp
  28. }()
  29. MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
  30. }(arru[:indexu])
  31. arru = make([]map[string]interface{}, saveSize)
  32. indexu = 0
  33. }
  34. }
  35. }
  36. }
  37. func SaveExpandFunc() {
  38. arru := make([]map[string]interface{}, saveSize)
  39. indexu := 0
  40. for {
  41. select {
  42. case v := <-saveExpandPool:
  43. arru[indexu] = v
  44. indexu++
  45. if indexu == saveSize {
  46. saveExpandSp <- true
  47. go func(arru []map[string]interface{}) {
  48. defer func() {
  49. <-saveExpandSp
  50. }()
  51. MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
  52. }(arru)
  53. arru = make([]map[string]interface{}, saveSize)
  54. indexu = 0
  55. }
  56. case <-time.After(1000 * time.Millisecond):
  57. if indexu > 0 {
  58. saveExpandSp <- true
  59. go func(arru []map[string]interface{}) {
  60. defer func() {
  61. <-saveExpandSp
  62. }()
  63. MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
  64. }(arru[:indexu])
  65. arru = make([]map[string]interface{}, saveSize)
  66. indexu = 0
  67. }
  68. }
  69. }
  70. }
  71. func SaveDetailFunc() {
  72. arru := make([]map[string]interface{}, saveSize)
  73. indexu := 0
  74. for {
  75. select {
  76. case v := <-saveDetailPool:
  77. arru[indexu] = v
  78. indexu++
  79. if indexu == saveSize {
  80. saveDetailSp <- true
  81. go func(arru []map[string]interface{}) {
  82. defer func() {
  83. <-saveDetailSp
  84. }()
  85. MysqlTool.InsertBulk("dwd_f_bid_detail", DetailField, arru...)
  86. }(arru)
  87. arru = make([]map[string]interface{}, saveSize)
  88. indexu = 0
  89. }
  90. case <-time.After(1000 * time.Millisecond):
  91. if indexu > 0 {
  92. saveDetailSp <- true
  93. go func(arru []map[string]interface{}) {
  94. defer func() {
  95. <-saveDetailSp
  96. }()
  97. MysqlTool.InsertBulk("dwd_f_bid_detail", DetailField, arru...)
  98. }(arru[:indexu])
  99. arru = make([]map[string]interface{}, saveSize)
  100. indexu = 0
  101. }
  102. }
  103. }
  104. }
  105. func SaveAttrFunc() {
  106. arru := make([]map[string]interface{}, saveSize)
  107. indexu := 0
  108. for {
  109. select {
  110. case v := <-saveAttrPool:
  111. arru[indexu] = v
  112. indexu++
  113. if indexu == saveSize {
  114. saveAttrSp <- true
  115. go func(arru []map[string]interface{}) {
  116. defer func() {
  117. <-saveAttrSp
  118. }()
  119. MysqlTool.InsertBulk("dwd_f_bid_file_text", AttrField, arru...)
  120. }(arru)
  121. arru = make([]map[string]interface{}, saveSize)
  122. indexu = 0
  123. }
  124. case <-time.After(1000 * time.Millisecond):
  125. if indexu > 0 {
  126. saveAttrSp <- true
  127. go func(arru []map[string]interface{}) {
  128. defer func() {
  129. <-saveAttrSp
  130. }()
  131. MysqlTool.InsertBulk("dwd_f_bid_file_text", AttrField, arru...)
  132. }(arru[:indexu])
  133. arru = make([]map[string]interface{}, saveSize)
  134. indexu = 0
  135. }
  136. }
  137. }
  138. }
  139. func SaveIntentFunc() {
  140. arru := make([]map[string]interface{}, saveSize)
  141. indexu := 0
  142. for {
  143. select {
  144. case v := <-saveIntentPool:
  145. arru[indexu] = v
  146. indexu++
  147. if indexu == saveSize {
  148. saveIntentSp <- true
  149. go func(arru []map[string]interface{}) {
  150. defer func() {
  151. <-saveIntentSp
  152. }()
  153. MysqlTool.InsertBulk("dwd_f_bid_intention_baseinfo", IntentField, arru...)
  154. }(arru)
  155. arru = make([]map[string]interface{}, saveSize)
  156. indexu = 0
  157. }
  158. case <-time.After(1000 * time.Millisecond):
  159. if indexu > 0 {
  160. saveIntentSp <- true
  161. go func(arru []map[string]interface{}) {
  162. defer func() {
  163. <-saveIntentSp
  164. }()
  165. MysqlTool.InsertBulk("dwd_f_bid_intention_baseinfo", IntentField, arru...)
  166. }(arru[:indexu])
  167. arru = make([]map[string]interface{}, saveSize)
  168. indexu = 0
  169. }
  170. }
  171. }
  172. }
  173. func SaveProFunc() {
  174. arru := make([]map[string]interface{}, saveSize)
  175. indexu := 0
  176. for {
  177. select {
  178. case v := <-saveProPool:
  179. arru[indexu] = v
  180. indexu++
  181. if indexu == saveSize {
  182. saveProSp <- true
  183. go func(arru []map[string]interface{}) {
  184. defer func() {
  185. <-saveProSp
  186. }()
  187. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  188. }(arru)
  189. arru = make([]map[string]interface{}, saveSize)
  190. indexu = 0
  191. }
  192. case <-time.After(1000 * time.Millisecond):
  193. if indexu > 0 {
  194. saveProSp <- true
  195. go func(arru []map[string]interface{}) {
  196. defer func() {
  197. <-saveProSp
  198. }()
  199. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  200. }(arru[:indexu])
  201. arru = make([]map[string]interface{}, saveSize)
  202. indexu = 0
  203. }
  204. }
  205. }
  206. }
  207. func SaveProbFunc() {
  208. arru := make([]map[string]interface{}, saveSize)
  209. indexu := 0
  210. for {
  211. select {
  212. case v := <-saveProbPool:
  213. arru[indexu] = v
  214. indexu++
  215. if indexu == saveSize {
  216. saveProbSp <- true
  217. go func(arru []map[string]interface{}) {
  218. defer func() {
  219. <-saveProbSp
  220. }()
  221. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  222. }(arru)
  223. arru = make([]map[string]interface{}, saveSize)
  224. indexu = 0
  225. }
  226. case <-time.After(1000 * time.Millisecond):
  227. if indexu > 0 {
  228. saveProbSp <- true
  229. go func(arru []map[string]interface{}) {
  230. defer func() {
  231. <-saveProbSp
  232. }()
  233. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  234. }(arru[:indexu])
  235. arru = make([]map[string]interface{}, saveSize)
  236. indexu = 0
  237. }
  238. }
  239. }
  240. }
  241. func SaveProTagFunc() {
  242. arru := make([]map[string]interface{}, saveSize)
  243. indexu := 0
  244. for {
  245. select {
  246. case v := <-saveProTagPool:
  247. arru[indexu] = v
  248. indexu++
  249. if indexu == saveSize {
  250. saveProTagSp <- true
  251. go func(arru []map[string]interface{}) {
  252. defer func() {
  253. <-saveProTagSp
  254. }()
  255. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  256. }(arru)
  257. arru = make([]map[string]interface{}, saveSize)
  258. indexu = 0
  259. }
  260. case <-time.After(1000 * time.Millisecond):
  261. if indexu > 0 {
  262. saveProTagSp <- true
  263. go func(arru []map[string]interface{}) {
  264. defer func() {
  265. <-saveProTagSp
  266. }()
  267. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  268. }(arru[:indexu])
  269. arru = make([]map[string]interface{}, saveSize)
  270. indexu = 0
  271. }
  272. }
  273. }
  274. }
  275. func SaveRelationFunc() {
  276. arru := make([]map[string]interface{}, saveSize)
  277. indexu := 0
  278. for {
  279. select {
  280. case v := <-saveRelationPool:
  281. arru[indexu] = v
  282. indexu++
  283. if indexu == saveSize {
  284. saveRelationSp <- true
  285. go func(arru []map[string]interface{}) {
  286. defer func() {
  287. <-saveRelationSp
  288. }()
  289. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  290. }(arru)
  291. arru = make([]map[string]interface{}, saveSize)
  292. indexu = 0
  293. }
  294. case <-time.After(1000 * time.Millisecond):
  295. if indexu > 0 {
  296. saveRelationSp <- true
  297. go func(arru []map[string]interface{}) {
  298. defer func() {
  299. <-saveRelationSp
  300. }()
  301. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  302. }(arru[:indexu])
  303. arru = make([]map[string]interface{}, saveSize)
  304. indexu = 0
  305. }
  306. }
  307. }
  308. }
  309. // 字段错误数据
  310. func saveErrMethod() {
  311. arru := make([]map[string]interface{}, 200)
  312. indexu := 0
  313. for {
  314. select {
  315. case v := <-saveErrPool:
  316. arru[indexu] = v
  317. indexu++
  318. if indexu == saveSize {
  319. saveErrSp <- true
  320. go func(arru []map[string]interface{}) {
  321. defer func() {
  322. <-saveErrSp
  323. }()
  324. MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
  325. }(arru)
  326. arru = make([]map[string]interface{}, saveSize)
  327. indexu = 0
  328. }
  329. case <-time.After(1000 * time.Millisecond):
  330. if indexu > 0 {
  331. saveErrSp <- true
  332. go func(arru []map[string]interface{}) {
  333. defer func() {
  334. <-saveErrSp
  335. }()
  336. MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
  337. }(arru[:indexu])
  338. arru = make([]map[string]interface{}, saveSize)
  339. indexu = 0
  340. }
  341. }
  342. }
  343. }