main.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755
  1. package main
  import (
  	"fmt"
  	"os"
  	"sync"
  	"time"

  	util "app.yhyue.com/data_processing/common_utils"
  	"app.yhyue.com/data_processing/common_utils/log"
  	"app.yhyue.com/data_processing/common_utils/mongodb"
  	"app.yhyue.com/data_processing/common_utils/redis"
  	"data_tidb/config"
  	"github.com/spf13/cobra"
  	"go.mongodb.org/mongo-driver/bson"
  	"go.uber.org/zap"
  )
  15. func init() {
  16. config.Init("./common.toml")
  17. InitLog()
  18. InitMgo()
  19. InitMysql()
  20. InitField()
  21. redis.InitRedis1("qyxy_id=127.0.0.1:4379", 1)
  22. //redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
  23. log.Info("init success")
  24. }
  25. func main() {
  26. rootCmd := &cobra.Command{Use: "my cmd"}
  27. rootCmd.AddCommand(bidding())
  28. rootCmd.AddCommand(project())
  29. if err := rootCmd.Execute(); err != nil {
  30. fmt.Println("rootCmd.Execute failed", err.Error())
  31. }
  32. //taskMysql()
  33. //taskMgo()
  34. c := make(chan bool, 1)
  35. <-c
  36. }
  37. func taskMysql() {
  38. pool := make(chan bool, 5) //控制线程数
  39. wg := &sync.WaitGroup{}
  40. finalId := 0
  41. //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
  42. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
  43. if len(*lastInfo) > 0 {
  44. finalId = util.IntAll((*lastInfo)[0]["id"])
  45. }
  46. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  47. lastid, count := 0, 0
  48. for {
  49. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  50. q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation", lastid)
  51. //q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
  52. //q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
  53. rows, err := MysqlTool.DB.Query(q)
  54. if err != nil {
  55. log.Error("mysql query err ", zap.Error(err))
  56. }
  57. columns, err := rows.Columns()
  58. if finalId == lastid {
  59. log.Info("----finish-----", zap.Int("count: ", count))
  60. break
  61. }
  62. for rows.Next() {
  63. scanArgs := make([]interface{}, len(columns))
  64. values := make([]interface{}, len(columns))
  65. ret := make(map[string]interface{})
  66. for k := range values {
  67. scanArgs[k] = &values[k]
  68. }
  69. err = rows.Scan(scanArgs...)
  70. if err != nil {
  71. log.Error("mysql scan err ", zap.Error(err))
  72. break
  73. }
  74. for i, col := range values {
  75. if col == nil {
  76. ret[columns[i]] = nil
  77. } else {
  78. switch val := (*scanArgs[i].(*interface{})).(type) {
  79. case byte:
  80. ret[columns[i]] = val
  81. break
  82. case []byte:
  83. v := string(val)
  84. switch v {
  85. case "\x00": // 处理数据类型为bit的情况
  86. ret[columns[i]] = 0
  87. case "\x01": // 处理数据类型为bit的情况
  88. ret[columns[i]] = 1
  89. default:
  90. ret[columns[i]] = v
  91. break
  92. }
  93. break
  94. case time.Time:
  95. if val.IsZero() {
  96. ret[columns[i]] = nil
  97. } else {
  98. ret[columns[i]] = val.Format("2006-01-02 15:04:05")
  99. }
  100. break
  101. default:
  102. ret[columns[i]] = val
  103. }
  104. }
  105. }
  106. lastid = util.IntAll(ret["id"])
  107. count++
  108. if count%20000 == 0 {
  109. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  110. }
  111. pool <- true
  112. wg.Add(1)
  113. go func(tmp map[string]interface{}) {
  114. defer func() {
  115. <-pool
  116. wg.Done()
  117. }()
  118. cid := util.Int64All(tmp["id"])
  119. pid := util.ObjToString(tmp["projectid"])
  120. name_id := util.ObjToString(tmp["name_id"])
  121. identity_type := util.Int64All(tmp["identity_type+0"])
  122. if name_id != "" {
  123. pinfo, _ := MongoP.FindById("projectset", pid, bson.M{"ids": 1})
  124. if len(*pinfo) > 0 {
  125. for _, id := range util.ObjArrToStringArr((*pinfo)["ids"].([]interface{})) {
  126. coll := "bidding"
  127. //if id > "5a862e7040d2d9bbe88e3b1f" {
  128. // coll = "bidding"
  129. //} else {
  130. // coll = "bidding_back"
  131. //}
  132. info, _ := MongoB.FindById(coll, id, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
  133. if len(*info) > 0 {
  134. if identity_type == 1 {
  135. if util.ObjToString((*info)["buyertel"]) != "" && util.ObjToString((*info)["buyerperson"]) != "" {
  136. q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["buyerperson"]), "contact_tel": util.ObjToString((*info)["buyertel"])}
  137. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  138. if cinfo != nil && len(*cinfo) > 0 {
  139. MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  140. break
  141. }
  142. }
  143. } else if identity_type == 2 {
  144. if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" {
  145. if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" {
  146. q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["winnerperson"]), "contact_tel": util.ObjToString((*info)["winnertel"])}
  147. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  148. if cinfo != nil && len(*cinfo) > 0 {
  149. MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  150. break
  151. }
  152. }
  153. }
  154. } else if identity_type == 4 {
  155. if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" {
  156. if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" {
  157. q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["agencyperson"]), "contact_tel": util.ObjToString((*info)["agencytel"])}
  158. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  159. if cinfo != nil && len(*cinfo) > 0 {
  160. MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  161. break
  162. }
  163. }
  164. }
  165. }
  166. }
  167. }
  168. }
  169. }
  170. //redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
  171. }(ret)
  172. ret = make(map[string]interface{})
  173. }
  174. _ = rows.Close()
  175. wg.Wait()
  176. }
  177. }
  178. func taskMgo() {
  179. sess := MongoP.GetMgoConn()
  180. defer MongoP.DestoryMongoConn(sess)
  181. ch := make(chan bool, 3)
  182. wg := &sync.WaitGroup{}
  183. q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
  184. field := map[string]interface{}{"ids": 1}
  185. query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
  186. count := 0
  187. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  188. if count%20000 == 0 {
  189. util.Debug("current ---", count, tmp["_id"])
  190. }
  191. ch <- true
  192. wg.Add(1)
  193. go func(tmp map[string]interface{}) {
  194. defer func() {
  195. <-ch
  196. wg.Done()
  197. }()
  198. id := mongodb.BsonIdToSId(tmp["_id"])
  199. for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
  200. redis.PutCKV("s_id", i, id)
  201. }
  202. }(tmp)
  203. tmp = make(map[string]interface{})
  204. }
  205. wg.Wait()
  206. util.Debug("over ---", count)
  207. }
  208. // @Description 标讯数据
  209. // @Author J 2022/9/20 17:52
  210. func bidding() *cobra.Command {
  211. cmdClient := &cobra.Command{
  212. Use: "bidding",
  213. Short: "Start processing bidding data",
  214. Run: func(cmd *cobra.Command, args []string) {
  215. go SaveFunc()
  216. //go SaveTagFunc()
  217. //go SaveExpandFunc()
  218. //go SaveAttrFunc()
  219. //go SaveImfFunc()
  220. //go SaveIntentFunc()
  221. //go SaveWinnerFunc()
  222. //go SavePackageFunc()
  223. //go SavePurFunc()
  224. go saveErrMethod()
  225. taskB()
  226. },
  227. }
  228. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  229. return cmdClient
  230. }
  231. // @Description 项目数据
  232. // @Author J 2022/9/20 17:52
  233. func project() *cobra.Command {
  234. cmdClient := &cobra.Command{
  235. Use: "project",
  236. Short: "Start processing project data",
  237. Run: func(cmd *cobra.Command, args []string) {
  238. go SaveProFunc()
  239. go SaveProTagFunc()
  240. go SaveProbFunc()
  241. go SaveRelationFunc()
  242. taskP()
  243. },
  244. }
  245. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  246. return cmdClient
  247. }
  248. func SaveFunc() {
  249. arru := make([]map[string]interface{}, saveSize)
  250. indexu := 0
  251. for {
  252. select {
  253. case v := <-saveBasePool:
  254. arru[indexu] = v
  255. indexu++
  256. if indexu == saveSize {
  257. saveBaseSp <- true
  258. go func(arru []map[string]interface{}) {
  259. defer func() {
  260. <-saveBaseSp
  261. }()
  262. MysqlTool.InsertBulk("dws_f_bid_baseinfo_new", BaseField, arru...)
  263. }(arru)
  264. arru = make([]map[string]interface{}, saveSize)
  265. indexu = 0
  266. }
  267. case <-time.After(1000 * time.Millisecond):
  268. if indexu > 0 {
  269. saveBaseSp <- true
  270. go func(arru []map[string]interface{}) {
  271. defer func() {
  272. <-saveBaseSp
  273. }()
  274. MysqlTool.InsertBulk("dws_f_bid_baseinfo_new", BaseField, arru...)
  275. }(arru[:indexu])
  276. arru = make([]map[string]interface{}, saveSize)
  277. indexu = 0
  278. }
  279. }
  280. }
  281. }
  282. func SaveExpandFunc() {
  283. arru := make([]map[string]interface{}, saveSize)
  284. indexu := 0
  285. for {
  286. select {
  287. case v := <-saveExpandPool:
  288. arru[indexu] = v
  289. indexu++
  290. if indexu == saveSize {
  291. saveExpandSp <- true
  292. go func(arru []map[string]interface{}) {
  293. defer func() {
  294. <-saveExpandSp
  295. }()
  296. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  297. }(arru)
  298. arru = make([]map[string]interface{}, saveSize)
  299. indexu = 0
  300. }
  301. case <-time.After(1000 * time.Millisecond):
  302. if indexu > 0 {
  303. saveExpandSp <- true
  304. go func(arru []map[string]interface{}) {
  305. defer func() {
  306. <-saveExpandSp
  307. }()
  308. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  309. }(arru[:indexu])
  310. arru = make([]map[string]interface{}, saveSize)
  311. indexu = 0
  312. }
  313. }
  314. }
  315. }
  316. func SaveTagFunc() {
  317. arru := make([]map[string]interface{}, saveSize)
  318. indexu := 0
  319. for {
  320. select {
  321. case v := <-saveTagPool:
  322. arru[indexu] = v
  323. indexu++
  324. if indexu == saveSize {
  325. saveTagSp <- true
  326. go func(arru []map[string]interface{}) {
  327. defer func() {
  328. <-saveTagSp
  329. }()
  330. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  331. }(arru)
  332. arru = make([]map[string]interface{}, saveSize)
  333. indexu = 0
  334. }
  335. case <-time.After(1000 * time.Millisecond):
  336. if indexu > 0 {
  337. saveTagSp <- true
  338. go func(arru []map[string]interface{}) {
  339. defer func() {
  340. <-saveTagSp
  341. }()
  342. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  343. }(arru[:indexu])
  344. arru = make([]map[string]interface{}, saveSize)
  345. indexu = 0
  346. }
  347. }
  348. }
  349. }
  350. func SaveAttrFunc() {
  351. arru := make([]map[string]interface{}, saveSize)
  352. indexu := 0
  353. for {
  354. select {
  355. case v := <-saveAttrPool:
  356. arru[indexu] = v
  357. indexu++
  358. if indexu == saveSize {
  359. saveAttrSp <- true
  360. go func(arru []map[string]interface{}) {
  361. defer func() {
  362. <-saveAttrSp
  363. }()
  364. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  365. }(arru)
  366. arru = make([]map[string]interface{}, saveSize)
  367. indexu = 0
  368. }
  369. case <-time.After(1000 * time.Millisecond):
  370. if indexu > 0 {
  371. saveAttrSp <- true
  372. go func(arru []map[string]interface{}) {
  373. defer func() {
  374. <-saveAttrSp
  375. }()
  376. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  377. }(arru[:indexu])
  378. arru = make([]map[string]interface{}, saveSize)
  379. indexu = 0
  380. }
  381. }
  382. }
  383. }
  384. func SaveImfFunc() {
  385. arru := make([]map[string]interface{}, saveSize)
  386. indexu := 0
  387. for {
  388. select {
  389. case v := <-saveIfmPool:
  390. arru[indexu] = v
  391. indexu++
  392. if indexu == saveSize {
  393. saveIfmSp <- true
  394. go func(arru []map[string]interface{}) {
  395. defer func() {
  396. <-saveIfmSp
  397. }()
  398. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  399. }(arru)
  400. arru = make([]map[string]interface{}, saveSize)
  401. indexu = 0
  402. }
  403. case <-time.After(1000 * time.Millisecond):
  404. if indexu > 0 {
  405. saveIfmSp <- true
  406. go func(arru []map[string]interface{}) {
  407. defer func() {
  408. <-saveIfmSp
  409. }()
  410. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  411. }(arru[:indexu])
  412. arru = make([]map[string]interface{}, saveSize)
  413. indexu = 0
  414. }
  415. }
  416. }
  417. }
  418. func SavePurFunc() {
  419. arru := make([]map[string]interface{}, saveSize)
  420. indexu := 0
  421. for {
  422. select {
  423. case v := <-savePurPool:
  424. arru[indexu] = v
  425. indexu++
  426. if indexu == saveSize {
  427. savePurSp <- true
  428. go func(arru []map[string]interface{}) {
  429. defer func() {
  430. <-savePurSp
  431. }()
  432. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  433. }(arru)
  434. arru = make([]map[string]interface{}, saveSize)
  435. indexu = 0
  436. }
  437. case <-time.After(1000 * time.Millisecond):
  438. if indexu > 0 {
  439. savePurSp <- true
  440. go func(arru []map[string]interface{}) {
  441. defer func() {
  442. <-savePurSp
  443. }()
  444. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  445. }(arru[:indexu])
  446. arru = make([]map[string]interface{}, saveSize)
  447. indexu = 0
  448. }
  449. }
  450. }
  451. }
  452. func SaveIntentFunc() {
  453. arru := make([]map[string]interface{}, saveSize)
  454. indexu := 0
  455. for {
  456. select {
  457. case v := <-saveIntentPool:
  458. arru[indexu] = v
  459. indexu++
  460. if indexu == saveSize {
  461. saveIntentSp <- true
  462. go func(arru []map[string]interface{}) {
  463. defer func() {
  464. <-saveIntentSp
  465. }()
  466. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  467. }(arru)
  468. arru = make([]map[string]interface{}, saveSize)
  469. indexu = 0
  470. }
  471. case <-time.After(1000 * time.Millisecond):
  472. if indexu > 0 {
  473. saveIntentSp <- true
  474. go func(arru []map[string]interface{}) {
  475. defer func() {
  476. <-saveIntentSp
  477. }()
  478. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  479. }(arru[:indexu])
  480. arru = make([]map[string]interface{}, saveSize)
  481. indexu = 0
  482. }
  483. }
  484. }
  485. }
  486. func SaveWinnerFunc() {
  487. arru := make([]map[string]interface{}, saveSize)
  488. indexu := 0
  489. for {
  490. select {
  491. case v := <-saveWinnerPool:
  492. arru[indexu] = v
  493. indexu++
  494. if indexu == saveSize {
  495. saveWinnerSp <- true
  496. go func(arru []map[string]interface{}) {
  497. defer func() {
  498. <-saveWinnerSp
  499. }()
  500. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  501. }(arru)
  502. arru = make([]map[string]interface{}, saveSize)
  503. indexu = 0
  504. }
  505. case <-time.After(1000 * time.Millisecond):
  506. if indexu > 0 {
  507. saveWinnerSp <- true
  508. go func(arru []map[string]interface{}) {
  509. defer func() {
  510. <-saveWinnerSp
  511. }()
  512. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  513. }(arru[:indexu])
  514. arru = make([]map[string]interface{}, saveSize)
  515. indexu = 0
  516. }
  517. }
  518. }
  519. }
  520. func SavePackageFunc() {
  521. arru := make([]map[string]interface{}, saveSize)
  522. indexu := 0
  523. for {
  524. select {
  525. case v := <-savePkgPool:
  526. arru[indexu] = v
  527. indexu++
  528. if indexu == saveSize {
  529. savePkgSp <- true
  530. go func(arru []map[string]interface{}) {
  531. defer func() {
  532. <-savePkgSp
  533. }()
  534. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  535. }(arru)
  536. arru = make([]map[string]interface{}, saveSize)
  537. indexu = 0
  538. }
  539. case <-time.After(1000 * time.Millisecond):
  540. if indexu > 0 {
  541. savePkgSp <- true
  542. go func(arru []map[string]interface{}) {
  543. defer func() {
  544. <-savePkgSp
  545. }()
  546. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  547. }(arru[:indexu])
  548. arru = make([]map[string]interface{}, saveSize)
  549. indexu = 0
  550. }
  551. }
  552. }
  553. }
  554. func SaveProFunc() {
  555. arru := make([]map[string]interface{}, saveSize)
  556. indexu := 0
  557. for {
  558. select {
  559. case v := <-saveProPool:
  560. arru[indexu] = v
  561. indexu++
  562. if indexu == saveSize {
  563. saveProSp <- true
  564. go func(arru []map[string]interface{}) {
  565. defer func() {
  566. <-saveProSp
  567. }()
  568. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  569. }(arru)
  570. arru = make([]map[string]interface{}, saveSize)
  571. indexu = 0
  572. }
  573. case <-time.After(1000 * time.Millisecond):
  574. if indexu > 0 {
  575. saveProSp <- true
  576. go func(arru []map[string]interface{}) {
  577. defer func() {
  578. <-saveProSp
  579. }()
  580. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  581. }(arru[:indexu])
  582. arru = make([]map[string]interface{}, saveSize)
  583. indexu = 0
  584. }
  585. }
  586. }
  587. }
  588. func SaveProbFunc() {
  589. arru := make([]map[string]interface{}, saveSize)
  590. indexu := 0
  591. for {
  592. select {
  593. case v := <-saveProbPool:
  594. arru[indexu] = v
  595. indexu++
  596. if indexu == saveSize {
  597. saveProbSp <- true
  598. go func(arru []map[string]interface{}) {
  599. defer func() {
  600. <-saveProbSp
  601. }()
  602. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  603. }(arru)
  604. arru = make([]map[string]interface{}, saveSize)
  605. indexu = 0
  606. }
  607. case <-time.After(1000 * time.Millisecond):
  608. if indexu > 0 {
  609. saveProbSp <- true
  610. go func(arru []map[string]interface{}) {
  611. defer func() {
  612. <-saveProbSp
  613. }()
  614. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  615. }(arru[:indexu])
  616. arru = make([]map[string]interface{}, saveSize)
  617. indexu = 0
  618. }
  619. }
  620. }
  621. }
  622. func SaveProTagFunc() {
  623. arru := make([]map[string]interface{}, saveSize)
  624. indexu := 0
  625. for {
  626. select {
  627. case v := <-saveProTagPool:
  628. arru[indexu] = v
  629. indexu++
  630. if indexu == saveSize {
  631. saveProTagSp <- true
  632. go func(arru []map[string]interface{}) {
  633. defer func() {
  634. <-saveProTagSp
  635. }()
  636. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  637. }(arru)
  638. arru = make([]map[string]interface{}, saveSize)
  639. indexu = 0
  640. }
  641. case <-time.After(1000 * time.Millisecond):
  642. if indexu > 0 {
  643. saveProTagSp <- true
  644. go func(arru []map[string]interface{}) {
  645. defer func() {
  646. <-saveProTagSp
  647. }()
  648. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  649. }(arru[:indexu])
  650. arru = make([]map[string]interface{}, saveSize)
  651. indexu = 0
  652. }
  653. }
  654. }
  655. }
  656. func SaveRelationFunc() {
  657. arru := make([]map[string]interface{}, saveSize)
  658. indexu := 0
  659. for {
  660. select {
  661. case v := <-saveRelationPool:
  662. arru[indexu] = v
  663. indexu++
  664. if indexu == saveSize {
  665. saveRelationSp <- true
  666. go func(arru []map[string]interface{}) {
  667. defer func() {
  668. <-saveRelationSp
  669. }()
  670. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  671. }(arru)
  672. arru = make([]map[string]interface{}, saveSize)
  673. indexu = 0
  674. }
  675. case <-time.After(1000 * time.Millisecond):
  676. if indexu > 0 {
  677. saveRelationSp <- true
  678. go func(arru []map[string]interface{}) {
  679. defer func() {
  680. <-saveRelationSp
  681. }()
  682. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  683. }(arru[:indexu])
  684. arru = make([]map[string]interface{}, saveSize)
  685. indexu = 0
  686. }
  687. }
  688. }
  689. }
  690. // 字段错误数据
  691. func saveErrMethod() {
  692. arru := make([]map[string]interface{}, 200)
  693. indexu := 0
  694. for {
  695. select {
  696. case v := <-saveErrPool:
  697. arru[indexu] = v
  698. indexu++
  699. if indexu == saveSize {
  700. saveErrSp <- true
  701. go func(arru []map[string]interface{}) {
  702. defer func() {
  703. <-saveErrSp
  704. }()
  705. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  706. }(arru)
  707. arru = make([]map[string]interface{}, saveSize)
  708. indexu = 0
  709. }
  710. case <-time.After(1000 * time.Millisecond):
  711. if indexu > 0 {
  712. saveErrSp <- true
  713. go func(arru []map[string]interface{}) {
  714. defer func() {
  715. <-saveErrSp
  716. }()
  717. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  718. }(arru[:indexu])
  719. arru = make([]map[string]interface{}, saveSize)
  720. indexu = 0
  721. }
  722. }
  723. }
  724. }