package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/elastic" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "app.yhyue.com/data_processing/common_utils/redis" "app.yhyue.com/data_processing/common_utils/udp" "encoding/json" "fmt" "github.com/robfig/cron" "github.com/spf13/cobra" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" "net" "os" "proposed_project/config" "sync" "time" ) var ( UdpClient udp.UdpClient Es *elastic.Elastic EsBulkSize = 200 saveEsPool = make(chan map[string]interface{}, 5000) saveEsSp = make(chan bool, 3) ) func main() { rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(tags()) rootCmd.AddCommand(project()) rootCmd.AddCommand(nzjData()) rootCmd.AddCommand(projectAdd()) rootCmd.AddCommand(tidbSave()) rootCmd.AddCommand(tidbAddSave()) rootCmd.AddCommand(redisSave()) rootCmd.AddCommand(esSave()) if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } } func tags() *cobra.Command { cmdClient := &cobra.Command{ Use: "tags", Short: "Start processing tags data", Run: func(cmd *cobra.Command, args []string) { InitRule() go UpdateMethod() taskRun() c := make(chan bool, 1) <-c }, } return cmdClient } func project() *cobra.Command { cmdClient := &cobra.Command{ Use: "project", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { InitRule() loadProject() go updateAllQueue() UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024} UdpClient.Listen(processUdpMsg) log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort)) c := make(chan bool, 1) <-c }, } return cmdClient } func nzjData() *cobra.Command { cmdClient := &cobra.Command{ Use: "nzj-data", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { if LastId == "" { _ = cmd.Help() os.Exit(0) } go saveMethod() sid, eid := taskQ() if sid != "" && eid != "" { doTask1(sid, eid) LastId = eid } crn := cron.New() cronstr := "0 */10 * * * ?" // 每10min执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { sid, eid := taskQ() if sid != "" && eid != "" { TaskSingle = false doTask1(sid, eid) LastId = eid } } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id") return cmdClient } func projectAdd() *cobra.Command { cmdClient := &cobra.Command{ Use: "project-add", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { if LastId == "" { _ = cmd.Help() os.Exit(0) } InitRule() loadProject() go updateAllQueue() info, _ := MgoBid.Find("nzj_bidding", nil, `{"_id": -1}`, nil, true, -1, 1) doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"])) LastId = mongodb.BsonIdToSId((*info)[0]["_id"]) crn := cron.New() cronstr := "0 */30 * * * ?" // 每30min执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { info, _ := MgoBid.Find("nzj_bidding", nil, `{"_id": -1}`, nil, true, -1, 1) TaskSingle = false doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"])) LastId = mongodb.BsonIdToSId((*info)[0]["_id"]) } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id") return cmdClient } func tidbSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "tidb", Short: "Start processing project save to tidb", Run: func(cmd *cobra.Command, args []string) { InitMysql() InitArea() go SaveFunc("dwd_f_nzj_baseinfo_new", BaseField) go SaveRFunc("dwd_f_nzj_follw_record_new", RecordField) go SaveCFunc("dwd_f_nzj_contact_new", ContactField) go SaveCyFunc("dwd_f_nzj_category_tags_new", CategoryField) go SaveEntFunc("dwd_f_nzj_ent_new", EntField) redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id taskTidb(nil) c := make(chan bool, 1) <-c }, } return cmdClient } func tidbAddSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "tidb-add", Short: "Start processing project save to tidb", Run: func(cmd *cobra.Command, args []string) { InitMysql() InitArea() //go SaveFunc("dwd_f_nzj_baseinfo", BaseField) //go SaveRFunc("dwd_f_nzj_follw_record", RecordField) //go SaveCFunc("dwd_f_nzj_contact", ContactField) //go SaveCyFunc("dwd_f_nzj_category_tags", CategoryField) redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}}) crn := cron.New() cronstr := "0 */30 * * * ?" // 每30min执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { TaskSingle = false taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}}) } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time") return cmdClient } func redisSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "redis", Short: "Start processing project save to tidb", Run: func(cmd *cobra.Command, args []string) { InitMysql() redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id redisDisp() }, } return cmdClient } func esSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "es-save", Short: "Start processing project save to es", Run: func(cmd *cobra.Command, args []string) { InitMysql() InitArea() InitTagCode() InitEs() go SaveEs() esDisp() c := make(chan bool, 1) <-c }, } return cmdClient } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo)) gtid, _ := mapInfo["gtid"].(string) lteid, _ := mapInfo["lteid"].(string) if err != nil || gtid == "" || lteid == "" { UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) } else { //udp成功回写 if k := util.ObjToString(mapInfo["key"]); k != "" { UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } else { k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"])) UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } log.Info("start merge ...") doTask(gtid, lteid) } case udp.OP_NOOP: //下个节点回应 } } func taskQ() (string, string) { log.Info("taskQ", zap.String("lastid", LastId)) query := bson.M{"gtid": bson.M{"$gt": LastId}, "dataprocess": 8} info, _ := MgoBid.Find("bidding_processing_ids", query, `{"_id": -1}`, nil, false, -1, 1) if len(*info) > 0 { newid := util.ObjToString((*info)[0]["lteid"]) log.Info("taskQ", zap.String("新lastid", newid)) return LastId, newid } else { log.Info("taskQ 未发现新数据") return "", "" } } func redisDisp() { pool := make(chan bool, 5) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlTool1.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } util.Debug("taskIterateSql---", "finally id", finalId) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT id, name, name_id, area_code, city_code FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_ent_baseinfo", lastid) rows, err := MysqlTool1.DB.Query(q) if err != nil { util.Debug("taskIterateSql---", err) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { util.Debug("taskIterateSql---", err) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%20000 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() redis.PutCKV("ent_id", util.ObjToString(tmp["name"]), fmt.Sprintf("%s_%s_%s", util.ObjToString(tmp["name_id"]), util.ObjToString(tmp["area_code"]), util.ObjToString(tmp["city_code"]))) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } var EsField = []string{"_id", "projectname", "owner", "area", "city", "district", "nature_code", "ownerclass_code", "project_stage_code", "category_code", "total_investment", "lasttime", "proposed_id"} func esDisp() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) client := Es.GetEsConn() defer Es.DestoryEsConn(client) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.Coll).Find(nil).Select(SelectF).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() save := make(map[string]interface{}) for _, f := range EsField { if tmp[f] == nil { continue } if f == "_id" { save["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) save["_id"] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area" { save[f] = tmp[f] save["area_code"] = AreaCode[util.ObjToString(tmp["area"])] if util.ObjToString(tmp["city"]) != "" { save["area_city"] = tmp["city"] } else { save["area_city"] = tmp["area"] } } else if f == "lasttime" { save[f] = util.Int64All(tmp[f]) * 1000 } else if f == "nature_code" { save[f] = tmp[f] save["nature"] = TagCode["nature"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "ownerclass_code" { save[f] = tmp[f] save["ownerclass"] = TagCode["owner"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "project_stage_code" { save[f] = tmp[f] save["project_stage"] = TagCode["project_stage"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "category_code" { save[f] = tmp[f] save["category"] = TagCode["category"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "total_investment" { text := util.ObjToString(tmp[f]) c := ObjToMoney(text) c = c / 10000 if c != 0 { c, _ = util.FormatFloat(c, 4) save[f] = c } } else { save[f] = tmp[f] } } saveEsPool <- save }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func saveMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MgoBid.SaveBulk("nzj_bidding", arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MgoBid.SaveBulk("nzj_bidding", arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveEs() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsPool: arru[indexu] = v indexu++ if indexu == EsBulkSize { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() Es.BulkSave(config.Conf.DB.Es.IndexP, arru) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() Es.BulkSave(config.Conf.DB.Es.IndexP, arru) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } }