main.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. package main
  2. import (
  3. "data_tidb/config"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/spf13/cobra"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  14. "net"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. UdpClient udp.UdpClient
  20. )
  21. func init() {
  22. config.Init("./common.toml")
  23. InitLog()
  24. InitMgo()
  25. InitMysql()
  26. InitField()
  27. redis.InitRedis1("qyxy_id=127.0.0.1:8379", 1)
  28. //redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
  29. log.Info("init success")
  30. }
  31. func main() {
  32. //go SaveFunc()
  33. //go SaveTagFunc()
  34. //go saveErrMethod()
  35. //rootCmd := &cobra.Command{Use: "my cmd"}
  36. //rootCmd.AddCommand(bidding())
  37. //rootCmd.AddCommand(project())
  38. //if err := rootCmd.Execute(); err != nil {
  39. // fmt.Println("rootCmd.Execute failed", err.Error())
  40. //}
  41. taskMysql()
  42. //UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  43. //UdpClient.Listen(processUdpMsg)
  44. //log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  45. c := make(chan bool, 1)
  46. <-c
  47. }
  48. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  49. defer util.Catch()
  50. switch act {
  51. case udp.OP_TYPE_DATA: //上个节点的数据
  52. var mapInfo map[string]interface{}
  53. err := json.Unmarshal(data, &mapInfo)
  54. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  55. gtid, _ := mapInfo["gtid"].(string)
  56. lteid, _ := mapInfo["lteid"].(string)
  57. if err != nil || gtid == "" || lteid == "" {
  58. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
  59. } else {
  60. //udp成功回写
  61. if k := util.ObjToString(mapInfo["key"]); k != "" {
  62. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  63. } else {
  64. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  65. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  66. }
  67. log.Info("start sync ...")
  68. doBiddingTask(gtid, lteid, mapInfo)
  69. }
  70. }
  71. }
  72. func taskMysql() {
  73. pool := make(chan bool, 5) //控制线程数
  74. wg := &sync.WaitGroup{}
  75. finalId := 6500183
  76. //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
  77. //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation_new"))
  78. //if len(*lastInfo) > 0 {
  79. // finalId = util.IntAll((*lastInfo)[0]["id"])
  80. //}
  81. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  82. lastid, count := 0, 0
  83. for {
  84. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  85. q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation_new", lastid)
  86. //q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
  87. //q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
  88. rows, err := MysqlTool.DB.Query(q)
  89. if err != nil {
  90. log.Error("mysql query err ", zap.Error(err))
  91. }
  92. columns, err := rows.Columns()
  93. if finalId == lastid {
  94. log.Info("----finish-----", zap.Int("count: ", count))
  95. break
  96. }
  97. for rows.Next() {
  98. scanArgs := make([]interface{}, len(columns))
  99. values := make([]interface{}, len(columns))
  100. ret := make(map[string]interface{})
  101. for k := range values {
  102. scanArgs[k] = &values[k]
  103. }
  104. err = rows.Scan(scanArgs...)
  105. if err != nil {
  106. log.Error("mysql scan err ", zap.Error(err))
  107. break
  108. }
  109. for i, col := range values {
  110. if col == nil {
  111. ret[columns[i]] = nil
  112. } else {
  113. switch val := (*scanArgs[i].(*interface{})).(type) {
  114. case byte:
  115. ret[columns[i]] = val
  116. break
  117. case []byte:
  118. v := string(val)
  119. switch v {
  120. case "\x00": // 处理数据类型为bit的情况
  121. ret[columns[i]] = 0
  122. case "\x01": // 处理数据类型为bit的情况
  123. ret[columns[i]] = 1
  124. default:
  125. ret[columns[i]] = v
  126. break
  127. }
  128. break
  129. case time.Time:
  130. if val.IsZero() {
  131. ret[columns[i]] = nil
  132. } else {
  133. ret[columns[i]] = val.Format("2006-01-02 15:04:05")
  134. }
  135. break
  136. default:
  137. ret[columns[i]] = val
  138. }
  139. }
  140. }
  141. lastid = util.IntAll(ret["id"])
  142. count++
  143. if count%20000 == 0 {
  144. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  145. }
  146. pool <- true
  147. wg.Add(1)
  148. go func(tmp map[string]interface{}) {
  149. defer func() {
  150. <-pool
  151. wg.Done()
  152. }()
  153. cid := util.Int64All(tmp["id"])
  154. iid := util.ObjToString(tmp["infoid"])
  155. name_id := util.ObjToString(tmp["name_id"])
  156. identity_type := util.Int64All(tmp["identity_type+0"])
  157. if name_id != "" {
  158. coll := "bidding"
  159. if iid > "5a862e7040d2d9bbe88e3b1f" {
  160. coll = "bidding"
  161. } else {
  162. coll = "bidding_back"
  163. }
  164. info, _ := MongoB.FindById(coll, iid, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
  165. if len(*info) > 0 {
  166. if identity_type == 1 {
  167. if util.ObjToString((*info)["buyertel"]) != "" {
  168. q := make(map[string]interface{})
  169. q["name_id"] = name_id
  170. q["identity_type"] = identity_type
  171. q["contact_tel"] = util.ObjToString((*info)["buyertel"])
  172. if util.ObjToString((*info)["buyerperson"]) != "" {
  173. q["contact_name"] = util.ObjToString((*info)["buyerperson"])
  174. }
  175. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  176. if cinfo != nil && len(*cinfo) > 0 {
  177. MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  178. }
  179. }
  180. } else if identity_type == 2 {
  181. if util.ObjToString((*info)["winnertel"]) != "" {
  182. q := make(map[string]interface{})
  183. q["name_id"] = name_id
  184. q["identity_type"] = identity_type
  185. q["contact_tel"] = util.ObjToString((*info)["winnertel"])
  186. if util.ObjToString((*info)["winnerperson"]) != "" {
  187. q["contact_name"] = util.ObjToString((*info)["winnerperson"])
  188. }
  189. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  190. if cinfo != nil && len(*cinfo) > 0 {
  191. MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  192. }
  193. }
  194. } else if identity_type == 4 {
  195. if util.ObjToString((*info)["agencytel"]) != "" {
  196. q := make(map[string]interface{})
  197. q["name_id"] = name_id
  198. q["identity_type"] = identity_type
  199. q["contact_tel"] = util.ObjToString((*info)["agencytel"])
  200. if util.ObjToString((*info)["agencyperson"]) != "" {
  201. q["contact_name"] = util.ObjToString((*info)["agencyperson"])
  202. }
  203. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  204. if cinfo != nil && len(*cinfo) > 0 {
  205. MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  206. }
  207. }
  208. }
  209. }
  210. }
  211. //redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
  212. }(ret)
  213. ret = make(map[string]interface{})
  214. }
  215. _ = rows.Close()
  216. wg.Wait()
  217. }
  218. }
  219. func taskMgo() {
  220. sess := MongoP.GetMgoConn()
  221. defer MongoP.DestoryMongoConn(sess)
  222. ch := make(chan bool, 3)
  223. wg := &sync.WaitGroup{}
  224. q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
  225. field := map[string]interface{}{"ids": 1}
  226. query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
  227. count := 0
  228. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  229. if count%20000 == 0 {
  230. util.Debug("current ---", count, tmp["_id"])
  231. }
  232. ch <- true
  233. wg.Add(1)
  234. go func(tmp map[string]interface{}) {
  235. defer func() {
  236. <-ch
  237. wg.Done()
  238. }()
  239. id := mongodb.BsonIdToSId(tmp["_id"])
  240. for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
  241. redis.PutCKV("s_id", i, id)
  242. }
  243. }(tmp)
  244. tmp = make(map[string]interface{})
  245. }
  246. wg.Wait()
  247. util.Debug("over ---", count)
  248. }
  249. // @Description 标讯数据
  250. // @Author J 2022/9/20 17:52
  251. func bidding() *cobra.Command {
  252. cmdClient := &cobra.Command{
  253. Use: "bidding",
  254. Short: "Start processing bidding data",
  255. Run: func(cmd *cobra.Command, args []string) {
  256. go SaveFunc()
  257. go SaveTagFunc()
  258. go SaveExpandFunc()
  259. go SaveAttrFunc()
  260. go SaveImfFunc()
  261. go SaveIntentFunc()
  262. go SaveWinnerFunc()
  263. go SavePackageFunc()
  264. go SavePurFunc()
  265. go saveErrMethod()
  266. taskB()
  267. },
  268. }
  269. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  270. return cmdClient
  271. }
  272. // @Description 项目数据
  273. // @Author J 2022/9/20 17:52
  274. func project() *cobra.Command {
  275. cmdClient := &cobra.Command{
  276. Use: "project",
  277. Short: "Start processing project data",
  278. Run: func(cmd *cobra.Command, args []string) {
  279. //go SaveProFunc()
  280. //go SaveProTagFunc()
  281. //go SaveProbFunc()
  282. go SaveRelationFunc()
  283. taskP()
  284. },
  285. }
  286. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  287. return cmdClient
  288. }
  289. func SaveFunc() {
  290. arru := make([]map[string]interface{}, saveSize)
  291. indexu := 0
  292. for {
  293. select {
  294. case v := <-saveBasePool:
  295. arru[indexu] = v
  296. indexu++
  297. if indexu == saveSize {
  298. saveBaseSp <- true
  299. go func(arru []map[string]interface{}) {
  300. defer func() {
  301. <-saveBaseSp
  302. }()
  303. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  304. }(arru)
  305. arru = make([]map[string]interface{}, saveSize)
  306. indexu = 0
  307. }
  308. case <-time.After(1000 * time.Millisecond):
  309. if indexu > 0 {
  310. saveBaseSp <- true
  311. go func(arru []map[string]interface{}) {
  312. defer func() {
  313. <-saveBaseSp
  314. }()
  315. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  316. }(arru[:indexu])
  317. arru = make([]map[string]interface{}, saveSize)
  318. indexu = 0
  319. }
  320. }
  321. }
  322. }
  323. func SaveExpandFunc() {
  324. arru := make([]map[string]interface{}, saveSize)
  325. indexu := 0
  326. for {
  327. select {
  328. case v := <-saveExpandPool:
  329. arru[indexu] = v
  330. indexu++
  331. if indexu == saveSize {
  332. saveExpandSp <- true
  333. go func(arru []map[string]interface{}) {
  334. defer func() {
  335. <-saveExpandSp
  336. }()
  337. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  338. }(arru)
  339. arru = make([]map[string]interface{}, saveSize)
  340. indexu = 0
  341. }
  342. case <-time.After(1000 * time.Millisecond):
  343. if indexu > 0 {
  344. saveExpandSp <- true
  345. go func(arru []map[string]interface{}) {
  346. defer func() {
  347. <-saveExpandSp
  348. }()
  349. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  350. }(arru[:indexu])
  351. arru = make([]map[string]interface{}, saveSize)
  352. indexu = 0
  353. }
  354. }
  355. }
  356. }
  357. func SaveTagFunc() {
  358. arru := make([]map[string]interface{}, saveSize)
  359. indexu := 0
  360. for {
  361. select {
  362. case v := <-saveTagPool:
  363. arru[indexu] = v
  364. indexu++
  365. if indexu == saveSize {
  366. saveTagSp <- true
  367. go func(arru []map[string]interface{}) {
  368. defer func() {
  369. <-saveTagSp
  370. }()
  371. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  372. }(arru)
  373. arru = make([]map[string]interface{}, saveSize)
  374. indexu = 0
  375. }
  376. case <-time.After(1000 * time.Millisecond):
  377. if indexu > 0 {
  378. saveTagSp <- true
  379. go func(arru []map[string]interface{}) {
  380. defer func() {
  381. <-saveTagSp
  382. }()
  383. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  384. }(arru[:indexu])
  385. arru = make([]map[string]interface{}, saveSize)
  386. indexu = 0
  387. }
  388. }
  389. }
  390. }
  391. func SaveAttrFunc() {
  392. arru := make([]map[string]interface{}, saveSize)
  393. indexu := 0
  394. for {
  395. select {
  396. case v := <-saveAttrPool:
  397. arru[indexu] = v
  398. indexu++
  399. if indexu == saveSize {
  400. saveAttrSp <- true
  401. go func(arru []map[string]interface{}) {
  402. defer func() {
  403. <-saveAttrSp
  404. }()
  405. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  406. }(arru)
  407. arru = make([]map[string]interface{}, saveSize)
  408. indexu = 0
  409. }
  410. case <-time.After(1000 * time.Millisecond):
  411. if indexu > 0 {
  412. saveAttrSp <- true
  413. go func(arru []map[string]interface{}) {
  414. defer func() {
  415. <-saveAttrSp
  416. }()
  417. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  418. }(arru[:indexu])
  419. arru = make([]map[string]interface{}, saveSize)
  420. indexu = 0
  421. }
  422. }
  423. }
  424. }
  425. func SaveImfFunc() {
  426. arru := make([]map[string]interface{}, saveSize)
  427. indexu := 0
  428. for {
  429. select {
  430. case v := <-saveIfmPool:
  431. arru[indexu] = v
  432. indexu++
  433. if indexu == saveSize {
  434. saveIfmSp <- true
  435. go func(arru []map[string]interface{}) {
  436. defer func() {
  437. <-saveIfmSp
  438. }()
  439. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  440. }(arru)
  441. arru = make([]map[string]interface{}, saveSize)
  442. indexu = 0
  443. }
  444. case <-time.After(1000 * time.Millisecond):
  445. if indexu > 0 {
  446. saveIfmSp <- true
  447. go func(arru []map[string]interface{}) {
  448. defer func() {
  449. <-saveIfmSp
  450. }()
  451. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  452. }(arru[:indexu])
  453. arru = make([]map[string]interface{}, saveSize)
  454. indexu = 0
  455. }
  456. }
  457. }
  458. }
  459. func SavePurFunc() {
  460. arru := make([]map[string]interface{}, saveSize)
  461. indexu := 0
  462. for {
  463. select {
  464. case v := <-savePurPool:
  465. arru[indexu] = v
  466. indexu++
  467. if indexu == saveSize {
  468. savePurSp <- true
  469. go func(arru []map[string]interface{}) {
  470. defer func() {
  471. <-savePurSp
  472. }()
  473. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  474. }(arru)
  475. arru = make([]map[string]interface{}, saveSize)
  476. indexu = 0
  477. }
  478. case <-time.After(1000 * time.Millisecond):
  479. if indexu > 0 {
  480. savePurSp <- true
  481. go func(arru []map[string]interface{}) {
  482. defer func() {
  483. <-savePurSp
  484. }()
  485. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  486. }(arru[:indexu])
  487. arru = make([]map[string]interface{}, saveSize)
  488. indexu = 0
  489. }
  490. }
  491. }
  492. }
  493. func SaveIntentFunc() {
  494. arru := make([]map[string]interface{}, saveSize)
  495. indexu := 0
  496. for {
  497. select {
  498. case v := <-saveIntentPool:
  499. arru[indexu] = v
  500. indexu++
  501. if indexu == saveSize {
  502. saveIntentSp <- true
  503. go func(arru []map[string]interface{}) {
  504. defer func() {
  505. <-saveIntentSp
  506. }()
  507. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  508. }(arru)
  509. arru = make([]map[string]interface{}, saveSize)
  510. indexu = 0
  511. }
  512. case <-time.After(1000 * time.Millisecond):
  513. if indexu > 0 {
  514. saveIntentSp <- true
  515. go func(arru []map[string]interface{}) {
  516. defer func() {
  517. <-saveIntentSp
  518. }()
  519. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  520. }(arru[:indexu])
  521. arru = make([]map[string]interface{}, saveSize)
  522. indexu = 0
  523. }
  524. }
  525. }
  526. }
  527. func SaveWinnerFunc() {
  528. arru := make([]map[string]interface{}, saveSize)
  529. indexu := 0
  530. for {
  531. select {
  532. case v := <-saveWinnerPool:
  533. arru[indexu] = v
  534. indexu++
  535. if indexu == saveSize {
  536. saveWinnerSp <- true
  537. go func(arru []map[string]interface{}) {
  538. defer func() {
  539. <-saveWinnerSp
  540. }()
  541. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  542. }(arru)
  543. arru = make([]map[string]interface{}, saveSize)
  544. indexu = 0
  545. }
  546. case <-time.After(1000 * time.Millisecond):
  547. if indexu > 0 {
  548. saveWinnerSp <- true
  549. go func(arru []map[string]interface{}) {
  550. defer func() {
  551. <-saveWinnerSp
  552. }()
  553. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  554. }(arru[:indexu])
  555. arru = make([]map[string]interface{}, saveSize)
  556. indexu = 0
  557. }
  558. }
  559. }
  560. }
  561. func SavePackageFunc() {
  562. arru := make([]map[string]interface{}, saveSize)
  563. indexu := 0
  564. for {
  565. select {
  566. case v := <-savePkgPool:
  567. arru[indexu] = v
  568. indexu++
  569. if indexu == saveSize {
  570. savePkgSp <- true
  571. go func(arru []map[string]interface{}) {
  572. defer func() {
  573. <-savePkgSp
  574. }()
  575. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  576. }(arru)
  577. arru = make([]map[string]interface{}, saveSize)
  578. indexu = 0
  579. }
  580. case <-time.After(1000 * time.Millisecond):
  581. if indexu > 0 {
  582. savePkgSp <- true
  583. go func(arru []map[string]interface{}) {
  584. defer func() {
  585. <-savePkgSp
  586. }()
  587. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  588. }(arru[:indexu])
  589. arru = make([]map[string]interface{}, saveSize)
  590. indexu = 0
  591. }
  592. }
  593. }
  594. }
  595. func SaveProFunc() {
  596. arru := make([]map[string]interface{}, saveSize)
  597. indexu := 0
  598. for {
  599. select {
  600. case v := <-saveProPool:
  601. arru[indexu] = v
  602. indexu++
  603. if indexu == saveSize {
  604. saveProSp <- true
  605. go func(arru []map[string]interface{}) {
  606. defer func() {
  607. <-saveProSp
  608. }()
  609. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  610. }(arru)
  611. arru = make([]map[string]interface{}, saveSize)
  612. indexu = 0
  613. }
  614. case <-time.After(1000 * time.Millisecond):
  615. if indexu > 0 {
  616. saveProSp <- true
  617. go func(arru []map[string]interface{}) {
  618. defer func() {
  619. <-saveProSp
  620. }()
  621. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  622. }(arru[:indexu])
  623. arru = make([]map[string]interface{}, saveSize)
  624. indexu = 0
  625. }
  626. }
  627. }
  628. }
  629. func SaveProbFunc() {
  630. arru := make([]map[string]interface{}, saveSize)
  631. indexu := 0
  632. for {
  633. select {
  634. case v := <-saveProbPool:
  635. arru[indexu] = v
  636. indexu++
  637. if indexu == saveSize {
  638. saveProbSp <- true
  639. go func(arru []map[string]interface{}) {
  640. defer func() {
  641. <-saveProbSp
  642. }()
  643. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  644. }(arru)
  645. arru = make([]map[string]interface{}, saveSize)
  646. indexu = 0
  647. }
  648. case <-time.After(1000 * time.Millisecond):
  649. if indexu > 0 {
  650. saveProbSp <- true
  651. go func(arru []map[string]interface{}) {
  652. defer func() {
  653. <-saveProbSp
  654. }()
  655. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  656. }(arru[:indexu])
  657. arru = make([]map[string]interface{}, saveSize)
  658. indexu = 0
  659. }
  660. }
  661. }
  662. }
  663. func SaveProTagFunc() {
  664. arru := make([]map[string]interface{}, saveSize)
  665. indexu := 0
  666. for {
  667. select {
  668. case v := <-saveProTagPool:
  669. arru[indexu] = v
  670. indexu++
  671. if indexu == saveSize {
  672. saveProTagSp <- true
  673. go func(arru []map[string]interface{}) {
  674. defer func() {
  675. <-saveProTagSp
  676. }()
  677. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  678. }(arru)
  679. arru = make([]map[string]interface{}, saveSize)
  680. indexu = 0
  681. }
  682. case <-time.After(1000 * time.Millisecond):
  683. if indexu > 0 {
  684. saveProTagSp <- true
  685. go func(arru []map[string]interface{}) {
  686. defer func() {
  687. <-saveProTagSp
  688. }()
  689. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  690. }(arru[:indexu])
  691. arru = make([]map[string]interface{}, saveSize)
  692. indexu = 0
  693. }
  694. }
  695. }
  696. }
  697. func SaveRelationFunc() {
  698. arru := make([]map[string]interface{}, saveSize)
  699. indexu := 0
  700. for {
  701. select {
  702. case v := <-saveRelationPool:
  703. arru[indexu] = v
  704. indexu++
  705. if indexu == saveSize {
  706. saveRelationSp <- true
  707. go func(arru []map[string]interface{}) {
  708. defer func() {
  709. <-saveRelationSp
  710. }()
  711. MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
  712. }(arru)
  713. arru = make([]map[string]interface{}, saveSize)
  714. indexu = 0
  715. }
  716. case <-time.After(1000 * time.Millisecond):
  717. if indexu > 0 {
  718. saveRelationSp <- true
  719. go func(arru []map[string]interface{}) {
  720. defer func() {
  721. <-saveRelationSp
  722. }()
  723. MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
  724. }(arru[:indexu])
  725. arru = make([]map[string]interface{}, saveSize)
  726. indexu = 0
  727. }
  728. }
  729. }
  730. }
  731. // 字段错误数据
  732. func saveErrMethod() {
  733. arru := make([]map[string]interface{}, 200)
  734. indexu := 0
  735. for {
  736. select {
  737. case v := <-saveErrPool:
  738. arru[indexu] = v
  739. indexu++
  740. if indexu == saveSize {
  741. saveErrSp <- true
  742. go func(arru []map[string]interface{}) {
  743. defer func() {
  744. <-saveErrSp
  745. }()
  746. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  747. }(arru)
  748. arru = make([]map[string]interface{}, saveSize)
  749. indexu = 0
  750. }
  751. case <-time.After(1000 * time.Millisecond):
  752. if indexu > 0 {
  753. saveErrSp <- true
  754. go func(arru []map[string]interface{}) {
  755. defer func() {
  756. <-saveErrSp
  757. }()
  758. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  759. }(arru[:indexu])
  760. arru = make([]map[string]interface{}, saveSize)
  761. indexu = 0
  762. }
  763. }
  764. }
  765. }