apple 5 年 前
コミット
84c84560dc

+ 2 - 1
src/jy/admin/audit/agencyinfo.go

@@ -75,8 +75,8 @@ func init() {
 			}
 			e["contact"] = contacts
 		}
-		e["comeintime"] =time.Now().Unix()
 
+		e["updatatime"]=time.Now().Unix()
 		var sid string
 		if bson.IsObjectIdHex(_id) {
 			//更新
@@ -105,6 +105,7 @@ func init() {
 			}
 			c.JSON(200, gin.H{"rep": 200, "updateid": sid})
 		} else {
+			e["comeintime"] =time.Now().Unix()
 			//不存在直接保存新数据
 			sid = Mgo.Save(util.ElasticClientAgencyDB, e)
 			if sid == ""{

+ 2 - 2
src/jy/admin/audit/buyerinfo.go

@@ -77,8 +77,7 @@ func init() {
 			}
 			e["contact"] = contacts
 		}
-		e["comeintime"] =time.Now().Unix()
-
+		e["updatatime"]=time.Now().Unix()
 		var sid string
 		if bson.IsObjectIdHex(_id) {
 			//更新
@@ -107,6 +106,7 @@ func init() {
 			}
 			c.JSON(200, gin.H{"rep": 200, "updateid": sid})
 		} else {
+			e["comeintime"] =time.Now().Unix()
 			//不存在直接保存新数据
 			sid = Mgo.Save(util.ElasticClientBuyerDB, e)
 			if sid == ""{

+ 2 - 1
src/jy/admin/audit/qiyeku.go

@@ -75,7 +75,7 @@ func init() {
 			}
 			e["contact"] = contacts
 		}
-		e["comeintime"] =time.Now().Unix()
+		e["updatatime"]=time.Now().Unix()
 		var sid string
 		if bson.IsObjectIdHex(_id) {
 			//更新
@@ -104,6 +104,7 @@ func init() {
 			}
 			c.JSON(200, gin.H{"rep": 200, "updateid": sid})
 		} else {
+			e["comeintime"] =time.Now().Unix()
 			//不存在直接保存新数据
 			sid = Mgo.Save(util.ElasticClientDB, e)
 			if sid == "" {

+ 9 - 3
src/jy/admin/audit/rulemanager.go

@@ -173,17 +173,23 @@ func GetOrder(sel string) int {
 
 func GetRule(c *gin.Context) {
 	fid, _ := c.GetPostForm("fid")
+	rule_type, _ := c.GetPostForm("rule_type")
+	start := c.GetInt("start")
+	limit := c.GetInt("length")
 	query := map[string]interface{}{
 		"s_fid":  fid,
 		"delete": false,
 	}
-	data, _ := Mgo.Find("rc_rule", query, `{"_id":1}`, nil, false, -1, -1)
-	//count := Mgo.Count("rc_rule", query)
+	if rule_type != "-1" {
+		query["s_type"] = rule_type
+	}
+	data, _ := Mgo.Find("rc_rule", query, `{"_id":1}`, nil, false, start, limit)
+	count := Mgo.Count("rc_rule", query)
 	for _, d := range *data {
 		timeStr := time.Unix(d["l_createtime"].(int64), 0).Format(Date_Short_Layout)
 		d["l_createtime"] = timeStr
 	}
-	c.JSON(200, gin.H{"data": data})
+	c.JSON(200, gin.H{"data": data, "recordsFiltered": count, "recordsTotal": count})
 }
 
 func SaveRule(c *gin.Context) {

+ 34 - 8
src/web/templates/admin/audit_rulelist.html

@@ -24,11 +24,12 @@
 		              <thead>
 		              <tr>
 		                <th>名称</th>
-						<th>时间</th>
-						<th>创建人</th>
-						<th>描述</th>
-						<th>是否启用</th>
-						<th>操作</th>
+        						<th>时间</th>
+        						<th>创建人</th>
+        						<th>描述</th>
+                    <th>类型</th>
+        						<th>是否启用</th>
+        						<th>操作</th>
 		              </tr>
 		              </thead>
 		            </table>
@@ -55,7 +56,7 @@ $(function () {
 		"ordering"    : false,
 		"info"        : true,
 		"autoWidth"   : true,
-		"serverSide": false,
+		"serverSide": true,
 		"ajax": {
 			"url": "/admin/rulemanager/getrule",
 			"type": "post",
@@ -69,6 +70,13 @@ $(function () {
 			{ "data": "l_createtime"},
 			{ "data": "s_username"},
 			{ "data": "s_descript"},
+      { "data": "s_type",render:function(val){
+        if(val=="ok"){
+					return "正确";
+				}else if(val == "err"){
+					return "异常";
+				}
+      }},
 			{ "data": "isuse",render:function(val,a,row){
 				tmp=""
 				if(val){
@@ -96,7 +104,15 @@ $(function () {
 					'&nbsp;&nbsp;<a class="btn btn-sm btn-danger" onclick="del(\''+val+'\')">删除</a>'
 					
 			}}
-       	]
+     ],
+		"fnServerParams": function (e) {  
+			var rule_type=$("#rule_type").val();
+  			if(rule_type){
+  				e.rule_type=rule_type;
+  			}else{
+  				e.rule_type="-1";
+  			}
+      }
 	});
 	ttablerulemanager.on('init.dt', function () {
 		$("#showbtn").on('click','a.opr',function(){
@@ -140,7 +156,8 @@ $(function () {
 					{label:"名称",s_label:"s_name",must:true},
 					{label:"描述",s_label:"s_descript"},
 					{label:"启用",s_label:"isuse",type:"tpl_list_local",list:[{"s_name":"是","_id":true},{"s_name":"否","_id":false}],default:true},
-					{label:"正则",s_label:"s_rule",type:"tpl_text",rows:2,must:true},
+					{label:"类型",s_label:"s_type",type:"tpl_list_local",must:true,list:[{"s_name":"正确","_id":"ok"},{"s_name":"异常","_id":"err"}],default:"0"},
+          {label:"正则",s_label:"s_rule",type:"tpl_text",rows:2,must:true},
 					{s_label:"_id",type:"tpl_hidden"},
 					{s_label:"s_fid",type:"tpl_hidden",val:fid},
 					{s_label:"s_field",type:"tpl_hidden",val:fname}
@@ -185,6 +202,15 @@ $(function () {
 			break;
 			}
 		});
+		var opt="<option value='-1'>全部</option>"+
+		"<option value='ok'>正确</option>"+
+		"<option value='err'>异常</option>";
+		var select="<div class='form-group'><label for='name'>类型:</label>"+
+			"<select id='rule_type' onchange='checkclick(this.value)' class='form-control input-sm'>"+
+			opt+
+			"</select></div>"
+		$("#rulemanagerTable_filter").prepend("&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;");
+		$("#rulemanagerTable_filter").prepend(select);
 	})
 })
 

+ 7 - 14
udp_winner/config.json

@@ -1,30 +1,23 @@
 {
-  "elasticsearch": "http://172.17.145.170:9800",
-  "elasticsearch_index": "winner_enterprise",
-  "elasticsearch_type": "winnerent",
-  "elasticsearch_buyer_index": "buyer_enterprise",
-  "elasticsearch_buyer_type": "buyerent",
-  "elasticsearch_agency_index": "agency_enterprise",
-  "elasticsearch_agency_type": "agencyent",
   "udpport": "127.0.0.1:12678",
   "port": "12678",
   "pool_size": "10",
-  "mgoinit": "172.17.145.163:27082",
-  "mgodb_bidding": "extract_v3",
-  "mgodb_mgoinit_c": "buyer",
+  "mgoinit": "127.0.0.1:27017",
+  "mgodb_bidding": "qfw",
+  "mgodb_mgoinit_c": "bidding",
   "mgodb_enterprise": "enterprise",
   "mgodb_enterprise_c": "qyxy",
   "mgourl2": "192.168.3.207:27092",
-  "mgourl": "172.17.145.163:27082",
-  "mgodb_extract_kf": "extract_v3",
+  "mgourl": "127.0.0.1:27017",
+  "mgodb_extract_kf": "extract_kf",
   "mgo_qyk_c": "winner_enterprise",
   "mgo_qyk_buyer": "buyer_enterprise",
   "mgo_qyk_agency": "agency_enterprise",
   "mgo_qyk_reg": "rc_rule",
-  "redis": "172.17.148.44:1479",
+  "redis": "127.0.0.1:6379",
   "redis_winner_db": "1",
   "redis_buyer_db": "2",
   "redis_agency_db": "3",
   "chan_pool_num": "10",
-  "his_redis": "172.17.148.44:2579"
+  "his_redis": "127.0.0.1:6380"
 }

+ 28 - 20
udp_winner/main.go

@@ -6,11 +6,9 @@ import (
 	"github.com/garyburd/redigo/redis"
 	hisRedis "github.com/go-redis/redis"
 	"gopkg.in/mgo.v2/bson"
-	es "gopkg.in/olivere/elastic.v1"
 	"log"
 	mu "mfw/util"
 	"net"
-	"qfw/common/src/qfw/util/elastic"
 	"qfw/util"
 	"qfw/util/mongodb"
 	"regexp"
@@ -27,13 +25,11 @@ var (
 	HisRedisPool                          *hisRedis.Client
 	Addrs                                 = make(map[string]interface{}, 0) //省市县
 	udpclient                             mu.UdpClient                      //udp对象
-	ElasticClientIndex, ElasticClientType string
+	Reg_person                            = regexp.MustCompile("[\u4e00-\u9fa5]+")
 	Reg_xing                              = regexp.MustCompile(`\*{1,}`)
-	Reg_person                            = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
 	Reg_tel                               = regexp.MustCompile(`^[0-9\-\s]*$`)
-	EsConn                                *es.Client
 	Updport                               int
-	CPool                                 chan bool
+	CPoolWinner, CPoolBuery, CPoolAgency  chan bool
 	//his_redis db
 	redis_winner_db, redis_buyer_db, redis_agency_db int
 	//异常表正则匹配处理
@@ -53,7 +49,9 @@ func init() {
 	if err != nil {
 		log.Fatalln(err)
 	}
-	CPool = make(chan bool, cpnum)
+	CPoolWinner = make(chan bool, cpnum)
+	CPoolBuery = make(chan bool, cpnum)
+	CPoolAgency = make(chan bool, cpnum)
 	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
 		"capital", "establish_date", "legal_person", "company_type",
 		"district", "city", "province", "area_code", "credit_no",
@@ -68,11 +66,6 @@ func init() {
 		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
 		"history_name", "wechat_accounts", "website", "report_websites"}
 
-	//es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
-	//es init
-	elastic.InitElasticSize(Config["elasticsearch"], 50)
-	EsConn = elastic.GetEsConn()
-	defer elastic.DestoryEsConn(EsConn)
 	initRdis()
 	initMongo()
 	initReg()
@@ -113,13 +106,22 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 			//data_info:save//存量   data_info:add //增量
 			//阻塞
-			CPool <- true
+			CPoolWinner <- true
 			go func(mapinfo *map[string]interface{}) {
-				defer func() { <-CPool }()
-				go TaskWinner(mapinfo)
-				go TaskBuyer(mapinfo)
-				go TaskAgency(mapinfo)
+				defer func() { <-CPoolWinner }()
+				TaskWinner(mapinfo)
 			}(tmp)
+			CPoolBuery <- true
+			go func(mapinfo *map[string]interface{}) {
+				defer func() { <-CPoolBuery }()
+				TaskBuyer(mapinfo)
+			}(tmp)
+			CPoolAgency <- true
+			go func(mapinfo *map[string]interface{}) {
+				defer func() { <-CPoolAgency }()
+				TaskAgency(mapinfo)
+			}(tmp)
+
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Println("发送成功", string(data))
@@ -170,14 +172,12 @@ func initMongo() {
 	SourceClient.MongodbAddr = Config["mgoinit"]
 	SourceClient.Size = pool_size
 	SourceClient.DbName = Config["mgodb_bidding"]
-	//mongodbSim.DbName = "qfw"
 	SourceClient.InitPool()
 
 	FClient = new(mongodb.MongodbSim)
 	FClient.MongodbAddr = Config["mgourl"]
 	FClient.Size = pool_size
 	FClient.DbName = Config["mgodb_extract_kf"]
-	//mongodbSim.DbName = "qfw"
 	FClient.InitPool()
 	FClientmgoConn := FClient.GetMgoConn()
 	defer FClient.DestoryMongoConn(FClientmgoConn)
@@ -211,7 +211,15 @@ func initReg() {
 		if !ok || !ok2 || !ok3 || s_field == "" || s_rule == "" || s_type == "" {
 			continue
 		}
-		regtmp := regexp.MustCompile(s_rule)
+		var pattern string
+		if strings.Contains(s_rule, "\\u") {
+			s_rule = strings.Replace(s_rule, "\\", "\\\\", -1)
+			s_rule = strings.Replace(s_rule, "\\\\u", "\\u", -1)
+			pattern, _ = strconv.Unquote(`"` + s_rule + `"`)
+		} else {
+			pattern = s_rule
+		}
+		regtmp := regexp.MustCompile(pattern)
 		if s_field == "winner" {
 			if s_type == "ok" {
 				WinnerRegOk = append(WinnerRegOk, *regtmp)

+ 207 - 226
udp_winner/timedTaskWinner.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
-	"gopkg.in/mgo.v2"
 	"gopkg.in/mgo.v2/bson"
 	"log"
 	"qfw/util"
@@ -99,10 +98,6 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 				rdb.Do("SELECT", redis_winner_db)
 				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					FClient.DbName = Config["mgodb_extract_kf"]
-					//if tmpid := FClient.Save("winner_new", tmps); tmpid == nil {
-					//	log.Println("存量 FClient.Save err", tmpid)
-					//}
 					for _, vmap := range rValuesMaps {
 						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
 						if err = FClient.SaveForOld("winner_new", vmap); err != nil {
@@ -120,7 +115,6 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 						log.Println(err)
 					}
 					//拿到合并后的qyk
-					FClient.DbName = Config["mgodb_extract_kf"]
 					oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, nil)
 					if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
 						log.Println(redisCName, "存量 redis id 不存在", reply)
@@ -197,116 +191,117 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 					(*oldTmp)["contact"] = contactMaps
 					//mongo更新
 					(*oldTmp)["updatatime"] = time.Now().Unix()
-					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
 					delete((*oldTmp), "_id")
-					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("存量 EsConn err :", err)
-					}
 				}
 			}
 		}
 		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+		//发送udp 更新es段
 
 	} else {
-		//增量
-		overid := addfunc(gtid, cursor)
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = Add(overid, tmp)
+		}
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+		//发送udp 更新es段
 	}
 
 }
 
 //增量
-func addfunc(gtid string, cursor *mgo.Iter) string {
-	//增量处理
-	overid := gtid
-	tmp := map[string]interface{}{}
-	for cursor.Next(&tmp) {
-		overid = tmp["_id"].(bson.ObjectId).Hex()
-		//log.Println(tmp["_id"])
-		winner, ok := tmp["winner"].(string)
-		if !ok || utf8.RuneCountInString(winner) < 4 {
-			continue
+func Add(overid string, tmp map[string]interface{}) string {
+	overid = tmp["_id"].(bson.ObjectId).Hex()
+	winner, ok := tmp["winner"].(string)
+	if !ok || utf8.RuneCountInString(winner) < 4 {
+		return overid
+	}
+	//redis查询是否存在
+	rdb := RedisPool.Get()
+	rdb.Do("SELECT", redis_winner_db)
+	if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
+		//redis不存在存到临时表,定时任务处理
+		if err := FClient.SaveForOld("winner_new", tmp); err != nil {
+			log.Println("FClient.Save err", err, tmp)
 		}
-		//redis查询是否存在
-		rdb := RedisPool.Get()
-		rdb.Do("SELECT", redis_winner_db)
-		if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
-			//redis不存在存到临时表,定时任务处理
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if err := FClient.SaveForOld("winner_new", tmp); err != nil {
-				log.Println("FClient.Save err", err, tmp)
-			}
-			//log.Println("get redis id err:定时任务处理", err, tmp)
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			continue
-		} else {
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			//拿到合并后的qyk
-			FClient.DbName = Config["mgodb_extract_kf"]
-			oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
-			if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
-				log.Println("redis id 不存在")
-				continue
-			}
-			//比较合并 行业类型
-			tmpTopscopeclass := []string{}
-			tmpConTopscopeclass := []string{}
-			tmpTopscopeclassMap := make(map[string]bool)
+		//log.Println("get redis id err:定时任务处理", err, tmp)
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		return overid
+	} else {
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		//拿到合并后的qyk
+		oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
+		if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
+			log.Println("redis id 不存在", reply)
+			return overid
+		}
+		//比较合并 行业类型
+		tmpTopscopeclass := []string{}
+		tmpConTopscopeclass := []string{}
+		tmpTopscopeclassMap := make(map[string]bool)
 
-			if (*oldTmp)["industry"] != nil {
-				if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok {
-							tmpTopscopeclassMap[vvv] = true
-						}
-					}
-				}
-			}
-			if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+		if (*oldTmp)["industry"] != nil {
+			if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
 				for _, vv := range v {
-					if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-						tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-						tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
+					if vvv, ok := vv.(string); ok {
+						tmpTopscopeclassMap[vvv] = true
 					}
 				}
 			}
-			for k := range tmpTopscopeclassMap {
-				tmpTopscopeclass = append(tmpTopscopeclass, k)
+		}
+		if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+			for _, vv := range v {
+				if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+					tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
+				}
 			}
-			sort.Strings(tmpTopscopeclass)
-			(*oldTmp)["industry"] = tmpTopscopeclass
+		}
+		for k := range tmpTopscopeclassMap {
+			tmpTopscopeclass = append(tmpTopscopeclass, k)
+		}
+		sort.Strings(tmpTopscopeclass)
+		(*oldTmp)["industry"] = tmpTopscopeclass
 
-			esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
-			//更新行业类型
-			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-				(*oldTmp)["updatatime"] = time.Now().Unix()
-				//mongo更新
-				FClient.DbName = Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
-					log.Println("mongo更新err", esId)
-				}
+		esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
+		//更新行业类型
+		if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+			(*oldTmp)["updatatime"] = time.Now().Unix()
+			//mongo更新
+			if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+				log.Println("mongo更新err", esId)
+			}
 
-				//es更新
-				delete((*oldTmp), "_id")
-				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("update es err:", err)
+			//es更新
+			delete((*oldTmp), "_id")
+			return overid
+		}
+		//联系方式合并
+		contactMaps := make([]map[string]interface{}, 0)
+		if (*oldTmp)["contact"] != nil {
+			//直接添加联系人,不再判断
+			if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+				for _, vv := range v {
+					contactMaps = append(contactMaps, vv.(map[string]interface{}))
 				}
-				continue
-			}
-			//联系方式合并
-			var tmpperson, winnertel string
-			if tmppersona, ok := tmp["winnerperson"].(string); ok {
-				tmpperson = tmppersona
 			}
+		}
+		var tmpperson, winnertel string
+		if tmppersona, ok := tmp["winnerperson"].(string); ok && tmpperson != "" && Reg_person.MatchString(tmpperson) && !Reg_xing.MatchString(tmpperson) {
+			tmpperson = tmppersona
+		}
+		if tmpperson != "" {
 			if winnerteltmp, ok := tmp["winnertel"].(string); ok {
 				winnertel = winnerteltmp
 			}
@@ -315,13 +310,7 @@ func addfunc(gtid string, cursor *mgo.Iter) string {
 			} else {
 				winnertel = winnertel
 			}
-			contactMaps := make([]interface{}, 0)
-			if (*oldTmp)["contact"] != nil {
-				//直接添加联系人,不再判断
-				if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
-					contactMaps = append(contactMaps, v...)
-				}
-			}
+
 			vvv := make(map[string]interface{})
 			vvv["infoid"] = overid
 			vvv["contact_person"] = tmpperson
@@ -330,23 +319,22 @@ func addfunc(gtid string, cursor *mgo.Iter) string {
 			vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
 			vvv["updatetime"] = time.Now().Unix()
 			contactMaps = append(contactMaps, vvv)
-			//分包处理
-			PackageDealWith(&contactMaps, tmp)
-			(*oldTmp)["contact"] = contactMaps
-			//mongo更新
-			(*oldTmp)["updatatime"] = time.Now().Unix()
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
-				log.Println("mongo更新 err", esId, oldTmp)
-			}
-			//es更新
-			delete((*oldTmp), "_id")
-			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-				log.Println("EsConn err :", err)
-			}
 		}
+		//分包处理
+		if tmp["package"] != nil {
+			PackageDealWith(oldTmp, tmp, winner)
+		}
+		(*oldTmp)["contact"] = contactMaps
+		//mongo更新
+		(*oldTmp)["updatatime"] = time.Now().Unix()
+		if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+			log.Println("mongo更新 err", esId, oldTmp)
+		}
+		//es更新
+		delete((*oldTmp), "_id")
 	}
 	return overid
+
 }
 
 //定时任务  新增
@@ -362,11 +350,11 @@ func TimedTaskWinner() {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 5)
+				t2.Reset(time.Minute * 1)
 				FClient.DestoryMongoConn(Fcconn)
 				continue
 			} else {
-				log.Println("临时表有数据:", tmpLast)
+				log.Println("临时表有数据:", tmpLast["_id"])
 				fconn := FClient.GetMgoConn(86400)
 				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
 					"_id": bson.M{
@@ -391,13 +379,9 @@ func TimedTaskWinner() {
 					rdb := RedisPool.Get()
 					rdb.Do("SELECT", redis_winner_db)
 					if _, err := redis.String(rdb.Do("GET", errwinner)); err == nil {
-						//redis存在发送udp进行处理
-						TaskWinner(&map[string]interface{}{
-							"gtid":  tmpId,
-							"lteid": tmpId,
-						})
+						//增量合并
+						Add(tmpId, tmp)
 						//存在的话删除tmp mongo表
-						FClient.DbName = Config["mgodb_extract_kf"]
 						if DeletedCount := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
 							log.Println("删除临时表err:", DeletedCount)
 						}
@@ -411,10 +395,11 @@ func TimedTaskWinner() {
 						}
 					}
 					//查询redis不存在新增
-					FClient.DbName = Config["mgodb_enterprise"]
-
-					resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": errwinner})
-					if !b || (*resulttmp)["_id"] == nil {
+					sessionfinone := FClient.GetMgoConn()
+					resulttmp := make(map[string]interface{})
+					err := sessionfinone.DB(Config["mgodb_enterprise"]).C(Config["mgodb_enterprise_c"]).Find(bson.M{"company_name": errwinner}).One(&resulttmp)
+					FClient.DestoryMongoConn(sessionfinone)
+					if err != nil || resulttmp["_id"] == nil {
 						//log.Println(r)
 						//人工审核正则
 						var isok bool
@@ -439,122 +424,126 @@ func TimedTaskWinner() {
 							}
 						}
 						//都没匹配
-						if tmp["winner_ok"] == nil  && tmp["winner_err"] == nil{
+						if tmp["winner_ok"] == nil && tmp["winner_err"] == nil {
 							tmp["winner_err"] = 1
 						}
 						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
 						if err := FClient.SaveForOld("winner_err", tmp); err != nil {
 							log.Println("存入异常表错误", err, tmp)
 						}
-						if deleteNum := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !b {
+						if deleteNum := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
 					} else {
 						//log.Println(123)
 						//匹配到原始库,新增 resulttmp
-						if (*resulttmp)["credit_no"] != nil {
-							if credit_no, ok := (*resulttmp)["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
 								len(strings.TrimSpace(credit_no)) > 8 {
 								dataNo := strings.TrimSpace(credit_no)[2:8]
 								if Addrs[dataNo] != nil {
 									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if (*resulttmp)["province"] == nil || (*resulttmp)["province"] == "" {
-											(*resulttmp)["province"] = v["province"]
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
 										}
-										(*resulttmp)["city"] = v["city"]
-										(*resulttmp)["district"] = v["district"]
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
 
 									}
 								}
 							}
 						}
-						contacts := make([]map[string]interface{}, 0)
-						contact := make(map[string]interface{}, 0)
-						if (*resulttmp)["legal_person"] != nil {
-							contact["contact_person"] = (*resulttmp)["legal_person"] //联系人
-						} else {
-							contact["contact_person"] = "" //联系人
-						}
-						contact["contact_type"] = "法定代表人" //法定代表人
-						//log.Println(1)
-						if (*resulttmp)["annual_reports"] != nil {
-							bytes, err := json.Marshal((*resulttmp)["annual_reports"])
-							if err != nil {
-								log.Println("annual_reports err:", err)
-							}
-							phonetmp := make([]map[string]interface{}, 0)
-							err = json.Unmarshal(bytes, &phonetmp)
-							if err != nil {
-								log.Println("Unmarshal err:", err)
+						//行业类型
+						tmpclass := make([]string, 0)
+						if tclasss, ok := tmp["topscopeclass"].([]interface{}); ok {
+							for _, vv := range tclasss {
+								if vvv, ok := vv.(string); ok {
+									if len(vvv) > 1 {
+										tmpclass = append(tmpclass, vvv[:len(vvv)-1])
+									}
+								}
 							}
-							for _, vv := range phonetmp {
-								if vv["company_phone"] != nil {
-									if vv["company_phone"] == "" {
-										continue
+						}
+						contacts := make([]map[string]interface{}, 0)
+						if legal_person, ok := resulttmp["legal_person"].(string); ok && legal_person != "" && !Reg_xing.MatchString(legal_person) && Reg_person.MatchString(legal_person) {
+							contact := make(map[string]interface{}, 0)
+							contact["contact_person"] = legal_person //联系人
+							contact["contact_type"] = "法定代表人"        //法定代表人
+							if resulttmp["annual_reports"] != nil {
+								bytes, err := json.Marshal(resulttmp["annual_reports"])
+								if err != nil {
+									log.Println("annual_reports err:", err)
+								}
+								phonetmp := make([]map[string]interface{}, 0)
+								err = json.Unmarshal(bytes, &phonetmp)
+								if err != nil {
+									log.Println("Unmarshal err:", err)
+								}
+								for _, vv := range phonetmp {
+									if vv["company_phone"] != nil {
+										if vv["company_phone"] == "" {
+											continue
+										} else {
+											contact["phone"] = vv["company_phone"] //联系电话
+											break
+										}
 									} else {
-										contact["phone"] = vv["company_phone"] //联系电话
-										break
+										contact["phone"] = "" //联系电话
 									}
-								} else {
+
+								}
+							}
+							//log.Println(k, contact["phone"], resulttmp["_id"])
+							//time.Sleep(10 * time.Second)
+							if phone, ok := contact["phone"].(string); ok && phone != "" {
+								if Reg_xing.MatchString(phone) || !Reg_tel.MatchString(phone) {
 									contact["phone"] = "" //联系电话
 								}
-
+							} else {
+								contact["phone"] = "" //联系电话
 							}
+							contact["topscopeclass"] = "企业公示"         //项目类型
+							contact["updatetime"] = time.Now().Unix() //更新时间
+							contact["infoid"] = ""                    //招标信息id
+							contacts = append(contacts, contact)
 						}
-						//log.Println(k, contact["phone"], resulttmp["_id"])
-						//time.Sleep(10 * time.Second)
-						if contact["phone"] == nil {
-							contact["phone"] = "" //联系电话
-						}
-						contact["topscopeclass"] = "企业公示"         //项目类型
-						contact["updatetime"] = time.Now().Unix() //更新时间
-						contact["infoid"] = ""                    //招标信息id
-						contacts = append(contacts, contact)
 						//添加临时表匹配到的联系人
-						vvv := make(map[string]interface{})
-						vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
-						if tmp["winnerperson"] != nil {
-							vvv["contact_person"] = tmp["winnerperson"]
-						} else {
-							vvv["contact_person"] = ""
-						}
-						vvv["contact_type"] = "项目联系人"
-						//	"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1
-						if tmp["winnertel"] != nil {
-							vvv["phone"] = tmp["winnertel"]
-						} else {
-							vvv["phone"] = ""
-						}
-						tmpclass := make([]string, 0)
-						if tclasss, ok := tmp["topscopeclass"].([]string); ok {
-							for _, vv := range tclasss {
-								if len(vv) > 1 {
-									tmpclass = append(tmpclass, vv[:len(vv)-1])
-								}
+						if winnerperson, ok := tmp["winnerperson"].(string); ok && winnerperson != "" &&
+							!Reg_xing.MatchString(winnerperson) && Reg_person.MatchString(winnerperson) {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
+							vvv["contact_person"] = winnerperson
+							vvv["contact_type"] = "项目联系人"
+							//	"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1
+							if winnertel, ok := tmp["winnertel"].(string); ok && !Reg_xing.MatchString(winnertel) && Reg_tel.MatchString(winnertel) {
+								vvv["phone"] = winnertel
+							} else {
+								vvv["phone"] = ""
 							}
+
+							vvv["topscopeclass"] = strings.Join(tmpclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contacts = append(contacts, vvv)
 						}
-						vvv["topscopeclass"] = strings.Join(tmpclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contacts = append(contacts, vvv)
-						(*resulttmp)["contact"] = contacts
+
+						resulttmp["contact"] = contacts
 
 						savetmp := make(map[string]interface{}, 0)
 						for _, sk := range Fields {
 							if sk == "establish_date" {
-								if (*resulttmp)[sk] != nil {
-									savetmp[sk] = (*resulttmp)[sk].(time.Time).UTC().Unix()
+								if resulttmp[sk] != nil {
+									savetmp[sk] = resulttmp[sk].(time.Time).UTC().Unix()
 									continue
 								}
 							} else if sk == "capital" {
 								//log.Println(sk, resulttmp[sk])
-								savetmp[sk] = ObjToMoney([]interface{}{(*resulttmp)[sk], ""})[0]
+								savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
 								continue
 							} else if sk == "partners" {
 								//log.Println(sk, resulttmp[sk], )
-								if (*resulttmp)[sk] != nil {
-									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
+								if resulttmp[sk] != nil {
+									if ppms, ok := resulttmp[sk].([]interface{}); ok {
 										for i, _ := range ppms {
 											if ppms[i].(map[string]interface{})["stock_type"] != nil {
 												ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
@@ -569,19 +558,19 @@ func TimedTaskWinner() {
 								}
 								continue
 							} else if sk == "_id" {
-								savetmp["tmp"+sk] = (*resulttmp)[sk]
+								savetmp["tmp"+sk] = resulttmp[sk]
 								continue
 							} else if sk == "area_code" {
 								//行政区划代码
-								savetmp[sk] = fmt.Sprint((*resulttmp)[sk])
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
 								continue
 							} else if sk == "report_websites" {
 								//网址
-								if (*resulttmp)["report_websites"] == nil {
+								if resulttmp["report_websites"] == nil {
 									savetmp["website"] = ""
 								} else {
 									report_websitesArr := []string{}
-									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
+									if ppms, ok := resulttmp[sk].([]interface{}); ok {
 										for _, v := range ppms {
 											if vvv, ok := v.(map[string]interface{}); ok {
 												if rv, ok := vvv["website_url"].(string); ok {
@@ -610,50 +599,36 @@ func TimedTaskWinner() {
 								savetmp[sk] = tmpTopscopeclass
 								continue
 							}
-							if (*resulttmp)[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
-								savetmp[sk] = (*resulttmp)[sk]
+								savetmp[sk] = resulttmp[sk]
 							}
 						}
+						//判断分包
+						if tmp["package"] != nil {
+							PackageDealWith(&savetmp, tmp, errwinner)
+						}
 						//tmps = append(tmps, savetmp)
+						savetmp["comeintime"] = time.Now().Unix()
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
-						FClient.DbName = Config["mgodb_extract_kf"]
-
 						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
 						if saveid != "" {
 							//保存redis
 							rc := RedisPool.Get()
 							rc.Do("SELECT", redis_winner_db)
-							//var _id string
-							//if v, ok := saveid.(primitive.ObjectID); ok {
-							//	_id = v.Hex()
-							//}
 							if _, err := rc.Do("SET", savetmp["company_name"], saveid); err != nil {
 								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
 							} else {
-								//保存es
-								delete(savetmp, "_id")
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-
-								//esConn := elastic.GetEsConn()
-								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(saveid).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-								} else {
-									//删除临时表
-									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
-										log.Println("删除临时表失败", deleteNum)
-									}
+								//删除临时表
+								if deleteNum := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
+									log.Println("删除临时表失败", deleteNum)
 								}
 							}
+							if err := rc.Close(); err != nil {
+								log.Println(err)
+							}
 						} else {
 							log.Println("save mongo err:", saveid, tmp["_id"])
 						}
@@ -669,6 +644,12 @@ func TimedTaskWinner() {
 }
 
 //分包处理
-func PackageDealWith(contactMaps *[]interface{}, tmp map[string]interface{}) {
-
+func PackageDealWith(contactMap *map[string]interface{}, tmp map[string]interface{}, comName string) []interface{} {
+	util.Catch()
+	//if v, ok := tmp["package"].(map[string]interface{}); ok {
+	//for i, pv := range v {
+	//	log.Println(i, pv)
+	//}
+	//}
+	return nil
 }

+ 20 - 0
udpcreateindex/src/config.json

@@ -47,6 +47,26 @@
         "index": "projectset_v1",
         "type": "projectset"
     },
+    "standard": {
+ 		"addr": "172.17.145.163:27082",
+        "size": 10,
+        "db": "qfw",
+    	"winnerent":{
+			"collect": "winner_enterprise",
+        	"index": "winnerent_v1",
+        	"type": "winnerent"
+		},
+        "buyerent":{
+			"collect": "buyer_enterprise",
+        	"index": "buyerent_v1",
+        	"type": "buyerent"
+		},
+ 		"agencyent":{
+			"collect": "agency_enterprise",
+       	 	"index": "agencyent_v1",
+       		"type": "agencyent"
+		}
+    },
     "mongodb": {
         "addr": "10.172.242.243:27080,10.30.94.175:27081,10.81.232.246:27082",
         "pool": 10,

+ 45 - 12
udpcreateindex/src/main.go

@@ -13,17 +13,19 @@ import (
 )
 
 var (
-	Sysconfig                                                      map[string]interface{} //配置文件
-	mgo                                                            *mongodb.MongodbSim    //mongodb操作对象
-	extractmgo                                                     *mongodb.MongodbSim    //mongodb操作对象
-	udpclient                                                      mu.UdpClient           //udp对象
-	updport                                                        string
-	winner, winnerenterprise, bidding, biddingback, project, buyer map[string]interface{}
-	savesizei                                                      = 500
-	biddingIndexFields                                             = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
-	projectinfoFields                                              []string
-	multiIndex                                                     []string
-	BulkSize                                                       = 400
+	Sysconfig          map[string]interface{} //配置文件
+	mgo                *mongodb.MongodbSim    //mongodb操作对象
+	extractmgo         *mongodb.MongodbSim    //mongodb操作对象
+	mgostandard        *mongodb.MongodbSim    //mongodb操作对象
+	udpclient          mu.UdpClient           //udp对象
+	updport            string
+	savesizei          = 500
+	biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
+	projectinfoFields  []string
+	multiIndex         []string
+	BulkSize           = 400
+
+	winner, bidding, biddingback, project, buyer, standard map[string]interface{}
 )
 
 func init() {
@@ -32,7 +34,7 @@ func init() {
 	go checkMapJob()
 	updport, _ = Sysconfig["updport"].(string)
 	winner, _ = Sysconfig["winner"].(map[string]interface{})
-	winnerenterprise, _ = Sysconfig["winnerenterprise"].(map[string]interface{})
+	standard, _ = Sysconfig["standard"].(map[string]interface{})
 	buyer, _ = Sysconfig["buyer"].(map[string]interface{})
 	bidding, _ = Sysconfig["bidding"].(map[string]interface{})
 	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
@@ -60,6 +62,13 @@ func init() {
 		}
 		extractmgo.InitPool()
 	}
+	mgostandard = &mongodb.MongodbSim{
+		MongodbAddr: standard["addr"].(string),
+		Size:        util.IntAllDef(standard["pool"], 5),
+		DbName:      standard["db"].(string),
+	}
+	mgostandard.InitPool()
+	log.Println(standard["addr"].(string))
 
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
@@ -173,6 +182,30 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					buyerTask(data, mapInfo)
 				}()
+			case "winnerent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("winnerent", mapInfo)
+				}()
+			case "buyerent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("buyerent", mapInfo)
+				}()
+			case "agencyent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("agencyent", mapInfo)
+				}()
 			default:
 				pool <- true
 				go func() {

+ 178 - 0
udpcreateindex/src/standardata.go

@@ -0,0 +1,178 @@
+package main
+
+import (
+	"log"
+	"qfw/util"
+	elastic "qfw/util/elastic"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+func standardTask(stype string, mapInfo map[string]interface{}) {
+	defer util.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": bson.M{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+	switch stype {
+	case "winnerent":
+		winnerEnt(q)
+	case "buyerent":
+		buyerEnt(q)
+	case "agencyent":
+		agencyEnt(q)
+	}
+}
+
+//winnerent
+func winnerEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	winnerent, _ := standard["winnerent"].(map[string]interface{})
+	c, _ := winnerent["collect"].(string)
+	index, _ := winnerent["index"].(string)
+	itype, _ := winnerent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+		tmp["company"] = tmp["company_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create winnerent index...over", n)
+}
+
+//buyerent
+func buyerEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	buyerent, _ := standard["buyerent"].(map[string]interface{})
+	c, _ := buyerent["collect"].(string)
+	index, _ := buyerent["index"].(string)
+	itype, _ := buyerent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+		tmp["buyer"] = tmp["buyer_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create buyerent index...over", n)
+}
+
+//agencyent
+func agencyEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	agencyent, _ := standard["agencyent"].(map[string]interface{})
+	c, _ := agencyent["collect"].(string)
+	index, _ := agencyent["index"].(string)
+	itype, _ := agencyent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+
+		tmp["agency"] = tmp["agency_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create agencyent index...over", n)
+}

+ 61 - 55
udps/main.go

@@ -6,72 +6,78 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
-	qu "qfw/util"
+	"os"
+	qutil "qfw/util"
+	"qfw/util/mongodb"
 	"time"
 
 	"gopkg.in/mgo.v2/bson"
 )
 
-var udpclient mu.UdpClient //udp对象
-var nextNodes []map[string]interface{}
-
-var startDate, endDate, ip, port, stype, sid, eid string
+var startDate, endDate string
 
 func main() {
-	//2015-11-03,2017-04-01
-	//2017-04-01,2017-06-01
-	//2017-06-01,2018-06-01
-	//2018-06-01,2019-02-20
-	/*
-ObjectId("5da3f31aa5cb26b9b798d3aa")
-ObjectId("5da418c4a5cb26b9b7e3e9a6")
-*/
-
-	flag.StringVar(&sid, "sid", "", "开始id")
-	flag.StringVar(&eid, "eid", "", "结束id")
+	ip, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param := "", 0, 0, "", "", "", "", "", "", ""
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
-	flag.StringVar(&endDate, "end", "2019-11-10", "结束日期2006-01-02")
+	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
-	flag.StringVar(&port, "port", "1488", "dup端口")
-	flag.StringVar(&stype, "stype", "", "stype")
+	flag.IntVar(&p, "p", 0, "端口")
+	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
+	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
+	flag.StringVar(&id1, "gtid", "", "gtid")
+	flag.StringVar(&id2, "lteid", "", "lteid")
+	flag.StringVar(&stype, "stype", "", "stype,传递类型")
+	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")
+	flag.StringVar(&q, "q", "", "q查询语句\"{'':''}\",有q就不要gtid,lteid")
+	flag.StringVar(&param, "param", "", "param,生信息发布或其他索引时用双引号套单引号\"{'mgoaddr':'','d':'','c':'','index':'','type':''}\"")
 	flag.Parse()
-	var startid, endid bson.ObjectId
-	if sid != "" && eid != "" {
-		startid = qu.StringTOBsonId(sid)
-		endid = qu.StringTOBsonId(eid)
-	} else {
-		start, _ := time.ParseInLocation(qu.Date_Short_Layout, startDate, time.Local)
-		end, _ := time.ParseInLocation(qu.Date_Short_Layout, endDate, time.Local)
-		startid = bson.NewObjectIdWithTime(start)
-		endid = bson.NewObjectIdWithTime(end)
+	if startDate != "" || endDate != "" {
+		start, _ := time.ParseInLocation(qutil.Date_Short_Layout, startDate, time.Local)
+		end, _ := time.ParseInLocation(qutil.Date_Short_Layout, endDate, time.Local)
+		id1 = qutil.BsonIdToSId(bson.NewObjectIdWithTime(start))
+		id2 = qutil.BsonIdToSId(bson.NewObjectIdWithTime(end))
+		log.Println(id1, id2)
 	}
-	log.Println(startid, endid, ip, port, stype)
-	udpclient = mu.UdpClient{Local: ":1470", BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	by, _ := json.Marshal(map[string]interface{}{
-		"gtid":  startid,
-		"lteid": endid,
-		"stype": stype,
-	})
-	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-		IP:   net.ParseIP(ip),
-		Port: qu.IntAll(port),
-	})
-	b := make(chan bool, 1)
-	<-b
-}
-
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	switch act {
-	case mu.OP_TYPE_DATA:
-		var mapInfo map[string]interface{}
-		err := json.Unmarshal(data, &mapInfo)
-		if err != nil {
-			log.Println(err)
-		} else {
-			log.Println(mapInfo)
+	if ip != "" && p > 0 && ((id1 != "" && id2 != "") || (q != "" || tmptime > 0)) {
+		toadd := &net.UDPAddr{
+			IP:   net.ParseIP(ip),
+			Port: p,
+		}
+		udp := mu.UdpClient{Local: ":50010", BufSize: 1024}
+		udp.Listen(func(b byte, data []byte, add *net.UDPAddr) {
+			switch b {
+			case mu.OP_NOOP:
+				log.Println(string(data))
+				os.Exit(0)
+			}
+		})
+		m1 := map[string]interface{}{
+			"gtid":  id1,
+			"lteid": id2,
+			"stype": stype,
 		}
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("发送成功", string(data))
+		if bkey != "" {
+			m1["bkey"] = bkey
+		}
+		if q != "" {
+			m1["query"] = mongodb.ObjToMQ(q, true) //qutil.ObjToMap(q)
+		}
+		if tmptime > 0 && tmpkey != "" {
+			m1["query"] = map[string]interface{}{tmpkey: map[string]interface{}{"$gte": tmptime}}
+		}
+		if param != "" {
+			pm := qutil.ObjToMap(param)
+			for k, v := range *pm {
+				m1[k] = v
+			}
+		}
+
+		by, _ := json.Marshal(m1)
+		log.Println(string(by))
+		udp.WriteUdp(by, mu.OP_TYPE_DATA, toadd)
+		time.Sleep(30 * time.Second)
+	} else {
+		flag.PrintDefaults()
+		log.Println("参数错误.")
 	}
 }