projectTask.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/go-ego/gse"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.uber.org/zap"
  8. "io/ioutil"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "proposed_project/config"
  17. "regexp"
  18. "strings"
  19. "sync"
  20. "time"
  21. "unicode/utf8"
  22. )
  23. var (
  24. seg gse.Segmenter
  25. stopWords []string
  26. regNum = regexp.MustCompile(`[\dA-Za-z]{6,30}`)
  27. regSymb = regexp.MustCompile("[•、,,.。??'\"“”‘’·~!@#¥$%…&*()()\\-—+=【】\\[\\]{}{}<>《》|\\/\\s]+")
  28. regDel = regexp.MustCompile("项目|工程|生产|中心")
  29. reg1 = regexp.MustCompile("分布式光伏发电|自然人|出让|国有建设用地使用权")
  30. field = []string{"projectname.pname"}
  31. sField = []string{"projectname", "bidstatus", "firsttime", "_id", "area", "city", "district", "ids"}
  32. esQ = `{"query": {"bool": {"must": [{"multi_match": {"query": "%s","type": "phrase","fields": ["projectname"]}}]}},"size": 100}`
  33. savePpPool = make(chan map[string]interface{}, 5000)
  34. savePpSp = make(chan bool, 3)
  35. )
  36. func initSeg() {
  37. _ = seg.LoadDict("./t_1.txt")
  38. //_ = seg.LoadDict()
  39. seg.AddToken("渼陂", 3, "")
  40. seg.LoadStop("./stopwords.txt")
  41. //f, _ := os.Open("./stopwords.txt")
  42. //defer f.Close()
  43. //scanner := bufio.NewScanner(f)
  44. //for scanner.Scan() {
  45. // stopWords = append(stopWords, scanner.Text())
  46. //}
  47. //sort.Strings(stopWords)
  48. }
  49. // @Description 关联
  50. // @Author J 2023/4/25 10:26
  51. func taskComb() {
  52. sess := MgoPro.GetMgoConn()
  53. defer MgoPro.DestoryMongoConn(sess)
  54. ch := make(chan bool, config.Conf.Serve.Thread)
  55. wg := &sync.WaitGroup{}
  56. f := map[string]interface{}{
  57. "projectname": 1,
  58. "approvecode": 1,
  59. "approvenumber": 1,
  60. "approvestatus": 1,
  61. "area": 1,
  62. "city": 1,
  63. "district": 1,
  64. "ids": 1,
  65. }
  66. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(f).Iter()
  67. count := 0
  68. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  69. if count%2000 == 0 {
  70. log.Info(fmt.Sprintf("current --- %d", count))
  71. }
  72. ch <- true
  73. wg.Add(1)
  74. go func(tmp map[string]interface{}) {
  75. defer func() {
  76. <-ch
  77. wg.Done()
  78. }()
  79. if filterMethod(tmp) {
  80. return
  81. }
  82. var mArr []map[string]interface{}
  83. var eArr []map[string]interface{}
  84. n1, n2 := 0, 0
  85. // approvecode、approvenumber
  86. //q := Method2(util.ObjToString(tmp["approvecode"]), util.ObjToString(tmp["approvenumber"]))
  87. //if q != "" {
  88. // binfo := Es.Get("projectset", q)
  89. // if binfo != nil && len(*binfo) > 0 {
  90. // for _, m := range *binfo {
  91. // n1 = len(*binfo)
  92. // mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 1})
  93. // }
  94. // }
  95. //}
  96. wds, q := Method1(util.ObjToString(tmp["projectname"]))
  97. if q != "" {
  98. binfo := Es.Get("projectset", q)
  99. if binfo != nil && len(*binfo) > 0 {
  100. n2 = len(*binfo)
  101. for _, m := range *binfo {
  102. if b, _ := redis.Exists(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"])); b {
  103. continue
  104. }
  105. if util.ObjToString(m["bidstatus"]) == "拟建" {
  106. eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
  107. continue
  108. }
  109. if judgeArea(tmp, m, wds) {
  110. eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
  111. continue
  112. }
  113. size := len(m["ids"].([]interface{}))
  114. redis.PutCKV(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"]), fmt.Sprintf("%s-%d", mongodb.BsonIdToSId(tmp["_id"]), size))
  115. redis.PutCKV(config.Conf.DB.Redis.Proposed, mongodb.BsonIdToSId(tmp["_id"]), len(tmp["ids"].([]interface{})))
  116. mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 2})
  117. }
  118. }
  119. }
  120. //if mArr != nil && len(mArr) > 0 {
  121. save := make(map[string]interface{})
  122. save["_id"] = tmp["_id"]
  123. save["ids"] = mArr
  124. save["wds"] = wds
  125. save["err"] = eArr
  126. save["projectname"] = tmp["projectname"]
  127. save["esearch"] = q
  128. save["size_1"] = n1
  129. save["size_2"] = n2
  130. save["createtime"] = time.Now().Unix()
  131. savePpPool <- save
  132. //}
  133. }(tmp)
  134. tmp = make(map[string]interface{})
  135. }
  136. wg.Wait()
  137. log.Info(fmt.Sprintf("over --- %d", count))
  138. }
  139. func Method(pname string) string {
  140. pname = regNum.ReplaceAllString(pname, "")
  141. pname = regSymb.ReplaceAllString(pname, "")
  142. //wds := seg.CutStop(pname, true)
  143. util.Debug(pname)
  144. surl := config.Conf.DB.Es.Addr + "/projectset_v2/_analyze"
  145. URL, _ := url.Parse(surl)
  146. Q := URL.Query()
  147. Q.Add("pretty", "1")
  148. Q.Add("analyzer", "my_ngram_title")
  149. Q.Add("text", pname)
  150. URL.RawQuery = Q.Encode()
  151. resp, err := http.Get(URL.String())
  152. if err != nil {
  153. log.Info("")
  154. }
  155. result, err := ioutil.ReadAll(resp.Body)
  156. if err != nil {
  157. log.Info("")
  158. }
  159. var resMap map[string]interface{}
  160. json.Unmarshal(result, &resMap)
  161. if resMap == nil || len(resMap["tokens"].([]interface{})) == 0 {
  162. log.Info("")
  163. }
  164. tokens := util.ObjArrToMapArr(resMap["tokens"].([]interface{}))
  165. var should []interface{}
  166. for _, t := range tokens {
  167. wd := util.ObjToString(t["token"])
  168. if in(wd) {
  169. // 删除词 跳过
  170. continue
  171. }
  172. mm := &ShouldObject{MultiMatch: &MultiMatch{
  173. Query: wd,
  174. Type: "phrase",
  175. Fields: field,
  176. }}
  177. should = append(should, mm)
  178. }
  179. query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Must: should}}}
  180. jsr, err := json.Marshal(query)
  181. if err != nil {
  182. fmt.Printf("Err %v", err)
  183. os.Exit(1)
  184. }
  185. return string(jsr)
  186. }
  187. // @Description jieba分词查询
  188. // @Desc projectname
  189. // @Desc minimum_should_match 当should分支总数小于等于指定的数量时,则必须匹配所有should分支,当should分支总数大于指定的数量时,则应用指定的说明符
  190. // @Author J 2023/3/7 17:13
  191. func Method1(pname string) ([]string, string) {
  192. if pname == "" {
  193. return nil, ""
  194. }
  195. var wds []string
  196. p1 := regSymb.ReplaceAllString(pname, "")
  197. p1 = regNum.ReplaceAllString(p1, "")
  198. if utf8.RuneCountInString(p1) < 12 {
  199. wds = append(wds, pname)
  200. } else {
  201. wds = seg.CutStop(pname, true)
  202. }
  203. if len(wds) > 0 {
  204. wds = combArr(wds)
  205. var should []interface{}
  206. for _, t := range wds {
  207. //if utf8.RuneCountInString(t) == 1 {
  208. // continue
  209. //}
  210. mm := &ShouldObject{MultiMatch: &MultiMatch{
  211. Query: t,
  212. Type: "phrase",
  213. Fields: field,
  214. }}
  215. should = append(should, mm)
  216. }
  217. if should == nil || len(should) <= 0 {
  218. return nil, ""
  219. }
  220. query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Should: should, MinSdMatch: config.Conf.DB.Es.MinSdMh}}, Source: sField, Size: 1000}
  221. jsr, err := json.Marshal(query)
  222. if err != nil {
  223. fmt.Printf("Err %v", err)
  224. os.Exit(1)
  225. }
  226. return wds, strings.Replace(string(jsr), "\\u003c", "<", -1)
  227. } else {
  228. return nil, ""
  229. }
  230. }
  231. // @Description
  232. // @Author J 2023/3/20 14:39
  233. func Method2(acode, anumb string) string {
  234. if acode == "" && anumb == "" {
  235. return ""
  236. }
  237. var should []interface{}
  238. if acode != "" {
  239. should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
  240. Query: acode,
  241. Type: "phrase",
  242. Fields: []string{"detail"},
  243. }})
  244. }
  245. if anumb != "" {
  246. should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
  247. Query: anumb,
  248. Type: "phrase",
  249. Fields: []string{"detail"},
  250. }})
  251. }
  252. query := &QueryObject1{Query: &BoolObject1{Bool: &MMSObject{Should: should}}, Source: sField, Size: 500}
  253. jsr, err := json.Marshal(query)
  254. if err != nil {
  255. fmt.Printf("Err %v", err)
  256. os.Exit(1)
  257. }
  258. return string(jsr)
  259. }
  260. func judgeArea(tmp, btmp map[string]interface{}, wds []string) bool {
  261. if util.ObjToString(tmp["area"]) == "全国" || util.ObjToString(btmp["area"]) == "全国" {
  262. pname := util.ObjToString(btmp["projectname"])
  263. for _, wd := range wds {
  264. if !strings.Contains(strings.ToLower(pname), strings.ToLower(wd)) {
  265. return true
  266. }
  267. }
  268. return false
  269. }
  270. if tmp["district"] != nil && btmp["district"] != nil {
  271. if util.ObjToString(tmp["district"]) == util.ObjToString(btmp["district"]) {
  272. return false
  273. } else {
  274. return true
  275. }
  276. } else if tmp["city"] != nil && btmp["city"] != nil {
  277. if util.ObjToString(tmp["city"]) == util.ObjToString(btmp["city"]) {
  278. return false
  279. } else {
  280. return true
  281. }
  282. } else if tmp["area"] != nil && btmp["area"] != nil {
  283. if util.ObjToString(tmp["area"]) == util.ObjToString(btmp["area"]) {
  284. return false
  285. } else {
  286. return true
  287. }
  288. }
  289. return false
  290. }
  291. var regs = []*regexp.Regexp{
  292. regexp.MustCompile("[\\\\u4e00-\\\\u9fa5]{0,10}私宅"),
  293. regexp.MustCompile("自然人|分布式光伏发电"),
  294. }
  295. // @Description 过滤数据
  296. // @Author J 2023/3/24 09:30
  297. func filterMethod(tmp map[string]interface{}) bool {
  298. pname := regSymb.ReplaceAllString(util.ObjToString(tmp["projectname"]), "")
  299. pname = regNum.ReplaceAllString(pname, "")
  300. p1 := regDel.ReplaceAllString(pname, "")
  301. p1 = regSymb.ReplaceAllString(p1, "")
  302. if utf8.RuneCountInString(p1) <= 5 {
  303. return true
  304. }
  305. if len(reg1.FindAllString(pname, -1)) > 1 {
  306. return true
  307. }
  308. for _, reg := range regs {
  309. if reg.MatchString(util.ObjToString(tmp["projectname"])) {
  310. return true
  311. }
  312. }
  313. return false
  314. }
  315. func combArr(arr []string) []string {
  316. var nArr []string
  317. for i := 0; i < len(arr); i++ {
  318. if i == len(arr)-1 {
  319. if utf8.RuneCountInString(arr[i]) == 1 {
  320. if len(nArr) > 0 {
  321. nArr[len(nArr)-1] += arr[i]
  322. }
  323. } else {
  324. nArr = append(nArr, arr[i])
  325. }
  326. } else {
  327. if utf8.RuneCountInString(arr[i]) == 1 {
  328. if i == 0 {
  329. nArr = append(nArr, arr[i])
  330. } else {
  331. nArr[len(nArr)-1] += arr[i]
  332. //if utf8.RuneCountInString(arr[i+1]) == 1 {
  333. // if utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 {
  334. // nArr[len(nArr)-1] += arr[i]
  335. // } else {
  336. // nArr = append(nArr, arr[i])
  337. // }
  338. //} else {
  339. // nArr[len(nArr)-1] += arr[i]
  340. //}
  341. }
  342. } else {
  343. if len(nArr) > 0 && utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 {
  344. nArr[len(nArr)-1] += arr[i]
  345. } else {
  346. nArr = append(nArr, arr[i])
  347. }
  348. }
  349. }
  350. }
  351. return nArr
  352. }
  353. func SavePpMethod() {
  354. arru := make([]map[string]interface{}, saveSize)
  355. indexu := 0
  356. for {
  357. select {
  358. case v := <-savePpPool:
  359. arru[indexu] = v
  360. indexu++
  361. if indexu == saveSize {
  362. savePpSp <- true
  363. go func(arru []map[string]interface{}) {
  364. defer func() {
  365. <-savePpSp
  366. }()
  367. MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
  368. }(arru)
  369. arru = make([]map[string]interface{}, saveSize)
  370. indexu = 0
  371. }
  372. case <-time.After(1000 * time.Millisecond):
  373. if indexu > 0 {
  374. savePpSp <- true
  375. go func(arru []map[string]interface{}) {
  376. defer func() {
  377. <-savePpSp
  378. }()
  379. MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
  380. }(arru[:indexu])
  381. arru = make([]map[string]interface{}, saveSize)
  382. indexu = 0
  383. }
  384. }
  385. }
  386. }
  387. var StageCode []TagMatching
  388. func initStage() {
  389. info, _ := MgoBid.Find(config.Conf.Serve.TagRule, bson.M{"label_name": "project_stage"}, `{"_id": 1}`, nil, false, -1, -1)
  390. for _, m := range *info {
  391. tag := TagMatching{}
  392. tag.tagName = util.ObjToString(m["label_name"])
  393. tag.tagCode = util.ObjToString(m["code"])
  394. // 关键词
  395. tag.matchField = []string{"title", "project"}
  396. if v := util.ObjToString(m["keyword"]); v != "" {
  397. tag.matchKey = util.ObjToString(m["keyword"])
  398. tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
  399. }
  400. // 附件词
  401. if f := util.ObjToString(m["match_fjword"]); f != "" {
  402. tag.addField = strings.Split(f, ",")
  403. for _, s := range tag.addField {
  404. SelectF[s] = 1
  405. }
  406. if v := util.ObjToString(m["fjword"]); v != "" {
  407. tag.addKey = util.ObjToString(m["fjword"])
  408. tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
  409. }
  410. }
  411. // 排除词
  412. if f := util.ObjToString(m["match_pcword"]); f != "" {
  413. tag.excludeField = strings.Split(f, ",")
  414. for _, s := range tag.excludeField {
  415. SelectF[s] = 1
  416. }
  417. if v := util.ObjToString(m["pcword"]); v != "" {
  418. tag.excludeKey = util.ObjToString(m["pcword"])
  419. tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
  420. }
  421. }
  422. // 清理词
  423. if v := util.ObjToString(m["qlword"]); v != "" {
  424. tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
  425. }
  426. StageCode = append(StageCode, tag)
  427. }
  428. log.Info("initStage", zap.Int("StageCode", len(StageCode)))
  429. }
  430. func taskD() {
  431. sess := MgoPro.GetMgoConn()
  432. defer MgoPro.DestoryMongoConn(sess)
  433. ch := make(chan bool, config.Conf.Serve.Thread)
  434. wg := &sync.WaitGroup{}
  435. //q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
  436. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
  437. count := 0
  438. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  439. if count%2000 == 0 {
  440. log.Info(fmt.Sprintf("current --- %d", count))
  441. }
  442. ch <- true
  443. wg.Add(1)
  444. go func(tmp map[string]interface{}) {
  445. defer func() {
  446. <-ch
  447. wg.Done()
  448. }()
  449. if ids, ok := tmp["ids"].([]interface{}); ok {
  450. id := mongodb.BsonIdToSId(tmp["_id"])
  451. for _, p := range ids {
  452. p1 := p.(map[string]interface{})
  453. info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
  454. if list, ok1 := (*info)["list"].([]interface{}); ok1 {
  455. for _, l := range list {
  456. l1 := l.(map[string]interface{})
  457. m := make(map[string]interface{})
  458. m["project_stage_code"] = tagFunc(l1)
  459. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  460. m["title"] = util.ObjToString(l1["title"])
  461. if t := util.Int64All(l1["publishtime"]); t > 0 {
  462. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  463. }
  464. m["infoid"] = util.ObjToString(l1["infoid"])
  465. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  466. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  467. //MgoPro.Save("projectset_comb_temp1", m)
  468. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  469. }
  470. }
  471. if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
  472. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
  473. if s <= 0 {
  474. saveEnt := make(map[string]interface{})
  475. saveEnt["proposed_id"] = id
  476. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  477. saveEnt["name"] = buyer
  478. if eid := redis.GetStr("ent_id", buyer); eid != "" {
  479. arr := strings.Split(eid, "_")
  480. saveEnt["name_id"] = arr[0]
  481. if len(arr) == 2 {
  482. saveEnt["area_code"] = arr[1]
  483. } else if len(arr) == 3 {
  484. saveEnt["city_code"] = arr[2]
  485. }
  486. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  487. if info != nil && len(*info) > 0 {
  488. saveEnt["address"] = (*info)[0]["address"]
  489. }
  490. }
  491. saveEnt["identity_type"] = 2
  492. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  493. saveEntPool1 <- saveEnt
  494. }
  495. }
  496. if winner := util.ObjToString((*info)["buyer"]); winner != "" {
  497. for _, w := range strings.Split(winner, ",") {
  498. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
  499. if s <= 0 {
  500. saveEnt := make(map[string]interface{})
  501. saveEnt["proposed_id"] = id
  502. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  503. saveEnt["name"] = w
  504. if eid := redis.GetStr("ent_id", w); eid != "" {
  505. arr := strings.Split(eid, "_")
  506. saveEnt["name_id"] = arr[0]
  507. if len(arr) == 2 {
  508. saveEnt["area_code"] = arr[1]
  509. } else if len(arr) == 3 {
  510. saveEnt["city_code"] = arr[2]
  511. }
  512. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  513. if info != nil && len(*info) > 0 {
  514. saveEnt["address"] = (*info)[0]["address"]
  515. }
  516. }
  517. saveEnt["identity_type"] = 3
  518. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  519. saveEntPool1 <- saveEnt
  520. }
  521. }
  522. }
  523. }
  524. size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  525. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  526. MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  527. }
  528. }(tmp)
  529. tmp = make(map[string]interface{})
  530. }
  531. wg.Wait()
  532. log.Info(fmt.Sprintf("over --- %d", count))
  533. }
  534. // @Description 施工准备(06)、施工(07)、设计(05)
  535. // @Author J 2023/4/21 14:45
  536. func tagFunc(info map[string]interface{}) string {
  537. tag := taskFuc1(info)
  538. if tag["project_stage"] != "" {
  539. return util.ObjToString(tag["project_stage"])
  540. }
  541. if util.ObjToString(info["toptype"]) == "招标" || util.ObjToString(info["toptype"]) == "预告" {
  542. return "06"
  543. }
  544. return "00"
  545. }
  546. // @Description 在建项目增量
  547. // @Author J 2023/4/24 13:58
  548. func taskAA() {
  549. sess := MgoPro.GetMgoConn()
  550. defer MgoPro.DestoryMongoConn(sess)
  551. ch := make(chan bool, config.Conf.Serve.Thread)
  552. wg := &sync.WaitGroup{}
  553. q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}}
  554. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProjectColl).Find(q).Iter()
  555. count := 0
  556. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  557. if count%2000 == 0 {
  558. log.Info(fmt.Sprintf("project current --- %d", count))
  559. }
  560. if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici {
  561. config.Conf.Serve.Pici = pc
  562. }
  563. ch <- true
  564. wg.Add(1)
  565. go func(tmp map[string]interface{}) {
  566. defer func() {
  567. <-ch
  568. wg.Done()
  569. }()
  570. bUpdate := false
  571. id := mongodb.BsonIdToSId(tmp["_id"])
  572. if str := redis.GetStr(config.Conf.DB.Redis.Project, id); str != "" {
  573. strs := strings.Split(str, "-")
  574. size := len(tmp["list"].([]interface{}))
  575. if size != util.IntAll(strs[1]) {
  576. list := tmp["list"].([]interface{})
  577. for k := range list {
  578. info1 := list[size-k-1].(map[string]interface{}) //倒序
  579. infoid := util.ObjToString(info1["infoid"])
  580. binfo := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": infoid}, "", "")
  581. if binfo != nil && len(*binfo) > 0 {
  582. break
  583. } else {
  584. m := make(map[string]interface{})
  585. m["project_stage_code"] = tagFunc(info1)
  586. m["proposed_id"] = strs[0]
  587. m["title"] = util.ObjToString(info1["title"])
  588. if t := util.Int64All(info1["publishtime"]); t > 0 {
  589. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  590. }
  591. m["infoid"] = util.ObjToString(info1["infoid"])
  592. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(info1["infoid"])))
  593. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  594. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  595. bUpdate = true
  596. }
  597. }
  598. redis.PutCKV(config.Conf.DB.Redis.Project, id, fmt.Sprintf("%s-%d", strs[0], size))
  599. }
  600. } else {
  601. // 新项目
  602. // 新项目是否都有必要进行关联(拟建项目)
  603. bUpdate = Method3(tmp)
  604. }
  605. // 更新拟在建基本信息表
  606. if bUpdate {
  607. size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  608. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  609. if info != nil && len(*info) > 0 {
  610. MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  611. }
  612. }
  613. }(tmp)
  614. tmp = make(map[string]interface{})
  615. }
  616. wg.Wait()
  617. log.Info(fmt.Sprintf("project over --- %d, pici ---%d", count, config.Conf.Serve.Pici))
  618. }
  619. // @Description 拟建项目增量
  620. // @Author J 2023/4/24 13:59
  621. func taskBB() {
  622. sess := MgoPro.GetMgoConn()
  623. defer MgoPro.DestoryMongoConn(sess)
  624. ch := make(chan bool, config.Conf.Serve.Thread)
  625. wg := &sync.WaitGroup{}
  626. q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}}
  627. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Iter()
  628. count := 0
  629. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  630. if count%2000 == 0 {
  631. log.Info(fmt.Sprintf("proposed current --- %d", count))
  632. }
  633. if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici {
  634. config.Conf.Serve.Pici = pc
  635. }
  636. ch <- true
  637. wg.Add(1)
  638. go func(tmp map[string]interface{}) {
  639. defer func() {
  640. <-ch
  641. wg.Done()
  642. }()
  643. bUpdate := false
  644. id := mongodb.BsonIdToSId(tmp["_id"])
  645. if num := redis.GetInt(config.Conf.DB.Redis.Proposed, id); num > 0 {
  646. size := len(tmp["list"].([]interface{}))
  647. list := tmp["list"].([]interface{})
  648. if num != size {
  649. for k := range list {
  650. info1 := list[size-k-1].(map[string]interface{})
  651. infoid := util.ObjToString(info1["infoid"])
  652. binfo := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": infoid}, "", "")
  653. if binfo != nil && len(*binfo) > 0 {
  654. break
  655. } else {
  656. m := make(map[string]interface{})
  657. m["project_stage_code"] = tagFunc(info1)
  658. m["proposed_id"] = id
  659. m["title"] = util.ObjToString(info1["title"])
  660. if t := util.Int64All(info1["publishtime"]); t > 0 {
  661. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  662. }
  663. m["infoid"] = util.ObjToString(info1["infoid"])
  664. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(info1["infoid"])))
  665. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  666. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  667. bUpdate = true
  668. }
  669. }
  670. redis.PutCKV(config.Conf.DB.Redis.Proposed, id, size)
  671. }
  672. } else {
  673. // 新项目
  674. // 1、关联
  675. wds, q := Method1(util.ObjToString(tmp["projectname"]))
  676. if q != "" {
  677. binfo := Es.Get("projectset", q)
  678. if binfo != nil && len(*binfo) > 0 {
  679. for _, m := range *binfo {
  680. if b, _ := redis.Exists(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"])); b {
  681. continue
  682. }
  683. if util.ObjToString(m["bidstatus"]) == "拟建" {
  684. continue
  685. }
  686. if judgeArea(tmp, m, wds) {
  687. continue
  688. }
  689. size := len(m["ids"].([]interface{}))
  690. redis.PutCKV(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"]), fmt.Sprintf("%s-%d", mongodb.BsonIdToSId(tmp["_id"]), size))
  691. redis.PutCKV(config.Conf.DB.Redis.Proposed, mongodb.BsonIdToSId(tmp["_id"]), len(tmp["ids"].([]interface{})))
  692. bUpdate = true
  693. // 2、保存信息到tidb
  694. if list, ok1 := m["list"].([]interface{}); ok1 {
  695. for _, l := range list {
  696. l1 := l.(map[string]interface{})
  697. m := make(map[string]interface{})
  698. m["project_stage_code"] = tagFunc(l1)
  699. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  700. m["title"] = util.ObjToString(l1["title"])
  701. if t := util.Int64All(l1["publishtime"]); t > 0 {
  702. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  703. }
  704. m["infoid"] = util.ObjToString(l1["infoid"])
  705. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  706. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  707. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  708. }
  709. }
  710. if buyer := util.ObjToString(m["buyer"]); buyer != "" {
  711. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
  712. if s <= 0 {
  713. saveEnt := make(map[string]interface{})
  714. saveEnt["proposed_id"] = id
  715. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  716. saveEnt["name"] = buyer
  717. if eid := redis.GetStr("ent_id", buyer); eid != "" {
  718. arr := strings.Split(eid, "_")
  719. saveEnt["name_id"] = arr[0]
  720. if len(arr) == 2 {
  721. saveEnt["area_code"] = arr[1]
  722. } else if len(arr) == 3 {
  723. saveEnt["city_code"] = arr[2]
  724. }
  725. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  726. if info != nil && len(*info) > 0 {
  727. saveEnt["address"] = (*info)[0]["address"]
  728. }
  729. }
  730. saveEnt["identity_type"] = 2
  731. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  732. saveEntPool1 <- saveEnt
  733. }
  734. }
  735. if winner := util.ObjToString(m["buyer"]); winner != "" {
  736. for _, w := range strings.Split(winner, ",") {
  737. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
  738. if s <= 0 {
  739. saveEnt := make(map[string]interface{})
  740. saveEnt["proposed_id"] = id
  741. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  742. saveEnt["name"] = w
  743. if eid := redis.GetStr("ent_id", w); eid != "" {
  744. arr := strings.Split(eid, "_")
  745. saveEnt["name_id"] = arr[0]
  746. if len(arr) == 2 {
  747. saveEnt["area_code"] = arr[1]
  748. } else if len(arr) == 3 {
  749. saveEnt["city_code"] = arr[2]
  750. }
  751. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  752. if info != nil && len(*info) > 0 {
  753. saveEnt["address"] = (*info)[0]["address"]
  754. }
  755. }
  756. saveEnt["identity_type"] = 3
  757. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  758. saveEntPool1 <- saveEnt
  759. }
  760. }
  761. }
  762. }
  763. }
  764. }
  765. }
  766. // 更新拟在建基本信息表
  767. if bUpdate {
  768. size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  769. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  770. MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  771. }
  772. }(tmp)
  773. tmp = make(map[string]interface{})
  774. }
  775. wg.Wait()
  776. log.Info(fmt.Sprintf("proposed over --- %d, pici ---%d", count, config.Conf.Serve.Pici))
  777. }
  778. func Method3(tmp map[string]interface{}) bool {
  779. pname := util.ObjToString(tmp["projectname"])
  780. pname = strings.ReplaceAll(pname, "\"", "'")
  781. binfo := Es.Get("proposed", fmt.Sprintf(esQ, pname))
  782. if binfo != nil && len(*binfo) > 0 {
  783. if list, ok1 := (*binfo)[0]["list"].([]interface{}); ok1 {
  784. for _, l := range list {
  785. l1 := l.(map[string]interface{})
  786. m := make(map[string]interface{})
  787. m["project_stage_code"] = tagFunc(l1)
  788. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  789. m["title"] = util.ObjToString(l1["title"])
  790. if t := util.Int64All(l1["publishtime"]); t > 0 {
  791. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  792. }
  793. m["infoid"] = util.ObjToString(l1["infoid"])
  794. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  795. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  796. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  797. }
  798. redis.PutCKV(config.Conf.DB.Redis.Project, mongodb.BsonIdToSId(tmp["_id"]), fmt.Sprintf("%s-%d", util.ObjToString((*binfo)[0]["_id"]), len(list)))
  799. return true
  800. }
  801. }
  802. return false
  803. }
  804. var saveEntPool1 = make(chan map[string]interface{}, 5000)
  805. var saveEntSp1 = make(chan bool, 1)
  806. func SaveEntFunc1(table string, arr []string) {
  807. arru := make([]map[string]interface{}, saveSize)
  808. indexu := 0
  809. for {
  810. select {
  811. case v := <-saveEntPool1:
  812. arru[indexu] = v
  813. indexu++
  814. if indexu == saveSize {
  815. saveEntSp1 <- true
  816. go func(arru []map[string]interface{}) {
  817. defer func() {
  818. <-saveEntSp1
  819. }()
  820. MysqlTool.InsertBulk(table, arr, arru...)
  821. }(arru)
  822. arru = make([]map[string]interface{}, saveSize)
  823. indexu = 0
  824. }
  825. case <-time.After(1000 * time.Millisecond):
  826. if indexu > 0 {
  827. saveEntSp1 <- true
  828. go func(arru []map[string]interface{}) {
  829. defer func() {
  830. <-saveEntSp1
  831. }()
  832. MysqlTool.InsertBulk(table, arr, arru...)
  833. }(arru[:indexu])
  834. arru = make([]map[string]interface{}, saveSize)
  835. indexu = 0
  836. }
  837. }
  838. }
  839. }