package main

import (
	"app.yhyue.com/moapp/jybase/encrypt"
	. "app.yhyue.com/moapp/jybase/mongodb"
	. "dataIdentify/db"
	. "dataIdentify/service"
	"flag"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gctx"
	"github.com/gogf/gf/v2/util/gconv"
	"github.com/gogf/gf/v2/util/grand"
	"log"
	"sync"
	"time"
)

// main scans the configured main MongoDB collection in descending _id order,
// runs Pretreatment on a random sample of award-type documents and stores the
// hits in a per-day "wcj_bidding_<yyyymmdd>" collection until every category
// counter has reached the requested quota.
//
// Flags:
//
//	-c   quota per category (counter in the `all` map); default 0
//	-p   number of documents processed concurrently; default 5
//	-id  optional hex _id; when set only documents with a smaller _id are read
func main() {
	maxSize := flag.Int("c", 0, "")
	poolSize := flag.Int("p", 5, "")
	lastId := flag.String("id", "", "")
	flag.Parse()
	log.Println("start...")

	// The pool channel is used as a semaphore. A capacity of 0 would block the
	// very first send forever and a negative capacity panics in make, so fall
	// back to a single worker.
	workers := *poolSize
	if workers < 1 {
		log.Println("invalid pool size, using 1 instead of", workers)
		workers = 1
	}

	sess := Mgo_Main.GetMgoConn()
	defer Mgo_Main.DestoryMongoConn(sess)

	// Extra projection fields needed below, added to the shared SelectField map.
	SelectField["publishtime"] = 1
	SelectField["href"] = 1
	SelectField["s_winner"] = 1

	query := map[string]interface{}{
		// "_id": StringTOBsonId("6763aa5555a3d7e571cda133"), // debugging aid: pin a single document
		"extracttype": 1,
	}
	if *lastId != "" {
		query["_id"] = map[string]interface{}{
			"$lt": StringTOBsonId(*lastId),
		}
	}
	collection := g.Config().MustGet(gctx.New(), "mongodb.main.collection").String()
	it := sess.DB(Mgo_Main.DbName).C(collection).Find(query).Select(SelectField).Sort("-_id").Iter()

	// all holds one counter per category: every key of AllQuoteMode plus the
	// consortium marker. It is shared with the workers and guarded by lock.
	all := map[string]int{
		"是中标联合体": 0,
	}
	for k := range AllQuoteMode {
		all[k] = 0
	}
	// isOver reports whether every counter has reached the quota.
	// The caller must hold lock.
	isOver := func() bool {
		for _, v := range all {
			if v < *maxSize {
				return false
			}
		}
		return true
	}

	index := 0
	pool := make(chan bool, workers)
	wait := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards all and isAllOver
	isAllOver := false

	for mm := make(map[string]interface{}); it.Next(mm); {
		index++
		if index%50000 == 0 {
			// Printing the map iterates it; do that under the lock so it
			// cannot overlap with a worker incrementing a counter.
			lock.Lock()
			log.Println("index", index, all)
			lock.Unlock()
		}
		pool <- true
		wait.Add(1)
		go func(m map[string]interface{}) {
			defer func() {
				<-pool
				wait.Done()
			}()
			subtype, _ := m["subtype"].(string)
			if subtype != "中标" && subtype != "成交" && subtype != "合同" {
				return
			}
			// Random sampling: keep the document only when its publishtime is
			// divisible by a freshly drawn grand.N(1, 1000) value.
			publishtime := gconv.Int(m["publishtime"])
			if publishtime%grand.N(1, 1000) != 0 {
				return
			}
			sid := BsonIdToSId(m["_id"])
			href := "https://www.jianyu360.com/nologin/content/" + encrypt.CommonEncodeArticle("content", sid) + ".html"
			m["jybxhref"] = href
			quoteMode, bidCommonwealth := Pretreatment(sid, m, 0)
			if quoteMode == "" && bidCommonwealth != 1 {
				return
			}
			m["quote_mode"] = quoteMode
			m["bid_commonwealth"] = bidCommonwealth
			delete(m, "detail")

			// Quota check, save and counter update happen as one critical
			// section so concurrent workers cannot overshoot the quota.
			lock.Lock()
			defer lock.Unlock()
			// NOTE(review): when quoteMode is "" (or any value that is not a
			// key of AllQuoteMode) the lookup yields 0 and a successful save
			// creates a new counter for that key, which isOver then also
			// waits for — confirm this is intended.
			if all[quoteMode] < *maxSize || (all["是中标联合体"] < *maxSize && bidCommonwealth == 1) {
				if Mgo_Main.SaveByOriID("wcj_bidding_"+time.Now().Format("20060102"), m) {
					log.Println("save", sid)
					if all[quoteMode] < *maxSize {
						all[quoteMode]++
					}
					if all["是中标联合体"] < *maxSize && bidCommonwealth == 1 {
						all["是中标联合体"]++
					}
					if isOver() {
						isAllOver = true
					}
				}
			}
		}(mm)
		// The worker now owns the previous map; decode the next document into
		// a fresh one.
		mm = make(map[string]interface{})

		// isAllOver is written by workers under lock, so read it under the
		// same lock instead of racing on it.
		lock.Lock()
		done := isAllOver
		lock.Unlock()
		if done {
			break
		}
	}
	wait.Wait()
	log.Println("over...", index)
}