package main import ( "database/sql" "encoding/json" "fmt" "github.com/tealeg/xlsx" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "regexp" "strings" "sync" "tieta_data/config" ) var ( queryClose = make(chan bool) queryCloseOver = make(chan bool) pool = make(chan bool, 2) StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$") _datereg = regexp.MustCompile("20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?") StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$") replaceStr = regexp.MustCompile("(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)") ) func taskExcelP(tmp map[string]interface{}, row *xlsx.Row) { for _, v := range FieldArr { if FieldMap[v] != "" { if v == "项目id" { row.AddCell().SetValue(mongodb.BsonIdToSId(tmp["_id"])) } else if v == "预算金额(元)" || v == "中标金额(元)" { if tmp[FieldMap[v]] != nil { row.AddCell().SetValue(tmp[FieldMap[v]]) } else { row.AddCell().SetValue("") } } else if v == "招标数据更新时间" || v == "中标数据更新时间" { p := util.Int64All(tmp[FieldMap[v]]) if p > 0 { row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout)) } else { row.AddCell().SetValue("") } } else { row.AddCell().SetValue(util.ObjToString(tmp[FieldMap[v]])) } } else { row.AddCell().SetValue("") } } } func taskExcelB(tmp map[string]interface{}, row *xlsx.Row) { for _, v := range FieldArr1 { if FieldMap1[v] != "" { if v == "信息id" { row.AddCell().SetValue(util.ObjToString(tmp["id"])) } else if v == "项目id" { info, _ := MongoTool.FindOne(config.Conf.DB.Mongo.Pcoll, bson.M{"ids": util.ObjToString(tmp["id"])}) if len(*info) > 0 { row.AddCell().SetValue(mongodb.BsonIdToSId((*info)["_id"])) } else { row.AddCell().SetValue("") } } else if v == "发布时间" || v == "标书获取时间" || v == "标书截止时间" || v == "投标开始时间" || v == "投标截止时间" || v == "开标时间" { p := util.Int64All(tmp[FieldMap1[v]]) if p > 0 { row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout)) } else { row.AddCell().SetValue("") } } else if v == "预算金额(元)" || v == "中标金额(元)" { if tmp[FieldMap1[v]] != nil { row.AddCell().SetValue(tmp[FieldMap1[v]]) } else { row.AddCell().SetValue("") } } else if v == "是否电子招标" { if util.ObjToString(tmp[FieldMap1[v]]) != "" { row.AddCell().SetValue("是") } else { row.AddCell().SetValue("否") } } else { row.AddCell().SetValue(util.ObjToString(tmp[FieldMap1[v]])) } } else { row.AddCell().SetValue("") } } } func loadData() { finalId := 0 lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Pcoll)) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["msg_id"]) } util.Debug("loadData---", "finally id", finalId) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Pcoll, lastid) rows, err := MysqlTool.DB.Query(q) if err != nil { util.Debug("loadData---", err) } columns, err := rows.Columns() if finalId == lastid { util.Debug("---loadData-finish----", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { util.Debug("loadData---", err) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["msg_id"]) count++ if count%20000 == 0 { util.Debug("loadData current-------", count, lastid) } pool <- true wg.Add(1) func(result map[string]interface{}) { defer func() { <-pool wg.Done() }() bys, _ := json.Marshal(result) var pro *Project _ = json.Unmarshal(bys, &pro) id := pro.ProjectId if v := pro.ProjectName; v != "" { k := mapPn[v] if k == nil { k = &Key{Arr: []string{id}} mapPn[v] = k } else { k.Arr = append(k.Arr, id) } } if v := pro.ProjectCode; v != "" { k := mapPc[v] if k == nil { k = &Key{Arr: []string{id}} mapPc[v] = k } else { k.Arr = append(k.Arr, id) } } if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 { k := mapPb[pro.Buyer] if k == nil { k = &Key{Arr: []string{id}} mapPb[pro.Buyer] = k } else { k.Arr = append(k.Arr, id) } } AllIdsMap[id] = &ID{Id: id, P: pro} }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } log.Info("load project over..", zap.Int("n", count)) } func taskProject() { pool := make(chan bool, 2) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s where projectId is null ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Coll)) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["msg_id"]) } util.Debug("taskProject---", "finally id", finalId) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d And projectId is null ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Coll, lastid) var stmtOut *sql.Stmt var tx *sql.Tx var err error if tx == nil { stmtOut, err = MysqlTool.DB.Prepare(q) } else { stmtOut, err = tx.Prepare(q) } rows, err := stmtOut.Query() if err != nil { util.Debug("taskProject---", err) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { util.Debug("taskProject---", err) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["msg_id"]) count++ if count%2000 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() info := ParseInfo(tmp) startProjectMerge(info, tmp) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() stmtOut.Close() wg.Wait() } log.Info("所有线程执行完成...", zap.Int("count:", count)) } func ParseInfo(tmp map[string]interface{}) (info *Info) { bys, _ := json.Marshal(tmp) var thisinfo *Info _ = json.Unmarshal(bys, &thisinfo) if thisinfo == nil { return nil } if thisinfo.ProjectName == "" { thisinfo.ProjectName = thisinfo.Title } if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 { thisinfo.pnbval++ } if thisinfo.ProjectCode != "" { if thisinfo.ProjectCode != "" { if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 { thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "") } } thisinfo.pnbval++ } if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 { thisinfo.pnbval++ } else { thisinfo.Buyer = "" } if tmp["multipackage"] == nil || util.ObjToString(tmp["multipackage"]) == "否" { thisinfo.MultiPackage = 0 } else { thisinfo.MultiPackage = 1 } //winners整理、清理 winner := util.ObjToString(tmp["s_winner"]) thisinfo.Winners = strings.Split(winner, ",") thisinfo.LenPC = len([]rune(thisinfo.ProjectCode)) thisinfo.LenPN = len([]rune(thisinfo.ProjectName)) return thisinfo }