123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package base
- import (
- "context"
- "log"
- "runtime"
- "strings"
- "app.yhyue.com/moapp/jybase/es"
- "github.com/gogf/gf/os/gtime"
- "github.com/gogf/gf/util/gconv"
- es7 "github.com/olivere/elastic/v7"
- )
// Configuration file objects.

// Cfg is the root of the configuration: a list of tasks.
type Cfg struct {
	Tasks []*Task
}

// Task pairs one Source description with one Direct description.
type Task struct {
	Source *Obj
	Direct *Obj
}

// Obj describes one endpoint of a task.
// NOTE(review): Type presumably selects the implementation and Data carries
// its settings; confirm against the code that consumes Cfg.
type Obj struct {
	Type string
	Data map[string]any
}
// Task objects.

// Idx binds a data source to the destination it feeds.
type Idx struct {
	Source Source
	Direct Direct
}

// Source is a data source. Multiple kinds of input are supported:
// xls, mysql, mongodb, elastic.
type Source interface {
	// Find is handed the destination d.
	Find(d Direct)
}

// Direct is the object that saves data.
type Direct interface {
	// Save saves.
	Save()
	// Send sends one record.
	Send(data map[string]any)
	// End finishes.
	End()
}
- // 执行任务
- func (idx *Idx) Task() {
- go idx.Direct.Save()
- idx.Source.Find(idx.Direct)
- }
// Catch recovers from a panic and logs the recovered value followed by one
// "file,line" entry per stack frame, walking runtime.Caller from frame 0
// until no more frames are reported.
//
// It must be invoked directly by a defer statement (defer Catch()); recover
// has no effect when called from a nested function. When there is no panic
// in flight nothing is logged.
func Catch() {
	r := recover()
	if r == nil {
		return
	}
	log.Println(r)
	for skip := 0; ; skip++ {
		_, file, line, ok := runtime.Caller(skip)
		if !ok {
			break
		}
		// Log synchronously: one goroutine per frame would let the frames
		// come out in arbitrary order, or not at all if the process exits
		// before they are scheduled.
		log.Printf("%v,%v\n", file, line)
	}
}
- func ParseData(field string, data string) (f string, d any) {
- arr := strings.Split(field, "|")
- if len(arr) == 2 {
- f = arr[1]
- switch arr[0] {
- case "Date":
- d = gtime.New(data).Time
- case "Float64":
- d = gconv.Float64(data)
- case "Int":
- d = gconv.Int(data)
- case "Int64":
- d = gconv.Int64(data)
- case "Millisecond":
- d = gtime.New(data).UnixMilli()
- }
- } else {
- f = field
- d = data
- }
- return
- }
- func BulkSave(e *es.EsV7, index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- req := client.Bulk()
- for _, v := range *obj {
- if v == nil || len(v) == 0 {
- continue
- }
- _id := gconv.String(v["id"])
- if isDelBefore && _id != "" {
- req = req.Add(es7.NewBulkDeleteRequest().Index(index).Id(_id))
- }
- req = req.Add(es7.NewBulkIndexRequest().Index(index).Id(_id).Doc(v))
- }
- _, err := req.Do(context.TODO())
- if err != nil {
- log.Println("批量保存到ES出错", err.Error())
- }
- }
- }
|