maxiaoshan 5 年之前
父节点
当前提交
8fa89c8b8b
共有 4 个文件被更改,包括 166 次插入71 次删除
  1. 4 4
      qyxy/src/config.json
  2. 20 17
      qyxy/src/main.go
  3. 13 0
      qyxy/src/mgoutil/mongodb/mongodb.go
  4. 129 50
      qyxy/src/task.go

+ 4 - 4
qyxy/src/config.json

@@ -1,10 +1,10 @@
 {
 	"mgodb": "192.168.3.207:27092",
-	"dbsize": 10,
+	"dbsize": 12,
 	"dbname": "mxs",
-	"dbcoll": "test",
+	"dbcoll": "qyxy",
 	"savecoll": "qyxy_std",
-	"tasktime": 12,
+	"tasktime": 1,
 	"updatetime": 1597202468,
 	"elastic": {
         "addr": "http://192.168.3.128:9800",
@@ -12,6 +12,6 @@
         "itype": "qyxy",
         "pool": 12,
         "fields": ["cancel_reason","revoke_reason","cancel_size","partner_size","punish_size","operation_size","illegal_size","annual_report_size"],
-        "esfields":["company_name","history_name","company_code","credit_no","org_code","tax_code","area_code","company_type","legal_person","legal_person_certno","establish_date","lastupdatetime","capital","operation_startdate","operation_enddate","authority","issue_date","company_status","company_address","business_scope","cancel_date","revoke_date"]
+        "esfields":["company_name","history_name","org_code","tax_code","company_type","legal_person","legal_person_certno","establish_date","lastupdatetime","capital","operation_startdate","operation_enddate","authority","issue_date","company_status","company_address","business_scope","cancel_date","revoke_date"]
     }
 }

+ 20 - 17
qyxy/src/main.go

@@ -7,20 +7,22 @@ import (
 )
 
 var (
-	Sysconfig  map[string]interface{} //配置文件
-	Mgo        *mgoutil.MongodbSim
-	Dbname     string
-	Dbcoll     string
-	Savecoll   string
-	Es         *es.Elastic
-	Index      string
-	Itype      string
-	EsFields   []string
-	Fields     []string
-	AddressMap map[string]*City
-	QyStypeMap map[string]string
-	TaskTime   int
-	Updatetime int64
+	Sysconfig        map[string]interface{} //配置文件
+	Mgo              *mgoutil.MongodbSim
+	Dbname           string
+	Dbcoll           string
+	Savecoll         string
+	Es               *es.Elastic
+	Index            string
+	Itype            string
+	EsFields         []string
+	Fields           []string
+	AddressMap       map[string]*City
+	AddressOldMap    map[string]*City
+	QyStypeMap       map[string]string
+	CompanyStatusMap map[string]string
+	TaskTime         int
+	//Updatetime       int64
 )
 var EsSaveCache = make(chan map[string]interface{}, 1000)
 var SP = make(chan bool, 5)
@@ -53,14 +55,15 @@ func init() {
 	go SaveEs()
 	//初始化其他信息
 	TaskTime = qu.IntAll(Sysconfig["tasktime"])
-	Updatetime = qu.Int64All(Sysconfig["updatetime"])
+	//Updatetime = qu.Int64All(Sysconfig["updatetime"])
 	InitAddress()
 	InitQyStype()
+	InitCompanyStatus()
 }
 
 func main() {
-	//go TimeTask()
-	QyxyStandard(map[string]interface{}{})
+	go TimeTask()
+	//QyxyStandard()
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 13 - 0
qyxy/src/mgoutil/mongodb/mongodb.go

@@ -345,6 +345,19 @@ func (m *MongodbSim) Del(c string, q interface{}) bool {
 	return true
 }
 
+//删除表
+func (m *MongodbSim) DelColl(c string) bool {
+	defer catch()
+	m.Open()
+	defer m.Close()
+	err := m.C.Database(m.DbName).Collection(c).Drop(m.Ctx)
+	if err != nil {
+		log.Println("删除错误", err.Error())
+		return false
+	}
+	return true
+}
+
 //按条件更新
 func (m *MongodbSim) Update(c string, q, u interface{}, upsert bool, multi bool) bool {
 	defer catch()

+ 129 - 50
qyxy/src/task.go

@@ -24,6 +24,9 @@ var FieldListMap = map[string]map[string]bool{
 //全部字段
 var AllFieldListMap = []string{"punishes", "operations", "illegals"}
 
+//地区处理
+var AreaFiled = []string{"credit_no", "company_code", "area_code"}
+
 // var AllFieldListMap = map[string]string{
 // 	"punishes":   "punish_size",
 // 	"operations": "operation_size",
@@ -48,7 +51,8 @@ type City struct {
 func TimeTask() {
 	//StartTask()
 	c := cron.New()
-	cronstr := "0 0 */" + fmt.Sprint(TaskTime) + " * * ?" //每TaskTime小时执行一次
+	cronstr := "0 0 15 ? * Mon,Tue" //每周一周二15点执行
+	//cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
 	c.AddFunc(cronstr, func() { StartTask() })
 	c.Start()
 }
@@ -56,24 +60,25 @@ func TimeTask() {
 //开始任务
 func StartTask() {
 	log.Println("Start Task...")
-	query := map[string]interface{}{
-		"updatetime": map[string]interface{}{
-			"$gt": Updatetime,
-		},
-	}
-	// one, _ := Mgo.FindOne(Dbcoll, query)
-	// if len(*one) > 0 { //有数据
-	run := QyxyStandard(query)
+	// query := map[string]interface{}{
+	// 	"updatetime": map[string]interface{}{
+	// 		"$gt": Updatetime,
+	// 	},
+	// }
+	run := QyxyStandard()
 	if run {
-		Updatetime = time.Now().Unix() //更新下次查询时间
+		time.Sleep(5 * time.Minute)
+		if Mgo.DelColl(Dbcoll) {
+			log.Println("Delete Coll ", Dbcoll, "Success")
+		} else {
+			log.Println("Delete Coll ", Dbcoll, "Fail")
+		}
 	}
-	//}
 }
 
 //标准化数据,生索引
-func QyxyStandard(query map[string]interface{}) bool {
+func QyxyStandard() bool {
 	defer qu.Catch()
-	log.Println("query:", query)
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
 
@@ -81,15 +86,15 @@ func QyxyStandard(query map[string]interface{}) bool {
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{} //控制读写
 	arr := [][]map[string]interface{}{}
-	// count, _ := sess.DB(Dbname).C(Dbcoll).Find(query).Count()
-	// log.Println("共查询:", count, "条")
-	// if count == 0 {
-	// 	return false
-	// }
-	it := sess.DB(Dbname).C(Dbcoll).Find(query).Iter()
+	count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
+	log.Println("共查询:", count, "条")
+	if count == 0 {
+		return false
+	}
+	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
 	sum := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
-		if sum%10000 == 0 {
+		if sum%100 == 0 {
 			log.Println("current:", sum)
 		}
 		pool <- true
@@ -108,6 +113,49 @@ func QyxyStandard(query map[string]interface{}) bool {
 			esMap["_id"] = _id
 			esMap["updatetime"] = time.Now().Unix()
 			update = append(update, map[string]interface{}{"_id": _id})
+			//地区处理
+			hadArea := false //标记是否有省份信息
+			for i, field := range AreaFiled {
+				if tmp[field] == nil {
+					continue
+				}
+				if code := fmt.Sprint(tmp[field]); code != "" {
+					esMap[field] = code //加入esMap
+					if !hadArea {
+						if i == 0 && len(code) >= 8 { //credit_no企业信用代码
+							code = code[2:8]
+						} else if i == 1 && len(code) >= 6 { //company_code注册号
+							code = code[:6]
+						}
+						if city := AddressMap[code]; city != nil { //未作废中取
+							if city.Province != "" {
+								esMap["company_area"] = city.Province //省
+							}
+							if city.City != "" {
+								esMap["company_city"] = city.City //市
+							}
+							if city.District != "" {
+								esMap["company_district"] = city.District //县
+							}
+						} else { //作废中取
+							if city := AddressOldMap[code]; city != nil {
+								if city.Province != "" {
+									esMap["company_area"] = city.Province //省
+								}
+								if city.City != "" {
+									esMap["company_city"] = city.City //市
+								}
+								if city.District != "" {
+									esMap["company_district"] = city.District //县
+								}
+							}
+						}
+						if esMap["company_area"] != nil {
+							hadArea = true
+						}
+					}
+				}
+			}
 			//生索引字段处理
 			for _, field := range EsFields {
 				if tmp[field] == nil {
@@ -124,20 +172,6 @@ func QyxyStandard(query map[string]interface{}) bool {
 					if capital != 0 {
 						esMap[field] = capital //注册资本
 					}
-				} else if field == "area_code" { //地区处理
-					code := fmt.Sprint(tmp[field])
-					esMap[field] = code //行政区划代码
-					if city := AddressMap[code]; city != nil {
-						if city.Province != "" {
-							esMap["company_area"] = city.Province //省
-						}
-						if city.City != "" {
-							esMap["company_city"] = city.City //市
-						}
-						if city.District != "" {
-							esMap["company_district"] = city.District //县
-						}
-					}
 				} else if field == "company_type" { //企业类型处理
 					text := qu.ObjToString(tmp[field])
 					if text != "" {
@@ -154,7 +188,17 @@ func QyxyStandard(query map[string]interface{}) bool {
 							}
 						}
 					}
-
+				} else if field == "company_status" { //企业类型处理
+					text := qu.ObjToString(tmp[field])
+					if text != "" {
+						text = strings.ReplaceAll(text, "(", "(")
+						text = strings.ReplaceAll(text, ")", ")")
+						if status := CompanyStatusMap[text]; status != "" {
+							esMap[field] = status
+						} else {
+							esMap[field] = "其他"
+						}
+					}
 				} else if strings.Contains(field, "date") || strings.Contains(field, "time") { //时间处理
 					if tmp[field] != nil {
 						if timeTmp, ok := tmp[field].(primitive.DateTime); ok {
@@ -185,8 +229,12 @@ func QyxyStandard(query map[string]interface{}) bool {
 				}
 			}
 			//list数据
+			stockName := []string{}
 			for field, fieldMap := range FieldListMap {
 				if list, ok := tmp[field].(primitive.A); ok && len(list) > 0 {
+					if len(list) > 500 {
+						list = list[:500]
+					}
 					tmpArrMgo := []map[string]interface{}{}
 					tmpArrEs := []map[string]interface{}{}
 					for _, l := range list {
@@ -196,6 +244,9 @@ func QyxyStandard(query map[string]interface{}) bool {
 						for f, b := range fieldMap {
 							if text := qu.ObjToString(m[f]); text != "" {
 								tmpMapMgo[f] = text
+								if f == "stock_name" {
+									stockName = append(stockName, text)
+								}
 								if b {
 									tmpMapEs[f] = text
 								}
@@ -216,6 +267,9 @@ func QyxyStandard(query map[string]interface{}) bool {
 					}
 				}
 			}
+			if len(stockName) > 0 {
+				esMap["stock_name"] = strings.Join(stockName, ",")
+			}
 			for _, field := range AllFieldListMap {
 				if list, ok := tmp[field].(primitive.A); ok && len(list) > 0 {
 					tmpArrMgo := []map[string]interface{}{}
@@ -254,10 +308,15 @@ func QyxyStandard(query map[string]interface{}) bool {
 									if f == "report_year" {
 										report_year = textstr
 										sortArr = append(sortArr, textstr)
-									} else if f == "company_phone" || f == "company_email" {
+									} else if f == "company_phone" && len(textstr) >= 7 {
+										tmpMap[f] = textstr
+									} else if f == "company_email" {
 										tmpMap[f] = textstr
 									}
 									if i == 0 { //字符串信息
+										if f == "company_phone" && len(textstr) < 7 {
+											continue
+										}
 										tmpMapMgo[f] = textstr
 									} else if i == 1 { //转金额
 										money := ObjToMoney(textstr) / 10000
@@ -267,19 +326,19 @@ func QyxyStandard(query map[string]interface{}) bool {
 							}
 						}
 					}
-					stock_nameArr := []string{}
-					if i_partners, ok := m["report_partners"].(primitive.A); ok && len(i_partners) > 0 { //股东信息
-						for _, par := range i_partners {
-							m := par.(map[string]interface{})
-							if stock_name, ok := m["stock_name"].(string); ok && stock_name != "" {
-								stock_nameArr = append(stock_nameArr, stock_name)
-							}
-						}
-					}
-					if len(stock_nameArr) > 0 {
-						stockname := strings.Join(stock_nameArr, ",")
-						tmpMap["stock_name"] = stockname
-					}
+					// stock_nameArr := []string{}
+					// if i_partners, ok := m["report_partners"].(primitive.A); ok && len(i_partners) > 0 { //股东信息
+					// 	for _, par := range i_partners {
+					// 		m := par.(map[string]interface{})
+					// 		if stock_name, ok := m["stock_name"].(string); ok && stock_name != "" {
+					// 			stock_nameArr = append(stock_nameArr, stock_name)
+					// 		}
+					// 	}
+					// }
+					// if len(stock_nameArr) > 0 {
+					// 	stockname := strings.Join(stock_nameArr, ",")
+					// 	tmpMap["stock_name"] = stockname
+					// }
 					sortMap[report_year] = tmpMap
 					if len(tmpMapMgo) > 0 {
 						tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
@@ -302,9 +361,11 @@ func QyxyStandard(query map[string]interface{}) bool {
 					continue
 				}
 				mgoMap[k] = v
+
 			}
 			//qu.Debug("esMap---", esMap)
 			// qu.Debug("mgoMap---", mgoMap)
+			// return
 			lock.Lock()
 			EsSaveCache <- esMap
 			update = append(update, map[string]interface{}{"$set": mgoMap})
@@ -371,15 +432,21 @@ func InitAddress() {
 	defer qu.Catch()
 	log.Println("Init Address...")
 	AddressMap = map[string]*City{}
+	AddressOldMap = map[string]*City{}
 	address, _ := Mgo.Find("address", nil, nil, nil, false, -1, -1)
 	for _, tmp := range *address {
 		code := qu.ObjToString(tmp["code"])
+		remark := fmt.Sprint(tmp["Remarks"])
 		city := &City{}
 		tmpjson, err := json.Marshal(tmp)
 		if err == nil {
 			json.Unmarshal(tmpjson, city)
 		}
-		AddressMap[code] = city
+		if remark == "已作废" {
+			AddressOldMap[code] = city
+		} else {
+			AddressMap[code] = city
+		}
 	}
 }
 
@@ -394,3 +461,15 @@ func InitQyStype() {
 		QyStypeMap[name] = prename
 	}
 }
+
+func InitCompanyStatus() {
+	defer qu.Catch()
+	log.Println("Init CompanyStatus...")
+	CompanyStatusMap = map[string]string{}
+	status, _ := Mgo.Find("company_status", nil, nil, nil, false, -1, -1)
+	for _, tmp := range *status {
+		old_status := qu.ObjToString(tmp["old"])
+		new_status := qu.ObjToString(tmp["new"])
+		CompanyStatusMap[old_status] = new_status
+	}
+}