123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- package des
- import (
- "createindex/base"
- "log"
- "time"
- "app.yhyue.com/moapp/jybase/es"
- // esv7 "github.com/olivere/elastic/v7"
- )
- //目标保存对象,es工具类
- type Des struct {
- Ch chan map[string]any
- ChEnd chan bool
- Index string
- ES *es.EsV7
- }
- func GetDes(addr, user, pwd, index string, size int) *Des {
- e := &Des{
- Ch: make(chan map[string]any, 500),
- ChEnd: make(chan bool),
- Index: index,
- ES: es.NewEs("v7", addr, size, user, pwd).(*es.EsV7),
- }
- return e
- }
- func (es *Des) Save() {
- arr := []map[string]any{}
- L:
- for {
- select {
- case data := <-es.Ch:
- arr = append(arr, data)
- if len(arr) > 99 {
- func(ao *[]map[string]any) {
- base.BulkSave(es.ES, es.Index, "", ao, false)
- }(&arr)
- arr = []map[string]any{}
- }
- default:
- select {
- case <-es.ChEnd:
- break L
- case <-time.After(time.Millisecond * 10):
- }
- }
- }
- if len(arr) > 0 {
- func(ao *[]map[string]any) {
- base.BulkSave(es.ES, es.Index, "", ao, false)
- }(&arr)
- }
- log.Println("es,保存完成...")
- }
- func (es *Des) Send(data map[string]any) {
- es.Ch <- data
- }
- func (es *Des) End() {
- es.ChEnd <- true
- }
|