package main import ( "log" "qfw/util" elastic "qfw/util/elastic" "regexp" //"sync" "time" ) var ( timeReg = regexp.MustCompile("[\\d]{4}-[\\d]{2}-[\\d]{2}") ) func qyxyTask1(q map[string]interface{}) { defer util.Catch() // savelock := sync.Mutex{} //连接 session := qyxydb.GetMgoConn(86400) defer qyxydb.DestoryMongoConn(session) // c, _ := qyxy_ent["collect"].(string) db, _ := qyxy_ent["db"].(string) index, _ := qyxy_ent["index"].(string) itype, _ := qyxy_ent["type"].(string) count, _ := session.DB(db).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println("企业信用索引 查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(db).C(c).Find(q).Iter() arr := make([]map[string]interface{}, savesizei) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { delete(tmp, "_id") tmp["_id"] = tmp["company_id"] delete(tmp, "cancels") delete(tmp, "cancel_date") delete(tmp, "intellectuals") delete(tmp, "chattels") delete(tmp, "checks") delete(tmp, "revoke_date") delete(tmp, "changes") delete(tmp, "partners") if tmp["establish_date"] != nil { establish_date_time, ok := tmp["establish_date"].(time.Time) if ok { tmp["establish_date"] = establish_date_time.Unix() } else { tmp["establish_date"] = 0 util.Debug(tmp["company_id"], "establish_date") } } if tmp["lastupdatetime"] != nil { lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time) if ok { tmp["lastupdatetime"] = lastupdatetime_time.Unix() } else { tmp["lastupdatetime"] = 0 util.Debug(tmp["company_id"], "lastupdatetime") } } if tmp["issue_date"] != nil { issue_date_time, ok := tmp["issue_date"].(time.Time) if ok { tmp["issue_date"] = issue_date_time.Unix() } else { tmp["issue_date"] = 0 util.Debug(tmp["company_id"], "issue_date") } } if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok { operation_startdate = timeReg.FindString(operation_startdate) tmp["operation_startdate"] = operation_startdate + " 00:00:00" } if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok { operation_enddate = timeReg.FindString(operation_enddate) tmp["operation_enddate"] = operation_enddate + " 00:00:00" } // if revoke_date, ok := tmp["revoke_date"].(string); revoke_date != "" && ok { // revoke_date = timeReg.FindString(revoke_date) // tmp["revoke_date"] = revoke_date + " 00:00:00" // } // if changes, ok := tmp["changes"].([]interface{}); ok && len(changes) > 0 { // for _, change := range changes { // if tmp1, ok := change.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 { // if change_date, ok := tmp1["change_date"].(string); ok && change_date != "" { // change_date = timeReg.FindString(change_date) // tmp1["change_date"] = change_date + " 00:00:00" // } // } // } // } //operations if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 { for _, operation := range operations { if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 { if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" { included_time = timeReg.FindString(included_time) tmp1["included_time"] = included_time + " 00:00:00" } if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" { removed_time = timeReg.FindString(removed_time) tmp1["removed_time"] = removed_time + " 00:00:00" } } } } //punishes if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 { for _, punishe := range punishes { if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 { if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" { public_date = timeReg.FindString(public_date) tmp1["public_date"] = public_date + " 00:00:00" } if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" { punish_date = timeReg.FindString(punish_date) tmp1["punish_date"] = punish_date + " 00:00:00" } } } } //annual_reports if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 { for _, annual_report := range annual_reports { if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 { if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 { for _, report_change := range report_changes { if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" { change_date = timeReg.FindString(change_date) tmp2["change_date"] = change_date + " 00:00:00" } } } } if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 { for _, report_partner := range report_partners { if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" { stock_realdate = timeReg.FindString(stock_realdate) tmp2["stock_realdate"] = stock_realdate + " 00:00:00" } if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" { stock_date = timeReg.FindString(stock_date) tmp2["stock_date"] = stock_date + " 00:00:00" } } } } if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 { for _, report_equity_change := range report_equity_changes { if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" { change_date = timeReg.FindString(change_date) tmp2["change_date"] = change_date + " 00:00:00" } } } } if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 { for _, report_out_guarantee := range report_out_guarantees { if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" { perform_time = timeReg.FindString(perform_time) tmp2["perform_time"] = perform_time + " 00:00:00" } if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" { guarantee_time = timeReg.FindString(guarantee_time) tmp2["guarantee_time"] = guarantee_time + " 00:00:00" } } } } } } } arr[i] = tmp n++ if i == savesizei-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, savesizei) } if n%savesizei == 0 { log.Println("当前:", n) } // n++ // savelock.Lock() // arr = append(arr, tmp) // //生索引 // if len(arr) >= savesizei-1 { // tmps := arr // elastic.BulkSave(index, itype, &tmps, true) // time.Sleep(1 * time.Second) // arr = []map[string]interface{}{} // } // savelock.Unlock() // //计数 // if n%savesizei == 0 { // log.Println("当前:", n) // } tmp = make(map[string]interface{}) } // savelock.Lock() // if len(arr) > 0 { // tmps := arr // elastic.BulkSave(index, itype, &tmps, true) // } // savelock.Unlock() if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println("create qyxy index...over", n) } func qyxyTask(q map[string]interface{}) { defer util.Catch() // savelock := sync.Mutex{} //连接 session := qyxydb.GetMgoConn(86400) defer qyxydb.DestoryMongoConn(session) // c, _ := qyxy_ent["collect"].(string) db, _ := qyxy_ent["db"].(string) index, _ := qyxy_ent["index"].(string) itype, _ := qyxy_ent["type"].(string) count, _ := session.DB(db).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println("企业信用索引 查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(db).C(c).Find(q).Iter() arr := make([]map[string]interface{}, savesizei) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { //delete(tmp, "_id") tmp["_id"] = tmp["company_id"] // delete(tmp, "cancels") // delete(tmp, "cancel_date") // delete(tmp, "intellectuals") // delete(tmp, "chattels") // delete(tmp, "checks") // delete(tmp, "revoke_date") delete(tmp, "changes") // delete(tmp, "partners") // if tmp["establish_date"] != nil { // establish_date_time, ok := tmp["establish_date"].(time.Time) // if ok { // tmp["establish_date"] = establish_date_time.Unix() // } else { // tmp["establish_date"] = 0 // util.Debug(tmp["company_id"], "establish_date") // } // } // if tmp["lastupdatetime"] != nil { // lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time) // if ok { // tmp["lastupdatetime"] = lastupdatetime_time.Unix() // } else { // tmp["lastupdatetime"] = 0 // util.Debug(tmp["company_id"], "lastupdatetime") // } // } // if tmp["issue_date"] != nil { // issue_date_time, ok := tmp["issue_date"].(time.Time) // if ok { // tmp["issue_date"] = issue_date_time.Unix() // } else { // tmp["issue_date"] = 0 // util.Debug(tmp["company_id"], "issue_date") // } // } // if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok { // operation_startdate = timeReg.FindString(operation_startdate) // tmp["operation_startdate"] = operation_startdate + " 00:00:00" // } // if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok { // operation_enddate = timeReg.FindString(operation_enddate) // tmp["operation_enddate"] = operation_enddate + " 00:00:00" // } // //operations // if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 { // for _, operation := range operations { // if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 { // if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" { // included_time = timeReg.FindString(included_time) // tmp1["included_time"] = included_time + " 00:00:00" // } // if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" { // removed_time = timeReg.FindString(removed_time) // tmp1["removed_time"] = removed_time + " 00:00:00" // } // } // } // } // //punishes // if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 { // for _, punishe := range punishes { // if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 { // if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" { // public_date = timeReg.FindString(public_date) // tmp1["public_date"] = public_date + " 00:00:00" // } // if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" { // punish_date = timeReg.FindString(punish_date) // tmp1["punish_date"] = punish_date + " 00:00:00" // } // } // } // } // //annual_reports // if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 { // for _, annual_report := range annual_reports { // if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 { // if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 { // for _, report_change := range report_changes { // if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { // if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" { // change_date = timeReg.FindString(change_date) // tmp2["change_date"] = change_date + " 00:00:00" // } // } // } // } // if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 { // for _, report_partner := range report_partners { // if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { // if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" { // stock_realdate = timeReg.FindString(stock_realdate) // tmp2["stock_realdate"] = stock_realdate + " 00:00:00" // } // if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" { // stock_date = timeReg.FindString(stock_date) // tmp2["stock_date"] = stock_date + " 00:00:00" // } // } // } // } // if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 { // for _, report_equity_change := range report_equity_changes { // if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { // if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" { // change_date = timeReg.FindString(change_date) // tmp2["change_date"] = change_date + " 00:00:00" // } // } // } // } // if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 { // for _, report_out_guarantee := range report_out_guarantees { // if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 { // if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" { // perform_time = timeReg.FindString(perform_time) // tmp2["perform_time"] = perform_time + " 00:00:00" // } // if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" { // guarantee_time = timeReg.FindString(guarantee_time) // tmp2["guarantee_time"] = guarantee_time + " 00:00:00" // } // } // } // } // } // } // } arr[i] = tmp n++ if i == savesizei-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, savesizei) } if n%savesizei == 0 { log.Println("当前:", n) } // n++ // savelock.Lock() // arr = append(arr, tmp) // //生索引 // if len(arr) >= savesizei-1 { // tmps := arr // elastic.BulkSave(index, itype, &tmps, true) // time.Sleep(1 * time.Second) // arr = []map[string]interface{}{} // } // savelock.Unlock() // //计数 // if n%savesizei == 0 { // log.Println("当前:", n) // } tmp = make(map[string]interface{}) } // savelock.Lock() // if len(arr) > 0 { // tmps := arr // elastic.BulkSave(index, itype, &tmps, true) // } // savelock.Unlock() if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println("create qyxy index...over", n) }