|
@@ -0,0 +1,638 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "github.com/cron"
|
|
|
+ "github.com/go-xweb/xweb"
|
|
|
+ "mongodb"
|
|
|
+ "net/http"
|
|
|
+ qu "qfw/util"
|
|
|
+ sp "spiderutil"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ Webport string
|
|
|
+ ListenPort string
|
|
|
+ Config map[string]interface{}
|
|
|
+ Mgo *mongodb.MongodbSim
|
|
|
+ Coll string //ocr_flie_over
|
|
|
+ MoveCollFile string //bidding_file
|
|
|
+ MoveCollNomal string //bidding_nomal
|
|
|
+ Api string
|
|
|
+ To string
|
|
|
+ TaskTime int
|
|
|
+ SaveMgoCache = make(chan map[string]interface{}, 1000) //
|
|
|
+ SP = make(chan bool, 5)
|
|
|
+ //StartId string
|
|
|
+ Stop bool
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ qu.ReadConfig(&Config)
|
|
|
+ InitFileInfo() //初始化附件解析信息
|
|
|
+ InitOss() //oss
|
|
|
+ Webport = qu.ObjToString(Config["webport"])
|
|
|
+ ListenPort = qu.ObjToString(Config["listenport"])
|
|
|
+ Coll = qu.ObjToString(Config["coll"])
|
|
|
+ MoveCollFile = qu.ObjToString(Config["movecollfile"])
|
|
|
+ MoveCollNomal = qu.ObjToString(Config["movecollnomal"])
|
|
|
+ //StartId = qu.ObjToString(Config["startid"])
|
|
|
+ Api = qu.ObjToString(Config["api"])
|
|
|
+ To = qu.ObjToString(Config["to"])
|
|
|
+ TaskTime = qu.IntAll(Config["tasktime"])
|
|
|
+ Mgo = &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: qu.ObjToString(Config["address"]),
|
|
|
+ DbName: qu.ObjToString(Config["dbname"]),
|
|
|
+ Size: qu.IntAll(Config["size"]),
|
|
|
+ UserName: qu.ObjToString(Config["username"]),
|
|
|
+ Password: qu.ObjToString(Config["password"]),
|
|
|
+ }
|
|
|
+ Mgo.InitPool()
|
|
|
+}
|
|
|
+func main() {
|
|
|
+ qu.Debug("start...")
|
|
|
+ c := cron.New()
|
|
|
+ c.Start()
|
|
|
+ c.AddFunc("0 0 0 ? * *", DeleteData)
|
|
|
+ go monitor()
|
|
|
+ go SaveData() //数据保存
|
|
|
+ go FileDataMoveToBidding() //附件数据迁移至bidding
|
|
|
+ go NormalDataMoveToBidding() //正常数据迁移至bidding
|
|
|
+ xweb.Run(":" + Webport)
|
|
|
+ //ch := make(chan bool, 1)
|
|
|
+ //<-ch
|
|
|
+}
|
|
|
+func monitor() {
|
|
|
+ //最好是单实例调用
|
|
|
+ http.HandleFunc("/movebidding/stop", func(w http.ResponseWriter, r *http.Request) {
|
|
|
+ fmt.Println("停止数据迁移...")
|
|
|
+ Stop = true
|
|
|
+ w.Write([]byte("ok"))
|
|
|
+ })
|
|
|
+ http.HandleFunc("/movebidding/run", func(w http.ResponseWriter, r *http.Request) {
|
|
|
+ fmt.Println("启动数据迁移...")
|
|
|
+ Stop = false
|
|
|
+ w.Write([]byte("ok"))
|
|
|
+ })
|
|
|
+ http.ListenAndServe(":"+ListenPort, nil)
|
|
|
+}
|
|
|
+
|
|
|
+func NormalDataMoveToBidding() {
|
|
|
+ defer qu.Catch()
|
|
|
+ for {
|
|
|
+ query := map[string]interface{}{ //查询标物解析完成,未迁移的数据
|
|
|
+ "bid_completetime": map[string]interface{}{
|
|
|
+ "$exists": true,
|
|
|
+ },
|
|
|
+ "moveok": map[string]interface{}{
|
|
|
+ "$exists": false,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ count := Mgo.Count(MoveCollNomal, query)
|
|
|
+ qu.Debug("bidding_nomal,本轮查询数据量:", count, Stop)
|
|
|
+ if count > 0 && !Stop {
|
|
|
+ NormalDataMove(query)
|
|
|
+ } else {
|
|
|
+ time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func NormalDataMove(query map[string]interface{}) {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "bid_completetime": 0,
|
|
|
+ "bid_starttime": 0,
|
|
|
+ "goods_read": 0,
|
|
|
+ }
|
|
|
+ lock := sync.Mutex{}
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ it := sess.DB("qfw").C(MoveCollNomal).Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ _id := tmp["_id"]
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": _id})
|
|
|
+ newId := primitive.NewObjectID()
|
|
|
+ tmp["_id"] = newId
|
|
|
+ if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
|
|
|
+ strid := mongodb.BsonIdToSId(newId)
|
|
|
+ tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
|
|
|
+ }
|
|
|
+ //更新
|
|
|
+ set := map[string]interface{}{"moveok": true}
|
|
|
+ set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": set,
|
|
|
+ })
|
|
|
+ //save
|
|
|
+ SaveMgoCache <- tmp
|
|
|
+ lock.Lock()
|
|
|
+ //update
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 200 {
|
|
|
+ Mgo.UpdateBulk(MoveCollNomal, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ Mgo.UpdateBulk(MoveCollNomal, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ qu.Debug("bidding_nomal,本轮迁移数据量:", n)
|
|
|
+}
|
|
|
+
|
|
|
+func FileDataMoveToBidding() {
|
|
|
+ defer qu.Catch()
|
|
|
+ for {
|
|
|
+ query := map[string]interface{}{ //查询附件解析完成,未迁移的数据
|
|
|
+ "bid_completetime": map[string]interface{}{
|
|
|
+ "$exists": true,
|
|
|
+ },
|
|
|
+ "file_completetime": map[string]interface{}{
|
|
|
+ "$exists": true,
|
|
|
+ },
|
|
|
+ "moveok": map[string]interface{}{
|
|
|
+ "$exists": false,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ count := Mgo.Count(MoveCollFile, query)
|
|
|
+ qu.Debug("bidding_file,本轮查询数据量:", count, Stop)
|
|
|
+ if count > 0 && !Stop {
|
|
|
+ FileDataMove(query)
|
|
|
+ } else {
|
|
|
+ time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func FileDataMove(query map[string]interface{}) {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "attach_time": 0,
|
|
|
+ "file_completetime": 0,
|
|
|
+ "file_starttime": 0,
|
|
|
+ "bid_completetime": 0,
|
|
|
+ "bid_starttime": 0,
|
|
|
+ "file_read": 0,
|
|
|
+ }
|
|
|
+ lock := sync.Mutex{}
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ _id := tmp["_id"]
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": _id})
|
|
|
+ newId := primitive.NewObjectID()
|
|
|
+ tmp["_id"] = newId //新id
|
|
|
+ if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
|
|
|
+ strid := mongodb.BsonIdToSId(newId)
|
|
|
+ tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
|
|
|
+ }
|
|
|
+ //更新
|
|
|
+ set := map[string]interface{}{"moveok": true}
|
|
|
+ set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
+ site := qu.ObjToString(tmp["site"]) //解析附件站点
|
|
|
+ if limitRatio := OssSite[site]; limitRatio > 0 { //配置站点解析附件,根据准确率情况替换正文
|
|
|
+ replace, filetext := AnalysisFile(true, limitRatio, tmp)
|
|
|
+ if replace { //替换正文
|
|
|
+ tmp["detail"] = filetext
|
|
|
+ set["filetext"] = true
|
|
|
+ }
|
|
|
+ } else { //其它网站附件信息,detail无效,只有一个附件且不是ocr识别的,替换正文
|
|
|
+ //判断detail是否有效
|
|
|
+ detail := qu.ObjToString(tmp["detail"])
|
|
|
+ detail = sp.FilterDetail(detail) //只保留文本内容
|
|
|
+ if len([]rune(detail)) <= 5 || (len([]rune(detail)) <= 50 && SpecialTextReg.MatchString(detail)) {
|
|
|
+ replace, filetext := AnalysisFile(false, 0, tmp)
|
|
|
+ if replace { //替换正文
|
|
|
+ tmp["detail"] = filetext
|
|
|
+ set["filetext"] = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //IsReplaceDetailSite := OssSite[site]
|
|
|
+ //if IsReplaceDetailSite {
|
|
|
+ // replace, fileOk, filetext := AnalysisFile_back(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
|
|
|
+ // if replace {
|
|
|
+ // tmp["detail"] = filetext //替换正文
|
|
|
+ // }
|
|
|
+ // if !fileOk { //附件异常
|
|
|
+ // set["filerr"] = true
|
|
|
+ // //set["filetext"] = filetext//文本过大,导致更新mongo失败
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": set,
|
|
|
+ })
|
|
|
+ //save
|
|
|
+ SaveMgoCache <- tmp
|
|
|
+ lock.Lock()
|
|
|
+ //update
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 200 {
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ qu.Debug("bidding_file,本轮迁移数据量:", n)
|
|
|
+}
|
|
|
+
|
|
|
+func SaveData() {
|
|
|
+ fmt.Println("Save Bidding Data...")
|
|
|
+ savearr := make([]map[string]interface{}, 200)
|
|
|
+ indexh := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-SaveMgoCache:
|
|
|
+ savearr[indexh] = v
|
|
|
+ indexh++
|
|
|
+ if indexh == 200 { //超过200条开始保存
|
|
|
+ SP <- true
|
|
|
+ go func(savearr []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-SP
|
|
|
+ }()
|
|
|
+ Mgo.SaveBulk("bidding", savearr...)
|
|
|
+ }(savearr)
|
|
|
+ savearr = make([]map[string]interface{}, 200)
|
|
|
+ indexh = 0
|
|
|
+ }
|
|
|
+ case <-time.After(10 * time.Second): //超过10秒开始保存
|
|
|
+ qu.Debug("定时保存数据...", indexh)
|
|
|
+ if indexh > 0 {
|
|
|
+ SP <- true
|
|
|
+ go func(savearr []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-SP
|
|
|
+ }()
|
|
|
+ Mgo.SaveBulk("bidding", savearr...)
|
|
|
+ }(savearr[:indexh])
|
|
|
+ savearr = make([]map[string]interface{}, 200)
|
|
|
+ indexh = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func DeleteData() {
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$lt": time.Now().AddDate(0, -1, 0).Unix(),
|
|
|
+ },
|
|
|
+ "moveok": true,
|
|
|
+ }
|
|
|
+ n := Mgo.Delete(MoveCollFile, query)
|
|
|
+ n1 := Mgo.Delete(MoveCollNomal, query)
|
|
|
+ qu.Debug("bidding_file删除数据量:", n, " bidding_nomal删除数据量:", n1)
|
|
|
+}
|
|
|
+
|
|
|
+/*func DataMoveToBidding() {
|
|
|
+ defer qu.Catch()
|
|
|
+ for {
|
|
|
+ //查询ocr_flie_over获取要迁移的id段
|
|
|
+ gtid, lteid := GetIdInterval()
|
|
|
+ if gtid != "" && lteid != "" {
|
|
|
+ MoveData(gtid, lteid)
|
|
|
+ } else {
|
|
|
+ time.Sleep(time.Duration(TaskTime) * time.Second)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}*/
|
|
|
+
|
|
|
+//获取ID区间
|
|
|
+/*func GetIdInterval() (string, string) {
|
|
|
+ defer qu.Catch()
|
|
|
+ qu.Debug("获取id段...")
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "isused": false,
|
|
|
+ }
|
|
|
+ list, _ := Mgo.Find(Coll, query, map[string]interface{}{"_id": 1}, nil, false, -1, -1)
|
|
|
+ dataLength := len(*list)
|
|
|
+ if dataLength == 0 { //无迁移id段
|
|
|
+ return "", ""
|
|
|
+ }
|
|
|
+ gtid, lteid := "", ""
|
|
|
+ idArr := []interface{}{}
|
|
|
+ for i, l := range *list {
|
|
|
+ gtidTmp := qu.ObjToString(l["gtid"])
|
|
|
+ lteidTmp := qu.ObjToString(l["lteid"])
|
|
|
+ if gtidTmp == "" || lteidTmp == "" || gtidTmp >= lteidTmp {
|
|
|
+ qu.Debug("查询id段出错")
|
|
|
+ SendMail("查询id段异常")
|
|
|
+ time.Sleep(1 * time.Minute)
|
|
|
+ return "", ""
|
|
|
+ }
|
|
|
+ if i == 0 { //第一条数据的起始id为最终迁移数据的起始id
|
|
|
+ gtid = gtidTmp
|
|
|
+ }
|
|
|
+ if i == dataLength-1 { //最后一条数据的结束id为最终迁移数据的结束id
|
|
|
+ lteid = lteidTmp
|
|
|
+ }
|
|
|
+ idArr = append(idArr, l["_id"])
|
|
|
+ }
|
|
|
+ //成功获取id区间
|
|
|
+ if gtid != "" && lteid != "" && gtid < lteid {
|
|
|
+ if gtid < StartId { //本轮起始id小于上轮结束id,id区间异常
|
|
|
+ qu.Debug("异常区间:", gtid, StartId)
|
|
|
+ SendMail("id区间异常")
|
|
|
+ time.Sleep(1 * time.Minute)
|
|
|
+ return "", ""
|
|
|
+ }
|
|
|
+ StartId = lteid
|
|
|
+ for _, id := range idArr { //更新
|
|
|
+ Mgo.Update(Coll, map[string]interface{}{"_id": id}, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{"isused": true, "updatetime": time.Now().Unix()},
|
|
|
+ }, false, false)
|
|
|
+ }
|
|
|
+ return gtid, lteid
|
|
|
+ }
|
|
|
+ return "", ""
|
|
|
+
|
|
|
+}*/
|
|
|
+
|
|
|
+//处理迁移数据
|
|
|
+/*func MoveData(gtid, lteid string) {
|
|
|
+ defer qu.Catch()
|
|
|
+ qu.Debug("迁移开始,ID 区间:", gtid, lteid)
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": mongodb.StringTOBsonId(gtid),
|
|
|
+ "$lte": mongodb.StringTOBsonId(lteid),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "attach_time": 0,
|
|
|
+ }
|
|
|
+ lock := sync.Mutex{}
|
|
|
+ ch := make(chan bool, 3)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ saveArr := []map[string]interface{}{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ count, _ := sess.DB("qfw").C(MoveCollFile).Find(&query).Count()
|
|
|
+ qu.Debug("查询数据总条数:", count)
|
|
|
+ it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ if tmp["moveok"] != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ _id := tmp["_id"]
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": _id})
|
|
|
+ newId := primitive.NewObjectID()
|
|
|
+ if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
|
|
|
+ strid := mongodb.BsonIdToSId(newId)
|
|
|
+ tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
|
|
|
+ }
|
|
|
+ //更新
|
|
|
+ set := map[string]interface{}{"moveok": true}
|
|
|
+ set["biddingid"] = mongodb.BsonIdToSId(newId)
|
|
|
+
|
|
|
+ site := qu.ObjToString(tmp["site"]) //解析附件站点
|
|
|
+ IsReplaceDetailSite := OssSite[site]
|
|
|
+ if IsReplaceDetailSite {
|
|
|
+ replace, fileOk, filetext := AnalysisFile(IsReplaceDetailSite, tmp) //解析附件是否替换到detail
|
|
|
+ if replace {
|
|
|
+ tmp["detail"] = filetext //替换正文
|
|
|
+ }
|
|
|
+ if !fileOk { //附件异常
|
|
|
+ set["filerr"] = true
|
|
|
+ //set["filetext"] = filetext//文本过大,导致更新mongo失败
|
|
|
+ }
|
|
|
+ }
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": set,
|
|
|
+ })
|
|
|
+ tmp["_id"] = newId
|
|
|
+ tmp["comeintime"] = time.Now().Unix()
|
|
|
+ lock.Lock()
|
|
|
+ //save
|
|
|
+ saveArr = append(saveArr, tmp)
|
|
|
+ if len(saveArr) > 50 { //不能设置过大,加上保存服务批量保存数据,会造成两id间数据缺失
|
|
|
+ Mgo.SaveBulk("bidding", saveArr...)
|
|
|
+ saveArr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ //update
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 500 {
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(saveArr) > 0 {
|
|
|
+ Mgo.SaveBulk("bidding", saveArr...)
|
|
|
+ saveArr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ Mgo.UpdateBulk(MoveCollFile, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ qu.Debug("迁移结束,ID 区间:", gtid, lteid)
|
|
|
+}*/
|
|
|
+
|
|
|
+/*func SendMail(bodyTextAll string) {
|
|
|
+ res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, To, "ocr_file_over", bodyTextAll))
|
|
|
+ if err == nil {
|
|
|
+ defer res.Body.Close()
|
|
|
+ read, err := ioutil.ReadAll(res.Body)
|
|
|
+ qu.Debug("邮件发送成功:", string(read), err)
|
|
|
+ } else {
|
|
|
+ qu.Debug("邮件发送失败:", err)
|
|
|
+ }
|
|
|
+}
|
|
|
+*/
|
|
|
+
|
|
|
+/*func FileText() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ field := map[string]interface{}{
|
|
|
+ "attach_text": 1,
|
|
|
+ }
|
|
|
+ it := sess.DB("mxs").C("bidding_text").Find(nil).Select(&field).Iter()
|
|
|
+ n := 0
|
|
|
+ arr := [][]map[string]interface{}{}
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ ok, filetext := AnalysisFile(tmp)
|
|
|
+ if ok {
|
|
|
+ update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ "filetext": filetext,
|
|
|
+ }})
|
|
|
+ }
|
|
|
+ //filetext := qu.ObjToString(tmp["filetext"])
|
|
|
+ //ok, filetext, state, comratio, notcomratio := AnalysisFileTest(filetext) //解析
|
|
|
+ //update := []map[string]interface{}{}
|
|
|
+ //update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ //update = append(update, map[string]interface{}{"$set": map[string]interface{}{
|
|
|
+ // "ok": ok,
|
|
|
+ // "filetexb": filetext,
|
|
|
+ // "state": state,
|
|
|
+ // "comratio": comratio,
|
|
|
+ // "notcomratio": notcomratio,
|
|
|
+ //}})
|
|
|
+ lock.Lock()
|
|
|
+ if len(update) == 2 {
|
|
|
+ arr = append(arr, update)
|
|
|
+ }
|
|
|
+ if len(arr) > 500 {
|
|
|
+ Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
+ arr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(arr) > 0 {
|
|
|
+ Mgo.UpdateBulk("bidding_text", arr...)
|
|
|
+ arr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+}
|
|
|
+func testdata() {
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "attach_text": 1,
|
|
|
+ }
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "site": "中国招标投标公共服务平台",
|
|
|
+ }
|
|
|
+ lock := sync.Mutex{}
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ count, _ := sess.DB("qfw").C("bidding").Find(&query).Count()
|
|
|
+ qu.Debug("查询数据总条数:", count)
|
|
|
+ it := sess.DB("qfw").C("bidding").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ if id > "62335b000000000000000000" {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ ok, filetext := AnalysisFile(tmp) //解析附件是否替换到detail
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "detail": filetext,
|
|
|
+ "detailupdate": true,
|
|
|
+ },
|
|
|
+ })
|
|
|
+ lock.Lock()
|
|
|
+ //update
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 500 {
|
|
|
+ Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ Mgo.UpdateBulk("bidding", updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+}*/
|