|
@@ -0,0 +1,142 @@
|
|
|
|
+package dag
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "io/ioutil"
|
|
|
|
+ "sync"
|
|
|
|
+
|
|
|
|
+ "gopkg.in/yaml.v2"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+type (
|
|
|
|
+ // 图结构
|
|
|
|
+ DAG struct {
|
|
|
|
+ Name string
|
|
|
|
+ Vertexs []*Vertex
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 顶点
|
|
|
|
+ Vertex struct {
|
|
|
|
+ Key string //名称
|
|
|
|
+ Value interface{}
|
|
|
|
+ Parents []*Vertex
|
|
|
|
+ Children []*Vertex
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 配置文件项
|
|
|
|
+ Config struct {
|
|
|
|
+ Name string `yaml:"name"`
|
|
|
|
+ Vectexs []*struct {
|
|
|
|
+ Key string `yaml:"key"`
|
|
|
|
+ Label string `yaml:"label"`
|
|
|
|
+ OtherParam interface{} `yaml:"otherParam"`
|
|
|
|
+ } `yaml:"vertexs"`
|
|
|
|
+ Edges []*struct {
|
|
|
|
+ From string `yaml:"from"`
|
|
|
|
+ To string `yaml:"to"`
|
|
|
|
+ } `yaml:"edges"`
|
|
|
|
+ }
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+// 添加顶点
|
|
|
|
+func (dag *DAG) AddVertex(v *Vertex) {
|
|
|
|
+ dag.Vertexs = append(dag.Vertexs, v)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 添加顶点
|
|
|
|
+func (dag *DAG) AddVertexs(v ...*Vertex) {
|
|
|
|
+ dag.Vertexs = append(dag.Vertexs, v...)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 添加边,连线
|
|
|
|
+func (dag *DAG) AddEdge(from, to *Vertex) {
|
|
|
|
+ from.Children = append(from.Children, to)
|
|
|
|
+ to.Parents = append(to.Parents, from)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// bfsRange 广度优先,双层切片
|
|
|
|
+func bfsRange(root *Vertex) [][]*Vertex {
|
|
|
|
+ q := NewQueue()
|
|
|
|
+ q.Add(root)
|
|
|
|
+ visited := make(map[string]*Vertex)
|
|
|
|
+ all := make([][]*Vertex, 0)
|
|
|
|
+ for q.Length() > 0 {
|
|
|
|
+ qSize := q.Length()
|
|
|
|
+ tmp := make([]*Vertex, 0)
|
|
|
|
+ for i := 0; i < qSize; i++ {
|
|
|
|
+ //pop vertex
|
|
|
|
+ currVert := q.Remove().(*Vertex)
|
|
|
|
+ if _, ok := visited[currVert.Key]; ok {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ visited[currVert.Key] = currVert
|
|
|
|
+ tmp = append(tmp, currVert)
|
|
|
|
+ for _, val := range currVert.Children {
|
|
|
|
+ if _, ok := visited[val.Key]; !ok {
|
|
|
|
+ q.Add(val) //add child
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ all = append([][]*Vertex{tmp}, all...)
|
|
|
|
+ }
|
|
|
|
+ return all
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// RunDag 并行执行任务
|
|
|
|
+func RunDag(dat *DAG, threads int, fn func(v *Vertex)) {
|
|
|
|
+ root := dat.Vertexs[0]
|
|
|
|
+ all := bfsRange(root)
|
|
|
|
+
|
|
|
|
+ //
|
|
|
|
+ runTaskFn := func(v *Vertex, wg *sync.WaitGroup, lock <-chan bool) {
|
|
|
|
+ defer func() {
|
|
|
|
+ wg.Done()
|
|
|
|
+ <-lock
|
|
|
|
+ }()
|
|
|
|
+ fn(v)
|
|
|
|
+ }
|
|
|
|
+ //concurrentExecutionFn
|
|
|
|
+ concurrentExecutionFn := func(layer []*Vertex) {
|
|
|
|
+ lock := make(chan bool, threads)
|
|
|
|
+ wg := new(sync.WaitGroup)
|
|
|
|
+ for _, v := range layer {
|
|
|
|
+ wg.Add(1)
|
|
|
|
+ lock <- true
|
|
|
|
+ go runTaskFn(v, wg, lock)
|
|
|
|
+ }
|
|
|
|
+ wg.Wait()
|
|
|
|
+ }
|
|
|
|
+ //
|
|
|
|
+ for _, layer := range all {
|
|
|
|
+ concurrentExecutionFn(layer)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// LoadFromYaml 节点,边,添加的顺序都需要颠倒过来
|
|
|
|
+func LoadFromYamlFile(f string) (*DAG, error) {
|
|
|
|
+ bs, err := ioutil.ReadFile(f)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ dag := &DAG{}
|
|
|
|
+ cf := new(Config)
|
|
|
|
+ err = yaml.Unmarshal(bs, cf)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ cache := map[string]*Vertex{}
|
|
|
|
+ arr := make([]*Vertex, len(cf.Vectexs), len(cf.Vectexs))
|
|
|
|
+ for i, v := range cf.Vectexs {
|
|
|
|
+ vertex := &Vertex{
|
|
|
|
+ Key: v.Key,
|
|
|
|
+ Value: v.Label,
|
|
|
|
+ }
|
|
|
|
+ arr[len(cf.Vectexs)-i-1] = vertex
|
|
|
|
+ cache[v.Key] = vertex
|
|
|
|
+ }
|
|
|
|
+ dag.AddVertexs(arr...)
|
|
|
|
+ for i := len(cf.Edges); i > 0; i-- {
|
|
|
|
+ edge := cf.Edges[i-1]
|
|
|
|
+ dag.AddEdge(cache[edge.To], cache[edge.From])
|
|
|
|
+ }
|
|
|
|
+ return dag, nil
|
|
|
|
+}
|