main.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883
  1. package main
  2. import (
  3. "data_tidb/config"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/robfig/cron"
  7. "github.com/spf13/cobra"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  15. "net"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. UdpClient udp.UdpClient
  21. Pici int64
  22. piciLock sync.Mutex
  23. )
  24. func init() {
  25. config.Init("./common.toml")
  26. InitLog()
  27. InitMgo()
  28. InitMysql()
  29. InitField()
  30. redis.InitRedis1("qyxy_id=127.0.0.1:8379", 1)
  31. redis.InitRedis1("project_ids=127.0.0.1:8379", 0)
  32. //redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
  33. log.Info("init success")
  34. }
  35. func main() {
  36. //
  37. //go SaveFunc()
  38. //go SaveTagFunc()
  39. //go SaveExpandFunc()
  40. //go SaveAttrFunc()
  41. //go SaveImfFunc()
  42. //go SaveIntentFunc()
  43. //go SaveWinnerFunc()
  44. //go SavePackageFunc()
  45. //go SavePurFunc()
  46. //go saveErrMethod()
  47. rootCmd := &cobra.Command{Use: "my cmd"}
  48. rootCmd.AddCommand(bidding())
  49. rootCmd.AddCommand(relation())
  50. rootCmd.AddCommand(projectAdd())
  51. rootCmd.AddCommand(redisPid())
  52. rootCmd.AddCommand(repairRel())
  53. if err := rootCmd.Execute(); err != nil {
  54. fmt.Println("rootCmd.Execute failed", err.Error())
  55. }
  56. //go SaveRelationFunc()
  57. //taskMysql()
  58. //UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  59. //UdpClient.Listen(processUdpMsg)
  60. //log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  61. c := make(chan bool, 1)
  62. <-c
  63. }
  64. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  65. defer util.Catch()
  66. switch act {
  67. case udp.OP_TYPE_DATA: //上个节点的数据
  68. var mapInfo map[string]interface{}
  69. err := json.Unmarshal(data, &mapInfo)
  70. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  71. gtid, _ := mapInfo["gtid"].(string)
  72. lteid, _ := mapInfo["lteid"].(string)
  73. if err != nil {
  74. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
  75. } else {
  76. //udp成功回写
  77. if k := util.ObjToString(mapInfo["key"]); k != "" {
  78. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  79. } else {
  80. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  81. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  82. }
  83. log.Info("start sync ...")
  84. doBiddingTask(gtid, lteid, mapInfo)
  85. }
  86. }
  87. }
  88. func taskMysql() {
  89. pool := make(chan bool, 5) //控制线程数
  90. wg := &sync.WaitGroup{}
  91. finalId := 0
  92. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
  93. //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation_new"))
  94. if len(*lastInfo) > 0 {
  95. finalId = util.IntAll((*lastInfo)[0]["id"])
  96. }
  97. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  98. lastid, count := 0, 0
  99. for {
  100. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  101. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_bpmc_relation", lastid)
  102. //q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
  103. //q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
  104. rows, err := MysqlTool.DB.Query(q)
  105. if err != nil {
  106. log.Error("mysql query err ", zap.Error(err))
  107. }
  108. columns, err := rows.Columns()
  109. if finalId == lastid {
  110. log.Info("----finish-----", zap.Int("count: ", count))
  111. break
  112. }
  113. for rows.Next() {
  114. scanArgs := make([]interface{}, len(columns))
  115. values := make([]interface{}, len(columns))
  116. ret := make(map[string]interface{})
  117. for k := range values {
  118. scanArgs[k] = &values[k]
  119. }
  120. err = rows.Scan(scanArgs...)
  121. if err != nil {
  122. log.Error("mysql scan err ", zap.Error(err))
  123. break
  124. }
  125. for i, col := range values {
  126. if col == nil {
  127. ret[columns[i]] = nil
  128. } else {
  129. switch val := (*scanArgs[i].(*interface{})).(type) {
  130. case byte:
  131. ret[columns[i]] = val
  132. break
  133. case []byte:
  134. v := string(val)
  135. switch v {
  136. case "\x00": // 处理数据类型为bit的情况
  137. ret[columns[i]] = 0
  138. case "\x01": // 处理数据类型为bit的情况
  139. ret[columns[i]] = 1
  140. default:
  141. ret[columns[i]] = v
  142. break
  143. }
  144. break
  145. case time.Time:
  146. if val.IsZero() {
  147. ret[columns[i]] = nil
  148. } else {
  149. ret[columns[i]] = val.Format("2006-01-02 15:04:05")
  150. }
  151. break
  152. default:
  153. ret[columns[i]] = val
  154. }
  155. }
  156. }
  157. lastid = util.IntAll(ret["id"])
  158. count++
  159. if count%20000 == 0 {
  160. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  161. }
  162. pool <- true
  163. wg.Add(1)
  164. go func(tmp map[string]interface{}) {
  165. defer func() {
  166. <-pool
  167. wg.Done()
  168. }()
  169. //cid := util.Int64All(tmp["id"])
  170. //iid := util.ObjToString(tmp["infoid"])
  171. //name_id := util.ObjToString(tmp["name_id"])
  172. //identity_type := util.Int64All(tmp["identity_type+0"])
  173. //if name_id != "" {
  174. // coll := "bidding"
  175. // if iid > "5a862e7040d2d9bbe88e3b1f" {
  176. // coll = "bidding"
  177. // } else {
  178. // coll = "bidding_back"
  179. // }
  180. // info, _ := MongoB.FindById(coll, iid, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
  181. // if len(*info) > 0 {
  182. // if identity_type == 1 {
  183. // if util.ObjToString((*info)["buyertel"]) != "" {
  184. // q := make(map[string]interface{})
  185. // q["name_id"] = name_id
  186. // q["identity_type"] = identity_type
  187. // q["contact_tel"] = util.ObjToString((*info)["buyertel"])
  188. // if util.ObjToString((*info)["buyerperson"]) != "" {
  189. // q["contact_name"] = util.ObjToString((*info)["buyerperson"])
  190. // }
  191. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  192. // if cinfo != nil && len(*cinfo) > 0 {
  193. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  194. // }
  195. // }
  196. // } else if identity_type == 2 {
  197. // if util.ObjToString((*info)["winnertel"]) != "" {
  198. // q := make(map[string]interface{})
  199. // q["name_id"] = name_id
  200. // q["identity_type"] = identity_type
  201. // q["contact_tel"] = util.ObjToString((*info)["winnertel"])
  202. // if util.ObjToString((*info)["winnerperson"]) != "" {
  203. // q["contact_name"] = util.ObjToString((*info)["winnerperson"])
  204. // }
  205. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  206. // if cinfo != nil && len(*cinfo) > 0 {
  207. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  208. // }
  209. // }
  210. // } else if identity_type == 4 {
  211. // if util.ObjToString((*info)["agencytel"]) != "" {
  212. // q := make(map[string]interface{})
  213. // q["name_id"] = name_id
  214. // q["identity_type"] = identity_type
  215. // q["contact_tel"] = util.ObjToString((*info)["agencytel"])
  216. // if util.ObjToString((*info)["agencyperson"]) != "" {
  217. // q["contact_name"] = util.ObjToString((*info)["agencyperson"])
  218. // }
  219. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  220. // if cinfo != nil && len(*cinfo) > 0 {
  221. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  222. // }
  223. // }
  224. // }
  225. // }
  226. //}
  227. //redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
  228. saveM := make(map[string]interface{})
  229. if util.ObjToString(tmp["name_id"]) != "" {
  230. saveM["name_id"] = util.ObjToString(tmp["name_id"])
  231. } else {
  232. return
  233. }
  234. if util.ObjToString(tmp["contact_id"]) != "" {
  235. saveM["contact_id"] = util.IntAll(tmp["contact_id"])
  236. } else {
  237. return
  238. }
  239. saveM["projectid"] = util.ObjToString(tmp["projectid"])
  240. saveM["infoid"] = util.ObjToString(tmp["infoid"])
  241. saveM["identity_type"] = tmp["identity_type"]
  242. saveRelationPool <- saveM
  243. }(ret)
  244. ret = make(map[string]interface{})
  245. }
  246. _ = rows.Close()
  247. wg.Wait()
  248. }
  249. }
  250. func taskMgo() {
  251. sess := MongoP.GetMgoConn()
  252. defer MongoP.DestoryMongoConn(sess)
  253. ch := make(chan bool, 3)
  254. wg := &sync.WaitGroup{}
  255. q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
  256. field := map[string]interface{}{"ids": 1}
  257. query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
  258. count := 0
  259. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  260. if count%20000 == 0 {
  261. util.Debug("current ---", count, tmp["_id"])
  262. }
  263. ch <- true
  264. wg.Add(1)
  265. go func(tmp map[string]interface{}) {
  266. defer func() {
  267. <-ch
  268. wg.Done()
  269. }()
  270. id := mongodb.BsonIdToSId(tmp["_id"])
  271. for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
  272. redis.PutCKV("s_id", i, id)
  273. }
  274. }(tmp)
  275. tmp = make(map[string]interface{})
  276. }
  277. wg.Wait()
  278. util.Debug("over ---", count)
  279. }
  280. // @Description 标讯数据
  281. // @Author J 2022/9/20 17:52
  282. func bidding() *cobra.Command {
  283. cmdClient := &cobra.Command{
  284. Use: "bidding",
  285. Short: "Start processing bidding data",
  286. Run: func(cmd *cobra.Command, args []string) {
  287. go SaveFunc()
  288. go SaveTagFunc()
  289. go SaveExpandFunc()
  290. go SaveAttrFunc()
  291. go SaveImfFunc()
  292. go SaveIntentFunc()
  293. go SaveWinnerFunc()
  294. go SavePackageFunc()
  295. go SavePurFunc()
  296. go saveErrMethod()
  297. taskB()
  298. },
  299. }
  300. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  301. return cmdClient
  302. }
  303. // @Description 企业联系人关系表
  304. // @Author J 2022/9/20 17:52
  305. func relation() *cobra.Command {
  306. cmdClient := &cobra.Command{
  307. Use: "relation",
  308. Short: "Start processing relation data",
  309. Run: func(cmd *cobra.Command, args []string) {
  310. InitEs()
  311. go SaveRelationFunc()
  312. taskR()
  313. },
  314. }
  315. return cmdClient
  316. }
  317. // @Description 项目数据(目前仅关系表数据)
  318. // @Author J 2022/9/20 17:52
  319. func projectAdd() *cobra.Command {
  320. cmdClient := &cobra.Command{
  321. Use: "project",
  322. Short: "Start processing project data",
  323. Run: func(cmd *cobra.Command, args []string) {
  324. InitEs()
  325. //go SaveProFunc()
  326. //go SaveProTagFunc()
  327. //go SaveProbFunc()
  328. go SaveRelationFunc()
  329. taskR()
  330. crn := cron.New()
  331. cronstr := "0 30 * * * *" // 每30min执行一次
  332. _ = crn.AddFunc(cronstr, func() {
  333. //taskPAdd()
  334. taskR()
  335. })
  336. crn.Start()
  337. },
  338. }
  339. cmdClient.Flags().Int64VarP(&Pici, "pici", "p", 0, "")
  340. return cmdClient
  341. }
  342. func redisPid() *cobra.Command {
  343. cmdClient := &cobra.Command{
  344. Use: "redis-pid",
  345. Short: "Start processing redis pid data",
  346. Run: func(cmd *cobra.Command, args []string) {
  347. taskRedisPid()
  348. },
  349. }
  350. return cmdClient
  351. }
  352. func repairRel() *cobra.Command {
  353. cmdClient := &cobra.Command{
  354. Use: "repair-relation",
  355. Short: "Start processing redis pid data",
  356. Run: func(cmd *cobra.Command, args []string) {
  357. taskRepair()
  358. },
  359. }
  360. return cmdClient
  361. }
  362. func SaveFunc() {
  363. arru := make([]map[string]interface{}, saveSize)
  364. indexu := 0
  365. for {
  366. select {
  367. case v := <-saveBasePool:
  368. arru[indexu] = v
  369. indexu++
  370. if indexu == saveSize {
  371. saveBaseSp <- true
  372. go func(arru []map[string]interface{}) {
  373. defer func() {
  374. <-saveBaseSp
  375. }()
  376. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  377. }(arru)
  378. arru = make([]map[string]interface{}, saveSize)
  379. indexu = 0
  380. }
  381. case <-time.After(1000 * time.Millisecond):
  382. if indexu > 0 {
  383. saveBaseSp <- true
  384. go func(arru []map[string]interface{}) {
  385. defer func() {
  386. <-saveBaseSp
  387. }()
  388. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  389. }(arru[:indexu])
  390. arru = make([]map[string]interface{}, saveSize)
  391. indexu = 0
  392. }
  393. }
  394. }
  395. }
  396. func SaveExpandFunc() {
  397. arru := make([]map[string]interface{}, saveSize)
  398. indexu := 0
  399. for {
  400. select {
  401. case v := <-saveExpandPool:
  402. arru[indexu] = v
  403. indexu++
  404. if indexu == saveSize {
  405. saveExpandSp <- true
  406. go func(arru []map[string]interface{}) {
  407. defer func() {
  408. <-saveExpandSp
  409. }()
  410. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  411. }(arru)
  412. arru = make([]map[string]interface{}, saveSize)
  413. indexu = 0
  414. }
  415. case <-time.After(1000 * time.Millisecond):
  416. if indexu > 0 {
  417. saveExpandSp <- true
  418. go func(arru []map[string]interface{}) {
  419. defer func() {
  420. <-saveExpandSp
  421. }()
  422. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  423. }(arru[:indexu])
  424. arru = make([]map[string]interface{}, saveSize)
  425. indexu = 0
  426. }
  427. }
  428. }
  429. }
  430. func SaveTagFunc() {
  431. arru := make([]map[string]interface{}, saveSize)
  432. indexu := 0
  433. for {
  434. select {
  435. case v := <-saveTagPool:
  436. arru[indexu] = v
  437. indexu++
  438. if indexu == saveSize {
  439. saveTagSp <- true
  440. go func(arru []map[string]interface{}) {
  441. defer func() {
  442. <-saveTagSp
  443. }()
  444. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  445. }(arru)
  446. arru = make([]map[string]interface{}, saveSize)
  447. indexu = 0
  448. }
  449. case <-time.After(1000 * time.Millisecond):
  450. if indexu > 0 {
  451. saveTagSp <- true
  452. go func(arru []map[string]interface{}) {
  453. defer func() {
  454. <-saveTagSp
  455. }()
  456. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  457. }(arru[:indexu])
  458. arru = make([]map[string]interface{}, saveSize)
  459. indexu = 0
  460. }
  461. }
  462. }
  463. }
  464. func SaveAttrFunc() {
  465. arru := make([]map[string]interface{}, saveSize)
  466. indexu := 0
  467. for {
  468. select {
  469. case v := <-saveAttrPool:
  470. arru[indexu] = v
  471. indexu++
  472. if indexu == saveSize {
  473. saveAttrSp <- true
  474. go func(arru []map[string]interface{}) {
  475. defer func() {
  476. <-saveAttrSp
  477. }()
  478. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  479. }(arru)
  480. arru = make([]map[string]interface{}, saveSize)
  481. indexu = 0
  482. }
  483. case <-time.After(1000 * time.Millisecond):
  484. if indexu > 0 {
  485. saveAttrSp <- true
  486. go func(arru []map[string]interface{}) {
  487. defer func() {
  488. <-saveAttrSp
  489. }()
  490. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  491. }(arru[:indexu])
  492. arru = make([]map[string]interface{}, saveSize)
  493. indexu = 0
  494. }
  495. }
  496. }
  497. }
  498. func SaveImfFunc() {
  499. arru := make([]map[string]interface{}, saveSize)
  500. indexu := 0
  501. for {
  502. select {
  503. case v := <-saveIfmPool:
  504. arru[indexu] = v
  505. indexu++
  506. if indexu == saveSize {
  507. saveIfmSp <- true
  508. go func(arru []map[string]interface{}) {
  509. defer func() {
  510. <-saveIfmSp
  511. }()
  512. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  513. }(arru)
  514. arru = make([]map[string]interface{}, saveSize)
  515. indexu = 0
  516. }
  517. case <-time.After(1000 * time.Millisecond):
  518. if indexu > 0 {
  519. saveIfmSp <- true
  520. go func(arru []map[string]interface{}) {
  521. defer func() {
  522. <-saveIfmSp
  523. }()
  524. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  525. }(arru[:indexu])
  526. arru = make([]map[string]interface{}, saveSize)
  527. indexu = 0
  528. }
  529. }
  530. }
  531. }
  532. func SavePurFunc() {
  533. arru := make([]map[string]interface{}, saveSize)
  534. indexu := 0
  535. for {
  536. select {
  537. case v := <-savePurPool:
  538. arru[indexu] = v
  539. indexu++
  540. if indexu == saveSize {
  541. savePurSp <- true
  542. go func(arru []map[string]interface{}) {
  543. defer func() {
  544. <-savePurSp
  545. }()
  546. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  547. }(arru)
  548. arru = make([]map[string]interface{}, saveSize)
  549. indexu = 0
  550. }
  551. case <-time.After(1000 * time.Millisecond):
  552. if indexu > 0 {
  553. savePurSp <- true
  554. go func(arru []map[string]interface{}) {
  555. defer func() {
  556. <-savePurSp
  557. }()
  558. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  559. }(arru[:indexu])
  560. arru = make([]map[string]interface{}, saveSize)
  561. indexu = 0
  562. }
  563. }
  564. }
  565. }
  566. func SaveIntentFunc() {
  567. arru := make([]map[string]interface{}, saveSize)
  568. indexu := 0
  569. for {
  570. select {
  571. case v := <-saveIntentPool:
  572. arru[indexu] = v
  573. indexu++
  574. if indexu == saveSize {
  575. saveIntentSp <- true
  576. go func(arru []map[string]interface{}) {
  577. defer func() {
  578. <-saveIntentSp
  579. }()
  580. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  581. }(arru)
  582. arru = make([]map[string]interface{}, saveSize)
  583. indexu = 0
  584. }
  585. case <-time.After(1000 * time.Millisecond):
  586. if indexu > 0 {
  587. saveIntentSp <- true
  588. go func(arru []map[string]interface{}) {
  589. defer func() {
  590. <-saveIntentSp
  591. }()
  592. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  593. }(arru[:indexu])
  594. arru = make([]map[string]interface{}, saveSize)
  595. indexu = 0
  596. }
  597. }
  598. }
  599. }
  600. func SaveWinnerFunc() {
  601. arru := make([]map[string]interface{}, saveSize)
  602. indexu := 0
  603. for {
  604. select {
  605. case v := <-saveWinnerPool:
  606. arru[indexu] = v
  607. indexu++
  608. if indexu == saveSize {
  609. saveWinnerSp <- true
  610. go func(arru []map[string]interface{}) {
  611. defer func() {
  612. <-saveWinnerSp
  613. }()
  614. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  615. }(arru)
  616. arru = make([]map[string]interface{}, saveSize)
  617. indexu = 0
  618. }
  619. case <-time.After(1000 * time.Millisecond):
  620. if indexu > 0 {
  621. saveWinnerSp <- true
  622. go func(arru []map[string]interface{}) {
  623. defer func() {
  624. <-saveWinnerSp
  625. }()
  626. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  627. }(arru[:indexu])
  628. arru = make([]map[string]interface{}, saveSize)
  629. indexu = 0
  630. }
  631. }
  632. }
  633. }
  634. func SavePackageFunc() {
  635. arru := make([]map[string]interface{}, saveSize)
  636. indexu := 0
  637. for {
  638. select {
  639. case v := <-savePkgPool:
  640. arru[indexu] = v
  641. indexu++
  642. if indexu == saveSize {
  643. savePkgSp <- true
  644. go func(arru []map[string]interface{}) {
  645. defer func() {
  646. <-savePkgSp
  647. }()
  648. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  649. }(arru)
  650. arru = make([]map[string]interface{}, saveSize)
  651. indexu = 0
  652. }
  653. case <-time.After(1000 * time.Millisecond):
  654. if indexu > 0 {
  655. savePkgSp <- true
  656. go func(arru []map[string]interface{}) {
  657. defer func() {
  658. <-savePkgSp
  659. }()
  660. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  661. }(arru[:indexu])
  662. arru = make([]map[string]interface{}, saveSize)
  663. indexu = 0
  664. }
  665. }
  666. }
  667. }
  668. func SaveProFunc() {
  669. arru := make([]map[string]interface{}, saveSize)
  670. indexu := 0
  671. for {
  672. select {
  673. case v := <-saveProPool:
  674. arru[indexu] = v
  675. indexu++
  676. if indexu == saveSize {
  677. saveProSp <- true
  678. go func(arru []map[string]interface{}) {
  679. defer func() {
  680. <-saveProSp
  681. }()
  682. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  683. }(arru)
  684. arru = make([]map[string]interface{}, saveSize)
  685. indexu = 0
  686. }
  687. case <-time.After(1000 * time.Millisecond):
  688. if indexu > 0 {
  689. saveProSp <- true
  690. go func(arru []map[string]interface{}) {
  691. defer func() {
  692. <-saveProSp
  693. }()
  694. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  695. }(arru[:indexu])
  696. arru = make([]map[string]interface{}, saveSize)
  697. indexu = 0
  698. }
  699. }
  700. }
  701. }
  702. func SaveProbFunc() {
  703. arru := make([]map[string]interface{}, saveSize)
  704. indexu := 0
  705. for {
  706. select {
  707. case v := <-saveProbPool:
  708. arru[indexu] = v
  709. indexu++
  710. if indexu == saveSize {
  711. saveProbSp <- true
  712. go func(arru []map[string]interface{}) {
  713. defer func() {
  714. <-saveProbSp
  715. }()
  716. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  717. }(arru)
  718. arru = make([]map[string]interface{}, saveSize)
  719. indexu = 0
  720. }
  721. case <-time.After(1000 * time.Millisecond):
  722. if indexu > 0 {
  723. saveProbSp <- true
  724. go func(arru []map[string]interface{}) {
  725. defer func() {
  726. <-saveProbSp
  727. }()
  728. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  729. }(arru[:indexu])
  730. arru = make([]map[string]interface{}, saveSize)
  731. indexu = 0
  732. }
  733. }
  734. }
  735. }
  736. func SaveProTagFunc() {
  737. arru := make([]map[string]interface{}, saveSize)
  738. indexu := 0
  739. for {
  740. select {
  741. case v := <-saveProTagPool:
  742. arru[indexu] = v
  743. indexu++
  744. if indexu == saveSize {
  745. saveProTagSp <- true
  746. go func(arru []map[string]interface{}) {
  747. defer func() {
  748. <-saveProTagSp
  749. }()
  750. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  751. }(arru)
  752. arru = make([]map[string]interface{}, saveSize)
  753. indexu = 0
  754. }
  755. case <-time.After(1000 * time.Millisecond):
  756. if indexu > 0 {
  757. saveProTagSp <- true
  758. go func(arru []map[string]interface{}) {
  759. defer func() {
  760. <-saveProTagSp
  761. }()
  762. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  763. }(arru[:indexu])
  764. arru = make([]map[string]interface{}, saveSize)
  765. indexu = 0
  766. }
  767. }
  768. }
  769. }
  770. func SaveRelationFunc() {
  771. arru := make([]map[string]interface{}, saveSize)
  772. indexu := 0
  773. for {
  774. select {
  775. case v := <-saveRelationPool:
  776. arru[indexu] = v
  777. indexu++
  778. if indexu == saveSize {
  779. saveRelationSp <- true
  780. go func(arru []map[string]interface{}) {
  781. defer func() {
  782. <-saveRelationSp
  783. }()
  784. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  785. }(arru)
  786. arru = make([]map[string]interface{}, saveSize)
  787. indexu = 0
  788. }
  789. case <-time.After(1000 * time.Millisecond):
  790. if indexu > 0 {
  791. saveRelationSp <- true
  792. go func(arru []map[string]interface{}) {
  793. defer func() {
  794. <-saveRelationSp
  795. }()
  796. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  797. }(arru[:indexu])
  798. arru = make([]map[string]interface{}, saveSize)
  799. indexu = 0
  800. }
  801. }
  802. }
  803. }
  804. // 字段错误数据
  805. func saveErrMethod() {
  806. arru := make([]map[string]interface{}, 200)
  807. indexu := 0
  808. for {
  809. select {
  810. case v := <-saveErrPool:
  811. arru[indexu] = v
  812. indexu++
  813. if indexu == saveSize {
  814. saveErrSp <- true
  815. go func(arru []map[string]interface{}) {
  816. defer func() {
  817. <-saveErrSp
  818. }()
  819. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  820. }(arru)
  821. arru = make([]map[string]interface{}, saveSize)
  822. indexu = 0
  823. }
  824. case <-time.After(1000 * time.Millisecond):
  825. if indexu > 0 {
  826. saveErrSp <- true
  827. go func(arru []map[string]interface{}) {
  828. defer func() {
  829. <-saveErrSp
  830. }()
  831. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  832. }(arru[:indexu])
  833. arru = make([]map[string]interface{}, saveSize)
  834. indexu = 0
  835. }
  836. }
  837. }
  838. }