package main import ( "data_tidb/config" "fmt" "github.com/shopspring/decimal" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "reflect" "regexp" "sort" "strconv" "strings" "sync" "time" ) var ( regLetter = regexp.MustCompile("[a-z]*") ) func doBiddingTask(gtid, lteid string, mapInfo map[string]interface{}) { sess := MongoB.GetMgoConn() defer MongoB.DestoryMongoConn(sess) ch := make(chan bool, 10) wg := &sync.WaitGroup{} stype := util.ObjToString(mapInfo["stype"]) q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}} query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if util.IntAll(tmp["dataprocess"]) != 8 { return } if stype == "bidding_history" && tmp["history_updatetime"] == nil { return } taskBase(tmp) taskTags(tmp) taskExpand(tmp) taskAtts(tmp) taskInfoformat(tmp) taskIntent(tmp) taskWinner(tmp) taskPackage(tmp) taskPur(tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskB() { sess := MongoB.GetMgoConn() defer MongoB.DestoryMongoConn(sess) ch := make(chan bool, 10) wg := &sync.WaitGroup{} //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("634eac71911e1eb345b2d861")} q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("632d42d667a6b0a2861eef92")}} query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if util.IntAll(tmp["extracttype"]) != -1 { taskBase(tmp) taskTags(tmp) taskExpand(tmp) taskAtts(tmp) taskInfoformat(tmp) taskIntent(tmp) taskWinner(tmp) //taskPackage(tmp) taskPur(tmp) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } // @Description 基本信息 // @Author J 2022/9/22 11:12 func taskBase(tmp map[string]interface{}) { saveM := make(map[string]interface{}) var errf []string // 异常字段 for _, f := range BaseField { if f == "infoid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area_code" { if tmp["area"] != nil { saveM[f] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "city_code" { if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM[f] = AreaCode[c] } } else if f == "district_code" { if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"]) saveM[f] = AreaCode[c] } } else if f == "toptype_code" { if obj := util.ObjToString(tmp["toptype"]); obj != "" { saveM[f] = TopTypeCode[obj] } } else if f == "subtype_code" { if obj := util.ObjToString(tmp["subtype"]); obj != "" { saveM[f] = SubTypeCode[obj] } } else if f == "buyerclass_code" { if obj := util.ObjToString(tmp["buyerclass"]); obj != "" { saveM[f] = BuyerCode[obj] } } else if f == "createtime" || f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "comeintime" || f == "publishtime" || f == "bidopentime" { if tmp[f] != nil && util.IntAll(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "multipackage" || f == "isValidFile" { if tmp[f] == nil { saveM[f] = 0 } else { saveM[f] = tmp[f] } } else if f == "buyer_id" { if b := util.ObjToString(tmp["buyer"]); b != "" { saveM["buyer"] = b //if code := redis.GetStr("qyxy_id", b); code != "" { if code := getNameId(b); code != "" { saveM[f] = code } } } else if f == "agency_id" { if b := util.ObjToString(tmp["agency"]); b != "" { saveM["agency"] = b //if code := redis.GetStr("qyxy_id", b); code != "" { if code := getNameId(b); code != "" { saveM[f] = code } } else { if tmp[f] != nil { saveM[f] = tmp[f] } } } else { if tmp[f] != nil { if BaseVMap[f] != nil { var b bool saveM[f], b = verifyF(f, tmp[f], BaseVMap[f]) // 保存异常字段数据 if b { errf = append(errf, f) } } else { saveM[f] = tmp[f] } } } } saveBasePool <- saveM if len(errf) > 0 { saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")} } } func getNameId(name string) string { info := MysqlTool.FindOne("dws_f_ent_baseinfo", map[string]interface{}{"name": name}, "name_id", "") if info != nil && (*info)["name_Id"] != nil { return util.ObjToString((*info)["name_Id"]) } else { return "" } } // @Description 扩展信息 // @Author J 2022/9/22 11:13 func taskExpand(tmp map[string]interface{}) { saveM := make(map[string]interface{}) var errf []string // 异常字段 for _, f := range ExpandField { if f == "infoid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "project_startdate" || f == "project_completedate" || f == "signstarttime" || f == "bidendtime" || f == "bidstarttime" || f == "docstarttime" || f == "docendtime" || f == "signaturedate" || f == "signendtime" { if tmp[f] != nil && util.IntAll(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "createtime" || f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "bidway" { if util.ObjToString(tmp[f]) == "电子投标" { saveM[f] = 1 } else if util.ObjToString(tmp[f]) == "纸质投标" { saveM[f] = 0 } } else if f == "review_experts" { if tmp[f] != nil { if reflect.TypeOf(tmp[f]).String() == "string" { saveM[f] = tmp[f] } else if reflect.TypeOf(tmp[f]).String() == "[]interface {}" { if arr, ok := tmp[f].([]interface{}); ok { saveM[f] = strings.Join(util.ObjArrToStringArr(arr), ",") } } } } else if f == "bid_guarantee" || f == "contract_guarantee" { if tmp[f] != nil { if tmp[f].(bool) { saveM[f] = 1 } else { saveM[f] = 0 } } } else if f == "supervisorrate" || f == "agencyfee" { if tmp[f] != nil { if reflect.TypeOf(tmp[f]).String() == "string" { v2, err := strconv.ParseFloat(strings.ReplaceAll(util.ObjToString(tmp[f]), "%", ""), 64) if err != nil { v, _ := decimal.NewFromFloat(v2).Div(decimal.NewFromFloat(float64(100))).Float64() saveM[f] = v } } else { saveM[f], _ = util.FormatFloat(util.Float64All(tmp[f]), 4) } } } else if f == "project_duration" { if tmp[f] != nil { tmp[f] = util.IntAll(tmp[f]) } } else { if tmp[f] != nil { if ExpandVMap[f] != nil { var b bool saveM[f], b = verifyF(f, tmp[f], ExpandVMap[f]) // 保存异常字段数据 if b { errf = append(errf, f) } } else { saveM[f] = tmp[f] } } } } saveExpandPool <- saveM if len(errf) > 0 { saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")} } } // @Description 标签记录 // @Author J 2022/9/22 11:13 func taskTags(tmp map[string]interface{}) { id := mongodb.BsonIdToSId(tmp["_id"]) if topArr, ok := tmp["topscopeclass"].([]interface{}); ok { for _, i2 := range topArr { tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母 code := TopScopeCode[tclass] saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} //MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}) } } if subArr, ok := tmp["subscopeclass"].([]interface{}); ok { for _, i2 := range subArr { sc := strings.Split(util.ObjToString(i2), "_") if len(sc) > 1 { code := SubScopeCode[sc[1]] saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} } //MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}) } } if tArr, ok := tmp["certificate_class"].([]interface{}); ok { for _, i2 := range tArr { if util.ObjToString(i2) == "ISO" { saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "01", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} //MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "01", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}) } else if util.ObjToString(i2) == "AAA" { saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "02", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} //MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "02", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}) } else if util.ObjToString(i2) == "ISO,AAA" { saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "03", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} //MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "03", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}) } } } } // @Description 附件 // @Author J 2022/9/22 11:13 func taskAtts(tmp map[string]interface{}) { id := mongodb.BsonIdToSId(tmp["_id"]) if tmp["projectinfo"] != nil { if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o { if attsMap, ok := pinfo["attachments"].(map[string]interface{}); ok { for _, attr := range attsMap { if at, ok := attr.(map[string]interface{}); ok { if util.ObjToString(at["fid"]) != "" { ftype := "" for _, s := range FileTypeArr { ft := strings.ToLower(util.ObjToString(tmp["ftype"])) if strings.Contains(ft, s) { ftype = s break } } saveAttrPool <- map[string]interface{}{"infoid": id, "org_url": at["org_url"], "size": at["size"], "fid": at["fid"], "filename": at["filename"], "ftype": ftype, "file_type": 0, "createtime": time.Now().Format(util.Date_Full_Layout)} } } } } } } if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o { if len(attachTxt) > 0 { for _, at := range attachTxt { at1 := at.(map[string]interface{}) if len(at1) > 0 { for k, v := range at1 { if reflect.TypeOf(v).String() == "string" { if util.ObjToString(at1["attach_url"]) != "" { saveAttrPool <- map[string]interface{}{"infoid": id, "fid": at1["attach_url"], "filename": at1["file_name"], "file_type": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} } break } else { if at2, ok := at1[k].(map[string]interface{}); ok { if util.ObjToString(at2["attach_url"]) != "" { saveAttrPool <- map[string]interface{}{"infoid": id, "fid": at2["attach_url"], "filename": at2["file_name"], "file_type": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} } } } } } } } } } // @Description 拟建 // @Author J 2022/9/22 15:56 func taskInfoformat(tmp map[string]interface{}) { if util.IntAll(tmp["infoformat"]) != 2 && tmp["projectinfo"] != nil { return } if info, ok := tmp["projectinfo"].(map[string]interface{}); ok { delete(info, "attachments") if len(info) > 0 { saveM := make(map[string]interface{}) for _, f := range IfmField { if f == "infoid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "createtime" || f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "approvetime" { if info[f] != nil && util.IntAll(tmp[f]) > 0 { saveM[f] = info[f] } } else { if info[f] != nil { saveM[f] = info[f] } } } saveIfmPool <- saveM } } } // @Description 采购意向 // @Author J 2022/9/22 16:27 func taskIntent(tmp map[string]interface{}) { if arr, ok := tmp["procurementlist"].([]interface{}); ok { for _, p := range arr { p1 := p.(map[string]interface{}) saveM := make(map[string]interface{}) for _, f := range IntentField { if f == "infoid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "createtime" || f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "buyer_id" { if b := util.ObjToString(tmp["buyer"]); b != "" { //if code := redis.GetStr("qyxy_id", b); code != "" { // saveM[f] = code //} if code := getNameId(b); code != "" { saveM[f] = code } } } else { if p1[f] != nil { saveM[f] = p1[f] } } } saveIntentPool <- saveM } } } // @Description 中标单位 // @Author J 2022/9/27 10:58 func taskWinner(tmp map[string]interface{}) { if wod, ok := tmp["winnerorder"].([]interface{}); ok { for _, w := range wod { w1 := w.(map[string]interface{}) if w1["sort"] != nil { saveM := make(map[string]interface{}) for _, f := range WinnerField { if f == "infoid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "createtime" || f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "winnersort" { saveM[f] = util.IntAll(w1["sort"]) } else if f == "winner_id" { if b := util.ObjToString(w1["entname"]); b != "" { saveM["winner"] = b //if code := redis.GetStr("qyxy_id", b); code != "" { // saveM[f] = code //} if code := getNameId(b); code != "" { saveM[f] = code } } } else if f == "package_id" { } } saveWinnerPool <- saveM } } } warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",") if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 { warr = append(warr, util.ObjToString(tmp["winner"])) } for _, s := range warr { saveM := make(map[string]interface{}) for _, f := range WinnerField { if f == "infoid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "createtime" || f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "winnersort" { saveM[f] = 0 } else if f == "winner_id" { if s != "" { saveM["winner"] = s //if code := redis.GetStr("qyxy_id", s); code != "" { // saveM[f] = code //} if code := getNameId(s); code != "" { saveM[f] = code } } } } saveWinnerPool <- saveM } } func BinarySearch(s []string, k string) int { sort.Strings(s) lo, hi := 0, len(s)-1 for lo <= hi { m := (lo + hi) >> 1 if s[m] < k { lo = m + 1 } else if s[m] > k { hi = m - 1 } else { return m } } return -1 } func taskPackage(tmp map[string]interface{}) { } // @Description 标的物 // @Author J 2022/9/29 16:48 func taskPur(tmp map[string]interface{}) { if plist, ok := tmp["purchasinglist"].([]interface{}); ok { for _, p := range plist { saveM := make(map[string]interface{}) p1 := p.(map[string]interface{}) for _, f := range PurField { if f == "infoid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "unitprice" || f == "totalprice" { if p1[f] != nil { if reflect.TypeOf(p1[f]).String() == "string" { } else { if util.Float64All(p1[f]) <= 10000000000 { saveM[f], _ = util.FormatFloat(util.Float64All(p1[f]), 4) } } } } else { if p1[f] != nil { if reflect.TypeOf(p1[f]).String() == "string" { if f == "item" || f == "itemname" || f == "brandname" { if len(util.ObjToString(p1[f])) <= 500 { saveM[f] = p1[f] } } else { saveM[f] = p1[f] } } else { saveM[f] = p1[f] } } } } savePurPool <- saveM } } }