package front import ( "encoding/json" "fmt" "github.com/tealeg/xlsx" "io/ioutil" "mongodb" qu "qfw/util" "regexp" "strings" "sync" . "task" . "udptask" "util" ) var fileIndexMap = map[string]bool{ "href": true, "title": true, "site": true, "channel": true, "area": true, "city": true, "spidercode": true, "publishtime": true, "jsondata": true, "district": true, "type": true, "publishdept": true, } var publishtimeReg = regexp.MustCompile("\\d{4}-\\d{2}-\\d{2}") var fields = map[string]interface{}{"state": 1, "spidercode": 1, "site": 1, "channel": 1, "title": 1, "href": 1} func (f *Front) PrepareBidding() { defer qu.Catch() sess := util.MgoDT.GetMgoConn() defer util.MgoDT.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 10) fields := map[string]interface{}{ "spidercode": 1, "title": 1, "href": 1, } query := map[string]interface{}{ "state": map[string]interface{}{ "$ne": 1, }, } count := util.MgoDT.Count("bidding_copy", query) it := sess.DB(util.MgoDT.DbName).C("bidding_copy").Find(&query).Select(&fields).Iter() n := 0 oknum := 0 arr := [][]map[string]interface{}{} save := []map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() list, _ := util.MgoB.Find("bidding", map[string]interface{}{"title": tmp["title"]}, nil, nil, false, -1, -1) state := 0 //标记是否找到对应的bidding数据 for _, l := range *list { if qu.ObjToString(l["href"]) == qu.ObjToString(tmp["href"]) && qu.ObjToString(l["spidercode"]) == qu.ObjToString(tmp["spidercode"]) { state = 1 lock.Lock() oknum++ l["state"] = 0 save = append(save, l) lock.Unlock() break } } update := []map[string]interface{}{} update = append(update, map[string]interface{}{"_id": tmp["_id"]}) update = append(update, map[string]interface{}{"$set": map[string]interface{}{"state": state}}) lock.Lock() arr = append(arr, update) if len(arr) > 500 { util.MgoDT.UpdateBulk("bidding_copy", arr...) arr = [][]map[string]interface{}{} } if len(save) > 500 { util.MgoDT.SaveBulk(util.DataColl, save...) save = []map[string]interface{}{} } lock.Unlock() }(tmp) if n%1000 == 0 { qu.Debug("current:", n) } tmp = map[string]interface{}{} } wg.Wait() if len(arr) > 0 { util.MgoDT.UpdateBulk("bidding_copy", arr...) arr = [][]map[string]interface{}{} } if len(save) > 0 { util.MgoDT.SaveBulk(util.DataColl, save...) save = []map[string]interface{}{} } msg := "bidding_copy表共" + fmt.Sprint(count) + "条数据,成功拉取对应bidding信息" + fmt.Sprint(oknum) + "条" qu.Debug("bidding表获取信息结果:", msg) f.ServeJson(map[string]interface{}{"ok": true, "msg": msg}) } func (f *Front) ViewBidding() { defer qu.Catch() if f.Method() == "POST" { searchStr := f.GetString("search[value]") search := strings.TrimSpace(searchStr) start, _ := f.GetInteger("start") limit, _ := f.GetInteger("length") draw, _ := f.GetInteger("draw") query := map[string]interface{}{} state, _ := f.GetInteger("state") if state != -1 { query["state"] = state } if search != "" { query["$or"] = []interface{}{ map[string]interface{}{"title": map[string]interface{}{"$regex": search}}, } } qu.Debug("query:", query) task, _ := util.MgoDT.Find("bidding_copy", query, map[string]interface{}{"_id": -1}, nil, false, start, limit) count := util.MgoDT.Count("bidding_copy", query) f.ServeJson(map[string]interface{}{ "draw": draw, "data": *task, "recordsFiltered": count, "recordsTotal": count, }) } else { f.Render("task/biddingview.html") } } func (f *Front) ImportData() { defer qu.Catch() dataFile, _, err := f.GetFile("xlsx") msgArr := []string{} ok := true coll := "" save := []map[string]interface{}{} if err == nil { binary, _ := ioutil.ReadAll(dataFile) xls, _ := xlsx.OpenBinary(binary) sheet := xls.Sheets[0] coll = sheet.Name //sheetName当做表名 if RunningTask[coll] { //正在处理任务的表禁止导入数据 ok = false msgArr = append(msgArr, coll+"表已被占用!") goto END } fieldsTmp := map[string]int{} //记录字段对应的位置 for i, row := range sheet.Rows { if i == 0 { //得到字段属性 for j, cell := range row.Cells { if fileIndexMap[cell.Value] { //去除无效字段 fieldsTmp[cell.Value] = j } } } else { //生成数据 result := map[string]interface{}{} cells := row.Cells cellsLen := len(cells) for field, index := range fieldsTmp { if cellsLen >= index+1 { cell := cells[index] //字段处理 if field == "comeintime" { comeintime, _ := cell.Int64() result[field] = comeintime } else if field == "publishtime" { publishtime := cell.Value if publishtime == "0" || publishtime == "" { result[field] = "0" } else if publishtimeReg.MatchString(publishtime) { //2022-10-15 23:49:49 result[field] = publishtime } else if cell.NumFmt == "m/d/yy h:mm" { t, _ := cell.GetTime(false) result[field] = t.Format(qu.Date_Full_Layout) } else { //其他类型视为异常 ok = false msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行publishtime字段异常") goto END } } else { //其它字段 result[field] = cell.Value } } } //必要字段缺失校验 noFields := checkField(result) if len(noFields) == 0 { result["state"] = 0 result["times"] = 0 save = append(save, result) } else { ok = false msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行"+strings.Join(noFields, ",")+"字段不存在") } } } } END: if ok { //保存数据 if len(save) > 0 && coll != "" && !strings.Contains(coll, "Sheet") { util.MgoDT.SaveBulk(coll, save...) } else { ok = false msgArr = append(msgArr, "存储表异常") } } save = []map[string]interface{}{} f.ServeJson(map[string]interface{}{"ok": ok, "msg": msgArr}) } func (f *Front) DataList() { defer qu.Catch() coll := f.GetString("coll") if f.Method() == "POST" { searchStr := f.GetString("search[value]") search := strings.TrimSpace(searchStr) start, _ := f.GetInteger("start") limit, _ := f.GetInteger("length") draw, _ := f.GetInteger("draw") query := map[string]interface{}{} state, _ := f.GetInteger("state") //数据状态 if state != -2 { query["state"] = state } if search != "" { query["$or"] = []interface{}{ map[string]interface{}{"title": map[string]interface{}{"$regex": search}}, map[string]interface{}{"site": map[string]interface{}{"$regex": search}}, map[string]interface{}{"channel": map[string]interface{}{"$regex": search}}, map[string]interface{}{"spidercode": map[string]interface{}{"$regex": search}}, } } qu.Debug("query:", query) task, _ := util.MgoDT.Find(coll, query, map[string]interface{}{"_id": 1}, fields, false, start, limit) count := util.MgoDT.Count(coll, query) f.ServeJson(map[string]interface{}{ "draw": draw, "data": *task, "recordsFiltered": count, "recordsTotal": count, }) } else { f.T["coll"] = coll f.T["taskid"] = f.GetString("taskid") f.Render("task/datalist.html", &f.T) } } func (f *Front) DataView() { defer qu.Catch() id := f.GetString("id") coll := f.GetString("coll") data, _ := util.MgoDT.FindById(coll, id, "") delete(*data, "_id") //contenthtml detail 单独拿出来 detail := qu.ObjToString((*data)["detail"]) contenthtml := qu.ObjToString((*data)["contenthtml"]) summary := qu.ObjToString((*data)["summary"]) f.T["detail"] = detail f.T["contenthtml"] = contenthtml f.T["summary"] = summary delete(*data, "detail") delete(*data, "contenthtml") delete(*data, "summary") f.T["id"] = id f.T["data"] = *data f.T["coll"] = coll f.T["datacoll"] = util.DataColl f.T["taskid"] = f.GetString("taskid") f.Render("task/dataview.html", &f.T) } func (f *Front) DataSend() { defer qu.Catch() ok := false msg := "" taskid := f.GetString("taskid") task, _ := util.Mgo.FindById("task", taskid, nil) if len(*task) > 0 { flows := (*task)["flows"].([]interface{}) t := &Task{ ID: taskid, DB: qu.ObjToString((*task)["s_db"]), Name: qu.ObjToString((*task)["s_name"]), Coll: qu.ObjToString((*task)["s_coll"]), Flows: qu.ObjArrToStringArr(flows), IsBidding: (*task)["isbidding"].(bool), } t.SendData() ok = true } f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg}) } func (f *Front) DataUpdate() { defer qu.Catch() updateStr := f.GetStringComm("update") updateMap := map[string]interface{}{} ok := false msg := "" if err := json.Unmarshal([]byte(updateStr), &updateMap); err != nil { qu.Debug("data Unmarshal Failed:", err) } else { //delete(updateMap, "_id") coll := f.GetString("coll") id := f.GetString("id") //字段类型转换 util.FormatFields(updateMap) util.FormatNumber(updateMap) //时间类型转换 query := map[string]interface{}{ "_id": mongodb.StringTOBsonId(id), } //数据推送 if coll == util.DataColl { //bidding数据推送 //更新数据 updateMap["state"] = 1 //添加数据下载成功标记 ok = util.MgoDT.Update(coll, query, map[string]interface{}{"$set": updateMap}, false, false) if ok { q := map[string]interface{}{ "_id": map[string]interface{}{ "$lt": mongodb.StringTOBsonId(id), }, } flows := []interface{}{} task, _ := util.Mgo.FindById("task", f.GetString("taskid"), map[string]interface{}{"flows": 1}) if len(*task) > 0 { flows, _ = (*task)["flows"].([]interface{}) //任务上配了流程 } gtid := util.GTEID // 起始id one, _ := util.MgoDT.Find(coll, q, map[string]interface{}{"_id": -1}, map[string]interface{}{"_id": 1}, false, 0, 1) if len(*one) == 1 { gtid = mongodb.BsonIdToSId((*one)[0]["_id"]) } if len(flows) > 0 { //任务上有处理流程,走处理流程 SendUdp(coll, gtid, id, util.FlowsMap["抽取"].Stype, util.FlowsMap["抽取"].Addr, util.FlowsMap["抽取"].Port) //直接掉抽取 } else { //直接更新 UpdateBiddingData(gtid, id) } } else { msg = "数据更新失败" } } else { //非bidding数据推送 data, _ := util.MgoDT.FindById(coll, id, nil) //记录当前表的源数据 //拼装最终数据result result := map[string]interface{}{ "dataging": 0, //补充dataging字段 "T": "bidding", } mustFiledNum := 0 //记录更新有效字段个数 for k, _ := range util.Fields { if v := updateMap[k]; v != nil { result[k] = v mustFiledNum++ } else if v := (*data)[k]; v != nil { mustFiledNum++ result[k] = v } } qu.Debug(result) if mustFiledNum > 0 { //保存服务推送有效数据 flag, id, coll := SaveObj(4002, "title", result) updateMap["state"] = 1 //添加数据下载成功标记 updateMap["sendflag"] = flag updateMap["biddingid"] = id updateMap["biddingcoll"] = coll } else { msg = "推送失败,无有效字段" } //更新数据 ok = util.MgoDT.Update(coll, query, map[string]interface{}{"$set": updateMap}, false, false) } } f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg}) } func (f *Front) DataDelete() { defer qu.Catch() taskid := f.GetString("taskid") task, _ := util.Mgo.FindById("task", taskid, nil) ok := false msg := "" if len(*task) > 0 { coll := qu.ObjToString((*task)["s_coll"]) code := f.GetString("code") codeArr := strings.Split(code, ",") query := map[string]interface{}{ "spidercode": map[string]interface{}{ "$in": codeArr, }, } qu.Debug("删除无效数据的爬虫:", code) ok = util.MgoDT.Del(coll, query) } else { msg = "任务信息查询失败" } f.ServeJson(map[string]interface{}{"msg": msg, "ok": ok}) } func checkField(result map[string]interface{}) (arr []string) { defer qu.Catch() for _, f := range []string{"href", "title", "site", "channel", "area", "spidercode", "publishtime"} { if qu.ObjToString(result[f]) == "" { arr = append(arr, f) } } return }