Эх сурвалжийг харах

信息融合-项目搭建

apple 4 жил өмнө
parent
commit
6909bbecc8

+ 23 - 0
udpfusion/src/config.json

@@ -0,0 +1,23 @@
+{
+  "udpport": ":17330",
+  "mongodb": {
+    "addrName": "192.168.3.207:27092",
+    "dbName": "zhengkun",
+    "collName": "test",
+    "pool": 10,
+    "site": {
+      "site_dbname": "qfw",
+      "site_coll": "site"
+    }
+  },
+  "fusion_coll_name":"fusiondata",
+  "record_coll_name":"recorddata",
+  "":"",
+  "jkmail": {
+    "to": "zhengkun@topnet.net.cn",
+    "api": "http://10.171.112.160:19281/_send/_mail"
+  },
+  "nextNode": [
+
+  ]
+}

+ 183 - 0
udpfusion/src/main.go

@@ -0,0 +1,183 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	mu "mfw/util"
+	"net"
+	"os"
+	"qfw/common/src/qfw/util"
+	qu "qfw/util"
+	"time"
+)
+
+
+var (
+	sysconfig    map[string]interface{} //配置文件
+	mgo          *MongodbSim            //mongodb操作对象
+	udpclient    mu.UdpClient             //udp对象
+	nextNode     []map[string]interface{} //下节点数组
+	coll_name,fusion_coll_name,record_coll_name 	 string
+)
+
+
+
+func initMgo()  {
+	mconf := sysconfig["mongodb"].(map[string]interface{})
+	log.Println(mconf)
+	mgo = &MongodbSim{
+		MongodbAddr: mconf["addrName"].(string),
+		DbName:      mconf["dbName"].(string),
+		Size:        qu.IntAllDef(mconf["pool"], 10),
+	}
+	mgo.InitPool()
+
+
+	coll_name = mconf["collName"].(string)
+	fusion_coll_name = sysconfig["fusion_coll_name"].(string)
+	record_coll_name = sysconfig["record_coll_name"].(string)
+}
+
+
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+	log.Println("采用udp模式")
+}
+
+
+func mainT() {
+	go checkMapJob()
+	updport := sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", updport)
+	time.Sleep(99999 * time.Hour)
+}
+
+//快速测试使用
+func main() {
+
+	sid := "1f0000000000000000000000"
+	eid := "9f0000000000000000000000"
+	log.Println(sid, "---", eid)
+	mapinfo := map[string]interface{}{}
+	if sid == "" || eid == "" {
+		log.Println("sid,eid参数不能为空")
+		os.Exit(0)
+	}
+	mapinfo["gtid"] = sid
+	mapinfo["lteid"] = eid
+	startTask([]byte{}, mapinfo)
+	time.Sleep(99999 * time.Hour)
+
+}
+
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA: //上个节点的数据
+		//从表中开始处理
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		log.Println("err:", err, "mapInfo:", mapInfo)
+		if err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			taskType := qu.ObjToString(mapInfo["stype"])
+			if taskType == "fusion" {
+				go startTask(data, mapInfo)
+			} else {
+				log.Println("未知类型:融合异常... ...")
+			}
+			key, _ := mapInfo["key"].(string)
+			if key == "" {
+				key = "udpok"
+			}
+			udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+		}
+	case mu.OP_NOOP: //下个节点回应
+		ok := string(data)
+		if ok != "" {
+			log.Println("ok:", ok)
+			udptaskmap.Delete(ok)
+		}
+	}
+}
+
+
+
+//融合具体方法
+func startTask(data []byte, mapInfo map[string]interface{}) {
+
+	//遍历数据
+	log.Println("开始融合流程")
+	defer qu.Catch()
+	//区间id
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+	log.Println("查询条件:",q)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
+	updateExtract := [][]map[string]interface{}{}
+	index:=0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 {
+			log.Println("当前数量:", index, tmp["_id"])
+		}
+
+
+
+
+
+
+		tmp = make(map[string]interface{})
+	}
+
+
+	if len(updateExtract) >0 {
+		mgo.UpSertBulk(coll_name, updateExtract...)
+
+	}
+
+
+	log.Println("task fusion over - 总计数量",index)
+
+	time.Sleep(30 * time.Second)
+
+	//任务完成,开始发送广播通知下面节点
+
+	sendUdp(mapInfo)
+
+
+
+}
+
+func sendUdp(mapinfo map[string]interface{})  {
+
+	//log.Println("信息融合结束-发送udp")
+	for _, to := range nextNode {
+		sid, _ := mapinfo["gtid"].(string)
+		eid, _ := mapinfo["lteid"].(string)
+		key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": util.ObjToString(to["stype"]),
+			"key":   key,
+		})
+		addr := &net.UDPAddr{
+			IP:   net.ParseIP(to["addr"].(string)),
+			Port: util.IntAll(to["port"]),
+		}
+		node := &udpNode{by, addr, time.Now().Unix(), 0}
+		udptaskmap.Store(key, node)
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+	}
+}

+ 315 - 0
udpfusion/src/mgo.go

@@ -0,0 +1,315 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 59 - 0
udpfusion/src/sendmail.go

@@ -0,0 +1,59 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var udptaskmap = &sync.Map{}
+var tomail string
+var api string
+
+type udpNode struct {
+	data      []byte
+	addr      *net.UDPAddr
+	timestamp int64
+	retry     int
+}
+
+func checkMapJob() {
+	//阿里云内网无法发送邮件
+	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println("start checkMapJob", tomail, sysconfig["jkmail"])
+	for {
+		udptaskmap.Range(func(k, v interface{}) bool {
+			now := time.Now().Unix()
+			node, _ := v.(*udpNode)
+			if now-node.timestamp > 120 {
+				node.retry++
+				if node.retry > 5 {
+					log.Println("udp重试失败", k)
+					udptaskmap.Delete(k)
+					res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "extract-send-fail", k.(string)))
+					if err == nil {
+						defer res.Body.Close()
+						read, err := ioutil.ReadAll(res.Body)
+						log.Println("邮件发发送:", string(read), err)
+					}
+				} else {
+					log.Println("udp重发", k)
+					udpclient.WriteUdp(node.data, mu.OP_TYPE_DATA, node.addr)
+				}
+			} else if now-node.timestamp > 10 {
+				log.Println("udp任务超时中..", k)
+			}
+			return true
+		})
+		time.Sleep(60 * time.Second)
+	}
+}

+ 62 - 0
udpfusion/src/updateFusion.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"log"
+	"time"
+)
+
+type updateFusionInfo struct {
+
+	//更新或新增通道
+	updatePool chan []map[string]interface{}
+	//数量
+	saveSize   	int
+
+}
+
+
+
+
+var sp_f = make(chan bool, 5)
+
+func newUpdateFusionPool() *updateFusionInfo {
+	update:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),500}
+	return update
+}
+
+
+func (update *updateFusionInfo) updateFusionData() {
+	log.Println("开始不断监听--待更新数据")
+	tmpArr := make([][]map[string]interface{}, update.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-update.updatePool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == update.saveSize {
+				sp_f <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp_f
+					}()
+					mgo.UpSertBulk(fusion_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([][]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp_f <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp_f
+					}()
+					mgo.UpSertBulk(fusion_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([][]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}

+ 62 - 0
udpfusion/src/updateRecord.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"log"
+	"time"
+)
+
+type updateRecordInfo struct {
+
+	//更新或新增通道
+	updatePool chan []map[string]interface{}
+	//数量
+	saveSize   	int
+
+}
+
+
+
+
+var sp_r = make(chan bool, 5)
+
+func newUpdateRecordPool() *updateRecordInfo {
+	update:=&updateRecordInfo{make(chan []map[string]interface{}, 50000),500}
+	return update
+}
+
+
+func (update *updateRecordInfo) updateRecordData() {
+	log.Println("开始不断监听--待更新数据")
+	tmpArr := make([][]map[string]interface{}, update.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-update.updatePool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == update.saveSize {
+				sp_r <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp_r
+					}()
+					mgo.UpSertBulk(record_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([][]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp_r <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp_r
+					}()
+					mgo.UpSertBulk(record_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([][]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}