123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- package dag
- import (
- "io/ioutil"
- "sync"
- "gopkg.in/yaml.v2"
- )
// DAG is the graph structure: a named collection of vertices plus the
// layered view of those vertices that bfsRange computes.
type DAG[T comparable] struct {
	// Name is the graph name.
	Name string
	// Vertexs holds every vertex added to the graph, in insertion order.
	Vertexs []*Vertex[T]
	// LayerVectexs groups vertices into layers; RunDag executes the layers
	// one after another.
	LayerVectexs [][]*Vertex[T]
}

// Vertex is a single node of the graph carrying a payload of type T.
type Vertex[T comparable] struct {
	// Key is the vertex name; bfsRange uses it to recognise visited vertices.
	Key string
	// Label is a free-form label.
	Label string
	// Value is the payload attached to the vertex.
	Value T
	// Parents lists the vertices that have an edge pointing at this vertex.
	Parents []*Vertex[T]
	// Children lists the vertices this vertex has an edge pointing at.
	Children []*Vertex[T]
}

// Config mirrors the YAML configuration file: a graph name, the list of
// vertices (YAML key "vertexs") and the list of edges between vertex keys.
type Config[T comparable] struct {
	Name    string `yaml:"name"`
	Vectexs []*struct {
		Key       string `yaml:"key"`
		Label     string `yaml:"label"`
		Arguments T      `yaml:"arguments"`
	} `yaml:"vertexs"`
	Edges []*struct {
		From string `yaml:"from"`
		To   string `yaml:"to"`
	} `yaml:"edges"`
}
- // 添加顶点
- func (dag *DAG[T]) AddVertex(v *Vertex[T]) {
- dag.Vertexs = append(dag.Vertexs, v)
- }
- // 添加顶点
- func (dag *DAG[T]) AddVertexs(v ...*Vertex[T]) {
- dag.Vertexs = append(dag.Vertexs, v...)
- }
- // 添加边,连线
- func (dag *DAG[T]) AddEdge(from, to *Vertex[T]) {
- from.Children = append(from.Children, to)
- to.Parents = append(to.Parents, from)
- }
- // bfsRange 广度优先,双层切片
- func (dag *DAG[T]) bfsRange() {
- root := dag.Vertexs[0]
- q := NewQueue()
- q.Add(root)
- visited := make(map[string]*Vertex[T])
- all := make([][]*Vertex[T], 0)
- for q.Length() > 0 {
- qSize := q.Length()
- tmp := make([]*Vertex[T], 0)
- for i := 0; i < qSize; i++ {
- //pop vertex
- currVert := q.Remove().(*Vertex[T])
- if _, ok := visited[currVert.Key]; ok {
- continue
- }
- visited[currVert.Key] = currVert
- tmp = append(tmp, currVert)
- for _, val := range currVert.Children {
- if _, ok := visited[val.Key]; !ok {
- q.Add(val) //add child
- }
- }
- }
- all = append([][]*Vertex[T]{tmp}, all...)
- }
- dag.LayerVectexs = all
- }
- // RunDag 并行执行任务
- func (dag *DAG[T]) RunDag(threads int, fn func(v *Vertex[T])) {
- //
- runTaskFn := func(v *Vertex[T], wg *sync.WaitGroup, lock <-chan bool) {
- defer func() {
- wg.Done()
- <-lock
- }()
- fn(v)
- }
- //concurrentExecutionFn
- concurrentExecutionFn := func(layer []*Vertex[T]) {
- lock := make(chan bool, threads)
- wg := new(sync.WaitGroup)
- for _, v := range layer {
- wg.Add(1)
- lock <- true
- go runTaskFn(v, wg, lock)
- }
- wg.Wait()
- }
- //
- for _, layer := range dag.LayerVectexs {
- concurrentExecutionFn(layer)
- }
- }
- // LoadFromYaml 节点,边,添加的顺序都需要颠倒过来
- func LoadFromYamlFile[T comparable](f string) (*DAG[T], error) {
- bs, err := ioutil.ReadFile(f)
- if err != nil {
- return nil, err
- }
- dag := &DAG[T]{}
- cf := new(Config[T])
- err = yaml.Unmarshal(bs, cf)
- if err != nil {
- return nil, err
- }
- dag.Name = cf.Name
- cache := map[string]*Vertex[T]{}
- arr := make([]*Vertex[T], len(cf.Vectexs), len(cf.Vectexs))
- for i, v := range cf.Vectexs {
- vertex := &Vertex[T]{
- Key: v.Key,
- Label: v.Label,
- Value: v.Arguments,
- }
- arr[len(cf.Vectexs)-i-1] = vertex
- cache[v.Key] = vertex
- }
- dag.AddVertexs(arr...)
- for i := len(cf.Edges); i > 0; i-- {
- edge := cf.Edges[i-1]
- dag.AddEdge(cache[edge.To], cache[edge.From])
- }
- dag.bfsRange()
- return dag, nil
- }
|