package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
	"time"
	"unicode/utf8"

	"go.uber.org/zap"
	"gorm.io/driver/clickhouse"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	//"log"
)

// openIncreClickhouse opens a GORM handle on the ClickHouse "information"
// database using the credentials found in GF.Clickhouse. SQL logging is
// silenced.
func openIncreClickhouse() (*gorm.DB, error) {
	username := GF.Clickhouse.Username
	password := GF.Clickhouse.Password
	host := GF.Clickhouse.Host

	// QueryEscape encodes a space as "+", but inside the userinfo part of a
	// URL "+" is a literal plus sign. Every "+" left after QueryEscape came
	// from a space (a real "+" becomes "%2B"), so rewriting it is lossless.
	encodedPassword := strings.ReplaceAll(url.QueryEscape(password), "+", "%20")

	dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
	return gorm.Open(clickhouse.Open(dn), &gorm.Config{
		Logger: logger.Default.LogMode(logger.Silent), // do not print SQL logs
	})
}

// hasTopInformationTag reports whether raw is a []interface{} that contains
// an element whose string form equals want. Any other shape (including nil,
// i.e. a missing key) yields false.
func hasTopInformationTag(raw interface{}, want string) bool {
	tags, ok := raw.([]interface{})
	if !ok {
		return false
	}
	for _, tag := range tags {
		if util.ObjToString(tag) == want {
			return true
		}
	}
	return false
}

// findEntMapCode loads the first EntMapCode row matching the given A/B names.
// A missing row is not an error: the zero value is returned with a nil error.
// Any other database error is returned so the caller can avoid treating a
// failed lookup as "row does not exist".
//
// NOTE(review): the lookup does not filter by Code, so a relation of one kind
// hides a relation of another kind between the same pair — confirm this is
// intended.
func findEntMapCode(db *gorm.DB, aName, bName string) (EntMapCode, error) {
	exist := EntMapCode{}
	err := db.Where(&EntMapCode{AName: aName, BName: bName}).First(&exist).Error
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return exist, err
	}
	return exist, nil
}

// lookupEntInfo fetches company_name and id of the EntInfo row whose
// company_name equals name. When no row is found the zero EntInfo is
// returned; other errors are logged and the zero EntInfo is returned as well,
// so callers keep their best-effort behaviour.
func lookupEntInfo(db *gorm.DB, name string) EntInfo {
	info := EntInfo{}
	err := db.Model(&EntInfo{}).Where("company_name = ? ", name).Select("company_name", "id").First(&info).Error
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		log.Info("lookupEntInfo", zap.String("company_name", name), zap.Error(err))
	}
	return info
}

// increInvest processes incremental investment-relation data.
//
// It reads mixdata.company_partner documents whose jy_updatetime is at or
// after 00:00 on Monday of the current week, filters out rows that are not
// usable company shareholders, and then either updates the matching
// EntMapCode row (when ratio or capital changed) or inserts a new row with
// relation code "0201".
func increInvest() {
	now := time.Now()
	// Days back to Monday. Weekday() is 0 for Sunday, which would give a
	// positive offset, so Sunday is mapped to six days back.
	offset := int(time.Monday - now.Weekday())
	if offset > 0 {
		offset = -6
	}
	weekBeginDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).AddDate(0, 0, offset)

	where := map[string]interface{}{
		"jy_updatetime": map[string]interface{}{
			"$gte": weekBeginDate.Unix(),
		},
	}

	sess := MgoPA.GetMgoConn()
	defer MgoPA.DestoryMongoConn(sess)

	// Production environment.
	db, err := openIncreClickhouse()
	if err != nil {
		log.Info("increInvest", zap.Error(err))
		log.Fatal("链接数据库失败")
		return
	}
	log.Info("increInvest", zap.String("clickhouse 打开成功", db.Name()))

	log.Info("increInvest", zap.Any("where", where))
	query := sess.DB("mixdata").C("company_partner").Find(where).Select(nil).Iter()
	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		stockName := util.ObjToString(tmp["stock_name"])
		if count%10000 == 0 {
			log.Info("increInvest", zap.Int("current", count), zap.Any(stockName, tmp["update_time"]))
		}

		if util.ObjToString(tmp["stock_type"]) == "自然人股东" {
			continue
		}
		if util.IntAll(tmp["use_flag"]) > 5 {
			continue
		}
		// Historical data is obsolete.
		if util.IntAll(tmp["is_history"]) != 0 {
			continue
		}
		// A shareholder name shorter than 5 characters is treated as not
		// being a company; this also rejects an empty name.
		if utf8.RuneCountInString(stockName) < 5 {
			continue
		}
		// Company name is empty.
		companyName := util.ObjToString(tmp["company_name"])
		if companyName == "" {
			continue
		}

		proportion := util.ObjToString(tmp["stock_proportion"])
		capital := util.ObjToString(tmp["stock_capital"])

		// Relation codes: investment 0201, transaction 0301,
		// jurisdiction 0101, direct subordination 0102, composition 0103.
		// Update the row when it exists, insert it otherwise.
		exist, findErr := findEntMapCode(db, stockName, companyName)
		if findErr != nil {
			// Do not insert when the lookup itself failed: the row may exist.
			log.Info("increInvest", zap.String("query err", stockName), zap.Error(findErr))
			continue
		}

		if exist.CreateTime > 0 {
			if exist.InvestRatio == proportion && exist.InvestPrice == capital {
				continue
			}
			// Row exists and its values changed.
			update := EntMapCode{
				InvestRatio: proportion,
				InvestPrice: capital,
			}
			if err = db.Model(&exist).Where(&EntMapCode{AName: stockName, BName: companyName}).Updates(update).Error; err != nil {
				log.Info("increInvest", zap.String("update err", stockName), zap.Error(err))
			}
			continue
		}

		AInfo := lookupEntInfo(db, stockName)
		BInfo := lookupEntInfo(db, companyName)
		ts := time.Now().Unix()
		data := EntMapCode{
			AId:         AInfo.ID,
			BId:         BInfo.ID,
			AName:       stockName,
			BName:       companyName,
			Code:        "0201",
			InvestRatio: proportion,
			InvestPrice: capital,
			CreateTime:  ts,
			UpdateTime:  ts,
		}
		// GORM expects a pointer to the value being created.
		if err = db.Create(&data).Error; err != nil {
			log.Info("increInvest", zap.String("create err", data.AName), zap.Error(err))
		}
	}
	// Close releases the cursor and reports any error that ended iteration.
	if closeErr := query.Close(); closeErr != nil {
		log.Info("increInvest", zap.String("iter err", closeErr.Error()))
	}

	log.Info("increInvest", zap.String("处理结束", "over"))
}

// increBidding processes incremental bidding data into transaction relations.
//
// It reads bidding documents whose comeintime lies in (yesterday 00:00,
// today 00:00], keeps those tagged "情报_物业" by either the rule-based or the
// AI tagger, and inserts an EntMapCode row with relation code "0301" for each
// buyer/winner pair that is not present yet.
func increBidding() {
	const propertyTag = "情报_物业"

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	// Production environment.
	db, err := openIncreClickhouse()
	if err != nil {
		log.Info("increBidding", zap.Error(err))
		log.Fatal("increBidding 链接数据库失败")
		return
	}
	log.Info("increBidding", zap.String("clickhouse 打开成功", db.Name()))

	now := time.Now()
	// time.Date normalises Day()-1, so this is also correct on the 1st.
	startTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
	endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	where := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gt":  startTime.Unix(),
			"$lte": endTime.Unix(),
		},
	}

	log.Info("increBidding", zap.Any("where", where))
	query := sess.DB(GF.MongoB.DB).C("bidding").Find(where).Select(nil).Iter()
	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%10000 == 0 {
			log.Info("increBidding", zap.Any("current", count), zap.Any("_id", tmp["_id"]))
		}

		if util.IntAll(tmp["extracttype"]) == -1 {
			continue
		}

		// 1. Property tag assigned by rules; 2. property tag assigned by AI.
		// Skip when neither tagger marked the document.
		if !hasTopInformationTag(tmp["tag_topinformation"], propertyTag) &&
			!hasTopInformationTag(tmp["tag_topinformation_ai"], propertyTag) {
			continue
		}

		buyer := util.ObjToString(tmp["buyer"])
		if buyer == "" {
			continue
		}

		// No winning bidder: skip.
		sWinner, ok := tmp["s_winner"]
		if !ok {
			continue
		}

		// Relation codes: investment 0201, transaction 0301,
		// jurisdiction 0101, direct subordination 0102, composition 0103.
		for _, winner := range strings.Split(util.ObjToString(sWinner), ",") {
			// An empty name would be dropped from the GORM struct condition
			// (zero-value fields are ignored), turning the lookup into a
			// buyer-only match and inserting a row with no counterpart.
			if strings.TrimSpace(winner) == "" {
				continue
			}

			exist, findErr := findEntMapCode(db, buyer, winner)
			if findErr != nil {
				log.Info("query err", zap.String(buyer, winner), zap.Error(findErr))
				continue
			}
			if exist.CreateTime != 0 {
				continue
			}

			AInfo := lookupEntInfo(db, buyer)
			BInfo := lookupEntInfo(db, winner)
			ts := time.Now().Unix()
			data := EntMapCode{
				AId:        AInfo.ID,
				BId:        BInfo.ID,
				AName:      buyer,
				BName:      winner,
				Code:       "0301",
				CreateTime: ts,
				UpdateTime: ts,
			}
			// GORM expects a pointer to the value being created.
			if err = db.Create(&data).Error; err != nil {
				log.Info("create err", zap.String(data.AName, data.BName), zap.Error(err))
			}
		}
	}
	// Close releases the cursor and reports any error that ended iteration.
	if closeErr := query.Close(); closeErr != nil {
		log.Info("increBidding", zap.String("iter err", closeErr.Error()))
	}

	log.Info("increBidding", zap.Any("over, total", count))
}