|
@@ -1,638 +0,0 @@
|
|
|
-package main
|
|
|
-
|
|
|
-import (
|
|
|
- "fmt"
|
|
|
- "github.com/cron"
|
|
|
- "github.com/go-xweb/xweb"
|
|
|
- "mongodb"
|
|
|
- "net/http"
|
|
|
- qu "qfw/util"
|
|
|
- sp "spiderutil"
|
|
|
- "sync"
|
|
|
- "time"
|
|
|
-
|
|
|
- "go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
-)
|
|
|
-
|
|
|
-var (
|
|
|
- Webport string
|
|
|
- ListenPort string
|
|
|
- Config map[string]interface{}
|
|
|
- Mgo *mongodb.MongodbSim
|
|
|
- Coll string //ocr_flie_over
|
|
|
- MoveCollFile string //bidding_file
|
|
|
- MoveCollNomal string //bidding_nomal
|
|
|
- Api string
|
|
|
- To string
|
|
|
- TaskTime int
|
|
|
- SaveMgoCache = make(chan map[string]interface{}, 1000) //
|
|
|
- SP = make(chan bool, 5)
|
|
|
- //StartId string
|
|
|
- Stop bool
|
|
|
-)
|
|
|
-
|
|
|
-func init() {
|
|
|
- qu.ReadConfig(&Config)
|
|
|
- InitFileInfo() //初始化附件解析信息
|
|
|
- InitOss() //oss
|
|
|
- Webport = qu.ObjToString(Config["webport"])
|
|
|
- ListenPort = qu.ObjToString(Config["listenport"])
|
|
|
- Coll = qu.ObjToString(Config["coll"])
|
|
|
- MoveCollFile = qu.ObjToString(Config["movecollfile"])
|
|
|
- MoveCollNomal = qu.ObjToString(Config["movecollnomal"])
|
|
|
- //StartId = qu.ObjToString(Config["startid"])
|
|
|
- Api = qu.ObjToString(Config["api"])
|
|
|
- To = qu.ObjToString(Config["to"])
|
|
|
- TaskTime = qu.IntAll(Config["tasktime"])
|
|
|
- Mgo = &mongodb.MongodbSim{
|
|
|
- MongodbAddr: qu.ObjToString(Config["address"]),
|
|
|
- DbName: qu.ObjToString(Config["dbname"]),
|
|
|
- Size: qu.IntAll(Config["size"]),
|
|
|
- UserName: qu.ObjToString(Config["username"]),
|
|
|
- Password: qu.ObjToString(Config["password"]),
|
|
|
- }
|
|
|
- Mgo.InitPool()
|
|
|
-}
|
|
|
-func main() {
|
|
|
- qu.Debug("start...")
|
|
|
- c := cron.New()
|
|
|
- c.Start()
|
|
|
- c.AddFunc("0 0 0 ? * *", DeleteData)
|
|
|
- go monitor()
|
|
|
- go SaveData() //数据保存
|
|
|
- go FileDataMoveToBidding() //附件数据迁移至bidding
|
|
|
- go NormalDataMoveToBidding() //正常数据迁移至bidding
|
|
|
- xweb.Run(":" + Webport)
|
|
|
- //ch := make(chan bool, 1)
|
|
|
- //<-ch
|
|
|
-}
|
|
|
-func monitor() {
|
|
|
- //最好是单实例调用
|
|
|
- http.HandleFunc("/movebidding/stop", func(w http.ResponseWriter, r *http.Request) {
|
|
|
- fmt.Println("停止数据迁移...")
|
|
|
- Stop = true
|
|
|
- w.Write([]byte("ok"))
|
|
|
- })
|
|
|
- http.HandleFunc("/movebidding/run", func(w http.ResponseWriter, r *http.Request) {
|
|
|
- fmt.Println("启动数据迁移...")
|
|
|
- Stop = false
|
|
|
- w.Write([]byte("ok"))
|
|
|
- })
|
|
|
- http.ListenAndServe(":"+ListenPort, nil)
|
|
|
-}
|
|
|
-
|
|
|
-func NormalDataMoveToBidding() {
|
|
|
- defer qu.Catch()
|
|
|
- for {
|
|
|
- query := map[string]interface{}{ //查询标物解析完成,未迁移的数据
|
|
|
- "bid_completetime": map[string]interface{}{
|
|
|
- "$exists": true,
|
|
|
- },
|
|
|
- "moveok": map[string]interface{}{
|
|
|
- "$exists": false,
|
|
|
- },
|
|
|
- }
|
|
|
- count := Mgo.Count(MoveCollNomal, query)
|
|
|
- qu.Debug("bidding_nomal,本轮查询数据量:", count, Stop)
|
|
|
- if count > 0 && !Stop {
|
|
|
- NormalDataMove(query)
|
|
|
- } else {
|
|
|
- time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func NormalDataMove(query map[string]interface{}) {
|
|
|
- defer qu.Catch()
|
|
|
- sess := Mgo.GetMgoConn()
|
|
|
- defer Mgo.DestoryMongoConn(sess)
|
|
|
- fields := map[string]interface{}{
|
|
|
- "bid_completetime": 0,
|
|
|
- "bid_starttime": 0,
|
|
|
- "goods_read": 0,
|
|
|
- }
|
|
|
- lock := sync.Mutex{}
|
|
|
- ch := make(chan bool, 10)
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- updateArr := [][]map[string]interface{}{}
|
|
|
- it := sess.DB("qfw").C(MoveCollNomal).Find(&query).Select(&fields).Iter()
|
|
|
- n := 0
|
|
|
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
- ch <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-ch
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- _id := tmp["_id"]
|
|
|
- update := []map[string]interface{}{}
|
|
|
- update = append(update, map[string]interface{}{"_id": _id})
|
|
|
- newId := primitive.NewObjectID()
|
|
|
- tmp["_id"] = newId
|
|
|
- if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
|
|
|
- strid := mongodb.BsonIdToSId(newId)
|
|
|
- tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
|
|
|
- }
|
|
|
- //更新
|
|
|
- set := map[string]interface{}{"moveok": true}
|
|
|
- set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
- update = append(update, map[string]interface{}{
|
|
|
- "$set": set,
|
|
|
- })
|
|
|
- //save
|
|
|
- SaveMgoCache <- tmp
|
|
|
- lock.Lock()
|
|
|
- //update
|
|
|
- updateArr = append(updateArr, update)
|
|
|
- if len(updateArr) > 200 {
|
|
|
- Mgo.UpdateBulk(MoveCollNomal, updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- }(tmp)
|
|
|
- if n%100 == 0 {
|
|
|
- qu.Debug("current:", n)
|
|
|
- }
|
|
|
- tmp = map[string]interface{}{}
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
- lock.Lock()
|
|
|
- if len(updateArr) > 0 {
|
|
|
- Mgo.UpdateBulk(MoveCollNomal, updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- qu.Debug("bidding_nomal,本轮迁移数据量:", n)
|
|
|
-}
|
|
|
-
|
|
|
-func FileDataMoveToBidding() {
|
|
|
- defer qu.Catch()
|
|
|
- for {
|
|
|
- query := map[string]interface{}{ //查询附件解析完成,未迁移的数据
|
|
|
- "bid_completetime": map[string]interface{}{
|
|
|
- "$exists": true,
|
|
|
- },
|
|
|
- "file_completetime": map[string]interface{}{
|
|
|
- "$exists": true,
|
|
|
- },
|
|
|
- "moveok": map[string]interface{}{
|
|
|
- "$exists": false,
|
|
|
- },
|
|
|
- }
|
|
|
- count := Mgo.Count(MoveCollFile, query)
|
|
|
- qu.Debug("bidding_file,本轮查询数据量:", count, Stop)
|
|
|
- if count > 0 && !Stop {
|
|
|
- FileDataMove(query)
|
|
|
- } else {
|
|
|
- time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func FileDataMove(query map[string]interface{}) {
|
|
|
- defer qu.Catch()
|
|
|
- sess := Mgo.GetMgoConn()
|
|
|
- defer Mgo.DestoryMongoConn(sess)
|
|
|
- fields := map[string]interface{}{
|
|
|
- "attach_time": 0,
|
|
|
- "file_completetime": 0,
|
|
|
- "file_starttime": 0,
|
|
|
- "bid_completetime": 0,
|
|
|
- "bid_starttime": 0,
|
|
|
- "file_read": 0,
|
|
|
- }
|
|
|
- lock := sync.Mutex{}
|
|
|
- ch := make(chan bool, 10)
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- updateArr := [][]map[string]interface{}{}
|
|
|
- it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
|
|
|
- n := 0
|
|
|
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
- ch <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-ch
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- _id := tmp["_id"]
|
|
|
- update := []map[string]interface{}{}
|
|
|
- update = append(update, map[string]interface{}{"_id": _id})
|
|
|
- newId := primitive.NewObjectID()
|
|
|
- tmp["_id"] = newId //新id
|
|
|
- if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
|
|
|
- strid := mongodb.BsonIdToSId(newId)
|
|
|
- tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
|
|
|
- }
|
|
|
- //更新
|
|
|
- set := map[string]interface{}{"moveok": true}
|
|
|
- set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
- site := qu.ObjToString(tmp["site"]) //解析附件站点
|
|
|
- if limitRatio := OssSite[site]; limitRatio > 0 { //配置站点解析附件,根据准确率情况替换正文
|
|
|
- replace, filetext := AnalysisFile(true, limitRatio, tmp)
|
|
|
- if replace { //替换正文
|
|
|
- tmp["detail"] = filetext
|
|
|
- set["filetext"] = true
|
|
|
- }
|
|
|
- } else { //其它网站附件信息,detail无效,只有一个附件且不是ocr识别的,替换正文
|
|
|
- //判断detail是否有效
|
|
|
- detail := qu.ObjToString(tmp["detail"])
|
|
|
- detail = sp.FilterDetail(detail) //只保留文本内容
|
|
|
- if len([]rune(detail)) <= 5 || (len([]rune(detail)) <= 50 && SpecialTextReg.MatchString(detail)) {
|
|
|
- replace, filetext := AnalysisFile(false, 0, tmp)
|
|
|
- if replace { //替换正文
|
|
|
- tmp["detail"] = filetext
|
|
|
- set["filetext"] = true
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- //IsReplaceDetailSite := OssSite[site]
|
|
|
- //if IsReplaceDetailSite {
|
|
|
- // replace, fileOk, filetext := AnalysisFile_back(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
|
|
|
- // if replace {
|
|
|
- // tmp["detail"] = filetext //替换正文
|
|
|
- // }
|
|
|
- // if !fileOk { //附件异常
|
|
|
- // set["filerr"] = true
|
|
|
- // //set["filetext"] = filetext//文本过大,导致更新mongo失败
|
|
|
- // }
|
|
|
- //}
|
|
|
- update = append(update, map[string]interface{}{
|
|
|
- "$set": set,
|
|
|
- })
|
|
|
- //save
|
|
|
- SaveMgoCache <- tmp
|
|
|
- lock.Lock()
|
|
|
- //update
|
|
|
- updateArr = append(updateArr, update)
|
|
|
- if len(updateArr) > 200 {
|
|
|
- Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- }(tmp)
|
|
|
- if n%100 == 0 {
|
|
|
- qu.Debug("current:", n)
|
|
|
- }
|
|
|
- tmp = map[string]interface{}{}
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
- lock.Lock()
|
|
|
- if len(updateArr) > 0 {
|
|
|
- Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- qu.Debug("bidding_file,本轮迁移数据量:", n)
|
|
|
-}
|
|
|
-
|
|
|
-func SaveData() {
|
|
|
- fmt.Println("Save Bidding Data...")
|
|
|
- savearr := make([]map[string]interface{}, 200)
|
|
|
- indexh := 0
|
|
|
- for {
|
|
|
- select {
|
|
|
- case v := <-SaveMgoCache:
|
|
|
- savearr[indexh] = v
|
|
|
- indexh++
|
|
|
- if indexh == 200 { //超过200条开始保存
|
|
|
- SP <- true
|
|
|
- go func(savearr []map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-SP
|
|
|
- }()
|
|
|
- Mgo.SaveBulk("bidding", savearr...)
|
|
|
- }(savearr)
|
|
|
- savearr = make([]map[string]interface{}, 200)
|
|
|
- indexh = 0
|
|
|
- }
|
|
|
- case <-time.After(10 * time.Second): //超过10秒开始保存
|
|
|
- qu.Debug("定时保存数据...", indexh)
|
|
|
- if indexh > 0 {
|
|
|
- SP <- true
|
|
|
- go func(savearr []map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-SP
|
|
|
- }()
|
|
|
- Mgo.SaveBulk("bidding", savearr...)
|
|
|
- }(savearr[:indexh])
|
|
|
- savearr = make([]map[string]interface{}, 200)
|
|
|
- indexh = 0
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func DeleteData() {
|
|
|
- query := map[string]interface{}{
|
|
|
- "comeintime": map[string]interface{}{
|
|
|
- "$lt": time.Now().AddDate(0, -1, 0).Unix(),
|
|
|
- },
|
|
|
- "moveok": true,
|
|
|
- }
|
|
|
- n := Mgo.Delete(MoveCollFile, query)
|
|
|
- n1 := Mgo.Delete(MoveCollNomal, query)
|
|
|
- qu.Debug("bidding_file删除数据量:", n, " bidding_nomal删除数据量:", n1)
|
|
|
-}
|
|
|
-
|
|
|
-/*func DataMoveToBidding() {
|
|
|
- defer qu.Catch()
|
|
|
- for {
|
|
|
- //查询ocr_flie_over获取要迁移的id段
|
|
|
- gtid, lteid := GetIdInterval()
|
|
|
- if gtid != "" && lteid != "" {
|
|
|
- MoveData(gtid, lteid)
|
|
|
- } else {
|
|
|
- time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
- }
|
|
|
- }
|
|
|
-}*/
|
|
|
-
|
|
|
-//获取ID区间
|
|
|
-/*func GetIdInterval() (string, string) {
|
|
|
- defer qu.Catch()
|
|
|
- qu.Debug("获取id段...")
|
|
|
- query := map[string]interface{}{
|
|
|
- "isused": false,
|
|
|
- }
|
|
|
- list, _ := Mgo.Find(Coll, query, map[string]interface{}{"_id": 1}, nil, false, -1, -1)
|
|
|
- dataLength := len(*list)
|
|
|
- if dataLength == 0 { //无迁移id段
|
|
|
- return "", ""
|
|
|
- }
|
|
|
- gtid, lteid := "", ""
|
|
|
- idArr := []interface{}{}
|
|
|
- for i, l := range *list {
|
|
|
- gtidTmp := qu.ObjToString(l["gtid"])
|
|
|
- lteidTmp := qu.ObjToString(l["lteid"])
|
|
|
- if gtidTmp == "" || lteidTmp == "" || gtidTmp >= lteidTmp {
|
|
|
- qu.Debug("查询id段出错")
|
|
|
- SendMail("查询id段异常")
|
|
|
- time.Sleep(1 * time.Minute)
|
|
|
- return "", ""
|
|
|
- }
|
|
|
- if i == 0 { //第一条数据的起始id为最终迁移数据的起始id
|
|
|
- gtid = gtidTmp
|
|
|
- }
|
|
|
- if i == dataLength-1 { //最后一条数据的结束id为最终迁移数据的结束id
|
|
|
- lteid = lteidTmp
|
|
|
- }
|
|
|
- idArr = append(idArr, l["_id"])
|
|
|
- }
|
|
|
- //成功获取id区间
|
|
|
- if gtid != "" && lteid != "" && gtid < lteid {
|
|
|
- if gtid < StartId { //本轮起始id小于上轮结束id,id区间异常
|
|
|
- qu.Debug("异常区间:", gtid, StartId)
|
|
|
- SendMail("id区间异常")
|
|
|
- time.Sleep(1 * time.Minute)
|
|
|
- return "", ""
|
|
|
- }
|
|
|
- StartId = lteid
|
|
|
- for _, id := range idArr { //更新
|
|
|
- Mgo.Update(Coll, map[string]interface{}{"_id": id}, map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{"isused": true, "updatetime": time.Now().Unix()},
|
|
|
- }, false, false)
|
|
|
- }
|
|
|
- return gtid, lteid
|
|
|
- }
|
|
|
- return "", ""
|
|
|
-
|
|
|
-}*/
|
|
|
-
|
|
|
-//处理迁移数据
|
|
|
-/*func MoveData(gtid, lteid string) {
|
|
|
- defer qu.Catch()
|
|
|
- qu.Debug("迁移开始,ID 区间:", gtid, lteid)
|
|
|
- sess := Mgo.GetMgoConn()
|
|
|
- defer Mgo.DestoryMongoConn(sess)
|
|
|
- query := map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": mongodb.StringTOBsonId(gtid),
|
|
|
- "$lte": mongodb.StringTOBsonId(lteid),
|
|
|
- },
|
|
|
- }
|
|
|
- fields := map[string]interface{}{
|
|
|
- "attach_time": 0,
|
|
|
- }
|
|
|
- lock := sync.Mutex{}
|
|
|
- ch := make(chan bool, 3)
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- saveArr := []map[string]interface{}{}
|
|
|
- updateArr := [][]map[string]interface{}{}
|
|
|
- count, _ := sess.DB("qfw").C(MoveCollFile).Find(&query).Count()
|
|
|
- qu.Debug("查询数据总条数:", count)
|
|
|
- it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
|
|
|
- n := 0
|
|
|
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
- ch <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-ch
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- if tmp["moveok"] != nil {
|
|
|
- return
|
|
|
- }
|
|
|
- _id := tmp["_id"]
|
|
|
- update := []map[string]interface{}{}
|
|
|
- update = append(update, map[string]interface{}{"_id": _id})
|
|
|
- newId := primitive.NewObjectID()
|
|
|
- if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
|
|
|
- strid := mongodb.BsonIdToSId(newId)
|
|
|
- tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
|
|
|
- }
|
|
|
- //更新
|
|
|
- set := map[string]interface{}{"moveok": true}
|
|
|
- set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
-
|
|
|
- site := qu.ObjToString(tmp["site"]) //解析附件站点
|
|
|
- IsReplaceDetailSite := OssSite[site]
|
|
|
- if IsReplaceDetailSite {
|
|
|
- replace, fileOk, filetext := AnalysisFile(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
|
|
|
- if replace {
|
|
|
- tmp["detail"] = filetext //替换正文
|
|
|
- }
|
|
|
- if !fileOk { //附件异常
|
|
|
- set["filerr"] = true
|
|
|
- //set["filetext"] = filetext//文本过大,导致更新mongo失败
|
|
|
- }
|
|
|
- }
|
|
|
- update = append(update, map[string]interface{}{
|
|
|
- "$set": set,
|
|
|
- })
|
|
|
- tmp["_id"] = newId
|
|
|
- tmp["comeintime"] = time.Now().Unix()
|
|
|
- lock.Lock()
|
|
|
- //save
|
|
|
- saveArr = append(saveArr, tmp)
|
|
|
- if len(saveArr) > 50 { //不能设置过大,加上保存服务批量保存数据,会造成两id间数据缺失
|
|
|
- Mgo.SaveBulk("bidding", saveArr...)
|
|
|
- saveArr = []map[string]interface{}{}
|
|
|
- }
|
|
|
- //update
|
|
|
- updateArr = append(updateArr, update)
|
|
|
- if len(updateArr) > 500 {
|
|
|
- Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- }(tmp)
|
|
|
- if n%100 == 0 {
|
|
|
- qu.Debug("current:", n)
|
|
|
- }
|
|
|
- tmp = map[string]interface{}{}
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
- lock.Lock()
|
|
|
- if len(saveArr) > 0 {
|
|
|
- Mgo.SaveBulk("bidding", saveArr...)
|
|
|
- saveArr = []map[string]interface{}{}
|
|
|
- }
|
|
|
- if len(updateArr) > 0 {
|
|
|
- Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- qu.Debug("迁移结束,ID 区间:", gtid, lteid)
|
|
|
-}*/
|
|
|
-
|
|
|
-/*func SendMail(bodyTextAll string) {
|
|
|
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, To, "ocr_file_over", bodyTextAll))
|
|
|
- if err == nil {
|
|
|
- defer res.Body.Close()
|
|
|
- read, err := ioutil.ReadAll(res.Body)
|
|
|
- qu.Debug("邮件发送成功:", string(read), err)
|
|
|
- } else {
|
|
|
- qu.Debug("邮件发送失败:", err)
|
|
|
- }
|
|
|
-}
|
|
|
-*/
|
|
|
-
|
|
|
-/*func FileText() {
|
|
|
- defer qu.Catch()
|
|
|
- sess := Mgo.GetMgoConn()
|
|
|
- defer Mgo.DestoryMongoConn(sess)
|
|
|
- ch := make(chan bool, 10)
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
- lock := &sync.Mutex{}
|
|
|
- field := map[string]interface{}{
|
|
|
- "attach_text": 1,
|
|
|
- }
|
|
|
- it := sess.DB("mxs").C("bidding_text").Find(nil).Select(&field).Iter()
|
|
|
- n := 0
|
|
|
- arr := [][]map[string]interface{}{}
|
|
|
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
- ch <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-ch
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- update := []map[string]interface{}{}
|
|
|
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
- ok, filetext := AnalysisFile(tmp)
|
|
|
- if ok {
|
|
|
- update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
- "filetext": filetext,
|
|
|
- }})
|
|
|
- }
|
|
|
- //filetext := qu.ObjToString(tmp["filetext"])
|
|
|
- //ok, filetext, state, comratio, notcomratio := AnalysisFileTest(filetext) //解析
|
|
|
- //update := []map[string]interface{}{}
|
|
|
- //update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
- //update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
- // "ok": ok,
|
|
|
- // "filetexb": filetext,
|
|
|
- // "state": state,
|
|
|
- // "comratio": comratio,
|
|
|
- // "notcomratio": notcomratio,
|
|
|
- //}})
|
|
|
- lock.Lock()
|
|
|
- if len(update) == 2 {
|
|
|
- arr = append(arr, update)
|
|
|
- }
|
|
|
- if len(arr) > 500 {
|
|
|
- Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
- arr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- }(tmp)
|
|
|
- if n%1000 == 0 {
|
|
|
- qu.Debug("current:", n)
|
|
|
- }
|
|
|
- tmp = map[string]interface{}{}
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
- lock.Lock()
|
|
|
- if len(arr) > 0 {
|
|
|
- Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
- arr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
-}
|
|
|
-func testdata() {
|
|
|
- sess := Mgo.GetMgoConn()
|
|
|
- defer Mgo.DestoryMongoConn(sess)
|
|
|
- fields := map[string]interface{}{
|
|
|
- "attach_text": 1,
|
|
|
- }
|
|
|
- query := map[string]interface{}{
|
|
|
- "site": "中国招标投标公共服务平台",
|
|
|
- }
|
|
|
- lock := sync.Mutex{}
|
|
|
- ch := make(chan bool, 10)
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- updateArr := [][]map[string]interface{}{}
|
|
|
- count, _ := sess.DB("qfw").C("bidding").Find(&query).Count()
|
|
|
- qu.Debug("查询数据总条数:", count)
|
|
|
- it := sess.DB("qfw").C("bidding").Find(&query).Select(&fields).Iter()
|
|
|
- n := 0
|
|
|
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
- ch <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-ch
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
- if id > "62335b000000000000000000" {
|
|
|
- return
|
|
|
- }
|
|
|
- update := []map[string]interface{}{}
|
|
|
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
- ok, filetext := AnalysisFile(tmp) //解析附件是否替换到detail
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
- update = append(update, map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "detail": filetext,
|
|
|
- "detailupdate": true,
|
|
|
- },
|
|
|
- })
|
|
|
- lock.Lock()
|
|
|
- //update
|
|
|
- updateArr = append(updateArr, update)
|
|
|
- if len(updateArr) > 500 {
|
|
|
- Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- }(tmp)
|
|
|
- if n%100 == 0 {
|
|
|
- qu.Debug("current:", n)
|
|
|
- }
|
|
|
- tmp = map[string]interface{}{}
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
- lock.Lock()
|
|
|
- if len(updateArr) > 0 {
|
|
|
- Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
- updateArr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
-}*/
|