package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"regexp"
	"strconv"
	"strings"
	"sync"

	"customer_project/config"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)

var (
	// queryClose and queryCloseOver are not referenced in this file.
	queryClose     = make(chan bool)
	queryCloseOver = make(chan bool)
	// pool is a package-level two-slot semaphore channel. taskProject shadows
	// it with its own local channel.
	pool = make(chan bool, 2)
	// StrOrNum matches a whole string of 1-4 characters made up only of
	// digits/'_'/'-' or only of ASCII letters/'_'/'-'.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// _datereg matches dates such as 20YY年M月D日 or 20YY-M-D-, optionally
	// followed by an "H时M" time part.
	_datereg = regexp.MustCompile("20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?")
	// StrOrNum2 is the unbounded-length variant of StrOrNum.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// replaceStr matches a fixed set of words and punctuation characters.
	replaceStr = regexp.MustCompile("(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)")
)

// scanRowAsMap scans the current row of rows into a map keyed by column name.
// Values delivered by the driver as []byte are converted to string; all other
// values are stored as returned by the driver.
func scanRowAsMap(rows *sql.Rows, columns []string) (map[string]interface{}, error) {
	values := make([]interface{}, len(columns))
	scanArgs := make([]interface{}, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	if err := rows.Scan(scanArgs...); err != nil {
		return nil, err
	}
	row := make(map[string]interface{}, len(columns))
	for i, col := range values {
		if b, ok := col.([]uint8); ok {
			row[columns[i]] = string(b)
		} else {
			row[columns[i]] = col
		}
	}
	return row, nil
}

// appendKeyID records id under key in index, creating the Key entry on first
// use.
func appendKeyID(index map[string]*Key, key, id string) {
	if k := index[key]; k != nil {
		k.Arr = append(k.Arr, id)
		return
	}
	index[key] = &Key{Arr: []string{id}}
}

// indexProject converts one project row into a *Project and registers it in
// mapEnt (per-entity name/code/buyer indexes) and in AllIdsMap.
//
// It is not safe for concurrent use: it mutates the package-level maps
// without locking, which is why loadData calls it synchronously.
func indexProject(row map[string]interface{}) {
	bys, _ := json.Marshal(row)
	var pro *Project
	// The decode error is deliberately not treated as fatal: on a field type
	// mismatch encoding/json still fills in the remaining fields, and such
	// rows were indexed before. Only a completely failed decode is skipped.
	if err := json.Unmarshal(bys, &pro); pro == nil {
		log.Error("loadData---", zap.Error(err))
		return
	}

	eid, id := pro.EntId, pro.ProjectId
	ent := mapEnt[eid]
	if ent == nil {
		ent = &EntMap{
			mapPn: make(map[string]*Key, 1500000),
			mapPb: make(map[string]*Key, 5000000),
			mapPc: make(map[string]*Key, 5000000),
		}
		// Store each index under its own field. Previously mapPn and mapPc
		// were swapped on every row, mixing project names and project codes.
		mapEnt[eid] = ent
	}

	if pro.ProjectName != "" {
		appendKeyID(ent.mapPn, pro.ProjectName, id)
	}
	if pro.ProjectCode != "" {
		appendKeyID(ent.mapPc, pro.ProjectCode, id)
	}
	// Buyers of two runes or fewer are not indexed.
	if len([]rune(pro.Buyer)) > 2 {
		appendKeyID(ent.mapPb, pro.Buyer, id)
	}
	AllIdsMap[id] = &ID{Id: id, P: pro}
}

// loadData loads every row of the project table (config.Conf.DB.Mysql.Pcoll)
// in msg_id order, in batches of up to 100000 rows, and indexes each row via
// indexProject.
//
// Loading stops when the msg_id that was the highest at start-up has been
// reached, when a query fails, or when a batch makes no progress.
func loadData() {
	finalId := 0
	lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Pcoll))
	if lastInfo != nil && len(*lastInfo) > 0 {
		finalId = util.IntAll((*lastInfo)[0]["msg_id"])
	}
	log.Debug("loadData---", zap.Any("finally id", finalId))

	lastid, count := 0, 0
	for {
		// ">=" rather than "==": rows inserted after finalId was read can push
		// lastid past finalId inside a batch.
		if lastid >= finalId {
			log.Debug("---loadData-finish----" + fmt.Sprint(count))
			break
		}
		log.Debug("重新查询", zap.Any("lastid---", lastid))
		q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Pcoll, lastid)
		rows, err := MysqlTool.DB.Query(q)
		if err != nil {
			// rows is nil here; using it would panic.
			log.Error("loadData---", zap.Error(err))
			break
		}
		columns, err := rows.Columns()
		if err != nil {
			log.Error("loadData---", zap.Error(err))
			_ = rows.Close()
			break
		}

		batchStart := lastid
		for rows.Next() {
			row, err := scanRowAsMap(rows, columns)
			if err != nil {
				log.Error("loadData---", zap.Error(err))
				break
			}
			lastid = util.IntAll(row["msg_id"])
			count++
			if count%20000 == 0 {
				log.Info("loadData current-------", zap.Any("count", count), zap.Any("lastid", lastid))
			}
			indexProject(row)
		}
		if err := rows.Err(); err != nil {
			log.Error("loadData---", zap.Error(err))
		}
		_ = rows.Close()
		// NOTE(review): wg is the package-level WaitGroup declared outside this
		// file. loadData no longer registers work on it, but the wait is kept
		// in case other code does — confirm and remove if not.
		wg.Wait()

		if lastid == batchStart {
			// Nothing was consumed (empty batch or failure on the first row);
			// retrying the same query would loop forever.
			log.Error("loadData---", zap.Error(fmt.Errorf("no progress after msg_id %d, final msg_id %d", lastid, finalId)))
			break
		}
	}
	log.Info("load project over..", zap.Int("n", count))
}

// taskProject processes the rows of config.Conf.DB.Mysql.Coll that have no
// projectId and status = 1. Rows are read in id order, in batches of up to
// 10000, and each row is passed to ParseInfo and startProjectMerge in a
// goroutine. A one-slot channel limits this to one running worker at a time.
//
// Processing stops when the id that was the highest matching one at start-up
// has been reached, when a statement fails, or when a batch makes no progress.
func taskProject() {
	pool := make(chan bool, 1) // limits the number of concurrent workers
	wg := &sync.WaitGroup{}

	finalId := 0
	lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s where projectId is null and status = 1 ORDER BY id DESC LIMIT 1", config.Conf.DB.Mysql.Coll))
	if lastInfo != nil && len(*lastInfo) > 0 {
		finalId = util.IntAll((*lastInfo)[0]["id"])
	}
	log.Debug("taskProject---", zap.Any("finalId", finalId))

	lastid, count := 0, 0
	for {
		if lastid >= finalId {
			log.Debug("----finish----------", zap.Int("count", count))
			break
		}
		log.Debug("重新查询,", zap.Any("lastid", lastid))
		q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d And projectId is null and status = 1 ORDER BY id ASC limit 10000", config.Conf.DB.Mysql.Coll, lastid)

		// NOTE(review): the prepared statement is kept on purpose; some drivers
		// return differently typed column values for prepared statements than
		// for plain queries — confirm before replacing it with DB.Query.
		stmtOut, err := MysqlTool.DB.Prepare(q)
		if err != nil {
			log.Error("taskProject---", zap.Error(err))
			break
		}
		rows, err := stmtOut.Query()
		if err != nil {
			log.Error("taskProject---", zap.Error(err))
			_ = stmtOut.Close()
			break
		}
		columns, err := rows.Columns()
		if err != nil {
			log.Error("taskProject---", zap.Error(err))
			_ = rows.Close()
			_ = stmtOut.Close()
			break
		}

		batchStart := lastid
		for rows.Next() {
			row, err := scanRowAsMap(rows, columns)
			if err != nil {
				log.Error("taskProject---", zap.Error(err))
				break
			}
			lastid = util.IntAll(row["id"])
			count++
			if count%2000 == 0 {
				log.Debug("current-------", zap.Any("count", count), zap.Any("lastid", lastid))
			}
			pool <- true
			wg.Add(1)
			go func(tmp map[string]interface{}) {
				defer func() {
					<-pool
					wg.Done()
				}()
				info := ParseInfo(tmp)
				startProjectMerge(info, tmp)
			}(row)
		}
		if err := rows.Err(); err != nil {
			log.Error("taskProject---", zap.Error(err))
		}
		_ = rows.Close()
		_ = stmtOut.Close()
		wg.Wait()

		if lastid == batchStart {
			// Nothing was consumed; retrying the same query would loop forever.
			log.Error("taskProject---", zap.Error(fmt.Errorf("no progress after id %d, final id %d", lastid, finalId)))
			break
		}
	}
	log.Info("所有线程执行完成...", zap.Int("count:", count))
}

// ParseInfo converts a raw row into an *Info by round-tripping it through
// JSON, then normalises a few fields:
//
//   - Budget and Bidamount are parsed from tmp["budget"] / tmp["bidamount"];
//     a parse failure leaves the value at 0.
//   - ProjectName falls back to Title when empty.
//   - pnbval is incremented once for each of: a non-empty project name, a
//     non-empty project code, a buyer longer than two runes.
//   - A buyer of two runes or fewer is cleared.
//   - Winners is tmp["s_winner"] split on ",".
//   - LenPC and LenPN hold the rune lengths of the project code and name.
//
// It returns nil when the row cannot be decoded into an Info at all.
func ParseInfo(tmp map[string]interface{}) (info *Info) {
	bys, _ := json.Marshal(tmp)
	var thisinfo *Info
	// The decode error is ignored on purpose; only a nil result is fatal.
	_ = json.Unmarshal(bys, &thisinfo)
	if thisinfo == nil {
		return nil
	}

	thisinfo.Budget, _ = strconv.ParseFloat(util.ObjToString(tmp["budget"]), 64)
	thisinfo.Bidamount, _ = strconv.ParseFloat(util.ObjToString(tmp["bidamount"]), 64)

	if thisinfo.ProjectName == "" {
		thisinfo.ProjectName = thisinfo.Title
	}
	if thisinfo.ProjectName != "" {
		thisinfo.pnbval++
	}

	if thisinfo.ProjectCode != "" {
		// Without a project name, a code shorter than five runes that consists
		// only of digits or only of letters (plus '_'/'-') is blanked.
		if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
			thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
		}
		// NOTE(review): pnbval is incremented even when the code was just
		// blanked above — confirm this is intended.
		thisinfo.pnbval++
	}

	if len([]rune(thisinfo.Buyer)) > 2 {
		thisinfo.pnbval++
	} else {
		thisinfo.Buyer = ""
	}

	// Collect the winners. An empty s_winner yields a one-element slice
	// holding the empty string.
	thisinfo.Winners = strings.Split(util.ObjToString(tmp["s_winner"]), ",")

	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
	return thisinfo
}