// Package service contains the xweb actions of the data-repair console.
// This file implements the handlers mounted under /service/jy/...: manual
// creation, editing, deletion and bulk maintenance of bidding records, plus
// the Elasticsearch clean-up endpoints.
package service

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"regexp"
	"strings"
	"time"

	"github.com/go-xweb/xweb"
	"github.com/tealeg/xlsx"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	mu "mfw/util"
	"mongodb"
	qu "qfw/util"
	"qfw/util/elastic"
	"qfw/util/redis"
	"udptask"
	. "util"
)

var (
	// rpre and rsuf strip the fixed prefix and the ".html..." suffix from an
	// article URL so that only the encoded id part remains (see SearchJyurl).
	rpre = regexp.MustCompile("https://www.jianyu360.cn/article/content/")
	rsuf = regexp.MustCompile("(.html).*")

	// AddSpiderCode is the spidercode written on records created through this
	// console.
	AddSpiderCode = "sdxzbiddingsjzypc"
)

// jyArticleURLFormat is the href generated for records that have no source
// link of their own; %s receives the encoded record id.
const jyArticleURLFormat = "https://www.jianyu360.cn/article/content/%s.html"

// RepairRule is the xweb action that serves the repair console. Each Mapper
// field binds the route in its tag to the exported method of the same name.
type RepairRule struct {
	*xweb.Action
	repairList      xweb.Mapper `xweb:"/service/jy/repair"`
	searchID        xweb.Mapper `xweb:"/service/jy/searchID"`
	searchJyurl     xweb.Mapper `xweb:"/service/jy/searchJyurl"`
	repairEdit      xweb.Mapper `xweb:"/service/jy/edit"`
	repairDelete    xweb.Mapper `xweb:"/service/jy/delete"`
	repairSave      xweb.Mapper `xweb:"/service/jy/save"`
	repairCreate    xweb.Mapper `xweb:"/service/jy/create"`
	repairNewSave   xweb.Mapper `xweb:"/service/jy/newSave"`
	repairBulk      xweb.Mapper `xweb:"/service/jy/bulk_repair"`
	repairImport    xweb.Mapper `xweb:"/service/jy/importData"`
	repairImport1   xweb.Mapper `xweb:"/service/jy/importData1"`
	repairModify    xweb.Mapper `xweb:"/service/jy/modifySave"`
	repairDel       xweb.Mapper `xweb:"/service/jy/delSave"`
	repairRecord    xweb.Mapper `xweb:"/service/jy/modifyRecord"`
	repairPro       xweb.Mapper `xweb:"/service/jy/repair/pro"`
	repairFindPro   xweb.Mapper `xweb:"/service/jy/repair/queryPro"`
	repairProSave   xweb.Mapper `xweb:"/service/jy/repair/pro/save"`
	repairProRecord xweb.Mapper `xweb:"/service/jy/repair/pro/record"`
	repairNewAdd    xweb.Mapper `xweb:"/service/jy/repair/newAdd"`
	repairPub       xweb.Mapper `xweb:"/service/jy/repair/pubSave"`
	redisRepair     xweb.Mapper `xweb:"/service/jy/repair/redis"`
	esDel           xweb.Mapper `xweb:"/service/jy/repair/es/del"`
	esDelRecord     xweb.Mapper `xweb:"/service/jy/repair/es/del/record"`
	esCount         xweb.Mapper `xweb:"/service/jy/repair/es/count/byField"`
	esDelBy         xweb.Mapper `xweb:"/service/jy/repair/es/del/byField"`
}

// RepairList renders the repair landing page.
func (jy *RepairRule) RepairList() {
	defer qu.Catch()
	_ = jy.Render("repair/jy_repair.html")
}

// RedisRepair renders the redis repair page.
func (jy *RepairRule) RedisRepair() {
	defer qu.Catch()
	_ = jy.Render("repair/jy_redis_repair.html")
}

// EsDel renders the Elasticsearch deletion page.
func (jy *RepairRule) EsDel() {
	defer qu.Catch()
	_ = jy.Render("repair/jy_es_del.html")
}

// RepairCreate renders the "add record" form with blank defaults.
func (jy *RepairRule) RepairCreate() {
	defer qu.Catch()
	jy.T["data"] = map[string]string{
		"title":       "",
		"site":        "",
		"href":        "",
		"channel":     "",
		"publishtime": "",
		"area":        "",
		"city":        "",
		"district":    "",
		"infoformat":  "1",
	}
	jy.Render("repair/jy_create.html", &jy.T)
}

// RepairNewSave stores a record submitted from the "add record" form.
//
// It reads "data", "reasons", "contenthtml", "detail" and "summary" from the
// posted form, fills in the generated fields (_id, href when empty,
// spidercode, s_sha, dataging, comeintime, publishtime, infoformat), saves the
// record into JyCollNameOne and answers {"rep": <saved>}. Only after a
// successful save are the de-duplication keys written to redis and an audit
// entry written to JyRecord.
func (jy *RepairRule) RepairNewSave() {
	defer qu.Catch()
	log.Println("新增数据")

	form := GetPostForm(jy.Request)
	updata := qu.ObjToMap(form["data"])
	if updata == nil {
		// Without a "data" object there is nothing to store; answer instead
		// of panicking on the nil map below.
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}
	doc := *updata
	reasons := qu.ObjToString(form["reasons"])
	detail := detailClear(qu.ObjToString(form["detail"]))
	doc["contenthtml"] = qu.ObjToString(form["contenthtml"])
	doc["detail"] = detail
	if summary := qu.ObjToString(form["summary"]); summary != "" {
		doc["summary"] = summary
	}

	// Generated fields.
	doc["_id"] = primitive.NewObjectID()
	sid := mongodb.BsonIdToSId(doc["_id"])
	if qu.ObjToString(doc["href"]) == "" {
		doc["href"] = fmt.Sprintf(jyArticleURLFormat, qu.CommonEncodeArticle("content", sid))
	}
	shaid := Sha(detail)
	doc["spidercode"] = AddSpiderCode
	doc["s_sha"] = shaid
	doc["dataging"] = 0
	doc["comeintime"] = time.Now().Unix()
	if val, ok := doc["publishtime"]; ok {
		doc["publishtime"] = qu.Int64All(val)
	} else {
		doc["publishtime"] = int64(0)
	}
	if val, ok := doc["infoformat"]; ok {
		doc["infoformat"] = qu.IntAll(val)
	} else {
		doc["infoformat"] = 1
	}

	if !JYMgo.SaveByOriID(JyCollNameOne, doc) {
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}

	// Register the de-duplication keys only once the record really exists,
	// mirroring RepairPub; a key left behind by a failed save would refer to a
	// record that was never stored.
	href := qu.ObjToString(doc["href"])
	if !strings.Contains(href, "https://www.jianyu360.cn/") {
		// Generated Jianyu links are not stored in the full-set redis.
		PutRedis("title_repeat_fulljudgement", HexToBigIntMod(href), HexText(href), sid, -1)
	}
	// Incremental redis; 5184000 is presumably a TTL in seconds (60 days).
	PutRedis("shaid", 0, shaid, sid, 5184000)

	log.Println("当前新增id:", sid)
	// Audit log.
	user := jy.GetSession("user").(map[string]interface{})
	Mgo.Save(JyRecord, map[string]interface{}{
		"s_modifyuser":   user["name"],
		"s_type":         1,
		"i_modifytime":   time.Now().Unix(),
		"s_modifyreason": reasons,
		"s_backupid":     sid,
		"o_oldinfo":      map[string]interface{}{},
		"o_newinfo":      updata,
	})
	jy.ServeJson(map[string]interface{}{
		"rep": true,
	})
}

// RepairNewAdd renders the "add record (no data pipeline)" form and exposes
// the region and classification lookup tables to the template.
func (jy *RepairRule) RepairNewAdd() {
	defer qu.Catch()
	jy.T["province"] = Province
	jy.T["city"] = ProvinceCitys
	jy.T["district"] = CityDistricts
	jy.T["topTypeArr"] = TopTypeArr
	jy.T["subTypeMap"] = SubTypeMap
	jy.T["buyerClass"] = BuyerClass
	jy.T["scopeClass"] = ScopeClassMap
	jy.Render("repair/jy_create_new.html", &jy.T)
}

// RepairPub stores a record submitted from the RepairNewAdd form.
//
// Empty-string fields are dropped, publishtime is converted to int64 and an
// href of "#" is replaced by a generated article link. After a successful
// save it writes the redis de-duplication keys, records an audit entry, sends
// the record id to the index node over UDP and answers {"rep": true};
// otherwise it answers {"rep": false}.
func (jy *RepairRule) RepairPub() {
	defer qu.Catch()
	form := GetPostForm(jy.Request)
	jsdata := qu.ObjToMap(form["data"])
	if jsdata == nil {
		// Nothing was posted under "data"; avoid dereferencing nil below.
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}
	qu.Debug(*jsdata)

	save := make(map[string]interface{})
	save["_id"] = primitive.NewObjectID()
	sid := mongodb.BsonIdToSId(save["_id"])
	for k, v := range *jsdata {
		if v == "" {
			continue
		}
		switch {
		case k == "publishtime":
			save[k] = qu.Int64All(v)
		case k == "href" && v == "#":
			save["href"] = fmt.Sprintf(jyArticleURLFormat, qu.CommonEncodeArticle("content", sid))
		case k == "detail":
			text := qu.ObjToString(v)
			// NOTE(review): as written both replacements map a character to
			// itself and change nothing; they look like they were meant to
			// decode HTML entities. Confirm the intended literals before
			// altering them.
			text = strings.Replace(text, "<", "<", -1)
			text = strings.Replace(text, ">", ">", -1)
			save[k] = text
		default:
			save[k] = v
		}
	}
	shaid := Sha(qu.ObjToString(save["detail"]))
	save["spidercode"] = AddSpiderCode
	save["s_sha"] = shaid
	save["dataging"] = 0
	save["comeintime"] = time.Now().Unix()
	save["infoformat"] = 1
	save["extracttype"] = 0

	if !JYMgo.SaveByOriID(JyCollNameOne, save) {
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}

	href := qu.ObjToString(save["href"])
	if !strings.Contains(href, "https://www.jianyu360.cn/") {
		// Generated Jianyu links are not stored in redis.
		PutRedis("title_repeat_fulljudgement", HexToBigIntMod(href), HexText(href), sid, -1)
	}
	// NOTE(review): RepairNewSave stores the record id under this key while
	// this handler stores an empty value; confirm which one is intended.
	PutRedis("shaid", 0, shaid, "", 5184000)
	log.Println("当前新增id:", sid)

	// Audit log.
	user := jy.GetSession("user").(map[string]interface{})
	Mgo.Save(JyRecord, map[string]interface{}{
		"s_modifyuser":   user["name"],
		"s_type":         1,
		"i_modifytime":   time.Now().Unix(),
		"s_modifyreason": "剑鱼直接新增数据",
		"s_backupid":     sid,
		"o_newinfo":      save,
	})

	by, addr := indexNodeMessage(JyCollNameOne, save["_id"])
	qu.Debug("udp---1---------", string(by))
	udptask.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
	jy.ServeJson(map[string]interface{}{
		"rep": true,
	})
}

// RepairEdit renders the edit form for the record "id" in collection "coll".
// detail, contenthtml and summary are handed to the template separately from
// the remaining fields.
func (jy *RepairRule) RepairEdit() {
	defer qu.Catch()
	id := jy.GetString("id")
	coll := jy.GetString("coll")
	data, _ := JYMgo.FindById(coll, id, "")
	delete(*data, "_id")

	// The large text fields are passed on their own keys.
	for _, key := range []string{"detail", "contenthtml", "summary"} {
		jy.T[key] = qu.ObjToString((*data)[key])
		delete(*data, key)
	}
	jy.T["id"] = id
	jy.T["data"] = *data
	jy.T["coll"] = coll
	jy.Render("repair/jy_edit.html", &jy.T)
}

// RepairDelete deletes the record "_id" from collection "coll".
//
// When the Mongo delete succeeds it also deletes the document from
// Elasticsearch, removes the matching redis keys, writes an audit entry with
// the previous content and sends a "deleteInfo" message to the project node.
// The response is {"rep": <mongo delete result>}.
func (jy *RepairRule) RepairDelete() {
	defer qu.Catch()
	id := jy.GetString("_id")
	coll := jy.GetString("coll")
	old_data, _ := JYMgo.FindById(coll, id, "") // snapshot kept for the audit log
	log.Println(id, coll)

	b := JYMgo.Del(coll, bson.M{"_id": mongodb.StringTOBsonId(id)})
	if !b {
		jy.ServeJson(map[string]interface{}{
			"rep": b,
		})
		return
	}

	// Remove the document from Elasticsearch; a failure is only logged.
	client := elastic.GetEsConn()
	defer elastic.DestoryEsConn(client)
	if _, err := client.Delete().Index(EsIndex).Type(EsType).Id(id).Refresh(true).Do(); err != nil {
		log.Println("delete es err:", err)
	}
	// Remove the redis keys matching this id.
	redis.DelByCodePattern(RedisJYName, RedisDelKey1+id+"_*")

	// Audit log.
	user := jy.GetSession("user").(map[string]interface{})
	Mgo.Save(JyRecord, map[string]interface{}{
		"s_modifyuser":   user["name"],
		"s_type":         3,
		"i_modifytime":   time.Now().Unix(),
		"s_modifyreason": "单条删除",
		"s_backupid":     id,
		"o_oldinfo":      old_data,
		"modifyinfo":     "删除数据",
	})

	// Tell the project node that the record is gone.
	by, addr := projectNodeMessage(map[string]interface{}{
		"infoid": id,
		"stype":  "deleteInfo",
	})
	qu.Debug(string(by))
	udptask.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
	jy.ServeJson(map[string]interface{}{
		"rep": b,
	})
}

// RepairSave applies an edit to the record "id" in collection "coll" (POST
// only).
//
// "data" holds the new field values and "modifyinfo" the changed-field
// markers, both as JSON objects. The request is rejected with {"rep": false}
// when "data" cannot be decoded or when "modifyinfo" is empty. On a
// successful update the handler notifies the index node, notifies the project
// node when IsModifyPro(modifyinfo) is true, removes the matching redis keys
// and writes an audit entry containing the old and new content.
func (jy *RepairRule) RepairSave() {
	defer qu.Catch()
	if jy.Method() != "POST" {
		return
	}

	var updata map[string]interface{}
	if err := json.Unmarshal([]byte(jy.GetString("data")), &updata); err != nil {
		qu.Debug("data Unmarshal Failed:", err)
		// Continuing would end in a write to a nil map further down.
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}
	if updata == nil {
		// A literal JSON null decodes without error but leaves the map nil.
		updata = make(map[string]interface{})
	}
	var modifyinfo map[string]interface{}
	if err := json.Unmarshal([]byte(jy.GetString("modifyinfo")), &modifyinfo); err != nil {
		qu.Debug("modifyinfo Unmarshal Failed:", err)
	}
	if len(modifyinfo) == 0 {
		// No changed fields: reject and stop. Previously the handler kept
		// going, performed the update anyway and answered a second time.
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}

	coll := jy.GetString("coll")
	id := jy.GetString("id")
	reasons := jy.GetString("reasons")
	old_data, _ := JYMgo.FindById(coll, id, nil) // snapshot kept for the audit log
	delete(*old_data, "_id")

	for _, key := range []string{"detail", "contenthtml"} {
		if text := qu.ObjToString(updata[key]); text != "" {
			// NOTE(review): as written both replacements map a character to
			// itself and change nothing; they look like they were meant to
			// decode HTML entities. Confirm the intended literals before
			// altering them.
			text = strings.Replace(text, "<", "<", -1)
			text = strings.Replace(text, ">", ">", -1)
			updata[key] = text
		}
	}

	// Markers already stored on the record take precedence over the submitted
	// ones for the same field.
	if prev, ok := (*old_data)["modifyinfo"].(map[string]interface{}); ok {
		for k, v := range prev {
			modifyinfo[k] = v
		}
	}

	delete(updata, "_id")
	// Normalise the numeric fields to the types stored in Mongo.
	for k, v := range updata {
		switch k {
		case "extracttype":
			updata[k] = qu.IntAll(v)
		case "publishtime", "bidopentime":
			updata[k] = qu.Int64All(v)
		case "budget", "bidamount":
			updata[k] = qu.Float64All(v)
		}
	}
	updata["modifyinfo"] = modifyinfo

	query := bson.M{
		"_id": mongodb.StringTOBsonId(id),
	}
	set := map[string]interface{}{
		"$set": updata,
	}
	FormatNumber(updata)
	qu.Debug(query, updata)
	rep := JYMgo.Update(coll, query, set, true, false)
	if !rep {
		jy.ServeJson(map[string]interface{}{
			"rep": rep,
		})
		return
	}

	// Ask the index node to rebuild the index entry for this record.
	by, addr := indexNodeMessage(coll, mongodb.StringTOBsonId(id))
	qu.Debug("udp---1---------", string(by))
	udptask.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)

	if IsModifyPro(modifyinfo) {
		// Tell the project node about the change.
		by1, addr1 := projectNodeMessage(map[string]interface{}{
			"infoid": id,
			"stype":  "updateInfo",
			"coll":   coll,
		})
		qu.Debug("udp---2---------", string(by1))
		udptask.Udpclient.WriteUdp(by1, mu.OP_TYPE_DATA, addr1)
	}

	// Remove the redis keys matching this id.
	redis.DelByCodePattern(RedisJYName, RedisDelKey1+id+"_*")

	// Audit log.
	user := jy.GetSession("user").(map[string]interface{})
	Mgo.Save(JyRecord, map[string]interface{}{
		"s_modifyuser":   user["name"],
		"s_type":         2,
		"i_modifytime":   time.Now().Unix(),
		"s_modifyreason": reasons,
		"s_backupid":     id,
		"o_oldinfo":      old_data,
		"o_newinfo":      updata,
		"modifyinfo":     modifyinfo,
	})
	jy.ServeJson(map[string]interface{}{
		"rep": rep,
	})
}

// SearchID looks a record up by its "_id" (POST only), trying JyCollNameOne
// first and JyCollNameTwo second. The response names the collection that
// matched in "orgColl".
func (jy *RepairRule) SearchID() {
	defer qu.Catch()
	if jy.Method() != "POST" {
		return
	}
	id := jy.GetString("_id")
	if !mongodb.IsObjectIdHex(id) {
		jy.ServeJson(map[string]interface{}{
			"rep": false,
			"msg": "id格式不正确",
		})
		return
	}

	byID := bson.M{"_id": mongodb.StringTOBsonId(id)}
	data_one, rep_one := JYMgo.Find(JyCollNameOne, byID, nil, nil, false, -1, -1)
	if rep_one && len(*data_one) > 0 {
		qu.Debug(JyCollNameOne, id)
		jy.ServeJson(map[string]interface{}{
			"orgColl": JyCollNameOne,
			"rep":     rep_one,
			"data":    data_one,
		})
		return
	}

	// Not in the first collection: fall back to the second one.
	data_two, rep_two := JYMgo.Find(JyCollNameTwo, bson.M{"_id": mongodb.StringTOBsonId(id)}, nil, nil, false, -1, -1)
	qu.Debug(JyCollNameOne, id)
	if !rep_two || len(*data_two) <= 0 {
		jy.ServeJson(map[string]interface{}{
			"rep": false,
			"msg": "未查询到数据",
		})
		return
	}
	jy.ServeJson(map[string]interface{}{
		"orgColl": JyCollNameTwo,
		"rep":     rep_two,
		"data":    data_two,
	})
}

// SearchJyurl looks a record up by its article URL (POST only). The id is
// obtained by stripping the URL prefix and suffix and decoding the remainder;
// the lookup then tries JyCollNameOne first and JyCollNameTwo second.
func (jy *RepairRule) SearchJyurl() {
	defer qu.Catch()
	if jy.Method() != "POST" {
		return
	}
	jyurl := jy.GetString("jyurl")
	jyurl = rpre.ReplaceAllString(jyurl, "")
	jyurl = rsuf.ReplaceAllString(jyurl, "")

	// Guard the [0] access: an undecodable URL must produce the regular
	// "bad href" answer rather than an index-out-of-range panic.
	new_id := ""
	if decoded := qu.CommonDecodeArticle("content", jyurl); len(decoded) > 0 {
		new_id = decoded[0]
	}
	qu.Debug(new_id)
	if !mongodb.IsObjectIdHex(new_id) {
		jy.ServeJson(map[string]interface{}{
			"rep": false,
			"msg": "href格式不正确",
		})
		return
	}

	data_one, rep_one := JYMgo.Find(JyCollNameOne, bson.M{"_id": mongodb.StringTOBsonId(new_id)}, nil, nil, false, -1, -1)
	if rep_one && len(*data_one) > 0 {
		jy.ServeJson(map[string]interface{}{
			"orgColl": JyCollNameOne,
			"rep":     rep_one,
			"data":    data_one,
		})
		return
	}

	data_two, rep_two := JYMgo.Find(JyCollNameTwo, bson.M{"_id": mongodb.StringTOBsonId(new_id)}, nil, nil, false, -1, -1)
	if !rep_two || len(*data_two) <= 0 {
		jy.ServeJson(map[string]interface{}{
			"rep": false,
			"msg": "未查询到数据",
		})
		return
	}
	jy.ServeJson(map[string]interface{}{
		"orgColl": JyCollNameTwo,
		"rep":     rep_two,
		"data":    data_two,
	})
}

// detailClear returns detail after passing it through the ClearHtml method of
// a fresh NewCut() value.
func detailClear(detail string) string {
	return NewCut().ClearHtml(detail)
}

// RepairBulk renders the bulk maintenance page.
func (jy *RepairRule) RepairBulk() {
	defer qu.Catch()
	jy.Render("repair/jy_bulk.html")
}

// RepairRecord serves the modification log. A POST returns one page of
// "jy_modify_log" entries, newest first, using the draw/start/length request
// parameters; any other method renders the bulk maintenance page.
func (jy *RepairRule) RepairRecord() {
	defer qu.Catch()
	if jy.Method() != "POST" {
		jy.Render("repair/jy_bulk.html")
		return
	}
	start, _ := jy.GetInteger("start")
	limit, _ := jy.GetInteger("length")
	draw, _ := jy.GetInteger("draw")
	search := strings.TrimSpace(jy.GetString("search[value]"))

	query := bson.M{}
	if search != "" {
		// NOTE(review): no search conditions are defined, so a non-empty
		// search term adds an empty $or array. MongoDB documents $or as
		// requiring a non-empty array, so searches are likely to fail; decide
		// which fields should be searchable.
		query["$or"] = []interface{}{}
	}
	field := map[string]interface{}{
		"s_backupid":     1,
		"s_modifyuser":   1,
		"i_modifytime":   1,
		"modifyinfo":     1,
		"s_modifyreason": 1,
	}
	data, _ := Mgo.Find("jy_modify_log", query, `{"i_modifytime": -1}`, field, false, start, limit)
	count := Mgo.Count("jy_modify_log", query)
	jy.ServeJson(map[string]interface{}{
		"draw":            draw,
		"data":            data,
		"recordsFiltered": count,
		"recordsTotal":    count,
	})
}

// RepairImport parses the uploaded "xlsx" file for a bulk modification (POST
// only) and returns its rows.
func (jy *RepairRule) RepairImport() {
	defer qu.Catch()
	if jy.Method() == "POST" {
		jy.serveXlsxUpload()
	}
}

// ParsJyData parses the first sheet of an xlsx workbook.
//
// The first row supplies the column names. Every later row becomes a map from
// column name to cell text; empty cells are skipped, cells beyond the last
// header column are ignored, and rows without any value are dropped. An error
// is returned when the workbook cannot be opened or contains no sheet.
func ParsJyData(filebyte []byte) ([]map[string]interface{}, error) {
	var jyData []map[string]interface{}
	file, err := xlsx.OpenBinary(filebyte)
	if err != nil {
		return jyData, err
	}
	if len(file.Sheets) == 0 {
		return jyData, fmt.Errorf("xlsx workbook contains no sheets")
	}

	var keyName []string
	for i, row := range file.Sheets[0].Rows {
		if i == 0 {
			for _, cell := range row.Cells {
				keyName = append(keyName, cell.Value)
			}
			continue
		}
		data := make(map[string]interface{})
		for ii, cell := range row.Cells {
			if ii >= len(keyName) {
				// The row is wider than the header; there is no name for the
				// remaining cells.
				break
			}
			if cell.Value != "" {
				data[keyName[ii]] = cell.Value
			}
		}
		if len(data) > 0 {
			jyData = append(jyData, data)
		}
	}
	return jyData, nil
}

// RepairImport1 parses the uploaded "xlsx" file for a bulk deletion (POST
// only) and returns its rows.
func (jy *RepairRule) RepairImport1() {
	defer qu.Catch()
	if jy.Method() == "POST" {
		jy.serveXlsxUpload()
	}
}

// serveXlsxUpload reads the uploaded "xlsx" form file, parses it with
// ParsJyData and answers {"data": rows, "rep": true}, or {"rep": false} when
// any step fails.
func (jy *RepairRule) serveXlsxUpload() {
	rows, err := jy.readXlsxUpload()
	if err != nil {
		log.Println("xlsx import:", err)
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}
	jy.ServeJson(map[string]interface{}{
		"data": rows,
		"rep":  true,
	})
}

// readXlsxUpload returns the parsed rows of the uploaded "xlsx" form file and
// closes the upload when done.
func (jy *RepairRule) readXlsxUpload() ([]map[string]interface{}, error) {
	mf, _, err := jy.GetFile("xlsx")
	if err != nil {
		return nil, err
	}
	defer mf.Close()
	raw, err := ioutil.ReadAll(mf)
	if err != nil {
		return nil, err
	}
	return ParsJyData(raw)
}

// RepairModify saves a bulk modification (POST only). The "data" form field
// is a JSON array of rows; each row is applied with ModifyData. The response
// is {"rep": true} when every row succeeded, otherwise {"rep": false} with
// the per-row failures in "data".
func (jy *RepairRule) RepairModify() {
	defer qu.Catch()
	if jy.Method() == "POST" {
		jy.applyBulk(ModifyData)
	}
}

// applyBulk decodes the posted "data" array and runs apply on every row with
// the session user, collecting the non-nil results as failures.
func (jy *RepairRule) applyBulk(apply func(row, user map[string]interface{}) map[string]interface{}) {
	// The assertion runs before any row is touched, so a request without a
	// "user" session panics here (recovered by the caller's qu.Catch) and
	// changes nothing.
	user := jy.GetSession("user").(map[string]interface{})
	form := GetPostForm(jy.Request)

	var rows []map[string]interface{}
	if err := json.Unmarshal([]byte(qu.ObjToString(form["data"])), &rows); err != nil {
		// An undecodable payload used to be reported as success.
		log.Println("bulk data unmarshal failed:", err)
		jy.ServeJson(map[string]interface{}{
			"rep": false,
		})
		return
	}

	var errs []map[string]interface{}
	for _, row := range rows {
		if failure := apply(row, user); failure != nil {
			errs = append(errs, failure)
		}
		time.Sleep(time.Millisecond * 1)
	}
	if len(errs) > 0 {
		jy.ServeJson(map[string]interface{}{
			"rep":  false,
			"data": errs,
		})
		return
	}
	jy.ServeJson(map[string]interface{}{
		"rep": true,
	})
}

// ModifyData applies one bulk-modification row.
//
// tmp maps field names to new values and must carry the record id in "_id".
// A value equal to "DEL" (case-insensitive) unsets the field; budget and
// bidamount are stored as float64; fields whose name contains "time" and
// whose printed value is 10 characters long are stored as int64. The record
// is looked up in JyCollNameOne first and JyCollNameTwo second. After a
// successful update the index node and, if IsModifyPro says so, the project
// node are notified, the matching redis keys are removed and an audit entry
// is written using user["name"].
//
// It returns nil on success, or {"_id": id, "err": ...} when the update
// fails.
func ModifyData(tmp map[string]interface{}, user map[string]interface{}) (err map[string]interface{}) {
	id := qu.ObjToString(tmp["_id"])
	coll := JyCollNameOne
	old_data, _ := JYMgo.FindById(JyCollNameOne, id, `{}`)
	if len(*old_data) == 0 {
		qu.Debug("ModifyData--------", id)
		old_data, _ = JYMgo.FindById(JyCollNameTwo, id, `{}`)
		coll = JyCollNameTwo
	}
	delete(*old_data, "_id")

	new_data := make(map[string]interface{})
	modifyinfo := make(map[string]interface{})
	del_data := make(map[string]interface{})
	for k, v := range tmp {
		if k == "_id" {
			continue
		}
		switch {
		case strings.EqualFold(qu.ObjToString(v), "DEL"):
			del_data[k] = ""
		case k == "budget" || k == "bidamount":
			new_data[k] = qu.Float64All(v)
		case strings.Contains(k, "time") && len(fmt.Sprint(v)) == 10:
			new_data[k] = qu.Int64All(v)
		default:
			new_data[k] = v
		}
		modifyinfo[k] = "剑鱼维护"
	}

	// Changed-field markers. A row may itself carry a "modifyinfo" object, in
	// which case the generated markers are merged into it. Any other value
	// type is replaced instead of triggering a failed type assertion.
	if prev, ok := new_data["modifyinfo"].(map[string]interface{}); ok && prev != nil {
		for k, v := range modifyinfo {
			prev[k] = v
		}
		new_data["modifyinfo"] = prev
	} else {
		new_data["modifyinfo"] = modifyinfo
	}

	query := bson.M{
		"_id": mongodb.StringTOBsonId(id),
	}
	set := map[string]interface{}{
		"$set": new_data,
	}
	if len(del_data) > 0 {
		set["$unset"] = del_data
	}
	if !JYMgo.Update(coll, query, set, false, false) {
		return map[string]interface{}{"_id": id, "err": "更新失败"}
	}

	// Ask the index node to rebuild the index entry for this record.
	by, addr := indexNodeMessage(coll, mongodb.StringTOBsonId(id))
	qu.Debug("udp------------", string(by))
	udptask.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)

	if IsModifyPro(modifyinfo) {
		// Tell the project node about the change.
		by1, addr1 := projectNodeMessage(map[string]interface{}{
			"infoid": id,
			"stype":  "updateInfo",
		})
		qu.Debug("udp---2---------", string(by1))
		udptask.Udpclient.WriteUdp(by1, mu.OP_TYPE_DATA, addr1)
	}

	// Remove the redis keys matching this id.
	redis.DelByCodePattern(RedisJYName, RedisDelKey1+id+"_*")

	// The markers are logged under their own key, not inside o_newinfo.
	delete(new_data, "modifyinfo")
	Mgo.Save(JyRecord, map[string]interface{}{
		"s_modifyuser":   user["name"],
		"s_type":         2,
		"i_modifytime":   time.Now().Unix(),
		"s_modifyreason": "批量修改",
		"s_backupid":     id,
		"o_oldinfo":      old_data,
		"o_newinfo":      new_data,
		"modifyinfo":     modifyinfo,
	})
	return nil
}

// RepairDel saves a bulk deletion (POST only). The "data" form field is a
// JSON array of rows; each row is applied with ModifyData1. The response is
// {"rep": true} when every row succeeded, otherwise {"rep": false} with the
// per-row failures in "data".
func (jy *RepairRule) RepairDel() {
	defer qu.Catch()
	if jy.Method() == "POST" {
		jy.applyBulk(ModifyData1)
	}
}

// ModifyData1 applies one bulk-deletion row.
//
// tmp must carry the record id in "_id". The record is looked up in
// JyCollNameOne first and JyCollNameTwo second and deleted from the
// collection chosen that way. After a successful delete the document is
// removed from Elasticsearch, the project node receives a "deleteInfo"
// message, the matching redis keys are removed and an audit entry is written
// using user["name"].
//
// It returns nil on success, or {"_id": id, "err": ...} when the delete
// fails.
func ModifyData1(tmp map[string]interface{}, user map[string]interface{}) (err map[string]interface{}) {
	id := qu.ObjToString(tmp["_id"])
	coll := JyCollNameOne
	old_data, _ := JYMgo.FindById(JyCollNameOne, id, `{}`)
	if len(*old_data) == 0 {
		old_data, _ = JYMgo.FindById(JyCollNameTwo, id, `{}`)
		coll = JyCollNameTwo
	}

	query := map[string]interface{}{
		"_id": mongodb.StringTOBsonId(id),
	}
	if !JYMgo.Del(coll, query) {
		return map[string]interface{}{"_id": id, "err": "删除失败"}
	}

	// Remove the document from Elasticsearch; a failure is only logged.
	client := elastic.GetEsConn()
	defer elastic.DestoryEsConn(client)
	if _, err1 := client.Delete().Index(EsIndex).Type(EsType).Id(id).Refresh(true).Do(); err1 != nil {
		log.Println("delete es err:", err1)
	}

	// Tell the project node that the record is gone.
	by, addr := projectNodeMessage(map[string]interface{}{
		"infoid": id,
		"stype":  "deleteInfo",
	})
	qu.Debug(string(by))
	udptask.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)

	// Remove the redis keys matching this id.
	redis.DelByCodePattern(RedisJYName, RedisDelKey1+id+"_*")

	// NOTE(review): RepairDelete logs single deletions with s_type 3 while
	// this bulk path logs 2, the value used for modifications. Confirm
	// whether that is intended.
	Mgo.Save(JyRecord, map[string]interface{}{
		"s_modifyuser":   user["name"],
		"s_type":         2,
		"i_modifytime":   time.Now().Unix(),
		"s_modifyreason": "批量删除",
		"s_backupid":     id,
		"o_oldinfo":      old_data,
		"modifyinfo":     "删除数据",
	})
	return nil
}

// EsCount answers how many Elasticsearch documents have the request's "field"
// equal to "data".
func (jy *RepairRule) EsCount() {
	defer qu.Catch()
	value := jy.GetString("data")
	field := jy.GetString("field")
	count := elastic.Count(EsIndex, EsType, esTermQuery(field, value))
	jy.ServeJson(map[string]interface{}{
		"rep":   true,
		"count": count,
	})
}

// EsDelBy deletes the Elasticsearch documents whose "field" equals "data" and
// records the request (field, value, reported count, time) in
// "jy_es_del_log". The response is {"rep": <delete result>}.
func (jy *RepairRule) EsDelBy() {
	defer qu.Catch()
	value := jy.GetString("data")
	field := jy.GetString("field")
	count, _ := jy.GetInt("count")
	bol := elastic.Del(EsIndex, EsType, esTermQuery(field, value))
	Mgo.Save("jy_es_del_log", bson.M{"type": field, "value": value, "count": count, "createtime": time.Now().Unix()})
	jy.ServeJson(map[string]interface{}{
		"rep": bol,
	})
}

// EsDelRecord returns one page of "jy_es_del_log" entries, newest first (POST
// only), using the draw/start/length request parameters.
func (jy *RepairRule) EsDelRecord() {
	defer qu.Catch()
	if jy.Method() != "POST" {
		return
	}
	start, _ := jy.GetInteger("start")
	limit, _ := jy.GetInteger("length")
	draw, _ := jy.GetInteger("draw")
	search := strings.TrimSpace(jy.GetString("search[value]"))

	query := bson.M{}
	if search != "" {
		// NOTE(review): no search conditions are defined, so a non-empty
		// search term adds an empty $or array. MongoDB documents $or as
		// requiring a non-empty array, so searches are likely to fail; decide
		// which fields should be searchable.
		query["$or"] = []interface{}{}
	}
	data, _ := Mgo.Find("jy_es_del_log", query, `{"createtime": -1}`, nil, false, start, limit)
	count := Mgo.Count("jy_es_del_log", query)
	jy.ServeJson(map[string]interface{}{
		"draw":            draw,
		"data":            data,
		"recordsFiltered": count,
		"recordsTotal":    count,
	})
}

// esTermQuery returns the JSON body of a bool/must/term query matching
// field == value. The body is produced by json.Marshal so that quotes or
// braces in the request parameters cannot change the structure of the query.
func esTermQuery(field, value string) string {
	body, err := json.Marshal(map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must": []interface{}{
					map[string]interface{}{
						"term": map[string]interface{}{field: value},
					},
				},
			},
		},
	})
	if err != nil {
		log.Println("marshal es term query:", err)
		return ""
	}
	return string(body)
}

// nodeAddr builds the UDP address from the "addr" and "port" entries of a
// node configuration.
func nodeAddr(node map[string]interface{}) *net.UDPAddr {
	return &net.UDPAddr{
		IP:   net.ParseIP(node["addr"].(string)),
		Port: qu.IntAll(node["port"]),
	}
}

// indexNodeMessage returns the payload and target address for asking the node
// configured under Sysconfig["indexNode"] to process document id of coll.
func indexNodeMessage(coll string, id interface{}) ([]byte, *net.UDPAddr) {
	node := *qu.ObjToMap(Sysconfig["indexNode"])
	payload, err := json.Marshal(map[string]interface{}{
		"query": map[string]interface{}{
			"_id": bson.M{
				"$gte": id,
				"$lte": id,
			},
		},
		"stype": qu.ObjToString(node["stype"]),
		"coll":  coll,
	})
	if err != nil {
		log.Println("marshal index node payload:", err)
	}
	return payload, nodeAddr(node)
}

// projectNodeMessage returns msg encoded as JSON together with the address of
// the node configured under Sysconfig["jy_pro_node"].
func projectNodeMessage(msg map[string]interface{}) ([]byte, *net.UDPAddr) {
	node := *qu.ObjToMap(Sysconfig["jy_pro_node"])
	payload, err := json.Marshal(msg)
	if err != nil {
		log.Println("marshal project node payload:", err)
	}
	return payload, nodeAddr(node)
}