task.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package main
  2. import (
  3. "customer_project/config"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. )
  // Channels held at package level.
  var (
  	// queryClose and queryCloseOver are unbuffered signalling channels.
  	// NOTE(review): not referenced in the code shown here; presumably used by
  	// shutdown logic elsewhere in the package — confirm.
  	queryClose     = make(chan bool)
  	queryCloseOver = make(chan bool)
  	// pool is a buffered channel used as a semaphore with two slots.
  	pool = make(chan bool, 2)
  )

  // Precompiled text patterns (compiled once at package init).
  var (
  	// StrOrNum matches a whole string of 1-4 characters drawn only from
  	// digits, '_' and '-', or only from ASCII letters, '_' and '-'.
  	StrOrNum = regexp.MustCompile(`^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$`)
  	// StrOrNum2 is StrOrNum without the 4-character length limit.
  	StrOrNum2 = regexp.MustCompile(`^[0-9_-]+$|^[a-zA-Z_-]+$`)
  	// _datereg finds (unanchored) dates in years 2000-2029 written like
  	// "2021年3月4日" or "2021-03-04-", optionally followed by an hour such as
  	// "9时30".
  	_datereg = regexp.MustCompile(`20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?`)
  	// replaceStr matches any of several fixed words or a punctuation character.
  	// NOTE(review): if the trailing ")--" inside the bracket are ASCII, RE2
  	// reads them as the range ')'..'-', which also matches '*', '+' and ',' —
  	// confirm that is intended.
  	replaceStr = regexp.MustCompile(`(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)`)
  )
  24. func loadData() {
  25. finalId := 0
  26. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Pcoll))
  27. if len(*lastInfo) > 0 {
  28. finalId = util.IntAll((*lastInfo)[0]["msg_id"])
  29. }
  30. log.Debug("loadData---", zap.Any("finally id", finalId))
  31. lastid, count := 0, 0
  32. for {
  33. log.Debug("重新查询", zap.Any("lastid---", lastid))
  34. q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Pcoll, lastid)
  35. rows, err := MysqlTool.DB.Query(q)
  36. if err != nil {
  37. log.Error("loadData---", zap.Error(err))
  38. }
  39. columns, err := rows.Columns()
  40. if finalId == lastid {
  41. log.Debug("---loadData-finish----" + fmt.Sprint(count))
  42. break
  43. }
  44. for rows.Next() {
  45. scanArgs := make([]interface{}, len(columns))
  46. values := make([]interface{}, len(columns))
  47. ret := make(map[string]interface{})
  48. for k := range values {
  49. scanArgs[k] = &values[k]
  50. }
  51. err = rows.Scan(scanArgs...)
  52. if err != nil {
  53. log.Error("loadData---", zap.Error(err))
  54. break
  55. }
  56. for i, col := range values {
  57. if v, ok := col.([]uint8); ok {
  58. ret[columns[i]] = string(v)
  59. } else {
  60. ret[columns[i]] = col
  61. }
  62. }
  63. lastid = util.IntAll(ret["msg_id"])
  64. count++
  65. if count%20000 == 0 {
  66. log.Info("loadData current-------", zap.Any("count", count), zap.Any("lastid", lastid))
  67. }
  68. pool <- true
  69. wg.Add(1)
  70. func(result map[string]interface{}) {
  71. defer func() {
  72. <-pool
  73. wg.Done()
  74. }()
  75. bys, _ := json.Marshal(result)
  76. var pro *Project
  77. _ = json.Unmarshal(bys, &pro)
  78. eid := pro.EntId
  79. id := pro.ProjectId
  80. var mapPn, mapPc, mapPb map[string]*Key
  81. if mapEnt[eid] != nil {
  82. mapPn = mapEnt[eid].mapPn
  83. mapPc = mapEnt[eid].mapPc
  84. mapPb = mapEnt[eid].mapPb
  85. if v := pro.ProjectName; v != "" {
  86. k := mapPn[v]
  87. if k == nil {
  88. k = &Key{Arr: []string{id}}
  89. mapPn[v] = k
  90. } else {
  91. k.Arr = append(k.Arr, id)
  92. }
  93. }
  94. if v := pro.ProjectCode; v != "" {
  95. k := mapPc[v]
  96. if k == nil {
  97. k = &Key{Arr: []string{id}}
  98. mapPc[v] = k
  99. } else {
  100. k.Arr = append(k.Arr, id)
  101. }
  102. }
  103. if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 {
  104. k := mapPb[pro.Buyer]
  105. if k == nil {
  106. k = &Key{Arr: []string{id}}
  107. mapPb[pro.Buyer] = k
  108. } else {
  109. k.Arr = append(k.Arr, id)
  110. }
  111. }
  112. AllIdsMap[id] = &ID{Id: id, P: pro}
  113. } else {
  114. mapPn = make(map[string]*Key, 1500000)
  115. mapPb = make(map[string]*Key, 5000000)
  116. mapPc = make(map[string]*Key, 5000000)
  117. if v := pro.ProjectName; v != "" {
  118. k := mapPn[v]
  119. if k == nil {
  120. k = &Key{Arr: []string{id}}
  121. mapPn[v] = k
  122. } else {
  123. k.Arr = append(k.Arr, id)
  124. }
  125. }
  126. if v := pro.ProjectCode; v != "" {
  127. k := mapPc[v]
  128. if k == nil {
  129. k = &Key{Arr: []string{id}}
  130. mapPc[v] = k
  131. } else {
  132. k.Arr = append(k.Arr, id)
  133. }
  134. }
  135. if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 {
  136. k := mapPb[pro.Buyer]
  137. if k == nil {
  138. k = &Key{Arr: []string{id}}
  139. mapPb[pro.Buyer] = k
  140. } else {
  141. k.Arr = append(k.Arr, id)
  142. }
  143. }
  144. AllIdsMap[id] = &ID{Id: id, P: pro}
  145. }
  146. mapEnt[eid] = &EntMap{
  147. mapPb: mapPb,
  148. mapPn: mapPc,
  149. mapPc: mapPn,
  150. }
  151. }(ret)
  152. ret = make(map[string]interface{})
  153. }
  154. _ = rows.Close()
  155. wg.Wait()
  156. }
  157. log.Info("load project over..", zap.Int("n", count))
  158. }
  159. func taskProject() {
  160. pool := make(chan bool, 1) //控制线程数
  161. wg := &sync.WaitGroup{}
  162. finalId := 0
  163. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s where projectId is null and status = 1 ORDER BY id DESC LIMIT 1", config.Conf.DB.Mysql.Coll))
  164. if len(*lastInfo) > 0 {
  165. finalId = util.IntAll((*lastInfo)[0]["id"])
  166. }
  167. log.Debug("taskProject---", zap.Any("finalId", finalId))
  168. lastid, count := 0, 0
  169. for {
  170. log.Debug("重新查询,", zap.Any("lastid", lastid))
  171. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d And projectId is null and status = 1 ORDER BY id ASC limit 10000", config.Conf.DB.Mysql.Coll, lastid)
  172. var stmtOut *sql.Stmt
  173. var tx *sql.Tx
  174. var err error
  175. if tx == nil {
  176. stmtOut, err = MysqlTool.DB.Prepare(q)
  177. } else {
  178. stmtOut, err = tx.Prepare(q)
  179. }
  180. rows, err := stmtOut.Query()
  181. if err != nil {
  182. log.Error("taskProject---", zap.Error(err))
  183. }
  184. columns, err := rows.Columns()
  185. if finalId == lastid {
  186. log.Debug("----finish----------", zap.Int("count", count))
  187. break
  188. }
  189. for rows.Next() {
  190. scanArgs := make([]interface{}, len(columns))
  191. values := make([]interface{}, len(columns))
  192. ret := make(map[string]interface{})
  193. for k := range values {
  194. scanArgs[k] = &values[k]
  195. }
  196. err = rows.Scan(scanArgs...)
  197. if err != nil {
  198. log.Error("taskProject---", zap.Error(err))
  199. break
  200. }
  201. for i, col := range values {
  202. if v, ok := col.([]uint8); ok {
  203. ret[columns[i]] = string(v)
  204. } else {
  205. ret[columns[i]] = col
  206. }
  207. }
  208. lastid = util.IntAll(ret["id"])
  209. count++
  210. if count%2000 == 0 {
  211. log.Debug("current-------", zap.Any("count", count), zap.Any("lastid", lastid))
  212. }
  213. pool <- true
  214. wg.Add(1)
  215. go func(tmp map[string]interface{}) {
  216. defer func() {
  217. <-pool
  218. wg.Done()
  219. }()
  220. info := ParseInfo(tmp)
  221. startProjectMerge(info, tmp)
  222. }(ret)
  223. ret = make(map[string]interface{})
  224. }
  225. _ = rows.Close()
  226. stmtOut.Close()
  227. wg.Wait()
  228. }
  229. log.Info("所有线程执行完成...", zap.Int("count:", count))
  230. }
  231. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  232. bys, _ := json.Marshal(tmp)
  233. var thisinfo *Info
  234. _ = json.Unmarshal(bys, &thisinfo)
  235. if thisinfo == nil {
  236. return nil
  237. }
  238. thisinfo.Budget, _ = strconv.ParseFloat(util.ObjToString(tmp["budget"]), 64)
  239. thisinfo.Bidamount, _ = strconv.ParseFloat(util.ObjToString(tmp["bidamount"]), 64)
  240. if thisinfo.ProjectName == "" {
  241. thisinfo.ProjectName = thisinfo.Title
  242. }
  243. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  244. thisinfo.pnbval++
  245. }
  246. if thisinfo.ProjectCode != "" {
  247. if thisinfo.ProjectCode != "" {
  248. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  249. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  250. }
  251. }
  252. thisinfo.pnbval++
  253. }
  254. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  255. thisinfo.pnbval++
  256. } else {
  257. thisinfo.Buyer = ""
  258. }
  259. //winners整理、清理
  260. winner := util.ObjToString(tmp["s_winner"])
  261. thisinfo.Winners = strings.Split(winner, ",")
  262. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  263. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  264. return thisinfo
  265. }