task.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. qu "qfw/util"
  7. "regexp"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/cron"
  14. "go.mongodb.org/mongo-driver/bson/primitive"
  15. )
  16. var (
  17. //清理
  18. Han = regexp.MustCompile("[\\p{Han}]") //匹配汉字
  19. //es、mgo非全部字段
  20. FieldListMap = map[string]map[string]bool{
  21. "partners": map[string]bool{"stock_type": true, "stock_name": true, "stock_capital": true, "stock_realcapital": true, "identify_type": true, "identify_no": true},
  22. "employees": map[string]bool{"employee_name": false, "position": false},
  23. }
  24. //全部字段
  25. AllFieldListMap = []string{"punishes", "operations", "illegals"}
  26. //地区处理
  27. AreaFiled = []string{"credit_no", "company_code", "area_code"}
  28. //年报信息
  29. AnnualReportsArr = [][]string{
  30. []string{"report_year", "company_phone", "zip_code", "company_email", "employee_no", "operator_name"},
  31. []string{"total_assets", "total_equity", "total_sales", "total_profit", "main_business_income", "profit_amount", "total_tax", "total_liability"},
  32. }
  33. //区域code补全
  34. CodeMap = map[int]string{
  35. 2: "0000",
  36. 4: "00",
  37. }
  38. )
  39. // var AllFieldListMap = map[string]string{
  40. // "punishes": "punish_size",
  41. // "operations": "operation_size",
  42. // "illegals": "illegal_size",
  43. // }
  44. //不生索引字段
  45. //var NotEsField = []string{"cancel_reason", "revoke_reason", "cancels"} //cancel_size
  46. type City struct {
  47. Code string `json:"code"`
  48. Province string `json:"province"`
  49. City string `json:"city"`
  50. District string `json:"district"`
  51. }
  52. //定时任务
  53. func TimeTask() {
  54. StartTask()
  55. c := cron.New()
  56. cronstr := "0 0 15 ? * Tue" //每周二15点执行
  57. //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
  58. c.AddFunc(cronstr, func() { StartTask() })
  59. c.Start()
  60. }
  61. //开始任务
  62. func StartTask() {
  63. log.Println("Start Task...")
  64. // query := map[string]interface{}{
  65. // "updatetime": map[string]interface{}{
  66. // "$gt": Updatetime,
  67. // },
  68. // }
  69. QyxyStandard()
  70. // run := QyxyStandard()
  71. // if run {
  72. // time.Sleep(5 * time.Minute)
  73. // if Mgo.DelColl(Dbcoll) {
  74. // log.Println("Delete Coll ", Dbcoll, "Success")
  75. // } else {
  76. // log.Println("Delete Coll ", Dbcoll, "Fail")
  77. // }
  78. // }
  79. }
  80. //标准化数据,生索引
  81. func QyxyStandard() bool {
  82. defer qu.Catch()
  83. sess := Mgo.GetMgoConn()
  84. defer Mgo.DestoryMongoConn(sess)
  85. pool := make(chan bool, 20) //控制线程数
  86. wg := &sync.WaitGroup{}
  87. lock := &sync.Mutex{} //控制读写
  88. arr := [][]map[string]interface{}{}
  89. //q := map[string]interface{}{
  90. // "_id" : "0005ba07a6527b1825a169f07eaf4c79",
  91. //}
  92. count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
  93. log.Println("共查询:", count, "条")
  94. if count == 0 {
  95. return false
  96. }
  97. it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
  98. sum := 0
  99. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  100. if sum%1000 == 0 {
  101. log.Println("current:", sum)
  102. }
  103. pool <- true
  104. wg.Add(1)
  105. go func(tmp map[string]interface{}) {
  106. defer func() {
  107. <-pool
  108. wg.Done()
  109. }()
  110. update := []map[string]interface{}{}
  111. esMap := map[string]interface{}{}
  112. mgoMap := map[string]interface{}{}
  113. //id处理
  114. idMap := tmp["_id"].(map[string]interface{})
  115. _id := qu.ObjToString(idMap["_id"])
  116. esMap["_id"] = _id
  117. esMap["updatetime"] = time.Now().Unix()
  118. update = append(update, map[string]interface{}{"_id": _id})
  119. //地区处理
  120. hadArea := false //标记是否有省份信息
  121. for i, field := range AreaFiled {
  122. if tmp[field] == nil {
  123. continue
  124. }
  125. if code := fmt.Sprint(tmp[field]); code != "" {
  126. esMap[field] = code //加入esMap
  127. if !hadArea {
  128. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  129. code = code[2:8]
  130. } else if i == 1 && len(code) >= 6 { //company_code注册号
  131. code = code[:6]
  132. }
  133. if city := AddressMap[code]; city != nil { //未作废中取
  134. if city.Province != "" {
  135. esMap["company_area"] = city.Province //省
  136. }
  137. if city.City != "" {
  138. esMap["company_city"] = city.City //市
  139. }
  140. if city.District != "" {
  141. esMap["company_district"] = city.District //县
  142. }
  143. } else { //作废中取
  144. if city := AddressOldMap[code]; city != nil {
  145. if city.Province != "" {
  146. esMap["company_area"] = city.Province //省
  147. }
  148. if city.City != "" {
  149. esMap["company_city"] = city.City //市
  150. }
  151. if city.District != "" {
  152. esMap["company_district"] = city.District //县
  153. }
  154. }
  155. }
  156. if esMap["company_area"] != nil {
  157. hadArea = true
  158. }
  159. }
  160. }
  161. }
  162. //生索引字段处理
  163. for _, field := range EsFields {
  164. if tmp[field] == nil {
  165. continue
  166. }
  167. //qu.Debug(field, tmp[field], tmp[field] == nil)
  168. if field == "capital" { //注册资本处理
  169. text := qu.ObjToString(tmp[field])
  170. if currency := GetCurrency(text); currency != "" {
  171. esMap["currency"] = currency //币种
  172. }
  173. capital := ObjToMoney(text)
  174. capital = capital / 10000
  175. if capital != 0 {
  176. esMap[field] = capital //注册资本
  177. }
  178. } else if field == "company_type" { //企业类型处理
  179. text := qu.ObjToString(tmp[field])
  180. if text != "" {
  181. esMap["company_type_old"] = text //old
  182. if strings.Contains(text, "个体") {
  183. esMap[field] = "个体工商户"
  184. } else {
  185. text = strings.ReplaceAll(text, "(", "(")
  186. text = strings.ReplaceAll(text, ")", ")")
  187. if stype := QyStypeMap[text]; stype != "" {
  188. esMap[field] = stype
  189. } else {
  190. esMap[field] = "其他"
  191. }
  192. }
  193. }
  194. } else if field == "company_status" { //企业类型处理
  195. text := qu.ObjToString(tmp[field])
  196. if text != "" {
  197. text = strings.ReplaceAll(text, "(", "(")
  198. text = strings.ReplaceAll(text, ")", ")")
  199. if status := CompanyStatusMap[text]; status != "" {
  200. esMap[field] = status
  201. } else {
  202. esMap[field] = "其他"
  203. }
  204. }
  205. } else if strings.Contains(field, "date") || strings.Contains(field, "time") { //时间处理
  206. if tmp[field] != nil {
  207. if timeTmp, ok := tmp[field].(primitive.DateTime); ok {
  208. t := timeTmp.Time()
  209. esMap[field] = qu.FormatDate(&t, qu.Date_Short_Layout)
  210. } else if timeTmp, ok := tmp[field].(string); ok && timeTmp != "" {
  211. t := timeReg.FindString(timeTmp)
  212. if t != "" {
  213. esMap[field] = t
  214. }
  215. }
  216. }
  217. } else {
  218. if text := qu.ObjToString(tmp[field]); text != "" {
  219. if field == "company_name" {
  220. esMap["name"] = text
  221. }
  222. esMap[field] = text
  223. }
  224. }
  225. }
  226. //不生索引字段处理
  227. for _, field := range Fields {
  228. if text, ok := tmp[field].(string); ok && text != "" {
  229. mgoMap[field] = text
  230. } else if text, ok := tmp[field].(int32); ok {
  231. mgoMap[field] = text
  232. }
  233. }
  234. //list数据
  235. stockName := []string{}
  236. for field, fieldMap := range FieldListMap {
  237. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  238. if len(list) > 500 {
  239. list = list[:500]
  240. }
  241. tmpArrMgo := []map[string]interface{}{}
  242. tmpArrEs := []map[string]interface{}{}
  243. for _, l := range list {
  244. tmpMapMgo := map[string]interface{}{}
  245. tmpMapEs := map[string]interface{}{}
  246. m := l.(map[string]interface{})
  247. for f, b := range fieldMap {
  248. if text := qu.ObjToString(m[f]); text != "" {
  249. tmpMapMgo[f] = text
  250. if f == "stock_name" {
  251. stockName = append(stockName, text)
  252. }
  253. if b {
  254. if f == "stock_capital" {
  255. if v, err := strconv.ParseFloat(text, 64); err == nil {
  256. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  257. tmpMapEs[f] = v
  258. }
  259. }else if f == "stock_realcapital" {
  260. if v, err := strconv.ParseFloat(text, 64); err == nil {
  261. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64)
  262. tmpMapEs[f] = v
  263. }
  264. }else {
  265. tmpMapEs[f] = text
  266. }
  267. }
  268. }
  269. }
  270. if len(tmpMapEs) > 0 {
  271. tmpArrEs = append(tmpArrEs, tmpMapEs)
  272. }
  273. if len(tmpMapMgo) > 0 {
  274. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  275. }
  276. }
  277. if len(tmpArrEs) > 0 {
  278. esMap[field] = tmpArrEs
  279. }
  280. if len(tmpArrMgo) > 0 {
  281. mgoMap[field] = tmpArrMgo
  282. }
  283. }
  284. }
  285. if len(stockName) > 0 {
  286. esMap["stock_name"] = strings.Join(stockName, ",")
  287. }
  288. for _, field := range AllFieldListMap {
  289. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  290. tmpArrMgo := []map[string]interface{}{}
  291. for _, l := range list {
  292. tmpMapMgo := map[string]interface{}{}
  293. m := l.(map[string]interface{})
  294. for k, v := range m {
  295. if tmpv := fmt.Sprint(v); v != nil && tmpv != "" {
  296. tmpMapMgo[k] = tmpv
  297. }
  298. }
  299. if len(tmpMapMgo) > 0 {
  300. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  301. }
  302. }
  303. if len(tmpArrMgo) > 0 {
  304. mgoMap[field] = tmpArrMgo
  305. }
  306. }
  307. }
  308. //年报信息
  309. sortArr := []string{} //存年份
  310. sortMap := map[string]map[string]interface{}{} //key:年份;val:每一个年报中的company_phone,company_email,stock_name
  311. tmpArrMgo := []map[string]interface{}{}
  312. if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
  313. for _, annual_report := range annual_reports {
  314. tmpMapMgo := map[string]interface{}{} //记录每个年报信息标准化到mgo的数据
  315. tmpMap := map[string]interface{}{} //只记录每个年报信息的company_email和company_phone
  316. report_year := ""
  317. m := annual_report.(map[string]interface{})
  318. for i, tmpArr := range AnnualReportsArr {
  319. for _, f := range tmpArr {
  320. if text := m[f]; text != nil {
  321. if textstr := fmt.Sprint(text); textstr != "" {
  322. if f == "report_year" {
  323. report_year = textstr
  324. sortArr = append(sortArr, textstr)
  325. } else if f == "company_phone" && !Han.MatchString(textstr) && len(textstr) >= 7 {
  326. tmpMap[f] = textstr
  327. tmpMapMgo[f] = textstr
  328. } else if f == "company_email" && !Han.MatchString(textstr) && len(textstr) >= 4 {
  329. tmpMap[f] = textstr
  330. tmpMapMgo[f] = textstr
  331. }
  332. if i == 0 { //字符串信息
  333. if f == "company_phone" || f == "company_email" {
  334. continue
  335. }
  336. tmpMapMgo[f] = textstr
  337. } else if i == 1 { //转金额
  338. money := ObjToMoney(textstr) / 10000
  339. tmpMapMgo[f] = money
  340. }
  341. }
  342. }
  343. }
  344. }
  345. // stock_nameArr := []string{}
  346. // if i_partners, ok := m["report_partners"].([]interface{}); ok && len(i_partners) > 0 { //股东信息
  347. // for _, par := range i_partners {
  348. // m := par.(map[string]interface{})
  349. // if stock_name, ok := m["stock_name"].(string); ok && stock_name != "" {
  350. // stock_nameArr = append(stock_nameArr, stock_name)
  351. // }
  352. // }
  353. // }
  354. // if len(stock_nameArr) > 0 {
  355. // stockname := strings.Join(stock_nameArr, ",")
  356. // tmpMap["stock_name"] = stockname
  357. // }
  358. sortMap[report_year] = tmpMap
  359. if len(tmpMapMgo) > 0 {
  360. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  361. }
  362. }
  363. }
  364. if len(tmpArrMgo) > 0 {
  365. mgoMap["annual_reports"] = tmpArrMgo
  366. }
  367. if len(sortArr) > 0 && len(sortMap) > 0 {
  368. sort.Strings(sortArr)
  369. report_year := sortArr[len(sortArr)-1]
  370. for k, v := range sortMap[report_year] {
  371. esMap[k] = v
  372. }
  373. }
  374. //合并
  375. for k, v := range esMap {
  376. if k == "partners" {
  377. continue
  378. }
  379. mgoMap[k] = v
  380. }
  381. //es数据过滤
  382. EsSaveFlag := true
  383. company_type := qu.ObjToString(esMap["company_type"])
  384. company_name := qu.ObjToString(esMap["company_name"])
  385. if company_type == "个体工商户" {
  386. if len([]rune(company_name)) >= 5 {
  387. esMap["company_type_int"] = 31
  388. }else {
  389. esMap["company_type_int"] = 32
  390. }
  391. }else if company_type == "其他" || company_type == "" {
  392. if len([]rune(company_name)) >= 4 {
  393. esMap["company_type_int"] = 21
  394. }else {
  395. esMap["company_type_int"] = 22
  396. }
  397. }else {
  398. if company_type == "内资分公司" {
  399. esMap["company_type_int"] = 12
  400. }else if len([]rune(company_name)) >= 4 {
  401. esMap["company_type_int"] = 11
  402. }else {
  403. esMap["company_type_int"] = 13
  404. }
  405. }
  406. lock.Lock()
  407. if EsSaveFlag {
  408. if esMap["history_name"] != nil {
  409. var nameArr []string
  410. for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
  411. if v != "" {
  412. nameArr = append(nameArr, v)
  413. }
  414. }
  415. if len(nameArr) > 0 {
  416. esMap["history_name"] = nameArr
  417. }
  418. }
  419. EsSaveCache <- esMap //过滤后数据保存
  420. }
  421. EsSaveAllCache <- esMap //所有数据保存
  422. update = append(update, map[string]interface{}{"$set": mgoMap})
  423. SaveHistoryName(tmp) //保存曾用名
  424. if len(update) == 2 {
  425. arr = append(arr, update)
  426. }
  427. if len(arr) > 500 {
  428. tmps := arr
  429. Mgo.UpSertBulk(Savecoll, tmps...)
  430. arr = [][]map[string]interface{}{}
  431. }
  432. lock.Unlock()
  433. }(tmp)
  434. tmp = make(map[string]interface{})
  435. }
  436. wg.Wait()
  437. lock.Lock()
  438. if len(arr) > 0 {
  439. Mgo.UpSertBulk(Savecoll, arr...)
  440. }
  441. lock.Unlock()
  442. log.Println("Run Over...Count:", sum)
  443. return true
  444. }
  445. //所有企业数据标准化
  446. func HistoryQyxyStandard() {
  447. qu.Debug("--------History--------")
  448. defer qu.Catch()
  449. sess := Mgo.GetMgoConn()
  450. defer Mgo.DestoryMongoConn(sess)
  451. pool := make(chan bool, 20) //控制线程数
  452. wg := &sync.WaitGroup{}
  453. lock := &sync.Mutex{} //控制读写
  454. arr := [][]map[string]interface{}{}
  455. count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
  456. log.Println("共查询:", count, "条")
  457. it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
  458. sum := 0
  459. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  460. if sum%10000 == 0 {
  461. log.Println("current:", sum)
  462. }
  463. pool <- true
  464. wg.Add(1)
  465. go func(tmp map[string]interface{}) {
  466. defer func() {
  467. <-pool
  468. wg.Done()
  469. }()
  470. update := []map[string]interface{}{}
  471. esMap := map[string]interface{}{}
  472. mgoMap := map[string]interface{}{}
  473. //id处理
  474. idMap := tmp["_id"].(map[string]interface{})
  475. _id := qu.ObjToString(idMap["_id"])
  476. esMap["_id"] = _id
  477. esMap["updatetime"] = time.Now().Unix()
  478. update = append(update, map[string]interface{}{"_id": _id})
  479. //地区处理
  480. hadArea := false //标记是否有省份信息
  481. for i, field := range AreaFiled {
  482. if tmp[field] == nil {
  483. continue
  484. }
  485. if code := fmt.Sprint(tmp[field]); code != "" {
  486. esMap[field] = code //加入esMap
  487. if !hadArea {
  488. if i == 0 && len(code) >= 8 { //credit_no企业信用代码
  489. code = code[2:8]
  490. } else if i == 1 && len(code) >= 6 { //company_code注册号
  491. code = code[:6]
  492. }
  493. if city := AddressMap[code]; city != nil { //未作废中取
  494. if city.Province != "" {
  495. esMap["company_area"] = city.Province //省
  496. }
  497. if city.City != "" {
  498. esMap["company_city"] = city.City //市
  499. }
  500. if city.District != "" {
  501. esMap["company_district"] = city.District //县
  502. }
  503. } else { //作废中取
  504. if city := AddressOldMap[code]; city != nil {
  505. if city.Province != "" {
  506. esMap["company_area"] = city.Province //省
  507. }
  508. if city.City != "" {
  509. esMap["company_city"] = city.City //市
  510. }
  511. if city.District != "" {
  512. esMap["company_district"] = city.District //县
  513. }
  514. }
  515. }
  516. if esMap["company_area"] != nil {
  517. hadArea = true
  518. }
  519. }
  520. }
  521. }
  522. //生索引字段处理
  523. for _, field := range EsFields {
  524. if tmp[field] == nil {
  525. continue
  526. }
  527. //qu.Debug(field, tmp[field], tmp[field] == nil)
  528. if field == "capital" { //注册资本处理
  529. text := qu.ObjToString(tmp[field])
  530. if currency := GetCurrency(text); currency != "" {
  531. esMap["currency"] = currency //币种
  532. }
  533. capital := ObjToMoney(text)
  534. capital = capital / 10000
  535. if capital != 0 {
  536. esMap[field] = capital //注册资本
  537. }
  538. } else if field == "company_type" { //企业类型处理
  539. text := qu.ObjToString(tmp[field])
  540. if text != "" {
  541. esMap["company_type_old"] = text //old
  542. if strings.Contains(text, "个体") || strings.Contains(text, "非公司") {
  543. esMap[field] = "个体工商户"
  544. } else {
  545. text = strings.ReplaceAll(text, "(", "(")
  546. text = strings.ReplaceAll(text, ")", ")")
  547. if stype := QyStypeMap[text]; stype != "" {
  548. esMap[field] = stype
  549. } else {
  550. esMap[field] = "其他"
  551. }
  552. }
  553. }
  554. } else if field == "company_status" { //企业类型处理
  555. text := qu.ObjToString(tmp[field])
  556. if text != "" {
  557. text = strings.ReplaceAll(text, "(", "(")
  558. text = strings.ReplaceAll(text, ")", ")")
  559. if status := CompanyStatusMap[text]; status != "" {
  560. esMap[field] = status
  561. } else {
  562. esMap[field] = "其他"
  563. }
  564. }
  565. } else if strings.Contains(field, "date") || strings.Contains(field, "time") { //时间处理
  566. if tmp[field] != nil {
  567. if timeTmp, ok := tmp[field].(primitive.DateTime); ok {
  568. t := timeTmp.Time()
  569. esMap[field] = qu.FormatDate(&t, qu.Date_Short_Layout)
  570. } else if timeTmp, ok := tmp[field].(string); ok && timeTmp != "" {
  571. t := timeReg.FindString(timeTmp)
  572. if t != "" {
  573. esMap[field] = t
  574. }
  575. }
  576. }
  577. } else {
  578. if text := qu.ObjToString(tmp[field]); text != "" {
  579. if field == "company_name" {
  580. esMap["name"] = text
  581. }
  582. esMap[field] = text
  583. }
  584. }
  585. }
  586. //不生索引字段处理
  587. for _, field := range Fields {
  588. if text, ok := tmp[field].(string); ok && text != "" {
  589. mgoMap[field] = text
  590. } else if text, ok := tmp[field].(int32); ok {
  591. mgoMap[field] = text
  592. }
  593. }
  594. //list数据
  595. stockName := []string{}
  596. for field, fieldMap := range FieldListMap {
  597. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  598. if len(list) > 500 {
  599. list = list[:500]
  600. }
  601. tmpArrMgo := []map[string]interface{}{}
  602. tmpArrEs := []map[string]interface{}{}
  603. for _, l := range list {
  604. tmpMapMgo := map[string]interface{}{}
  605. tmpMapEs := map[string]interface{}{}
  606. m := l.(map[string]interface{})
  607. for f, b := range fieldMap {
  608. if text := qu.ObjToString(m[f]); text != "" {
  609. tmpMapMgo[f] = text
  610. if f == "stock_name" {
  611. stockName = append(stockName, text)
  612. }
  613. if b {
  614. if f == "stock_capital" {
  615. if v, err := strconv.ParseFloat(text, 64); err == nil {
  616. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  617. tmpMapEs[f] = v
  618. }
  619. }else if f == "stock_realcapital" {
  620. if v, err := strconv.ParseFloat(text, 64); err == nil {
  621. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64)
  622. tmpMapEs[f] = v
  623. }
  624. }else {
  625. tmpMapEs[f] = text
  626. }
  627. }
  628. }
  629. }
  630. if len(tmpMapEs) > 0 {
  631. tmpArrEs = append(tmpArrEs, tmpMapEs)
  632. }
  633. if len(tmpMapMgo) > 0 {
  634. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  635. }
  636. }
  637. if len(tmpArrEs) > 0 {
  638. esMap[field] = tmpArrEs
  639. }
  640. if len(tmpArrMgo) > 0 {
  641. mgoMap[field] = tmpArrMgo
  642. }
  643. }
  644. }
  645. if len(stockName) > 0 {
  646. esMap["stock_name"] = strings.Join(stockName, ",")
  647. }
  648. for _, field := range AllFieldListMap {
  649. if list, ok := tmp[field].([]interface{}); ok && len(list) > 0 {
  650. tmpArrMgo := []map[string]interface{}{}
  651. for _, l := range list {
  652. tmpMapMgo := map[string]interface{}{}
  653. m := l.(map[string]interface{})
  654. for k, v := range m {
  655. if tmpv := fmt.Sprint(v); v != nil && tmpv != "" {
  656. tmpMapMgo[k] = tmpv
  657. }
  658. }
  659. if len(tmpMapMgo) > 0 {
  660. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  661. }
  662. }
  663. if len(tmpArrMgo) > 0 {
  664. mgoMap[field] = tmpArrMgo
  665. }
  666. }
  667. }
  668. //年报信息
  669. sortArr := []string{} //存年份
  670. sortMap := map[string]map[string]interface{}{} //key:年份;val:每一个年报中的company_phone,company_email,stock_name
  671. tmpArrMgo := []map[string]interface{}{}
  672. if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
  673. for _, annual_report := range annual_reports {
  674. tmpMapMgo := map[string]interface{}{} //记录每个年报信息标准化到mgo的数据
  675. tmpMap := map[string]interface{}{} //只记录每个年报信息的company_email和company_phone
  676. report_year := ""
  677. m := annual_report.(map[string]interface{})
  678. for i, tmpArr := range AnnualReportsArr {
  679. for _, f := range tmpArr {
  680. if text := m[f]; text != nil {
  681. if textstr := fmt.Sprint(text); textstr != "" {
  682. if f == "report_year" {
  683. report_year = textstr
  684. sortArr = append(sortArr, textstr)
  685. } else if f == "company_phone" && !Han.MatchString(textstr) && len(textstr) >= 7 {
  686. tmpMap[f] = textstr
  687. tmpMapMgo[f] = textstr
  688. } else if f == "company_email" && !Han.MatchString(textstr) && len(textstr) >= 4 {
  689. tmpMap[f] = textstr
  690. tmpMapMgo[f] = textstr
  691. }
  692. if i == 0 { //字符串信息
  693. if f == "company_phone" || f == "company_email" {
  694. continue
  695. }
  696. tmpMapMgo[f] = textstr
  697. } else if i == 1 { //转金额
  698. money := ObjToMoney(textstr) / 10000
  699. tmpMapMgo[f] = money
  700. }
  701. }
  702. }
  703. }
  704. }
  705. // stock_nameArr := []string{}
  706. // if i_partners, ok := m["report_partners"].([]interface{}); ok && len(i_partners) > 0 { //股东信息
  707. // for _, par := range i_partners {
  708. // m := par.(map[string]interface{})
  709. // if stock_name, ok := m["stock_name"].(string); ok && stock_name != "" {
  710. // stock_nameArr = append(stock_nameArr, stock_name)
  711. // }
  712. // }
  713. // }
  714. // if len(stock_nameArr) > 0 {
  715. // stockname := strings.Join(stock_nameArr, ",")
  716. // tmpMap["stock_name"] = stockname
  717. // }
  718. sortMap[report_year] = tmpMap
  719. if len(tmpMapMgo) > 0 {
  720. tmpArrMgo = append(tmpArrMgo, tmpMapMgo)
  721. }
  722. }
  723. }
  724. if len(tmpArrMgo) > 0 {
  725. mgoMap["annual_reports"] = tmpArrMgo
  726. }
  727. if len(sortArr) > 0 && len(sortMap) > 0 {
  728. sort.Strings(sortArr)
  729. report_year := sortArr[len(sortArr)-1]
  730. for k, v := range sortMap[report_year] {
  731. esMap[k] = v
  732. }
  733. }
  734. //合并
  735. for k, v := range esMap {
  736. if k == "partners" {
  737. continue
  738. }
  739. mgoMap[k] = v
  740. }
  741. //es数据过滤
  742. //EsSaveFlag := true
  743. company_type := qu.ObjToString(esMap["company_type"])
  744. company_name := qu.ObjToString(esMap["company_name"])
  745. if company_type == "个体工商户" {
  746. if len([]rune(company_name)) >= 5 {
  747. esMap["company_type_int"] = 31
  748. }else {
  749. esMap["company_type_int"] = 32
  750. }
  751. }else if company_type == "其他" || company_type == "" {
  752. if len([]rune(company_name)) >= 4 {
  753. esMap["company_type_int"] = 21
  754. }else {
  755. esMap["company_type_int"] = 22
  756. }
  757. }else {
  758. if company_type == "内资分公司" {
  759. esMap["company_type_int"] = 12
  760. }else if len([]rune(company_name)) >= 4 {
  761. esMap["company_type_int"] = 11
  762. }else {
  763. esMap["company_type_int"] = 13
  764. }
  765. }
  766. lock.Lock()
  767. //if EsSaveFlag {
  768. if esMap["history_name"] != nil {
  769. var nameArr []string
  770. for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
  771. if v != "" {
  772. nameArr = append(nameArr, v)
  773. }
  774. }
  775. if len(nameArr) > 0 {
  776. esMap["history_name"] = nameArr
  777. }
  778. }
  779. EsSaveCache <- esMap //过滤后数据保存
  780. //}
  781. //EsSaveAllCache <- esMap //所有数据保存
  782. //SaveHistoryName(tmp)
  783. //update = append(update, map[string]interface{}{"$set": mgoMap})
  784. //if len(update) == 2 {
  785. // arr = append(arr, update)
  786. //}
  787. //if len(arr) > 500 {
  788. // tmps := arr
  789. // Mgo.UpSertBulk(Savecoll, tmps...)
  790. // arr = [][]map[string]interface{}{}
  791. //}
  792. lock.Unlock()
  793. }(tmp)
  794. tmp = make(map[string]interface{})
  795. }
  796. wg.Wait()
  797. lock.Lock()
  798. if len(arr) > 0 {
  799. //Mgo.UpSertBulk(Savecoll, arr...)
  800. }
  801. lock.Unlock()
  802. log.Println("Run Over...Count:", sum)
  803. }
  804. //过滤后数据存库
  805. func SaveEs() {
  806. log.Println("Es Save...")
  807. arru := make([]map[string]interface{}, 500)
  808. indexu := 0
  809. for {
  810. select {
  811. case v := <-EsSaveCache:
  812. arru[indexu] = v
  813. indexu++
  814. if indexu == 500 {
  815. SP <- true
  816. go func(arru []map[string]interface{}) {
  817. defer func() {
  818. <-SP
  819. }()
  820. Es.BulkSave(Index, Itype, &arru, true)
  821. }(arru)
  822. arru = make([]map[string]interface{}, 500)
  823. indexu = 0
  824. }
  825. case <-time.After(1000 * time.Millisecond):
  826. if indexu > 0 {
  827. SP <- true
  828. go func(arru []map[string]interface{}) {
  829. defer func() {
  830. <-SP
  831. }()
  832. qu.Debug(Index, Itype, arru)
  833. Es.BulkSave(Index, Itype, &arru, true)
  834. }(arru[:indexu])
  835. arru = make([]map[string]interface{}, 500)
  836. indexu = 0
  837. }
  838. }
  839. }
  840. }
  841. func SaveHistoryName(tmp map[string]interface{}) {
  842. if qu.ObjToString(tmp["company_name"]) != "" {
  843. set := make(map[string]interface{})
  844. set["company_name"] = qu.ObjToString(tmp["company_name"])
  845. set["credit_no"] = tmp["credit_no"]
  846. set["org_code"] = tmp["org_code"]
  847. set["company_id"] = tmp["company_id"]
  848. set["company_type"] = tmp["company_type"]
  849. set["company_status"] = tmp["company_status"]
  850. set["company_code"] = tmp["company_code"]
  851. Mgo.Update("qyxy_historyname", map[string]interface{}{"company_name": qu.ObjToString(tmp["company_name"])}, map[string]interface{}{"$set": set}, true, false)
  852. if qu.ObjToString(tmp["history_name"]) != "" {
  853. for _, v := range strings.Split(qu.ObjToString(tmp["history_name"]), ";") {
  854. if v != "" {
  855. set["company_name"] = v
  856. set["credit_no"] = tmp["credit_no"]
  857. set["org_code"] = tmp["org_code"]
  858. set["company_id"] = tmp["company_id"]
  859. set["company_type"] = tmp["company_type"]
  860. set["company_status"] = tmp["company_status"]
  861. set["company_code"] = tmp["company_code"]
  862. Mgo.Update("qyxy_historyname", map[string]interface{}{"company_name": v}, map[string]interface{}{"$set": set}, true, false)
  863. }
  864. }
  865. }
  866. }
  867. }
  868. //所有数据存库
  869. func SaveAllEs() {
  870. log.Println("Es SaveAll...")
  871. arruAll := make([]map[string]interface{}, 500)
  872. indexu := 0
  873. for {
  874. select {
  875. case v := <-EsSaveAllCache:
  876. arruAll[indexu] = v
  877. indexu++
  878. if indexu == 500 {
  879. SPAll <- true
  880. go func(arruAll []map[string]interface{}) {
  881. defer func() {
  882. <-SPAll
  883. }()
  884. Es.BulkSave(OtherIndex, OtherItype, &arruAll, true)
  885. }(arruAll)
  886. arruAll = make([]map[string]interface{}, 500)
  887. indexu = 0
  888. }
  889. case <-time.After(1000 * time.Millisecond):
  890. if indexu > 0 {
  891. SPAll <- true
  892. go func(arruAll []map[string]interface{}) {
  893. defer func() {
  894. <-SPAll
  895. }()
  896. Es.BulkSave(OtherIndex, OtherItype, &arruAll, true)
  897. }(arruAll[:indexu])
  898. arruAll = make([]map[string]interface{}, 500)
  899. indexu = 0
  900. }
  901. }
  902. }
  903. }
  904. func InitAddress() {
  905. defer qu.Catch()
  906. log.Println("Init Address...")
  907. AddressMap = map[string]*City{}
  908. AddressOldMap = map[string]*City{}
  909. sess := Mgo.GetMgoConn()
  910. defer Mgo.DestoryMongoConn(sess)
  911. result := sess.DB(Dbname).C("address_new_2020").Find(nil).Iter()
  912. count := 0
  913. for tmp := make(map[string]interface{}); result.Next(&tmp); count++ {
  914. code := qu.ObjToString(tmp["code"])
  915. codeLen := len(code)
  916. if codeLen > 6 {
  917. continue
  918. }
  919. if t_code := CodeMap[codeLen]; t_code != "" { //新的address表补齐code
  920. code = code + t_code
  921. }
  922. remark := fmt.Sprint(tmp["Remarks"])
  923. city := &City{}
  924. tmpjson, err := json.Marshal(tmp)
  925. if err == nil {
  926. json.Unmarshal(tmpjson, city)
  927. }
  928. if remark == "已作废" {
  929. AddressOldMap[code] = city
  930. } else {
  931. AddressMap[code] = city
  932. }
  933. }
  934. qu.Debug("Init Address end...", len(AddressMap), len(AddressOldMap))
  935. }
  936. func InitQyStype() {
  937. defer qu.Catch()
  938. log.Println("Init QyStype...")
  939. QyStypeMap = map[string]string{}
  940. qystype, _ := Mgo.Find("qystype", nil, nil, nil, false, -1, -1)
  941. for _, tmp := range *qystype {
  942. name := qu.ObjToString(tmp["name"])
  943. prename := qu.ObjToString(tmp["prename"])
  944. QyStypeMap[name] = prename
  945. }
  946. }
  947. func InitCompanyStatus() {
  948. defer qu.Catch()
  949. log.Println("Init CompanyStatus...")
  950. CompanyStatusMap = map[string]string{}
  951. status, _ := Mgo.Find("company_status", nil, nil, nil, false, -1, -1)
  952. for _, tmp := range *status {
  953. old_status := qu.ObjToString(tmp["old"])
  954. new_status := qu.ObjToString(tmp["new"])
  955. CompanyStatusMap[old_status] = new_status
  956. }
  957. }