|
- package main
- import (
- "fieldproject_common/config"
- "fmt"
- uuid "github.com/satori/go.uuid"
- "github.com/spf13/cobra"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
- "strings"
- "sync"
- "time"
- "unicode/utf8"
- )
- // @Description bidding数据 bid_field
- // @Author J 2022/8/30 09:09
- func bidding() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "bidding",
- Short: "Start processing bidding data",
- Run: func(cmd *cobra.Command, args []string) {
- go updateEsMethod()
- taskBidding()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- // @Description 医疗机构数据
- // @Author J 2022/8/11 16:50
- func institution() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "medical_institution",
- Short: "Start processing medical_institutional data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveEs(config.Conf.DB.Es.IndexM, config.Conf.DB.Es.TypeM)
- taskIterateSql()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- // @Description 产品数据
- // @Author J 2022/8/11 16:49
- func product() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "product",
- Short: "Start processing product data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveFunc("dws_f_product_baseinfo", ProductField)
- //taskIterateSql1()
- taskProduct()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- func dealer() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "dealer",
- Short: "Start processing dealer data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveFunc("dwd_f_yl_dealer_baseinfo_new", DealerField)
- //go SaveFuncRc()
- taskDealer()
- },
- }
- return cmdClient
- }
- func ent() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "ent",
- Short: "Start processing ent data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveFunc("dws_f_ent_pa_baseinfo", EntField)
- taskEnt()
- },
- }
- return cmdClient
- }
- func register() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "register",
- Short: "Start processing register data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveFunc("dws_f_register_baseinfo", RegField)
- taskRegister()
- },
- }
- return cmdClient
- }
- func project() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "project",
- Short: "Start processing project data",
- Run: func(cmd *cobra.Command, args []string) {
- go SaveFunc("dwd_f_yl_purchasing_baseinfo_new", ProjectField)
- //go SaveFunc1("dwd_f_yl_purchasing_win_baseinfo_new", WinerField)
- InitPoCode()
- InitLvCode()
- redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
- redis.InitRedis1("bid_class=172.17.4.189:8379", 7) // class
- taskProject()
- },
- }
- //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
- return cmdClient
- }
- func taskBidding() {
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5a8d7f4840d2d9bbe8962002")}
- query := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if b := util.ObjToString(tmp["bid_field"]); b != "" {
- updateEsPool <- []map[string]interface{}{{
- "_id": mongodb.BsonIdToSId(tmp["_id"]),
- },
- {"bid_field": b},
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func taskIterateSql() {
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("taskIterateSql---", zap.Int("finally id: ", finalId))
- lastid, count := 0, 0
- for {
- util.Debug("重新查询,lastid---", lastid)
- q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "institution_baseinfo", lastid)
- rows, err := MysqlM.DB.Query(q)
- if err != nil {
- log.Error("taskIterateSql---", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- util.Debug("----finish----------", count)
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("taskIterateSql---", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%500 == 0 {
- util.Debug("current-------", count, lastid)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- EsSaveCache <- method(tmp)
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func taskDealer() {
- sess := MongoTool2.GetMgoConn()
- defer MongoTool2.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- saveM := make(map[string]interface{})
- record := make(map[string]interface{})
- for _, f := range DealerField {
- if f == "name_id" && util.ObjToString(tmp["name_id"]) == "" {
- name_id := strings.ReplaceAll(uuid.NewV4().String(), "-", "")
- saveM[f] = name_id
- saveM["exists_id"] = 0
- record = map[string]interface{}{"name_id": name_id, "name": tmp["company_name"], "type": 2, "createtime": time.Now().Format(util.Date_Full_Layout)}
- } else if f == "dealer_name" {
- saveM[f] = tmp["company_name"]
- } else if f == "area_code" {
- if tmp["area"] != nil {
- saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
- }
- } else if f == "city_code" {
- if tmp["area"] != nil && tmp["city"] != nil {
- c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
- saveM[f] = AreaCode[c]
- }
- } else if f == "district_code" {
- if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
- c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
- saveM[f] = AreaCode[c]
- }
- } else if f == "business_model" {
- saveM[f] = tmp["business_type"]
- } else if f == "capital" {
- text := util.ObjToString(tmp["capital"])
- capital := ObjToMoney(text)
- capital = capital / 10000
- if capital != 0 {
- capital, _ = util.FormatFloat(capital, 2)
- } else {
- capital = 0
- }
- saveM[f] = capital
- if capital < 100 {
- saveM["capital_code"] = 1
- } else if capital >= 100 && capital < 500 {
- saveM["capital_code"] = 2
- } else if capital >= 500 && capital < 1000 {
- saveM["capital_code"] = 3
- } else if capital >= 1000 {
- saveM["capital_code"] = 4
- }
- } else if f == "createtime" {
- saveM[f] = time.Now().Format(util.Date_Full_Layout)
- } else if f == "website" {
- tid := util.ObjToString(tmp["company_id"])
- std, _ := MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": tid}, map[string]interface{}{"website_url": 1})
- if std != nil && len(*std) > 0 && len(util.ObjToString((*std)["website_url"])) <= 255 {
- saveM[f] = util.ObjToString((*std)["website_url"])
- }
- } else {
- if tmp[f] != nil {
- saveM[f] = tmp[f]
- }
- }
- }
- if len(record) > 0 {
- saveRcPool <- record
- }
- saveBasePool <- saveM
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func taskEnt() {
- sess := MongoTool2.GetMgoConn()
- defer MongoTool2.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- saveM := make(map[string]interface{})
- for _, f := range EntField {
- if f == "area_code" {
- if tmp["area"] != nil {
- saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
- }
- } else if f == "issue_date" {
- if util.ObjToString(tmp[f]) != "" {
- saveM[f] = tmp[f]
- }
- } else if f == "sourcetype" {
- saveM[f] = 1
- } else if f == "createtime" || f == "updatetime" {
- saveM[f] = time.Now().Format(util.Date_Full_Layout)
- } else if f == "website" {
- tid := util.ObjToString(tmp["company_id"])
- std, _ := MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": tid}, map[string]interface{}{"website_url": 1})
- if std != nil && len(*std) > 0 && len(util.ObjToString((*std)["website_url"])) <= 255 {
- saveM[f] = util.ObjToString((*std)["website_url"])
- }
- } else {
- if tmp[f] != nil {
- saveM[f] = tmp[f]
- }
- }
- }
- saveBasePool <- saveM
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func taskRegister() {
- sess := MongoTool2.GetMgoConn()
- defer MongoTool2.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("nmpa_company").Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- saveM := make(map[string]interface{})
- for _, f := range RegField {
- if f == "dealer_id" {
- name := util.ObjToString(tmp["company"])
- info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"dealer_name": name}, "name_id", "")
- if info == nil || len(*info) == 0 {
- return
- }
- saveM[f] = (*info)["name_id"]
- saveM["website"] = (*info)["website"]
- } else if f == "company_name" {
- saveM[f] = tmp["company"]
- } else if f == "regnum" {
- if tmp["reg_no"] != nil {
- saveM[f] = tmp["reg_no"]
- }
- } else if f == "scope" {
- if util.ObjToString(tmp["class"]) == "生产型" {
- saveM[f] = tmp["product_range"]
- } else if util.ObjToString(tmp["class"]) == "经营型" {
- saveM[f] = tmp["business_range"]
- }
- } else if f == "type" {
- if util.ObjToString(tmp[f]) == "备案企业" {
- saveM[f] = 2
- } else if util.ObjToString(tmp[f]) == "许可企业" {
- saveM[f] = 1
- }
- } else if f == "approve_depart" {
- saveM[f] = tmp["badw"]
- } else if f == "approve_date" {
- if util.ObjToString(tmp["barq"]) != "" && util.ObjToString(tmp["barq"]) != "null" {
- saveM[f] = tmp["barq"]
- }
- } else if f == "validity_date" {
- if util.ObjToString(tmp["yxqx"]) != "" && util.ObjToString(tmp["yxqx"]) != "null" {
- saveM[f] = tmp["yxqx"]
- }
- } else if f == "type_address" {
- if util.ObjToString(tmp["class"]) == "生产型" {
- saveM[f] = tmp["product_address"]
- } else if util.ObjToString(tmp["class"]) == "经营型" {
- saveM[f] = tmp["business_address"]
- }
- } else if f == "createtime" {
- saveM[f] = time.Now().Format(util.Date_Full_Layout)
- } else {
- if tmp[f] != nil {
- saveM[f] = tmp[f]
- }
- }
- }
- saveBasePool <- saveM
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func taskProduct() {
- sess := MongoTool2.GetMgoConn()
- defer MongoTool2.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_product_info").Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- saveM := make(map[string]interface{})
- for _, f := range ProductField {
- if f == "dealer_id" {
- name := util.ObjToString(tmp["company_name"])
- info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"dealer_name": name}, "name_id", "")
- if info == nil || len(*info) == 0 {
- MongoTool2.Save("product_err_record", map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"])})
- return
- }
- saveM[f] = (*info)["name_id"]
- } else if f == "make_country" || f == "regist_type" {
- saveM[f] = util.IntAll(tmp[f])
- } else if f == "createtime" {
- saveM[f] = time.Now().Format(util.Date_Full_Layout)
- } else if f == "medical_equipment_class1" {
- saveM[f] = tmp["product_class1"]
- } else if f == "medical_equipment_class2" {
- saveM[f] = tmp["product_class2"]
- } else if f == "medical_equipment_class3" {
- saveM[f] = tmp["product_class3"]
- } else if f == "sdproduct_name" {
- saveM[f] = tmp["product_class4"]
- } else if f == "sdequipment_code" {
- if len(util.ObjToString(tmp["product_code"])) > 7 {
- saveM[f] = util.ObjToString(tmp["product_code"])[:7]
- } else {
- saveM[f] = tmp["product_code"]
- }
- } else {
- if tmp[f] != nil {
- saveM[f] = tmp[f]
- }
- }
- }
- saveBasePool <- saveM
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func taskProject() {
- sess := MongoTool1.GetMgoConn()
- defer MongoTool1.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- query := sess.DB(config.Conf.DB.Mongo1.Dbname).C("projectset_medical").Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if tmp["jg_plist"] == nil {
- return
- }
- saveM := make(map[string]interface{})
- infoid := util.ObjToString(tmp["sourceinfoid"])
- saveM["projectid"] = mongodb.BsonIdToSId(tmp["_id"])
- saveM["infoid"] = infoid
- saveM["jyhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
- saveM["bidstatus"] = Bidstatus[util.ObjToString(tmp["bidstatus"])]
- saveM["bidstype"] = Bidtype[util.ObjToString(tmp["bidstype"])]
- saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
- saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
- if tmp["budget"] != nil {
- saveM["budget"], _ = util.FormatFloat(util.Float64All(tmp["budget"]), 4)
- }
- if tmp["area"] != nil {
- saveM["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
- }
- if tmp["area"] != nil && tmp["city"] != nil {
- c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
- saveM["city_code"] = AreaCode[c]
- }
- if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
- c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
- saveM["district_code"] = AreaCode[c]
- }
- for _, f := range []string{"title", "projectname", "projectcode", "purchasing", "agency", "buyer"} {
- if f == "purchasing" {
- if utf8.RuneCountInString(util.ObjToString(tmp[f])) < 20000 {
- saveM[f] = tmp[f]
- }
- } else {
- if util.ObjToString(tmp[f]) != "" {
- saveM[f] = tmp[f]
- }
- }
- }
- if b := util.ObjToString(tmp["buyer"]); b != "" {
- if eid := redis.GetStr("ent_id", b); eid != "" {
- saveM["buyer_id"] = strings.Split(eid, "_")[0]
- saveM["mi_area_code"] = strings.Split(eid, "_")[1]
- if cd := LvCode[strings.Split(eid, "_")[0]]; cd != "" {
- saveM["mi_level_code"] = cd
- }
- }
- }
- if a := util.ObjToString(tmp["agency"]); a != "" {
- if eid := redis.GetStr("ent_id", a); eid != "" {
- saveM["agency_id"] = strings.Split(eid, "_")[0]
- }
- }
- for _, f := range []string{"bidopentime", "zbtime", "jgtime"} {
- if util.IntAll(tmp[f]) > 0 {
- t := util.Int64All(tmp[f])
- saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- }
- if t := util.Int64All(tmp["jgtime"]); t > 0 {
- y := time.Unix(t, 0).Year()
- m := time.Unix(t, 0).Month().String()
- saveM["year_tags"] = y
- saveM["month_tags"] = Month[m]
- saveM["quarter_tags"] = Quarter[m]
- }
- pname := make(map[string]bool) // 标的物名称 去重
- for _, p := range tmp["jg_plist"].([]interface{}) {
- p1 := p.(map[string]interface{})
- if name := util.ObjToString(p1["itemname"]); name != "" && !pname[name] {
- if utf8.RuneCountInString(name) > 255 {
- continue
- }
- pname[name] = true
- saveM1 := util.DeepCopy(saveM).(map[string]interface{})
- saveM1["itemname"] = name
- if code := redis.GetStr("bid_class", fmt.Sprintf("%s_%s", infoid, name)); code != "" {
- if len(code) > 7 {
- saveM1["sdequipment_code"] = code[:7]
- } else {
- saveM1["sdequipment_code"] = code
- }
- for k, v := range PclassCode[code] {
- if k == "class_1" {
- saveM1["medical_equipment_class1"] = v
- } else if k == "class_2" {
- saveM1["medical_equipment_class2"] = v
- } else if k == "class_3" {
- saveM1["medical_equipment_class3"] = v
- } else if k == "class_4" {
- saveM1["sdproduct_name"] = v
- }
- }
- }
- if p1["brandname"] != nil {
- saveM1["brandname"] = p1["brandname"]
- }
- if p1["specs"] != nil {
- saveM1["specs"] = p1["specs"]
- }
- if p1["model"] != nil && utf8.RuneCountInString(util.ObjToString(p1["model"])) < 200 {
- saveM1["model"] = p1["model"]
- }
- if p1["unitname"] != nil {
- saveM1["unitname"] = p1["unitname"]
- }
- if p1["number"] != nil && util.IntAll(p1["number"]) < 100000 {
- saveM1["number"] = util.IntAll(p1["number"])
- }
- if p1["unitprice"] != nil && util.Float64All(p1["unitprice"]) < 100000000 {
- saveM1["unitprice"], _ = util.FormatFloat(util.Float64All(p1["unitprice"]), 2)
- }
- if p1["totalprice"] != nil {
- saveM1["totalprice"], _ = util.FormatFloat(util.Float64All(p1["totalprice"]), 2)
- }
- saveBasePool <- saveM1
- }
- }
- // 中标信息
- saveW := make(map[string]interface{})
- if w := util.ObjToString(tmp["winner"]); w != "" {
- saveW["winner"] = w
- if eid := redis.GetStr("ent_id", w); eid != "" {
- saveW["winner_id"] = strings.Split(eid, "_")[0]
- saveW["winner_area_code"] = strings.Split(eid, "_")[1]
- if ccode := strings.Split(eid, "_")[2]; ccode != "" {
- saveW["winner_city_code"] = ccode
- }
- }
- if util.ObjToString(tmp["winnertel"]) != "" {
- saveW["contact_tel"] = tmp["winnertel"]
- }
- if util.ObjToString(tmp["winnerperson"]) != "" {
- saveW["contact_name"] = tmp["winnerperson"]
- }
- saveW["projectid"] = mongodb.BsonIdToSId(tmp["_id"])
- saveW["infoid"] = util.ObjToString(tmp["sourceinfoid"])
- if tmp["bidamount"] != nil {
- saveW["bidamount"], _ = util.FormatFloat(util.Float64All(tmp["bidamount"]), 4)
- }
- if util.IntAll(tmp["jgtime"]) > 0 {
- t := util.Int64All(tmp["jgtime"])
- saveW["jgtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- saveW["is_winner"] = 1
- saveW["updatetime"] = time.Now().Format(util.Date_Full_Layout)
- saveW["createtime"] = time.Now().Format(util.Date_Full_Layout)
- saveBasePool1 <- saveW
- }
- //中标候选
- if tmp["winnerorder"] != nil {
- saveW1 := make(map[string]interface{})
- for _, w := range tmp["winnerorder"].([]interface{}) {
- if util.ObjToString(w) == "" {
- continue
- }
- saveW1["winner"] = w
- if eid := redis.GetStr("ent_id", util.ObjToString(w)); eid != "" {
- saveW1["winner_id"] = strings.Split(eid, "_")[0]
- saveW1["winner_area_code"] = strings.Split(eid, "_")[1]
- if ccode := strings.Split(eid, "_")[2]; ccode != "" {
- saveW1["winner_city_code"] = ccode
- }
- }
- saveW1["projectid"] = mongodb.BsonIdToSId(tmp["_id"])
- saveW1["infoid"] = util.ObjToString(tmp["sourceinfoid"])
- if tmp["bidamount"] != nil {
- saveW1["bidamount"], _ = util.FormatFloat(util.Float64All(tmp["bidamount"]), 4)
- }
- if util.IntAll(tmp["jgtime"]) > 0 {
- t := util.Int64All(tmp["jgtime"])
- saveW["jgtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- saveW["is_winner"] = 2
- saveW["updatetime"] = time.Now().Format(util.Date_Full_Layout)
- saveW["createtime"] = time.Now().Format(util.Date_Full_Layout)
- saveBasePool1 <- saveW
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func method(tmp map[string]interface{}) map[string]interface{} {
- m := make(map[string]interface{})
- for k, v := range config.Conf.DB.Es.FieldM {
- if k == "alias" {
- var arr []string
- info := MysqlM.Find("institution_alias", map[string]interface{}{"company_id": tmp["company_id"]}, "", "", -1, -1)
- for _, m2 := range *info {
- arr = append(arr, util.ObjToString(m2["alias"]))
- }
- if len(arr) > 0 {
- m[k] = strings.Join(arr, ",")
- }
- } else if k == "sdequipment" {
- if util.ObjToString(tmp["mi_type_code"]) == "0208" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "多功能电离子手术治疗机,CO2激光治疗仪,半导体激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针、电刀、电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机"
- } else if util.ObjToString(tmp["mi_type_code"]) == "0201" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机"
- } else if util.ObjToString(tmp["mi_type_code"]) == "0203" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,胆红素测定仪,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,经皮给药治疗仪,儿童智能测量仪"
- } else if util.ObjToString(tmp["mi_type_code"]) == "12" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏起搏器,心脏除颤器,心脏复苏机,呼吸机,儿童用呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,血压计,体温计,体重计,空气消毒机"
- } else if util.ObjToString(tmp["mi_type_code"]) == "01" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
- m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏除颤器,心脏复苏机,呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,空气消毒机,动态心电监测系统,有创呼吸机,动态血压监护仪,便携式血氧饱和度监护仪,多导睡眠呼吸监测仪,床边肺功能仪,便携式血气分析仪,心肺功能监测仪,胃动力检测仪,胃电治疗仪,腹水超滤仪,腹水浓缩机,人工肝,肝病治疗仪,床单位臭氧消毒机,结肠灌洗治疗仪,经皮肾镜,动态血糖监测仪,胰岛素泵,糖尿病足病诊断箱,眼底镜,空气波压力治疗仪,动态脑电监护仪,颅内压监测仪,脑水肿监测仪,半导体激光治疗仪,钴60放射治疗机,超声聚焦刀,氩氦刀,射频肿瘤治疗仪,微波热疗仪,双筒显微镜,相差显微镜,荧光显微镜,倒置显微镜,骨髓活检装置,流式细胞分析仪,细胞分离机,脑细胞介质分析仪,双光能骨密度仪,脑电超慢涨落分析仪,动脉硬化测试仪,移动式负压吸引器,换药床,乳腺微创真空旋切系统,外碎石设备,骨科牵引床,脊柱牵引床,推拿手法床,石膏床,石膏剪,石膏锯,水温箱,足底静脉泵,激光治疗仪,骨科康复设备,吸引设备,供氧设备,监护设备,呼叫系统,心脏除颤仪,抢救车,换药车,转运床,营养输注泵,防褥疮气垫,血压计,体温计,体重计,移动紫外线灯,负压病房设施,层流病房设施,心脏起搏器,便携式呼吸机,雾化器,床边支气管镜,血液动力学检测仪,电冰毯,电子冰帽,脑电图监测仪,脑功能监测仪,振动排痰器,床单元臭氧消毒机,层流净化系统,holter,主动脉球囊反搏泵,便携式超声诊断仪,食道电生理仪,诊察床,肛管直肠压力测定设备,肛门镜,肛门坐浴熏洗设备,结肠灌洗设备,肛肠综合治疗仪,痔科套扎器,肛肠内腔治疗仪,肛门肌电图,多功能电离子手术治疗机,CO2激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针,电刀,电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机,母婴监护仪,妇科检查台,计划生育手术床,冲洗车,阴道镜,人流吸引器,超声诊断仪,超高频电波刀,超声聚焦治疗仪,盆腔炎治疗设备,产后康复综合治疗仪,胎心监护仪,妇科检查床,综合产床,新生儿抢救台,婴儿辐射保暖台,婴儿培养箱,电动羊水吸引器,经皮给药治疗仪,胆红素测定仪,产妇电脑综合治疗仪,消毒隔离器械柜,小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,儿童智能测量仪,眼科治疗床,裂隙灯,眼压计,角膜曲率计,视力灯箱,客观视力仪,电脑验光仪,全自动电脑视野仪,手术显微镜,眼科AB超声仪,超声乳化治疗仪,眼底荧光造影仪,自动焦度仪,沙眼治疗仪,睫毛电解器,视觉诱发电位仪,耳鼻喉综合治疗台,耳科旋转椅,鼓气电窥耳镜,耳钻,动态喉镜,纤维喉镜,间接喉镜,直接喉镜,支撑喉镜,电子喉镜,鼻咽喉镜,间接鼻咽喉镜,前鼻镜,鼻内镜及手术系统,电测听器,前庭检查仪,眼震电图仪,声阻抗仪,五官科多用显微镜,牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机,普通X光机,洗片机,透视机,移动式X光机,数字X线摄影,CR,DR,干式激光相机,X线电子计算机断层扫描装置,CT,数字减影血管造影X光机,DSA,数字化胃肠X光机,数据传输系统,LIS,全自动血细胞分析仪,尿液分析仪,尿沉渣工作站,冰点渗透压计,凝血检测仪,血糖测定仪,微量血糖测定仪,血气分析仪,干式生化分析仪,生化分析仪,发光免疫分析仪,全自动酶免免疫分析系统,酶标仪,电泳分析仪,血小板聚集仪,全自动细菌培养系统,生物培养箱,微生物鉴定药敏分析仪,血培养仪,菌落计数器,厌氧菌培养箱,幽门螺旋杆菌检测仪,氨基酸分析系统,荧光定量PCR检测系统,TCT液基细胞学检测仪,HPV-DNA检测系统,心梗三项检测仪,脑钠肽检测仪,二氧化碳培养箱,高温灭菌器,生物安全柜,血液流变仪,普通显微镜,生物显微镜,血沉仪,蛋白电泳仪,特种蛋白仪,电解质分析仪,精子分析系统,血栓弹力分析仪,血型鉴定及配血设备,纯水系统,自动洗板机,分析天平,超声清洗器,振荡器,电热培养箱,恒温水浴箱,医用冷库,医用冷藏柜,超低温冰柜,普通离心机,低速冷冻离心机,高速冷冻离心机,快速血糖仪微量泵,输液泵营养,输注泵,负压病房设施,层流病房设施,有创呼吸机便携式呼吸机,心电图机动态心电监测系统,微量泵输液泵,食道电生理仪床,单元臭氧消毒机,肛管直肠,压力测定设备,超声聚焦治疗仪,盆腔炎治疗设备,裂隙灯眼压计,眼底镜角膜曲率计视力,灯箱客观视力仪,睫毛电解器,鼓气电窥耳镜耳钻,动态喉镜纤维喉镜,间接喉镜直接喉镜,前鼻镜鼻,内镜及手术系统,电测听器前庭检查仪,抛光机氦氖激光器,光敏固化灯种植机,喷砂机铸造机石膏振荡器,干燥箱全瓷/铸造烤瓷设备,普通X光机,透视机移动式X光机,X线电子计算机,数字化胃肠X光机,数据传输系统(LIS)全自动血细胞分析仪,尿沉渣工作站冰点渗透压计,全自动细菌培养系统,生物培养箱,血沉仪蛋白电泳仪,分析天平超声清洗器,干燥箱医用冰箱,心脏复苏机,营养输注泵,血透机,自动腹透机,外碎石设备,核磁共振仪,核磁共振成像系统"
- }
- info1, _ := MongoTool.FindOneByField("20220906shebei_buchong", map[string]interface{}{"company_id": tmp["company_id"]}, map[string]interface{}{"itemname_all": 1})
- if info1 != nil && len(*info1) > 0 {
- if m[k] != nil {
- m[k] = fmt.Sprintf("%s,%s", m[k], (*info1)["itemname_all"])
- } else {
- m[k] = (*info1)["itemname_all"]
- }
- }
- info := MysqlM.FindOne("code_sdleveltypeequip", map[string]interface{}{"code": tmp["sdequipment_code"]}, "", "")
- if info != nil && len(*info) > 0 {
- if m[k] != nil {
- m[k] = fmt.Sprintf("%s,%s", m[k], (*info)["equipment"])
- } else {
- m[k] = (*info)["equipment"]
- }
- }
- } else if k == "area_code" {
- m[k] = tmp[k]
- info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
- if info != nil && len(*info) > 0 {
- m["area"] = (*info)["area"]
- if (*info)["city"] != nil {
- m["city"] = (*info)["city"]
- }
- if (*info)["district"] != nil {
- m["district"] = (*info)["district"]
- }
- }
- } else if k == "mi_type_code" {
- m[k] = string([]byte(util.ObjToString(tmp[k]))[:2])
- } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
- continue
- } else {
- if v == "string" {
- m[k] = tmp[k]
- } else {
- m[k] = util.IntAll(tmp[k])
- }
- }
- }
- return m
- }
- func taskIterateSql1() {
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlB.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY product_id DESC LIMIT 1", "product_baseinfo"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["product_id"])
- }
- log.Info("taskIterateSql1---", zap.Int("finally id", finalId))
- lastid, count := 0, 0
- for {
- util.Debug("重新查询,lastid---", lastid)
- q := fmt.Sprintf("SELECT * FROM %s WHERE product_id > %d ORDER BY product_id ASC limit 100000", "product_baseinfo", lastid)
- rows, err := MysqlB.DB.Query(q)
- if err != nil {
- log.Error("taskIterateSql1---", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- util.Debug("----finish----------", count)
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("taskIterateSql1---", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["product_id"])
- count++
- if count%500 == 0 {
- util.Debug("current-------", count, lastid)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- cid := util.ObjToString(tmp["company_id"])
- info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"company_id": cid}, "name_id, dealer_name", "")
- if info == nil || len(*info) == 0 {
- MongoTool2.Save("product_err_record", map[string]interface{}{"product_id": tmp["product_id"]})
- return
- }
- saveM := make(map[string]interface{})
- saveM["dealer_id"] = (*info)["name_id"]
- saveM["company_name"] = (*info)["dealer_name"]
- saveM["sdequipment_code"] = tmp["medical_equipment_code"]
- saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
- delete(tmp, "medical_equipment_code")
- delete(tmp, "product_id")
- delete(tmp, "comeintime")
- saveBasePool <- tmp
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func method1(tmp map[string]interface{}) map[string]interface{} {
- m := make(map[string]interface{})
- for k, v := range config.Conf.DB.Es.FieldS {
- if k == "business_model" {
- info := MysqlB.FindOne("company_business_model", map[string]interface{}{"company_id": tmp["company_id"], "company_field_code": "0101"}, "", "")
- if info != nil && len(*info) > 0 {
- m[k] = util.IntAll((*info)["business_model"])
- } else {
- m[k] = 2
- }
- } else if k == "supplier" {
- m[k] = tmp["company_name"]
- } else if k == "productlist" {
- var p = method2(util.ObjToString(tmp["company_id"]))
- if p != nil {
- m[k] = p
- }
- } else if k == "area_code" {
- if tmp[k] != nil {
- m[k] = tmp[k]
- info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
- if info != nil && len(*info) > 0 {
- m["area"] = (*info)["area"]
- if (*info)["city"] != nil {
- m["city"] = (*info)["city"]
- }
- if (*info)["district"] != nil {
- m["district"] = (*info)["district"]
- }
- }
- } else {
- log.Error("area_code", zap.Any("id", tmp["company_id"]))
- }
- } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
- continue
- } else {
- if v == "string" {
- m[k] = tmp[k]
- } else {
- m[k] = util.IntAll(tmp[k])
- }
- }
- }
- return m
- }
- func method2(cid string) []map[string]interface{} {
- var pmap []map[string]interface{}
- mc := make(map[string]bool) // 记录name 去重
- pinfo1 := MysqlM.Find("product_baseinfo", map[string]interface{}{"company_id": cid}, "", "", -1, -1)
- for _, m2 := range *pinfo1 {
- m := make(map[string]interface{})
- m["name"] = m2["product_name"]
- pmap = append(pmap, m)
- mc[util.ObjToString(m2["product_name"])] = true
- }
- pinfo2, _ := MongoTool.Find("bidding_p_list_0907", map[string]interface{}{"company_id": cid}, nil, nil, false, -1, -1)
- if len(*pinfo2) > 0 {
- for _, m2 := range *pinfo2 {
- key := util.ObjToString(m2["itemname"]) + util.ObjToString(m2["brand"]) + util.ObjToString(m2["model"])
- if mc[key] {
- continue
- } else {
- m := make(map[string]interface{})
- m["name"] = util.ObjToString(m2["itemname"])
- if m2["model"] != nil {
- m["model"] = m2["model"]
- }
- if m2["brand"] != nil {
- m["brand"] = m2["brand"]
- }
- pmap = append(pmap, m)
- mc[key] = true
- }
- }
- }
- return pmap
- }
- func SaveEs(i, t string) {
- log.Info("SaveEs---", zap.String("i", i), zap.String("t", t))
- arru := make([]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-EsSaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- Es.BulkSave(i, arru)
- }(arru)
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- Es.BulkSave(i, arru)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|