|
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/log"
- "app.yhyue.com/data_processing/common_utils/mongodb"
- "fieldproject_common/config"
- "fmt"
- "github.com/spf13/cobra"
- "go.uber.org/zap"
- "strings"
- "sync"
- "time"
- )
- // @Description bidding数据 bid_field
- // @Author J 2022/8/30 09:09
- func bidding() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "bidding",
- Short: "Start processing bidding data",
- Run: func(cmd *cobra.Command, args []string) {
- go updateEsMethod()
- taskBidding()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- // @Description 医疗机构数据
- // @Author J 2022/8/11 16:50
- func institution() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "medical_institution",
- Short: "Start processing medical_institutional data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveEs(config.Conf.DB.Es.IndexM, config.Conf.DB.Es.TypeM)
- taskIterateSql()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- // @Description 供应商/经销商数据
- // @Author J 2022/8/11 16:49
- func product() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "supplier_product",
- Short: "Start processing supplier_product data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveEs(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS)
- taskIterateSql1()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- func taskBidding() {
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5a8d7f4840d2d9bbe8962002")}
- query := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if b := util.ObjToString(tmp["bid_field"]); b != "" {
- updateEsPool <- []map[string]interface{}{{
- "_id": mongodb.BsonIdToSId(tmp["_id"]),
- },
- {"bid_field": b},
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func taskIterateSql() {
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("taskIterateSql---", zap.Int("finally id: ", finalId))
- lastid, count := 0, 0
- for {
- util.Debug("重新查询,lastid---", lastid)
- q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "institution_baseinfo", lastid)
- rows, err := MysqlM.DB.Query(q)
- if err != nil {
- log.Error("taskIterateSql---", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- util.Debug("----finish----------", count)
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("taskIterateSql---", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%500 == 0 {
- util.Debug("current-------", count, lastid)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- EsSaveCache <- method(tmp)
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func method(tmp map[string]interface{}) map[string]interface{} {
- m := make(map[string]interface{})
- for k, v := range config.Conf.DB.Es.FieldM {
- if k == "alias" {
- var arr []string
- info := MysqlM.Find("institution_alias", map[string]interface{}{"company_id": tmp["company_id"]}, "", "", -1, -1)
- for _, m2 := range *info {
- arr = append(arr, util.ObjToString(m2["alias"]))
- }
- if len(arr) > 0 {
- m[k] = strings.Join(arr, ",")
- }
- } else if k == "sdequipment" {
- if util.ObjToString(tmp["mi_type_code"]) == "0208" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "多功能电离子手术治疗机,CO2激光治疗仪,半导体激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针、电刀、电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机"
- } else if util.ObjToString(tmp["mi_type_code"]) == "0201" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机"
- } else if util.ObjToString(tmp["mi_type_code"]) == "0203" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,胆红素测定仪,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,经皮给药治疗仪,儿童智能测量仪"
- } else if util.ObjToString(tmp["mi_type_code"]) == "12" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏起搏器,心脏除颤器,心脏复苏机,呼吸机,儿童用呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,血压计,体温计,体重计,空气消毒机"
- } else if util.ObjToString(tmp["mi_type_code"]) == "01" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏除颤器,心脏复苏机,呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,空气消毒机,动态心电监测系统,有创呼吸机,动态血压监护仪,便携式血氧饱和度监护仪,多导睡眠呼吸监测仪,床边肺功能仪,便携式血气分析仪,心肺功能监测仪,胃动力检测仪,胃电治疗仪,腹水超滤仪,腹水浓缩机,人工肝,肝病治疗仪,床单位臭氧消毒机,结肠灌洗治疗仪,经皮肾镜,动态血糖监测仪,胰岛素泵,糖尿病足病诊断箱,眼底镜,空气波压力治疗仪,动态脑电监护仪,颅内压监测仪,脑水肿监测仪,半导体激光治疗仪,钴60放射治疗机,超声聚焦刀,氩氦刀,射频肿瘤治疗仪,微波热疗仪,双筒显微镜,相差显微镜,荧光显微镜,倒置显微镜,骨髓活检装置,流式细胞分析仪,细胞分离机,脑细胞介质分析仪,双光能骨密度仪,脑电超慢涨落分析仪,动脉硬化测试仪,移动式负压吸引器,换药床,乳腺微创真空旋切系统,外碎石设备,骨科牵引床,脊柱牵引床,推拿手法床,石膏床,石膏剪,石膏锯,水温箱,足底静脉泵,激光治疗仪,骨科康复设备,吸引设备,供氧设备,监护设备,呼叫系统,心脏除颤仪,抢救车,换药车,转运床,营养输注泵,防褥疮气垫,血压计,体温计,体重计,移动紫外线灯,负压病房设施,层流病房设施,心脏起搏器,便携式呼吸机,雾化器,床边支气管镜,血液动力学检测仪,电冰毯,电子冰帽,脑电图监测仪,脑功能监测仪,振动排痰器,床单元臭氧消毒机,层流净化系统,holter,主动脉球囊反搏泵,便携式超声诊断仪,食道电生理仪,诊察床,肛管直肠压力测定设备,肛门镜,肛门坐浴熏洗设备,结肠灌洗设备,肛肠综合治疗仪,痔科套扎器,肛肠内腔治疗仪,肛门肌电图,多功能电离子手术治疗机,CO2激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针,电刀,电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机,母婴监护仪,妇科检查台,计划生育手术床,冲洗车,阴道镜,人流吸引器,超声诊断仪,超高频电波刀,超声聚焦治疗仪,盆腔炎治疗设备,产后康复综合治疗仪,胎心监护仪,妇科检查床,综合产床,新生儿抢救台,婴儿辐射保暖台,婴儿培养箱,电动羊水吸引器,经皮给药治疗仪,胆红素测定仪,产妇电脑综合治疗仪,消毒隔离器械柜,小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,儿童智能测量仪,眼科治疗床,裂隙灯,眼压计,角膜曲率计,视力灯箱,客观视力仪,电脑验光仪,全自动电脑视野仪,手术显微镜,眼科AB超声仪,超声乳化治疗仪,眼底荧光造影仪,自动焦度仪,沙眼治疗仪,睫毛电解器,视觉诱发电位仪,耳鼻喉综合治疗台,耳科旋转椅,鼓气电窥耳镜,耳钻,动态喉镜,纤维喉镜,间接喉镜,直接喉镜,支撑喉镜,电子喉镜,鼻咽喉镜,间接鼻咽喉镜,前鼻镜,鼻内镜及手术系统,电测听器,前庭检查仪,眼震电图仪,声阻抗仪,五官科多用显微镜,牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机,普通X光机,洗片机,透视机,移动式X光机,数字X线摄影,CR,DR,干式激光相机,X线电子计算机断层扫描装置,CT,数字减影血管造影X光机,DSA,数字化胃肠X光机,数据传输系统,LIS,全自动血细胞分析仪,尿液分析仪,尿沉渣工作站,冰点渗透压计,凝血检测仪,血糖测定仪,微量血糖测定仪,血气分析仪,干式生化分析仪,生化分析仪,发光免疫分析仪,全自动酶免免疫分析系统,酶标仪,电泳分析仪,血小板聚集仪,全自动细菌培养系统,生物培养箱,微生物鉴定药敏分析仪,血培养仪,菌落计数器,厌氧菌培养箱,幽门螺旋杆菌检测仪,氨基酸分析系统,荧光定量PCR检测系统,TCT液基细胞学检测仪,HPV-DNA检测系统,心梗三项检测仪,脑钠肽检测仪,二氧化碳培养箱,高温灭菌器,生物安全柜,血液流变仪,普通显微镜,生物显微镜,血沉仪,蛋白电泳仪,特种蛋白仪,电解质分析仪,精子分析系统,血栓弹力分析仪,血型鉴定及配血设备,纯水系统,自动洗板机,分析天平,超声清洗器,振荡器,电热培养箱,恒温水浴箱,医用冷库,医用冷藏柜,超低温冰柜,普通离心机,低速冷冻离心机,高速冷冻离心机,快速血糖仪微量泵,输液泵营养,输注泵,负压病房设施,层流病房设施,有创呼吸机便携式呼吸机,心电图机动态心电监测系统,微量泵输液泵,食道电生理仪床,单元臭氧消毒机,肛管直肠,压力测定设备,超声聚焦治疗仪,盆腔炎治疗设备,裂隙灯眼压计,眼底镜角膜曲率计视力,灯箱客观视力仪,睫毛电解器,鼓气电窥耳镜耳钻,动态喉镜纤维喉镜,间接喉镜直接喉镜,前鼻镜鼻,内镜及手术系统,电测听器前庭检查仪,抛光机氦氖激光器,光敏固化灯种植机,喷砂机铸造机石膏振荡器,干燥箱全瓷/铸造烤瓷设备,普通X光机,透视机移动式X光机,X线电子计算机,数字化胃肠X光机,数据传输系统(LIS)全自动血细胞分析仪,尿沉渣工作站冰点渗透压计,全自动细菌培养系统,生物培养箱,血沉仪蛋白电泳仪,分析天平超声清洗器,干燥箱医用冰箱,心脏复苏机,营养输注泵,血透机,自动腹透机,外碎石设备,核磁共振仪,核磁共振成像系统"
- }
- info1, _ := MongoTool.FindOneByField("20220906shebei_buchong", map[string]interface{}{"company_id": tmp["company_id"]}, map[string]interface{}{"itemname_all": 1})
- if info1 != nil && len(*info1) > 0 {
- if m[k] != nil {
- m[k] = fmt.Sprintf("%s,%s", m[k], (*info1)["itemname_all"])
- } else {
- m[k] = (*info1)["itemname_all"]
- }
- }
- info := MysqlM.FindOne("code_sdleveltypeequip", map[string]interface{}{"code": tmp["sdequipment_code"]}, "", "")
- if info != nil && len(*info) > 0 {
- if m[k] != nil {
- m[k] = fmt.Sprintf("%s,%s", m[k], (*info)["equipment"])
- } else {
- m[k] = (*info)["equipment"]
- }
- }
- } else if k == "area_code" {
- m[k] = tmp[k]
- info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
- if info != nil && len(*info) > 0 {
- m["area"] = (*info)["area"]
- if (*info)["city"] != nil {
- m["city"] = (*info)["city"]
- }
- if (*info)["district"] != nil {
- m["district"] = (*info)["district"]
- }
- }
- } else if k == "mi_type_code" {
- m[k] = string([]byte(util.ObjToString(tmp[k]))[:2])
- } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
- continue
- } else {
- if v == "string" {
- m[k] = tmp[k]
- } else {
- m[k] = util.IntAll(tmp[k])
- }
- }
- }
- return m
- }
- func taskIterateSql1() {
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlB.SelectBySql(fmt.Sprintf("SELECT id FROM %s WHERE sourcetype>=2 ORDER BY id DESC LIMIT 1", "company_baseinfo"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("taskIterateSql1---", zap.Int("finally id", finalId))
- lastid, count := 0, 0
- for {
- util.Debug("重新查询,lastid---", lastid)
- q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d and sourcetype>=2 ORDER BY id ASC limit 100000", "company_baseinfo", lastid)
- //q := fmt.Sprintf("SELECT * FROM %s WHERE id = %d ORDER BY id ASC limit 1000000", "company_baseinfo", 12526)
- rows, err := MysqlB.DB.Query(q)
- if err != nil {
- log.Error("taskIterateSql1---", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- util.Debug("----finish----------", count)
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("taskIterateSql1---", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%500 == 0 {
- util.Debug("current-------", count, lastid)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- EsSaveCache <- method1(tmp)
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func method1(tmp map[string]interface{}) map[string]interface{} {
- m := make(map[string]interface{})
- for k, v := range config.Conf.DB.Es.FieldS {
- if k == "business_model" {
- info := MysqlB.FindOne("company_business_model", map[string]interface{}{"company_id": tmp["company_id"], "company_field_code": "0101"}, "", "")
- if info != nil && len(*info) > 0 {
- m[k] = util.IntAll((*info)["business_model"])
- } else {
- m[k] = 2
- }
- } else if k == "supplier" {
- m[k] = tmp["company_name"]
- } else if k == "productlist" {
- var p = method2(util.ObjToString(tmp["company_id"]))
- if p != nil {
- m[k] = p
- }
- } else if k == "area_code" {
- if tmp[k] != nil {
- m[k] = tmp[k]
- info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
- if info != nil && len(*info) > 0 {
- m["area"] = (*info)["area"]
- if (*info)["city"] != nil {
- m["city"] = (*info)["city"]
- }
- if (*info)["district"] != nil {
- m["district"] = (*info)["district"]
- }
- }
- } else {
- log.Error("area_code", zap.Any("id", tmp["company_id"]))
- }
- } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
- continue
- } else {
- if v == "string" {
- m[k] = tmp[k]
- } else {
- m[k] = util.IntAll(tmp[k])
- }
- }
- }
- return m
- }
- func method2(cid string) []map[string]interface{} {
- var pmap []map[string]interface{}
- mc := make(map[string]bool) // 记录name 去重
- pinfo1 := MysqlM.Find("product_baseinfo", map[string]interface{}{"company_id": cid}, "", "", -1, -1)
- for _, m2 := range *pinfo1 {
- m := make(map[string]interface{})
- m["name"] = m2["product_name"]
- pmap = append(pmap, m)
- mc[util.ObjToString(m2["product_name"])] = true
- }
- pinfo2, _ := MongoTool.Find("bidding_p_list_0907", map[string]interface{}{"company_id": cid}, nil, nil, false, -1, -1)
- if len(*pinfo2) > 0 {
- for _, m2 := range *pinfo2 {
- key := util.ObjToString(m2["itemname"]) + util.ObjToString(m2["brand"]) + util.ObjToString(m2["model"])
- if mc[key] {
- continue
- } else {
- m := make(map[string]interface{})
- m["name"] = util.ObjToString(m2["itemname"])
- if m2["model"] != nil {
- m["model"] = m2["model"]
- }
- if m2["brand"] != nil {
- m["brand"] = m2["brand"]
- }
- pmap = append(pmap, m)
- mc[key] = true
- }
- }
- }
- return pmap
- }
- func SaveEs(i, t string) {
- log.Info("SaveEs---", zap.String("i", i), zap.String("t", t))
- arru := make([]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-EsSaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- Es.BulkSave(i, t, &arru, false)
- }(arru)
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- Es.BulkSave(i, t, &arru, false)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", "bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", "bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|