package main import ( "go.uber.org/zap" "gorm.io/gorm" "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "strings" ) var ( Mgo *mongodb.MongodbSim //87 环境,采集 存储的MongoDB MysqlDB *gorm.DB ) func main() { Init() dealProject() // 处理项目、债券数据 dealProjectBondRelation() //更新项目和债券的关联关系 } // dealProject 处理项目相关数据 func dealProject() { tables := strings.Split(GF.Mongob.List, ",") detailNames := strings.Split(GF.Mongob.Detail, ",") //table := GF.Mongob.List //detailName := GF.Mongob.Detail //专项债详细表 sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) for k, table := range tables { detailName := detailNames[k] /** 这里测试用,使用 “新建成都至达州至万州铁路(南充段)(万源市)” 这个项目测试,他有变更信息 */ //where1 := map[string]interface{}{ // //"projectName": "新建成都至达州至万州铁路(南充段)(万源市)", // "projectName": "新建成都至达州至万州铁路(南充段)(万源市)", //} //query := sess.DB("py_theme").C(table).Find(where1).Select(nil).Iter() query := sess.DB("py_theme").C(table).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%100 == 0 { log.Info("current:", zap.Int("count", count), zap.Any("projectName", tmp["projectName"]), zap.String(table, detailName)) } projectName := util.ObjToString(tmp["projectName"]) // where := map[string]interface{}{ "project.jcxx.projectName": projectName, } detail, _ := Mgo.FindOne(detailName, where) if len(*detail) == 0 { continue } //1.项目数据 if project, ok := (*detail)["project"].(map[string]interface{}); ok { //1.基础信息 var projectId int if jcxx, ok := project["jcxx"].(map[string]interface{}); ok { projectId = dealProjectBaseInfo(jcxx, projectName) } //2.还本付息 if hbfx, ok := project["hbfx"].(map[string]interface{}); ok { dealProjectRepayment(hbfx, projectName, projectId) } //3.变更 if bg, ok := project["bg"].([]interface{}); ok { if len(bg) > 0 { dealProjectChange(bg, projectName, projectId) } } //4.发行明细 if fxmx, ok := project["fxmx"].([]interface{}); ok { if len(fxmx) > 0 { dealProjectIssueDetails(fxmx, projectName, projectId) } } } //2.处理债券信息 if bonds, ok := (*detail)["bond"].([]interface{}); ok { dealBondInfo(bonds, projectName) } } log.Info("dealProject over ", zap.Int("total", count)) } } // dealProjectBaseInfo 处理项目基本信息 func dealProjectBaseInfo(jcxx map[string]interface{}, projectName string) (projectID int) { project_base_info_exist := ProjectBaseInfo{} err := MysqlDB.Where(&ProjectBaseInfo{ProjectName: projectName}).First(&project_base_info_exist).Error if err != nil && err != gorm.ErrRecordNotFound { // 处理查询错误 log.Error("dealProjectBaseInfo;Error checking for existing project", zap.Error(err)) } //当前项目基本信息已经存在 if project_base_info_exist.ID > 0 { // 直接修改已存在记录的字段 project_base_info_exist.TotalInvestment = util.Float64All(jcxx["totalInvestment"]) project_base_info_exist.Area = util.ObjToString(jcxx["regionName"]) project_base_info_exist.City = strings.ReplaceAll(util.ObjToString(jcxx["cityName"]), "本级", "") project_base_info_exist.District = strings.ReplaceAll(util.ObjToString(jcxx["countyName"]), "本级", "") project_base_info_exist.Capital = util.Float64All(jcxx["capital"]) project_base_info_exist.ApplyTotalBonds = util.Float64All(jcxx["applyDebt"]) project_base_info_exist.OtherDebtFinancing = util.Float64All(jcxx["portfolioFinancing"]) project_base_info_exist.SpecialDebtCapital = util.Float64All(jcxx["specialDebtAsCapital"]) project_base_info_exist.ExpectedReturn = util.Float64All(jcxx["expectedReturn"]) project_base_info_exist.ProjectCost = util.IntAll(jcxx["projectCost"]) project_base_info_exist.ProjectDomain = util.ObjToString(jcxx["projectTypeName3"]) project_base_info_exist.ProjectOwner = util.ObjToString(jcxx["projectSubject"]) project_base_info_exist.StartDate = util.ObjToString(jcxx["startDate"]) project_base_info_exist.EndDate = util.ObjToString(jcxx["endDate"]) project_base_info_exist.OperationStartDate = util.ObjToString(jcxx["operationStartDate"]) project_base_info_exist.OperationEndDate = util.ObjToString(jcxx["operationEndDate"]) project_base_info_exist.SourceIncome = util.ObjToString(jcxx["sourceIncome"]) project_base_info_exist.ConstructionContent = util.ObjToString(jcxx["constructionContent"]) project_base_info_exist.Remarks = util.ObjToString(jcxx["remarks"]) project_base_info_exist.OtherDebtFinancingSource = util.ObjToString(jcxx["portfolioFinancingSource"]) project_base_info_exist.CostIncomePercent = util.ObjToString(jcxx["costIncomePercent"]) project_base_info_exist.CoverageMultiple = util.Float64All(jcxx["coverageMultiple"]) project_base_info_exist.CompetentDepartment = util.ObjToString(jcxx["implementingAgency"]) project_base_info_exist.AccountingFirm = util.ObjToString(jcxx["accountingFirm"]) project_base_info_exist.LawFirm = util.ObjToString(jcxx["lawFirm"]) project_base_info_exist.UpdateDate = util.ObjToString(jcxx["updateTime"]) project_base_info_exist.CreateDate = util.ObjToString(jcxx["createTime"]) // 使用 Save 来更新 err = MysqlDB.Save(&project_base_info_exist).Error if err != nil { log.Info("dealProject Save (Update) ", zap.Error(err)) } return project_base_info_exist.ID } else { project_base_info := ProjectBaseInfo{ ProjectName: projectName, TotalInvestment: util.Float64All(jcxx["totalInvestment"]), Area: util.ObjToString(jcxx["regionName"]), City: strings.ReplaceAll(util.ObjToString(jcxx["cityName"]), "本级", ""), District: strings.ReplaceAll(util.ObjToString(jcxx["countyName"]), "本级", ""), Capital: util.Float64All(jcxx["capital"]), ApplyTotalBonds: util.Float64All(jcxx["applyDebt"]), OtherDebtFinancing: util.Float64All(jcxx["portfolioFinancing"]), SpecialDebtCapital: util.Float64All(jcxx["specialDebtAsCapital"]), ExpectedReturn: util.Float64All(jcxx["expectedReturn"]), ProjectCost: util.IntAll(jcxx["projectCost"]), ProjectDomain: util.ObjToString(jcxx["projectTypeName3"]), // 项目领域 ProjectOwner: util.ObjToString(jcxx["projectSubject"]), //项目业主 StartDate: util.ObjToString(jcxx["startDate"]), EndDate: util.ObjToString(jcxx["endDate"]), OperationStartDate: util.ObjToString(jcxx["operationStartDate"]), OperationEndDate: util.ObjToString(jcxx["operationEndDate"]), SourceIncome: util.ObjToString(jcxx["sourceIncome"]), ConstructionContent: util.ObjToString(jcxx["constructionContent"]), Remarks: util.ObjToString(jcxx["remarks"]), OtherDebtFinancingSource: util.ObjToString(jcxx["portfolioFinancingSource"]), CostIncomePercent: util.ObjToString(jcxx["costIncomePercent"]), CoverageMultiple: util.Float64All(jcxx["coverageMultiple"]), CompetentDepartment: util.ObjToString(jcxx["implementingAgency"]), AccountingFirm: util.ObjToString(jcxx["accountingFirm"]), LawFirm: util.ObjToString(jcxx["lawFirm"]), CreateDate: util.ObjToString(jcxx["createTime"]), UpdateDate: util.ObjToString(jcxx["updateTime"]), } err = MysqlDB.Create(&project_base_info).Error if err != nil { log.Info("dealProjectBaseInfo;dealProject Create ", zap.Error(err), zap.String("project", projectName)) } return project_base_info.ID } } // dealProjectRepayment 处理项目-还本付息 func dealProjectRepayment(hbfx map[string]interface{}, projectName string, projectID int) { project_repayment_exist := ProjectRepayment{} err := MysqlDB.Where(&ProjectRepayment{ProjectName: projectName, BondName: util.ObjToString(hbfx["bondName"])}).First(&project_repayment_exist).Error if err != nil && err != gorm.ErrRecordNotFound { // 处理查询错误 log.Error("dealProjectRepayment;Error checking for existing project", zap.Error(err)) } if project_repayment_exist.ID > 0 { project_repayment_exist.IssueTerm = util.IntAll(hbfx["issueTerm"]) project_repayment_exist.PayInterestMethodName = util.ObjToString(hbfx["payInterestMethodName"]) project_repayment_exist.ValueDate = util.ObjToString(hbfx["valueDate"]) project_repayment_exist.InterestDate = util.ObjToString(hbfx["payInterestDate"]) project_repayment_exist.LastInterestDate = util.ObjToString(hbfx["latelyPayInterestDate"]) project_repayment_exist.ReminderRepayDays = util.IntAll(hbfx["days"]) project_repayment_exist.MaturityDate = util.ObjToString(hbfx["expiryDate"]) project_repayment_exist.DebtService = util.Float64All(hbfx["repayCapitalWithInterest"]) project_repayment_exist.RedemptionMethod = util.ObjToString(hbfx["redemptionMethod"]) project_repayment_exist.CumulativePayInterest = util.IntAll(hbfx["cumulativePayInterest"]) project_repayment_exist.IsEarlyRepayPrincipal = util.ObjToString(hbfx["isEarlyRepayPrincipal"]) project_repayment_exist.Remarks = util.ObjToString(hbfx["remarks"]) // 使用 Save 来更新 err = MysqlDB.Save(&project_repayment_exist).Error if err != nil { log.Info("dealProjectRepayment Save (Update) ", zap.Error(err)) } } else { project_payment := ProjectRepayment{ ProjectName: projectName, ProjectID: projectID, BondName: util.ObjToString(hbfx["bondName"]), IssueTerm: util.IntAll(hbfx["issueTerm"]), PayInterestMethodName: util.ObjToString(hbfx["payInterestMethodName"]), ValueDate: util.ObjToString(hbfx["valueDate"]), InterestDate: util.ObjToString(hbfx["payInterestDate"]), LastInterestDate: util.ObjToString(hbfx["latelyPayInterestDate"]), ReminderRepayDays: util.IntAll(hbfx["days"]), //提醒还款天数 MaturityDate: util.ObjToString(hbfx["expiryDate"]), //到期日 DebtService: util.Float64All(hbfx["repayCapitalWithInterest"]), //还本付息(万元) RedemptionMethod: util.ObjToString(hbfx["redemptionMethod"]), CumulativePayInterest: util.IntAll(hbfx["cumulativePayInterest"]), IsEarlyRepayPrincipal: util.ObjToString(hbfx["isEarlyRepayPrincipal"]), Remarks: util.ObjToString(hbfx["remarks"]), } err = MysqlDB.Create(&project_payment).Error if err != nil { log.Info("dealProjectRepayment;dealProject Create ", zap.Error(err), zap.String("project", projectName)) } } } // dealProjectChange 处理项目变更 func dealProjectChange(bg []interface{}, projectName string, projectId int) { for _, v := range bg { if bgda, ok := v.(map[string]interface{}); ok { project_change := ProjectChange{ ProjectName: projectName, ProjectID: projectId, ChangeContent: util.ObjToString(bgda["changeContent"]), UpdateReason: util.ObjToString(bgda["updateReason"]), SubmitTime: util.ObjToString(bgda["submitTime"]), } err := MysqlDB.Create(&project_change).Error if err != nil { log.Info("dealProjectChange; Create err", zap.Error(err)) } } } } // dealProjectIssueDetails 处理项目发行明细 func dealProjectIssueDetails(fxmx []interface{}, projectName string, projectId int) { for _, v := range fxmx { if fx, ok := v.(map[string]interface{}); ok { project_bach_name := util.ObjToString(fx["projectBatchName"]) issue_detail_exist := ProjectIssueDetails{} err := MysqlDB.Where(&ProjectIssueDetails{ProjectName: projectName, ProjectBachName: project_bach_name, BondName: util.ObjToString(fx["bondName"])}).First(&issue_detail_exist).Error if err != nil && err != gorm.ErrRecordNotFound { // 处理查询错误 log.Error("dealProjectIssueDetails;Error checking for existing project", zap.Error(err)) } // 存在 if issue_detail_exist.ID > 0 { issue_detail_exist.FirstPublishDate = util.ObjToString(fx["firstPublishDate"]) issue_detail_exist.BatchNum = util.IntAll(fx["batchNum"]) issue_detail_exist.PresentIssueAmount = util.Float64All(fx["presentIssueAmount"]) issue_detail_exist.IssueInterestRate = util.Float64All(fx["issueInterestRate"]) issue_detail_exist.PresentAsSpecialAmount = util.Float64All(fx["presentAsSpecialAmount"]) issue_detail_exist.TotalIssueAmount = util.Float64All(fx["totalIssueAmount"]) issue_detail_exist.ReviseLog = util.ObjToString(fx["revise_log"]) err = MysqlDB.Save(&issue_detail_exist).Error if err != nil { log.Info("dealProjectIssueDetails Save (Update) ", zap.Error(err)) } } else { issue_detail := ProjectIssueDetails{ ProjectName: projectName, ProjectID: projectId, ProjectBachName: project_bach_name, BondName: util.ObjToString(fx["bondName"]), FirstPublishDate: util.ObjToString(fx["firstPublishDate"]), BatchNum: util.IntAll(fx["batchNum"]), PresentIssueAmount: util.Float64All(fx["presentIssueAmount"]), IssueInterestRate: util.Float64All(fx["issueInterestRate"]), PresentAsSpecialAmount: util.Float64All(fx["presentAsSpecialAmount"]), TotalIssueAmount: util.Float64All(fx["totalIssueAmount"]), ReviseLog: util.ObjToString(fx["revise_log"]), } err := MysqlDB.Create(&issue_detail).Error if err != nil { log.Info("dealProjectIssueDetails; Create err", zap.Error(err)) } } } } } // dealBondInfo 处理债券信息 func dealBondInfo(bonds []interface{}, projectName string) { for _, v := range bonds { if bond, ok := v.(map[string]interface{}); ok { //1.基本信息 if jbxx, ok := bond["jbxx"].(map[string]interface{}); ok { dealBondBase(jbxx, projectName) } //2.债券-修改记录;xgjl if xgjl, ok := bond["xgjl"].([]interface{}); ok && len(xgjl) > 0 { dealBondChange(xgjl) } ////3.相关小项目 //if xgxxx, ok := bond["xgxxx"].([]interface{}); ok && len(xgxxx) > 0 { // dealRelationProject(xgxxx) //} } } } // dealBondBase 处理债券基本信息 func dealBondBase(jbxx map[string]interface{}, projectName string) { bond_info_exist := BondInfo{} err := MysqlDB.Where(&BondInfo{BondName: util.ObjToString(jbxx["bondName"]), BondNo: util.ObjToString(jbxx["bondNo"])}).First(&bond_info_exist).Error if err != nil && err != gorm.ErrRecordNotFound { // 处理查询错误 log.Error("dealBondBase;Error checking for existing project", zap.Error(err)) } if bond_info_exist.ID > 0 { bond_info_exist.BondShortName = util.ObjToString(jbxx["bondName"]) bond_info_exist.Area = util.ObjToString(jbxx["regionName"]) bond_info_exist.BondNature = util.ObjToString(jbxx["bondType1Name"]) bond_info_exist.BondType = util.ObjToString(jbxx["bondType2Name"]) bond_info_exist.OfficialProjectType = util.ObjToString(jbxx["projectType1Name"]) bond_info_exist.TotalAmount = util.Float64All(jbxx["totalAmount"]) bond_info_exist.IssueDate = util.ObjToString(jbxx["issueDate"]) bond_info_exist.IssuePlace = util.ObjToString(jbxx["issuePlaceName"]) bond_info_exist.IssueTerm = util.IntAll(jbxx["issueTerm"]) bond_info_exist.IssueInterestRate = util.ObjToString(jbxx["issueInterestRate"]) bond_info_exist.IssuePhase = util.ObjToString(jbxx["issuePhase"]) bond_info_exist.WayOfPayInterest = util.ObjToString(jbxx["payInterestMethodName"]) bond_info_exist.NewBondAmount = util.Float64All(jbxx["newBondAmount"]) bond_info_exist.CounterBondAmount = util.Float64All(jbxx["counterBondAmount"]) bond_info_exist.RefinancingBondAmount = util.Float64All(jbxx["refinancingBondAmount"]) bond_info_exist.RedemptionMethod = util.ObjToString(jbxx["redemptionMethod"]) bond_info_exist.ValueDate = util.ObjToString(jbxx["valueDate"]) bond_info_exist.ExpiryDate = util.ObjToString(jbxx["expiryDate"]) bond_info_exist.PayInterestDate = util.ObjToString(jbxx["payInterestDate"]) bond_info_exist.LatePayInterestDate = util.ObjToString(jbxx["latelyPayInterestDate"]) bond_info_exist.IsEarlyRepayPrincipal = util.ObjToString(jbxx["isEarlyRepayPrincipal"]) bond_info_exist.CumulativePayInterest = util.Float64All(jbxx["cumulativePayInterest"]) bond_info_exist.IsCounterBond = util.ObjToString(jbxx["isCounterBond"]) err = MysqlDB.Save(&bond_info_exist).Error if err != nil { log.Info("dealBondBase Save (Update) ", zap.Error(err)) } } else { bond_info := BondInfo{ BondName: util.ObjToString(jbxx["bondName"]), BondShortName: util.ObjToString(jbxx["bondShortName"]), BondNo: util.ObjToString(jbxx["bondNo"]), Area: util.ObjToString(jbxx["regionName"]), BondNature: util.ObjToString(jbxx["bondType1Name"]), BondType: util.ObjToString(jbxx["bondType2Name"]), OfficialProjectType: util.ObjToString(jbxx["projectType1Name"]), TotalAmount: util.Float64All(jbxx["totalAmount"]), IssueDate: util.ObjToString(jbxx["issueDate"]), IssuePlace: util.ObjToString(jbxx["issuePlaceName"]), IssueTerm: util.IntAll(jbxx["issueTerm"]), IssueInterestRate: util.ObjToString(jbxx["issueInterestRate"]), IssuePhase: util.ObjToString(jbxx["issuePhase"]), WayOfPayInterest: util.ObjToString(jbxx["payInterestMethodName"]), NewBondAmount: util.Float64All(jbxx["newBondAmount"]), CounterBondAmount: util.Float64All(jbxx["counterBondAmount"]), RefinancingBondAmount: util.Float64All(jbxx["refinancingBondAmount"]), RedemptionMethod: util.ObjToString(jbxx["redemptionMethod"]), ValueDate: util.ObjToString(jbxx["valueDate"]), ExpiryDate: util.ObjToString(jbxx["expiryDate"]), PayInterestDate: util.ObjToString(jbxx["payInterestDate"]), LatePayInterestDate: util.ObjToString(jbxx["latelyPayInterestDate"]), RemindPayDays: util.IntAll(jbxx["days"]), LastPayInterest: util.Float64All(jbxx["lastPayInterest"]), IsEarlyRepayPrincipal: util.ObjToString(jbxx["isEarlyRepayPrincipal"]), CumulativePayInterest: util.Float64All(jbxx["cumulativePayInterest"]), IsCounterBond: util.ObjToString(jbxx["isCounterBond"]), } // err = MysqlDB.Create(&bond_info).Error if err != nil { log.Info("dealBondBase;dealProject Create ", zap.Error(err), zap.String("project", projectName)) } } } // dealBondChange 修改债券修改信息 func dealBondChange(xgjl []interface{}) { } // dealProjectBondRelation 处理项目债券关联关系 func dealProjectBondRelation() { //detailName := GF.Mongob.Detail //专项债详细表 detailNames := strings.Split(GF.Mongob.Detail, ",") sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) for _, detailName := range detailNames { query := sess.DB("py_theme").C(detailName).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%100 == 0 { log.Info("current:", zap.Int("count", count)) } if project, ok := tmp["project"].(map[string]interface{}); ok { //1.基础信息 if jcxx, ok := project["jcxx"].(map[string]interface{}); ok { project_name := util.ObjToString(jcxx["projectName"]) //fmt.Println(project_name) project_base_info_exist := ProjectBaseInfo{} err := MysqlDB.Where(&ProjectBaseInfo{ProjectName: project_name}).First(&project_base_info_exist).Error if err != nil && err != gorm.ErrRecordNotFound { // 处理查询错误 log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err)) } else { if project_base_info_exist.ID > 0 { //1.更新项目的创建时间 project_base_info_exist.CreateDate = util.ObjToString(jcxx["createTime"]) project_base_info_exist.UpdateDate = util.ObjToString(jcxx["updateTime"]) err = MysqlDB.Save(&project_base_info_exist).Error if err != nil { log.Info("dealProjectBondRelation Save (Update) ", zap.Error(err)) } //2.更新项目和债券的对应关系 if fxmx, ok := project["fxmx"].([]interface{}); ok { for _, v := range fxmx { if fx, ok := v.(map[string]interface{}); ok { bond_name := util.ObjToString(fx["bondName"]) bond_info_exist := BondInfo{} err := MysqlDB.Where(&BondInfo{BondName: bond_name}).First(&bond_info_exist).Error if err != nil && err != gorm.ErrRecordNotFound { // 处理查询错误 log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err)) } else { // 整理对应关系 relation := ProjectBondRelation{ ProjectID: project_base_info_exist.ID, BondID: bond_info_exist.ID, } result := MysqlDB.FirstOrCreate(&relation, ProjectBondRelation{ ProjectID: relation.ProjectID, BondID: relation.BondID, }) if result.Error != nil { // 处理错误 log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err), zap.String(project_name, bond_name)) } } } } } } } } } } log.Info("dealProjectBondRelation", zap.Any("数据处理完毕", count)) } }