task.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package main
import (
	"encoding/json"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/tealeg/xlsx"
	"go.mongodb.org/mongo-driver/bson"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"tieta_data/config"
)
  15. var (
  16. queryClose = make(chan bool)
  17. queryCloseOver = make(chan bool)
  18. pool = make(chan bool, 2)
  19. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  20. _datereg = regexp.MustCompile("20[0-2][0-9][年-][0-9]{1,2}[月-][0-9]{1,2}[日-]([0-9]{1,2}时[0-9]{0,2})?")
  21. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  22. replaceStr = regexp.MustCompile("(工程|采购|项目|[?!、【】()—()--]|栏标价|中标候选人|招标代理)")
  23. )
  24. func taskExcelP(tmp map[string]interface{}, row *xlsx.Row) {
  25. for _, v := range FieldArr {
  26. if FieldMap[v] != "" {
  27. if v == "项目id" {
  28. row.AddCell().SetValue(mongodb.BsonIdToSId(tmp["_id"]))
  29. } else if v == "预算金额(元)" || v == "中标金额(元)" {
  30. if tmp[FieldMap[v]] != nil {
  31. row.AddCell().SetValue(tmp[FieldMap[v]])
  32. } else {
  33. row.AddCell().SetValue("")
  34. }
  35. } else if v == "招标数据更新时间" || v == "中标数据更新时间" {
  36. p := util.Int64All(tmp[FieldMap[v]])
  37. if p > 0 {
  38. row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout))
  39. } else {
  40. row.AddCell().SetValue("")
  41. }
  42. } else {
  43. row.AddCell().SetValue(util.ObjToString(tmp[FieldMap[v]]))
  44. }
  45. } else {
  46. row.AddCell().SetValue("")
  47. }
  48. }
  49. }
  50. func taskExcelB(tmp map[string]interface{}, row *xlsx.Row) {
  51. for _, v := range FieldArr1 {
  52. if FieldMap1[v] != "" {
  53. if v == "信息id" {
  54. row.AddCell().SetValue(util.ObjToString(tmp["id"]))
  55. } else if v == "项目id" {
  56. info, _ := MongoTool.FindOne(config.Conf.DB.Mongo.Pcoll, bson.M{"ids": util.ObjToString(tmp["id"])})
  57. if len(*info) > 0 {
  58. row.AddCell().SetValue(mongodb.BsonIdToSId((*info)["_id"]))
  59. } else {
  60. row.AddCell().SetValue("")
  61. }
  62. } else if v == "发布时间" || v == "标书获取时间" || v == "标书截止时间" || v == "投标开始时间" || v == "投标截止时间" || v == "开标时间" {
  63. p := util.Int64All(tmp[FieldMap1[v]])
  64. if p > 0 {
  65. row.AddCell().SetValue(util.FormatDateByInt64(&p, util.Date_Full_Layout))
  66. } else {
  67. row.AddCell().SetValue("")
  68. }
  69. } else if v == "预算金额(元)" || v == "中标金额(元)" {
  70. if tmp[FieldMap1[v]] != nil {
  71. row.AddCell().SetValue(tmp[FieldMap1[v]])
  72. } else {
  73. row.AddCell().SetValue("")
  74. }
  75. } else if v == "是否电子招标" {
  76. if util.ObjToString(tmp[FieldMap1[v]]) != "" {
  77. row.AddCell().SetValue("是")
  78. } else {
  79. row.AddCell().SetValue("否")
  80. }
  81. } else {
  82. row.AddCell().SetValue(util.ObjToString(tmp[FieldMap1[v]]))
  83. }
  84. } else {
  85. row.AddCell().SetValue("")
  86. }
  87. }
  88. }
  89. func loadData() {
  90. sess := MongoTool.GetMgoConn()
  91. defer MongoTool.DestoryMongoConn(sess)
  92. it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Pcoll).Find(nil).Select(nil).Iter()
  93. n := 0
  94. result := make(map[string]interface{})
  95. if it.Next(&result) {
  96. n++
  97. bys, _ := json.Marshal(result)
  98. var pro *Project
  99. _ = json.Unmarshal(bys, &pro)
  100. id := pro.Id.Hex()
  101. for _, v := range append([]string{pro.ProjectName}, pro.MPN...) {
  102. if v != "" {
  103. //v = pcReplace.ReplaceAllString(v, "")
  104. if v != "" {
  105. k := mapPn[v]
  106. if k == nil {
  107. k = &Key{Arr: []string{id}}
  108. mapPn[v] = k
  109. } else {
  110. k.Arr = append(k.Arr, id)
  111. }
  112. }
  113. }
  114. }
  115. for _, v := range append([]string{pro.ProjectCode}, pro.MPC...) {
  116. if v != "" {
  117. if v != "" {
  118. k := mapPc[v]
  119. if k == nil {
  120. k = &Key{Arr: []string{id}}
  121. mapPc[v] = k
  122. } else {
  123. k.Arr = append(k.Arr, id)
  124. }
  125. }
  126. }
  127. }
  128. if pro.Buyer != "" && len([]rune(pro.Buyer)) > 2 {
  129. k := mapPb[pro.Buyer]
  130. if k == nil {
  131. k = &Key{Arr: []string{id}}
  132. mapPb[pro.Buyer] = k
  133. } else {
  134. k.Arr = append(k.Arr, id)
  135. }
  136. }
  137. AllIdsMap[id] = &ID{Id: id, P: pro}
  138. }
  139. time.Sleep(2 * time.Second)
  140. log.Info("load project over..", zap.Int("n", n))
  141. }
  142. func taskProject(mapinfo map[string]interface{}) {
  143. q := make(map[string]interface{})
  144. gtid := util.ObjToString(mapinfo["gtid"])
  145. lteid := util.ObjToString(mapinfo["lteid"])
  146. q["_id"] = map[string]interface{}{
  147. "$lte": mongodb.StringTOBsonId(lteid),
  148. "$gte": mongodb.StringTOBsonId(gtid),
  149. }
  150. //生成查询语句执行
  151. log.Info("查询语句:", zap.Any("q", q))
  152. count := 0
  153. sess := MongoTool.GetMgoConn()
  154. defer MongoTool.DestoryMongoConn(sess)
  155. infoPool := make(chan map[string]interface{}, 2000)
  156. over := make(chan bool)
  157. go func() {
  158. L:
  159. for {
  160. select {
  161. case tmp := <-infoPool:
  162. pool <- true
  163. go func(tmp map[string]interface{}) {
  164. defer func() {
  165. <-pool
  166. }()
  167. info := ParseInfo(tmp)
  168. startProjectMerge(info, tmp)
  169. }(tmp)
  170. default:
  171. select {
  172. case tmp := <-infoPool:
  173. pool <- true
  174. go func(tmp map[string]interface{}) {
  175. defer func() {
  176. <-pool
  177. }()
  178. info := ParseInfo(tmp)
  179. startProjectMerge(info, tmp)
  180. }(tmp)
  181. case <-over:
  182. break L
  183. }
  184. }
  185. }
  186. }()
  187. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0, "detail": 0, "contenthtml": 0, "jsondata": 0}
  188. ms := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Select(fields)
  189. query := ms.Iter()
  190. var lastid interface{}
  191. L:
  192. for {
  193. select {
  194. case <-queryClose:
  195. log.Error("receive interrupt sign")
  196. queryCloseOver <- true
  197. break L
  198. default:
  199. tmp := make(map[string]interface{})
  200. if query.Next(&tmp) {
  201. lastid = tmp["_id"]
  202. if count%2000 == 0 {
  203. log.Info("current", zap.Int("count", count), zap.Any("lastid", lastid))
  204. }
  205. infoPool <- tmp
  206. count++
  207. } else {
  208. break L
  209. }
  210. }
  211. }
  212. time.Sleep(5 * time.Second)
  213. over <- true
  214. ////阻塞
  215. for n := 0; n < 1; n++ {
  216. pool <- true
  217. }
  218. log.Info("所有线程执行完成...", zap.Int("count:", count))
  219. }
  220. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  221. bys, _ := json.Marshal(tmp)
  222. var thisinfo *Info
  223. _ = json.Unmarshal(bys, &thisinfo)
  224. if thisinfo == nil {
  225. return nil
  226. }
  227. if len(thisinfo.Topscopeclass) == 0 {
  228. thisinfo.Topscopeclass = []string{}
  229. }
  230. if len(thisinfo.Subscopeclass) == 0 {
  231. thisinfo.Subscopeclass = []string{}
  232. }
  233. if thisinfo.SubType == "" {
  234. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  235. }
  236. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  237. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  238. //if thisinfo.ProjectName != "" {
  239. thisinfo.pnbval++
  240. //}
  241. }
  242. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  243. if thisinfo.ProjectCode != "" {
  244. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  245. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  246. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  247. }
  248. } else {
  249. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  250. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  251. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  252. }
  253. }
  254. thisinfo.pnbval++
  255. }
  256. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  257. thisinfo.PTC = ""
  258. }
  259. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  260. thisinfo.pnbval++
  261. } else {
  262. thisinfo.Buyer = ""
  263. }
  264. //winners整理、清理
  265. winner := util.ObjToString(tmp["winner"])
  266. m1 := map[string]bool{}
  267. winners := []string{}
  268. if winner != "" {
  269. m1[winner] = true
  270. winners = append(winners, winner)
  271. }
  272. thisinfo.Winners = winners
  273. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  274. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  275. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  276. return thisinfo
  277. }