task.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package main
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/tealeg/xlsx"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "regexp"
  13. "strings"
  14. "sync"
  15. "tieta_data/config"
  16. )
  // Signalling channels shared at package level.
  var (
  	// queryClose and queryCloseOver are unbuffered bool channels.
  	// NOTE(review): nothing in this part of the file sends or receives on
  	// them — confirm where they are used before changing them.
  	queryClose     = make(chan bool)
  	queryCloseOver = make(chan bool)

  	// pool is a two-slot buffered channel. loadData pushes a token into it
  	// before handling a row and pops the token once the row is done.
  	pool = make(chan bool, 2)
  )

  // Regular expressions, compiled once at start-up.
  var (
  	// StrOrNum matches a whole string of 1 to 4 characters that is made up
  	// only of digits/'_'/'-' or only of ASCII letters/'_'/'-'.
  	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")

  	// _datereg matches a date such as "2023年5月1日" (the separators may also
  	// be '-'), optionally followed by an hour part such as "10时30".
  	_datereg = regexp.MustCompile("20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?")

  	// StrOrNum2 is the unbounded-length variant of StrOrNum.
  	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")

  	// replaceStr matches a handful of generic words and punctuation marks.
  	// NOTE(review): the trailing ")--" inside the character class is parsed
  	// as the range ')'..'-', so '*', '+' and ',' match as well — confirm
  	// this is intended.
  	replaceStr = regexp.MustCompile("(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)")
  )
  26. func taskExcelP(tmp map[string]interface{}, row *xlsx.Row) {
  27. for _, v := range FieldArr {
  28. if FieldMap[v] != "" {
  29. if v == "项目id" {
  30. row.AddCell().SetValue(mongodb.BsonIdToSId(tmp["_id"]))
  31. } else if v == "预算金额(元)" || v == "中标金额(元)" {
  32. if tmp[FieldMap[v]] != nil {
  33. row.AddCell().SetValue(tmp[FieldMap[v]])
  34. } else {
  35. row.AddCell().SetValue("")
  36. }
  37. } else if v == "招标数据更新时间" || v == "中标数据更新时间" {
  38. p := util.Int64All(tmp[FieldMap[v]])
  39. if p > 0 {
  40. row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout))
  41. } else {
  42. row.AddCell().SetValue("")
  43. }
  44. } else {
  45. row.AddCell().SetValue(util.ObjToString(tmp[FieldMap[v]]))
  46. }
  47. } else {
  48. row.AddCell().SetValue("")
  49. }
  50. }
  51. }
  52. func taskExcelB(tmp map[string]interface{}, row *xlsx.Row) {
  53. for _, v := range FieldArr1 {
  54. if FieldMap1[v] != "" {
  55. if v == "信息id" {
  56. row.AddCell().SetValue(util.ObjToString(tmp["id"]))
  57. } else if v == "项目id" {
  58. info, _ := MongoTool.FindOne(config.Conf.DB.Mongo.Pcoll, bson.M{"ids": util.ObjToString(tmp["id"])})
  59. if len(*info) > 0 {
  60. row.AddCell().SetValue(mongodb.BsonIdToSId((*info)["_id"]))
  61. } else {
  62. row.AddCell().SetValue("")
  63. }
  64. } else if v == "发布时间" || v == "标书获取时间" || v == "标书截止时间" || v == "投标开始时间" || v == "投标截止时间" || v == "开标时间" {
  65. p := util.Int64All(tmp[FieldMap1[v]])
  66. if p > 0 {
  67. row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout))
  68. } else {
  69. row.AddCell().SetValue("")
  70. }
  71. } else if v == "预算金额(元)" || v == "中标金额(元)" {
  72. if tmp[FieldMap1[v]] != nil {
  73. row.AddCell().SetValue(tmp[FieldMap1[v]])
  74. } else {
  75. row.AddCell().SetValue("")
  76. }
  77. } else if v == "是否电子招标" {
  78. if util.ObjToString(tmp[FieldMap1[v]]) != "" {
  79. row.AddCell().SetValue("是")
  80. } else {
  81. row.AddCell().SetValue("否")
  82. }
  83. } else {
  84. row.AddCell().SetValue(util.ObjToString(tmp[FieldMap1[v]]))
  85. }
  86. } else {
  87. row.AddCell().SetValue("")
  88. }
  89. }
  90. }
  91. func loadData() {
  92. finalId := 0
  93. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Pcoll))
  94. if len(*lastInfo) > 0 {
  95. finalId = util.IntAll((*lastInfo)[0]["msg_id"])
  96. }
  97. util.Debug("loadData---", "finally id", finalId)
  98. lastid, count := 0, 0
  99. for {
  100. util.Debug("重新查询,lastid---", lastid)
  101. q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Pcoll, lastid)
  102. rows, err := MysqlTool.DB.Query(q)
  103. if err != nil {
  104. util.Debug("loadData---", err)
  105. }
  106. columns, err := rows.Columns()
  107. if finalId == lastid {
  108. util.Debug("---loadData-finish----", count)
  109. break
  110. }
  111. for rows.Next() {
  112. scanArgs := make([]interface{}, len(columns))
  113. values := make([]interface{}, len(columns))
  114. ret := make(map[string]interface{})
  115. for k := range values {
  116. scanArgs[k] = &values[k]
  117. }
  118. err = rows.Scan(scanArgs...)
  119. if err != nil {
  120. util.Debug("loadData---", err)
  121. break
  122. }
  123. for i, col := range values {
  124. if v, ok := col.([]uint8); ok {
  125. ret[columns[i]] = string(v)
  126. } else {
  127. ret[columns[i]] = col
  128. }
  129. }
  130. lastid = util.IntAll(ret["msg_id"])
  131. count++
  132. if count%20000 == 0 {
  133. util.Debug("loadData current-------", count, lastid)
  134. }
  135. pool <- true
  136. wg.Add(1)
  137. func(result map[string]interface{}) {
  138. defer func() {
  139. <-pool
  140. wg.Done()
  141. }()
  142. bys, _ := json.Marshal(result)
  143. var pro *Project
  144. _ = json.Unmarshal(bys, &pro)
  145. id := pro.ProjectId
  146. if v := pro.ProjectName; v != "" {
  147. k := mapPn[v]
  148. if k == nil {
  149. k = &Key{Arr: []string{id}}
  150. mapPn[v] = k
  151. } else {
  152. k.Arr = append(k.Arr, id)
  153. }
  154. }
  155. if v := pro.ProjectCode; v != "" {
  156. k := mapPc[v]
  157. if k == nil {
  158. k = &Key{Arr: []string{id}}
  159. mapPc[v] = k
  160. } else {
  161. k.Arr = append(k.Arr, id)
  162. }
  163. }
  164. if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 {
  165. k := mapPb[pro.Buyer]
  166. if k == nil {
  167. k = &Key{Arr: []string{id}}
  168. mapPb[pro.Buyer] = k
  169. } else {
  170. k.Arr = append(k.Arr, id)
  171. }
  172. }
  173. AllIdsMap[id] = &ID{Id: id, P: pro}
  174. }(ret)
  175. ret = make(map[string]interface{})
  176. }
  177. _ = rows.Close()
  178. wg.Wait()
  179. }
  180. log.Info("load project over..", zap.Int("n", count))
  181. }
  182. func taskProject() {
  183. pool := make(chan bool, 2) //控制线程数
  184. wg := &sync.WaitGroup{}
  185. IsProject = true
  186. finalId := 0
  187. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT msg_id FROM %s where projectId is null ORDER BY msg_id DESC LIMIT 1", config.Conf.DB.Mysql.Coll))
  188. if len(*lastInfo) > 0 {
  189. finalId = util.IntAll((*lastInfo)[0]["msg_id"])
  190. }
  191. util.Debug("taskProject---", "finally id", finalId)
  192. lastid, count := 0, 0
  193. for {
  194. util.Debug("重新查询,lastid---", lastid)
  195. q := fmt.Sprintf("SELECT * FROM %s WHERE msg_id > %d And projectId is null ORDER BY msg_id ASC limit 100000", config.Conf.DB.Mysql.Coll, lastid)
  196. var stmtOut *sql.Stmt
  197. var tx *sql.Tx
  198. var err error
  199. if tx == nil {
  200. stmtOut, err = MysqlTool.DB.Prepare(q)
  201. } else {
  202. stmtOut, err = tx.Prepare(q)
  203. }
  204. rows, err := stmtOut.Query()
  205. if err != nil {
  206. util.Debug("taskProject---", err)
  207. }
  208. columns, err := rows.Columns()
  209. if finalId == lastid {
  210. util.Debug("----finish----------", count)
  211. _ = rows.Close()
  212. stmtOut.Close()
  213. break
  214. }
  215. for rows.Next() {
  216. scanArgs := make([]interface{}, len(columns))
  217. values := make([]interface{}, len(columns))
  218. ret := make(map[string]interface{})
  219. for k := range values {
  220. scanArgs[k] = &values[k]
  221. }
  222. err = rows.Scan(scanArgs...)
  223. if err != nil {
  224. util.Debug("taskProject---", err)
  225. break
  226. }
  227. for i, col := range values {
  228. if v, ok := col.([]uint8); ok {
  229. ret[columns[i]] = string(v)
  230. } else {
  231. ret[columns[i]] = col
  232. }
  233. }
  234. lastid = util.IntAll(ret["msg_id"])
  235. count++
  236. if count%2000 == 0 {
  237. util.Debug("current-------", count, lastid)
  238. }
  239. pool <- true
  240. wg.Add(1)
  241. go func(tmp map[string]interface{}) {
  242. defer func() {
  243. <-pool
  244. wg.Done()
  245. }()
  246. info := ParseInfo(tmp)
  247. startProjectMerge(info, tmp)
  248. }(ret)
  249. ret = make(map[string]interface{})
  250. }
  251. _ = rows.Close()
  252. stmtOut.Close()
  253. wg.Wait()
  254. }
  255. IsProject = false
  256. log.Info("所有线程执行完成...", zap.Int("count:", count))
  257. }
  258. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  259. bys, _ := json.Marshal(tmp)
  260. var thisinfo *Info
  261. _ = json.Unmarshal(bys, &thisinfo)
  262. if thisinfo == nil {
  263. return nil
  264. }
  265. if thisinfo.ProjectName == "" {
  266. thisinfo.ProjectName = thisinfo.Title
  267. }
  268. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  269. thisinfo.pnbval++
  270. }
  271. if thisinfo.ProjectCode != "" {
  272. if thisinfo.ProjectCode != "" {
  273. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  274. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  275. }
  276. }
  277. thisinfo.pnbval++
  278. }
  279. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  280. thisinfo.pnbval++
  281. } else {
  282. thisinfo.Buyer = ""
  283. }
  284. if tmp["multipackage"] == nil || util.ObjToString(tmp["multipackage"]) == "否" {
  285. thisinfo.MultiPackage = 0
  286. } else {
  287. thisinfo.MultiPackage = 1
  288. }
  289. //winners整理、清理
  290. winner := util.ObjToString(tmp["s_winner"])
  291. thisinfo.Winners = strings.Split(winner, ",")
  292. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  293. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  294. return thisinfo
  295. }