|
@@ -25,8 +25,8 @@ var (
|
|
|
ExtLogs map[*TaskInfo][]map[string]interface{} //抽取日志
|
|
|
TaskList map[string]*ExtractTask //任务列表
|
|
|
saveLimit = 200 //抽取日志批量保存
|
|
|
-
|
|
|
- Fields = `{"title":1,"detail":1,"contenthtml":1,"href":1,"site":1,"spidercode":1,"toptype":1,"area":1,"city":1}`
|
|
|
+ PageSize = 5000 //查询分页
|
|
|
+ Fields = `{"title":1,"detail":1,"contenthtml":1,"href":1,"site":1,"spidercode":1,"toptype":1,"area":1,"city":1}`
|
|
|
)
|
|
|
|
|
|
//启动测试抽取
|
|
@@ -123,19 +123,29 @@ func StopExtractTaskId(taskId string) bool {
|
|
|
func RunExtractTask(taskId string) {
|
|
|
ext := TaskList[taskId]
|
|
|
query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
|
|
|
- list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
|
|
|
- for k, v := range *list {
|
|
|
- log.Println(k, v["_id"])
|
|
|
+ count := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
|
|
|
+ pageNum := (count + PageSize - 1) / PageSize
|
|
|
+ log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
|
|
|
+ for i := 0; i < pageNum; i++ {
|
|
|
+ query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
|
|
|
+ log.Printf("page=%d,query=%v", i+1, query)
|
|
|
+ list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, PageSize)
|
|
|
+ for _, v := range *list {
|
|
|
+ log.Println(v["_id"])
|
|
|
+ if !ext.IsRun {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ j := PreInfo(v)
|
|
|
+ ext.TaskInfo.ProcessPool <- true
|
|
|
+ go ext.ExtractProcess(j)
|
|
|
+ ext.TaskInfo.LastExtId = qu.BsonIdToSId(v["_id"])
|
|
|
+ }
|
|
|
+ db.Mgo.UpdateById("task", ext.Id, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
|
|
|
if !ext.IsRun {
|
|
|
break
|
|
|
}
|
|
|
- j := PreInfo(v)
|
|
|
- ext.TaskInfo.ProcessPool <- true
|
|
|
- go ext.ExtractProcess(j)
|
|
|
- ext.TaskInfo.LastExtId = qu.BsonIdToSId(v["_id"])
|
|
|
}
|
|
|
//更新task.s_extlastid
|
|
|
- db.Mgo.UpdateById("task", ext.Id, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
|
|
|
time.AfterFunc(1*time.Minute, func() { RunExtractTask(taskId) })
|
|
|
}
|
|
|
|
|
@@ -696,20 +706,26 @@ type FieldValue struct {
|
|
|
//分析抽取结果并保存
|
|
|
func AnalysisSaveResult(doc *map[string]interface{}, result map[string][]*ju.ExtField, task *TaskInfo) {
|
|
|
_id := qu.BsonIdToSId((*doc)["_id"])
|
|
|
- result = ScoreFields(result)
|
|
|
+ iscore, _ := ju.Config["fieldscore"].(bool)
|
|
|
+ if iscore { //打分
|
|
|
+ result = ScoreFields(result)
|
|
|
+ }
|
|
|
//结果排序
|
|
|
values := map[string][]*ju.SortObject{}
|
|
|
for key, val := range result {
|
|
|
fieldValue := map[string][]interface{}{}
|
|
|
- // for _, v := range val {
|
|
|
- // if fieldValue[fmt.Sprint(v.Value)] == nil {
|
|
|
- // fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
|
|
|
- // } else {
|
|
|
- // fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
|
|
|
- // }
|
|
|
- // }
|
|
|
- for _, v := range val {
|
|
|
- fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
|
|
|
+ if iscore { //走打分
|
|
|
+ for _, v := range val {
|
|
|
+ fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
|
|
|
+ }
|
|
|
+ } else { //不走打分,按出现频次
|
|
|
+ for _, v := range val {
|
|
|
+ if fieldValue[fmt.Sprint(v.Value)] == nil {
|
|
|
+ fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
|
|
|
+ } else {
|
|
|
+ fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
objects := []*ju.SortObject{}
|
|
|
for k, v := range fieldValue {
|