package main import ( "data_mgo_to_tidb_project/config" "fmt" "strings" "sync" "time" "go.uber.org/zap" "database/sql" "app.yhyue.com/moapp/jybase/date" "app.yhyue.com/moapp/jybase/mongodb" "github.com/gogf/gf/v2/util/gconv" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" ) //projectset同步 func Projectset(gteId, ltId string) { sess := MongoP.GetMgoConn() defer MongoP.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Info.Ch) wg := &sync.WaitGroup{} query := map[string]interface{}{} if gteId != "" && ltId != "" { query = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(gteId), "$lt": mongodb.StringTOBsonId(ltId), }, } log.Info("info", zap.Any("ids_zengliang", map[string]interface{}{ "gteId": gteId, "lteId": ltId, })) } else if config.Conf.Info.ProjectsetIdGt != "" { query = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(config.Conf.Info.ProjectsetIdGt), "$lt": mongodb.StringTOBsonId(config.Conf.Info.ProjectsetIdLt), }, } log.Info("info", zap.Any("ids_cunliang", map[string]interface{}{ "gteId": config.Conf.Info.ProjectsetIdGt, "lteId": config.Conf.Info.ProjectsetIdLt, })) } log.Info("info", zap.Any("query", query)) it := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Info.TableName).Find(query).Sort("-_id").Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(tmp); count++ { if count%10000 == 0 { log.Info(fmt.Sprintf("current --- %d ,%s", count, tmp["_id"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() MysqlTool.ExecTx("项目同步", func(tx *sql.Tx) bool { ok1 := dwd_f_bid_project_baseinfo(tx, tmp) ok2 := dwd_f_bid_project_record(tx, tmp) ok3 := dwd_f_bid_project_final_package(tx, tmp) ok4 := dwd_f_bid_project_contacts(tx, tmp) ok5 := dwd_f_bid_bidunit_record(tx, tmp) if !(ok1 && ok2 && ok3 && ok4 && ok5) { log.Info(fmt.Sprintf("current --- %d ,%s", count, tmp["_id"])) } return ok1 && ok2 && ok3 && ok4 && ok5 }) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } //dwd_f_bid_project_baseinfo 项目基础数据存储 func dwd_f_bid_project_baseinfo(tx *sql.Tx, tmp map[string]interface{}) bool { saveMap := map[string]interface{}{} _id := mongodb.BsonIdToSId(tmp["_id"]) //项目id saveMap["s_projectid"] = _id //项目编号 saveMap["s_projectcode"] = gconv.String(tmp["projectcode"]) //项目名称 saveMap["s_projectname"] = gconv.String(tmp["projectname"]) //s_area_code area := gconv.String(tmp["area"]) saveMap["s_area_code"] = AreaCode[area] //s_city_code city := gconv.String(tmp["city"]) saveMap["s_city_code"] = AreaCode[fmt.Sprintf("%s,%s", area, city)] //s_district_code district := gconv.String(tmp["district"]) saveMap["s_district_code"] = AreaCode[fmt.Sprintf("%s,%s,%s", area, city, district)] //项目状态 saveMap["s_bidstatus"] = gconv.String(tmp["bidstatus"]) //采购单位 采购单位取的name_id saveMap["s_buyer_id"] = GetNameId(gconv.String(tmp["buyer"])) //代理机构 saveMap["s_agency_id"] = GetNameId(gconv.String(tmp["agency"])) //招标分类 saveMap["s_bidtype"] = gconv.String(tmp["bidtype"]) //时间相关 //开标时间 首条标讯公告时间 招标时间 结果时间 最后一条公告发布时间 TransferDateTimeInfo(tmp, &saveMap, []string{"bidopentime", "firsttime", "zbtime", "jgtime", "lasttime"}) //金额相关 预算 中标金额 TransferMoneyRateInfo(tmp, &saveMap, []string{"budget", "bidamount"}, []float64{1000000000.0, 1000000000.0}) //是否多包 saveMap["i_multipackage"] = gconv.Int(tmp["multipackage"]) // saveMap["d_updatetime"] = time.Now().Format(date.Date_Full_Layout) saveMap["d_createtime"] = time.Now().Format(date.Date_Full_Layout) //存储 return InsertGlobalMysqlData(tx, T_dwd_f_bid_project_baseinfo, saveMap, _id) > 0 } //dwd_f_bid_project_record 项目标讯关系表 func dwd_f_bid_project_record(tx *sql.Tx, tmp map[string]interface{}) bool { _id := mongodb.BsonIdToSId(tmp["_id"]) //遍历list 获取infoid list, ok := tmp["list"].([]interface{}) if !ok { return true } listMap := gconv.Maps(list) if len(listMap) > 0 { for _, v := range listMap { saveMap := map[string]interface{}{} // infoid := gconv.String(v["infoid"]) saveMap["s_infoid"] = infoid saveMap["s_projectid"] = _id saveMap["d_createtime"] = time.Now().Format(date.Date_Full_Layout) //存储 if id := InsertGlobalMysqlData(tx, T_dwd_f_bid_project_record, saveMap, _id); id <= 0 { return false } } } return true } //dwd_f_bid_project_final_package 项目子包信息(最终状态包信息) func dwd_f_bid_project_final_package(tx *sql.Tx, tmp map[string]interface{}) bool { //package _id := mongodb.BsonIdToSId(tmp["_id"]) //遍历list 获取infoid packageList := gconv.Map(tmp["package"]) if len(packageList) > 0 { for _, v := range packageList { for _, vv := range gconv.Maps(v) { saveMap := map[string]interface{}{} // saveMap["s_projectid"] = _id packagecode := gconv.String(vv["origin"]) if VarcharCheck(packagecode, 255) { continue } saveMap["s_packagecode"] = packagecode //标(包)段编号 saveMap["s_packagename"] = gconv.String(vv["name"]) //标段名称 saveMap["s_detail"] = gconv.String(vv["text"]) //标段内容 // TransferMoneyRateInfo(vv, &saveMap, []string{"budget", "bidamount"}, []float64{1000000000.0, 1000000000.0}) // saveMap["d_createtime"] = time.Now().Format(date.Date_Full_Layout) saveMap["d_updatetime"] = time.Now().Format(date.Date_Full_Layout) saveMap["s_winner"] = GetNameId(gconv.String(vv["winner"])) //存储 if id := InsertGlobalMysqlData(tx, T_dwd_f_bid_project_final_package, saveMap, _id); id <= 0 { return false } } } } return true } //dwd_f_bid_project_contacts 项目通讯录 func dwd_f_bid_project_contacts(tx *sql.Tx, tmp map[string]interface{}) bool { now := time.Now().Format(date.Date_Full_Layout) _id := mongodb.BsonIdToSId(tmp["_id"]) for _, v := range []int{1, 2, 3} { saveMap := map[string]interface{}{ "s_projectid": _id, "d_updatetime": now, "d_createtime": now, } if v == 1 { //采购单位 //联系电话 buyertel := gconv.String(tmp["buyertel"]) if buyertel == "" { continue } if VarcharCheck(buyertel, 50) { continue } //采购单位 buyer := gconv.String(tmp["buyer"]) saveMap["s_name_id"] = GetNameId(buyer) saveMap["s_contact_name"] = buyer saveMap["s_contact_tel"] = buyertel saveMap["identity_type"] = v if id := InsertGlobalMysqlData(tx, T_dwd_f_bid_project_contacts, saveMap, _id); id <= 0 { return false } } else if v == 2 { //中标单位 // 中标单位需要遍历list listMap := gconv.Maps(tmp["list"]) winnerMap := map[string]string{} for _, vv := range listMap { s_winner := gconv.String(vv["s_winner"]) winnertel := gconv.String(vv["winnertel"]) if winnertel == "" { continue } if VarcharCheck(winnertel, 50) { continue } for _, vvv := range strings.Split(s_winner, ",") { winnerMap[vvv] = winnertel } } //获取到所有的中标单位 for winner, winnertel := range winnerMap { saveMap["s_name_id"] = GetNameId(winner) saveMap["s_contact_name"] = winner saveMap["s_contact_tel"] = winnertel saveMap["identity_type"] = v if id := InsertGlobalMysqlData(tx, T_dwd_f_bid_project_contacts, saveMap, _id); id <= 0 { return false } } } else if v == 3 { //代理机构 //联系电话 agencytel := gconv.String(tmp["agencytel"]) if agencytel == "" { continue } if VarcharCheck(agencytel, 50) { continue } //代理机构 agency := gconv.String(tmp["agency"]) saveMap["s_name_id"] = GetNameId(agency) saveMap["s_contact_name"] = agency saveMap["s_contact_tel"] = agencytel saveMap["identity_type"] = v if id := InsertGlobalMysqlData(tx, T_dwd_f_bid_project_contacts, saveMap, _id); id <= 0 { return false } } } return true } //dwd_f_bid_bidunit_record 标讯采购人、投标人关系记录 func dwd_f_bid_bidunit_record(tx *sql.Tx, tmp map[string]interface{}) bool { //package _id := mongodb.BsonIdToSId(tmp["_id"]) //凭安库的id 需要查company_id for _, first_cooperation := range gconv.Strings(tmp["first_cooperation"]) { // s_winner_id := GetNameIdByCompanyId(first_cooperation) saveMap := map[string]interface{}{ "s_buyer_id": GetNameId(gconv.String(tmp["buyer"])), "s_winner_id": s_winner_id, "d_createtime": time.Now().Format(date.Date_Full_Layout), "s_projectid": _id, } if id := InsertGlobalMysqlData(tx, T_dwd_f_bid_bidunit_record, saveMap, _id); id <= 0 { return false } } return true } // 插入并打印信息 func InsertGlobalMysqlData(tx *sql.Tx, name string, data map[string]interface{}, mark string) int64 { id := MysqlTool.InsertByTx(tx, name, data) if id == -1 { log.Info("插入数据异常", zap.String(name, mark)) if config.Conf.Alarm.IsOpen { Alert(fmt.Sprintf("%s表 检测到异常数据同步,id:%s
", name, mark)) } } return id }