123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441 |
- package main
- import (
- "log"
- "qfw/util"
- elastic "qfw/util/elastic"
- "regexp"
- //"sync"
- "time"
- )
// timeReg matches the first run shaped like YYYY-MM-DD (four digits, dash,
// two digits, dash, two digits) anywhere inside a string. It only checks the
// digit layout, not whether the value is a valid calendar date.
var timeReg = regexp.MustCompile(`[\d]{4}-[\d]{2}-[\d]{2}`)
- func qyxyTask1(q map[string]interface{}) {
- defer util.Catch()
- // savelock := sync.Mutex{}
- //连接
- session := qyxydb.GetMgoConn(86400)
- defer qyxydb.DestoryMongoConn(session)
- //
- c, _ := qyxy_ent["collect"].(string)
- db, _ := qyxy_ent["db"].(string)
- index, _ := qyxy_ent["index"].(string)
- itype, _ := qyxy_ent["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println("企业信用索引 查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, savesizei)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- delete(tmp, "_id")
- tmp["_id"] = tmp["company_id"]
- delete(tmp, "cancels")
- delete(tmp, "cancel_date")
- delete(tmp, "intellectuals")
- delete(tmp, "chattels")
- delete(tmp, "checks")
- delete(tmp, "revoke_date")
- delete(tmp, "changes")
- delete(tmp, "partners")
- if tmp["establish_date"] != nil {
- establish_date_time, ok := tmp["establish_date"].(time.Time)
- if ok {
- tmp["establish_date"] = establish_date_time.Unix()
- } else {
- tmp["establish_date"] = 0
- util.Debug(tmp["company_id"], "establish_date")
- }
- }
- if tmp["lastupdatetime"] != nil {
- lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time)
- if ok {
- tmp["lastupdatetime"] = lastupdatetime_time.Unix()
- } else {
- tmp["lastupdatetime"] = 0
- util.Debug(tmp["company_id"], "lastupdatetime")
- }
- }
- if tmp["issue_date"] != nil {
- issue_date_time, ok := tmp["issue_date"].(time.Time)
- if ok {
- tmp["issue_date"] = issue_date_time.Unix()
- } else {
- tmp["issue_date"] = 0
- util.Debug(tmp["company_id"], "issue_date")
- }
- }
- if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok {
- operation_startdate = timeReg.FindString(operation_startdate)
- tmp["operation_startdate"] = operation_startdate + " 00:00:00"
- }
- if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok {
- operation_enddate = timeReg.FindString(operation_enddate)
- tmp["operation_enddate"] = operation_enddate + " 00:00:00"
- }
- // if revoke_date, ok := tmp["revoke_date"].(string); revoke_date != "" && ok {
- // revoke_date = timeReg.FindString(revoke_date)
- // tmp["revoke_date"] = revoke_date + " 00:00:00"
- // }
- // if changes, ok := tmp["changes"].([]interface{}); ok && len(changes) > 0 {
- // for _, change := range changes {
- // if tmp1, ok := change.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
- // if change_date, ok := tmp1["change_date"].(string); ok && change_date != "" {
- // change_date = timeReg.FindString(change_date)
- // tmp1["change_date"] = change_date + " 00:00:00"
- // }
- // }
- // }
- // }
- //operations
- if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 {
- for _, operation := range operations {
- if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
- if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" {
- included_time = timeReg.FindString(included_time)
- tmp1["included_time"] = included_time + " 00:00:00"
- }
- if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" {
- removed_time = timeReg.FindString(removed_time)
- tmp1["removed_time"] = removed_time + " 00:00:00"
- }
- }
- }
- }
- //punishes
- if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 {
- for _, punishe := range punishes {
- if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
- if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" {
- public_date = timeReg.FindString(public_date)
- tmp1["public_date"] = public_date + " 00:00:00"
- }
- if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" {
- punish_date = timeReg.FindString(punish_date)
- tmp1["punish_date"] = punish_date + " 00:00:00"
- }
- }
- }
- }
- //annual_reports
- if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
- for _, annual_report := range annual_reports {
- if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
- if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 {
- for _, report_change := range report_changes {
- if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
- change_date = timeReg.FindString(change_date)
- tmp2["change_date"] = change_date + " 00:00:00"
- }
- }
- }
- }
- if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 {
- for _, report_partner := range report_partners {
- if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" {
- stock_realdate = timeReg.FindString(stock_realdate)
- tmp2["stock_realdate"] = stock_realdate + " 00:00:00"
- }
- if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" {
- stock_date = timeReg.FindString(stock_date)
- tmp2["stock_date"] = stock_date + " 00:00:00"
- }
- }
- }
- }
- if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 {
- for _, report_equity_change := range report_equity_changes {
- if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
- change_date = timeReg.FindString(change_date)
- tmp2["change_date"] = change_date + " 00:00:00"
- }
- }
- }
- }
- if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 {
- for _, report_out_guarantee := range report_out_guarantees {
- if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" {
- perform_time = timeReg.FindString(perform_time)
- tmp2["perform_time"] = perform_time + " 00:00:00"
- }
- if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" {
- guarantee_time = timeReg.FindString(guarantee_time)
- tmp2["guarantee_time"] = guarantee_time + " 00:00:00"
- }
- }
- }
- }
- }
- }
- }
- arr[i] = tmp
- n++
- if i == savesizei-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, savesizei)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- // n++
- // savelock.Lock()
- // arr = append(arr, tmp)
- // //生索引
- // if len(arr) >= savesizei-1 {
- // tmps := arr
- // elastic.BulkSave(index, itype, &tmps, true)
- // time.Sleep(1 * time.Second)
- // arr = []map[string]interface{}{}
- // }
- // savelock.Unlock()
- // //计数
- // if n%savesizei == 0 {
- // log.Println("当前:", n)
- // }
- tmp = make(map[string]interface{})
- }
- // savelock.Lock()
- // if len(arr) > 0 {
- // tmps := arr
- // elastic.BulkSave(index, itype, &tmps, true)
- // }
- // savelock.Unlock()
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println("create qyxy index...over", n)
- }
- func qyxyTask(q map[string]interface{}) {
- defer util.Catch()
- // savelock := sync.Mutex{}
- //连接
- session := qyxydb.GetMgoConn(86400)
- defer qyxydb.DestoryMongoConn(session)
- //
- c, _ := qyxy_ent["collect"].(string)
- db, _ := qyxy_ent["db"].(string)
- index, _ := qyxy_ent["index"].(string)
- itype, _ := qyxy_ent["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println("企业信用索引 查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, savesizei)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- //delete(tmp, "_id")
- tmp["_id"] = tmp["company_id"]
- // delete(tmp, "cancels")
- // delete(tmp, "cancel_date")
- // delete(tmp, "intellectuals")
- // delete(tmp, "chattels")
- // delete(tmp, "checks")
- // delete(tmp, "revoke_date")
- delete(tmp, "changes")
- // delete(tmp, "partners")
- // if tmp["establish_date"] != nil {
- // establish_date_time, ok := tmp["establish_date"].(time.Time)
- // if ok {
- // tmp["establish_date"] = establish_date_time.Unix()
- // } else {
- // tmp["establish_date"] = 0
- // util.Debug(tmp["company_id"], "establish_date")
- // }
- // }
- // if tmp["lastupdatetime"] != nil {
- // lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time)
- // if ok {
- // tmp["lastupdatetime"] = lastupdatetime_time.Unix()
- // } else {
- // tmp["lastupdatetime"] = 0
- // util.Debug(tmp["company_id"], "lastupdatetime")
- // }
- // }
- // if tmp["issue_date"] != nil {
- // issue_date_time, ok := tmp["issue_date"].(time.Time)
- // if ok {
- // tmp["issue_date"] = issue_date_time.Unix()
- // } else {
- // tmp["issue_date"] = 0
- // util.Debug(tmp["company_id"], "issue_date")
- // }
- // }
- // if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok {
- // operation_startdate = timeReg.FindString(operation_startdate)
- // tmp["operation_startdate"] = operation_startdate + " 00:00:00"
- // }
- // if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok {
- // operation_enddate = timeReg.FindString(operation_enddate)
- // tmp["operation_enddate"] = operation_enddate + " 00:00:00"
- // }
- // //operations
- // if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 {
- // for _, operation := range operations {
- // if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
- // if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" {
- // included_time = timeReg.FindString(included_time)
- // tmp1["included_time"] = included_time + " 00:00:00"
- // }
- // if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" {
- // removed_time = timeReg.FindString(removed_time)
- // tmp1["removed_time"] = removed_time + " 00:00:00"
- // }
- // }
- // }
- // }
- // //punishes
- // if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 {
- // for _, punishe := range punishes {
- // if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
- // if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" {
- // public_date = timeReg.FindString(public_date)
- // tmp1["public_date"] = public_date + " 00:00:00"
- // }
- // if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" {
- // punish_date = timeReg.FindString(punish_date)
- // tmp1["punish_date"] = punish_date + " 00:00:00"
- // }
- // }
- // }
- // }
- // //annual_reports
- // if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
- // for _, annual_report := range annual_reports {
- // if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
- // if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 {
- // for _, report_change := range report_changes {
- // if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- // if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
- // change_date = timeReg.FindString(change_date)
- // tmp2["change_date"] = change_date + " 00:00:00"
- // }
- // }
- // }
- // }
- // if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 {
- // for _, report_partner := range report_partners {
- // if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- // if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" {
- // stock_realdate = timeReg.FindString(stock_realdate)
- // tmp2["stock_realdate"] = stock_realdate + " 00:00:00"
- // }
- // if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" {
- // stock_date = timeReg.FindString(stock_date)
- // tmp2["stock_date"] = stock_date + " 00:00:00"
- // }
- // }
- // }
- // }
- // if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 {
- // for _, report_equity_change := range report_equity_changes {
- // if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- // if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
- // change_date = timeReg.FindString(change_date)
- // tmp2["change_date"] = change_date + " 00:00:00"
- // }
- // }
- // }
- // }
- // if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 {
- // for _, report_out_guarantee := range report_out_guarantees {
- // if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
- // if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" {
- // perform_time = timeReg.FindString(perform_time)
- // tmp2["perform_time"] = perform_time + " 00:00:00"
- // }
- // if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" {
- // guarantee_time = timeReg.FindString(guarantee_time)
- // tmp2["guarantee_time"] = guarantee_time + " 00:00:00"
- // }
- // }
- // }
- // }
- // }
- // }
- // }
- arr[i] = tmp
- n++
- if i == savesizei-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, savesizei)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- // n++
- // savelock.Lock()
- // arr = append(arr, tmp)
- // //生索引
- // if len(arr) >= savesizei-1 {
- // tmps := arr
- // elastic.BulkSave(index, itype, &tmps, true)
- // time.Sleep(1 * time.Second)
- // arr = []map[string]interface{}{}
- // }
- // savelock.Unlock()
- // //计数
- // if n%savesizei == 0 {
- // log.Println("当前:", n)
- // }
- tmp = make(map[string]interface{})
- }
- // savelock.Lock()
- // if len(arr) > 0 {
- // tmps := arr
- // elastic.BulkSave(index, itype, &tmps, true)
- // }
- // savelock.Unlock()
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println("create qyxy index...over", n)
- }
|