// Source: buyertask.go (8.1 KB)

  1. package main
  2. import (
  3. "context"
  4. "esindex/config"
  5. "fmt"
  6. "go.uber.org/zap"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "time"
  10. )
  11. //buyerOnce 处理增量数据
  12. func buyerOnce() {
  13. if len(specialNames) < 1 {
  14. initSpecialNames()
  15. }
  16. rowsPerPage := 1000
  17. now := time.Now()
  18. tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 00, 00, 00, 00, time.Local)
  19. curTime := tarTime.Format("2006-01-02")
  20. countSql := fmt.Sprintf(`
  21. SELECT
  22. count(t.id)
  23. FROM
  24. dws_f_ent_tags AS t
  25. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  26. WHERE t.createtime >= '%v' OR t.updatetime >= '%v'
  27. ORDER BY t.id DESC LIMIT 1
  28. `, curTime, curTime)
  29. dataCounts := Mysql.CountBySql(countSql)
  30. if dataCounts > 0 {
  31. log.Info("buyerOnce", zap.Any(fmt.Sprintf("总数:%s", curTime), dataCounts))
  32. } else {
  33. log.Info("buyerOnce", zap.String(curTime, "没有更新数据"))
  34. return
  35. }
  36. lastid, total := 0, 0
  37. realCount := 0
  38. for {
  39. query := fmt.Sprintf(`
  40. SELECT
  41. b.name,
  42. t.id,
  43. t.name_id,
  44. b.company_id,
  45. t.createtime,
  46. t.updatetime,
  47. c.area,
  48. c.city,
  49. class.name AS buyerclass
  50. FROM
  51. dws_f_ent_tags AS t
  52. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  53. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  54. LEFT JOIN code_area AS c ON b.city_code = c.code
  55. WHERE (t.createtime >= '%v' OR t.updatetime >= '%v') and ( t.id > %d )
  56. ORDER BY t.id ASC
  57. LIMIT %d;
  58. `, curTime, curTime, lastid, rowsPerPage)
  59. ctx := context.Background()
  60. rows, err := Mysql.DB.QueryContext(ctx, query)
  61. if err != nil {
  62. log.Info("dealData", zap.Any("QueryContext err", err))
  63. }
  64. if total >= int(dataCounts) {
  65. log.Info("buyerOnce over", zap.Any("total", total), zap.Any("lastid", lastid))
  66. break
  67. }
  68. columns, err := rows.Columns()
  69. if err != nil {
  70. log.Info("buyerOnce", zap.Any("rows.Columns", err))
  71. }
  72. for rows.Next() {
  73. scanArgs := make([]interface{}, len(columns))
  74. values := make([]interface{}, len(columns))
  75. ret := make(map[string]interface{})
  76. //MySQL 更新
  77. update := map[string]interface{}{}
  78. for k := range values {
  79. scanArgs[k] = &values[k]
  80. }
  81. err = rows.Scan(scanArgs...)
  82. if err != nil {
  83. log.Info("buyerOnce", zap.Any("rows.Scan", err))
  84. break
  85. }
  86. for i, col := range values {
  87. if v, ok := col.([]uint8); ok {
  88. ret[columns[i]] = string(v)
  89. } else {
  90. ret[columns[i]] = col
  91. }
  92. }
  93. total++
  94. if total%100 == 0 {
  95. log.Info("buyerOnce", zap.Int("current total", total))
  96. }
  97. lastid = util.IntAll(ret["id"])
  98. name := util.ObjToString(ret["name"])
  99. //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
  100. if util.ObjToString(ret["company_id"]) != "" || specialNames[name] {
  101. realCount++
  102. update["status"] = 1
  103. data := make(map[string]interface{}, 0)
  104. data["name"] = name
  105. data["name_id"] = ret["name_id"]
  106. data["id"] = ret["name_id"]
  107. data["buyer_name"] = name
  108. data["province"] = ret["area"]
  109. data["city"] = ret["city"]
  110. data["buyerclass"] = ret["buyerclass"]
  111. if ret["createtime"] != nil {
  112. if createtime, ok := ret["createtime"].(time.Time); ok {
  113. data["createtime"] = createtime.Unix()
  114. if ret["updatetime"] != nil {
  115. if updatetime, ok := ret["updatetime"].(time.Time); ok {
  116. data["updatetime"] = updatetime.Unix()
  117. }
  118. } else {
  119. data["updatetime"] = createtime.Unix()
  120. }
  121. }
  122. }
  123. sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, ret["name_id"])
  124. counts := Mysql.CountBySql(sql)
  125. if counts > 0 {
  126. data["is_contact"] = true
  127. } else {
  128. data["is_contact"] = false
  129. }
  130. //生索引
  131. arrEs := make([]map[string]interface{}, 0) //最终生索引数据
  132. arrEs = append(arrEs, data)
  133. err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  134. if err != nil {
  135. log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
  136. }
  137. }
  138. //更新MySQL
  139. where := map[string]interface{}{
  140. "name_id": ret["name_id"],
  141. }
  142. if len(update) > 0 {
  143. Mysql.Update("dws_f_ent_tags", where, update)
  144. }
  145. }
  146. rows.Close()
  147. if err := rows.Err(); err != nil {
  148. log.Info("buyerOnce", zap.Any("err", err))
  149. }
  150. }
  151. log.Info("buyerOnce", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
  152. }
  153. //dealSpecialNames 拿到凭安 特企数据 以及爬虫采购单位数据
  154. func initSpecialNames() {
  155. sess := MgoS.GetMgoConn()
  156. defer MgoS.DestoryMongoConn(sess)
  157. query := sess.DB(MgoS.DbName).C("special_enterprise").Find(nil).Iter()
  158. count := 0
  159. log.Info("dealSpecialNames", zap.String("special_enterprise", "special_enterprise"))
  160. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  161. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  162. specialNames[tmp["company_name"].(string)] = true
  163. }
  164. }
  165. //2.
  166. time.Sleep(time.Second)
  167. log.Info("dealSpecialNames", zap.String("special_foundation", "special_foundation"))
  168. query2 := sess.DB("mixdata").C("special_foundation").Find(nil).Iter()
  169. count = 0
  170. for tmp := make(map[string]interface{}); query2.Next(tmp); count++ {
  171. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  172. specialNames[tmp["company_name"].(string)] = true
  173. }
  174. }
  175. //3.
  176. time.Sleep(time.Second)
  177. log.Info("dealSpecialNames", zap.String("special_gov_unit", "special_gov_unit"))
  178. query3 := sess.DB("mixdata").C("special_gov_unit").Find(nil).Iter()
  179. count = 0
  180. for tmp := make(map[string]interface{}); query3.Next(tmp); count++ {
  181. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  182. specialNames[tmp["company_name"].(string)] = true
  183. }
  184. }
  185. //4.
  186. time.Sleep(time.Second)
  187. log.Info("dealSpecialNames", zap.String("special_hongkong_company", "special_hongkong_company"))
  188. query4 := sess.DB("mixdata").C("special_hongkong_company").Find(nil).Iter()
  189. count = 0
  190. for tmp := make(map[string]interface{}); query4.Next(tmp); count++ {
  191. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  192. specialNames[tmp["company_name"].(string)] = true
  193. }
  194. }
  195. //5.
  196. time.Sleep(time.Second)
  197. log.Info("dealSpecialNames", zap.String("special_hongkong_company_history", "special_hongkong_company_history"))
  198. query5 := sess.DB("mixdata").C("special_hongkong_company_history").Find(nil).Iter()
  199. count = 0
  200. for tmp := make(map[string]interface{}); query5.Next(tmp); count++ {
  201. if tmp["use_name"] != nil && tmp["use_name"] != "" {
  202. specialNames[tmp["use_name"].(string)] = true
  203. }
  204. }
  205. //6.
  206. time.Sleep(time.Second)
  207. log.Info("dealSpecialNames", zap.String("special_law_office", "special_law_office"))
  208. query6 := sess.DB("mixdata").C("special_law_office").Find(nil).Iter()
  209. count = 0
  210. for tmp := make(map[string]interface{}); query6.Next(tmp); count++ {
  211. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  212. specialNames[tmp["company_name"].(string)] = true
  213. }
  214. }
  215. //7.
  216. log.Info("dealSpecialNames", zap.String("special_social_organ", "special_social_organ"))
  217. query7 := sess.DB("mixdata").C("special_social_organ").Find(nil).Iter()
  218. count = 0
  219. for tmp := make(map[string]interface{}); query7.Next(tmp); count++ {
  220. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  221. specialNames[tmp["company_name"].(string)] = true
  222. }
  223. }
  224. log.Info("dealSpecialNames", zap.String("special_trade_union", "special_trade_union"))
  225. //8.
  226. query8 := sess.DB("mixdata").C("special_trade_union").Find(nil).Iter()
  227. count = 0
  228. for tmp := make(map[string]interface{}); query8.Next(tmp); count++ {
  229. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  230. specialNames[tmp["company_name"].(string)] = true
  231. }
  232. }
  233. //9. 爬虫采集的采购单位
  234. query9 := sess.DB("mixdata").C("special_buyer_name").Find(nil).Iter()
  235. count = 0
  236. for tmp := make(map[string]interface{}); query9.Next(tmp); count++ {
  237. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  238. specialNames[tmp["company_name"].(string)] = true
  239. }
  240. }
  241. }