123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- package main
import (
	"fmt"
	"os"
	"sync"
	"time"

	util "app.yhyue.com/data_processing/common_utils"
	"app.yhyue.com/data_processing/common_utils/elastic"
	"app.yhyue.com/data_processing/common_utils/log"
	"app.yhyue.com/data_processing/common_utils/mongodb"
	"app.yhyue.com/data_processing/common_utils/mysqldb"
	"fieldproject_common/config"
	"github.com/spf13/cobra"
	"go.uber.org/zap"
)
- var (
- MongoTool, MongoTool1, MongoTool2 *mongodb.MongodbSim
- MysqlB, MysqlM *mysqldb.Mysql
- Es *elastic.Elastic
- ChEs chan bool
- saveSize int
- savePool chan map[string]interface{}
- saveSp chan bool
- EsSaveCache chan map[string]interface{}
- SP chan bool
- updateEsPool chan []map[string]interface{}
- updateEsSp chan bool
- )
- func init() {
- config.Init("./common.toml")
- InitLog()
- InitMgo()
- InitMysql()
- InitEs()
- ChEs = make(chan bool, 10)
- saveSize = 200
- savePool = make(chan map[string]interface{}, 5000)
- saveSp = make(chan bool, 2)
- EsSaveCache = make(chan map[string]interface{}, 1000)
- SP = make(chan bool, 5)
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 1)
- log.Info("init success")
- }
- func main() {
- //task()
- //taskBiddingData()
- //taskCompanyData()
- //taskMedicalData()
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(institution())
- rootCmd.AddCommand(product())
- rootCmd.AddCommand(bidding())
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- c := make(chan bool, 1)
- <-c
- }
- func task() {
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- ch := make(chan bool, 2)
- wg := &sync.WaitGroup{}
- log.Info(fmt.Sprintf("%d", MongoTool.Count("qyxy_0824", nil)))
- field := map[string]interface{}{"use_flag": 0, "province_short": 0, "create_time": 0, "update_time": 0}
- query := sess.DB(config.Conf.DB.Mongo.Dbname).C("qyxy_0824").Find(nil).Select(field).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%2000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- delete(tmp, "_id")
- m := make(map[string]interface{})
- if util.ObjToString(tmp["company_district"]) != "" {
- m["district"] = tmp["company_district"]
- } else if util.ObjToString(tmp["city"]) != "" {
- m["city"] = tmp["company_city"]
- } else {
- m["area"] = tmp["company_area"]
- }
- if len(m) > 0 {
- info := MysqlB.FindOne("code_area", m, "", "")
- if info != nil && len(*info) > 0 {
- tmp["area_code"] = (*info)["code"]
- } else {
- tmp["area_code"] = "000000"
- }
- } else {
- tmp["area_code"] = "000000"
- }
- delete(tmp, "company_area")
- delete(tmp, "company_city")
- delete(tmp, "company_district")
- tmp["comeintime"] = time.Now()
- tmp["updatetime"] = time.Now()
- tmp["sourcetype"] = 1
- MysqlB.Insert("company_business_model", map[string]interface{}{"company_id": tmp["company_id"],
- "business_model": util.IntAll(tmp["business_type"]), "company_field_code": "0101", "comeintime": time.Now()})
- delete(tmp, "business_type")
- MysqlB.Insert("company_baseinfo", tmp)
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- // @Description mysql迭代
- // @Author J 2022/8/9 17:32
- func taskMedicalData() {
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where mark_id = 4 ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
- //lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT * FROM %s where id=58830 ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("查询最后id---", zap.Int("finally id: ", finalId))
- lastid, count := 0, 0
- for {
- log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
- q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id > %d AND mark_id = 4 ORDER BY id ASC limit 1000000", "institution_baseinfo", lastid)
- //q := fmt.Sprintf("SELECT id, company_id FROM %s WHERE id=58830 ORDER BY id ASC limit 1000000", "institution_baseinfo")
- rows, err := MysqlM.DB.Query(q)
- if err != nil {
- log.Error("mysql query err ", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- log.Info("----finish-----", zap.Int("count: ", count))
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("mysql scan err ", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%2000 == 0 {
- log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- // mark_id = 1
- //taskB(util.ObjToString(tmp["company_id"]))
- // mark_id = 4
- taskB_1(util.ObjToString(tmp["company_id"]))
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func SaveMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("bidding_p_list_0907", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("bidding_p_list_0907", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|