task.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. package main
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/tealeg/xlsx"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "regexp"
  13. "strings"
  14. "sync"
  15. "tieta_data/config"
  16. )
  17. var (
  18. queryClose = make(chan bool)
  19. queryCloseOver = make(chan bool)
  20. pool = make(chan bool, 2)
  21. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  22. _datereg = regexp.MustCompile("20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?")
  23. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  24. replaceStr = regexp.MustCompile("(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)")
  25. )
  26. func taskExcelP(tmp map[string]interface{}, row *xlsx.Row) {
  27. for _, v := range FieldArr {
  28. if FieldMap[v] != "" {
  29. if v == "项目id" {
  30. row.AddCell().SetValue(mongodb.BsonIdToSId(tmp["_id"]))
  31. } else if v == "预算金额(元)" || v == "中标金额(元)" {
  32. if tmp[FieldMap[v]] != nil {
  33. row.AddCell().SetValue(tmp[FieldMap[v]])
  34. } else {
  35. row.AddCell().SetValue("")
  36. }
  37. } else if v == "招标数据更新时间" || v == "中标数据更新时间" {
  38. p := util.Int64All(tmp[FieldMap[v]])
  39. if p > 0 {
  40. row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout))
  41. } else {
  42. row.AddCell().SetValue("")
  43. }
  44. } else {
  45. row.AddCell().SetValue(util.ObjToString(tmp[FieldMap[v]]))
  46. }
  47. } else {
  48. row.AddCell().SetValue("")
  49. }
  50. }
  51. }
  52. func taskExcelB(tmp map[string]interface{}, row *xlsx.Row) {
  53. for _, v := range FieldArr1 {
  54. if FieldMap1[v] != "" {
  55. if v == "信息id" {
  56. row.AddCell().SetValue(util.ObjToString(tmp["id"]))
  57. } else if v == "项目id" {
  58. info, _ := MongoTool.FindOne(config.Conf.DB.Mongo.Pcoll, bson.M{"ids": util.ObjToString(tmp["id"])})
  59. if len(*info) > 0 {
  60. row.AddCell().SetValue(mongodb.BsonIdToSId((*info)["_id"]))
  61. } else {
  62. row.AddCell().SetValue("")
  63. }
  64. } else if v == "发布时间" || v == "标书获取时间" || v == "标书截止时间" || v == "投标开始时间" || v == "投标截止时间" || v == "开标时间" {
  65. p := util.Int64All(tmp[FieldMap1[v]])
  66. if p > 0 {
  67. row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout))
  68. } else {
  69. row.AddCell().SetValue("")
  70. }
  71. } else if v == "预算金额(元)" || v == "中标金额(元)" {
  72. if tmp[FieldMap1[v]] != nil {
  73. row.AddCell().SetValue(tmp[FieldMap1[v]])
  74. } else {
  75. row.AddCell().SetValue("")
  76. }
  77. } else if v == "是否电子招标" {
  78. if util.ObjToString(tmp[FieldMap1[v]]) != "" {
  79. row.AddCell().SetValue("是")
  80. } else {
  81. row.AddCell().SetValue("否")
  82. }
  83. } else {
  84. row.AddCell().SetValue(util.ObjToString(tmp[FieldMap1[v]]))
  85. }
  86. } else {
  87. row.AddCell().SetValue("")
  88. }
  89. }
  90. }
  91. func loadData() {
  92. finalId := 0
  93. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Pcoll))
  94. if len(*lastInfo) > 0 {
  95. finalId = util.IntAll((*lastInfo)[0]["msg_id"])
  96. }
  97. util.Debug("loadData---", "finally id", finalId)
  98. lastid, count := 0, 0
  99. for {
  100. util.Debug("重新查询,lastid---", lastid)
  101. q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Pcoll, lastid)
  102. rows, err := MysqlTool.DB.Query(q)
  103. if err != nil {
  104. util.Debug("loadData---", err)
  105. }
  106. columns, err := rows.Columns()
  107. if finalId == lastid {
  108. util.Debug("---loadData-finish----", count)
  109. break
  110. }
  111. for rows.Next() {
  112. scanArgs := make([]interface{}, len(columns))
  113. values := make([]interface{}, len(columns))
  114. ret := make(map[string]interface{})
  115. for k := range values {
  116. scanArgs[k] = &values[k]
  117. }
  118. err = rows.Scan(scanArgs...)
  119. if err != nil {
  120. util.Debug("loadData---", err)
  121. break
  122. }
  123. for i, col := range values {
  124. if v, ok := col.([]uint8); ok {
  125. ret[columns[i]] = string(v)
  126. } else {
  127. ret[columns[i]] = col
  128. }
  129. }
  130. lastid = util.IntAll(ret["msg_id"])
  131. count++
  132. if count%20000 == 0 {
  133. util.Debug("loadData current-------", count, lastid)
  134. }
  135. pool <- true
  136. wg.Add(1)
  137. func(result map[string]interface{}) {
  138. defer func() {
  139. <-pool
  140. wg.Done()
  141. }()
  142. bys, _ := json.Marshal(result)
  143. var pro *Project
  144. _ = json.Unmarshal(bys, &pro)
  145. id := pro.ProjectId
  146. if v := pro.ProjectName; v != "" {
  147. k := mapPn[v]
  148. if k == nil {
  149. k = &Key{Arr: []string{id}}
  150. mapPn[v] = k
  151. } else {
  152. k.Arr = append(k.Arr, id)
  153. }
  154. }
  155. if v := pro.ProjectCode; v != "" {
  156. k := mapPc[v]
  157. if k == nil {
  158. k = &Key{Arr: []string{id}}
  159. mapPc[v] = k
  160. } else {
  161. k.Arr = append(k.Arr, id)
  162. }
  163. }
  164. if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 {
  165. k := mapPb[pro.Buyer]
  166. if k == nil {
  167. k = &Key{Arr: []string{id}}
  168. mapPb[pro.Buyer] = k
  169. } else {
  170. k.Arr = append(k.Arr, id)
  171. }
  172. }
  173. AllIdsMap[id] = &ID{Id: id, P: pro}
  174. }(ret)
  175. ret = make(map[string]interface{})
  176. }
  177. _ = rows.Close()
  178. wg.Wait()
  179. }
  180. log.Info("load project over..", zap.Int("n", count))
  181. }
  182. func taskProject() {
  183. pool := make(chan bool, 2) //控制线程数
  184. wg := &sync.WaitGroup{}
  185. finalId := 0
  186. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s where projectId is null ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Coll))
  187. if len(*lastInfo) > 0 {
  188. finalId = util.IntAll((*lastInfo)[0]["msg_id"])
  189. }
  190. util.Debug("taskProject---", "finally id", finalId)
  191. lastid, count := 0, 0
  192. for {
  193. util.Debug("重新查询,lastid---", lastid)
  194. q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d And projectId is null ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Coll, lastid)
  195. var stmtOut *sql.Stmt
  196. var tx *sql.Tx
  197. var err error
  198. if tx == nil {
  199. stmtOut, err = MysqlTool.DB.Prepare(q)
  200. } else {
  201. stmtOut, err = tx.Prepare(q)
  202. }
  203. rows, err := stmtOut.Query()
  204. if err != nil {
  205. util.Debug("taskProject---", err)
  206. }
  207. columns, err := rows.Columns()
  208. if finalId == lastid {
  209. util.Debug("----finish----------", count)
  210. break
  211. }
  212. for rows.Next() {
  213. scanArgs := make([]interface{}, len(columns))
  214. values := make([]interface{}, len(columns))
  215. ret := make(map[string]interface{})
  216. for k := range values {
  217. scanArgs[k] = &values[k]
  218. }
  219. err = rows.Scan(scanArgs...)
  220. if err != nil {
  221. util.Debug("taskProject---", err)
  222. break
  223. }
  224. for i, col := range values {
  225. if v, ok := col.([]uint8); ok {
  226. ret[columns[i]] = string(v)
  227. } else {
  228. ret[columns[i]] = col
  229. }
  230. }
  231. lastid = util.IntAll(ret["msg_id"])
  232. count++
  233. if count%2000 == 0 {
  234. util.Debug("current-------", count, lastid)
  235. }
  236. pool <- true
  237. wg.Add(1)
  238. go func(tmp map[string]interface{}) {
  239. defer func() {
  240. <-pool
  241. wg.Done()
  242. }()
  243. info := ParseInfo(tmp)
  244. startProjectMerge(info, tmp)
  245. }(ret)
  246. ret = make(map[string]interface{})
  247. }
  248. _ = rows.Close()
  249. stmtOut.Close()
  250. wg.Wait()
  251. }
  252. log.Info("所有线程执行完成...", zap.Int("count:", count))
  253. }
  254. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  255. bys, _ := json.Marshal(tmp)
  256. var thisinfo *Info
  257. _ = json.Unmarshal(bys, &thisinfo)
  258. if thisinfo == nil {
  259. return nil
  260. }
  261. if thisinfo.ProjectName == "" {
  262. thisinfo.ProjectName = thisinfo.Title
  263. }
  264. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  265. thisinfo.pnbval++
  266. }
  267. if thisinfo.ProjectCode != "" {
  268. if thisinfo.ProjectCode != "" {
  269. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  270. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  271. }
  272. }
  273. thisinfo.pnbval++
  274. }
  275. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  276. thisinfo.pnbval++
  277. } else {
  278. thisinfo.Buyer = ""
  279. }
  280. if tmp["multipackage"] == nil || util.ObjToString(tmp["multipackage"]) == "否" {
  281. thisinfo.MultiPackage = 0
  282. } else {
  283. thisinfo.MultiPackage = 1
  284. }
  285. //winners整理、清理
  286. winner := util.ObjToString(tmp["s_winner"])
  287. thisinfo.Winners = strings.Split(winner, ",")
  288. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  289. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  290. return thisinfo
  291. }