task.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/viper"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  var (
	// partner lists the keys copied from each entry of a document's
	// "partners" array into the indexed document.
	partner = []string{
		"identify_no",
		"stock_type",
		"stock_name",
		"identify_type",
		"stock_capital",
		"stock_realcapital",
	}

	// employee lists the keys copied from each entry of a document's
	// "employees" array into the indexed document.
	employee = []string{
		"employee_name",
		"position",
	}

	// TypeMap translates a bid_unittype label into the code stored in the index.
	TypeMap = map[string]string{
		"采购单位": "1", // purchasing unit
		"中标单位": "2", // winning bidder (bidding enterprise)
		"代理机构": "3", // agency
		"厂商":   "4", // manufacturer
	}

	// TypeMap1 translates a bid_contracttype label into the code stored in the index.
	TypeMap1 = map[string]string{
		"固定电话": "1", // landline
		"手机号":  "2", // mobile number
		"邮箱":   "3", // e-mail
		"不存在":  "4", // none
	}
)
  29. // StdAdd 增量数据
  30. func StdAdd(q interface{}) {
  31. defer util.Catch()
  32. sess := Mgo.GetMgoConn()
  33. defer Mgo.DestoryMongoConn(sess)
  34. pool := make(chan bool, 18)
  35. wg := &sync.WaitGroup{}
  36. //q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
  37. //q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
  38. log.Info("StdAdd", zap.Any("q", q))
  39. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  40. count := 0
  41. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  42. if count%20000 == 0 {
  43. //log.Println("current:", count)
  44. log.Info("StdAdd", zap.Int("current:", count))
  45. log.Info("StdAdd", zap.Any("q", q), zap.Any("updatetime", tmp["updatetime"]))
  46. }
  47. if util.IntAll(tmp["use_flag"]) > 5 {
  48. continue
  49. }
  50. pool <- true
  51. wg.Add(1)
  52. go func(tmp map[string]interface{}) {
  53. defer func() {
  54. <-pool
  55. wg.Done()
  56. }()
  57. esMap := map[string]interface{}{}
  58. //生索引字段处理
  59. for _, field := range EsFields {
  60. if tmp[field] == nil {
  61. continue
  62. }
  63. if field == "company_name" {
  64. esMap[field] = tmp["company_name"]
  65. esMap["name"] = tmp["company_name"]
  66. } else if field == "history_name" {
  67. var nameArr []string
  68. names := util.ObjToString(tmp["history_name"])
  69. if strings.Contains(names, ",") {
  70. nameArr = append(nameArr, strings.Split(names, ",")...)
  71. }
  72. if len(nameArr) > 0 {
  73. esMap["history_name"] = nameArr
  74. }
  75. } else if field == "establish_date" {
  76. // 成立日期修改成时间戳
  77. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  78. if err != nil {
  79. util.Debug(err)
  80. } else {
  81. esMap["establish_date"] = location.Unix()
  82. }
  83. } else if field == "cancel_date" {
  84. esMap["cancel_date"] = tmp["cancel_date"]
  85. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  86. if err != nil {
  87. util.Debug(err)
  88. } else {
  89. esMap["cancel_date_unix"] = location.Unix()
  90. }
  91. } else if field == "lastupdatetime" {
  92. esMap["lastupdatetime"] = tmp["update_time_msql"]
  93. } else if field == "bid_projectname" {
  94. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  95. p1 := util.ObjArrToStringArr(pname)
  96. esMap["bid_projectname"] = strings.Join(p1, ",")
  97. }
  98. } else if field == "bid_purchasing" {
  99. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  100. p1 := util.ObjArrToStringArr(pur)
  101. esMap["bid_purchasing"] = strings.Join(p1, ",")
  102. }
  103. } else if field == "bid_area" {
  104. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  105. p1 := util.ObjArrToStringArr(areas)
  106. esMap["bid_area"] = strings.Join(p1, ",")
  107. }
  108. } else if field == "partners" {
  109. if ps, ok := tmp["partners"].([]interface{}); ok {
  110. var parr []map[string]interface{}
  111. for _, v := range ps {
  112. p := make(map[string]interface{})
  113. v1 := v.(map[string]interface{})
  114. for _, field := range partner {
  115. if v1[field] == nil {
  116. continue
  117. }
  118. if field == "stock_capital" || field == "stock_realcapital" {
  119. if v, err := strconv.ParseFloat(util.ObjToString(v1[field]), 64); err == nil {
  120. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  121. p[field] = v
  122. }
  123. } else {
  124. p[field] = v1[field]
  125. }
  126. }
  127. if len(p) > 0 {
  128. parr = append(parr, p)
  129. }
  130. }
  131. if len(parr) > 0 {
  132. esMap[field] = parr
  133. }
  134. }
  135. } else if field == "employees" {
  136. if ps, ok := tmp["employees"].([]interface{}); ok {
  137. var parr []map[string]interface{}
  138. for _, v := range ps {
  139. p := make(map[string]interface{})
  140. v1 := v.(map[string]interface{})
  141. for _, field := range employee {
  142. if v1[field] == nil {
  143. continue
  144. } else {
  145. p[field] = v1[field]
  146. }
  147. }
  148. if len(p) > 0 {
  149. parr = append(parr, p)
  150. }
  151. }
  152. if len(parr) > 0 {
  153. esMap[field] = parr
  154. }
  155. }
  156. } else if field == "bid_unittype" {
  157. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  158. var arr []string
  159. for _, v := range util.ObjArrToStringArr(t2) {
  160. arr = append(arr, TypeMap[v])
  161. }
  162. esMap["bid_unittype"] = strings.Join(arr, ",")
  163. }
  164. } else if field == "bid_contracttype" {
  165. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  166. var arr []string
  167. for _, v := range util.ObjArrToStringArr(t2) {
  168. arr = append(arr, TypeMap1[v])
  169. }
  170. esMap["bid_contracttype"] = strings.Join(arr, ",")
  171. }
  172. } else if field == "_id" {
  173. esMap["_id"] = tmp["_id"]
  174. esMap["id"] = tmp["_id"]
  175. } else {
  176. esMap[field] = tmp[field]
  177. }
  178. }
  179. company_type := util.ObjToString(tmp["company_type"])
  180. company_name := util.ObjToString(tmp["company_name"])
  181. if company_type == "个体工商户" {
  182. if len([]rune(company_name)) >= 5 {
  183. esMap["company_type_int"] = 31
  184. } else {
  185. esMap["company_type_int"] = 32
  186. }
  187. } else if company_type == "其他" || company_type == "" {
  188. if len([]rune(company_name)) >= 4 {
  189. esMap["company_type_int"] = 21
  190. } else {
  191. esMap["company_type_int"] = 22
  192. }
  193. } else {
  194. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  195. esMap["company_type_int"] = 12
  196. } else if len([]rune(company_name)) >= 4 {
  197. esMap["company_type_int"] = 11
  198. } else {
  199. esMap["company_type_int"] = 13
  200. }
  201. }
  202. EsSaveCache <- esMap // 保存es
  203. }(tmp)
  204. tmp = make(map[string]interface{})
  205. }
  206. wg.Wait()
  207. log.Info("StdAdd", zap.Any("q", q), zap.Int("Run Over...Count::", count))
  208. }
  209. // StdAll 分段处理存量数据
  210. func StdAll() {
  211. type Biddingall struct {
  212. Coll string
  213. Gtime int64
  214. Ltime int64
  215. }
  216. type RoutinesConf struct {
  217. Num int64
  218. }
  219. type AllConf struct {
  220. All map[string]Biddingall
  221. Routines RoutinesConf
  222. }
  223. var all AllConf
  224. viper.SetConfigFile("stdall.toml")
  225. viper.SetConfigName("stdall") // 配置文件名称(无扩展名)
  226. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  227. viper.AddConfigPath("./")
  228. err := viper.ReadInConfig() // 查找并读取配置文件
  229. if err != nil { // 处理读取配置文件的错误
  230. fmt.Println("ReadInConfig err =>", err)
  231. return
  232. }
  233. err = viper.Unmarshal(&all)
  234. if err != nil {
  235. fmt.Println("biddingAllDataTask Unmarshal err =>", err)
  236. return
  237. }
  238. //fmt.Println("all", all)
  239. for k, conf := range all.All {
  240. go dealAll(conf.Coll, k, conf.Gtime, conf.Ltime, all.Routines.Num)
  241. }
  242. }
  243. func dealAll(coll, kword string, gtime, ltime, routines int64) {
  244. defer util.Catch()
  245. sess := Mgo.GetMgoConn()
  246. defer Mgo.DestoryMongoConn(sess)
  247. q := map[string]interface{}{
  248. "updatetime": map[string]interface{}{
  249. "$gt": gtime,
  250. "$lte": ltime,
  251. },
  252. }
  253. log.Info("dealAll", zap.Any("q", q))
  254. pool := make(chan bool, routines)
  255. wg := &sync.WaitGroup{}
  256. it := sess.DB(Dbname).C(coll).Find(q).Iter()
  257. count := 0
  258. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  259. if count%20000 == 0 {
  260. log.Info(kword, zap.Int("current:", count))
  261. log.Info(kword, zap.Any("updatetime =>", tmp["updatetime"]))
  262. }
  263. pool <- true
  264. if util.IntAll(tmp["use_flag"]) > 5 {
  265. continue
  266. }
  267. wg.Add(1)
  268. go func(tmp map[string]interface{}) {
  269. defer func() {
  270. <-pool
  271. wg.Done()
  272. }()
  273. esMap := map[string]interface{}{}
  274. //生索引字段处理
  275. for _, field := range EsFields {
  276. if tmp[field] == nil {
  277. continue
  278. }
  279. if field == "company_name" {
  280. esMap[field] = tmp["company_name"]
  281. esMap["name"] = tmp["company_name"]
  282. } else if field == "history_name" {
  283. var nameArr []string
  284. for _, v := range strings.Split(util.ObjToString(tmp["history_name"]), ",") {
  285. if v != "" {
  286. nameArr = append(nameArr, v)
  287. }
  288. }
  289. if len(nameArr) > 0 {
  290. esMap["history_name"] = nameArr
  291. }
  292. } else if field == "establish_date" {
  293. // 成立日期修改成时间戳
  294. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  295. if err != nil {
  296. util.Debug(err)
  297. } else {
  298. esMap["establish_date"] = location.Unix()
  299. }
  300. } else if field == "lastupdatetime" {
  301. esMap["lastupdatetime"] = tmp["update_time_msql"]
  302. } else if field == "cancel_date" {
  303. esMap["cancel_date"] = tmp["cancel_date"]
  304. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  305. if err != nil {
  306. util.Debug(err)
  307. } else {
  308. esMap["cancel_date_unix"] = location.Unix()
  309. }
  310. } else if field == "bid_projectname" {
  311. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  312. p1 := util.ObjArrToStringArr(pname)
  313. esMap["bid_projectname"] = strings.Join(p1, ",")
  314. }
  315. } else if field == "bid_purchasing" {
  316. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  317. p1 := util.ObjArrToStringArr(pur)
  318. esMap["bid_purchasing"] = strings.Join(p1, ",")
  319. }
  320. } else if field == "bid_area" {
  321. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  322. p1 := util.ObjArrToStringArr(areas)
  323. esMap["bid_area"] = strings.Join(p1, ",")
  324. }
  325. } else if field == "partners" {
  326. if ps, ok := tmp["partners"].([]interface{}); ok {
  327. var parr []map[string]interface{}
  328. for _, v := range ps {
  329. p := make(map[string]interface{})
  330. v1 := v.(map[string]interface{})
  331. for _, field := range partner {
  332. if v1[field] == nil {
  333. continue
  334. }
  335. if field == "stock_capital" || field == "stock_realcapital" {
  336. text := util.ObjToString(v1[field])
  337. if strings.Contains(text, "万元") {
  338. text = strings.Replace(text, "万元", "", -1)
  339. }
  340. if v, err := strconv.ParseFloat(text, 64); err == nil {
  341. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  342. p[field] = v
  343. }
  344. } else {
  345. p[field] = v1[field]
  346. }
  347. }
  348. if len(p) > 0 {
  349. parr = append(parr, p)
  350. }
  351. }
  352. if len(parr) > 0 {
  353. esMap[field] = parr
  354. }
  355. }
  356. } else if field == "employees" {
  357. if ps, ok := tmp["employees"].([]interface{}); ok {
  358. var parr []map[string]interface{}
  359. for _, v := range ps {
  360. p := make(map[string]interface{})
  361. v1 := v.(map[string]interface{})
  362. for _, field := range employee {
  363. if v1[field] == nil {
  364. continue
  365. } else {
  366. p[field] = v1[field]
  367. }
  368. }
  369. if len(p) > 0 {
  370. parr = append(parr, p)
  371. }
  372. }
  373. if len(parr) > 0 {
  374. esMap[field] = parr
  375. }
  376. }
  377. } else if field == "bid_unittype" {
  378. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  379. var arr []string
  380. for _, v := range util.ObjArrToStringArr(t2) {
  381. arr = append(arr, TypeMap[v])
  382. }
  383. esMap["bid_unittype"] = strings.Join(arr, ",")
  384. }
  385. } else if field == "bid_contracttype" {
  386. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  387. var arr []string
  388. for _, v := range util.ObjArrToStringArr(t2) {
  389. arr = append(arr, TypeMap1[v])
  390. }
  391. esMap["bid_contracttype"] = strings.Join(arr, ",")
  392. }
  393. } else if field == "_id" {
  394. esMap["_id"] = tmp["_id"]
  395. esMap["id"] = tmp["_id"]
  396. } else {
  397. esMap[field] = tmp[field]
  398. }
  399. }
  400. company_type := util.ObjToString(tmp["company_type"])
  401. company_name := util.ObjToString(tmp["company_name"])
  402. if company_type == "个体工商户" {
  403. if len([]rune(company_name)) >= 5 {
  404. esMap["company_type_int"] = 31
  405. } else {
  406. esMap["company_type_int"] = 32
  407. }
  408. } else if company_type == "其他" || company_type == "" {
  409. if len([]rune(company_name)) >= 4 {
  410. esMap["company_type_int"] = 21
  411. } else {
  412. esMap["company_type_int"] = 22
  413. }
  414. } else {
  415. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  416. esMap["company_type_int"] = 12
  417. } else if len([]rune(company_name)) >= 4 {
  418. esMap["company_type_int"] = 11
  419. } else {
  420. esMap["company_type_int"] = 13
  421. }
  422. }
  423. EsSaveCache <- esMap // 保存es
  424. }(tmp)
  425. tmp = make(map[string]interface{})
  426. }
  427. wg.Wait()
  428. log.Info(kword, zap.Any("Run Over...Count", count))
  429. }
  430. // SaveEs 过滤后数据存库
  431. func SaveEs() {
  432. arru := make([]map[string]interface{}, 100)
  433. indexu := 0
  434. for {
  435. select {
  436. case v := <-EsSaveCache:
  437. arru[indexu] = v
  438. indexu++
  439. if indexu == 100 {
  440. SP <- true
  441. go func(arru []map[string]interface{}) {
  442. defer func() {
  443. <-SP
  444. }()
  445. Es.BulkSave(Index, arru)
  446. // 存在第二个集群
  447. if Es2 != nil && Es2.S_esurl != "" {
  448. Es2.BulkSave(Index, arru)
  449. }
  450. }(arru)
  451. arru = make([]map[string]interface{}, 100)
  452. indexu = 0
  453. }
  454. case <-time.After(1000 * time.Millisecond):
  455. if indexu > 0 {
  456. SP <- true
  457. go func(arru []map[string]interface{}) {
  458. defer func() {
  459. <-SP
  460. }()
  461. Es.BulkSave(Index, arru)
  462. // 存在第二个集群
  463. if Es2 != nil && Es2.S_esurl != "" {
  464. Es2.BulkSave(Index, arru)
  465. }
  466. }(arru[:indexu])
  467. arru = make([]map[string]interface{}, 100)
  468. indexu = 0
  469. }
  470. }
  471. }
  472. }