123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson"
- "log"
- qutil "qfw/util"
- elastic "qfw/util/elastic"
- "reflect"
- "strings"
- "unicode/utf8"
- //elastic "qfw/util/elastic_v5"
- "regexp"
- // "strings"
- "sync"
- )
var (
	// BulkSizeBack is the number of index documents buffered before they are
	// handed to elastic.BulkSave as one batch.
	BulkSizeBack = 400
	// ESLEN is the byte length above which projectscope gets truncated
	// (presumably Lucene's maximum term length in bytes — confirm).
	ESLEN = 32766
	// reg_letter matches runs of lowercase ASCII letters; replacing its
	// matches with "" strips those letters from a string.
	reg_letter = regexp.MustCompile(`[a-z]*`)
)
- func biddingTask() {
- defer qutil.Catch()
- //bidding库
- session := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(session)
- db := qutil.ObjToString(bidding["db"])
- coll := qutil.ObjToString(bidding["collect"])
- //q := map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 1643262000}}
- //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5db7c324a5cb26b9b78b0f09")}
- count, _ := session.DB(db).C(coll).Find(nil).Count()
- index := qutil.ObjToString(bidding["index"])
- stype := qutil.ObjToString(bidding["type"])
- //线程池
- UpdatesLock := sync.Mutex{}
- qutil.Debug("查询语句:", nil, "同步总数:", count, "elastic库:")
- //查询招标数据
- query := session.DB(db).C(coll).Find(nil).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- "publishdept": 0,
- }).Sort("_id").Iter()
- //查询抽取结果
- n := 0
- //更新数组
- arrEs := []map[string]interface{}{}
- thread := 10
- espool := make(chan bool, 5)
- var mpool = make(chan bool, thread)
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if n%2000 == 0 {
- log.Println("current:", n, tmp["_id"])
- }
- if qutil.ObjToString(tmp["useable"]) == "0" {
- continue
- }
- mpool <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-mpool
- }()
- subscopeclass, _ := tmp["subscopeclass"].([]interface{}) //subscopeclass
- if subscopeclass != nil {
- m1 := map[string]bool{}
- newclass := []string{}
- for _, sc := range subscopeclass {
- sclass, _ := sc.(string)
- if !m1[sclass] {
- m1[sclass] = true
- newclass = append(newclass, sclass)
- }
- }
- tmp["s_subscopeclass"] = strings.Join(newclass, ",")
- tmp["subscopeclass"] = newclass
- }
- topscopeclass, _ := tmp["topscopeclass"].([]interface{}) //topscopeclass
- if topscopeclass != nil {
- m2 := map[string]bool{}
- newclass := []string{}
- for _, tc := range topscopeclass {
- tclass, _ := tc.(string)
- tclass = reg_letter.ReplaceAllString(tclass, "") // 去除字母
- if !m2[tclass] {
- m2[tclass] = true
- newclass = append(newclass, tclass)
- }
- }
- tmp["s_topscopeclass"] = strings.Join(newclass, ",")
- }
- //对projectscope字段的索引处理
- ps, _ := tmp["projectscope"].(string)
- if len(ps) > ESLEN {
- tmp["projectscope"] = string(([]rune(ps))[:4000])
- }
- //对标的物为空处理
- filetext := getFileText(tmp)
- filetextS := filetext
- if len([]rune(filetextS)) > 10 { //attach_text
- tmp["filetext"] = filetext
- }
- if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
- delete(tmp, "purchasing")
- }
- if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
- delete(tmp, "purchasinglist")
- }
- //数据为空处理
- for _, f := range []string{"bidstatus", "city", "district", "channel"} {
- if fVal, ok := tmp[f].(string); ok && fVal == "" {
- delete(tmp, f)
- }
- }
- UpdatesLock.Lock()
- newTmp := map[string]interface{}{}
- for field, ftype := range biddingIndexFieldsMap {
- if tmp[field] != nil { //
- if field == "projectinfo" {
- mp, _ := tmp[field].(map[string]interface{})
- if mp != nil {
- newmap := map[string]interface{}{}
- for k, ktype := range projectinfoFieldsMap {
- mpv := mp[k]
- if mpv != nil && reflect.TypeOf(mpv).String() == ktype {
- newmap[k] = mp[k]
- }
- }
- if len(newmap) > 0 {
- newTmp[field] = newmap
- }
- }
- } else if field == "purchasinglist" { //标的物处理
- purchasinglist_new := []map[string]interface{}{}
- if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
- for _, ls := range pcl {
- lsm_new := make(map[string]interface{})
- lsm := ls.(map[string]interface{})
- for pf, pftype := range purchasinglistFieldsMap {
- lsmv := lsm[pf]
- if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
- lsm_new[pf] = lsm[pf]
- }
- }
- if lsm_new != nil && len(lsm_new) > 0 {
- purchasinglist_new = append(purchasinglist_new, lsm_new)
- }
- }
- }
- if len(purchasinglist_new) > 0 {
- newTmp[field] = purchasinglist_new
- }
- } else if field == "winnerorder" { //中标候选
- winnerorder_new := []map[string]interface{}{}
- if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 {
- for _, win := range winnerorder {
- winMap_new := make(map[string]interface{})
- winMap := win.(map[string]interface{})
- for wf, wftype := range winnerorderlistFieldsMap {
- wfv := winMap[wf]
- if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
- if wf == "sort" && qutil.Int64All(wfv) > 100 {
- continue
- }
- winMap_new[wf] = winMap[wf]
- }
- }
- if winMap_new != nil && len(winMap_new) > 0 {
- winnerorder_new = append(winnerorder_new, winMap_new)
- }
- }
- }
- if len(winnerorder_new) > 0 {
- newTmp[field] = winnerorder_new
- }
- } else if field == "qualifies" {
- //项目资质
- qs := []string{}
- if q, _ := tmp[field].([]interface{}); len(q) > 0 {
- for _, v := range q {
- v1 := v.(map[string]interface{})
- qs = append(qs, qutil.ObjToString(v1["key"]))
- }
- }
- if len(qs) > 0 {
- newTmp[field] = strings.Join(qs, ",")
- }
- } else if field == "review_experts" {
- // 评审专家
- if arr, ok := tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
- arr1 := qutil.ObjArrToStringArr(arr)
- newTmp[field] = strings.Join(arr1, ",")
- }
- } else if field == "entidlist" {
- newTmp[field] = tmp[field]
- } else if field == "bidopentime" {
- if tmp[field] != nil && tmp["bidendtime"] == nil {
- newTmp["bidendtime"] = tmp[field]
- }
- if tmp[field] == nil && tmp["bidendtime"] != nil {
- newTmp[field] = tmp["bidendtime"]
- }
- } else if field == "detail" { //过滤
- detail, _ := tmp[field].(string)
- if len([]rune(detail)) > detailLength {
- detail = detail[:detailLength]
- }
- if strings.Contains(detail, qutil.ObjToString(tmp["title"])) {
- newTmp[field] = FilterDetail(detail)
- } else {
- newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail)
- }
- } else if field == "_id" || field == "topscopeclass" { //不做处理
- newTmp[field] = tmp[field]
- } else if field == "publishtime" || field == "comeintime" {
- //字段类型不正确,特别处理
- if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
- newTmp[field] = qutil.Int64All(tmp[field])
- }
- } else if field == "s" {
- newTmp[field] = tmp[field]
- } else { //其它字段判断数据类型,不正确舍弃
- if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
- continue
- } else {
- if fieldval != "" {
- newTmp[field] = fieldval
- }
- }
- }
- }
- }
- arrEs = append(arrEs, newTmp)
- if len(arrEs) >= BulkSizeBack {
- tmps := arrEs
- espool <- true
- go func(tmps []map[string]interface{}) {
- defer func() {
- <-espool
- }()
- elastic.BulkSave(index, stype, &tmps, true)
- }(tmps)
- arrEs = []map[string]interface{}{}
- }
- UpdatesLock.Unlock()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- for i := 0; i < thread; i++ {
- mpool <- true
- }
- UpdatesLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, stype, &tmps, true)
- }
- UpdatesLock.Unlock()
- log.Println("create biddingback index...over", n)
- }
var (
	// filterReg matches any "<...>" sequence with at least one character
	// between the brackets (HTML-style tags).
	filterReg = regexp.MustCompile("<[^>]+>")
	// filterSpace matches a "<...>" sequence (including the empty "<>") or a
	// single whitespace character, where whitespace also covers U+3000,
	// U+2003 and U+00A0.
	filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
)

// FilterDetail returns text with every tag matched by filterReg removed.
func FilterDetail(text string) string {
	stripped := filterReg.ReplaceAllLiteralString(text, "")
	return stripped
}

// FilterDetailSpace returns text with every tag and every whitespace
// character matched by filterSpace removed.
func FilterDetailSpace(text string) string {
	stripped := filterSpace.ReplaceAllLiteralString(text, "")
	return stripped
}
// checkContains reports whether sub occurs in s as a whole token: preceded by
// the start of s or whitespace, and followed by whitespace or the end of s.
// Matching is case-insensitive. sub is used as a regular expression (wrapped
// in its own group); when it is not a valid pattern (e.g. "C++" or "(b") it is
// matched literally instead of panicking as regexp.MustCompile would.
func checkContains(s, sub string) bool {
	const prefix, suffix = `(?i)(^|\s+)(`, `)($|\s+)`
	reg, err := regexp.Compile(prefix + sub + suffix)
	if err != nil {
		// QuoteMeta always yields a valid literal pattern.
		reg = regexp.MustCompile(prefix + regexp.QuoteMeta(sub) + suffix)
	}
	return reg.MatchString(s)
}
- func getFileText(tmp map[string]interface{}) (filetext string) {
- if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
- for _, tmpData1 := range attchMap {
- if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
- for _, result := range tmpData2 {
- if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
- if attach_url := qutil.ObjToString(resultMap["attach_url"]); attach_url != "" {
- bs := OssGetObject(attach_url) //oss读数据
- if utf8.RuneCountInString(filetext+bs) < fileLength {
- filetext += bs + "\n"
- } else {
- if utf8.RuneCountInString(bs) > fileLength {
- filetext = bs[0:fileLength]
- } else {
- filetext = bs
- }
- break
- }
- }
- }
- }
- }
- }
- }
- return
- }
|