|
@@ -1291,8 +1291,12 @@ func (e *ExtractTask) BidSave(init bool) {
|
|
|
arr := e.BidArr[:saveLimit]
|
|
|
e.BidArr = e.BidArr[saveLimit:]
|
|
|
e.RWMutex.Unlock()
|
|
|
+ arr, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr)
|
|
|
qu.Try(func() {
|
|
|
e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
|
|
|
}, func(err interface{}) {
|
|
|
log.Debug(err)
|
|
|
})
|
|
@@ -1300,8 +1304,12 @@ func (e *ExtractTask) BidSave(init bool) {
|
|
|
arr := e.BidArr
|
|
|
e.BidArr = [][]map[string]interface{}{}
|
|
|
e.RWMutex.Unlock()
|
|
|
+ arr, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr)
|
|
|
qu.Try(func() {
|
|
|
e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
|
|
|
}, func(err interface{}) {
|
|
|
log.Debug(err)
|
|
|
})
|
|
@@ -1321,9 +1329,17 @@ func (e *ExtractTask) BidSave(init bool) {
|
|
|
arr2 := arr[:saveLimit]
|
|
|
arr = arr[saveLimit:]
|
|
|
lenarr = len(arr)
|
|
|
+ arr2, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr2)
|
|
|
e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
|
|
|
} else {
|
|
|
+ arr, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr)
|
|
|
e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
|
|
|
+ e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
|
|
|
break
|
|
|
}
|
|
|
}
|
|
@@ -1333,6 +1349,48 @@ func (e *ExtractTask) BidSave(init bool) {
|
|
|
time.Sleep(1 * time.Second)
|
|
|
}
|
|
|
}
|
|
|
+func getFieldAllAndBlocks(a [][]map[string]interface{}) (arr [][]map[string]interface{}, blocks, fieldalls, fieldallsf []map[string]interface{}) {
|
|
|
+ arr = [][]map[string]interface{}{}
|
|
|
+ blocks = []map[string]interface{}{}
|
|
|
+ fieldalls = []map[string]interface{}{}
|
|
|
+ fieldallsf = []map[string]interface{}{}
|
|
|
+ for _, v := range a {
|
|
|
+ _id, _ := v[0]["_id"]
|
|
|
+ if tmp, ok := v[1]["$set"].(map[string]interface{}); ok {
|
|
|
+ if tmp["blocks"] != nil {
|
|
|
+ block := map[string]interface{}{
|
|
|
+ "_id": _id,
|
|
|
+ "blocks": tmp["blocks"],
|
|
|
+ }
|
|
|
+ blocks = append(blocks, block)
|
|
|
+ }
|
|
|
+ delete(tmp, "blocks")
|
|
|
+ if f, ok := tmp["fieldall"].(map[string][]map[string]interface{}); ok {
|
|
|
+ fieldall := map[string]interface{}{
|
|
|
+ "_id": _id,
|
|
|
+ }
|
|
|
+ for k, v := range f {
|
|
|
+ fieldall[k] = v
|
|
|
+ }
|
|
|
+ fieldalls = append(fieldalls, fieldall)
|
|
|
+ }
|
|
|
+ delete(tmp, "fieldall")
|
|
|
+ if ff, ok := tmp["fieldallf"].(map[string][]map[string]interface{}); ok {
|
|
|
+ fieldallf := map[string]interface{}{
|
|
|
+ "_id": _id,
|
|
|
+ }
|
|
|
+ for k, v := range ff {
|
|
|
+ fieldallf[k] = v
|
|
|
+ }
|
|
|
+ fieldallsf = append(fieldalls, fieldallf)
|
|
|
+ }
|
|
|
+ delete(tmp, "fieldallf")
|
|
|
+ v[1] = tmp
|
|
|
+ }
|
|
|
+ arr = append(arr, v)
|
|
|
+ }
|
|
|
+ return arr, blocks, fieldalls, fieldallsf
|
|
|
+}
|
|
|
|
|
|
func (e *ExtractTask) InitAuditRecogField() {
|
|
|
defer qu.Catch()
|