tidbTask.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "fmt"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. "math/rand"
  11. "proposed_project/config"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "unicode/utf8"
  17. )
  18. var (
  19. saveBasePool = make(chan map[string]interface{}, 5000)
  20. saveBaseSp = make(chan bool, 1)
  21. saveRcPool = make(chan map[string]interface{}, 5000)
  22. saveRcSp = make(chan bool, 1)
  23. saveCtPool = make(chan map[string]interface{}, 5000)
  24. saveCtSp = make(chan bool, 1)
  25. saveCyPool = make(chan map[string]interface{}, 5000)
  26. saveCySp = make(chan bool, 1)
  27. saveEntPool = make(chan map[string]interface{}, 5000)
  28. saveEntSp = make(chan bool, 1)
  29. BaseField = []string{"lasttime", "firsttime", "proposed_number", "proposed_id", "follow_num", "title", "projectname", "approvecode", "approvedept",
  30. "approvenumber", "project_stage_code", "total_investment", "funds", "owner", "name_id", "ownerclass_code", "projecttype_code", "projectaddr",
  31. "projectperiod", "project_startdate", "project_completedate", "industry_code", "approvestatus", "project_scale",
  32. "category_code", "nature_code", "construction_area", "floor_area", "area_code", "city_code", "createtime"}
  33. RecordField = []string{"proposed_id", "infoid", "follow_num", "project_stage_code", "title", "project_scale", "publishtime", "jybxhref", "createtime"}
  34. ContactField = []string{"proposed_id", "infoid", "follow_num", "name_id", "name", "contact_name", "contact_tel", "contact_addr", "createtime"}
  35. CategoryField = []string{"proposed_id", "labelcode", "labelvalues", "labelweight", "createtime"}
  36. EntField = []string{"proposed_id", "name_id", "name", "area_code", "city_code", "address", "createtime", "identity_type"}
  37. AreaCode = make(map[string]string, 5000)
  38. TagCode = make(map[string]interface{}, 100)
  39. )
  40. func InitArea() {
  41. info := MysqlTool.Find("d_area_code_back", nil, "", "", -1, -1)
  42. for _, m := range *info {
  43. var key string
  44. for i, v := range []string{"area", "city", "district"} {
  45. if i == 0 && util.ObjToString(m[v]) != "" {
  46. key = util.ObjToString(m[v])
  47. } else if util.ObjToString(m[v]) != "" {
  48. key += "," + util.ObjToString(m[v])
  49. }
  50. }
  51. AreaCode[key] = util.ObjToString(m["code"])
  52. }
  53. log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
  54. }
  55. func InitTagCode() {
  56. info, _ := MgoBid.Find("nzj_rule", nil, nil, bson.M{"label_name": 1, "label": 1, "code": 1}, false, -1, -1)
  57. for _, m := range *info {
  58. lname := util.ObjToString(m["label_name"])
  59. lb := util.ObjToString(m["label"])
  60. code := util.ObjToString(m["code"])
  61. if lname == "sub_category" || lname == "top_category" {
  62. lname = "category"
  63. }
  64. if TagCode[lname] != nil {
  65. m1 := TagCode[lname].(map[string]interface{})
  66. m1[code] = lb
  67. TagCode[lname] = m1
  68. } else {
  69. m1 := make(map[string]interface{})
  70. m1[code] = lb
  71. TagCode[lname] = m1
  72. }
  73. }
  74. TagCode["nature"].(map[string]interface{})["00"] = "其它"
  75. TagCode["project_stage"].(map[string]interface{})["00"] = "其它"
  76. TagCode["category"].(map[string]interface{})["04"] = "其它工程"
  77. log.Info("InitTagCode", zap.Any("TagCode", TagCode))
  78. }
  79. func taskTidb(q map[string]interface{}) {
  80. sess := MgoPro.GetMgoConn()
  81. defer MgoPro.DestoryMongoConn(sess)
  82. ch := make(chan bool, config.Conf.Serve.Thread)
  83. wg := &sync.WaitGroup{}
  84. var query *mongodb.MgoIter
  85. if q != nil && len(q) > 0 {
  86. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(SelectF).Iter()
  87. } else {
  88. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(nil).Select(SelectF).Iter()
  89. }
  90. count := 0
  91. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  92. if count%20000 == 0 {
  93. log.Info(fmt.Sprintf("current --- %d", count))
  94. }
  95. if t := util.Int64All(tmp["pici"]); t > pici {
  96. pici = t
  97. }
  98. ch <- true
  99. wg.Add(1)
  100. go func(tmp map[string]interface{}) {
  101. defer func() {
  102. <-ch
  103. wg.Done()
  104. }()
  105. saveM := make(map[string]interface{})
  106. for _, f := range BaseField {
  107. if f == "lasttime" || f == "firsttime" {
  108. if t := util.Int64All(tmp[f]); t > 0 {
  109. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  110. }
  111. } else if f == "proposed_id" {
  112. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  113. } else if f == "area_code" {
  114. if tmp["area"] != nil {
  115. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  116. }
  117. } else if f == "city_code" {
  118. if tmp["area"] != nil && tmp["city"] != nil {
  119. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  120. saveM[f] = AreaCode[c]
  121. }
  122. } else if f == "owner" {
  123. if v := util.ObjToString(tmp[f]); v != "" {
  124. if utf8.RuneCountInString(v) < 100 {
  125. saveM[f] = v
  126. }
  127. }
  128. } else if f == "name_id" {
  129. if b := util.ObjToString(tmp["owner"]); b != "" {
  130. if eid := redis.GetStr("ent_id", b); eid != "" {
  131. saveM["name_id"] = strings.Split(eid, "_")[0]
  132. }
  133. }
  134. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  135. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  136. t := util.Int64All(tmp[f])
  137. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  138. }
  139. } else if f == "createtime" {
  140. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  141. } else if f == "total_investment" {
  142. text := util.ObjToString(tmp[f])
  143. capital := ObjToMoney(text)
  144. capital = capital / 10000
  145. if capital != 0 {
  146. capital, _ = util.FormatFloat(capital, 6)
  147. saveM[f] = capital
  148. }
  149. } else if f == "approvestatus" {
  150. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  151. saveM[f] = tmp[f]
  152. }
  153. } else if f == "proposed_number" {
  154. if tmp[f] == nil {
  155. now := time.Now().Unix()
  156. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  157. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  158. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  159. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  160. } else {
  161. saveM[f] = tmp[f]
  162. }
  163. } else if f == "approvecode" {
  164. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  165. saveM[f] = tmp[f]
  166. }
  167. } else if f == "floor_area" {
  168. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  169. saveM[f] = tmp[f]
  170. }
  171. } else {
  172. if tmp[f] != nil {
  173. saveM[f] = tmp[f]
  174. }
  175. }
  176. }
  177. saveBasePool <- saveM
  178. saveCy := make(map[string]interface{})
  179. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  180. saveCy["labelcode"] = "category_code"
  181. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  182. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  183. saveCyPool <- saveCy
  184. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  185. saveEnt := make(map[string]interface{})
  186. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  187. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  188. saveEnt["name"] = ow
  189. if eid := redis.GetStr("ent_id", ow); eid != "" {
  190. arr := strings.Split(eid, "_")
  191. saveEnt["name_id"] = arr[0]
  192. if len(arr) == 2 {
  193. saveEnt["area_code"] = arr[1]
  194. } else if len(arr) == 3 {
  195. saveEnt["city_code"] = arr[2]
  196. }
  197. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  198. if info != nil && len(*info) > 0 {
  199. saveEnt["address"] = (*info)[0]["address"]
  200. }
  201. }
  202. saveEnt["identity_type"] = 1
  203. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  204. saveEntPool <- saveEnt
  205. }
  206. for _, v := range tmp["list"].([]interface{}) {
  207. saveRc := make(map[string]interface{})
  208. v1 := v.(map[string]interface{})
  209. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  210. infoid := util.ObjToString(v1["infoid"])
  211. saveRc["infoid"] = infoid
  212. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  213. saveRc["follow_num"] = v1["follow_num"]
  214. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  215. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  216. saveRc["title"] = util.ObjToString(v1["title"])
  217. if t := util.Int64All(v1["publishtime"]); t > 0 {
  218. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  219. }
  220. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  221. saveRcPool <- saveRc
  222. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  223. saveCt := make(map[string]interface{})
  224. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  225. saveCt["infoid"] = infoid
  226. saveCt["follow_num"] = tmp["follow_num"]
  227. if b := util.ObjToString(tmp["owner"]); b != "" {
  228. saveCt["name"] = util.ObjToString(tmp["owner"])
  229. if eid := redis.GetStr("ent_id", b); eid != "" {
  230. saveCt["name_id"] = strings.Split(eid, "_")[0]
  231. }
  232. }
  233. if p := util.ObjToString(v1["project_person"]); p != "" {
  234. saveCt["contact_name"] = p
  235. }
  236. if p := util.ObjToString(v1["project_phone"]); p != "" {
  237. saveCt["contact_tel"] = p
  238. }
  239. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  240. saveCt["contact_addr"] = p
  241. }
  242. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  243. saveCtPool <- saveCt
  244. }
  245. }
  246. }(tmp)
  247. tmp = make(map[string]interface{})
  248. }
  249. wg.Wait()
  250. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  251. }
  252. func taskTidb_add(q map[string]interface{}) {
  253. sess := MgoPro.GetMgoConn()
  254. defer MgoPro.DestoryMongoConn(sess)
  255. ch := make(chan bool, config.Conf.Serve.Thread)
  256. wg := &sync.WaitGroup{}
  257. log.Info("taskTidb_add", zap.Any("q: ", q))
  258. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(q).Select(nil).Iter()
  259. count := 0
  260. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  261. if count%200 == 0 {
  262. log.Info(fmt.Sprintf("current --- %d", count))
  263. }
  264. if t := util.Int64All(tmp["pici"]); t > pici {
  265. pici = t
  266. }
  267. ch <- true
  268. wg.Add(1)
  269. go func(tmp map[string]interface{}) {
  270. go func() {
  271. <-ch
  272. wg.Done()
  273. }()
  274. taskB(tmp)
  275. taskE(tmp)
  276. }(tmp)
  277. tmp = make(map[string]interface{})
  278. }
  279. wg.Wait()
  280. TaskSingle = true
  281. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  282. }
  283. func taskB(tmp map[string]interface{}) {
  284. saveM := make(map[string]interface{})
  285. for _, f := range BaseField {
  286. if f == "lasttime" || f == "firsttime" {
  287. if t := util.Int64All(tmp[f]); t > 0 {
  288. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  289. }
  290. } else if f == "proposed_id" {
  291. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  292. } else if f == "area_code" {
  293. if tmp["area"] != nil {
  294. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  295. }
  296. } else if f == "city_code" {
  297. if tmp["area"] != nil && tmp["city"] != nil {
  298. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  299. saveM[f] = AreaCode[c]
  300. }
  301. } else if f == "owner" {
  302. if v := util.ObjToString(tmp[f]); v != "" {
  303. if utf8.RuneCountInString(v) < 100 {
  304. saveM[f] = v
  305. }
  306. }
  307. } else if f == "name_id" {
  308. if b := util.ObjToString(tmp["owner"]); b != "" {
  309. if eid := redis.GetStr("ent_id", b); eid != "" {
  310. saveM["name_id"] = strings.Split(eid, "_")[0]
  311. }
  312. }
  313. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  314. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  315. t := util.Int64All(tmp[f])
  316. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  317. }
  318. } else if f == "createtime" {
  319. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  320. } else if f == "total_investment" {
  321. text := util.ObjToString(tmp[f])
  322. capital := ObjToMoney(text)
  323. capital = capital / 10000
  324. if capital != 0 {
  325. capital, _ = util.FormatFloat(capital, 6)
  326. saveM[f] = capital
  327. }
  328. } else if f == "approvestatus" {
  329. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  330. saveM[f] = tmp[f]
  331. }
  332. } else if f == "proposed_number" {
  333. if tmp[f] == nil {
  334. now := time.Now().Unix()
  335. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  336. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  337. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  338. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  339. } else {
  340. saveM[f] = tmp[f]
  341. }
  342. } else if f == "approvecode" {
  343. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  344. saveM[f] = tmp[f]
  345. }
  346. } else if f == "floor_area" {
  347. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  348. saveM[f] = tmp[f]
  349. }
  350. } else {
  351. if tmp[f] != nil {
  352. saveM[f] = tmp[f]
  353. }
  354. }
  355. }
  356. info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  357. if info != nil && len(*info) > 0 {
  358. saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  359. MysqlTool.Update("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, saveM)
  360. } else {
  361. MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM)
  362. }
  363. info1 := MysqlTool.FindOne("dwd_f_nzj_category_tags", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  364. if info1 != nil && len(*info1) > 0 {
  365. } else {
  366. saveCy := make(map[string]interface{})
  367. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  368. saveCy["labelcode"] = "category_code"
  369. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  370. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  371. MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy)
  372. }
  373. info2 := MysqlTool.FindOne("dwd_f_nzj_ent", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  374. if info2 != nil && len(*info2) > 0 {
  375. } else {
  376. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  377. saveEnt := make(map[string]interface{})
  378. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  379. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  380. saveEnt["name"] = ow
  381. if eid := redis.GetStr("ent_id", ow); eid != "" {
  382. arr := strings.Split(eid, "_")
  383. saveEnt["name_id"] = arr[0]
  384. if len(arr) == 2 {
  385. saveEnt["area_code"] = arr[1]
  386. } else if len(arr) == 3 {
  387. saveEnt["city_code"] = arr[2]
  388. }
  389. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  390. if info != nil && len(*info) > 0 {
  391. saveEnt["address"] = (*info)[0]["address"]
  392. }
  393. }
  394. saveEnt["identity_type"] = 1
  395. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  396. MysqlTool.Insert("dwd_f_nzj_ent", saveEnt)
  397. }
  398. }
  399. }
  400. func taskE(tmp map[string]interface{}) {
  401. for _, v := range tmp["list"].([]interface{}) {
  402. v1 := v.(map[string]interface{})
  403. infoid := util.ObjToString(v1["infoid"])
  404. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  405. if info == nil || len(*info) == 0 {
  406. saveRc := make(map[string]interface{})
  407. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  408. saveRc["infoid"] = infoid
  409. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  410. saveRc["follow_num"] = v1["follow_num"]
  411. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  412. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  413. saveRc["title"] = util.ObjToString(v1["title"])
  414. if t := util.Int64All(v1["publishtime"]); t > 0 {
  415. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  416. }
  417. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  418. MysqlTool.Insert("dwd_f_nzj_follw_record", saveRc)
  419. }
  420. info1 := MysqlTool.FindOne("dwd_f_nzj_contact", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  421. if info1 == nil || len(*info1) == 0 {
  422. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  423. saveCt := make(map[string]interface{})
  424. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  425. saveCt["infoid"] = infoid
  426. saveCt["follow_num"] = tmp["follow_num"]
  427. if b := util.ObjToString(tmp["owner"]); b != "" {
  428. saveCt["name"] = util.ObjToString(tmp["owner"])
  429. if eid := redis.GetStr("ent_id", b); eid != "" {
  430. saveCt["name_id"] = strings.Split(eid, "_")[0]
  431. }
  432. }
  433. if p := util.ObjToString(v1["project_person"]); p != "" {
  434. saveCt["contact_name"] = p
  435. }
  436. if p := util.ObjToString(v1["project_phone"]); p != "" {
  437. saveCt["contact_tel"] = p
  438. }
  439. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  440. saveCt["contact_addr"] = p
  441. }
  442. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  443. MysqlTool.Insert("dwd_f_nzj_contact", saveCt)
  444. }
  445. }
  446. }
  447. }
  448. func SaveFunc(table string, arr []string) {
  449. arru := make([]map[string]interface{}, saveSize)
  450. indexu := 0
  451. for {
  452. select {
  453. case v := <-saveBasePool:
  454. arru[indexu] = v
  455. indexu++
  456. if indexu == saveSize {
  457. saveBaseSp <- true
  458. go func(arru []map[string]interface{}) {
  459. defer func() {
  460. <-saveBaseSp
  461. }()
  462. MysqlTool.InsertBulk(table, arr, arru...)
  463. }(arru)
  464. arru = make([]map[string]interface{}, saveSize)
  465. indexu = 0
  466. }
  467. case <-time.After(1000 * time.Millisecond):
  468. if indexu > 0 {
  469. saveBaseSp <- true
  470. go func(arru []map[string]interface{}) {
  471. defer func() {
  472. <-saveBaseSp
  473. }()
  474. MysqlTool.InsertBulk(table, arr, arru...)
  475. }(arru[:indexu])
  476. arru = make([]map[string]interface{}, saveSize)
  477. indexu = 0
  478. }
  479. }
  480. }
  481. }
  482. func SaveRFunc(table string, arr []string) {
  483. arru := make([]map[string]interface{}, saveSize)
  484. indexu := 0
  485. for {
  486. select {
  487. case v := <-saveRcPool:
  488. arru[indexu] = v
  489. indexu++
  490. if indexu == saveSize {
  491. saveRcSp <- true
  492. go func(arru []map[string]interface{}) {
  493. defer func() {
  494. <-saveRcSp
  495. }()
  496. MysqlTool.InsertBulk(table, arr, arru...)
  497. }(arru)
  498. arru = make([]map[string]interface{}, saveSize)
  499. indexu = 0
  500. }
  501. case <-time.After(1000 * time.Millisecond):
  502. if indexu > 0 {
  503. saveRcSp <- true
  504. go func(arru []map[string]interface{}) {
  505. defer func() {
  506. <-saveRcSp
  507. }()
  508. MysqlTool.InsertBulk(table, arr, arru...)
  509. }(arru[:indexu])
  510. arru = make([]map[string]interface{}, saveSize)
  511. indexu = 0
  512. }
  513. }
  514. }
  515. }
  516. func SaveCFunc(table string, arr []string) {
  517. arru := make([]map[string]interface{}, saveSize)
  518. indexu := 0
  519. for {
  520. select {
  521. case v := <-saveCtPool:
  522. arru[indexu] = v
  523. indexu++
  524. if indexu == saveSize {
  525. saveCtSp <- true
  526. go func(arru []map[string]interface{}) {
  527. defer func() {
  528. <-saveCtSp
  529. }()
  530. MysqlTool.InsertBulk(table, arr, arru...)
  531. }(arru)
  532. arru = make([]map[string]interface{}, saveSize)
  533. indexu = 0
  534. }
  535. case <-time.After(1000 * time.Millisecond):
  536. if indexu > 0 {
  537. saveCtSp <- true
  538. go func(arru []map[string]interface{}) {
  539. defer func() {
  540. <-saveCtSp
  541. }()
  542. MysqlTool.InsertBulk(table, arr, arru...)
  543. }(arru[:indexu])
  544. arru = make([]map[string]interface{}, saveSize)
  545. indexu = 0
  546. }
  547. }
  548. }
  549. }
  550. func SaveCyFunc(table string, arr []string) {
  551. arru := make([]map[string]interface{}, saveSize)
  552. indexu := 0
  553. for {
  554. select {
  555. case v := <-saveCyPool:
  556. arru[indexu] = v
  557. indexu++
  558. if indexu == saveSize {
  559. saveCySp <- true
  560. go func(arru []map[string]interface{}) {
  561. defer func() {
  562. <-saveCySp
  563. }()
  564. MysqlTool.InsertBulk(table, arr, arru...)
  565. }(arru)
  566. arru = make([]map[string]interface{}, saveSize)
  567. indexu = 0
  568. }
  569. case <-time.After(1000 * time.Millisecond):
  570. if indexu > 0 {
  571. saveCySp <- true
  572. go func(arru []map[string]interface{}) {
  573. defer func() {
  574. <-saveCySp
  575. }()
  576. MysqlTool.InsertBulk(table, arr, arru...)
  577. }(arru[:indexu])
  578. arru = make([]map[string]interface{}, saveSize)
  579. indexu = 0
  580. }
  581. }
  582. }
  583. }
  584. func SaveEntFunc(table string, arr []string) {
  585. arru := make([]map[string]interface{}, saveSize)
  586. indexu := 0
  587. for {
  588. select {
  589. case v := <-saveEntPool:
  590. arru[indexu] = v
  591. indexu++
  592. if indexu == saveSize {
  593. saveEntSp <- true
  594. go func(arru []map[string]interface{}) {
  595. defer func() {
  596. <-saveEntSp
  597. }()
  598. MysqlTool.InsertBulk(table, arr, arru...)
  599. }(arru)
  600. arru = make([]map[string]interface{}, saveSize)
  601. indexu = 0
  602. }
  603. case <-time.After(1000 * time.Millisecond):
  604. if indexu > 0 {
  605. saveEntSp <- true
  606. go func(arru []map[string]interface{}) {
  607. defer func() {
  608. <-saveEntSp
  609. }()
  610. MysqlTool.InsertBulk(table, arr, arru...)
  611. }(arru[:indexu])
  612. arru = make([]map[string]interface{}, saveSize)
  613. indexu = 0
  614. }
  615. }
  616. }
  617. }