package main import ( "encoding/json" "fmt" "github.com/robfig/cron" "github.com/spf13/cobra" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "math/rand" "net" "os" "proposed_project/config" "strconv" "strings" "sync" "time" "unicode/utf8" ) var ( UdpClient udp.UdpClient Es *elastic.Elastic EsBulkSize = 200 saveEsPool = make(chan map[string]interface{}, 5000) saveEsSp = make(chan bool, 3) ) func main() { rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(tags()) rootCmd.AddCommand(project()) rootCmd.AddCommand(nzjData()) rootCmd.AddCommand(projectAdd()) rootCmd.AddCommand(tidbSave()) rootCmd.AddCommand(tidbSave1()) rootCmd.AddCommand(tidbAddSave()) rootCmd.AddCommand(redisSave()) rootCmd.AddCommand(esSave()) rootCmd.AddCommand(projectComb()) rootCmd.AddCommand(projectCombTidb()) rootCmd.AddCommand(projectCombAdd()) rootCmd.AddCommand(newId()) if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } } func tags() *cobra.Command { cmdClient := &cobra.Command{ Use: "tags", Short: "Start processing tags data", Run: func(cmd *cobra.Command, args []string) { InitRule() go UpdateMethod() taskRun() c := make(chan bool, 1) <-c }, } return cmdClient } func project() *cobra.Command { cmdClient := &cobra.Command{ Use: "project", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { InitRule() loadProject() go updateAllQueue() UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024} UdpClient.Listen(processUdpMsg) log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort)) c := make(chan bool, 1) <-c }, } return cmdClient } func nzjData() *cobra.Command { cmdClient := &cobra.Command{ Use: "nzj-data", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { if LastId == "" { _ = cmd.Help() os.Exit(0) } go saveMethod() sid, eid := taskQ() if sid != "" && eid != "" { doTask1(sid, eid) LastId = eid } crn := cron.New() cronstr := "0 */10 * * * ?" // 每10min执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { sid, eid := taskQ() if sid != "" && eid != "" { TaskSingle = false doTask1(sid, eid) LastId = eid } } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id") return cmdClient } func projectAdd() *cobra.Command { cmdClient := &cobra.Command{ Use: "project-add", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { if LastId == "" { _ = cmd.Help() os.Exit(0) } InitRule() loadProject() go updateAllQueue() info, _ := MgoBid.Find("nzj_bidding", nil, `{"_id": -1}`, nil, true, -1, 1) doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"])) LastId = mongodb.BsonIdToSId((*info)[0]["_id"]) crn := cron.New() cronstr := "0 */30 * * * ?" // 每30min执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { info, _ := MgoBid.Find("nzj_bidding", nil, `{"_id": -1}`, nil, true, -1, 1) TaskSingle = false doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"])) LastId = mongodb.BsonIdToSId((*info)[0]["_id"]) } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id") return cmdClient } func tidbSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "tidb", Short: "Start processing project save to tidb", Run: func(cmd *cobra.Command, args []string) { InitMysql() InitArea() go SaveFunc("dwd_f_nzj_baseinfo", BaseField) go SaveRFunc("dwd_f_nzj_follw_record", RecordField) go SaveCFunc("dwd_f_nzj_contact", ContactField) go SaveCyFunc("dwd_f_nzj_category_tags", CategoryField) go SaveEntFunc("dwd_f_nzj_ent", EntField) //redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id taskTidb(nil) c := make(chan bool, 1) <-c }, } return cmdClient } func tidbSave1() *cobra.Command { cmdClient := &cobra.Command{ Use: "tidb1", Short: "Start processing project save to tidb", Run: func(cmd *cobra.Command, args []string) { InitMysql() InitArea() go SaveFunc("dwd_f_nzj_baseinfo", BaseField) redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id taskTidb1() c := make(chan bool, 1) <-c }, } cmdClient.Flags().StringVarP(&id, "pid", "p", "", "pid") return cmdClient } func tidbAddSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "tidb-add", Short: "Start processing project save to tidb", Run: func(cmd *cobra.Command, args []string) { InitMysql() InitArea() //go SaveFunc("dwd_f_nzj_baseinfo", BaseField) //go SaveRFunc("dwd_f_nzj_follw_record", RecordField) //go SaveCFunc("dwd_f_nzj_contact", ContactField) //go SaveCyFunc("dwd_f_nzj_category_tags", CategoryField) redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}}) crn := cron.New() cronstr := "0 */30 * * * ?" // 每30min执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { TaskSingle = false taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}}) } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time") return cmdClient } func redisSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "redis", Short: "Start processing project save to tidb", Run: func(cmd *cobra.Command, args []string) { InitMysql() redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id redisDisp() }, } return cmdClient } func esSave() *cobra.Command { cmdClient := &cobra.Command{ Use: "es-save", Short: "Start processing project save to es", Run: func(cmd *cobra.Command, args []string) { InitMysql() InitArea() InitTagCode() InitEs() go SaveEs() esDisp() crn := cron.New() cronstr := "0 */30 * * * ?" // 每30min执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { TaskSingle = false esDisp() } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time") return cmdClient } // @Description 拟在建项目关联 // @Author J 2023/4/17 09:05 func projectComb() *cobra.Command { cmdClient := &cobra.Command{ Use: "project-comb", Short: "Start processing combined project", Run: func(cmd *cobra.Command, args []string) { initSeg() InitEs() redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Project, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbt) redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Proposed, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbd) go SavePpMethod() taskComb() c := make(chan bool, 1) <-c }, } return cmdClient } // @Description 拟在建项目关联后入tidb处理 // @Author J 2023/4/17 09:06 func projectCombTidb() *cobra.Command { cmdClient := &cobra.Command{ Use: "project-comb-tidb", Short: "Start processing combined project", Run: func(cmd *cobra.Command, args []string) { InitMysql() initStage() redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Project, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbt) redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Proposed, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbd) go SaveEntFunc1("dwd_f_nzj_ent", EntField) taskD() c := make(chan bool, 1) <-c }, } return cmdClient } // @Description 拟在建关联数据 增量数据处理 // @Author J 2023/4/24 13:51 func projectCombAdd() *cobra.Command { cmdClient := &cobra.Command{ Use: "project-comb-add", Short: "Start processing combined project add", Run: func(cmd *cobra.Command, args []string) { initSeg() InitEs() InitMysql() initStage() redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Project, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbt) redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Proposed, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbd) go SaveEntFunc1("dwd_f_nzj_ent", EntField) //go taskAA() //go taskBB() crn := cron.New() cronstr := "0 0 22 * * ?" // 每天10点执行一次 _ = crn.AddFunc(cronstr, func() { if TaskSingle { //go taskAA() go taskBB() } else { log.Info("上次任务未执行完成") } }) crn.Start() c := make(chan bool, 1) <-c }, } return cmdClient } func newId() *cobra.Command { cmdClient := &cobra.Command{ Use: "newid", Short: "Start processing data", Run: func(cmd *cobra.Command, args []string) { InitMysql() info := MysqlTool.Find("dwd_f_user_claim", nil, "id, project_id", "", -1, -1) for _, m := range *info { update := make(map[string]interface{}) pid := util.ObjToString(m["project_id"]) info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", bson.M{"proposed_id": pid}, "", "") if info != nil && len(*info) > 0 { update["new_id"] = pid } else { info = MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": pid}, "proposed_id", "") if info != nil && len(*info) > 0 { update["new_id"] = util.ObjToString((*info)["proposed_id"]) } else { util.Debug("err ---", pid) } } if len(update) > 0 { util.Debug(update) MysqlTool.Update("dwd_f_user_claim", bson.M{"id": util.ObjToString(m["id"])}, update) } } }, } return cmdClient } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo)) gtid, _ := mapInfo["gtid"].(string) lteid, _ := mapInfo["lteid"].(string) if err != nil || gtid == "" || lteid == "" { UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) } else { //udp成功回写 if k := util.ObjToString(mapInfo["key"]); k != "" { UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } else { k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"])) UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } log.Info("start merge ...") doTask(gtid, lteid) } case udp.OP_NOOP: //下个节点回应 } } func taskQ() (string, string) { log.Info("taskQ", zap.String("lastid", LastId)) query := bson.M{"gtid": bson.M{"$gt": LastId}} info, _ := MgoBid.Find("field_data_record", query, `{"_id": -1}`, nil, false, -1, 1) if len(*info) > 0 { newid := util.ObjToString((*info)[0]["lteid"]) log.Info("taskQ", zap.String("新lastid", newid)) return LastId, newid } else { log.Info("taskQ 未发现新数据") return "", "" } } func redisDisp() { pool := make(chan bool, 5) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlTool1.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } util.Debug("taskIterateSql---", "finally id", finalId) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT id, name, name_id, area_code, city_code FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_ent_baseinfo", lastid) rows, err := MysqlTool1.DB.Query(q) if err != nil { util.Debug("taskIterateSql---", err) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { util.Debug("taskIterateSql---", err) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%20000 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() redis.PutCKV("ent_id", util.ObjToString(tmp["name"]), fmt.Sprintf("%s_%s_%s", util.ObjToString(tmp["name_id"]), util.ObjToString(tmp["area_code"]), util.ObjToString(tmp["city_code"]))) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } var EsField = []string{"_id", "projectname", "owner", "area", "city", "district", "nature_code", "ownerclass_code", "project_stage_code", "category_code", "total_investment", "lasttime", "proposed_id"} func esDisp() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) client := Es.GetEsConn() defer Es.DestoryEsConn(client) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} var q bson.M if pici > 0 { q = bson.M{"pici": bson.M{"$gt": pici}} } query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Select(SelectF).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%200 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } if t := util.Int64All(tmp["pici"]); t > pici { pici = t } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() save := make(map[string]interface{}) for _, f := range EsField { if tmp[f] == nil { continue } if f == "_id" { save["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) save["_id"] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area" { save[f] = tmp[f] save["area_code"] = AreaCode[util.ObjToString(tmp["area"])] if util.ObjToString(tmp["city"]) != "" { save["area_city"] = tmp["city"] } else { save["area_city"] = tmp["area"] } } else if f == "lasttime" { save[f] = util.Int64All(tmp[f]) * 1000 } else if f == "nature_code" { save[f] = tmp[f] save["nature"] = TagCode["nature"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "ownerclass_code" { save[f] = tmp[f] save["ownerclass"] = TagCode["owner"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "project_stage_code" { save[f] = tmp[f] save["project_stage"] = TagCode["project_stage"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "category_code" { save[f] = tmp[f] save["category"] = TagCode["category"].(map[string]interface{})[util.ObjToString(tmp[f])] } else if f == "total_investment" { text := util.ObjToString(tmp[f]) c := ObjToMoney(text) c = c / 10000 if c != 0 { c, _ = util.FormatFloat(c, 4) save[f] = c } } else { save[f] = tmp[f] } } saveEsPool <- save }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) TaskSingle = true } func saveMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MgoBid.SaveBulk("nzj_bidding", arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MgoBid.SaveBulk("nzj_bidding", arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveEs() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsPool: arru[indexu] = v indexu++ if indexu == EsBulkSize { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() Es.BulkSave(config.Conf.DB.Es.IndexP, arru) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() Es.BulkSave(config.Conf.DB.Es.IndexP, arru) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func main1() { InitMysql() InitArea() redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id //sql := fmt.Sprintf("SELECT id, new_id FROM %s", "dwd_f_user_claim") info := MysqlTool.SelectBySql("SELECT * FROM dwd_f_user_claim where project_id not in (SELECT proposed_id FROM dwd_f_nzj_baseinfo dfnb)") for _, m := range *info { fid := util.ObjToString(m["new_id"]) //m1 := MysqlTool.Find("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": fid}, "id", "", -1, -1) pinfo, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProposedColl, fid, nil) if len(*pinfo) > 0 { } else { pinfo, _ = MgoPro.FindOne(config.Conf.DB.MongoP.ProposedColl, bson.M{"ids": fid}) MysqlTool.Update("dwd_f_user_claim", bson.M{"project_id": util.ObjToString(m["project_id"])}, bson.M{"new_id": mongodb.BsonIdToSId((*pinfo)["_id"])}) } saveM := make(map[string]interface{}) for _, f := range BaseField { if f == "lasttime" || f == "firsttime" { if t := util.Int64All((*pinfo)[f]); t > 0 { saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "proposed_id" { saveM[f] = mongodb.BsonIdToSId((*pinfo)["_id"]) } else if f == "area_code" { if (*pinfo)["area"] != nil { saveM[f] = AreaCode[util.ObjToString((*pinfo)["area"])] } } else if f == "city_code" { if (*pinfo)["area"] != nil && (*pinfo)["city"] != nil { c := util.ObjToString((*pinfo)["area"]) + "," + util.ObjToString((*pinfo)["city"]) saveM[f] = AreaCode[c] } } else if f == "owner" { if v := util.ObjToString((*pinfo)[f]); v != "" { if utf8.RuneCountInString(v) < 100 { saveM[f] = v } } } else if f == "name_id" { if b := util.ObjToString((*pinfo)["owner"]); b != "" { if eid := redis.GetStr("ent_id", b); eid != "" { saveM["name_id"] = strings.Split(eid, "_")[0] } } } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" { if (*pinfo)[f] != nil && util.IntAll((*pinfo)[f]) > 0 { t := util.Int64All((*pinfo)[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "total_investment" { text := util.ObjToString((*pinfo)[f]) capital := ObjToMoney(text) capital = capital / 10000 if capital != 0 { capital, _ = util.FormatFloat(capital, 6) saveM[f] = capital } } else if f == "approvestatus" { if util.ObjToString((*pinfo)[f]) != "" && utf8.RuneCountInString(util.ObjToString((*pinfo)[f])) < 8 { saveM[f] = (*pinfo)[f] } } else if f == "proposed_number" { if (*pinfo)[f] == nil { now := time.Now().Unix() st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd) parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制 rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数 saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd) } else { saveM[f] = (*pinfo)[f] } } else if f == "approvecode" { if util.ObjToString((*pinfo)[f]) != "" && utf8.RuneCountInString(util.ObjToString((*pinfo)[f])) < 200 { saveM[f] = (*pinfo)[f] } } else if f == "floor_area" { if util.ObjToString((*pinfo)[f]) != "" && utf8.RuneCountInString(util.ObjToString((*pinfo)[f])) < 255 { saveM[f] = (*pinfo)[f] } } else { if (*pinfo)[f] != nil { saveM[f] = (*pinfo)[f] } } } MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM) } }