package main import ( "data_tidb/config" "fmt" "github.com/shopspring/decimal" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "reflect" "regexp" "sort" "strconv" "strings" "sync" "time" ) var ( regLetter = regexp.MustCompile("[a-z]*") ) func taskB() { sess := MongoB.GetMgoConn() defer MongoB.DestoryMongoConn(sess) ch := make(chan bool, 10) wg := &sync.WaitGroup{} q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("132d42d667a6b0a2861eef92")}} query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if util.IntAll(tmp["extracttype"]) != -1 { //taskBase(tmp) //基础标讯数据 //taskExpand(tmp) //扩展数据 //taskDetail(tmp) //正文信息 //taskAtts(tmp) //附件信息 //taskIntent(tmp) //采购意向 //taskPackage(tmp) //分包 } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("is over --- %d", count)) } // @Description 基本信息 func taskBase(tmp map[string]interface{}) { saveM := make(map[string]interface{}) var errf []string // 异常字段 for _, nf := range BaseField { f := nf[2:] if f == "info_id" { saveM[nf] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area_code" { if tmp["area"] != nil { saveM[nf] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "city_code" { if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM[nf] = AreaCode[c] } } else if f == "district_code" { if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"]) saveM[nf] = AreaCode[c] } } else if f == "toptype_code" { if obj := util.ObjToString(tmp["toptype"]); obj != "" { saveM[nf] = TopTypeCode[obj] } } else if f == "subtype_code" { if obj := util.ObjToString(tmp["subtype"]); obj != "" { saveM[nf] = SubTypeCode[obj] } } else if f == "buyerclass_code" { if obj := util.ObjToString(tmp["buyerclass"]); obj != "" { saveM[nf] = BuyerCode[obj] } } else if f == "createtime" || f == "updatetime" { saveM[nf] = time.Now().Format(util.Date_Full_Layout) } else if f == "comeintime" || f == "publishtime" || f == "bidopentime" || f == "bidendtime" { if tmp[f] != nil && util.IntAll(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[nf] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "multipackage" || f == "isValidFile" { if tmp[f] == nil { saveM[nf] = 0 } else { saveM[nf] = tmp[f] } } else if f == "buyer_id" { if b := util.ObjToString(tmp["buyer"]); b != "" { if code := getNameId(b); code != "" { saveM[nf] = code } } } else if f == "agency_id" { if b := util.ObjToString(tmp["agency"]); b != "" { if code := getNameId(b); code != "" { saveM[nf] = code } } } else { if tmp[f] != nil { if BaseVMap[f] != nil { var b bool saveM[nf], b = verifyF(f, tmp[f], BaseVMap[f]) if b { // 保存异常字段数据 errf = append(errf, f) } } else { saveM[nf] = tmp[f] } } } } saveBasePool <- saveM if len(errf) > 0 { saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")} } } func getNameId(name string) string { info := MysqlTool.FindOne("dws_f_ent_baseinfo", map[string]interface{}{"name": name}, "name_id", "") if info != nil && (*info)["name_id"] != nil { return util.ObjToString((*info)["name_id"]) } else { return "" } } // @Description 扩展信息 func taskExpand(tmp map[string]interface{}) { saveM := make(map[string]interface{}) var errf []string // 异常字段 for _, nf := range ExpandField { f := nf[2:] if f == "info_id" { saveM[nf] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "project_startdate" || f == "project_completedate" || f == "signstarttime" || f == "bidendtime" || f == "bidstarttime" || f == "docstarttime" || f == "docendtime" || f == "signaturedate" || f == "signendtime" { if tmp[f] != nil && util.IntAll(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[nf] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "createtime" || f == "updatetime" { saveM[nf] = time.Now().Format(util.Date_Full_Layout) } else if f == "bidway" { if util.ObjToString(tmp[f]) == "电子投标" { saveM[nf] = 1 } else if util.ObjToString(tmp[f]) == "纸质投标" { saveM[nf] = 0 } } else if f == "review_experts" { //评标专家 if tmp[f] != nil { if reflect.TypeOf(tmp[f]).String() == "string" { saveM[nf] = tmp[f] } else if reflect.TypeOf(tmp[f]).String() == "[]interface {}" { if arr, ok := tmp[f].([]interface{}); ok { saveM[nf] = strings.Join(util.ObjArrToStringArr(arr), ",") } } } } else if f == "bid_guarantee" || f == "contract_guarantee" { if tmp[f] != nil { if tmp[f].(bool) { saveM[nf] = 1 } else { saveM[nf] = 0 } } } else if f == "agencyfee" { if tmp[f] != nil { if reflect.TypeOf(tmp[f]).String() == "string" { v2, err := strconv.ParseFloat(strings.ReplaceAll(util.ObjToString(tmp[f]), "%", ""), 64) if err != nil { v, _ := decimal.NewFromFloat(v2).Div(decimal.NewFromFloat(float64(100))).Float64() saveM[nf] = v } } else { saveM[nf], _ = util.FormatFloat(util.Float64All(tmp[f]), 4) } } } else if f == "project_duration" { if tmp[f] != nil { saveM[nf] = util.IntAll(tmp[f]) } } else { if tmp[f] != nil { if ExpandVMap[f] != nil { var b bool saveM[nf], b = verifyF(f, tmp[f], ExpandVMap[f]) if b { // 保存异常字段数据 errf = append(errf, f) } } else { saveM[nf] = tmp[f] } } } } saveExpandPool <- saveM if len(errf) > 0 { saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")} } } // @Description 正文信息 func taskDetail(tmp map[string]interface{}) { id := mongodb.BsonIdToSId(tmp["_id"]) s_detail := util.ObjToString(tmp["detail"]) s_contenthtml := util.ObjToString(tmp["contenthtml"]) saveDetailPool <- map[string]interface{}{ "s_info_id": id, "s_detail": s_detail, "s_contenthtml": s_contenthtml, "d_createtime": time.Now().Format(util.Date_Full_Layout), "d_updatetime": time.Now().Format(util.Date_Full_Layout), } } // @Description 附件 func taskAtts(tmp map[string]interface{}) { tmpid := mongodb.BsonIdToSId(tmp["_id"]) f_baseInfo := map[string]interface{}{} attach_text := util.ObjToMap(tmp["attach_text"]) if projectinfo := util.ObjToMap(tmp["projectinfo"]); projectinfo != nil { attachments := util.ObjToMap((*projectinfo)["attachments"]) for index, attr := range *attachments { if at, ok := attr.(map[string]interface{}); ok { if util.ObjToString(at["fid"]) != "" { //ftype := "" //for _, s := range FileTypeArr { // ft := strings.ToLower(util.ObjToString(tmp["ftype"])) // if strings.Contains(ft, s) { // ftype = s // break // } //} f_baseInfo["s_info_id"] = tmpid f_baseInfo["s_file_name"] = util.ObjToString(at["filename"]) f_baseInfo["s_file_url"] = util.ObjToString(at["org_url"]) f_baseInfo["s_file_size"] = util.ObjToString(at["size"]) f_baseInfo["s_file_suffix"] = util.ObjToString(at["ftype"]) f_baseInfo["s_file_oss_url"] = util.ObjToString(at["fid"]) f_baseInfo["d_createtime"] = time.Now().Format(util.Date_Full_Layout) f_id := InsertGlobalMysqlData("dwd_f_bid_file_baseinfo", f_baseInfo, tmpid) if f_id > 0 && util.IntAll(index) > 0 && attach_text != nil { att_key := fmt.Sprintf("%d", util.IntAll(index)-1) if att_info := util.ObjToMap((*attach_text)[att_key]); att_info != nil { taskAttsAttach(*att_info, tmpid, f_id) } } } } } } } func taskAttsAttach(att_info map[string]interface{}, tmpid string, f_id int64) { for _, v := range att_info { if att, b := v.(map[string]interface{}); b { info := map[string]interface{}{} info["s_info_id"] = tmpid info["s_file_id"] = f_id info["d_createtime"] = time.Now().Format(util.Date_Full_Layout) attach_url := util.ObjToString(att["attach_url"]) if attach_url != "" { //bs := OssGetObject(attach_url) //if utf8.RuneCountInString(bs) > 100000 { // bs = string(([]rune(bs))[:100000]) //} //info["s_file_text"] = bs } saveAttrPool <- info } } } // @Description 采购意向 func taskIntent(tmp map[string]interface{}) { procurementlist := IsMarkInterfaceMap(tmp["procurementlist"]) tmpid := mongodb.BsonIdToSId(tmp["_id"]) for _, p1 := range procurementlist { info := map[string]interface{}{} info["s_info_id"] = tmpid if p1["itemname"] != nil { info["s_intention_name"] = p1["itemname"] } if p1["projectscope"] != nil { info["s_intention_demand"] = p1["projectscope"] } if p1["item"] != nil { info["s_item"] = p1["item"] } if p1["totalprice"] != nil { info["f_totalprice"] = p1["totalprice"] } if p1["expurasingtime"] != nil { info["s_expurasingtime"] = p1["expurasingtime"] } if p1["reserved_amount"] != nil { info["s_reserved_amount"] = p1["reserved_amount"] } if b := util.ObjToString(tmp["buyer"]); b != "" { if code := getNameId(b); code != "" { info["s_buyer_id"] = code } } info["d_createtime"] = time.Now().Format(util.Date_Full_Layout) //InsertGlobalMysqlData("dwd_f_bid_intention_baseinfo", info, tmpid) saveIntentPool <- info } } // @Description 分包基本信息 func taskPackage(tmp map[string]interface{}) { tmpid := mongodb.BsonIdToSId(tmp["_id"]) //筛选分包 packages := filterPackageInfos(tmp) if len(packages) <= 1 { //单包···标讯本身 baseInfo := CPBaseInfoFromBidding(tmp, tmpid) pid := InsertGlobalMysqlData("dwd_f_bid_package_baseinfo", baseInfo, tmpid) if pid > 0 { //投标人信息 CPBidderBiddingBaseInfo(tmp, tmpid, pid) //标的物信息 new_purlist := CPBiddingPackageGoodsBaseInfo(tmp, tmpid, pid) for _, v := range new_purlist { saveGoodsPool <- v } } } else { //多包...具体源信息 for k, v := range packages { baseInfo := CPBaseInfoFromPackage(v, tmpid) pid := InsertGlobalMysqlData("dwd_f_bid_package_baseinfo", baseInfo, tmpid) if pid > 0 { //投标人信息 if k == 0 { //标的物信息 CPBidderPackageBaseInfo(v, tmp, tmpid, pid, true) new_purlist := CPBiddingPackageGoodsBaseInfo(tmp, tmpid, pid) for _, v1 := range new_purlist { saveGoodsPool <- v1 } } else { CPBidderPackageBaseInfo(v, tmp, tmpid, pid, false) } } } } } func BinarySearch(s []string, k string) int { sort.Strings(s) lo, hi := 0, len(s)-1 for lo <= hi { m := (lo + hi) >> 1 if s[m] < k { lo = m + 1 } else if s[m] > k { hi = m - 1 } else { return m } } return -1 }