tidbTask.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. package main
  2. import (
  3. "fmt"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  10. "math/rand"
  11. "proposed_project/config"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "unicode/utf8"
  17. )
  18. var (
  19. saveBasePool = make(chan map[string]interface{}, 5000)
  20. saveBaseSp = make(chan bool, 1)
  21. saveRcPool = make(chan map[string]interface{}, 5000)
  22. saveRcSp = make(chan bool, 1)
  23. saveCtPool = make(chan map[string]interface{}, 5000)
  24. saveCtSp = make(chan bool, 1)
  25. saveCyPool = make(chan map[string]interface{}, 5000)
  26. saveCySp = make(chan bool, 1)
  27. saveEntPool = make(chan map[string]interface{}, 5000)
  28. saveEntSp = make(chan bool, 1)
  29. BaseField = []string{"lasttime", "firsttime", "proposed_number", "proposed_id", "follow_num", "title", "projectname", "approvecode", "approvedept",
  30. "approvenumber", "project_stage_code", "total_investment", "funds", "owner", "name_id", "ownerclass_code", "projecttype_code", "projectaddr",
  31. "projectperiod", "project_startdate", "project_completedate", "industry_code", "approvestatus", "project_scale",
  32. "category_code", "nature_code", "construction_area", "floor_area", "area_code", "city_code", "createtime"}
  33. RecordField = []string{"proposed_id", "infoid", "follow_num", "project_stage_code", "title", "project_scale", "publishtime", "jybxhref", "createtime"}
  34. ContactField = []string{"proposed_id", "infoid", "follow_num", "name_id", "name", "contact_name", "contact_tel", "contact_addr", "createtime"}
  35. CategoryField = []string{"proposed_id", "labelcode", "labelvalues", "labelweight", "createtime"}
  36. EntField = []string{"proposed_id", "name_id", "name", "area_code", "city_code", "address", "createtime", "identity_type"}
  37. AreaCode = make(map[string]string, 5000)
  38. TagCode = make(map[string]interface{}, 100)
  39. )
  40. func InitArea() {
  41. info := MysqlTool.Find("d_area_code_back", nil, "", "", -1, -1)
  42. for _, m := range *info {
  43. var key string
  44. for i, v := range []string{"area", "city", "district"} {
  45. if i == 0 && util.ObjToString(m[v]) != "" {
  46. key = util.ObjToString(m[v])
  47. } else if util.ObjToString(m[v]) != "" {
  48. key += "," + util.ObjToString(m[v])
  49. }
  50. }
  51. AreaCode[key] = util.ObjToString(m["code"])
  52. }
  53. log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
  54. }
  55. func InitTagCode() {
  56. info, _ := MgoBid.Find("nzj_rule", nil, nil, bson.M{"label_name": 1, "label": 1, "code": 1}, false, -1, -1)
  57. for _, m := range *info {
  58. lname := util.ObjToString(m["label_name"])
  59. lb := util.ObjToString(m["label"])
  60. code := util.ObjToString(m["code"])
  61. if lname == "sub_category" || lname == "top_category" {
  62. lname = "category"
  63. }
  64. if TagCode[lname] != nil {
  65. m1 := TagCode[lname].(map[string]interface{})
  66. m1[code] = lb
  67. TagCode[lname] = m1
  68. } else {
  69. m1 := make(map[string]interface{})
  70. m1[code] = lb
  71. TagCode[lname] = m1
  72. }
  73. }
  74. TagCode["nature"].(map[string]interface{})["00"] = "其它"
  75. TagCode["project_stage"].(map[string]interface{})["00"] = "其它"
  76. TagCode["category"].(map[string]interface{})["04"] = "其它工程"
  77. log.Info("InitTagCode", zap.Any("TagCode", TagCode))
  78. }
  79. func taskTidb(q map[string]interface{}) {
  80. sess := MgoPro.GetMgoConn()
  81. defer MgoPro.DestoryMongoConn(sess)
  82. ch := make(chan bool, config.Conf.Serve.Thread)
  83. wg := &sync.WaitGroup{}
  84. var query *mongodb.MgoIter
  85. if q != nil && len(q) > 0 {
  86. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Select(SelectF).Iter()
  87. } else {
  88. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(SelectF).Iter()
  89. }
  90. count := 0
  91. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  92. if count%20000 == 0 {
  93. log.Info(fmt.Sprintf("current --- %d", count))
  94. }
  95. if t := util.Int64All(tmp["pici"]); t > pici {
  96. pici = t
  97. }
  98. ch <- true
  99. wg.Add(1)
  100. go func(tmp map[string]interface{}) {
  101. defer func() {
  102. <-ch
  103. wg.Done()
  104. }()
  105. saveM := make(map[string]interface{})
  106. for _, f := range BaseField {
  107. if f == "lasttime" || f == "firsttime" {
  108. if t := util.Int64All(tmp[f]); t > 0 {
  109. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  110. }
  111. } else if f == "proposed_id" {
  112. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  113. } else if f == "area_code" {
  114. if tmp["area"] != nil {
  115. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  116. }
  117. } else if f == "city_code" {
  118. if tmp["area"] != nil && tmp["city"] != nil {
  119. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  120. saveM[f] = AreaCode[c]
  121. }
  122. } else if f == "owner" {
  123. if v := util.ObjToString(tmp[f]); v != "" {
  124. if utf8.RuneCountInString(v) < 100 {
  125. saveM[f] = v
  126. }
  127. }
  128. } else if f == "name_id" {
  129. if b := util.ObjToString(tmp["owner"]); b != "" {
  130. if eid := redis.GetStr("ent_id", b); eid != "" {
  131. saveM["name_id"] = strings.Split(eid, "_")[0]
  132. }
  133. }
  134. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  135. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  136. t := util.Int64All(tmp[f])
  137. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  138. }
  139. } else if f == "createtime" {
  140. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  141. } else if f == "total_investment" {
  142. text := util.ObjToString(tmp[f])
  143. capital := ObjToMoney(text)
  144. capital = capital / 10000
  145. if capital != 0 {
  146. capital, _ = util.FormatFloat(capital, 6)
  147. saveM[f] = capital
  148. }
  149. } else if f == "approvestatus" {
  150. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  151. saveM[f] = tmp[f]
  152. }
  153. } else if f == "proposed_number" {
  154. if tmp[f] == nil {
  155. now := time.Now().Unix()
  156. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  157. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  158. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  159. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  160. } else {
  161. saveM[f] = tmp[f]
  162. }
  163. } else if f == "approvecode" {
  164. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  165. saveM[f] = tmp[f]
  166. }
  167. } else if f == "floor_area" {
  168. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  169. saveM[f] = tmp[f]
  170. }
  171. } else {
  172. if tmp[f] != nil {
  173. saveM[f] = tmp[f]
  174. }
  175. }
  176. }
  177. saveBasePool <- saveM
  178. saveCy := make(map[string]interface{})
  179. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  180. saveCy["labelcode"] = "category_code"
  181. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  182. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  183. saveCyPool <- saveCy
  184. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  185. saveEnt := make(map[string]interface{})
  186. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  187. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  188. saveEnt["name"] = ow
  189. if eid := redis.GetStr("ent_id", ow); eid != "" {
  190. arr := strings.Split(eid, "_")
  191. saveEnt["name_id"] = arr[0]
  192. if len(arr) == 2 {
  193. saveEnt["area_code"] = arr[1]
  194. } else if len(arr) == 3 {
  195. saveEnt["city_code"] = arr[2]
  196. }
  197. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  198. if info != nil && len(*info) > 0 {
  199. saveEnt["address"] = (*info)[0]["address"]
  200. }
  201. }
  202. saveEnt["identity_type"] = 1
  203. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  204. saveEntPool <- saveEnt
  205. }
  206. for _, v := range tmp["list"].([]interface{}) {
  207. saveRc := make(map[string]interface{})
  208. v1 := v.(map[string]interface{})
  209. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  210. infoid := util.ObjToString(v1["infoid"])
  211. saveRc["infoid"] = infoid
  212. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  213. saveRc["follow_num"] = v1["follow_num"]
  214. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  215. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  216. saveRc["title"] = util.ObjToString(v1["title"])
  217. if t := util.Int64All(v1["publishtime"]); t > 0 {
  218. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  219. }
  220. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  221. saveRcPool <- saveRc
  222. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  223. saveCt := make(map[string]interface{})
  224. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  225. saveCt["infoid"] = infoid
  226. saveCt["follow_num"] = tmp["follow_num"]
  227. if b := util.ObjToString(tmp["owner"]); b != "" {
  228. saveCt["name"] = util.ObjToString(tmp["owner"])
  229. if eid := redis.GetStr("ent_id", b); eid != "" {
  230. saveCt["name_id"] = strings.Split(eid, "_")[0]
  231. }
  232. }
  233. if p := util.ObjToString(v1["project_person"]); p != "" {
  234. saveCt["contact_name"] = p
  235. }
  236. if p := util.ObjToString(v1["project_phone"]); p != "" {
  237. saveCt["contact_tel"] = p
  238. }
  239. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  240. saveCt["contact_addr"] = p
  241. }
  242. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  243. saveCtPool <- saveCt
  244. }
  245. }
  246. }(tmp)
  247. tmp = make(map[string]interface{})
  248. }
  249. wg.Wait()
  250. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  251. }
  252. func taskTidb1() {
  253. sess := MgoPro.GetMgoConn()
  254. defer MgoPro.DestoryMongoConn(sess)
  255. ch := make(chan bool, config.Conf.Serve.Thread)
  256. wg := &sync.WaitGroup{}
  257. var query *mongodb.MgoIter
  258. q := bson.M{"_id": mongodb.StringTOBsonId(id)}
  259. query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Select(SelectF).Iter()
  260. count := 0
  261. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  262. if count%20000 == 0 {
  263. log.Info(fmt.Sprintf("current --- %d", count))
  264. }
  265. ch <- true
  266. wg.Add(1)
  267. go func(tmp map[string]interface{}) {
  268. defer func() {
  269. <-ch
  270. wg.Done()
  271. }()
  272. saveM := make(map[string]interface{})
  273. for _, f := range BaseField {
  274. if f == "lasttime" || f == "firsttime" {
  275. if t := util.Int64All(tmp[f]); t > 0 {
  276. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  277. }
  278. } else if f == "proposed_id" {
  279. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  280. } else if f == "area_code" {
  281. if tmp["area"] != nil {
  282. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  283. }
  284. } else if f == "city_code" {
  285. if tmp["area"] != nil && tmp["city"] != nil {
  286. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  287. saveM[f] = AreaCode[c]
  288. }
  289. } else if f == "owner" {
  290. if v := util.ObjToString(tmp[f]); v != "" {
  291. if utf8.RuneCountInString(v) < 100 {
  292. saveM[f] = v
  293. }
  294. }
  295. } else if f == "name_id" {
  296. if b := util.ObjToString(tmp["owner"]); b != "" {
  297. if eid := redis.GetStr("ent_id", b); eid != "" {
  298. saveM["name_id"] = strings.Split(eid, "_")[0]
  299. }
  300. }
  301. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  302. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  303. t := util.Int64All(tmp[f])
  304. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  305. }
  306. } else if f == "createtime" {
  307. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  308. } else if f == "total_investment" {
  309. text := util.ObjToString(tmp[f])
  310. capital := ObjToMoney(text)
  311. capital = capital / 10000
  312. if capital != 0 {
  313. capital, _ = util.FormatFloat(capital, 6)
  314. saveM[f] = capital
  315. }
  316. } else if f == "approvestatus" {
  317. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  318. saveM[f] = tmp[f]
  319. }
  320. } else if f == "proposed_number" {
  321. if tmp[f] == nil {
  322. now := time.Now().Unix()
  323. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  324. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  325. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  326. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  327. } else {
  328. saveM[f] = tmp[f]
  329. }
  330. } else if f == "approvecode" {
  331. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  332. saveM[f] = tmp[f]
  333. }
  334. } else if f == "floor_area" {
  335. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  336. saveM[f] = tmp[f]
  337. }
  338. } else {
  339. if tmp[f] != nil {
  340. saveM[f] = tmp[f]
  341. }
  342. }
  343. }
  344. saveBasePool <- saveM
  345. }(tmp)
  346. tmp = make(map[string]interface{})
  347. }
  348. wg.Wait()
  349. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  350. }
  351. func taskTidb_add(q map[string]interface{}) {
  352. sess := MgoPro.GetMgoConn()
  353. defer MgoPro.DestoryMongoConn(sess)
  354. ch := make(chan bool, config.Conf.Serve.Thread)
  355. wg := &sync.WaitGroup{}
  356. log.Info("taskTidb_add", zap.Any("q: ", q))
  357. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Select(nil).Iter()
  358. count := 0
  359. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  360. if count%200 == 0 {
  361. log.Info(fmt.Sprintf("current --- %d", count))
  362. }
  363. if t := util.Int64All(tmp["pici"]); t > pici {
  364. pici = t
  365. }
  366. ch <- true
  367. wg.Add(1)
  368. go func(tmp map[string]interface{}) {
  369. go func() {
  370. <-ch
  371. wg.Done()
  372. }()
  373. taskB(tmp)
  374. taskE(tmp)
  375. }(tmp)
  376. tmp = make(map[string]interface{})
  377. }
  378. wg.Wait()
  379. TaskSingle = true
  380. log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici))
  381. }
  382. func taskB(tmp map[string]interface{}) {
  383. saveM := make(map[string]interface{})
  384. for _, f := range BaseField {
  385. if f == "lasttime" || f == "firsttime" {
  386. if t := util.Int64All(tmp[f]); t > 0 {
  387. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  388. }
  389. } else if f == "proposed_id" {
  390. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  391. } else if f == "area_code" {
  392. if tmp["area"] != nil {
  393. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  394. }
  395. } else if f == "city_code" {
  396. if tmp["area"] != nil && tmp["city"] != nil {
  397. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  398. saveM[f] = AreaCode[c]
  399. }
  400. } else if f == "owner" {
  401. if v := util.ObjToString(tmp[f]); v != "" {
  402. if utf8.RuneCountInString(v) < 100 {
  403. saveM[f] = v
  404. }
  405. }
  406. } else if f == "name_id" {
  407. if b := util.ObjToString(tmp["owner"]); b != "" {
  408. if eid := redis.GetStr("ent_id", b); eid != "" {
  409. saveM["name_id"] = strings.Split(eid, "_")[0]
  410. }
  411. }
  412. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  413. if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
  414. t := util.Int64All(tmp[f])
  415. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  416. }
  417. } else if f == "createtime" {
  418. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  419. } else if f == "total_investment" {
  420. text := util.ObjToString(tmp[f])
  421. capital := ObjToMoney(text)
  422. capital = capital / 10000
  423. if capital != 0 {
  424. capital, _ = util.FormatFloat(capital, 6)
  425. saveM[f] = capital
  426. }
  427. } else if f == "approvestatus" {
  428. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 {
  429. saveM[f] = tmp[f]
  430. }
  431. } else if f == "proposed_number" {
  432. if tmp[f] == nil {
  433. now := time.Now().Unix()
  434. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  435. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  436. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  437. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  438. } else {
  439. saveM[f] = tmp[f]
  440. }
  441. } else if f == "approvecode" {
  442. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 {
  443. saveM[f] = tmp[f]
  444. }
  445. } else if f == "floor_area" {
  446. if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 {
  447. saveM[f] = tmp[f]
  448. }
  449. } else {
  450. if tmp[f] != nil {
  451. saveM[f] = tmp[f]
  452. }
  453. }
  454. }
  455. info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  456. if info != nil && len(*info) > 0 {
  457. saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  458. MysqlTool.Update("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, saveM)
  459. } else {
  460. MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM)
  461. }
  462. info1 := MysqlTool.FindOne("dwd_f_nzj_category_tags", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  463. if info1 != nil && len(*info1) > 0 {
  464. } else {
  465. saveCy := make(map[string]interface{})
  466. saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  467. saveCy["labelcode"] = "category_code"
  468. saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
  469. saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
  470. MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy)
  471. }
  472. info2 := MysqlTool.FindOne("dwd_f_nzj_ent", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
  473. if info2 != nil && len(*info2) > 0 {
  474. } else {
  475. if ow := util.ObjToString(tmp["owner"]); ow != "" {
  476. saveEnt := make(map[string]interface{})
  477. saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  478. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  479. saveEnt["name"] = ow
  480. if eid := redis.GetStr("ent_id", ow); eid != "" {
  481. arr := strings.Split(eid, "_")
  482. saveEnt["name_id"] = arr[0]
  483. if len(arr) == 2 {
  484. saveEnt["area_code"] = arr[1]
  485. } else if len(arr) == 3 {
  486. saveEnt["city_code"] = arr[2]
  487. }
  488. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  489. if info != nil && len(*info) > 0 {
  490. saveEnt["address"] = (*info)[0]["address"]
  491. }
  492. }
  493. saveEnt["identity_type"] = 1
  494. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  495. MysqlTool.Insert("dwd_f_nzj_ent", saveEnt)
  496. }
  497. }
  498. }
  499. func taskE(tmp map[string]interface{}) {
  500. for _, v := range tmp["list"].([]interface{}) {
  501. v1 := v.(map[string]interface{})
  502. infoid := util.ObjToString(v1["infoid"])
  503. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  504. if info == nil || len(*info) == 0 {
  505. saveRc := make(map[string]interface{})
  506. saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  507. saveRc["infoid"] = infoid
  508. saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  509. saveRc["follow_num"] = v1["follow_num"]
  510. saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
  511. saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
  512. saveRc["title"] = util.ObjToString(v1["title"])
  513. if t := util.Int64All(v1["publishtime"]); t > 0 {
  514. saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  515. }
  516. saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
  517. MysqlTool.Insert("dwd_f_nzj_follw_record", saveRc)
  518. }
  519. info1 := MysqlTool.FindOne("dwd_f_nzj_contact", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "")
  520. if info1 == nil || len(*info1) == 0 {
  521. if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
  522. saveCt := make(map[string]interface{})
  523. saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  524. saveCt["infoid"] = infoid
  525. saveCt["follow_num"] = tmp["follow_num"]
  526. if b := util.ObjToString(tmp["owner"]); b != "" {
  527. saveCt["name"] = util.ObjToString(tmp["owner"])
  528. if eid := redis.GetStr("ent_id", b); eid != "" {
  529. saveCt["name_id"] = strings.Split(eid, "_")[0]
  530. }
  531. }
  532. if p := util.ObjToString(v1["project_person"]); p != "" {
  533. saveCt["contact_name"] = p
  534. }
  535. if p := util.ObjToString(v1["project_phone"]); p != "" {
  536. saveCt["contact_tel"] = p
  537. }
  538. if p := util.ObjToString(v1["projectaddr"]); p != "" {
  539. saveCt["contact_addr"] = p
  540. }
  541. saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  542. MysqlTool.Insert("dwd_f_nzj_contact", saveCt)
  543. }
  544. }
  545. }
  546. }
  547. func SaveFunc(table string, arr []string) {
  548. arru := make([]map[string]interface{}, saveSize)
  549. indexu := 0
  550. for {
  551. select {
  552. case v := <-saveBasePool:
  553. arru[indexu] = v
  554. indexu++
  555. if indexu == saveSize {
  556. saveBaseSp <- true
  557. go func(arru []map[string]interface{}) {
  558. defer func() {
  559. <-saveBaseSp
  560. }()
  561. MysqlTool.InsertBulk(table, arr, arru...)
  562. }(arru)
  563. arru = make([]map[string]interface{}, saveSize)
  564. indexu = 0
  565. }
  566. case <-time.After(1000 * time.Millisecond):
  567. if indexu > 0 {
  568. saveBaseSp <- true
  569. go func(arru []map[string]interface{}) {
  570. defer func() {
  571. <-saveBaseSp
  572. }()
  573. MysqlTool.InsertBulk(table, arr, arru...)
  574. }(arru[:indexu])
  575. arru = make([]map[string]interface{}, saveSize)
  576. indexu = 0
  577. }
  578. }
  579. }
  580. }
  581. func SaveRFunc(table string, arr []string) {
  582. arru := make([]map[string]interface{}, saveSize)
  583. indexu := 0
  584. for {
  585. select {
  586. case v := <-saveRcPool:
  587. arru[indexu] = v
  588. indexu++
  589. if indexu == saveSize {
  590. saveRcSp <- true
  591. go func(arru []map[string]interface{}) {
  592. defer func() {
  593. <-saveRcSp
  594. }()
  595. MysqlTool.InsertBulk(table, arr, arru...)
  596. }(arru)
  597. arru = make([]map[string]interface{}, saveSize)
  598. indexu = 0
  599. }
  600. case <-time.After(1000 * time.Millisecond):
  601. if indexu > 0 {
  602. saveRcSp <- true
  603. go func(arru []map[string]interface{}) {
  604. defer func() {
  605. <-saveRcSp
  606. }()
  607. MysqlTool.InsertBulk(table, arr, arru...)
  608. }(arru[:indexu])
  609. arru = make([]map[string]interface{}, saveSize)
  610. indexu = 0
  611. }
  612. }
  613. }
  614. }
  615. func SaveCFunc(table string, arr []string) {
  616. arru := make([]map[string]interface{}, saveSize)
  617. indexu := 0
  618. for {
  619. select {
  620. case v := <-saveCtPool:
  621. arru[indexu] = v
  622. indexu++
  623. if indexu == saveSize {
  624. saveCtSp <- true
  625. go func(arru []map[string]interface{}) {
  626. defer func() {
  627. <-saveCtSp
  628. }()
  629. MysqlTool.InsertBulk(table, arr, arru...)
  630. }(arru)
  631. arru = make([]map[string]interface{}, saveSize)
  632. indexu = 0
  633. }
  634. case <-time.After(1000 * time.Millisecond):
  635. if indexu > 0 {
  636. saveCtSp <- true
  637. go func(arru []map[string]interface{}) {
  638. defer func() {
  639. <-saveCtSp
  640. }()
  641. MysqlTool.InsertBulk(table, arr, arru...)
  642. }(arru[:indexu])
  643. arru = make([]map[string]interface{}, saveSize)
  644. indexu = 0
  645. }
  646. }
  647. }
  648. }
  649. func SaveCyFunc(table string, arr []string) {
  650. arru := make([]map[string]interface{}, saveSize)
  651. indexu := 0
  652. for {
  653. select {
  654. case v := <-saveCyPool:
  655. arru[indexu] = v
  656. indexu++
  657. if indexu == saveSize {
  658. saveCySp <- true
  659. go func(arru []map[string]interface{}) {
  660. defer func() {
  661. <-saveCySp
  662. }()
  663. MysqlTool.InsertBulk(table, arr, arru...)
  664. }(arru)
  665. arru = make([]map[string]interface{}, saveSize)
  666. indexu = 0
  667. }
  668. case <-time.After(1000 * time.Millisecond):
  669. if indexu > 0 {
  670. saveCySp <- true
  671. go func(arru []map[string]interface{}) {
  672. defer func() {
  673. <-saveCySp
  674. }()
  675. MysqlTool.InsertBulk(table, arr, arru...)
  676. }(arru[:indexu])
  677. arru = make([]map[string]interface{}, saveSize)
  678. indexu = 0
  679. }
  680. }
  681. }
  682. }
  683. func SaveEntFunc(table string, arr []string) {
  684. arru := make([]map[string]interface{}, saveSize)
  685. indexu := 0
  686. for {
  687. select {
  688. case v := <-saveEntPool:
  689. arru[indexu] = v
  690. indexu++
  691. if indexu == saveSize {
  692. saveEntSp <- true
  693. go func(arru []map[string]interface{}) {
  694. defer func() {
  695. <-saveEntSp
  696. }()
  697. MysqlTool.InsertBulk(table, arr, arru...)
  698. }(arru)
  699. arru = make([]map[string]interface{}, saveSize)
  700. indexu = 0
  701. }
  702. case <-time.After(1000 * time.Millisecond):
  703. if indexu > 0 {
  704. saveEntSp <- true
  705. go func(arru []map[string]interface{}) {
  706. defer func() {
  707. <-saveEntSp
  708. }()
  709. MysqlTool.InsertBulk(table, arr, arru...)
  710. }(arru[:indexu])
  711. arru = make([]map[string]interface{}, saveSize)
  712. indexu = 0
  713. }
  714. }
  715. }
  716. }