package base import ( "context" "log" "runtime" "strings" "app.yhyue.com/moapp/jybase/es" "github.com/gogf/gf/os/gtime" "github.com/gogf/gf/util/gconv" es7 "github.com/olivere/elastic/v7" ) // 配置文件对象 type ( Cfg struct { Tasks []*Task } Task struct { Source *Obj Direct *Obj } Obj struct { Type string Data map[string]any } ) // 任务对象 type ( Idx struct { Source Source Direct Direct } //数据源对象 支持多源输入 xls mysql mongodb elastic Source interface { Find(d Direct) } //保存对象 Direct interface { Save() //保存 Send(data map[string]any) //发送 End() //结束 } ) // 执行任务 func (idx *Idx) Task() { go idx.Direct.Save() idx.Source.Find(idx.Direct) } func Catch() { if r := recover(); r != nil { log.Println(r) for skip := 0; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } } func ParseData(field string, data string) (f string, d any) { arr := strings.Split(field, "|") if len(arr) == 2 { f = arr[1] switch arr[0] { case "Date": d = gtime.New(data).Time case "Float64": d = gconv.Float64(data) case "Int": d = gconv.Int(data) case "Int64": d = gconv.Int64(data) case "Millisecond": d = gtime.New(data).UnixMilli() } } else { f = field d = data } return } func BulkSave(e *es.EsV7, index, itype string, obj *[]map[string]interface{}, isDelBefore bool) { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() req := client.Bulk() for _, v := range *obj { if v == nil || len(v) == 0 { continue } _id := gconv.String(v["id"]) if isDelBefore && _id != "" { req = req.Add(es7.NewBulkDeleteRequest().Index(index).Id(_id)) } req = req.Add(es7.NewBulkIndexRequest().Index(index).Id(_id).Doc(v)) } _, err := req.Do(context.TODO()) if err != nil { log.Println("批量保存到ES出错", err.Error()) } } }