Răsfoiți Sursa

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

fengweiqiang 5 ani în urmă
părinte
comite
055d81afbc

+ 2 - 1
.gitignore

@@ -5,4 +5,5 @@ bin
 *.log
 */src/src
 *.data
-*/bin
+*/bin
+tp

+ 3 - 0
data_quality/src/config.json

@@ -0,0 +1,3 @@
+{
+
+}

+ 18 - 0
data_quality/src/main.go

@@ -0,0 +1,18 @@
+package main
+
+import (
+
+	"time"
+)
+
+func init() {
+
+}
+
+
+func main() {
+
+
+
+	time.Sleep(99999 * time.Hour)
+}

+ 7 - 5
domainameclear/src/task.go

@@ -17,10 +17,11 @@ import (
 //固定后缀
 //var Reg2 = regexp.MustCompile(`((http|https)[::]//(www.)?|www.|WWW.)([0-9A-Za-z_]+[-\.]{0,})+\.(cn|asia|hn|citic|ltd|tv|shop|com|mo|co|net|cnpc|CN|CC|cc|pro|aero|coop|hk|tw|me|rec|arts|store|firm|int|info|org|top|wang|ren|xyz|xin|pub|tech|ink|biz|red|gov|vip|art|edu)+`)
 //支持空格
-//var Reg1 = regexp.MustCompile("((http|https)[::]//(www.)?|www.|WWW.)([\\s\u3000\u2003\u00a0]{0,}[-A-Za-z0-9&@$??#/%=~_|.::,]+)+([\\s\u3000\u2003\u00a0]{0,}(com|cn|net))?[-A-Za-z0-9&@$??#/%=~_|.::,]+")
-var Reg1 = regexp.MustCompile("((http|https)[::]//(www.)?|www.|WWW.)[-A-Za-z0-9&@$??#/%=~_|.::,]+")
-var Reg2 = regexp.MustCompile("((http|https)[::]//(www.)?|www.|WWW.)(\\w+[-.]{0,})+")
-var Clear1 = regexp.MustCompile(".*(cn|com|org|net|co|mo)((\\d)+[.]{0,}(\\d){0,})$")
+var Reg1 = regexp.MustCompile("((http|https)[::]//(www\\.)?|www\\.|WWW\\.)([\\s\u3000\u2003\u00a0]{0,}[-A-Za-z0-9&@$??#/%=~_|.::,]+)+([\\s\u3000\u2003\u00a0]{0,}(com|cn|net))?[-A-Za-z0-9&@$??#/%=~_|.::,]+")
+
+//var Reg1 = regexp.MustCompile(`((http|https)[::]//(www\.)?|www\.|WWW\.)[-A-Za-z0-9&@$??#/%=~_|.::,]+`)
+var Reg2 = regexp.MustCompile("((http|https)[::]//(www\\.)?|www\\.|WWW\\.)(\\w+[-.\\s\u3000\u2003\u00a0]{0,})+")
+var Clear1 = regexp.MustCompile(".*(cn|com|org|net|co|mo|vn|en)((\\d)+[.]{0,}(\\d){0,})$")
 var RegSpace = regexp.MustCompile("[\\s\u3000\u2003\u00a0]+")
 var Replace = map[string]string{
 	":": ":",
@@ -63,7 +64,7 @@ func StartTask() {
 	field := map[string]interface{}{"detail": 1}
 	logger.Debug("query:", q)
 	it := sess.DB("qfw").C("bidding").Find(q).Select(field).Sort("_id").Iter()
-	count := Mgo.Count("bidding", q)
+	count := Mgo.Count("test", q)
 	fmt.Println("共加载数据", count)
 	sum := 0
 	wg := &sync.WaitGroup{}
@@ -169,6 +170,7 @@ func StartTask() {
 	// 	arr = [][]map[string]interface{}{}
 	// }
 	lock_dmn.Unlock()
+	fmt.Println("本轮任务结束")
 }
 
 //加载域名信息

+ 112 - 0
fullproject/modifyMoney/src/main.go

@@ -0,0 +1,112 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"github.com/tealeg/xlsx"
+	"io/ioutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"time"
+)
+
+var itype, p int
+var id, ip, budget, bidamount, file string
+func main()  {
+	flag.IntVar(&itype, "itype", 0, "类型")
+	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
+	flag.IntVar(&p, "p", 0, "端口")
+	flag.StringVar(&id, "id", "", "id")
+	flag.StringVar(&budget, "budget", "", "预算")
+	flag.StringVar(&bidamount, "bidamount", "", "中标金额")
+	flag.StringVar(&file, "file", "", "文件路径")
+	flag.Parse()
+
+	addr := &net.UDPAddr{
+		IP:   net.ParseIP(ip),
+		Port: p,
+	}
+	udp := mu.UdpClient{Local: ":50010", BufSize: 1024}
+	udp.Listen(func(b byte, data []byte, add *net.UDPAddr) {
+		switch b {
+		case mu.OP_NOOP:
+			//os.Exit(0)
+		}
+	})
+	if itype == 0 {
+		if ip != "" && p > 0 && (budget != "" || bidamount != "") && id != "" {
+			SendUdp(*addr, udp)
+		}else {
+			flag.PrintDefaults()
+			log.Println("参数错误.")
+		}
+	}else {
+		if file != "" {
+			binary, err := ioutil.ReadFile(file)
+			if err == nil {
+				data, _ := ParsData(binary)
+				if len(data) > 0 {
+					for _, v := range data{
+						if qu.ObjToString(v["_id"]) != "" {
+							id = qu.ObjToString(v["_id"])
+							budget = qu.ObjToString(v["budget"])
+							bidamount = qu.ObjToString(v["bidamount"])
+							SendUdp(*addr, udp)
+						}
+					}
+				}
+			}else {
+				qu.Debug("文件解析失败")
+			}
+		}else {
+			flag.PrintDefaults()
+			log.Println("参数错误.")
+		}
+	}
+}
+
+func SendUdp(addr net.UDPAddr, udp mu.UdpClient)  {
+
+	m1 := map[string]interface{}{
+		"id":  id,
+		"stype": "updateMoney",
+	}
+	if budget != "" {
+		m1["budget"] = budget
+	}
+	if bidamount != "" {
+		m1["bidamount"] = bidamount
+	}
+	by, _ := json.Marshal(m1)
+	log.Println(string(by))
+	_ = udp.WriteUdp(by, mu.OP_TYPE_DATA, &addr)
+	time.Sleep(1 * time.Second)
+}
+
+func ParsData(filebyte []byte) ([]map[string]interface{}, error) {
+	var data []map[string]interface{}
+	var keyName []string
+	file, err := xlsx.OpenBinary(filebyte)
+	if err != nil {
+		return data, err
+	}
+
+	for i, v := range file.Sheets[0].Rows {
+		info := make(map[string]interface{})
+		for ii, vv := range v.Cells {
+			if i == 0 {
+				keyName = append(keyName, vv.Value)
+			} else {
+				if vv.Value != "" {
+					info[keyName[ii]] = vv.Value
+				}
+			}
+		}
+		if len(info) > 0 {
+			data = append(data, info)
+		}
+	}
+	return data, nil
+}

+ 0 - 0
fullproject/src/config.json → fullproject/src_back/config.json


+ 0 - 0
fullproject/src/init.go → fullproject/src_back/init.go


+ 0 - 0
fullproject/src/load_data.go → fullproject/src_back/load_data.go


+ 0 - 0
fullproject/src/main.go → fullproject/src_back/main.go


+ 0 - 0
fullproject/src/merge_comparepnc.go → fullproject/src_back/merge_comparepnc.go


+ 0 - 0
fullproject/src/merge_select.go → fullproject/src_back/merge_select.go


+ 0 - 0
fullproject/src/project.go → fullproject/src_back/project.go


+ 0 - 0
fullproject/src/task.go → fullproject/src_back/task.go


+ 18 - 2
fullproject/src_v1/config.json

@@ -1,5 +1,5 @@
 {
-    "loadStart": 0,
+    "loadStart": 1,
 	"validdays":150,
     "statusdays": 15,
 	"mongodbServers": "192.168.3.207:27092",
@@ -15,7 +15,23 @@
         "to": "wangjianghan@topnet.net.cn",
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
-    "udpport": "1182",
+    "es": {
+        "addr": "http://192.168.3.128:9800",
+        "index": "projectset_v3",
+        "itype": "projectset",
+        "pool": 10
+    },
+    "udpport": ":1482",
     "nextNode": [
+        {
+            "addr": "192.168.20.104",
+            "port": 1483,
+            "memo": "创建项目索引new"
+        },
+        {
+            "addr": "127.0.0.1",
+            "port": 14833,
+            "memo": "修改项目创建new"
+        }
     ]
 }

+ 1 - 1
fullproject/src_v1/init.go

@@ -302,7 +302,7 @@ type ProjectInfo struct {
 	score         int
 	comStr        string
 	resVal, pjVal int
-	InfoFiled     map[string]InfoField `json:"infofiled"`    //逻辑处理需要的info字段
+	InfoFiled     map[string]InfoField `json:"infofield"`    //逻辑处理需要的info字段
 	Budgettag     int                  `json:"budgettag"`    //预算是否有效标记
 	Bidamounttag  int                  `json:"bidamounttag"` //中标金额是否有效标记
 }

+ 51 - 6
fullproject/src_v1/main.go

@@ -8,6 +8,7 @@ import (
 	"os"
 	"os/signal"
 	"qfw/util"
+	"qfw/util/elastic"
 	"syscall"
 	"time"
 )
@@ -18,6 +19,9 @@ var (
 	SingleClear  = 0
 	toaddr       = []*net.UDPAddr{} //下节点对象
 	ChSign       = make(chan os.Signal)
+	Es 			 *elastic.Elastic
+	Index        string
+	Itype        string
 
 	sid, eid string //测试使用
 )
@@ -32,6 +36,15 @@ func init() {
 			Port: util.IntAll(m["port"]),
 		})
 	}
+
+	es := Sysconfig["es"].(map[string]interface{})
+	Es = &elastic.Elastic{
+		S_esurl: util.ObjToString(es["addr"]),
+		I_size:  util.IntAllDef(es["pool"], 10),
+	}
+	Index = util.ObjToString(es["index"])
+	Itype = util.ObjToString(es["itype"])
+	Es.InitElasticSize()
 }
 
 var queryClose = make(chan bool)
@@ -41,7 +54,7 @@ func DealSign() {
 	for {
 		select {
 		case sign := <-ChSign:
-			log.Println("receive:", sign)
+			//log.Println("receive:", sign)
 			if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt {
 				log.Println("receice signal..,start close iter")
 				if P_QL.Brun {
@@ -58,7 +71,7 @@ func DealSign() {
 	}
 }
 
-func mainT() {
+func main() {
 	//udp跑增量  id段   project
 	//udp跑全量			qlT
 	//udp跑历史数据  信息id1,id2/或id段  ls
@@ -77,9 +90,9 @@ func mainT() {
 }
 
 //测试组人员使用
-func main() {
-	sid = "5c90370ca5cb26b9b72b3d0a"
-	eid = "5d3a88ffa5cb26b9b7755564"
+func mainT() {
+	sid = "56388138af53745d9a000001"
+	eid = "5b671f32a5cb26b9b76ddbb6"
 	//flag.StringVar(&sid, "sid", "", "开始id")
 	//flag.StringVar(&eid, "eid", "", "结束id")
 	//flag.Parse()
@@ -107,6 +120,28 @@ func main() {
 	time.Sleep(20 * time.Second)
 }
 
+func mainS() {
+	id := "5987e5e85d11e1c745d36c4c"
+	mapinfo := map[string]interface{}{}
+	mapinfo["id"] = id
+	mapinfo["stype"] = "updateMoney"
+	mapinfo["budget"] = "12345"
+	mapinfo["ip"] = "127.0.0.1"
+	mapinfo["port"] = Sysconfig["udpport"]
+	if Sysconfig["loadStart"] != nil {
+		loadStart := util.Int64All(Sysconfig["loadStart"])
+		if loadStart > -1 {
+			P_QL.loadData(loadStart)
+		}
+	}
+	P_QL.loadSite()
+	P_QL.currentType = mapinfo["stype"].(string)
+	P_QL.pici = time.Now().Unix()
+	P_QL.taskUpdateMoney(mapinfo)
+	P_QL.Brun = true
+	time.Sleep(20 * time.Second)
+}
+
 //udp调用信号
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
@@ -124,7 +159,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
 			SingleThread <- true
 			tasktype, _ := mapInfo["stype"].(string)
-			log.Println("tasktype:", tasktype)
 			switch tasktype {
 			case "ql": //全量合并
 				go func() {
@@ -154,12 +188,23 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					P_QL.pici = time.Now().Unix()
 					P_QL.taskUpdateInfo(mapInfo)
 				}()
+			case "updateMoney": //修改金额
+				go func() {
+					defer func() {
+						<-SingleThread
+					}()
+					P_QL.currentType = tasktype
+					P_QL.pici = time.Now().Unix()
+					P_QL.taskUpdateMoney(mapInfo)
+				}()
 			case "history": //历史数据合并,暂时不写
 				go func() {
 					defer func() {
 						<-SingleThread
 					}()
 				}()
+			default:
+				<-SingleThread
 			}
 		}
 	case mu.OP_NOOP: //下个节点回应

+ 16 - 14
fullproject/src_v1/project.go

@@ -567,7 +567,7 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	}
 
 	p1.InfoFiled = make(map[string]InfoField)
-	infofiled := InfoField{
+	infofield := InfoField{
 		Budget:       thisinfo.Budget,
 		Bidamount:    thisinfo.Bidamount,
 		ContractCode: thisinfo.ContractCode,
@@ -575,8 +575,8 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 		ProjectCode:  thisinfo.ProjectCode,
 		Bidstatus:    bs,
 	}
-	p1.InfoFiled[thisinfo.Id] = infofiled
-	res := StructToMap(infofiled)
+	p1.InfoFiled[thisinfo.Id] = infofield
+	res := StructToMap(infofield)
 	set["infofield"] = map[string]interface{}{
 		thisinfo.Id: res,
 	}
@@ -680,7 +680,7 @@ func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidty
 
 //更新项目
 func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info, pInfo *ProjectInfo, weight int, comStr string, ex int) {
-	if p.currentType != "ql" && p.currentType != "updateInfo" {
+	if p.currentType != "updateInfo" {
 		if BinarySearch(pInfo.Ids, thisinfo.Id) > -1 {
 			log.Println("repeat", thisinfo.Id)
 			return
@@ -727,12 +727,12 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 			}
 		}
 	} else if thisinfo.SubType == "合同" {
-		if pInfo.Bidstatus == "中标" || pInfo.Bidstatus == "成交" {
+		if pInfo.Bidstatus == "中标" || pInfo.Bidstatus == "成交" || pInfo.Bidstatus == "" {
 			//中标、成交不更新jgtime
-			return
+		}else {
+			set["jgtime"] = tmp["publishtime"]
+			pInfo.Jgtime = thisinfo.Publishtime
 		}
-		set["jgtime"] = tmp["publishtime"]
-		pInfo.Jgtime = thisinfo.Publishtime
 	}
 	if thisinfo.Bidopentime > pInfo.Bidopentime {
 		pInfo.Bidopentime = thisinfo.Bidopentime
@@ -759,9 +759,11 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		} else if thisinfo.Infoformat == 2 {
 			set["bidstatus"] = "拟建"
 			pInfo.Bidstatus = "拟建"
-		} else if bs == "" {
-			set["bidstatus"] = ""
-			pInfo.Bidstatus = ""
+		}else if bs == "" && bt == "结果" {
+			if pInfo.Bidstatus == "招标" {
+				set["bidstatus"] = ""
+				pInfo.Bidstatus = ""
+			}
 		} else {
 			set["bidstatus"] = "其它"
 			pInfo.Bidstatus = "其它"
@@ -900,7 +902,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		set["sortprice"] = pInfo.Budget
 	}
 
-	infofiled := InfoField{
+	infofield := InfoField{
 		Budget:       thisinfo.Budget,
 		Bidamount:    thisinfo.Bidamount,
 		ContractCode: thisinfo.ContractCode,
@@ -909,12 +911,12 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		Bidstatus:    bs,
 	}
 	copyMap := Copy(pInfo.InfoFiled).(map[string]InfoField)
-	copyMap[thisinfo.Id] = infofiled
+	copyMap[thisinfo.Id] = infofield
 	tmpMap := make(map[string]interface{})
 	for k, v := range copyMap {
 		tmpMap[k] = StructToMap(v)
 	}
-	tmpMap[thisinfo.Id] = StructToMap(infofiled)
+	tmpMap[thisinfo.Id] = StructToMap(infofield)
 	pInfo.InfoFiled = copyMap
 	set["infofield"] = tmpMap
 

+ 177 - 16
fullproject/src_v1/task.go

@@ -3,11 +3,12 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
-
 	"qfw/util"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -229,25 +230,12 @@ func (p *ProjectTask) clearMem() {
 		}
 	})
 	c.Start()
-	select {}
 }
 
 //全量合并
 func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
 	defer util.Catch()
-	//1、检查pubilshtime索引
-	db, _ := udpInfo["db"].(string)
-	if db == "" {
-		db = MongoTool.DbName
-	}
-	coll, _ := udpInfo["coll"].(string)
-	if coll == "" {
-		coll = ExtractColl
-	}
-	thread := util.IntAllDef(Thread, 4)
-	if thread > 0 {
-		p.thread = thread
-	}
+	p.thread = util.IntAllDef(Thread, 4)
 	q, _ := udpInfo["query"].(map[string]interface{})
 	if q == nil {
 		q = map[string]interface{}{}
@@ -271,7 +259,7 @@ func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
 	}
 	//生成查询语句执行
 	log.Println("查询语句:", q)
-	p.enter(db, coll, q)
+	p.enter(MongoTool.DbName, ExtractColl, q)
 
 }
 
@@ -351,6 +339,179 @@ func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
 	p.enter(db, coll, q)
 }
 
+//修改公告信息的预算/中标金额
+func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
+	defer util.Catch()
+	id := udpInfo["id"].(string)
+	budget := util.ObjToString(udpInfo["budget"])
+	bidamount := util.ObjToString(udpInfo["bidamount"])
+	if budget == "" && bidamount == "" {
+		util.Debug("")
+		return
+	}
+
+	client := Es.GetEsConn()
+	defer Es.DestoryEsConn(client)
+	esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+id+`"}}]}}}`
+	data := Es.Get(Index, Itype, esquery)
+	util.Debug(*data)
+	if data != nil {
+		pid := util.ObjToString((*data)[0]["_id"])
+		pro := MongoTool.FindById(ProjectColl, pid)
+		var info *map[string]interface{}
+		for _, v := range []interface{}(pro["list"].(primitive.A)){
+			v1 := v.(map[string]interface{})
+			if util.ObjToString(v1["infoid"]) == id {
+				info = util.ObjToMap(v)
+				infoField := util.ObjToMap(pro["infofield"])
+				if budget != "" {
+					if budget != "del" {
+						newBudget, _ := strconv.ParseFloat(budget, 64)
+						if pro["budget"] == (*info)["budget"] {
+							pro["budget"] = newBudget
+						}
+						if util.IntAll(pro["multipackage"]) == 1 {
+							if packages, ok := pro["package"].(map[string]interface{}); ok {
+							M :
+									for _, v := range packages{
+										v1 := []interface{}(v.(primitive.A))
+										for _, v2 := range v1{
+											v3 := v2.(map[string]interface{})
+											if util.ObjToString(v3["infoid"]) == id {
+												if v3["budget"] != nil {
+													v3["budget"] = newBudget
+												}
+											}else {
+												break M
+											}
+										}
+									}
+							}
+						}
+						if pro["sortprice"] == (*info)["budget"] {
+							pro["sortprice"] = newBudget
+						}
+						(*info)["budget"] = newBudget
+						(*util.ObjToMap((*infoField)[id]))["budget"] = newBudget
+					}else {
+						delete(*info, "budget")
+						delete(*util.ObjToMap((*infoField)[id]), "budget")
+						if pro["budget"] == (*info)["budget"] {
+							money := FindMoney("budget", pro)
+							if money >= 0 {
+								pro["budget"] = money
+							}
+						}
+					}
+				}
+				if bidamount != "" {
+					if bidamount != "del" {
+						newBidamount, _ := strconv.ParseFloat(bidamount, 64)
+						if pro["bidamount"] == (*info)["bidamount"] {
+							pro["bidamount"] = newBidamount
+						}
+						if util.IntAll(pro["multipackage"]) == 1 {
+							if packages, ok := pro["package"].(map[string]interface{}); ok {
+							N :
+								for _, v := range packages{
+									v1 := []interface{}(v.(primitive.A))
+									for _, v2 := range v1{
+										v3 := v2.(map[string]interface{})
+										if util.ObjToString(v3["infoid"]) == id {
+											if v3["bidamount"] != nil {
+												v3["bidamount"] = newBidamount
+											}
+										}else {
+											break N
+										}
+									}
+								}
+							}
+						}
+						if pro["sortprice"] == (*info)["bidamount"] {
+							pro["sortprice"] = newBidamount
+						}
+						(*info)["bidamount"] = newBidamount
+						(*util.ObjToMap((*infoField)[id]))["bidamount"] = newBidamount
+					}else {
+						delete((*info), "bidamount")
+						delete((*util.ObjToMap((*infoField)[id])), "bidamount")
+						if pro["bidamount"] == (*info)["bidamount"] {
+							money := FindMoney("bidamount", pro)
+							if money >= 0 {
+								pro["bidamount"] = money
+							}
+						}
+					}
+				}
+				break
+			}
+		}
+		var project *ProjectInfo
+		var pInfo *Info
+		bys, _ := json.Marshal(pro)
+		_ = json.Unmarshal(bys, &project)
+		bys1, _ := json.Marshal(info)
+		_ = json.Unmarshal(bys1, &pInfo)
+		CountAmount(project, pInfo, *info)
+
+		if project.Budget > 0 {
+			util.Debug(project.Budget)
+			pro["budget"] = project.Budget
+			pro["budgettag"] = 0
+		}
+		if project.Bidamount > 0 {
+			util.Debug(project.Bidamount)
+			pro["bidamount"] = project.Bidamount
+			pro["bidamounttag"] = 0
+		}
+		set := map[string]interface{}{
+			"$set": pro,
+		}
+		MongoTool.UpdateById(ProjectColl, pid, set)
+
+		loadStart := util.Int64All(Sysconfig["loadStart"])
+		if loadStart > -1 && project.LastTime >loadStart {
+			util.Debug("内存中存在该项目信息", project.Id)
+			p.AllIdsMapLock.Lock()
+			p.AllIdsMap[pid].P = project
+			p.AllIdsMapLock.Unlock()
+		}
+
+		bol := Es.DelById(Index, Itype, pid)
+		if bol {
+			util.Debug("删除es索引, pid------", pid)
+			//调udp生索引
+			by, _ := json.Marshal(map[string]interface{}{
+				"query": map[string]interface{}{
+					"_id": bson.M{
+						"$gte": pid,
+						"$lte": pid,
+					}},
+				"stype": "project",
+			})
+			_ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
+		}
+	}
+}
+
+func FindMoney(key string, project map[string]interface{}) float64 {
+	money := -0.1
+	for i, v := range []interface{}(project["list"].(primitive.A)){
+		v1 := v.(map[string]interface{})
+		if i == 0 {
+			if v1[key] != nil {
+				money = util.Float64All(v1[key])
+			}
+		}else {
+			if v1[key] != nil && util.Float64All(v1[key]) > money {
+				money = util.Float64All(v1[key])
+			}
+		}
+	}
+	return money
+}
+
 func StringTOBsonId(id string) primitive.ObjectID {
 	objectId, _ := primitive.ObjectIDFromHex(id)
 	return objectId

+ 7 - 7
fullproject/src_v1/update.go

@@ -15,7 +15,7 @@ func (p *ProjectTask) modifyUpdate(pInfoId string, index int, info *Info, tmp ma
 	infoList := []interface{}(tmpPro["list"].(primitive.A))
 	infoMap := infoList[index].(map[string]interface{})
 	infoList[index] = updateValue(infoMap, modifyMap)
-	infofiled := InfoField{
+	infofield := InfoField{
 		Budget:       info.Budget,
 		Bidamount:    info.Bidamount,
 		ContractCode: info.ContractCode,
@@ -23,8 +23,8 @@ func (p *ProjectTask) modifyUpdate(pInfoId string, index int, info *Info, tmp ma
 		ProjectCode:  info.ProjectCode,
 		Bidstatus:    info.SubType,
 	}
-	mapTmp, _ := tmpPro["infofiled"].(map[string]interface{})
-	mapTmp[info.Id] = StructToMap(infofiled)
+	mapTmp, _ := tmpPro["infofield"].(map[string]interface{})
+	mapTmp[info.Id] = StructToMap(infofield)
 	set := map[string]interface{}{
 		"$set": tmpPro,
 	}
@@ -70,7 +70,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index int, info *Info, tmp
 				infoList := []interface{}(tmpPro["list"].(primitive.A))
 				infoMap := infoList[index].(map[string]interface{})
 				infoList[index] = updateValue(infoMap, modifyMap)
-				infofiled := InfoField{
+				infofield := InfoField{
 					Budget:       info.Budget,
 					Bidamount:    info.Bidamount,
 					ContractCode: info.ContractCode,
@@ -78,8 +78,8 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index int, info *Info, tmp
 					ProjectCode:  info.ProjectCode,
 					Bidstatus:    info.SubType,
 				}
-				mapTmp, _ := tmpPro["infofiled"].(map[string]interface{})
-				mapTmp[info.Id] = StructToMap(infofiled)
+				mapTmp, _ := tmpPro["infofield"].(map[string]interface{})
+				mapTmp[info.Id] = StructToMap(infofield)
 				set := map[string]interface{}{
 					"$set": tmpPro,
 				}
@@ -271,7 +271,7 @@ func (p *ProjectTask) innerMerge(pInfo *ProjectInfo, info *Info, tmp map[string]
 		}
 		mergeProject(p, pInfo, info, tmpPro, tmp)
 	}
-	mapTmp, _ := tmpPro["infofiled"].(map[string]interface{})
+	mapTmp, _ := tmpPro["infofield"].(map[string]interface{})
 	delete(mapTmp, info.Id)
 	set := map[string]interface{}{
 		"$set": tmpPro,

+ 52 - 0
fullproject/udp/src/main.go

@@ -0,0 +1,52 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"log"
+	mu "mfw/util"
+	"net"
+	"os"
+	"time"
+)
+
+var ip, sid, eid, stype string
+var p int
+
+func main() {
+
+	flag.StringVar(&sid, "sid", "", "开始id")
+	flag.StringVar(&eid, "eid", "", "结束id")
+	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
+	flag.IntVar(&p, "p", 1482, "端口")
+	flag.StringVar(&stype, "stype", "", "stype")
+	flag.Parse()
+
+	if ip != "" && p > 0 && sid != "" && eid != "" {
+		addr := &net.UDPAddr{
+			IP:   net.ParseIP(ip),
+			Port: p,
+		}
+		udp := mu.UdpClient{Local: ":50010", BufSize: 1024}
+		udp.Listen(func(b byte, data []byte, add *net.UDPAddr) {
+			switch b {
+			case mu.OP_NOOP:
+				log.Println(string(data))
+				os.Exit(0)
+			}
+		})
+		m1 := map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": stype,
+		}
+
+		by, _ := json.Marshal(m1)
+		log.Println(string(by))
+		udp.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		time.Sleep(30 * time.Second)
+	} else {
+		flag.PrintDefaults()
+		log.Println("参数错误.")
+	}
+}

+ 5 - 4
udpcreateindex/src/biddingall.go

@@ -84,10 +84,11 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 		// 	tmp = make(map[string]interface{})
 		// 	continue
 		// }
-		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive != "" { //bidding中有敏感词,不生索引
-			tmp = make(map[string]interface{})
-			continue
-		}
+
+		// if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive != "" { //bidding中有敏感词,不生索引
+		// 	tmp = make(map[string]interface{})
+		// 	continue
+		// }
 		update := map[string]interface{}{}
 		del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
 		//对比方法----------------

+ 1 - 5
udpcreateindex/src/biddingindex.go

@@ -138,14 +138,10 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	log.Println("开始迭代..")
 	for n, tmp := range infos {
 		n1++
-		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
+		// if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive != "" { //bidding中有敏感词,不生索引
 		// 	tmp = make(map[string]interface{})
 		// 	continue
 		// }
-		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive != "" { //bidding中有敏感词,不生索引
-			tmp = make(map[string]interface{})
-			continue
-		}
 		update := map[string]interface{}{} //要更新的mongo数据
 		//对比方法----------------
 		tid := qutil.BsonIdToSId(tmp["_id"])

+ 12 - 8
udpcreateindex/src/projectindex.go

@@ -23,17 +23,21 @@ func projectTask(data []byte, project, mapInfo map[string]interface{}) {
 				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
 			},
 		}
-	}
-	idMap, _ := q["_id"].(map[string]interface{})
-	if idMap != nil {
-		tmpQ := map[string]interface{}{}
-		for c, id := range idMap {
-			if idStr, ok := id.(string); ok && id != "" {
-				tmpQ[c] = util.StringTOBsonId(idStr)
+	} else {
+		if q["pici"] == nil {
+			idMap, _ := q["_id"].(map[string]interface{})
+			if idMap != nil {
+				tmpQ := map[string]interface{}{}
+				for c, id := range idMap {
+					if idStr, ok := id.(string); ok && id != "" {
+						tmpQ[c] = util.StringTOBsonId(idStr)
+					}
+				}
+				q["_id"] = tmpQ
 			}
 		}
-		q["_id"] = tmpQ
 	}
+
 	var session *mgov.Session
 	if project["addr"] != nil {
 		session = project2db.GetMgoConn(3600)

+ 3 - 3
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "zk_zk_test",
-        "extract_back": "zk_zk_test",
+        "extract": "zk_move",
+        "extract_back": "zk_move",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -24,7 +24,7 @@
     "timingTask":false,
     "timingSpanDay": 3,
     "timingPubScope": 720,
-    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
+    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
     "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",

+ 28 - 1
udpfilterdup/src/dataMethod.go

@@ -76,6 +76,33 @@ func againRepeat(v *Info, info *Info) bool {
 	return false
 }
 
+////站点再次判断
+//func againSite(v *Info, info *Info) bool {
+//
+//	if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
+//		return true
+//	}
+//	if isBidWinningAmount(v.bidamount,info.bidamount) && v.bidamount != 0 && info.bidamount != 0{
+//		return true
+//	}
+//	if deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) && v.winner != "" && info.winner != "" {
+//		return true
+//	}
+//	if v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber {
+//		return true
+//	}
+//	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
+//		return true
+//	}
+//
+//	return false
+//}
+
+
+
+
+
+
 //删除中标单位字符串中多余的空格(含tab)
 func deleteExtraSpace(s string) string {
 	//删除字符串中的多余空格,有多个空格时,仅保留一个空格
@@ -112,7 +139,7 @@ func isBidopentimeInterval(i1 int64 ,i2 int64) bool {
 	day1 := qutil.FormatDateByInt64(&timeOne, qutil.Date_yyyyMMdd)
 	day2 := qutil.FormatDateByInt64(&timeTwo, qutil.Date_yyyyMMdd)
 	if day1==day2 {
-		//是否间隔超过小时
+		//是否间隔超过十二小时
 		if math.Abs(float64(i1-i2)) >43200.0 {
 			return true
 		}else {

+ 15 - 4
udpfilterdup/src/datamap.go

@@ -37,6 +37,7 @@ type Info struct {
 	specialWord      bool                   //再次判断的特殊词
 	mergemap         map[string]interface{} //合并记录
 	is_site          bool                   //是否站点城市
+	repeat_ids        []string               //记录所有重复id
 
 }
 
@@ -231,6 +232,11 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	if info.mergemap == nil {
 		info.mergemap = make(map[string]interface{}, 0)
 	}
+	if info.repeat_ids == nil {
+		info.repeat_ids = make([]string, 0)
+	}
+
+
 
 	info.is_site = false
 
@@ -305,7 +311,7 @@ L:
 						break L
 					}
 					if info.href != "" && info.href != v.href { //待优化
-						if v.title==info.title&&len([]rune(info.title)) >10 && isTheSameDay(info.publishtime,v.publishtime){
+						if v.title==info.title{
 							if !againRepeat(v, info) {//进行同站点二次判断
 								reason = "同站点-href不同-标题相同等"
 								b = true
@@ -316,7 +322,9 @@ L:
 								continue
 							}
 						}else {
-							continue
+							if againRepeat(v, info) {//进行同站点二次判断
+								continue
+							}
 						}
 					}
 				}
@@ -324,7 +332,7 @@ L:
 				specialNum:= dealWithSpecialWordNumber(info,v)
 				//前置条件 - 标题相关,有且一个关键词
 				if specialNum==1 {
-					if info.title != v.title && v.title != "" && info.title != "" {
+					if againRepeat(v, info) {
 						continue
 					}
 				}
@@ -352,7 +360,10 @@ L:
 						}else {
 							if !(strings.Contains(letter1, letter2) || strings.Contains(letter2, letter1)) {
 								//无包含关系-即不相等
-								continue
+								if againRepeat(v, info) {
+									continue
+								}
+
 							}
 						}
 					}

+ 37 - 12
udpfilterdup/src/main.go

@@ -182,15 +182,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 
 
 
-
-
-
-
-
-
-
-
-
 //开始判重程序
 func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println("开始数据判重")
@@ -292,6 +283,23 @@ func task(data []byte, mapInfo map[string]interface{}) {
 					updateID["_id"] = info.id
 				}
 
+				repeat_ids:=source.repeat_ids
+				repeat_ids =  append(repeat_ids,info.id)
+				source.repeat_ids = repeat_ids
+				//替换数据池-更新
+				DM.replacePoolData(source)
+				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
+					map[string]interface{}{
+						"_id": StringTOBsonId(source.id),
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat_ids": repeat_ids,
+						},
+					},
+				})
+
+
 				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
 					updateID,
 					map[string]interface{}{
@@ -349,7 +357,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	time.Sleep(60 * time.Second)
 
 	//任务完成,开始发送广播通知下面节点
-	if n > repeateN && mapInfo["stop"] == nil {
+	if n >= repeateN && mapInfo["stop"] == nil {
 		log.Println("判重任务完成发送udp")
 		for _, to := range nextNode {
 			sid, _ := mapInfo["gtid"].(string)
@@ -561,6 +569,23 @@ func historyTaskDay() {
 					if b { //有重复,生成更新语句,更新抽取和更新招标
 						repeateN++
 						//重复数据打标签
+						repeat_ids:=source.repeat_ids
+						repeat_ids =  append(repeat_ids,info.id)
+						source.repeat_ids = repeat_ids
+						//替换数据池-更新
+						DM.replacePoolData(source)
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+							map[string]interface{}{
+								"_id": StringTOBsonId(source.id),
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat_ids": repeat_ids,
+								},
+							},
+						})
+
+
 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
@@ -603,7 +628,7 @@ func historyTaskDay() {
 
 
 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
-		if n > repeateN {
+		if n >= repeateN && gtid!=lteid{
 			for _, to := range nextNode {
 				next_sid := util.BsonIdToSId(gtid)
 				next_eid := util.BsonIdToSId(lteid)
@@ -714,11 +739,11 @@ func moveOnceTimeOut()  {
 	}
 	log.Println("save and delete", " ok index", index)
 
+}
 
 
 
 
-}