main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. "net"
  6. "reflect"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "go.mongodb.org/mongo-driver/bson/primitive"
  12. elastic "app.yhyue.com/moapp/jybase/es"
  13. "github.com/gogf/gf/v2/util/gconv"
  14. "github.com/robfig/cron"
  15. common "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  16. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  17. )
  18. var (
  19. Bidding, Mgo *MongodbSim
  20. Es elastic.Es
  21. Es2 elastic.Es
  22. UdpClient udp.UdpClient
  23. JyUdpAddr *net.UDPAddr
  24. cfg = new(Config)
  25. ClearHtml = regexp.MustCompile("<[^>]*>")
  26. BiddingField = make(map[string]string)
  27. BiddingLevelField = make(map[string]map[string]string)
  28. TimeV1 = regexp.MustCompile("^(\\d{4})[年.]?$")
  29. TimeV2 = regexp.MustCompile("^(\\d{4})[年./-]?(\\d{1,2})[月./-]?$")
  30. TimeV3 = regexp.MustCompile("^(\\d{4})[年./-]?(\\d{1,2})[月./-]?(\\d{1,2})[日]?$")
  31. TimeClear = regexp.MustCompile("[年|月|日|/|.|-]")
  32. )
// UdpNode groups a raw UDP payload with an address, a timestamp and a retry
// counter. It is not referenced elsewhere in the visible part of this file.
// NOTE(review): presumably a pending-message record for resend logic — confirm
// against its users.
type UdpNode struct {
	data      []byte       // raw message bytes
	addr      *net.UDPAddr // UDP address associated with the message
	timestamp int64        // time marker; unit is not visible here — TODO confirm
	retry     int          // retry counter
}
  39. func init() {
  40. common.ReadConfig(&cfg)
  41. log.Println("配置文件 ", cfg)
  42. Mgo = &MongodbSim{
  43. MongodbAddr: cfg.Mgo.Address,
  44. DbName: cfg.Mgo.DbName,
  45. Size: cfg.Mgo.DbSize,
  46. UserName: cfg.Mgo.UserName,
  47. Password: cfg.Mgo.Password,
  48. }
  49. Mgo.InitPool()
  50. Bidding = &MongodbSim{
  51. MongodbAddr: cfg.Bidding.Address,
  52. DbName: cfg.Bidding.DbName,
  53. Size: cfg.Bidding.DbSize,
  54. UserName: cfg.Bidding.UserName,
  55. Password: cfg.Bidding.Password,
  56. }
  57. Bidding.InitPool()
  58. // Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
  59. Es = elastic.NewEs("07", "http://172.17.4.184:19908", 50, "jybid", "Top2023_JEB01i@31")
  60. log.Println("初始化完成")
  61. // Es2 = elastic.NewEs("07", "http://192.168.3.241:9205,http://192.168.3.149:9200", 20, "", "")
  62. JyUdpAddr = &net.UDPAddr{
  63. IP: net.ParseIP(cfg.Udp.JyAddr),
  64. Port: cfg.Udp.JyPort,
  65. }
  66. InitEsBiddingField()
  67. UdpClient = udp.UdpClient{Local: ":1799", BufSize: 1024}
  68. UdpClient.Listen(processUdpMsg)
  69. log.Println("Udp服务监听 port:", ":1799")
  70. }
  71. func main() {
  72. run()
  73. c := cron.New()
  74. c.AddFunc("0 */10 * * * ?", run)
  75. c.Start()
  76. ch := make(chan bool, 1)
  77. <-ch
  78. }
  79. func run() {
  80. log.Println("开始执行阳光采购", cfg.LastId)
  81. session := Mgo.GetMgoConn()
  82. lastId := cfg.LastId
  83. query := map[string]interface{}{}
  84. if lastId != "" {
  85. query["_id"] = map[string]interface{}{"$gt": StringTOBsonId(lastId)}
  86. }
  87. log.Println("query :", query)
  88. defer Mgo.DestoryMongoConn(session)
  89. count := 0
  90. iter := session.DB("qfw").C("bidding_yg").Find(&query).Sort("_id").Iter()
  91. thisData := map[string]interface{}{}
  92. for {
  93. if !iter.Next(&thisData) {
  94. break
  95. }
  96. count++
  97. if count%500 == 0 {
  98. log.Println("COUNT ", count)
  99. }
  100. id := common.ObjToString(thisData["id"])
  101. source := common.ObjToString(thisData["source"])
  102. data := Bidding.FindById("bidding", id)
  103. if data != nil && len(data) > 0 {
  104. public_type := "平台发布"
  105. domain_firsttype, domain_secondtype, domain_thirdtype := "", "", ""
  106. if data["gov_classify"] != nil {
  107. if gov_classify, ok := data["gov_classify"].(map[string]interface{}); ok {
  108. root := common.ObjToString(gov_classify["root"])
  109. if root != "" {
  110. rootArr := strings.Split(root, "/")
  111. domain_firsttype = rootArr[0]
  112. if len(rootArr) > 1 {
  113. domain_secondtype = rootArr[1]
  114. }
  115. if len(rootArr) > 2 {
  116. domain_thirdtype = rootArr[2]
  117. }
  118. }
  119. }
  120. }
  121. newData := GetEsField(data)
  122. updateData := map[string]interface{}{
  123. "domain_firsttype": domain_firsttype,
  124. "domain_secondtype": domain_secondtype,
  125. "domain_thirdtype": domain_thirdtype,
  126. }
  127. deliver_area, deliver_city, deliver_district := data["area"], data["city"], data["district"]
  128. if source == "user" {
  129. public_type = "用户发布"
  130. deliver_area, deliver_city, deliver_district = data["deliver_area"], data["deliver_city"], data["deliver_district"]
  131. } else {
  132. updateData["deliver_area"] = deliver_area
  133. updateData["deliver_city"] = deliver_city
  134. updateData["deliver_district"] = deliver_district
  135. }
  136. if source == "is_yg_new" {
  137. newData["is_yg_new"] = 1
  138. }
  139. newData["deliver_area"] = deliver_area
  140. newData["deliver_city"] = deliver_city
  141. newData["deliver_district"] = deliver_district
  142. newData["domain_firsttype"] = domain_firsttype
  143. newData["domain_secondtype"] = domain_secondtype
  144. newData["domain_thirdtype"] = domain_thirdtype
  145. newData["public_type"] = public_type
  146. newData["source_id"] = id
  147. Bidding.UpdateById("bidding", id, map[string]interface{}{"$set": updateData})
  148. if newData["purchasinglist"] != nil {
  149. if purchasinglists, ok := newData["purchasinglist"].(primitive.A); ok {
  150. purchasinglist := common.ObjArrToMapArr(purchasinglists)
  151. if source == "user" && len(purchasinglist) > 1 {
  152. newDatas := newData
  153. itemMap := map[string]string{}
  154. itemArr := []string{}
  155. for _, v := range purchasinglist {
  156. itemname := common.ObjToString(v["itemname"])
  157. if itemname != "" {
  158. itemMap[itemname] = "1"
  159. }
  160. }
  161. for k, _ := range itemMap {
  162. itemArr = append(itemArr, k)
  163. }
  164. citys := gconv.String(deliver_city)
  165. if citys == "" {
  166. citys = gconv.String(deliver_area)
  167. }
  168. newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys
  169. mid := primitive.NewObjectID()
  170. newDatas["_id"] = mid
  171. Mgo.Save("bidding_yg_info", newDatas)
  172. newDatas["_id"] = mid.Hex()
  173. Es.Save("bidding_yg", "", newDatas)
  174. log.Println("保存成功", mid)
  175. } else {
  176. for _, v := range purchasinglist {
  177. newDatas := newData
  178. itemname := common.ObjToString(v["itemname"])
  179. if itemname != "" {
  180. citys := gconv.String(deliver_city)
  181. if citys == "" {
  182. citys = gconv.String(deliver_area)
  183. }
  184. if gconv.String(v["unitname"]) != "" && gconv.String(v["number"]) != "" {
  185. newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys
  186. } else {
  187. newDatas["title"] = itemname + "-" + citys
  188. }
  189. }
  190. mid := primitive.NewObjectID()
  191. newDatas["_id"] = mid
  192. Mgo.Save("bidding_yg_info", newDatas)
  193. newDatas["_id"] = mid.Hex()
  194. Es.Save("bidding_yg", "", newDatas)
  195. log.Println("保存成功", mid)
  196. }
  197. }
  198. } else if purchasinglists, ok := newData["purchasinglist"].([]map[string]interface{}); ok {
  199. purchasinglist := purchasinglists
  200. if source == "user" && len(purchasinglist) > 1 {
  201. newDatas := newData
  202. itemMap := map[string]string{}
  203. itemArr := []string{}
  204. for _, v := range purchasinglist {
  205. itemname := common.ObjToString(v["itemname"])
  206. if itemname != "" {
  207. itemMap[itemname] = "1"
  208. }
  209. }
  210. for k, _ := range itemMap {
  211. itemArr = append(itemArr, k)
  212. }
  213. citys := gconv.String(deliver_city)
  214. if citys == "" {
  215. citys = gconv.String(deliver_area)
  216. }
  217. newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys
  218. mid := primitive.NewObjectID()
  219. newDatas["_id"] = mid
  220. Mgo.Save("bidding_yg_info", newDatas)
  221. newDatas["_id"] = mid.Hex()
  222. Es.Save("bidding_yg", "", newDatas)
  223. log.Println("保存成功", mid)
  224. } else {
  225. for _, v := range purchasinglist {
  226. newDatas := newData
  227. itemname := common.ObjToString(v["itemname"])
  228. if itemname != "" {
  229. citys := gconv.String(deliver_city)
  230. if citys == "" {
  231. citys = gconv.String(deliver_area)
  232. }
  233. if gconv.String(v["unitname"]) != "" && gconv.String(v["number"]) != "" {
  234. newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys
  235. } else {
  236. newDatas["title"] = itemname + "-" + citys
  237. }
  238. }
  239. mid := primitive.NewObjectID()
  240. newDatas["_id"] = mid
  241. Mgo.Save("bidding_yg_info", newDatas)
  242. newDatas["_id"] = mid.Hex()
  243. Es.Save("bidding_yg", "", newDatas)
  244. log.Println("保存成功", mid)
  245. }
  246. }
  247. }
  248. } else {
  249. mid := primitive.NewObjectID()
  250. newData["_id"] = mid
  251. Mgo.Save("bidding_yg_info", newData)
  252. newData["_id"] = mid.Hex()
  253. Es.Save("bidding_yg", "", newData)
  254. log.Println("保存成功", mid)
  255. }
  256. if source == "user" {
  257. mapinfo := map[string]interface{}{
  258. "infoid": id,
  259. "stype": "jyfb_data_over",
  260. }
  261. datas, _ := json.Marshal(mapinfo)
  262. log.Println("信息发布成功", JyUdpAddr, "mapinfo", string(datas))
  263. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr)
  264. }
  265. }
  266. cfg.LastId = BsonTOStringId(thisData["_id"])
  267. thisData = map[string]interface{}{}
  268. }
  269. common.WriteSysConfig(&cfg)
  270. log.Println("阳光采购结束", cfg.LastId)
  271. }
  272. func GetEsField(tmp map[string]interface{}) map[string]interface{} {
  273. newTmp := make(map[string]interface{})
  274. for field, ftype := range BiddingField {
  275. if tmp[field] != nil { //
  276. if field == "purchasinglist" { //标的物处理
  277. purchasinglist_new := []map[string]interface{}{}
  278. if pcl, _ := tmp[field].(primitive.A); len(pcl) > 0 {
  279. for _, ls := range pcl {
  280. lsm_new := make(map[string]interface{})
  281. lsm := ls.(map[string]interface{})
  282. for pf, pftype := range BiddingLevelField[field] {
  283. lsmv := lsm[pf]
  284. if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
  285. lsm_new[pf] = lsm[pf]
  286. }
  287. }
  288. if lsm_new != nil && len(lsm_new) > 0 {
  289. purchasinglist_new = append(purchasinglist_new, lsm_new)
  290. }
  291. }
  292. }
  293. if len(purchasinglist_new) > 0 {
  294. newTmp[field] = purchasinglist_new
  295. }
  296. } else if field == "procurementlist" {
  297. if tmp["procurementlist"] != nil {
  298. var arr []interface{}
  299. plist := tmp["procurementlist"].(primitive.A)
  300. for _, p := range plist {
  301. p1 := p.(map[string]interface{})
  302. p2 := make(map[string]interface{})
  303. for k, v := range BiddingLevelField[field] {
  304. if k == "projectname" && common.ObjToString(p1[k]) == "" {
  305. p2[k] = common.ObjToString(tmp["projectname"])
  306. } else if k == "buyer" && common.ObjToString(p1[k]) == "" && common.ObjToString(tmp["buyer"]) != "" {
  307. p2[k] = common.ObjToString(tmp["buyer"])
  308. } else if k == "expurasingtime" && common.ObjToString(p1[k]) != "" {
  309. res := getMethod(common.ObjToString(p1[k]))
  310. if res != 0 {
  311. p2[k] = res
  312. }
  313. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  314. p2[k] = p1[k]
  315. }
  316. }
  317. arr = append(arr, p2)
  318. }
  319. if len(arr) > 0 {
  320. newTmp[field] = arr
  321. }
  322. }
  323. } else if field == "projectscope" {
  324. ps, _ := tmp["projectscope"].(string)
  325. newTmp["projectscope"] = ps
  326. } else if field == "winnerorder" { //中标候选
  327. winnerorder_new := []map[string]interface{}{}
  328. if winnerorder, _ := tmp[field].(primitive.A); len(winnerorder) > 0 {
  329. for _, win := range winnerorder {
  330. winMap_new := make(map[string]interface{})
  331. winMap := win.(map[string]interface{})
  332. for wf, wftype := range BiddingLevelField[field] {
  333. wfv := winMap[wf]
  334. if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
  335. if wf == "sort" && common.Int64All(wfv) > 100 {
  336. continue
  337. }
  338. winMap_new[wf] = winMap[wf]
  339. }
  340. }
  341. if winMap_new != nil && len(winMap_new) > 0 {
  342. winnerorder_new = append(winnerorder_new, winMap_new)
  343. }
  344. }
  345. }
  346. if len(winnerorder_new) > 0 {
  347. newTmp[field] = winnerorder_new
  348. }
  349. } else if field == "qualifies" {
  350. //项目资质
  351. qs := []string{}
  352. if q, _ := tmp[field].(primitive.A); len(q) > 0 {
  353. for _, v := range q {
  354. v1 := v.(map[string]interface{})
  355. qs = append(qs, common.ObjToString(v1["key"]))
  356. }
  357. }
  358. if len(qs) > 0 {
  359. newTmp[field] = strings.Join(qs, ",")
  360. }
  361. } else if field == "bidopentime" {
  362. if tmp[field] != nil && tmp["bidendtime"] == nil {
  363. newTmp["bidendtime"] = tmp[field]
  364. newTmp[field] = tmp[field]
  365. } else if tmp[field] == nil && tmp["bidendtime"] != nil {
  366. newTmp["bidendtime"] = tmp[field]
  367. newTmp[field] = tmp["bidendtime"]
  368. } else {
  369. if tmp["bidopentime"] != nil {
  370. newTmp[field] = tmp["bidopentime"]
  371. }
  372. }
  373. } else if field == "detail" { //过滤
  374. detail, _ := tmp[field].(string)
  375. detail = ClearHtml.ReplaceAllString(detail, "")
  376. newTmp[field] = common.ObjToString(tmp["title"]) + " " + detail
  377. } else if field == "topscopeclass" || field == "entidlist" {
  378. newTmp[field] = tmp[field]
  379. } else if field == "_id" {
  380. newTmp["_id"] = BsonTOStringId(tmp["_id"])
  381. newTmp["id"] = BsonTOStringId(tmp["_id"])
  382. } else if field == "publishtime" || field == "comeintime" {
  383. //字段类型不正确,特别处理
  384. if tmp[field] != nil && common.Int64All(tmp[field]) > 0 {
  385. newTmp[field] = common.Int64All(tmp[field])
  386. }
  387. } else if field == "package" {
  388. delete(newTmp, "package")
  389. } else if field == "infoformat" {
  390. newTmp[field] = tmp[field]
  391. } else { //其它字段判断数据类型,不正确舍弃
  392. if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" {
  393. continue
  394. } else {
  395. if fieldval != "" {
  396. newTmp[field] = fieldval
  397. }
  398. }
  399. }
  400. }
  401. }
  402. newTmp["pici"] = time.Now().Unix()
  403. return newTmp
  404. }
  405. func InitEsBiddingField() {
  406. info, _ := Bidding.Find("bidding_processing_field", map[string]interface{}{"stype": "bidding"}, nil, nil)
  407. if len(info) > 0 {
  408. for _, m := range info {
  409. if common.IntAll(m["level"]) == 1 {
  410. BiddingField[common.ObjToString(m["field"])] = common.ObjToString(m["ftype"])
  411. } else if common.IntAll(m["level"]) == 2 {
  412. pfield := common.ObjToString(m["pfield"])
  413. pfieldMap := BiddingLevelField[pfield]
  414. if pfieldMap == nil {
  415. pfieldMap = make(map[string]string, 0)
  416. }
  417. pfieldMap[common.ObjToString(m["field"])] = common.ObjToString(m["ftype"])
  418. BiddingLevelField[pfield] = pfieldMap
  419. }
  420. }
  421. }
  422. log.Println("BiddingField es 一级字段数量", len(BiddingField))
  423. log.Println("BiddingLevelField es 二级字段数量", len(BiddingLevelField))
  424. }
  425. func getMethod(str string) int64 {
  426. // Handle "YYYY" format
  427. if TimeV1.MatchString(str) {
  428. arr := TimeV1.FindStringSubmatch(str)
  429. st := arr[1] + "0000"
  430. parseInt, err := strconv.ParseInt(st, 10, 64)
  431. if err == nil {
  432. return parseInt
  433. }
  434. }
  435. // Handle "YYYYMM" or "YYYY/MM" or "YYYY-MM" or "YYYY.MM" format
  436. if TimeV2.MatchString(str) {
  437. arr := TimeV2.FindStringSubmatch(str)
  438. year := arr[1]
  439. month := arr[2]
  440. if len(month) == 1 {
  441. month = "0" + month
  442. }
  443. str2 := year + month + "00"
  444. parseInt, err := strconv.ParseInt(str2, 10, 64)
  445. if err == nil {
  446. return parseInt
  447. }
  448. }
  449. // Handle "YYYYMMDD" or "YYYY/MM/DD" or "YYYY-MM-DD" or "YYYY.MM.DD" format
  450. if TimeV3.MatchString(str) {
  451. match := TimeV3.FindStringSubmatch(str)
  452. if len(match) >= 4 {
  453. year := match[1]
  454. month := match[2]
  455. day := match[3]
  456. if len(month) == 1 {
  457. month = "0" + month
  458. }
  459. if len(day) == 1 {
  460. day = "0" + day
  461. }
  462. dateStr := year + month + day
  463. parseInt, err := strconv.ParseInt(dateStr, 10, 64)
  464. if err == nil {
  465. return parseInt
  466. }
  467. }
  468. }
  469. return 0
  470. }
  471. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  472. defer common.Catch()
  473. switch act {
  474. case udp.OP_TYPE_DATA: //上个节点的数据
  475. var mapInfo map[string]interface{}
  476. err := json.Unmarshal(data, &mapInfo)
  477. log.Println("processUdpMsg mapInfo:", mapInfo, err)
  478. case udp.OP_NOOP:
  479. ok := string(data)
  480. log.Println("下节点回应:", ok)
  481. }
  482. }