projectTask.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "encoding/json"
  8. "fmt"
  9. "github.com/go-ego/gse"
  10. "go.mongodb.org/mongo-driver/bson"
  11. "go.uber.org/zap"
  12. "io/ioutil"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "proposed_project/config"
  17. "regexp"
  18. "strings"
  19. "sync"
  20. "time"
  21. "unicode/utf8"
  22. )
  23. var (
  24. seg gse.Segmenter
  25. stopWords []string
  26. regNum = regexp.MustCompile(`[\dA-Za-z]{6,30}`)
  27. regSymb = regexp.MustCompile("[•、,,.。??'\"“”‘’·~!@#¥$%…&*()()\\-—+=【】\\[\\]{}{}<>《》|\\/\\s]+")
  28. regDel = regexp.MustCompile("项目|工程|生产|中心")
  29. reg1 = regexp.MustCompile("分布式光伏发电|自然人|出让|国有建设用地使用权")
  30. field = []string{"projectname.pname"}
  31. sField = []string{"projectname", "bidstatus", "firsttime", "_id", "area", "city", "district", "ids"}
  32. esQ = `{"query": {"bool": {"must": [{"multi_match": {"query": "%s","type": "phrase","fields": ["projectname"]}}]}},"size": 100}`
  33. savePpPool = make(chan map[string]interface{}, 5000)
  34. savePpSp = make(chan bool, 3)
  35. )
  36. func initSeg() {
  37. _ = seg.LoadDict("./t_1.txt")
  38. //_ = seg.LoadDict()
  39. seg.AddToken("渼陂", 3, "")
  40. seg.LoadStop("./stopwords.txt")
  41. //f, _ := os.Open("./stopwords.txt")
  42. //defer f.Close()
  43. //scanner := bufio.NewScanner(f)
  44. //for scanner.Scan() {
  45. // stopWords = append(stopWords, scanner.Text())
  46. //}
  47. //sort.Strings(stopWords)
  48. }
  49. // @Description 关联
  50. // @Author J 2023/4/25 10:26
  51. func taskComb() {
  52. sess := MgoPro.GetMgoConn()
  53. defer MgoPro.DestoryMongoConn(sess)
  54. ch := make(chan bool, config.Conf.Serve.Thread)
  55. wg := &sync.WaitGroup{}
  56. f := map[string]interface{}{
  57. "projectname": 1,
  58. "approvecode": 1,
  59. "approvenumber": 1,
  60. "approvestatus": 1,
  61. "area": 1,
  62. "city": 1,
  63. "district": 1,
  64. "ids": 1,
  65. }
  66. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(f).Iter()
  67. count := 0
  68. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  69. if count%2000 == 0 {
  70. log.Info(fmt.Sprintf("current --- %d", count))
  71. }
  72. ch <- true
  73. wg.Add(1)
  74. go func(tmp map[string]interface{}) {
  75. defer func() {
  76. <-ch
  77. wg.Done()
  78. }()
  79. if filterMethod(tmp) {
  80. return
  81. }
  82. var mArr []map[string]interface{}
  83. var eArr []map[string]interface{}
  84. n1, n2 := 0, 0
  85. // approvecode、approvenumber
  86. //q := Method2(util.ObjToString(tmp["approvecode"]), util.ObjToString(tmp["approvenumber"]))
  87. //if q != "" {
  88. // binfo := Es.Get("projectset", q)
  89. // if binfo != nil && len(*binfo) > 0 {
  90. // for _, m := range *binfo {
  91. // n1 = len(*binfo)
  92. // mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 1})
  93. // }
  94. // }
  95. //}
  96. wds, q := Method1(util.ObjToString(tmp["projectname"]))
  97. if q != "" {
  98. binfo := Es.Get("projectset", q)
  99. if binfo != nil && len(*binfo) > 0 {
  100. n2 = len(*binfo)
  101. for _, m := range *binfo {
  102. if b, _ := redis.Exists(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"])); b {
  103. continue
  104. }
  105. if util.ObjToString(m["bidstatus"]) == "拟建" {
  106. eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
  107. continue
  108. }
  109. if judgeArea(tmp, m, wds) {
  110. eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
  111. continue
  112. }
  113. size := len(m["ids"].([]interface{}))
  114. redis.PutCKV(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"]), fmt.Sprintf("%s-%d", mongodb.BsonIdToSId(tmp["_id"]), size))
  115. redis.PutCKV(config.Conf.DB.Redis.Proposed, mongodb.BsonIdToSId(tmp["_id"]), len(tmp["ids"].([]interface{})))
  116. mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 2})
  117. }
  118. }
  119. }
  120. //if mArr != nil && len(mArr) > 0 {
  121. save := make(map[string]interface{})
  122. save["_id"] = tmp["_id"]
  123. save["ids"] = mArr
  124. save["wds"] = wds
  125. save["err"] = eArr
  126. save["projectname"] = tmp["projectname"]
  127. save["esearch"] = q
  128. save["size_1"] = n1
  129. save["size_2"] = n2
  130. save["createtime"] = time.Now().Unix()
  131. savePpPool <- save
  132. //}
  133. }(tmp)
  134. tmp = make(map[string]interface{})
  135. }
  136. wg.Wait()
  137. log.Info(fmt.Sprintf("over --- %d", count))
  138. }
  139. func Method(pname string) string {
  140. pname = regNum.ReplaceAllString(pname, "")
  141. pname = regSymb.ReplaceAllString(pname, "")
  142. //wds := seg.CutStop(pname, true)
  143. util.Debug(pname)
  144. surl := config.Conf.DB.Es.Addr + "/projectset_v2/_analyze"
  145. URL, _ := url.Parse(surl)
  146. Q := URL.Query()
  147. Q.Add("pretty", "1")
  148. Q.Add("analyzer", "my_ngram_title")
  149. Q.Add("text", pname)
  150. URL.RawQuery = Q.Encode()
  151. resp, err := http.Get(URL.String())
  152. if err != nil {
  153. log.Info("")
  154. }
  155. result, err := ioutil.ReadAll(resp.Body)
  156. if err != nil {
  157. log.Info("")
  158. }
  159. var resMap map[string]interface{}
  160. json.Unmarshal(result, &resMap)
  161. if resMap == nil || len(resMap["tokens"].([]interface{})) == 0 {
  162. log.Info("")
  163. }
  164. tokens := util.ObjArrToMapArr(resMap["tokens"].([]interface{}))
  165. var should []interface{}
  166. for _, t := range tokens {
  167. wd := util.ObjToString(t["token"])
  168. if in(wd) {
  169. // 删除词 跳过
  170. continue
  171. }
  172. mm := &ShouldObject{MultiMatch: &MultiMatch{
  173. Query: wd,
  174. Type: "phrase",
  175. Fields: field,
  176. }}
  177. should = append(should, mm)
  178. }
  179. query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Must: should}}}
  180. jsr, err := json.Marshal(query)
  181. if err != nil {
  182. fmt.Printf("Err %v", err)
  183. os.Exit(1)
  184. }
  185. return string(jsr)
  186. }
  187. //@Description jieba分词查询
  188. // @Desc projectname
  189. // @Desc minimum_should_match 当should分支总数小于等于指定的数量时,则必须匹配所有should分支,当should分支总数大于指定的数量时,则应用指定的说明符
  190. // @Author J 2023/3/7 17:13
  191. func Method1(pname string) ([]string, string) {
  192. if pname == "" {
  193. return nil, ""
  194. }
  195. var wds []string
  196. p1 := regSymb.ReplaceAllString(pname, "")
  197. p1 = regNum.ReplaceAllString(p1, "")
  198. if utf8.RuneCountInString(p1) < 12 {
  199. wds = append(wds, pname)
  200. } else {
  201. wds = seg.CutStop(pname, true)
  202. }
  203. if len(wds) > 0 {
  204. wds = combArr(wds)
  205. var should []interface{}
  206. for _, t := range wds {
  207. //if utf8.RuneCountInString(t) == 1 {
  208. // continue
  209. //}
  210. mm := &ShouldObject{MultiMatch: &MultiMatch{
  211. Query: t,
  212. Type: "phrase",
  213. Fields: field,
  214. }}
  215. should = append(should, mm)
  216. }
  217. if should == nil || len(should) <= 0 {
  218. return nil, ""
  219. }
  220. query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Should: should, MinSdMatch: config.Conf.DB.Es.MinSdMh}}, Source: sField, Size: 1000}
  221. jsr, err := json.Marshal(query)
  222. if err != nil {
  223. fmt.Printf("Err %v", err)
  224. os.Exit(1)
  225. }
  226. return wds, strings.Replace(string(jsr), "\\u003c", "<", -1)
  227. } else {
  228. return nil, ""
  229. }
  230. }
  231. // @Description
  232. // @Author J 2023/3/20 14:39
  233. func Method2(acode, anumb string) string {
  234. if acode == "" && anumb == "" {
  235. return ""
  236. }
  237. var should []interface{}
  238. if acode != "" {
  239. should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
  240. Query: acode,
  241. Type: "phrase",
  242. Fields: []string{"detail"},
  243. }})
  244. }
  245. if anumb != "" {
  246. should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
  247. Query: anumb,
  248. Type: "phrase",
  249. Fields: []string{"detail"},
  250. }})
  251. }
  252. query := &QueryObject1{Query: &BoolObject1{Bool: &MMSObject{Should: should}}, Source: sField, Size: 500}
  253. jsr, err := json.Marshal(query)
  254. if err != nil {
  255. fmt.Printf("Err %v", err)
  256. os.Exit(1)
  257. }
  258. return string(jsr)
  259. }
  260. func judgeArea(tmp, btmp map[string]interface{}, wds []string) bool {
  261. if util.ObjToString(tmp["area"]) == "全国" || util.ObjToString(btmp["area"]) == "全国" {
  262. pname := util.ObjToString(btmp["projectname"])
  263. for _, wd := range wds {
  264. if !strings.Contains(strings.ToLower(pname), strings.ToLower(wd)) {
  265. return true
  266. }
  267. }
  268. return false
  269. }
  270. if tmp["district"] != nil && btmp["district"] != nil {
  271. if util.ObjToString(tmp["district"]) == util.ObjToString(btmp["district"]) {
  272. return false
  273. } else {
  274. return true
  275. }
  276. } else if tmp["city"] != nil && btmp["city"] != nil {
  277. if util.ObjToString(tmp["city"]) == util.ObjToString(btmp["city"]) {
  278. return false
  279. } else {
  280. return true
  281. }
  282. } else if tmp["area"] != nil && btmp["area"] != nil {
  283. if util.ObjToString(tmp["area"]) == util.ObjToString(btmp["area"]) {
  284. return false
  285. } else {
  286. return true
  287. }
  288. }
  289. return false
  290. }
  291. var regs = []*regexp.Regexp{
  292. regexp.MustCompile("[\\\\u4e00-\\\\u9fa5]{0,10}私宅"),
  293. regexp.MustCompile("自然人|分布式光伏发电"),
  294. }
  295. // @Description 过滤数据
  296. // @Author J 2023/3/24 09:30
  297. func filterMethod(tmp map[string]interface{}) bool {
  298. pname := regSymb.ReplaceAllString(util.ObjToString(tmp["projectname"]), "")
  299. pname = regNum.ReplaceAllString(pname, "")
  300. p1 := regDel.ReplaceAllString(pname, "")
  301. p1 = regSymb.ReplaceAllString(p1, "")
  302. if utf8.RuneCountInString(p1) <= 5 {
  303. return true
  304. }
  305. if len(reg1.FindAllString(pname, -1)) > 1 {
  306. return true
  307. }
  308. for _, reg := range regs {
  309. if reg.MatchString(util.ObjToString(tmp["projectname"])) {
  310. return true
  311. }
  312. }
  313. return false
  314. }
  315. func combArr(arr []string) []string {
  316. var nArr []string
  317. for i := 0; i < len(arr); i++ {
  318. if i == len(arr)-1 {
  319. if utf8.RuneCountInString(arr[i]) == 1 {
  320. nArr[len(nArr)-1] += arr[i]
  321. } else {
  322. nArr = append(nArr, arr[i])
  323. }
  324. } else {
  325. if utf8.RuneCountInString(arr[i]) == 1 {
  326. if i == 0 {
  327. nArr = append(nArr, arr[i])
  328. } else {
  329. nArr[len(nArr)-1] += arr[i]
  330. //if utf8.RuneCountInString(arr[i+1]) == 1 {
  331. // if utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 {
  332. // nArr[len(nArr)-1] += arr[i]
  333. // } else {
  334. // nArr = append(nArr, arr[i])
  335. // }
  336. //} else {
  337. // nArr[len(nArr)-1] += arr[i]
  338. //}
  339. }
  340. } else {
  341. if len(nArr) > 0 && utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 {
  342. nArr[len(nArr)-1] += arr[i]
  343. } else {
  344. nArr = append(nArr, arr[i])
  345. }
  346. }
  347. }
  348. }
  349. return nArr
  350. }
  351. func SavePpMethod() {
  352. arru := make([]map[string]interface{}, saveSize)
  353. indexu := 0
  354. for {
  355. select {
  356. case v := <-savePpPool:
  357. arru[indexu] = v
  358. indexu++
  359. if indexu == saveSize {
  360. savePpSp <- true
  361. go func(arru []map[string]interface{}) {
  362. defer func() {
  363. <-savePpSp
  364. }()
  365. MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
  366. }(arru)
  367. arru = make([]map[string]interface{}, saveSize)
  368. indexu = 0
  369. }
  370. case <-time.After(1000 * time.Millisecond):
  371. if indexu > 0 {
  372. savePpSp <- true
  373. go func(arru []map[string]interface{}) {
  374. defer func() {
  375. <-savePpSp
  376. }()
  377. MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
  378. }(arru[:indexu])
  379. arru = make([]map[string]interface{}, saveSize)
  380. indexu = 0
  381. }
  382. }
  383. }
  384. }
  385. var StageCode []TagMatching
  386. func initStage() {
  387. info, _ := MgoBid.Find(config.Conf.Serve.TagRule, bson.M{"label_name": "project_stage"}, `{"_id": 1}`, nil, false, -1, -1)
  388. for _, m := range *info {
  389. tag := TagMatching{}
  390. tag.tagName = util.ObjToString(m["label_name"])
  391. tag.tagCode = util.ObjToString(m["code"])
  392. // 关键词
  393. tag.matchField = []string{"title", "project"}
  394. if v := util.ObjToString(m["keyword"]); v != "" {
  395. tag.matchKey = util.ObjToString(m["keyword"])
  396. tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
  397. }
  398. // 附件词
  399. if f := util.ObjToString(m["match_fjword"]); f != "" {
  400. tag.addField = strings.Split(f, ",")
  401. for _, s := range tag.addField {
  402. SelectF[s] = 1
  403. }
  404. if v := util.ObjToString(m["fjword"]); v != "" {
  405. tag.addKey = util.ObjToString(m["fjword"])
  406. tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
  407. }
  408. }
  409. // 排除词
  410. if f := util.ObjToString(m["match_pcword"]); f != "" {
  411. tag.excludeField = strings.Split(f, ",")
  412. for _, s := range tag.excludeField {
  413. SelectF[s] = 1
  414. }
  415. if v := util.ObjToString(m["pcword"]); v != "" {
  416. tag.excludeKey = util.ObjToString(m["pcword"])
  417. tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
  418. }
  419. }
  420. // 清理词
  421. if v := util.ObjToString(m["qlword"]); v != "" {
  422. tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
  423. }
  424. StageCode = append(StageCode, tag)
  425. }
  426. log.Info("initStage", zap.Int("StageCode", len(StageCode)))
  427. }
  428. func taskD() {
  429. sess := MgoPro.GetMgoConn()
  430. defer MgoPro.DestoryMongoConn(sess)
  431. ch := make(chan bool, config.Conf.Serve.Thread)
  432. wg := &sync.WaitGroup{}
  433. //q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
  434. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
  435. count := 0
  436. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  437. if count%2000 == 0 {
  438. log.Info(fmt.Sprintf("current --- %d", count))
  439. }
  440. ch <- true
  441. wg.Add(1)
  442. go func(tmp map[string]interface{}) {
  443. defer func() {
  444. <-ch
  445. wg.Done()
  446. }()
  447. if ids, ok := tmp["ids"].([]interface{}); ok {
  448. id := mongodb.BsonIdToSId(tmp["_id"])
  449. for _, p := range ids {
  450. p1 := p.(map[string]interface{})
  451. info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
  452. if list, ok1 := (*info)["list"].([]interface{}); ok1 {
  453. for _, l := range list {
  454. l1 := l.(map[string]interface{})
  455. m := make(map[string]interface{})
  456. m["project_stage_code"] = tagFunc(l1)
  457. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  458. m["title"] = util.ObjToString(l1["title"])
  459. if t := util.Int64All(l1["publishtime"]); t > 0 {
  460. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  461. }
  462. m["infoid"] = util.ObjToString(l1["infoid"])
  463. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  464. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  465. //MgoPro.Save("projectset_comb_temp1", m)
  466. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  467. }
  468. }
  469. if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
  470. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
  471. if s <= 0 {
  472. saveEnt := make(map[string]interface{})
  473. saveEnt["proposed_id"] = id
  474. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  475. saveEnt["name"] = buyer
  476. if eid := redis.GetStr("ent_id", buyer); eid != "" {
  477. arr := strings.Split(eid, "_")
  478. saveEnt["name_id"] = arr[0]
  479. if len(arr) == 2 {
  480. saveEnt["area_code"] = arr[1]
  481. } else if len(arr) == 3 {
  482. saveEnt["city_code"] = arr[2]
  483. }
  484. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  485. if info != nil && len(*info) > 0 {
  486. saveEnt["address"] = (*info)[0]["address"]
  487. }
  488. }
  489. saveEnt["identity_type"] = 2
  490. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  491. saveEntPool1 <- saveEnt
  492. }
  493. }
  494. if winner := util.ObjToString((*info)["buyer"]); winner != "" {
  495. for _, w := range strings.Split(winner, ",") {
  496. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
  497. if s <= 0 {
  498. saveEnt := make(map[string]interface{})
  499. saveEnt["proposed_id"] = id
  500. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  501. saveEnt["name"] = w
  502. if eid := redis.GetStr("ent_id", w); eid != "" {
  503. arr := strings.Split(eid, "_")
  504. saveEnt["name_id"] = arr[0]
  505. if len(arr) == 2 {
  506. saveEnt["area_code"] = arr[1]
  507. } else if len(arr) == 3 {
  508. saveEnt["city_code"] = arr[2]
  509. }
  510. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  511. if info != nil && len(*info) > 0 {
  512. saveEnt["address"] = (*info)[0]["address"]
  513. }
  514. }
  515. saveEnt["identity_type"] = 3
  516. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  517. saveEntPool1 <- saveEnt
  518. }
  519. }
  520. }
  521. }
  522. size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  523. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  524. MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  525. }
  526. }(tmp)
  527. tmp = make(map[string]interface{})
  528. }
  529. wg.Wait()
  530. log.Info(fmt.Sprintf("over --- %d", count))
  531. }
  532. // @Description 施工准备(06)、施工(07)、设计(05)
  533. // @Author J 2023/4/21 14:45
  534. func tagFunc(info map[string]interface{}) string {
  535. tag := taskFuc1(info)
  536. if tag["project_stage"] != "" {
  537. return util.ObjToString(tag["project_stage"])
  538. }
  539. if util.ObjToString(info["toptype"]) == "招标" || util.ObjToString(info["toptype"]) == "预告" {
  540. return "06"
  541. }
  542. return "00"
  543. }
  544. // @Description 在建项目增量
  545. // @Author J 2023/4/24 13:58
  546. func taskAA() {
  547. sess := MgoPro.GetMgoConn()
  548. defer MgoPro.DestoryMongoConn(sess)
  549. ch := make(chan bool, config.Conf.Serve.Thread)
  550. wg := &sync.WaitGroup{}
  551. q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}}
  552. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProjectColl).Find(q).Iter()
  553. count := 0
  554. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  555. if count%2000 == 0 {
  556. log.Info(fmt.Sprintf("project current --- %d", count))
  557. }
  558. if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici {
  559. config.Conf.Serve.Pici = pc
  560. }
  561. ch <- true
  562. wg.Add(1)
  563. go func(tmp map[string]interface{}) {
  564. defer func() {
  565. <-ch
  566. wg.Done()
  567. }()
  568. bUpdate := false
  569. id := mongodb.BsonIdToSId(tmp["_id"])
  570. if str := redis.GetStr(config.Conf.DB.Redis.Project, id); str != "" {
  571. strs := strings.Split(str, "-")
  572. size := len(tmp["list"].([]interface{}))
  573. if size != util.IntAll(strs[1]) {
  574. list := tmp["list"].([]interface{})
  575. for k := range list {
  576. info1 := list[size-k-1].(map[string]interface{}) //倒序
  577. infoid := util.ObjToString(info1["infoid"])
  578. binfo := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": infoid}, "", "")
  579. if binfo != nil && len(*binfo) > 0 {
  580. break
  581. } else {
  582. m := make(map[string]interface{})
  583. m["project_stage_code"] = tagFunc(info1)
  584. m["proposed_id"] = strs[0]
  585. m["title"] = util.ObjToString(info1["title"])
  586. if t := util.Int64All(info1["publishtime"]); t > 0 {
  587. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  588. }
  589. m["infoid"] = util.ObjToString(info1["infoid"])
  590. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(info1["infoid"])))
  591. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  592. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  593. bUpdate = true
  594. }
  595. }
  596. redis.PutCKV(config.Conf.DB.Redis.Project, id, fmt.Sprintf("%s-%d", strs[0], size))
  597. }
  598. } else {
  599. // 新项目
  600. // 新项目是否都有必要进行关联(拟建项目)
  601. bUpdate = Method3(tmp)
  602. }
  603. // 更新拟在建基本信息表
  604. if bUpdate {
  605. size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  606. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  607. if info != nil && len(*info) > 0 {
  608. MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  609. }
  610. }
  611. }(tmp)
  612. tmp = make(map[string]interface{})
  613. }
  614. wg.Wait()
  615. log.Info(fmt.Sprintf("project over --- %d, pici ---%d", count, config.Conf.Serve.Pici))
  616. }
  617. // @Description 拟建项目增量
  618. // @Author J 2023/4/24 13:59
  619. func taskBB() {
  620. sess := MgoPro.GetMgoConn()
  621. defer MgoPro.DestoryMongoConn(sess)
  622. ch := make(chan bool, config.Conf.Serve.Thread)
  623. wg := &sync.WaitGroup{}
  624. q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}}
  625. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Iter()
  626. count := 0
  627. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  628. if count%2000 == 0 {
  629. log.Info(fmt.Sprintf("proposed current --- %d", count))
  630. }
  631. if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici {
  632. config.Conf.Serve.Pici = pc
  633. }
  634. ch <- true
  635. wg.Add(1)
  636. go func(tmp map[string]interface{}) {
  637. defer func() {
  638. <-ch
  639. wg.Done()
  640. }()
  641. bUpdate := false
  642. id := mongodb.BsonIdToSId(tmp["_id"])
  643. if num := redis.GetInt(config.Conf.DB.Redis.Proposed, id); num > 0 {
  644. size := len(tmp["list"].([]interface{}))
  645. list := tmp["list"].([]interface{})
  646. if num != size {
  647. for k := range list {
  648. info1 := list[size-k-1].(map[string]interface{})
  649. infoid := util.ObjToString(info1["infoid"])
  650. binfo := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": infoid}, "", "")
  651. if binfo != nil && len(*binfo) > 0 {
  652. break
  653. } else {
  654. m := make(map[string]interface{})
  655. m["project_stage_code"] = tagFunc(info1)
  656. m["proposed_id"] = id
  657. m["title"] = util.ObjToString(info1["title"])
  658. if t := util.Int64All(info1["publishtime"]); t > 0 {
  659. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  660. }
  661. m["infoid"] = util.ObjToString(info1["infoid"])
  662. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(info1["infoid"])))
  663. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  664. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  665. bUpdate = true
  666. }
  667. }
  668. redis.PutCKV(config.Conf.DB.Redis.Proposed, id, size)
  669. }
  670. } else {
  671. // 新项目
  672. // 1、关联
  673. wds, q := Method1(util.ObjToString(tmp["projectname"]))
  674. if q != "" {
  675. binfo := Es.Get("projectset", q)
  676. if binfo != nil && len(*binfo) > 0 {
  677. for _, m := range *binfo {
  678. if b, _ := redis.Exists(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"])); b {
  679. continue
  680. }
  681. if util.ObjToString(m["bidstatus"]) == "拟建" {
  682. continue
  683. }
  684. if judgeArea(tmp, m, wds) {
  685. continue
  686. }
  687. size := len(m["ids"].([]interface{}))
  688. redis.PutCKV(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"]), fmt.Sprintf("%s-%d", mongodb.BsonIdToSId(tmp["_id"]), size))
  689. redis.PutCKV(config.Conf.DB.Redis.Proposed, mongodb.BsonIdToSId(tmp["_id"]), len(tmp["ids"].([]interface{})))
  690. bUpdate = true
  691. // 2、保存信息到tidb
  692. if list, ok1 := m["list"].([]interface{}); ok1 {
  693. for _, l := range list {
  694. l1 := l.(map[string]interface{})
  695. m := make(map[string]interface{})
  696. m["project_stage_code"] = tagFunc(l1)
  697. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  698. m["title"] = util.ObjToString(l1["title"])
  699. if t := util.Int64All(l1["publishtime"]); t > 0 {
  700. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  701. }
  702. m["infoid"] = util.ObjToString(l1["infoid"])
  703. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  704. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  705. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  706. }
  707. }
  708. if buyer := util.ObjToString(m["buyer"]); buyer != "" {
  709. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
  710. if s <= 0 {
  711. saveEnt := make(map[string]interface{})
  712. saveEnt["proposed_id"] = id
  713. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  714. saveEnt["name"] = buyer
  715. if eid := redis.GetStr("ent_id", buyer); eid != "" {
  716. arr := strings.Split(eid, "_")
  717. saveEnt["name_id"] = arr[0]
  718. if len(arr) == 2 {
  719. saveEnt["area_code"] = arr[1]
  720. } else if len(arr) == 3 {
  721. saveEnt["city_code"] = arr[2]
  722. }
  723. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  724. if info != nil && len(*info) > 0 {
  725. saveEnt["address"] = (*info)[0]["address"]
  726. }
  727. }
  728. saveEnt["identity_type"] = 2
  729. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  730. saveEntPool1 <- saveEnt
  731. }
  732. }
  733. if winner := util.ObjToString(m["buyer"]); winner != "" {
  734. for _, w := range strings.Split(winner, ",") {
  735. s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
  736. if s <= 0 {
  737. saveEnt := make(map[string]interface{})
  738. saveEnt["proposed_id"] = id
  739. saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  740. saveEnt["name"] = w
  741. if eid := redis.GetStr("ent_id", w); eid != "" {
  742. arr := strings.Split(eid, "_")
  743. saveEnt["name_id"] = arr[0]
  744. if len(arr) == 2 {
  745. saveEnt["area_code"] = arr[1]
  746. } else if len(arr) == 3 {
  747. saveEnt["city_code"] = arr[2]
  748. }
  749. info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  750. if info != nil && len(*info) > 0 {
  751. saveEnt["address"] = (*info)[0]["address"]
  752. }
  753. }
  754. saveEnt["identity_type"] = 3
  755. saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  756. saveEntPool1 <- saveEnt
  757. }
  758. }
  759. }
  760. }
  761. }
  762. }
  763. }
  764. // 更新拟在建基本信息表
  765. if bUpdate {
  766. size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  767. info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  768. MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  769. }
  770. }(tmp)
  771. tmp = make(map[string]interface{})
  772. }
  773. wg.Wait()
  774. log.Info(fmt.Sprintf("proposed over --- %d, pici ---%d", count, config.Conf.Serve.Pici))
  775. }
  776. func Method3(tmp map[string]interface{}) bool {
  777. pname := util.ObjToString(tmp["projectname"])
  778. pname = strings.ReplaceAll(pname, "\"", "'")
  779. binfo := Es.Get("proposed", fmt.Sprintf(esQ, pname))
  780. if binfo != nil && len(*binfo) > 0 {
  781. if list, ok1 := (*binfo)[0]["list"].([]interface{}); ok1 {
  782. for _, l := range list {
  783. l1 := l.(map[string]interface{})
  784. m := make(map[string]interface{})
  785. m["project_stage_code"] = tagFunc(l1)
  786. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  787. m["title"] = util.ObjToString(l1["title"])
  788. if t := util.Int64All(l1["publishtime"]); t > 0 {
  789. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  790. }
  791. m["infoid"] = util.ObjToString(l1["infoid"])
  792. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  793. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  794. MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  795. }
  796. redis.PutCKV(config.Conf.DB.Redis.Project, mongodb.BsonIdToSId(tmp["_id"]), fmt.Sprintf("%s-%d", util.ObjToString((*binfo)[0]["_id"]), len(list)))
  797. return true
  798. }
  799. }
  800. return false
  801. }
  802. var saveEntPool1 = make(chan map[string]interface{}, 5000)
  803. var saveEntSp1 = make(chan bool, 1)
  804. func SaveEntFunc1(table string, arr []string) {
  805. arru := make([]map[string]interface{}, saveSize)
  806. indexu := 0
  807. for {
  808. select {
  809. case v := <-saveEntPool1:
  810. arru[indexu] = v
  811. indexu++
  812. if indexu == saveSize {
  813. saveEntSp1 <- true
  814. go func(arru []map[string]interface{}) {
  815. defer func() {
  816. <-saveEntSp1
  817. }()
  818. MysqlTool.InsertBulk(table, arr, arru...)
  819. }(arru)
  820. arru = make([]map[string]interface{}, saveSize)
  821. indexu = 0
  822. }
  823. case <-time.After(1000 * time.Millisecond):
  824. if indexu > 0 {
  825. saveEntSp1 <- true
  826. go func(arru []map[string]interface{}) {
  827. defer func() {
  828. <-saveEntSp1
  829. }()
  830. MysqlTool.InsertBulk(table, arr, arru...)
  831. }(arru[:indexu])
  832. arru = make([]map[string]interface{}, saveSize)
  833. indexu = 0
  834. }
  835. }
  836. }
  837. }