task.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/viper"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. partner = []string{"identify_no", "stock_type", "stock_name", "identify_type", "stock_capital", "stock_realcapital"}
  15. employee = []string{"employee_name", "position"}
  16. TypeMap = map[string]string{
  17. "采购单位": "1",
  18. "中标单位": "2", // 投标企业
  19. "代理机构": "3",
  20. "厂商": "4",
  21. }
  22. TypeMap1 = map[string]string{
  23. "固定电话": "1",
  24. "手机号": "2",
  25. "邮箱": "3",
  26. "不存在": "4",
  27. }
  28. )
  29. // StdAdd 增量数据
  30. func StdAdd(q interface{}) {
  31. defer util.Catch()
  32. sess := Mgo.GetMgoConn()
  33. defer Mgo.DestoryMongoConn(sess)
  34. pool := make(chan bool, 18)
  35. wg := &sync.WaitGroup{}
  36. //q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
  37. //q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
  38. log.Info("StdAdd", zap.Any("q", q))
  39. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  40. count := 0
  41. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  42. if count%20000 == 0 {
  43. //log.Println("current:", count)
  44. log.Info("StdAdd", zap.Int("current:", count))
  45. log.Info("StdAdd", zap.Any("q", q), zap.Any("updatetime", tmp["updatetime"]))
  46. }
  47. if util.IntAll(tmp["use_flag"]) > 5 {
  48. continue
  49. }
  50. pool <- true
  51. wg.Add(1)
  52. go func(tmp map[string]interface{}) {
  53. defer func() {
  54. <-pool
  55. wg.Done()
  56. }()
  57. esMap := map[string]interface{}{}
  58. //生索引字段处理
  59. for _, field := range EsFields {
  60. if tmp[field] == nil {
  61. continue
  62. }
  63. if field == "company_name" {
  64. esMap[field] = tmp["company_name"]
  65. esMap["name"] = tmp["company_name"]
  66. } else if field == "history_name" {
  67. var nameArr []string
  68. names := util.ObjToString(tmp["history_name"])
  69. if strings.Contains(names, ",") {
  70. nameArr = append(nameArr, strings.Split(names, ",")...)
  71. }
  72. if len(nameArr) > 0 {
  73. esMap["history_name"] = nameArr
  74. }
  75. } else if field == "establish_date" {
  76. // 成立日期修改成时间戳
  77. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  78. if err != nil {
  79. util.Debug(err)
  80. } else {
  81. esMap["establish_date"] = location.Unix()
  82. }
  83. } else if field == "cancel_date" {
  84. esMap["cancel_date"] = tmp["cancel_date"]
  85. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  86. if err != nil {
  87. util.Debug(err)
  88. } else {
  89. esMap["cancel_date_unix"] = location.Unix()
  90. }
  91. } else if field == "lastupdatetime" {
  92. esMap["lastupdatetime"] = tmp["update_time_msql"]
  93. } else if field == "bid_projectname" {
  94. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  95. p1 := util.ObjArrToStringArr(pname)
  96. esMap["bid_projectname"] = strings.Join(p1, ",")
  97. }
  98. } else if field == "bid_purchasing" {
  99. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  100. p1 := util.ObjArrToStringArr(pur)
  101. esMap["bid_purchasing"] = strings.Join(p1, ",")
  102. }
  103. } else if field == "bid_area" {
  104. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  105. p1 := util.ObjArrToStringArr(areas)
  106. esMap["bid_area"] = strings.Join(p1, ",")
  107. }
  108. } else if field == "partners" {
  109. if ps, ok := tmp["partners"].([]interface{}); ok {
  110. var parr []map[string]interface{}
  111. for _, v := range ps {
  112. p := make(map[string]interface{})
  113. v1 := v.(map[string]interface{})
  114. for _, field := range partner {
  115. if v1[field] == nil {
  116. continue
  117. }
  118. if field == "stock_capital" || field == "stock_realcapital" {
  119. if v, err := strconv.ParseFloat(util.ObjToString(v1[field]), 64); err == nil {
  120. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  121. p[field] = v
  122. }
  123. } else {
  124. p[field] = v1[field]
  125. }
  126. }
  127. if len(p) > 0 {
  128. parr = append(parr, p)
  129. }
  130. }
  131. if len(parr) > 0 {
  132. esMap[field] = parr
  133. }
  134. }
  135. } else if field == "employees" {
  136. if ps, ok := tmp["employees"].([]interface{}); ok {
  137. var parr []map[string]interface{}
  138. for _, v := range ps {
  139. p := make(map[string]interface{})
  140. v1 := v.(map[string]interface{})
  141. for _, field := range employee {
  142. if v1[field] == nil {
  143. continue
  144. } else {
  145. p[field] = v1[field]
  146. }
  147. }
  148. if len(p) > 0 {
  149. parr = append(parr, p)
  150. }
  151. }
  152. if len(parr) > 0 {
  153. esMap[field] = parr
  154. }
  155. }
  156. } else if field == "bid_unittype" {
  157. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  158. var arr []string
  159. for _, v := range util.ObjArrToStringArr(t2) {
  160. arr = append(arr, TypeMap[v])
  161. }
  162. esMap["bid_unittype"] = strings.Join(arr, ",")
  163. }
  164. } else if field == "bid_contracttype" {
  165. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  166. var arr []string
  167. for _, v := range util.ObjArrToStringArr(t2) {
  168. arr = append(arr, TypeMap1[v])
  169. }
  170. esMap["bid_contracttype"] = strings.Join(arr, ",")
  171. }
  172. } else if field == "_id" {
  173. esMap["_id"] = tmp["_id"]
  174. esMap["id"] = tmp["_id"]
  175. } else {
  176. esMap[field] = tmp[field]
  177. }
  178. }
  179. company_type := util.ObjToString(tmp["company_type"])
  180. company_name := util.ObjToString(tmp["company_name"])
  181. if company_type == "个体工商户" {
  182. if len([]rune(company_name)) >= 5 {
  183. esMap["company_type_int"] = 31
  184. } else {
  185. esMap["company_type_int"] = 32
  186. }
  187. } else if company_type == "其他" || company_type == "" {
  188. if len([]rune(company_name)) >= 4 {
  189. esMap["company_type_int"] = 21
  190. } else {
  191. esMap["company_type_int"] = 22
  192. }
  193. } else {
  194. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  195. esMap["company_type_int"] = 12
  196. } else if len([]rune(company_name)) >= 4 {
  197. esMap["company_type_int"] = 11
  198. } else {
  199. esMap["company_type_int"] = 13
  200. }
  201. }
  202. //国企、央企、其他
  203. own_type := getCompanyType(company_name, company_type)
  204. if own_type != "" {
  205. esMap["ownership_type"] = own_type
  206. }
  207. // 添加企业的国标行业分类
  208. companyID := util.ObjToString(tmp["_id"])
  209. updateMgo := make(map[string]interface{}, 0)
  210. if own_type != "" {
  211. updateMgo["ownership_type"] = own_type
  212. }
  213. where := map[string]interface{}{
  214. "company_id": companyID,
  215. }
  216. industry, _ := Mgo181.FindOne("company_industry", where)
  217. if industry != nil && len(*industry) > 0 {
  218. if util.ObjToString((*industry)["industry_l1_name"]) != "" {
  219. esMap["national_top"] = (*industry)["industry_l1_name"]
  220. updateMgo["national_top"] = (*industry)["industry_l1_name"]
  221. }
  222. if util.ObjToString((*industry)["industry_l2_name"]) != "" {
  223. esMap["national_sub"] = (*industry)["industry_l2_name"]
  224. updateMgo["national_sub"] = (*industry)["industry_l2_name"]
  225. }
  226. if util.ObjToString((*industry)["industry_l3_name"]) != "" {
  227. esMap["national_subsub"] = (*industry)["industry_l3_name"]
  228. updateMgo["national_subsub"] = (*industry)["industry_l3_name"]
  229. }
  230. }
  231. if len(updateMgo) > 0 {
  232. updatePool <- []map[string]interface{}{
  233. {"_id": companyID},
  234. {"$set": updateMgo},
  235. }
  236. }
  237. EsSaveCache <- esMap // 保存es
  238. }(tmp)
  239. tmp = make(map[string]interface{})
  240. }
  241. wg.Wait()
  242. log.Info("StdAdd", zap.Any("q", q), zap.Int("Run Over...Count::", count))
  243. }
  244. // StdAll 分段处理存量数据
  245. func StdAll() {
  246. type Biddingall struct {
  247. Coll string
  248. Gtime int64
  249. Ltime int64
  250. }
  251. type RoutinesConf struct {
  252. Num int64
  253. }
  254. type AllConf struct {
  255. All map[string]Biddingall
  256. Routines RoutinesConf
  257. }
  258. var all AllConf
  259. viper.SetConfigFile("stdall.toml")
  260. viper.SetConfigName("stdall") // 配置文件名称(无扩展名)
  261. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  262. viper.AddConfigPath("./")
  263. err := viper.ReadInConfig() // 查找并读取配置文件
  264. if err != nil { // 处理读取配置文件的错误
  265. fmt.Println("ReadInConfig err =>", err)
  266. return
  267. }
  268. err = viper.Unmarshal(&all)
  269. if err != nil {
  270. fmt.Println("biddingAllDataTask Unmarshal err =>", err)
  271. return
  272. }
  273. //fmt.Println("all", all)
  274. for k, conf := range all.All {
  275. go dealAll(conf.Coll, k, conf.Gtime, conf.Ltime, all.Routines.Num)
  276. }
  277. }
  278. func dealAll(coll, kword string, gtime, ltime, routines int64) {
  279. defer util.Catch()
  280. sess := Mgo.GetMgoConn()
  281. defer Mgo.DestoryMongoConn(sess)
  282. q := map[string]interface{}{
  283. "updatetime": map[string]interface{}{
  284. "$gt": gtime,
  285. "$lte": ltime,
  286. },
  287. }
  288. log.Info("dealAll", zap.Any("q", q))
  289. pool := make(chan bool, routines)
  290. wg := &sync.WaitGroup{}
  291. it := sess.DB(Dbname).C(coll).Find(q).Iter()
  292. count := 0
  293. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  294. if count%20000 == 0 {
  295. log.Info(kword, zap.Int("current:", count))
  296. log.Info(kword, zap.Any("updatetime =>", tmp["updatetime"]))
  297. }
  298. pool <- true
  299. if util.IntAll(tmp["use_flag"]) > 5 {
  300. continue
  301. }
  302. wg.Add(1)
  303. go func(tmp map[string]interface{}) {
  304. defer func() {
  305. <-pool
  306. wg.Done()
  307. }()
  308. esMap := map[string]interface{}{}
  309. //生索引字段处理
  310. for _, field := range EsFields {
  311. if tmp[field] == nil {
  312. continue
  313. }
  314. if field == "company_name" {
  315. esMap[field] = tmp["company_name"]
  316. esMap["name"] = tmp["company_name"]
  317. } else if field == "history_name" {
  318. var nameArr []string
  319. for _, v := range strings.Split(util.ObjToString(tmp["history_name"]), ",") {
  320. if v != "" {
  321. nameArr = append(nameArr, v)
  322. }
  323. }
  324. if len(nameArr) > 0 {
  325. esMap["history_name"] = nameArr
  326. }
  327. } else if field == "establish_date" {
  328. // 成立日期修改成时间戳
  329. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  330. if err != nil {
  331. util.Debug(err)
  332. } else {
  333. esMap["establish_date"] = location.Unix()
  334. }
  335. } else if field == "lastupdatetime" {
  336. esMap["lastupdatetime"] = tmp["update_time_msql"]
  337. } else if field == "cancel_date" {
  338. esMap["cancel_date"] = tmp["cancel_date"]
  339. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  340. if err != nil {
  341. util.Debug(err)
  342. } else {
  343. esMap["cancel_date_unix"] = location.Unix()
  344. }
  345. } else if field == "bid_projectname" {
  346. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  347. p1 := util.ObjArrToStringArr(pname)
  348. esMap["bid_projectname"] = strings.Join(p1, ",")
  349. }
  350. } else if field == "bid_purchasing" {
  351. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  352. p1 := util.ObjArrToStringArr(pur)
  353. esMap["bid_purchasing"] = strings.Join(p1, ",")
  354. }
  355. } else if field == "bid_area" {
  356. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  357. p1 := util.ObjArrToStringArr(areas)
  358. esMap["bid_area"] = strings.Join(p1, ",")
  359. }
  360. } else if field == "partners" {
  361. if ps, ok := tmp["partners"].([]interface{}); ok {
  362. var parr []map[string]interface{}
  363. for _, v := range ps {
  364. p := make(map[string]interface{})
  365. v1 := v.(map[string]interface{})
  366. for _, field := range partner {
  367. if v1[field] == nil {
  368. continue
  369. }
  370. if field == "stock_capital" || field == "stock_realcapital" {
  371. text := util.ObjToString(v1[field])
  372. if strings.Contains(text, "万元") {
  373. text = strings.Replace(text, "万元", "", -1)
  374. }
  375. if v, err := strconv.ParseFloat(text, 64); err == nil {
  376. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  377. p[field] = v
  378. }
  379. } else {
  380. p[field] = v1[field]
  381. }
  382. }
  383. if len(p) > 0 {
  384. parr = append(parr, p)
  385. }
  386. }
  387. if len(parr) > 0 {
  388. esMap[field] = parr
  389. }
  390. }
  391. } else if field == "employees" {
  392. if ps, ok := tmp["employees"].([]interface{}); ok {
  393. var parr []map[string]interface{}
  394. for _, v := range ps {
  395. p := make(map[string]interface{})
  396. v1 := v.(map[string]interface{})
  397. for _, field := range employee {
  398. if v1[field] == nil {
  399. continue
  400. } else {
  401. p[field] = v1[field]
  402. }
  403. }
  404. if len(p) > 0 {
  405. parr = append(parr, p)
  406. }
  407. }
  408. if len(parr) > 0 {
  409. esMap[field] = parr
  410. }
  411. }
  412. } else if field == "bid_unittype" {
  413. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  414. var arr []string
  415. for _, v := range util.ObjArrToStringArr(t2) {
  416. arr = append(arr, TypeMap[v])
  417. }
  418. esMap["bid_unittype"] = strings.Join(arr, ",")
  419. }
  420. } else if field == "bid_contracttype" {
  421. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  422. var arr []string
  423. for _, v := range util.ObjArrToStringArr(t2) {
  424. arr = append(arr, TypeMap1[v])
  425. }
  426. esMap["bid_contracttype"] = strings.Join(arr, ",")
  427. }
  428. } else if field == "_id" {
  429. esMap["_id"] = tmp["_id"]
  430. esMap["id"] = tmp["_id"]
  431. } else {
  432. esMap[field] = tmp[field]
  433. }
  434. }
  435. company_type := util.ObjToString(tmp["company_type"])
  436. company_name := util.ObjToString(tmp["company_name"])
  437. if company_type == "个体工商户" {
  438. if len([]rune(company_name)) >= 5 {
  439. esMap["company_type_int"] = 31
  440. } else {
  441. esMap["company_type_int"] = 32
  442. }
  443. } else if company_type == "其他" || company_type == "" {
  444. if len([]rune(company_name)) >= 4 {
  445. esMap["company_type_int"] = 21
  446. } else {
  447. esMap["company_type_int"] = 22
  448. }
  449. if strings.HasSuffix(company_name, "公司") || strings.HasSuffix(company_name, "集团") {
  450. esMap["company_type_int"] = 12
  451. }
  452. } else {
  453. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  454. esMap["company_type_int"] = 12
  455. } else if len([]rune(company_name)) >= 4 {
  456. esMap["company_type_int"] = 11
  457. } else {
  458. esMap["company_type_int"] = 13
  459. }
  460. }
  461. EsSaveCache <- esMap // 保存es
  462. }(tmp)
  463. tmp = make(map[string]interface{})
  464. }
  465. wg.Wait()
  466. log.Info(kword, zap.Any("Run Over...Count", count))
  467. }
  468. // SaveEs 过滤后数据存库
  469. func SaveEs() {
  470. arru := make([]map[string]interface{}, 100)
  471. indexu := 0
  472. for {
  473. select {
  474. case v := <-EsSaveCache:
  475. arru[indexu] = v
  476. indexu++
  477. if indexu == 100 {
  478. SP <- true
  479. go func(arru []map[string]interface{}) {
  480. defer func() {
  481. <-SP
  482. }()
  483. Es.BulkSave(Index, arru)
  484. // 存在第二个集群
  485. if Es2 != nil && Es2.S_esurl != "" {
  486. Es2.BulkSave(Index, arru)
  487. }
  488. }(arru)
  489. arru = make([]map[string]interface{}, 100)
  490. indexu = 0
  491. }
  492. case <-time.After(1000 * time.Millisecond):
  493. if indexu > 0 {
  494. SP <- true
  495. go func(arru []map[string]interface{}) {
  496. defer func() {
  497. <-SP
  498. }()
  499. Es.BulkSave(Index, arru)
  500. // 存在第二个集群
  501. if Es2 != nil && Es2.S_esurl != "" {
  502. Es2.BulkSave(Index, arru)
  503. }
  504. }(arru[:indexu])
  505. arru = make([]map[string]interface{}, 100)
  506. indexu = 0
  507. }
  508. }
  509. }
  510. }