package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "fieldproject_common/config" "fmt" "github.com/spf13/cobra" "go.uber.org/zap" "strings" "sync" "time" ) // @Description bidding数据 bid_field // @Author J 2022/8/30 09:09 func bidding() *cobra.Command { cmdClient := &cobra.Command{ Use: "bidding", Short: "Start processing bidding data", Run: func(cmd *cobra.Command, args []string) { go updateEsMethod() taskBidding() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } // @Description 医疗机构数据 // @Author J 2022/8/11 16:50 func institution() *cobra.Command { cmdClient := &cobra.Command{ Use: "medical_institution", Short: "Start processing medical_institutional data", Run: func(cmd *cobra.Command, args []string) { go SaveEs(config.Conf.DB.Es.IndexM, config.Conf.DB.Es.TypeM) taskIterateSql() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } // @Description 供应商/经销商数据 // @Author J 2022/8/11 16:49 func product() *cobra.Command { cmdClient := &cobra.Command{ Use: "supplier_product", Short: "Start processing supplier_product data", Run: func(cmd *cobra.Command, args []string) { go SaveEs(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS) taskIterateSql1() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } func taskBidding() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5a8d7f4840d2d9bbe8962002")} query := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if b := util.ObjToString(tmp["bid_field"]); b != "" { updateEsPool <- []map[string]interface{}{{ "_id": mongodb.BsonIdToSId(tmp["_id"]), }, {"bid_field": b}, } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskIterateSql() { pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "institution_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("taskIterateSql---", zap.Int("finally id: ", finalId)) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "institution_baseinfo", lastid) rows, err := MysqlM.DB.Query(q) if err != nil { log.Error("taskIterateSql---", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("taskIterateSql---", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%500 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() EsSaveCache <- method(tmp) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func method(tmp map[string]interface{}) map[string]interface{} { m := make(map[string]interface{}) for k, v := range config.Conf.DB.Es.FieldM { if k == "alias" { var arr []string info := MysqlM.Find("institution_alias", map[string]interface{}{"company_id": tmp["company_id"]}, "", "", -1, -1) for _, m2 := range *info { arr = append(arr, util.ObjToString(m2["alias"])) } if len(arr) > 0 { m[k] = strings.Join(arr, ",") } } else if k == "sdequipment" { if util.ObjToString(tmp["mi_type_code"]) == "0208" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "多功能电离子手术治疗机,CO2激光治疗仪,半导体激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针、电刀、电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机" } else if util.ObjToString(tmp["mi_type_code"]) == "0201" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机" } else if util.ObjToString(tmp["mi_type_code"]) == "0203" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,胆红素测定仪,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,经皮给药治疗仪,儿童智能测量仪" } else if util.ObjToString(tmp["mi_type_code"]) == "12" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏起搏器,心脏除颤器,心脏复苏机,呼吸机,儿童用呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,血压计,体温计,体重计,空气消毒机" } else if util.ObjToString(tmp["mi_type_code"]) == "01" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏除颤器,心脏复苏机,呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,空气消毒机,动态心电监测系统,有创呼吸机,动态血压监护仪,便携式血氧饱和度监护仪,多导睡眠呼吸监测仪,床边肺功能仪,便携式血气分析仪,心肺功能监测仪,胃动力检测仪,胃电治疗仪,腹水超滤仪,腹水浓缩机,人工肝,肝病治疗仪,床单位臭氧消毒机,结肠灌洗治疗仪,经皮肾镜,动态血糖监测仪,胰岛素泵,糖尿病足病诊断箱,眼底镜,空气波压力治疗仪,动态脑电监护仪,颅内压监测仪,脑水肿监测仪,半导体激光治疗仪,钴60放射治疗机,超声聚焦刀,氩氦刀,射频肿瘤治疗仪,微波热疗仪,双筒显微镜,相差显微镜,荧光显微镜,倒置显微镜,骨髓活检装置,流式细胞分析仪,细胞分离机,脑细胞介质分析仪,双光能骨密度仪,脑电超慢涨落分析仪,动脉硬化测试仪,移动式负压吸引器,换药床,乳腺微创真空旋切系统,外碎石设备,骨科牵引床,脊柱牵引床,推拿手法床,石膏床,石膏剪,石膏锯,水温箱,足底静脉泵,激光治疗仪,骨科康复设备,吸引设备,供氧设备,监护设备,呼叫系统,心脏除颤仪,抢救车,换药车,转运床,营养输注泵,防褥疮气垫,血压计,体温计,体重计,移动紫外线灯,负压病房设施,层流病房设施,心脏起搏器,便携式呼吸机,雾化器,床边支气管镜,血液动力学检测仪,电冰毯,电子冰帽,脑电图监测仪,脑功能监测仪,振动排痰器,床单元臭氧消毒机,层流净化系统,holter,主动脉球囊反搏泵,便携式超声诊断仪,食道电生理仪,诊察床,肛管直肠压力测定设备,肛门镜,肛门坐浴熏洗设备,结肠灌洗设备,肛肠综合治疗仪,痔科套扎器,肛肠内腔治疗仪,肛门肌电图,多功能电离子手术治疗机,CO2激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针,电刀,电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机,母婴监护仪,妇科检查台,计划生育手术床,冲洗车,阴道镜,人流吸引器,超声诊断仪,超高频电波刀,超声聚焦治疗仪,盆腔炎治疗设备,产后康复综合治疗仪,胎心监护仪,妇科检查床,综合产床,新生儿抢救台,婴儿辐射保暖台,婴儿培养箱,电动羊水吸引器,经皮给药治疗仪,胆红素测定仪,产妇电脑综合治疗仪,消毒隔离器械柜,小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,儿童智能测量仪,眼科治疗床,裂隙灯,眼压计,角膜曲率计,视力灯箱,客观视力仪,电脑验光仪,全自动电脑视野仪,手术显微镜,眼科AB超声仪,超声乳化治疗仪,眼底荧光造影仪,自动焦度仪,沙眼治疗仪,睫毛电解器,视觉诱发电位仪,耳鼻喉综合治疗台,耳科旋转椅,鼓气电窥耳镜,耳钻,动态喉镜,纤维喉镜,间接喉镜,直接喉镜,支撑喉镜,电子喉镜,鼻咽喉镜,间接鼻咽喉镜,前鼻镜,鼻内镜及手术系统,电测听器,前庭检查仪,眼震电图仪,声阻抗仪,五官科多用显微镜,牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机,普通X光机,洗片机,透视机,移动式X光机,数字X线摄影,CR,DR,干式激光相机,X线电子计算机断层扫描装置,CT,数字减影血管造影X光机,DSA,数字化胃肠X光机,数据传输系统,LIS,全自动血细胞分析仪,尿液分析仪,尿沉渣工作站,冰点渗透压计,凝血检测仪,血糖测定仪,微量血糖测定仪,血气分析仪,干式生化分析仪,生化分析仪,发光免疫分析仪,全自动酶免免疫分析系统,酶标仪,电泳分析仪,血小板聚集仪,全自动细菌培养系统,生物培养箱,微生物鉴定药敏分析仪,血培养仪,菌落计数器,厌氧菌培养箱,幽门螺旋杆菌检测仪,氨基酸分析系统,荧光定量PCR检测系统,TCT液基细胞学检测仪,HPV-DNA检测系统,心梗三项检测仪,脑钠肽检测仪,二氧化碳培养箱,高温灭菌器,生物安全柜,血液流变仪,普通显微镜,生物显微镜,血沉仪,蛋白电泳仪,特种蛋白仪,电解质分析仪,精子分析系统,血栓弹力分析仪,血型鉴定及配血设备,纯水系统,自动洗板机,分析天平,超声清洗器,振荡器,电热培养箱,恒温水浴箱,医用冷库,医用冷藏柜,超低温冰柜,普通离心机,低速冷冻离心机,高速冷冻离心机,快速血糖仪微量泵,输液泵营养,输注泵,负压病房设施,层流病房设施,有创呼吸机便携式呼吸机,心电图机动态心电监测系统,微量泵输液泵,食道电生理仪床,单元臭氧消毒机,肛管直肠,压力测定设备,超声聚焦治疗仪,盆腔炎治疗设备,裂隙灯眼压计,眼底镜角膜曲率计视力,灯箱客观视力仪,睫毛电解器,鼓气电窥耳镜耳钻,动态喉镜纤维喉镜,间接喉镜直接喉镜,前鼻镜鼻,内镜及手术系统,电测听器前庭检查仪,抛光机氦氖激光器,光敏固化灯种植机,喷砂机铸造机石膏振荡器,干燥箱全瓷/铸造烤瓷设备,普通X光机,透视机移动式X光机,X线电子计算机,数字化胃肠X光机,数据传输系统(LIS)全自动血细胞分析仪,尿沉渣工作站冰点渗透压计,全自动细菌培养系统,生物培养箱,血沉仪蛋白电泳仪,分析天平超声清洗器,干燥箱医用冰箱,心脏复苏机,营养输注泵,血透机,自动腹透机,外碎石设备,核磁共振仪,核磁共振成像系统" } info1, _ := MongoTool.FindOneByField("20220906shebei_buchong", map[string]interface{}{"company_id": tmp["company_id"]}, map[string]interface{}{"itemname_all": 1}) if info1 != nil && len(*info1) > 0 { if m[k] != nil { m[k] = fmt.Sprintf("%s,%s", m[k], (*info1)["itemname_all"]) } else { m[k] = (*info1)["itemname_all"] } } info := MysqlM.FindOne("code_sdleveltypeequip", map[string]interface{}{"code": tmp["sdequipment_code"]}, "", "") if info != nil && len(*info) > 0 { if m[k] != nil { m[k] = fmt.Sprintf("%s,%s", m[k], (*info)["equipment"]) } else { m[k] = (*info)["equipment"] } } } else if k == "area_code" { m[k] = tmp[k] info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "") if info != nil && len(*info) > 0 { m["area"] = (*info)["area"] if (*info)["city"] != nil { m["city"] = (*info)["city"] } if (*info)["district"] != nil { m["district"] = (*info)["district"] } } } else if k == "mi_type_code" { m[k] = string([]byte(util.ObjToString(tmp[k]))[:2]) } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" { continue } else { if v == "string" { m[k] = tmp[k] } else { m[k] = util.IntAll(tmp[k]) } } } return m } func taskIterateSql1() { pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlB.SelectBySql(fmt.Sprintf("SELECT id FROM %s WHERE sourcetype>=2 ORDER BY id DESC LIMIT 1", "company_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("taskIterateSql1---", zap.Int("finally id", finalId)) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d and sourcetype>=2 ORDER BY id ASC limit 100000", "company_baseinfo", lastid) //q := fmt.Sprintf("SELECT * FROM %s WHERE id = %d ORDER BY id ASC limit 1000000", "company_baseinfo", 12526) rows, err := MysqlB.DB.Query(q) if err != nil { log.Error("taskIterateSql1---", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("taskIterateSql1---", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%500 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() EsSaveCache <- method1(tmp) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func method1(tmp map[string]interface{}) map[string]interface{} { m := make(map[string]interface{}) for k, v := range config.Conf.DB.Es.FieldS { if k == "business_model" { info := MysqlB.FindOne("company_business_model", map[string]interface{}{"company_id": tmp["company_id"], "company_field_code": "0101"}, "", "") if info != nil && len(*info) > 0 { m[k] = util.IntAll((*info)["business_model"]) } else { m[k] = 2 } } else if k == "supplier" { m[k] = tmp["company_name"] } else if k == "productlist" { var p = method2(util.ObjToString(tmp["company_id"])) if p != nil { m[k] = p } } else if k == "area_code" { if tmp[k] != nil { m[k] = tmp[k] info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "") if info != nil && len(*info) > 0 { m["area"] = (*info)["area"] if (*info)["city"] != nil { m["city"] = (*info)["city"] } if (*info)["district"] != nil { m["district"] = (*info)["district"] } } } else { log.Error("area_code", zap.Any("id", tmp["company_id"])) } } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" { continue } else { if v == "string" { m[k] = tmp[k] } else { m[k] = util.IntAll(tmp[k]) } } } return m } func method2(cid string) []map[string]interface{} { var pmap []map[string]interface{} mc := make(map[string]bool) // 记录name 去重 pinfo1 := MysqlM.Find("product_baseinfo", map[string]interface{}{"company_id": cid}, "", "", -1, -1) for _, m2 := range *pinfo1 { m := make(map[string]interface{}) m["name"] = m2["product_name"] pmap = append(pmap, m) mc[util.ObjToString(m2["product_name"])] = true } pinfo2, _ := MongoTool.Find("bidding_p_list_0907", map[string]interface{}{"company_id": cid}, nil, nil, false, -1, -1) if len(*pinfo2) > 0 { for _, m2 := range *pinfo2 { key := util.ObjToString(m2["itemname"]) + util.ObjToString(m2["brand"]) + util.ObjToString(m2["model"]) if mc[key] { continue } else { m := make(map[string]interface{}) m["name"] = util.ObjToString(m2["itemname"]) if m2["model"] != nil { m["model"] = m2["model"] } if m2["brand"] != nil { m["brand"] = m2["brand"] } pmap = append(pmap, m) mc[key] = true } } } return pmap } func SaveEs(i, t string) { log.Info("SaveEs---", zap.String("i", i), zap.String("t", t)) arru := make([]map[string]interface{}, 500) indexu := 0 for { select { case v := <-EsSaveCache: arru[indexu] = v indexu++ if indexu == 500 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(i, t, &arru, false) }(arru) arru = make([]map[string]interface{}, 500) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(i, t, &arru, false) }(arru[:indexu]) arru = make([]map[string]interface{}, 500) indexu = 0 } } } } func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", "bidding", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", "bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }