types.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package dag
  2. import (
  3. "io/ioutil"
  4. "sync"
  5. "gopkg.in/yaml.v2"
  6. )
  // DAG is a named graph of vertices.
  //
  // Vertexs holds the vertices in the order they were added; bfsRange uses
  // Vertexs[0] as the root of its traversal. LayerVectexs holds the vertices
  // grouped into the layers that RunDag executes one after another.
  type DAG[T comparable] struct {
  	Name         string
  	Vertexs      []*Vertex[T]
  	LayerVectexs [][]*Vertex[T]
  }

  // Vertex is a single node of a DAG carrying a payload of type T.
  type Vertex[T comparable] struct {
  	Key      string       // name; bfsRange identifies a vertex by this value
  	Label    string       // free-form label (the "label" YAML field when loaded from a file)
  	Value    T            // payload (the "arguments" YAML field when loaded from a file)
  	Parents  []*Vertex[T] // vertices that have an edge pointing at this one
  	Children []*Vertex[T] // vertices this one has an edge pointing at
  }

  // Config is one graph definition as it appears in a YAML configuration file.
  type Config[T comparable] struct {
  	// Name is the graph name.
  	Name string `yaml:"name"`

  	// Vectexs lists the vertex definitions.
  	Vectexs []*struct {
  		Key       string `yaml:"key"`
  		Label     string `yaml:"label"`
  		Arguments T      `yaml:"arguments"`
  	} `yaml:"vertexs"`

  	// Edges lists the connections between vertices; From and To hold vertex keys.
  	Edges []*struct {
  		From string `yaml:"from"`
  		To   string `yaml:"to"`
  	} `yaml:"edges"`
  }
  36. // 添加顶点
  37. func (dag *DAG[T]) AddVertex(v *Vertex[T]) {
  38. dag.Vertexs = append(dag.Vertexs, v)
  39. }
  40. // 添加顶点
  41. func (dag *DAG[T]) AddVertexs(v ...*Vertex[T]) {
  42. dag.Vertexs = append(dag.Vertexs, v...)
  43. }
  44. // 添加边,连线
  45. func (dag *DAG[T]) AddEdge(from, to *Vertex[T]) {
  46. from.Children = append(from.Children, to)
  47. to.Parents = append(to.Parents, from)
  48. }
  49. // bfsRange 广度优先,双层切片
  50. func (dag *DAG[T]) bfsRange() {
  51. root := dag.Vertexs[0]
  52. q := NewQueue()
  53. q.Add(root)
  54. visited := make(map[string]*Vertex[T])
  55. all := make([][]*Vertex[T], 0)
  56. for q.Length() > 0 {
  57. qSize := q.Length()
  58. tmp := make([]*Vertex[T], 0)
  59. for i := 0; i < qSize; i++ {
  60. //pop vertex
  61. currVert := q.Remove().(*Vertex[T])
  62. if _, ok := visited[currVert.Key]; ok {
  63. continue
  64. }
  65. visited[currVert.Key] = currVert
  66. tmp = append(tmp, currVert)
  67. for _, val := range currVert.Children {
  68. if _, ok := visited[val.Key]; !ok {
  69. q.Add(val) //add child
  70. }
  71. }
  72. }
  73. all = append([][]*Vertex[T]{tmp}, all...)
  74. }
  75. dag.LayerVectexs = all
  76. }
  77. // RunDag 并行执行任务
  78. func (dag *DAG[T]) RunDag(threads int, fn func(v *Vertex[T])) {
  79. //
  80. runTaskFn := func(v *Vertex[T], wg *sync.WaitGroup, lock <-chan bool) {
  81. defer func() {
  82. wg.Done()
  83. <-lock
  84. }()
  85. fn(v)
  86. }
  87. //concurrentExecutionFn
  88. concurrentExecutionFn := func(layer []*Vertex[T]) {
  89. lock := make(chan bool, threads)
  90. wg := new(sync.WaitGroup)
  91. for _, v := range layer {
  92. wg.Add(1)
  93. lock <- true
  94. go runTaskFn(v, wg, lock)
  95. }
  96. wg.Wait()
  97. }
  98. //
  99. for _, layer := range dag.LayerVectexs {
  100. concurrentExecutionFn(layer)
  101. }
  102. }
  103. // LoadFromYaml 节点,边,添加的顺序都需要颠倒过来
  104. func LoadFromYamlFile[T comparable](f string) (*DAG[T], error) {
  105. bs, err := ioutil.ReadFile(f)
  106. if err != nil {
  107. return nil, err
  108. }
  109. dag := &DAG[T]{}
  110. cf := new(Config[T])
  111. err = yaml.Unmarshal(bs, cf)
  112. if err != nil {
  113. return nil, err
  114. }
  115. dag.Name = cf.Name
  116. cache := map[string]*Vertex[T]{}
  117. arr := make([]*Vertex[T], len(cf.Vectexs), len(cf.Vectexs))
  118. for i, v := range cf.Vectexs {
  119. vertex := &Vertex[T]{
  120. Key: v.Key,
  121. Label: v.Label,
  122. Value: v.Arguments,
  123. }
  124. arr[len(cf.Vectexs)-i-1] = vertex
  125. cache[v.Key] = vertex
  126. }
  127. dag.AddVertexs(arr...)
  128. for i := len(cf.Edges); i > 0; i-- {
  129. edge := cf.Edges[i-1]
  130. dag.AddEdge(cache[edge.To], cache[edge.From])
  131. }
  132. dag.bfsRange()
  133. return dag, nil
  134. }