main.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "data_tidb/config"
  9. "encoding/json"
  10. "fmt"
  11. "github.com/spf13/cobra"
  12. "go.mongodb.org/mongo-driver/bson"
  13. "go.uber.org/zap"
  14. "net"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. UdpClient udp.UdpClient
  20. )
  21. func init() {
  22. config.Init("./common.toml")
  23. InitLog()
  24. InitMgo()
  25. InitMysql()
  26. InitField()
  27. //redis.InitRedis1("qyxy_id=127.0.0.1:4379", 1)
  28. //redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
  29. log.Info("init success")
  30. }
  31. func main() {
  32. //go SaveFunc()
  33. //go SaveTagFunc()
  34. //go saveErrMethod()
  35. rootCmd := &cobra.Command{Use: "my cmd"}
  36. rootCmd.AddCommand(bidding())
  37. rootCmd.AddCommand(project())
  38. if err := rootCmd.Execute(); err != nil {
  39. fmt.Println("rootCmd.Execute failed", err.Error())
  40. }
  41. //UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  42. //UdpClient.Listen(processUdpMsg)
  43. //log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  44. c := make(chan bool, 1)
  45. <-c
  46. }
  47. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  48. defer util.Catch()
  49. switch act {
  50. case udp.OP_TYPE_DATA: //上个节点的数据
  51. var mapInfo map[string]interface{}
  52. err := json.Unmarshal(data, &mapInfo)
  53. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  54. gtid, _ := mapInfo["gtid"].(string)
  55. lteid, _ := mapInfo["lteid"].(string)
  56. if err != nil || gtid == "" || lteid == "" {
  57. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
  58. } else {
  59. //udp成功回写
  60. if k := util.ObjToString(mapInfo["key"]); k != "" {
  61. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  62. } else {
  63. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  64. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  65. }
  66. log.Info("start sync ...")
  67. doBiddingTask(gtid, lteid, mapInfo)
  68. }
  69. }
  70. }
  71. func taskMysql() {
  72. pool := make(chan bool, 5) //控制线程数
  73. wg := &sync.WaitGroup{}
  74. finalId := 0
  75. //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
  76. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
  77. if len(*lastInfo) > 0 {
  78. finalId = util.IntAll((*lastInfo)[0]["id"])
  79. }
  80. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  81. lastid, count := 0, 0
  82. for {
  83. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  84. q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation", lastid)
  85. //q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
  86. //q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
  87. rows, err := MysqlTool.DB.Query(q)
  88. if err != nil {
  89. log.Error("mysql query err ", zap.Error(err))
  90. }
  91. columns, err := rows.Columns()
  92. if finalId == lastid {
  93. log.Info("----finish-----", zap.Int("count: ", count))
  94. break
  95. }
  96. for rows.Next() {
  97. scanArgs := make([]interface{}, len(columns))
  98. values := make([]interface{}, len(columns))
  99. ret := make(map[string]interface{})
  100. for k := range values {
  101. scanArgs[k] = &values[k]
  102. }
  103. err = rows.Scan(scanArgs...)
  104. if err != nil {
  105. log.Error("mysql scan err ", zap.Error(err))
  106. break
  107. }
  108. for i, col := range values {
  109. if col == nil {
  110. ret[columns[i]] = nil
  111. } else {
  112. switch val := (*scanArgs[i].(*interface{})).(type) {
  113. case byte:
  114. ret[columns[i]] = val
  115. break
  116. case []byte:
  117. v := string(val)
  118. switch v {
  119. case "\x00": // 处理数据类型为bit的情况
  120. ret[columns[i]] = 0
  121. case "\x01": // 处理数据类型为bit的情况
  122. ret[columns[i]] = 1
  123. default:
  124. ret[columns[i]] = v
  125. break
  126. }
  127. break
  128. case time.Time:
  129. if val.IsZero() {
  130. ret[columns[i]] = nil
  131. } else {
  132. ret[columns[i]] = val.Format("2006-01-02 15:04:05")
  133. }
  134. break
  135. default:
  136. ret[columns[i]] = val
  137. }
  138. }
  139. }
  140. lastid = util.IntAll(ret["id"])
  141. count++
  142. if count%20000 == 0 {
  143. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  144. }
  145. pool <- true
  146. wg.Add(1)
  147. go func(tmp map[string]interface{}) {
  148. defer func() {
  149. <-pool
  150. wg.Done()
  151. }()
  152. cid := util.Int64All(tmp["id"])
  153. pid := util.ObjToString(tmp["projectid"])
  154. name_id := util.ObjToString(tmp["name_id"])
  155. identity_type := util.Int64All(tmp["identity_type+0"])
  156. if name_id != "" {
  157. pinfo, _ := MongoP.FindById("projectset", pid, bson.M{"ids": 1})
  158. if len(*pinfo) > 0 {
  159. for _, id := range util.ObjArrToStringArr((*pinfo)["ids"].([]interface{})) {
  160. coll := "bidding"
  161. //if id > "5a862e7040d2d9bbe88e3b1f" {
  162. // coll = "bidding"
  163. //} else {
  164. // coll = "bidding_back"
  165. //}
  166. info, _ := MongoB.FindById(coll, id, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
  167. if len(*info) > 0 {
  168. if identity_type == 1 {
  169. if util.ObjToString((*info)["buyertel"]) != "" && util.ObjToString((*info)["buyerperson"]) != "" {
  170. q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["buyerperson"]), "contact_tel": util.ObjToString((*info)["buyertel"])}
  171. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  172. if cinfo != nil && len(*cinfo) > 0 {
  173. MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  174. break
  175. }
  176. }
  177. } else if identity_type == 2 {
  178. if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" {
  179. if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" {
  180. q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["winnerperson"]), "contact_tel": util.ObjToString((*info)["winnertel"])}
  181. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  182. if cinfo != nil && len(*cinfo) > 0 {
  183. MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  184. break
  185. }
  186. }
  187. }
  188. } else if identity_type == 4 {
  189. if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" {
  190. if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" {
  191. q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["agencyperson"]), "contact_tel": util.ObjToString((*info)["agencytel"])}
  192. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  193. if cinfo != nil && len(*cinfo) > 0 {
  194. MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  195. break
  196. }
  197. }
  198. }
  199. }
  200. }
  201. }
  202. }
  203. }
  204. //redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
  205. }(ret)
  206. ret = make(map[string]interface{})
  207. }
  208. _ = rows.Close()
  209. wg.Wait()
  210. }
  211. }
  212. func taskMgo() {
  213. sess := MongoP.GetMgoConn()
  214. defer MongoP.DestoryMongoConn(sess)
  215. ch := make(chan bool, 3)
  216. wg := &sync.WaitGroup{}
  217. q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
  218. field := map[string]interface{}{"ids": 1}
  219. query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
  220. count := 0
  221. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  222. if count%20000 == 0 {
  223. util.Debug("current ---", count, tmp["_id"])
  224. }
  225. ch <- true
  226. wg.Add(1)
  227. go func(tmp map[string]interface{}) {
  228. defer func() {
  229. <-ch
  230. wg.Done()
  231. }()
  232. id := mongodb.BsonIdToSId(tmp["_id"])
  233. for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
  234. redis.PutCKV("s_id", i, id)
  235. }
  236. }(tmp)
  237. tmp = make(map[string]interface{})
  238. }
  239. wg.Wait()
  240. util.Debug("over ---", count)
  241. }
  242. // @Description 标讯数据
  243. // @Author J 2022/9/20 17:52
  244. func bidding() *cobra.Command {
  245. cmdClient := &cobra.Command{
  246. Use: "bidding",
  247. Short: "Start processing bidding data",
  248. Run: func(cmd *cobra.Command, args []string) {
  249. go SaveFunc()
  250. go SaveTagFunc()
  251. //go SaveExpandFunc()
  252. //go SaveAttrFunc()
  253. //go SaveImfFunc()
  254. //go SaveIntentFunc()
  255. //go SaveWinnerFunc()
  256. //go SavePackageFunc()
  257. //go SavePurFunc()
  258. go saveErrMethod()
  259. taskB()
  260. },
  261. }
  262. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  263. return cmdClient
  264. }
  265. // @Description 项目数据
  266. // @Author J 2022/9/20 17:52
  267. func project() *cobra.Command {
  268. cmdClient := &cobra.Command{
  269. Use: "project",
  270. Short: "Start processing project data",
  271. Run: func(cmd *cobra.Command, args []string) {
  272. go SaveProFunc()
  273. go SaveProTagFunc()
  274. go SaveProbFunc()
  275. go SaveRelationFunc()
  276. taskP()
  277. },
  278. }
  279. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  280. return cmdClient
  281. }
  282. func SaveFunc() {
  283. arru := make([]map[string]interface{}, saveSize)
  284. indexu := 0
  285. for {
  286. select {
  287. case v := <-saveBasePool:
  288. arru[indexu] = v
  289. indexu++
  290. if indexu == saveSize {
  291. saveBaseSp <- true
  292. go func(arru []map[string]interface{}) {
  293. defer func() {
  294. <-saveBaseSp
  295. }()
  296. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  297. }(arru)
  298. arru = make([]map[string]interface{}, saveSize)
  299. indexu = 0
  300. }
  301. case <-time.After(1000 * time.Millisecond):
  302. if indexu > 0 {
  303. saveBaseSp <- true
  304. go func(arru []map[string]interface{}) {
  305. defer func() {
  306. <-saveBaseSp
  307. }()
  308. MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
  309. }(arru[:indexu])
  310. arru = make([]map[string]interface{}, saveSize)
  311. indexu = 0
  312. }
  313. }
  314. }
  315. }
  316. func SaveExpandFunc() {
  317. arru := make([]map[string]interface{}, saveSize)
  318. indexu := 0
  319. for {
  320. select {
  321. case v := <-saveExpandPool:
  322. arru[indexu] = v
  323. indexu++
  324. if indexu == saveSize {
  325. saveExpandSp <- true
  326. go func(arru []map[string]interface{}) {
  327. defer func() {
  328. <-saveExpandSp
  329. }()
  330. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  331. }(arru)
  332. arru = make([]map[string]interface{}, saveSize)
  333. indexu = 0
  334. }
  335. case <-time.After(1000 * time.Millisecond):
  336. if indexu > 0 {
  337. saveExpandSp <- true
  338. go func(arru []map[string]interface{}) {
  339. defer func() {
  340. <-saveExpandSp
  341. }()
  342. MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
  343. }(arru[:indexu])
  344. arru = make([]map[string]interface{}, saveSize)
  345. indexu = 0
  346. }
  347. }
  348. }
  349. }
  350. func SaveTagFunc() {
  351. arru := make([]map[string]interface{}, saveSize)
  352. indexu := 0
  353. for {
  354. select {
  355. case v := <-saveTagPool:
  356. arru[indexu] = v
  357. indexu++
  358. if indexu == saveSize {
  359. saveTagSp <- true
  360. go func(arru []map[string]interface{}) {
  361. defer func() {
  362. <-saveTagSp
  363. }()
  364. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  365. }(arru)
  366. arru = make([]map[string]interface{}, saveSize)
  367. indexu = 0
  368. }
  369. case <-time.After(1000 * time.Millisecond):
  370. if indexu > 0 {
  371. saveTagSp <- true
  372. go func(arru []map[string]interface{}) {
  373. defer func() {
  374. <-saveTagSp
  375. }()
  376. MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
  377. }(arru[:indexu])
  378. arru = make([]map[string]interface{}, saveSize)
  379. indexu = 0
  380. }
  381. }
  382. }
  383. }
  384. func SaveAttrFunc() {
  385. arru := make([]map[string]interface{}, saveSize)
  386. indexu := 0
  387. for {
  388. select {
  389. case v := <-saveAttrPool:
  390. arru[indexu] = v
  391. indexu++
  392. if indexu == saveSize {
  393. saveAttrSp <- true
  394. go func(arru []map[string]interface{}) {
  395. defer func() {
  396. <-saveAttrSp
  397. }()
  398. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  399. }(arru)
  400. arru = make([]map[string]interface{}, saveSize)
  401. indexu = 0
  402. }
  403. case <-time.After(1000 * time.Millisecond):
  404. if indexu > 0 {
  405. saveAttrSp <- true
  406. go func(arru []map[string]interface{}) {
  407. defer func() {
  408. <-saveAttrSp
  409. }()
  410. MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
  411. }(arru[:indexu])
  412. arru = make([]map[string]interface{}, saveSize)
  413. indexu = 0
  414. }
  415. }
  416. }
  417. }
  418. func SaveImfFunc() {
  419. arru := make([]map[string]interface{}, saveSize)
  420. indexu := 0
  421. for {
  422. select {
  423. case v := <-saveIfmPool:
  424. arru[indexu] = v
  425. indexu++
  426. if indexu == saveSize {
  427. saveIfmSp <- true
  428. go func(arru []map[string]interface{}) {
  429. defer func() {
  430. <-saveIfmSp
  431. }()
  432. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  433. }(arru)
  434. arru = make([]map[string]interface{}, saveSize)
  435. indexu = 0
  436. }
  437. case <-time.After(1000 * time.Millisecond):
  438. if indexu > 0 {
  439. saveIfmSp <- true
  440. go func(arru []map[string]interface{}) {
  441. defer func() {
  442. <-saveIfmSp
  443. }()
  444. MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
  445. }(arru[:indexu])
  446. arru = make([]map[string]interface{}, saveSize)
  447. indexu = 0
  448. }
  449. }
  450. }
  451. }
  452. func SavePurFunc() {
  453. arru := make([]map[string]interface{}, saveSize)
  454. indexu := 0
  455. for {
  456. select {
  457. case v := <-savePurPool:
  458. arru[indexu] = v
  459. indexu++
  460. if indexu == saveSize {
  461. savePurSp <- true
  462. go func(arru []map[string]interface{}) {
  463. defer func() {
  464. <-savePurSp
  465. }()
  466. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  467. }(arru)
  468. arru = make([]map[string]interface{}, saveSize)
  469. indexu = 0
  470. }
  471. case <-time.After(1000 * time.Millisecond):
  472. if indexu > 0 {
  473. savePurSp <- true
  474. go func(arru []map[string]interface{}) {
  475. defer func() {
  476. <-savePurSp
  477. }()
  478. MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
  479. }(arru[:indexu])
  480. arru = make([]map[string]interface{}, saveSize)
  481. indexu = 0
  482. }
  483. }
  484. }
  485. }
  486. func SaveIntentFunc() {
  487. arru := make([]map[string]interface{}, saveSize)
  488. indexu := 0
  489. for {
  490. select {
  491. case v := <-saveIntentPool:
  492. arru[indexu] = v
  493. indexu++
  494. if indexu == saveSize {
  495. saveIntentSp <- true
  496. go func(arru []map[string]interface{}) {
  497. defer func() {
  498. <-saveIntentSp
  499. }()
  500. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  501. }(arru)
  502. arru = make([]map[string]interface{}, saveSize)
  503. indexu = 0
  504. }
  505. case <-time.After(1000 * time.Millisecond):
  506. if indexu > 0 {
  507. saveIntentSp <- true
  508. go func(arru []map[string]interface{}) {
  509. defer func() {
  510. <-saveIntentSp
  511. }()
  512. MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
  513. }(arru[:indexu])
  514. arru = make([]map[string]interface{}, saveSize)
  515. indexu = 0
  516. }
  517. }
  518. }
  519. }
  520. func SaveWinnerFunc() {
  521. arru := make([]map[string]interface{}, saveSize)
  522. indexu := 0
  523. for {
  524. select {
  525. case v := <-saveWinnerPool:
  526. arru[indexu] = v
  527. indexu++
  528. if indexu == saveSize {
  529. saveWinnerSp <- true
  530. go func(arru []map[string]interface{}) {
  531. defer func() {
  532. <-saveWinnerSp
  533. }()
  534. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  535. }(arru)
  536. arru = make([]map[string]interface{}, saveSize)
  537. indexu = 0
  538. }
  539. case <-time.After(1000 * time.Millisecond):
  540. if indexu > 0 {
  541. saveWinnerSp <- true
  542. go func(arru []map[string]interface{}) {
  543. defer func() {
  544. <-saveWinnerSp
  545. }()
  546. MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
  547. }(arru[:indexu])
  548. arru = make([]map[string]interface{}, saveSize)
  549. indexu = 0
  550. }
  551. }
  552. }
  553. }
  554. func SavePackageFunc() {
  555. arru := make([]map[string]interface{}, saveSize)
  556. indexu := 0
  557. for {
  558. select {
  559. case v := <-savePkgPool:
  560. arru[indexu] = v
  561. indexu++
  562. if indexu == saveSize {
  563. savePkgSp <- true
  564. go func(arru []map[string]interface{}) {
  565. defer func() {
  566. <-savePkgSp
  567. }()
  568. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  569. }(arru)
  570. arru = make([]map[string]interface{}, saveSize)
  571. indexu = 0
  572. }
  573. case <-time.After(1000 * time.Millisecond):
  574. if indexu > 0 {
  575. savePkgSp <- true
  576. go func(arru []map[string]interface{}) {
  577. defer func() {
  578. <-savePkgSp
  579. }()
  580. MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
  581. }(arru[:indexu])
  582. arru = make([]map[string]interface{}, saveSize)
  583. indexu = 0
  584. }
  585. }
  586. }
  587. }
  588. func SaveProFunc() {
  589. arru := make([]map[string]interface{}, saveSize)
  590. indexu := 0
  591. for {
  592. select {
  593. case v := <-saveProPool:
  594. arru[indexu] = v
  595. indexu++
  596. if indexu == saveSize {
  597. saveProSp <- true
  598. go func(arru []map[string]interface{}) {
  599. defer func() {
  600. <-saveProSp
  601. }()
  602. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  603. }(arru)
  604. arru = make([]map[string]interface{}, saveSize)
  605. indexu = 0
  606. }
  607. case <-time.After(1000 * time.Millisecond):
  608. if indexu > 0 {
  609. saveProSp <- true
  610. go func(arru []map[string]interface{}) {
  611. defer func() {
  612. <-saveProSp
  613. }()
  614. MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
  615. }(arru[:indexu])
  616. arru = make([]map[string]interface{}, saveSize)
  617. indexu = 0
  618. }
  619. }
  620. }
  621. }
  622. func SaveProbFunc() {
  623. arru := make([]map[string]interface{}, saveSize)
  624. indexu := 0
  625. for {
  626. select {
  627. case v := <-saveProbPool:
  628. arru[indexu] = v
  629. indexu++
  630. if indexu == saveSize {
  631. saveProbSp <- true
  632. go func(arru []map[string]interface{}) {
  633. defer func() {
  634. <-saveProbSp
  635. }()
  636. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  637. }(arru)
  638. arru = make([]map[string]interface{}, saveSize)
  639. indexu = 0
  640. }
  641. case <-time.After(1000 * time.Millisecond):
  642. if indexu > 0 {
  643. saveProbSp <- true
  644. go func(arru []map[string]interface{}) {
  645. defer func() {
  646. <-saveProbSp
  647. }()
  648. MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
  649. }(arru[:indexu])
  650. arru = make([]map[string]interface{}, saveSize)
  651. indexu = 0
  652. }
  653. }
  654. }
  655. }
  656. func SaveProTagFunc() {
  657. arru := make([]map[string]interface{}, saveSize)
  658. indexu := 0
  659. for {
  660. select {
  661. case v := <-saveProTagPool:
  662. arru[indexu] = v
  663. indexu++
  664. if indexu == saveSize {
  665. saveProTagSp <- true
  666. go func(arru []map[string]interface{}) {
  667. defer func() {
  668. <-saveProTagSp
  669. }()
  670. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  671. }(arru)
  672. arru = make([]map[string]interface{}, saveSize)
  673. indexu = 0
  674. }
  675. case <-time.After(1000 * time.Millisecond):
  676. if indexu > 0 {
  677. saveProTagSp <- true
  678. go func(arru []map[string]interface{}) {
  679. defer func() {
  680. <-saveProTagSp
  681. }()
  682. MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
  683. }(arru[:indexu])
  684. arru = make([]map[string]interface{}, saveSize)
  685. indexu = 0
  686. }
  687. }
  688. }
  689. }
  690. func SaveRelationFunc() {
  691. arru := make([]map[string]interface{}, saveSize)
  692. indexu := 0
  693. for {
  694. select {
  695. case v := <-saveRelationPool:
  696. arru[indexu] = v
  697. indexu++
  698. if indexu == saveSize {
  699. saveRelationSp <- true
  700. go func(arru []map[string]interface{}) {
  701. defer func() {
  702. <-saveRelationSp
  703. }()
  704. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  705. }(arru)
  706. arru = make([]map[string]interface{}, saveSize)
  707. indexu = 0
  708. }
  709. case <-time.After(1000 * time.Millisecond):
  710. if indexu > 0 {
  711. saveRelationSp <- true
  712. go func(arru []map[string]interface{}) {
  713. defer func() {
  714. <-saveRelationSp
  715. }()
  716. MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
  717. }(arru[:indexu])
  718. arru = make([]map[string]interface{}, saveSize)
  719. indexu = 0
  720. }
  721. }
  722. }
  723. }
  724. // 字段错误数据
  725. func saveErrMethod() {
  726. arru := make([]map[string]interface{}, 200)
  727. indexu := 0
  728. for {
  729. select {
  730. case v := <-saveErrPool:
  731. arru[indexu] = v
  732. indexu++
  733. if indexu == saveSize {
  734. saveErrSp <- true
  735. go func(arru []map[string]interface{}) {
  736. defer func() {
  737. <-saveErrSp
  738. }()
  739. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  740. }(arru)
  741. arru = make([]map[string]interface{}, saveSize)
  742. indexu = 0
  743. }
  744. case <-time.After(1000 * time.Millisecond):
  745. if indexu > 0 {
  746. saveErrSp <- true
  747. go func(arru []map[string]interface{}) {
  748. defer func() {
  749. <-saveErrSp
  750. }()
  751. MongoB.SaveBulk("bidding_tidb_f_err", arru...)
  752. }(arru[:indexu])
  753. arru = make([]map[string]interface{}, saveSize)
  754. indexu = 0
  755. }
  756. }
  757. }
  758. }