package main import ( "data_tidb/config" "fmt" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "reflect" "regexp" "sort" "strings" "sync" "time" ) var ( regLetter = regexp.MustCompile("[a-z]*") ) func TaskBidding() { sess := MongoB.GetMgoConn() defer MongoB.DestoryMongoConn(sess) ch := make(chan bool, 10) wg := &sync.WaitGroup{} q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId("6558df800000000000000000"), "$lte": mongodb.StringTOBsonId("655a31000000000000000000"), }, } query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if util.IntAll(tmp["extracttype"]) != -1 { taskBase(tmp) //基础标讯数据 taskExpand(tmp) //扩展数据 //taskDetail(tmp) //正文信息 //taskAtts(tmp) //附件信息 //taskIntent(tmp) //采购意向 //taskPackage(tmp) //分包 } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("is over --- %d", count)) } func taskBase(tmp map[string]interface{}) { tmpid := mongodb.BsonIdToSId(tmp["_id"]) info := map[string]interface{}{} info["s_info_id"] = tmpid area, city, district := util.ObjToString(tmp["area"]), util.ObjToString(tmp["city"]), util.ObjToString(tmp["district"]) if area != "" { info["area_code"] = AreaCode[area] if city != "" { info["city_code"] = AreaCode[area+","+city] if district != "" { info["district_code"] = AreaCode[area+","+city+","+district] } } } if toptype := util.ObjToString(tmp["toptype"]); toptype != "" { info["s_toptype_code"] = TopTypeCode["toptype"] } if subtype := util.ObjToString(tmp["subtype"]); subtype != "" { info["s_subtype_code"] = SubTypeCode[subtype] } if buyerclass := util.ObjToString(tmp["buyerclass"]); buyerclass != "" { info["s_buyerclass_code"] = BuyerCode[buyerclass] } //特别结构 info["i_isValidFile"] = util.IntAll(tmp["isValidFile"]) info["i_multipackage"] = util.IntAll(tmp["multipackage"]) //文本相关 transferTextInfo(tmp, &info, []string{"title", "projectname", "projectcode", "purchasing", "site", "href"}, []int{}) //金额相关 transferMoneyRateInfo(tmp, &info, []string{"budget", "bidamount", "biddiscount"}, []float64{1000000000.0, 1000000000.0, 100.0}) //时间相关 transferDateTimeInfo(tmp, &info, []string{"comeintime", "publishtime", "bidopentime", "bidendtime"}) //主体相关 if code := getNameId(util.ObjToString(tmp["buyer"])); code != "" { info["s_buyer_id"] = code } if code := getNameId(util.ObjToString(tmp["agency"])); code != "" { info["s_agency_id"] = code } info["d_updatetime"] = time.Now().Format(util.Date_Full_Layout) info["d_createtime"] = time.Now().Format(util.Date_Full_Layout) InsertGlobalMysqlData("dwd_f_bid_baseinfo", info, mongodb.BsonIdToSId(tmp["_id"])) } func getNameId(name string) string { if name == "" { return "" } info := MysqlTool.FindOne("dws_f_ent_baseinfo", map[string]interface{}{"name": name}, "name_id", "") if info != nil && (*info)["name_id"] != nil { return util.ObjToString((*info)["name_id"]) } else { return "" } } func taskExpand(tmp map[string]interface{}) { tmpid := mongodb.BsonIdToSId(tmp["_id"]) info := map[string]interface{}{} info["s_info_id"] = tmpid //文本相关 transferTextInfo(tmp, &info, []string{"projectperiod", "project_scale", "project_timeunit", "bidmethod", "getdocmethod", "currency", "funds", "payway", "bid_bond", "contract_bond", "contractcode", "buyerzipcode", "bidopenaddress", "buyeraddr", "agencyaddr", "projectaddr", "enterprise_qualification", "personnel_qualification", "performance_qualification", "enterprise_credit"}, []int{1000, 5000, 50, 255, 500, 20, 5000, 500, 1000, 500, 100, 100, 1000, 1000, 1000, 1000, 20000, 20000, 20000, 20000}) //时间相关 transferDateTimeInfo(tmp, &info, []string{"project_startdate", "project_completedate", "signstarttime", "bidendtime", "bidstarttime", "docstarttime", "docendtime", "signaturedate", "signendtime"}) //布尔相关 transferBoolInfo(tmp, &info, []string{"bid_guarantee", "contract_guarantee"}) //金额相关 transferMoneyRateInfo(tmp, &info, []string{"docamount", "agencyfee", "agencyrate"}, []float64{10000000.0, 10000000.0, 10000000.0}) //投标方式 bidway := util.ObjToString(tmp["bidway"]) if bidway == "电子投标" { info["i_bidway"] = 1 } else if bidway == "纸质投标" { info["i_bidway"] = 0 } else { } //评审专家 if tmp["review_experts"] != nil { if reflect.TypeOf(tmp["review_experts"]).String() == "string" { info["s_review_experts"] = tmp["review_experts"] } else if reflect.TypeOf(tmp["review_experts"]).String() == "[]interface {}" { if arr, ok := tmp["review_experts"].([]interface{}); ok { info["s_review_experts"] = strings.Join(util.ObjArrToStringArr(arr), ",") } } } //工期时长 if tmp["project_duration"] != nil { info["i_project_duration"] = util.IntAll(tmp["project_duration"]) } info["d_updatetime"] = time.Now().Format(util.Date_Full_Layout) info["d_createtime"] = time.Now().Format(util.Date_Full_Layout) InsertGlobalMysqlData("dwd_f_bid_expand_baseinfo", info, tmpid) } func taskDetail(tmp map[string]interface{}) { tmpid := mongodb.BsonIdToSId(tmp["_id"]) s_detail := util.ObjToString(tmp["detail"]) s_contenthtml := util.ObjToString(tmp["contenthtml"]) info := map[string]interface{}{ "s_info_id": tmpid, "s_detail": s_detail, "s_contenthtml": s_contenthtml, "d_createtime": time.Now().Format(util.Date_Full_Layout), "d_updatetime": time.Now().Format(util.Date_Full_Layout), } InsertGlobalMysqlData("dwd_f_bid_detail", info, tmpid) } func taskAtts(tmp map[string]interface{}) { tmpid := mongodb.BsonIdToSId(tmp["_id"]) f_baseInfo := map[string]interface{}{} attach_text := util.ObjToMap(tmp["attach_text"]) if projectinfo := util.ObjToMap(tmp["projectinfo"]); projectinfo != nil { attachments := util.ObjToMap((*projectinfo)["attachments"]) for index, attr := range *attachments { if at, ok := attr.(map[string]interface{}); ok { if util.ObjToString(at["fid"]) != "" { //ftype := "" //for _, s := range FileTypeArr { // ft := strings.ToLower(util.ObjToString(tmp["ftype"])) // if strings.Contains(ft, s) { // ftype = s // break // } //} f_baseInfo["s_info_id"] = tmpid f_baseInfo["s_file_name"] = util.ObjToString(at["filename"]) f_baseInfo["s_file_url"] = util.ObjToString(at["org_url"]) f_baseInfo["s_file_size"] = util.ObjToString(at["size"]) f_baseInfo["s_file_suffix"] = util.ObjToString(at["ftype"]) f_baseInfo["s_file_oss_url"] = util.ObjToString(at["fid"]) f_baseInfo["d_createtime"] = time.Now().Format(util.Date_Full_Layout) f_id := InsertGlobalMysqlData("dwd_f_bid_file_baseinfo", f_baseInfo, tmpid) if f_id > 0 && util.IntAll(index) > 0 && attach_text != nil { att_key := fmt.Sprintf("%d", util.IntAll(index)-1) if att_info := util.ObjToMap((*attach_text)[att_key]); att_info != nil { taskAttsAttach(*att_info, tmpid, f_id) } } } } } } } func taskAttsAttach(att_info map[string]interface{}, tmpid string, f_id int64) { for _, v := range att_info { if att, b := v.(map[string]interface{}); b { info := map[string]interface{}{} info["s_info_id"] = tmpid info["s_file_id"] = f_id info["d_createtime"] = time.Now().Format(util.Date_Full_Layout) attach_url := util.ObjToString(att["attach_url"]) if attach_url != "" { //bs := OssGetObject(attach_url) //if utf8.RuneCountInString(bs) > 100000 { // bs = string(([]rune(bs))[:100000]) //} //info["s_file_text"] = bs } InsertGlobalMysqlData("dwd_f_bid_file_text", info, tmpid) } } } func taskIntent(tmp map[string]interface{}) { procurementlist := IsMarkInterfaceMap(tmp["procurementlist"]) tmpid := mongodb.BsonIdToSId(tmp["_id"]) for _, p1 := range procurementlist { info := map[string]interface{}{} info["s_info_id"] = tmpid if p1["itemname"] != nil { info["s_intention_name"] = p1["itemname"] } if p1["projectscope"] != nil { info["s_intention_demand"] = p1["projectscope"] } if p1["item"] != nil { info["s_item"] = p1["item"] } if p1["totalprice"] != nil { info["f_totalprice"] = p1["totalprice"] } if p1["expurasingtime"] != nil { info["s_expurasingtime"] = p1["expurasingtime"] } if p1["reserved_amount"] != nil { info["s_reserved_amount"] = p1["reserved_amount"] } if b := util.ObjToString(tmp["buyer"]); b != "" { if code := getNameId(b); code != "" { info["s_buyer_id"] = code } } info["d_createtime"] = time.Now().Format(util.Date_Full_Layout) InsertGlobalMysqlData("dwd_f_bid_intention_baseinfo", info, tmpid) } } func taskPackage(tmp map[string]interface{}) { tmpid := mongodb.BsonIdToSId(tmp["_id"]) //筛选分包 packages := filterPackageInfos(tmp) if len(packages) <= 1 { //单包···标讯本身 baseInfo := CPBaseInfoFromBidding(tmp, tmpid) pid := InsertGlobalMysqlData("dwd_f_bid_package_baseinfo", baseInfo, tmpid) if pid > 0 { //投标人信息 CPBidderBiddingBaseInfo(tmp, tmpid, pid) //标的物信息 new_purlist := CPBiddingPackageGoodsBaseInfo(tmp, tmpid, pid) for _, v := range new_purlist { InsertGlobalMysqlData("dwd_f_bid_package_goods_baseinfo", v, tmpid) } } } else { //多包...具体源信息 for k, v := range packages { baseInfo := CPBaseInfoFromPackage(v, tmpid) pid := InsertGlobalMysqlData("dwd_f_bid_package_baseinfo", baseInfo, tmpid) if pid > 0 { //投标人信息 if k == 0 { //标的物信息 CPBidderPackageBaseInfo(v, tmp, tmpid, pid, true) new_purlist := CPBiddingPackageGoodsBaseInfo(tmp, tmpid, pid) for _, v1 := range new_purlist { InsertGlobalMysqlData("dwd_f_bid_package_goods_baseinfo", v1, tmpid) } } else { CPBidderPackageBaseInfo(v, tmp, tmpid, pid, false) } } } } } func BinarySearch(s []string, k string) int { sort.Strings(s) lo, hi := 0, len(s)-1 for lo <= hi { m := (lo + hi) >> 1 if s[m] < k { lo = m + 1 } else if s[m] > k { hi = m - 1 } else { return m } } return -1 }