瀏覽代碼

更换连接池

5 年之前
父節點
當前提交
65bf574c11

+ 15 - 12
fullproject/src/project.go

@@ -20,6 +20,8 @@ import (
 func (p *ProjectTask) getCompareIds(pn, pc, ptc, pb string) (bpn, bpc, bptc, bpb int, res []*Key, idArr []string, IDArr []*ID) {
 	p.findLock.Lock()
 	defer p.findLock.Unlock()
+	//	p.ConCurrentLock(n1, n2, n3, n4)
+	//	defer p.ConCurrentUnLock(n1, n2, n3, n4)
 	p.wg.Add(1)
 	//查找到id数组
 	res = []*Key{}
@@ -95,7 +97,7 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 	//bpn, bpc, bptc, bpb 是否查找到,并标识位置。-1代表未查找到。
 	//pids 是项目id数组集合
 	//IDArr,是单个项目ID对象集合
-	bpn, bpc, bptc, bpb, pids, _, IDArr := p.getCompareIds(info.ProjectName, info.ProjectCode, info.PTC, info.Buyer)
+	bpn, bpc, bptc, bpb, pids, _, IDArr := p.getCompareIds(info.ProjectName, info.ProjectCode, info.PTC, info.Buyer) //, info.LenPN, info.LenPC, info.LenPTC, len([]rune(info.Buyer)))
 	defer p.wg.Done()
 	//map--k为pn,ptn,pc,ptc,buyer值 v为Id数组和lock
 
@@ -119,7 +121,7 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 		compareProject.score = 0
 		//问题出地LastTime!!!!!
 		diffTime := int64(math.Abs(float64(info.Publishtime - compareProject.LastTime)))
-		if diffTime < 185*86400 {
+		if diffTime <= p.validTime {
 			//"A 相等 	B 被包含 	C 不相等	 	D不存在  E被包含
 			info.PNBH = 0
 			info.PCBH = 0
@@ -417,6 +419,7 @@ var bidstatus = map[string]string{
 	"成交": "成交",
 	"废标": "废标",
 	"流标": "流标",
+	"合同": "合同",
 }
 
 //招标时间zbtime、中标时间jgtime、项目状态bidstatus、招标类型bidtype、最后发布时间lasttime、首次发布时间firsttime
@@ -472,15 +475,15 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	set["list"] = []bson.M{
 		push,
 	}
-	p.savePool <- set
-	//	[]map[string]interface{}{
-	//		map[string]interface{}{
-	//			"_id": pId,
-	//		},
-	//		map[string]interface{}{
-	//			"$set": ,
-	//		},
-	//	}
+	//p.savePool <- set
+	p.updatePool <- []map[string]interface{}{
+		map[string]interface{}{
+			"_id": pId,
+		},
+		map[string]interface{}{
+			"$set": set,
+		},
+	}
 	return pId.Hex(), &p1
 }
 
@@ -687,7 +690,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 				sort.Strings(pInfo.Winners)
 			}
 		}
-		set["winners"] = pInfo.Winners
+		//set["winners"] = pInfo.Winners
 		set["s_winner"] = strings.Join(pInfo.Winners, ",")
 	}
 

+ 158 - 21
fullproject/src/task.go

@@ -35,6 +35,7 @@ type ProjectTask struct {
 	AllIdsMap map[string]*ID
 	//采购单位、项目名称、项目编号
 	mapPb, mapPn, mapPc map[string]*Key
+	//	mapPbLock, mapPnLock, mapPcLock sync.Mutex
 	//更新或新增通道
 	updatePool           chan []map[string]interface{}
 	savePool             chan map[string]interface{}
@@ -48,27 +49,123 @@ type ProjectTask struct {
 	//当前时间
 	currentTime int64
 	//保存长度
-	saveSize int
-	pici     int64
+	saveSize  int
+	pici      int64
+	validTime int64
+	//	LockPool     chan *sync.Mutex
+	//	LockPoolLock sync.Mutex
+	//	m1, m23, m4  map[int]int
+	//	l1, l23, l4  map[int]*sync.Mutex
 }
 
+//func (p *ProjectTask) ConCurrentLock(n1, n2, n3, n4 int) {
+//	var lock *sync.Mutex
+//	p.LockPoolLock.Lock()
+//	if p.m1[n1] > 0 || p.m23[n2] > 0 || p.m23[n3] > 0 || p.m4[n4] > 0 {
+//		if p.l1[n1] != nil {
+//			lock = p.l1[n1]
+//		} else if p.l23[n2] != nil {
+//			lock = p.l23[n2]
+//		} else if p.l23[n3] != nil {
+//			lock = p.l23[n3]
+//		} else if p.l4[n4] != nil {
+//			lock = p.l4[n4]
+//		}
+//	} else {
+//		lock = <-p.LockPool
+//	}
+//	if n1 > 0 {
+//		p.m1[n1]++
+//		p.l1[n1] = lock
+//	}
+//	if n2 > 0 {
+//		p.m23[n2]++
+//		p.l23[n2] = lock
+//	}
+//	if n3 > 0 {
+//		p.m23[n3]++
+//		p.l23[n3] = lock
+//	}
+//	if n4 > 0 {
+//		p.m4[n4]++
+//		p.l4[n4] = lock
+//	}
+//	p.LockPoolLock.Unlock()
+//	lock.Lock()
+//}
+
+//func (p *ProjectTask) ConCurrentUnLock(n1, n2, n3, n4 int) {
+//	var lock1 *sync.Mutex
+//	p.LockPoolLock.Lock()
+//	if p.l1[n1] != nil {
+//		lock1 = p.l1[n1]
+//	} else if p.l23[n2] != nil {
+//		lock1 = p.l23[n2]
+//	} else if p.l23[n3] != nil {
+//		lock1 = p.l23[n3]
+//	} else if p.l4[n4] != nil {
+//		lock1 = p.l4[n4]
+//	}
+//	if p.m1[n1] > 0 {
+//		p.m1[n1]--
+//		if p.m1[n1] == 0 {
+//			p.l1[n1] = nil
+//		}
+//	}
+//	if p.m23[n2] > 0 {
+//		p.m23[n2]--
+//		if p.m23[n2] == 0 {
+//			p.l23[n2] = nil
+//		}
+//	}
+//	if p.m23[n3] > 0 {
+//		p.m23[n3]--
+//		if p.m23[n3] == 0 {
+//			p.l23[n3] = nil
+//		}
+//	}
+//	if p.m4[n4] > 0 {
+//		p.m4[n4]--
+//		if p.m4[n4] == 0 {
+//			p.l4[n4] = nil
+//		}
+//	}
+//	p.LockPoolLock.Unlock()
+//	lock1.Unlock()
+//}
+
 func NewPT() *ProjectTask {
-	return &ProjectTask{
+	p := &ProjectTask{
 		InitMinTime: int64(1325347200),
 		name:        "全/增量对象",
 		thread:      4,
-		updatePool:  make(chan []map[string]interface{}, 1000),
+		updatePool:  make(chan []map[string]interface{}, 2000),
 		savePool:    make(chan map[string]interface{}, 2000),
 		wg:          sync.WaitGroup{},
-		AllIdsMap:   make(map[string]*ID, 100000),
-		mapPb:       make(map[string]*Key, 1000000),
-		mapPn:       make(map[string]*Key, 1000000),
-		mapPc:       make(map[string]*Key, 1500000),
+		AllIdsMap:   make(map[string]*ID, 10000000),
+		mapPb:       make(map[string]*Key, 2000000),
+		mapPn:       make(map[string]*Key, 5000000),
+		mapPc:       make(map[string]*Key, 5000000),
 		saveSize:    200,
 		saveSign:    make(chan bool, 1),
 		updateSign:  make(chan bool, 1),
 		coll:        ProjectColl,
+		validTime:   180 * 86400,
+		//		LockPool:    make(chan *sync.Mutex, 200),
+		//		m1:          map[int]int{},
+		//		m23:         map[int]int{},
+		//		m4:          map[int]int{},
+		//		l1:          map[int]*sync.Mutex{}, l23: map[int]*sync.Mutex{}, l4: map[int]*sync.Mutex{},
 	}
+	//	for i := 0; i < 200; i++ {
+	//		p.LockPool <- &sync.Mutex{}
+	//	}
+	//	go func() {
+	//		for {
+	//			p.LockPool <- &sync.Mutex{}
+	//		}
+	//	}()
+	return p
 }
 
 var P_QL *ProjectTask
@@ -76,8 +173,9 @@ var P_QL *ProjectTask
 //初始化全量合并对象
 func init() {
 	P_QL = NewPT()
-	go P_QL.saveQueue()
-	go P_QL.updateQueue()
+	go P_QL.updateAllQueue()
+	//	go P_QL.saveQueue()
+	//	go P_QL.updateQueue()
 	go P_QL.clearMem()
 }
 
@@ -140,29 +238,65 @@ func (p *ProjectTask) updateQueue() {
 	}
 }
 
+func (p *ProjectTask) updateAllQueue() {
+	arru := make([][]map[string]interface{}, p.saveSize)
+	indexu := 0
+	sp := make(chan bool, 5)
+	for {
+		select {
+		case v := <-p.updatePool:
+			arru[indexu] = v
+			indexu++
+			if indexu == p.saveSize {
+				sp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					MongoTool.UpSertBulk(p.coll, arru...)
+				}(arru)
+				arru = make([][]map[string]interface{}, p.saveSize)
+				indexu = 0
+			}
+		case <-time.After(500 * time.Millisecond):
+			if indexu > 0 {
+				sp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					MongoTool.UpSertBulk(p.coll, arru...)
+				}(arru[:indexu])
+				//MongoTool.UpSertBulk(p.coll, arru[:indexu]...)
+				arru = make([][]map[string]interface{}, p.saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
 //项目合并内存更新
 func (p *ProjectTask) clearMem() {
 	c := cron.New()
 	//在内存中保留最近6个月的信息
-	validTime := int64(6 * 30 * 86400)
 	//跑全量时每4分钟跑一次,跑增量时400分钟跑一次
-	c.AddFunc("50 0/10 * * * *", func() {
+	c.AddFunc("50 0/15 * * * *", func() {
 		if p.currentType == "ql" || p.clearContimes >= 60 {
 			//跳过的次数清零
 			p.clearContimes = 0
 			//信息进入查找对比全局锁
 			p.findLock.Lock()
-			defer p.findLock.Unlock()
+			//defer p.findLock.Unlock()
 			//合并进行的任务都完成
 			p.wg.Wait()
 			//遍历id
 			//所有内存中的项目信息
 			p.AllIdsMapLock.Lock()
-			defer p.AllIdsMapLock.Unlock()
+
 			//清除计数
 			clearNum := 0
 			for k, v := range p.AllIdsMap {
-				if p.currentTime-v.P.LastTime > validTime {
+				if p.currentTime-v.P.LastTime > p.validTime {
 					clearNum++
 					//删除id的map
 					delete(p.AllIdsMap, k)
@@ -209,7 +343,9 @@ func (p *ProjectTask) clearMem() {
 					v = nil
 				}
 			}
-			log.Println("清除完成:", clearNum, len(p.AllIdsMap))
+			p.AllIdsMapLock.Unlock()
+			p.findLock.Unlock()
+			log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb))
 		} else {
 			p.clearContimes++
 		}
@@ -343,10 +479,11 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 	log.Println("start project", q)
 	sess := MongoTool.GetMgoConn()
 	defer MongoTool.DestoryMongoConn(sess)
-	query := sess.DB(db).C(coll).Find(q).Sort("_id").Select(map[string]interface{}{
-		"blocks":   0,
-		"fieldall": 0,
-	}).Iter()
+	query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
+	//	.Select(map[string]interface{}{
+	//		"blocks":   0,
+	//		"fieldall": 0,
+	//	}).Iter()
 	//over := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if util.IntAll(tmp["repeat"]) == 0 {
@@ -486,7 +623,7 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 		for _, p := range packageM {
 			pm, _ := p.(map[string]interface{})
 			pw, _ := pm["winner"].(string)
-			if pw != "" {
+			if pw != "" && !m1[pw] {
 				m1[pw] = true
 				winners = append(winners, pw)
 			}

+ 22 - 0
fullproject/src_v1/config.json

@@ -0,0 +1,22 @@
+{
+    "loadStart":-1,
+	"validdays":150,
+	"mongodbServers": "192.168.3.207:27082",
+    "mongodbPoolSize": 10,
+    "mongodbName": "cesuo",
+	"hints":"_id_1_publishtime_1",
+    "extractColl": "key1_biddingall",
+    "projectColl": "projectset_zjk",
+    "jkmail": {
+        "to": "zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+    "udpport": ":1482",
+    "nextNode": [
+        {
+            "addr": "172.17.145.163",
+            "port": 1483,
+            "memo": "创建项目索引new"
+        }
+    ]
+}

+ 373 - 0
fullproject/src_v1/init.go

@@ -0,0 +1,373 @@
+package main
+
+import (
+	"log"
+	"math"
+	mu "mfw/util"
+	"qfw/util"
+	//"qfw/util/mongodb"
+	"regexp"
+	"sort"
+	"strings"
+	"sync"
+
+	//"gopkg.in/mgo.v2/bson"
+	//"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+const (
+	ProjectCache = "info" //存放每条项目信息,key为项目ID
+)
+
+var (
+	Sysconfig                map[string]interface{} //读取配置文件
+	MongoTool                *MongodbSim            //mongodb连接
+	ExtractColl, ProjectColl string                 //抽取表、项目表
+	//NextNode                 []interface{}
+)
+
+var (
+	//判断是日期
+	_datereg   = regexp.MustCompile("20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?")
+	_numreg1   = regexp.MustCompile("^[0-9-]{1,8}$")
+	_zimureg1  = regexp.MustCompile("^[a-zA-Z-]{1,7}$")
+	_nzreg     = regexp.MustCompile("^[0-9a-zA-Z-]+$")
+	_hanreg    = regexp.MustCompile(`^[\p{Han}::【】\\[\\]()()--、]+$`)
+	replaceStr = regexp.MustCompile("(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)")
+	//判断带有分包、等特定词的
+	pStr = regexp.MustCompile("(勘察|监理|施工|设计|验收|标段|分包|子包|[0-9A-Z]包|[一二三四五六七八九十0-9]批)")
+	//判断包含数值
+	nreg1 = regexp.MustCompile("[0-9]{2,}")
+	//判断包含字母
+	zreg1 = regexp.MustCompile("[a-zA-Z]{1,}")
+	//判断包含汉字
+	hreg1 = regexp.MustCompile(`[\p{Han}]+`)
+	//判断项目编号是在10以内的纯数字结构
+	numCheckPc = regexp.MustCompile("^[0-9-]{1,10}$")
+	//仅初始化使用
+	compareNoPass = map[string]bool{}
+	compareAB     = map[string]bool{}
+	compareAB2D   = map[string]bool{}
+	compareABD    = map[string]bool{}
+	compareAB2CD  = map[string]bool{}
+	compareABCD   = map[string]bool{}
+)
+
+func init() {
+	util.ReadConfig(&Sysconfig)
+	MongoTool = &MongodbSim{
+		MongodbAddr: Sysconfig["mongodbServers"].(string),
+		Size:        util.IntAll(Sysconfig["mongodbPoolSize"]),
+		DbName:      Sysconfig["mongodbName"].(string),
+	}
+	MongoTool.InitPool()
+
+	ExtractColl = Sysconfig["extractColl"].(string)
+	ProjectColl = Sysconfig["projectColl"].(string)
+	//NextNode = Sysconfig["nextNode"].([]interface{})
+	udpport, _ := Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: udpport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", udpport)
+
+	//加载项目数据
+
+	//---不能通过
+	vm := []string{"C", "D"}
+	for i := 0; i < 2; i++ {
+		for j := 0; j < 2; j++ {
+			for k := 0; k < 2; k++ {
+				key := vm[i] + vm[j] + vm[k]
+				compareNoPass[key] = true
+				//fmt.Println(key)
+			}
+		}
+	}
+	//fmt.Println("-------------------")
+
+	//三个元素一致 [AB][AB][AB],分值最高
+	vm = []string{"A", "B"}
+	for i := 0; i < 2; i++ {
+		for j := 0; j < 2; j++ {
+			for k := 0; k < 2; k++ {
+				key := vm[i] + vm[j] + vm[k]
+				compareAB[key] = true
+				//fmt.Println(key)
+			}
+		}
+	}
+	//fmt.Println("-------------------", len(compareAB))
+	//---至少两个一致,其他可能不存在
+	//[AB][AB][ABD]
+	//[AB][ABD][AB]
+	vm = []string{"A", "B"}
+	vm2 := []string{"A", "B", "D"}
+	for i := 0; i < 2; i++ {
+		for j := 0; j < 2; j++ {
+			for k := 0; k < 3; k++ {
+				key := vm[i] + vm[j] + vm2[k]
+				if !compareAB[key] {
+					compareAB2D[key] = true
+					//fmt.Println(key)
+
+				}
+			}
+		}
+	}
+	for i := 0; i < 2; i++ {
+		for j := 0; j < 3; j++ {
+			for k := 0; k < 2; k++ {
+				key := vm[i] + vm2[j] + vm[k]
+				if !compareAB[key] {
+					compareAB2D[key] = true
+					//fmt.Println(key)
+
+				}
+			}
+		}
+	}
+	//fmt.Println("-------------------", len(compareAB2D))
+	//---至少一个一致,其他可能不存在
+	//[ABD][ABD][ABD] //已经删除DDD
+	vm = []string{"A", "B", "D"}
+	for i := 0; i < 3; i++ {
+		for j := 0; j < 3; j++ {
+			for k := 0; k < 3; k++ {
+				key := vm[i] + vm[j] + vm[k]
+				if !compareAB[key] && !compareAB2D[key] && !compareNoPass[key] {
+					compareABD[key] = true
+					//fmt.Println(key)
+				}
+			}
+		}
+	}
+	//fmt.Println("-------------------", len(compareABD))
+
+	//[AB][ABCD][AB]
+	//[AB][AB][ABCD]
+	vm = []string{"A", "B"}
+	vm2 = []string{"A", "B", "C", "D"}
+	for i := 0; i < 2; i++ {
+		for j := 0; j < 4; j++ {
+			for k := 0; k < 2; k++ {
+				key := vm[i] + vm2[j] + vm[k]
+				if !compareAB[key] && !compareAB2D[key] && !compareNoPass[key] && !compareABD[key] {
+					compareAB2CD[key] = true
+					//fmt.Println(key)
+				}
+			}
+		}
+	}
+	for i := 0; i < 2; i++ {
+		for j := 0; j < 2; j++ {
+			for k := 0; k < 4; k++ {
+				key := vm[i] + vm[j] + vm2[k]
+				if !compareAB[key] && !compareAB2D[key] && !compareNoPass[key] && !compareABD[key] {
+					compareAB2CD[key] = true
+					//fmt.Println(key)
+				}
+			}
+		}
+	}
+	//fmt.Println("-------------------", len(compareAB2CD))
+	//[ABECD][ABECD][ABECD]  //已经删除[CD][CD][CD]   //这个要重点讨论
+	vm = []string{"A", "B", "C", "D"}
+	for i := 0; i < 4; i++ {
+		for j := 0; j < 4; j++ {
+			for k := 0; k < 4; k++ {
+				key := vm[i] + vm[j] + vm[k]
+				if !compareAB[key] && !compareAB2D[key] && !compareABD[key] && !compareNoPass[key] && !compareAB2CD[key] {
+					compareABCD[key] = true
+					//fmt.Println(key)
+				}
+			}
+		}
+	}
+}
+
+func CheckHanAndNum(str string) (b bool) {
+	return nreg1.MatchString(str) && hreg1.MatchString(str)
+}
+func CheckZimuAndNum(str string) (b bool) {
+	return zreg1.MatchString(str) && nreg1.MatchString(str)
+}
+
+type KeyMap struct {
+	Lock sync.Mutex
+	Map  map[string]*Key
+}
+
+type ID struct {
+	Id   string
+	Lock sync.Mutex
+	pos  int
+	P    *ProjectInfo
+}
+type Key struct {
+	Arr  []string
+	Lock sync.Mutex
+}
+type IdAndLock struct {
+	Id   string
+	Lock sync.Mutex
+}
+
+func NewKeyMap() *KeyMap {
+	return &KeyMap{
+		Map:  map[string]*Key{},
+		Lock: sync.Mutex{},
+	}
+}
+
+//招标信息实体类
+type Info struct {
+	Id          string                 `json:"_id"`
+	Href        string                 `json:"href"` //源地址
+	Publishtime int64                  `json:"publishtime"`
+	Title       string                 `json:"title"`
+	TopType     string                 `json:"toptype"`
+	SubType     string                 `json:"subtype"`
+	ProjectName string                 `json:"projectname"`
+	ProjectCode string                 `json:"projectcode"`
+	Buyer       string                 `json:"buyer"`
+	Buyerperson string                 `json:"buyerperson"`
+	Buyertel    string                 `json:"buyertel"`
+	Agency      string                 `json:"agency"`
+	Area        string                 `json:"area"`
+	City        string                 `json:"city"`
+	District    string                 `json:"district"`
+	HasPackage  bool                   // `json:"haspackage"`
+	Package     map[string]interface{} `json:"package"`
+	//PNum          string                 `json:"pnum"`
+	Topscopeclass []string `json:"topscopeclass"`
+	Subscopeclass []string `json:"subscopeclass"`
+	Buyerclass    string   `json:"buyerclass"`
+	Bidopentime   int64    `json:"bidopentime"`
+	Budget        float64  `json:"budget"`
+	Bidamount     float64  `json:"bidamount"`
+	Winners       []string
+	dealtype      int
+
+	Winnerorder []string
+
+	PTC    string //从标题中抽的项目编号
+	pnbval int    //项目名称、编号、采购单位存在的个数
+	LenPC  int    //项目编号长度
+	LenPN  int    //项目名称长度
+	LenPTC int    //标题抽的项目编号长度
+	//以下三个元素做对比,计算包含时候使用
+	PNBH  int //0初始,+包含,-被包含
+	PCBH  int
+	PTCBH int
+}
+
+//项目实体类
+type ProjectInfo struct {
+	Id            primitive.ObjectID `json:"_id"`
+	FirstTime     int64              `json:"firsttime,omitempty"` //项目的最早时间
+	LastTime      int64              `json:"lasttime,omitempty"`  //项目的最后时间
+	Ids           []string           `json:"ids,omitempty"`
+	Topscopeclass []string           `json:"topscopeclass,omitempty"`
+	Subscopeclass []string           `json:"subscopeclass,omitempty"` //子行业分类
+	Winners       []string           `json:"winners,omitempty"`       //中标人
+	ProjectName   string             `json:"projectname,omitempty"`   //项目名称
+	ProjectCode   string             `json:"projectcode,omitempty"`   //项目代码唯一(纯数字的权重低)
+	Buyer         string             `json:"buyer,omitempty"`         //采购单位唯一
+	MPN           []string           `json:"mpn,omitempty"`           //合并后多余的项目名称
+	MPC           []string           `json:"mpc,omitempty"`           //合并后多余的项目编号
+	Buyerperson   string             `json:"buyerperson"`             //采购联系人
+	Buyertel      string             `json:"buyertel"`                //采购联系人电话
+	Agency        string             `json:"agency"`                  //代理机构
+	Area          string             `json:"area"`                    //地区
+	City          string             `json:"city"`                    //地市
+	District      string             `json:"district"`                //区县
+	//HasPackage    bool                   `json:"haspackage"`              //是否有分包
+	Package     map[string]interface{} `json:"package,omitempty"`     //分包的对比对象
+	Buyerclass  string                 `json:"buyerclass"`            //采购单位分类
+	Bidopentime int64                  `json:"bidopentime,omitempty"` //开标时间
+	//	Zbtime        int64                  `json:"zbtime"`        //招标时间
+	//	Jgtime        int64                  `json:"jgtime"`        //结果中标时间
+	Bidamount float64 `json:"bidamount,omitempty"` //中标金额
+	Budget    float64 `json:"budget,omitempty"`    //预算
+	//Winnerorder []string `json:"winnerorder"` //中标候选人
+	score         int
+	comStr        string
+	resVal, pjVal int
+}
+
+//二分字符串查找
+func BinarySearch(s []string, k string) int {
+	sort.Strings(s)
+	lo, hi := 0, len(s)-1
+	for lo <= hi {
+		m := (lo + hi) >> 1
+		if s[m] < k {
+			lo = m + 1
+		} else if s[m] > k {
+			hi = m - 1
+		} else {
+			return m
+		}
+	}
+	return -1
+}
+
+//计算文本相似度
+func CosineSimilar(srcWords1, dstWords1 string) float64 {
+	srcWords, dstWords := strings.Split(srcWords1, ""), strings.Split(dstWords1, "")
+	// get all words
+	allWordsMap := make(map[string]int, 0)
+	for _, word := range srcWords {
+		if _, found := allWordsMap[word]; !found {
+			allWordsMap[word] = 1
+		} else {
+			allWordsMap[word] += 1
+		}
+	}
+	for _, word := range dstWords {
+		if _, found := allWordsMap[word]; !found {
+			allWordsMap[word] = 1
+		} else {
+			allWordsMap[word] += 1
+		}
+	}
+
+	// stable the sort
+	allWordsSlice := make([]string, 0)
+	for word, _ := range allWordsMap {
+		allWordsSlice = append(allWordsSlice, word)
+	}
+
+	// assemble vector
+	srcVector := make([]int, len(allWordsSlice))
+	dstVector := make([]int, len(allWordsSlice))
+	for _, word := range srcWords {
+		if index := BinarySearch(allWordsSlice, word); index != -1 {
+			srcVector[index] += 1
+		}
+	}
+	for _, word := range dstWords {
+		if index := BinarySearch(allWordsSlice, word); index != -1 {
+			dstVector[index] += 1
+		}
+	}
+
+	// calc cos
+	numerator := float64(0)
+	srcSq := 0
+	dstSq := 0
+	for i, srcCount := range srcVector {
+		dstCount := dstVector[i]
+		numerator += float64(srcCount * dstCount)
+		srcSq += srcCount * srcCount
+		dstSq += dstCount * dstCount
+	}
+	denominator := math.Sqrt(float64(srcSq * dstSq))
+
+	v1 := numerator / denominator
+	//	if v1 > 0.6 {
+	//		log.Println(v1, srcWords1, dstWords1)
+	//	}
+	return v1
+}

+ 99 - 0
fullproject/src_v1/load_data.go

@@ -0,0 +1,99 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"time"
+)
+
+//初始加载数据,默认加载最近6个月的数据
+
+func (p *ProjectTask) loadData(starttime int64) {
+	log.Println("load start..", starttime)
+	p.findLock.Lock()
+	defer p.findLock.Unlock()
+	p.AllIdsMapLock.Lock()
+	defer p.AllIdsMapLock.Unlock()
+	sess := MongoTool.GetMgoConn()
+	defer MongoTool.DestoryMongoConn(sess)
+	q := map[string]interface{}{
+		"lasttime": map[string]interface{}{"$gt": starttime},
+	}
+	it := sess.DB(MongoTool.DbName).C(p.coll).Find(&q).Select(map[string]interface{}{
+		"list": 0,
+	}).Iter()
+	n := 0
+	//	tmp := &ProjectInfo{}
+	pool := make(chan *ProjectInfo, 100)
+	over := make(chan bool)
+	go func() {
+		for {
+			select {
+			case tmp := <-pool:
+				n++
+				if n%10000 == 0 {
+					log.Println("current", n, "\n", tmp.Id, len(p.mapPn), len(p.mapPc), len(p.mapPb)) //, tmp.ProjectName, tmp.MPN, tmp.ProjectCode, tmp.MPC, tmp.Buyer)
+				}
+				if tmp != nil {
+					id := tmp.Id.Hex()
+					for _, v := range append([]string{tmp.ProjectName}, tmp.MPN...) {
+						if v != "" {
+							v = pcReplace.ReplaceAllString(v, "")
+							if v != "" {
+								k := p.mapPn[v]
+								if k == nil {
+									k = &Key{Arr: []string{id}}
+									p.mapPn[v] = k
+								} else {
+									k.Arr = append(k.Arr, id)
+								}
+							}
+						}
+					}
+					for _, v := range append([]string{tmp.ProjectCode}, tmp.MPC...) {
+						if v != "" {
+							v = pcReplace.ReplaceAllString(v, "")
+							if v != "" {
+								k := p.mapPc[v]
+								if k == nil {
+									k = &Key{Arr: []string{id}}
+									p.mapPc[v] = k
+								} else {
+									k.Arr = append(k.Arr, id)
+								}
+							}
+						}
+					}
+					if tmp.Buyer != "" && len([]rune(tmp.Buyer)) > 2 {
+						k := p.mapPb[tmp.Buyer]
+						if k == nil {
+							k = &Key{Arr: []string{id}}
+							p.mapPb[tmp.Buyer] = k
+						} else {
+							k.Arr = append(k.Arr, id)
+						}
+					}
+					p.AllIdsMap[id] = &ID{Id: id, P: tmp}
+				}
+			case <-over:
+				return
+			}
+		}
+	}()
+	for {
+		result := make(map[string]interface{})
+		if it.Next(&result) {
+			go func(res map[string]interface{}) {
+				bys, _ := json.Marshal(result)
+				var tmp *ProjectInfo
+				json.Unmarshal(bys, &tmp)
+				pool <- tmp
+			}(result)
+		} else {
+			break
+		}
+	}
+	time.Sleep(2 * time.Second)
+	over <- true
+	log.Println("load over..", n)
+}

+ 124 - 0
fullproject/src_v1/main.go

@@ -0,0 +1,124 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	mu "mfw/util"
+	"net"
+	"os"
+	"os/signal"
+	"qfw/util"
+	"syscall"
+	"time"
+)
+
+var (
+	udpclient    mu.UdpClient //udp对象
+	SingleThread = make(chan bool, 1)
+	toaddr       = []*net.UDPAddr{} //下节点对象
+	ChSign       = make(chan os.Signal)
+)
+
+func init() {
+	signal.Notify(ChSign)
+	go DealSign()
+	nextNode := util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
+	for _, m := range nextNode {
+		toaddr = append(toaddr, &net.UDPAddr{
+			IP:   net.ParseIP(m["addr"].(string)),
+			Port: util.IntAll(m["port"]),
+		})
+	}
+}
+
+var queryClose = make(chan bool)
+var queryCloseOver = make(chan bool)
+
+func DealSign() {
+	for {
+		select {
+		case sign := <-ChSign:
+			log.Println("receive:", sign)
+			if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt {
+				log.Println("receice signal..,start close iter")
+				queryClose <- true
+				<-queryCloseOver
+				util.ReadConfig(&Sysconfig)
+				log.Println("signal deal over")
+			}
+		}
+	}
+}
+
+func main() {
+	//udp跑增量  id段   project
+	//udp跑全量			ql
+	//udp跑历史数据  信息id1,id2/或id段  ls
+	//udp强制合并  信息id1,id2,id3 [项目id] 不存在时新建  qzhb
+	//udp强制拆分  项目id,信息id1,id2          qzcf
+	//udp重新合并  信息id1,id2,id3             cxhb
+	if Sysconfig["loadStart"] != nil {
+		loadStart := util.Int64All(Sysconfig["loadStart"])
+		if loadStart > -1 {
+			P_QL.loadData(loadStart)
+		}
+	}
+	go checkMapJob()
+	time.Sleep(99999 * time.Hour)
+}
+
+//udp调用信号
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA: //上个节点的数据
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		log.Println("err:", err, "mapInfo:", mapInfo)
+		if err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			key, _ := mapInfo["key"].(string)
+			if key == "" {
+				key = "udpok"
+			}
+			go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+			SingleThread <- true
+			tasktype, _ := mapInfo["stype"].(string)
+			log.Println("tasktype:", tasktype)
+			switch tasktype {
+			case "ql": //全量合并
+				go func() {
+					defer func() {
+						<-SingleThread
+					}()
+					P_QL.currentType = tasktype
+					P_QL.pici = time.Now().Unix()
+					P_QL.taskQl(mapInfo)
+				}()
+			case "project": //增量合并,未抽取到项目名称或项目编号的不合并  bidding中mergestatus 1已合并 2字段问题不合并 3历史待合并
+				//合同、验收公告在6个月内查询不到可扩展到两年
+				go func() {
+					defer func() {
+						<-SingleThread
+					}()
+					P_QL.currentType = tasktype
+					P_QL.pici = time.Now().Unix()
+					P_QL.taskZl(mapInfo)
+				}()
+
+			case "history": //历史数据合并,暂时不写
+				go func() {
+					defer func() {
+						<-SingleThread
+					}()
+				}()
+			}
+		}
+	case mu.OP_NOOP: //下个节点回应
+		ok := string(data)
+		if ok != "" {
+			udptaskmap.Delete(ok)
+			log.Println("ok:", ok)
+		}
+	}
+}

+ 187 - 0
fullproject/src_v1/merge_comparepnc.go

@@ -0,0 +1,187 @@
+package main
+
+import (
+	"strings"
+)
+
+//对比项目名称、项目编号
+
+func comparePNC(info *Info, compareProject *ProjectInfo) (compareStr string, score int) {
+	if info.ProjectName != "" {
+		pns := []string{}
+		if compareProject.ProjectName != "" {
+			pns = append(pns, compareProject.ProjectName)
+		}
+		if len(compareProject.MPN) > 0 {
+			pns = append(pns, compareProject.MPN...)
+		}
+		ifind := 0
+		templen := 0
+		for _, v := range pns {
+			if info.ProjectName == v {
+				ifind = 1
+				break
+			} else {
+				//if strings.Contains(info.ProjectName, v) || strings.Contains(v, info.ProjectName) ||
+				retv := CheckContain(info.ProjectName, v)
+				if retv == 1 {
+					ifind = 1
+					break
+				} else {
+					v1 := CosineSimilar(info.ProjectName, v)
+					if retv == 2 || v1 > 0.81 {
+						templen = len([]rune(v))
+						ifind = 2
+					} else if ifind == 0 {
+						ifind = 3
+					}
+				}
+			}
+		}
+		switch ifind {
+		case 0:
+			compareStr = "D"
+		case 1:
+			compareStr = "A"
+			score += 4
+			if len([]rune(info.ProjectName)) > 18 {
+				score += 2
+			}
+		case 2:
+			compareStr = "B"
+			score += 2
+			if templen > info.LenPN {
+				templen = info.LenPN
+			}
+			info.PNBH = templen
+			if templen > 12 {
+				score += 1
+			}
+		case 3:
+			compareStr = "C"
+		}
+	} else {
+		compareStr = "D"
+	}
+
+	/*
+				项目编号 - -()() 要注意
+				init_text = ["号","(重)","(第二次)","(重)"]
+		all_clean_mark = ["[","(","【","(","〖","]",")","】",")","〗","-","〔","〕","《","[","]","{","}","{","—"," ","-","﹝","﹞","–"]
+	*/
+	for index, pc := range []string{info.ProjectCode, info.PTC} {
+		if pc != "" {
+			pcs := []string{}
+			if compareProject.ProjectCode != "" {
+				pcs = append(pcs, compareProject.ProjectCode)
+			}
+			if len(compareProject.MPC) > 0 {
+				pcs = append(pcs, compareProject.MPC...)
+			}
+			ifind := 0
+			templen := 0
+			for _, v := range pcs {
+				if pc == v {
+					ifind = 1
+					break
+				} else {
+					// math.Abs(float64(len([]rune(pc))-len([]rune(v)))) < 6
+					//if !_numreg1.MatchString(pc) && !_zimureg1.MatchString(pc) && !_numreg1.MatchString(v) && !_zimureg1.MatchString(v)
+					if strings.Contains(pc, v) || strings.Contains(v, pc) {
+						t1 := pc
+						t2 := v
+						if len(v) > len(pc) {
+							t1 = v
+							t2 = pc
+						}
+						t3 := strings.Replace(t1, t2, "", -1)
+						t3 = _datereg.ReplaceAllString(t3, "")
+						if t3 == "" {
+							ifind = 1
+							break
+						} else {
+							ifind = 2
+							templen = len([]rune(v))
+						}
+					} else if ifind == 0 {
+						ifind = 3
+					}
+				}
+			}
+			switch ifind {
+			case 0:
+				compareStr += "D"
+			case 1:
+				compareStr += "A"
+				score += 4
+				if len([]rune(pc)) > 18 {
+					score += 2
+				}
+			case 2:
+				compareStr += "B"
+				score += 2
+				if index == 0 {
+					if templen > info.LenPC {
+						templen = info.LenPC
+					}
+					info.PCBH = templen
+					if templen > 12 {
+						score += 1
+					}
+
+				} else {
+					if templen > info.LenPTC {
+						templen = info.LenPTC
+					}
+					info.PTCBH = templen
+					if templen > 12 {
+						score += 1
+					}
+				}
+
+			case 3:
+				compareStr += "C"
+			}
+
+		} else {
+			compareStr += "D"
+		}
+	}
+	return
+}
+
+func CheckContain(b1, b2 string) (res int) {
+	b1 = replaceStr.ReplaceAllString(b1, "")
+	b2 = replaceStr.ReplaceAllString(b2, "")
+
+	if b1 == b2 {
+		res = 1 //相等
+		return
+	}
+	bs1 := []rune(b1)
+	bs2 := []rune(b2)
+	tmp := ""
+	for i := 0; i < len(bs1); i++ {
+		for j := 0; j < len(bs2); j++ {
+			if bs1[i] == bs2[j] {
+				tmp += string(bs1[i])
+			} else if tmp != "" {
+				b1 = strings.Replace(b1, tmp, "", -1)
+				b2 = strings.Replace(b2, tmp, "", -1)
+				tmp = ""
+			}
+		}
+	}
+	if tmp != "" {
+		b1 = strings.Replace(b1, tmp, "", -1)
+		b2 = strings.Replace(b2, tmp, "", -1)
+	}
+	if b1 == b2 {
+		res = 1 //相等
+	} else if b1 == "" || b2 == "" {
+		res = 2 //包含
+	} else {
+		res = 3 //不相等
+	}
+	return
+}

+ 510 - 0
fullproject/src_v1/merge_select.go

@@ -0,0 +1,510 @@
+package main
+
+//根据字符特征打分
+//3为最高分,pj为评级 A AD A  AA AA AB
+func Select(compareStr string, info *Info, compareInfo *ProjectInfo) (res, pj int) {
+	//没有可对比的项目名称、或项目编号 //评级
+	if compareNoPass[compareStr] {
+
+	} else {
+		switch compareStr {
+		case "AAA":
+			res = 3
+			pj = 3
+		case "AAB":
+			res = 3
+			pj = 3
+		case "ABA":
+			res = 3
+			pj = 3
+		case "ABB":
+			if info.LenPN > 10 || info.PCBH > 8 || info.PTCBH > 8 {
+				res = 3
+			} else {
+				res = 2
+			}
+			pj = 3
+		case "BAA":
+			if info.PNBH > 10 || info.LenPC > 8 || info.LenPTC > 8 {
+				res = 3
+			} else {
+				res = 2
+			}
+			pj = 3
+		case "BAB":
+			if info.PNBH > 10 || info.LenPTC > 8 || info.PTCBH > 8 {
+				res = 3
+			} else {
+				res = 2
+			}
+			pj = 3
+		case "BBA":
+			if info.PNBH > 10 || info.PCBH > 8 || info.LenPC > 8 {
+				res = 3
+			} else {
+				res = 2
+			}
+			pj = 3
+		case "BBB":
+			v := 0
+			if info.PNBH > 10 {
+				v++
+			}
+			if info.PCBH > 8 {
+				v++
+			}
+			if info.PTCBH > 8 {
+				v++
+			}
+			if v > 1 {
+				res = 3
+			} else {
+				res = 2
+			}
+			pj = 2
+		case "AAD":
+			if info.LenPC > 8 || info.LenPN > 12 {
+				res = 3
+			} else {
+				res = 2
+			}
+			pj = 3
+		case "ABD":
+			if info.LenPN > 10 && info.PCBH > 8 {
+				res = 3
+				pj = 2
+			} else if info.LenPN > 10 || info.PCBH > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BAD":
+			if info.LenPC > 13 || (info.PNBH > 10 && info.LenPC > 8) {
+				res = 3
+				pj = 3
+			} else if info.PNBH > 10 || info.LenPC > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BBD":
+			if info.PNBH > 12 && info.PCBH > 10 {
+				res = 3
+				pj = 1
+			} else if info.PNBH > 10 && info.PCBH > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "ADA":
+			if info.LenPN > 12 || (info.LenPTC > 8 && !StrOrNum2.MatchString(info.PTC)) {
+				res = 3
+			} else {
+				res = 2
+			}
+			pj = 2
+		case "ADB":
+			if info.LenPN > 10 && info.PTCBH > 8 && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 2
+			} else if info.LenPN > 10 || info.PTCBH > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BDA":
+			if info.PNBH > 10 && info.LenPTC > 8 && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 2
+			} else if info.PNBH > 10 || info.LenPTC > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BDB":
+			if info.PNBH > 12 && info.PTCBH > 10 && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+			} else if info.PNBH > 10 && info.PTCBH > 8 {
+				res = 2
+			} else {
+				res = 1
+			}
+			pj = 2
+		case "ADD":
+			if info.LenPN > 18 {
+				res = 3
+				pj = 2
+			} else if info.LenPN > 10 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 2
+			}
+		case "BDD":
+			if info.PNBH > 10 {
+				res = 2
+			} else {
+				res = 1
+			}
+			pj = 1
+		case "DAA":
+			if info.LenPTC > 8 || info.LenPC > 8 {
+				res = 3
+				pj = 2
+			} else {
+				res = 2
+				pj = 3
+			}
+		case "DAB":
+			if info.LenPC > 8 && info.PTCBH > 8 {
+				res = 3
+				pj = 2
+			} else if info.LenPC > 8 || info.PTCBH > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "DAD":
+			if info.LenPC > 14 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 3
+				pj = 2
+			} else if info.LenPC > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 2
+			}
+		case "DBA":
+			if info.PCBH > 8 && info.LenPC > 8 && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 2
+			} else if info.PCBH > 8 || info.LenPC > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "DBB":
+			if info.PCBH > 10 && info.PTCBH > 10 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 3
+				pj = 1
+			} else if info.PCBH > 8 && info.PTCBH > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "DBD":
+			if info.PCBH > 12 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 2
+				pj = 1
+			} else {
+				res = 1
+				pj = 1
+			}
+		case "DDA":
+			if info.LenPTC > 14 && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 1
+			} else if info.LenPTC > 8 {
+				res = 2
+				pj = 1
+			} else {
+				res = 1
+				pj = 2
+			}
+		case "DDB":
+			if info.PTCBH > 12 && !StrOrNum2.MatchString(info.PTC) {
+				res = 2
+			} else {
+				res = 1
+			}
+			pj = 1
+		case "ACA":
+			if info.LenPN > 10 && info.LenPTC > 8 && info.LenPC != len([]rune(compareInfo.ProjectCode)) && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 2
+			} else if info.LenPN > 10 || info.LenPTC > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "ACB":
+			if info.LenPN > 10 && info.PTCBH > 8 && info.LenPC != len([]rune(compareInfo.ProjectCode)) && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 2
+			} else if info.LenPN > 10 || info.PTCBH > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BCA":
+			if (info.PNBH > 10 && info.LenPTC > 8) || info.LenPTC > 12 && info.LenPC != len([]rune(compareInfo.ProjectCode)) && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 2
+			} else if info.PNBH > 10 || info.LenPTC > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BCB":
+			if info.PNBH > 12 && info.PTCBH > 12 && info.LenPC != len([]rune(compareInfo.ProjectCode)) && !StrOrNum2.MatchString(info.PTC) {
+				res = 3
+				pj = 1
+			} else if info.PNBH > 10 || info.PTCBH > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 2
+			}
+		case "AAC":
+			if (info.LenPN > 10 && info.LenPC > 8) || info.LenPN > 14 || (info.LenPC > 10 && !StrOrNum2.MatchString(info.ProjectCode)) {
+				res = 3
+				pj = 3
+			} else {
+				res = 2
+				pj = 3
+			}
+		case "ABC":
+			if info.LenPN > 14 && info.PCBH > 10 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 3
+				pj = 2
+			} else if info.LenPN > 10 || info.PCBH > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BAC":
+			if info.PNBH > 14 && info.LenPC > 8 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 3
+				pj = 2
+			} else if info.PNBH > 10 || info.LenPC > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "BBC":
+			if info.PNBH > 14 && info.PCBH > 10 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 3
+				pj = 1
+			} else if info.PNBH > 10 || info.PCBH > 8 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 2
+			}
+		case "ACC":
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.LenPN > 16 {
+					res = 2
+					pj = 1
+				} else {
+					res = 1
+					pj = 2
+				}
+			}
+		case "ACD":
+			//项目编号不一致
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.LenPN > 16 {
+
+					res = 2
+				} else {
+					res = 1
+
+				}
+				pj = 1
+			}
+
+		case "ADC":
+			if info.LenPN > 16 {
+				res = 2
+			} else {
+				res = 1
+			}
+			pj = 1
+		case "BCC":
+			//项目编号不一致
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.PNBH > 12 {
+					res = 1
+				}
+				pj = 1
+			}
+		case "BCD":
+			//项目编号不一致
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.PNBH > 8 {
+					res = 1
+				}
+				pj = 1
+			}
+		case "BDC":
+			if info.PNBH > 7 {
+				res = 1
+			}
+			pj = 1
+		case "CAA":
+			if info.LenPC > 12 || info.LenPTC > 12 {
+				res = 3
+				pj = 2
+			} else if info.LenPC > 8 || info.LenPTC > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "CAB":
+			if info.LenPC > 12 && info.PTCBH > 8 {
+				res = 3
+				pj = 2
+			} else if info.LenPC > 12 || info.PTCBH > 8 {
+				res = 2
+				pj = 3
+			} else {
+				res = 1
+				pj = 3
+			}
+		case "CAC":
+			if info.LenPC > 9 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 1
+			}
+		case "CAD":
+			if info.LenPC > 9 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 2
+			} else {
+				res = 1
+			}
+			pj = 1
+		case "CBA":
+			if info.LenPTC > 14 && info.PCBH > 12 {
+				res = 3
+				pj = 2
+			} else if info.LenPTC > 12 || info.PCBH > 10 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 2
+			}
+		case "CBB":
+			if info.PCBH > 13 && info.PTCBH > 13 {
+				res = 3
+				pj = 1
+			} else if info.PCBH > 9 || info.PTCBH > 9 {
+				res = 2
+				pj = 2
+			} else {
+				res = 1
+				pj = 2
+			}
+		case "CBC":
+			if info.PCBH > 14 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 2
+			} else if info.PCBH > 5 {
+				res = 1
+			}
+			pj = 1
+		case "CBD":
+			if info.PCBH > 14 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 2
+			} else if info.PCBH > 5 {
+				res = 1
+			}
+			pj = 1
+		case "CCA":
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.LenPTC > 12 && !StrOrNum2.MatchString(info.PTC) {
+					res = 2
+				} else if info.LenPTC > 5 {
+					res = 1
+				}
+				pj = 1
+			}
+		case "CCB":
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.PTCBH > 10 && !StrOrNum2.MatchString(info.PTC) {
+					res = 1
+				}
+				pj = 1
+			}
+		case "CDA":
+			if info.LenPTC > 12 && !StrOrNum2.MatchString(info.PTC) {
+				res = 2
+			} else {
+				res = 1
+			}
+			pj = 1
+		case "CDB":
+			if info.PTCBH > 10 && !StrOrNum2.MatchString(info.PTC) {
+				res = 1
+				pj = 1
+			}
+		case "DAC":
+			if info.LenPC > 13 && !StrOrNum2.MatchString(info.ProjectCode) {
+				res = 3
+			} else if info.LenPC > 8 {
+				res = 2
+			} else {
+				res = 1
+			}
+			pj = 1
+		case "DBC":
+			if info.PCBH > 8 {
+				res = 1
+			}
+			pj = 1
+		case "DCA":
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.LenPTC > 10 {
+					res = 2
+				} else {
+					res = 1
+				}
+				pj = 1
+			}
+		case "DCB":
+			if info.LenPC != len([]rune(compareInfo.ProjectCode)) {
+				if info.PTCBH > 8 && !StrOrNum2.MatchString(info.PTC) {
+					res = 1
+				}
+				pj = 1
+			}
+		}
+
+	}
+	return
+}

+ 194 - 0
fullproject/src_v1/mgotool.go

@@ -0,0 +1,194 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	Hints  interface{}
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Hint(hint interface{}) *MgoSess {
+	ms.Hints = hint
+	return ms
+}
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(300)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.Hints != nil {
+		find.SetHint(ms.Hints)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return false
+	}
+	return true
+}

+ 773 - 0
fullproject/src_v1/project.go

@@ -0,0 +1,773 @@
+package main
+
+import (
+	"log"
+	//	"log"
+	"math"
+	qu "qfw/util"
+	"sort"
+	"strings"
+	"time"
+
+	//"gopkg.in/mgo.v2/bson"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+/**
+项目合并,对比,计算,合并,生成项目
+**/
+
+//从对应map中获取对比的项目id
+func (p *ProjectTask) getCompareIds(pn, pc, ptc, pb string) (bpn, bpc, bptc, bpb int, res []*Key, idArr []string, IDArr []*ID) {
+	p.findLock.Lock()
+	defer p.findLock.Unlock()
+	//	p.ConCurrentLock(n1, n2, n3, n4)
+	//	defer p.ConCurrentUnLock(n1, n2, n3, n4)
+	p.wg.Add(1)
+	//查找到id数组
+	res = []*Key{}
+	//是否查找到,并标识位置。-1代表值为空或已经存在。
+	bpn, bpc, bptc, bpb = -1, -1, -1, -1
+	if pn != "" {
+		ids := p.mapPn[pn]
+		if ids == nil {
+			ids = &Key{Arr: []string{}}
+			p.mapPn[pn] = ids
+			bpn = 0
+		}
+		ids.Lock.Lock()
+		res = append(res, ids)
+	}
+
+	if pc != "" {
+		ids := p.mapPc[pc]
+		if ids == nil {
+			ids = &Key{Arr: []string{}}
+			p.mapPc[pc] = ids
+			bpc = len(res)
+		}
+		ids.Lock.Lock()
+		res = append(res, ids)
+	}
+
+	if ptc != "" {
+		ids := p.mapPc[ptc]
+		if ids == nil {
+			ids = &Key{Arr: []string{}}
+			p.mapPc[ptc] = ids
+			bptc = len(res)
+		}
+		ids.Lock.Lock()
+		res = append(res, ids)
+	}
+
+	if pb != "" {
+		ids := p.mapPb[pb]
+		if ids == nil {
+			ids = &Key{Arr: []string{}}
+			p.mapPb[pb] = ids
+			bpb = len(res)
+		}
+		ids.Lock.Lock()
+		res = append(res, ids)
+	}
+	repeatId := map[string]bool{}
+	idArr = []string{} //项目id
+	IDArr = []*ID{}    //项目信息
+	for _, m := range res {
+		for _, id := range m.Arr {
+			if !repeatId[id] {
+				repeatId[id] = true
+				//_, _ = strconv.ParseInt(id[0:8], 16, 64)
+				p.AllIdsMapLock.Lock()
+				Id := p.AllIdsMap[id]
+				p.AllIdsMapLock.Unlock()
+				if Id != nil {
+					Id.Lock.Lock()
+					idArr = append(idArr, id)
+					IDArr = append(IDArr, Id)
+				}
+			}
+		}
+	}
+	return
+}
+
+func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{}) {
+	//只有或没有采购单位的无法合并
+	//bpn, bpc, bptc, bpb 是否查找到,并标识位置。-1代表未查找到。
+	//pids 是项目id数组集合
+	//IDArr,是单个项目ID对象集合
+	bpn, bpc, bptc, bpb, pids, _, IDArr := p.getCompareIds(info.ProjectName, info.ProjectCode, info.PTC, info.Buyer) //, info.LenPN, info.LenPC, info.LenPTC, len([]rune(info.Buyer)))
+	defer p.wg.Done()
+	//map--k为pn,ptn,pc,ptc,buyer值 v为Id数组和lock
+
+	for _, m := range pids {
+		defer m.Lock.Unlock()
+	}
+	for _, id := range IDArr {
+		defer id.Lock.Unlock()
+	}
+
+	bFindProject := false
+	findPid := ""
+	//获取完id,进行计算
+	//定义两组
+	comRes1 := []*ProjectInfo{} //优先级最高的对比结果数组
+	comRes2 := []*ProjectInfo{} //优化级其次
+	comRes3 := []*ProjectInfo{}
+	for _, v := range IDArr {
+		comStr := ""
+		compareProject := v.P
+		compareProject.score = 0
+		//问题出地LastTime!!!!!
+		diffTime := int64(math.Abs(float64(info.Publishtime - compareProject.LastTime)))
+		if diffTime <= p.validTime {
+			//"A 相等 	B 被包含 	C 不相等	 	D不存在  E被包含
+			info.PNBH = 0
+			info.PCBH = 0
+			info.PTCBH = 0
+			compareStr, score := comparePNC(info, compareProject)
+
+			resVal, pjVal := Select(compareStr, info, compareProject)
+			//---------------------------------------
+			//log.Println(resVal, pjVal, compareProject)
+			if resVal > 0 {
+				compareBuyer, compareCity, compareTime, compareAgency, compareBudget, compareBidmount, score2 := p.compareBCTABB(info, compareProject, diffTime, score)
+
+				//项目名称、项目编号、标题项目编号、采购单位、省、市、发布时间、代理机构
+				comStr = compareStr + compareBuyer + compareCity + compareTime + compareAgency + compareBudget + compareBidmount
+				compareProject.comStr = comStr
+				compareProject.pjVal = pjVal
+				compareProject.resVal = resVal
+				//log.Println(compareProject.comStr)
+				eqV := 0
+				switch resVal {
+				case 3:
+					if pjVal == 3 && comStr[3:] != "CCCDCCC" {
+						eqV = 1
+					} else if compareBuyer < "C" {
+						if pjVal > 1 {
+							eqV = 1
+						} else { //if (compareCity[1:1] != "C" || compareTime != "D") && score2 > 0
+							eqV = 2
+						}
+					} else if compareBuyer == "D" {
+						if pjVal > 1 && (compareCity[1:1] != "C" || score2 > 0) {
+							eqV = 2
+						} else if compareCity[1:1] != "C" && compareTime == "A" && score2 > 0 {
+							eqV = 3
+						}
+					} else {
+						if pjVal == 3 && (score2 > 0 || compareCity[1:1] != "C") {
+							eqV = 2
+						} else if pjVal == 2 && compareCity[1:1] != "C" && compareTime == "A" && score2 > 0 {
+							eqV = 3
+						} else if compareCity == "AA" && compareTime == "A" && score2 > 0 {
+							eqV = 3
+						}
+					}
+				case 2:
+					if compareBuyer < "C" {
+						if pjVal > 1 {
+							eqV = 2
+						} else if compareCity[1:1] != "C" && compareTime == "A" || score2 > 0 {
+							eqV = 3
+						}
+					} else if compareBuyer == "D" {
+						if pjVal > 1 && (score2 > 0 || compareCity[1:1] != "C") {
+							eqV = 2
+						} else if compareCity[1:1] != "C" && compareTime == "A" && score2 > 0 {
+							eqV = 3
+						}
+
+					} else {
+						if pjVal > 1 && compareTime == "A" && (score2 > 0 || compareCity[1:1] != "C") {
+							eqV = 2
+						} else if compareCity[1:1] != "C" && compareTime == "A" && (compareAgency == "A" || score2 > 0) && (compareBudget == "A" || compareBidmount == "A") {
+							eqV = 3
+						}
+					}
+				case 1:
+					if compareBuyer < "C" {
+						if pjVal > 1 && (score2 > 0 || compareCity[1:1] != "C") {
+							eqV = 2
+						} else if compareCity[1:1] != "C" && compareTime == "A" && (compareAgency == "A" || score2 > 0) && (compareBudget == "A" || compareBidmount == "A") {
+							eqV = 3
+						}
+					} else if compareBuyer == "D" {
+						if pjVal > 1 && compareTime == "A" && (score2 > 0 || compareCity[1:1] != "C") {
+							eqV = 2
+						} else if compareCity[1:1] != "C" && compareTime == "A" && (compareAgency == "A" || score2 > 0) && (compareBudget == "A" || compareBidmount == "A") {
+							eqV = 3
+						}
+					} else {
+						if pjVal > 1 && compareTime == "A" && score2 > 0 && (compareBudget == "A" || compareBidmount == "A") && compareCity[1:1] != "C" {
+							eqV = 3
+						}
+					}
+				}
+				if eqV == 1 {
+					comRes1 = append(comRes1, compareProject)
+				} else if eqV == 2 {
+					comRes2 = append(comRes2, compareProject)
+				} else if eqV == 3 {
+					comRes3 = append(comRes3, compareProject)
+				}
+				//				else if resVal == 3 || pjVal > 1 {
+				//					log.Println("===", resVal, pjVal, comStr, info.ProjectCode, compareProject.ProjectCode,
+				//						info.ProjectName, compareProject.ProjectName, info.Buyer, compareProject.Buyer, info.Id, compareProject.Id.Hex())
+				//				}
+			}
+		}
+	}
+	//--------------------------------对比完成-----------------------
+	//更新数组、更新项目
+	for kv, resN := range [][]*ProjectInfo{comRes1, comRes2, comRes3} {
+		if len(resN) > 0 {
+			if len(resN) > 1 {
+				sort.Slice(resN, func(i, j int) bool {
+					return resN[i].score > resN[j].score
+				})
+			}
+
+			bFindProject = true
+			findPid = resN[0].Id.Hex()
+			for k2, bv := range []int{bpn, bpc, bptc, bpb} {
+				if bv > -1 {
+					pids[bv].Arr = append(pids[bv].Arr, findPid)
+					if k2 == 0 {
+						if resN[0].ProjectName == "" {
+							resN[0].ProjectName = info.ProjectName
+						} else {
+							if resN[0].MPN == nil {
+								resN[0].MPN = []string{info.ProjectName}
+							} else {
+								resN[0].MPN = append(resN[0].MPN, info.ProjectName)
+							}
+						}
+
+					} else if k2 < 3 {
+						if resN[0].ProjectCode == "" {
+							resN[0].ProjectCode = qu.If(k2 == 1, info.ProjectCode, info.PTC).(string)
+						} else {
+							if resN[0].MPC == nil {
+								resN[0].MPC = []string{qu.If(k2 == 1, info.ProjectCode, info.PTC).(string)}
+							} else {
+								resN[0].MPC = append(resN[0].MPC, qu.If(k2 == 1, info.ProjectCode, info.PTC).(string))
+							}
+						}
+
+					} else {
+						if resN[0].Buyer == "" {
+							resN[0].Buyer = info.Buyer
+						}
+					}
+				}
+			}
+			p.UpdateProject(tmp, info, resN[0], kv+1, resN[0].comStr)
+			break
+		}
+	}
+
+	if !bFindProject {
+		//没有找到
+		id, p1 := p.NewProject(tmp, info)
+		p.AllIdsMapLock.Lock()
+		p.AllIdsMap[id] = &ID{Id: id, P: p1}
+		p.AllIdsMapLock.Unlock()
+		for _, m := range pids {
+			m.Arr = append(m.Arr, id)
+		}
+	}
+
+}
+
+func (p *ProjectTask) compareBCTABB(info *Info, cp *ProjectInfo, diffTime int64, score int) (compareBuyer, compareCity, compareTime, compareAgency, compareBudget, compareBidmount string, score2 int) {
+	compareBuyer = "D"
+	if len([]rune(info.Buyer)) > 3 && len([]rune(cp.Buyer)) > 3 {
+		v := CheckContain(info.Buyer, cp.Buyer)
+		if v == 1 {
+			compareBuyer = "A"
+			score += 3
+		} else {
+			v1 := CosineSimilar(info.Buyer, cp.Buyer)
+			if v == 2 || v1 > 0.8 {
+				compareBuyer = "B"
+				score += 1
+			} else {
+				compareBuyer = "C"
+			}
+		}
+	}
+	//---------------------------------------
+
+	compareCity = ""
+	if info.Area != "全国" && info.Area != "" && info.Area == cp.Area {
+		compareCity += "A"
+		score += 2
+	} else if info.Area == "全国" || cp.Area == "全国" {
+		compareCity += "B"
+		score += 1
+	} else {
+		compareCity += "C"
+	}
+	if compareCity != "C" {
+		if info.City != "" && info.City == cp.City {
+			compareCity += "A"
+			score += 2
+		} else {
+			if info.Area == "全国" || cp.Area == "全国" {
+				compareCity += "B"
+			} else if info.City == compareCity {
+				compareCity += "B"
+			} else {
+				compareCity += "C"
+			}
+		}
+	} else {
+		compareCity += "C"
+	}
+	score2 = 0
+	if compareCity == "AA" {
+		if info.District != "" && info.District == cp.District {
+			score2 = 1
+		}
+	}
+
+	compareTime = "D"
+	if diffTime < 45*86400 {
+		compareTime = "A"
+		score += 2
+	} else if diffTime < 90*86400 {
+		compareTime = "B"
+		score += 1
+	}
+
+	compareAgency = "D"
+	if info.Agency != "" {
+		if info.Agency == cp.Agency {
+			compareAgency = "A"
+			score += 2
+			score2 += 1
+		} else if cp.Agency != "" {
+			if strings.Contains(info.Agency, cp.Agency) || strings.Contains(cp.Agency, info.Agency) {
+				compareAgency = "B"
+				score += 1
+				score2 += 1
+			} else {
+				compareAgency = "C"
+			}
+		}
+	}
+	compareBudget = "C"
+	if info.Budget > 0 && (info.Budget == cp.Budget || (cp.Bidamount > 0 && info.Budget > cp.Bidamount && (info.Budget-cp.Bidamount) < (0.15*info.Budget))) {
+		compareBudget = "A"
+		score += 1
+		score2 += 1
+	}
+	//	else if info.Budget == 0 && cp.Budget == 0 {
+	//		compareBudget = "B"
+	//	}
+	compareBidmount = "C"
+	if info.Bidamount > 0 && (info.Bidamount == cp.Bidamount || (cp.Budget > 0 && cp.Budget > info.Bidamount && (cp.Budget-info.Bidamount) < 0.15*cp.Budget)) {
+		compareBidmount = "A"
+		score += 1
+		score2 += 1
+	}
+	//	else if info.Bidamount == 0 && cp.Bidamount == 0 {
+	//		compareBidmount = "B"
+	//	}
+
+	cp.score = score
+	return
+}
+
+var FIELDS = []string{
+	"area",
+	"city",
+	"district",
+	"projectname",
+	"projectcode",
+	"buyer",
+	"buyerclass",
+	"buyerperson",
+	"buyertel",
+	"winner",
+	//"budget",
+	//"bidamount",
+	//"bidstatus",
+	"agency",
+	"projectscope",
+
+	"topscopeclass",
+	"subscopeclass",
+	"winnerorder",
+	"package",
+}
+
+var bidtype = map[string]string{
+	"招标": "招标",
+	"邀标": "邀标",
+	"询价": "询价",
+	"竞谈": "竞谈",
+	"单一": "单一",
+	"竞价": "竞价",
+}
+
+var bidstatus = map[string]string{
+	"中标": "中标",
+	"成交": "成交",
+	"废标": "废标",
+	"流标": "流标",
+	"合同": "合同",
+}
+
+//招标时间zbtime、中标时间jgtime、项目状态bidstatus、招标类型bidtype、最后发布时间lasttime、首次发布时间firsttime
+
+func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (string, *ProjectInfo) {
+	pId := primitive.NewObjectID() //NewObjectId()
+	p1 := p.NewCachePinfo(pId, thisinfo)
+	set := map[string]interface{}{}
+	set["_id"] = pId
+	for _, f := range FIELDS {
+		if tmp[f] != nil {
+			set[f] = tmp[f]
+		}
+	}
+	if tmp["budget"] != nil {
+		set["budget"] = thisinfo.Budget
+	}
+	if tmp["bidamount"] != nil {
+		set["bidamount"] = thisinfo.Bidamount
+	}
+	bidopentime := qu.Int64All(tmp["bidopentime"])
+	if bidopentime > 0 {
+		set["bidopentime"] = bidopentime
+	}
+	if thisinfo.ProjectName != "" {
+		set["s_projectname"] = tmp["projectname"] //兼容老版本
+	}
+	now := time.Now().Unix()
+	set["createtime"] = now
+	set["sourceinfoid"] = thisinfo.Id
+	set["sourceinfourl"] = tmp["href"]
+	set["firsttime"] = tmp["publishtime"]
+	set["lasttime"] = tmp["publishtime"]
+	set["pici"] = p.pici
+	set["ids"] = []string{thisinfo.Id}
+	if thisinfo.TopType == "招标" {
+		set["zbtime"] = tmp["publishtime"]
+	} else if thisinfo.TopType == "结果" {
+		set["jgtime"] = tmp["publishtime"]
+	}
+	//招标类型
+	bt := bidtype[thisinfo.SubType]
+	if bt == "" {
+		bt = "招标"
+	}
+	set["bidtype"] = bt
+	bs, _ := tmp["bidstatus"].(string)
+	if bidstatus[bs] != "" {
+		set["bidstatus"] = bs
+	}
+	if set["bidstatus"] == nil && thisinfo.TopType == "结果" {
+		set["bidstatus"] = thisinfo.SubType
+	}
+	if len(thisinfo.Subscopeclass) > 0 {
+		s_subscopeclass := strings.Join(thisinfo.Subscopeclass, ",")
+		set["s_subscopeclass"] = s_subscopeclass
+	}
+	if len(thisinfo.Winners) > 0 {
+		set["s_winner"] = strings.Join(thisinfo.Winners, ",")
+		p1.Winners = thisinfo.Winners
+	}
+	if thisinfo.HasPackage {
+		set["multipackage"] = 1
+	} else {
+		set["multipackage"] = 0
+	}
+	push := p.PushListInfo(tmp, thisinfo.Id)
+	set["list"] = []bson.M{
+		push,
+	}
+	//p.savePool <- set
+	p.updatePool <- []map[string]interface{}{
+		map[string]interface{}{
+			"_id": pId,
+		},
+		map[string]interface{}{
+			"$set": set,
+		},
+	}
+	return pId.Hex(), &p1
+}
+
+var INFOFIELDS = []string{
+	"projectname",
+	"projectcode",
+	"title",
+	"href",
+	"publishtime",
+	"comeintime",
+	"bidopentime",
+	"toptype",
+	"subtype",
+	"buyer",
+	"buyerclass",
+	"agency",
+	"winner",
+	"budget",
+	"bidamount",
+	"topscopeclass",
+	"subscopclass",
+	"infoformat",
+	"buyerperson",
+	"buyertel",
+}
+
+//项目中list的信息
+func (p *ProjectTask) PushListInfo(tmp map[string]interface{}, infoid string) bson.M {
+	res := bson.M{
+		"infoid": infoid,
+	}
+	for _, k := range INFOFIELDS {
+		if tmp[k] != nil {
+			res[k] = tmp[k]
+		}
+	}
+	return res
+}
+
+//生成存放在内存中的对象
+func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info) ProjectInfo {
+	p1 := ProjectInfo{
+		Id:            id,
+		Ids:           []string{thisinfo.Id},
+		ProjectName:   thisinfo.ProjectName,
+		ProjectCode:   thisinfo.ProjectCode,
+		Buyer:         thisinfo.Buyer,
+		Buyerclass:    thisinfo.Buyerclass,
+		Buyerperson:   thisinfo.Buyerperson,
+		Buyertel:      thisinfo.Buyertel,
+		Topscopeclass: thisinfo.Topscopeclass,
+		Subscopeclass: thisinfo.Subscopeclass,
+		Agency:        thisinfo.Agency,
+		Area:          thisinfo.Area,
+		City:          thisinfo.City,
+		District:      thisinfo.District,
+		MPN:           []string{},
+		MPC:           []string{},
+		FirstTime:     thisinfo.Publishtime,
+		LastTime:      thisinfo.Publishtime,
+		Budget:        thisinfo.Budget,
+		Package:       thisinfo.Package,
+		Bidamount:     thisinfo.Bidamount,
+	}
+	if thisinfo.LenPTC > 5 {
+		p1.MPC = append(p1.MPC, thisinfo.PTC)
+	}
+	return p1
+}
+
+//更新项目
+func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info, pInfo *ProjectInfo, weight int, comStr string) {
+	if p.currentType != "ql" {
+		if BinarySearch(pInfo.Ids, thisinfo.Id) > -1 {
+			log.Println("repeat", thisinfo.Id)
+			return
+		}
+	}
+	set := map[string]interface{}{}
+	pInfo.Ids = append(pInfo.Ids, thisinfo.Id)
+
+	//1--firsttime
+	if thisinfo.Publishtime < pInfo.FirstTime && thisinfo.Publishtime > 0 {
+		pInfo.FirstTime = thisinfo.Publishtime
+		set["firsttime"] = thisinfo.Publishtime
+		if thisinfo.TopType == "招标" {
+			set["zbtime"] = tmp["publishtime"]
+		}
+	}
+	//2--lasttime
+	if thisinfo.Publishtime > pInfo.LastTime {
+		pInfo.LastTime = thisinfo.Publishtime
+		set["lasttime"] = thisinfo.Publishtime
+		bt := bidtype[thisinfo.SubType]
+		if bt != "" {
+			set["bidtype"] = bt
+		}
+		if thisinfo.TopType == "结果" {
+			set["bidstatus"] = thisinfo.SubType
+			set["jgtime"] = tmp["publishtime"]
+		}
+	}
+	//3\4\5--省、市、县
+	if thisinfo.Area != "全国" {
+		//xt := true
+		if pInfo.Area == "全国" {
+			pInfo.Area = thisinfo.Area
+			set["area"] = thisinfo.Area
+		} else if pInfo.Area != thisinfo.Area {
+			//xt = false
+		}
+		if pInfo.City == "" && thisinfo.City != "" {
+			pInfo.City = thisinfo.City
+			set["city"] = thisinfo.City
+		} else if pInfo.City != thisinfo.City {
+			//xt = false
+		}
+		if thisinfo.District != "" && pInfo.District == "" {
+			pInfo.District = thisinfo.District
+			set["district"] = thisinfo.District
+		}
+		//省市县有不相同的
+		//		if !xt {
+		//			log.Println(pInfo.Area, pInfo.City, thisinfo.Area, thisinfo.District)
+		//		}
+	}
+	//6--项目名称
+	if (thisinfo.ProjectName != "" && pInfo.ProjectName == "") || (len([]rune(pInfo.ProjectName)) < 6 && thisinfo.LenPN > 6) {
+		pInfo.ProjectName = thisinfo.ProjectName
+		set["projectname"] = thisinfo.ProjectName
+	}
+	//7--项目编号
+	if (pInfo.ProjectCode == "" && thisinfo.ProjectCode != "") || (len([]rune(pInfo.ProjectCode)) < 6 && len([]rune(thisinfo.ProjectCode)) > 6) {
+		pInfo.ProjectCode = thisinfo.ProjectCode
+		set["projectcode"] = thisinfo.ProjectCode
+	}
+	//7--采购单位
+	if (pInfo.Buyer == "" && thisinfo.Buyer != "") || (len([]rune(pInfo.Buyer)) < 5 && len([]rune(thisinfo.Buyer)) > 5) {
+		pInfo.Buyer = thisinfo.Buyer
+		set["buyer"] = thisinfo.Buyer
+	}
+	//8--代理机构
+	if (pInfo.Agency == "" && thisinfo.Agency != "") || (len([]rune(pInfo.Agency)) < 5 && len([]rune(thisinfo.Agency)) > 5) {
+		pInfo.Agency = thisinfo.Agency
+		set["agency"] = thisinfo.Agency
+	}
+	//9--采购单位联系人
+	if thisinfo.Buyerperson != "" && strings.Index(pInfo.Buyerperson, thisinfo.Buyerperson) < 0 {
+		pInfo.Buyerperson = thisinfo.Buyerperson
+		set["buyerperson"] = pInfo.Buyerperson
+	}
+	//10--采购单位電話
+	if thisinfo.Buyertel != "" && strings.Index(pInfo.Buyertel, thisinfo.Buyertel) < 0 {
+		pInfo.Buyertel = thisinfo.Buyertel
+		set["buyertel"] = pInfo.Buyertel
+	}
+
+	if thisinfo.Buyerclass != "" && pInfo.Buyerclass == "" {
+		pInfo.Buyerclass = thisinfo.Buyerclass
+		set["buyerclass"] = pInfo.Buyerclass
+	}
+	if thisinfo.Bidopentime > pInfo.Bidopentime {
+		pInfo.Bidopentime = thisinfo.Bidopentime
+		set["bidopentime"] = pInfo.Bidopentime
+	}
+	if thisinfo.Bidamount > 0 && pInfo.Bidamount < 1 {
+		pInfo.Bidamount = thisinfo.Bidamount
+		set["bidamount"] = pInfo.Bidamount
+	}
+
+	if thisinfo.Budget > 0 && pInfo.Budget < 1 { //多包的会有问题,没有进行合计。
+		pInfo.Budget = thisinfo.Budget
+		set["budget"] = pInfo.Budget
+	}
+
+	if len(thisinfo.Topscopeclass) > 0 {
+		sort.Strings(pInfo.Topscopeclass)
+		for _, k := range thisinfo.Topscopeclass {
+			if BinarySearch(pInfo.Topscopeclass, k) == -1 {
+				pInfo.Topscopeclass = append(pInfo.Topscopeclass, k)
+				sort.Strings(pInfo.Topscopeclass)
+			}
+		}
+		set["topscopeclass"] = pInfo.Topscopeclass
+	}
+
+	if len(thisinfo.Subscopeclass) > 0 {
+		sort.Strings(pInfo.Subscopeclass)
+		for _, k := range thisinfo.Subscopeclass {
+			if BinarySearch(pInfo.Subscopeclass, k) == -1 {
+				pInfo.Subscopeclass = append(pInfo.Subscopeclass, k)
+				sort.Strings(pInfo.Subscopeclass)
+			}
+		}
+		set["subscopeclass"] = pInfo.Subscopeclass
+		set["s_subscopeclass"] = strings.Join(pInfo.Subscopeclass, ",")
+	}
+	//winner
+	if len(thisinfo.Winners) > 0 {
+		sort.Strings(pInfo.Winners)
+		for _, k := range thisinfo.Winners {
+			if BinarySearch(pInfo.Winners, k) == -1 {
+				pInfo.Winners = append(pInfo.Winners, k)
+				sort.Strings(pInfo.Winners)
+			}
+		}
+		//set["winners"] = pInfo.Winners
+		set["s_winner"] = strings.Join(pInfo.Winners, ",")
+	}
+
+	if thisinfo.HasPackage { //多包处理
+		p2, _ := tmp["package"].(map[string]interface{})
+		if p2 != nil {
+			if pInfo.Package != nil {
+				for pk2, pv2 := range p2 {
+					if pInfo.Package[pk2] != nil { //合并
+						item1, _ := pInfo.Package[pk2].(map[string]interface{})
+						item2, _ := pv2.(map[string]interface{})
+						if item1 != nil && item2 != nil { //原始项
+							for ik1, iv1 := range item2 {
+								if item1[ik1] == nil {
+									item1[ik1] = iv1
+								}
+							}
+						}
+						pInfo.Package[pk2] = item1
+					} else {
+						pInfo.Package[pk2] = pv2
+					}
+				}
+			} else {
+				pInfo.Package = p2
+			}
+		}
+		set["package"] = pInfo.Package
+	}
+
+	set["mpn"] = pInfo.MPN
+	set["mpc"] = pInfo.MPC
+	set["pici"] = p.pici
+
+	if thisinfo.HasPackage {
+		set["multipackage"] = 1
+	} else {
+		set["multipackage"] = 0
+	}
+
+	update := map[string]interface{}{}
+	if len(set) > 0 {
+		update["$set"] = set
+	}
+	//保留原数据吧
+	push := p.PushListInfo(tmp, thisinfo.Id)
+	push["compareStr"] = comStr
+	push["resVal"] = pInfo.resVal
+	push["pjVal"] = pInfo.pjVal
+	update["$push"] = map[string]interface{}{
+		"list": push,
+		"ids":  thisinfo.Id,
+	}
+	if len(update) > 0 {
+		updateInfo := []map[string]interface{}{
+			map[string]interface{}{
+				"_id": pInfo.Id,
+			},
+			update,
+		}
+		p.updatePool <- updateInfo
+	}
+}

二進制
fullproject/src_v1/src_v1


+ 648 - 0
fullproject/src_v1/task.go

@@ -0,0 +1,648 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	mu "mfw/util"
+
+	"qfw/util"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/robfig/cron"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+/**
+任务入口
+全量、增量合并
+更新、插入,内存清理
+转换成info对象
+**/
+
+//项目合并对象
+type ProjectTask struct {
+	InitMinTime int64 //最小时间,小于0的处理一次
+	name        string
+	thread      int //线程数
+	//查找锁
+	findLock sync.Mutex
+	wg       sync.WaitGroup
+	//map锁
+	AllIdsMapLock sync.Mutex
+	//对应的id
+	AllIdsMap map[string]*ID
+	//采购单位、项目名称、项目编号
+	mapPb, mapPn, mapPc map[string]*Key
+	//	mapPbLock, mapPnLock, mapPcLock sync.Mutex
+	//更新或新增通道
+	updatePool           chan []map[string]interface{}
+	savePool             chan map[string]interface{}
+	saveSign, updateSign chan bool
+	//表名
+	coll string
+	//当前状态是全量还是增量
+	currentType string //当前是跑全量还是跑增量
+	//
+	clearContimes int
+	//当前时间
+	currentTime int64
+	//保存长度
+	saveSize  int
+	pici      int64
+	validTime int64
+	//	LockPool     chan *sync.Mutex
+	//	LockPoolLock sync.Mutex
+	//	m1, m23, m4  map[int]int
+	//	l1, l23, l4  map[int]*sync.Mutex
+}
+
+func NewPT() *ProjectTask {
+	p := &ProjectTask{
+		InitMinTime: int64(1325347200),
+		name:        "全/增量对象",
+		thread:      4,
+		updatePool:  make(chan []map[string]interface{}, 5000),
+		//savePool:    make(chan map[string]interface{}, 2000),
+		wg:         sync.WaitGroup{},
+		AllIdsMap:  make(map[string]*ID, 5000000),
+		mapPb:      make(map[string]*Key, 1500000),
+		mapPn:      make(map[string]*Key, 5000000),
+		mapPc:      make(map[string]*Key, 5000000),
+		saveSize:   400,
+		saveSign:   make(chan bool, 1),
+		updateSign: make(chan bool, 1),
+		coll:       ProjectColl,
+		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
+	}
+	return p
+}
+
+var P_QL *ProjectTask
+
+//初始化全量合并对象
+func init() {
+	P_QL = NewPT()
+	go P_QL.updateAllQueue()
+	go P_QL.clearMem()
+}
+
+func (p *ProjectTask) updateAllQueue() {
+	arru := make([][]map[string]interface{}, p.saveSize)
+	indexu := 0
+	sp := make(chan bool, 5)
+	for {
+		select {
+		case v := <-p.updatePool:
+			arru[indexu] = v
+			indexu++
+			if indexu == p.saveSize {
+				sp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					MongoTool.UpSertBulk(p.coll, arru...)
+				}(arru)
+				arru = make([][]map[string]interface{}, p.saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				sp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					MongoTool.UpSertBulk(p.coll, arru...)
+				}(arru[:indexu])
+				arru = make([][]map[string]interface{}, p.saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+//项目合并内存更新
+func (p *ProjectTask) clearMem() {
+	c := cron.New()
+	//在内存中保留最近6个月的信息
+	//跑全量时每4分钟跑一次,跑增量时400分钟跑一次
+	c.AddFunc("50 0/15 * * * *", func() {
+		if p.currentType == "ql" || p.clearContimes >= 60 {
+			//跳过的次数清零
+			p.clearContimes = 0
+			//信息进入查找对比全局锁
+			p.findLock.Lock()
+			//defer p.findLock.Unlock()
+			//合并进行的任务都完成
+			p.wg.Wait()
+			//遍历id
+			//所有内存中的项目信息
+			p.AllIdsMapLock.Lock()
+
+			//清除计数
+			clearNum := 0
+			for k, v := range p.AllIdsMap {
+				if p.currentTime-v.P.LastTime > p.validTime {
+					clearNum++
+					//删除id的map
+					delete(p.AllIdsMap, k)
+					//删除pb
+					if v.P.Buyer != "" {
+						ids := p.mapPb[v.P.Buyer]
+						if ids != nil {
+							ids.Lock.Lock()
+							ids.Arr = deleteSlice(ids.Arr, k)
+							if len(ids.Arr) == 0 {
+								delete(p.mapPb, v.P.Buyer)
+							}
+							ids.Lock.Unlock()
+						}
+					}
+					//删除mapPn
+					for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
+						if vn != "" {
+							ids := p.mapPn[vn]
+							if ids != nil {
+								ids.Lock.Lock()
+								ids.Arr = deleteSlice(ids.Arr, k)
+								if len(ids.Arr) == 0 {
+									delete(p.mapPn, vn)
+								}
+								ids.Lock.Unlock()
+							}
+						}
+					}
+					//删除mapPc
+					for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
+						if vn != "" {
+							ids := p.mapPc[vn]
+							if ids != nil {
+								ids.Lock.Lock()
+								ids.Arr = deleteSlice(ids.Arr, k)
+								if len(ids.Arr) == 0 {
+									delete(p.mapPc, vn)
+								}
+								ids.Lock.Unlock()
+							}
+						}
+					}
+					v = nil
+				}
+			}
+			p.AllIdsMapLock.Unlock()
+			p.findLock.Unlock()
+			log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb))
+		} else {
+			p.clearContimes++
+		}
+	})
+	c.Start()
+	select {}
+}
+
+//全量合并
+func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
+	defer util.Catch()
+	//1、检查pubilshtime索引
+	db, _ := udpInfo["db"].(string)
+	if db == "" {
+		db = MongoTool.DbName
+	}
+	coll, _ := udpInfo["coll"].(string)
+	if coll == "" {
+		coll = ExtractColl
+	}
+	thread := util.IntAllDef(udpInfo["thread"], 4)
+	if thread > 0 {
+		p.thread = thread
+	}
+	q, _ := udpInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{}
+		lteid, _ := udpInfo["lteid"].(string)
+		var idmap map[string]interface{}
+		if len(lteid) > 15 {
+			idmap = map[string]interface{}{
+				"$lte": StringTOBsonId(lteid),
+			}
+		}
+		gtid, _ := udpInfo["gtid"].(string)
+		if len(gtid) > 15 {
+			if idmap == nil {
+				idmap = map[string]interface{}{}
+			}
+			idmap["$gt"] = StringTOBsonId(gtid)
+		}
+		if idmap != nil {
+			q["_id"] = idmap
+		}
+	}
+	//生成查询语句执行
+	log.Println("查询语句:", q)
+	p.enter(db, coll, q)
+
+}
+
+//增量合并
+func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
+	defer util.Catch()
+	//1、检查pubilshtime索引
+	db, _ := udpInfo["db"].(string)
+	if db == "" {
+		db = MongoTool.DbName
+	}
+	coll, _ := udpInfo["coll"].(string)
+	if coll == "" {
+		coll = ExtractColl
+	}
+	thread := util.IntAllDef(udpInfo["thread"], 4)
+	if thread > 0 {
+		p.thread = thread
+	}
+	//开始id和结束id
+	q, _ := udpInfo["query"].(map[string]interface{})
+	gtid := udpInfo["gtid"].(string)
+	lteid := udpInfo["lteid"].(string)
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  StringTOBsonId(gtid),
+				"$lte": StringTOBsonId(lteid),
+			},
+		}
+	}
+	if q != nil {
+		//生成查询语句执行
+		p.enter(db, coll, q)
+	}
+	if udpInfo["stop"] == nil {
+		nextNode(udpInfo, p.pici)
+	}
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+//通知下个节点nextNode
+func nextNode(mapInfo map[string]interface{}, pici int64) {
+	mapInfo["stype"] = "project"
+	mapInfo["query"] = map[string]interface{}{
+		"pici": pici,
+	}
+	for n, to := range toaddr {
+		key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
+		mapInfo["key"] = key
+		datas, _ := json.Marshal(mapInfo)
+		node := &udpNode{datas, to, time.Now().Unix(), 0}
+		udptaskmap.Store(key, node)
+		udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
+	}
+}
+
+func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
+	defer util.Catch()
+	count, taskcount := 0, 0
+
+	pool := make(chan bool, p.thread)
+	log.Println("start project", q)
+	sess := MongoTool.GetMgoConn()
+	defer MongoTool.DestoryMongoConn(sess)
+
+	infoPool := make(chan map[string]interface{}, 2000)
+	over := make(chan bool)
+	go func() {
+	L:
+		for {
+			select {
+			case tmp := <-infoPool:
+				pool <- true
+				taskcount++
+				go func(tmp map[string]interface{}) {
+					defer func() {
+						<-pool
+					}()
+					if util.IntAll(tmp["repeat"]) == 0 {
+						info := ParseInfo(tmp)
+						if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
+							p.currentTime = info.Publishtime
+							p.startProjectMerge(info, tmp)
+						}
+					} else {
+						//信息错误,进行更新
+					}
+				}(tmp)
+			case <-over:
+				break L
+			}
+		}
+
+	}()
+	ms := sess.DB(db).C(coll).Find(q).Sort("publishtime")
+	if Sysconfig["hints"] != nil {
+		ms.Hint(Sysconfig["hints"])
+	}
+	query := ms.Iter()
+	//
+	var lastid interface{}
+L:
+	for {
+		select {
+		case <-queryClose:
+			log.Println("receive interrupt sign")
+			log.Println("close iter..", lastid, query.Cursor.Close(nil))
+			queryCloseOver <- true
+			break L
+		default:
+			tmp := make(map[string]interface{})
+			if query.Next(&tmp) {
+				lastid = tmp["_id"]
+				if count%2000 == 0 {
+					log.Println("current", count, lastid)
+				}
+				infoPool <- tmp
+				count++
+			} else {
+				break L
+			}
+		}
+	}
+	time.Sleep(5 * time.Second)
+	over <- true
+	//阻塞
+	for n := 0; n < p.thread; n++ {
+		pool <- true
+	}
+	log.Println("所有线程执行完成...", count, taskcount)
+
+}
+
+var (
+	//从标题获取项目编号
+	titleGetPc  = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
+	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
+	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
+	//项目编号过滤
+	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
+	//项目编号只是数字或只是字母4个以下
+	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
+	//纯数字或纯字母
+	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
+)
+
+func ParseInfo(tmp map[string]interface{}) (info *Info) {
+	bys, _ := json.Marshal(tmp)
+	var thisinfo *Info
+	json.Unmarshal(bys, &thisinfo)
+	if thisinfo == nil {
+		return nil
+	}
+	if len(thisinfo.Topscopeclass) == 0 {
+		thisinfo.Topscopeclass = []string{}
+	}
+	if len(thisinfo.Subscopeclass) == 0 {
+		thisinfo.Subscopeclass = []string{}
+	}
+
+	//从标题中查找项目编号
+	res := titleGetPc.FindStringSubmatch(thisinfo.Title)
+	if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
+		thisinfo.PTC = res[1]
+	} else {
+		res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
+		if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
+			thisinfo.PTC = res[3]
+		} else {
+			res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
+			if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
+				thisinfo.PTC = res[1]
+			}
+		}
+	}
+
+	if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
+		thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
+		if thisinfo.ProjectName != "" {
+			thisinfo.pnbval++
+		}
+	}
+
+	if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
+		if thisinfo.ProjectCode != "" {
+			thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
+			if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
+				thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
+			}
+		} else {
+			thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
+			if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
+				thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
+			}
+		}
+		if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
+			thisinfo.pnbval++
+		}
+	}
+	if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
+		thisinfo.PTC = ""
+	}
+
+	if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
+		thisinfo.pnbval++
+	} else {
+		thisinfo.Buyer = ""
+	}
+	//winners整理
+	winner, _ := tmp["winner"].(string)
+	m1 := map[string]bool{}
+	winners := []string{}
+	if winner != "" {
+		m1[winner] = true
+		winners = append(winners, winner)
+	}
+	packageM, _ := tmp["package"].(map[string]interface{})
+	if packageM != nil {
+		thisinfo.HasPackage = true
+		for _, p := range packageM {
+			pm, _ := p.(map[string]interface{})
+			pw, _ := pm["winner"].(string)
+			if pw != "" && !m1[pw] {
+				m1[pw] = true
+				winners = append(winners, pw)
+			}
+		}
+	}
+	thisinfo.Winners = winners
+
+	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
+	thisinfo.LenPTC = len([]rune(thisinfo.PTC))
+	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
+	return thisinfo
+}
+
+//从数组中删除元素
+func deleteSlice(arr []string, v string) []string {
+	for k, v1 := range arr {
+		if v1 == v {
+			return append(arr[:k], arr[k+1:]...)
+		}
+	}
+	return arr
+}
+
+//			if taskcount > 0 && taskcount%50000 == 0 { //歇歇
+//				log.Println("pause start..", taskcount)
+//				for n := 0; n < p.thread; n++ {
+//					pool <- true
+//				}
+//				for n := 0; n < p.thread; n++ {
+//					<-pool
+//				}
+//				log.Println("pause over..")
+//			}
+//lastid = tmp["_id"]
+//tmp = make(map[string]interface{})
+//		if count > 40000 {
+//			query.Close()
+//			break
+//		}
+//over++
+//func (p *ProjectTask) saveQueue() {
+//	arr := make([]map[string]interface{}, p.saveSize)
+//	indexs := 0
+//	for {
+//		select {
+//		case <-p.saveSign:
+//			if indexs > 0 {
+//				MongoTool.SaveBulk(p.coll, arr[:indexs]...)
+//				arr = make([]map[string]interface{}, p.saveSize)
+//				indexs = 0
+//			}
+//			p.updateSign <- true
+//		case v := <-p.savePool:
+//			arr[indexs] = v
+//			indexs++
+//			if indexs == p.saveSize {
+//				MongoTool.SaveBulk(p.coll, arr...)
+//				arr = make([]map[string]interface{}, p.saveSize)
+//				indexs = 0
+//			}
+//		case <-time.After(100 * time.Millisecond):
+//			if indexs > 0 {
+//				MongoTool.SaveBulk(p.coll, arr[:indexs]...)
+//				arr = make([]map[string]interface{}, p.saveSize)
+//				indexs = 0
+//			}
+//		}
+//	}
+//}
+
+////项目保存和更新通道
+//func (p *ProjectTask) updateQueue() {
+//	arru := make([][]map[string]interface{}, p.saveSize)
+//	indexu := 0
+//	for {
+//		select {
+//		case v := <-p.updatePool:
+//			arru[indexu] = v
+//			indexu++
+//			if indexu == p.saveSize {
+//				//更新之前先保存
+//				p.saveSign <- true
+//				<-p.updateSign
+//				MongoTool.UpdateBulk(p.coll, arru...)
+//				arru = make([][]map[string]interface{}, p.saveSize)
+//				indexu = 0
+//			}
+//		case <-time.After(100 * time.Millisecond):
+//			if indexu > 0 {
+//				p.saveSign <- true
+//				<-p.updateSign
+//				MongoTool.UpdateBulk(p.coll, arru[:indexu]...)
+//				arru = make([][]map[string]interface{}, p.saveSize)
+//				indexu = 0
+//			}
+//		}
+//	}
+//}
+//func (p *ProjectTask) ConCurrentLock(n1, n2, n3, n4 int) {
+//	var lock *sync.Mutex
+//	p.LockPoolLock.Lock()
+//	if p.m1[n1] > 0 || p.m23[n2] > 0 || p.m23[n3] > 0 || p.m4[n4] > 0 {
+//		if p.l1[n1] != nil {
+//			lock = p.l1[n1]
+//		} else if p.l23[n2] != nil {
+//			lock = p.l23[n2]
+//		} else if p.l23[n3] != nil {
+//			lock = p.l23[n3]
+//		} else if p.l4[n4] != nil {
+//			lock = p.l4[n4]
+//		}
+//	} else {
+//		lock = <-p.LockPool
+//	}
+//	if n1 > 0 {
+//		p.m1[n1]++
+//		p.l1[n1] = lock
+//	}
+//	if n2 > 0 {
+//		p.m23[n2]++
+//		p.l23[n2] = lock
+//	}
+//	if n3 > 0 {
+//		p.m23[n3]++
+//		p.l23[n3] = lock
+//	}
+//	if n4 > 0 {
+//		p.m4[n4]++
+//		p.l4[n4] = lock
+//	}
+//	p.LockPoolLock.Unlock()
+//	lock.Lock()
+//}
+
+//func (p *ProjectTask) ConCurrentUnLock(n1, n2, n3, n4 int) {
+//	var lock1 *sync.Mutex
+//	p.LockPoolLock.Lock()
+//	if p.l1[n1] != nil {
+//		lock1 = p.l1[n1]
+//	} else if p.l23[n2] != nil {
+//		lock1 = p.l23[n2]
+//	} else if p.l23[n3] != nil {
+//		lock1 = p.l23[n3]
+//	} else if p.l4[n4] != nil {
+//		lock1 = p.l4[n4]
+//	}
+//	if p.m1[n1] > 0 {
+//		p.m1[n1]--
+//		if p.m1[n1] == 0 {
+//			p.l1[n1] = nil
+//		}
+//	}
+//	if p.m23[n2] > 0 {
+//		p.m23[n2]--
+//		if p.m23[n2] == 0 {
+//			p.l23[n2] = nil
+//		}
+//	}
+//	if p.m23[n3] > 0 {
+//		p.m23[n3]--
+//		if p.m23[n3] == 0 {
+//			p.l23[n3] = nil
+//		}
+//	}
+//	if p.m4[n4] > 0 {
+//		p.m4[n4]--
+//		if p.m4[n4] == 0 {
+//			p.l4[n4] = nil
+//		}
+//	}
+//	p.LockPoolLock.Unlock()
+//	lock1.Unlock()
+//}

+ 59 - 0
fullproject/src_v1/udptaskmap.go

@@ -0,0 +1,59 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var udptaskmap = &sync.Map{}
+var tomail string
+var api string
+
+type udpNode struct {
+	data      []byte
+	addr      *net.UDPAddr
+	timestamp int64
+	retry     int
+}
+
+func checkMapJob() {
+	//阿里云内网无法发送邮件
+	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+		log.Println("start checkMapJob", tomail, Sysconfig["jkmail"])
+		for {
+			udptaskmap.Range(func(k, v interface{}) bool {
+				now := time.Now().Unix()
+				node, _ := v.(*udpNode)
+				if now-node.timestamp > 120 {
+					node.retry++
+					if node.retry > 5 {
+						log.Println("udp重试失败", k)
+						udptaskmap.Delete(k)
+						res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "project-send-fail", k.(string)))
+						if err == nil {
+							defer res.Body.Close()
+							read, err := ioutil.ReadAll(res.Body)
+							log.Println("邮件发发送:", string(read), err)
+						}
+					} else {
+						log.Println("udp重发", k)
+						udpclient.WriteUdp(node.data, mu.OP_TYPE_DATA, node.addr)
+					}
+				} else if now-node.timestamp > 10 {
+					log.Println("udp任务超时中..", k)
+				}
+				return true
+			})
+			time.Sleep(60 * time.Second)
+		}
+	}
+}