|
@@ -2,9 +2,8 @@ package main
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
- "io/ioutil"
|
|
|
+ "github.com/cron"
|
|
|
"mongodb"
|
|
|
- "net/http"
|
|
|
qu "qfw/util"
|
|
|
"sync"
|
|
|
"time"
|
|
@@ -15,15 +14,18 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- Webport string
|
|
|
- Config map[string]interface{}
|
|
|
- Mgo *mongodb.MongodbSim
|
|
|
- Coll string //ocr_flie_over
|
|
|
- MoveColl string //bidding_file
|
|
|
- Api string
|
|
|
- To string
|
|
|
- StartId string
|
|
|
- TaskTime int
|
|
|
+ Webport string
|
|
|
+ Config map[string]interface{}
|
|
|
+ Mgo *mongodb.MongodbSim
|
|
|
+ Coll string //ocr_flie_over
|
|
|
+ MoveCollFile string //bidding_file
|
|
|
+ MoveCollNomal string //bidding_nomal
|
|
|
+ Api string
|
|
|
+ To string
|
|
|
+ TaskTime int
|
|
|
+ SaveMgoCache = make(chan map[string]interface{}, 1000) //
|
|
|
+ SP = make(chan bool, 5)
|
|
|
+ //StartId string
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
@@ -32,8 +34,9 @@ func init() {
|
|
|
InitOss() //oss
|
|
|
Webport = qu.ObjToString(Config["webport"])
|
|
|
Coll = qu.ObjToString(Config["coll"])
|
|
|
- MoveColl = qu.ObjToString(Config["movecoll"])
|
|
|
- StartId = qu.ObjToString(Config["startid"])
|
|
|
+ MoveCollFile = qu.ObjToString(Config["movecollfile"])
|
|
|
+ MoveCollNomal = qu.ObjToString(Config["movecollnomal"])
|
|
|
+ //StartId = qu.ObjToString(Config["startid"])
|
|
|
Api = qu.ObjToString(Config["api"])
|
|
|
To = qu.ObjToString(Config["to"])
|
|
|
TaskTime = qu.IntAll(Config["tasktime"])
|
|
@@ -47,14 +50,247 @@ func init() {
|
|
|
Mgo.InitPool()
|
|
|
}
|
|
|
func main() {
|
|
|
- go DataMoveToBidding()
|
|
|
qu.Debug("start...")
|
|
|
+ c := cron.New()
|
|
|
+ c.Start()
|
|
|
+ c.AddFunc("0 0 0 ? * *", DeleteData)
|
|
|
+ go SaveData() //数据保存
|
|
|
+ go FileDataMoveToBidding() //附件数据迁移至bidding
|
|
|
+ go NormalDataMoveToBidding() //正常数据迁移至bidding
|
|
|
+ //go DataMoveToBidding()
|
|
|
xweb.Run(":" + Webport)
|
|
|
// ch := make(chan bool, 1)
|
|
|
// <-ch
|
|
|
}
|
|
|
|
|
|
-func DataMoveToBidding() {
|
|
|
+func NormalDataMoveToBidding() {
|
|
|
+ defer qu.Catch()
|
|
|
+ for {
|
|
|
+ query := map[string]interface{}{ //查询标物解析完成,未迁移的数据
|
|
|
+ "bid_completetime": map[string]interface{}{
|
|
|
+ "$exists": true,
|
|
|
+ },
|
|
|
+ "moveok": map[string]interface{}{
|
|
|
+ "$exists": false,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ count := Mgo.Count(MoveCollNomal, query)
|
|
|
+ qu.Debug("bidding_nomal,本轮查询数据量:", count)
|
|
|
+ if count > 0 {
|
|
|
+ NormalDataMove(query)
|
|
|
+ } else {
|
|
|
+ time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func NormalDataMove(query map[string]interface{}) {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "bid_completetime": 0,
|
|
|
+ "bid_starttime": 0,
|
|
|
+ }
|
|
|
+ lock := sync.Mutex{}
|
|
|
+ ch := make(chan bool, 3)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ it := sess.DB("qfw").C(MoveCollNomal).Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ _id := tmp["_id"]
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": _id})
|
|
|
+ newId := primitive.NewObjectID()
|
|
|
+ tmp["_id"] = newId
|
|
|
+ //更新
|
|
|
+ set := map[string]interface{}{"moveok": true}
|
|
|
+ set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": set,
|
|
|
+ })
|
|
|
+ //save
|
|
|
+ SaveMgoCache <- tmp
|
|
|
+ lock.Lock()
|
|
|
+ //update
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 200 {
|
|
|
+ Mgo.UpdateBulk(MoveCollNomal, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ Mgo.UpdateBulk(MoveCollNomal, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ qu.Debug("bidding_nomal,本轮迁移数据量:", n)
|
|
|
+}
|
|
|
+
|
|
|
+func FileDataMoveToBidding() {
|
|
|
+ defer qu.Catch()
|
|
|
+ for {
|
|
|
+ query := map[string]interface{}{ //查询附件解析完成,未迁移的数据
|
|
|
+ "file_completetime": map[string]interface{}{
|
|
|
+ "$exists": true,
|
|
|
+ },
|
|
|
+ "moveok": map[string]interface{}{
|
|
|
+ "$exists": false,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ count := Mgo.Count(MoveCollFile, query)
|
|
|
+ qu.Debug("bidding_file,本轮查询数据量:", count)
|
|
|
+ if count > 0 {
|
|
|
+ FileDataMove(query)
|
|
|
+ } else {
|
|
|
+ time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func FileDataMove(query map[string]interface{}) {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "attach_time": 0,
|
|
|
+ "file_completetime": 0,
|
|
|
+ "file_starttime": 0,
|
|
|
+ }
|
|
|
+ lock := sync.Mutex{}
|
|
|
+ ch := make(chan bool, 3)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ _id := tmp["_id"]
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": _id})
|
|
|
+ newId := primitive.NewObjectID()
|
|
|
+ if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
|
|
|
+ strid := mongodb.BsonIdToSId(newId)
|
|
|
+ tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
|
|
|
+ }
|
|
|
+ //更新
|
|
|
+ set := map[string]interface{}{"moveok": true}
|
|
|
+ set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
+
|
|
|
+ site := qu.ObjToString(tmp["site"]) //解析附件站点
|
|
|
+ IsReplaceDetailSite := OssSite[site]
|
|
|
+ if IsReplaceDetailSite {
|
|
|
+ //replace, fileOk, filetext := AnalysisFile(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
|
|
|
+ //if replace {
|
|
|
+ // tmp["detail"] = filetext //替换正文
|
|
|
+ //}
|
|
|
+ //if !fileOk { //附件异常
|
|
|
+ // set["filerr"] = true
|
|
|
+ // //set["filetext"] = filetext//文本过大,导致更新mongo失败
|
|
|
+ //}
|
|
|
+ }
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": set,
|
|
|
+ })
|
|
|
+ tmp["_id"] = newId
|
|
|
+ //save
|
|
|
+ SaveMgoCache <- tmp
|
|
|
+ lock.Lock()
|
|
|
+ //update
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 200 {
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ qu.Debug("bidding_file,本轮迁移数据量:", n)
|
|
|
+}
|
|
|
+
|
|
|
+func SaveData() {
|
|
|
+ fmt.Println("Save Bidding Data...")
|
|
|
+ savearr := make([]map[string]interface{}, 200)
|
|
|
+ indexh := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-SaveMgoCache:
|
|
|
+ savearr[indexh] = v
|
|
|
+ indexh++
|
|
|
+ if indexh == 200 { //超过200条开始保存
|
|
|
+ SP <- true
|
|
|
+ go func(savearr []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-SP
|
|
|
+ }()
|
|
|
+ Mgo.SaveBulk("bidding", savearr...)
|
|
|
+ }(savearr)
|
|
|
+ savearr = make([]map[string]interface{}, 200)
|
|
|
+ indexh = 0
|
|
|
+ }
|
|
|
+ case <-time.After(10 * time.Second): //超过10秒开始保存
|
|
|
+ qu.Debug("定时保存数据...", indexh)
|
|
|
+ if indexh > 0 {
|
|
|
+ SP <- true
|
|
|
+ go func(savearr []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-SP
|
|
|
+ }()
|
|
|
+ Mgo.SaveBulk("bidding", savearr...)
|
|
|
+ }(savearr[:indexh])
|
|
|
+ savearr = make([]map[string]interface{}, 200)
|
|
|
+ indexh = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func DeleteData() {
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$lt": time.Now().AddDate(0, -1, 0).Unix(),
|
|
|
+ },
|
|
|
+ "moveok": true,
|
|
|
+ }
|
|
|
+ n := Mgo.Delete(MoveCollFile, query)
|
|
|
+ n1 := Mgo.Delete(MoveCollNomal, query)
|
|
|
+ qu.Debug("bidding_file删除数据量:", n, " bidding_nomal删除数据量:", n1)
|
|
|
+}
|
|
|
+
|
|
|
+/*func DataMoveToBidding() {
|
|
|
defer qu.Catch()
|
|
|
for {
|
|
|
//查询ocr_flie_over获取要迁移的id段
|
|
@@ -65,9 +301,10 @@ func DataMoveToBidding() {
|
|
|
time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
}
|
|
|
}
|
|
|
-}
|
|
|
+}*/
|
|
|
|
|
|
-func GetIdInterval() (string, string) {
|
|
|
+//获取ID区间
|
|
|
+/*func GetIdInterval() (string, string) {
|
|
|
defer qu.Catch()
|
|
|
qu.Debug("获取id段...")
|
|
|
query := map[string]interface{}{
|
|
@@ -115,9 +352,10 @@ func GetIdInterval() (string, string) {
|
|
|
}
|
|
|
return "", ""
|
|
|
|
|
|
-}
|
|
|
+}*/
|
|
|
|
|
|
-func MoveData(gtid, lteid string) {
|
|
|
+//处理迁移数据
|
|
|
+/*func MoveData(gtid, lteid string) {
|
|
|
defer qu.Catch()
|
|
|
qu.Debug("迁移开始,ID 区间:", gtid, lteid)
|
|
|
sess := Mgo.GetMgoConn()
|
|
@@ -136,9 +374,9 @@ func MoveData(gtid, lteid string) {
|
|
|
wg := sync.WaitGroup{}
|
|
|
saveArr := []map[string]interface{}{}
|
|
|
updateArr := [][]map[string]interface{}{}
|
|
|
- count, _ := sess.DB("qfw").C(MoveColl).Find(&query).Count()
|
|
|
+ count, _ := sess.DB("qfw").C(MoveCollFile).Find(&query).Count()
|
|
|
qu.Debug("查询数据总条数:", count)
|
|
|
- it := sess.DB("qfw").C(MoveColl).Find(&query).Select(&fields).Iter()
|
|
|
+ it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
|
|
|
n := 0
|
|
|
for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
ch <- true
|
|
@@ -190,7 +428,7 @@ func MoveData(gtid, lteid string) {
|
|
|
//update
|
|
|
updateArr = append(updateArr, update)
|
|
|
if len(updateArr) > 500 {
|
|
|
- Mgo.UpdateBulk(MoveColl, updateArr...)
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
updateArr = [][]map[string]interface{}{}
|
|
|
}
|
|
|
lock.Unlock()
|
|
@@ -207,14 +445,14 @@ func MoveData(gtid, lteid string) {
|
|
|
saveArr = []map[string]interface{}{}
|
|
|
}
|
|
|
if len(updateArr) > 0 {
|
|
|
- Mgo.UpdateBulk(MoveColl, updateArr...)
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
updateArr = [][]map[string]interface{}{}
|
|
|
}
|
|
|
lock.Unlock()
|
|
|
qu.Debug("迁移结束,ID 区间:", gtid, lteid)
|
|
|
-}
|
|
|
+}*/
|
|
|
|
|
|
-func SendMail(bodyTextAll string) {
|
|
|
+/*func SendMail(bodyTextAll string) {
|
|
|
res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, To, "ocr_file_over", bodyTextAll))
|
|
|
if err == nil {
|
|
|
defer res.Body.Close()
|
|
@@ -224,130 +462,131 @@ func SendMail(bodyTextAll string) {
|
|
|
qu.Debug("邮件发送失败:", err)
|
|
|
}
|
|
|
}
|
|
|
+*/
|
|
|
|
|
|
-//func FileText() {
|
|
|
-// defer qu.Catch()
|
|
|
-// sess := Mgo.GetMgoConn()
|
|
|
-// defer Mgo.DestoryMongoConn(sess)
|
|
|
-// ch := make(chan bool, 10)
|
|
|
-// wg := &sync.WaitGroup{}
|
|
|
-// lock := &sync.Mutex{}
|
|
|
-// field := map[string]interface{}{
|
|
|
-// "attach_text": 1,
|
|
|
-// }
|
|
|
-// it := sess.DB("mxs").C("bidding_text").Find(nil).Select(&field).Iter()
|
|
|
-// n := 0
|
|
|
-// arr := [][]map[string]interface{}{}
|
|
|
-// for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
-// ch <- true
|
|
|
-// wg.Add(1)
|
|
|
-// go func(tmp map[string]interface{}) {
|
|
|
-// defer func() {
|
|
|
-// <-ch
|
|
|
-// wg.Done()
|
|
|
-// }()
|
|
|
-// update := []map[string]interface{}{}
|
|
|
-// update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
-// ok, filetext := AnalysisFile(tmp)
|
|
|
-// if ok {
|
|
|
-// update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
-// "filetext": filetext,
|
|
|
-// }})
|
|
|
-// }
|
|
|
-// //filetext := qu.ObjToString(tmp["filetext"])
|
|
|
-// //ok, filetext, state, comratio, notcomratio := AnalysisFileTest(filetext) //解析
|
|
|
-// //update := []map[string]interface{}{}
|
|
|
-// //update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
-// //update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
-// // "ok": ok,
|
|
|
-// // "filetexb": filetext,
|
|
|
-// // "state": state,
|
|
|
-// // "comratio": comratio,
|
|
|
-// // "notcomratio": notcomratio,
|
|
|
-// //}})
|
|
|
-// lock.Lock()
|
|
|
-// if len(update) == 2 {
|
|
|
-// arr = append(arr, update)
|
|
|
-// }
|
|
|
-// if len(arr) > 500 {
|
|
|
-// Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
-// arr = [][]map[string]interface{}{}
|
|
|
-// }
|
|
|
-// lock.Unlock()
|
|
|
-// }(tmp)
|
|
|
-// if n%1000 == 0 {
|
|
|
-// qu.Debug("current:", n)
|
|
|
-// }
|
|
|
-// tmp = map[string]interface{}{}
|
|
|
-// }
|
|
|
-// wg.Wait()
|
|
|
-// lock.Lock()
|
|
|
-// if len(arr) > 0 {
|
|
|
-// Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
-// arr = [][]map[string]interface{}{}
|
|
|
-// }
|
|
|
-// lock.Unlock()
|
|
|
-//}
|
|
|
-//func testdata() {
|
|
|
-// sess := Mgo.GetMgoConn()
|
|
|
-// defer Mgo.DestoryMongoConn(sess)
|
|
|
-// fields := map[string]interface{}{
|
|
|
-// "attach_text": 1,
|
|
|
-// }
|
|
|
-// query := map[string]interface{}{
|
|
|
-// "site": "中国招标投标公共服务平台",
|
|
|
-// }
|
|
|
-// lock := sync.Mutex{}
|
|
|
-// ch := make(chan bool, 10)
|
|
|
-// wg := sync.WaitGroup{}
|
|
|
-// updateArr := [][]map[string]interface{}{}
|
|
|
-// count, _ := sess.DB("qfw").C("bidding").Find(&query).Count()
|
|
|
-// qu.Debug("查询数据总条数:", count)
|
|
|
-// it := sess.DB("qfw").C("bidding").Find(&query).Select(&fields).Iter()
|
|
|
-// n := 0
|
|
|
-// for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
-// ch <- true
|
|
|
-// wg.Add(1)
|
|
|
-// go func(tmp map[string]interface{}) {
|
|
|
-// defer func() {
|
|
|
-// <-ch
|
|
|
-// wg.Done()
|
|
|
-// }()
|
|
|
-// id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
-// if id > "62335b000000000000000000" {
|
|
|
-// return
|
|
|
-// }
|
|
|
-// update := []map[string]interface{}{}
|
|
|
-// update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
-// ok, filetext := AnalysisFile(tmp) //解析附件是否替换到detail
|
|
|
-// if !ok {
|
|
|
-// return
|
|
|
-// }
|
|
|
-// update = append(update, map[string]interface{}{
|
|
|
-// "$set": map[string]interface{}{
|
|
|
-// "detail": filetext,
|
|
|
-// "detailupdate": true,
|
|
|
-// },
|
|
|
-// })
|
|
|
-// lock.Lock()
|
|
|
-// //update
|
|
|
-// updateArr = append(updateArr, update)
|
|
|
-// if len(updateArr) > 500 {
|
|
|
-// Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
-// updateArr = [][]map[string]interface{}{}
|
|
|
-// }
|
|
|
-// lock.Unlock()
|
|
|
-// }(tmp)
|
|
|
-// if n%100 == 0 {
|
|
|
-// qu.Debug("current:", n)
|
|
|
-// }
|
|
|
-// tmp = map[string]interface{}{}
|
|
|
-// }
|
|
|
-// wg.Wait()
|
|
|
-// lock.Lock()
|
|
|
-// if len(updateArr) > 0 {
|
|
|
-// Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
-// updateArr = [][]map[string]interface{}{}
|
|
|
-// }
|
|
|
-// lock.Unlock()
|
|
|
-//}
|
|
|
+/*func FileText() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ field := map[string]interface{}{
|
|
|
+ "attach_text": 1,
|
|
|
+ }
|
|
|
+ it := sess.DB("mxs").C("bidding_text").Find(nil).Select(&field).Iter()
|
|
|
+ n := 0
|
|
|
+ arr := [][]map[string]interface{}{}
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ ok, filetext := AnalysisFile(tmp)
|
|
|
+ if ok {
|
|
|
+ update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ "filetext": filetext,
|
|
|
+ }})
|
|
|
+ }
|
|
|
+ //filetext := qu.ObjToString(tmp["filetext"])
|
|
|
+ //ok, filetext, state, comratio, notcomratio := AnalysisFileTest(filetext) //解析
|
|
|
+ //update := []map[string]interface{}{}
|
|
|
+ //update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ //update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ // "ok": ok,
|
|
|
+ // "filetexb": filetext,
|
|
|
+ // "state": state,
|
|
|
+ // "comratio": comratio,
|
|
|
+ // "notcomratio": notcomratio,
|
|
|
+ //}})
|
|
|
+ lock.Lock()
|
|
|
+ if len(update) == 2 {
|
|
|
+ arr = append(arr, update)
|
|
|
+ }
|
|
|
+ if len(arr) > 500 {
|
|
|
+ Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
+ arr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(arr) > 0 {
|
|
|
+ Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
+ arr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+}
|
|
|
+func testdata() {
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "attach_text": 1,
|
|
|
+ }
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "site": "中国招标投标公共服务平台",
|
|
|
+ }
|
|
|
+ lock := sync.Mutex{}
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ count, _ := sess.DB("qfw").C("bidding").Find(&query).Count()
|
|
|
+ qu.Debug("查询数据总条数:", count)
|
|
|
+ it := sess.DB("qfw").C("bidding").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ if id > "62335b000000000000000000" {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ ok, filetext := AnalysisFile(tmp) //解析附件是否替换到detail
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "detail": filetext,
|
|
|
+ "detailupdate": true,
|
|
|
+ },
|
|
|
+ })
|
|
|
+ lock.Lock()
|
|
|
+ //update
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 500 {
|
|
|
+ Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+}*/
|