123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/elastic"
- "app.yhyue.com/data_processing/common_utils/log"
- "app.yhyue.com/data_processing/common_utils/mongodb"
- "app.yhyue.com/data_processing/common_utils/mysqldb"
- "fieldproject_common/config"
- "fmt"
- "github.com/spf13/cobra"
- "go.mongodb.org/mongo-driver/bson"
- "go.uber.org/zap"
- "sync"
- "time"
- )
- var (
- MongoTool, MongoTool1, MongoTool2 *mongodb.MongodbSim
- MysqlB, MysqlM *mysqldb.Mysql
- Es *elastic.Elastic
- ChEs chan bool
- saveSize int
- savePool chan map[string]interface{}
- saveSp chan bool
- updatePool chan []map[string]interface{}
- updateSp chan bool
- EsSaveCache chan map[string]interface{}
- SP chan bool
- updateEsPool chan []map[string]interface{}
- updateEsSp chan bool
- saveBasePool chan map[string]interface{}
- saveBaseSp chan bool
- saveBasePool1 chan map[string]interface{}
- saveBaseSp1 chan bool
- saveRcPool chan map[string]interface{}
- saveRcSp chan bool
- )
- func init() {
- config.Init("./common.toml")
- InitLog()
- InitMgo()
- InitMysql()
- InitEs()
- ChEs = make(chan bool, 10)
- saveSize = 200
- savePool = make(chan map[string]interface{}, 5000)
- saveSp = make(chan bool, 2)
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 2)
- EsSaveCache = make(chan map[string]interface{}, 1000)
- SP = make(chan bool, 5)
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 1)
- saveBasePool = make(chan map[string]interface{}, 5000)
- saveBaseSp = make(chan bool, 1)
- saveBasePool1 = make(chan map[string]interface{}, 5000)
- saveBaseSp1 = make(chan bool, 1)
- saveRcPool = make(chan map[string]interface{}, 5000)
- saveRcSp = make(chan bool, 1)
- InitField()
- log.Info("init success")
- }
- func main() {
- //go UpdateMethod()
- //task()
- //taskBiddingData()
- //taskCompanyData()
- //taskMedicalData()
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(institution())
- rootCmd.AddCommand(bidding())
- rootCmd.AddCommand(dealer()) // 经销商
- rootCmd.AddCommand(dealerEs())
- rootCmd.AddCommand(ent()) // 法人
- rootCmd.AddCommand(register()) // 许可备案
- rootCmd.AddCommand(product()) // 产品信息
- rootCmd.AddCommand(project()) // 标的物信息宽表、中标信息
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- c := make(chan bool, 1)
- <-c
- }
- func task() {
- sess := MongoTool2.GetMgoConn()
- defer MongoTool2.DestoryMongoConn(sess)
- ch := make(chan bool, 2)
- wg := &sync.WaitGroup{}
- //log.Info(fmt.Sprintf("%d", MongoTool.Count("qyxy_0824", nil)))
- field := map[string]interface{}{"company_name": 1, "company_id": 1}
- query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Select(field).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- update := make(map[string]interface{})
- name := util.ObjToString(tmp["company_name"])
- info := MysqlB.FindOne("dws_f_ent_baseinfo", bson.M{"name": name}, "name_id", "")
- if info != nil && len(*info) > 0 {
- update["name_id"] = (*info)["name_id"]
- } else {
- info = MysqlM.FindOne("dws_d_name_id_record", bson.M{"name": name}, "name_id", "")
- if info != nil && len(*info) > 0 {
- update["name_id"] = (*info)["name_id"]
- }
- }
- if len(update) > 0 {
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- // @Description mysql迭代
- // @Author J 2022/8/9 17:32
- func taskMedicalData() {
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where mark_id = 4 ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
- //lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where id=58830 ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("查询最后id---", zap.Int("finally id: ", finalId))
- lastid, count := 0, 0
- for {
- log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
- q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id > %d AND mark_id = 4 ORDER BY id ASC limit 1000000", "institution_baseinfo", lastid)
- //q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id=58830 ORDER BY id ASC limit 1000000", "institution_baseinfo")
- rows, err := MysqlM.DB.Query(q)
- if err != nil {
- log.Error("mysql query err ", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- log.Info("----finish-----", zap.Int("count: ", count))
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("mysql scan err ", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%2000 == 0 {
- log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- // mark_id = 1
- //taskB(util.ObjToString(tmp["company_id"]))
- // mark_id = 4
- taskB_1(util.ObjToString(tmp["company_id"]))
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func SaveMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("bidding_p_list_0907", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("bidding_p_list_0907", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveFunc(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveBasePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveBaseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp
- }()
- MysqlM.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveBaseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp
- }()
- MysqlM.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveFunc1(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveBasePool1:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveBaseSp1 <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp1
- }()
- MysqlM.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveBaseSp1 <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp1
- }()
- MysqlM.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveFuncRc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveRcPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveRcSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveRcSp
- }()
- MysqlM.InsertBulk("dws_d_name_id_record", RecordField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveRcSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveRcSp
- }()
- MysqlM.InsertBulk("dws_d_name_id_record", RecordField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func UpdateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool2.UpdateBulk("zktest_mysql_company_info", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool2.UpdateBulk("zktest_mysql_company_info", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|