123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package main
- import (
- "log"
- "qfw/util"
- elastic "qfw/util/elastic"
- "sync"
- "unicode/utf8"
- u "./util"
- "gopkg.in/mgo.v2/bson"
- )
- //定时查询bidding中extract_state为2的数据生成索引
- func biddingPurchaingTask(q map[string]interface{}) {
- defer util.Catch()
- //线程池
- SaveUpdageLock := sync.Mutex{}
- //连接参数
- c, _ := bidding["collect"].(string) //bidding表
- db, _ := bidding["db"].(string) //库
- index, _ := bidding["index"].(string) //索引别名
- itype, _ := bidding["type"].(string)
- //
- session := mgo.GetMgoConn(86400)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- log.Println("biddingPurchaingTask: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- }).Iter()
- arrEs := make([]map[string]interface{}, savesizei)
- arrMgo := [][]map[string]interface{}{}
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- n++
- if util.IntAll(tmp["extracttype"]) == -1 { //重复数据不生索引
- continue
- }
- newTmp := map[string]interface{}{} //最终生索引的数据
- //oss拼装filetext
- filetext := getFileText(tmp)
- newTmp["filetext"] = filetext
- //purchasing
- newTmp["purchasing"] = tmp["purchasing"]
- //purchasinglist
- newTmp["purchasinglist"] = tmp["purchasinglist"]
- //处理数据
- if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
- if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
- delete(tmp, "supervisorrate")
- }
- }
- //对projectscope字段的索引处理
- ps, _ := tmp["projectscope"].(string)
- if len(ps) > ESLEN {
- tmp["projectscope"] = string(([]rune(ps))[:4000])
- }
- SaveUpdageLock.Lock()
- for _, v := range biddingIndexFields { //索引字段
- if tmp[v] != nil {
- if "projectinfo" == v {
- mp, _ := tmp[v].(map[string]interface{})
- if mp != nil {
- newmap := map[string]interface{}{}
- for _, v1 := range projectinfoFields {
- if mp[v1] != nil {
- newmap[v1] = mp[v1]
- }
- }
- newTmp[v] = newmap
- attachments := mp["attachments"]
- con := ""
- if attachments != nil {
- am, _ := attachments.(map[string]interface{})
- if am != nil {
- for _, v1 := range am {
- vm, _ := v1.(map[string]interface{})
- if vm != nil {
- c, _ := vm["content"].(string)
- con += c
- }
- }
- }
- }
- con = FilterDetailSpace(con)
- if con != "" {
- newTmp["attachments"] = con
- }
- }
- } else {
- if v == "detail" {
- detail, _ := tmp[v].(string)
- newTmp[v] = FilterDetail(detail)
- } else {
- newTmp[v] = tmp[v]
- }
- }
- }
- }
- arrEs = append(arrEs, newTmp)
- arrMgo = append(arrMgo, []map[string]interface{}{ //要更新数据
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "extract_state": 4,
- },
- },
- })
- //批量更新
- if len(arrMgo) >= savesizei-1 {
- mgo.UpdateBulkAll(db, c, arrMgo...)
- arrMgo = [][]map[string]interface{}{}
- }
- //生索引
- if len(arrEs) >= savesizei-1 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- SaveUpdageLock.Unlock()
- //计数
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- SaveUpdageLock.Lock()
- if len(arrMgo) > 0 {
- mgo.UpdateBulkAll(db, c, arrMgo...)
- }
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- }
- SaveUpdageLock.Unlock()
- log.Println("create filetext index...over", n)
- }
- func getFileText(tmp map[string]interface{}) (filetext string) {
- if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
- for _, tmpData1 := range attchMap {
- if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
- for _, result := range tmpData2 {
- if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
- if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
- bs := u.OssGetObject(attach_url) //oss读数据
- if utf8.RuneCountInString(filetext+bs) < util.IntAllDef(Sysconfig["filelength"], 100000) {
- filetext += bs + "\n"
- } else {
- break
- }
- }
- }
- }
- }
- }
- }
- return
- }
|