main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. package main
  2. import (
  3. "log"
  4. "reflect"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "go.mongodb.org/mongo-driver/bson/primitive"
  10. elastic "app.yhyue.com/moapp/jybase/es"
  11. "github.com/gogf/gf/v2/util/gconv"
  12. "github.com/robfig/cron"
  13. common "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  14. )
  15. var (
  16. Bidding, Mgo *MongodbSim
  17. Es elastic.Es
  18. Es2 elastic.Es
  19. cfg = new(Config)
  20. ClearHtml = regexp.MustCompile("<[^>]*>")
  21. BiddingLevelField = make(map[string]map[string]string)
  22. TimeV1 = regexp.MustCompile("^(\\d{4})[年.]?$")
  23. TimeV2 = regexp.MustCompile("^(\\d{4})[年./-]?(\\d{1,2})[月./-]?$")
  24. TimeV3 = regexp.MustCompile("^(\\d{4})[年./-]?(\\d{1,2})[月./-]?(\\d{1,2})[日]?$")
  25. TimeClear = regexp.MustCompile("[年|月|日|/|.|-]")
  26. )
  27. func init() {
  28. common.ReadConfig(&cfg)
  29. Mgo = &MongodbSim{
  30. // MongodbAddr: "172.17.4.86:27080",
  31. MongodbAddr: "192.168.3.166:27082",
  32. DbName: "yantianlei",
  33. Size: 20,
  34. UserName: "",
  35. Password: "",
  36. }
  37. Mgo.InitPool()
  38. Bidding = &MongodbSim{
  39. // MongodbAddr: "172.17.4.86:27080",
  40. MongodbAddr: "192.168.3.166:27082",
  41. DbName: "yantianlei",
  42. Size: 20,
  43. UserName: "",
  44. Password: "",
  45. }
  46. Bidding.InitPool()
  47. // Es = elastic.NewEs("07", "http://127.0.0.1:9801", 20, "jybid", "Top2023_JEB01i@31")
  48. Es2 = elastic.NewEs("07", "http://192.168.3.241:9205,http://192.168.3.149:9200", 20, "", "")
  49. }
  50. func main() {
  51. c := cron.New()
  52. c.AddFunc("0 */10 * * * ?", run)
  53. c.Start()
  54. ch := make(chan bool, 1)
  55. <-ch
  56. }
  57. func run() {
  58. session := Mgo.GetMgoConn()
  59. lastId := cfg.LastId
  60. query := map[string]interface{}{}
  61. if lastId != "" {
  62. query["_id"] = map[string]interface{}{"$gt": StringTOBsonId(lastId)}
  63. }
  64. log.Println("query :", query)
  65. defer Mgo.DestoryMongoConn(session)
  66. count := 0
  67. iter := session.DB("qfw").C("bidding_yg").Find(&query).Sort("_id").Iter()
  68. thisData := map[string]interface{}{}
  69. for {
  70. if !iter.Next(&thisData) {
  71. break
  72. }
  73. count++
  74. if count%500 == 0 {
  75. log.Println("COUNT ", count)
  76. }
  77. id := common.ObjToString(thisData["id"])
  78. source := common.ObjToString(thisData["source"])
  79. data := Bidding.FindById("bidding", id)
  80. if data != nil && len(data) > 0 {
  81. public_type := "平台发布"
  82. domain_firsttype, domain_secondtype, domain_thirdtype := "", "", ""
  83. if data["gov_classify"] != nil {
  84. if gov_classify, ok := data["gov_classify"].(map[string]interface{}); ok {
  85. root := common.ObjToString(gov_classify["root"])
  86. if root != "" {
  87. rootArr := strings.Split(root, "/")
  88. domain_firsttype = rootArr[0]
  89. if len(rootArr) > 1 {
  90. domain_secondtype = rootArr[1]
  91. }
  92. if len(rootArr) > 2 {
  93. domain_thirdtype = rootArr[2]
  94. }
  95. }
  96. }
  97. }
  98. deliver_area, deliver_city, deliver_district := data["area"], data["city"], data["district"]
  99. if source == "user" {
  100. public_type = "用户发布"
  101. deliver_area, deliver_city, deliver_district = data["deliver_area"], data["deliver_city"], data["deliver_district"]
  102. }
  103. newData := GetEsField(data)
  104. newData["deliver_area"] = deliver_area
  105. newData["deliver_city"] = deliver_city
  106. newData["deliver_district"] = deliver_district
  107. newData["domain_firsttype"] = domain_firsttype
  108. newData["domain_secondtype"] = domain_secondtype
  109. newData["domain_thirdtype"] = domain_thirdtype
  110. newData["public_type"] = public_type
  111. newData["source_id"] = id
  112. Bidding.UpdateById("bidding", id, map[string]interface{}{"$set": map[string]interface{}{
  113. "deliver_area": deliver_area,
  114. "deliver_city": deliver_city,
  115. "deliver_district": deliver_district,
  116. "domain_firsttype": domain_firsttype,
  117. "domain_secondtype": domain_secondtype,
  118. "domain_thirdtype": domain_thirdtype,
  119. }})
  120. if newData["purchasinglist"] != nil {
  121. purchasinglist := common.ObjArrToMapArr(newData["purchasinglist"].(primitive.A))
  122. if source == "user" && len(purchasinglist) > 1 {
  123. newDatas := newData
  124. itemMap := map[string]string{}
  125. itemArr := []string{}
  126. for _, v := range purchasinglist {
  127. itemname := common.ObjToString(v["itemname"])
  128. if itemname != "" {
  129. itemMap[itemname] = "1"
  130. }
  131. }
  132. for k, _ := range itemMap {
  133. itemArr = append(itemArr, k)
  134. }
  135. citys := gconv.String(deliver_city)
  136. if citys == "" {
  137. citys = gconv.String(deliver_area)
  138. }
  139. newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys
  140. mid := primitive.NewObjectID()
  141. newDatas["_id"] = mid
  142. Mgo.Save("bidding_yg_info", newDatas)
  143. newDatas["_id"] = mid.Hex()
  144. Es.Save("bidding_yg", "", newDatas)
  145. } else {
  146. for _, v := range purchasinglist {
  147. newDatas := newData
  148. itemname := common.ObjToString(v["itemname"])
  149. if itemname != "" {
  150. citys := gconv.String(deliver_city)
  151. if citys == "" {
  152. citys = gconv.String(deliver_area)
  153. }
  154. newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys
  155. }
  156. mid := primitive.NewObjectID()
  157. newDatas["_id"] = mid
  158. Mgo.Save("bidding_yg_info", newDatas)
  159. newDatas["_id"] = mid.Hex()
  160. Es.Save("bidding_yg", "", newDatas)
  161. }
  162. }
  163. } else {
  164. mid := primitive.NewObjectID()
  165. newData["_id"] = mid
  166. Mgo.Save("bidding_yg_info", newData)
  167. newData["_id"] = mid.Hex()
  168. Es.Save("bidding_yg", "", newData)
  169. }
  170. }
  171. cfg.LastId = BsonTOStringId(thisData["_id"])
  172. thisData = map[string]interface{}{}
  173. }
  174. common.WriteSysConfig(&cfg)
  175. }
  176. func GetEsField(tmp map[string]interface{}) map[string]interface{} {
  177. newTmp := tmp
  178. for field, _ := range tmp {
  179. if tmp[field] != nil { //
  180. if field == "purchasinglist" { //标的物处理
  181. purchasinglist_new := []map[string]interface{}{}
  182. if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
  183. for _, ls := range pcl {
  184. lsm_new := make(map[string]interface{})
  185. lsm := ls.(map[string]interface{})
  186. for pf, pftype := range BiddingLevelField[field] {
  187. lsmv := lsm[pf]
  188. if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
  189. lsm_new[pf] = lsm[pf]
  190. }
  191. }
  192. if lsm_new != nil && len(lsm_new) > 0 {
  193. purchasinglist_new = append(purchasinglist_new, lsm_new)
  194. }
  195. }
  196. }
  197. if len(purchasinglist_new) > 0 {
  198. newTmp[field] = purchasinglist_new
  199. }
  200. } else if field == "procurementlist" {
  201. if tmp["procurementlist"] != nil {
  202. var arr []interface{}
  203. plist := tmp["procurementlist"].([]interface{})
  204. for _, p := range plist {
  205. p1 := p.(map[string]interface{})
  206. p2 := make(map[string]interface{})
  207. for k, v := range BiddingLevelField[field] {
  208. if k == "projectname" && common.ObjToString(p1[k]) == "" {
  209. p2[k] = common.ObjToString(tmp["projectname"])
  210. } else if k == "buyer" && common.ObjToString(p1[k]) == "" && common.ObjToString(tmp["buyer"]) != "" {
  211. p2[k] = common.ObjToString(tmp["buyer"])
  212. } else if k == "expurasingtime" && common.ObjToString(p1[k]) != "" {
  213. res := getMethod(common.ObjToString(p1[k]))
  214. if res != 0 {
  215. p2[k] = res
  216. }
  217. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  218. p2[k] = p1[k]
  219. }
  220. }
  221. arr = append(arr, p2)
  222. }
  223. if len(arr) > 0 {
  224. newTmp[field] = arr
  225. }
  226. }
  227. } else if field == "projectscope" {
  228. ps, _ := tmp["projectscope"].(string)
  229. newTmp["projectscope"] = ps
  230. } else if field == "winnerorder" { //中标候选
  231. winnerorder_new := []map[string]interface{}{}
  232. if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 {
  233. for _, win := range winnerorder {
  234. winMap_new := make(map[string]interface{})
  235. winMap := win.(map[string]interface{})
  236. for wf, wftype := range BiddingLevelField[field] {
  237. wfv := winMap[wf]
  238. if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
  239. if wf == "sort" && common.Int64All(wfv) > 100 {
  240. continue
  241. }
  242. winMap_new[wf] = winMap[wf]
  243. }
  244. }
  245. if winMap_new != nil && len(winMap_new) > 0 {
  246. winnerorder_new = append(winnerorder_new, winMap_new)
  247. }
  248. }
  249. }
  250. if len(winnerorder_new) > 0 {
  251. newTmp[field] = winnerorder_new
  252. }
  253. } else if field == "qualifies" {
  254. //项目资质
  255. qs := []string{}
  256. if q, _ := tmp[field].([]interface{}); len(q) > 0 {
  257. for _, v := range q {
  258. v1 := v.(map[string]interface{})
  259. qs = append(qs, common.ObjToString(v1["key"]))
  260. }
  261. }
  262. if len(qs) > 0 {
  263. newTmp[field] = strings.Join(qs, ",")
  264. }
  265. } else if field == "bidopentime" {
  266. if tmp[field] != nil && tmp["bidendtime"] == nil {
  267. newTmp["bidendtime"] = tmp[field]
  268. newTmp[field] = tmp[field]
  269. } else if tmp[field] == nil && tmp["bidendtime"] != nil {
  270. newTmp["bidendtime"] = tmp[field]
  271. newTmp[field] = tmp["bidendtime"]
  272. } else {
  273. if tmp["bidopentime"] != nil {
  274. newTmp[field] = tmp["bidopentime"]
  275. }
  276. }
  277. } else if field == "detail" { //过滤
  278. detail, _ := tmp[field].(string)
  279. detail = ClearHtml.ReplaceAllString(detail, "")
  280. newTmp[field] = common.ObjToString(tmp["title"]) + " " + detail
  281. } else if field == "topscopeclass" || field == "entidlist" {
  282. newTmp[field] = tmp[field]
  283. } else if field == "_id" {
  284. newTmp["_id"] = BsonTOStringId(tmp["_id"])
  285. newTmp["id"] = BsonTOStringId(tmp["_id"])
  286. } else if field == "publishtime" || field == "comeintime" {
  287. //字段类型不正确,特别处理
  288. if tmp[field] != nil && common.Int64All(tmp[field]) > 0 {
  289. newTmp[field] = common.Int64All(tmp[field])
  290. }
  291. } else if field == "package" {
  292. delete(newTmp, "package")
  293. } else if field == "infoformat" {
  294. newTmp[field] = tmp[field]
  295. }
  296. }
  297. }
  298. newTmp["pici"] = time.Now().Unix()
  299. return newTmp
  300. }
  301. func InitEsBiddingField() {
  302. info, _ := Bidding.Find("bidding_processing_field", map[string]interface{}{"stype": "bidding"}, nil, nil)
  303. if len(info) > 0 {
  304. for _, m := range info {
  305. if common.IntAll(m["level"]) == 2 {
  306. pfield := common.ObjToString(m["pfield"])
  307. pfieldMap := BiddingLevelField[pfield]
  308. if pfieldMap == nil {
  309. pfieldMap = make(map[string]string, 0)
  310. }
  311. pfieldMap[common.ObjToString(m["field"])] = common.ObjToString(m["ftype"])
  312. BiddingLevelField[pfield] = pfieldMap
  313. }
  314. }
  315. }
  316. log.Println("BiddingLevelField es 二级字段数量", len(BiddingLevelField))
  317. }
  318. func getMethod(str string) int64 {
  319. // Handle "YYYY" format
  320. if TimeV1.MatchString(str) {
  321. arr := TimeV1.FindStringSubmatch(str)
  322. st := arr[1] + "0000"
  323. parseInt, err := strconv.ParseInt(st, 10, 64)
  324. if err == nil {
  325. return parseInt
  326. }
  327. }
  328. // Handle "YYYYMM" or "YYYY/MM" or "YYYY-MM" or "YYYY.MM" format
  329. if TimeV2.MatchString(str) {
  330. arr := TimeV2.FindStringSubmatch(str)
  331. year := arr[1]
  332. month := arr[2]
  333. if len(month) == 1 {
  334. month = "0" + month
  335. }
  336. str2 := year + month + "00"
  337. parseInt, err := strconv.ParseInt(str2, 10, 64)
  338. if err == nil {
  339. return parseInt
  340. }
  341. }
  342. // Handle "YYYYMMDD" or "YYYY/MM/DD" or "YYYY-MM-DD" or "YYYY.MM.DD" format
  343. if TimeV3.MatchString(str) {
  344. match := TimeV3.FindStringSubmatch(str)
  345. if len(match) >= 4 {
  346. year := match[1]
  347. month := match[2]
  348. day := match[3]
  349. if len(month) == 1 {
  350. month = "0" + month
  351. }
  352. if len(day) == 1 {
  353. day = "0" + day
  354. }
  355. dateStr := year + month + day
  356. parseInt, err := strconv.ParseInt(dateStr, 10, 64)
  357. if err == nil {
  358. return parseInt
  359. }
  360. }
  361. }
  362. return 0
  363. }