taskEs.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "fieldproject_common/config"
  7. "fmt"
  8. "github.com/spf13/cobra"
  9. "go.uber.org/zap"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. // @Description bidding数据 bid_field
  15. // @Author J 2022/8/30 09:09
  16. func bidding() *cobra.Command {
  17. cmdClient := &cobra.Command{
  18. Use: "bidding",
  19. Short: "Start processing bidding data",
  20. Run: func(cmd *cobra.Command, args []string) {
  21. go updateEsMethod()
  22. taskBidding()
  23. },
  24. }
  25. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  26. return cmdClient
  27. }
  28. // @Description 医疗机构数据
  29. // @Author J 2022/8/11 16:50
  30. func institution() *cobra.Command {
  31. cmdClient := &cobra.Command{
  32. Use: "medical_institution",
  33. Short: "Start processing medical_institutional data",
  34. Run: func(cmd *cobra.Command, args []string) {
  35. go SaveEs(config.Conf.DB.Es.IndexM, config.Conf.DB.Es.TypeM)
  36. taskIterateSql()
  37. },
  38. }
  39. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  40. return cmdClient
  41. }
  42. // @Description 供应商/经销商数据
  43. // @Author J 2022/8/11 16:49
  44. func product() *cobra.Command {
  45. cmdClient := &cobra.Command{
  46. Use: "supplier_product",
  47. Short: "Start processing supplier_product data",
  48. Run: func(cmd *cobra.Command, args []string) {
  49. go SaveEs(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS)
  50. taskIterateSql1()
  51. },
  52. }
  53. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  54. return cmdClient
  55. }
  56. func taskBidding() {
  57. sess := MongoTool.GetMgoConn()
  58. defer MongoTool.DestoryMongoConn(sess)
  59. ch := make(chan bool, 3)
  60. wg := &sync.WaitGroup{}
  61. //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5a8d7f4840d2d9bbe8962002")}
  62. query := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(nil).Iter()
  63. count := 0
  64. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  65. if count%20000 == 0 {
  66. log.Info(fmt.Sprintf("current --- %d", count))
  67. }
  68. ch <- true
  69. wg.Add(1)
  70. go func(tmp map[string]interface{}) {
  71. defer func() {
  72. <-ch
  73. wg.Done()
  74. }()
  75. if b := util.ObjToString(tmp["bid_field"]); b != "" {
  76. updateEsPool <- []map[string]interface{}{{
  77. "_id": mongodb.BsonIdToSId(tmp["_id"]),
  78. },
  79. {"bid_field": b},
  80. }
  81. }
  82. }(tmp)
  83. tmp = make(map[string]interface{})
  84. }
  85. wg.Wait()
  86. log.Info(fmt.Sprintf("over --- %d", count))
  87. }
  88. func taskIterateSql() {
  89. pool := make(chan bool, 10) //控制线程数
  90. wg := &sync.WaitGroup{}
  91. finalId := 0
  92. lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
  93. if len(*lastInfo) > 0 {
  94. finalId = util.IntAll((*lastInfo)[0]["id"])
  95. }
  96. log.Info("taskIterateSql---", zap.Int("finally id: ", finalId))
  97. lastid, count := 0, 0
  98. for {
  99. util.Debug("重新查询,lastid---", lastid)
  100. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "institution_baseinfo", lastid)
  101. rows, err := MysqlM.DB.Query(q)
  102. if err != nil {
  103. log.Error("taskIterateSql---", zap.Error(err))
  104. }
  105. columns, err := rows.Columns()
  106. if finalId == lastid {
  107. util.Debug("----finish----------", count)
  108. break
  109. }
  110. for rows.Next() {
  111. scanArgs := make([]interface{}, len(columns))
  112. values := make([]interface{}, len(columns))
  113. ret := make(map[string]interface{})
  114. for k := range values {
  115. scanArgs[k] = &values[k]
  116. }
  117. err = rows.Scan(scanArgs...)
  118. if err != nil {
  119. log.Error("taskIterateSql---", zap.Error(err))
  120. break
  121. }
  122. for i, col := range values {
  123. if v, ok := col.([]uint8); ok {
  124. ret[columns[i]] = string(v)
  125. } else {
  126. ret[columns[i]] = col
  127. }
  128. }
  129. lastid = util.IntAll(ret["id"])
  130. count++
  131. if count%500 == 0 {
  132. util.Debug("current-------", count, lastid)
  133. }
  134. pool <- true
  135. wg.Add(1)
  136. go func(tmp map[string]interface{}) {
  137. defer func() {
  138. <-pool
  139. wg.Done()
  140. }()
  141. EsSaveCache <- method(tmp)
  142. }(ret)
  143. ret = make(map[string]interface{})
  144. }
  145. _ = rows.Close()
  146. wg.Wait()
  147. }
  148. }
  149. func method(tmp map[string]interface{}) map[string]interface{} {
  150. m := make(map[string]interface{})
  151. for k, v := range config.Conf.DB.Es.FieldM {
  152. if k == "alias" {
  153. var arr []string
  154. info := MysqlM.Find("institution_alias", map[string]interface{}{"company_id": tmp["company_id"]}, "", "", -1, -1)
  155. for _, m2 := range *info {
  156. arr = append(arr, util.ObjToString(m2["alias"]))
  157. }
  158. if len(arr) > 0 {
  159. m[k] = strings.Join(arr, ",")
  160. }
  161. } else if k == "sdequipment" {
  162. if util.ObjToString(tmp["mi_type_code"]) == "0208" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  163. m[k] = "多功能电离子手术治疗机,CO2激光治疗仪,半导体激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针、电刀、电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机"
  164. } else if util.ObjToString(tmp["mi_type_code"]) == "0201" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  165. m[k] = "牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机"
  166. } else if util.ObjToString(tmp["mi_type_code"]) == "0203" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  167. m[k] = "小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,胆红素测定仪,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,经皮给药治疗仪,儿童智能测量仪"
  168. } else if util.ObjToString(tmp["mi_type_code"]) == "12" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  169. m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏起搏器,心脏除颤器,心脏复苏机,呼吸机,儿童用呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,血压计,体温计,体重计,空气消毒机"
  170. } else if util.ObjToString(tmp["mi_type_code"]) == "01" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  171. m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏除颤器,心脏复苏机,呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,空气消毒机,动态心电监测系统,有创呼吸机,动态血压监护仪,便携式血氧饱和度监护仪,多导睡眠呼吸监测仪,床边肺功能仪,便携式血气分析仪,心肺功能监测仪,胃动力检测仪,胃电治疗仪,腹水超滤仪,腹水浓缩机,人工肝,肝病治疗仪,床单位臭氧消毒机,结肠灌洗治疗仪,经皮肾镜,动态血糖监测仪,胰岛素泵,糖尿病足病诊断箱,眼底镜,空气波压力治疗仪,动态脑电监护仪,颅内压监测仪,脑水肿监测仪,半导体激光治疗仪,钴60放射治疗机,超声聚焦刀,氩氦刀,射频肿瘤治疗仪,微波热疗仪,双筒显微镜,相差显微镜,荧光显微镜,倒置显微镜,骨髓活检装置,流式细胞分析仪,细胞分离机,脑细胞介质分析仪,双光能骨密度仪,脑电超慢涨落分析仪,动脉硬化测试仪,移动式负压吸引器,换药床,乳腺微创真空旋切系统,外碎石设备,骨科牵引床,脊柱牵引床,推拿手法床,石膏床,石膏剪,石膏锯,水温箱,足底静脉泵,激光治疗仪,骨科康复设备,吸引设备,供氧设备,监护设备,呼叫系统,心脏除颤仪,抢救车,换药车,转运床,营养输注泵,防褥疮气垫,血压计,体温计,体重计,移动紫外线灯,负压病房设施,层流病房设施,心脏起搏器,便携式呼吸机,雾化器,床边支气管镜,血液动力学检测仪,电冰毯,电子冰帽,脑电图监测仪,脑功能监测仪,振动排痰器,床单元臭氧消毒机,层流净化系统,holter,主动脉球囊反搏泵,便携式超声诊断仪,食道电生理仪,诊察床,肛管直肠压力测定设备,肛门镜,肛门坐浴熏洗设备,结肠灌洗设备,肛肠综合治疗仪,痔科套扎器,肛肠内腔治疗仪,肛门肌电图,多功能电离子手术治疗机,CO2激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针,电刀,电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机,母婴监护仪,妇科检查台,计划生育手术床,冲洗车,阴道镜,人流吸引器,超声诊断仪,超高频电波刀,超声聚焦治疗仪,盆腔炎治疗设备,产后康复综合治疗仪,胎心监护仪,妇科检查床,综合产床,新生儿抢救台,婴儿辐射保暖台,婴儿培养箱,电动羊水吸引器,经皮给药治疗仪,胆红素测定仪,产妇电脑综合治疗仪,消毒隔离器械柜,小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,儿童智能测量仪,眼科治疗床,裂隙灯,眼压计,角膜曲率计,视力灯箱,客观视力仪,电脑验光仪,全自动电脑视野仪,手术显微镜,眼科AB超声仪,超声乳化治疗仪,眼底荧光造影仪,自动焦度仪,沙眼治疗仪,睫毛电解器,视觉诱发电位仪,耳鼻喉综合治疗台,耳科旋转椅,鼓气电窥耳镜,耳钻,动态喉镜,纤维喉镜,间接喉镜,直接喉镜,支撑喉镜,电子喉镜,鼻咽喉镜,间接鼻咽喉镜,前鼻镜,鼻内镜及手术系统,电测听器,前庭检查仪,眼震电图仪,声阻抗仪,五官科多用显微镜,牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机,普通X光机,洗片机,透视机,移动式X光机,数字X线摄影,CR,DR,干式激光相机,X线电子计算机断层扫描装置,CT,数字减影血管造影X光机,DSA,数字化胃肠X光机,数据传输系统,LIS,全自动血细胞分析仪,尿液分析仪,尿沉渣工作站,冰点渗透压计,凝血检测仪,血糖测定仪,微量血糖测定仪,血气分析仪,干式生化分析仪,生化分析仪,发光免疫分析仪,全自动酶免免疫分析系统,酶标仪,电泳分析仪,血小板聚集仪,全自动细菌培养系统,生物培养箱,微生物鉴定药敏分析仪,血培养仪,菌落计数器,厌氧菌培养箱,幽门螺旋杆菌检测仪,氨基酸分析系统,荧光定量PCR检测系统,TCT液基细胞学检测仪,HPV-DNA检测系统,心梗三项检测仪,脑钠肽检测仪,二氧化碳培养箱,高温灭菌器,生物安全柜,血液流变仪,普通显微镜,生物显微镜,血沉仪,蛋白电泳仪,特种蛋白仪,电解质分析仪,精子分析系统,血栓弹力分析仪,血型鉴定及配血设备,纯水系统,自动洗板机,分析天平,超声清洗器,振荡器,电热培养箱,恒温水浴箱,医用冷库,医用冷藏柜,超低温冰柜,普通离心机,低速冷冻离心机,高速冷冻离心机,快速血糖仪微量泵,输液泵营养,输注泵,负压病房设施,层流病房设施,有创呼吸机便携式呼吸机,心电图机动态心电监测系统,微量泵输液泵,食道电生理仪床,单元臭氧消毒机,肛管直肠,压力测定设备,超声聚焦治疗仪,盆腔炎治疗设备,裂隙灯眼压计,眼底镜角膜曲率计视力,灯箱客观视力仪,睫毛电解器,鼓气电窥耳镜耳钻,动态喉镜纤维喉镜,间接喉镜直接喉镜,前鼻镜鼻,内镜及手术系统,电测听器前庭检查仪,抛光机氦氖激光器,光敏固化灯种植机,喷砂机铸造机石膏振荡器,干燥箱全瓷/铸造烤瓷设备,普通X光机,透视机移动式X光机,X线电子计算机,数字化胃肠X光机,数据传输系统(LIS)全自动血细胞分析仪,尿沉渣工作站冰点渗透压计,全自动细菌培养系统,生物培养箱,血沉仪蛋白电泳仪,分析天平超声清洗器,干燥箱医用冰箱,心脏复苏机,营养输注泵,血透机,自动腹透机,外碎石设备,核磁共振仪,核磁共振成像系统"
  172. }
  173. info1, _ := MongoTool.FindOneByField("20220906shebei_buchong", map[string]interface{}{"company_id": tmp["company_id"]}, map[string]interface{}{"itemname_all": 1})
  174. if info1 != nil && len(*info1) > 0 {
  175. if m[k] != nil {
  176. m[k] = fmt.Sprintf("%s,%s", m[k], (*info1)["itemname_all"])
  177. } else {
  178. m[k] = (*info1)["itemname_all"]
  179. }
  180. }
  181. info := MysqlM.FindOne("code_sdleveltypeequip", map[string]interface{}{"code": tmp["sdequipment_code"]}, "", "")
  182. if info != nil && len(*info) > 0 {
  183. if m[k] != nil {
  184. m[k] = fmt.Sprintf("%s,%s", m[k], (*info)["equipment"])
  185. } else {
  186. m[k] = (*info)["equipment"]
  187. }
  188. }
  189. } else if k == "area_code" {
  190. m[k] = tmp[k]
  191. info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
  192. if info != nil && len(*info) > 0 {
  193. m["area"] = (*info)["area"]
  194. if (*info)["city"] != nil {
  195. m["city"] = (*info)["city"]
  196. }
  197. if (*info)["district"] != nil {
  198. m["district"] = (*info)["district"]
  199. }
  200. }
  201. } else if k == "mi_type_code" {
  202. m[k] = string([]byte(util.ObjToString(tmp[k]))[:2])
  203. } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
  204. continue
  205. } else {
  206. if v == "string" {
  207. m[k] = tmp[k]
  208. } else {
  209. m[k] = util.IntAll(tmp[k])
  210. }
  211. }
  212. }
  213. return m
  214. }
  215. func taskIterateSql1() {
  216. pool := make(chan bool, 10) //控制线程数
  217. wg := &sync.WaitGroup{}
  218. finalId := 0
  219. lastInfo := MysqlB.SelectBySql(fmt.Sprintf("SELECT id FROM %s WHERE sourcetype>=2 ORDER BY id DESC LIMIT 1", "company_baseinfo"))
  220. if len(*lastInfo) > 0 {
  221. finalId = util.IntAll((*lastInfo)[0]["id"])
  222. }
  223. log.Info("taskIterateSql1---", zap.Int("finally id", finalId))
  224. lastid, count := 0, 0
  225. for {
  226. util.Debug("重新查询,lastid---", lastid)
  227. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d and sourcetype>=2 ORDER BY id ASC limit 100000", "company_baseinfo", lastid)
  228. //q := fmt.Sprintf("SELECT * FROM %s WHERE id = %d ORDER BY id ASC limit 1000000", "company_baseinfo", 12526)
  229. rows, err := MysqlB.DB.Query(q)
  230. if err != nil {
  231. log.Error("taskIterateSql1---", zap.Error(err))
  232. }
  233. columns, err := rows.Columns()
  234. if finalId == lastid {
  235. util.Debug("----finish----------", count)
  236. break
  237. }
  238. for rows.Next() {
  239. scanArgs := make([]interface{}, len(columns))
  240. values := make([]interface{}, len(columns))
  241. ret := make(map[string]interface{})
  242. for k := range values {
  243. scanArgs[k] = &values[k]
  244. }
  245. err = rows.Scan(scanArgs...)
  246. if err != nil {
  247. log.Error("taskIterateSql1---", zap.Error(err))
  248. break
  249. }
  250. for i, col := range values {
  251. if v, ok := col.([]uint8); ok {
  252. ret[columns[i]] = string(v)
  253. } else {
  254. ret[columns[i]] = col
  255. }
  256. }
  257. lastid = util.IntAll(ret["id"])
  258. count++
  259. if count%500 == 0 {
  260. util.Debug("current-------", count, lastid)
  261. }
  262. pool <- true
  263. wg.Add(1)
  264. go func(tmp map[string]interface{}) {
  265. defer func() {
  266. <-pool
  267. wg.Done()
  268. }()
  269. EsSaveCache <- method1(tmp)
  270. }(ret)
  271. ret = make(map[string]interface{})
  272. }
  273. _ = rows.Close()
  274. wg.Wait()
  275. }
  276. }
  277. func method1(tmp map[string]interface{}) map[string]interface{} {
  278. m := make(map[string]interface{})
  279. for k, v := range config.Conf.DB.Es.FieldS {
  280. if k == "business_model" {
  281. info := MysqlB.FindOne("company_business_model", map[string]interface{}{"company_id": tmp["company_id"], "company_field_code": "0101"}, "", "")
  282. if info != nil && len(*info) > 0 {
  283. m[k] = util.IntAll((*info)["business_model"])
  284. } else {
  285. m[k] = 2
  286. }
  287. } else if k == "supplier" {
  288. m[k] = tmp["company_name"]
  289. } else if k == "productlist" {
  290. var p = method2(util.ObjToString(tmp["company_id"]))
  291. if p != nil {
  292. m[k] = p
  293. }
  294. } else if k == "area_code" {
  295. if tmp[k] != nil {
  296. m[k] = tmp[k]
  297. info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
  298. if info != nil && len(*info) > 0 {
  299. m["area"] = (*info)["area"]
  300. if (*info)["city"] != nil {
  301. m["city"] = (*info)["city"]
  302. }
  303. if (*info)["district"] != nil {
  304. m["district"] = (*info)["district"]
  305. }
  306. }
  307. } else {
  308. log.Error("area_code", zap.Any("id", tmp["company_id"]))
  309. }
  310. } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
  311. continue
  312. } else {
  313. if v == "string" {
  314. m[k] = tmp[k]
  315. } else {
  316. m[k] = util.IntAll(tmp[k])
  317. }
  318. }
  319. }
  320. return m
  321. }
  322. func method2(cid string) []map[string]interface{} {
  323. var pmap []map[string]interface{}
  324. mc := make(map[string]bool) // 记录name 去重
  325. pinfo1 := MysqlM.Find("product_baseinfo", map[string]interface{}{"company_id": cid}, "", "", -1, -1)
  326. for _, m2 := range *pinfo1 {
  327. m := make(map[string]interface{})
  328. m["name"] = m2["product_name"]
  329. pmap = append(pmap, m)
  330. mc[util.ObjToString(m2["product_name"])] = true
  331. }
  332. pinfo2, _ := MongoTool.Find("bidding_p_list_0907", map[string]interface{}{"company_id": cid}, nil, nil, false, -1, -1)
  333. if len(*pinfo2) > 0 {
  334. for _, m2 := range *pinfo2 {
  335. key := util.ObjToString(m2["itemname"]) + util.ObjToString(m2["brand"]) + util.ObjToString(m2["model"])
  336. if mc[key] {
  337. continue
  338. } else {
  339. m := make(map[string]interface{})
  340. m["name"] = util.ObjToString(m2["itemname"])
  341. if m2["model"] != nil {
  342. m["model"] = m2["model"]
  343. }
  344. if m2["brand"] != nil {
  345. m["brand"] = m2["brand"]
  346. }
  347. pmap = append(pmap, m)
  348. mc[key] = true
  349. }
  350. }
  351. }
  352. return pmap
  353. }
  354. func SaveEs(i, t string) {
  355. log.Info("SaveEs---", zap.String("i", i), zap.String("t", t))
  356. arru := make([]map[string]interface{}, 500)
  357. indexu := 0
  358. for {
  359. select {
  360. case v := <-EsSaveCache:
  361. arru[indexu] = v
  362. indexu++
  363. if indexu == 500 {
  364. SP <- true
  365. go func(arru []map[string]interface{}) {
  366. defer func() {
  367. <-SP
  368. }()
  369. Es.BulkSave(i, t, &arru, false)
  370. }(arru)
  371. arru = make([]map[string]interface{}, 500)
  372. indexu = 0
  373. }
  374. case <-time.After(1000 * time.Millisecond):
  375. if indexu > 0 {
  376. SP <- true
  377. go func(arru []map[string]interface{}) {
  378. defer func() {
  379. <-SP
  380. }()
  381. Es.BulkSave(i, t, &arru, false)
  382. }(arru[:indexu])
  383. arru = make([]map[string]interface{}, 500)
  384. indexu = 0
  385. }
  386. }
  387. }
  388. }
  389. func updateEsMethod() {
  390. arru := make([][]map[string]interface{}, 200)
  391. indexu := 0
  392. for {
  393. select {
  394. case v := <-updateEsPool:
  395. arru[indexu] = v
  396. indexu++
  397. if indexu == 200 {
  398. updateEsSp <- true
  399. go func(arru [][]map[string]interface{}) {
  400. defer func() {
  401. <-updateEsSp
  402. }()
  403. Es.UpdateBulk("bidding", "bidding", arru...)
  404. }(arru)
  405. arru = make([][]map[string]interface{}, 200)
  406. indexu = 0
  407. }
  408. case <-time.After(1000 * time.Millisecond):
  409. if indexu > 0 {
  410. updateEsSp <- true
  411. go func(arru [][]map[string]interface{}) {
  412. defer func() {
  413. <-updateEsSp
  414. }()
  415. Es.UpdateBulk("bidding", "bidding", arru...)
  416. }(arru[:indexu])
  417. arru = make([][]map[string]interface{}, 200)
  418. indexu = 0
  419. }
  420. }
  421. }
  422. }