main.go 27 KB


  1. package main
  2. import (
  3. "strings"
  4. "time"
  5. "github.com/robfig/cron/v3"
  6. "go.uber.org/zap"
  7. "gorm.io/gorm"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. )
  12. var (
  13. Mgo *mongodb.MongodbSim //87 环境,采集 存储的MongoDB
  14. MysqlDB *gorm.DB
  15. )
  16. func main() {
  17. Init()
  18. //导出债券数据
  19. /**
  20. //exportData()
  21. //return
  22. */
  23. local, _ := time.LoadLocation("Asia/Shanghai")
  24. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  25. eid, err := c.AddFunc(GF.Cron.Spec, importData) // 处理增量专项债
  26. if err != nil {
  27. log.Info("main", zap.Any("AddFunc err", err))
  28. }
  29. log.Info("main", zap.Any("eid", eid))
  30. c.Start()
  31. defer c.Stop()
  32. select {}
  33. //importData() //导入专项债数据
  34. //dealProject() // 处理项目、债券数据
  35. //dealProjectBondRelation() //更新项目和债券的关联关系
  36. //dealProjectListAll() //处理存量项目列表数据;补充列表数据使用
  37. /*导出数据*/
  38. //exportData()
  39. }
  40. // importData 导入数据
  41. func importData() {
  42. dealProject() // 处理项目、债券数据
  43. dealProjectBondRelation() //更新项目和债券的关联关系
  44. log.Info("importData:", zap.String("count", "数据处理完毕"))
  45. }
  46. // dealProject 处理项目相关数据
  47. func dealProject() {
  48. tables := strings.Split(GF.Mongob.List, ",")
  49. detailNames := strings.Split(GF.Mongob.Detail, ",")
  50. //table := GF.Mongob.List
  51. //detailName := GF.Mongob.Detail //专项债详细表
  52. sess := Mgo.GetMgoConn()
  53. defer Mgo.DestoryMongoConn(sess)
  54. for k, table := range tables {
  55. detailName := detailNames[k]
  56. /**
  57. 这里测试用,使用 “新建成都至达州至万州铁路(南充段)(万源市)” 这个项目测试,他有变更信息
  58. */
  59. //where1 := map[string]interface{}{
  60. // //"projectName": "新建成都至达州至万州铁路(南充段)(万源市)",
  61. // "projectName": "新建成都至达州至万州铁路(南充段)(万源市)",
  62. //}
  63. //query := sess.DB("py_theme").C(table).Find(where1).Select(nil).Iter()
  64. query := sess.DB("py_theme").C(table).Find(nil).Select(nil).Iter()
  65. count := 0
  66. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  67. if count%100 == 0 {
  68. log.Info("current:", zap.Int("count", count), zap.Any("projectName", tmp["projectName"]), zap.String(table, detailName))
  69. }
  70. projectName := util.ObjToString(tmp["projectName"])
  71. //0.项目列表数据
  72. dealProjectList(tmp)
  73. //
  74. where := map[string]interface{}{
  75. "project.jcxx.projectName": projectName,
  76. }
  77. detail, _ := Mgo.FindOne(detailName, where)
  78. if len(*detail) == 0 {
  79. continue
  80. }
  81. //1.项目数据
  82. if project, ok := (*detail)["project"].(map[string]interface{}); ok {
  83. //1.基础信息
  84. var projectId int
  85. if jcxx, ok := project["jcxx"].(map[string]interface{}); ok {
  86. projectId = dealProjectBaseInfo(jcxx, projectName)
  87. }
  88. //2.还本付息
  89. if hbfx, ok := project["hbfx"].(map[string]interface{}); ok {
  90. dealProjectRepayment(hbfx, projectName, projectId)
  91. }
  92. //3.变更
  93. if bg, ok := project["bg"].([]interface{}); ok {
  94. if len(bg) > 0 {
  95. dealProjectChange(bg, projectName, projectId)
  96. }
  97. }
  98. //4.发行明细
  99. if fxmx, ok := project["fxmx"].([]interface{}); ok {
  100. if len(fxmx) > 0 {
  101. dealProjectIssueDetails(fxmx, projectName, projectId)
  102. }
  103. }
  104. }
  105. //2.处理债券信息
  106. if bonds, ok := (*detail)["bond"].([]interface{}); ok {
  107. dealBondInfo(bonds, projectName)
  108. }
  109. }
  110. log.Info("dealProject over ", zap.Int("total", count))
  111. }
  112. }
  113. // dealProjectBaseInfo 处理项目基本信息
  114. func dealProjectBaseInfo(jcxx map[string]interface{}, projectName string) (projectID int) {
  115. project_base_info_exist := ProjectBaseInfo{}
  116. err := MysqlDB.Where(&ProjectBaseInfo{ProjectName: projectName}).First(&project_base_info_exist).Error
  117. if err != nil && err != gorm.ErrRecordNotFound {
  118. // 处理查询错误
  119. log.Error("dealProjectBaseInfo;Error checking for existing project", zap.Error(err))
  120. }
  121. //当前项目基本信息已经存在
  122. if project_base_info_exist.ID > 0 {
  123. // 直接修改已存在记录的字段
  124. project_base_info_exist.TotalInvestment = util.Float64All(jcxx["totalInvestment"])
  125. project_base_info_exist.Area = util.ObjToString(jcxx["regionName"])
  126. project_base_info_exist.City = strings.ReplaceAll(util.ObjToString(jcxx["cityName"]), "本级", "")
  127. project_base_info_exist.District = strings.ReplaceAll(util.ObjToString(jcxx["countyName"]), "本级", "")
  128. project_base_info_exist.Capital = util.Float64All(jcxx["capital"])
  129. project_base_info_exist.ApplyTotalBonds = util.Float64All(jcxx["applyDebt"])
  130. project_base_info_exist.OtherDebtFinancing = util.Float64All(jcxx["portfolioFinancing"])
  131. project_base_info_exist.SpecialDebtCapital = util.Float64All(jcxx["specialDebtAsCapital"])
  132. project_base_info_exist.ExpectedReturn = util.Float64All(jcxx["expectedReturn"])
  133. project_base_info_exist.ProjectCost = util.IntAll(jcxx["projectCost"])
  134. project_base_info_exist.ProjectDomain = util.ObjToString(jcxx["projectTypeName3"])
  135. project_base_info_exist.ProjectOwner = util.ObjToString(jcxx["projectSubject"])
  136. project_base_info_exist.StartDate = util.ObjToString(jcxx["startDate"])
  137. project_base_info_exist.EndDate = util.ObjToString(jcxx["endDate"])
  138. project_base_info_exist.OperationStartDate = util.ObjToString(jcxx["operationStartDate"])
  139. project_base_info_exist.OperationEndDate = util.ObjToString(jcxx["operationEndDate"])
  140. project_base_info_exist.SourceIncome = util.ObjToString(jcxx["sourceIncome"])
  141. project_base_info_exist.ConstructionContent = util.ObjToString(jcxx["constructionContent"])
  142. project_base_info_exist.Remarks = util.ObjToString(jcxx["remarks"])
  143. project_base_info_exist.OtherDebtFinancingSource = util.ObjToString(jcxx["portfolioFinancingSource"])
  144. project_base_info_exist.CostIncomePercent = util.ObjToString(jcxx["costIncomePercent"])
  145. project_base_info_exist.CoverageMultiple = util.Float64All(jcxx["coverageMultiple"])
  146. project_base_info_exist.CompetentDepartment = util.ObjToString(jcxx["implementingAgency"])
  147. project_base_info_exist.AccountingFirm = util.ObjToString(jcxx["accountingFirm"])
  148. project_base_info_exist.LawFirm = util.ObjToString(jcxx["lawFirm"])
  149. project_base_info_exist.UpdateDate = util.ObjToString(jcxx["updateTime"])
  150. project_base_info_exist.CreateDate = util.ObjToString(jcxx["createTime"])
  151. // 使用 Save 来更新
  152. err = MysqlDB.Save(&project_base_info_exist).Error
  153. if err != nil {
  154. log.Info("dealProject Save (Update) ", zap.Error(err))
  155. }
  156. return project_base_info_exist.ID
  157. } else {
  158. project_base_info := ProjectBaseInfo{
  159. ProjectName: projectName,
  160. TotalInvestment: util.Float64All(jcxx["totalInvestment"]),
  161. Area: util.ObjToString(jcxx["regionName"]),
  162. City: strings.ReplaceAll(util.ObjToString(jcxx["cityName"]), "本级", ""),
  163. District: strings.ReplaceAll(util.ObjToString(jcxx["countyName"]), "本级", ""),
  164. Capital: util.Float64All(jcxx["capital"]),
  165. ApplyTotalBonds: util.Float64All(jcxx["applyDebt"]),
  166. OtherDebtFinancing: util.Float64All(jcxx["portfolioFinancing"]),
  167. SpecialDebtCapital: util.Float64All(jcxx["specialDebtAsCapital"]),
  168. ExpectedReturn: util.Float64All(jcxx["expectedReturn"]),
  169. ProjectCost: util.IntAll(jcxx["projectCost"]),
  170. ProjectDomain: util.ObjToString(jcxx["projectTypeName3"]), // 项目领域
  171. ProjectOwner: util.ObjToString(jcxx["projectSubject"]), //项目业主
  172. StartDate: util.ObjToString(jcxx["startDate"]),
  173. EndDate: util.ObjToString(jcxx["endDate"]),
  174. OperationStartDate: util.ObjToString(jcxx["operationStartDate"]),
  175. OperationEndDate: util.ObjToString(jcxx["operationEndDate"]),
  176. SourceIncome: util.ObjToString(jcxx["sourceIncome"]),
  177. ConstructionContent: util.ObjToString(jcxx["constructionContent"]),
  178. Remarks: util.ObjToString(jcxx["remarks"]),
  179. OtherDebtFinancingSource: util.ObjToString(jcxx["portfolioFinancingSource"]),
  180. CostIncomePercent: util.ObjToString(jcxx["costIncomePercent"]),
  181. CoverageMultiple: util.Float64All(jcxx["coverageMultiple"]),
  182. CompetentDepartment: util.ObjToString(jcxx["implementingAgency"]),
  183. AccountingFirm: util.ObjToString(jcxx["accountingFirm"]),
  184. LawFirm: util.ObjToString(jcxx["lawFirm"]),
  185. CreateDate: util.ObjToString(jcxx["createTime"]),
  186. UpdateDate: util.ObjToString(jcxx["updateTime"]),
  187. }
  188. err = MysqlDB.Create(&project_base_info).Error
  189. if err != nil {
  190. log.Info("dealProjectBaseInfo;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  191. }
  192. return project_base_info.ID
  193. }
  194. }
  195. // dealProjectRepayment 处理项目-还本付息
  196. func dealProjectRepayment(hbfx map[string]interface{}, projectName string, projectID int) {
  197. project_repayment_exist := ProjectRepayment{}
  198. err := MysqlDB.Where(&ProjectRepayment{ProjectName: projectName, BondName: util.ObjToString(hbfx["bondName"])}).First(&project_repayment_exist).Error
  199. if err != nil && err != gorm.ErrRecordNotFound {
  200. // 处理查询错误
  201. log.Error("dealProjectRepayment;Error checking for existing project", zap.Error(err))
  202. }
  203. if project_repayment_exist.ID > 0 {
  204. project_repayment_exist.IssueTerm = util.IntAll(hbfx["issueTerm"])
  205. project_repayment_exist.PayInterestMethodName = util.ObjToString(hbfx["payInterestMethodName"])
  206. project_repayment_exist.ValueDate = util.ObjToString(hbfx["valueDate"])
  207. project_repayment_exist.InterestDate = util.ObjToString(hbfx["payInterestDate"])
  208. project_repayment_exist.LastInterestDate = util.ObjToString(hbfx["latelyPayInterestDate"])
  209. project_repayment_exist.ReminderRepayDays = util.IntAll(hbfx["days"])
  210. project_repayment_exist.MaturityDate = util.ObjToString(hbfx["expiryDate"])
  211. project_repayment_exist.DebtService = util.Float64All(hbfx["repayCapitalWithInterest"])
  212. project_repayment_exist.RedemptionMethod = util.ObjToString(hbfx["redemptionMethod"])
  213. project_repayment_exist.CumulativePayInterest = util.IntAll(hbfx["cumulativePayInterest"])
  214. project_repayment_exist.IsEarlyRepayPrincipal = util.ObjToString(hbfx["isEarlyRepayPrincipal"])
  215. project_repayment_exist.Remarks = util.ObjToString(hbfx["remarks"])
  216. // 使用 Save 来更新
  217. err = MysqlDB.Save(&project_repayment_exist).Error
  218. if err != nil {
  219. log.Info("dealProjectRepayment Save (Update) ", zap.Error(err))
  220. }
  221. } else {
  222. project_payment := ProjectRepayment{
  223. ProjectName: projectName,
  224. ProjectID: projectID,
  225. BondName: util.ObjToString(hbfx["bondName"]),
  226. IssueTerm: util.IntAll(hbfx["issueTerm"]),
  227. PayInterestMethodName: util.ObjToString(hbfx["payInterestMethodName"]),
  228. ValueDate: util.ObjToString(hbfx["valueDate"]),
  229. InterestDate: util.ObjToString(hbfx["payInterestDate"]),
  230. LastInterestDate: util.ObjToString(hbfx["latelyPayInterestDate"]),
  231. ReminderRepayDays: util.IntAll(hbfx["days"]), //提醒还款天数
  232. MaturityDate: util.ObjToString(hbfx["expiryDate"]), //到期日
  233. DebtService: util.Float64All(hbfx["repayCapitalWithInterest"]), //还本付息(万元)
  234. RedemptionMethod: util.ObjToString(hbfx["redemptionMethod"]),
  235. CumulativePayInterest: util.IntAll(hbfx["cumulativePayInterest"]),
  236. IsEarlyRepayPrincipal: util.ObjToString(hbfx["isEarlyRepayPrincipal"]),
  237. Remarks: util.ObjToString(hbfx["remarks"]),
  238. }
  239. err = MysqlDB.Create(&project_payment).Error
  240. if err != nil {
  241. log.Info("dealProjectRepayment;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  242. }
  243. }
  244. }
  245. // dealProjectChange 处理项目变更
  246. func dealProjectChange(bg []interface{}, projectName string, projectId int) {
  247. for _, v := range bg {
  248. if bgda, ok := v.(map[string]interface{}); ok {
  249. project_change := ProjectChange{
  250. ProjectName: projectName,
  251. ProjectID: projectId,
  252. ChangeContent: util.ObjToString(bgda["changeContent"]),
  253. UpdateReason: util.ObjToString(bgda["updateReason"]),
  254. SubmitTime: util.ObjToString(bgda["submitTime"]),
  255. }
  256. err := MysqlDB.Create(&project_change).Error
  257. if err != nil {
  258. log.Info("dealProjectChange; Create err", zap.Error(err))
  259. }
  260. }
  261. }
  262. }
  263. // dealProjectIssueDetails 处理项目发行明细
  264. func dealProjectIssueDetails(fxmx []interface{}, projectName string, projectId int) {
  265. for _, v := range fxmx {
  266. if fx, ok := v.(map[string]interface{}); ok {
  267. project_bach_name := util.ObjToString(fx["projectBatchName"])
  268. issue_detail_exist := ProjectIssueDetails{}
  269. err := MysqlDB.Where(&ProjectIssueDetails{ProjectName: projectName, ProjectBachName: project_bach_name, BondName: util.ObjToString(fx["bondName"])}).First(&issue_detail_exist).Error
  270. if err != nil && err != gorm.ErrRecordNotFound {
  271. // 处理查询错误
  272. log.Error("dealProjectIssueDetails;Error checking for existing project", zap.Error(err))
  273. }
  274. // 存在
  275. if issue_detail_exist.ID > 0 {
  276. issue_detail_exist.FirstPublishDate = util.ObjToString(fx["firstPublishDate"])
  277. issue_detail_exist.BatchNum = util.IntAll(fx["batchNum"])
  278. issue_detail_exist.PresentIssueAmount = util.Float64All(fx["presentIssueAmount"])
  279. issue_detail_exist.IssueInterestRate = util.ObjToString(fx["issueInterestRate"])
  280. issue_detail_exist.PresentAsSpecialAmount = util.Float64All(fx["presentAsSpecialAmount"])
  281. issue_detail_exist.TotalIssueAmount = util.Float64All(fx["totalIssueAmount"])
  282. issue_detail_exist.ReviseLog = util.ObjToString(fx["revise_log"])
  283. err = MysqlDB.Save(&issue_detail_exist).Error
  284. if err != nil {
  285. log.Info("dealProjectIssueDetails Save (Update) ", zap.Error(err))
  286. }
  287. } else {
  288. issue_detail := ProjectIssueDetails{
  289. ProjectName: projectName,
  290. ProjectID: projectId,
  291. ProjectBachName: project_bach_name,
  292. BondName: util.ObjToString(fx["bondName"]),
  293. FirstPublishDate: util.ObjToString(fx["firstPublishDate"]),
  294. BatchNum: util.IntAll(fx["batchNum"]),
  295. PresentIssueAmount: util.Float64All(fx["presentIssueAmount"]),
  296. IssueInterestRate: util.ObjToString(fx["issueInterestRate"]),
  297. PresentAsSpecialAmount: util.Float64All(fx["presentAsSpecialAmount"]),
  298. TotalIssueAmount: util.Float64All(fx["totalIssueAmount"]),
  299. ReviseLog: util.ObjToString(fx["revise_log"]),
  300. }
  301. err := MysqlDB.Create(&issue_detail).Error
  302. if err != nil {
  303. log.Info("dealProjectIssueDetails; Create err", zap.Error(err))
  304. }
  305. }
  306. }
  307. }
  308. }
  309. // dealBondInfo 处理债券信息
  310. func dealBondInfo(bonds []interface{}, projectName string) {
  311. for _, v := range bonds {
  312. if bond, ok := v.(map[string]interface{}); ok {
  313. //1.基本信息
  314. if jbxx, ok := bond["jbxx"].(map[string]interface{}); ok {
  315. dealBondBase(jbxx, projectName)
  316. }
  317. //2.债券-修改记录;xgjl
  318. if xgjl, ok := bond["xgjl"].([]interface{}); ok && len(xgjl) > 0 {
  319. dealBondChange(xgjl)
  320. }
  321. ////3.相关小项目
  322. //if xgxxx, ok := bond["xgxxx"].([]interface{}); ok && len(xgxxx) > 0 {
  323. // dealRelationProject(xgxxx)
  324. //}
  325. }
  326. }
  327. }
  328. // dealBondBase 处理债券基本信息
  329. func dealBondBase(jbxx map[string]interface{}, projectName string) {
  330. bond_info_exist := BondInfo{}
  331. err := MysqlDB.Where(&BondInfo{BondName: util.ObjToString(jbxx["bondName"]), BondNo: util.ObjToString(jbxx["bondNo"])}).First(&bond_info_exist).Error
  332. if err != nil && err != gorm.ErrRecordNotFound {
  333. // 处理查询错误
  334. log.Error("dealBondBase;Error checking for existing project", zap.Error(err))
  335. }
  336. if bond_info_exist.ID > 0 {
  337. bond_info_exist.BondShortName = util.ObjToString(jbxx["bondName"])
  338. bond_info_exist.Area = util.ObjToString(jbxx["regionName"])
  339. bond_info_exist.BondNature = util.ObjToString(jbxx["bondType1Name"])
  340. bond_info_exist.BondType = util.ObjToString(jbxx["bondType2Name"])
  341. bond_info_exist.OfficialProjectType = util.ObjToString(jbxx["projectType1Name"])
  342. bond_info_exist.TotalAmount = util.Float64All(jbxx["totalAmount"])
  343. bond_info_exist.IssueDate = util.ObjToString(jbxx["issueDate"])
  344. bond_info_exist.IssuePlace = util.ObjToString(jbxx["issuePlaceName"])
  345. bond_info_exist.IssueTerm = util.IntAll(jbxx["issueTerm"])
  346. bond_info_exist.IssueInterestRate = util.ObjToString(jbxx["issueInterestRate"])
  347. bond_info_exist.IssuePhase = util.ObjToString(jbxx["issuePhase"])
  348. bond_info_exist.WayOfPayInterest = util.ObjToString(jbxx["payInterestMethodName"])
  349. bond_info_exist.NewBondAmount = util.Float64All(jbxx["newBondAmount"])
  350. bond_info_exist.CounterBondAmount = util.Float64All(jbxx["counterBondAmount"])
  351. bond_info_exist.RefinancingBondAmount = util.Float64All(jbxx["refinancingBondAmount"])
  352. bond_info_exist.RedemptionMethod = util.ObjToString(jbxx["redemptionMethod"])
  353. bond_info_exist.ValueDate = util.ObjToString(jbxx["valueDate"])
  354. bond_info_exist.ExpiryDate = util.ObjToString(jbxx["expiryDate"])
  355. bond_info_exist.PayInterestDate = util.ObjToString(jbxx["payInterestDate"])
  356. bond_info_exist.LatePayInterestDate = util.ObjToString(jbxx["latelyPayInterestDate"])
  357. bond_info_exist.IsEarlyRepayPrincipal = util.ObjToString(jbxx["isEarlyRepayPrincipal"])
  358. bond_info_exist.CumulativePayInterest = util.Float64All(jbxx["cumulativePayInterest"])
  359. bond_info_exist.IsCounterBond = util.ObjToString(jbxx["isCounterBond"])
  360. err = MysqlDB.Save(&bond_info_exist).Error
  361. if err != nil {
  362. log.Info("dealBondBase Save (Update) ", zap.Error(err))
  363. }
  364. } else {
  365. bond_info := BondInfo{
  366. BondName: util.ObjToString(jbxx["bondName"]),
  367. BondShortName: util.ObjToString(jbxx["bondShortName"]),
  368. BondNo: util.ObjToString(jbxx["bondNo"]),
  369. Area: util.ObjToString(jbxx["regionName"]),
  370. BondNature: util.ObjToString(jbxx["bondType1Name"]),
  371. BondType: util.ObjToString(jbxx["bondType2Name"]),
  372. OfficialProjectType: util.ObjToString(jbxx["projectType1Name"]),
  373. TotalAmount: util.Float64All(jbxx["totalAmount"]),
  374. IssueDate: util.ObjToString(jbxx["issueDate"]),
  375. IssuePlace: util.ObjToString(jbxx["issuePlaceName"]),
  376. IssueTerm: util.IntAll(jbxx["issueTerm"]),
  377. IssueInterestRate: util.ObjToString(jbxx["issueInterestRate"]),
  378. IssuePhase: util.ObjToString(jbxx["issuePhase"]),
  379. WayOfPayInterest: util.ObjToString(jbxx["payInterestMethodName"]),
  380. NewBondAmount: util.Float64All(jbxx["newBondAmount"]),
  381. CounterBondAmount: util.Float64All(jbxx["counterBondAmount"]),
  382. RefinancingBondAmount: util.Float64All(jbxx["refinancingBondAmount"]),
  383. RedemptionMethod: util.ObjToString(jbxx["redemptionMethod"]),
  384. ValueDate: util.ObjToString(jbxx["valueDate"]),
  385. ExpiryDate: util.ObjToString(jbxx["expiryDate"]),
  386. PayInterestDate: util.ObjToString(jbxx["payInterestDate"]),
  387. LatePayInterestDate: util.ObjToString(jbxx["latelyPayInterestDate"]),
  388. RemindPayDays: util.IntAll(jbxx["days"]),
  389. LastPayInterest: util.Float64All(jbxx["lastPayInterest"]),
  390. IsEarlyRepayPrincipal: util.ObjToString(jbxx["isEarlyRepayPrincipal"]),
  391. CumulativePayInterest: util.Float64All(jbxx["cumulativePayInterest"]),
  392. IsCounterBond: util.ObjToString(jbxx["isCounterBond"]),
  393. }
  394. //
  395. err = MysqlDB.Create(&bond_info).Error
  396. if err != nil {
  397. log.Info("dealBondBase;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  398. }
  399. }
  400. }
  // dealBondChange is the hook for a bond's modification records (the "xgjl"
  // section). It is a placeholder: the records are received and discarded, and
  // nothing is persisted.
  func dealBondChange(xgjl []interface{}) {
  	// Intentionally empty: importing modification records is not implemented.
  }
  404. // dealProjectBondRelation 处理项目债券关联关系
  405. func dealProjectBondRelation() {
  406. //detailName := GF.Mongob.Detail //专项债详细表
  407. detailNames := strings.Split(GF.Mongob.Detail, ",")
  408. sess := Mgo.GetMgoConn()
  409. defer Mgo.DestoryMongoConn(sess)
  410. for _, detailName := range detailNames {
  411. query := sess.DB("py_theme").C(detailName).Find(nil).Select(nil).Iter()
  412. count := 0
  413. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  414. if count%100 == 0 {
  415. log.Info("current:", zap.Int("count", count))
  416. }
  417. if project, ok := tmp["project"].(map[string]interface{}); ok {
  418. //1.基础信息
  419. if jcxx, ok := project["jcxx"].(map[string]interface{}); ok {
  420. project_name := util.ObjToString(jcxx["projectName"])
  421. //fmt.Println(project_name)
  422. project_base_info_exist := ProjectBaseInfo{}
  423. err := MysqlDB.Where(&ProjectBaseInfo{ProjectName: project_name}).First(&project_base_info_exist).Error
  424. if err != nil && err != gorm.ErrRecordNotFound {
  425. // 处理查询错误
  426. log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err))
  427. } else {
  428. if project_base_info_exist.ID > 0 {
  429. //1.更新项目的创建时间
  430. project_base_info_exist.CreateDate = util.ObjToString(jcxx["createTime"])
  431. project_base_info_exist.UpdateDate = util.ObjToString(jcxx["updateTime"])
  432. err = MysqlDB.Save(&project_base_info_exist).Error
  433. if err != nil {
  434. log.Info("dealProjectBondRelation Save (Update) ", zap.Error(err))
  435. }
  436. //2.更新项目和债券的对应关系
  437. if fxmx, ok := project["fxmx"].([]interface{}); ok {
  438. for _, v := range fxmx {
  439. if fx, ok := v.(map[string]interface{}); ok {
  440. bond_name := util.ObjToString(fx["bondName"])
  441. bond_info_exist := BondInfo{}
  442. err := MysqlDB.Where(&BondInfo{BondName: bond_name}).First(&bond_info_exist).Error
  443. if err != nil && err != gorm.ErrRecordNotFound {
  444. // 处理查询错误
  445. log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err))
  446. } else {
  447. // 整理对应关系
  448. relation := ProjectBondRelation{
  449. ProjectID: project_base_info_exist.ID,
  450. BondID: bond_info_exist.ID,
  451. }
  452. result := MysqlDB.FirstOrCreate(&relation, ProjectBondRelation{
  453. ProjectID: relation.ProjectID,
  454. BondID: relation.BondID,
  455. })
  456. if result.Error != nil {
  457. // 处理错误
  458. log.Error("dealProjectBondRelation;Error checking for existing project", zap.Error(err), zap.String(project_name, bond_name))
  459. }
  460. }
  461. }
  462. }
  463. }
  464. }
  465. }
  466. }
  467. }
  468. }
  469. log.Info("dealProjectBondRelation", zap.Any("数据处理完毕", count))
  470. }
  471. }
  472. // dealProjectList 处理项目列表数据
  473. func dealProjectList(tmp map[string]interface{}) {
  474. projectName := util.ObjToString(tmp["projectName"])
  475. project_list_exist := ProjectListInfo{}
  476. err := MysqlDB.Where(&ProjectListInfo{ProjectName: projectName}).First(&project_list_exist).Error
  477. if err != nil && err != gorm.ErrRecordNotFound {
  478. // 处理查询错误
  479. log.Error("dealProjectList;Error checking for existing project", zap.Error(err))
  480. }
  481. if project_list_exist.ID > 0 {
  482. project_list_exist.Area = util.ObjToString(tmp["regionName"])
  483. project_list_exist.City = util.ObjToString(tmp["cityName"])
  484. project_list_exist.District = util.ObjToString(tmp["countyName"])
  485. project_list_exist.IssueTerm = util.ObjToString(tmp["issueTerm"])
  486. project_list_exist.IssueDate = util.ObjToString(tmp["issueDate"])
  487. project_list_exist.IssueInterestRate = util.ObjToString(tmp["issueInterestRate"])
  488. project_list_exist.TotalInvestment = util.Float64All(tmp["totalInvestment"])
  489. project_list_exist.PresentIssueAmount = util.Float64All(tmp["presentIssueAmount"])
  490. err = MysqlDB.Save(&project_list_exist).Error
  491. if err != nil {
  492. log.Info("dealProjectList Save (Update) ", zap.Error(err))
  493. }
  494. } else {
  495. project_list := ProjectListInfo{
  496. ProjectName: projectName,
  497. Area: util.ObjToString(tmp["regionName"]),
  498. City: util.ObjToString(tmp["cityName"]),
  499. District: util.ObjToString(tmp["countyName"]),
  500. TotalInvestment: util.Float64All(tmp["totalInvestment"]),
  501. PresentIssueAmount: util.Float64All(tmp["presentIssueAmount"]),
  502. IssueTerm: util.ObjToString(tmp["issueTerm"]),
  503. IssueDate: util.ObjToString(tmp["issueDate"]),
  504. IssueInterestRate: util.ObjToString(tmp["issueInterestRate"]),
  505. }
  506. err = MysqlDB.Create(&project_list).Error
  507. if err != nil {
  508. log.Info("dealProjectList;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  509. }
  510. }
  511. }
  512. // dealProjectList 处理项目列表存量数据
  513. func dealProjectListAll() {
  514. tables := strings.Split(GF.Mongob.List, ",")
  515. sess := Mgo.GetMgoConn()
  516. defer Mgo.DestoryMongoConn(sess)
  517. for _, table := range tables {
  518. query := sess.DB("py_theme").C(table).Find(nil).Select(nil).Iter()
  519. count := 0
  520. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  521. if count%100 == 0 {
  522. log.Info("current:", zap.Int("count", count), zap.Any("projectName", tmp["projectName"]), zap.String("当前数据表:", table))
  523. }
  524. projectName := util.ObjToString(tmp["projectName"])
  525. project_list_exist := ProjectListInfo{}
  526. err := MysqlDB.Where(&ProjectListInfo{ProjectName: projectName}).First(&project_list_exist).Error
  527. if err != nil && err != gorm.ErrRecordNotFound {
  528. // 处理查询错误
  529. log.Error("dealProjectList;Error checking for existing project", zap.Error(err))
  530. }
  531. if project_list_exist.ID > 0 {
  532. project_list_exist.Area = util.ObjToString(tmp["regionName"])
  533. project_list_exist.City = util.ObjToString(tmp["cityName"])
  534. project_list_exist.District = util.ObjToString(tmp["countyName"])
  535. project_list_exist.IssueTerm = util.ObjToString(tmp["issueTerm"])
  536. project_list_exist.IssueDate = util.ObjToString(tmp["issueDate"])
  537. project_list_exist.IssueInterestRate = util.ObjToString(tmp["issueInterestRate"])
  538. project_list_exist.TotalInvestment = util.Float64All(tmp["totalInvestment"])
  539. project_list_exist.PresentIssueAmount = util.Float64All(tmp["presentIssueAmount"])
  540. err = MysqlDB.Save(&project_list_exist).Error
  541. if err != nil {
  542. log.Info("dealProjectList Save (Update) ", zap.Error(err))
  543. }
  544. } else {
  545. project_list := ProjectListInfo{
  546. ProjectName: projectName,
  547. Area: util.ObjToString(tmp["regionName"]),
  548. City: util.ObjToString(tmp["cityName"]),
  549. District: util.ObjToString(tmp["countyName"]),
  550. TotalInvestment: util.Float64All(tmp["totalInvestment"]),
  551. PresentIssueAmount: util.Float64All(tmp["presentIssueAmount"]),
  552. IssueTerm: util.ObjToString(tmp["issueTerm"]),
  553. IssueDate: util.ObjToString(tmp["issueDate"]),
  554. IssueInterestRate: util.ObjToString(tmp["issueInterestRate"]),
  555. }
  556. err = MysqlDB.Create(&project_list).Error
  557. if err != nil {
  558. log.Info("dealProjectList;dealProject Create ", zap.Error(err), zap.String("project", projectName))
  559. }
  560. }
  561. }
  562. }
  563. log.Info("dealProjectList", zap.String("数据处理完毕", "!!!!!"))
  564. }