maxiaoshan 6 rokov pred
rodič
commit
a906c5df10

+ 121 - 0
src/extractbrand/src/brand.go

@@ -0,0 +1,121 @@
+package main
+
+import (
+	"log"
+	"regexp"
+	"time"
+	//"qfw/util"
+	"sync"
+
+	. "gopkg.in/mgo.v2/bson"
+)
+
+var wg *sync.WaitGroup
+var lock *sync.Mutex
+var letter *regexp.Regexp = regexp.MustCompile(`^[a-zA-Z]+&?`)
+
+func InitBrand() {
+	//初始化db
+	sess := brandMgo.GetMgoConn()
+	defer brandMgo.DestoryMongoConn(sess)
+
+	BrandDFA = &DFA{
+		Link: make(map[string]interface{}),
+	}
+	//查品牌库品牌
+	var res []M
+	//c, _ := sess.DB("spider").C("JD_commodity").Count()
+	sess.DB(brandDbname).C(brandCollname).Pipe([]M{M{"$group": M{"_id": "$brand"}}}).All(&res)
+	n := 0
+	for _, b := range res {
+		brand := b["_id"].(string)
+		if len(brand) > 50 || len(brand) == 1 || (len(brand) == 3 && !letter.MatchString(brand)) {
+			log.Println("err brand:", brand)
+			continue
+		}
+		n++
+		BrandDFA.AddWord(brand) //将品牌库加入DFA
+	}
+	log.Println("brand num :", n)
+}
+
+func UdpTask(sid, eid string) {
+	t := time.Now()
+	log.Println("执行任务")
+	query := M{"_id": M{"$gte": ObjectIdHex(sid), "$lte": ObjectIdHex(eid)}}
+	//附件库
+	sess := appendixMgo.GetMgoConn()
+	defer appendixMgo.DestoryMongoConn(sess)
+
+	poolSize := make(chan bool, pool)
+	wg = &sync.WaitGroup{}
+	lock = &sync.Mutex{}                   //控制读写
+	update := [][]map[string]interface{}{} //批量更新的数据
+
+	data := sess.DB(appendixDbname).C(appendixCollname).Find(query).Sort("_id").Iter()
+	sum := 0
+	for tmp := make(map[string]interface{}); data.Next(tmp); sum++ {
+		if sum%100 == 0 {
+			log.Println("current:", sum)
+		}
+		poolSize <- true
+		wg.Add(1)
+		go func(d map[string]interface{}) {
+			defer func() {
+				<-poolSize
+				wg.Done()
+			}()
+
+			brandArr := GetBrand(d)
+			if len(brandArr) > 0 { //匹配到品牌再处理
+				tmpArr := []map[string]interface{}{} //存储某条数据的id和要更新内容
+				_id := map[string]interface{}{
+					"_id": d["_id"],
+				}
+				tmpArr = append(tmpArr, _id)
+				//				pushAll := map[string]interface{}{
+				//					"$pushAll": map[string]interface{}{
+				//						"conbrand": brandArr,
+				//					},
+				//				}
+				addToSet := map[string]interface{}{
+					"$addToSet": map[string]interface{}{
+						"conbrand": map[string]interface{}{
+							"$each": brandArr,
+						},
+					},
+				}
+				tmpArr = append(tmpArr, addToSet)
+				lock.Lock()
+				update = append(update, tmpArr)
+				if len(update) > savesize {
+					appendixMgo.UpdateBulk(appendixCollname, update...)
+					update = [][]map[string]interface{}{} //更新后把数据置空
+				}
+				lock.Unlock()
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	lock.Lock()
+	if len(update) > 0 {
+		appendixMgo.UpdateBulk(appendixCollname, update...)
+		update = [][]map[string]interface{}{} //更新后把数据置空
+	}
+	lock.Unlock()
+	log.Println("--task over--", time.Since(t).Seconds())
+}
+
+func GetBrand(data map[string]interface{}) (brandArr []string) {
+	if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok {
+		attachments := projectinfo["attachments"].(map[string]interface{})
+		for _, m := range attachments {
+			val := m.(map[string]interface{})
+			if content, ok := val["content"].(string); ok { //附件文本
+				brandArr = append(brandArr, BrandDFA.CheckSensitiveWord(content)...)
+			}
+		}
+	}
+	return
+}

+ 17 - 0
src/extractbrand/src/config.json

@@ -0,0 +1,17 @@
+{
+	"udpport":"1482",
+	"pool":6,
+	"savesize":200,
+	"brand":{
+		"mgodb":"192.168.3.207:27082",
+		"dbsize": 2,
+		"dbname":"spider",
+		"collname":"JD_commodity"
+	},
+	"appendix":{
+		"mgodb":"192.168.3.207:27082",
+		"dbsize": 2,
+		"dbname":"mxs",
+		"collname":"bidding_file"
+	}
+}

+ 59 - 0
src/extractbrand/src/dfa.go

@@ -0,0 +1,59 @@
+package main
+
+import (
+	"qfw/util"
+)
+
+var BrandDFA *DFA
+
+type DFA struct {
+	Link map[string]interface{}
+}
+
+func (d *DFA) AddWord(keys ...string) {
+	d.AddWordAll(true, keys...)
+}
+func (d *DFA) AddWordAll(haskey bool, keys ...string) {
+	if d.Link == nil {
+		d.Link = make(map[string]interface{})
+	}
+	for _, key := range keys {
+		nowMap := &d.Link
+		for i := 0; i < len(key); i++ {
+			kc := key[i : i+1]
+			if v, ok := (*nowMap)[kc]; ok {
+				nowMap, _ = v.(*map[string]interface{})
+			} else {
+				newMap := map[string]interface{}{}
+				newMap["YN"] = "0" //不是最后一个
+				(*nowMap)[kc] = &newMap
+				nowMap = &newMap
+			}
+			if i == len(key)-1 {
+				(*nowMap)["YN"] = "1" //最后一个
+				if haskey {
+					(*nowMap)["K"] = key
+				}
+			}
+		}
+	}
+}
+
+func (d *DFA) CheckSensitiveWord(src string) []string {
+	res := make([]string, 0)
+	for j := 0; j < len(src); j++ {
+		nowMap := &d.Link
+		for i := j; i < len(src); i++ {
+			word := src[i : i+1]
+			nowMap, _ = (*nowMap)[word].(*map[string]interface{})
+			if nowMap != nil { // 存在,则判断是否为最后一个
+				if "1" == util.ObjToString((*nowMap)["YN"]) {
+					res = append(res, util.ObjToString((*nowMap)["K"]))
+				}
+			} else {
+				break
+			}
+		}
+	}
+	return res
+}

+ 96 - 0
src/extractbrand/src/main.go

@@ -0,0 +1,96 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	. "qfw/util/mongodb"
+	"time"
+)
+
+var (
+	Sysconfig   map[string]interface{} //配置文件
+	brandMgo    *MongodbSim            //mongodb操作对象
+	appendixMgo *MongodbSim            //mongodb操作对象
+	udpclient   mu.UdpClient           //udp对象
+	udpport     string                 //udp端口
+	pool        int                    //并发数
+	savesize    int
+	//品牌库信息
+	brandMgodb    string
+	brandDbname   string
+	brandDbsize   int
+	brandCollname string
+	//附件库信息
+	appendixMgodb    string
+	appendixDbname   string
+	appendixDbsize   int
+	appendixCollname string
+)
+
+func init() {
+	util.ReadConfig("config.json", &Sysconfig)
+	udpport, _ = Sysconfig["udpport"].(string)
+	pool = util.IntAllDef(Sysconfig["pool"], 5)
+	savesize = util.IntAllDef(Sysconfig["savesize"], 200)
+	//品牌库
+	brand := Sysconfig["brand"].(map[string]interface{})
+	brandMgodb, _ = brand["mgodb"].(string)
+	brandDbname, _ = brand["dbname"].(string)
+	brandDbsize = util.IntAllDef(brand["dbsize"], 5)
+	brandCollname, _ = brand["collname"].(string)
+	brandMgo = &MongodbSim{
+		MongodbAddr: brandMgodb,
+		Size:        brandDbsize,
+		DbName:      brandDbname,
+	}
+	brandMgo.InitPool()
+	//附件库
+	appendix := Sysconfig["appendix"].(map[string]interface{})
+	appendixMgodb, _ = appendix["mgodb"].(string)
+	appendixDbname, _ = appendix["dbname"].(string)
+	appendixDbsize = util.IntAllDef(appendix["dbsize"], 5)
+	appendixCollname, _ = appendix["collname"].(string)
+	appendixMgo = &MongodbSim{
+		MongodbAddr: appendixMgodb,
+		Size:        appendixDbsize,
+		DbName:      appendixDbname,
+	}
+
+	appendixMgo.InitPool()
+	//初始化品牌库
+	InitBrand()
+
+}
+func main() {
+	log.Println("udpport", udpport)
+	udpclient = mu.UdpClient{Local: ":" + udpport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", udpport)
+	time.Sleep(99999 * time.Hour)
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer util.Catch()
+	switch act {
+	case mu.OP_TYPE_DATA: //上个节点的数据
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		log.Println("err:", err, "mapInfo:", mapInfo)
+		if err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			sid, _ := mapInfo["gtid"].(string)
+			eid, _ := mapInfo["lteid"].(string)
+			udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
+			UdpTask(sid, eid)
+		}
+	case mu.OP_NOOP: //下个节点回应
+		ok := string(data)
+		if ok != "" {
+			log.Println("ok:", ok)
+		}
+	}
+}