package main

import (
	"context"
	"fmt"
	"time"

	"esindex/config"

	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)

// buyerOnce processes incremental buyer data.
//
// It selects the rows of dws_f_ent_tags whose createtime or updatetime is on
// or after yesterday's date (local time), pages through them by ascending id,
// and for each qualifying row writes a document to the buyer ES index and
// sets status = 1 on the MySQL row (see indexBuyer).
//
// The special-name set is loaded lazily on first use.
func buyerOnce() {
	if len(specialNames) < 1 {
		initSpecialNames()
	}

	const rowsPerPage = 1000

	now := time.Now()
	// time.Date normalises Day()-1, so this is midnight of the previous day
	// even on the first day of a month.
	tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
	curTime := tarTime.Format("2006-01-02")

	countSql := fmt.Sprintf(`
		SELECT count(t.id)
		FROM dws_f_ent_tags AS t
		LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
		WHERE t.createtime >= '%v' OR t.updatetime >= '%v'
		ORDER BY t.id DESC
		LIMIT 1
	`, curTime, curTime)

	dataCounts := Mysql.CountBySql(countSql)
	if dataCounts <= 0 {
		log.Info("buyerOnce", zap.String(curTime, "没有更新数据"))
		return
	}
	log.Info("buyerOnce", zap.Any(fmt.Sprintf("总数:%s", curTime), dataCounts))

	lastid, total, realCount := 0, 0, 0
	// The bound is checked before querying so that no result set is opened
	// (and then abandoned) once everything has been processed.
	for total < int(dataCounts) {
		page, err := fetchBuyerPage(curTime, lastid, rowsPerPage)
		if err != nil {
			log.Info("buyerOnce", zap.Any("fetchBuyerPage err", err))
		}
		// No rows means no progress is possible (query failure, rows removed
		// since the count was taken, or a row that cannot be scanned).
		// Stop instead of re-issuing the same query forever.
		if len(page) == 0 {
			log.Info("buyerOnce stopped early", zap.Any("total", total), zap.Any("lastid", lastid))
			break
		}

		for _, ret := range page {
			total++
			if total%100 == 0 {
				log.Info("buyerOnce", zap.Int("current total", total))
			}
			lastid = util.IntAll(ret["id"])
			if indexBuyer(ret) {
				realCount++
			}
		}
	}

	if total >= int(dataCounts) {
		log.Info("buyerOnce over", zap.Any("total", total), zap.Any("lastid", lastid))
	}
	log.Info("buyerOnce", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
}

// fetchBuyerPage reads up to limit rows with id greater than lastid that were
// created or updated on or after curTime, ordered by ascending id.
//
// Each row is returned as a map keyed by column name; []uint8 values are
// converted to string, everything else is kept as delivered by the driver.
//
// If the query itself fails, the returned slice is nil. If scanning or
// iteration fails part-way, the rows read before the failure are returned
// together with the error so the caller can still process them.
func fetchBuyerPage(curTime string, lastid, limit int) ([]map[string]interface{}, error) {
	query := fmt.Sprintf(`
		SELECT b.name, t.id, t.name_id, b.company_id, t.createtime, t.updatetime, c.area, c.city, class.name AS buyerclass
		FROM dws_f_ent_tags AS t
		LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
		LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
		LEFT JOIN code_area AS c ON b.city_code = c.code
		WHERE (t.createtime >= '%v' OR t.updatetime >= '%v') and ( t.id > %d )
		ORDER BY t.id ASC
		LIMIT %d;
	`, curTime, curTime, lastid, limit)

	rows, err := Mysql.DB.QueryContext(context.Background(), query)
	if err != nil {
		return nil, fmt.Errorf("QueryContext: %w", err)
	}
	// Released on every return path, including the error ones below.
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, fmt.Errorf("rows.Columns: %w", err)
	}

	var page []map[string]interface{}
	for rows.Next() {
		values := make([]interface{}, len(columns))
		scanArgs := make([]interface{}, len(columns))
		for i := range values {
			scanArgs[i] = &values[i]
		}
		if err := rows.Scan(scanArgs...); err != nil {
			return page, fmt.Errorf("rows.Scan: %w", err)
		}

		ret := make(map[string]interface{}, len(columns))
		for i, col := range values {
			if v, ok := col.([]uint8); ok {
				ret[columns[i]] = string(v)
			} else {
				ret[columns[i]] = col
			}
		}
		page = append(page, ret)
	}
	if err := rows.Err(); err != nil {
		return page, fmt.Errorf("rows.Err: %w", err)
	}
	return page, nil
}

// indexBuyer handles one row produced by fetchBuyerPage.
//
// A row qualifies when its company_id is non-empty or its name is present in
// specialNames (special enterprises and crawler-collected buyer units). For a
// qualifying row it writes a document to the buyer ES index and sets
// status = 1 on the dws_f_ent_tags rows with the same name_id. It reports
// whether the row qualified.
func indexBuyer(ret map[string]interface{}) bool {
	name := util.ObjToString(ret["name"])
	if util.ObjToString(ret["company_id"]) == "" && !specialNames[name] {
		return false
	}

	data := map[string]interface{}{
		"name":       name,
		"name_id":    ret["name_id"],
		"id":         ret["name_id"],
		"buyer_name": name,
		"province":   ret["area"],
		"city":       ret["city"],
		"buyerclass": ret["buyerclass"],
	}

	// Timestamps are only emitted when the driver delivered time.Time values.
	// A NULL updatetime falls back to createtime.
	if createtime, ok := ret["createtime"].(time.Time); ok {
		data["createtime"] = createtime.Unix()
		switch updatetime := ret["updatetime"].(type) {
		case nil:
			data["updatetime"] = createtime.Unix()
		case time.Time:
			data["updatetime"] = updatetime.Unix()
		}
	}

	// NOTE(review): name_id is interpolated into the SQL text. It comes from
	// our own table, but a parameterised query would be safer if CountBySql
	// has a variant that accepts arguments.
	sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, ret["name_id"])
	data["is_contact"] = Mysql.CountBySql(sql) > 0

	// Write the final document to the index.
	arrEs := []map[string]interface{}{data}
	if err := Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs); err != nil {
		log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
	}

	// Update MySQL. NOTE(review): as in the original flow, status is set even
	// when the ES write above failed — confirm this is intended.
	where := map[string]interface{}{
		"name_id": ret["name_id"],
	}
	update := map[string]interface{}{
		"status": 1,
	}
	Mysql.Update("dws_f_ent_tags", where, update)
	return true
}

// initSpecialNames fills specialNames with the special-enterprise names and
// the crawler-collected buyer-unit names stored in MongoDB.
//
// Every non-empty string found in the configured field of each collection is
// added to the set. Values that are missing, empty, or not strings are
// skipped.
func initSpecialNames() {
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)

	// load adds every non-empty string value of field in db.coll to specialNames.
	load := func(db, coll, field string) {
		log.Info("dealSpecialNames", zap.String(coll, coll))
		iter := sess.DB(db).C(coll).Find(nil).Iter()
		for tmp := make(map[string]interface{}); iter.Next(tmp); {
			if name, ok := tmp[field].(string); ok && name != "" {
				specialNames[name] = true
			}
		}
		// Closing the iterator surfaces any error hit while iterating.
		if err := iter.Close(); err != nil {
			log.Info("dealSpecialNames", zap.String("collection", coll), zap.Any("iter err", err))
		}
	}

	// pauseAfter reproduces the one-second sleeps of the original code at the
	// same positions; their purpose is not evident from this file.
	sources := []struct {
		db, coll, field string
		pauseAfter      bool
	}{
		{MgoS.DbName, "special_enterprise", "company_name", true},
		{"mixdata", "special_foundation", "company_name", true},
		{"mixdata", "special_gov_unit", "company_name", true},
		{"mixdata", "special_hongkong_company", "company_name", true},
		{"mixdata", "special_hongkong_company_history", "use_name", true},
		{"mixdata", "special_law_office", "company_name", false},
		{"mixdata", "special_social_organ", "company_name", false},
		{"mixdata", "special_trade_union", "company_name", false},
		// Buyer units collected by the crawler.
		{"mixdata", "special_buyer_name", "company_name", false},
	}

	for _, src := range sources {
		load(src.db, src.coll, src.field)
		if src.pauseAfter {
			time.Sleep(time.Second)
		}
	}
}