maxiaoshan 5 жил өмнө
parent
commit
c7ec6af8cc

+ 6 - 6
udpcreateindex/src/config.json

@@ -59,12 +59,12 @@
         "type": "project"
     },
     "qyxy_ent": {
-		"addr": "172.17.145.163:27082",
+		"addr": "192.168.3.207:27092",
 		"pool": 5,
-        "db": "ent2020",
-        "collect": "qyxy_ent",
-        "index": "qyxy",
-        "type": "qyxy"
+        "db": "mxs",
+        "collect": "qyxy",
+        "index": "qyxy_ent",
+        "type": "qyxy_ent"
     },
     "standard": {
  		"addr": "192.168.3.207:27092",
@@ -92,7 +92,7 @@
         "db": "mxs"
     },
     "elastic": {
-        "addr": "http://192.168.3.128:9800",
+        "addr": "http://127.0.0.1:9800",
         "pool": 12
     }
 }

+ 1 - 0
udpcreateindex/src/main.go

@@ -115,6 +115,7 @@ func init() {
 
 func main() {
 	go task_index()
+	//task_qyxyindex()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)

+ 183 - 0
udpcreateindex/src/qyxyindex.go

@@ -4,10 +4,19 @@ import (
 	"log"
 	"qfw/util"
 	elastic "qfw/util/elastic"
+	"regexp"
+
+	//"sync"
+	"time"
+)
+
+var (
+	timeReg = regexp.MustCompile("[\\d]{4}-[\\d]{2}-[\\d]{2}")
 )
 
 func qyxyTask(q map[string]interface{}) {
 	defer util.Catch()
+	//	savelock := sync.Mutex{}
 	//连接
 	session := qyxydb.GetMgoConn(86400)
 	defer qyxydb.DestoryMongoConn(session)
@@ -26,6 +35,157 @@ func qyxyTask(q map[string]interface{}) {
 	var n int
 	i := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		delete(tmp, "_id")
+		tmp["_id"] = tmp["company_id"]
+		delete(tmp, "cancels")
+		delete(tmp, "cancel_date")
+		delete(tmp, "intellectuals")
+		delete(tmp, "chattels")
+		delete(tmp, "checks")
+		delete(tmp, "revoke_date")
+		delete(tmp, "changes")
+		delete(tmp, "partners")
+
+		if tmp["establish_date"] != nil {
+			establish_date_time, ok := tmp["establish_date"].(time.Time)
+			if ok {
+				tmp["establish_date"] = establish_date_time.Unix()
+			} else {
+				tmp["establish_date"] = 0
+				util.Debug(tmp["company_id"], "establish_date")
+			}
+		}
+
+		if tmp["lastupdatetime"] != nil {
+			lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time)
+			if ok {
+				tmp["lastupdatetime"] = lastupdatetime_time.Unix()
+			} else {
+				tmp["lastupdatetime"] = 0
+				util.Debug(tmp["company_id"], "lastupdatetime")
+			}
+		}
+
+		if tmp["issue_date"] != nil {
+			issue_date_time, ok := tmp["issue_date"].(time.Time)
+			if ok {
+				tmp["issue_date"] = issue_date_time.Unix()
+			} else {
+				tmp["issue_date"] = 0
+				util.Debug(tmp["company_id"], "issue_date")
+			}
+		}
+
+		if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok {
+			operation_startdate = timeReg.FindString(operation_startdate)
+			tmp["operation_startdate"] = operation_startdate + " 00:00:00"
+		}
+
+		if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok {
+			operation_enddate = timeReg.FindString(operation_enddate)
+			tmp["operation_enddate"] = operation_enddate + " 00:00:00"
+		}
+
+		// if revoke_date, ok := tmp["revoke_date"].(string); revoke_date != "" && ok {
+		// 	revoke_date = timeReg.FindString(revoke_date)
+		// 	tmp["revoke_date"] = revoke_date + " 00:00:00"
+		// }
+		// if changes, ok := tmp["changes"].([]interface{}); ok && len(changes) > 0 {
+		// 	for _, change := range changes {
+		// 		if tmp1, ok := change.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
+		// 			if change_date, ok := tmp1["change_date"].(string); ok && change_date != "" {
+		// 				change_date = timeReg.FindString(change_date)
+		// 				tmp1["change_date"] = change_date + " 00:00:00"
+		// 			}
+		// 		}
+		// 	}
+		// }
+		//operations
+		if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 {
+			for _, operation := range operations {
+				if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
+					if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" {
+						included_time = timeReg.FindString(included_time)
+						tmp1["included_time"] = included_time + " 00:00:00"
+					}
+					if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" {
+						removed_time = timeReg.FindString(removed_time)
+						tmp1["removed_time"] = removed_time + " 00:00:00"
+					}
+				}
+			}
+		}
+		//punishes
+		if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 {
+			for _, punishe := range punishes {
+				if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
+					if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" {
+						public_date = timeReg.FindString(public_date)
+						tmp1["public_date"] = public_date + " 00:00:00"
+					}
+					if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" {
+						punish_date = timeReg.FindString(punish_date)
+						tmp1["punish_date"] = punish_date + " 00:00:00"
+					}
+				}
+			}
+		}
+		//annual_reports
+		if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
+			for _, annual_report := range annual_reports {
+				if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
+					if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 {
+						for _, report_change := range report_changes {
+							if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+								if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
+									change_date = timeReg.FindString(change_date)
+									tmp2["change_date"] = change_date + " 00:00:00"
+								}
+							}
+						}
+					}
+					if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 {
+						for _, report_partner := range report_partners {
+							if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+								if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" {
+									stock_realdate = timeReg.FindString(stock_realdate)
+									tmp2["stock_realdate"] = stock_realdate + " 00:00:00"
+								}
+								if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" {
+									stock_date = timeReg.FindString(stock_date)
+									tmp2["stock_date"] = stock_date + " 00:00:00"
+								}
+							}
+						}
+					}
+					if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 {
+						for _, report_equity_change := range report_equity_changes {
+							if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+								if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
+									change_date = timeReg.FindString(change_date)
+									tmp2["change_date"] = change_date + " 00:00:00"
+								}
+							}
+						}
+					}
+					if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 {
+						for _, report_out_guarantee := range report_out_guarantees {
+							if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+								if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" {
+									perform_time = timeReg.FindString(perform_time)
+									tmp2["perform_time"] = perform_time + " 00:00:00"
+								}
+								if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" {
+									guarantee_time = timeReg.FindString(guarantee_time)
+									tmp2["guarantee_time"] = guarantee_time + " 00:00:00"
+								}
+							}
+						}
+					}
+
+				}
+			}
+		}
 		arr[i] = tmp
 		n++
 		if i == savesizei-1 {
@@ -43,8 +203,31 @@ func qyxyTask(q map[string]interface{}) {
 		if n%savesizei == 0 {
 			log.Println("当前:", n)
 		}
+
+		// n++
+		// savelock.Lock()
+		// arr = append(arr, tmp)
+		// //生索引
+		// if len(arr) >= savesizei-1 {
+		// 	tmps := arr
+		// 	elastic.BulkSave(index, itype, &tmps, true)
+		// 	time.Sleep(1 * time.Second)
+		// 	arr = []map[string]interface{}{}
+		// }
+		// savelock.Unlock()
+		// //计数
+		// if n%savesizei == 0 {
+		// 	log.Println("当前:", n)
+		// }
 		tmp = make(map[string]interface{})
+
 	}
+	// savelock.Lock()
+	// if len(arr) > 0 {
+	// 	tmps := arr
+	// 	elastic.BulkSave(index, itype, &tmps, true)
+	// }
+	// savelock.Unlock()
 	if i > 0 {
 		elastic.BulkSave(index, itype, &arr, true)
 	}

+ 1 - 1
udpcreateindex/src/task.go

@@ -13,7 +13,7 @@ func task_index() {
 	c := cron.New()
 	c.AddFunc("20 30 5 * * *", func() { task_projects() })
 	c.AddFunc("0 15 * * * *", func() { task_biddingfile() }) //每两小时执行一次
-	c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
+	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 	c.Start()
 }