123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/log"
- "app.yhyue.com/data_processing/common_utils/mongodb"
- "app.yhyue.com/data_processing/common_utils/redis"
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "go.uber.org/zap"
- "math/rand"
- "proposed_project/config"
- "strconv"
- "strings"
- "sync"
- "time"
- "unicode/utf8"
- )
// Write queues. Each *Pool channel buffers rows waiting to be bulk-inserted;
// the matching *Sp channel has a single slot and is used by the Save*Func
// loops in this file so that only one bulk insert per queue runs at a time.
var (
	saveBasePool = make(chan map[string]interface{}, 5000)
	saveBaseSp   = make(chan bool, 1)

	saveRcPool = make(chan map[string]interface{}, 5000)
	saveRcSp   = make(chan bool, 1)

	saveCtPool = make(chan map[string]interface{}, 5000)
	saveCtSp   = make(chan bool, 1)

	saveCyPool = make(chan map[string]interface{}, 5000)
	saveCySp   = make(chan bool, 1)

	saveEntPool = make(chan map[string]interface{}, 5000)
	saveEntSp   = make(chan bool, 1)
)

// Field lists. BaseField drives the per-field mapping in taskTidb/taskB; the
// other lists are presumably the column sets handed to the Save*Func writers
// (their callers are not in this part of the file — confirm before relying on it).
var (
	BaseField = []string{
		"lasttime", "firsttime", "proposed_number", "proposed_id", "follow_num",
		"title", "projectname", "approvecode", "approvedept", "approvenumber",
		"project_stage_code", "total_investment", "funds", "owner", "name_id",
		"ownerclass_code", "projecttype_code", "projectaddr", "projectperiod",
		"project_startdate", "project_completedate", "industry_code",
		"approvestatus", "project_scale", "category_code", "nature_code",
		"construction_area", "floor_area", "area_code", "city_code", "createtime",
	}
	RecordField = []string{
		"proposed_id", "infoid", "follow_num", "project_stage_code", "title",
		"project_scale", "publishtime", "jybxhref", "createtime",
	}
	ContactField = []string{
		"proposed_id", "infoid", "follow_num", "name_id", "name",
		"contact_name", "contact_tel", "contact_addr", "createtime",
	}
	CategoryField = []string{
		"proposed_id", "labelcode", "labelvalues", "labelweight", "createtime",
	}
	EntField = []string{
		"proposed_id", "name_id", "name", "area_code", "city_code",
		"address", "createtime", "identity_type",
	}
)

// Lookup tables populated at start-up.
var (
	// AreaCode maps "area[,city[,district]]" to its code; filled by InitArea.
	AreaCode = make(map[string]string, 5000)
	// TagCode maps a label name to a map of code -> label; filled by InitTagCode.
	TagCode = make(map[string]interface{}, 100)
)
- func InitArea() {
- info := MysqlTool.Find("d_area_code_back", nil, "", "", -1, -1)
- for _, m := range *info {
- var key string
- for i, v := range []string{"area", "city", "district"} {
- if i == 0 && util.ObjToString(m[v]) != "" {
- key = util.ObjToString(m[v])
- } else if util.ObjToString(m[v]) != "" {
- key += "," + util.ObjToString(m[v])
- }
- }
- AreaCode[key] = util.ObjToString(m["code"])
- }
- log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
- }
- func InitTagCode() {
- info, _ := MgoBid.Find("nzj_rule", nil, nil, bson.M{"label_name": 1, "label": 1, "code": 1}, false, -1, -1)
- for _, m := range *info {
- lname := util.ObjToString(m["label_name"])
- lb := util.ObjToString(m["label"])
- code := util.ObjToString(m["code"])
- if lname == "sub_category" || lname == "top_category" {
- lname = "category"
- }
- if TagCode[lname] != nil {
- m1 := TagCode[lname].(map[string]interface{})
- m1[code] = lb
- TagCode[lname] = m1
- } else {
- m1 := make(map[string]interface{})
- m1[code] = lb
- TagCode[lname] = m1
- }
- }
- TagCode["nature"].(map[string]interface{})["00"] = "其它"
- TagCode["project_stage"].(map[string]interface{})["00"] = "其它"
- log.Info("InitTagCode", zap.Any("TagCode", TagCode))
- }
- func taskTidb(q map[string]interface{}) {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- var query *mongodb.MgoIter
- if q != nil && len(q) > 0 {
- query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(SelectF).Iter()
- } else {
- query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(nil).Select(SelectF).Iter()
- }
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- if t := util.Int64All(tmp["pici"]); t > pici {
- pici = t
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- saveM := make(map[string]interface{})
- for _, f := range BaseField {
- if f == "lasttime" || f == "firsttime" {
- if t := util.Int64All(tmp[f]); t > 0 {
- saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- } else if f == "proposed_id" {
- saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
- } else if f == "area_code" {
- if tmp["area"] != nil {
- saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
- }
- } else if f == "city_code" {
- if tmp["area"] != nil && tmp["city"] != nil {
- c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
- saveM[f] = AreaCode[c]
- }
- } else if f == "owner" {
- if v := util.ObjToString(tmp[f]); v != "" {
- if utf8.RuneCountInString(v) < 100 {
- saveM[f] = v
- }
- }
- } else if f == "name_id" {
- if b := util.ObjToString(tmp["owner"]); b != "" {
- if eid := redis.GetStr("ent_id", b); eid != "" {
- saveM["name_id"] = strings.Split(eid, "_")[0]
- }
- }
- } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
- if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
- t := util.Int64All(tmp[f])
- saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- } else if f == "createtime" {
- saveM[f] = time.Now().Format(util.Date_Full_Layout)
- } else if f == "total_investment" {
- text := util.ObjToString(tmp[f])
- capital := ObjToMoney(text)
- capital = capital / 10000
- if capital != 0 {
- capital, _ = util.FormatFloat(capital, 6)
- saveM[f] = capital
- }
- } else if f == "approvestatus" {
- if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
- saveM[f] = tmp[f]
- }
- } else if f == "proposed_number" {
- if tmp[f] == nil {
- now := time.Now().Unix()
- st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
- parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
- rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
- saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
- } else {
- saveM[f] = tmp[f]
- }
- } else if f == "approvecode" {
- if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
- saveM[f] = tmp[f]
- }
- } else if f == "floor_area" {
- if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
- saveM[f] = tmp[f]
- }
- } else {
- if tmp[f] != nil {
- saveM[f] = tmp[f]
- }
- }
- }
- saveBasePool <- saveM
- saveCy := make(map[string]interface{})
- saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- saveCy["labelcode"] = "category_code"
- saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
- saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
- saveCyPool <- saveCy
- if ow := util.ObjToString(tmp["owner"]); ow != "" {
- saveEnt := make(map[string]interface{})
- saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- saveEnt["name"] = ow
- if eid := redis.GetStr("ent_id", ow); eid != "" {
- arr := strings.Split(eid, "_")
- saveEnt["name_id"] = arr[0]
- if len(arr) == 2 {
- saveEnt["area_code"] = arr[1]
- } else if len(arr) == 3 {
- saveEnt["city_code"] = arr[2]
- }
- info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
- if info != nil && len(*info) > 0 {
- saveEnt["address"] = (*info)[0]["address"]
- }
- }
- saveEnt["identity_type"] = 1
- saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
- saveEntPool <- saveEnt
- }
- for _, v := range tmp["list"].([]interface{}) {
- saveRc := make(map[string]interface{})
- v1 := v.(map[string]interface{})
- saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- infoid := util.ObjToString(v1["infoid"])
- saveRc["infoid"] = infoid
- saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
- saveRc["follow_num"] = v1["follow_num"]
- saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
- saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
- saveRc["title"] = util.ObjToString(v1["title"])
- if t := util.Int64All(v1["publishtime"]); t > 0 {
- saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
- saveRcPool <- saveRc
- if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
- saveCt := make(map[string]interface{})
- saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- saveCt["infoid"] = infoid
- saveCt["follow_num"] = tmp["follow_num"]
- if b := util.ObjToString(tmp["owner"]); b != "" {
- saveCt["name"] = util.ObjToString(tmp["owner"])
- if eid := redis.GetStr("ent_id", b); eid != "" {
- saveCt["name_id"] = strings.Split(eid, "_")[0]
- }
- }
- if p := util.ObjToString(v1["project_person"]); p != "" {
- saveCt["contact_name"] = p
- }
- if p := util.ObjToString(v1["project_phone"]); p != "" {
- saveCt["contact_tel"] = p
- }
- if p := util.ObjToString(v1["projectaddr"]); p != "" {
- saveCt["contact_addr"] = p
- }
- saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- saveCtPool <- saveCt
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
- }
- func taskTidb_add(q map[string]interface{}) {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- log.Info("taskTidb_add", zap.Any("q: ", q))
- query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%200 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- if t := util.Int64All(tmp["pici"]); t > pici {
- pici = t
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- go func() {
- <-ch
- wg.Done()
- }()
- taskB(tmp)
- taskE(tmp)
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- TaskSingle = true
- log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
- }
- func taskB(tmp map[string]interface{}) {
- saveM := make(map[string]interface{})
- for _, f := range BaseField {
- if f == "lasttime" || f == "firsttime" {
- if t := util.Int64All(tmp[f]); t > 0 {
- saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- } else if f == "proposed_id" {
- saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
- } else if f == "area_code" {
- if tmp["area"] != nil {
- saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
- }
- } else if f == "city_code" {
- if tmp["area"] != nil && tmp["city"] != nil {
- c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
- saveM[f] = AreaCode[c]
- }
- } else if f == "owner" {
- if v := util.ObjToString(tmp[f]); v != "" {
- if utf8.RuneCountInString(v) < 100 {
- saveM[f] = v
- }
- }
- } else if f == "name_id" {
- if b := util.ObjToString(tmp["owner"]); b != "" {
- if eid := redis.GetStr("ent_id", b); eid != "" {
- saveM["name_id"] = strings.Split(eid, "_")[0]
- }
- }
- } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
- if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
- t := util.Int64All(tmp[f])
- saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- } else if f == "createtime" {
- saveM[f] = time.Now().Format(util.Date_Full_Layout)
- } else if f == "total_investment" {
- text := util.ObjToString(tmp[f])
- capital := ObjToMoney(text)
- capital = capital / 10000
- if capital != 0 {
- capital, _ = util.FormatFloat(capital, 6)
- saveM[f] = capital
- }
- } else if f == "approvestatus" {
- if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
- saveM[f] = tmp[f]
- }
- } else if f == "proposed_number" {
- if tmp[f] == nil {
- now := time.Now().Unix()
- st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
- parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
- rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
- saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
- } else {
- saveM[f] = tmp[f]
- }
- } else if f == "approvecode" {
- if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
- saveM[f] = tmp[f]
- }
- } else if f == "floor_area" {
- if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
- saveM[f] = tmp[f]
- }
- } else {
- if tmp[f] != nil {
- saveM[f] = tmp[f]
- }
- }
- }
- info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
- if info != nil && len(*info) > 0 {
- saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
- MysqlTool.Update("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, saveM)
- } else {
- MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM)
- }
- info1 := MysqlTool.FindOne("dwd_f_nzj_category_tags", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
- if info1 != nil && len(*info1) > 0 {
- } else {
- saveCy := make(map[string]interface{})
- saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- saveCy["labelcode"] = "category_code"
- saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
- saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
- MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy)
- }
- info2 := MysqlTool.FindOne("dwd_f_nzj_ent", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
- if info2 != nil && len(*info2) > 0 {
- } else {
- if ow := util.ObjToString(tmp["owner"]); ow != "" {
- saveEnt := make(map[string]interface{})
- saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- saveEnt["name"] = ow
- if eid := redis.GetStr("ent_id", ow); eid != "" {
- arr := strings.Split(eid, "_")
- saveEnt["name_id"] = arr[0]
- if len(arr) == 2 {
- saveEnt["area_code"] = arr[1]
- } else if len(arr) == 3 {
- saveEnt["city_code"] = arr[2]
- }
- info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
- if info != nil && len(*info) > 0 {
- saveEnt["address"] = (*info)[0]["address"]
- }
- }
- saveEnt["identity_type"] = 1
- saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
- MysqlTool.Insert("dwd_f_nzj_ent", saveEnt)
- }
- }
- }
- func taskE(tmp map[string]interface{}) {
- for _, v := range tmp["list"].([]interface{}) {
- v1 := v.(map[string]interface{})
- infoid := util.ObjToString(v1["infoid"])
- info := MysqlTool.FindOne("dwd_f_nzj_follw_record", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
- if info == nil || len(*info) == 0 {
- saveRc := make(map[string]interface{})
- saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- saveRc["infoid"] = infoid
- saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
- saveRc["follow_num"] = v1["follow_num"]
- saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
- saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
- saveRc["title"] = util.ObjToString(v1["title"])
- if t := util.Int64All(v1["publishtime"]); t > 0 {
- saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
- MysqlTool.Insert("dwd_f_nzj_follw_record", saveRc)
- }
- info1 := MysqlTool.FindOne("dwd_f_nzj_contact", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
- if info1 == nil || len(*info1) == 0 {
- if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
- saveCt := make(map[string]interface{})
- saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- saveCt["infoid"] = infoid
- saveCt["follow_num"] = tmp["follow_num"]
- if b := util.ObjToString(tmp["owner"]); b != "" {
- saveCt["name"] = util.ObjToString(tmp["owner"])
- if eid := redis.GetStr("ent_id", b); eid != "" {
- saveCt["name_id"] = strings.Split(eid, "_")[0]
- }
- }
- if p := util.ObjToString(v1["project_person"]); p != "" {
- saveCt["contact_name"] = p
- }
- if p := util.ObjToString(v1["project_phone"]); p != "" {
- saveCt["contact_tel"] = p
- }
- if p := util.ObjToString(v1["projectaddr"]); p != "" {
- saveCt["contact_addr"] = p
- }
- saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- MysqlTool.Insert("dwd_f_nzj_contact", saveCt)
- }
- }
- }
- }
- func SaveFunc(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveBasePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveBaseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveBaseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveRFunc(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveRcPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveRcSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveRcSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveRcSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveRcSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveCFunc(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveCtPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveCtSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveCtSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveCtSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveCtSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveCyFunc(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveCyPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveCySp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveCySp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveCySp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveCySp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveEntFunc(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveEntPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveEntSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEntSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveEntSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEntSp
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|