|
@@ -9,26 +9,28 @@ import (
|
|
|
|
|
|
type (
|
|
|
// 图结构
|
|
|
- DAG struct {
|
|
|
- Name string
|
|
|
- Vertexs []*Vertex
|
|
|
+ DAG[T comparable] struct {
|
|
|
+ Name string
|
|
|
+ Vertexs []*Vertex[T]
|
|
|
+ LayerVectexs [][]*Vertex[T]
|
|
|
}
|
|
|
|
|
|
// 顶点
|
|
|
- Vertex struct {
|
|
|
+ Vertex[T comparable] struct {
|
|
|
Key string //名称
|
|
|
- Value interface{}
|
|
|
- Parents []*Vertex
|
|
|
- Children []*Vertex
|
|
|
+ Label string //
|
|
|
+ Value T
|
|
|
+ Parents []*Vertex[T]
|
|
|
+ Children []*Vertex[T]
|
|
|
}
|
|
|
|
|
|
// 配置文件项
|
|
|
- Config struct {
|
|
|
+ Config[T comparable] struct {
|
|
|
Name string `yaml:"name"`
|
|
|
Vectexs []*struct {
|
|
|
- Key string `yaml:"key"`
|
|
|
- Label string `yaml:"label"`
|
|
|
- OtherParam interface{} `yaml:"otherParam"`
|
|
|
+ Key string `yaml:"key"`
|
|
|
+ Label string `yaml:"label"`
|
|
|
+ Arguments T `yaml:"arguments"`
|
|
|
} `yaml:"vertexs"`
|
|
|
Edges []*struct {
|
|
|
From string `yaml:"from"`
|
|
@@ -38,33 +40,34 @@ type (
|
|
|
)
|
|
|
|
|
|
// 添加顶点
|
|
|
-func (dag *DAG) AddVertex(v *Vertex) {
|
|
|
+func (dag *DAG[T]) AddVertex(v *Vertex[T]) {
|
|
|
dag.Vertexs = append(dag.Vertexs, v)
|
|
|
}
|
|
|
|
|
|
// 添加顶点
|
|
|
-func (dag *DAG) AddVertexs(v ...*Vertex) {
|
|
|
+func (dag *DAG[T]) AddVertexs(v ...*Vertex[T]) {
|
|
|
dag.Vertexs = append(dag.Vertexs, v...)
|
|
|
}
|
|
|
|
|
|
// 添加边,连线
|
|
|
-func (dag *DAG) AddEdge(from, to *Vertex) {
|
|
|
+func (dag *DAG[T]) AddEdge(from, to *Vertex[T]) {
|
|
|
from.Children = append(from.Children, to)
|
|
|
to.Parents = append(to.Parents, from)
|
|
|
}
|
|
|
|
|
|
// bfsRange 广度优先,双层切片
|
|
|
-func bfsRange(root *Vertex) [][]*Vertex {
|
|
|
+func (dag *DAG[T]) bfsRange() {
|
|
|
+ root := dag.Vertexs[0]
|
|
|
q := NewQueue()
|
|
|
q.Add(root)
|
|
|
- visited := make(map[string]*Vertex)
|
|
|
- all := make([][]*Vertex, 0)
|
|
|
+ visited := make(map[string]*Vertex[T])
|
|
|
+ all := make([][]*Vertex[T], 0)
|
|
|
for q.Length() > 0 {
|
|
|
qSize := q.Length()
|
|
|
- tmp := make([]*Vertex, 0)
|
|
|
+ tmp := make([]*Vertex[T], 0)
|
|
|
for i := 0; i < qSize; i++ {
|
|
|
//pop vertex
|
|
|
- currVert := q.Remove().(*Vertex)
|
|
|
+ currVert := q.Remove().(*Vertex[T])
|
|
|
if _, ok := visited[currVert.Key]; ok {
|
|
|
continue
|
|
|
}
|
|
@@ -76,18 +79,16 @@ func bfsRange(root *Vertex) [][]*Vertex {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- all = append([][]*Vertex{tmp}, all...)
|
|
|
+ all = append([][]*Vertex[T]{tmp}, all...)
|
|
|
}
|
|
|
- return all
|
|
|
+ dag.LayerVectexs = all
|
|
|
}
|
|
|
|
|
|
// RunDag 并行执行任务
|
|
|
-func RunDag(dat *DAG, threads int, fn func(v *Vertex)) {
|
|
|
- root := dat.Vertexs[0]
|
|
|
- all := bfsRange(root)
|
|
|
+func (dag *DAG[T]) RunDag(threads int, fn func(v *Vertex[T])) {
|
|
|
|
|
|
//
|
|
|
- runTaskFn := func(v *Vertex, wg *sync.WaitGroup, lock <-chan bool) {
|
|
|
+ runTaskFn := func(v *Vertex[T], wg *sync.WaitGroup, lock <-chan bool) {
|
|
|
defer func() {
|
|
|
wg.Done()
|
|
|
<-lock
|
|
@@ -95,7 +96,7 @@ func RunDag(dat *DAG, threads int, fn func(v *Vertex)) {
|
|
|
fn(v)
|
|
|
}
|
|
|
//concurrentExecutionFn
|
|
|
- concurrentExecutionFn := func(layer []*Vertex) {
|
|
|
+ concurrentExecutionFn := func(layer []*Vertex[T]) {
|
|
|
lock := make(chan bool, threads)
|
|
|
wg := new(sync.WaitGroup)
|
|
|
for _, v := range layer {
|
|
@@ -106,29 +107,30 @@ func RunDag(dat *DAG, threads int, fn func(v *Vertex)) {
|
|
|
wg.Wait()
|
|
|
}
|
|
|
//
|
|
|
- for _, layer := range all {
|
|
|
+ for _, layer := range dag.LayerVectexs {
|
|
|
concurrentExecutionFn(layer)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// LoadFromYaml 节点,边,添加的顺序都需要颠倒过来
|
|
|
-func LoadFromYamlFile(f string) (*DAG, error) {
|
|
|
+func LoadFromYamlFile[T comparable](f string) (*DAG[T], error) {
|
|
|
bs, err := ioutil.ReadFile(f)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- dag := &DAG{}
|
|
|
- cf := new(Config)
|
|
|
+ dag := &DAG[T]{}
|
|
|
+ cf := new(Config[T])
|
|
|
err = yaml.Unmarshal(bs, cf)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- cache := map[string]*Vertex{}
|
|
|
- arr := make([]*Vertex, len(cf.Vectexs), len(cf.Vectexs))
|
|
|
+ cache := map[string]*Vertex[T]{}
|
|
|
+ arr := make([]*Vertex[T], len(cf.Vectexs), len(cf.Vectexs))
|
|
|
for i, v := range cf.Vectexs {
|
|
|
- vertex := &Vertex{
|
|
|
+ vertex := &Vertex[T]{
|
|
|
Key: v.Key,
|
|
|
- Value: v.Label,
|
|
|
+ Label: v.Label,
|
|
|
+ Value: v.Arguments,
|
|
|
}
|
|
|
arr[len(cf.Vectexs)-i-1] = vertex
|
|
|
cache[v.Key] = vertex
|
|
@@ -138,5 +140,6 @@ func LoadFromYamlFile(f string) (*DAG, error) {
|
|
|
edge := cf.Edges[i-1]
|
|
|
dag.AddEdge(cache[edge.To], cache[edge.From])
|
|
|
}
|
|
|
+ dag.bfsRange()
|
|
|
return dag, nil
|
|
|
}
|