|
@@ -11,6 +11,7 @@ import (
|
|
|
"strings"
|
|
|
"sync"
|
|
|
. "task"
|
|
|
+ "time"
|
|
|
. "udptask"
|
|
|
"util"
|
|
|
)
|
|
@@ -29,8 +30,111 @@ var fileIndexMap = map[string]bool{
|
|
|
"type": true,
|
|
|
"publishdept": true,
|
|
|
}
|
|
|
-var publishtimeReg = regexp.MustCompile("\\d{4}-\\d{2}-\\d{2}")
|
|
|
-var fields = map[string]interface{}{"state": 1, "spidercode": 1, "site": 1, "channel": 1, "title": 1, "href": 1}
|
|
|
// Patterns used to recognise publish-time strings. Neither pattern is
// anchored, so each one matches when the date appears anywhere in the input.
var (
	// publishtimeReg1 matches a bare date, e.g. 2022-10-15.
	publishtimeReg1 = regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)
	// publishtimeReg2 matches a date followed by a time, e.g. 2022-10-15 23:49:49.
	publishtimeReg2 = regexp.MustCompile(`\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}`)
)
|
|
|
+
|
|
|
// DataRequest carries the search conditions decoded from the "requeststr"
// form value. importWarnErrData turns it into a query on spider_warn_err.
type DataRequest struct {
	// From is the data source filter; "-1" disables the filter.
	From string `json:"from"`
	// Repeat is the duplicate filter; "-1" disables it, "1" selects repeat == true,
	// any other value selects repeat == false.
	Repeat string `json:"repeat"`
	// Level is converted to an integer and always applied as a filter.
	Level string `json:"level"`
	// Search, when non-empty, is used as a regular expression against
	// site, spidercode and title.
	Search string `json:"search"`
	// Starttime, when positive, selects comeintime in [Starttime, Starttime+86400).
	Starttime int64 `json:"starttime"`
}
|
|
|
+
|
|
|
+func (f *Front) ImportTask() {
|
|
|
+ defer qu.Catch()
|
|
|
+ msg := ""
|
|
|
+ ok := false
|
|
|
+ coll := f.GetString("coll") //数据表
|
|
|
+ createtask := f.GetString("createtask") //是否生成任务
|
|
|
+ msgTmp := CheckTask(coll) //校验任务
|
|
|
+ if createtask == "1" && msgTmp != "" { //如果存在coll表未执行任务,不能新建任务
|
|
|
+ msg = msgTmp
|
|
|
+ } else {
|
|
|
+ if RunningTask[coll] { //判断是否有执行中的任务在使用coll表
|
|
|
+ msg = "表:" + coll + "已有任务正在运行,请稍后导入任务"
|
|
|
+ } else {
|
|
|
+ isClear := false
|
|
|
+ importstype := f.GetString("importstype") //导入方
|
|
|
+ if importstype == "0" { //导入方式为清除时,判断coll表有没有执行完的任务,但是数据未推送
|
|
|
+ if util.Mgo.Count("task", map[string]interface{}{"s_coll": coll, "i_state": 2, "issend": false}) > 0 {
|
|
|
+ msg = "表:" + coll + "有历史数据未推送"
|
|
|
+ goto L
|
|
|
+ } else { //清理表
|
|
|
+ isClear = true
|
|
|
+ if !util.MgoDT.Del(coll, nil) {
|
|
|
+ msg = "表:" + coll + "清理失败"
|
|
|
+ goto L
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ isbidding := f.GetString("isbidding") //是否是bidding
|
|
|
+ requeststr := f.GetString("requeststr")
|
|
|
+ dr := &DataRequest{}
|
|
|
+ qu.Debug(requeststr)
|
|
|
+ if json.Unmarshal([]byte(requeststr), dr) != nil {
|
|
|
+ msg = "检索内容格式化错误"
|
|
|
+ goto L
|
|
|
+ }
|
|
|
+ flows := f.GetString("select4") //处理流程
|
|
|
+ if isbidding == "1" {
|
|
|
+ //清理相关表
|
|
|
+ if isClear {
|
|
|
+ if !util.MgoDT.Del("bidding_copy", nil) { //清理bidding_cp表
|
|
|
+ msg = "表:bidding_cp清理失败"
|
|
|
+ goto L
|
|
|
+ }
|
|
|
+ if flows != "" { //有数据处理流程,清理extract抽取表
|
|
|
+ if !util.MgoDT.Del("extract", nil) {
|
|
|
+ msg = "表:extract清理失败"
|
|
|
+ goto L
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //根据检索条件检索异常数据,导入coll表
|
|
|
+ importOk, querynum, okNum := importWarnErrData(coll, dr, isbidding == "1")
|
|
|
+ if importOk { //导入数据成功
|
|
|
+ msg = "共检索数据量:" + fmt.Sprint(querynum) + "条,导入成功:" + fmt.Sprint(okNum) + "条"
|
|
|
+ if createtask == "1" { //创建任务
|
|
|
+ db := f.GetString("db") //数据库
|
|
|
+ checkfields := f.GetString("checkfields") //校验字段
|
|
|
+ name := f.GetString("name") //任务名称
|
|
|
+ flowsArr := []string{}
|
|
|
+ if flows != "" {
|
|
|
+ for _, flow := range strings.Split(flows, ",") {
|
|
|
+ flowsArr = append(flowsArr, flow)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ user := f.GetSession("user").(map[string]interface{})
|
|
|
+ save := map[string]interface{}{
|
|
|
+ "s_name": name,
|
|
|
+ "s_db": db,
|
|
|
+ "s_coll": coll,
|
|
|
+ "s_checkfields": checkfields,
|
|
|
+ "isbidding": isbidding == "1",
|
|
|
+ "flows": flowsArr,
|
|
|
+ "l_createtime": time.Now().Unix(),
|
|
|
+ "s_createuser": user["name"],
|
|
|
+ "i_state": 0,
|
|
|
+ "delete": false,
|
|
|
+ "issend": false,
|
|
|
+ }
|
|
|
+ if util.Mgo.Save("task", save) != "" {
|
|
|
+ ok = true
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ ok = true
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ msg = "检索数据量为0"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+L:
|
|
|
+ qu.Debug("导入任务:", ok, msg)
|
|
|
+ f.ServeJson(map[string]interface{}{"msg": msg, "ok": ok})
|
|
|
+}
|
|
|
|
|
|
func (f *Front) PrepareBidding() {
|
|
|
defer qu.Catch()
|
|
@@ -86,7 +190,8 @@ func (f *Front) PrepareBidding() {
|
|
|
arr = [][]map[string]interface{}{}
|
|
|
}
|
|
|
if len(save) > 500 {
|
|
|
- util.MgoDT.SaveBulk(util.DataColl, save...)
|
|
|
+ //util.MgoDT.SaveBulk(util.DataColl, save...)
|
|
|
+ util.MgoDT.SaveBulk("bidding", save...)
|
|
|
save = []map[string]interface{}{}
|
|
|
}
|
|
|
lock.Unlock()
|
|
@@ -102,7 +207,8 @@ func (f *Front) PrepareBidding() {
|
|
|
arr = [][]map[string]interface{}{}
|
|
|
}
|
|
|
if len(save) > 0 {
|
|
|
- util.MgoDT.SaveBulk(util.DataColl, save...)
|
|
|
+ //util.MgoDT.SaveBulk(util.DataColl, save...)
|
|
|
+ util.MgoDT.SaveBulk("bidding", save...)
|
|
|
save = []map[string]interface{}{}
|
|
|
}
|
|
|
msg := "bidding_copy表共" + fmt.Sprint(count) + "条数据,成功拉取对应bidding信息" + fmt.Sprint(oknum) + "条"
|
|
@@ -144,77 +250,93 @@ func (f *Front) ViewBidding() {
|
|
|
|
|
|
func (f *Front) ImportData() {
|
|
|
defer qu.Catch()
|
|
|
- dataFile, _, err := f.GetFile("xlsx")
|
|
|
msgArr := []string{}
|
|
|
ok := true
|
|
|
coll := ""
|
|
|
save := []map[string]interface{}{}
|
|
|
- if err == nil {
|
|
|
- binary, _ := ioutil.ReadAll(dataFile)
|
|
|
- xls, _ := xlsx.OpenBinary(binary)
|
|
|
- sheet := xls.Sheets[0]
|
|
|
- coll = sheet.Name //sheetName当做表名
|
|
|
- if RunningTask[coll] { //正在处理任务的表禁止导入数据
|
|
|
- ok = false
|
|
|
- msgArr = append(msgArr, coll+"表已被占用!")
|
|
|
- goto END
|
|
|
- }
|
|
|
- fieldsTmp := map[string]int{} //记录字段对应的位置
|
|
|
- for i, row := range sheet.Rows {
|
|
|
- if i == 0 { //得到字段属性
|
|
|
- for j, cell := range row.Cells {
|
|
|
- if fileIndexMap[cell.Value] { //去除无效字段
|
|
|
- fieldsTmp[cell.Value] = j
|
|
|
+ cleardata := f.GetString("cleardata")
|
|
|
+ qu.Debug(cleardata)
|
|
|
+ mf, err := f.GetFiles()
|
|
|
+ if err == nil && len(mf) == 1 {
|
|
|
+ dataFile, err := mf[0].Open()
|
|
|
+ if err == nil {
|
|
|
+ binary, _ := ioutil.ReadAll(dataFile)
|
|
|
+ xls, _ := xlsx.OpenBinary(binary)
|
|
|
+ sheet := xls.Sheets[0]
|
|
|
+ coll = sheet.Name //sheetName当做表名
|
|
|
+ if RunningTask[coll] { //正在处理任务的表禁止导入数据
|
|
|
+ ok = false
|
|
|
+ msgArr = append(msgArr, coll+"表正在执行任务中!")
|
|
|
+ goto END
|
|
|
+ }
|
|
|
+ fieldsTmp := map[string]int{} //记录字段对应的位置
|
|
|
+ for i, row := range sheet.Rows {
|
|
|
+ if i == 0 { //得到字段属性
|
|
|
+ for j, cell := range row.Cells {
|
|
|
+ if fileIndexMap[cell.Value] { //去除无效字段
|
|
|
+ fieldsTmp[cell.Value] = j
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- } else { //生成数据
|
|
|
- result := map[string]interface{}{}
|
|
|
- cells := row.Cells
|
|
|
- cellsLen := len(cells)
|
|
|
- for field, index := range fieldsTmp {
|
|
|
- if cellsLen >= index+1 {
|
|
|
- cell := cells[index]
|
|
|
- //字段处理
|
|
|
- if field == "comeintime" {
|
|
|
- comeintime, _ := cell.Int64()
|
|
|
- result[field] = comeintime
|
|
|
- } else if field == "publishtime" {
|
|
|
- publishtime := cell.Value
|
|
|
- if publishtime == "0" || publishtime == "" {
|
|
|
- result[field] = "0"
|
|
|
- } else if publishtimeReg.MatchString(publishtime) { //2022-10-15 23:49:49
|
|
|
- result[field] = publishtime
|
|
|
- } else if cell.NumFmt == "m/d/yy h:mm" {
|
|
|
- t, _ := cell.GetTime(false)
|
|
|
- result[field] = t.Format(qu.Date_Full_Layout)
|
|
|
- } else { //其他类型视为异常
|
|
|
- ok = false
|
|
|
- msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行publishtime字段异常")
|
|
|
- goto END
|
|
|
+ } else { //生成数据
|
|
|
+ result := map[string]interface{}{}
|
|
|
+ cells := row.Cells
|
|
|
+ cellsLen := len(cells)
|
|
|
+ for field, index := range fieldsTmp {
|
|
|
+ if cellsLen >= index+1 {
|
|
|
+ cell := cells[index]
|
|
|
+ //字段处理
|
|
|
+ if field == "comeintime" {
|
|
|
+ comeintime, _ := cell.Int64()
|
|
|
+ result[field] = comeintime
|
|
|
+ } else if field == "publishtime" {
|
|
|
+ publishtime := cell.Value
|
|
|
+ if cell.NumFmt == "m/d/yy h:mm" {
|
|
|
+ t, _ := cell.GetTime(false)
|
|
|
+ result[field] = t.Format(qu.Date_Full_Layout)
|
|
|
+ } else if publishtimeReg2.MatchString(publishtime) { //2022-10-15 23:49:49
|
|
|
+ result[field] = publishtime
|
|
|
+ } else if publishtimeReg1.MatchString(publishtime) { //2022-10-15
|
|
|
+ result[field] = publishtime + " 00:00:00"
|
|
|
+ } else if publishtime == "0" || publishtime == "" {
|
|
|
+ result[field] = "0"
|
|
|
+ } else { //其他类型视为异常
|
|
|
+ ok = false
|
|
|
+ msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行publishtime字段异常")
|
|
|
+ goto END
|
|
|
+ }
|
|
|
+ } else { //其它字段
|
|
|
+ result[field] = cell.Value
|
|
|
}
|
|
|
- } else { //其它字段
|
|
|
- result[field] = cell.Value
|
|
|
}
|
|
|
}
|
|
|
+ //必要字段缺失校验
|
|
|
+ noFields := checkField(result)
|
|
|
+ if len(noFields) == 0 {
|
|
|
+ result["state"] = 0
|
|
|
+ result["times"] = 0
|
|
|
+ save = append(save, result)
|
|
|
+ } else {
|
|
|
+ ok = false
|
|
|
+ msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行"+strings.Join(noFields, ",")+"字段不存在")
|
|
|
+ }
|
|
|
}
|
|
|
- //必要字段缺失校验
|
|
|
- noFields := checkField(result)
|
|
|
- if len(noFields) == 0 {
|
|
|
- result["state"] = 0
|
|
|
- result["times"] = 0
|
|
|
- save = append(save, result)
|
|
|
- } else {
|
|
|
- ok = false
|
|
|
- msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行"+strings.Join(noFields, ",")+"字段不存在")
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
+ } else {
|
|
|
+ qu.Debug("数据导入失败:", err)
|
|
|
+ msgArr = append(msgArr, "数据导入失败!")
|
|
|
}
|
|
|
+ } else {
|
|
|
+ qu.Debug("数据导入失败:", err)
|
|
|
+ msgArr = append(msgArr, "数据导入失败!")
|
|
|
}
|
|
|
END:
|
|
|
if ok { //保存数据
|
|
|
if len(save) > 0 && coll != "" && !strings.Contains(coll, "Sheet") {
|
|
|
- util.MgoDT.SaveBulk(coll, save...)
|
|
|
+ if cleardata == "是" && util.MgoDT.Del(coll, nil) { //清理coll表
|
|
|
+ util.MgoDT.SaveBulk(coll, save...)
|
|
|
+ } else {
|
|
|
+ util.MgoDT.SaveBulk(coll, save...)
|
|
|
+ }
|
|
|
} else {
|
|
|
ok = false
|
|
|
msgArr = append(msgArr, "存储表异常")
|
|
@@ -224,6 +346,90 @@ END:
|
|
|
f.ServeJson(map[string]interface{}{"ok": ok, "msg": msgArr})
|
|
|
}
|
|
|
|
|
|
+/*
|
|
|
+//func (f *Front) ImportData() {
|
|
|
+// defer qu.Catch()
|
|
|
+// dataFile, _, err := f.GetFile("xlsx")
|
|
|
+// msgArr := []string{}
|
|
|
+// ok := true
|
|
|
+// coll := ""
|
|
|
+// save := []map[string]interface{}{}
|
|
|
+// if err == nil {
|
|
|
+// binary, _ := ioutil.ReadAll(dataFile)
|
|
|
+// xls, _ := xlsx.OpenBinary(binary)
|
|
|
+// sheet := xls.Sheets[0]
|
|
|
+// coll = sheet.Name //sheetName当做表名
|
|
|
+// if RunningTask[coll] { //正在处理任务的表禁止导入数据
|
|
|
+// ok = false
|
|
|
+// msgArr = append(msgArr, coll+"表已被占用!")
|
|
|
+// goto END
|
|
|
+// }
|
|
|
+// fieldsTmp := map[string]int{} //记录字段对应的位置
|
|
|
+// for i, row := range sheet.Rows {
|
|
|
+// if i == 0 { //得到字段属性
|
|
|
+// for j, cell := range row.Cells {
|
|
|
+// if fileIndexMap[cell.Value] { //去除无效字段
|
|
|
+// fieldsTmp[cell.Value] = j
|
|
|
+// }
|
|
|
+// }
|
|
|
+// } else { //生成数据
|
|
|
+// result := map[string]interface{}{}
|
|
|
+// cells := row.Cells
|
|
|
+// cellsLen := len(cells)
|
|
|
+// for field, index := range fieldsTmp {
|
|
|
+// if cellsLen >= index+1 {
|
|
|
+// cell := cells[index]
|
|
|
+// //字段处理
|
|
|
+// if field == "comeintime" {
|
|
|
+// comeintime, _ := cell.Int64()
|
|
|
+// result[field] = comeintime
|
|
|
+// } else if field == "publishtime" {
|
|
|
+// publishtime := cell.Value
|
|
|
+// if publishtime == "0" || publishtime == "" {
|
|
|
+// result[field] = "0"
|
|
|
+// } else if publishtimeReg.MatchString(publishtime) { //2022-10-15 23:49:49
|
|
|
+// result[field] = publishtime
|
|
|
+// } else if cell.NumFmt == "m/d/yy h:mm" {
|
|
|
+// t, _ := cell.GetTime(false)
|
|
|
+// result[field] = t.Format(qu.Date_Full_Layout)
|
|
|
+// } else { //其他类型视为异常
|
|
|
+// ok = false
|
|
|
+// msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行publishtime字段异常")
|
|
|
+// goto END
|
|
|
+// }
|
|
|
+// } else { //其它字段
|
|
|
+// result[field] = cell.Value
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// //必要字段缺失校验
|
|
|
+// noFields := checkField(result)
|
|
|
+// if len(noFields) == 0 {
|
|
|
+// result["state"] = 0
|
|
|
+// result["times"] = 0
|
|
|
+// save = append(save, result)
|
|
|
+// } else {
|
|
|
+// ok = false
|
|
|
+// msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行"+strings.Join(noFields, ",")+"字段不存在")
|
|
|
+// }
|
|
|
+//
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//END:
|
|
|
+// if ok { //保存数据
|
|
|
+// if len(save) > 0 && coll != "" && !strings.Contains(coll, "Sheet") {
|
|
|
+// util.MgoDT.SaveBulk(coll, save...)
|
|
|
+// } else {
|
|
|
+// ok = false
|
|
|
+// msgArr = append(msgArr, "存储表异常")
|
|
|
+// }
|
|
|
+// }
|
|
|
+// save = []map[string]interface{}{}
|
|
|
+// f.ServeJson(map[string]interface{}{"ok": ok, "msg": msgArr})
|
|
|
+//}
|
|
|
+*/
|
|
|
+
|
|
|
func (f *Front) DataList() {
|
|
|
defer qu.Catch()
|
|
|
coll := f.GetString("coll")
|
|
@@ -247,6 +453,7 @@ func (f *Front) DataList() {
|
|
|
}
|
|
|
}
|
|
|
qu.Debug("query:", query)
|
|
|
+ fields := map[string]interface{}{"state": 1, "spidercode": 1, "site": 1, "channel": 1, "title": 1, "href": 1}
|
|
|
task, _ := util.MgoDT.Find(coll, query, map[string]interface{}{"_id": 1}, fields, false, start, limit)
|
|
|
count := util.MgoDT.Count(coll, query)
|
|
|
f.ServeJson(map[string]interface{}{
|
|
@@ -294,18 +501,26 @@ func (f *Front) DataSend() {
|
|
|
taskid := f.GetString("taskid")
|
|
|
task, _ := util.Mgo.FindById("task", taskid, nil)
|
|
|
if len(*task) > 0 {
|
|
|
- flows := (*task)["flows"].([]interface{})
|
|
|
- t := &Task{
|
|
|
- ID: taskid,
|
|
|
- DB: qu.ObjToString((*task)["s_db"]),
|
|
|
- Name: qu.ObjToString((*task)["s_name"]),
|
|
|
- Coll: qu.ObjToString((*task)["s_coll"]),
|
|
|
- Flows: qu.ObjArrToStringArr(flows),
|
|
|
- IsBidding: (*task)["isbidding"].(bool),
|
|
|
+ coll := qu.ObjToString((*task)["s_coll"])
|
|
|
+ if util.MgoDT.Count(coll, map[string]interface{}{"state": 1}) > 0 { //有要被推送的数据
|
|
|
+ flows := (*task)["flows"].([]interface{})
|
|
|
+ t := &Task{
|
|
|
+ ID: taskid,
|
|
|
+ DB: qu.ObjToString((*task)["s_db"]),
|
|
|
+ Name: qu.ObjToString((*task)["s_name"]),
|
|
|
+ Coll: coll,
|
|
|
+ Flows: qu.ObjArrToStringArr(flows),
|
|
|
+ IsBidding: (*task)["isbidding"].(bool),
|
|
|
+ }
|
|
|
+ t.SendData()
|
|
|
+ ok = true
|
|
|
+ } else {
|
|
|
+ msg = "无数据需要被推送"
|
|
|
}
|
|
|
- t.SendData()
|
|
|
- ok = true
|
|
|
+ //更新任务,数据已推送
|
|
|
+ util.Mgo.UpdateById("task", taskid, map[string]interface{}{"$set": map[string]interface{}{"issend": true}})
|
|
|
}
|
|
|
+ qu.Debug("任务推送结果:", taskid, " ", ok, " ", msg)
|
|
|
f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg})
|
|
|
}
|
|
|
|
|
@@ -373,10 +588,9 @@ func (f *Front) DataUpdate() {
|
|
|
result[k] = v
|
|
|
}
|
|
|
}
|
|
|
- qu.Debug(result)
|
|
|
if mustFiledNum > 0 { //保存服务推送有效数据
|
|
|
flag, id, coll := SaveObj(4002, "title", result)
|
|
|
- updateMap["state"] = 1 //添加数据下载成功标记
|
|
|
+ updateMap["state"] = 2 //添加数据推送给成功标记
|
|
|
updateMap["sendflag"] = flag
|
|
|
updateMap["biddingid"] = id
|
|
|
updateMap["biddingcoll"] = coll
|
|
@@ -413,10 +627,160 @@ func (f *Front) DataDelete() {
|
|
|
f.ServeJson(map[string]interface{}{"msg": msg, "ok": ok})
|
|
|
}
|
|
|
|
|
|
+func importWarnErrData(coll string, dr *DataRequest, isbidding bool) (ok bool, querynum, oknum int) {
|
|
|
+ defer qu.Catch()
|
|
|
+ qu.Debug("开始导入数据:", coll)
|
|
|
+ defer func() {
|
|
|
+ qu.Debug("导入数据完毕:", coll)
|
|
|
+ }()
|
|
|
+ //1、检索数据
|
|
|
+ query := map[string]interface{}{}
|
|
|
+ if dr.From != "-1" { //来源
|
|
|
+ query["from"] = dr.From
|
|
|
+ }
|
|
|
+ if dr.Repeat != "-1" { //是否重复
|
|
|
+ query["repeat"] = dr.Repeat == "1"
|
|
|
+ }
|
|
|
+ query["level"] = qu.IntAll(dr.Level) //无论biding还是非bidding数据,导入任务时,类型都不能为全部,所以不用判断
|
|
|
+ if dr.Starttime > 0 { //时间
|
|
|
+ query["comeintime"] = map[string]interface{}{
|
|
|
+ "$gte": dr.Starttime,
|
|
|
+ "$lt": dr.Starttime + 86400,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if dr.Search != "" { //搜索条件
|
|
|
+ query["$or"] = []interface{}{
|
|
|
+ map[string]interface{}{"site": map[string]interface{}{"$regex": dr.Search}},
|
|
|
+ map[string]interface{}{"spidercode": map[string]interface{}{"$regex": dr.Search}},
|
|
|
+ map[string]interface{}{"title": map[string]interface{}{"$regex": dr.Search}},
|
|
|
+ }
|
|
|
+ }
|
|
|
+ qu.Debug("导入任务query:", query)
|
|
|
+ querynum = util.MgoS.Count("spider_warn_err", query)
|
|
|
+ if querynum == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if isbidding { //如果是bidding数据,需保存到bidding_cp表一份数据
|
|
|
+ coll = "bidding_copy"
|
|
|
+ }
|
|
|
+ sess := util.MgoS.GetMgoConn()
|
|
|
+ defer util.MgoDT.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "data": 1,
|
|
|
+ }
|
|
|
+ it := sess.DB(util.MgoS.DbName).C("spider_warn_err").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ saveColl := [][]map[string]interface{}{}
|
|
|
+ saveBidding := [][]map[string]interface{}{}
|
|
|
+ //oknum := 0 //记录biding数据从线上拉取的数量
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ set := map[string]interface{}{}
|
|
|
+ if data, ok := tmp["data"].(map[string]interface{}); ok { //必要字段
|
|
|
+ for _, field := range util.ImportDataFieldsCheck {
|
|
|
+ if data[field] == nil { //必要字段不存在,视为异常数据
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if field == "publishtime" { //
|
|
|
+ if pt, ok := data[field].(int64); ok {
|
|
|
+ set[field] = qu.FormatDateByInt64(&pt, qu.Date_Full_Layout)
|
|
|
+ } else if pt, ok := data[field].(float64); ok {
|
|
|
+ ptInt64 := int64(pt)
|
|
|
+ set[field] = qu.FormatDateByInt64(&ptInt64, qu.Date_Full_Layout)
|
|
|
+ } else if pt, ok := data[field].(string); ok {
|
|
|
+ if publishtimeReg2.MatchString(pt) { //2022-10-15 23:49:49
|
|
|
+ set[field] = pt
|
|
|
+ } else if publishtimeReg1.MatchString(pt) { //2022-10-15
|
|
|
+ set[field] = pt + " 00:00:00"
|
|
|
+ } else {
|
|
|
+ set[field] = pt
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ qu.Debug("spider_warn_err发布时间异常:", tmp["_id"])
|
|
|
+ set[field] = "0"
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ set[field] = data[field]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ state := 0 //标记是否找到对应的bidding数据
|
|
|
+ if isbidding {
|
|
|
+ //根据title、spidercode、href查询线上bidding信息
|
|
|
+ title := qu.ObjToString(set["title"])
|
|
|
+ spidercode := qu.ObjToString(set["spidercode"])
|
|
|
+ href := qu.ObjToString(set["href"])
|
|
|
+ list, _ := util.MgoB.Find("bidding", map[string]interface{}{"title": title}, nil, nil, false, -1, -1)
|
|
|
+ for _, l := range *list {
|
|
|
+ if qu.ObjToString(l["href"]) == href && qu.ObjToString(l["spidercode"]) == spidercode {
|
|
|
+ state = 1
|
|
|
+ lock.Lock()
|
|
|
+ oknum++
|
|
|
+ l["state"] = 0
|
|
|
+ saveBidding = append(saveBidding, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": l["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": l,
|
|
|
+ },
|
|
|
+ })
|
|
|
+ lock.Unlock()
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ set["state"] = state
|
|
|
+ set["times"] = 0
|
|
|
+ update = append(update, map[string]interface{}{"$set": set})
|
|
|
+ lock.Lock()
|
|
|
+ if !isbidding { //非bidding数据导入的数据量
|
|
|
+ oknum++
|
|
|
+ }
|
|
|
+ saveColl = append(saveColl, update)
|
|
|
+ if len(saveColl) > 50 {
|
|
|
+ util.MgoDT.UpSertBulk(coll, saveColl...)
|
|
|
+ saveColl = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ if len(saveBidding) > 50 {
|
|
|
+ util.MgoDT.UpSertBulk("bidding", saveBidding...)
|
|
|
+ saveBidding = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ if len(saveColl) > 0 {
|
|
|
+ util.MgoDT.UpSertBulk(coll, saveColl...)
|
|
|
+ saveColl = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ if len(saveBidding) > 0 {
|
|
|
+ util.MgoDT.UpSertBulk("bidding", saveBidding...)
|
|
|
+ saveBidding = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ ok = true
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
func checkField(result map[string]interface{}) (arr []string) {
|
|
|
defer qu.Catch()
|
|
|
- for _, f := range []string{"href", "title", "site", "channel", "area", "spidercode", "publishtime"} {
|
|
|
- if qu.ObjToString(result[f]) == "" {
|
|
|
+ for _, f := range util.ImportDataFieldsCheck {
|
|
|
+ if result[f] == nil {
|
|
|
arr = append(arr, f)
|
|
|
}
|
|
|
}
|