瀏覽代碼

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 5 年之前
父節點
當前提交
6d47378e76

+ 7 - 1
src/jy/cluster/aliecs.go

@@ -19,6 +19,7 @@ import (
 	"net/http"
 	"net/url"
 	qu "qfw/util"
+	"regexp"
 	"sort"
 	"strings"
 	"time"
@@ -92,6 +93,8 @@ func runInstances(kv map[string]interface{}, taskName, widthOut, computer string
 	log.Println(res)
 }
 
+var aireg = regexp.MustCompile("^ai")
+
 //查询多台实例的详细信息
 func DescribeInstances() {
 	res := GET("DescribeInstances", [][]string{
@@ -99,6 +102,9 @@ func DescribeInstances() {
 		[]string{"InstanceChargeType", "PostPaid"},
 		[]string{"PageSize", "100"},
 	})
+	if res["Instances"] == nil {
+		return
+	}
 	for _, ins := range res["Instances"].(map[string]interface{}) {
 		for _, val := range ins.([]interface{}) {
 			if tmp, ok := val.(map[string]interface{}); ok {
@@ -113,7 +119,7 @@ func DescribeInstances() {
 						tmp["ip_ww"] = tt[0]
 					}
 				}
-				if strings.Contains(qu.ObjToString(tmp["InstanceName"]), "ocr_task") {
+				if aireg.MatchString(qu.ObjToString(tmp["InstanceName"])) || strings.Contains(qu.ObjToString(tmp["InstanceName"]), "ocr_task") {
 					continue
 				}
 				//更新实例信息

+ 2 - 0
src/jy/extract/newextractcity.go

@@ -76,11 +76,13 @@ func (e *ExtractTask) NewExtractCity(j *ju.Job, resulttmp map[string]interface{}
 	title, _ := resulttmp["title"].(string)
 	projectname, _ := resulttmp["projectname"].(string)
 	buyer, _ := resulttmp["buyer"].(string)
+	projectaddr, _ := resulttmp["projectaddr"].(string)
 	//qu.Debug("buyeraddr--", buyeraddr, "--buyer--", buyer, "--title--", title, "--projectname--", projectname)
 	sm.AddKey("buyeraddr", buyeraddr)
 	sm.AddKey("buyer", buyer)
 	sm.AddKey("title", title)
 	sm.AddKey("projectname", projectname)
+	sm.AddKey("projectaddr",projectaddr)
 	//7.buyeraddr buyer title projectname抽取
 	e.NewGetCityByOthers(j, sm, &pscore, &cscore, &dscore)
 	//qu.Debug("全称打分后结果---", j.FullAreaScore, j.FullCityScore, j.FullDistrictScore)

+ 17 - 0
udp_city/src/config.json

@@ -0,0 +1,17 @@
+{
+  "mgodb": "127.0.0.1:27017",
+  "mgodb_xs": "172.17.145.163:27082",
+  "dbsize": 3,
+  "dbname": "enterprise",
+  "mgodb_bidding": "192.168.3.207:27092",
+  "mgodb_bidding_xs": "10.172.242.243:27080",
+  "dbname_bidding": "qfw",
+  "udpport": "1485",
+  "nextNode": [
+    {
+      "addr": "127.0.0.1",
+      "port": 1484,
+      "memo": "抽取回复"
+    }
+  ]
+}

+ 180 - 0
udp_city/src/main.go

@@ -0,0 +1,180 @@
+package main
+
+import (
+	"encoding/json"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	mgo "mongodbutil"
+	"net"
+	qu "qfw/util"
+)
+
+var Udpclient mu.UdpClient //udp对象
+var nextNodes []map[string]interface{}
+var Config map[string]interface{}
+var PageSize = 5000 //查询分页
+var biddingFields = `{"buyer":1,"modifyinfo":1}`
+var qyxyFields = `{"company_code":1,"province":1,"city":1,"district":1}`
+
+func init() {
+	qu.ReadConfig(&Config)
+	if len(Config) == 0 {
+		log.Fatal("读取配置文件失败", Config)
+	}
+	initCap := qu.IntAll(Config["dbsize"])
+	addr := qu.ObjToString(Config["mgodb"])
+	dbname := qu.ObjToString(Config["dbname"])
+	mgo.Mgo = mgo.MgoFactory(initCap, initCap*3, 120, addr, dbname)
+	mgo.Mgo_Bidding = mgo.MgoFactory(initCap, initCap*3, 120, qu.ObjToString(Config["mgodb_bidding"]), qu.ObjToString(Config["dbname_bidding"]))
+	nextNodes = qu.ObjArrToMapArr(Config["nextNode"].([]interface{}))
+	Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(Config["udpport"]), BufSize: 1024}
+	log.Println("udp run ", Config["udpport"])
+	Udpclient.Listen(processUdpMsg)
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var rep map[string]interface{}
+		err := json.Unmarshal(data, &rep)
+		if err != nil {
+			log.Println(err)
+		} else {
+			sid, _ := rep["gtid"].(string)
+			eid, _ := rep["lteid"].(string)
+			if sid == "" || eid == "" {
+				log.Println("err", "sid=", sid, ",eid=", eid)
+				return
+			} else {
+				go Udpclient.WriteUdp(data, mu.OP_NOOP, ra)
+				log.Println("udp通知抽取id段", sid, " ", eid)
+
+				getCity(sid, eid)
+				log.Println("udp通知抽取完成,eid", eid)
+				for _, m := range nextNodes {
+					by, _ := json.Marshal(map[string]interface{}{
+						"gtid":  sid,
+						"lteid": eid,
+						"stype": qu.ObjToString(m["stype"]),
+					})
+					err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+						IP:   net.ParseIP(m["addr"].(string)),
+						Port: qu.IntAll(m["port"]),
+					})
+					if err != nil {
+						log.Println(err)
+					}
+				}
+			}
+		}
+	case mu.OP_NOOP: //下个节点回应
+		log.Println(string(data))
+	}
+}
+
+func getCity(sid, eid string) {
+	index := 0
+	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
+	count := mgo.Mgo_Bidding.Count("bidding", query)
+	count_bak := mgo.Mgo_Bidding.Count("bidding_bak", query)
+	count += count_bak
+	log.Println("查询条件为:", query, "查询条数:", count)
+	pageNum := (count + PageSize - 1) / PageSize
+	limit := PageSize
+	if count < PageSize {
+		limit = count
+	}
+	table := "bidding_bak"
+	for i := 0; i < pageNum; i++ {
+		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid),"$lte":bson.ObjectIdHex(eid)}}
+		if mgo.Mgo_Bidding.Count(table, query) < 1 {
+			table = "bidding"
+		} else if table == "bidding_bak" && mgo.Mgo_Bidding.Count("bidding", query) > 0 {
+			log.Printf("page=%d,query=%v,db=%v\n", i+1, query, "bidding")
+			list2, _ := mgo.Mgo_Bidding.Find("bidding", query, nil, biddingFields, false, 0, limit)
+			processingCity(sid, eid, list2, index, "bidding", i)
+		}
+		log.Printf("page=%d,query=%v,db=%v\n", i+1, query, table)
+		list, _ := mgo.Mgo_Bidding.Find(table, query, nil, biddingFields, false, 0, limit)
+		processingCity(sid, eid, list, index, table, i)
+	}
+}
+
+func processingCity(sid string, eid string, list *[]map[string]interface{}, index int, table string, i int) {
+	for _, v := range *list {
+		if qu.ObjToString(v["district"]) != "" && qu.ObjToString(v["city"]) != "" && qu.ObjToString(v["area"]) != "" {
+			index++
+			continue
+		}
+		if qu.ObjToString(v["buyer"]) == "" {
+			index++
+			continue
+		}
+
+		_id := qu.BsonIdToSId(v["_id"])
+		rdata := cityMarshal(v)
+		if len(rdata) > 0 {
+			umap := make(map[string]interface{})
+			if v["modifyinfo"] == nil {
+				umap["modifyinfo"] = make(map[string]interface{})
+			} else {
+				umap["modifyinfo"] = v["modifyinfo"]
+			}
+			for rk, rv := range rdata {
+				umap[rk] = rv
+				umap["modifyinfo"].(map[string]interface{})[rk] = "企业信息"
+			}
+			mgo.Mgo_Bidding.UpdateById(table, v["_id"], map[string]interface{}{
+				"$set": umap,
+			})
+
+		}
+		index++
+		if index%1000 == 0 {
+			log.Println("index:", index, ",页码:", i+1, ",_id:", _id)
+		}
+		sid = _id
+		if sid >= eid {
+			break
+		}
+	}
+}
+func cityMarshal(data map[string]interface{}) map[string]string {
+	rdata := make(map[string]string)
+	buyer := qu.ObjToString(data["buyer"])
+	tmp, _ := mgo.Mgo.FindOneByField("qyxy", `{"company_name":"`+buyer+`"}`, qyxyFields)
+	if tmp == nil || (*tmp) == nil {
+		return rdata
+	}
+	company_code := qu.ObjToString((*tmp)["company_code"])
+	if len(company_code) > 5 {
+		province_city_district, _ := mgo.Mgo.FindOne("address", `{"code":"`+company_code[:6]+`"}`)
+		if province_city_district != nil && (*province_city_district) != nil {
+			if province := qu.ObjToString((*province_city_district)["province"]); province != "" {
+				rdata["area"] = province
+			}
+			if city := qu.ObjToString((*province_city_district)["city"]); city != "" {
+				rdata["city"] = city
+			}
+			if district := qu.ObjToString((*province_city_district)["district"]); district != "" {
+				rdata["district"] = district
+			}
+			return rdata
+		}
+	}
+	if province := qu.ObjToString((*tmp)["province"]); province != "" {
+		rdata["area"] = province
+	}
+	if city := qu.ObjToString((*tmp)["city"]); city != "" {
+		rdata["city"] = city
+	}
+	if district := qu.ObjToString((*tmp)["district"]); district != "" {
+		rdata["district"] = district
+	}
+	return rdata
+}
+func main() {
+	c := make(chan bool)
+	<-c
+}

+ 378 - 0
udp_city/src/mongodbutil/mongodbutil.go

@@ -0,0 +1,378 @@
+package mongodbutil
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+
+	. "gopkg.in/mgo.v2/bson"
+)
+
+//统计
+func (m *Pool) Count(c string, query interface{}) int {
+	defer util.Catch()
+	sess := m.Get()
+	//log.Println("count:", m.Size, m.MongodbAddr, m.DB, sess, m.GetMgoConn(), m)
+	var n int = 0
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		var err error
+		n, err = coll.Find(ObjToM(query)).Count()
+		if nil != err {
+			log.Println("CountError", err)
+		}
+	}
+	return n
+}
+
+//统计
+func (m *Pool) CountByErr(c string, query interface{}) (int, error) {
+	defer util.Catch()
+	sess := m.Get()
+	//log.Println("count:", m.Size, m.MongodbAddr, m.DB, sess, m.GetMgoConn(), m)
+	var n int = 0
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		var err error
+		n, err = coll.Find(ObjToM(query)).Count()
+		if nil != err {
+			return 0, err
+		} else {
+			return n, nil
+		}
+
+	}
+	return n, errors.New("no sess")
+}
+
+func (m *Pool) Update(c string, query interface{}, set interface{}, upsert bool, multi bool) bool {
+	defer util.Catch()
+	sess := m.Get()
+	b := true
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		var err error
+		if upsert {
+			_, err = coll.Upsert(ObjToM(query), ObjToM(set))
+		} else {
+			if multi {
+				_, err = coll.UpdateAll(ObjToM(query), ObjToM(set))
+			} else {
+				err = coll.Update(ObjToM(query), ObjToM(set))
+			}
+		}
+		if nil != err {
+			log.Println("UpdateError", err)
+			b = false
+		}
+	}
+	return b
+}
+
+func (m *Pool) UpdateById(c string, id interface{}, set interface{}) bool {
+	defer util.Catch()
+	sess := m.Get()
+	b := false
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		var q interface{}
+		if sid, ok := id.(string); ok {
+			q = M{"_id": util.StringTOBsonId(sid)}
+		} else {
+			q = M{"_id": id}
+		}
+		err := coll.Update(q, ObjToM(set))
+		if nil != err {
+			log.Println("UpdateByIdError", err)
+			b = false
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+//批量更新
+
+func (m *Pool) UpdateBulkAll(db, c string, doc ...[]map[string]interface{}) bool {
+	defer util.Catch()
+	sess := m.Get()
+	b := true
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(db).C(c)
+		bulk := coll.Bulk()
+		for _, v := range doc {
+			if len(v) == 2 {
+				bulk.Update(v[0], v[1])
+			}
+		}
+		_, err := bulk.Run()
+		if nil != err {
+			log.Println("BulkError", err)
+			b = false
+		}
+	} else {
+		b = false
+	}
+	return b
+}
+
+func (m *Pool) UpdateBulk(c string, doc ...[]map[string]interface{}) bool {
+	return m.UpdateBulkAll(m.DB, c, doc...)
+}
+
+//批量更新
+func (m *Pool) UpSertBulk(c string, doc ...[]map[string]interface{}) bool {
+	defer util.Catch()
+	sess := m.Get()
+	b := true
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		bulk := coll.Bulk()
+		for _, v := range doc {
+			if len(v) == 2 {
+				bulk.Upsert(v[0], v[1])
+			}
+		}
+		_, err := bulk.Run()
+		if nil != err {
+			log.Println("BulkUpsertError", err)
+			b = false
+		}
+	} else {
+		b = false
+	}
+	return b
+}
+
+//批量插入
+func (m *Pool) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	defer util.Catch()
+	sess := m.Get()
+	b := true
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		bulk := coll.Bulk()
+		for _, v := range doc {
+			bulk.Insert(v)
+		}
+		_, err := bulk.Run()
+		if nil != err {
+			log.Println("BulkError", err)
+			b = false
+		}
+	} else {
+		b = false
+	}
+	return b
+}
+
+func (m *Pool) Save(c string, doc interface{}) string {
+	defer util.Catch()
+	sess := m.Get()
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		obj := ObjToM(doc)
+		id := NewObjectId()
+		(*obj)["_id"] = id
+		err := coll.Insert(obj)
+		if nil != err {
+			log.Println("SaveError", err)
+			return ""
+		}
+		return (strings.Split(fmt.Sprintf("%s", id), `"`)[1])
+	}
+	return ""
+}
+
+//查询单条对象
+func (m *Pool) FindOne(c string, query interface{}) (*map[string]interface{}, bool) {
+	return m.FindOneByField(c, query, nil)
+}
+
+//查询单条对象
+func (m *Pool) FindOneByField(c string, query interface{}, fields interface{}) (*map[string]interface{}, bool) {
+	defer util.Catch()
+	res, ok := m.Find(c, query, nil, fields, true, -1, -1)
+	if nil != res && len(*res) > 0 {
+		return &((*res)[0]), ok
+	}
+	return nil, ok
+}
+
+//查询单条对象
+func (m *Pool) FindById(c string, query string, fields interface{}) (*map[string]interface{}, bool) {
+	defer util.Catch()
+	sess := m.Get()
+	var res map[string]interface{}
+	b := false
+	if sess != nil {
+		defer m.Close(sess)
+		res = make(map[string]interface{})
+		coll := sess.DB(m.DB).C(c)
+		var err error
+		err = coll.FindId(ObjectIdHex(query)).Select(ObjToOth(fields)).One(&res)
+		if nil != err {
+			log.Println("FindByIdError", err,query)
+		}
+		b = true
+	}
+	return &res, b
+}
+
+//底层查询方法
+func (m *Pool) Find(c string, query interface{}, order interface{}, fields interface{}, single bool, start int, limit int) (*[]map[string]interface{}, bool) {
+	defer util.Catch()
+	sess := m.Get()
+	var res []map[string]interface{}
+	b := false
+	if sess != nil {
+		defer m.Close(sess)
+		res = make([]map[string]interface{}, 1)
+		coll := sess.DB(m.DB).C(c)
+		var err error
+		if single {
+			err = coll.Find(ObjToM(query)).Select(ObjToOth(fields)).Sort(ObjToArr(order)...).One(&res[0])
+		} else if start > -1 {
+			err = coll.Find(ObjToM(query)).Select(ObjToOth(fields)).Sort(ObjToArr(order)...).Skip(start).Limit(limit).All(&res)
+		} else {
+			err = coll.Find(ObjToM(query)).Select(ObjToOth(fields)).Sort(ObjToArr(order)...).All(&res)
+		}
+		if nil != err {
+			//log.Println("FindError", err)
+		}
+		b = true
+	}
+	return &res, b
+}
+
+//删除对象
+func (m *Pool) Del(c string, query interface{}) bool {
+	defer util.Catch()
+	sess := m.Get()
+	b := false
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		_, err := coll.RemoveAll(ObjToM(query))
+		if nil != err {
+			log.Println("DelError", err)
+			b = false
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (m *Pool) GetObjectId(str string) ObjectId {
+	return ObjectIdHex(str)
+}
+
+func ObjToOth(query interface{}) *M {
+	return ObjToMQ(query, false)
+}
+func ObjToM(query interface{}) *M {
+	return ObjToMQ(query, true)
+}
+
+//obj(string,M)转M,查询用到
+func ObjToMQ(query interface{}, isQuery bool) *M {
+	data := make(M)
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	if s2, ok2 := query.(*map[string]interface{}); ok2 {
+		data = M(*s2)
+	} else if s3, ok3 := query.(*M); ok3 {
+		return s3
+	} else if s, ok := query.(string); ok {
+		json.Unmarshal([]byte(strings.Replace(s, "'", "\"", -1)), &data)
+		if ss, oks := data["_id"]; oks && isQuery {
+			switch ss.(type) {
+			case string:
+				data["_id"] = ObjectIdHex(ss.(string))
+			case map[string]interface{}:
+				tmp := ss.(map[string]interface{})
+				for k, v := range tmp {
+					tmp[k] = ObjectIdHex(v.(string))
+				}
+				data["_id"] = tmp
+			}
+
+		}
+	} else if s1, ok1 := query.(map[string]interface{}); ok1 {
+		data = s1
+	} else if s4, ok4 := query.(M); ok4 {
+		data = s4
+	} else {
+		data = nil
+	}
+	return &data
+}
+
+//对象转数组
+func ObjToArr(obj interface{}) []string {
+	if s, ok := obj.(string); ok {
+		if strings.ContainsAny(s, "{") {
+			//暂时简单支持此种写法
+			var temp = make(M)
+			var str = []string{}
+			json.Unmarshal([]byte(s), &temp)
+			for k, v := range temp {
+				m := util.IntAll(v)
+				if m > 0 {
+					str = append(str, k)
+				} else {
+					str = append(str, "-"+k)
+				}
+			}
+			return str
+		} else {
+			return strings.Split(s, ",")
+		}
+	} else if s1, ok1 := obj.([]string); ok1 {
+		return s1
+	} else {
+		return []string{}
+	}
+}
+
+//删除表
+func (m *Pool) DelColl(c string) bool {
+	defer util.Catch()
+	sess := m.Get()
+	b := true
+	if sess != nil {
+		defer m.Close(sess)
+		coll := sess.DB(m.DB).C(c)
+		err := coll.DropCollection()
+		if err != nil {
+			b = false
+		}
+	} else {
+		b = false
+	}
+	return b
+}

+ 117 - 0
udp_city/src/mongodbutil/pool.go

@@ -0,0 +1,117 @@
+package mongodbutil
+
+import (
+	"log"
+	"sync"
+	"time"
+
+	mgo "gopkg.in/mgo.v2"
+)
+
+var Mgo *Pool
+var Mgo_Bidding *Pool
+
+type Pool struct {
+	mu      sync.RWMutex
+	initCap int
+	maxCap  int
+	timeout int64
+	ch      chan *mgosess
+	addr    string
+	DB      string
+	live    int
+}
+type mgosess struct {
+	sess      *mgo.Session
+	timestamp int64
+}
+
+func MgoFactory(initCap int, maxCap int, timeout int64, addr, DB string) *Pool {
+	p := &Pool{sync.RWMutex{}, initCap, maxCap, timeout, make(chan *mgosess, maxCap), addr, DB, 0}
+	p.init()
+	go p.gc()
+	return p
+}
+
+func (p *Pool) GetLive() int {
+	return p.live
+}
+
+func (p *Pool) init() {
+	for i := 0; i < p.initCap; i++ {
+		sess, err := mgo.DialWithTimeout(p.addr, time.Duration(p.timeout)*time.Second)
+		if sess != nil && sess.Ping() == nil {
+			p.live++
+			p.ch <- &mgosess{sess, time.Now().Unix()}
+		} else {
+			log.Println(err.Error())
+		}
+	}
+}
+
+func (p *Pool) Get() (sess *mgo.Session) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.live > 0 {
+		select {
+		case mgos := <-p.ch:
+			if mgos.sess.Ping() == nil {
+				sess = mgos.sess
+			} else {
+				p.live--
+			}
+		case <-time.After(100 * time.Millisecond):
+		}
+	}
+	if sess == nil && p.live < p.maxCap {
+		s, err := mgo.DialWithTimeout(p.addr, 10*time.Second)
+		if s != nil && s.Ping() == nil {
+			p.live++
+			sess = s
+		} else {
+			log.Println(err.Error())
+		}
+	}
+	return
+}
+
+func (p *Pool) Close(sess *mgo.Session) {
+	if sess != nil {
+		if sess.Ping() == nil {
+			p.mu.Lock()
+			select {
+			case p.ch <- &mgosess{sess, time.Now().Unix()}:
+			default:
+				p.live--
+			}
+			p.mu.Unlock()
+		} else {
+			p.live--
+		}
+	}
+}
+
+func (p *Pool) gc() {
+	p.mu.Lock()
+	size := len(p.ch)
+	if size > p.initCap {
+		tn := time.Now().Unix()
+		init1 := 0
+		for i := 0; i < size; i++ {
+			select {
+			case c := <-p.ch:
+				if tn-c.timestamp < p.timeout || init1 < p.initCap {
+					p.ch <- c
+					init1++
+				} else {
+					c.sess.Close()
+					p.live--
+				}
+			default:
+			}
+		}
+	}
+	p.mu.Unlock()
+	//log.Println("size:", size, "live:", p.live)
+	time.AfterFunc(time.Duration(p.timeout)*time.Second, p.gc)
+}