task.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. qu "qfw/util"
  7. "regexp"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/cron"
  13. "go.mongodb.org/mongo-driver/bson/primitive"
  14. )
  15. var (
  16. //清理
  17. Han = regexp.MustCompile("[\\p{Han}]") //匹配汉字
  18. //es、mgo非全部字段
  19. FieldListMap = map[string]map[string]bool{
  20. "partners": map[string]bool{"stock_type": true, "stock_name": true, "stock_capital": false, "stock_realcapital": false, "identify_type": true, "identify_no": true},
  21. "employees": map[string]bool{"employee_name": false, "position": false},
  22. }
  23. //全部字段
  24. AllFieldListMap = []string{"punishes", "operations", "illegals"}
  25. //地区处理
  26. AreaFiled = []string{"credit_no", "company_code", "area_code"}
  27. //年报信息
  28. AnnualReportsArr = [][]string{
  29. []string{"report_year", "company_phone", "zip_code", "company_email", "employee_no", "operator_name"},
  30. []string{"total_assets", "total_equity", "total_sales", "total_profit", "main_business_income", "profit_amount", "total_tax", "total_liability"},
  31. }
  32. )
  33. // var AllFieldListMap = map[string]string{
  34. // "punishes": "punish_size",
  35. // "operations": "operation_size",
  36. // "illegals": "illegal_size",
  37. // }
  38. //不生索引字段
  39. //var NotEsField = []string{"cancel_reason", "revoke_reason", "cancels"} //cancel_size
// City is one administrative-region entry. InitAddress fills it by
// round-tripping an "address" document through JSON, so the json tags below
// are the document keys that are read.
type City struct {
	Code     string `json:"code"`     // region code; also used as the lookup key
	Province string `json:"province"` // copied to company_area when non-empty
	City     string `json:"city"`     // copied to company_city when non-empty
	District string `json:"district"` // copied to company_district when non-empty
}
  46. //定时任务
  47. func TimeTask() {
  48. //StartTask()
  49. c := cron.New()
  50. cronstr := "0 0 15 ? * Tue" //每周一周二15点执行
  51. //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
  52. c.AddFunc(cronstr, func() { StartTask() })
  53. c.Start()
  54. }
// StartTask logs the start of a run and performs one standardization pass
// through QyxyStandard. The boolean result of that pass is ignored.
func StartTask() {
	log.Println("Start Task...")
	// Disabled: incremental selection by updatetime.
	// query := map[string]interface{}{
	// "updatetime": map[string]interface{}{
	// "$gt": Updatetime,
	// },
	// }
	QyxyStandard()
	// Disabled: drop the source collection five minutes after a successful run.
	// run := QyxyStandard()
	// if run {
	// time.Sleep(5 * time.Minute)
	// if Mgo.DelColl(Dbcoll) {
	// log.Println("Delete Coll ", Dbcoll, "Success")
	// } else {
	// log.Println("Delete Coll ", Dbcoll, "Fail")
	// }
	// }
}
  74. //标准化数据,生索引
  75. func QyxyStandard() bool {
  76. defer qu.Catch()
  77. sess := Mgo.GetMgoConn()
  78. defer Mgo.DestoryMongoConn(sess)
  79. pool := make(chan bool, 20) //控制线程数
  80. wg := &sync.WaitGroup{}
  81. lock := &sync.Mutex{} //控制读写
  82. arr := [][]map[string]interface{}{}
  83. count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
  84. log.Println("共查询:", count, "条")
  85. if count == 0 {
  86. return false
  87. }
  88. it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
  89. sum := 0
  90. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  91. if sum%100 == 0 {
  92. log.Println("current:", sum)
  93. }
  94. pool <- true
  95. wg.Add(1)
  96. go func(tmp map[string]interface{}) {
  97. defer func() {
  98. <-pool
  99. wg.Done()
  100. }()
  101. update := []map[string]interface{}{}
  102. esMap := map[string]interface{}{}
  103. mgoMap := map[string]interface{}{}
  104. //id处理
  105. idMap := tmp["_id"].(map[string]interface{})
  106. _id := qu.ObjToString(idMap["_id"])
  107. esMap["_id"] = _id
  108. esMap["updatetime"] = time.Now().Unix()
  109. update = append(update, map[string]interface{}{"_id": _id})
  110. //地区处理
  111. hadArea := false //标记是否有省份信息
  112. for i, field := range AreaFiled {
  113. if tmp[field] == nil {
  114. continue
  115. }
  116. if code := fmt.Sprint(tmp[field]); code != "" {
  117. esMap[field] = code //加入esMap
  118. if !hadArea {
  119. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  120. code = code[2:8]
  121. } else if i == 1 && len(code) >= 6 { //company_code注册号
  122. code = code[:6]
  123. }
  124. if city := AddressMap[code]; city != nil { //未作废中取
  125. if city.Province != "" {
  126. esMap["company_area"] = city.Province //省
  127. }
  128. if city.City != "" {
  129. esMap["company_city"] = city.City //市
  130. }
  131. if city.District != "" {
  132. esMap["company_district"] = city.District //县
  133. }
  134. } else { //作废中取
  135. if city := AddressOldMap[code]; city != nil {
  136. if city.Province != "" {
  137. esMap["company_area"] = city.Province //省
  138. }
  139. if city.City != "" {
  140. esMap["company_city"] = city.City //市
  141. }
  142. if city.District != "" {
  143. esMap["company_district"] = city.District //县
  144. }
  145. }
  146. }
  147. if esMap["company_area"] != nil {
  148. hadArea = true
  149. }
  150. }
  151. }
  152. }
  153. //生索引字段处理
  154. for _, field := range EsFields {
  155. if tmp[field] == nil {
  156. continue
  157. }
  158. //qu.Debug(field, tmp[field], tmp[field] == nil)
  159. if field == "capital" { //注册资本处理
  160. text := qu.ObjToString(tmp[field])
  161. if currency := GetCurrency(text); currency != "" {
  162. esMap["currency"] = currency //币种
  163. }
  164. capital := ObjToMoney(text)
  165. capital = capital / 10000
  166. if capital != 0 {
  167. esMap[field] = capital //注册资本
  168. }
  169. } else if field == "company_type" { //企业类型处理
  170. text := qu.ObjToString(tmp[field])
  171. if text != "" {
  172. esMap["company_type_old"] = text //old
  173. if strings.Contains(text, "个体") {
  174. esMap[field] = "个体工商户"
  175. } else {
  176. text = strings.ReplaceAll(text, "(", "(")
  177. text = strings.ReplaceAll(text, ")", ")")
  178. if stype := QyStypeMap[text]; stype != "" {
  179. esMap[field] = stype
  180. } else {
  181. esMap[field] = "其他"
  182. }
  183. }
  184. }
  185. } else if field == "company_status" { //企业类型处理
  186. text := qu.ObjToString(tmp[field])
  187. if text != "" {
  188. text = strings.ReplaceAll(text, "(", "(")
  189. text = strings.ReplaceAll(text, ")", ")")
  190. if status := CompanyStatusMap[text]; status != "" {
  191. esMap[field] = status
  192. } else {
  193. esMap[field] = "其他"
  194. }
  195. }
  196. } else if strings.Contains(field, "date") || strings.Contains(field, "time") { //时间处理
  197. if tmp[field] != nil {
  198. if timeTmp, ok := tmp[field].(primitive.DateTime); ok {
  199. t := timeTmp.Time()
  200. esMap[field] = qu.FormatDate(&t, qu.Date_Short_Layout)
  201. } else if timeTmp, ok := tmp[field].(string); ok && timeTmp != "" {
  202. t := timeReg.FindString(timeTmp)
  203. if t != "" {
  204. esMap[field] = t
  205. }
  206. }
  207. }
  208. } else {
  209. if text := qu.ObjToString(tmp[field]); text != "" {
  210. if field == "company_name" {
  211. esMap["name"] = text
  212. }
  213. esMap[field] = text
  214. }
  215. }
  216. }
  217. //不生索引字段处理
  218. for _, field := range Fields {
  219. if text, ok := tmp[field].(string); ok && text != "" {
  220. mgoMap[field] = text
  221. } else if text, ok := tmp[field].(int32); ok {
  222. mgoMap[field] = text
  223. }
  224. }
  225. //list数据
  226. stockName := []string{}
  227. for field, fieldMap := range FieldListMap {
  228. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  229. if len(list) > 500 {
  230. list = list[:500]
  231. }
  232. tmpArrMgo := []map[string]interface{}{}
  233. tmpArrEs := []map[string]interface{}{}
  234. for _, l := range list {
  235. tmpMapMgo := map[string]interface{}{}
  236. tmpMapEs := map[string]interface{}{}
  237. m := l.(map[string]interface{})
  238. for f, b := range fieldMap {
  239. if text := qu.ObjToString(m[f]); text != "" {
  240. tmpMapMgo[f] = text
  241. if f == "stock_name" {
  242. stockName = append(stockName, text)
  243. }
  244. if b {
  245. tmpMapEs[f] = text
  246. }
  247. }
  248. }
  249. if len(tmpMapEs) > 0 {
  250. tmpArrEs = append(tmpArrEs, tmpMapEs)
  251. }
  252. if len(tmpMapMgo) > 0 {
  253. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  254. }
  255. }
  256. if len(tmpArrEs) > 0 {
  257. esMap[field] = tmpArrEs
  258. }
  259. if len(tmpArrMgo) > 0 {
  260. mgoMap[field] = tmpArrMgo
  261. }
  262. }
  263. }
  264. if len(stockName) > 0 {
  265. esMap["stock_name"] = strings.Join(stockName, ",")
  266. }
  267. for _, field := range AllFieldListMap {
  268. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  269. tmpArrMgo := []map[string]interface{}{}
  270. for _, l := range list {
  271. tmpMapMgo := map[string]interface{}{}
  272. m := l.(map[string]interface{})
  273. for k, v := range m {
  274. if tmpv := fmt.Sprint(v); v != nil && tmpv != "" {
  275. tmpMapMgo[k] = tmpv
  276. }
  277. }
  278. if len(tmpMapMgo) > 0 {
  279. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  280. }
  281. }
  282. if len(tmpArrMgo) > 0 {
  283. mgoMap[field] = tmpArrMgo
  284. }
  285. }
  286. }
  287. //年报信息
  288. sortArr := []string{} //存年份
  289. sortMap := map[string]map[string]interface{}{} //key:年份;val:每一个年报中的company_phone,company_email,stock_name
  290. tmpArrMgo := []map[string]interface{}{}
  291. if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
  292. for _, annual_report := range annual_reports {
  293. tmpMapMgo := map[string]interface{}{} //记录每个年报信息标准化到mgo的数据
  294. tmpMap := map[string]interface{}{} //只记录每个年报信息的company_email和company_phone
  295. report_year := ""
  296. m := annual_report.(map[string]interface{})
  297. for i, tmpArr := range AnnualReportsArr {
  298. for _, f := range tmpArr {
  299. if text := m[f]; text != nil {
  300. if textstr := fmt.Sprint(text); textstr != "" {
  301. if f == "report_year" {
  302. report_year = textstr
  303. sortArr = append(sortArr, textstr)
  304. } else if f == "company_phone" && !Han.MatchString(textstr) && len(textstr) >= 7 {
  305. tmpMap[f] = textstr
  306. tmpMapMgo[f] = textstr
  307. } else if f == "company_email" && !Han.MatchString(textstr) && len(textstr) >= 4 {
  308. tmpMap[f] = textstr
  309. tmpMapMgo[f] = textstr
  310. }
  311. if i == 0 { //字符串信息
  312. if f == "company_phone" || f == "company_email" {
  313. continue
  314. }
  315. tmpMapMgo[f] = textstr
  316. } else if i == 1 { //转金额
  317. money := ObjToMoney(textstr) / 10000
  318. tmpMapMgo[f] = money
  319. }
  320. }
  321. }
  322. }
  323. }
  324. // stock_nameArr := []string{}
  325. // if i_partners, ok := m["report_partners"].([]interface{}); ok && len(i_partners) > 0 { //股东信息
  326. // for _, par := range i_partners {
  327. // m := par.(map[string]interface{})
  328. // if stock_name, ok := m["stock_name"].(string); ok && stock_name != "" {
  329. // stock_nameArr = append(stock_nameArr, stock_name)
  330. // }
  331. // }
  332. // }
  333. // if len(stock_nameArr) > 0 {
  334. // stockname := strings.Join(stock_nameArr, ",")
  335. // tmpMap["stock_name"] = stockname
  336. // }
  337. sortMap[report_year] = tmpMap
  338. if len(tmpMapMgo) > 0 {
  339. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  340. }
  341. }
  342. }
  343. if len(tmpArrMgo) > 0 {
  344. mgoMap["annual_reports"] = tmpArrMgo
  345. }
  346. if len(sortArr) > 0 && len(sortMap) > 0 {
  347. sort.Strings(sortArr)
  348. report_year := sortArr[len(sortArr)-1]
  349. for k, v := range sortMap[report_year] {
  350. esMap[k] = v
  351. }
  352. }
  353. //合并
  354. for k, v := range esMap {
  355. if k == "partners" {
  356. continue
  357. }
  358. mgoMap[k] = v
  359. }
  360. //es数据过滤
  361. EsSaveFlag := true
  362. company_name := qu.ObjToString(esMap["company_name"])
  363. if len([]rune(company_name)) < 8 {
  364. EsSaveFlag = false
  365. }
  366. if EsSaveFlag {
  367. company_type := qu.ObjToString(esMap["company_type"])
  368. if company_type == "" || company_type == "个体工商户" {
  369. EsSaveFlag = false
  370. }
  371. }
  372. if EsSaveFlag {
  373. credit_no := strings.TrimSpace(qu.ObjToString(esMap["credit_no"]))
  374. company_code := strings.TrimSpace(qu.ObjToString(esMap["company_code"]))
  375. if credit_no == "" && company_code == "" {
  376. EsSaveFlag = false
  377. }
  378. }
  379. //qu.Debug("esMap---", esMap)
  380. // qu.Debug("mgoMap---", mgoMap)
  381. // return
  382. lock.Lock()
  383. if EsSaveFlag {
  384. EsSaveCache <- esMap //过滤后数据保存
  385. }
  386. EsSaveAllCache <- esMap //所有数据保存
  387. update = append(update, map[string]interface{}{"$set": mgoMap})
  388. if len(update) == 2 {
  389. arr = append(arr, update)
  390. }
  391. if len(arr) > 500 {
  392. tmps := arr
  393. Mgo.UpSertBulk(Savecoll, tmps...)
  394. arr = [][]map[string]interface{}{}
  395. }
  396. lock.Unlock()
  397. }(tmp)
  398. tmp = make(map[string]interface{})
  399. }
  400. wg.Wait()
  401. lock.Lock()
  402. if len(arr) > 0 {
  403. Mgo.UpSertBulk(Savecoll, arr...)
  404. }
  405. lock.Unlock()
  406. log.Println("Run Over...Count:", sum)
  407. return true
  408. }
  409. //所有企业数据标准化
  410. func HistoryQyxyStandard() bool {
  411. qu.Debug("--------History--------")
  412. defer qu.Catch()
  413. sess := Mgo.GetMgoConn()
  414. defer Mgo.DestoryMongoConn(sess)
  415. pool := make(chan bool, 20) //控制线程数
  416. wg := &sync.WaitGroup{}
  417. lock := &sync.Mutex{} //控制读写
  418. // arr := [][]map[string]interface{}{}
  419. // count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
  420. // log.Println("共查询:", count, "条")
  421. // if count == 0 {
  422. // return false
  423. // }
  424. it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
  425. sum := 0
  426. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  427. if sum%10000 == 0 {
  428. log.Println("current:", sum)
  429. }
  430. pool <- true
  431. wg.Add(1)
  432. go func(tmp map[string]interface{}) {
  433. defer func() {
  434. <-pool
  435. wg.Done()
  436. }()
  437. update := []map[string]interface{}{}
  438. esMap := map[string]interface{}{}
  439. mgoMap := map[string]interface{}{}
  440. //id处理
  441. idMap := tmp["_id"].(map[string]interface{})
  442. _id := qu.ObjToString(idMap["_id"])
  443. esMap["_id"] = _id
  444. esMap["updatetime"] = time.Now().Unix()
  445. update = append(update, map[string]interface{}{"_id": _id})
  446. //地区处理
  447. hadArea := false //标记是否有省份信息
  448. for i, field := range AreaFiled {
  449. if tmp[field] == nil {
  450. continue
  451. }
  452. if code := fmt.Sprint(tmp[field]); code != "" {
  453. esMap[field] = code //加入esMap
  454. if !hadArea {
  455. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  456. code = code[2:8]
  457. } else if i == 1 && len(code) >= 6 { //company_code注册号
  458. code = code[:6]
  459. }
  460. if city := AddressMap[code]; city != nil { //未作废中取
  461. if city.Province != "" {
  462. esMap["company_area"] = city.Province //省
  463. }
  464. if city.City != "" {
  465. esMap["company_city"] = city.City //市
  466. }
  467. if city.District != "" {
  468. esMap["company_district"] = city.District //县
  469. }
  470. } else { //作废中取
  471. if city := AddressOldMap[code]; city != nil {
  472. if city.Province != "" {
  473. esMap["company_area"] = city.Province //省
  474. }
  475. if city.City != "" {
  476. esMap["company_city"] = city.City //市
  477. }
  478. if city.District != "" {
  479. esMap["company_district"] = city.District //县
  480. }
  481. }
  482. }
  483. if esMap["company_area"] != nil {
  484. hadArea = true
  485. }
  486. }
  487. }
  488. }
  489. //生索引字段处理
  490. for _, field := range EsFields {
  491. if tmp[field] == nil {
  492. continue
  493. }
  494. //qu.Debug(field, tmp[field], tmp[field] == nil)
  495. if field == "capital" { //注册资本处理
  496. text := qu.ObjToString(tmp[field])
  497. if currency := GetCurrency(text); currency != "" {
  498. esMap["currency"] = currency //币种
  499. }
  500. capital := ObjToMoney(text)
  501. capital = capital / 10000
  502. if capital != 0 {
  503. esMap[field] = capital //注册资本
  504. }
  505. } else if field == "company_type" { //企业类型处理
  506. text := qu.ObjToString(tmp[field])
  507. if text != "" {
  508. esMap["company_type_old"] = text //old
  509. if strings.Contains(text, "个体") {
  510. esMap[field] = "个体工商户"
  511. } else {
  512. text = strings.ReplaceAll(text, "(", "(")
  513. text = strings.ReplaceAll(text, ")", ")")
  514. if stype := QyStypeMap[text]; stype != "" {
  515. esMap[field] = stype
  516. } else {
  517. esMap[field] = "其他"
  518. }
  519. }
  520. }
  521. } else if field == "company_status" { //企业类型处理
  522. text := qu.ObjToString(tmp[field])
  523. if text != "" {
  524. text = strings.ReplaceAll(text, "(", "(")
  525. text = strings.ReplaceAll(text, ")", ")")
  526. if status := CompanyStatusMap[text]; status != "" {
  527. esMap[field] = status
  528. } else {
  529. esMap[field] = "其他"
  530. }
  531. }
  532. } else if strings.Contains(field, "date") || strings.Contains(field, "time") { //时间处理
  533. if tmp[field] != nil {
  534. if timeTmp, ok := tmp[field].(primitive.DateTime); ok {
  535. t := timeTmp.Time()
  536. esMap[field] = qu.FormatDate(&t, qu.Date_Short_Layout)
  537. } else if timeTmp, ok := tmp[field].(string); ok && timeTmp != "" {
  538. t := timeReg.FindString(timeTmp)
  539. if t != "" {
  540. esMap[field] = t
  541. }
  542. }
  543. }
  544. } else {
  545. if text := qu.ObjToString(tmp[field]); text != "" {
  546. if field == "company_name" {
  547. esMap["name"] = text
  548. }
  549. esMap[field] = text
  550. }
  551. }
  552. }
  553. //不生索引字段处理
  554. for _, field := range Fields {
  555. if text, ok := tmp[field].(string); ok && text != "" {
  556. mgoMap[field] = text
  557. } else if text, ok := tmp[field].(int32); ok {
  558. mgoMap[field] = text
  559. }
  560. }
  561. //list数据
  562. stockName := []string{}
  563. for field, fieldMap := range FieldListMap {
  564. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  565. if len(list) > 500 {
  566. list = list[:500]
  567. }
  568. tmpArrMgo := []map[string]interface{}{}
  569. tmpArrEs := []map[string]interface{}{}
  570. for _, l := range list {
  571. tmpMapMgo := map[string]interface{}{}
  572. tmpMapEs := map[string]interface{}{}
  573. m := l.(map[string]interface{})
  574. for f, b := range fieldMap {
  575. if text := qu.ObjToString(m[f]); text != "" {
  576. tmpMapMgo[f] = text
  577. if f == "stock_name" {
  578. stockName = append(stockName, text)
  579. }
  580. if b {
  581. tmpMapEs[f] = text
  582. }
  583. }
  584. }
  585. if len(tmpMapEs) > 0 {
  586. tmpArrEs = append(tmpArrEs, tmpMapEs)
  587. }
  588. if len(tmpMapMgo) > 0 {
  589. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  590. }
  591. }
  592. if len(tmpArrEs) > 0 {
  593. esMap[field] = tmpArrEs
  594. }
  595. if len(tmpArrMgo) > 0 {
  596. mgoMap[field] = tmpArrMgo
  597. }
  598. }
  599. }
  600. if len(stockName) > 0 {
  601. esMap["stock_name"] = strings.Join(stockName, ",")
  602. }
  603. for _, field := range AllFieldListMap {
  604. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  605. tmpArrMgo := []map[string]interface{}{}
  606. for _, l := range list {
  607. tmpMapMgo := map[string]interface{}{}
  608. m := l.(map[string]interface{})
  609. for k, v := range m {
  610. if tmpv := fmt.Sprint(v); v != nil && tmpv != "" {
  611. tmpMapMgo[k] = tmpv
  612. }
  613. }
  614. if len(tmpMapMgo) > 0 {
  615. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  616. }
  617. }
  618. if len(tmpArrMgo) > 0 {
  619. mgoMap[field] = tmpArrMgo
  620. }
  621. }
  622. }
  623. //年报信息
  624. sortArr := []string{} //存年份
  625. sortMap := map[string]map[string]interface{}{} //key:年份;val:每一个年报中的company_phone,company_email,stock_name
  626. tmpArrMgo := []map[string]interface{}{}
  627. if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
  628. for _, annual_report := range annual_reports {
  629. tmpMapMgo := map[string]interface{}{}
  630. tmpMap := map[string]interface{}{}
  631. report_year := ""
  632. m := annual_report.(map[string]interface{})
  633. for i, tmpArr := range AnnualReportsArr {
  634. for _, f := range tmpArr {
  635. if text := m[f]; text != nil {
  636. if textstr := fmt.Sprint(text); textstr != "" {
  637. if f == "report_year" {
  638. report_year = textstr
  639. sortArr = append(sortArr, textstr)
  640. } else if f == "company_phone" && !Han.MatchString(textstr) && len(textstr) >= 7 {
  641. tmpMap[f] = textstr
  642. tmpMapMgo[f] = textstr
  643. } else if f == "company_email" && !Han.MatchString(textstr) && len(textstr) >= 4 {
  644. tmpMap[f] = textstr
  645. tmpMapMgo[f] = textstr
  646. }
  647. if i == 0 { //字符串信息
  648. if f == "company_phone" || f == "company_email" {
  649. continue
  650. }
  651. tmpMapMgo[f] = textstr
  652. } else if i == 1 { //转金额
  653. money := ObjToMoney(textstr) / 10000
  654. tmpMapMgo[f] = money
  655. }
  656. }
  657. }
  658. }
  659. }
  660. // stock_nameArr := []string{}
  661. // if i_partners, ok := m["report_partners"].([]interface{}); ok && len(i_partners) > 0 { //股东信息
  662. // for _, par := range i_partners {
  663. // m := par.(map[string]interface{})
  664. // if stock_name, ok := m["stock_name"].(string); ok && stock_name != "" {
  665. // stock_nameArr = append(stock_nameArr, stock_name)
  666. // }
  667. // }
  668. // }
  669. // if len(stock_nameArr) > 0 {
  670. // stockname := strings.Join(stock_nameArr, ",")
  671. // tmpMap["stock_name"] = stockname
  672. // }
  673. sortMap[report_year] = tmpMap
  674. if len(tmpMapMgo) > 0 {
  675. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  676. }
  677. }
  678. }
  679. if len(tmpArrMgo) > 0 {
  680. mgoMap["annual_reports"] = tmpArrMgo
  681. }
  682. if len(sortArr) > 0 && len(sortMap) > 0 {
  683. sort.Strings(sortArr)
  684. report_year := sortArr[len(sortArr)-1]
  685. for k, v := range sortMap[report_year] {
  686. esMap[k] = v
  687. }
  688. }
  689. lock.Lock()
  690. EsSaveCache <- esMap
  691. lock.Unlock()
  692. }(tmp)
  693. tmp = make(map[string]interface{})
  694. }
  695. wg.Wait()
  696. log.Println("Run Over...Count:", sum)
  697. return true
  698. }
  699. //过滤后数据存库
  700. func SaveEs() {
  701. log.Println("Es Save...")
  702. arru := make([]map[string]interface{}, 500)
  703. indexu := 0
  704. for {
  705. select {
  706. case v := <-EsSaveCache:
  707. arru[indexu] = v
  708. indexu++
  709. if indexu == 500 {
  710. SP <- true
  711. go func(arru []map[string]interface{}) {
  712. defer func() {
  713. <-SP
  714. }()
  715. Es.BulkSave(Index, Itype, &arru, true)
  716. }(arru)
  717. arru = make([]map[string]interface{}, 500)
  718. indexu = 0
  719. }
  720. case <-time.After(1000 * time.Millisecond):
  721. if indexu > 0 {
  722. SP <- true
  723. go func(arru []map[string]interface{}) {
  724. defer func() {
  725. <-SP
  726. }()
  727. Es.BulkSave(Index, Itype, &arru, true)
  728. }(arru[:indexu])
  729. arru = make([]map[string]interface{}, 500)
  730. indexu = 0
  731. }
  732. }
  733. }
  734. }
  735. //所有数据存库
  736. func SaveAllEs() {
  737. log.Println("Es SaveAll...")
  738. arruAll := make([]map[string]interface{}, 500)
  739. indexu := 0
  740. for {
  741. select {
  742. case v := <-EsSaveAllCache:
  743. arruAll[indexu] = v
  744. indexu++
  745. if indexu == 500 {
  746. SPAll <- true
  747. go func(arruAll []map[string]interface{}) {
  748. defer func() {
  749. <-SPAll
  750. }()
  751. Es.BulkSave(OtherIndex, OtherItype, &arruAll, true)
  752. }(arruAll)
  753. arruAll = make([]map[string]interface{}, 500)
  754. indexu = 0
  755. }
  756. case <-time.After(1000 * time.Millisecond):
  757. if indexu > 0 {
  758. SPAll <- true
  759. go func(arruAll []map[string]interface{}) {
  760. defer func() {
  761. <-SPAll
  762. }()
  763. Es.BulkSave(OtherIndex, OtherItype, &arruAll, true)
  764. }(arruAll[:indexu])
  765. arruAll = make([]map[string]interface{}, 500)
  766. indexu = 0
  767. }
  768. }
  769. }
  770. }
  771. func InitAddress() {
  772. defer qu.Catch()
  773. log.Println("Init Address...")
  774. AddressMap = map[string]*City{}
  775. AddressOldMap = map[string]*City{}
  776. address, _ := Mgo.Find("address", nil, nil, nil, false, -1, -1)
  777. for _, tmp := range *address {
  778. code := qu.ObjToString(tmp["code"])
  779. remark := fmt.Sprint(tmp["Remarks"])
  780. city := &City{}
  781. tmpjson, err := json.Marshal(tmp)
  782. if err == nil {
  783. json.Unmarshal(tmpjson, city)
  784. }
  785. if remark == "已作废" {
  786. AddressOldMap[code] = city
  787. } else {
  788. AddressMap[code] = city
  789. }
  790. }
  791. }
  792. func InitQyStype() {
  793. defer qu.Catch()
  794. log.Println("Init QyStype...")
  795. QyStypeMap = map[string]string{}
  796. qystype, _ := Mgo.Find("qystype", nil, nil, nil, false, -1, -1)
  797. for _, tmp := range *qystype {
  798. name := qu.ObjToString(tmp["name"])
  799. prename := qu.ObjToString(tmp["prename"])
  800. QyStypeMap[name] = prename
  801. }
  802. }
  803. func InitCompanyStatus() {
  804. defer qu.Catch()
  805. log.Println("Init CompanyStatus...")
  806. CompanyStatusMap = map[string]string{}
  807. status, _ := Mgo.Find("company_status", nil, nil, nil, false, -1, -1)
  808. for _, tmp := range *status {
  809. old_status := qu.ObjToString(tmp["old"])
  810. new_status := qu.ObjToString(tmp["new"])
  811. CompanyStatusMap[old_status] = new_status
  812. }
  813. }