base.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package base
  2. import (
  3. "context"
  4. "log"
  5. "runtime"
  6. "strings"
  7. "app.yhyue.com/moapp/jybase/es"
  8. "github.com/gogf/gf/os/gtime"
  9. "github.com/gogf/gf/util/gconv"
  10. es7 "github.com/olivere/elastic/v7"
  11. )
  12. // 配置文件对象
  13. type (
  14. Cfg struct {
  15. Tasks []*Task
  16. }
  17. Task struct {
  18. Source *Obj
  19. Direct *Obj
  20. }
  21. Obj struct {
  22. Type string
  23. Data map[string]any
  24. }
  25. )
  26. // 任务对象
  27. type (
  28. Idx struct {
  29. Source Source
  30. Direct Direct
  31. }
  32. //数据源对象 支持多源输入 xls mysql mongodb elastic
  33. Source interface {
  34. Find(d Direct)
  35. }
  36. //保存对象
  37. Direct interface {
  38. Save() //保存
  39. Send(data map[string]any) //发送
  40. End() //结束
  41. }
  42. )
  43. // 执行任务
  44. func (idx *Idx) Task() {
  45. go idx.Direct.Save()
  46. idx.Source.Find(idx.Direct)
  47. }
  48. func Catch() {
  49. if r := recover(); r != nil {
  50. log.Println(r)
  51. for skip := 0; ; skip++ {
  52. _, file, line, ok := runtime.Caller(skip)
  53. if !ok {
  54. break
  55. }
  56. go log.Printf("%v,%v\n", file, line)
  57. }
  58. }
  59. }
  60. func ParseData(field string, data string) (f string, d any) {
  61. arr := strings.Split(field, "|")
  62. if len(arr) == 2 {
  63. f = arr[1]
  64. switch arr[0] {
  65. case "Date":
  66. d = gtime.New(data).Time
  67. case "Float64":
  68. d = gconv.Float64(data)
  69. case "Int":
  70. d = gconv.Int(data)
  71. case "Int64":
  72. d = gconv.Int64(data)
  73. case "Millisecond":
  74. d = gtime.New(data).UnixMilli()
  75. }
  76. } else {
  77. f = field
  78. d = data
  79. }
  80. return
  81. }
  82. func BulkSave(e *es.EsV7, index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
  83. client := e.GetEsConn()
  84. defer e.DestoryEsConn(client)
  85. if client != nil {
  86. defer func() {
  87. if r := recover(); r != nil {
  88. log.Println("[E]", r)
  89. for skip := 1; ; skip++ {
  90. _, file, line, ok := runtime.Caller(skip)
  91. if !ok {
  92. break
  93. }
  94. go log.Printf("%v,%v\n", file, line)
  95. }
  96. }
  97. }()
  98. req := client.Bulk()
  99. for _, v := range *obj {
  100. if v == nil || len(v) == 0 {
  101. continue
  102. }
  103. _id := gconv.String(v["id"])
  104. if isDelBefore && _id != "" {
  105. req = req.Add(es7.NewBulkDeleteRequest().Index(index).Id(_id))
  106. }
  107. req = req.Add(es7.NewBulkIndexRequest().Index(index).Id(_id).Doc(v))
  108. }
  109. _, err := req.Do(context.TODO())
  110. if err != nil {
  111. log.Println("批量保存到ES出错", err.Error())
  112. }
  113. }
  114. }