task.go 10 KB


  1. package main
  2. import (
  3. "fmt"
  4. "github.com/cron"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "log"
  7. "mongodb"
  8. "qfw/util"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. var (
  15. partner = []string{"identify_no", "stock_type", "stock_name", "identify_type", "stock_capital", "stock_realcapital"}
  16. employee = []string{"employee_name", "position"}
  17. )
  18. //定时任务
  19. func TimeTask() {
  20. c := cron.New()
  21. cronstr := "0 0 15 ? * Tue" //每周二15点执行
  22. //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
  23. err := c.AddFunc(cronstr, func() { StdAdd() })
  24. if err != nil {
  25. util.Debug(err)
  26. return
  27. }
  28. c.Start()
  29. }
  30. // StdAdd 增量数据
  31. func StdAdd() {
  32. defer util.Catch()
  33. sess := Mgo.GetMgoConn()
  34. defer Mgo.DestoryMongoConn(sess)
  35. pool := make(chan bool, 5)
  36. wg := &sync.WaitGroup{}
  37. //q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
  38. q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
  39. util.Debug(q)
  40. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  41. count := 0
  42. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  43. if count%200 == 0 {
  44. log.Println("current:", count)
  45. }
  46. pool <- true
  47. wg.Add(1)
  48. go func(tmp map[string]interface{}) {
  49. defer func() {
  50. <-pool
  51. wg.Done()
  52. }()
  53. esMap := map[string]interface{}{}
  54. //生索引字段处理
  55. for _, field := range EsFields {
  56. if tmp[field] == nil {
  57. continue
  58. }
  59. if field == "company_name" {
  60. esMap[field] = tmp["company_name"]
  61. esMap["name"] = tmp["company_name"]
  62. } else if field == "history_name" {
  63. var nameArr []string
  64. names := util.ObjToString(tmp["history_name"])
  65. if strings.Contains(names, ",") {
  66. nameArr = append(nameArr, strings.Split(names, ",")...)
  67. }
  68. if len(nameArr) > 0 {
  69. esMap["history_name"] = nameArr
  70. }
  71. } else if field == "establish_date" {
  72. // 成立日期修改成时间戳
  73. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  74. if err != nil {
  75. util.Debug(err)
  76. } else {
  77. esMap["establish_date"] = location.Unix()
  78. }
  79. } else if field == "lastupdatetime" {
  80. esMap["lastupdatetime"] = tmp["update_time_msql"]
  81. } else if field == "bid_projectname" {
  82. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  83. p1 := util.ObjArrToStringArr(pname)
  84. esMap["bid_projectname"] = strings.Join(p1, ",")
  85. }
  86. } else if field == "bid_purchasing" {
  87. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  88. p1 := util.ObjArrToStringArr(pur)
  89. esMap["bid_purchasing"] = strings.Join(p1, ",")
  90. }
  91. } else if field == "bid_area" {
  92. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  93. p1 := util.ObjArrToStringArr(areas)
  94. esMap["bid_area"] = strings.Join(p1, ",")
  95. }
  96. } else if field == "partners" {
  97. if ps, ok := tmp["partners"].([]interface{}); ok {
  98. var parr []map[string]interface{}
  99. for _, v := range ps {
  100. p := make(map[string]interface{})
  101. v1 := v.(map[string]interface{})
  102. for _, field := range partner {
  103. if v1[field] == nil {
  104. continue
  105. }
  106. if field == "stock_capital" || field == "stock_realcapital" {
  107. if v, err := strconv.ParseFloat(util.ObjToString(v1[field]), 64); err == nil {
  108. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  109. p[field] = v
  110. }
  111. } else {
  112. p[field] = v1[field]
  113. }
  114. }
  115. if len(p) > 0 {
  116. parr = append(parr, p)
  117. }
  118. }
  119. if len(parr) > 0 {
  120. esMap[field] = parr
  121. }
  122. }
  123. } else if field == "employees" {
  124. if ps, ok := tmp["employees"].([]interface{}); ok {
  125. var parr []map[string]interface{}
  126. for _, v := range ps {
  127. p := make(map[string]interface{})
  128. v1 := v.(map[string]interface{})
  129. for _, field := range employee {
  130. if v1[field] == nil {
  131. continue
  132. } else {
  133. p[field] = v1[field]
  134. }
  135. }
  136. if len(p) > 0 {
  137. parr = append(parr, p)
  138. }
  139. }
  140. if len(parr) > 0 {
  141. esMap[field] = parr
  142. }
  143. }
  144. } else {
  145. esMap[field] = tmp[field]
  146. }
  147. }
  148. company_type := util.ObjToString(tmp["company_type"])
  149. company_name := util.ObjToString(tmp["company_name"])
  150. if company_type == "个体工商户" {
  151. if len([]rune(company_name)) >= 5 {
  152. esMap["company_type_int"] = 31
  153. } else {
  154. esMap["company_type_int"] = 32
  155. }
  156. } else if company_type == "其他" || company_type == "" {
  157. if len([]rune(company_name)) >= 4 {
  158. esMap["company_type_int"] = 21
  159. } else {
  160. esMap["company_type_int"] = 22
  161. }
  162. } else {
  163. if company_type == "内资分公司" {
  164. esMap["company_type_int"] = 12
  165. } else if len([]rune(company_name)) >= 4 {
  166. esMap["company_type_int"] = 11
  167. } else {
  168. esMap["company_type_int"] = 13
  169. }
  170. }
  171. EsSaveCache <- esMap // 保存es
  172. }(tmp)
  173. tmp = make(map[string]interface{})
  174. }
  175. wg.Wait()
  176. log.Println("Run Over...Count:", count)
  177. }
  178. // StdAll 存量数据生es
  179. func StdAll() {
  180. defer util.Catch()
  181. sess := Mgo.GetMgoConn()
  182. defer Mgo.DestoryMongoConn(sess)
  183. pool := make(chan bool, 10)
  184. wg := &sync.WaitGroup{}
  185. //q := bson.M{"_id": mongodb.StringTOBsonId("61d7f129de77c557ce0457ca")}
  186. it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
  187. count := 0
  188. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  189. if count%200 == 0 {
  190. log.Println("current:", count)
  191. }
  192. pool <- true
  193. wg.Add(1)
  194. go func(tmp map[string]interface{}) {
  195. defer func() {
  196. <-pool
  197. wg.Done()
  198. }()
  199. esMap := map[string]interface{}{}
  200. //生索引字段处理
  201. for _, field := range EsFields {
  202. if tmp[field] == nil {
  203. continue
  204. }
  205. if field == "_id" {
  206. esMap[field] = mongodb.BsonIdToSId(tmp[field])
  207. } else if field == "company_name" {
  208. esMap[field] = tmp["company_name"]
  209. esMap["name"] = tmp["company_name"]
  210. } else if field == "history_name" {
  211. var nameArr []string
  212. for _, v := range strings.Split(util.ObjToString(tmp["history_name"]), ";") {
  213. if v != "" {
  214. nameArr = append(nameArr, v)
  215. }
  216. }
  217. if len(nameArr) > 0 {
  218. esMap["history_name"] = nameArr
  219. }
  220. } else if field == "establish_date" && util.ObjToString(tmp["establish_date"]) != "" {
  221. // 成立日期修改成时间戳
  222. location, err := time.ParseInLocation(util.Date_Short_Layout, util.ObjToString(tmp["establish_date"]), time.Local)
  223. if err != nil {
  224. util.Debug(err)
  225. } else {
  226. esMap["establish_date"] = location.Unix()
  227. }
  228. } else if field == "lastupdatetime" {
  229. esMap["lastupdatetime"] = tmp["update_time_msql"]
  230. } else if field == "bid_projectname" {
  231. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  232. p1 := util.ObjArrToStringArr(pname)
  233. esMap["bid_projectname"] = strings.Join(p1, ",")
  234. }
  235. } else if field == "bid_purchasing" {
  236. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  237. p1 := util.ObjArrToStringArr(pur)
  238. esMap["bid_purchasing"] = strings.Join(p1, ",")
  239. }
  240. } else if field == "bid_area" {
  241. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  242. p1 := util.ObjArrToStringArr(areas)
  243. esMap["bid_area"] = strings.Join(p1, ",")
  244. }
  245. } else if field == "partners" {
  246. if ps, ok := tmp["partners"].([]interface{}); ok {
  247. var parr []map[string]interface{}
  248. for _, v := range ps {
  249. p := make(map[string]interface{})
  250. v1 := v.(map[string]interface{})
  251. for _, field := range partner {
  252. if v1[field] == nil {
  253. continue
  254. }
  255. if field == "stock_capital" || field == "stock_realcapital" {
  256. text := util.ObjToString(v1[field])
  257. if strings.Contains(text, "万元") {
  258. text = strings.Replace(text, "万元", "", -1)
  259. }
  260. if v, err := strconv.ParseFloat(text, 64); err == nil {
  261. v, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", v), 64) //保留小数点两位
  262. p[field] = v
  263. }
  264. } else {
  265. p[field] = v1[field]
  266. }
  267. }
  268. if len(p) > 0 {
  269. parr = append(parr, p)
  270. }
  271. }
  272. if len(parr) > 0 {
  273. esMap[field] = parr
  274. }
  275. }
  276. } else if field == "employees" {
  277. if ps, ok := tmp["employees"].([]interface{}); ok {
  278. var parr []map[string]interface{}
  279. for _, v := range ps {
  280. p := make(map[string]interface{})
  281. v1 := v.(map[string]interface{})
  282. for _, field := range employee {
  283. if v1[field] == nil {
  284. continue
  285. } else {
  286. p[field] = v1[field]
  287. }
  288. }
  289. if len(p) > 0 {
  290. parr = append(parr, p)
  291. }
  292. }
  293. if len(parr) > 0 {
  294. esMap[field] = parr
  295. }
  296. }
  297. } else {
  298. esMap[field] = tmp[field]
  299. }
  300. }
  301. company_type := util.ObjToString(tmp["company_type"])
  302. company_name := util.ObjToString(tmp["company_name"])
  303. if company_type == "个体工商户" {
  304. if len([]rune(company_name)) >= 5 {
  305. esMap["company_type_int"] = 31
  306. } else {
  307. esMap["company_type_int"] = 32
  308. }
  309. } else if company_type == "其他" || company_type == "" {
  310. if len([]rune(company_name)) >= 4 {
  311. esMap["company_type_int"] = 21
  312. } else {
  313. esMap["company_type_int"] = 22
  314. }
  315. } else {
  316. if company_type == "内资分公司" {
  317. esMap["company_type_int"] = 12
  318. } else if len([]rune(company_name)) >= 4 {
  319. esMap["company_type_int"] = 11
  320. } else {
  321. esMap["company_type_int"] = 13
  322. }
  323. }
  324. EsSaveCache <- esMap // 保存es
  325. }(tmp)
  326. tmp = make(map[string]interface{})
  327. }
  328. wg.Wait()
  329. log.Println("Run Over...Count:", count)
  330. }
  331. // SaveEs 过滤后数据存库
  332. func SaveEs() {
  333. log.Println("Es Save...")
  334. arru := make([]map[string]interface{}, 100)
  335. indexu := 0
  336. for {
  337. select {
  338. case v := <-EsSaveCache:
  339. arru[indexu] = v
  340. indexu++
  341. if indexu == 100 {
  342. SP <- true
  343. go func(arru []map[string]interface{}) {
  344. defer func() {
  345. <-SP
  346. }()
  347. Es.BulkSave(Index, arru)
  348. }(arru)
  349. arru = make([]map[string]interface{}, 100)
  350. indexu = 0
  351. }
  352. case <-time.After(1000 * time.Millisecond):
  353. if indexu > 0 {
  354. SP <- true
  355. go func(arru []map[string]interface{}) {
  356. defer func() {
  357. <-SP
  358. }()
  359. Es.BulkSave(Index, arru)
  360. }(arru[:indexu])
  361. arru = make([]map[string]interface{}, 100)
  362. indexu = 0
  363. }
  364. }
  365. }
  366. }