package des import ( "createindex/base" "log" "time" "app.yhyue.com/moapp/jybase/es" // esv7 "github.com/olivere/elastic/v7" ) //目标保存对象,es工具类 type Des struct { Ch chan map[string]any ChEnd chan bool Index string ES *es.EsV7 } func GetDes(addr, user, pwd, index string, size int) *Des { e := &Des{ Ch: make(chan map[string]any, 500), ChEnd: make(chan bool), Index: index, ES: es.NewEs("v7", addr, size, user, pwd).(*es.EsV7), } return e } func (es *Des) Save() { arr := []map[string]any{} L: for { select { case data := <-es.Ch: arr = append(arr, data) if len(arr) > 99 { func(ao *[]map[string]any) { base.BulkSave(es.ES, es.Index, "", ao, false) }(&arr) arr = []map[string]any{} } default: select { case <-es.ChEnd: break L case <-time.After(time.Millisecond * 10): } } } if len(arr) > 0 { func(ao *[]map[string]any) { base.BulkSave(es.ES, es.Index, "", ao, false) }(&arr) } log.Println("es,保存完成...") } func (es *Des) Send(data map[string]any) { es.Ch <- data } func (es *Des) End() { es.ChEnd <- true }