package main

import (
	"log"
	"sync"
	"unicode/utf8"

	u "./util"
	"gopkg.in/mgo.v2/bson"
	"qfw/util"
	elastic "qfw/util/elastic"
)

// biddingPurchaingTask copies the bidding documents matched by q into the
// elasticsearch index and marks every copied document with extract_state 4.
// According to the original note it is run on a timer for documents whose
// extract_state is 2; the actual filter is whatever the caller passes in q.
//
// Documents whose extracttype is -1 (duplicates) are counted but neither
// indexed nor marked. Writes to mongo and to elasticsearch are batched; a batch
// is flushed once it holds savesizei-1 entries and once more after the cursor
// is exhausted.
func biddingPurchaingTask(q map[string]interface{}) {
	defer util.Catch()

	// Guards the pending batches. Everything below runs on the calling
	// goroutine, so the lock is currently uncontended.
	var batchLock sync.Mutex

	// Connection parameters.
	c, _ := bidding["collect"].(string)   // bidding collection
	db, _ := bidding["db"].(string)       // database
	index, _ := bidding["index"].(string) // index alias
	itype, _ := bidding["type"].(string)  // index type

	// NOTE(review): the session obtained here is never handed back in this
	// function; confirm whether the connection helper expects an explicit
	// release call.
	session := mgo.GetMgoConn(86400)
	count, err := session.DB(db).C(c).Find(&q).Count()
	if err != nil {
		log.Println("biddingPurchaingTask: count failed:", err)
	}
	log.Println("biddingPurchaingTask: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)

	query := session.DB(db).C(c).Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Iter()
	// Close the cursor on every exit path and surface any iteration error
	// instead of dropping it.
	defer func() {
		if err := query.Close(); err != nil {
			log.Println("biddingPurchaingTask: iterator error:", err)
		}
	}()

	// Both batches start empty. Allocating arrEs with a non-zero length would
	// put nil documents in front of the real ones and ship them to the index.
	arrEs := make([]map[string]interface{}, 0, savesizei)
	arrMgo := make([][]map[string]interface{}, 0, savesizei)

	n := 0
	for {
		// A fresh map per document keeps values of earlier documents from
		// being shared with later ones.
		tmp := make(map[string]interface{})
		if !query.Next(tmp) {
			break
		}
		n++
		if util.IntAll(tmp["extracttype"]) == -1 { // duplicate data is not indexed
			continue
		}

		doc := buildPurchasingIndexDoc(tmp)

		batchLock.Lock()
		arrEs = append(arrEs, doc)
		// Selector / update pair that marks the source document.
		arrMgo = append(arrMgo, []map[string]interface{}{
			{"_id": tmp["_id"]},
			{"$set": map[string]interface{}{"extract_state": 4}},
		})
		// Bulk update of the source collection.
		if len(arrMgo) >= savesizei-1 {
			mgo.UpdateBulkAll(db, c, arrMgo...)
			arrMgo = make([][]map[string]interface{}, 0, savesizei)
		}
		// Bulk index. A new slice is allocated afterwards rather than reusing
		// the backing array, because the saver is handed a pointer to it.
		if len(arrEs) >= savesizei-1 {
			tmps := arrEs
			elastic.BulkSave(index, itype, &tmps, true)
			arrEs = make([]map[string]interface{}, 0, savesizei)
		}
		batchLock.Unlock()

		// Progress counter.
		if n%savesizei == 0 {
			log.Println("当前:", n)
		}
	}

	// Flush whatever is left in the batches.
	batchLock.Lock()
	if len(arrMgo) > 0 {
		mgo.UpdateBulkAll(db, c, arrMgo...)
	}
	if len(arrEs) > 0 {
		tmps := arrEs
		elastic.BulkSave(index, itype, &tmps, true)
	}
	batchLock.Unlock()
	log.Println("create filetext index...over", n)
}

// buildPurchasingIndexDoc assembles the document that is written to the index
// for one bidding record. It always carries filetext, purchasing and
// purchasinglist, plus every non-nil field listed in biddingIndexFields.
//
// tmp is modified in place: a string-typed supervisorrate is removed and an
// over-long projectscope is truncated before the fields are copied.
func buildPurchasingIndexDoc(tmp map[string]interface{}) map[string]interface{} {
	// Number of runes kept from a projectscope value that exceeds ESLEN bytes.
	const projectScopeMaxRunes = 4000

	doc := map[string]interface{}{
		"filetext":       getFileText(tmp), // text of the attachments stored in OSS
		"purchasing":     tmp["purchasing"],
		"purchasinglist": tmp["purchasinglist"],
	}

	// Temporary handling: a supervisorrate extracted as a string is not indexed.
	if _, isString := tmp["supervisorrate"].(string); isString {
		delete(tmp, "supervisorrate")
	}

	// ESLEN is compared against the byte length while the cut is made in
	// runes, so the rune length has to be checked before slicing.
	if ps, _ := tmp["projectscope"].(string); len(ps) > ESLEN {
		if rs := []rune(ps); len(rs) > projectScopeMaxRunes {
			tmp["projectscope"] = string(rs[:projectScopeMaxRunes])
		}
	}

	for _, field := range biddingIndexFields {
		val := tmp[field]
		if val == nil {
			continue
		}
		switch field {
		case "projectinfo":
			info, _ := val.(map[string]interface{})
			if info == nil {
				continue
			}
			// Only the whitelisted projectinfo sub-fields are indexed.
			picked := map[string]interface{}{}
			for _, key := range projectinfoFields {
				if info[key] != nil {
					picked[key] = info[key]
				}
			}
			doc[field] = picked
			if text := FilterDetailSpace(attachmentContent(info["attachments"])); text != "" {
				doc["attachments"] = text
			}
		case "detail":
			detail, _ := val.(string)
			doc[field] = FilterDetail(detail)
		default:
			doc[field] = val
		}
	}
	return doc
}

// attachmentContent concatenates the "content" strings of every entry in a
// projectinfo attachments map. Anything that is not a map of maps yields "".
// The entries are visited in map iteration order, so the order of the
// concatenated pieces is not stable between runs.
func attachmentContent(attachments interface{}) string {
	entries, _ := attachments.(map[string]interface{})
	text := ""
	for _, raw := range entries {
		if entry, _ := raw.(map[string]interface{}); entry != nil {
			content, _ := entry["content"].(string)
			text += content
		}
	}
	return text
}

// getFileText builds the filetext value of a record: for every entry under
// tmp["attach_text"] that carries a non-empty attach_url, the object is read
// from OSS and appended, followed by a newline.
//
// The result is kept below Sysconfig["filelength"] runes (100000 when that
// setting is missing). The first object that would reach the limit ends the
// scan of the group it belongs to; the remaining groups are still visited.
func getFileText(tmp map[string]interface{}) (filetext string) {
	groups, _ := tmp["attach_text"].(map[string]interface{})
	if len(groups) == 0 {
		return filetext
	}

	maxRunes := util.IntAllDef(Sysconfig["filelength"], 100000)
	// Rune count of filetext. Every appended chunk ends in "\n", so the count
	// of filetext+bs is exactly the sum of the two separate counts.
	runes := 0
	for _, rawGroup := range groups {
		files, _ := rawGroup.(map[string]interface{})
		for _, rawFile := range files {
			file, _ := rawFile.(map[string]interface{})
			if file == nil {
				continue
			}
			attachURL := util.ObjToString(file["attach_url"])
			if attachURL == "" {
				continue
			}
			bs := u.OssGetObject(attachURL) // read the object from OSS
			bsRunes := utf8.RuneCountInString(bs)
			if runes+bsRunes >= maxRunes {
				break
			}
			filetext += bs + "\n"
			runes += bsRunes + 1
		}
	}
	return filetext
}