data.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. package front
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/tealeg/xlsx"
  6. "io/ioutil"
  7. "mongodb"
  8. qu "qfw/util"
  9. "regexp"
  10. "strings"
  11. "sync"
  12. . "task"
  13. . "udptask"
  14. "util"
  15. )
  // Package-level lookup tables shared by the import and listing handlers.
var (
	// fileIndexMap is the set of spreadsheet header names ImportData keeps;
	// columns under any other header are dropped.
	fileIndexMap = map[string]bool{
		"href":        true,
		"title":       true,
		"site":        true,
		"channel":     true,
		"area":        true,
		"city":        true,
		"spidercode":  true,
		"publishtime": true,
		"jsondata":    true,
		"district":    true,
		"type":        true,
		"publishdept": true,
	}

	// publishtimeReg matches a yyyy-mm-dd date anywhere in a string
	// (MatchString is unanchored), e.g. "2022-10-15 23:49:49".
	publishtimeReg = regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)

	// fields is the projection DataList applies when listing documents.
	fields = map[string]interface{}{
		"state":      1,
		"spidercode": 1,
		"site":       1,
		"channel":    1,
		"title":      1,
		"href":       1,
	}
)
  32. func (f *Front) PrepareBidding() {
  33. defer qu.Catch()
  34. sess := util.MgoDT.GetMgoConn()
  35. defer util.MgoDT.DestoryMongoConn(sess)
  36. lock := &sync.Mutex{}
  37. wg := &sync.WaitGroup{}
  38. ch := make(chan bool, 10)
  39. fields := map[string]interface{}{
  40. "spidercode": 1,
  41. "title": 1,
  42. "href": 1,
  43. }
  44. query := map[string]interface{}{
  45. "state": map[string]interface{}{
  46. "$ne": 1,
  47. },
  48. }
  49. count := util.MgoDT.Count("bidding_copy", query)
  50. it := sess.DB(util.MgoDT.DbName).C("bidding_copy").Find(&query).Select(&fields).Iter()
  51. n := 0
  52. oknum := 0
  53. arr := [][]map[string]interface{}{}
  54. save := []map[string]interface{}{}
  55. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  56. ch <- true
  57. wg.Add(1)
  58. go func(tmp map[string]interface{}) {
  59. defer func() {
  60. <-ch
  61. wg.Done()
  62. }()
  63. list, _ := util.MgoB.Find("bidding", map[string]interface{}{"title": tmp["title"]}, nil, nil, false, -1, -1)
  64. state := 0 //标记是否找到对应的bidding数据
  65. for _, l := range *list {
  66. if qu.ObjToString(l["href"]) == qu.ObjToString(tmp["href"]) && qu.ObjToString(l["spidercode"]) == qu.ObjToString(tmp["spidercode"]) {
  67. state = 1
  68. lock.Lock()
  69. oknum++
  70. l["state"] = 0
  71. save = append(save, l)
  72. lock.Unlock()
  73. break
  74. }
  75. }
  76. update := []map[string]interface{}{}
  77. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  78. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"state": state}})
  79. lock.Lock()
  80. arr = append(arr, update)
  81. if len(arr) > 500 {
  82. util.MgoDT.UpdateBulk("bidding_copy", arr...)
  83. arr = [][]map[string]interface{}{}
  84. }
  85. if len(save) > 500 {
  86. util.MgoDT.SaveBulk(util.DataColl, save...)
  87. save = []map[string]interface{}{}
  88. }
  89. lock.Unlock()
  90. }(tmp)
  91. if n%1000 == 0 {
  92. qu.Debug("current:", n)
  93. }
  94. tmp = map[string]interface{}{}
  95. }
  96. wg.Wait()
  97. if len(arr) > 0 {
  98. util.MgoDT.UpdateBulk("bidding_copy", arr...)
  99. arr = [][]map[string]interface{}{}
  100. }
  101. if len(save) > 0 {
  102. util.MgoDT.SaveBulk(util.DataColl, save...)
  103. save = []map[string]interface{}{}
  104. }
  105. msg := "bidding_copy表共" + fmt.Sprint(count) + "条数据,成功拉取对应bidding信息" + fmt.Sprint(oknum) + "条"
  106. qu.Debug("bidding表获取信息结果:", msg)
  107. f.ServeJson(map[string]interface{}{"ok": true, "msg": msg})
  108. }
  109. func (f *Front) ViewBidding() {
  110. defer qu.Catch()
  111. if f.Method() == "POST" {
  112. searchStr := f.GetString("search[value]")
  113. search := strings.TrimSpace(searchStr)
  114. start, _ := f.GetInteger("start")
  115. limit, _ := f.GetInteger("length")
  116. draw, _ := f.GetInteger("draw")
  117. query := map[string]interface{}{}
  118. state, _ := f.GetInteger("state")
  119. if state != -1 {
  120. query["state"] = state
  121. }
  122. if search != "" {
  123. query["$or"] = []interface{}{
  124. map[string]interface{}{"title": map[string]interface{}{"$regex": search}},
  125. }
  126. }
  127. qu.Debug("query:", query)
  128. task, _ := util.MgoDT.Find("bidding_copy", query, map[string]interface{}{"_id": -1}, nil, false, start, limit)
  129. count := util.MgoDT.Count("bidding_copy", query)
  130. f.ServeJson(map[string]interface{}{
  131. "draw": draw,
  132. "data": *task,
  133. "recordsFiltered": count,
  134. "recordsTotal": count,
  135. })
  136. } else {
  137. f.Render("task/biddingview.html")
  138. }
  139. }
  140. func (f *Front) ImportData() {
  141. defer qu.Catch()
  142. dataFile, _, err := f.GetFile("xlsx")
  143. msgArr := []string{}
  144. ok := true
  145. coll := ""
  146. save := []map[string]interface{}{}
  147. if err == nil {
  148. binary, _ := ioutil.ReadAll(dataFile)
  149. xls, _ := xlsx.OpenBinary(binary)
  150. sheet := xls.Sheets[0]
  151. coll = sheet.Name //sheetName当做表名
  152. if RunningTask[coll] { //正在处理任务的表禁止导入数据
  153. ok = false
  154. msgArr = append(msgArr, coll+"表已被占用!")
  155. goto END
  156. }
  157. fieldsTmp := map[string]int{} //记录字段对应的位置
  158. for i, row := range sheet.Rows {
  159. if i == 0 { //得到字段属性
  160. for j, cell := range row.Cells {
  161. if fileIndexMap[cell.Value] { //去除无效字段
  162. fieldsTmp[cell.Value] = j
  163. }
  164. }
  165. } else { //生成数据
  166. result := map[string]interface{}{}
  167. cells := row.Cells
  168. cellsLen := len(cells)
  169. for field, index := range fieldsTmp {
  170. if cellsLen >= index+1 {
  171. cell := cells[index]
  172. //字段处理
  173. if field == "comeintime" {
  174. comeintime, _ := cell.Int64()
  175. result[field] = comeintime
  176. } else if field == "publishtime" {
  177. publishtime := cell.Value
  178. if publishtime == "0" || publishtime == "" {
  179. result[field] = "0"
  180. } else if publishtimeReg.MatchString(publishtime) { //2022-10-15 23:49:49
  181. result[field] = publishtime
  182. } else if cell.NumFmt == "m/d/yy h:mm" {
  183. t, _ := cell.GetTime(false)
  184. result[field] = t.Format(qu.Date_Full_Layout)
  185. } else { //其他类型视为异常
  186. ok = false
  187. msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行publishtime字段异常")
  188. goto END
  189. }
  190. } else { //其它字段
  191. result[field] = cell.Value
  192. }
  193. }
  194. }
  195. //必要字段缺失校验
  196. noFields := checkField(result)
  197. if len(noFields) == 0 {
  198. result["state"] = 0
  199. result["times"] = 0
  200. save = append(save, result)
  201. } else {
  202. ok = false
  203. msgArr = append(msgArr, "第"+fmt.Sprint(i+1)+"行"+strings.Join(noFields, ",")+"字段不存在")
  204. }
  205. }
  206. }
  207. }
  208. END:
  209. if ok { //保存数据
  210. if len(save) > 0 && coll != "" && !strings.Contains(coll, "Sheet") {
  211. util.MgoDT.SaveBulk(coll, save...)
  212. } else {
  213. ok = false
  214. msgArr = append(msgArr, "存储表异常")
  215. }
  216. }
  217. save = []map[string]interface{}{}
  218. f.ServeJson(map[string]interface{}{"ok": ok, "msg": msgArr})
  219. }
  220. func (f *Front) DataList() {
  221. defer qu.Catch()
  222. coll := f.GetString("coll")
  223. if f.Method() == "POST" {
  224. searchStr := f.GetString("search[value]")
  225. search := strings.TrimSpace(searchStr)
  226. start, _ := f.GetInteger("start")
  227. limit, _ := f.GetInteger("length")
  228. draw, _ := f.GetInteger("draw")
  229. query := map[string]interface{}{}
  230. state, _ := f.GetInteger("state") //数据状态
  231. if state != -2 {
  232. query["state"] = state
  233. }
  234. if search != "" {
  235. query["$or"] = []interface{}{
  236. map[string]interface{}{"title": map[string]interface{}{"$regex": search}},
  237. map[string]interface{}{"site": map[string]interface{}{"$regex": search}},
  238. map[string]interface{}{"channel": map[string]interface{}{"$regex": search}},
  239. map[string]interface{}{"spidercode": map[string]interface{}{"$regex": search}},
  240. }
  241. }
  242. qu.Debug("query:", query)
  243. task, _ := util.MgoDT.Find(coll, query, map[string]interface{}{"_id": 1}, fields, false, start, limit)
  244. count := util.MgoDT.Count(coll, query)
  245. f.ServeJson(map[string]interface{}{
  246. "draw": draw,
  247. "data": *task,
  248. "recordsFiltered": count,
  249. "recordsTotal": count,
  250. })
  251. } else {
  252. f.T["coll"] = coll
  253. f.T["taskid"] = f.GetString("taskid")
  254. f.Render("task/datalist.html", &f.T)
  255. }
  256. }
  257. func (f *Front) DataView() {
  258. defer qu.Catch()
  259. id := f.GetString("id")
  260. coll := f.GetString("coll")
  261. data, _ := util.MgoDT.FindById(coll, id, "")
  262. delete(*data, "_id")
  263. //contenthtml detail 单独拿出来
  264. detail := qu.ObjToString((*data)["detail"])
  265. contenthtml := qu.ObjToString((*data)["contenthtml"])
  266. summary := qu.ObjToString((*data)["summary"])
  267. f.T["detail"] = detail
  268. f.T["contenthtml"] = contenthtml
  269. f.T["summary"] = summary
  270. delete(*data, "detail")
  271. delete(*data, "contenthtml")
  272. delete(*data, "summary")
  273. f.T["id"] = id
  274. f.T["data"] = *data
  275. f.T["coll"] = coll
  276. f.T["datacoll"] = util.DataColl
  277. f.T["taskid"] = f.GetString("taskid")
  278. f.Render("task/dataview.html", &f.T)
  279. }
  280. func (f *Front) DataSend() {
  281. defer qu.Catch()
  282. ok := false
  283. msg := ""
  284. taskid := f.GetString("taskid")
  285. task, _ := util.Mgo.FindById("task", taskid, nil)
  286. if len(*task) > 0 {
  287. flows := (*task)["flows"].([]interface{})
  288. t := &Task{
  289. ID: taskid,
  290. DB: qu.ObjToString((*task)["s_db"]),
  291. Name: qu.ObjToString((*task)["s_name"]),
  292. Coll: qu.ObjToString((*task)["s_coll"]),
  293. Flows: qu.ObjArrToStringArr(flows),
  294. IsBidding: (*task)["isbidding"].(bool),
  295. }
  296. t.SendData()
  297. ok = true
  298. }
  299. f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg})
  300. }
  301. func (f *Front) DataUpdate() {
  302. defer qu.Catch()
  303. updateStr := f.GetStringComm("update")
  304. updateMap := map[string]interface{}{}
  305. ok := false
  306. msg := ""
  307. if err := json.Unmarshal([]byte(updateStr), &updateMap); err != nil {
  308. qu.Debug("data Unmarshal Failed:", err)
  309. } else {
  310. //delete(updateMap, "_id")
  311. coll := f.GetString("coll")
  312. id := f.GetString("id")
  313. //字段类型转换
  314. util.FormatFields(updateMap)
  315. util.FormatNumber(updateMap) //时间类型转换
  316. query := map[string]interface{}{
  317. "_id": mongodb.StringTOBsonId(id),
  318. }
  319. //数据推送
  320. if coll == util.DataColl { //bidding数据推送
  321. //更新数据
  322. updateMap["state"] = 1 //添加数据下载成功标记
  323. ok = util.MgoDT.Update(coll, query, map[string]interface{}{"$set": updateMap}, false, false)
  324. if ok {
  325. q := map[string]interface{}{
  326. "_id": map[string]interface{}{
  327. "$lt": mongodb.StringTOBsonId(id),
  328. },
  329. }
  330. flows := []interface{}{}
  331. task, _ := util.Mgo.FindById("task", f.GetString("taskid"), map[string]interface{}{"flows": 1})
  332. if len(*task) > 0 {
  333. flows, _ = (*task)["flows"].([]interface{}) //任务上配了流程
  334. }
  335. gtid := util.GTEID // 起始id
  336. one, _ := util.MgoDT.Find(coll, q, map[string]interface{}{"_id": -1}, map[string]interface{}{"_id": 1}, false, 0, 1)
  337. if len(*one) == 1 {
  338. gtid = mongodb.BsonIdToSId((*one)[0]["_id"])
  339. }
  340. if len(flows) > 0 { //任务上有处理流程,走处理流程
  341. SendUdp(coll, gtid, id, util.FlowsMap["抽取"].Stype, util.FlowsMap["抽取"].Addr, util.FlowsMap["抽取"].Port) //直接掉抽取
  342. } else { //直接更新
  343. UpdateBiddingData(gtid, id)
  344. }
  345. } else {
  346. msg = "数据更新失败"
  347. }
  348. } else { //非bidding数据推送
  349. data, _ := util.MgoDT.FindById(coll, id, nil) //记录当前表的源数据
  350. //拼装最终数据result
  351. result := map[string]interface{}{
  352. "dataging": 0, //补充dataging字段
  353. "T": "bidding",
  354. }
  355. mustFiledNum := 0 //记录更新有效字段个数
  356. for k, _ := range util.Fields {
  357. if v := updateMap[k]; v != nil {
  358. result[k] = v
  359. mustFiledNum++
  360. } else if v := (*data)[k]; v != nil {
  361. mustFiledNum++
  362. result[k] = v
  363. }
  364. }
  365. qu.Debug(result)
  366. if mustFiledNum > 0 { //保存服务推送有效数据
  367. flag, id, coll := SaveObj(4002, "title", result)
  368. updateMap["state"] = 1 //添加数据下载成功标记
  369. updateMap["sendflag"] = flag
  370. updateMap["biddingid"] = id
  371. updateMap["biddingcoll"] = coll
  372. } else {
  373. msg = "推送失败,无有效字段"
  374. }
  375. //更新数据
  376. ok = util.MgoDT.Update(coll, query, map[string]interface{}{"$set": updateMap}, false, false)
  377. }
  378. }
  379. f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg})
  380. }
  381. func (f *Front) DataDelete() {
  382. defer qu.Catch()
  383. taskid := f.GetString("taskid")
  384. task, _ := util.Mgo.FindById("task", taskid, nil)
  385. ok := false
  386. msg := ""
  387. if len(*task) > 0 {
  388. coll := qu.ObjToString((*task)["s_coll"])
  389. code := f.GetString("code")
  390. codeArr := strings.Split(code, ",")
  391. query := map[string]interface{}{
  392. "spidercode": map[string]interface{}{
  393. "$in": codeArr,
  394. },
  395. }
  396. qu.Debug("删除无效数据的爬虫:", code)
  397. ok = util.MgoDT.Del(coll, query)
  398. } else {
  399. msg = "任务信息查询失败"
  400. }
  401. f.ServeJson(map[string]interface{}{"msg": msg, "ok": ok})
  402. }
  403. func checkField(result map[string]interface{}) (arr []string) {
  404. defer qu.Catch()
  405. for _, f := range []string{"href", "title", "site", "channel", "area", "spidercode", "publishtime"} {
  406. if qu.ObjToString(result[f]) == "" {
  407. arr = append(arr, f)
  408. }
  409. }
  410. return
  411. }