Bläddra i källkod

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

fengweiqiang 5 år sedan
förälder
incheckning
f4ff60979d

+ 1 - 1
src/config.json

@@ -28,7 +28,7 @@
     "filelength": 100000,
     "iscltlog": false,
     "brandgoods": false,
-    "udptaskid": "5cdd3021698414032c8322b1",
+    "udptaskid": "5cdd3025698414032c8322b1",
     "udpport": "1484",
     "nextNode": [
         {

+ 34 - 32
src/jy/admin/audit/dataaudit.go

@@ -10,6 +10,7 @@ import (
 	qu "qfw/util"
 	"qfw/util/elastic"
 	redis "qfw/util/redis"
+	"strconv"
 	"strings"
 	"time"
 
@@ -70,16 +71,17 @@ func AuditOneField(c *gin.Context) {
 	start := c.GetInt("start")
 	limit := c.GetInt("length")
 	auditattr, _ := c.GetPostForm("auditattr")
+	check, _ := strconv.Atoi(auditattr)
 	query := map[string]interface{}{}
-	if auditattr != "-1" {
+	if check != -1 {
 		query = map[string]interface{}{
-			field + "_" + auditattr: 1,
+			"check": check,
 		}
 	} else {
 		query = map[string]interface{}{
 			"$or": []interface{}{
-				map[string]interface{}{field + "_ok": 1},
-				map[string]interface{}{field + "_err": 1},
+				map[string]interface{}{"check": 1},
+				map[string]interface{}{"check": 0},
 			},
 		}
 	}
@@ -107,24 +109,24 @@ func AllAudit(c *gin.Context) {
 	} else { //批量审核
 		SaveDb := ""
 		FieldBd := 0
-		ElasticClientIndex := ""
-		ElasticClientType := ""
+		// ElasticClientIndex := ""
+		// ElasticClientType := ""
 		RedisName := util.QYK_RedisName
 		if field == "winner" {
 			SaveDb = util.ElasticClientDB
 			FieldBd = util.WinnerDB
-			ElasticClientIndex = util.ElasticClientIndex
-			ElasticClientType = util.ElasticClientType
+			// ElasticClientIndex = util.ElasticClientIndex
+			// ElasticClientType = util.ElasticClientType
 		} else if field == "buyer" {
 			SaveDb = util.ElasticClientBuyerDB
 			FieldBd = util.BuyerDB
-			ElasticClientIndex = util.ElasticClientBuyerIndex
-			ElasticClientType = util.ElasticClientBuyerType
+			// ElasticClientIndex = util.ElasticClientBuyerIndex
+			// ElasticClientType = util.ElasticClientBuyerType
 		} else {
 			SaveDb = util.ElasticClientAgencyDB
 			FieldBd = util.AgencyDB
-			ElasticClientIndex = util.ElasticClientAgencyIndex
-			ElasticClientType = util.ElasticClientAgencyType
+			// ElasticClientIndex = util.ElasticClientAgencyIndex
+			// ElasticClientType = util.ElasticClientAgencyType
 		}
 		//redis
 		qykredis := redis.RedisPool[RedisName].Get()
@@ -156,11 +158,11 @@ func AllAudit(c *gin.Context) {
 						return
 					}
 				}
-				_, err := escon.Index().Index(ElasticClientIndex).Type(ElasticClientType).Id(sid).BodyJson(e).Refresh(true).Do()
-				if err != nil {
-					c.JSON(200, gin.H{"rep": false, "msg": "更新es错误"})
-					return
-				}
+				// _, err := escon.Index().Index(ElasticClientIndex).Type(ElasticClientType).Id(sid).BodyJson(e).Refresh(true).Do()
+				// if err != nil {
+				// 	c.JSON(200, gin.H{"rep": false, "msg": "更新es错误"})
+				// 	return
+				// }
 			}
 			//删除标记数据
 			query := map[string]interface{}{
@@ -177,8 +179,8 @@ func AllAudit(c *gin.Context) {
 func DataSave(c *gin.Context) {
 	SaveDb := ""
 	FieldBd := 0
-	ElasticClientIndex := ""
-	ElasticClientType := ""
+	// ElasticClientIndex := ""
+	// ElasticClientType := ""
 	RedisName := util.QYK_RedisName
 	//企业名称
 	e := make(map[string]interface{})
@@ -188,8 +190,8 @@ func DataSave(c *gin.Context) {
 	if field == "winner" {
 		SaveDb = util.ElasticClientDB
 		FieldBd = util.WinnerDB
-		ElasticClientIndex = util.ElasticClientIndex
-		ElasticClientType = util.ElasticClientType
+		// ElasticClientIndex = util.ElasticClientIndex
+		// ElasticClientType = util.ElasticClientType
 		capital, _ := c.GetPostForm("capital")
 		capitalfloat := clear.ObjToMoney([]interface{}{capital, ""})[0]
 		business_scope, _ := c.GetPostForm("business_scope")
@@ -200,8 +202,8 @@ func DataSave(c *gin.Context) {
 	} else if field == "buyer" {
 		SaveDb = util.ElasticClientBuyerDB
 		FieldBd = util.BuyerDB
-		ElasticClientIndex = util.ElasticClientBuyerIndex
-		ElasticClientType = util.ElasticClientBuyerType
+		// ElasticClientIndex = util.ElasticClientBuyerIndex
+		// ElasticClientType = util.ElasticClientBuyerType
 		buyerclass, _ := c.GetPostForm("buyerclass")
 		ranks, _ := c.GetPostForm("ranks")
 		buyer_type, _ := c.GetPostForm("type")
@@ -213,8 +215,8 @@ func DataSave(c *gin.Context) {
 	} else {
 		SaveDb = util.ElasticClientAgencyDB
 		FieldBd = util.AgencyDB
-		ElasticClientIndex = util.ElasticClientAgencyIndex
-		ElasticClientType = util.ElasticClientAgencyType
+		// ElasticClientIndex = util.ElasticClientAgencyIndex
+		// ElasticClientType = util.ElasticClientAgencyType
 		ranks, _ := c.GetPostForm("ranks")
 		agency_type, _ := c.GetPostForm("type")
 		e["ranks"] = ranks
@@ -279,13 +281,13 @@ func DataSave(c *gin.Context) {
 				return
 			}
 		}
-		escon := elastic.GetEsConn()
-		defer elastic.DestoryEsConn(escon)
-		_, err := escon.Index().Index(ElasticClientIndex).Type(ElasticClientType).Id(sid).BodyJson(e).Refresh(true).Do()
-		if err != nil {
-			c.JSON(200, gin.H{"rep": false, "msg": "更新es错误"})
-			return
-		}
+		// escon := elastic.GetEsConn()
+		// defer elastic.DestoryEsConn(escon)
+		// _, err := escon.Index().Index(ElasticClientIndex).Type(ElasticClientType).Id(sid).BodyJson(e).Refresh(true).Do()
+		// if err != nil {
+		// 	c.JSON(200, gin.H{"rep": false, "msg": "更新es错误"})
+		// 	return
+		// }
 	}
 	//删除标记数据
 	coll, _ := c.GetPostForm("coll")

+ 13 - 9
src/jy/extract/extract.go

@@ -27,12 +27,12 @@ import (
 var (
 	lock, lockrule, lockclear, locktag, blocktag sync.RWMutex
 
-	cut     = ju.NewCut()                          //获取正文并清理
-	ExtLogs map[*TaskInfo][]map[string]interface{} //抽取日志
-	TaskList      map[string]*ExtractTask          //任务列表
-	ClearTaskList map[string]*ClearTask            //清理任务列表
-	saveLimit     = 100                            //抽取日志批量保存
-	PageSize      = 5000                           //查询分页
+	cut           = ju.NewCut()                          //获取正文并清理
+	ExtLogs       map[*TaskInfo][]map[string]interface{} //抽取日志
+	TaskList      map[string]*ExtractTask                //任务列表
+	ClearTaskList map[string]*ClearTask                  //清理任务列表
+	saveLimit     = 100                                  //抽取日志批量保存
+	PageSize      = 5000                                 //查询分页
 	Fields        = `{"title":1,"summary":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"bidstatus":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1,"href":1,"infoformat":1}`
 	Fields2       = `{"budget":1,"bidamount":1,"title":1,"projectname":1,"winner":1}`
 )
@@ -583,7 +583,11 @@ func (e *ExtractTask) ExtractDetail(j *ju.Job, isSite bool, codeSite string) {
 		//函数清理
 		for key, val := range j.Result {
 			for i, v := range val {
-				//qu.Debug(key, v.Value)
+				// if v.ExtFrom == "title"&& v.Field == "buyer"{
+				// 	qu.Debug("title---",v.Value)
+				// }else if v.Field == "buyer"{
+				// 	qu.Debug("text---",v.Value)
+				// }
 				lockclear.Lock()
 				var cfn = []string{}
 				if isSite {
@@ -1949,7 +1953,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 					tmp["epackage"] = string(bs)
 				}
 			}
-			//tmp["result"] = result
+			tmp["result"] = result
 			tmp["resultf"] = resultf
 			b := db.Mgo.Update(e.TaskInfo.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false)
 			if !b {
@@ -2139,7 +2143,7 @@ func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {
 func (e *ExtractTask) RedisMatch(field, fv string, val map[string]interface{}) {
 	defer qu.Catch()
 	i := redis.GetInt(field, field+"_"+fv) //查找redis
-	if i == 0 { //reids未找到,执行规则匹配
+	if i == 0 {                            //reids未找到,执行规则匹配
 		val[field+"_isredis"] = false
 		e.RuleMatch(field, fv, val) //规则匹配
 	} else { //redis找到,打标识存库

+ 10 - 8
src/jy/extract/score.go

@@ -30,6 +30,7 @@ func init() {
 	qu.ReadConfig("./res/tagscore.json", &TagConfig)
 	qu.ReadConfig("./res/fieldscore.json", &SoreConfig)
 	if SoreConfig == nil { //配置出错,强退
+		fmt.Println("fieldscore.json配置文件出错,强制退出!")
 		os.Exit(0)
 	}
 	if repeat, ok := SoreConfig["other"]["repeat"].(map[string]interface{}); ok {
@@ -105,6 +106,7 @@ func init() {
 }
 
 var CNreg = regexp.MustCompile("[\u4e00-\u9fa5]")
+
 //结果打分
 func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 	qu.Catch()
@@ -115,9 +117,9 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 		}
 		if field == "budget" || field == "bidamount" {
 			for tmpsindex, tmpsvalue := range tmps {
-				if strings.Contains(tmpsvalue.RuleText,"总") && tmpsvalue.Type =="colon"{
+				if strings.Contains(tmpsvalue.RuleText, "总") && tmpsvalue.Type == "colon" {
 					tmps[tmpsindex].Score += 1
-					tmps[tmpsindex].ScoreItem = append(tmps[tmpsindex].ScoreItem, &ju.ScoreItem{Des: field+`value结果含总字+1`, Code: field, Value: tmpsvalue.Value, Score:1})
+					tmps[tmpsindex].ScoreItem = append(tmps[tmpsindex].ScoreItem, &ju.ScoreItem{Des: field + `value结果含总字+1`, Code: field, Value: tmpsvalue.Value, Score: 1})
 				}
 			}
 		}
@@ -126,15 +128,15 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 		locktag.Unlock()
 		for tmpsindex, tmpsvalue := range tmps {
 
-			if tmpsvalue.Score>0 {
+			if tmpsvalue.Score > 0 {
 				//有初始分先添加进去
 				tmps[tmpsindex].ScoreItem = append(tmps[tmpsindex].ScoreItem, &ju.ScoreItem{
-					Des: "正则初始分",
-					Code: tmpsvalue.Code,
-					RuleText: tmpsvalue.RuleText,
+					Des:       "正则初始分",
+					Code:      tmpsvalue.Code,
+					RuleText:  tmpsvalue.RuleText,
 					ScoreFrom: "正则初始分",
-					Value: tmpsvalue.Value,
-					Score: tmpsvalue.Score,
+					Value:     tmpsvalue.Value,
+					Score:     tmpsvalue.Score,
 				})
 			}
 

+ 22 - 0
src/main_test.go

@@ -7,6 +7,7 @@ import (
 	. "jy/mongodbutil"
 	"log"
 	"os"
+	"qfw/util"
 	"regexp"
 	"strconv"
 	"strings"
@@ -111,3 +112,24 @@ func Test_clear(t *testing.T) {
 	}
 	log.Println("result---", value)
 }
+
+func Test_buyer(t *testing.T) {
+	Mgo = MgoFactory(1, 3, 120, "192.168.3.207:27092", "extract_kf")
+	demo, _ := Mgo.Find("demo_data", nil, `{"_id:1"}`, `{"buyer":1,"title":1}`, false, -1, -1)
+	result, _ := Mgo.Find("mxs_buyer", nil, `{"_id:1"}`, `{"buyer":1}`, false, -1, -1)
+	for _, d := range *demo {
+		id1 := util.BsonIdToSId(d["_id"])
+		buyer1 := util.ObjToString(d["buyer"])
+		title := util.ObjToString(d["title"])
+		for _, r := range *result {
+			id2 := util.BsonIdToSId(r["_id"])
+			buyer2 := util.ObjToString(r["buyer"])
+			if id1 == id2 {
+				if buyer1 != buyer2 {
+					util.Debug(id1, buyer1, buyer2)
+				}
+				break
+			}
+		}
+	}
+}

+ 1 - 2
src/res/ext_v3_dump.sh

@@ -2,8 +2,7 @@
 dbhost="127.0.0.1:27082"
 dbname="extract_v3"
 datapath="/opt/soft/mongodb/mongodb3.4/bin"
-tables=(audit areacode citys classify cleanup memu menusecond fields infoclass infotype province postcode rc_calss rc_field rc.order rc_rule rule_back rule_code rule_logic rule_logicback rule_logicore rule_logicpre rule_pre tag tagdetailinfo version versioninfo block_info block_classify block_classify_info block_classify_tag)
-
+tables=(address areacode audit block_classify block_classify_info block_classify_tag block_info citys classify cleanup fields infoclass infotype memu menusecond pkg_info pkg_logicore postcode province postcode rc_calss rc_field rc_rule rule_back rule_code rule_logic rule_logicback rule_logicore rule_logicpre rule_pre site site_management site_rule_code site_rule_logic site_rule_logicback site_rule_logickv site_rule_logicore site_versioninfo tag tagdetailinfo user version versioninfo)
 
 for i in "${!tables[@]}"; 
 do

+ 13 - 5
src/res/fieldscore.json

@@ -18,6 +18,14 @@
                 "regexp": 1,
                 "kvweight": 1
             },
+            "buyer":{
+            	"title": 0,
+				"table": 5,
+                "colon": 5,
+                "space": 5,
+                "regexp": 3,
+                "kvweight": 3
+            },
             "winner": {
                 "table": 3,
                 "colon": 3,
@@ -147,15 +155,15 @@
         "positivewords": [
             {
                 "describe": "以*结尾",
-                "regstr": ".{2,100}(委员会|中心|分校|办公室|学校|幼儿园|动物园|管理站|图书馆|殡仪馆|博物馆|基地|青年宫|少年宫|艺术宫|电视台|协会|政府|初中|集团|银行|[大中小]学|院|厂|店|段|场|社|室|部|厅|局|处|所|队|公司|监狱|监测站|血站|检查站)$",
-                "score": 3
+                "regstr": ".{2,100}(委员会|管委会|医院|卫计委|机关|社区|中心|中心校|分校|办公室|学校|幼儿园|动物园|管理站|馆|基地|青年宫|少年宫|艺术宫|电视台|协会|政府|[高]中|集团|银行|[大中小]学|院|厂|店|段|场|社|室|部|厅|局|处|所|队|公司|监狱|监测站|血站|检查站|工作站)$",
+                "score": 10
             }
         ],
         "negativewords": [
             {
                 "describe": "包含负分",
                 "regstr": "(标人|附件|委托|认证|代理|咨询|顾问|管理有限公司|管理顾问|招标失败|交易中心|不足|公告|变更|招标|废标|废止|流标|中标|评标|开标|供应商|金额|万元|元整|预算|报价|单价|第(\\d|一|二|三|四|五)(名|包)|排名|候选|确定|标段|(标|一|二|三|四|五)包|中选|成交|包号|(A|B|C|D|E|F|G)包|地址|详情|要求|推荐|名称|评审|得分|合同|平方米|公示期|结果|备注|说明|单位|代表|委托|工作日|营业(执|期)|通过|代码|电话|联系|条件|合理|费率|以上|以下|拟定|为|注:|\\d[\\s]{0,10}(\\.|元|包|米|平米|平方米|吨|辆|千克|克|毫克|毫升|公升|套|件|瓶|箱|只|台|年|月|日|天|号)|(:|:|;|;|?|¥|\\*|%)|^[a-zA-Z0-9-]{5,100}|^[a-zA-Z0-9-]{1,100}$|[a-zA-Z0-9-]{10,100})",
-                "score": -5
+                "score": -20
             },
             {
                 "describe": "包含负分不再展示",
@@ -174,7 +182,7 @@
                 "range": [
                     0,
                     3,
-                    -5
+                    -20
                 ]
             },
             {
@@ -190,7 +198,7 @@
                 "range": [
                     25,
                     -1,
-                    -1
+                    -5
                 ]
             }
         ]

+ 2 - 2
src/web/templates/admin/audit_auditone.html

@@ -154,8 +154,8 @@ $(function () {
 	});
 	ttable.on('init.dt', function () {
 		var opt="<option value='-1'>全部</option>"+
-				"<option value='ok'>正确</option>"+
-				"<option value='err'>异常</option>";
+				"<option value='1'>正确</option>"+
+				"<option value='0'>异常</option>";
 		var select="<div class='form-group'><label for='name'>数据类型:</label>"+
 			"<select id='auditattr' onchange='checkclick(this.value)' class='form-control input-sm'>"+
 			opt+

+ 20 - 16
standardata/src/config.json

@@ -1,26 +1,30 @@
 {
-  "port": "1234",
-  "mgofrom": "127.0.0.1:27017",
+  "mgofrom": "172.17.4.187:27083",
   "mgofromsize":5,
-  "mgofromdb":"demo",
-  "mgoto": "127.0.0.1:27017",
+  "mgofromdb":"qfw",
+  "mgoto": "172.17.145.163:27082",
   "mgotosize":5,
-  "mgotodb":"demo",
-  "mgoent": "127.0.0.1:27017",
+  "mgotodb":"extract_v3",
+  "mgoent": "172.17.145.163:27082",
   "mgoentsize":5,
-  "mgoentdb":"demo",
-  "extractcoll":"result_20200117",
+  "mgoentdb":"enterprise",
+  "extractcoll":"result_20200116",
   "standardata":{
 	"winner":{
-		"standarent":"winner_zjkent",
-		"standarerr":"winner_zjkerr",
+		"standarent":"winner_enterprisenew",
+		"standarerr":"winner_errnew",
 		"redisdb":1
 	},
-	"buyer":{
-		"standarent":"buyer_zjkent",
-		"standarerr":"buyer_zjkerr",
-		"redisdb":2
-	}
+    "buyer":{
+      "standarent":"buyer_agency_enterprise",
+      "standarerr":"buyer_err",
+      "redisdb":2
+    },
+    "agency":{
+      "standarent":"agency_enterprise",
+      "standarerr":"agency_err",
+      "redisdb":3
+    }
   },
-  "redis": "winner=127.0.0.1:6379,buyer=127.0.0.1:6379"
+  "redis": "winner=172.17.148.44:2679,buyer=172.17.148.44:2679,agency=172.17.148.44:2679"
 }

+ 67 - 0
standardata/src/historyrepair.go

@@ -0,0 +1,67 @@
+// historyrepair 处理多线程重复数据问题
+package main
+
+import (
+	"dbutil/mongo"
+	"dbutil/redis"
+	"log"
+	qu "qfw/util"
+
+	"go.mongodb.org/mongo-driver/bson"
+)
+
+func historyrepair(db, coll, datatype string, dbnum int) {
+	sess := MongoTo.GetMgoConn()
+	defer MongoTo.Close()
+	field := ""
+	if datatype == "winner" {
+		field = "company_name"
+	} else if datatype == "buyer" {
+		field = "buyer_name"
+	} else if datatype == "agency" {
+		field = "agency_name"
+	}
+	it := sess.DB(db).C(coll).Find(bson.M{}).Select(bson.M{field: 1}).Iter()
+	index := 0
+	delnum := 0
+	for tmp := map[string]interface{}{}; it.Next(&tmp); index++ {
+		name := qu.ObjToString(tmp[field])
+		id := mongo.BsonTOStringId(tmp["_id"])
+		str, _ := redis.GetRedisStr(datatype, dbnum, name)
+		if str != "" {
+			MongoTo.DeleteById(coll, id)
+			delnum++
+		} else {
+			redis.PutRedis(datatype, dbnum, name, id, -1)
+		}
+		tmp = map[string]interface{}{}
+		if index%100 == 0 {
+			log.Println(index, delnum)
+		}
+	}
+	log.Println(index, delnum)
+}
+
+func historyrepairErr(db, coll, datatype string, dbnum int) {
+	sess := MongoTo.GetMgoConn()
+	defer MongoTo.Close()
+	it := sess.DB(db).C(coll).Find(bson.M{}).Select(bson.M{"name": 1}).Iter()
+	index := 0
+	delnum := 0
+	for tmp := map[string]interface{}{}; it.Next(&tmp); index++ {
+		name := qu.ObjToString(tmp["name"])
+		id := mongo.BsonTOStringId(tmp["_id"])
+		str, _ := redis.GetRedisStr(datatype, dbnum, name)
+		if str != "" {
+			MongoTo.DeleteById(coll, id)
+			delnum++
+		} else {
+			redis.PutRedis(datatype, dbnum, name, id, -1)
+		}
+		tmp = map[string]interface{}{}
+		if index%100 == 0 {
+			log.Println(index, delnum)
+		}
+	}
+	log.Println(index, delnum)
+}

+ 19 - 13
standardata/src/main.go

@@ -15,16 +15,18 @@ import (
 
 var (
 	MongoFrom /*抽取原*/, MongoTo /*保存库*/, MongoEnt/*企业库*/ *mongo.MongodbSim
-	sysconfig            map[string]interface{}
-	extractcoll          string
-	winnerent, winnererr string
-	buyerent, buyererr   string
-	winnerbd, buyerbd    int
-	Addrs                = make(map[string]interface{}, 0) //省市县
-	winchanbool          = make(chan bool, 3)
-	buyerchanbool        = make(chan bool, 3)
-	gochan               = make(chan bool, 3)
-	udpclient            mu.UdpClient
+	sysconfig                   map[string]interface{}
+	extractcoll                 string
+	winnerent, winnererr        string
+	buyerent, buyererr          string
+	agencyent, agencyerr        string
+	winnerbd, buyerbd, agencybd int
+	Addrs                       = make(map[string]interface{}, 0) //省市县
+	winchanbool                 = make(chan bool, 3)
+	buyerchanbool               = make(chan bool, 3)
+	agencychanbool              = make(chan bool, 3)
+	gochan                      = make(chan bool, 3)
+	udpclient                   mu.UdpClient
 	//异常表正则匹配处理
 	WinnerRegOk, WinnerRegErr, AgencyRegOk, AgencyRegErr, BuyerRegOk, BuyerRegErr []regexp.Regexp
 )
@@ -41,6 +43,10 @@ func init() {
 		buyerent = qu.ObjToString(buyer["standarent"])
 		buyererr = qu.ObjToString(buyer["standarerr"])
 		buyerbd = qu.IntAll(buyer["redisdb"])
+		agency, _ := standardata["agency"].(map[string]interface{})
+		agencyent = qu.ObjToString(agency["standarent"])
+		agencyerr = qu.ObjToString(agency["standarerr"])
+		agencybd = qu.IntAll(agency["redisdb"])
 	} else {
 		os.Exit(0)
 	}
@@ -123,9 +129,9 @@ func initReg() {
 func main() {
 	//go historywinner(qu.ObjToString(sysconfig["mgofromdb"]), extractcoll)
 	//go historybuyer(qu.ObjToString(sysconfig["mgofromdb"]), extractcoll)
-	//go winStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
-	//go buyerStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
-	//go task_standarData()
+	//go historyagency(qu.ObjToString(sysconfig["mgofromdb"]), extractcoll)
+
+	go task_standarData()
 	c := make(chan int, 1)
 	<-c
 }

+ 238 - 0
standardata/src/standaragency.go

@@ -0,0 +1,238 @@
+// standaragency
+package main
+
+import (
+	"dbutil/mongo"
+	"dbutil/redis"
+	"encoding/json"
+	"log"
+	qu "qfw/util"
+	"time"
+	"unicode/utf8"
+
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+)
+
+//增量处理
+func agencyStandarData(db string, query map[string]interface{}) {
+	defer qu.Catch()
+	sess := MongoFrom.GetMgoConn()
+	defer MongoFrom.Close()
+	it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1,"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
+		"agencyaddr": 1}).Sort("_id").Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if qu.IntAll(tmp["repeat"]) > 0  { //重复数据跳过
+			continue
+		}
+		agency := qu.ObjToString(tmp["agency"])
+		if utf8.RuneCountInString(agency) < 5 {
+			continue
+		}
+		infoid := mongo.BsonTOStringId(tmp["_id"])
+		topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
+		entid, _ := redis.GetRedisStr("agency", agencybd, agency)
+		ps := []map[string]interface{}{}
+		agencyperson := qu.ObjToString(tmp["agencyperson"])
+		agencytel := qu.ObjToString(tmp["agencytel"])
+		if entid == "" {//redis 未存
+			savetoerr := true
+			if agencytel != "" {
+				v := map[string]interface{}{
+					"contact_person": agencyperson,
+					"phone":          agencytel,
+					"topscopeclass":  comRepTopscopeclass(topscopeclass),
+					"infoid":         infoid,
+				}
+				ps = append(ps, v)
+				data := comHisMegerNewData(agency, "agency", ps)
+				if data != nil {
+					_id := MongoTo.Save(agencyent, data)
+					redis.PutRedis("agency", agencybd, agency, _id.(primitive.ObjectID).Hex(), -1)
+					savetoerr = false
+				}
+			}
+			if savetoerr {
+				t := MongoTo.FindOne(agencyerr, map[string]interface{}{"name": agency})
+				if len(t) < 1 {
+					MongoTo.Save(agencyerr, map[string]interface{}{
+						"name":       agency,
+						"check":      comMarkdata(agency, "agency"),
+						"updatetime": time.Now().Unix(),
+					})
+				}
+			}
+		} else {
+			if agencytel != "" {
+				is_exist:=false //电话是否存在
+				for _,v := range ps{
+					if v["phone"]==agencytel{
+						is_exist = true
+						if agencyperson!=""&&v["contact_person"]!=agencyperson {
+							v["contact_person"]=agencyperson
+							v["infoid"] = infoid
+							bs, _ := json.Marshal(ps)//替换数据,更新
+							redis.PutRedis("agency", agencybd, agency, bs, -1)
+						}
+						continue
+					}
+				}
+				if !is_exist {
+					v := map[string]interface{}{
+						"contact_person": agencyperson,
+						"phone":          agencytel,
+						"topscopeclass":  comRepTopscopeclass(topscopeclass),
+						"infoid":         infoid,
+					}
+					MongoTo.UpdateById(agencyent, entid,
+						map[string]interface{}{
+							"$set":  v,
+							"$push": map[string]interface{}{"contact": v},
+						},
+					)
+				}
+			}
+		}
+		tmp = map[string]interface{}{}
+		if index%100 == 0 {
+			log.Println("agency index", index)
+		}
+	}
+	log.Println("agency ok index", index)
+}
+
+//历史数据处理
+func historyagency(db, fromcoll string) {
+	defer qu.Catch()
+	log.Println("history  start")
+	sess := MongoFrom.GetMgoConn()
+	defer MongoFrom.Close()
+	it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1,"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
+		"agencyaddr": 1}).Sort("_id").Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
+			continue
+		}
+		_id := mongo.BsonTOStringId(tmp["_id"])
+		agencychanbool <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-agencychanbool
+			}()
+			agency := qu.ObjToString(tmp["agency"])
+			topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
+			if agency != "" && utf8.RuneCountInString(agency) > 4 {
+				agencyperson := qu.ObjToString(tmp["agencyperson"])
+				agencytel := qu.ObjToString(tmp["agencytel"])
+				b, _ := redis.ExistRedis("agency", agencybd, agency)
+				if b {//redis 存在
+					if  agencytel != "" {
+						strs, _ := redis.GetRedisStr("agency", agencybd, agency)
+						ps := []map[string]interface{}{}
+						err := json.Unmarshal([]byte(strs), &ps)
+						if err == nil {
+							is_exist:=false //电话是否存在
+							for _,v := range ps{
+								if v["phone"]==agencytel{
+									is_exist = true
+									if agencyperson!=""&&v["contact_person"]!=agencyperson {
+										v["contact_person"]=agencyperson
+										v["infoid"] = _id
+										bs, _ := json.Marshal(ps)//替换数据,更新
+										redis.PutRedis("agency", agencybd, agency, bs, -1)
+									}
+									continue
+								}
+							}
+
+							if !is_exist {
+								v := map[string]interface{}{
+									"contact_person": agencyperson,
+									"phone":          agencytel,
+									"topscopeclass":  comRepTopscopeclass(topscopeclass),
+									"infoid":         _id,
+								}
+								ps = append(ps, v)
+								bs, _ := json.Marshal(ps)
+								redis.PutRedis("agency", agencybd, agency, bs, -1)
+							}
+						} else {
+							log.Println("jsonErr", err)
+						}
+					}
+				} else {
+					val := []map[string]interface{}{}
+					if agencytel != "" {
+						tmp := map[string]interface{}{
+							"contact_person": agencyperson,
+							"phone":          agencytel,
+							"topscopeclass":  comRepTopscopeclass(topscopeclass),
+							"infoid":         _id,
+						}
+						val = append(val, tmp)
+					}
+					bs, _ := json.Marshal(val)
+					redis.PutRedis("agency", agencybd, agency, bs, -1)
+					MongoTo.Save(agencyerr, map[string]interface{}{
+						"name":       agency,
+						"updatetime": time.Now().Unix(),
+					})
+				}
+			}
+		}(tmp)
+		tmp = map[string]interface{}{}
+		if index%10000 == 0 {
+			log.Println("index", index, _id)
+		}
+	}
+	log.Println("history  ok  index", index)
+	agencyStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
+}
+
+//查询agencyerr标准化历史数据
+func agencyStandarHistory(db string) {
+	defer qu.Catch()
+	log.Println("开始标准化数据--agency", db)
+	sessto := MongoTo.GetMgoConn()
+	defer MongoTo.Close()
+	it := sessto.DB(db).C(agencyerr).Find(map[string]interface{}{}).Iter()
+	index := 0
+	entnum := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		err_id := mongo.BsonTOStringId(tmp["_id"])
+		name := qu.ObjToString(tmp["name"])
+		agencychanbool <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-agencychanbool
+			}()
+			strs, err := redis.GetRedisStr("agency", agencybd, name)
+			if err != nil {
+				return
+			}
+			ps := []map[string]interface{}{}
+			err = json.Unmarshal([]byte(strs), &ps)
+			if err == nil {
+				data := comHisMegerNewData(name, "agency", ps)
+				if data != nil {
+					MongoTo.Save(agencyent, data)
+					MongoTo.DeleteById(agencyerr, err_id)
+					entnum++
+				} else { //未查询到企业,打标记并存表
+					num := comMarkdata(name, "agency")
+					tmp["check"] = num
+					MongoTo.UpdateById(agencyerr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}})
+				}
+			} else {
+				log.Println("jsonErr", name, err)
+			}
+		}(tmp)
+		if index%1000 == 0 {
+			log.Println("标准化历史数据--agency", index, err_id, entnum)
+		}
+		tmp = map[string]interface{}{}
+	}
+	log.Println("标准化数据完成--agency", index, entnum)
+}

+ 26 - 53
standardata/src/standarwinner.go

@@ -305,6 +305,7 @@ func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[
 		"comeintime":      time.Now().Unix(),
 		"updatetime":      time.Now().Unix(),
 	}
+	//统一信用代码
 	credit_no := strings.TrimSpace(qu.ObjToString(tmp["credit_no"]))
 	if credit_no != "" {
 		data["credit_no"] = credit_no
@@ -321,6 +322,30 @@ func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[
 			}
 		}
 	}
+
+	//网址
+	annual_reports := tmp["annual_reports"]
+	if annual_reports != nil {
+		report_websitesArr := []string{}
+		if anreports, ok := annual_reports.([]interface{}); ok {
+			for _, report_websites := range anreports {
+				if websites, ok := report_websites.([]interface{}); ok {
+					for _, website := range websites {
+						if rv, ok := website.(map[string]interface{}); ok {
+							web := qu.ObjToString(rv["website_url"])
+							if web != "" {
+								report_websitesArr = append(report_websitesArr, web)
+							}
+						}
+					}
+				}
+			}
+		}
+		if len(report_websitesArr) > 0 {
+			data["website"] = strings.Join(report_websitesArr, ";")
+		}
+	}
+
 	if datatype == "winner" {
 		data["company_name"] = name
 		data["partners"] = tmp["partners"]
@@ -332,27 +357,7 @@ func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[
 		if capital != nil {
 			data["capital"] = ObjToMoney([]interface{}{capital, ""})[0]
 		}
-		annual_reports := tmp["annual_reports"]
-		if annual_reports != nil {
-			report_websitesArr := []string{}
-			if anreports, ok := annual_reports.([]interface{}); ok {
-				for _, report_websites := range anreports {
-					if websites, ok := report_websites.([]interface{}); ok {
-						for _, website := range websites {
-							if rv, ok := website.(map[string]interface{}); ok {
-								web := qu.ObjToString(rv["website_url"])
-								if web != "" {
-									report_websitesArr = append(report_websitesArr, web)
-								}
-							}
-						}
-					}
-				}
-			}
-			if len(report_websitesArr) > 0 {
-				data["website"] = strings.Join(report_websitesArr, ";")
-			}
-		}
+
 		industry := make([]string, 0)
 		tmpindustry := map[string]bool{}
 		for _, p := range ps {
@@ -461,35 +466,3 @@ func comRepTopscopeclass(tops []interface{}) []interface{} {
 	}
 	return data
 }
-
-//
-func comUpdateErr(coll, name string, tclass []interface{}) {
-	if len(tclass) < 1 {
-		return
-	}
-	tmp := MongoTo.FindOne(coll, map[string]interface{}{"name": name})
-	topscopeclass := tmp["topscopeclass"].(primitive.A)
-	tmpclass := map[string]bool{}
-	for _, tc := range topscopeclass {
-		tmpclass[qu.ObjToString(tc)] = true
-	}
-	oldlen := len(tmpclass)
-	for _, tc := range tclass {
-		tmpclass[qu.ObjToString(tc)] = true
-	}
-	newlen := len(tmpclass)
-	if oldlen == newlen {
-		return
-	}
-	newclass := []interface{}{}
-	for _, v := range tmpclass {
-		newclass = append(newclass, v)
-	}
-	MongoTo.Update(coll, map[string]interface{}{"name": name}, map[string]interface{}{
-		"$set": map[string]interface{}{
-			"name":          name,
-			"topscopeclass": newclass,
-			"updatetime":    time.Now().Unix(),
-		},
-	})
-}

+ 6 - 4
standardata/src/task.go

@@ -12,19 +12,21 @@ import (
 func task_standarData() {
 	mgofromdb := qu.ObjToString(sysconfig["mgofromdb"])
 	c := cron.New()
-	_ = c.AddFunc("0/5 * * * * *", func() {
+	_ = c.AddFunc("0 30 4 * * *", func() {
 		t := time.Now()
 		pici := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local).Unix()
 		query := map[string]interface{}{
 			"comeintime": map[string]interface{}{
-				"$gt":  -pici - 86400,
+				"$gt":  pici - 86400,
 				"$lte": pici,
 			},
 		}
 		log.Println(mgofromdb, query)
-		//go winnerStandarData(mgofromdb, query)
+		go winnerStandarData(mgofromdb, query)
+		time.Sleep(1 * time.Minute)
 		go buyerStandarData(mgofromdb, query)
-		//go agencyStandarData(mgofromdb, query)
+		time.Sleep(1 * time.Minute)
+		go agencyStandarData(mgofromdb, query)
 	})
 	c.Start()
 }