package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "app.yhyue.com/data_processing/common_utils/redis" "app.yhyue.com/data_processing/common_utils/udp" "data_tidb/config" "encoding/json" "fmt" "github.com/spf13/cobra" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" "net" "sync" "time" ) var ( UdpClient udp.UdpClient ) func init() { config.Init("./common.toml") InitLog() InitMgo() InitMysql() InitField() //redis.InitRedis1("qyxy_id=127.0.0.1:4379", 1) //redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1) log.Info("init success") } func main() { //go SaveFunc() //go SaveTagFunc() //go saveErrMethod() rootCmd := &cobra.Command{Use: "my cmd"} rootCmd.AddCommand(bidding()) rootCmd.AddCommand(project()) if err := rootCmd.Execute(); err != nil { fmt.Println("rootCmd.Execute failed", err.Error()) } //UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024} //UdpClient.Listen(processUdpMsg) //log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort)) c := make(chan bool, 1) <-c } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer util.Catch() switch act { case udp.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo)) gtid, _ := mapInfo["gtid"].(string) lteid, _ := mapInfo["lteid"].(string) if err != nil || gtid == "" || lteid == "" { UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写 } else { //udp成功回写 if k := util.ObjToString(mapInfo["key"]); k != "" { UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } else { k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"])) UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } log.Info("start sync ...") doBiddingTask(gtid, lteid, mapInfo) } } } func taskMysql() { pool := make(chan bool, 5) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo")) lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("查询最后id---", zap.Int("finally id: ", finalId)) lastid, count := 0, 0 for { log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid)) q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation", lastid) //q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation") //q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid) rows, err := MysqlTool.DB.Query(q) if err != nil { log.Error("mysql query err ", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { log.Info("----finish-----", zap.Int("count: ", count)) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("mysql scan err ", zap.Error(err)) break } for i, col := range values { if col == nil { ret[columns[i]] = nil } else { switch val := (*scanArgs[i].(*interface{})).(type) { case byte: ret[columns[i]] = val break case []byte: v := string(val) switch v { case "\x00": // 处理数据类型为bit的情况 ret[columns[i]] = 0 case "\x01": // 处理数据类型为bit的情况 ret[columns[i]] = 1 default: ret[columns[i]] = v break } break case time.Time: if val.IsZero() { ret[columns[i]] = nil } else { ret[columns[i]] = val.Format("2006-01-02 15:04:05") } break default: ret[columns[i]] = val } } } lastid = util.IntAll(ret["id"]) count++ if count%20000 == 0 { log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid)) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() cid := util.Int64All(tmp["id"]) pid := util.ObjToString(tmp["projectid"]) name_id := util.ObjToString(tmp["name_id"]) identity_type := util.Int64All(tmp["identity_type+0"]) if name_id != "" { pinfo, _ := MongoP.FindById("projectset", pid, bson.M{"ids": 1}) if len(*pinfo) > 0 { for _, id := range util.ObjArrToStringArr((*pinfo)["ids"].([]interface{})) { coll := "bidding" //if id > "5a862e7040d2d9bbe88e3b1f" { // coll = "bidding" //} else { // coll = "bidding_back" //} info, _ := MongoB.FindById(coll, id, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1}) if len(*info) > 0 { if identity_type == 1 { if util.ObjToString((*info)["buyertel"]) != "" && util.ObjToString((*info)["buyerperson"]) != "" { q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["buyerperson"]), "contact_tel": util.ObjToString((*info)["buyertel"])} cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") if cinfo != nil && len(*cinfo) > 0 { MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]}) break } } } else if identity_type == 2 { if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" { if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" { q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["winnerperson"]), "contact_tel": util.ObjToString((*info)["winnertel"])} cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") if cinfo != nil && len(*cinfo) > 0 { MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]}) break } } } } else if identity_type == 4 { if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" { if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" { q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["agencyperson"]), "contact_tel": util.ObjToString((*info)["agencytel"])} cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") if cinfo != nil && len(*cinfo) > 0 { MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]}) break } } } } } } } } //redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"])) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func taskMgo() { sess := MongoP.GetMgoConn() defer MongoP.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}} field := map[string]interface{}{"ids": 1} query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { util.Debug("current ---", count, tmp["_id"]) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() id := mongodb.BsonIdToSId(tmp["_id"]) for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) { redis.PutCKV("s_id", i, id) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() util.Debug("over ---", count) } // @Description 标讯数据 // @Author J 2022/9/20 17:52 func bidding() *cobra.Command { cmdClient := &cobra.Command{ Use: "bidding", Short: "Start processing bidding data", Run: func(cmd *cobra.Command, args []string) { go SaveFunc() go SaveTagFunc() //go SaveExpandFunc() //go SaveAttrFunc() //go SaveImfFunc() //go SaveIntentFunc() //go SaveWinnerFunc() //go SavePackageFunc() //go SavePurFunc() go saveErrMethod() taskB() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } // @Description 项目数据 // @Author J 2022/9/20 17:52 func project() *cobra.Command { cmdClient := &cobra.Command{ Use: "project", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { go SaveProFunc() go SaveProTagFunc() go SaveProbFunc() go SaveRelationFunc() taskP() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } func SaveFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveBasePool: arru[indexu] = v indexu++ if indexu == saveSize { saveBaseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp }() MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveBaseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp }() MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveExpandFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveExpandPool: arru[indexu] = v indexu++ if indexu == saveSize { saveExpandSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveExpandSp }() MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveExpandSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveExpandSp }() MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveTagFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveTagPool: arru[indexu] = v indexu++ if indexu == saveSize { saveTagSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveTagSp }() MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveTagSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveTagSp }() MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveAttrFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveAttrPool: arru[indexu] = v indexu++ if indexu == saveSize { saveAttrSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveAttrSp }() MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveAttrSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveAttrSp }() MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveImfFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveIfmPool: arru[indexu] = v indexu++ if indexu == saveSize { saveIfmSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveIfmSp }() MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveIfmSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveIfmSp }() MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SavePurFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePurPool: arru[indexu] = v indexu++ if indexu == saveSize { savePurSp <- true go func(arru []map[string]interface{}) { defer func() { <-savePurSp }() MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { savePurSp <- true go func(arru []map[string]interface{}) { defer func() { <-savePurSp }() MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveIntentFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveIntentPool: arru[indexu] = v indexu++ if indexu == saveSize { saveIntentSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveIntentSp }() MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveIntentSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveIntentSp }() MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveWinnerFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveWinnerPool: arru[indexu] = v indexu++ if indexu == saveSize { saveWinnerSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveWinnerSp }() MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveWinnerSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveWinnerSp }() MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SavePackageFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePkgPool: arru[indexu] = v indexu++ if indexu == saveSize { savePkgSp <- true go func(arru []map[string]interface{}) { defer func() { <-savePkgSp }() MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { savePkgSp <- true go func(arru []map[string]interface{}) { defer func() { <-savePkgSp }() MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveProFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveProPool: arru[indexu] = v indexu++ if indexu == saveSize { saveProSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProSp }() MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveProSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProSp }() MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveProbFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveProbPool: arru[indexu] = v indexu++ if indexu == saveSize { saveProbSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProbSp }() MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveProbSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProbSp }() MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveProTagFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveProTagPool: arru[indexu] = v indexu++ if indexu == saveSize { saveProTagSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProTagSp }() MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveProTagSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProTagSp }() MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveRelationFunc() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveRelationPool: arru[indexu] = v indexu++ if indexu == saveSize { saveRelationSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveRelationSp }() MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveRelationSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveRelationSp }() MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } // 字段错误数据 func saveErrMethod() { arru := make([]map[string]interface{}, 200) indexu := 0 for { select { case v := <-saveErrPool: arru[indexu] = v indexu++ if indexu == saveSize { saveErrSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveErrSp }() MongoB.SaveBulk("bidding_tidb_f_err", arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveErrSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveErrSp }() MongoB.SaveBulk("bidding_tidb_f_err", arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } }