Jianghan 3 年之前
父节点
当前提交
90d80a5ad6

+ 30 - 5
esmgocount/src/main.go

@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"qfw/util"
 	"qfw/util/elastic"
+	"strconv"
 	"strings"
 	"time"
 
@@ -36,6 +37,7 @@ var (
 	esIndex, esDataIndex string
 	Ts                   = []*T{}
 	esQ                  = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
+	esQ1                 = `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
 )
 
 func init() {
@@ -93,9 +95,11 @@ func (t *T) task() {
 	st1 := fmt.Sprintf("%x0000000000000000", st)
 	et1 := fmt.Sprintf("%x0000000000000000", et)
 	eq = fmt.Sprintf(esQ, st1, et1)
+	eq1 := fmt.Sprintf(esQ1, st1, et1)
 	es := elastic.Elastic{S_esurl: esAddr, I_size: 1}
 	es.InitElasticSize()
 	count := int(es.Count(esIndex, esIndex, eq))
+	count1 := int(es.Count(esIndex, esIndex, eq1))
 	switch t.Type {
 	case "alert":
 		if count < t.Min || count > t.Max {
@@ -103,16 +107,37 @@ func (t *T) task() {
 			t.SendMail(report)
 		}
 	case "report":
-		report := fmt.Sprintf("报告%s,统计结果%d", t.Name, count)
+		report := fmt.Sprintf("报告%s,统计结果%d", t.Name, count)
 		if len(t.Mgo) > 5 {
 			fs := strings.Split(t.Mgo, "|")
 			fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
 			id1 := mongodb.StringTOBsonId(st1)
 			id2 := mongodb.StringTOBsonId(et1)
-			mq := bson.M{"extracttype": bson.M{"$ne": -1}, "sensitive": bson.M{"$ne": "测试"}, "dataging": bson.M{"$ne": 1}, "_id": bson.M{"$gte": id1, "$lt": id2}}
-			count2 := fmgo.Count(fs[4], mq)
-			count3 := fmgo.Count(fs[4], bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}}) //mgo总入库量
-			report += ",mgo统计" + fmt.Sprint(count2) + ",差值:" + fmt.Sprint(count2-count) + ",mgo总入库量" + fmt.Sprint(count3)
+			//mq := bson.M{"extracttype": bson.M{"$ne": -1}, "sensitive": bson.M{"$ne": "测试"}, "dataging": bson.M{"$ne": 1}, "_id": bson.M{"$gte": id1, "$lt": id2}}
+			//count2 := fmgo.Count(fs[4], mq)
+			//count3 := fmgo.Count(fs[4], bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}}) //mgo总入库量
+			//report += ",mgo统计" + fmt.Sprint(count2) + ",差值:" + fmt.Sprint(count2-count) + ",mgo总入库量" + fmt.Sprint(count3)
+			//迭代
+			mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}}
+			fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1}
+			sess := fmgo.GetMgoConn()
+			defer fmgo.DestoryMongoConn(sess)
+			count2, count3 := 0, 0 //
+			count4, count5 := 0, 0 //竟品
+			query := sess.DB(fs[3]).C(fs[4]).Find(mq).Select(fd).Iter()
+			for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
+				if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" {
+					count4++
+				}
+				if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 {
+					count3++
+					if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" {
+						count5++
+					}
+				}
+			}
+			report += ",mgo统计" + fmt.Sprint(count3) + ",差值:" + fmt.Sprint(count3-count) + ",mgo总入库量" + fmt.Sprint(count2)
+			report += "<br>" + "竟品统计结果:" + strconv.Itoa(count1) + ",mgo统计" + fmt.Sprint(count5) + ",差值:" + fmt.Sprint(count5-count1) + ",mgo总入库量" + fmt.Sprint(count4)
 		}
 		t.SendMail(report)
 	}

+ 1 - 1
forecast/es/config.json

@@ -6,7 +6,7 @@
   "uname": "",
   "upwd": "",
   "tasktime": 0,
-  "updatetime": 0,
+  "updateid": "",
   "elastic": {
     "addr": "http://192.168.3.206:9800",
     "index": "forecast_v1",

+ 15 - 15
forecast/es/main.go

@@ -7,16 +7,16 @@ import (
 )
 
 var (
-	Sysconfig        map[string]interface{} //配置文件
-	Mgo              *mongodb.MongodbSim
-	Dbname           string
-	Dbcoll           string
-	Es               *es.Elastic
-	Index            string
-	Itype            string
-	EsFields         []string
-	TaskTime         int
-	Updatetime       int64
+	Sysconfig map[string]interface{} //配置文件
+	Mgo       *mongodb.MongodbSim
+	Dbname    string
+	Dbcoll    string
+	Es        *es.Elastic
+	Index     string
+	Itype     string
+	EsFields  []string
+	TaskTime  int
+	UpdateId  string
 )
 var EsSaveCache = make(chan map[string]interface{}, 5000)
 var SP = make(chan bool, 5)
@@ -29,8 +29,8 @@ func init() {
 		MongodbAddr: Sysconfig["mgodb"].(string),
 		Size:        qu.IntAllDef(Sysconfig["dbsize"], 5),
 		DbName:      Dbname,
-		UserName: 	 Sysconfig["uname"].(string),
-		Password: 	 Sysconfig["upwd"].(string),
+		UserName:    Sysconfig["uname"].(string),
+		Password:    Sysconfig["upwd"].(string),
 	}
 	Mgo.InitPool()
 	//es
@@ -44,15 +44,15 @@ func init() {
 	Es.InitElasticSize()
 	EsFields = qu.ObjArrToStringArr(econf["esfields"].([]interface{}))
 	//TaskTime = qu.IntAll(Sysconfig["tasktime"])
-	Updatetime = qu.Int64All(Sysconfig["updatetime"])
+	UpdateId = qu.ObjToString(Sysconfig["updateid"])
 
 }
 
 func main() {
 	go SaveEs()
 
-	//go TimeTask()
-	go SaveAll()
+	go TimeTask()
+	//go SaveAll()
 	//go SaveAdd()
 	ch := make(chan bool, 1)
 	<-ch

+ 40 - 12
forecast/es/task.go

@@ -4,6 +4,7 @@ import (
 	"github.com/cron"
 	"go.mongodb.org/mongo-driver/bson"
 	"log"
+	"mongodb"
 	"qfw/util"
 	"reflect"
 	"sync"
@@ -12,13 +13,14 @@ import (
 
 //定时任务
 func TimeTask() {
+	go SaveAdd()
 	c := cron.New()
-	cronstr := "0 0 15 ? * Tue" //每周二15点执行
+	cronstr := "0 0 2 * * ?" //每天2点执行
 	//cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
 	err := c.AddFunc(cronstr, func() { SaveAdd() })
 	if err != nil {
 		util.Debug(err)
-		return 
+		return
 	}
 	c.Start()
 }
@@ -31,15 +33,21 @@ func SaveAdd() {
 
 	pool := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
-	//q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
-	q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
-	util.Debug(q)
+	if UpdateId == "" {
+		util.Debug("update id err...")
+		return
+	}
+	q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(UpdateId)}}
+	util.Debug("q ---", q)
 	it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
 		if count%5000 == 0 {
 			log.Println("current:", count)
 		}
+		if UpdateId < mongodb.BsonIdToSId(tmp["_id"]) {
+			UpdateId = mongodb.BsonIdToSId(tmp["_id"])
+		}
 		pool <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {
@@ -53,10 +61,30 @@ func SaveAdd() {
 				if tmp[field] == nil {
 					continue
 				}
-
+				if field == "buyerclass" {
+					if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
+						esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
+					} else {
+						esMap["buyerclass"] = tmp["buyerclass"]
+					}
+				} else {
+					esMap[field] = tmp[field]
+				}
 			}
-
-			EsSaveCache <- esMap		// 保存es
+			// 处理result
+			if mp, ok := tmp["results"].([]interface{}); ok {
+				var mpArr []map[string]interface{}
+				for _, v := range mp {
+					v1 := v.(map[string]interface{})
+					if v1["purchasing"] != nil {
+						mpArr = append(mpArr, map[string]interface{}{"purchasing": v1["purchasing"]})
+					}
+				}
+				if len(mpArr) > 0 {
+					esMap["results"] = mpArr
+				}
+			}
+			EsSaveCache <- esMap
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
@@ -95,17 +123,17 @@ func SaveAll() {
 				if field == "buyerclass" {
 					if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
 						esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
-					}else {
+					} else {
 						esMap["buyerclass"] = tmp["buyerclass"]
 					}
-				}else {
+				} else {
 					esMap[field] = tmp[field]
 				}
 			}
 			// 处理result
 			if mp, ok := tmp["results"].([]interface{}); ok {
 				var mpArr []map[string]interface{}
-				for _, v := range mp{
+				for _, v := range mp {
 					v1 := v.(map[string]interface{})
 					if v1["purchasing"] != nil {
 						mpArr = append(mpArr, map[string]interface{}{"purchasing": v1["purchasing"]})
@@ -158,4 +186,4 @@ func SaveEs() {
 			}
 		}
 	}
-}
+}

+ 6 - 6
fullproject/src_v1/main.go

@@ -19,7 +19,7 @@ var (
 	SingleClear  = 0
 	toaddr       = []*net.UDPAddr{} //下节点对象
 	ChSign       = make(chan os.Signal)
-	Es 			 *elastic.Elastic
+	Es           *elastic.Elastic
 	Index        string
 	Itype        string
 
@@ -70,7 +70,7 @@ func DealSign() {
 	}
 }
 
-func mainT() {
+func main() {
 	//udp跑增量  id段   project
 	//udp跑全量			qlT
 	//udp跑历史数据  信息id1,id2/或id段  ls
@@ -90,9 +90,9 @@ func mainT() {
 }
 
 //测试组人员使用
-func main() {
-	sid = "60fa9ef71a75b8f446125761"
-	eid = "6193756145a326c6c3ff2833"
+func mainT() {
+	sid = "61ace36c45a326c6c325093e"
+	eid = "61b1738445a326c6c32c29e8"
 	//flag.StringVar(&sid, "sid", "", "开始id")
 	//flag.StringVar(&eid, "eid", "", "结束id")
 	//flag.Parse()
@@ -176,7 +176,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					P_QL.pici = time.Now().Unix()
 					P_QL.taskUpdatePro(mapInfo)
 				}()
-			case "deleteInfo":	// 删除招标公告
+			case "deleteInfo": // 删除招标公告
 				go func() {
 					defer func() {
 						<-SingleThread

+ 12 - 11
fullproject/src_v1/project.go

@@ -95,15 +95,13 @@ func (p *ProjectTask) getCompareIds(pn, pc, ptc, pb string) (bpn, bpc, bptc, bpb
 func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{}) {
 	p.findLock.Lock()
 	defer p.findLock.Unlock()
-	// 3.18 isfow=0数据不参与项目合并
+	// 3.18 isfow=0数据不参与项目合并		(1表示正常数据招标流程)
 	code := strings.ReplaceAll(qu.ObjToString(tmp["spidercode"]), " ", "")
 	p.mapSpiderLock.Lock()
 	isflow := p.mapSpider[code]
-	qu.Debug(code, isflow)
 	p.mapSpiderLock.Unlock()
 	if isflow == 0 {
 		p.NewProject(tmp, info)
-		qu.Debug("直接新建项目,", "project id")
 		return
 	}
 
@@ -241,7 +239,6 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 		//	qu.Debug("舍弃数据---", info.Id)
 		//	return
 		//}
-		qu.Debug("直接新建项目----")
 		id, p1 := p.NewProject(tmp, info)
 		p.AllIdsMapLock.Lock()
 		p.AllIdsMap[id] = &ID{Id: id, P: p1}
@@ -633,8 +630,10 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	}
 	if p1.Bidamount > 0 {
 		set["sortprice"] = p1.Bidamount
-	} else if p1.Budget > 0 {
-		set["sortprice"] = p1.Budget
+	} else {
+		if p1.Budget > 0 {
+			set["sortprice"] = p1.Budget
+		}
 	}
 	push := p.PushListInfo(tmp, thisinfo.Id)
 	push["s_winner"] = strings.Join(thisinfo.Winners, ",")
@@ -957,12 +956,12 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 			if thisinfo.SubType == "流标" || thisinfo.SubType == "废标" {
 				if BinarySearch(pInfo.Winners, k) != -1 {
 					deleteSlice(pInfo.Winners, k, "")
-					sort.Strings(pInfo.Winners)
+					//sort.Strings(pInfo.Winners)
 				}
 			} else {
 				if BinarySearch(pInfo.Winners, k) == -1 {
 					pInfo.Winners = append(pInfo.Winners, k)
-					sort.Strings(pInfo.Winners)
+					//sort.Strings(pInfo.Winners)
 				}
 			}
 		}
@@ -1067,10 +1066,12 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		set["bidamount"] = pInfo.Bidamount
 		set["bidamounttag"] = 0
 	}
-	if pInfo.Bidamount >= pInfo.Budget {
+	if pInfo.Bidamount > 0 {
 		set["sortprice"] = pInfo.Bidamount
-	} else if pInfo.Budget >= pInfo.Bidamount {
-		set["sortprice"] = pInfo.Budget
+	} else {
+		if pInfo.Budget > 0 {
+			set["sortprice"] = pInfo.Budget
+		}
 	}
 
 	infofield := InfoField{

+ 30 - 32
fullproject/src_v1/task.go

@@ -89,12 +89,12 @@ type ProjectTask struct {
 	//当前时间
 	currentTime int64
 	//保存长度
-	saveSize   	int
-	pici       	int64
-	validTime  	int64
-	statusTime 	int64
+	saveSize   int
+	pici       int64
+	validTime  int64
+	statusTime int64
 	//结果时间的更新		最近两天的公告不再更新jgtime
-	jgTime		int64
+	jgTime int64
 	//	LockPool     chan *sync.Mutex
 	//	LockPoolLock sync.Mutex
 	//	m1, m23, m4  map[int]int
@@ -124,7 +124,7 @@ func NewPT() *ProjectTask {
 		coll:       ProjectColl,
 		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
 		statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
-		jgTime:		int64(util.IntAllDef(7, 7) * 86400),
+		jgTime:     int64(util.IntAllDef(7, 7) * 86400),
 	}
 	return p
 }
@@ -345,12 +345,12 @@ func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
 	}
 	client := Es.GetEsConn()
 	defer Es.DestoryEsConn(client)
-	esquery := `{"query": {"bool": {"must": [{"match": {"ids": "`+infoid+`"}}]}}}`
+	esquery := `{"query": {"bool": {"must": [{"match": {"ids": "` + infoid + `"}}]}}}`
 	data := Es.Get(Index, Itype, esquery)
 	if len(*data) > 0 {
 		pid := util.ObjToString(((*data)[0])["_id"])
 		p.updateJudge(infoMap, pid)
-	}else {
+	} else {
 		util.Debug("not find project---,", infoid)
 	}
 }
@@ -371,10 +371,10 @@ func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
 		delete(proMap, "reason")
 		updataMap := make(map[string]interface{})
 		modifyInfo := make(map[string]interface{})
-		for k, v := range *updateMap{
+		for k, v := range *updateMap {
 			if strings.Contains(k, "time") {
 				updataMap[k] = util.Int64All(v)
-			}else {
+			} else {
 				updataMap[k] = v
 			}
 			modifyInfo[k] = true
@@ -406,7 +406,7 @@ func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
 			v.P = &pro
 		}
 		p.AllIdsMapLock.Unlock()
-	}else {
+	} else {
 		util.Debug("Not find project---", pid)
 	}
 }
@@ -421,12 +421,12 @@ func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
 	}
 	client := Es.GetEsConn()
 	defer Es.DestoryEsConn(client)
-	esquery := `{"query": {"bool": {"must": [{"term": {"ids": "`+infoid+`"}}]}}}`
+	esquery := `{"query": {"bool": {"must": [{"term": {"ids": "` + infoid + `"}}]}}}`
 	data := Es.Get(Index, Itype, esquery)
 	if len(*data) > 0 {
 		pid := util.ObjToString(((*data)[0])["_id"])
 		p.delJudge(infoid, pid)
-	}else {
+	} else {
 		util.Debug("not find project---,", infoid)
 	}
 }
@@ -490,7 +490,7 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 						//普通合并
 						p.CommonMerge(tmp, info)
 					}(tmp)
-				case <- over:
+				case <-over:
 					break L
 				}
 			}
@@ -523,7 +523,7 @@ L:
 					if count%20000 == 0 {
 						log.Println("current", count, lastid)
 					}
-				}else {
+				} else {
 					if count%1000 == 0 {
 						log.Println("current", count, lastid)
 					}
@@ -531,12 +531,12 @@ L:
 				if util.IntAll(tmp["repeat"]) == 0 {
 					if P_QL.currentType == "ql" {
 						infoPool <- tmp
-					}else if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
+					} else if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
 						infoPool <- tmp
-					}else {
+					} else {
 						util.Debug("增量   dataging == 1 ", tmp["_id"])
 					}
-				}else {
+				} else {
 					countRepeat++
 					//if P_QL.currentType == "project" {
 					//	util.Debug("repeat err---", tmp["_id"])
@@ -561,7 +561,7 @@ L:
 func (p *ProjectTask) CommonMerge(tmp map[string]interface{}, info *Info) {
 	if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
 		if jsonData, ok := tmp["jsondata"].(map[string]interface{}); ok {
-			proHref := util.ObjToString(jsonData["projecthref"])
+			proHref := util.ObjToString(jsonData["projecthref"]) // 网站本身发布的公告具有招投标流程,直接参与合并
 			if jsonData != nil && proHref != "" {
 				//projectHref字段合并
 				tmp["projecthref"] = proHref
@@ -631,7 +631,7 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 	if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
 		//thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
 		//if thisinfo.ProjectName != "" {
-			thisinfo.pnbval++
+		thisinfo.pnbval++
 		//}
 	}
 
@@ -701,7 +701,6 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 	tmp["buyer"] = buyer
 	thisinfo.Buyer = buyer
 
-
 	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
 	thisinfo.LenPTC = len([]rune(thisinfo.PTC))
 	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
@@ -742,16 +741,16 @@ func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
 	_, ok := p.AllIdsMap[pid]
 	p.AllIdsMapLock.Unlock()
 	ids := []interface{}(tmpPro["ids"].(primitive.A))
-	index, position := -1, 0		// index 0:第一个,1:中间,2:最后一个   position list中位置
+	index, position := -1, 0 // index 0:第一个,1:中间,2:最后一个   position list中位置
 
 	for i, v := range ids {
 		if util.ObjToString(v) == mongodb.BsonIdToSId(infoMap["_id"]) {
 			position = i
 			if i == 0 {
 				index = 0
-			}else if i == len(ids) - 1 {
+			} else if i == len(ids)-1 {
 				index = 2
-			}else {
+			} else {
 				index = 1
 			}
 		}
@@ -767,10 +766,10 @@ func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
 				p.mapHrefLock.Unlock()
 				if pid == tempId {
 					p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
-				}else {
-					util.Debug("projecthref data id err---pid=" + pid, "---"+tempId)
+				} else {
+					util.Debug("projecthref data id err---pid="+pid, "---"+tempId)
 				}
-			}else {
+			} else {
 				f := modifyEle(modifyProMap)
 				if f {
 					//合并、修改
@@ -782,7 +781,7 @@ func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
 					p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
 				}
 			}
-		}else {
+		} else {
 			f := modifyEle(modifyProMap)
 			if f {
 				//合并、修改
@@ -794,7 +793,7 @@ func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
 				p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
 			}
 		}
-	}else {
+	} else {
 		// 周期外
 		util.Debug("周期外数据直接修改", "----------------------------")
 		p.modifyUpdate(pid, index, position, tmpPro, modifyProMap)
@@ -814,8 +813,8 @@ var Elements = []string{
 }
 
 /**
-	修改的字段
-	修改的字段是否是影响合并流程的要素字段
+修改的字段
+修改的字段是否是影响合并流程的要素字段
 */
 func modifyEle(tmp map[string]interface{}) bool {
 	merge := false
@@ -934,4 +933,3 @@ func QyFilter(name, stype string) string {
 	}
 	return name
 }
-

+ 4 - 4
qyxy/qyxy_change/config.json

@@ -1,9 +1,9 @@
 {
-  "dbServer": "172.17.4.187:27082,172.17.145.163:27083",
-  "dbName": "mixdata",
+  "dbServer": "192.168.3.207:27092",
+  "dbName": "wjh",
   "dbSize": 10,
-  "uname": "SJZY_RWMIX_Other",
-  "upwd": "SJZY@M34I6x7D9ata",
+  "uname": "",
+  "upwd": "",
   "coll_qy": "qyxy_std",
   "coll_change": "qyxy_change",
   "lastId": 0,

+ 15 - 5
qyxy/qyxy_change/main.go

@@ -30,7 +30,7 @@ func init() {
 	}
 	MgoMix.InitPool()
 	Mgo = &mongodb.MongodbSim{
-		MongodbAddr: "172.17.4.181:27001",
+		MongodbAddr: "172.17.4.181:27001", // 172.17.4.181:27001
 		Size:        10,
 		DbName:      "mixdata",
 	}
@@ -61,13 +61,23 @@ func initChangeMap() {
 }
 
 func main() {
-	go SaveData()
-	//go updateMethod()
+	//go SaveData()
+	go updateMethod()
 
 	//go TimeTask()
 	go GetData()
 	//go TaskAll()
+	//ch := make(chan bool, 1)
+	//<-ch
+
+	//var id string
+	//flag.StringVar(&id, "id", "", "公司id")
+	//flag.Parse()
+	//if id == "" {
+	//	flag.PrintDefaults()
+	//	log.Println("参数错误.")
+	//} else {
+	//	taskInfo(id)
+	//}
 
-	ch := make(chan bool, 1)
-	<-ch
 }

+ 41 - 16
qyxy/qyxy_change/task.go

@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"github.com/cron"
 	"go.mongodb.org/mongo-driver/bson"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 	"log"
 	"qfw/util"
 	"regexp"
@@ -55,20 +54,22 @@ func GetData() {
 			query := bson.M{"company_id": tmp["company_id"]}
 			info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
 			if b && len(*info) > 0 {
-				update := make(map[string]interface{})
-				item := make(map[string]interface{})
-				item["change_field"] = tmp["change_field"]
-				item["content_before"] = tmp["content_before"]
-				item["content_after"] = tmp["content_after"]
-				item["change_date"] = tmp["change_date"]
-				setMark(item) //change_name_new
-				//update["changes"] = changes
-				update["update_time"] = currentTime
-				saveInfo := []map[string]interface{}{
-					{"_id": (*info)["_id"]},
-					{"$set": update, "$push": map[string]interface{}{"changes": item}},
+				if util.ObjToString(tmp["_operation_type"]) == "insert" {
+					update := make(map[string]interface{})
+					item := make(map[string]interface{})
+					item["change_field"] = tmp["change_field"]
+					item["content_before"] = tmp["content_before"]
+					item["content_after"] = tmp["content_after"]
+					item["change_date"] = tmp["change_date"]
+					setMark(item) //change_name_new
+					//update["changes"] = changes
+					update["update_time"] = currentTime
+					saveInfo := []map[string]interface{}{
+						{"company_id": tmp["company_id"]},
+						{"$set": update, "$push": map[string]interface{}{"changes": item}},
+					}
+					updatePool <- saveInfo
 				}
-				updatePool <- saveInfo
 			} else {
 				query := bson.M{"_id": tmp["company_id"]}
 				qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
@@ -88,9 +89,9 @@ func GetData() {
 					save["changes"] = changes
 					save["create_time"] = currentTime
 					save["update_time"] = currentTime
-					save["_id"] = primitive.NewObjectID()
+					//save["_id"] = primitive.NewObjectID()
 					saveInfo := []map[string]interface{}{
-						{"_id": save["_id"]},
+						{"company_id": tmp["company_id"]},
 						{"$set": save},
 					}
 					updatePool <- saveInfo
@@ -169,6 +170,29 @@ func TaskAll() {
 	}
 }
 
+func taskInfo(id string) {
+	update := make(map[string]interface{})
+	q := map[string]interface{}{"company_id": id}
+	info, _ := Mgo.Find("company_change", q, nil, nil, false, -1, -1)
+	if len(*info) > 0 {
+		var changes []map[string]interface{}
+		currentTime := time.Now().Unix()
+		for _, v := range *info {
+			item := make(map[string]interface{})
+			item["change_field"] = v["change_field"]
+			item["content_before"] = v["content_before"]
+			item["content_after"] = v["content_after"]
+			item["change_date"] = v["change_date"]
+			setMark(item) //change_name_new
+			changes = append(changes, item)
+		}
+		update["changes"] = changes
+		update["update_time"] = currentTime
+	}
+	util.Debug(update)
+	MgoMix.Update("qyxy_change", q, map[string]interface{}{"$set": update}, false, false)
+}
+
 func disposeFuc(maps []map[string]interface{}, tmp map[string]interface{}) {
 	var changes []map[string]interface{}
 	currentTime := time.Now().Unix()
@@ -238,6 +262,7 @@ func updateMethod() {
 					defer func() {
 						<-updateSp
 					}()
+
 					MgoMix.UpSertBulk(CollSave, arru...)
 				}(arru)
 				arru = make([][]map[string]interface{}, 200)

+ 2 - 1
qyxy/qyxy_es/config.json

@@ -53,7 +53,8 @@
       "company_shortname",
       "website_url",
       "partners",
-      "employees"
+      "employees",
+      "tag_business"
     ]
   }
 }

+ 2 - 2
qyxy/qyxy_es/main.go

@@ -52,8 +52,8 @@ func main() {
 	go SaveEs()
 
 	//go TimeTask()
-	go StdAll()
-	//go StdAdd()
+	//go StdAll()
+	go StdAdd()
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 2 - 2
qyxy/qyxy_es/task.go

@@ -18,7 +18,7 @@ var (
 
 	TypeMap = map[string]string{
 		"采购单位": "1",
-		"投标企业": "2",
+		"中标单位": "2", // 投标企业
 		"代理机构": "3",
 		"厂商":   "4",
 	}
@@ -57,7 +57,7 @@ func StdAdd() {
 	it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
-		if count%5000 == 0 {
+		if count%10000 == 0 {
 			log.Println("current:", count)
 		}
 		if util.IntAll(tmp["use_flag"]) > 5 {

+ 48 - 18
qyxy/qyxy_inc_data/main.go

@@ -10,29 +10,34 @@ import (
 	"io/ioutil"
 	"log"
 	"mongodb"
+	"net/http"
+	"net/url"
 	"os"
 	"qfw/util"
+	"strconv"
+	"strings"
 	"time"
 )
 
 var (
-	MongoTool			*mongodb.MongodbSim
+	MongoTool *mongodb.MongodbSim
 
-	updatePool			chan []map[string]interface{}
-	updateSp			chan bool
-	saveSize			int
+	updatePool chan []map[string]interface{}
+	updateSp   chan bool
+	saveSize   int
 
-	CurrentColl			string
+	CurrentColl string
 
-	SkipCollName		string
-
-	saveArr				[][]map[string]interface{}
+	SkipCollName string
+	sendMsg      string
+	collCount    int
 
+	saveArr [][]map[string]interface{}
 )
 
 func init() {
 	MongoTool = &mongodb.MongodbSim{
-		MongodbAddr: "172.17.4.181:27002",
+		MongodbAddr: "172.17.4.181:27001",
 		Size:        10,
 		DbName:      "mixdata",
 	}
@@ -44,7 +49,6 @@ func init() {
 
 	SkipCollName = "annual_report_base,annual_report_website,company_base,company_employee,company_history_name,company_partner"
 
-
 }
 
 func main() {
@@ -60,7 +64,6 @@ func main() {
 	// /nas/qyxy/ftp/ftpuser/upload/20210811/
 	task(path)
 
-
 	//sess := MongoTool.GetMgoConn()
 	//defer MongoTool.DestoryMongoConn(sess)
 	//
@@ -102,10 +105,11 @@ func task(path string) {
 			//	continue
 			//}
 			CurrentColl = f.Name()
+			collCount = 0
 			util.Debug("collection name:---", f.Name())
 			subPath := path + f.Name() + "/"
 			subFiles, _ := ioutil.ReadDir(subPath)
-			for _, s := range subFiles{
+			for _, s := range subFiles {
 				if s.IsDir() {
 					taskinfo(subPath + s.Name())
 				}
@@ -115,8 +119,10 @@ func task(path string) {
 				MongoTool.UpSertBulk(CurrentColl, tmps...)
 				saveArr = [][]map[string]interface{}{}
 			}
+			sendMsg += f.Name() + ":" + strconv.Itoa(collCount) + ";"
 		}
 	}
+	SendMail(sendMsg)
 }
 
 func taskinfo(path string) {
@@ -143,29 +149,29 @@ func taskinfo(path string) {
 	bfRd := bufio.NewReader(gr)
 	for {
 		line, err := bfRd.ReadBytes('\n')
-		count  = hookfn(line, count)
+		count = hookfn(line, count)
 		if err != nil {
 			if err == io.EOF {
 				fmt.Println("read gzip data finish! ")
 				break
-			}else {
+			} else {
 				fmt.Println("[read gzip data err]: ", err)
 			}
 		}
-		if count % 5000 == 0 {
+		if count%5000 == 0 {
 			util.Debug("current exc---", file, count)
 		}
 	}
 }
 
-
 func hookfn(line []byte, count int) int {
 	tmp := make(map[string]interface{})
-	err:= json.Unmarshal(line, &tmp)
+	err := json.Unmarshal(line, &tmp)
 	if err != nil {
 		util.Debug("err---", err)
 	}
-	count ++
+	count++
+	collCount++
 	tmp["_id"] = util.IntAll(tmp["id"])
 	tmp["id"] = fmt.Sprintf("%d", util.IntAll(tmp["id"]))
 
@@ -217,3 +223,27 @@ func updateMethod() {
 		}
 	}
 }
+
+func SendMail(report string) {
+	api := "http://172.17.145.179:19281/_send/_mail"
+	to := "wangjianghan@topnet.net.cn"
+	_, _ = http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, "凭安数据推送!", report))
+}
+
+func SendMsg(msg string) {
+	v := url.Values{}
+	v.Set("token", "9d1554434aed46018d382737fff25288")
+	v.Set("title", "凭安数据推送")
+	v.Set("content", msg)
+	v.Set("template", "json")
+	body := ioutil.NopCloser(strings.NewReader(v.Encode()))
+	client := &http.Client{}
+	req, _ := http.NewRequest("POST", "http://pushplus.hxtrip.com/send", body)
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
+	resp, err := client.Do(req)
+	defer func(Body io.ReadCloser) {
+		_ = Body.Close()
+	}(resp.Body)
+	data, _ := ioutil.ReadAll(resp.Body)
+	fmt.Println(string(data), err)
+}

+ 46 - 51
qyxy/qyxy_redis/main.go

@@ -12,30 +12,30 @@ import (
 )
 
 var (
-	MongoTool		*mongodb.MongodbSim
+	MongoTool *mongodb.MongodbSim
 
-	PnameLength			int
-	PurchasingLength 	int
+	PnameLength      int
+	PurchasingLength int
 )
 
 func init() {
+	MongoTool = &mongodb.MongodbSim{
+		MongodbAddr: "172.17.4.85:27080", // 172.17.4.85:27080
+		Size:        10,
+		DbName:      "qfw",
+	}
+	MongoTool.InitPool()
 	//MongoTool = &mongodb.MongodbSim{
-	//	MongodbAddr: "172.17.4.85:27080",					// 172.17.4.85:27080
+	//	MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
 	//	Size:        10,
 	//	DbName:      "mixdata",
+	//	UserName:    "SJZY_RWMIX_Other",
+	//	Password:    "SJZY@M34I6x7D9ata",
 	//}
 	//MongoTool.InitPool()
-	MongoTool = &mongodb.MongodbSim{
-		MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
-		Size:        10,
-		DbName:      "mixdata",
-		UserName:	 "SJZY_RWMIX_Other",
-		Password: 	 "SJZY@M34I6x7D9ata",
-	}
-	MongoTool.InitPool()
 
-	redis.InitRedis1("qyxy_id=172.17.4.189:8379", 2)				// 企业company_id(中标单位)
-	//redis.InitRedis1("qyxy_winner=172.17.4.189:8379", 3)			//	剑鱼中标企业搜索搜索
+	//redis.InitRedis1("qyxy_id=172.17.4.189:8379", 2)				// 企业company_id(中标单位)
+	redis.InitRedis1("qyxy=172.17.4.189:8379", 3) //	剑鱼企业搜索搜索
 
 	PnameLength = 500
 	PurchasingLength = 500
@@ -43,7 +43,7 @@ func init() {
 
 var name string
 
-func main1()  {
+func main1() {
 	flag.StringVar(&name, "n", "", "企业名称")
 	flag.Parse()
 	if name == "" {
@@ -63,7 +63,7 @@ func main() {
 	sess := MongoTool.GetMgoConn()
 	defer MongoTool.DestoryMongoConn(sess)
 
-	ch := make(chan bool, 2)
+	ch := make(chan bool, 3)
 	wg := &sync.WaitGroup{}
 
 	//q := map[string]interface{}{
@@ -72,20 +72,14 @@ func main() {
 	//		"$lte": mongodb.StringTOBsonId("5e0a1f000000000000000000"),
 	//	},
 	//}
-	//field := map[string]interface{}{"agency": 1, "buyer": 1, "s_winner": 1, "projectname": 1, "purchasing": 1, "area": 1}
-	field := map[string]interface{}{"use_flag": 1, "company_type": 1, "company_name": 1}
-	query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(field).Skip(52035000).Iter()
+	field := map[string]interface{}{"agency": 1, "buyer": 1, "s_winner": 1, "projectname": 1, "purchasing": 1, "area": 1}
+	//field := map[string]interface{}{"use_flag": 1, "company_type": 1, "company_name": 1}
+	query := sess.DB("qfw").C("projectset_20200918").Find(nil).Select(field).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%5000 == 0 {
 			util.Debug("current ---", count)
 		}
-		if util.IntAll(tmp["use_flag"]) > 5 {
-			continue
-		}
-		if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") {
-			continue
-		}
 		ch <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {
@@ -93,7 +87,8 @@ func main() {
 				<-ch
 				wg.Done()
 			}()
-			redis.PutCKV("qyxy_id", util.ObjToString(tmp["company_name"]), util.ObjToString(tmp["_id"]))
+			//redis.PutCKV("qyxy_id", util.ObjToString(tmp["company_name"]), util.ObjToString(tmp["_id"]))
+			taskinfo1(tmp)
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
@@ -104,10 +99,10 @@ func main() {
 func taskinfo1(tmp map[string]interface{}) {
 	if util.ObjToString(tmp["s_winner"]) != "" {
 		winners := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
-		for _, v := range winners{
+		for _, v := range winners {
 			winnermap := make(map[string]interface{})
-			if b, err := redis.Exists("qyxy_winner", v); err == nil && b {
-				text := redis.GetStr("qyxy_winner", v)
+			if b, err := redis.Exists("qyxy", v); err == nil && b {
+				text := redis.GetStr("qyxy", v)
 				err1 := json.Unmarshal([]byte(text), &winnermap)
 				if err1 != nil {
 					util.Debug(v, "winner-----map解析异常")
@@ -124,10 +119,10 @@ func taskinfo1(tmp map[string]interface{}) {
 						if len(pnameArr) > PnameLength {
 							pnameArr1 := pnameArr[len(pnameArr)-PnameLength:]
 							winnermap["bid_projectname"] = pnameArr1
-						}else {
+						} else {
 							winnermap["bid_projectname"] = pnameArr
 						}
-					}else {
+					} else {
 						pname := []string{util.ObjToString(tmp["projectname"])}
 						winnermap["bid_projectname"] = pname
 					}
@@ -143,10 +138,10 @@ func taskinfo1(tmp map[string]interface{}) {
 						if len(purs) > PurchasingLength {
 							purs1 := purs[len(purs)-PurchasingLength:]
 							winnermap["bid_purchasing"] = purs1
-						}else {
+						} else {
 							winnermap["bid_purchasing"] = purs
 						}
-					}else {
+					} else {
 						purs := []string{util.ObjToString(tmp["purchasing"])}
 						winnermap["bid_purchasing"] = purs
 					}
@@ -159,7 +154,7 @@ func taskinfo1(tmp map[string]interface{}) {
 						areas = append(areas, util.ObjToString(tmp["area"]))
 					}
 					winnermap["bid_area"] = areas
-				}else {
+				} else {
 					areas := []string{util.ObjToString(tmp["area"])}
 					winnermap["bid_area"] = areas
 				}
@@ -171,11 +166,11 @@ func taskinfo1(tmp map[string]interface{}) {
 						types = append(types, "中标单位")
 					}
 					winnermap["bid_unittype"] = types
-				}else {
+				} else {
 					types := []string{"中标单位"}
 					winnermap["bid_unittype"] = types
 				}
-			}else {
+			} else {
 				// bid_projectname
 				if tmp["projectname"] != nil && tmp["projectname"] != "" {
 					winnermap["bid_projectname"] = []string{util.ObjToString(tmp["projectname"])}
@@ -190,16 +185,16 @@ func taskinfo1(tmp map[string]interface{}) {
 				winnermap["bid_unittype"] = []string{"中标单位"}
 			}
 			if len(winnermap) > 0 {
-				data , _ := json.Marshal(winnermap)
-				redis.Put("qyxy_winner", v, string(data), 0)
+				data, _ := json.Marshal(winnermap)
+				redis.Put("qyxy", v, string(data), 0)
 
 			}
 		}
 	}
 	if buyer := util.ObjToString(tmp["buyer"]); buyer != "" {
 		buyermap := make(map[string]interface{})
-		if b, err := redis.Exists("qyxy_winner", buyer); err == nil && b {
-			text := redis.GetStr("qyxy_winner", buyer)
+		if b, err := redis.Exists("qyxy", buyer); err == nil && b {
+			text := redis.GetStr("qyxy", buyer)
 			err1 := json.Unmarshal([]byte(text), &buyermap)
 			if err1 != nil {
 				util.Debug(buyer, "buyer-----map解析异常")
@@ -212,22 +207,22 @@ func taskinfo1(tmp map[string]interface{}) {
 					types = append(types, "采购单位")
 				}
 				buyermap["bid_unittype"] = types
-			}else {
+			} else {
 				types := []string{"采购单位"}
 				buyermap["bid_unittype"] = types
 			}
-		}else {
+		} else {
 			buyermap["bid_unittype"] = []string{"采购单位"}
 		}
 		if len(buyermap) > 0 {
-			data , _ := json.Marshal(buyermap)
-			redis.Put("qyxy_winner", buyer, string(data), 0)
+			data, _ := json.Marshal(buyermap)
+			redis.Put("qyxy", buyer, string(data), 0)
 		}
 	}
 	if agency := util.ObjToString(tmp["agency"]); agency != "" {
 		agencymap := make(map[string]interface{})
-		if b, err := redis.Exists("qyxy_winner", agency); err == nil && b {
-			text := redis.GetStr("qyxy_winner", agency)
+		if b, err := redis.Exists("qyxy", agency); err == nil && b {
+			text := redis.GetStr("qyxy", agency)
 			err1 := json.Unmarshal([]byte(text), &agencymap)
 			if err1 != nil {
 				util.Debug(agency, "agency----map解析异常")
@@ -240,16 +235,16 @@ func taskinfo1(tmp map[string]interface{}) {
 					types = append(types, "代理机构")
 				}
 				agencymap["bid_unittype"] = types
-			}else {
+			} else {
 				types := []string{"代理机构"}
 				agencymap["bid_unittype"] = types
 			}
-		}else {
+		} else {
 			agencymap["bid_unittype"] = []string{"代理机构"}
 		}
 		if len(agencymap) > 0 {
-			data , _ := json.Marshal(agencymap)
-			redis.Put("qyxy_winner", agency, string(data), 0)
+			data, _ := json.Marshal(agencymap)
+			redis.Put("qyxy", agency, string(data), 0)
 		}
 	}
 }
@@ -268,4 +263,4 @@ func BinarySearch(s []string, k string) int {
 		}
 	}
 	return -1
-}
+}

+ 1 - 1
qyxy/qyxy_std/init.go

@@ -79,7 +79,7 @@ func init() {
 		log.Fatal("结巴分词出错...")
 		return
 	}
-	redis.InitRedis1("qyxy_winner=127.0.0.1:8379", 3)
+	redis.InitRedis1("qyxy=172.17.4.189:8379", 3)
 }
 
 func InitQyStype() {

+ 9 - 6
qyxy/qyxy_std/task.go

@@ -407,12 +407,11 @@ func IncStd(tmp map[string]interface{}) {
 		if util.ObjToString(save["company_email"]) != "" {
 			types = append(types, "邮箱")
 		}
-
 		companyName := util.ObjToString(save["company_name"])
 		// redis bid_unittype、bid_purchasing、bid_projectname
-		if b, err := redis.Exists("qyxy_winner", companyName); err == nil && b {
+		if b, err := redis.Exists("qyxy", companyName); err == nil && b {
 			maps := make(map[string]interface{})
-			text := redis.GetStr("qyxy_winner", companyName)
+			text := redis.GetStr("qyxy", companyName)
 			err1 := json.Unmarshal([]byte(text), &maps)
 			if err1 != nil {
 				util.Debug(companyName, "winner-----map解析异常")
@@ -440,7 +439,11 @@ func IncStd(tmp map[string]interface{}) {
 			}
 		}
 		if flag {
-			save["bid_unittype"] = []string{"厂商"}
+			if save["bid_unittype"] != nil {
+				save["bid_unittype"] = append(util.ObjArrToStringArr(save["bid_unittype"].([]interface{})), "厂商")
+			} else {
+				save["bid_unittype"] = []string{"厂商"}
+			}
 		}
 		// search_type
 		if t := util.ObjToString(save["company_type"]); t != "" {
@@ -598,9 +601,9 @@ func InfoStd(tmp map[string]interface{}) {
 
 		companyName := util.ObjToString(tmp["company_name"])
 		// redis bid_unittype、bid_purchasing、bid_projectname
-		if b, err := redis.Exists("qyxy_winner", companyName); err == nil && b {
+		if b, err := redis.Exists("qyxy", companyName); err == nil && b {
 			maps := make(map[string]interface{})
-			text := redis.GetStr("qyxy_winner", companyName)
+			text := redis.GetStr("qyxy", companyName)
 			err1 := json.Unmarshal([]byte(text), &maps)
 			if err1 != nil {
 				util.Debug(companyName, "winner-----map解析异常")

+ 40 - 28
qyxy/save_mgo/main.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"github.com/go-redis/redis"
 	"go.mongodb.org/mongo-driver/bson"
@@ -10,36 +9,43 @@ import (
 	"qfw/util"
 	"reflect"
 	"strconv"
+	"strings"
 	"time"
 )
 
 var (
-	MongoTool		*mongodb.MongodbSim
-	rdb 			*redis.Client
+	MongoTool *mongodb.MongodbSim
+	rdb       *redis.Client
 )
 
 func init() {
+	//MongoTool = &mongodb.MongodbSim{
+	//	MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
+	//	Size:        10,
+	//	DbName:      "mixdata",
+	//	UserName:    "SJZY_RWESBid_Other",
+	//	Password:    "SJZY@O17t8herB3B",
+	//}
+	//MongoTool.InitPool()
 	MongoTool = &mongodb.MongodbSim{
-		MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
+		MongodbAddr: "172.17.4.85:27080",
 		Size:        10,
-		DbName:      "mixdata",
-		UserName: 	 "SJZY_RWESBid_Other",
-		Password:	 "SJZY@O17t8herB3B",
+		DbName:      "qfw",
 	}
 	MongoTool.InitPool()
 }
 
 func initRedis() (err error) {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second * 10)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 	rdb = redis.NewClient(&redis.Options{
-		Addr: "127.0.0.1:8379",
+		Addr:     "127.0.0.1:8379",
 		PoolSize: 200,
-		DB: 3,
+		DB:       5,
 	})
 	_, err = rdb.Ping(ctx).Result()
-	if err !=nil{
-		fmt.Println("ping redis failed err:",err)
+	if err != nil {
+		fmt.Println("ping redis failed err:", err)
 		return err
 	}
 	return nil
@@ -47,8 +53,8 @@ func initRedis() (err error) {
 
 func main() {
 	err := initRedis()
-	if err !=nil{
-		fmt.Println("init redis failed err :",err)
+	if err != nil {
+		fmt.Println("init redis failed err :", err)
 		return
 	}
 	ctx := context.Background()
@@ -56,26 +62,31 @@ func main() {
 	var keys []string
 	n := 0
 	for {
-		keys, cursor, err = rdb.Scan(ctx, cursor,"*",500).Result()
-		if err !=nil{
-			fmt.Println("scan keys failed err:",err)
+		keys, cursor, err = rdb.Scan(ctx, cursor, "*", 500).Result()
+		if err != nil {
+			fmt.Println("scan keys failed err:", err)
 			return
 		}
 		util.Debug("---keys---", len(keys))
 		n += len(keys)
-		for _,key := range keys{
+		for _, key := range keys {
 			val, err := rdb.Get(ctx, key).Result()
-			if err != nil{
+			if err != nil {
 				fmt.Println("get key values failed err:", err)
 				return
 			}
-			val, _ = strconv.Unquote(val)						//  处理json字符串带转义符号
-			maps := make(map[string]interface{})
-			err1 := json.Unmarshal([]byte(val), &maps)
-			if err1 != nil {
-				util.Debug("-----map解析异常")
-			}
-			taskinfo(ctx, key, maps)
+
+			val, _ = strconv.Unquote(val)
+			arr := strings.Split(key, ",")
+			MongoTool.Save("first_cooperation", map[string]interface{}{"buyer": arr[0], "winner": arr[1], "project_id": val})
+
+			//val, _ = strconv.Unquote(val) //  处理json字符串带转义符号
+			//maps := make(map[string]interface{})
+			//err1 := json.Unmarshal([]byte(val), &maps)
+			//if err1 != nil {
+			//	util.Debug("-----map解析异常")
+			//}
+			//taskinfo(ctx, key, maps)
 		}
 		util.Debug("current---", n, cursor)
 		if cursor == 0 {
@@ -85,7 +96,7 @@ func main() {
 	}
 }
 
-func taskinfo(ctx context.Context, name string, tmp map[string]interface{})  {
+func taskinfo(ctx context.Context, name string, tmp map[string]interface{}) {
 	q := bson.M{"company_name": name}
 	info, b := MongoTool.FindOneByField("qyxy_std", q, nil)
 	if b && len(*info) > 0 {
@@ -95,8 +106,9 @@ func taskinfo(ctx context.Context, name string, tmp map[string]interface{})  {
 			t2 = append(t2, t1...)
 			tmp["bid_unittype"] = Duplicate(t2)
 		}
+		tmp["updatetime"] = time.Now().Unix()
 		MongoTool.Update("qyxy_std", bson.M{"_id": (*info)["_id"]}, bson.M{"$set": tmp}, true, false)
-	}else {
+	} else {
 		tmp["company_name"] = name
 		MongoTool.Save("qyxy_std_err", tmp)
 		rdb.Del(ctx, name)

+ 13 - 9
udpcreateindex/src/biddingdata.go

@@ -29,7 +29,11 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 	session := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(session)
 	//连接信息
-	c, _ := bidding["collect"].(string)
+	//连接信息
+	c, _ := mapInfo["coll"].(string)
+	if c == "" {
+		c, _ = bidding["collect"].(string)
+	}
 	db, _ := bidding["db"].(string)
 	index := "bidding_all"
 	itype := "bidding"
@@ -41,12 +45,12 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 	query := session.DB(db).C(c).Find(q).Select(bson.M{
 		"projectinfo.attachment": 0,
 		"contenthtml":            0,
-		"publishdept":			  0,
-		"detail": 				  0,
-		"projectscope":			  0,
-		"project_scale":		  0,
-		"bidstatus":			  0,
-		"china_bidding":		  0,
+		"publishdept":            0,
+		"detail":                 0,
+		"projectscope":           0,
+		"project_scale":          0,
+		"bidstatus":              0,
+		"china_bidding":          0,
 	}).Sort("_id").Iter()
 	n := 0
 	//更新数组
@@ -150,7 +154,7 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 						}
 					} else if field == "review_experts" {
 						// 评审专家
-						if arr, ok :=tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
+						if arr, ok := tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
 							arr1 := qutil.ObjArrToStringArr(arr)
 							newTmp[field] = strings.Join(arr1, ",")
 						}
@@ -210,4 +214,4 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 	}
 	UpdatesLock.Unlock()
 	log.Println(mapInfo, "create biddingdata index...over", n)
-}
+}

+ 15 - 16
udpcreateindex/src/biddingindex.go

@@ -263,7 +263,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			//				}
 			//				update["s_winner"] = strings.Join(winnerarr, ",")
 			//			}
-		}else {
+		} else {
 			area := qutil.ObjToString(tmp["area"])
 			city := qutil.ObjToString(tmp["city"])
 			district := qutil.ObjToString(tmp["district"])
@@ -295,7 +295,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 						if len(*ents) > 0 {
 							id = qutil.ObjToString((*ents)[0]["_id"])
 							redis.PutCKV("qyxy_id", w, id)
-						}else {
+						} else {
 							ent, _ := qyxydb.FindOne("company_history_name", bson.M{"history_name": w})
 							if len(*ent) > 0 {
 								id = qutil.ObjToString((*ent)["company_id"])
@@ -448,7 +448,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 						}
 					} else if field == "review_experts" {
 						// 评审专家
-						if arr, ok :=tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
+						if arr, ok := tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
 							arr1 := qutil.ObjArrToStringArr(arr)
 							newTmp[field] = strings.Join(arr1, ",")
 						}
@@ -458,10 +458,10 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 						if tmp[field] != nil && tmp["bidendtime"] == nil {
 							newTmp["bidendtime"] = tmp[field]
 							newTmp[field] = tmp[field]
-						}else if tmp[field] == nil && tmp["bidendtime"] != nil {
+						} else if tmp[field] == nil && tmp["bidendtime"] != nil {
 							newTmp["bidendtime"] = tmp["bidendtime"]
 							newTmp[field] = tmp["bidendtime"]
-						}else {
+						} else {
 							if tmp["bidopentime"] != nil {
 								newTmp[field] = tmp["bidopentime"]
 							}
@@ -478,7 +478,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 				}
 			}
 
-			YuceEndtime(newTmp)		// 预测结果时间
+			YuceEndtime(newTmp) // 预测结果时间
 			arrEs = append(arrEs, newTmp)
 		}
 		if len(update) > 0 {
@@ -712,16 +712,16 @@ func YuceEndtime(tmp map[string]interface{}) {
 func YcEndTime(starttime int64, num int, unit string) int64 {
 	yuceendtime := int64(0)
 	if unit == "日历天" || unit == "天" || unit == "日" {
-		yuceendtime = starttime + int64(num * 86400)
-	}else if unit == "周" {
-		yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num * 7).Unix()
-	}else if unit == "月" {
+		yuceendtime = starttime + int64(num*86400)
+	} else if unit == "周" {
+		yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix()
+	} else if unit == "月" {
 		yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix()
-	}else if unit == "年" {
+	} else if unit == "年" {
 		yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix()
-	}else if unit == "工作日" {
-		n := num/7 * 2
-		yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num + n).Unix()
+	} else if unit == "工作日" {
+		n := num / 7 * 2
+		yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix()
 	}
 	return yuceendtime
 }
@@ -738,8 +738,7 @@ func FormatDateStr(ds string) int64 {
 	if err != nil {
 		qutil.Debug(err)
 		return 0
-	}else {
+	} else {
 		return location.Unix()
 	}
 }
-

+ 5 - 1
udpcreateindex/src/bidingpurchasing.go

@@ -9,7 +9,6 @@ import (
 	"unicode/utf8"
 
 	u "util"
-
 )
 
 //定时查询bidding中extract_state为2的数据生成索引
@@ -353,6 +352,11 @@ func getFileText(tmp map[string]interface{}) (filetext string) {
 							if utf8.RuneCountInString(filetext+bs) < fileLength {
 								filetext += bs + "\n"
 							} else {
+								if utf8.RuneCountInString(bs) > fileLength {
+									filetext = bs[0:fileLength]
+								} else {
+									filetext = bs
+								}
 								break
 							}
 						}

+ 3 - 3
udpcreateindex/src/config.json

@@ -98,14 +98,14 @@
     "db": "mixdata"
   },
   "standard": {
-    "addr": "192.168.3.207:27092",
+    "addr": "192.168.3.207:27001",
     "pool": 10,
     "db": "mixdata",
     "coll_area": "address_jy_2021",
     "winnerent": {
       "collect1": "winner_enterprise",
       "collect2": "winner_err",
-      "index": "winner_v3",
+      "index": "winner_v1",
       "type": "winner"
     },
     "buyerent": {
@@ -121,7 +121,7 @@
     }
   },
   "elastic": {
-    "addr": "http://127.0.0.1:9800",
+    "addr": "http://192.168.3.206:9800",
     "pool": 12,
     "node": "4q7v7e6mQ5aeCwjUgM6HcA"
   }

+ 45 - 43
udpcreateindex/src/main.go

@@ -12,13 +12,14 @@ import (
 	_ "net/http/pprof"
 	"qfw/util"
 	elastic "qfw/util/elastic"
+	"qfw/util/redis"
 	"strings"
 	"time"
 	u "util"
 )
 
 type Province struct {
-	P_Name    string
+	P_Name string
 }
 type City struct {
 	P_Name string
@@ -57,26 +58,26 @@ var (
 	other_index      string
 	other_itype      string
 
-	esAddr 			 string
-	esNode			 string
+	esAddr string
+	esNode string
 
-	ProvinceDict	map[string][]Province				//省份-map
-	CityDict		map[string][]City					//城市-map
-	DistrictDict	map[string][]District				//区县-map
+	ProvinceDict map[string][]Province //省份-map
+	CityDict     map[string][]City     //城市-map
+	DistrictDict map[string][]District //区县-map
 
 	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
 )
 var UpdataMgoCache = make(chan []map[string]interface{}, 1000)
 var SP = make(chan bool, 5)
 
-var StopFlag = false		// 程序生索引停止标志
+var StopFlag = false // 程序生索引停止标志
 
 func init() {
 	util.ReadConfig(&Sysconfig)
 	// company_id
-	//redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
-	//inits()
-	//go checkMapJob()
+	redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
+	inits()
+	go checkMapJob()
 	detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
 	fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
 	updport, _ = Sysconfig["updport"].(string)
@@ -94,8 +95,8 @@ func init() {
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
 		DbName:      mconf["db"].(string),
-		//UserName:	 Sysconfig["uname"].(string),
-		//Password:    Sysconfig["upwd"].(string),
+		UserName:    Sysconfig["uname"].(string),
+		Password:    Sysconfig["upwd"].(string),
 	}
 	mgo.InitPool()
 	project2db = &mongodb.MongodbSim{
@@ -133,8 +134,8 @@ func init() {
 		MongodbAddr: standard["addr"].(string),
 		Size:        util.IntAllDef(standard["pool"], 5),
 		DbName:      standard["db"].(string),
-		//UserName:    Sysconfig["uname"].(string),
-		//Password:    Sysconfig["upwd"].(string),
+		UserName:    Sysconfig["uname"].(string),
+		Password:    Sysconfig["upwd"].(string),
 	}
 	mgostandard.InitPool()
 
@@ -232,23 +233,23 @@ func main() {
 }
 
 /**
-	检查es查询队列  10s查询一次
- */
-func inspectQuery()  {
+检查es查询队列  10s查询一次
+*/
+func inspectQuery() {
 	ticker := time.NewTicker(time.Second * 10)
 	url := esAddr + "/_nodes/stats/thread_pool"
 	for range ticker.C {
 		resp, _ := http.Get(url)
-		if resp != nil && resp.Body != nil{
+		if resp != nil && resp.Body != nil {
 			defer resp.Body.Close()
 		}
-		body,_ := ioutil.ReadAll(resp.Body)
+		body, _ := ioutil.ReadAll(resp.Body)
 		respMap := make(map[string]interface{})
 		err := json.Unmarshal(body, &respMap)
 		if err == nil {
 			if data, o1 := respMap["nodes"].(map[string]interface{}); o1 {
 				if nodes, o2 := data[esNode].(map[string]interface{}); o2 {
-					if pool,  o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
+					if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
 						index, _ := pool["index"].(map[string]interface{})
 						search, _ := pool["search"].(map[string]interface{})
 						bulk, _ := pool["bulk"].(map[string]interface{})
@@ -257,7 +258,7 @@ func inspectQuery()  {
 							util.Debug("es thread_pool search queue---", search["queue"])
 							util.Debug("es thread_pool bulk queue---", bulk["queue"])
 							StopFlag = true
-						}else {
+						} else {
 							StopFlag = false
 						}
 					}
@@ -268,6 +269,7 @@ func inspectQuery()  {
 }
 
 var pool = make(chan bool, 20)
+
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
 	case mu.OP_TYPE_DATA: //上个节点的数据
@@ -406,21 +408,21 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 }
 
 //初始化城市
-func initCheckCity()  {
+func initCheckCity() {
 	//初始化-城市配置
-	ProvinceDict = make(map[string][]Province,0)
-	CityDict = make(map[string][]City,0)
-	DistrictDict = make(map[string][]District,0)
+	ProvinceDict = make(map[string][]Province, 0)
+	CityDict = make(map[string][]City, 0)
+	DistrictDict = make(map[string][]District, 0)
 
 	q := map[string]interface{}{
-		"town_code":map[string]interface{}{
-			"$exists":0,
+		"town_code": map[string]interface{}{
+			"$exists": 0,
 		},
 	}
 	sess := mgostandard.GetMgoConn()
 	defer mgostandard.DestoryMongoConn(sess)
 	it := sess.DB("mixdata").C(util.ObjToString(standard["coll_area"])).Find(&q).Iter()
-	total  := 0
+	total := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
 		if total%1000 == 0 {
 			log.Println("当前数量:", total)
@@ -431,39 +433,39 @@ func initCheckCity()  {
 			province := util.ObjToString(tmp["province"])
 			city := util.ObjToString(tmp["city"])
 			district := util.ObjToString(tmp["district"])
-			data := District{province,city,district}
-			if DistrictDict[district]==nil {
+			data := District{province, city, district}
+			if DistrictDict[district] == nil {
 				DistrictDict[district] = []District{data}
-			}else {
+			} else {
 				arr := DistrictDict[district]
-				arr = append(arr,data)
+				arr = append(arr, data)
 				DistrictDict[district] = arr
 			}
-		}else {
-			if city_code>0 {
+		} else {
+			if city_code > 0 {
 				province := util.ObjToString(tmp["province"])
 				city := util.ObjToString(tmp["city"])
-				data := City{province,city}
-				if CityDict[city]==nil {
+				data := City{province, city}
+				if CityDict[city] == nil {
 					CityDict[city] = []City{data}
-				}else {
+				} else {
 					arr := CityDict[city]
-					arr = append(arr,data)
+					arr = append(arr, data)
 					CityDict[city] = arr
 				}
-			}else {
+			} else {
 				province := util.ObjToString(tmp["province"])
 				data := Province{province}
-				if ProvinceDict[province]==nil {
+				if ProvinceDict[province] == nil {
 					ProvinceDict[province] = []Province{data}
-				}else {
+				} else {
 					arr := ProvinceDict[province]
-					arr = append(arr,data)
+					arr = append(arr, data)
 					ProvinceDict[province] = arr
 				}
 			}
 		}
 		tmp = make(map[string]interface{})
 	}
-	util.Debug(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d",len(ProvinceDict),len(CityDict),len(DistrictDict)))
+	util.Debug(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
 }

+ 1 - 1
udpcreateindex/src/task.go

@@ -19,7 +19,7 @@ func task_index() {
 
 	_ = c.AddFunc("0 0 0 * * ?", func() { task_winneres() })   //每天凌晨执行一次winner生索引
 	_ = c.AddFunc("0 0 1 * * ?", func() { task_buyeres() })    //每天1点执行一次buyer生索引
-	_ = c.AddFunc("0 0 2 * * ?", func() { task_biddingAll() }) //每天3点执行 前一天的所有招标数据
+	_ = c.AddFunc("0 0 2 * * ?", func() { task_biddingAll() }) //每天2点执行 前一天的所有招标数据
 	c.Start()
 }
 func task_winneres() {

+ 1 - 1
udpcreateindex/src/util/ossclient.go

@@ -11,7 +11,7 @@ import (
 )
 
 var (
-	ossEndpoint        = "oss-cn-beijing-internal.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
+	ossEndpoint        = "https://oss-cn-beijing-internal.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
 	ossAccessKeyId     = "LTAI4G5x9aoZx8dDamQ7vfZi"
 	ossAccessKeySecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
 	ossBucketName      = "topjy"

+ 5 - 4
udpcreateindex/src/winnertask.go

@@ -22,8 +22,8 @@ func winnerEsTaskOnce() {
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 	task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
 	task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
-	// task_sid = "5e6598f82c27dc56292158da"
-	// task_eid = "5f80c8f89a0c261af872294c"
+	//task_sid = "5e6598f82c27dc56292158da"
+	//task_eid = "620576342566c40049f26155"
 	log.Println("winner 区间id:", task_sid, task_eid)
 	//区间id
 	q := map[string]interface{}{
@@ -64,11 +64,12 @@ func winnerEsTaskOnce() {
 			savetmp["pici"] = tmp["updatetime"]
 			if province := qu.ObjToString(tmp["province"]); province != "" {
 				savetmp["province"] = province
-
 			}
 			if city := qu.ObjToString(tmp["city"]); city != "" {
 				savetmp["city"] = city
-
+			}
+			if text := qu.ObjToString(tmp["tag_business"]); text != "" {
+				savetmp["tag_business"] = text
 			}
 
 			winerEsLock.Lock()

+ 0 - 7
udps/main.go

@@ -22,15 +22,8 @@ func main() {
 	flag.IntVar(&p, "p", 6601, "端口")
 	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
-<<<<<<< HEAD
 	flag.StringVar(&id1, "gtid", "", "gtid")
 	flag.StringVar(&id2, "lteid", "", "lteid")
-=======
-
-	flag.StringVar(&id1, "gtid", "114168ea1a75b8f44678a39b", "gtid")
-	flag.StringVar(&id2, "lteid", "9142e5741a75b8f4467b3276", "lteid")
-
->>>>>>> a1f5fb16d9e524033da8c8d4092d23129d253066
 	flag.StringVar(&ids, "ids", "", "id1,id2")
 	flag.StringVar(&stype, "stype", "biddingall", "stype,传递类型")
 	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")