task.go 12 KB


  1. package main
  2. import (
  3. "fmt"
  4. "github.com/cron"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "log"
  7. "qfw/util"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. partner = []string{"identify_no", "stock_type", "stock_name", "identify_type", "stock_capital", "stock_realcapital"}
  15. employee = []string{"employee_name", "position"}
  16. TypeMap = map[string]string{
  17. "采购单位": "1",
  18. "中标单位": "2", // 投标企业
  19. "代理机构": "3",
  20. "厂商": "4",
  21. }
  22. TypeMap1 = map[string]string{
  23. "固定电话": "1",
  24. "手机号": "2",
  25. "邮箱": "3",
  26. "不存在": "4",
  27. }
  28. )
  29. //定时任务
  30. func TimeTask() {
  31. c := cron.New()
  32. cronstr := "0 0 15 ? * Tue" //每周二15点执行
  33. //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
  34. err := c.AddFunc(cronstr, func() { StdAdd() })
  35. if err != nil {
  36. util.Debug(err)
  37. return
  38. }
  39. c.Start()
  40. }
  41. // StdAdd 增量数据
  42. func StdAdd() {
  43. defer util.Catch()
  44. sess := Mgo.GetMgoConn()
  45. defer Mgo.DestoryMongoConn(sess)
  46. pool := make(chan bool, 5)
  47. wg := &sync.WaitGroup{}
  48. //q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
  49. q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
  50. util.Debug(q)
  51. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  52. count := 0
  53. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  54. if count%20000 == 0 {
  55. log.Println("current:", count)
  56. }
  57. if util.IntAll(tmp["use_flag"]) > 5 {
  58. continue
  59. }
  60. pool <- true
  61. wg.Add(1)
  62. go func(tmp map[string]interface{}) {
  63. defer func() {
  64. <-pool
  65. wg.Done()
  66. }()
  67. esMap := map[string]interface{}{}
  68. //生索引字段处理
  69. for _, field := range EsFields {
  70. if tmp[field] == nil {
  71. continue
  72. }
  73. if field == "company_name" {
  74. esMap[field] = tmp["company_name"]
  75. esMap["name"] = tmp["company_name"]
  76. } else if field == "history_name" {
  77. var nameArr []string
  78. names := util.ObjToString(tmp["history_name"])
  79. if strings.Contains(names, ",") {
  80. nameArr = append(nameArr, strings.Split(names, ",")...)
  81. }
  82. if len(nameArr) > 0 {
  83. esMap["history_name"] = nameArr
  84. }
  85. } else if field == "establish_date" {
  86. // 成立日期修改成时间戳
  87. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  88. if err != nil {
  89. util.Debug(err)
  90. } else {
  91. esMap["establish_date"] = location.Unix()
  92. }
  93. } else if field == "cancel_date" {
  94. esMap["cancel_date"] = tmp["cancel_date"]
  95. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  96. if err != nil {
  97. util.Debug(err)
  98. } else {
  99. esMap["cancel_date_unix"] = location.Unix()
  100. }
  101. } else if field == "lastupdatetime" {
  102. esMap["lastupdatetime"] = tmp["update_time_msql"]
  103. } else if field == "bid_projectname" {
  104. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  105. p1 := util.ObjArrToStringArr(pname)
  106. esMap["bid_projectname"] = strings.Join(p1, ",")
  107. }
  108. } else if field == "bid_purchasing" {
  109. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  110. p1 := util.ObjArrToStringArr(pur)
  111. esMap["bid_purchasing"] = strings.Join(p1, ",")
  112. }
  113. } else if field == "bid_area" {
  114. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  115. p1 := util.ObjArrToStringArr(areas)
  116. esMap["bid_area"] = strings.Join(p1, ",")
  117. }
  118. } else if field == "partners" {
  119. if ps, ok := tmp["partners"].([]interface{}); ok {
  120. var parr []map[string]interface{}
  121. for _, v := range ps {
  122. p := make(map[string]interface{})
  123. v1 := v.(map[string]interface{})
  124. for _, field := range partner {
  125. if v1[field] == nil {
  126. continue
  127. }
  128. if field == "stock_capital" || field == "stock_realcapital" {
  129. if v, err := strconv.ParseFloat(util.ObjToString(v1[field]), 64); err == nil {
  130. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  131. p[field] = v
  132. }
  133. } else {
  134. p[field] = v1[field]
  135. }
  136. }
  137. if len(p) > 0 {
  138. parr = append(parr, p)
  139. }
  140. }
  141. if len(parr) > 0 {
  142. esMap[field] = parr
  143. }
  144. }
  145. } else if field == "employees" {
  146. if ps, ok := tmp["employees"].([]interface{}); ok {
  147. var parr []map[string]interface{}
  148. for _, v := range ps {
  149. p := make(map[string]interface{})
  150. v1 := v.(map[string]interface{})
  151. for _, field := range employee {
  152. if v1[field] == nil {
  153. continue
  154. } else {
  155. p[field] = v1[field]
  156. }
  157. }
  158. if len(p) > 0 {
  159. parr = append(parr, p)
  160. }
  161. }
  162. if len(parr) > 0 {
  163. esMap[field] = parr
  164. }
  165. }
  166. } else if field == "bid_unittype" {
  167. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  168. var arr []string
  169. for _, v := range util.ObjArrToStringArr(t2) {
  170. arr = append(arr, TypeMap[v])
  171. }
  172. esMap["bid_unittype"] = strings.Join(arr, ",")
  173. }
  174. } else if field == "bid_contracttype" {
  175. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  176. var arr []string
  177. for _, v := range util.ObjArrToStringArr(t2) {
  178. arr = append(arr, TypeMap1[v])
  179. }
  180. esMap["bid_contracttype"] = strings.Join(arr, ",")
  181. }
  182. } else {
  183. esMap[field] = tmp[field]
  184. }
  185. }
  186. company_type := util.ObjToString(tmp["company_type"])
  187. company_name := util.ObjToString(tmp["company_name"])
  188. if company_type == "个体工商户" {
  189. if len([]rune(company_name)) >= 5 {
  190. esMap["company_type_int"] = 31
  191. } else {
  192. esMap["company_type_int"] = 32
  193. }
  194. } else if company_type == "其他" || company_type == "" {
  195. if len([]rune(company_name)) >= 4 {
  196. esMap["company_type_int"] = 21
  197. } else {
  198. esMap["company_type_int"] = 22
  199. }
  200. } else {
  201. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  202. esMap["company_type_int"] = 12
  203. } else if len([]rune(company_name)) >= 4 {
  204. esMap["company_type_int"] = 11
  205. } else {
  206. esMap["company_type_int"] = 13
  207. }
  208. }
  209. EsSaveCache <- esMap // 保存es
  210. }(tmp)
  211. tmp = make(map[string]interface{})
  212. }
  213. wg.Wait()
  214. log.Println("Run Over...Count:", count)
  215. }
  216. // StdAll 存量数据生es
  217. func StdAll() {
  218. defer util.Catch()
  219. sess := Mgo.GetMgoConn()
  220. defer Mgo.DestoryMongoConn(sess)
  221. pool := make(chan bool, 10)
  222. wg := &sync.WaitGroup{}
  223. q := bson.M{"_id": "f32b67603a7f296e5f710caf2064ef9d"}
  224. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  225. count := 0
  226. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  227. if count%20000 == 0 {
  228. log.Println("current:", count, tmp["_id"])
  229. }
  230. pool <- true
  231. wg.Add(1)
  232. go func(tmp map[string]interface{}) {
  233. defer func() {
  234. <-pool
  235. wg.Done()
  236. }()
  237. esMap := map[string]interface{}{}
  238. //生索引字段处理
  239. for _, field := range EsFields {
  240. if tmp[field] == nil {
  241. continue
  242. }
  243. if field == "_id" {
  244. esMap[field] = tmp[field]
  245. esMap["id"] = tmp[field]
  246. }
  247. if field == "company_name" {
  248. esMap[field] = tmp["company_name"]
  249. esMap["name"] = tmp["company_name"]
  250. } else if field == "history_name" {
  251. var nameArr []string
  252. for _, v := range strings.Split(util.ObjToString(tmp["history_name"]), ";") {
  253. if v != "" {
  254. nameArr = append(nameArr, v)
  255. }
  256. }
  257. if len(nameArr) > 0 {
  258. esMap["history_name"] = nameArr
  259. }
  260. } else if field == "establish_date" {
  261. // 成立日期修改成时间戳
  262. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  263. if err != nil {
  264. util.Debug(err)
  265. } else {
  266. esMap["establish_date"] = location.Unix()
  267. }
  268. } else if field == "lastupdatetime" {
  269. esMap["lastupdatetime"] = tmp["update_time_msql"]
  270. } else if field == "cancel_date" {
  271. esMap["cancel_date"] = tmp["cancel_date"]
  272. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["cancel_date"]), time.Local)
  273. if err != nil {
  274. util.Debug(err)
  275. } else {
  276. esMap["cancel_date_unix"] = location.Unix()
  277. }
  278. } else if field == "bid_projectname" {
  279. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  280. p1 := util.ObjArrToStringArr(pname)
  281. esMap["bid_projectname"] = strings.Join(p1, ",")
  282. }
  283. } else if field == "bid_purchasing" {
  284. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  285. p1 := util.ObjArrToStringArr(pur)
  286. esMap["bid_purchasing"] = strings.Join(p1, ",")
  287. }
  288. } else if field == "bid_area" {
  289. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  290. p1 := util.ObjArrToStringArr(areas)
  291. esMap["bid_area"] = strings.Join(p1, ",")
  292. }
  293. } else if field == "partners" {
  294. if ps, ok := tmp["partners"].([]interface{}); ok {
  295. var parr []map[string]interface{}
  296. for _, v := range ps {
  297. p := make(map[string]interface{})
  298. v1 := v.(map[string]interface{})
  299. for _, field := range partner {
  300. if v1[field] == nil {
  301. continue
  302. }
  303. if field == "stock_capital" || field == "stock_realcapital" {
  304. text := util.ObjToString(v1[field])
  305. if strings.Contains(text, "万元") {
  306. text = strings.Replace(text, "万元", "", -1)
  307. }
  308. if v, err := strconv.ParseFloat(text, 64); err == nil {
  309. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  310. p[field] = v
  311. }
  312. } else {
  313. p[field] = v1[field]
  314. }
  315. }
  316. if len(p) > 0 {
  317. parr = append(parr, p)
  318. }
  319. }
  320. if len(parr) > 0 {
  321. esMap[field] = parr
  322. }
  323. }
  324. } else if field == "employees" {
  325. if ps, ok := tmp["employees"].([]interface{}); ok {
  326. var parr []map[string]interface{}
  327. for _, v := range ps {
  328. p := make(map[string]interface{})
  329. v1 := v.(map[string]interface{})
  330. for _, field := range employee {
  331. if v1[field] == nil {
  332. continue
  333. } else {
  334. p[field] = v1[field]
  335. }
  336. }
  337. if len(p) > 0 {
  338. parr = append(parr, p)
  339. }
  340. }
  341. if len(parr) > 0 {
  342. esMap[field] = parr
  343. }
  344. }
  345. } else if field == "bid_unittype" {
  346. if t2, ok := tmp["bid_unittype"].([]interface{}); ok {
  347. var arr []string
  348. for _, v := range util.ObjArrToStringArr(t2) {
  349. arr = append(arr, TypeMap[v])
  350. }
  351. esMap["bid_unittype"] = strings.Join(arr, ",")
  352. }
  353. } else if field == "bid_contracttype" {
  354. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  355. var arr []string
  356. for _, v := range util.ObjArrToStringArr(t2) {
  357. arr = append(arr, TypeMap1[v])
  358. }
  359. esMap["bid_contracttype"] = strings.Join(arr, ",")
  360. }
  361. } else {
  362. esMap[field] = tmp[field]
  363. }
  364. }
  365. company_type := util.ObjToString(tmp["company_type"])
  366. company_name := util.ObjToString(tmp["company_name"])
  367. if company_type == "个体工商户" {
  368. if len([]rune(company_name)) >= 5 {
  369. esMap["company_type_int"] = 31
  370. } else {
  371. esMap["company_type_int"] = 32
  372. }
  373. } else if company_type == "其他" || company_type == "" {
  374. if len([]rune(company_name)) >= 4 {
  375. esMap["company_type_int"] = 21
  376. } else {
  377. esMap["company_type_int"] = 22
  378. }
  379. } else {
  380. if company_type == "内资分公司" || company_type == "内资非法人企业、非公司私营企业及内资非公司企业分支机构" {
  381. esMap["company_type_int"] = 12
  382. } else if len([]rune(company_name)) >= 4 {
  383. esMap["company_type_int"] = 11
  384. } else {
  385. esMap["company_type_int"] = 13
  386. }
  387. }
  388. EsSaveCache <- esMap // 保存es
  389. }(tmp)
  390. tmp = make(map[string]interface{})
  391. }
  392. wg.Wait()
  393. log.Println("Run Over...Count:", count)
  394. }
  395. // SaveEs 过滤后数据存库
  396. func SaveEs() {
  397. log.Println("Es Save...")
  398. arru := make([]map[string]interface{}, 100)
  399. indexu := 0
  400. for {
  401. select {
  402. case v := <-EsSaveCache:
  403. arru[indexu] = v
  404. indexu++
  405. if indexu == 100 {
  406. SP <- true
  407. go func(arru []map[string]interface{}) {
  408. defer func() {
  409. <-SP
  410. }()
  411. Es.BulkSave(Index, arru)
  412. }(arru)
  413. arru = make([]map[string]interface{}, 100)
  414. indexu = 0
  415. }
  416. case <-time.After(1000 * time.Millisecond):
  417. if indexu > 0 {
  418. SP <- true
  419. go func(arru []map[string]interface{}) {
  420. defer func() {
  421. <-SP
  422. }()
  423. Es.BulkSave(Index, arru)
  424. }(arru[:indexu])
  425. arru = make([]map[string]interface{}, 100)
  426. indexu = 0
  427. }
  428. }
  429. }
  430. }