// Source file: qyxyindex.go (15 KB)

  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "regexp"
  7. //"sync"
  8. "time"
  9. )
  10. var (
  11. timeReg = regexp.MustCompile("[\\d]{4}-[\\d]{2}-[\\d]{2}")
  12. )
  13. func qyxyTask1(q map[string]interface{}) {
  14. defer util.Catch()
  15. // savelock := sync.Mutex{}
  16. //连接
  17. session := qyxydb.GetMgoConn(86400)
  18. defer qyxydb.DestoryMongoConn(session)
  19. //
  20. c, _ := qyxy_ent["collect"].(string)
  21. db, _ := qyxy_ent["db"].(string)
  22. index, _ := qyxy_ent["index"].(string)
  23. itype, _ := qyxy_ent["type"].(string)
  24. count, _ := session.DB(db).C(c).Find(&q).Count()
  25. savepool := make(chan bool, 10)
  26. log.Println("企业信用索引 查询语句:", q, "同步总数:", count, "elastic库:", index)
  27. query := session.DB(db).C(c).Find(q).Iter()
  28. arr := make([]map[string]interface{}, savesizei)
  29. var n int
  30. i := 0
  31. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  32. delete(tmp, "_id")
  33. tmp["_id"] = tmp["company_id"]
  34. delete(tmp, "cancels")
  35. delete(tmp, "cancel_date")
  36. delete(tmp, "intellectuals")
  37. delete(tmp, "chattels")
  38. delete(tmp, "checks")
  39. delete(tmp, "revoke_date")
  40. delete(tmp, "changes")
  41. delete(tmp, "partners")
  42. if tmp["establish_date"] != nil {
  43. establish_date_time, ok := tmp["establish_date"].(time.Time)
  44. if ok {
  45. tmp["establish_date"] = establish_date_time.Unix()
  46. } else {
  47. tmp["establish_date"] = 0
  48. util.Debug(tmp["company_id"], "establish_date")
  49. }
  50. }
  51. if tmp["lastupdatetime"] != nil {
  52. lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time)
  53. if ok {
  54. tmp["lastupdatetime"] = lastupdatetime_time.Unix()
  55. } else {
  56. tmp["lastupdatetime"] = 0
  57. util.Debug(tmp["company_id"], "lastupdatetime")
  58. }
  59. }
  60. if tmp["issue_date"] != nil {
  61. issue_date_time, ok := tmp["issue_date"].(time.Time)
  62. if ok {
  63. tmp["issue_date"] = issue_date_time.Unix()
  64. } else {
  65. tmp["issue_date"] = 0
  66. util.Debug(tmp["company_id"], "issue_date")
  67. }
  68. }
  69. if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok {
  70. operation_startdate = timeReg.FindString(operation_startdate)
  71. tmp["operation_startdate"] = operation_startdate + " 00:00:00"
  72. }
  73. if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok {
  74. operation_enddate = timeReg.FindString(operation_enddate)
  75. tmp["operation_enddate"] = operation_enddate + " 00:00:00"
  76. }
  77. // if revoke_date, ok := tmp["revoke_date"].(string); revoke_date != "" && ok {
  78. // revoke_date = timeReg.FindString(revoke_date)
  79. // tmp["revoke_date"] = revoke_date + " 00:00:00"
  80. // }
  81. // if changes, ok := tmp["changes"].([]interface{}); ok && len(changes) > 0 {
  82. // for _, change := range changes {
  83. // if tmp1, ok := change.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
  84. // if change_date, ok := tmp1["change_date"].(string); ok && change_date != "" {
  85. // change_date = timeReg.FindString(change_date)
  86. // tmp1["change_date"] = change_date + " 00:00:00"
  87. // }
  88. // }
  89. // }
  90. // }
  91. //operations
  92. if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 {
  93. for _, operation := range operations {
  94. if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
  95. if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" {
  96. included_time = timeReg.FindString(included_time)
  97. tmp1["included_time"] = included_time + " 00:00:00"
  98. }
  99. if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" {
  100. removed_time = timeReg.FindString(removed_time)
  101. tmp1["removed_time"] = removed_time + " 00:00:00"
  102. }
  103. }
  104. }
  105. }
  106. //punishes
  107. if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 {
  108. for _, punishe := range punishes {
  109. if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
  110. if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" {
  111. public_date = timeReg.FindString(public_date)
  112. tmp1["public_date"] = public_date + " 00:00:00"
  113. }
  114. if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" {
  115. punish_date = timeReg.FindString(punish_date)
  116. tmp1["punish_date"] = punish_date + " 00:00:00"
  117. }
  118. }
  119. }
  120. }
  121. //annual_reports
  122. if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
  123. for _, annual_report := range annual_reports {
  124. if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
  125. if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 {
  126. for _, report_change := range report_changes {
  127. if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  128. if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
  129. change_date = timeReg.FindString(change_date)
  130. tmp2["change_date"] = change_date + " 00:00:00"
  131. }
  132. }
  133. }
  134. }
  135. if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 {
  136. for _, report_partner := range report_partners {
  137. if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  138. if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" {
  139. stock_realdate = timeReg.FindString(stock_realdate)
  140. tmp2["stock_realdate"] = stock_realdate + " 00:00:00"
  141. }
  142. if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" {
  143. stock_date = timeReg.FindString(stock_date)
  144. tmp2["stock_date"] = stock_date + " 00:00:00"
  145. }
  146. }
  147. }
  148. }
  149. if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 {
  150. for _, report_equity_change := range report_equity_changes {
  151. if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  152. if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
  153. change_date = timeReg.FindString(change_date)
  154. tmp2["change_date"] = change_date + " 00:00:00"
  155. }
  156. }
  157. }
  158. }
  159. if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 {
  160. for _, report_out_guarantee := range report_out_guarantees {
  161. if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  162. if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" {
  163. perform_time = timeReg.FindString(perform_time)
  164. tmp2["perform_time"] = perform_time + " 00:00:00"
  165. }
  166. if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" {
  167. guarantee_time = timeReg.FindString(guarantee_time)
  168. tmp2["guarantee_time"] = guarantee_time + " 00:00:00"
  169. }
  170. }
  171. }
  172. }
  173. }
  174. }
  175. }
  176. arr[i] = tmp
  177. n++
  178. if i == savesizei-1 {
  179. savepool <- true
  180. tmps := arr
  181. go func(tmpn *[]map[string]interface{}) {
  182. defer func() {
  183. <-savepool
  184. }()
  185. elastic.BulkSave(index, itype, tmpn, true)
  186. }(&tmps)
  187. i = 0
  188. arr = make([]map[string]interface{}, savesizei)
  189. }
  190. if n%savesizei == 0 {
  191. log.Println("当前:", n)
  192. }
  193. // n++
  194. // savelock.Lock()
  195. // arr = append(arr, tmp)
  196. // //生索引
  197. // if len(arr) >= savesizei-1 {
  198. // tmps := arr
  199. // elastic.BulkSave(index, itype, &tmps, true)
  200. // time.Sleep(1 * time.Second)
  201. // arr = []map[string]interface{}{}
  202. // }
  203. // savelock.Unlock()
  204. // //计数
  205. // if n%savesizei == 0 {
  206. // log.Println("当前:", n)
  207. // }
  208. tmp = make(map[string]interface{})
  209. }
  210. // savelock.Lock()
  211. // if len(arr) > 0 {
  212. // tmps := arr
  213. // elastic.BulkSave(index, itype, &tmps, true)
  214. // }
  215. // savelock.Unlock()
  216. if i > 0 {
  217. elastic.BulkSave(index, itype, &arr, true)
  218. }
  219. log.Println("create qyxy index...over", n)
  220. }
  221. func qyxyTask(q map[string]interface{}) {
  222. defer util.Catch()
  223. // savelock := sync.Mutex{}
  224. //连接
  225. session := qyxydb.GetMgoConn(86400)
  226. defer qyxydb.DestoryMongoConn(session)
  227. //
  228. c, _ := qyxy_ent["collect"].(string)
  229. db, _ := qyxy_ent["db"].(string)
  230. index, _ := qyxy_ent["index"].(string)
  231. itype, _ := qyxy_ent["type"].(string)
  232. count, _ := session.DB(db).C(c).Find(&q).Count()
  233. savepool := make(chan bool, 10)
  234. log.Println("企业信用索引 查询语句:", q, "同步总数:", count, "elastic库:", index)
  235. query := session.DB(db).C(c).Find(q).Iter()
  236. arr := make([]map[string]interface{}, savesizei)
  237. var n int
  238. i := 0
  239. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  240. //delete(tmp, "_id")
  241. tmp["_id"] = tmp["company_id"]
  242. // delete(tmp, "cancels")
  243. // delete(tmp, "cancel_date")
  244. // delete(tmp, "intellectuals")
  245. // delete(tmp, "chattels")
  246. // delete(tmp, "checks")
  247. // delete(tmp, "revoke_date")
  248. delete(tmp, "changes")
  249. // delete(tmp, "partners")
  250. // if tmp["establish_date"] != nil {
  251. // establish_date_time, ok := tmp["establish_date"].(time.Time)
  252. // if ok {
  253. // tmp["establish_date"] = establish_date_time.Unix()
  254. // } else {
  255. // tmp["establish_date"] = 0
  256. // util.Debug(tmp["company_id"], "establish_date")
  257. // }
  258. // }
  259. // if tmp["lastupdatetime"] != nil {
  260. // lastupdatetime_time, ok := tmp["lastupdatetime"].(time.Time)
  261. // if ok {
  262. // tmp["lastupdatetime"] = lastupdatetime_time.Unix()
  263. // } else {
  264. // tmp["lastupdatetime"] = 0
  265. // util.Debug(tmp["company_id"], "lastupdatetime")
  266. // }
  267. // }
  268. // if tmp["issue_date"] != nil {
  269. // issue_date_time, ok := tmp["issue_date"].(time.Time)
  270. // if ok {
  271. // tmp["issue_date"] = issue_date_time.Unix()
  272. // } else {
  273. // tmp["issue_date"] = 0
  274. // util.Debug(tmp["company_id"], "issue_date")
  275. // }
  276. // }
  277. // if operation_startdate, ok := tmp["operation_startdate"].(string); operation_startdate != "" && ok {
  278. // operation_startdate = timeReg.FindString(operation_startdate)
  279. // tmp["operation_startdate"] = operation_startdate + " 00:00:00"
  280. // }
  281. // if operation_enddate, ok := tmp["operation_enddate"].(string); operation_enddate != "" && ok {
  282. // operation_enddate = timeReg.FindString(operation_enddate)
  283. // tmp["operation_enddate"] = operation_enddate + " 00:00:00"
  284. // }
  285. // //operations
  286. // if operations, ok := tmp["operations"].([]interface{}); ok && len(operations) > 0 {
  287. // for _, operation := range operations {
  288. // if tmp1, ok := operation.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
  289. // if included_time, ok := tmp1["included_time"].(string); ok && included_time != "" {
  290. // included_time = timeReg.FindString(included_time)
  291. // tmp1["included_time"] = included_time + " 00:00:00"
  292. // }
  293. // if removed_time, ok := tmp1["removed_time"].(string); ok && removed_time != "" {
  294. // removed_time = timeReg.FindString(removed_time)
  295. // tmp1["removed_time"] = removed_time + " 00:00:00"
  296. // }
  297. // }
  298. // }
  299. // }
  300. // //punishes
  301. // if punishes, ok := tmp["punishes"].([]interface{}); ok && len(punishes) > 0 {
  302. // for _, punishe := range punishes {
  303. // if tmp1, ok := punishe.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
  304. // if public_date, ok := tmp1["public_date"].(string); ok && public_date != "" {
  305. // public_date = timeReg.FindString(public_date)
  306. // tmp1["public_date"] = public_date + " 00:00:00"
  307. // }
  308. // if punish_date, ok := tmp1["punish_date"].(string); ok && punish_date != "" {
  309. // punish_date = timeReg.FindString(punish_date)
  310. // tmp1["punish_date"] = punish_date + " 00:00:00"
  311. // }
  312. // }
  313. // }
  314. // }
  315. // //annual_reports
  316. // if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
  317. // for _, annual_report := range annual_reports {
  318. // if tmp1, ok := annual_report.(map[string]interface{}); tmp1 != nil && ok && len(tmp1) > 0 {
  319. // if report_changes, ok := tmp1["report_changes"].([]interface{}); ok && len(report_changes) > 0 {
  320. // for _, report_change := range report_changes {
  321. // if tmp2, ok := report_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  322. // if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
  323. // change_date = timeReg.FindString(change_date)
  324. // tmp2["change_date"] = change_date + " 00:00:00"
  325. // }
  326. // }
  327. // }
  328. // }
  329. // if report_partners, ok := tmp1["report_partners"].([]interface{}); ok && len(report_partners) > 0 {
  330. // for _, report_partner := range report_partners {
  331. // if tmp2, ok := report_partner.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  332. // if stock_realdate, ok := tmp2["stock_realdate"].(string); ok && stock_realdate != "" {
  333. // stock_realdate = timeReg.FindString(stock_realdate)
  334. // tmp2["stock_realdate"] = stock_realdate + " 00:00:00"
  335. // }
  336. // if stock_date, ok := tmp2["stock_date"].(string); ok && stock_date != "" {
  337. // stock_date = timeReg.FindString(stock_date)
  338. // tmp2["stock_date"] = stock_date + " 00:00:00"
  339. // }
  340. // }
  341. // }
  342. // }
  343. // if report_equity_changes, ok := tmp1["report_equity_changes"].([]interface{}); ok && len(report_equity_changes) > 0 {
  344. // for _, report_equity_change := range report_equity_changes {
  345. // if tmp2, ok := report_equity_change.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  346. // if change_date, ok := tmp2["change_date"].(string); ok && change_date != "" {
  347. // change_date = timeReg.FindString(change_date)
  348. // tmp2["change_date"] = change_date + " 00:00:00"
  349. // }
  350. // }
  351. // }
  352. // }
  353. // if report_out_guarantees, ok := tmp1["report_out_guarantees"].([]interface{}); ok && len(report_out_guarantees) > 0 {
  354. // for _, report_out_guarantee := range report_out_guarantees {
  355. // if tmp2, ok := report_out_guarantee.(map[string]interface{}); tmp2 != nil && ok && len(tmp2) > 0 {
  356. // if perform_time, ok := tmp2["perform_time"].(string); ok && perform_time != "" {
  357. // perform_time = timeReg.FindString(perform_time)
  358. // tmp2["perform_time"] = perform_time + " 00:00:00"
  359. // }
  360. // if guarantee_time, ok := tmp2["guarantee_time"].(string); ok && guarantee_time != "" {
  361. // guarantee_time = timeReg.FindString(guarantee_time)
  362. // tmp2["guarantee_time"] = guarantee_time + " 00:00:00"
  363. // }
  364. // }
  365. // }
  366. // }
  367. // }
  368. // }
  369. // }
  370. arr[i] = tmp
  371. n++
  372. if i == savesizei-1 {
  373. savepool <- true
  374. tmps := arr
  375. go func(tmpn *[]map[string]interface{}) {
  376. defer func() {
  377. <-savepool
  378. }()
  379. elastic.BulkSave(index, itype, tmpn, true)
  380. }(&tmps)
  381. i = 0
  382. arr = make([]map[string]interface{}, savesizei)
  383. }
  384. if n%savesizei == 0 {
  385. log.Println("当前:", n)
  386. }
  387. // n++
  388. // savelock.Lock()
  389. // arr = append(arr, tmp)
  390. // //生索引
  391. // if len(arr) >= savesizei-1 {
  392. // tmps := arr
  393. // elastic.BulkSave(index, itype, &tmps, true)
  394. // time.Sleep(1 * time.Second)
  395. // arr = []map[string]interface{}{}
  396. // }
  397. // savelock.Unlock()
  398. // //计数
  399. // if n%savesizei == 0 {
  400. // log.Println("当前:", n)
  401. // }
  402. tmp = make(map[string]interface{})
  403. }
  404. // savelock.Lock()
  405. // if len(arr) > 0 {
  406. // tmps := arr
  407. // elastic.BulkSave(index, itype, &tmps, true)
  408. // }
  409. // savelock.Unlock()
  410. if i > 0 {
  411. elastic.BulkSave(index, itype, &arr, true)
  412. }
  413. log.Println("create qyxy index...over", n)
  414. }