12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package main
- import (
- "qfw/util/elastic"
- "log"
- "time"
- )
- type updateEsInfo struct {
- //更新或新增通道
- update_pool chan map[string]string
- //数量
- saveSize int
- }
- var sp_es = make(chan bool, 10)
- //批量更新对象
- func newUpdateEsPool() *updateEsInfo {
- update:=&updateEsInfo{make(chan map[string]string, 50000),200}
- return update
- }
- //更新池
- func (update *updateEsInfo) updateEsData() {
- log.Println("监听Es......更新数据")
- tmpArr := make([]map[string]string, update.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-update.update_pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == update.saveSize {
- sp_es <- true
- go func(dataArr []map[string]string) {
- defer func() {
- <-sp_es
- }()
- //批量更新
- elastic.BulkUpdateArr(esIndex,esType,dataArr)
- }(tmpArr)
- tmpArr = make([]map[string]string, update.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp_es <- true
- go func(dataArr []map[string]string) {
- defer func() {
- <-sp_es
- }()
- //批量更新
- elastic.BulkUpdateArr(esIndex,esType,dataArr)
- }(tmpArr[:tmpIndex])
- tmpArr = make([]map[string]string, update.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
|