increment.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package main
  2. import (
  3. "fmt"
  4. "go.uber.org/zap"
  5. "gorm.io/driver/clickhouse"
  6. "gorm.io/gorm"
  7. "gorm.io/gorm/logger"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "strings"
  11. "time"
  12. "unicode/utf8"
  13. //"log"
  14. "net/url"
  15. )
  16. // increInvest 增量投资关系 数据处理
  17. func increInvest() {
  18. //每周三执行,查询 11天之前,update_time 的数据
  19. now := time.Now()
  20. start := now.AddDate(0, 0, -11)
  21. where := map[string]interface{}{
  22. "update_time": map[string]interface{}{
  23. "$gt": start.Format("2006-01-02"),
  24. },
  25. }
  26. sess := MgoPA.GetMgoConn()
  27. defer MgoPA.DestoryMongoConn(sess)
  28. //正式环境
  29. username := GF.Clickhouse.Username
  30. password := GF.Clickhouse.Password
  31. host := GF.Clickhouse.Host
  32. encodedPassword := url.QueryEscape(password)
  33. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  34. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  35. Logger: logger.Default.LogMode(logger.Silent),
  36. })
  37. if err != nil {
  38. log.Fatal("链接数据库失败")
  39. } else {
  40. log.Info("increInvest", zap.String("clickhouse 打开成功", db.Name()))
  41. }
  42. query := sess.DB("mixdata").C("company_partner").Find(where).Select(nil).Sort("update_time").Iter()
  43. count := 0
  44. //datas := make([]EntMapCode, 0)
  45. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  46. if count%10000 == 0 {
  47. log.Info("increInvest", zap.Int("current", count), zap.Any(util.ObjToString(tmp["stock_name"]), tmp["update_time"]))
  48. }
  49. if util.ObjToString(tmp["stock_type"]) == "自然人股东" {
  50. continue
  51. }
  52. if util.IntAll(tmp["use_flag"]) > 5 {
  53. continue
  54. }
  55. //历史数据,作废
  56. if util.IntAll(tmp["is_history"]) != 0 {
  57. continue
  58. }
  59. //股东名称 长度小于5,认为不是企业
  60. if utf8.RuneCountInString(util.ObjToString(tmp["stock_name"])) < 5 {
  61. continue
  62. }
  63. // 公司名称为空
  64. if util.ObjToString(tmp["stock_name"]) == "" || util.ObjToString(tmp["company_name"]) == "" {
  65. continue
  66. }
  67. //投资关系-0201,交易关系-0301,
  68. //管辖关系-0101,直属关系-0102,组成关系-0103
  69. //判断是否已经存在;存在就更新,不存在就插入
  70. exist := EntMapCode{}
  71. db.Where(&EntMapCode{AName: util.ObjToString(tmp["stock_name"]), BName: util.ObjToString(tmp["company_name"])}).First(&exist)
  72. if exist.CreateTime > 0 {
  73. if exist.InvestRatio != util.ObjToString(tmp["stock_proportion"]) || exist.InvestPrice != util.ObjToString(tmp["stock_capital"]) {
  74. //存在数据,需要更新
  75. update := EntMapCode{
  76. InvestRatio: util.ObjToString(tmp["stock_proportion"]),
  77. InvestPrice: util.ObjToString(tmp["stock_capital"]),
  78. }
  79. db.Model(&exist).Where(&EntMapCode{AName: util.ObjToString(tmp["stock_name"]), BName: util.ObjToString(tmp["company_name"])}).Updates(update)
  80. }
  81. } else {
  82. AInfo := EntInfo{}
  83. BInfo := EntInfo{}
  84. db.Model(&EntInfo{}).Where("company_name = ? ", tmp["stock_name"]).Select("company_name", "id").First(&AInfo)
  85. db.Model(&EntInfo{}).Where("company_name = ? ", tmp["company_name"]).Select("company_name", "id").First(&BInfo)
  86. data := EntMapCode{
  87. AId: AInfo.ID,
  88. BId: BInfo.ID,
  89. AName: util.ObjToString(tmp["stock_name"]),
  90. BName: util.ObjToString(tmp["company_name"]),
  91. Code: "0201",
  92. InvestRatio: util.ObjToString(tmp["stock_proportion"]),
  93. InvestPrice: util.ObjToString(tmp["stock_capital"]),
  94. CreateTime: time.Now().Unix(),
  95. UpdateTime: time.Now().Unix(),
  96. }
  97. if err = db.Create(data).Error; err != nil {
  98. log.Info("increInvest", zap.String("create err", data.AName))
  99. }
  100. }
  101. }
  102. log.Info("increInvest", zap.String("处理结束", "over"))
  103. }
  104. // increBidding 处理标讯增量 交易关系数据
  105. func increBidding() {
  106. sess := MgoB.GetMgoConn()
  107. defer MgoB.DestoryMongoConn(sess)
  108. //正式环境
  109. username := GF.Clickhouse.Username
  110. password := GF.Clickhouse.Password
  111. host := GF.Clickhouse.Host
  112. encodedPassword := url.QueryEscape(password)
  113. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  114. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  115. Logger: logger.Default.LogMode(logger.Silent), //不打印日志
  116. })
  117. if err != nil {
  118. log.Fatal("increBidding 链接数据库失败")
  119. } else {
  120. log.Info("increBidding", zap.String("clickhouse 打开成功", db.Name()))
  121. }
  122. now := time.Now()
  123. starttTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  124. endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  125. where := map[string]interface{}{
  126. "comeintime": map[string]interface{}{
  127. "$gt": starttTime.Unix(),
  128. "$lte": endTime.Unix(),
  129. },
  130. }
  131. log.Info("increBidding", zap.Any("where", where))
  132. query := sess.DB(GF.MongoB.DB).C("bidding").Find(where).Select(nil).Iter()
  133. count := 0
  134. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  135. if count%10000 == 0 {
  136. log.Info("increBidding", zap.Any("current", count), zap.Any("_id", tmp["_id"]))
  137. }
  138. if util.IntAll(tmp["extracttype"]) == -1 {
  139. continue
  140. }
  141. rsa, rsb := false, false
  142. // 1.判断规则打物业标签
  143. if tag_topinformation, ok := tmp["tag_topinformation"]; !ok {
  144. rsa = false
  145. } else {
  146. if tags, ok := tag_topinformation.([]interface{}); ok {
  147. for _, tag := range tags {
  148. tg := util.ObjToString(tag)
  149. if tg == "情报_物业" {
  150. rsa = true
  151. break
  152. }
  153. }
  154. }
  155. }
  156. // 2.判断人工智能打物业标签
  157. if tag_topinformation_ai, ok := tmp["tag_topinformation_ai"]; !ok {
  158. rsb = false
  159. } else {
  160. if tags, ok := tag_topinformation_ai.([]interface{}); ok {
  161. for _, tag := range tags {
  162. tg := util.ObjToString(tag)
  163. if tg == "情报_物业" {
  164. rsb = true
  165. break
  166. }
  167. }
  168. }
  169. }
  170. // 规则和人工智能,都没打上物业标签
  171. if !rsb && !rsa {
  172. continue
  173. }
  174. if util.ObjToString(tmp["buyer"]) == "" {
  175. continue
  176. }
  177. // 没有中标单位,直接跳过
  178. if sWinner, ok := tmp["s_winner"]; !ok {
  179. continue
  180. } else {
  181. //投资关系-0201,交易关系-0301,
  182. //管辖关系-0101,直属关系-0102,组成关系-0103
  183. winners := util.ObjToString(sWinner)
  184. wins := strings.Split(winners, ",")
  185. for _, winer := range wins {
  186. exist := EntMapCode{}
  187. db.Where(&EntMapCode{AName: util.ObjToString(tmp["buyer"]), BName: winer}).First(&exist)
  188. if exist.CreateTime == 0 {
  189. data := EntMapCode{
  190. AName: util.ObjToString(tmp["buyer"]),
  191. BName: winer,
  192. Code: "0301",
  193. CreateTime: time.Now().Unix(),
  194. UpdateTime: time.Now().Unix(),
  195. }
  196. if err = db.Create(data).Error; err != nil {
  197. log.Info("create err", zap.String(data.AName, data.BName))
  198. }
  199. }
  200. }
  201. }
  202. }
  203. log.Info("increBidding", zap.Any("over, total", count))
  204. }