package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "app.yhyue.com/data_processing/common_utils/redis" "fieldproject_common/config" "fmt" uuid "github.com/satori/go.uuid" "github.com/spf13/cobra" "go.uber.org/zap" "strings" "sync" "time" "unicode/utf8" ) // @Description bidding数据 bid_field // @Author J 2022/8/30 09:09 func bidding() *cobra.Command { cmdClient := &cobra.Command{ Use: "bidding", Short: "Start processing bidding data", Run: func(cmd *cobra.Command, args []string) { go updateEsMethod() taskBidding() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } // @Description 医疗机构数据 // @Author J 2022/8/11 16:50 func institution() *cobra.Command { cmdClient := &cobra.Command{ Use: "medical_institution", Short: "Start processing medical_institutional data", Run: func(cmd *cobra.Command, args []string) { go SaveEs(config.Conf.DB.Es.IndexM, config.Conf.DB.Es.TypeM) taskIterateSql() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } // @Description 产品数据 // @Author J 2022/8/11 16:49 func product() *cobra.Command { cmdClient := &cobra.Command{ Use: "product", Short: "Start processing product data", Run: func(cmd *cobra.Command, args []string) { go SaveFunc("dws_f_product_baseinfo", ProductField) //taskIterateSql1() taskProduct() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } func dealer() *cobra.Command { cmdClient := &cobra.Command{ Use: "dealer", Short: "Start processing dealer data", Run: func(cmd *cobra.Command, args []string) { go SaveFunc("dwd_f_yl_dealer_baseinfo_new", DealerField) //go SaveFuncRc() taskDealer() }, } return cmdClient } func ent() *cobra.Command { cmdClient := &cobra.Command{ Use: "ent", Short: "Start processing ent data", Run: func(cmd *cobra.Command, args []string) { go SaveFunc("dws_f_ent_pa_baseinfo", EntField) taskEnt() }, } return cmdClient } func register() *cobra.Command { cmdClient := &cobra.Command{ Use: "register", Short: "Start processing register data", Run: func(cmd *cobra.Command, args []string) { go SaveFunc("dws_f_register_baseinfo", RegField) taskRegister() }, } return cmdClient } func project() *cobra.Command { cmdClient := &cobra.Command{ Use: "project", Short: "Start processing project data", Run: func(cmd *cobra.Command, args []string) { go SaveFunc("dwd_f_yl_purchasing_baseinfo_new", ProjectField) //go SaveFunc1("dwd_f_yl_purchasing_win_baseinfo_new", WinerField) InitPoCode() InitLvCode() redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id redis.InitRedis1("bid_class=172.17.4.189:8379", 7) // class taskProject() }, } //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]") return cmdClient } func taskBidding() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5a8d7f4840d2d9bbe8962002")} query := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if b := util.ObjToString(tmp["bid_field"]); b != "" { updateEsPool <- []map[string]interface{}{{ "_id": mongodb.BsonIdToSId(tmp["_id"]), }, {"bid_field": b}, } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskIterateSql() { pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "institution_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("taskIterateSql---", zap.Int("finally id: ", finalId)) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "institution_baseinfo", lastid) rows, err := MysqlM.DB.Query(q) if err != nil { log.Error("taskIterateSql---", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("taskIterateSql---", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%500 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() EsSaveCache <- method(tmp) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func taskDealer() { sess := MongoTool2.GetMgoConn() defer MongoTool2.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() saveM := make(map[string]interface{}) record := make(map[string]interface{}) for _, f := range DealerField { if f == "name_id" && util.ObjToString(tmp["name_id"]) == "" { name_id := strings.ReplaceAll(uuid.NewV4().String(), "-", "") saveM[f] = name_id saveM["exists_id"] = 0 record = map[string]interface{}{"name_id": name_id, "name": tmp["company_name"], "type": 2, "createtime": time.Now().Format(util.Date_Full_Layout)} } else if f == "dealer_name" { saveM[f] = tmp["company_name"] } else if f == "area_code" { if tmp["area"] != nil { saveM[f] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "city_code" { if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM[f] = AreaCode[c] } } else if f == "district_code" { if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"]) saveM[f] = AreaCode[c] } } else if f == "business_model" { saveM[f] = tmp["business_type"] } else if f == "capital" { text := util.ObjToString(tmp["capital"]) capital := ObjToMoney(text) capital = capital / 10000 if capital != 0 { capital, _ = util.FormatFloat(capital, 2) } else { capital = 0 } saveM[f] = capital if capital < 100 { saveM["capital_code"] = 1 } else if capital >= 100 && capital < 500 { saveM["capital_code"] = 2 } else if capital >= 500 && capital < 1000 { saveM["capital_code"] = 3 } else if capital >= 1000 { saveM["capital_code"] = 4 } } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "website" { tid := util.ObjToString(tmp["company_id"]) std, _ := MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": tid}, map[string]interface{}{"website_url": 1}) if std != nil && len(*std) > 0 && len(util.ObjToString((*std)["website_url"])) <= 255 { saveM[f] = util.ObjToString((*std)["website_url"]) } } else { if tmp[f] != nil { saveM[f] = tmp[f] } } } if len(record) > 0 { saveRcPool <- record } saveBasePool <- saveM }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskEnt() { sess := MongoTool2.GetMgoConn() defer MongoTool2.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() saveM := make(map[string]interface{}) for _, f := range EntField { if f == "area_code" { if tmp["area"] != nil { saveM[f] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "issue_date" { if util.ObjToString(tmp[f]) != "" { saveM[f] = tmp[f] } } else if f == "sourcetype" { saveM[f] = 1 } else if f == "createtime" || f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "website" { tid := util.ObjToString(tmp["company_id"]) std, _ := MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": tid}, map[string]interface{}{"website_url": 1}) if std != nil && len(*std) > 0 && len(util.ObjToString((*std)["website_url"])) <= 255 { saveM[f] = util.ObjToString((*std)["website_url"]) } } else { if tmp[f] != nil { saveM[f] = tmp[f] } } } saveBasePool <- saveM }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskRegister() { sess := MongoTool2.GetMgoConn() defer MongoTool2.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("nmpa_company").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() saveM := make(map[string]interface{}) for _, f := range RegField { if f == "dealer_id" { name := util.ObjToString(tmp["company"]) info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"dealer_name": name}, "name_id", "") if info == nil || len(*info) == 0 { return } saveM[f] = (*info)["name_id"] saveM["website"] = (*info)["website"] } else if f == "company_name" { saveM[f] = tmp["company"] } else if f == "regnum" { if tmp["reg_no"] != nil { saveM[f] = tmp["reg_no"] } } else if f == "scope" { if util.ObjToString(tmp["class"]) == "生产型" { saveM[f] = tmp["product_range"] } else if util.ObjToString(tmp["class"]) == "经营型" { saveM[f] = tmp["business_range"] } } else if f == "type" { if util.ObjToString(tmp[f]) == "备案企业" { saveM[f] = 2 } else if util.ObjToString(tmp[f]) == "许可企业" { saveM[f] = 1 } } else if f == "approve_depart" { saveM[f] = tmp["badw"] } else if f == "approve_date" { if util.ObjToString(tmp["barq"]) != "" && util.ObjToString(tmp["barq"]) != "null" { saveM[f] = tmp["barq"] } } else if f == "validity_date" { if util.ObjToString(tmp["yxqx"]) != "" && util.ObjToString(tmp["yxqx"]) != "null" { saveM[f] = tmp["yxqx"] } } else if f == "type_address" { if util.ObjToString(tmp["class"]) == "生产型" { saveM[f] = tmp["product_address"] } else if util.ObjToString(tmp["class"]) == "经营型" { saveM[f] = tmp["business_address"] } } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else { if tmp[f] != nil { saveM[f] = tmp[f] } } } saveBasePool <- saveM }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskProduct() { sess := MongoTool2.GetMgoConn() defer MongoTool2.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_product_info").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() saveM := make(map[string]interface{}) for _, f := range ProductField { if f == "dealer_id" { name := util.ObjToString(tmp["company_name"]) info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"dealer_name": name}, "name_id", "") if info == nil || len(*info) == 0 { MongoTool2.Save("product_err_record", map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"])}) return } saveM[f] = (*info)["name_id"] } else if f == "make_country" || f == "regist_type" { saveM[f] = util.IntAll(tmp[f]) } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "medical_equipment_class1" { saveM[f] = tmp["product_class1"] } else if f == "medical_equipment_class2" { saveM[f] = tmp["product_class2"] } else if f == "medical_equipment_class3" { saveM[f] = tmp["product_class3"] } else if f == "sdproduct_name" { saveM[f] = tmp["product_class4"] } else if f == "sdequipment_code" { if len(util.ObjToString(tmp["product_code"])) > 7 { saveM[f] = util.ObjToString(tmp["product_code"])[:7] } else { saveM[f] = tmp["product_code"] } } else { if tmp[f] != nil { saveM[f] = tmp[f] } } } saveBasePool <- saveM }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskProject() { sess := MongoTool1.GetMgoConn() defer MongoTool1.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo1.Dbname).C("projectset_medical").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if tmp["jg_plist"] == nil { return } saveM := make(map[string]interface{}) infoid := util.ObjToString(tmp["sourceinfoid"]) saveM["projectid"] = mongodb.BsonIdToSId(tmp["_id"]) saveM["infoid"] = infoid saveM["jyhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid)) saveM["bidstatus"] = Bidstatus[util.ObjToString(tmp["bidstatus"])] saveM["bidstype"] = Bidtype[util.ObjToString(tmp["bidstype"])] saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout) saveM["createtime"] = time.Now().Format(util.Date_Full_Layout) if tmp["budget"] != nil { saveM["budget"], _ = util.FormatFloat(util.Float64All(tmp["budget"]), 4) } if tmp["area"] != nil { saveM["area_code"] = AreaCode[util.ObjToString(tmp["area"])] } if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM["city_code"] = AreaCode[c] } if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"]) saveM["district_code"] = AreaCode[c] } for _, f := range []string{"title", "projectname", "projectcode", "purchasing", "agency", "buyer"} { if f == "purchasing" { if utf8.RuneCountInString(util.ObjToString(tmp[f])) < 20000 { saveM[f] = tmp[f] } } else { if util.ObjToString(tmp[f]) != "" { saveM[f] = tmp[f] } } } if b := util.ObjToString(tmp["buyer"]); b != "" { if eid := redis.GetStr("ent_id", b); eid != "" { saveM["buyer_id"] = strings.Split(eid, "_")[0] saveM["mi_area_code"] = strings.Split(eid, "_")[1] if cd := LvCode[strings.Split(eid, "_")[0]]; cd != "" { saveM["mi_level_code"] = cd } } } if a := util.ObjToString(tmp["agency"]); a != "" { if eid := redis.GetStr("ent_id", a); eid != "" { saveM["agency_id"] = strings.Split(eid, "_")[0] } } for _, f := range []string{"bidopentime", "zbtime", "jgtime"} { if util.IntAll(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } if t := util.Int64All(tmp["jgtime"]); t > 0 { y := time.Unix(t, 0).Year() m := time.Unix(t, 0).Month().String() saveM["year_tags"] = y saveM["month_tags"] = Month[m] saveM["quarter_tags"] = Quarter[m] } pname := make(map[string]bool) // 标的物名称 去重 for _, p := range tmp["jg_plist"].([]interface{}) { p1 := p.(map[string]interface{}) if name := util.ObjToString(p1["itemname"]); name != "" && !pname[name] { if utf8.RuneCountInString(name) > 255 { continue } pname[name] = true saveM1 := util.DeepCopy(saveM).(map[string]interface{}) saveM1["itemname"] = name if code := redis.GetStr("bid_class", fmt.Sprintf("%s_%s", infoid, name)); code != "" { if len(code) > 7 { saveM1["sdequipment_code"] = code[:7] } else { saveM1["sdequipment_code"] = code } for k, v := range PclassCode[code] { if k == "class_1" { saveM1["medical_equipment_class1"] = v } else if k == "class_2" { saveM1["medical_equipment_class2"] = v } else if k == "class_3" { saveM1["medical_equipment_class3"] = v } else if k == "class_4" { saveM1["sdproduct_name"] = v } } } if p1["brandname"] != nil { saveM1["brandname"] = p1["brandname"] } if p1["specs"] != nil { saveM1["specs"] = p1["specs"] } if p1["model"] != nil && utf8.RuneCountInString(util.ObjToString(p1["model"])) < 200 { saveM1["model"] = p1["model"] } if p1["unitname"] != nil { saveM1["unitname"] = p1["unitname"] } if p1["number"] != nil && util.IntAll(p1["number"]) < 100000 { saveM1["number"] = util.IntAll(p1["number"]) } if p1["unitprice"] != nil && util.Float64All(p1["unitprice"]) < 100000000 { saveM1["unitprice"], _ = util.FormatFloat(util.Float64All(p1["unitprice"]), 2) } if p1["totalprice"] != nil { saveM1["totalprice"], _ = util.FormatFloat(util.Float64All(p1["totalprice"]), 2) } saveBasePool <- saveM1 } } // 中标信息 saveW := make(map[string]interface{}) if w := util.ObjToString(tmp["winner"]); w != "" { saveW["winner"] = w if eid := redis.GetStr("ent_id", w); eid != "" { saveW["winner_id"] = strings.Split(eid, "_")[0] saveW["winner_area_code"] = strings.Split(eid, "_")[1] if ccode := strings.Split(eid, "_")[2]; ccode != "" { saveW["winner_city_code"] = ccode } } if util.ObjToString(tmp["winnertel"]) != "" { saveW["contact_tel"] = tmp["winnertel"] } if util.ObjToString(tmp["winnerperson"]) != "" { saveW["contact_name"] = tmp["winnerperson"] } saveW["projectid"] = mongodb.BsonIdToSId(tmp["_id"]) saveW["infoid"] = util.ObjToString(tmp["sourceinfoid"]) if tmp["bidamount"] != nil { saveW["bidamount"], _ = util.FormatFloat(util.Float64All(tmp["bidamount"]), 4) } if util.IntAll(tmp["jgtime"]) > 0 { t := util.Int64All(tmp["jgtime"]) saveW["jgtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } saveW["is_winner"] = 1 saveW["updatetime"] = time.Now().Format(util.Date_Full_Layout) saveW["createtime"] = time.Now().Format(util.Date_Full_Layout) saveBasePool1 <- saveW } //中标候选 if tmp["winnerorder"] != nil { saveW1 := make(map[string]interface{}) for _, w := range tmp["winnerorder"].([]interface{}) { if util.ObjToString(w) == "" { continue } saveW1["winner"] = w if eid := redis.GetStr("ent_id", util.ObjToString(w)); eid != "" { saveW1["winner_id"] = strings.Split(eid, "_")[0] saveW1["winner_area_code"] = strings.Split(eid, "_")[1] if ccode := strings.Split(eid, "_")[2]; ccode != "" { saveW1["winner_city_code"] = ccode } } saveW1["projectid"] = mongodb.BsonIdToSId(tmp["_id"]) saveW1["infoid"] = util.ObjToString(tmp["sourceinfoid"]) if tmp["bidamount"] != nil { saveW1["bidamount"], _ = util.FormatFloat(util.Float64All(tmp["bidamount"]), 4) } if util.IntAll(tmp["jgtime"]) > 0 { t := util.Int64All(tmp["jgtime"]) saveW["jgtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } saveW["is_winner"] = 2 saveW["updatetime"] = time.Now().Format(util.Date_Full_Layout) saveW["createtime"] = time.Now().Format(util.Date_Full_Layout) saveBasePool1 <- saveW } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func method(tmp map[string]interface{}) map[string]interface{} { m := make(map[string]interface{}) for k, v := range config.Conf.DB.Es.FieldM { if k == "alias" { var arr []string info := MysqlM.Find("institution_alias", map[string]interface{}{"company_id": tmp["company_id"]}, "", "", -1, -1) for _, m2 := range *info { arr = append(arr, util.ObjToString(m2["alias"])) } if len(arr) > 0 { m[k] = strings.Join(arr, ",") } } else if k == "sdequipment" { if util.ObjToString(tmp["mi_type_code"]) == "0208" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "多功能电离子手术治疗机,CO2激光治疗仪,半导体激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针、电刀、电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机" } else if util.ObjToString(tmp["mi_type_code"]) == "0201" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机" } else if util.ObjToString(tmp["mi_type_code"]) == "0203" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,胆红素测定仪,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,经皮给药治疗仪,儿童智能测量仪" } else if util.ObjToString(tmp["mi_type_code"]) == "12" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏起搏器,心脏除颤器,心脏复苏机,呼吸机,儿童用呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,血压计,体温计,体重计,空气消毒机" } else if util.ObjToString(tmp["mi_type_code"]) == "01" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") { m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏除颤器,心脏复苏机,呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,空气消毒机,动态心电监测系统,有创呼吸机,动态血压监护仪,便携式血氧饱和度监护仪,多导睡眠呼吸监测仪,床边肺功能仪,便携式血气分析仪,心肺功能监测仪,胃动力检测仪,胃电治疗仪,腹水超滤仪,腹水浓缩机,人工肝,肝病治疗仪,床单位臭氧消毒机,结肠灌洗治疗仪,经皮肾镜,动态血糖监测仪,胰岛素泵,糖尿病足病诊断箱,眼底镜,空气波压力治疗仪,动态脑电监护仪,颅内压监测仪,脑水肿监测仪,半导体激光治疗仪,钴60放射治疗机,超声聚焦刀,氩氦刀,射频肿瘤治疗仪,微波热疗仪,双筒显微镜,相差显微镜,荧光显微镜,倒置显微镜,骨髓活检装置,流式细胞分析仪,细胞分离机,脑细胞介质分析仪,双光能骨密度仪,脑电超慢涨落分析仪,动脉硬化测试仪,移动式负压吸引器,换药床,乳腺微创真空旋切系统,外碎石设备,骨科牵引床,脊柱牵引床,推拿手法床,石膏床,石膏剪,石膏锯,水温箱,足底静脉泵,激光治疗仪,骨科康复设备,吸引设备,供氧设备,监护设备,呼叫系统,心脏除颤仪,抢救车,换药车,转运床,营养输注泵,防褥疮气垫,血压计,体温计,体重计,移动紫外线灯,负压病房设施,层流病房设施,心脏起搏器,便携式呼吸机,雾化器,床边支气管镜,血液动力学检测仪,电冰毯,电子冰帽,脑电图监测仪,脑功能监测仪,振动排痰器,床单元臭氧消毒机,层流净化系统,holter,主动脉球囊反搏泵,便携式超声诊断仪,食道电生理仪,诊察床,肛管直肠压力测定设备,肛门镜,肛门坐浴熏洗设备,结肠灌洗设备,肛肠综合治疗仪,痔科套扎器,肛肠内腔治疗仪,肛门肌电图,多功能电离子手术治疗机,CO2激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针,电刀,电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机,母婴监护仪,妇科检查台,计划生育手术床,冲洗车,阴道镜,人流吸引器,超声诊断仪,超高频电波刀,超声聚焦治疗仪,盆腔炎治疗设备,产后康复综合治疗仪,胎心监护仪,妇科检查床,综合产床,新生儿抢救台,婴儿辐射保暖台,婴儿培养箱,电动羊水吸引器,经皮给药治疗仪,胆红素测定仪,产妇电脑综合治疗仪,消毒隔离器械柜,小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,儿童智能测量仪,眼科治疗床,裂隙灯,眼压计,角膜曲率计,视力灯箱,客观视力仪,电脑验光仪,全自动电脑视野仪,手术显微镜,眼科AB超声仪,超声乳化治疗仪,眼底荧光造影仪,自动焦度仪,沙眼治疗仪,睫毛电解器,视觉诱发电位仪,耳鼻喉综合治疗台,耳科旋转椅,鼓气电窥耳镜,耳钻,动态喉镜,纤维喉镜,间接喉镜,直接喉镜,支撑喉镜,电子喉镜,鼻咽喉镜,间接鼻咽喉镜,前鼻镜,鼻内镜及手术系统,电测听器,前庭检查仪,眼震电图仪,声阻抗仪,五官科多用显微镜,牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机,普通X光机,洗片机,透视机,移动式X光机,数字X线摄影,CR,DR,干式激光相机,X线电子计算机断层扫描装置,CT,数字减影血管造影X光机,DSA,数字化胃肠X光机,数据传输系统,LIS,全自动血细胞分析仪,尿液分析仪,尿沉渣工作站,冰点渗透压计,凝血检测仪,血糖测定仪,微量血糖测定仪,血气分析仪,干式生化分析仪,生化分析仪,发光免疫分析仪,全自动酶免免疫分析系统,酶标仪,电泳分析仪,血小板聚集仪,全自动细菌培养系统,生物培养箱,微生物鉴定药敏分析仪,血培养仪,菌落计数器,厌氧菌培养箱,幽门螺旋杆菌检测仪,氨基酸分析系统,荧光定量PCR检测系统,TCT液基细胞学检测仪,HPV-DNA检测系统,心梗三项检测仪,脑钠肽检测仪,二氧化碳培养箱,高温灭菌器,生物安全柜,血液流变仪,普通显微镜,生物显微镜,血沉仪,蛋白电泳仪,特种蛋白仪,电解质分析仪,精子分析系统,血栓弹力分析仪,血型鉴定及配血设备,纯水系统,自动洗板机,分析天平,超声清洗器,振荡器,电热培养箱,恒温水浴箱,医用冷库,医用冷藏柜,超低温冰柜,普通离心机,低速冷冻离心机,高速冷冻离心机,快速血糖仪微量泵,输液泵营养,输注泵,负压病房设施,层流病房设施,有创呼吸机便携式呼吸机,心电图机动态心电监测系统,微量泵输液泵,食道电生理仪床,单元臭氧消毒机,肛管直肠,压力测定设备,超声聚焦治疗仪,盆腔炎治疗设备,裂隙灯眼压计,眼底镜角膜曲率计视力,灯箱客观视力仪,睫毛电解器,鼓气电窥耳镜耳钻,动态喉镜纤维喉镜,间接喉镜直接喉镜,前鼻镜鼻,内镜及手术系统,电测听器前庭检查仪,抛光机氦氖激光器,光敏固化灯种植机,喷砂机铸造机石膏振荡器,干燥箱全瓷/铸造烤瓷设备,普通X光机,透视机移动式X光机,X线电子计算机,数字化胃肠X光机,数据传输系统(LIS)全自动血细胞分析仪,尿沉渣工作站冰点渗透压计,全自动细菌培养系统,生物培养箱,血沉仪蛋白电泳仪,分析天平超声清洗器,干燥箱医用冰箱,心脏复苏机,营养输注泵,血透机,自动腹透机,外碎石设备,核磁共振仪,核磁共振成像系统" } info1, _ := MongoTool.FindOneByField("20220906shebei_buchong", map[string]interface{}{"company_id": tmp["company_id"]}, map[string]interface{}{"itemname_all": 1}) if info1 != nil && len(*info1) > 0 { if m[k] != nil { m[k] = fmt.Sprintf("%s,%s", m[k], (*info1)["itemname_all"]) } else { m[k] = (*info1)["itemname_all"] } } info := MysqlM.FindOne("code_sdleveltypeequip", map[string]interface{}{"code": tmp["sdequipment_code"]}, "", "") if info != nil && len(*info) > 0 { if m[k] != nil { m[k] = fmt.Sprintf("%s,%s", m[k], (*info)["equipment"]) } else { m[k] = (*info)["equipment"] } } } else if k == "area_code" { m[k] = tmp[k] info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "") if info != nil && len(*info) > 0 { m["area"] = (*info)["area"] if (*info)["city"] != nil { m["city"] = (*info)["city"] } if (*info)["district"] != nil { m["district"] = (*info)["district"] } } } else if k == "mi_type_code" { m[k] = string([]byte(util.ObjToString(tmp[k]))[:2]) } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" { continue } else { if v == "string" { m[k] = tmp[k] } else { m[k] = util.IntAll(tmp[k]) } } } return m } func taskIterateSql1() { pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlB.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY product_id DESC LIMIT 1", "product_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["product_id"]) } log.Info("taskIterateSql1---", zap.Int("finally id", finalId)) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE product_id > %d ORDER BY product_id ASC limit 100000", "product_baseinfo", lastid) rows, err := MysqlB.DB.Query(q) if err != nil { log.Error("taskIterateSql1---", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("taskIterateSql1---", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["product_id"]) count++ if count%500 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() cid := util.ObjToString(tmp["company_id"]) info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"company_id": cid}, "name_id, dealer_name", "") if info == nil || len(*info) == 0 { MongoTool2.Save("product_err_record", map[string]interface{}{"product_id": tmp["product_id"]}) return } saveM := make(map[string]interface{}) saveM["dealer_id"] = (*info)["name_id"] saveM["company_name"] = (*info)["dealer_name"] saveM["sdequipment_code"] = tmp["medical_equipment_code"] saveM["createtime"] = time.Now().Format(util.Date_Full_Layout) delete(tmp, "medical_equipment_code") delete(tmp, "product_id") delete(tmp, "comeintime") saveBasePool <- tmp }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func method1(tmp map[string]interface{}) map[string]interface{} { m := make(map[string]interface{}) for k, v := range config.Conf.DB.Es.FieldS { if k == "business_model" { info := MysqlB.FindOne("company_business_model", map[string]interface{}{"company_id": tmp["company_id"], "company_field_code": "0101"}, "", "") if info != nil && len(*info) > 0 { m[k] = util.IntAll((*info)["business_model"]) } else { m[k] = 2 } } else if k == "supplier" { m[k] = tmp["company_name"] } else if k == "productlist" { var p = method2(util.ObjToString(tmp["company_id"])) if p != nil { m[k] = p } } else if k == "area_code" { if tmp[k] != nil { m[k] = tmp[k] info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "") if info != nil && len(*info) > 0 { m["area"] = (*info)["area"] if (*info)["city"] != nil { m["city"] = (*info)["city"] } if (*info)["district"] != nil { m["district"] = (*info)["district"] } } } else { log.Error("area_code", zap.Any("id", tmp["company_id"])) } } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" { continue } else { if v == "string" { m[k] = tmp[k] } else { m[k] = util.IntAll(tmp[k]) } } } return m } func method2(cid string) []map[string]interface{} { var pmap []map[string]interface{} mc := make(map[string]bool) // 记录name 去重 pinfo1 := MysqlM.Find("product_baseinfo", map[string]interface{}{"company_id": cid}, "", "", -1, -1) for _, m2 := range *pinfo1 { m := make(map[string]interface{}) m["name"] = m2["product_name"] pmap = append(pmap, m) mc[util.ObjToString(m2["product_name"])] = true } pinfo2, _ := MongoTool.Find("bidding_p_list_0907", map[string]interface{}{"company_id": cid}, nil, nil, false, -1, -1) if len(*pinfo2) > 0 { for _, m2 := range *pinfo2 { key := util.ObjToString(m2["itemname"]) + util.ObjToString(m2["brand"]) + util.ObjToString(m2["model"]) if mc[key] { continue } else { m := make(map[string]interface{}) m["name"] = util.ObjToString(m2["itemname"]) if m2["model"] != nil { m["model"] = m2["model"] } if m2["brand"] != nil { m["brand"] = m2["brand"] } pmap = append(pmap, m) mc[key] = true } } } return pmap } func SaveEs(i, t string) { log.Info("SaveEs---", zap.String("i", i), zap.String("t", t)) arru := make([]map[string]interface{}, 500) indexu := 0 for { select { case v := <-EsSaveCache: arru[indexu] = v indexu++ if indexu == 500 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(i, t, &arru, false) }(arru) arru = make([]map[string]interface{}, 500) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() Es.BulkSave(i, t, &arru, false) }(arru[:indexu]) arru = make([]map[string]interface{}, 500) indexu = 0 } } } } func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", "bidding", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", "bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }