123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package main
- import (
- "fmt"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "mobile_tag/oss"
- "sync"
- "time"
- )
- var (
- Mgo *mongodb.MongodbSim
- Es *elastic.Elastic
- EsNew *elastic.Elastic
- Es3 *elastic.Elastic
- MatchArr = make([]TagMatching, 0) // 存放标签规则
- globalRegs = make([]TagMatching, 0) //关键词规则,是标签规则大前提
- //更新es
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 5) //保存协程
- // 更新mongo
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- )
- func main() {
- testOss()
- return
- go updateEsMethod() // 更新es
- go updateMethod()
- Init()
- //oss.InitOss()
- InitRule()
- taskRun()
- log.Println("over")
- c := make(chan bool, 1)
- <-c
- }
- func testOss() {
- oss.InitOss()
- detail := oss.OssGetObject("5a862f0640d2d9bbe88e3cec", "jy-datadetail")
- log.Println("detail:", detail)
- }
- // taskRun 执行
- func taskRun() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- //查询条件
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId("65b7ebfc66cf0db42a996a65"),
- "$lte": mongodb.StringTOBsonId("6612323266cf0db42a75789f"),
- //"$lte": mongodb.StringTOBsonId("5f00e67e52c1d9fbf8367996"), //测试环境
- },
- }
- count := 0
- selected := map[string]interface{}{"title": 1, "detail": 1, "projectname": 1, "purchasing": 1, "buyer": 1, "attach_text": 1}
- it := sess.DB("qfw").C("bidding").Find(q).Select(selected).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%5000 == 0 {
- log.Println("current:", count, tmp["_id"])
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- // 存在附件时
- if atta, ok := tmp["attach_text"]; ok && atta != nil {
- id := mongodb.BsonIdToSId(tmp["_id"])
- if id == "" {
- return
- }
- err, doc := Es.GetById("bidding", id)
- if err != nil {
- return
- } else {
- tmp["filetext"] = doc["filetext"]
- }
- }
- gs, _, _ := TaskTags(tmp, globalRegs)
- if len(gs) > 0 {
- tags, match, add := TaskTags(tmp, MatchArr)
- if len(tags) > 0 {
- update := map[string]interface{}{
- "mobile_tag": tags,
- }
- log.Println("id--", mongodb.BsonIdToSId(tmp["_id"]), match, add)
- //更新MongoDB
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- //Mgo.UpdateById("bidding", mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
- //更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- update,
- }
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- fmt.Println("over ---> ", count)
- }
- // updateMethod 更新MongoDB
- func updateMethod() {
- arru := make([][]map[string]interface{}, 50)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 50 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 50)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 50)
- indexu = 0
- }
- }
- }
- }
- // updateEsMethod 更新es
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|