package main

import (
	"log"
	"sort"
	"sync"
	"unicode/utf8"

	u "./util"
	"gopkg.in/mgo.v2/bson"
	"qfw/util"
	elastic "qfw/util/elastic"
)

// biddingPurchaingTask scans the bidding collection (db/collection/index/type
// taken from the bidding config map) with query q and bulk-indexes every
// matched record into Elasticsearch.
//
// The original note says this runs on a schedule for records whose
// extract_state is 2; the actual filter is whatever q contains.
//
// Records with extracttype == -1 (duplicates) are not indexed. When the
// record's attachment text (see getFileText) is longer than 10 bytes it is
// indexed as "filetext". For records whose site is 中国招标投标公共服务平台,
// that text also replaces "detail", and the Mongo record is updated with the
// new detail and filedetail=1.
func biddingPurchaingTask(q map[string]interface{}) {
	defer util.Catch()
	c, _ := bidding["collect"].(string)   // bidding collection
	db, _ := bidding["db"].(string)       // database
	index, _ := bidding["index"].(string) // Elasticsearch index alias
	itype, _ := bidding["type"].(string)  // Elasticsearch document type

	session := mgo.GetMgoConn(86400)
	defer mgo.DestoryMongoConn(session)
	coll := session.DB(db).C(c)
	count, err := coll.Find(q).Count()
	if err != nil {
		log.Println("biddingPurchaingTask count error:", err)
	}
	log.Println("biddingPurchaingTask: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
	iter := coll.Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Iter()

	batch := newBiddingIndexBatch(db, c, index, itype)
	n := 0
	for {
		tmp := map[string]interface{}{}
		if !iter.Next(tmp) {
			break
		}
		n++
		if util.IntAll(tmp["extracttype"]) == -1 {
			continue // duplicate record: not indexed
		}
		doc := map[string]interface{}{} // the document sent to Elasticsearch
		var update []map[string]interface{}
		if filetext := getFileText(tmp); len(filetext) > 10 {
			if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" {
				// For this site the attachment text replaces detail, and the
				// Mongo record is flagged with filedetail=1.
				tmp["detail"] = filetext
				update = fileDetailUpdate(tmp["_id"], filetext)
			}
			doc["filetext"] = filetext
		}
		fillBiddingIndexDoc(tmp, doc)
		batch.add(doc, update)
		if n%savesizei == 0 {
			log.Println("当前:", n)
		}
	}
	// Without this check a cursor failure would be indistinguishable from
	// reaching the end of the data.
	if err = iter.Close(); err != nil {
		log.Println("biddingPurchaingTask iterate error:", err)
	}
	batch.flushAll()
	log.Println("create filetext index...over", n)
}

// site_attach_text is a task for records from 中国招标投标公共服务平台 matched
// by q. Each such record must have attach_text whose concatenated text
// (see getFileText) is longer than 10 bytes. For those records the text
// becomes the record's detail: it is written back to Mongo together with
// filedetail=1, and the record is re-indexed with the text as "filetext".
// Records from other sites, records with extracttype == -1, and records
// without usable attachment text are skipped.
//
// NOTE: when running this task, the indexfields setting in config.json must
// not contain purchasing, purchasinglist or filetext. This is presumably
// because the raw values would overwrite the ones built here; confirm
// against the config loader.
func site_attach_text(q map[string]interface{}) {
	defer util.Catch()
	c, _ := bidding["collect"].(string)   // bidding collection
	db, _ := bidding["db"].(string)       // database
	index, _ := bidding["index"].(string) // Elasticsearch index alias
	itype, _ := bidding["type"].(string)  // Elasticsearch document type

	session := mgo.GetMgoConn(86400)
	defer mgo.DestoryMongoConn(session)
	coll := session.DB(db).C(c)
	count, err := coll.Find(q).Count()
	if err != nil {
		log.Println("site_attach_text count error:", err)
	}
	log.Println("site_attach_text: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
	iter := coll.Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Iter()

	batch := newBiddingIndexBatch(db, c, index, itype)
	n, indexnum := 0, 0
	for {
		tmp := map[string]interface{}{}
		if !iter.Next(tmp) {
			break
		}
		n++
		if n%savesizei == 0 {
			log.Println("当前:", n)
		}
		site, _ := tmp["site"].(string)
		if util.IntAll(tmp["extracttype"]) == -1 || site != "中国招标投标公共服务平台" || tmp["attach_text"] == nil {
			continue
		}
		filetext := getFileText(tmp)
		if len(filetext) <= 10 {
			continue // no usable attachment text: nothing to replace or index
		}
		indexnum++
		tmp["detail"] = filetext // attachment text replaces detail
		doc := map[string]interface{}{"filetext": filetext}
		fillBiddingIndexDoc(tmp, doc)
		batch.add(doc, fileDetailUpdate(tmp["_id"], filetext))
	}
	if err = iter.Close(); err != nil {
		log.Println("site_attach_text iterate error:", err)
	}
	batch.flushAll()
	log.Println("create filetext index...over", n, indexnum)
}

// fileDetailUpdate builds the [selector, update] pair for mgo.UpdateBulkAll.
// The update stores filetext as the record's detail and sets filedetail=1.
func fileDetailUpdate(id interface{}, filetext string) []map[string]interface{} {
	return []map[string]interface{}{
		{"_id": id},
		{"$set": map[string]interface{}{
			"filedetail": 1,
			"detail":     filetext,
		}},
	}
}

// fillBiddingIndexDoc copies the indexable parts of the bidding record tmp
// into doc. It also normalises tmp in place:
//   - a string-typed supervisorrate is deleted;
//   - projectscope is cut to 4000 runes once its byte length exceeds ESLEN.
//
// Fields listed in biddingIndexFields are copied last. They therefore
// overwrite any purchasing/purchasinglist/filetext value already in doc.
func fillBiddingIndexDoc(tmp, doc map[string]interface{}) {
	const projectscopeMaxRunes = 4000

	if purchasing, ok := tmp["purchasing"].(string); ok && purchasing != "" {
		doc["purchasing"] = purchasing
	}
	if list := filterPurchasingList(tmp["purchasinglist"]); len(list) > 0 {
		doc["purchasinglist"] = list
	}
	// supervisorrate is sometimes extracted as a string; such values are not indexed.
	if _, ok := tmp["supervisorrate"].(string); ok {
		delete(tmp, "supervisorrate")
	}
	if ps, _ := tmp["projectscope"].(string); len(ps) > ESLEN {
		// The byte-length test does not guarantee that 4000 runes exist, so
		// slice only when there are more; an unconditional [:4000] panics.
		r := []rune(ps)
		if len(r) > projectscopeMaxRunes {
			r = r[:projectscopeMaxRunes]
		}
		tmp["projectscope"] = string(r)
	}

	for _, field := range biddingIndexFields {
		val := tmp[field]
		if val == nil {
			continue
		}
		switch field {
		case "projectinfo":
			// Only the whitelisted projectinfoFields sub-keys are indexed.
			if info, _ := val.(map[string]interface{}); info != nil {
				sub := map[string]interface{}{}
				for _, k := range projectinfoFields {
					if info[k] != nil {
						sub[k] = info[k]
					}
				}
				doc[field] = sub
			}
		case "detail":
			detail, _ := val.(string)
			doc[field] = FilterDetail(detail)
		default:
			doc[field] = val
		}
	}
}

// filterPurchasingList keeps only the purchasinglistFields keys of each item
// in raw. raw is expected to be a []interface{} of maps.
// Elements that are not maps, or that carry none of the fields, are dropped;
// they used to crash the whole task through an unchecked type assertion.
func filterPurchasingList(raw interface{}) []map[string]interface{} {
	items, _ := raw.([]interface{})
	var out []map[string]interface{}
	for _, item := range items {
		m, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		picked := map[string]interface{}{}
		for _, f := range purchasinglistFields {
			if m[f] != nil {
				picked[f] = m[f]
			}
		}
		if len(picked) > 0 {
			out = append(out, picked)
		}
	}
	return out
}

// biddingIndexBatch buffers the Elasticsearch documents and Mongo bulk
// updates produced by a scan. Each buffer is written out once it holds
// limit entries.
type biddingIndexBatch struct {
	mu      sync.Mutex
	db, c   string                     // Mongo database and collection receiving the updates
	index   string                     // Elasticsearch index
	itype   string                     // Elasticsearch document type
	limit   int                        // flush threshold (savesizei-1, as before)
	docs    []map[string]interface{}   // pending Elasticsearch documents
	updates [][]map[string]interface{} // pending [selector, update] pairs
}

// newBiddingIndexBatch returns an empty batch. It starts with zero-length
// buffers; pre-filling them with savesizei nil documents used to push nil
// entries into the first Elasticsearch bulk.
func newBiddingIndexBatch(db, c, index, itype string) *biddingIndexBatch {
	return &biddingIndexBatch{db: db, c: c, index: index, itype: itype, limit: savesizei - 1}
}

// add queues doc for indexing. When update is non-empty it also queues the
// Mongo update. Any buffer that has reached the limit is flushed.
func (b *biddingIndexBatch) add(doc map[string]interface{}, update []map[string]interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.docs = append(b.docs, doc)
	if len(update) > 0 {
		b.updates = append(b.updates, update)
	}
	b.flushLocked(b.limit)
}

// flushAll writes out everything still buffered.
func (b *biddingIndexBatch) flushAll() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked(1)
}

// flushLocked writes out each non-empty buffer holding at least min entries.
// Mongo updates are written before Elasticsearch documents. b.mu must be held.
func (b *biddingIndexBatch) flushLocked(min int) {
	if n := len(b.updates); n > 0 && n >= min {
		mgo.UpdateBulkAll(b.db, b.c, b.updates...)
		b.updates = nil
	}
	if n := len(b.docs); n > 0 && n >= min {
		docs := b.docs
		elastic.BulkSave(b.index, b.itype, &docs, true)
		b.docs = nil
	}
}

// getFileText concatenates the OSS-stored text of the record's attachments.
//
// tmp["attach_text"] is expected to be a two-level map whose leaf values are
// maps with an "attach_url" key. Each non-empty URL is fetched with
// u.OssGetObject, and the content is appended followed by "\n". Fetching
// stops entirely at the first attachment whose text would bring the total
// to fileLength runes or more, so no further OSS reads are made once the
// budget is spent.
//
// Keys are visited in a deterministic order, so repeated runs produce the
// same text and the same truncation.
func getFileText(tmp map[string]interface{}) (filetext string) {
	attachMap, _ := tmp["attach_text"].(map[string]interface{})
	runes := 0 // rune count of filetext, including the "\n" separators
	for _, k1 := range sortedAttachTextKeys(attachMap) {
		group, ok := attachMap[k1].(map[string]interface{})
		if !ok || group == nil {
			continue
		}
		for _, k2 := range sortedAttachTextKeys(group) {
			result, ok := group[k2].(map[string]interface{})
			if !ok || result == nil {
				continue
			}
			attachURL := util.ObjToString(result["attach_url"])
			if attachURL == "" {
				continue
			}
			text := u.OssGetObject(attachURL) // read the attachment text from OSS
			size := utf8.RuneCountInString(text)
			if runes+size >= fileLength {
				return filetext
			}
			filetext += text + "\n"
			runes += size + 1
		}
	}
	return filetext
}

// sortedAttachTextKeys returns m's keys ordered by length, then
// lexicographically. Decimal index keys such as "2" and "10" therefore
// come out in numeric order.
func sortedAttachTextKeys(m map[string]interface{}) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool {
		if len(keys[i]) != len(keys[j]) {
			return len(keys[i]) < len(keys[j])
		}
		return keys[i] < keys[j]
	})
	return keys
}