tidbTask.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "fmt"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. "math/rand"
  11. "proposed_project/config"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "unicode/utf8"
  17. )
  // Hand-off queues between the export workers and the batching writers.
  // Every *Pool channel buffers up to 5000 rows; its companion *Sp channel has
  // capacity 1 and is used by the matching Save* function as a slot that must
  // be held while a bulk insert is running.
  var (
  	saveBasePool = make(chan map[string]interface{}, 5000)
  	saveBaseSp   = make(chan bool, 1)

  	saveRcPool = make(chan map[string]interface{}, 5000)
  	saveRcSp   = make(chan bool, 1)

  	saveCtPool = make(chan map[string]interface{}, 5000)
  	saveCtSp   = make(chan bool, 1)

  	saveCyPool = make(chan map[string]interface{}, 5000)
  	saveCySp   = make(chan bool, 1)

  	saveEntPool = make(chan map[string]interface{}, 5000)
  	saveEntSp   = make(chan bool, 1)
  )

  // Field name lists, one per kind of row this file builds. BaseField also
  // drives the per-field mapping loop used when a base-info row is assembled.
  // NOTE(review): the other lists are presumably handed to the Save* functions
  // as column lists; the call sites are not in this file section.
  var (
  	BaseField = []string{
  		"lasttime",
  		"firsttime",
  		"proposed_number",
  		"proposed_id",
  		"follow_num",
  		"title",
  		"projectname",
  		"approvecode",
  		"approvedept",
  		"approvenumber",
  		"project_stage_code",
  		"total_investment",
  		"funds",
  		"owner",
  		"name_id",
  		"ownerclass_code",
  		"projecttype_code",
  		"projectaddr",
  		"projectperiod",
  		"project_startdate",
  		"project_completedate",
  		"industry_code",
  		"approvestatus",
  		"project_scale",
  		"category_code",
  		"nature_code",
  		"construction_area",
  		"floor_area",
  		"area_code",
  		"city_code",
  		"createtime",
  	}
  	RecordField = []string{
  		"proposed_id",
  		"infoid",
  		"follow_num",
  		"project_stage_code",
  		"title",
  		"project_scale",
  		"publishtime",
  		"jybxhref",
  		"createtime",
  	}
  	ContactField = []string{
  		"proposed_id",
  		"infoid",
  		"follow_num",
  		"name_id",
  		"name",
  		"contact_name",
  		"contact_tel",
  		"contact_addr",
  		"createtime",
  	}
  	CategoryField = []string{
  		"proposed_id",
  		"labelcode",
  		"labelvalues",
  		"labelweight",
  		"createtime",
  	}
  	EntField = []string{
  		"proposed_id",
  		"name_id",
  		"name",
  		"area_code",
  		"city_code",
  		"address",
  		"createtime",
  		"identity_type",
  	}
  )

  // In-memory lookup tables filled once by the Init* functions.
  var (
  	// AreaCode maps a comma-joined region name ("area", "area,city" or
  	// "area,city,district") to its code.
  	AreaCode = make(map[string]string, 5000)
  	// TagCode maps a label family name to a map of code -> label.
  	TagCode = make(map[string]interface{}, 100)
  )
  40. func InitArea() {
  41. info := MysqlTool.Find("d_area_code_back", nil, "", "", -1, -1)
  42. for _, m := range *info {
  43. var key string
  44. for i, v := range []string{"area", "city", "district"} {
  45. if i == 0 && util.ObjToString(m[v]) != "" {
  46. key = util.ObjToString(m[v])
  47. } else if util.ObjToString(m[v]) != "" {
  48. key += "," + util.ObjToString(m[v])
  49. }
  50. }
  51. AreaCode[key] = util.ObjToString(m["code"])
  52. }
  53. log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
  54. }
  55. func InitTagCode() {
  56. info, _ := MgoBid.Find("nzj_rule", nil, nil, bson.M{"label_name": 1, "label": 1, "code": 1}, false, -1, -1)
  57. for _, m := range *info {
  58. lname := util.ObjToString(m["label_name"])
  59. lb := util.ObjToString(m["label"])
  60. code := util.ObjToString(m["code"])
  61. if lname == "sub_category" || lname == "top_category" {
  62. lname = "category"
  63. }
  64. if TagCode[lname] != nil {
  65. m1 := TagCode[lname].(map[string]interface{})
  66. m1[code] = lb
  67. TagCode[lname] = m1
  68. } else {
  69. m1 := make(map[string]interface{})
  70. m1[code] = lb
  71. TagCode[lname] = m1
  72. }
  73. }
  74. TagCode["nature"].(map[string]interface{})["00"] = "其它"
  75. TagCode["project_stage"].(map[string]interface{})["00"] = "其它"
  76. log.Info("InitTagCode", zap.Any("TagCode", TagCode))
  77. }
  78. func taskTidb(q map[string]interface{}) {
  79. sess := MgoPro.GetMgoConn()
  80. defer MgoPro.DestoryMongoConn(sess)
  81. ch := make(chan bool, config.Conf.Serve.Thread)
  82. wg := &sync.WaitGroup{}
  83. var query *mongodb.MgoIter
  84. if q != nil && len(q) > 0 {
  85. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(SelectF).Iter()
  86. } else {
  87. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(nil).Select(SelectF).Iter()
  88. }
  89. count := 0
  90. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  91. if count%20000 == 0 {
  92. log.Info(fmt.Sprintf("current --- %d", count))
  93. }
  94. if t := util.Int64All(tmp["pici"]); t > pici {
  95. pici = t
  96. }
  97. ch <- true
  98. wg.Add(1)
  99. go func(tmp map[string]interface{}) {
  100. defer func() {
  101. <-ch
  102. wg.Done()
  103. }()
  104. saveM := make(map[string]interface{})
  105. for _, f := range BaseField {
  106. if f == "lasttime" || f == "firsttime" {
  107. if t := util.Int64All(tmp[f]); t > 0 {
  108. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  109. }
  110. } else if f == "proposed_id" {
  111. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  112. } else if f == "area_code" {
  113. if tmp["area"] != nil {
  114. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  115. }
  116. } else if f == "city_code" {
  117. if tmp["area"] != nil && tmp["city"] != nil {
  118. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  119. saveM[f] = AreaCode[c]
  120. }
  121. } else if f == "owner" {
  122. if v := util.ObjToString(tmp[f]); v != "" {
  123. if utf8.RuneCountInString(v) < 100 {
  124. saveM[f] = v
  125. }
  126. }
  127. } else if f == "name_id" {
  128. if b := util.ObjToString(tmp["owner"]); b != "" {
  129. if eid := redis.GetStr("ent_id", b); eid != "" {
  130. saveM["name_id"] = strings.Split(eid, "_")[0]
  131. }
  132. }
  133. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  134. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  135. t := util.Int64All(tmp[f])
  136. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  137. }
  138. } else if f == "createtime" {
  139. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  140. } else if f == "total_investment" {
  141. text := util.ObjToString(tmp[f])
  142. capital := ObjToMoney(text)
  143. capital = capital / 10000
  144. if capital != 0 {
  145. capital, _ = util.FormatFloat(capital, 6)
  146. saveM[f] = capital
  147. }
  148. } else if f == "approvestatus" {
  149. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  150. saveM[f] = tmp[f]
  151. }
  152. } else if f == "proposed_number" {
  153. if tmp[f] == nil {
  154. now := time.Now().Unix()
  155. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  156. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  157. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  158. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  159. } else {
  160. saveM[f] = tmp[f]
  161. }
  162. } else if f == "approvecode" {
  163. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  164. saveM[f] = tmp[f]
  165. }
  166. } else if f == "floor_area" {
  167. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  168. saveM[f] = tmp[f]
  169. }
  170. } else {
  171. if tmp[f] != nil {
  172. saveM[f] = tmp[f]
  173. }
  174. }
  175. }
  176. saveBasePool <- saveM
  177. saveCy := make(map[string]interface{})
  178. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  179. saveCy["labelcode"] = "category_code"
  180. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  181. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  182. saveCyPool <- saveCy
  183. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  184. saveEnt := make(map[string]interface{})
  185. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  186. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  187. saveEnt["name"] = ow
  188. if eid := redis.GetStr("ent_id", ow); eid != "" {
  189. arr := strings.Split(eid, "_")
  190. saveEnt["name_id"] = arr[0]
  191. if len(arr) == 2 {
  192. saveEnt["area_code"] = arr[1]
  193. } else if len(arr) == 3 {
  194. saveEnt["city_code"] = arr[2]
  195. }
  196. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  197. if info != nil && len(*info) > 0 {
  198. saveEnt["address"] = (*info)[0]["address"]
  199. }
  200. }
  201. saveEnt["identity_type"] = 1
  202. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  203. saveEntPool <- saveEnt
  204. }
  205. for _, v := range tmp["list"].([]interface{}) {
  206. saveRc := make(map[string]interface{})
  207. v1 := v.(map[string]interface{})
  208. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  209. infoid := util.ObjToString(v1["infoid"])
  210. saveRc["infoid"] = infoid
  211. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  212. saveRc["follow_num"] = v1["follow_num"]
  213. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  214. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  215. saveRc["title"] = util.ObjToString(v1["title"])
  216. if t := util.Int64All(v1["publishtime"]); t > 0 {
  217. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  218. }
  219. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  220. saveRcPool <- saveRc
  221. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  222. saveCt := make(map[string]interface{})
  223. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  224. saveCt["infoid"] = infoid
  225. saveCt["follow_num"] = tmp["follow_num"]
  226. if b := util.ObjToString(tmp["owner"]); b != "" {
  227. saveCt["name"] = util.ObjToString(tmp["owner"])
  228. if eid := redis.GetStr("ent_id", b); eid != "" {
  229. saveCt["name_id"] = strings.Split(eid, "_")[0]
  230. }
  231. }
  232. if p := util.ObjToString(v1["project_person"]); p != "" {
  233. saveCt["contact_name"] = p
  234. }
  235. if p := util.ObjToString(v1["project_phone"]); p != "" {
  236. saveCt["contact_tel"] = p
  237. }
  238. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  239. saveCt["contact_addr"] = p
  240. }
  241. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  242. saveCtPool <- saveCt
  243. }
  244. }
  245. }(tmp)
  246. tmp = make(map[string]interface{})
  247. }
  248. wg.Wait()
  249. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  250. }
  251. func taskTidb_add(q map[string]interface{}) {
  252. sess := MgoPro.GetMgoConn()
  253. defer MgoPro.DestoryMongoConn(sess)
  254. ch := make(chan bool, config.Conf.Serve.Thread)
  255. wg := &sync.WaitGroup{}
  256. log.Info("taskTidb_add", zap.Any("q: ", q))
  257. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(nil).Iter()
  258. count := 0
  259. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  260. if count%200 == 0 {
  261. log.Info(fmt.Sprintf("current --- %d", count))
  262. }
  263. if t := util.Int64All(tmp["pici"]); t > pici {
  264. pici = t
  265. }
  266. ch <- true
  267. wg.Add(1)
  268. go func(tmp map[string]interface{}) {
  269. go func() {
  270. <-ch
  271. wg.Done()
  272. }()
  273. taskB(tmp)
  274. taskE(tmp)
  275. }(tmp)
  276. tmp = make(map[string]interface{})
  277. }
  278. wg.Wait()
  279. TaskSingle = true
  280. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  281. }
  282. func taskB(tmp map[string]interface{}) {
  283. saveM := make(map[string]interface{})
  284. for _, f := range BaseField {
  285. if f == "lasttime" || f == "firsttime" {
  286. if t := util.Int64All(tmp[f]); t > 0 {
  287. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  288. }
  289. } else if f == "proposed_id" {
  290. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  291. } else if f == "area_code" {
  292. if tmp["area"] != nil {
  293. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  294. }
  295. } else if f == "city_code" {
  296. if tmp["area"] != nil && tmp["city"] != nil {
  297. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  298. saveM[f] = AreaCode[c]
  299. }
  300. } else if f == "owner" {
  301. if v := util.ObjToString(tmp[f]); v != "" {
  302. if utf8.RuneCountInString(v) < 100 {
  303. saveM[f] = v
  304. }
  305. }
  306. } else if f == "name_id" {
  307. if b := util.ObjToString(tmp["owner"]); b != "" {
  308. if eid := redis.GetStr("ent_id", b); eid != "" {
  309. saveM["name_id"] = strings.Split(eid, "_")[0]
  310. }
  311. }
  312. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  313. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  314. t := util.Int64All(tmp[f])
  315. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  316. }
  317. } else if f == "createtime" {
  318. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  319. } else if f == "total_investment" {
  320. text := util.ObjToString(tmp[f])
  321. capital := ObjToMoney(text)
  322. capital = capital / 10000
  323. if capital != 0 {
  324. capital, _ = util.FormatFloat(capital, 6)
  325. saveM[f] = capital
  326. }
  327. } else if f == "approvestatus" {
  328. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  329. saveM[f] = tmp[f]
  330. }
  331. } else if f == "proposed_number" {
  332. if tmp[f] == nil {
  333. now := time.Now().Unix()
  334. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  335. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  336. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  337. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  338. } else {
  339. saveM[f] = tmp[f]
  340. }
  341. } else if f == "approvecode" {
  342. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  343. saveM[f] = tmp[f]
  344. }
  345. } else if f == "floor_area" {
  346. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  347. saveM[f] = tmp[f]
  348. }
  349. } else {
  350. if tmp[f] != nil {
  351. saveM[f] = tmp[f]
  352. }
  353. }
  354. }
  355. info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  356. if info != nil && len(*info) > 0 {
  357. saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  358. MysqlTool.Update("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, saveM)
  359. } else {
  360. MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM)
  361. }
  362. info1 := MysqlTool.FindOne("dwd_f_nzj_category_tags", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  363. if info1 != nil && len(*info1) > 0 {
  364. } else {
  365. saveCy := make(map[string]interface{})
  366. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  367. saveCy["labelcode"] = "category_code"
  368. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  369. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  370. MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy)
  371. }
  372. info2 := MysqlTool.FindOne("dwd_f_nzj_ent", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  373. if info2 != nil && len(*info2) > 0 {
  374. } else {
  375. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  376. saveEnt := make(map[string]interface{})
  377. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  378. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  379. saveEnt["name"] = ow
  380. if eid := redis.GetStr("ent_id", ow); eid != "" {
  381. arr := strings.Split(eid, "_")
  382. saveEnt["name_id"] = arr[0]
  383. if len(arr) == 2 {
  384. saveEnt["area_code"] = arr[1]
  385. } else if len(arr) == 3 {
  386. saveEnt["city_code"] = arr[2]
  387. }
  388. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  389. if info != nil && len(*info) > 0 {
  390. saveEnt["address"] = (*info)[0]["address"]
  391. }
  392. }
  393. saveEnt["identity_type"] = 1
  394. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  395. MysqlTool.Insert("dwd_f_nzj_ent", saveEnt)
  396. }
  397. }
  398. }
  399. func taskE(tmp map[string]interface{}) {
  400. for _, v := range tmp["list"].([]interface{}) {
  401. v1 := v.(map[string]interface{})
  402. infoid := util.ObjToString(v1["infoid"])
  403. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  404. if info == nil || len(*info) == 0 {
  405. saveRc := make(map[string]interface{})
  406. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  407. saveRc["infoid"] = infoid
  408. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  409. saveRc["follow_num"] = v1["follow_num"]
  410. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  411. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  412. saveRc["title"] = util.ObjToString(v1["title"])
  413. if t := util.Int64All(v1["publishtime"]); t > 0 {
  414. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  415. }
  416. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  417. MysqlTool.Insert("dwd_f_nzj_follw_record", saveRc)
  418. }
  419. info1 := MysqlTool.FindOne("dwd_f_nzj_contact", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  420. if info1 == nil || len(*info1) == 0 {
  421. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  422. saveCt := make(map[string]interface{})
  423. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  424. saveCt["infoid"] = infoid
  425. saveCt["follow_num"] = tmp["follow_num"]
  426. if b := util.ObjToString(tmp["owner"]); b != "" {
  427. saveCt["name"] = util.ObjToString(tmp["owner"])
  428. if eid := redis.GetStr("ent_id", b); eid != "" {
  429. saveCt["name_id"] = strings.Split(eid, "_")[0]
  430. }
  431. }
  432. if p := util.ObjToString(v1["project_person"]); p != "" {
  433. saveCt["contact_name"] = p
  434. }
  435. if p := util.ObjToString(v1["project_phone"]); p != "" {
  436. saveCt["contact_tel"] = p
  437. }
  438. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  439. saveCt["contact_addr"] = p
  440. }
  441. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  442. MysqlTool.Insert("dwd_f_nzj_contact", saveCt)
  443. }
  444. }
  445. }
  446. }
  447. func SaveFunc(table string, arr []string) {
  448. arru := make([]map[string]interface{}, saveSize)
  449. indexu := 0
  450. for {
  451. select {
  452. case v := <-saveBasePool:
  453. arru[indexu] = v
  454. indexu++
  455. if indexu == saveSize {
  456. saveBaseSp <- true
  457. go func(arru []map[string]interface{}) {
  458. defer func() {
  459. <-saveBaseSp
  460. }()
  461. MysqlTool.InsertBulk(table, arr, arru...)
  462. }(arru)
  463. arru = make([]map[string]interface{}, saveSize)
  464. indexu = 0
  465. }
  466. case <-time.After(1000 * time.Millisecond):
  467. if indexu > 0 {
  468. saveBaseSp <- true
  469. go func(arru []map[string]interface{}) {
  470. defer func() {
  471. <-saveBaseSp
  472. }()
  473. MysqlTool.InsertBulk(table, arr, arru...)
  474. }(arru[:indexu])
  475. arru = make([]map[string]interface{}, saveSize)
  476. indexu = 0
  477. }
  478. }
  479. }
  480. }
  481. func SaveRFunc(table string, arr []string) {
  482. arru := make([]map[string]interface{}, saveSize)
  483. indexu := 0
  484. for {
  485. select {
  486. case v := <-saveRcPool:
  487. arru[indexu] = v
  488. indexu++
  489. if indexu == saveSize {
  490. saveRcSp <- true
  491. go func(arru []map[string]interface{}) {
  492. defer func() {
  493. <-saveRcSp
  494. }()
  495. MysqlTool.InsertBulk(table, arr, arru...)
  496. }(arru)
  497. arru = make([]map[string]interface{}, saveSize)
  498. indexu = 0
  499. }
  500. case <-time.After(1000 * time.Millisecond):
  501. if indexu > 0 {
  502. saveRcSp <- true
  503. go func(arru []map[string]interface{}) {
  504. defer func() {
  505. <-saveRcSp
  506. }()
  507. MysqlTool.InsertBulk(table, arr, arru...)
  508. }(arru[:indexu])
  509. arru = make([]map[string]interface{}, saveSize)
  510. indexu = 0
  511. }
  512. }
  513. }
  514. }
  515. func SaveCFunc(table string, arr []string) {
  516. arru := make([]map[string]interface{}, saveSize)
  517. indexu := 0
  518. for {
  519. select {
  520. case v := <-saveCtPool:
  521. arru[indexu] = v
  522. indexu++
  523. if indexu == saveSize {
  524. saveCtSp <- true
  525. go func(arru []map[string]interface{}) {
  526. defer func() {
  527. <-saveCtSp
  528. }()
  529. MysqlTool.InsertBulk(table, arr, arru...)
  530. }(arru)
  531. arru = make([]map[string]interface{}, saveSize)
  532. indexu = 0
  533. }
  534. case <-time.After(1000 * time.Millisecond):
  535. if indexu > 0 {
  536. saveCtSp <- true
  537. go func(arru []map[string]interface{}) {
  538. defer func() {
  539. <-saveCtSp
  540. }()
  541. MysqlTool.InsertBulk(table, arr, arru...)
  542. }(arru[:indexu])
  543. arru = make([]map[string]interface{}, saveSize)
  544. indexu = 0
  545. }
  546. }
  547. }
  548. }
  549. func SaveCyFunc(table string, arr []string) {
  550. arru := make([]map[string]interface{}, saveSize)
  551. indexu := 0
  552. for {
  553. select {
  554. case v := <-saveCyPool:
  555. arru[indexu] = v
  556. indexu++
  557. if indexu == saveSize {
  558. saveCySp <- true
  559. go func(arru []map[string]interface{}) {
  560. defer func() {
  561. <-saveCySp
  562. }()
  563. MysqlTool.InsertBulk(table, arr, arru...)
  564. }(arru)
  565. arru = make([]map[string]interface{}, saveSize)
  566. indexu = 0
  567. }
  568. case <-time.After(1000 * time.Millisecond):
  569. if indexu > 0 {
  570. saveCySp <- true
  571. go func(arru []map[string]interface{}) {
  572. defer func() {
  573. <-saveCySp
  574. }()
  575. MysqlTool.InsertBulk(table, arr, arru...)
  576. }(arru[:indexu])
  577. arru = make([]map[string]interface{}, saveSize)
  578. indexu = 0
  579. }
  580. }
  581. }
  582. }
  583. func SaveEntFunc(table string, arr []string) {
  584. arru := make([]map[string]interface{}, saveSize)
  585. indexu := 0
  586. for {
  587. select {
  588. case v := <-saveEntPool:
  589. arru[indexu] = v
  590. indexu++
  591. if indexu == saveSize {
  592. saveEntSp <- true
  593. go func(arru []map[string]interface{}) {
  594. defer func() {
  595. <-saveEntSp
  596. }()
  597. MysqlTool.InsertBulk(table, arr, arru...)
  598. }(arru)
  599. arru = make([]map[string]interface{}, saveSize)
  600. indexu = 0
  601. }
  602. case <-time.After(1000 * time.Millisecond):
  603. if indexu > 0 {
  604. saveEntSp <- true
  605. go func(arru []map[string]interface{}) {
  606. defer func() {
  607. <-saveEntSp
  608. }()
  609. MysqlTool.InsertBulk(table, arr, arru...)
  610. }(arru[:indexu])
  611. arru = make([]map[string]interface{}, saveSize)
  612. indexu = 0
  613. }
  614. }
  615. }
  616. }