maxiaoshan vor 5 Jahren
Ursprung
Commit
ba4f3001de
2 geänderte Dateien mit 211 neuen und 5 gelöschten Zeilen
  1. 4 4
      udpcreateindex/src/config.json
  2. 207 1
      udpcreateindex/src/qyxyindex.go

+ 4 - 4
udpcreateindex/src/config.json

@@ -30,11 +30,11 @@
     },
     "bidding": {
         "db": "mxs",
-        "collect": "bidding_test",
+        "collect": "test1",
         "index": "bidding_v2",
         "type": "bidding",
         "extractdb": "mxs",
-        "extractcollect": "extract",
+        "extractcollect": "test2",
         "indexfields":[ 
         "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo"
         ],
@@ -62,7 +62,7 @@
 		"addr": "192.168.3.207:27092",
 		"pool": 5,
         "db": "mxs",
-        "collect": "qyxy",
+        "collect": "test2",
         "index": "qyxy_ent",
         "type": "qyxy_ent"
     },
@@ -92,7 +92,7 @@
         "db": "mxs"
     },
     "elastic": {
-        "addr": "http://127.0.0.1:9800",
+        "addr": "http://192.168.3.128:9800",
         "pool": 12
     }
 }

+ 207 - 1
udpcreateindex/src/qyxyindex.go

@@ -14,7 +14,7 @@ var (
 	timeReg = regexp.MustCompile("[\\d]{4}-[\\d]{2}-[\\d]{2}")
 )
 
-func qyxyTask(q map[string]interface{}) {
+func qyxyTask1(q map[string]interface{}) {
 	defer util.Catch()
 	//	savelock := sync.Mutex{}
 	//连接
@@ -233,3 +233,209 @@ func qyxyTask(q map[string]interface{}) {
 	}
 	log.Println("create qyxy index...over", n)
 }
+
+func qyxyTask(q map[string]interface{}) {
+	defer util.Catch()
+	//	savelock := sync.Mutex{}
+	//连接
+	session := qyxydb.GetMgoConn(86400)
+	defer qyxydb.DestoryMongoConn(session)
+	//
+	c, _ := qyxy_ent["collect"].(string)
+	db, _ := qyxy_ent["db"].(string)
+	index, _ := qyxy_ent["index"].(string)
+	itype, _ := qyxy_ent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println("企业信用索引	查询语句:", q, "同步总数:", count, "elastic库:", index)
+	query := session.DB(db).C(c).Find(q).Iter()
+
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//delete(tmp, "_id")
+		tmp["_id"] = tmp["company_id"]
+		// delete(tmp, "cancels")
+		// delete(tmp, "cancel_date")
+		// delete(tmp, "intellectuals")
+		// delete(tmp, "chattels")
+		// delete(tmp, "checks")
+		// delete(tmp, "revoke_date")
+		delete(tmp, "changes")
+		// delete(tmp, "partners")
+
+		// if tmp["establish_date"] != nil {
+		// 	establish_date_time, ok := tmp["establish_date"].(time.Time)
+		// 	if ok {
+		// 		tmp["establish_date"] = establish_date_time.Unix()
+		// 	} else {
+		// 		tmp["establish_date"] = 0
+		// 		util.Debug(tmp["company_id"], "establish_date")
+		// 	}
+		// }
+
+		// if tmp["lastupdatetime"] != nil {
+		// 	lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time)
+		// 	if ok {
+		// 		tmp["lastupdatetime"] = lastupdatetime_time.Unix()
+		// 	} else {
+		// 		tmp["lastupdatetime"] = 0
+		// 		util.Debug(tmp["company_id"], "lastupdatetime")
+		// 	}
+		// }
+
+		// if tmp["issue_date"] != nil {
+		// 	issue_date_time, ok := tmp["issue_date"].(time.Time)
+		// 	if ok {
+		// 		tmp["issue_date"] = issue_date_time.Unix()
+		// 	} else {
+		// 		tmp["issue_date"] = 0
+		// 		util.Debug(tmp["company_id"], "issue_date")
+		// 	}
+		// }
+
+		// if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok {
+		// 	operation_startdate = timeReg.FindString(operation_startdate)
+		// 	tmp["operation_startdate"] = operation_startdate + " 00:00:00"
+		// }
+
+		// if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok {
+		// 	operation_enddate = timeReg.FindString(operation_enddate)
+		// 	tmp["operation_enddate"] = operation_enddate + " 00:00:00"
+		// }
+
+		// //operations
+		// if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 {
+		// 	for _, operation := range operations {
+		// 		if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
+		// 			if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" {
+		// 				included_time = timeReg.FindString(included_time)
+		// 				tmp1["included_time"] = included_time + " 00:00:00"
+		// 			}
+		// 			if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" {
+		// 				removed_time = timeReg.FindString(removed_time)
+		// 				tmp1["removed_time"] = removed_time + " 00:00:00"
+		// 			}
+		// 		}
+		// 	}
+		// }
+		// //punishes
+		// if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 {
+		// 	for _, punishe := range punishes {
+		// 		if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
+		// 			if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" {
+		// 				public_date = timeReg.FindString(public_date)
+		// 				tmp1["public_date"] = public_date + " 00:00:00"
+		// 			}
+		// 			if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" {
+		// 				punish_date = timeReg.FindString(punish_date)
+		// 				tmp1["punish_date"] = punish_date + " 00:00:00"
+		// 			}
+		// 		}
+		// 	}
+		// }
+		// //annual_reports
+		// if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
+		// 	for _, annual_report := range annual_reports {
+		// 		if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
+		// 			if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 {
+		// 				for _, report_change := range report_changes {
+		// 					if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+		// 						if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
+		// 							change_date = timeReg.FindString(change_date)
+		// 							tmp2["change_date"] = change_date + " 00:00:00"
+		// 						}
+		// 					}
+		// 				}
+		// 			}
+		// 			if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 {
+		// 				for _, report_partner := range report_partners {
+		// 					if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+		// 						if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" {
+		// 							stock_realdate = timeReg.FindString(stock_realdate)
+		// 							tmp2["stock_realdate"] = stock_realdate + " 00:00:00"
+		// 						}
+		// 						if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" {
+		// 							stock_date = timeReg.FindString(stock_date)
+		// 							tmp2["stock_date"] = stock_date + " 00:00:00"
+		// 						}
+		// 					}
+		// 				}
+		// 			}
+		// 			if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 {
+		// 				for _, report_equity_change := range report_equity_changes {
+		// 					if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+		// 						if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
+		// 							change_date = timeReg.FindString(change_date)
+		// 							tmp2["change_date"] = change_date + " 00:00:00"
+		// 						}
+		// 					}
+		// 				}
+		// 			}
+		// 			if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 {
+		// 				for _, report_out_guarantee := range report_out_guarantees {
+		// 					if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
+		// 						if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" {
+		// 							perform_time = timeReg.FindString(perform_time)
+		// 							tmp2["perform_time"] = perform_time + " 00:00:00"
+		// 						}
+		// 						if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" {
+		// 							guarantee_time = timeReg.FindString(guarantee_time)
+		// 							tmp2["guarantee_time"] = guarantee_time + " 00:00:00"
+		// 						}
+		// 					}
+		// 				}
+		// 			}
+
+		// 		}
+		// 	}
+		// }
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+
+		// n++
+		// savelock.Lock()
+		// arr = append(arr, tmp)
+		// //生索引
+		// if len(arr) >= savesizei-1 {
+		// 	tmps := arr
+		// 	elastic.BulkSave(index, itype, &tmps, true)
+		// 	time.Sleep(1 * time.Second)
+		// 	arr = []map[string]interface{}{}
+		// }
+		// savelock.Unlock()
+		// //计数
+		// if n%savesizei == 0 {
+		// 	log.Println("当前:", n)
+		// }
+		tmp = make(map[string]interface{})
+
+	}
+	// savelock.Lock()
+	// if len(arr) > 0 {
+	// 	tmps := arr
+	// 	elastic.BulkSave(index, itype, &tmps, true)
+	// }
+	// savelock.Unlock()
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create qyxy index...over", n)
+}