|
@@ -1,12 +1,14 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
+ "github.com/robfig/cron/v3"
|
|
|
"go.uber.org/zap"
|
|
|
"gorm.io/gorm"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
"strings"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
var (
|
|
@@ -16,10 +18,34 @@ var (
|
|
|
|
|
|
func main() {
|
|
|
Init()
|
|
|
+ local, _ := time.LoadLocation("Asia/Shanghai")
|
|
|
+ c := cron.New(cron.WithLocation(local), cron.WithSeconds())
|
|
|
+ eid, err := c.AddFunc(GF.Cron.Spec, importData) // 处理增量专项债
|
|
|
+ if err != nil {
|
|
|
+ log.Info("main", zap.Any("AddFunc err", err))
|
|
|
+ }
|
|
|
+ log.Info("main", zap.Any("eid", eid))
|
|
|
+
|
|
|
+ c.Start()
|
|
|
+ defer c.Stop()
|
|
|
+
|
|
|
+ select {}
|
|
|
+ //importData() //导入专项债数据
|
|
|
//dealProject() // 处理项目、债券数据
|
|
|
//dealProjectBondRelation() //更新项目和债券的关联关系
|
|
|
+
|
|
|
+ //dealProjectListAll() //处理存量项目列表数据;补充列表数据使用
|
|
|
/*导出数据*/
|
|
|
- exportData()
|
|
|
+ //exportData()
|
|
|
+}
|
|
|
+
|
|
|
+// importData 导入数据
|
|
|
+func importData() {
|
|
|
+ dealProject() // 处理项目、债券数据
|
|
|
+ dealProjectBondRelation() //更新项目和债券的关联关系
|
|
|
+
|
|
|
+ log.Info("importData:", zap.String("count", "数据处理完毕"))
|
|
|
+
|
|
|
}
|
|
|
|
|
|
// dealProject 处理项目相关数据
|
|
@@ -52,6 +78,8 @@ func dealProject() {
|
|
|
}
|
|
|
|
|
|
projectName := util.ObjToString(tmp["projectName"])
|
|
|
+ //0.项目列表数据
|
|
|
+ dealProjectList(tmp)
|
|
|
//
|
|
|
where := map[string]interface{}{
|
|
|
"project.jcxx.projectName": projectName,
|
|
@@ -60,6 +88,7 @@ func dealProject() {
|
|
|
if len(*detail) == 0 {
|
|
|
continue
|
|
|
}
|
|
|
+
|
|
|
//1.项目数据
|
|
|
if project, ok := (*detail)["project"].(map[string]interface{}); ok {
|
|
|
//1.基础信息
|
|
@@ -470,3 +499,105 @@ func dealProjectBondRelation() {
|
|
|
log.Info("dealProjectBondRelation", zap.Any("数据处理完毕", count))
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// dealProjectList 处理项目列表数据
|
|
|
+func dealProjectList(tmp map[string]interface{}) {
|
|
|
+ projectName := util.ObjToString(tmp["projectName"])
|
|
|
+ project_list_exist := ProjectListInfo{}
|
|
|
+ err := MysqlDB.Where(&ProjectListInfo{ProjectName: projectName}).First(&project_list_exist).Error
|
|
|
+ if err != nil && err != gorm.ErrRecordNotFound {
|
|
|
+ // 处理查询错误
|
|
|
+ log.Error("dealProjectList;Error checking for existing project", zap.Error(err))
|
|
|
+ }
|
|
|
+
|
|
|
+ if project_list_exist.ID > 0 {
|
|
|
+ project_list_exist.Area = util.ObjToString(tmp["regionName"])
|
|
|
+ project_list_exist.City = util.ObjToString(tmp["cityName"])
|
|
|
+ project_list_exist.District = util.ObjToString(tmp["countyName"])
|
|
|
+ project_list_exist.IssueTerm = util.ObjToString(tmp["issueTerm"])
|
|
|
+ project_list_exist.IssueDate = util.ObjToString(tmp["issueDate"])
|
|
|
+ project_list_exist.IssueInterestRate = util.ObjToString(tmp["issueInterestRate"])
|
|
|
+ project_list_exist.TotalInvestment = util.Float64All(tmp["totalInvestment"])
|
|
|
+ project_list_exist.PresentIssueAmount = util.Float64All(tmp["presentIssueAmount"])
|
|
|
+ err = MysqlDB.Save(&project_list_exist).Error
|
|
|
+ if err != nil {
|
|
|
+ log.Info("dealProjectList Save (Update) ", zap.Error(err))
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ project_list := ProjectListInfo{
|
|
|
+ ProjectName: projectName,
|
|
|
+ Area: util.ObjToString(tmp["regionName"]),
|
|
|
+ City: util.ObjToString(tmp["cityName"]),
|
|
|
+ District: util.ObjToString(tmp["countyName"]),
|
|
|
+ TotalInvestment: util.Float64All(tmp["totalInvestment"]),
|
|
|
+ PresentIssueAmount: util.Float64All(tmp["presentIssueAmount"]),
|
|
|
+ IssueTerm: util.ObjToString(tmp["issueTerm"]),
|
|
|
+ IssueDate: util.ObjToString(tmp["issueDate"]),
|
|
|
+ IssueInterestRate: util.ObjToString(tmp["issueInterestRate"]),
|
|
|
+ }
|
|
|
+
|
|
|
+ err = MysqlDB.Create(&project_list).Error
|
|
|
+ if err != nil {
|
|
|
+ log.Info("dealProjectList;dealProject Create ", zap.Error(err), zap.String("project", projectName))
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// dealProjectList 处理项目列表存量数据
|
|
|
+func dealProjectListAll() {
|
|
|
+ tables := strings.Split(GF.Mongob.List, ",")
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ for _, table := range tables {
|
|
|
+ query := sess.DB("py_theme").C(table).Find(nil).Select(nil).Iter()
|
|
|
+ count := 0
|
|
|
+ for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
|
|
|
+ if count%100 == 0 {
|
|
|
+ log.Info("current:", zap.Int("count", count), zap.Any("projectName", tmp["projectName"]), zap.String("当前数据表:", table))
|
|
|
+ }
|
|
|
+
|
|
|
+ projectName := util.ObjToString(tmp["projectName"])
|
|
|
+ project_list_exist := ProjectListInfo{}
|
|
|
+ err := MysqlDB.Where(&ProjectListInfo{ProjectName: projectName}).First(&project_list_exist).Error
|
|
|
+ if err != nil && err != gorm.ErrRecordNotFound {
|
|
|
+ // 处理查询错误
|
|
|
+ log.Error("dealProjectList;Error checking for existing project", zap.Error(err))
|
|
|
+ }
|
|
|
+
|
|
|
+ if project_list_exist.ID > 0 {
|
|
|
+ project_list_exist.Area = util.ObjToString(tmp["regionName"])
|
|
|
+ project_list_exist.City = util.ObjToString(tmp["cityName"])
|
|
|
+ project_list_exist.District = util.ObjToString(tmp["countyName"])
|
|
|
+ project_list_exist.IssueTerm = util.ObjToString(tmp["issueTerm"])
|
|
|
+ project_list_exist.IssueDate = util.ObjToString(tmp["issueDate"])
|
|
|
+ project_list_exist.IssueInterestRate = util.ObjToString(tmp["issueInterestRate"])
|
|
|
+ project_list_exist.TotalInvestment = util.Float64All(tmp["totalInvestment"])
|
|
|
+ project_list_exist.PresentIssueAmount = util.Float64All(tmp["presentIssueAmount"])
|
|
|
+ err = MysqlDB.Save(&project_list_exist).Error
|
|
|
+ if err != nil {
|
|
|
+ log.Info("dealProjectList Save (Update) ", zap.Error(err))
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ project_list := ProjectListInfo{
|
|
|
+ ProjectName: projectName,
|
|
|
+ Area: util.ObjToString(tmp["regionName"]),
|
|
|
+ City: util.ObjToString(tmp["cityName"]),
|
|
|
+ District: util.ObjToString(tmp["countyName"]),
|
|
|
+ TotalInvestment: util.Float64All(tmp["totalInvestment"]),
|
|
|
+ PresentIssueAmount: util.Float64All(tmp["presentIssueAmount"]),
|
|
|
+ IssueTerm: util.ObjToString(tmp["issueTerm"]),
|
|
|
+ IssueDate: util.ObjToString(tmp["issueDate"]),
|
|
|
+ IssueInterestRate: util.ObjToString(tmp["issueInterestRate"]),
|
|
|
+ }
|
|
|
+
|
|
|
+ err = MysqlDB.Create(&project_list).Error
|
|
|
+ if err != nil {
|
|
|
+ log.Info("dealProjectList;dealProject Create ", zap.Error(err), zap.String("project", projectName))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("dealProjectList", zap.String("数据处理完毕", "!!!!!"))
|
|
|
+}
|