123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- package main
- import (
- "log"
- "qfw/util"
- elastic "qfw/util/elastic"
- "sync"
- "unicode/utf8"
- u "./util"
- "gopkg.in/mgo.v2/bson"
- )
- //定时查询bidding中extract_state为2的数据生成索引
- func biddingPurchaingTask(q map[string]interface{}) {
- defer util.Catch()
- //锁
- SaveUpdageLock := sync.Mutex{}
- //连接参数
- c, _ := bidding["collect"].(string) //bidding表
- db, _ := bidding["db"].(string) //库
- index, _ := bidding["index"].(string) //索引别名
- itype, _ := bidding["type"].(string)
- //
- session := mgo.GetMgoConn(86400)
- defer mgo.DestoryMongoConn(session)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- log.Println("biddingPurchaingTask: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- }).Iter()
- arrEs := make([]map[string]interface{}, savesizei)
- arrMgo := [][]map[string]interface{}{}
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- n++
- if util.IntAll(tmp["extracttype"]) == -1 { // || util.IntAll(tmp["dataging"]) == 1 { //重复数据不生索引
- tmp = make(map[string]interface{})
- continue
- }
- newTmp := map[string]interface{}{} //最终生索引的数据
- saveArr := []map[string]interface{}{}
- //oss拼装filetext
- if filetext := getFileText(tmp); len(filetext) > 10 {
- if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
- tmp["detail"] = filetext
- saveArr = append(saveArr, map[string]interface{}{"_id": tmp["_id"]})
- saveArr = append(saveArr, map[string]interface{}{
- "$set": map[string]interface{}{
- "filedetail": 1,
- "detail": filetext,
- },
- })
- }
- newTmp["filetext"] = filetext
- }
- //purchasing
- if purchasing, ok := tmp["purchasing"].(string); ok {
- if len(purchasing) > 0 {
- newTmp["purchasing"] = tmp["purchasing"]
- }
- }
- //purchasinglist
- if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok {
- util.Debug(len(purchasinglist))
- if len(purchasinglist) > 0 {
- purchasinglist_new := []map[string]interface{}{}
- for _, ls := range purchasinglist {
- lsm_new := make(map[string]interface{})
- lsm := ls.(map[string]interface{})
- for _, pf := range purchasinglistFields {
- if lsm[pf] != nil {
- lsm_new[pf] = lsm[pf]
- }
- }
- if lsm_new != nil && len(lsm_new) > 0 {
- purchasinglist_new = append(purchasinglist_new, lsm_new)
- }
- }
- util.Debug(len(purchasinglist_new), purchasinglist_new)
- if len(purchasinglist_new) > 0 {
- newTmp["purchasinglist"] = purchasinglist_new
- }
- }
- }
- //处理数据
- if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
- if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
- delete(tmp, "supervisorrate")
- }
- }
- //对projectscope字段的索引处理
- ps, _ := tmp["projectscope"].(string)
- if len(ps) > ESLEN {
- tmp["projectscope"] = string(([]rune(ps))[:4000])
- }
- SaveUpdageLock.Lock()
- for _, v := range biddingIndexFields { //索引字段
- if tmp[v] != nil {
- if "projectinfo" == v {
- mp, _ := tmp[v].(map[string]interface{})
- if mp != nil {
- newmap := map[string]interface{}{}
- for _, v1 := range projectinfoFields {
- if mp[v1] != nil {
- newmap[v1] = mp[v1]
- }
- }
- newTmp[v] = newmap
- // attachments := mp["attachments"]
- // con := ""
- // if attachments != nil {
- // am, _ := attachments.(map[string]interface{})
- // if am != nil {
- // for _, v1 := range am {
- // vm, _ := v1.(map[string]interface{})
- // if vm != nil {
- // c, _ := vm["content"].(string)
- // con += c
- // }
- // }
- // }
- // }
- // con = FilterDetailSpace(con)
- // if con != "" {
- // newTmp["attachments"] = con
- // }
- }
- } else {
- if v == "detail" {
- detail, _ := tmp[v].(string)
- newTmp[v] = FilterDetail(detail)
- } else {
- newTmp[v] = tmp[v]
- }
- }
- }
- }
- arrEs = append(arrEs, newTmp)
- if len(saveArr) > 0 {
- arrMgo = append(arrMgo, saveArr) //要更新数据
- }
- // arrMgo = append(arrMgo, []map[string]interface{}{ //要更新数据
- // map[string]interface{}{
- // "_id": tmp["_id"],
- // },
- // map[string]interface{}{
- // "$set": map[string]interface{}{
- // "extract_state": 4,
- // },
- // },
- // })
- //批量更新
- if len(arrMgo) >= savesizei-1 {
- mgo.UpdateBulkAll(db, c, arrMgo...)
- arrMgo = [][]map[string]interface{}{}
- }
- //生索引
- if len(arrEs) >= savesizei-1 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- SaveUpdageLock.Unlock()
- //计数
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- SaveUpdageLock.Lock()
- if len(arrMgo) > 0 {
- mgo.UpdateBulkAll(db, c, arrMgo...)
- }
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- }
- SaveUpdageLock.Unlock()
- log.Println("create filetext index...over", n)
- }
- //定时任务site:中国招标投标公共服务平台
- /*
- 注意:
- 1、调用此任务时config.json中indexfields配置不要有purchasing、purchasinglist、filetext
- */
- func site_attach_text(q map[string]interface{}) {
- defer util.Catch()
- //锁
- SaveUpdageLock := sync.Mutex{}
- //连接参数
- c, _ := bidding["collect"].(string) //bidding表
- db, _ := bidding["db"].(string) //库
- index, _ := bidding["index"].(string) //索引别名
- itype, _ := bidding["type"].(string)
- //
- session := mgo.GetMgoConn(86400)
- defer mgo.DestoryMongoConn(session)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- log.Println("site_attach_text: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- }).Iter()
- arrEs := make([]map[string]interface{}, savesizei)
- arrMgo := [][]map[string]interface{}{}
- var n int
- var indexnum int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- n++
- //计数
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- site, _ := tmp["site"].(string)
- if util.IntAll(tmp["extracttype"]) == -1 || site != "中国招标投标公共服务平台" || tmp["attach_text"] == nil {
- tmp = make(map[string]interface{})
- continue
- }
- newTmp := map[string]interface{}{} //最终生索引的数据
- saveArr := []map[string]interface{}{}
- filetext := getFileText(tmp) //oss拼装filetext
- if len(filetext) > 10 {
- tmp["detail"] = filetext //filetext替换detail
- saveArr = append(saveArr, map[string]interface{}{"_id": tmp["_id"]})
- saveArr = append(saveArr, map[string]interface{}{
- "$set": map[string]interface{}{
- "filedetail": 1,
- "detail": filetext,
- },
- })
- newTmp["filetext"] = filetext //
- } else {
- //log.Println("filetext is null string:", tmp["_id"])
- tmp = make(map[string]interface{})
- continue
- }
- indexnum++
- //purchasing
- if purchasing, ok := tmp["purchasing"].(string); ok {
- if len(purchasing) > 0 {
- newTmp["purchasing"] = tmp["purchasing"]
- }
- }
- //purchasinglist
- if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok {
- if len(purchasinglist) > 0 {
- purchasinglist_new := []map[string]interface{}{}
- for _, ls := range purchasinglist {
- lsm_new := make(map[string]interface{})
- lsm := ls.(map[string]interface{})
- for _, pf := range purchasinglistFields {
- if lsm[pf] != nil {
- lsm_new[pf] = lsm[pf]
- }
- }
- if lsm_new != nil && len(lsm_new) > 0 {
- purchasinglist_new = append(purchasinglist_new, lsm_new)
- }
- }
- if len(purchasinglist_new) > 0 {
- newTmp["purchasinglist"] = purchasinglist_new
- }
- }
- }
- //处理数据
- if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
- if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
- delete(tmp, "supervisorrate")
- }
- }
- //对projectscope字段的索引处理
- ps, _ := tmp["projectscope"].(string)
- if len(ps) > ESLEN {
- tmp["projectscope"] = string(([]rune(ps))[:4000])
- }
- SaveUpdageLock.Lock()
- for _, v := range biddingIndexFields { //索引字段
- if tmp[v] != nil {
- if "projectinfo" == v {
- mp, _ := tmp[v].(map[string]interface{})
- if mp != nil {
- newmap := map[string]interface{}{}
- for _, v1 := range projectinfoFields {
- if mp[v1] != nil {
- newmap[v1] = mp[v1]
- }
- }
- newTmp[v] = newmap
- }
- } else {
- if v == "detail" {
- detail, _ := tmp[v].(string)
- newTmp[v] = FilterDetail(detail)
- } else {
- newTmp[v] = tmp[v]
- }
- }
- }
- }
- arrEs = append(arrEs, newTmp) //要生索引数据
- if len(saveArr) > 0 {
- arrMgo = append(arrMgo, saveArr) //要更新数据
- }
- //批量更新
- if len(arrMgo) >= savesizei-1 {
- mgo.UpdateBulkAll(db, c, arrMgo...)
- arrMgo = [][]map[string]interface{}{}
- }
- //生索引
- if len(arrEs) >= savesizei-1 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- SaveUpdageLock.Unlock()
- tmp = make(map[string]interface{})
- }
- SaveUpdageLock.Lock()
- if len(arrMgo) > 0 {
- mgo.UpdateBulkAll(db, c, arrMgo...)
- }
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- }
- SaveUpdageLock.Unlock()
- log.Println("create filetext index...over", n, indexnum)
- }
- func getFileText(tmp map[string]interface{}) (filetext string) {
- if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
- for _, tmpData1 := range attchMap {
- if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
- for _, result := range tmpData2 {
- if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
- if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
- bs := u.OssGetObject(attach_url) //oss读数据
- if utf8.RuneCountInString(filetext+bs) < fileLength {
- filetext += bs + "\n"
- } else {
- break
- }
- }
- }
- }
- }
- }
- }
- return
- }
|