123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- package main
- import (
- "customer_project/config"
- "database/sql"
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "regexp"
- "strconv"
- "strings"
- "sync"
- )
// Signalling and throttling channels shared across the package.
var (
	// queryClose and queryCloseOver are unbuffered bool channels.
	queryClose     = make(chan bool)
	queryCloseOver = make(chan bool)

	// pool is a buffered channel with two slots; loadData takes a slot while
	// it indexes a row and gives it back afterwards.
	pool = make(chan bool, 2)
)

// Patterns compiled once at package initialisation.
var (
	// StrOrNum matches a whole string of 1 to 4 characters made up only of
	// digits, '_' and '-', or only of ASCII letters, '_' and '-'.
	StrOrNum = regexp.MustCompile(`^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$`)

	// _datereg matches a date in the years 2000-2029 whose parts are separated
	// by 年/月/日 or by '-' (a trailing 日 or '-' is required), optionally
	// followed by an hour marked with 时 and up to two more digits.
	_datereg = regexp.MustCompile(`20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?`)

	// StrOrNum2 is StrOrNum without the length limit: one or more characters,
	// all digits/'_'/'-' or all ASCII letters/'_'/'-'.
	StrOrNum2 = regexp.MustCompile(`^[0-9_-]+$|^[a-zA-Z_-]+$`)

	// replaceStr matches the listed filler words and punctuation marks.
	// NOTE(review): inside the bracket class, `)--` is read as the range ')'
	// through '-', which also covers '*', '+' and ',' - confirm that this is
	// intended.
	replaceStr = regexp.MustCompile(`(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)`)
)
- func loadData() {
- finalId := 0
- lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Pcoll))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["msg_id"])
- }
- log.Debug("loadData---", zap.Any("finally id", finalId))
- lastid, count := 0, 0
- for {
- log.Debug("重新查询", zap.Any("lastid---", lastid))
- q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Pcoll, lastid)
- rows, err := MysqlTool.DB.Query(q)
- if err != nil {
- log.Error("loadData---", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- log.Debug("---loadData-finish----" + fmt.Sprint(count))
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("loadData---", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["msg_id"])
- count++
- if count%20000 == 0 {
- log.Info("loadData current-------", zap.Any("count", count), zap.Any("lastid", lastid))
- }
- pool <- true
- wg.Add(1)
- func(result map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- bys, _ := json.Marshal(result)
- var pro *Project
- _ = json.Unmarshal(bys, &pro)
- eid := pro.EntId
- id := pro.ProjectId
- var mapPn, mapPc, mapPb map[string]*Key
- if mapEnt[eid] != nil {
- mapPn = mapEnt[eid].mapPn
- mapPc = mapEnt[eid].mapPc
- mapPb = mapEnt[eid].mapPb
- if v := pro.ProjectName; v != "" {
- k := mapPn[v]
- if k == nil {
- k = &Key{Arr: []string{id}}
- mapPn[v] = k
- } else {
- k.Arr = append(k.Arr, id)
- }
- }
- if v := pro.ProjectCode; v != "" {
- k := mapPc[v]
- if k == nil {
- k = &Key{Arr: []string{id}}
- mapPc[v] = k
- } else {
- k.Arr = append(k.Arr, id)
- }
- }
- if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 {
- k := mapPb[pro.Buyer]
- if k == nil {
- k = &Key{Arr: []string{id}}
- mapPb[pro.Buyer] = k
- } else {
- k.Arr = append(k.Arr, id)
- }
- }
- AllIdsMap[id] = &ID{Id: id, P: pro}
- } else {
- mapPn = make(map[string]*Key, 1500000)
- mapPb = make(map[string]*Key, 5000000)
- mapPc = make(map[string]*Key, 5000000)
- if v := pro.ProjectName; v != "" {
- k := mapPn[v]
- if k == nil {
- k = &Key{Arr: []string{id}}
- mapPn[v] = k
- } else {
- k.Arr = append(k.Arr, id)
- }
- }
- if v := pro.ProjectCode; v != "" {
- k := mapPc[v]
- if k == nil {
- k = &Key{Arr: []string{id}}
- mapPc[v] = k
- } else {
- k.Arr = append(k.Arr, id)
- }
- }
- if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 {
- k := mapPb[pro.Buyer]
- if k == nil {
- k = &Key{Arr: []string{id}}
- mapPb[pro.Buyer] = k
- } else {
- k.Arr = append(k.Arr, id)
- }
- }
- AllIdsMap[id] = &ID{Id: id, P: pro}
- }
- mapEnt[eid] = &EntMap{
- mapPb: mapPb,
- mapPn: mapPc,
- mapPc: mapPn,
- }
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- log.Info("load project over..", zap.Int("n", count))
- }
- func taskProject() {
- pool := make(chan bool, 1) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s where projectId is null and status = 1 ORDER BY id DESC LIMIT 1", config.Conf.DB.Mysql.Coll))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Debug("taskProject---", zap.Any("finalId", finalId))
- lastid, count := 0, 0
- for {
- log.Debug("重新查询,", zap.Any("lastid", lastid))
- q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d And projectId is null and status = 1 ORDER BY id ASC limit 10000", config.Conf.DB.Mysql.Coll, lastid)
- var stmtOut *sql.Stmt
- var tx *sql.Tx
- var err error
- if tx == nil {
- stmtOut, err = MysqlTool.DB.Prepare(q)
- } else {
- stmtOut, err = tx.Prepare(q)
- }
- rows, err := stmtOut.Query()
- if err != nil {
- log.Error("taskProject---", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- log.Debug("----finish----------", zap.Int("count", count))
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("taskProject---", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%2000 == 0 {
- log.Debug("current-------", zap.Any("count", count), zap.Any("lastid", lastid))
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- info := ParseInfo(tmp)
- startProjectMerge(info, tmp)
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- stmtOut.Close()
- wg.Wait()
- }
- log.Info("所有线程执行完成...", zap.Int("count:", count))
- }
- func ParseInfo(tmp map[string]interface{}) (info *Info) {
- bys, _ := json.Marshal(tmp)
- var thisinfo *Info
- _ = json.Unmarshal(bys, &thisinfo)
- if thisinfo == nil {
- return nil
- }
- thisinfo.Budget, _ = strconv.ParseFloat(util.ObjToString(tmp["budget"]), 64)
- thisinfo.Bidamount, _ = strconv.ParseFloat(util.ObjToString(tmp["bidamount"]), 64)
- if thisinfo.ProjectName == "" {
- thisinfo.ProjectName = thisinfo.Title
- }
- if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
- thisinfo.pnbval++
- }
- if thisinfo.ProjectCode != "" {
- if thisinfo.ProjectCode != "" {
- if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
- thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
- }
- }
- thisinfo.pnbval++
- }
- if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
- thisinfo.pnbval++
- } else {
- thisinfo.Buyer = ""
- }
- //winners整理、清理
- winner := util.ObjToString(tmp["s_winner"])
- thisinfo.Winners = strings.Split(winner, ",")
- thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
- thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
- return thisinfo
- }
|