main.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856
  1. package main
  2. import (
  3. "data_tidb/config"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/robfig/cron"
  7. "github.com/spf13/cobra"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  15. "net"
  16. "sync"
  17. "time"
  18. )
var (
	// UdpClient is the UDP endpoint processUdpMsg uses to write replies back to
	// the sender. The code that assigns it and starts listening (in main) is
	// currently commented out.
	UdpClient udp.UdpClient
	// Pici is bound to the "pici" / -p flag of the "project" sub-command
	// (default 0). NOTE(review): presumably a batch number read by taskPAdd —
	// confirm against its definition.
	Pici int64
)
// init prepares the process-wide dependencies before main runs.
//
// It loads the configuration from ./common.toml (relative to the working
// directory), then calls the project initialisers InitLog, InitMgo, InitMysql
// and InitField (defined elsewhere in this package), and finally registers two
// redis entries, "qyxy_id" and "project_ids", both pointing at 127.0.0.1:8379.
// The order of the calls is kept exactly as written because later
// initialisers may depend on earlier ones.
func init() {
	config.Init("./common.toml")
	InitLog()
	InitMgo()
	InitMysql()
	InitField()
	// NOTE(review): the second argument is presumably the redis DB index —
	// confirm against redis.InitRedis1.
	redis.InitRedis1("qyxy_id=127.0.0.1:8379", 1)
	redis.InitRedis1("project_ids=127.0.0.1:8379", 0)
	// Alternative address for "qyxy_id", currently disabled:
	//redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
	log.Info("init success")
}
  34. func main() {
  35. //
  36. //go SaveFunc()
  37. //go SaveTagFunc()
  38. //go SaveExpandFunc()
  39. //go SaveAttrFunc()
  40. //go SaveImfFunc()
  41. //go SaveIntentFunc()
  42. //go SaveWinnerFunc()
  43. //go SavePackageFunc()
  44. //go SavePurFunc()
  45. //go saveErrMethod()
  46. rootCmd := &cobra.Command{Use: "my cmd"}
  47. rootCmd.AddCommand(bidding())
  48. rootCmd.AddCommand(relation())
  49. rootCmd.AddCommand(projectAdd())
  50. if err := rootCmd.Execute(); err != nil {
  51. fmt.Println("rootCmd.Execute failed", err.Error())
  52. }
  53. //go SaveRelationFunc()
  54. //taskMysql()
  55. //UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  56. //UdpClient.Listen(processUdpMsg)
  57. //log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  58. c := make(chan bool, 1)
  59. <-c
  60. }
  61. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  62. defer util.Catch()
  63. switch act {
  64. case udp.OP_TYPE_DATA: //上个节点的数据
  65. var mapInfo map[string]interface{}
  66. err := json.Unmarshal(data, &mapInfo)
  67. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  68. gtid, _ := mapInfo["gtid"].(string)
  69. lteid, _ := mapInfo["lteid"].(string)
  70. if err != nil {
  71. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
  72. } else {
  73. //udp成功回写
  74. if k := util.ObjToString(mapInfo["key"]); k != "" {
  75. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  76. } else {
  77. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  78. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  79. }
  80. log.Info("start sync ...")
  81. doBiddingTask(gtid, lteid, mapInfo)
  82. }
  83. }
  84. }
  85. func taskMysql() {
  86. pool := make(chan bool, 5) //控制线程数
  87. wg := &sync.WaitGroup{}
  88. finalId := 0
  89. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
  90. //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation_new"))
  91. if len(*lastInfo) > 0 {
  92. finalId = util.IntAll((*lastInfo)[0]["id"])
  93. }
  94. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  95. lastid, count := 0, 0
  96. for {
  97. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  98. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_bpmc_relation", lastid)
  99. //q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
  100. //q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
  101. rows, err := MysqlTool.DB.Query(q)
  102. if err != nil {
  103. log.Error("mysql query err ", zap.Error(err))
  104. }
  105. columns, err := rows.Columns()
  106. if finalId == lastid {
  107. log.Info("----finish-----", zap.Int("count: ", count))
  108. break
  109. }
  110. for rows.Next() {
  111. scanArgs := make([]interface{}, len(columns))
  112. values := make([]interface{}, len(columns))
  113. ret := make(map[string]interface{})
  114. for k := range values {
  115. scanArgs[k] = &values[k]
  116. }
  117. err = rows.Scan(scanArgs...)
  118. if err != nil {
  119. log.Error("mysql scan err ", zap.Error(err))
  120. break
  121. }
  122. for i, col := range values {
  123. if col == nil {
  124. ret[columns[i]] = nil
  125. } else {
  126. switch val := (*scanArgs[i].(*interface{})).(type) {
  127. case byte:
  128. ret[columns[i]] = val
  129. break
  130. case []byte:
  131. v := string(val)
  132. switch v {
  133. case "\x00": // 处理数据类型为bit的情况
  134. ret[columns[i]] = 0
  135. case "\x01": // 处理数据类型为bit的情况
  136. ret[columns[i]] = 1
  137. default:
  138. ret[columns[i]] = v
  139. break
  140. }
  141. break
  142. case time.Time:
  143. if val.IsZero() {
  144. ret[columns[i]] = nil
  145. } else {
  146. ret[columns[i]] = val.Format("2006-01-02 15:04:05")
  147. }
  148. break
  149. default:
  150. ret[columns[i]] = val
  151. }
  152. }
  153. }
  154. lastid = util.IntAll(ret["id"])
  155. count++
  156. if count%20000 == 0 {
  157. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  158. }
  159. pool <- true
  160. wg.Add(1)
  161. go func(tmp map[string]interface{}) {
  162. defer func() {
  163. <-pool
  164. wg.Done()
  165. }()
  166. //cid := util.Int64All(tmp["id"])
  167. //iid := util.ObjToString(tmp["infoid"])
  168. //name_id := util.ObjToString(tmp["name_id"])
  169. //identity_type := util.Int64All(tmp["identity_type+0"])
  170. //if name_id != "" {
  171. // coll := "bidding"
  172. // if iid > "5a862e7040d2d9bbe88e3b1f" {
  173. // coll = "bidding"
  174. // } else {
  175. // coll = "bidding_back"
  176. // }
  177. // info, _ := MongoB.FindById(coll, iid, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
  178. // if len(*info) > 0 {
  179. // if identity_type == 1 {
  180. // if util.ObjToString((*info)["buyertel"]) != "" {
  181. // q := make(map[string]interface{})
  182. // q["name_id"] = name_id
  183. // q["identity_type"] = identity_type
  184. // q["contact_tel"] = util.ObjToString((*info)["buyertel"])
  185. // if util.ObjToString((*info)["buyerperson"]) != "" {
  186. // q["contact_name"] = util.ObjToString((*info)["buyerperson"])
  187. // }
  188. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  189. // if cinfo != nil && len(*cinfo) > 0 {
  190. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  191. // }
  192. // }
  193. // } else if identity_type == 2 {
  194. // if util.ObjToString((*info)["winnertel"]) != "" {
  195. // q := make(map[string]interface{})
  196. // q["name_id"] = name_id
  197. // q["identity_type"] = identity_type
  198. // q["contact_tel"] = util.ObjToString((*info)["winnertel"])
  199. // if util.ObjToString((*info)["winnerperson"]) != "" {
  200. // q["contact_name"] = util.ObjToString((*info)["winnerperson"])
  201. // }
  202. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  203. // if cinfo != nil && len(*cinfo) > 0 {
  204. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  205. // }
  206. // }
  207. // } else if identity_type == 4 {
  208. // if util.ObjToString((*info)["agencytel"]) != "" {
  209. // q := make(map[string]interface{})
  210. // q["name_id"] = name_id
  211. // q["identity_type"] = identity_type
  212. // q["contact_tel"] = util.ObjToString((*info)["agencytel"])
  213. // if util.ObjToString((*info)["agencyperson"]) != "" {
  214. // q["contact_name"] = util.ObjToString((*info)["agencyperson"])
  215. // }
  216. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  217. // if cinfo != nil && len(*cinfo) > 0 {
  218. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  219. // }
  220. // }
  221. // }
  222. // }
  223. //}
  224. //redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
  225. saveM := make(map[string]interface{})
  226. if util.ObjToString(tmp["name_id"]) != "" {
  227. saveM["name_id"] = util.ObjToString(tmp["name_id"])
  228. } else {
  229. return
  230. }
  231. if util.ObjToString(tmp["contact_id"]) != "" {
  232. saveM["contact_id"] = util.IntAll(tmp["contact_id"])
  233. } else {
  234. return
  235. }
  236. saveM["projectid"] = util.ObjToString(tmp["projectid"])
  237. saveM["infoid"] = util.ObjToString(tmp["infoid"])
  238. saveM["identity_type"] = tmp["identity_type"]
  239. saveRelationPool <- saveM
  240. }(ret)
  241. ret = make(map[string]interface{})
  242. }
  243. _ = rows.Close()
  244. wg.Wait()
  245. }
  246. }
  247. func taskMgo() {
  248. sess := MongoP.GetMgoConn()
  249. defer MongoP.DestoryMongoConn(sess)
  250. ch := make(chan bool, 3)
  251. wg := &sync.WaitGroup{}
  252. q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
  253. field := map[string]interface{}{"ids": 1}
  254. query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
  255. count := 0
  256. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  257. if count%20000 == 0 {
  258. util.Debug("current ---", count, tmp["_id"])
  259. }
  260. ch <- true
  261. wg.Add(1)
  262. go func(tmp map[string]interface{}) {
  263. defer func() {
  264. <-ch
  265. wg.Done()
  266. }()
  267. id := mongodb.BsonIdToSId(tmp["_id"])
  268. for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
  269. redis.PutCKV("s_id", i, id)
  270. }
  271. }(tmp)
  272. tmp = make(map[string]interface{})
  273. }
  274. wg.Wait()
  275. util.Debug("over ---", count)
  276. }
  277. // @Description 标讯数据
  278. // @Author J 2022/9/20 17:52
  279. func bidding() *cobra.Command {
  280. cmdClient := &cobra.Command{
  281. Use: "bidding",
  282. Short: "Start processing bidding data",
  283. Run: func(cmd *cobra.Command, args []string) {
  284. go SaveFunc()
  285. go SaveTagFunc()
  286. go SaveExpandFunc()
  287. go SaveAttrFunc()
  288. go SaveImfFunc()
  289. go SaveIntentFunc()
  290. go SaveWinnerFunc()
  291. go SavePackageFunc()
  292. go SavePurFunc()
  293. go saveErrMethod()
  294. taskB()
  295. },
  296. }
  297. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  298. return cmdClient
  299. }
  300. // @Description 企业联系人关系表
  301. // @Author J 2022/9/20 17:52
  302. func relation() *cobra.Command {
  303. cmdClient := &cobra.Command{
  304. Use: "relation",
  305. Short: "Start processing relation data",
  306. Run: func(cmd *cobra.Command, args []string) {
  307. InitEs()
  308. go SaveRelationFunc()
  309. taskR()
  310. },
  311. }
  312. return cmdClient
  313. }
  314. // @Description 项目数据(目前仅关系表数据)
  315. // @Author J 2022/9/20 17:52
  316. func projectAdd() *cobra.Command {
  317. cmdClient := &cobra.Command{
  318. Use: "project",
  319. Short: "Start processing project data",
  320. Run: func(cmd *cobra.Command, args []string) {
  321. //go SaveProFunc()
  322. //go SaveProTagFunc()
  323. //go SaveProbFunc()
  324. go SaveRelationFunc()
  325. taskPAdd()
  326. crn := cron.New()
  327. cronstr := "0 10 * * * *" // 每30min执行一次
  328. _ = crn.AddFunc(cronstr, func() {
  329. taskPAdd()
  330. })
  331. crn.Start()
  332. },
  333. }
  334. cmdClient.Flags().Int64VarP(&Pici, "pici", "p", 0, "")
  335. return cmdClient
  336. }
  337. func SaveFunc() {
  338. arru := make([]map[string]interface{}, saveSize)
  339. indexu := 0
  340. for {
  341. select {
  342. case v := <-saveBasePool:
  343. arru[indexu] = v
  344. indexu++
  345. if indexu == saveSize {
  346. saveBaseSp <- true
  347. go func(arru []map[string]interface{}) {
  348. defer func() {
  349. <-saveBaseSp
  350. }()
  351. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  352. }(arru)
  353. arru = make([]map[string]interface{}, saveSize)
  354. indexu = 0
  355. }
  356. case <-time.After(1000 * time.Millisecond):
  357. if indexu > 0 {
  358. saveBaseSp <- true
  359. go func(arru []map[string]interface{}) {
  360. defer func() {
  361. <-saveBaseSp
  362. }()
  363. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  364. }(arru[:indexu])
  365. arru = make([]map[string]interface{}, saveSize)
  366. indexu = 0
  367. }
  368. }
  369. }
  370. }
  371. func SaveExpandFunc() {
  372. arru := make([]map[string]interface{}, saveSize)
  373. indexu := 0
  374. for {
  375. select {
  376. case v := <-saveExpandPool:
  377. arru[indexu] = v
  378. indexu++
  379. if indexu == saveSize {
  380. saveExpandSp <- true
  381. go func(arru []map[string]interface{}) {
  382. defer func() {
  383. <-saveExpandSp
  384. }()
  385. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  386. }(arru)
  387. arru = make([]map[string]interface{}, saveSize)
  388. indexu = 0
  389. }
  390. case <-time.After(1000 * time.Millisecond):
  391. if indexu > 0 {
  392. saveExpandSp <- true
  393. go func(arru []map[string]interface{}) {
  394. defer func() {
  395. <-saveExpandSp
  396. }()
  397. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  398. }(arru[:indexu])
  399. arru = make([]map[string]interface{}, saveSize)
  400. indexu = 0
  401. }
  402. }
  403. }
  404. }
  405. func SaveTagFunc() {
  406. arru := make([]map[string]interface{}, saveSize)
  407. indexu := 0
  408. for {
  409. select {
  410. case v := <-saveTagPool:
  411. arru[indexu] = v
  412. indexu++
  413. if indexu == saveSize {
  414. saveTagSp <- true
  415. go func(arru []map[string]interface{}) {
  416. defer func() {
  417. <-saveTagSp
  418. }()
  419. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  420. }(arru)
  421. arru = make([]map[string]interface{}, saveSize)
  422. indexu = 0
  423. }
  424. case <-time.After(1000 * time.Millisecond):
  425. if indexu > 0 {
  426. saveTagSp <- true
  427. go func(arru []map[string]interface{}) {
  428. defer func() {
  429. <-saveTagSp
  430. }()
  431. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  432. }(arru[:indexu])
  433. arru = make([]map[string]interface{}, saveSize)
  434. indexu = 0
  435. }
  436. }
  437. }
  438. }
  439. func SaveAttrFunc() {
  440. arru := make([]map[string]interface{}, saveSize)
  441. indexu := 0
  442. for {
  443. select {
  444. case v := <-saveAttrPool:
  445. arru[indexu] = v
  446. indexu++
  447. if indexu == saveSize {
  448. saveAttrSp <- true
  449. go func(arru []map[string]interface{}) {
  450. defer func() {
  451. <-saveAttrSp
  452. }()
  453. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  454. }(arru)
  455. arru = make([]map[string]interface{}, saveSize)
  456. indexu = 0
  457. }
  458. case <-time.After(1000 * time.Millisecond):
  459. if indexu > 0 {
  460. saveAttrSp <- true
  461. go func(arru []map[string]interface{}) {
  462. defer func() {
  463. <-saveAttrSp
  464. }()
  465. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  466. }(arru[:indexu])
  467. arru = make([]map[string]interface{}, saveSize)
  468. indexu = 0
  469. }
  470. }
  471. }
  472. }
  473. func SaveImfFunc() {
  474. arru := make([]map[string]interface{}, saveSize)
  475. indexu := 0
  476. for {
  477. select {
  478. case v := <-saveIfmPool:
  479. arru[indexu] = v
  480. indexu++
  481. if indexu == saveSize {
  482. saveIfmSp <- true
  483. go func(arru []map[string]interface{}) {
  484. defer func() {
  485. <-saveIfmSp
  486. }()
  487. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  488. }(arru)
  489. arru = make([]map[string]interface{}, saveSize)
  490. indexu = 0
  491. }
  492. case <-time.After(1000 * time.Millisecond):
  493. if indexu > 0 {
  494. saveIfmSp <- true
  495. go func(arru []map[string]interface{}) {
  496. defer func() {
  497. <-saveIfmSp
  498. }()
  499. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  500. }(arru[:indexu])
  501. arru = make([]map[string]interface{}, saveSize)
  502. indexu = 0
  503. }
  504. }
  505. }
  506. }
  507. func SavePurFunc() {
  508. arru := make([]map[string]interface{}, saveSize)
  509. indexu := 0
  510. for {
  511. select {
  512. case v := <-savePurPool:
  513. arru[indexu] = v
  514. indexu++
  515. if indexu == saveSize {
  516. savePurSp <- true
  517. go func(arru []map[string]interface{}) {
  518. defer func() {
  519. <-savePurSp
  520. }()
  521. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  522. }(arru)
  523. arru = make([]map[string]interface{}, saveSize)
  524. indexu = 0
  525. }
  526. case <-time.After(1000 * time.Millisecond):
  527. if indexu > 0 {
  528. savePurSp <- true
  529. go func(arru []map[string]interface{}) {
  530. defer func() {
  531. <-savePurSp
  532. }()
  533. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  534. }(arru[:indexu])
  535. arru = make([]map[string]interface{}, saveSize)
  536. indexu = 0
  537. }
  538. }
  539. }
  540. }
  541. func SaveIntentFunc() {
  542. arru := make([]map[string]interface{}, saveSize)
  543. indexu := 0
  544. for {
  545. select {
  546. case v := <-saveIntentPool:
  547. arru[indexu] = v
  548. indexu++
  549. if indexu == saveSize {
  550. saveIntentSp <- true
  551. go func(arru []map[string]interface{}) {
  552. defer func() {
  553. <-saveIntentSp
  554. }()
  555. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  556. }(arru)
  557. arru = make([]map[string]interface{}, saveSize)
  558. indexu = 0
  559. }
  560. case <-time.After(1000 * time.Millisecond):
  561. if indexu > 0 {
  562. saveIntentSp <- true
  563. go func(arru []map[string]interface{}) {
  564. defer func() {
  565. <-saveIntentSp
  566. }()
  567. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  568. }(arru[:indexu])
  569. arru = make([]map[string]interface{}, saveSize)
  570. indexu = 0
  571. }
  572. }
  573. }
  574. }
  575. func SaveWinnerFunc() {
  576. arru := make([]map[string]interface{}, saveSize)
  577. indexu := 0
  578. for {
  579. select {
  580. case v := <-saveWinnerPool:
  581. arru[indexu] = v
  582. indexu++
  583. if indexu == saveSize {
  584. saveWinnerSp <- true
  585. go func(arru []map[string]interface{}) {
  586. defer func() {
  587. <-saveWinnerSp
  588. }()
  589. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  590. }(arru)
  591. arru = make([]map[string]interface{}, saveSize)
  592. indexu = 0
  593. }
  594. case <-time.After(1000 * time.Millisecond):
  595. if indexu > 0 {
  596. saveWinnerSp <- true
  597. go func(arru []map[string]interface{}) {
  598. defer func() {
  599. <-saveWinnerSp
  600. }()
  601. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  602. }(arru[:indexu])
  603. arru = make([]map[string]interface{}, saveSize)
  604. indexu = 0
  605. }
  606. }
  607. }
  608. }
  609. func SavePackageFunc() {
  610. arru := make([]map[string]interface{}, saveSize)
  611. indexu := 0
  612. for {
  613. select {
  614. case v := <-savePkgPool:
  615. arru[indexu] = v
  616. indexu++
  617. if indexu == saveSize {
  618. savePkgSp <- true
  619. go func(arru []map[string]interface{}) {
  620. defer func() {
  621. <-savePkgSp
  622. }()
  623. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  624. }(arru)
  625. arru = make([]map[string]interface{}, saveSize)
  626. indexu = 0
  627. }
  628. case <-time.After(1000 * time.Millisecond):
  629. if indexu > 0 {
  630. savePkgSp <- true
  631. go func(arru []map[string]interface{}) {
  632. defer func() {
  633. <-savePkgSp
  634. }()
  635. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  636. }(arru[:indexu])
  637. arru = make([]map[string]interface{}, saveSize)
  638. indexu = 0
  639. }
  640. }
  641. }
  642. }
  643. func SaveProFunc() {
  644. arru := make([]map[string]interface{}, saveSize)
  645. indexu := 0
  646. for {
  647. select {
  648. case v := <-saveProPool:
  649. arru[indexu] = v
  650. indexu++
  651. if indexu == saveSize {
  652. saveProSp <- true
  653. go func(arru []map[string]interface{}) {
  654. defer func() {
  655. <-saveProSp
  656. }()
  657. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  658. }(arru)
  659. arru = make([]map[string]interface{}, saveSize)
  660. indexu = 0
  661. }
  662. case <-time.After(1000 * time.Millisecond):
  663. if indexu > 0 {
  664. saveProSp <- true
  665. go func(arru []map[string]interface{}) {
  666. defer func() {
  667. <-saveProSp
  668. }()
  669. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  670. }(arru[:indexu])
  671. arru = make([]map[string]interface{}, saveSize)
  672. indexu = 0
  673. }
  674. }
  675. }
  676. }
  677. func SaveProbFunc() {
  678. arru := make([]map[string]interface{}, saveSize)
  679. indexu := 0
  680. for {
  681. select {
  682. case v := <-saveProbPool:
  683. arru[indexu] = v
  684. indexu++
  685. if indexu == saveSize {
  686. saveProbSp <- true
  687. go func(arru []map[string]interface{}) {
  688. defer func() {
  689. <-saveProbSp
  690. }()
  691. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  692. }(arru)
  693. arru = make([]map[string]interface{}, saveSize)
  694. indexu = 0
  695. }
  696. case <-time.After(1000 * time.Millisecond):
  697. if indexu > 0 {
  698. saveProbSp <- true
  699. go func(arru []map[string]interface{}) {
  700. defer func() {
  701. <-saveProbSp
  702. }()
  703. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  704. }(arru[:indexu])
  705. arru = make([]map[string]interface{}, saveSize)
  706. indexu = 0
  707. }
  708. }
  709. }
  710. }
  711. func SaveProTagFunc() {
  712. arru := make([]map[string]interface{}, saveSize)
  713. indexu := 0
  714. for {
  715. select {
  716. case v := <-saveProTagPool:
  717. arru[indexu] = v
  718. indexu++
  719. if indexu == saveSize {
  720. saveProTagSp <- true
  721. go func(arru []map[string]interface{}) {
  722. defer func() {
  723. <-saveProTagSp
  724. }()
  725. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  726. }(arru)
  727. arru = make([]map[string]interface{}, saveSize)
  728. indexu = 0
  729. }
  730. case <-time.After(1000 * time.Millisecond):
  731. if indexu > 0 {
  732. saveProTagSp <- true
  733. go func(arru []map[string]interface{}) {
  734. defer func() {
  735. <-saveProTagSp
  736. }()
  737. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  738. }(arru[:indexu])
  739. arru = make([]map[string]interface{}, saveSize)
  740. indexu = 0
  741. }
  742. }
  743. }
  744. }
  745. func SaveRelationFunc() {
  746. arru := make([]map[string]interface{}, saveSize)
  747. indexu := 0
  748. for {
  749. select {
  750. case v := <-saveRelationPool:
  751. arru[indexu] = v
  752. indexu++
  753. if indexu == saveSize {
  754. saveRelationSp <- true
  755. go func(arru []map[string]interface{}) {
  756. defer func() {
  757. <-saveRelationSp
  758. }()
  759. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  760. }(arru)
  761. arru = make([]map[string]interface{}, saveSize)
  762. indexu = 0
  763. }
  764. case <-time.After(1000 * time.Millisecond):
  765. if indexu > 0 {
  766. saveRelationSp <- true
  767. go func(arru []map[string]interface{}) {
  768. defer func() {
  769. <-saveRelationSp
  770. }()
  771. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  772. }(arru[:indexu])
  773. arru = make([]map[string]interface{}, saveSize)
  774. indexu = 0
  775. }
  776. }
  777. }
  778. }
  779. // 字段错误数据
  780. func saveErrMethod() {
  781. arru := make([]map[string]interface{}, 200)
  782. indexu := 0
  783. for {
  784. select {
  785. case v := <-saveErrPool:
  786. arru[indexu] = v
  787. indexu++
  788. if indexu == saveSize {
  789. saveErrSp <- true
  790. go func(arru []map[string]interface{}) {
  791. defer func() {
  792. <-saveErrSp
  793. }()
  794. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  795. }(arru)
  796. arru = make([]map[string]interface{}, saveSize)
  797. indexu = 0
  798. }
  799. case <-time.After(1000 * time.Millisecond):
  800. if indexu > 0 {
  801. saveErrSp <- true
  802. go func(arru []map[string]interface{}) {
  803. defer func() {
  804. <-saveErrSp
  805. }()
  806. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  807. }(arru[:indexu])
  808. arru = make([]map[string]interface{}, saveSize)
  809. indexu = 0
  810. }
  811. }
  812. }
  813. }