main.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. package main
  2. import (
  3. "github.com/robfig/cron/v3"
  4. "go.uber.org/zap"
  5. "gorm.io/gorm"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "strings"
  10. "time"
  11. )
  12. var (
  13. Mgo *mongodb.MongodbSim //87 环境,采集 存储的MongoDB
  14. MysqlDB *gorm.DB
  15. )
  16. func main() {
  17. Init()
  18. exportData()
  19. return
  20. local, _ := time.LoadLocation("Asia/Shanghai")
  21. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  22. eid, err := c.AddFunc(GF.Cron.Spec, importData) // 处理增量专项债
  23. if err != nil {
  24. log.Info("main", zap.Any("AddFunc err", err))
  25. }
  26. log.Info("main", zap.Any("eid", eid))
  27. c.Start()
  28. defer c.Stop()
  29. select {}
  30. //importData() //导入专项债数据
  31. //dealProject() // 处理项目、债券数据
  32. //dealProjectBondRelation() //更新项目和债券的关联关系
  33. //dealProjectListAll() //处理存量项目列表数据;补充列表数据使用
  34. /*导出数据*/
  35. //exportData()
  36. }
  37. // importData 导入数据
  38. func importData() {
  39. dealProject() // 处理项目、债券数据
  40. dealProjectBondRelation() //更新项目和债券的关联关系
  41. log.Info("importData:", zap.String("count", "数据处理完毕"))
  42. }
  43. // dealProject 处理项目相关数据
  44. func dealProject() {
  45. tables := strings.Split(GF.Mongob.List, ",")
  46. detailNames := strings.Split(GF.Mongob.Detail, ",")
  47. //table := GF.Mongob.List
  48. //detailName := GF.Mongob.Detail //专项债详细表
  49. sess := Mgo.GetMgoConn()
  50. defer Mgo.DestoryMongoConn(sess)
  51. for k, table := range tables {
  52. detailName := detailNames[k]
  53. /**
  54. 这里测试用,使用 “新建成都至达州至万州铁路(南充段)(万源市)” 这个项目测试,他有变更信息
  55. */
  56. //where1 := map[string]interface{}{
  57. // //"projectName": "新建成都至达州至万州铁路(南充段)(万源市)",
  58. // "projectName": "新建成都至达州至万州铁路(南充段)(万源市)",
  59. //}
  60. //query := sess.DB("py_theme").C(table).Find(where1).Select(nil).Iter()
  61. query := sess.DB("py_theme").C(table).Find(nil).Select(nil).Iter()
  62. count := 0
  63. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  64. if count%100 == 0 {
  65. log.Info("current:", zap.Int("count", count), zap.Any("projectName", tmp["projectName"]), zap.String(table, detailName))
  66. }
  67. projectName := util.ObjToString(tmp["projectName"])
  68. //0.项目列表数据
  69. dealProjectList(tmp)
  70. //
  71. where := map[string]interface{}{
  72. "project.jcxx.projectName": projectName,
  73. }
  74. detail, _ := Mgo.FindOne(detailName, where)
  75. if len(*detail) == 0 {
  76. continue
  77. }
  78. //1.项目数据
  79. if project, ok := (*detail)["project"].(map[string]interface{}); ok {
  80. //1.基础信息
  81. var projectId int
  82. if jcxx, ok := project["jcxx"].(map[string]interface{}); ok {
  83. projectId = dealProjectBaseInfo(jcxx, projectName)
  84. }
  85. //2.还本付息
  86. if hbfx, ok := project["hbfx"].(map[string]interface{}); ok {
  87. dealProjectRepayment(hbfx, projectName, projectId)
  88. }
  89. //3.变更
  90. if bg, ok := project["bg"].([]interface{}); ok {
  91. if len(bg) > 0 {
  92. dealProjectChange(bg, projectName, projectId)
  93. }
  94. }
  95. //4.发行明细
  96. if fxmx, ok := project["fxmx"].([]interface{}); ok {
  97. if len(fxmx) > 0 {
  98. dealProjectIssueDetails(fxmx, projectName, projectId)
  99. }
  100. }
  101. }
  102. //2.处理债券信息
  103. if bonds, ok := (*detail)["bond"].([]interface{}); ok {
  104. dealBondInfo(bonds, projectName)
  105. }
  106. }
  107. log.Info("dealProject over ", zap.Int("total", count))
  108. }
  109. }
  110. // dealProjectBaseInfo 处理项目基本信息
  111. func dealProjectBaseInfo(jcxx map[string]interface{}, projectName string) (projectID int) {
  112. project_base_info_exist := ProjectBaseInfo{}
  113. err := MysqlDB.Where(&ProjectBaseInfo{ProjectName: projectName}).First(&project_base_info_exist).Error
  114. if err != nil && err != gorm.ErrRecordNotFound {
  115. // 处理查询错误
  116. log.Error("dealProjectBaseInfo;Error checking for existing project", zap.Error(err))
  117. }
  118. //当前项目基本信息已经存在
  119. if project_base_info_exist.ID > 0 {
  120. // 直接修改已存在记录的字段
  121. project_base_info_exist.TotalInvestment = util.Float64All(jcxx["totalInvestment"])
  122. project_base_info_exist.Area = util.ObjToString(jcxx["regionName"])
  123. project_base_info_exist.City = strings.ReplaceAll(util.ObjToString(jcxx["cityName"]), "本级", "")
  124. project_base_info_exist.District = strings.ReplaceAll(util.ObjToString(jcxx["countyName"]), "本级", "")
  125. project_base_info_exist.Capital = util.Float64All(jcxx["capital"])
  126. project_base_info_exist.ApplyTotalBonds = util.Float64All(jcxx["applyDebt"])
  127. project_base_info_exist.OtherDebtFinancing = util.Float64All(jcxx["portfolioFinancing"])
  128. project_base_info_exist.SpecialDebtCapital = util.Float64All(jcxx["specialDebtAsCapital"])
  129. project_base_info_exist.ExpectedReturn = util.Float64All(jcxx["expectedReturn"])
  130. project_base_info_exist.ProjectCost = util.IntAll(jcxx["projectCost"])
  131. project_base_info_exist.ProjectDomain = util.ObjToString(jcxx["projectTypeName3"])
  132. project_base_info_exist.ProjectOwner = util.ObjToString(jcxx["projectSubject"])
  133. project_base_info_exist.StartDate = util.ObjToString(jcxx["startDate"])
  134. project_base_info_exist.EndDate = util.ObjToString(jcxx["endDate"])
  135. project_base_info_exist.OperationStartDate = util.ObjToString(jcxx["operationStartDate"])
  136. project_base_info_exist.OperationEndDate = util.ObjToString(jcxx["operationEndDate"])
  137. project_base_info_exist.SourceIncome = util.ObjToString(jcxx["sourceIncome"])
  138. project_base_info_exist.ConstructionContent = util.ObjToString(jcxx["constructionContent"])
  139. project_base_info_exist.Remarks = util.ObjToString(jcxx["remarks"])
  140. project_base_info_exist.OtherDebtFinancingSource = util.ObjToString(jcxx["portfolioFinancingSource"])
  141. project_base_info_exist.CostIncomePercent = util.ObjToString(jcxx["costIncomePercent"])
  142. project_base_info_exist.CoverageMultiple = util.Float64All(jcxx["coverageMultiple"])
  143. project_base_info_exist.CompetentDepartment = util.ObjToString(jcxx["implementingAgency"])
  144. project_base_info_exist.AccountingFirm = util.ObjToString(jcxx["accountingFirm"])
  145. project_base_info_exist.LawFirm = util.ObjToString(jcxx["lawFirm"])
  146. project_base_info_exist.UpdateDate = util.ObjToString(jcxx["updateTime"])
  147. project_base_info_exist.CreateDate = util.ObjToString(jcxx["createTime"])
  148. // 使用 Save 来更新
  149. err = MysqlDB.Save(&project_base_info_exist).Error
  150. if err != nil {
  151. log.Info("dealProject Save (Update) ", zap.Error(err))
  152. }
  153. return project_base_info_exist.ID
  154. } else {
  155. project_base_info := ProjectBaseInfo{
  156. ProjectName: projectName,
  157. TotalInvestment: util.Float64All(jcxx["totalInvestment"]),
  158. Area: util.ObjToString(jcxx["regionName"]),
  159. City: strings.ReplaceAll(util.ObjToString(jcxx["cityName"]), "本级", ""),
  160. District: strings.ReplaceAll(util.ObjToString(jcxx["countyName"]), "本级", ""),
  161. Capital: util.Float64All(jcxx["capital"]),
  162. ApplyTotalBonds: util.Float64All(jcxx["applyDebt"]),
  163. OtherDebtFinancing: util.Float64All(jcxx["portfolioFinancing"]),
  164. SpecialDebtCapital: util.Float64All(jcxx["specialDebtAsCapital"]),
  165. ExpectedReturn: util.Float64All(jcxx["expectedReturn"]),
  166. ProjectCost: util.IntAll(jcxx["projectCost"]),
  167. ProjectDomain: util.ObjToString(jcxx["projectTypeName3"]), // 项目领域
  168. ProjectOwner: util.ObjToString(jcxx["projectSubject"]), //项目业主
  169. StartDate: util.ObjToString(jcxx["startDate"]),
  170. EndDate: util.ObjToString(jcxx["endDate"]),
  171. OperationStartDate: util.ObjToString(jcxx["operationStartDate"]),
  172. OperationEndDate: util.ObjToString(jcxx["operationEndDate"]),
  173. SourceIncome: util.ObjToString(jcxx["sourceIncome"]),
  174. ConstructionContent: util.ObjToString(jcxx["constructionContent"]),
  175. Remarks: util.ObjToString(jcxx["remarks"]),
  176. OtherDebtFinancingSource: util.ObjToString(jcxx["portfolioFinancingSource"]),
  177. CostIncomePercent: util.ObjToString(jcxx["costIncomePercent"]),
  178. CoverageMultiple: util.Float64All(jcxx["coverageMultiple"]),
  179. CompetentDepartment: util.ObjToString(jcxx["implementingAgency"]),
  180. AccountingFirm: util.ObjToString(jcxx["accountingFirm"]),
  181. LawFirm: util.ObjToString(jcxx["lawFirm"]),
  182. CreateDate: util.ObjToString(jcxx["createTime"]),
  183. UpdateDate: util.ObjToString(jcxx["updateTime"]),
  184. }
  185. err = MysqlDB.Create(&project_base_info).Error
  186. if err != nil {
  187. log.Info("dealProjectBaseInfo;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  188. }
  189. return project_base_info.ID
  190. }
  191. }
  192. // dealProjectRepayment 处理项目-还本付息
  193. func dealProjectRepayment(hbfx map[string]interface{}, projectName string, projectID int) {
  194. project_repayment_exist := ProjectRepayment{}
  195. err := MysqlDB.Where(&ProjectRepayment{ProjectName: projectName, BondName: util.ObjToString(hbfx["bondName"])}).First(&project_repayment_exist).Error
  196. if err != nil && err != gorm.ErrRecordNotFound {
  197. // 处理查询错误
  198. log.Error("dealProjectRepayment;Error checking for existing project", zap.Error(err))
  199. }
  200. if project_repayment_exist.ID > 0 {
  201. project_repayment_exist.IssueTerm = util.IntAll(hbfx["issueTerm"])
  202. project_repayment_exist.PayInterestMethodName = util.ObjToString(hbfx["payInterestMethodName"])
  203. project_repayment_exist.ValueDate = util.ObjToString(hbfx["valueDate"])
  204. project_repayment_exist.InterestDate = util.ObjToString(hbfx["payInterestDate"])
  205. project_repayment_exist.LastInterestDate = util.ObjToString(hbfx["latelyPayInterestDate"])
  206. project_repayment_exist.ReminderRepayDays = util.IntAll(hbfx["days"])
  207. project_repayment_exist.MaturityDate = util.ObjToString(hbfx["expiryDate"])
  208. project_repayment_exist.DebtService = util.Float64All(hbfx["repayCapitalWithInterest"])
  209. project_repayment_exist.RedemptionMethod = util.ObjToString(hbfx["redemptionMethod"])
  210. project_repayment_exist.CumulativePayInterest = util.IntAll(hbfx["cumulativePayInterest"])
  211. project_repayment_exist.IsEarlyRepayPrincipal = util.ObjToString(hbfx["isEarlyRepayPrincipal"])
  212. project_repayment_exist.Remarks = util.ObjToString(hbfx["remarks"])
  213. // 使用 Save 来更新
  214. err = MysqlDB.Save(&project_repayment_exist).Error
  215. if err != nil {
  216. log.Info("dealProjectRepayment Save (Update) ", zap.Error(err))
  217. }
  218. } else {
  219. project_payment := ProjectRepayment{
  220. ProjectName: projectName,
  221. ProjectID: projectID,
  222. BondName: util.ObjToString(hbfx["bondName"]),
  223. IssueTerm: util.IntAll(hbfx["issueTerm"]),
  224. PayInterestMethodName: util.ObjToString(hbfx["payInterestMethodName"]),
  225. ValueDate: util.ObjToString(hbfx["valueDate"]),
  226. InterestDate: util.ObjToString(hbfx["payInterestDate"]),
  227. LastInterestDate: util.ObjToString(hbfx["latelyPayInterestDate"]),
  228. ReminderRepayDays: util.IntAll(hbfx["days"]), //提醒还款天数
  229. MaturityDate: util.ObjToString(hbfx["expiryDate"]), //到期日
  230. DebtService: util.Float64All(hbfx["repayCapitalWithInterest"]), //还本付息(万元)
  231. RedemptionMethod: util.ObjToString(hbfx["redemptionMethod"]),
  232. CumulativePayInterest: util.IntAll(hbfx["cumulativePayInterest"]),
  233. IsEarlyRepayPrincipal: util.ObjToString(hbfx["isEarlyRepayPrincipal"]),
  234. Remarks: util.ObjToString(hbfx["remarks"]),
  235. }
  236. err = MysqlDB.Create(&project_payment).Error
  237. if err != nil {
  238. log.Info("dealProjectRepayment;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  239. }
  240. }
  241. }
  242. // dealProjectChange 处理项目变更
  243. func dealProjectChange(bg []interface{}, projectName string, projectId int) {
  244. for _, v := range bg {
  245. if bgda, ok := v.(map[string]interface{}); ok {
  246. project_change := ProjectChange{
  247. ProjectName: projectName,
  248. ProjectID: projectId,
  249. ChangeContent: util.ObjToString(bgda["changeContent"]),
  250. UpdateReason: util.ObjToString(bgda["updateReason"]),
  251. SubmitTime: util.ObjToString(bgda["submitTime"]),
  252. }
  253. err := MysqlDB.Create(&project_change).Error
  254. if err != nil {
  255. log.Info("dealProjectChange; Create err", zap.Error(err))
  256. }
  257. }
  258. }
  259. }
  260. // dealProjectIssueDetails 处理项目发行明细
  261. func dealProjectIssueDetails(fxmx []interface{}, projectName string, projectId int) {
  262. for _, v := range fxmx {
  263. if fx, ok := v.(map[string]interface{}); ok {
  264. project_bach_name := util.ObjToString(fx["projectBatchName"])
  265. issue_detail_exist := ProjectIssueDetails{}
  266. err := MysqlDB.Where(&ProjectIssueDetails{ProjectName: projectName, ProjectBachName: project_bach_name, BondName: util.ObjToString(fx["bondName"])}).First(&issue_detail_exist).Error
  267. if err != nil && err != gorm.ErrRecordNotFound {
  268. // 处理查询错误
  269. log.Error("dealProjectIssueDetails;Error checking for existing project", zap.Error(err))
  270. }
  271. // 存在
  272. if issue_detail_exist.ID > 0 {
  273. issue_detail_exist.FirstPublishDate = util.ObjToString(fx["firstPublishDate"])
  274. issue_detail_exist.BatchNum = util.IntAll(fx["batchNum"])
  275. issue_detail_exist.PresentIssueAmount = util.Float64All(fx["presentIssueAmount"])
  276. issue_detail_exist.IssueInterestRate = util.Float64All(fx["issueInterestRate"])
  277. issue_detail_exist.PresentAsSpecialAmount = util.Float64All(fx["presentAsSpecialAmount"])
  278. issue_detail_exist.TotalIssueAmount = util.Float64All(fx["totalIssueAmount"])
  279. issue_detail_exist.ReviseLog = util.ObjToString(fx["revise_log"])
  280. err = MysqlDB.Save(&issue_detail_exist).Error
  281. if err != nil {
  282. log.Info("dealProjectIssueDetails Save (Update) ", zap.Error(err))
  283. }
  284. } else {
  285. issue_detail := ProjectIssueDetails{
  286. ProjectName: projectName,
  287. ProjectID: projectId,
  288. ProjectBachName: project_bach_name,
  289. BondName: util.ObjToString(fx["bondName"]),
  290. FirstPublishDate: util.ObjToString(fx["firstPublishDate"]),
  291. BatchNum: util.IntAll(fx["batchNum"]),
  292. PresentIssueAmount: util.Float64All(fx["presentIssueAmount"]),
  293. IssueInterestRate: util.Float64All(fx["issueInterestRate"]),
  294. PresentAsSpecialAmount: util.Float64All(fx["presentAsSpecialAmount"]),
  295. TotalIssueAmount: util.Float64All(fx["totalIssueAmount"]),
  296. ReviseLog: util.ObjToString(fx["revise_log"]),
  297. }
  298. err := MysqlDB.Create(&issue_detail).Error
  299. if err != nil {
  300. log.Info("dealProjectIssueDetails; Create err", zap.Error(err))
  301. }
  302. }
  303. }
  304. }
  305. }
  306. // dealBondInfo 处理债券信息
  307. func dealBondInfo(bonds []interface{}, projectName string) {
  308. for _, v := range bonds {
  309. if bond, ok := v.(map[string]interface{}); ok {
  310. //1.基本信息
  311. if jbxx, ok := bond["jbxx"].(map[string]interface{}); ok {
  312. dealBondBase(jbxx, projectName)
  313. }
  314. //2.债券-修改记录;xgjl
  315. if xgjl, ok := bond["xgjl"].([]interface{}); ok && len(xgjl) > 0 {
  316. dealBondChange(xgjl)
  317. }
  318. ////3.相关小项目
  319. //if xgxxx, ok := bond["xgxxx"].([]interface{}); ok && len(xgxxx) > 0 {
  320. // dealRelationProject(xgxxx)
  321. //}
  322. }
  323. }
  324. }
  325. // dealBondBase 处理债券基本信息
  326. func dealBondBase(jbxx map[string]interface{}, projectName string) {
  327. bond_info_exist := BondInfo{}
  328. err := MysqlDB.Where(&BondInfo{BondName: util.ObjToString(jbxx["bondName"]), BondNo: util.ObjToString(jbxx["bondNo"])}).First(&bond_info_exist).Error
  329. if err != nil && err != gorm.ErrRecordNotFound {
  330. // 处理查询错误
  331. log.Error("dealBondBase;Error checking for existing project", zap.Error(err))
  332. }
  333. if bond_info_exist.ID > 0 {
  334. bond_info_exist.BondShortName = util.ObjToString(jbxx["bondName"])
  335. bond_info_exist.Area = util.ObjToString(jbxx["regionName"])
  336. bond_info_exist.BondNature = util.ObjToString(jbxx["bondType1Name"])
  337. bond_info_exist.BondType = util.ObjToString(jbxx["bondType2Name"])
  338. bond_info_exist.OfficialProjectType = util.ObjToString(jbxx["projectType1Name"])
  339. bond_info_exist.TotalAmount = util.Float64All(jbxx["totalAmount"])
  340. bond_info_exist.IssueDate = util.ObjToString(jbxx["issueDate"])
  341. bond_info_exist.IssuePlace = util.ObjToString(jbxx["issuePlaceName"])
  342. bond_info_exist.IssueTerm = util.IntAll(jbxx["issueTerm"])
  343. bond_info_exist.IssueInterestRate = util.ObjToString(jbxx["issueInterestRate"])
  344. bond_info_exist.IssuePhase = util.ObjToString(jbxx["issuePhase"])
  345. bond_info_exist.WayOfPayInterest = util.ObjToString(jbxx["payInterestMethodName"])
  346. bond_info_exist.NewBondAmount = util.Float64All(jbxx["newBondAmount"])
  347. bond_info_exist.CounterBondAmount = util.Float64All(jbxx["counterBondAmount"])
  348. bond_info_exist.RefinancingBondAmount = util.Float64All(jbxx["refinancingBondAmount"])
  349. bond_info_exist.RedemptionMethod = util.ObjToString(jbxx["redemptionMethod"])
  350. bond_info_exist.ValueDate = util.ObjToString(jbxx["valueDate"])
  351. bond_info_exist.ExpiryDate = util.ObjToString(jbxx["expiryDate"])
  352. bond_info_exist.PayInterestDate = util.ObjToString(jbxx["payInterestDate"])
  353. bond_info_exist.LatePayInterestDate = util.ObjToString(jbxx["latelyPayInterestDate"])
  354. bond_info_exist.IsEarlyRepayPrincipal = util.ObjToString(jbxx["isEarlyRepayPrincipal"])
  355. bond_info_exist.CumulativePayInterest = util.Float64All(jbxx["cumulativePayInterest"])
  356. bond_info_exist.IsCounterBond = util.ObjToString(jbxx["isCounterBond"])
  357. err = MysqlDB.Save(&bond_info_exist).Error
  358. if err != nil {
  359. log.Info("dealBondBase Save (Update) ", zap.Error(err))
  360. }
  361. } else {
  362. bond_info := BondInfo{
  363. BondName: util.ObjToString(jbxx["bondName"]),
  364. BondShortName: util.ObjToString(jbxx["bondShortName"]),
  365. BondNo: util.ObjToString(jbxx["bondNo"]),
  366. Area: util.ObjToString(jbxx["regionName"]),
  367. BondNature: util.ObjToString(jbxx["bondType1Name"]),
  368. BondType: util.ObjToString(jbxx["bondType2Name"]),
  369. OfficialProjectType: util.ObjToString(jbxx["projectType1Name"]),
  370. TotalAmount: util.Float64All(jbxx["totalAmount"]),
  371. IssueDate: util.ObjToString(jbxx["issueDate"]),
  372. IssuePlace: util.ObjToString(jbxx["issuePlaceName"]),
  373. IssueTerm: util.IntAll(jbxx["issueTerm"]),
  374. IssueInterestRate: util.ObjToString(jbxx["issueInterestRate"]),
  375. IssuePhase: util.ObjToString(jbxx["issuePhase"]),
  376. WayOfPayInterest: util.ObjToString(jbxx["payInterestMethodName"]),
  377. NewBondAmount: util.Float64All(jbxx["newBondAmount"]),
  378. CounterBondAmount: util.Float64All(jbxx["counterBondAmount"]),
  379. RefinancingBondAmount: util.Float64All(jbxx["refinancingBondAmount"]),
  380. RedemptionMethod: util.ObjToString(jbxx["redemptionMethod"]),
  381. ValueDate: util.ObjToString(jbxx["valueDate"]),
  382. ExpiryDate: util.ObjToString(jbxx["expiryDate"]),
  383. PayInterestDate: util.ObjToString(jbxx["payInterestDate"]),
  384. LatePayInterestDate: util.ObjToString(jbxx["latelyPayInterestDate"]),
  385. RemindPayDays: util.IntAll(jbxx["days"]),
  386. LastPayInterest: util.Float64All(jbxx["lastPayInterest"]),
  387. IsEarlyRepayPrincipal: util.ObjToString(jbxx["isEarlyRepayPrincipal"]),
  388. CumulativePayInterest: util.Float64All(jbxx["cumulativePayInterest"]),
  389. IsCounterBond: util.ObjToString(jbxx["isCounterBond"]),
  390. }
  391. //
  392. err = MysqlDB.Create(&bond_info).Error
  393. if err != nil {
  394. log.Info("dealBondBase;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  395. }
  396. }
  397. }
  398. // dealBondChange 修改债券修改信息
  399. func dealBondChange(xgjl []interface{}) {
  400. }
  401. // dealProjectBondRelation 处理项目债券关联关系
  402. func dealProjectBondRelation() {
  403. //detailName := GF.Mongob.Detail //专项债详细表
  404. detailNames := strings.Split(GF.Mongob.Detail, ",")
  405. sess := Mgo.GetMgoConn()
  406. defer Mgo.DestoryMongoConn(sess)
  407. for _, detailName := range detailNames {
  408. query := sess.DB("py_theme").C(detailName).Find(nil).Select(nil).Iter()
  409. count := 0
  410. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  411. if count%100 == 0 {
  412. log.Info("current:", zap.Int("count", count))
  413. }
  414. if project, ok := tmp["project"].(map[string]interface{}); ok {
  415. //1.基础信息
  416. if jcxx, ok := project["jcxx"].(map[string]interface{}); ok {
  417. project_name := util.ObjToString(jcxx["projectName"])
  418. //fmt.Println(project_name)
  419. project_base_info_exist := ProjectBaseInfo{}
  420. err := MysqlDB.Where(&ProjectBaseInfo{ProjectName: project_name}).First(&project_base_info_exist).Error
  421. if err != nil && err != gorm.ErrRecordNotFound {
  422. // 处理查询错误
  423. log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err))
  424. } else {
  425. if project_base_info_exist.ID > 0 {
  426. //1.更新项目的创建时间
  427. project_base_info_exist.CreateDate = util.ObjToString(jcxx["createTime"])
  428. project_base_info_exist.UpdateDate = util.ObjToString(jcxx["updateTime"])
  429. err = MysqlDB.Save(&project_base_info_exist).Error
  430. if err != nil {
  431. log.Info("dealProjectBondRelation Save (Update) ", zap.Error(err))
  432. }
  433. //2.更新项目和债券的对应关系
  434. if fxmx, ok := project["fxmx"].([]interface{}); ok {
  435. for _, v := range fxmx {
  436. if fx, ok := v.(map[string]interface{}); ok {
  437. bond_name := util.ObjToString(fx["bondName"])
  438. bond_info_exist := BondInfo{}
  439. err := MysqlDB.Where(&BondInfo{BondName: bond_name}).First(&bond_info_exist).Error
  440. if err != nil && err != gorm.ErrRecordNotFound {
  441. // 处理查询错误
  442. log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err))
  443. } else {
  444. // 整理对应关系
  445. relation := ProjectBondRelation{
  446. ProjectID: project_base_info_exist.ID,
  447. BondID: bond_info_exist.ID,
  448. }
  449. result := MysqlDB.FirstOrCreate(&relation, ProjectBondRelation{
  450. ProjectID: relation.ProjectID,
  451. BondID: relation.BondID,
  452. })
  453. if result.Error != nil {
  454. // 处理错误
  455. log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err), zap.String(project_name, bond_name))
  456. }
  457. }
  458. }
  459. }
  460. }
  461. }
  462. }
  463. }
  464. }
  465. }
  466. log.Info("dealProjectBondRelation", zap.Any("数据处理完毕", count))
  467. }
  468. }
  469. // dealProjectList 处理项目列表数据
  470. func dealProjectList(tmp map[string]interface{}) {
  471. projectName := util.ObjToString(tmp["projectName"])
  472. project_list_exist := ProjectListInfo{}
  473. err := MysqlDB.Where(&ProjectListInfo{ProjectName: projectName}).First(&project_list_exist).Error
  474. if err != nil && err != gorm.ErrRecordNotFound {
  475. // 处理查询错误
  476. log.Error("dealProjectList;Error checking for existing project", zap.Error(err))
  477. }
  478. if project_list_exist.ID > 0 {
  479. project_list_exist.Area = util.ObjToString(tmp["regionName"])
  480. project_list_exist.City = util.ObjToString(tmp["cityName"])
  481. project_list_exist.District = util.ObjToString(tmp["countyName"])
  482. project_list_exist.IssueTerm = util.ObjToString(tmp["issueTerm"])
  483. project_list_exist.IssueDate = util.ObjToString(tmp["issueDate"])
  484. project_list_exist.IssueInterestRate = util.ObjToString(tmp["issueInterestRate"])
  485. project_list_exist.TotalInvestment = util.Float64All(tmp["totalInvestment"])
  486. project_list_exist.PresentIssueAmount = util.Float64All(tmp["presentIssueAmount"])
  487. err = MysqlDB.Save(&project_list_exist).Error
  488. if err != nil {
  489. log.Info("dealProjectList Save (Update) ", zap.Error(err))
  490. }
  491. } else {
  492. project_list := ProjectListInfo{
  493. ProjectName: projectName,
  494. Area: util.ObjToString(tmp["regionName"]),
  495. City: util.ObjToString(tmp["cityName"]),
  496. District: util.ObjToString(tmp["countyName"]),
  497. TotalInvestment: util.Float64All(tmp["totalInvestment"]),
  498. PresentIssueAmount: util.Float64All(tmp["presentIssueAmount"]),
  499. IssueTerm: util.ObjToString(tmp["issueTerm"]),
  500. IssueDate: util.ObjToString(tmp["issueDate"]),
  501. IssueInterestRate: util.ObjToString(tmp["issueInterestRate"]),
  502. }
  503. err = MysqlDB.Create(&project_list).Error
  504. if err != nil {
  505. log.Info("dealProjectList;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  506. }
  507. }
  508. }
  509. // dealProjectList 处理项目列表存量数据
  510. func dealProjectListAll() {
  511. tables := strings.Split(GF.Mongob.List, ",")
  512. sess := Mgo.GetMgoConn()
  513. defer Mgo.DestoryMongoConn(sess)
  514. for _, table := range tables {
  515. query := sess.DB("py_theme").C(table).Find(nil).Select(nil).Iter()
  516. count := 0
  517. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  518. if count%100 == 0 {
  519. log.Info("current:", zap.Int("count", count), zap.Any("projectName", tmp["projectName"]), zap.String("当前数据表:", table))
  520. }
  521. projectName := util.ObjToString(tmp["projectName"])
  522. project_list_exist := ProjectListInfo{}
  523. err := MysqlDB.Where(&ProjectListInfo{ProjectName: projectName}).First(&project_list_exist).Error
  524. if err != nil && err != gorm.ErrRecordNotFound {
  525. // 处理查询错误
  526. log.Error("dealProjectList;Error checking for existing project", zap.Error(err))
  527. }
  528. if project_list_exist.ID > 0 {
  529. project_list_exist.Area = util.ObjToString(tmp["regionName"])
  530. project_list_exist.City = util.ObjToString(tmp["cityName"])
  531. project_list_exist.District = util.ObjToString(tmp["countyName"])
  532. project_list_exist.IssueTerm = util.ObjToString(tmp["issueTerm"])
  533. project_list_exist.IssueDate = util.ObjToString(tmp["issueDate"])
  534. project_list_exist.IssueInterestRate = util.ObjToString(tmp["issueInterestRate"])
  535. project_list_exist.TotalInvestment = util.Float64All(tmp["totalInvestment"])
  536. project_list_exist.PresentIssueAmount = util.Float64All(tmp["presentIssueAmount"])
  537. err = MysqlDB.Save(&project_list_exist).Error
  538. if err != nil {
  539. log.Info("dealProjectList Save (Update) ", zap.Error(err))
  540. }
  541. } else {
  542. project_list := ProjectListInfo{
  543. ProjectName: projectName,
  544. Area: util.ObjToString(tmp["regionName"]),
  545. City: util.ObjToString(tmp["cityName"]),
  546. District: util.ObjToString(tmp["countyName"]),
  547. TotalInvestment: util.Float64All(tmp["totalInvestment"]),
  548. PresentIssueAmount: util.Float64All(tmp["presentIssueAmount"]),
  549. IssueTerm: util.ObjToString(tmp["issueTerm"]),
  550. IssueDate: util.ObjToString(tmp["issueDate"]),
  551. IssueInterestRate: util.ObjToString(tmp["issueInterestRate"]),
  552. }
  553. err = MysqlDB.Create(&project_list).Error
  554. if err != nil {
  555. log.Info("dealProjectList;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  556. }
  557. }
  558. }
  559. }
  560. log.Info("dealProjectList", zap.String("数据处理完毕", "!!!!!"))
  561. }