123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package main
import (
	"fmt"
	"os"
	"time"

	"data_project_information/config"

	"github.com/robfig/cron/v3"
	"github.com/spf13/cobra"
)
- func init() {
- config.Init("./common.toml")
- InitLog()
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- saveSize = 200
- savePool = make(chan map[string]interface{}, 5000)
- saveSp = make(chan bool, 3)
- }
- func main() {
- go SaveFunc()
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(project())
- rootCmd.AddCommand(projectAdd())
- rootCmd.AddCommand(tidb())
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- c := make(chan bool, 1)
- <-c
- }
- func project() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "project",
- Short: "Start processing project data",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- InitEs()
- go updateFuc()
- task()
- },
- }
- return cmdClient
- }
- func projectAdd() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "project-add",
- Short: "Start processing project data",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- InitEs()
- go updateFuc()
- taskAdd()
- crn := cron.New()
- _, _ = crn.AddFunc("10 * * * *", func() {
- taskAdd()
- })
- crn.Start()
- ch := make(chan bool, 1)
- <-ch
- },
- }
- return cmdClient
- }
- func tidb() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "tidb",
- Short: "Start processing tidb data",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- InitMysql()
- go SaveFunc()
- taskT()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- func updateFuc() {
- arru := make([][]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- func SaveFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MysqlTool.InsertBulk("property_project", BaseField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MysqlTool.InsertBulk("property_project", BaseField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|