task.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/viper"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. partner = []string{"identify_no", "stock_type", "stock_name", "identify_type", "stock_capital", "stock_realcapital"}
  15. employee = []string{"employee_name", "position"}
  16. TypeMap = map[string]string{
  17. "采购单位": "1",
  18. "中标单位": "2", // 投标企业
  19. "代理机构": "3",
  20. "厂商": "4",
  21. }
  22. TypeMap1 = map[string]string{
  23. "固定电话": "1",
  24. "手机号": "2",
  25. "邮箱": "3",
  26. "不存在": "4",
  27. }
  28. )
  29. // StdAdd 增量数据
  30. func StdAdd(q interface{}) {
  31. defer util.Catch()
  32. sess := Mgo.GetMgoConn()
  33. defer Mgo.DestoryMongoConn(sess)
  34. pool := make(chan bool, 18)
  35. wg := &sync.WaitGroup{}
  36. //q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
  37. //q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
  38. log.Info("StdAdd", zap.Any("q", q))
  39. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  40. count := 0
  41. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  42. if count%20000 == 0 {
  43. //log.Println("current:", count)
  44. log.Info("StdAdd", zap.Int("current:", count))
  45. log.Info("StdAdd", zap.Any("q", q), zap.Any("updatetime", tmp["updatetime"]))
  46. }
  47. if util.IntAll(tmp["use_flag"]) > 5 {
  48. continue
  49. }
  50. pool <- true
  51. wg.Add(1)
  52. go func(tmp map[string]interface{}) {
  53. defer func() {
  54. <-pool
  55. wg.Done()
  56. }()
  57. esMap := map[string]interface{}{}
  58. //生索引字段处理
  59. for _, field := range EsFields {
  60. if tmp[field] == nil {
  61. continue
  62. }
  63. if field == "company_name" {
  64. esMap[field] = tmp["company_name"]
  65. esMap["name"] = tmp["company_name"]
  66. } else if field == "history_name" {
  67. var nameArr []string
  68. names := util.ObjToString(tmp["history_name"])
  69. if strings.Contains(names, ",") {
  70. nameArr = append(nameArr, strings.Split(names, ",")...)
  71. }
  72. if len(nameArr) > 0 {
  73. esMap["history_name"] = nameArr
  74. }
  75. } else if field == "establish_date" {
  76. // 成立日期修改成时间戳
  77. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  78. if err != nil {
  79. util.Debug(err)
  80. } else {
  81. esMap["establish_date"] = location.Unix()
  82. }
  83. } else if field == "cancel_date" {
  84. esMap["cancel_date"] = tmp["cancel_date"]
  85. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  86. if err != nil {
  87. util.Debug(err)
  88. } else {
  89. esMap["cancel_date_unix"] = location.Unix()
  90. }
  91. } else if field == "lastupdatetime" {
  92. esMap["lastupdatetime"] = tmp["update_time_msql"]
  93. } else if field == "bid_projectname" {
  94. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  95. p1 := util.ObjArrToStringArr(pname)
  96. esMap["bid_projectname"] = strings.Join(p1, ",")
  97. }
  98. } else if field == "bid_purchasing" {
  99. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  100. p1 := util.ObjArrToStringArr(pur)
  101. esMap["bid_purchasing"] = strings.Join(p1, ",")
  102. }
  103. } else if field == "bid_area" {
  104. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  105. p1 := util.ObjArrToStringArr(areas)
  106. esMap["bid_area"] = strings.Join(p1, ",")
  107. }
  108. } else if field == "partners" {
  109. if ps, ok := tmp["partners"].([]interface{}); ok {
  110. var parr []map[string]interface{}
  111. for _, v := range ps {
  112. p := make(map[string]interface{})
  113. v1 := v.(map[string]interface{})
  114. for _, field := range partner {
  115. if v1[field] == nil {
  116. continue
  117. }
  118. if field == "stock_capital" || field == "stock_realcapital" {
  119. if v, err := strconv.ParseFloat(util.ObjToString(v1[field]), 64); err == nil {
  120. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  121. p[field] = v
  122. }
  123. } else {
  124. p[field] = v1[field]
  125. }
  126. }
  127. if len(p) > 0 {
  128. parr = append(parr, p)
  129. }
  130. }
  131. if len(parr) > 0 {
  132. esMap[field] = parr
  133. }
  134. }
  135. } else if field == "employees" {
  136. if ps, ok := tmp["employees"].([]interface{}); ok {
  137. var parr []map[string]interface{}
  138. for _, v := range ps {
  139. p := make(map[string]interface{})
  140. v1 := v.(map[string]interface{})
  141. for _, field := range employee {
  142. if v1[field] == nil {
  143. continue
  144. } else {
  145. p[field] = v1[field]
  146. }
  147. }
  148. if len(p) > 0 {
  149. parr = append(parr, p)
  150. }
  151. }
  152. if len(parr) > 0 {
  153. esMap[field] = parr
  154. }
  155. }
  156. } else if field == "bid_unittype" {
  157. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  158. var arr []string
  159. for _, v := range util.ObjArrToStringArr(t2) {
  160. arr = append(arr, TypeMap[v])
  161. }
  162. esMap["bid_unittype"] = strings.Join(arr, ",")
  163. }
  164. } else if field == "bid_contracttype" {
  165. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  166. var arr []string
  167. for _, v := range util.ObjArrToStringArr(t2) {
  168. arr = append(arr, TypeMap1[v])
  169. }
  170. esMap["bid_contracttype"] = strings.Join(arr, ",")
  171. }
  172. } else if field == "_id" {
  173. esMap["_id"] = tmp["_id"]
  174. esMap["id"] = tmp["_id"]
  175. } else {
  176. esMap[field] = tmp[field]
  177. }
  178. }
  179. company_type := util.ObjToString(tmp["company_type"])
  180. company_name := util.ObjToString(tmp["company_name"])
  181. if company_type == "个体工商户" {
  182. if len([]rune(company_name)) >= 5 {
  183. esMap["company_type_int"] = 31
  184. } else {
  185. esMap["company_type_int"] = 32
  186. }
  187. } else if company_type == "其他" || company_type == "" {
  188. if len([]rune(company_name)) >= 4 {
  189. esMap["company_type_int"] = 21
  190. } else {
  191. esMap["company_type_int"] = 22
  192. }
  193. } else {
  194. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  195. esMap["company_type_int"] = 12
  196. } else if len([]rune(company_name)) >= 4 {
  197. esMap["company_type_int"] = 11
  198. } else {
  199. esMap["company_type_int"] = 13
  200. }
  201. }
  202. EsSaveCache <- esMap // 保存es
  203. }(tmp)
  204. tmp = make(map[string]interface{})
  205. }
  206. wg.Wait()
  207. log.Info("StdAdd", zap.Any("q", q), zap.Int("Run Over...Count::", count))
  208. }
  209. // StdAll 分段处理存量数据
  210. func StdAll() {
  211. type Biddingall struct {
  212. Coll string
  213. Gtime int64
  214. Ltime int64
  215. }
  216. type RoutinesConf struct {
  217. Num int64
  218. }
  219. type AllConf struct {
  220. All map[string]Biddingall
  221. Routines RoutinesConf
  222. }
  223. var all AllConf
  224. viper.SetConfigFile("stdall.toml")
  225. viper.SetConfigName("stdall") // 配置文件名称(无扩展名)
  226. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  227. viper.AddConfigPath("./")
  228. err := viper.ReadInConfig() // 查找并读取配置文件
  229. if err != nil { // 处理读取配置文件的错误
  230. fmt.Println("ReadInConfig err =>", err)
  231. return
  232. }
  233. err = viper.Unmarshal(&all)
  234. if err != nil {
  235. fmt.Println("biddingAllDataTask Unmarshal err =>", err)
  236. return
  237. }
  238. //fmt.Println("all", all)
  239. for k, conf := range all.All {
  240. go dealAll(conf.Coll, k, conf.Gtime, conf.Ltime, all.Routines.Num)
  241. }
  242. }
  243. func dealAll(coll, kword string, gtime, ltime, routines int64) {
  244. defer util.Catch()
  245. sess := Mgo.GetMgoConn()
  246. defer Mgo.DestoryMongoConn(sess)
  247. q := map[string]interface{}{
  248. "updatetime": map[string]interface{}{
  249. "$gt": gtime,
  250. "$lte": ltime,
  251. },
  252. }
  253. log.Info("dealAll", zap.Any("q", q))
  254. pool := make(chan bool, routines)
  255. wg := &sync.WaitGroup{}
  256. it := sess.DB(Dbname).C(coll).Find(q).Iter()
  257. count := 0
  258. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  259. if count%20000 == 0 {
  260. log.Info(kword, zap.Int("current:", count))
  261. log.Info(kword, zap.Any("updatetime =>", tmp["updatetime"]))
  262. }
  263. pool <- true
  264. wg.Add(1)
  265. go func(tmp map[string]interface{}) {
  266. defer func() {
  267. <-pool
  268. wg.Done()
  269. }()
  270. esMap := map[string]interface{}{}
  271. //生索引字段处理
  272. for _, field := range EsFields {
  273. if tmp[field] == nil {
  274. continue
  275. }
  276. if field == "company_name" {
  277. esMap[field] = tmp["company_name"]
  278. esMap["name"] = tmp["company_name"]
  279. } else if field == "history_name" {
  280. var nameArr []string
  281. for _, v := range strings.Split(util.ObjToString(tmp["history_name"]), ",") {
  282. if v != "" {
  283. nameArr = append(nameArr, v)
  284. }
  285. }
  286. if len(nameArr) > 0 {
  287. esMap["history_name"] = nameArr
  288. }
  289. } else if field == "establish_date" {
  290. // 成立日期修改成时间戳
  291. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  292. if err != nil {
  293. util.Debug(err)
  294. } else {
  295. esMap["establish_date"] = location.Unix()
  296. }
  297. } else if field == "lastupdatetime" {
  298. esMap["lastupdatetime"] = tmp["update_time_msql"]
  299. } else if field == "cancel_date" {
  300. esMap["cancel_date"] = tmp["cancel_date"]
  301. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  302. if err != nil {
  303. util.Debug(err)
  304. } else {
  305. esMap["cancel_date_unix"] = location.Unix()
  306. }
  307. } else if field == "bid_projectname" {
  308. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  309. p1 := util.ObjArrToStringArr(pname)
  310. esMap["bid_projectname"] = strings.Join(p1, ",")
  311. }
  312. } else if field == "bid_purchasing" {
  313. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  314. p1 := util.ObjArrToStringArr(pur)
  315. esMap["bid_purchasing"] = strings.Join(p1, ",")
  316. }
  317. } else if field == "bid_area" {
  318. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  319. p1 := util.ObjArrToStringArr(areas)
  320. esMap["bid_area"] = strings.Join(p1, ",")
  321. }
  322. } else if field == "partners" {
  323. if ps, ok := tmp["partners"].([]interface{}); ok {
  324. var parr []map[string]interface{}
  325. for _, v := range ps {
  326. p := make(map[string]interface{})
  327. v1 := v.(map[string]interface{})
  328. for _, field := range partner {
  329. if v1[field] == nil {
  330. continue
  331. }
  332. if field == "stock_capital" || field == "stock_realcapital" {
  333. text := util.ObjToString(v1[field])
  334. if strings.Contains(text, "万元") {
  335. text = strings.Replace(text, "万元", "", -1)
  336. }
  337. if v, err := strconv.ParseFloat(text, 64); err == nil {
  338. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  339. p[field] = v
  340. }
  341. } else {
  342. p[field] = v1[field]
  343. }
  344. }
  345. if len(p) > 0 {
  346. parr = append(parr, p)
  347. }
  348. }
  349. if len(parr) > 0 {
  350. esMap[field] = parr
  351. }
  352. }
  353. } else if field == "employees" {
  354. if ps, ok := tmp["employees"].([]interface{}); ok {
  355. var parr []map[string]interface{}
  356. for _, v := range ps {
  357. p := make(map[string]interface{})
  358. v1 := v.(map[string]interface{})
  359. for _, field := range employee {
  360. if v1[field] == nil {
  361. continue
  362. } else {
  363. p[field] = v1[field]
  364. }
  365. }
  366. if len(p) > 0 {
  367. parr = append(parr, p)
  368. }
  369. }
  370. if len(parr) > 0 {
  371. esMap[field] = parr
  372. }
  373. }
  374. } else if field == "bid_unittype" {
  375. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  376. var arr []string
  377. for _, v := range util.ObjArrToStringArr(t2) {
  378. arr = append(arr, TypeMap[v])
  379. }
  380. esMap["bid_unittype"] = strings.Join(arr, ",")
  381. }
  382. } else if field == "bid_contracttype" {
  383. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  384. var arr []string
  385. for _, v := range util.ObjArrToStringArr(t2) {
  386. arr = append(arr, TypeMap1[v])
  387. }
  388. esMap["bid_contracttype"] = strings.Join(arr, ",")
  389. }
  390. } else if field == "_id" {
  391. esMap["_id"] = tmp["_id"]
  392. esMap["id"] = tmp["_id"]
  393. } else {
  394. esMap[field] = tmp[field]
  395. }
  396. }
  397. company_type := util.ObjToString(tmp["company_type"])
  398. company_name := util.ObjToString(tmp["company_name"])
  399. if company_type == "个体工商户" {
  400. if len([]rune(company_name)) >= 5 {
  401. esMap["company_type_int"] = 31
  402. } else {
  403. esMap["company_type_int"] = 32
  404. }
  405. } else if company_type == "其他" || company_type == "" {
  406. if len([]rune(company_name)) >= 4 {
  407. esMap["company_type_int"] = 21
  408. } else {
  409. esMap["company_type_int"] = 22
  410. }
  411. } else {
  412. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  413. esMap["company_type_int"] = 12
  414. } else if len([]rune(company_name)) >= 4 {
  415. esMap["company_type_int"] = 11
  416. } else {
  417. esMap["company_type_int"] = 13
  418. }
  419. }
  420. EsSaveCache <- esMap // 保存es
  421. }(tmp)
  422. tmp = make(map[string]interface{})
  423. }
  424. wg.Wait()
  425. log.Info(kword, zap.Any("Run Over...Count", count))
  426. }
  427. // SaveEs 过滤后数据存库
  428. func SaveEs() {
  429. arru := make([]map[string]interface{}, 100)
  430. indexu := 0
  431. for {
  432. select {
  433. case v := <-EsSaveCache:
  434. arru[indexu] = v
  435. indexu++
  436. if indexu == 100 {
  437. SP <- true
  438. go func(arru []map[string]interface{}) {
  439. defer func() {
  440. <-SP
  441. }()
  442. Es.BulkSave(Index, arru)
  443. }(arru)
  444. arru = make([]map[string]interface{}, 100)
  445. indexu = 0
  446. }
  447. case <-time.After(1000 * time.Millisecond):
  448. if indexu > 0 {
  449. SP <- true
  450. go func(arru []map[string]interface{}) {
  451. defer func() {
  452. <-SP
  453. }()
  454. Es.BulkSave(Index, arru)
  455. }(arru[:indexu])
  456. arru = make([]map[string]interface{}, 100)
  457. indexu = 0
  458. }
  459. }
  460. }
  461. }