projectTask.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "encoding/json"
  8. "fmt"
  9. "github.com/go-ego/gse"
  10. "go.mongodb.org/mongo-driver/bson"
  11. "go.uber.org/zap"
  12. "io/ioutil"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "proposed_project/config"
  17. "regexp"
  18. "strings"
  19. "sync"
  20. "time"
  21. "unicode/utf8"
  22. )
  23. var (
  24. seg gse.Segmenter
  25. stopWords []string
  26. regNum = regexp.MustCompile(`[\dA-Za-z]{6,30}`)
  27. regSymb = regexp.MustCompile("[•、,,.。??'\"“”‘’·~!@#¥$%…&*()()\\-—+=【】\\[\\]{}{}<>《》|\\/\\s]+")
  28. regDel = regexp.MustCompile("项目|工程|生产|中心")
  29. reg1 = regexp.MustCompile("分布式光伏发电|自然人|出让|国有建设用地使用权")
  30. field = []string{"projectname.pname"}
  31. sField = []string{"projectname", "bidstatus", "firsttime", "_id", "area", "city", "district"}
  32. savePpPool = make(chan map[string]interface{}, 5000)
  33. savePpSp = make(chan bool, 3)
  34. )
  35. func initSeg() {
  36. _ = seg.LoadDict("./t_1.txt")
  37. //_ = seg.LoadDict()
  38. seg.AddToken("渼陂", 3, "")
  39. seg.LoadStop("./stopwords.txt")
  40. //f, _ := os.Open("./stopwords.txt")
  41. //defer f.Close()
  42. //scanner := bufio.NewScanner(f)
  43. //for scanner.Scan() {
  44. // stopWords = append(stopWords, scanner.Text())
  45. //}
  46. //sort.Strings(stopWords)
  47. }
  48. func taskC() {
  49. sess := MgoPro.GetMgoConn()
  50. defer MgoPro.DestoryMongoConn(sess)
  51. ch := make(chan bool, config.Conf.Serve.Thread)
  52. wg := &sync.WaitGroup{}
  53. f := map[string]interface{}{
  54. "projectname": 1,
  55. "approvecode": 1,
  56. "approvenumber": 1,
  57. "approvestatus": 1,
  58. "area": 1,
  59. "city": 1,
  60. "district": 1,
  61. }
  62. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(f).Iter()
  63. count := 0
  64. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  65. if count%2000 == 0 {
  66. log.Info(fmt.Sprintf("current --- %d", count))
  67. }
  68. ch <- true
  69. wg.Add(1)
  70. go func(tmp map[string]interface{}) {
  71. defer func() {
  72. <-ch
  73. wg.Done()
  74. }()
  75. if filterMethod(tmp) {
  76. return
  77. }
  78. var mArr []map[string]interface{}
  79. var eArr []map[string]interface{}
  80. n1, n2 := 0, 0
  81. // approvecode、approvenumber
  82. //q := Method2(util.ObjToString(tmp["approvecode"]), util.ObjToString(tmp["approvenumber"]))
  83. //if q != "" {
  84. // binfo := Es.Get("projectset", q)
  85. // if binfo != nil && len(*binfo) > 0 {
  86. // for _, m := range *binfo {
  87. // n1 = len(*binfo)
  88. // mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 1})
  89. // }
  90. // }
  91. //}
  92. wds, q := Method1(util.ObjToString(tmp["projectname"]))
  93. if q != "" {
  94. binfo := Es.Get("projectset_v1", q)
  95. if binfo != nil && len(*binfo) > 0 {
  96. n2 = len(*binfo)
  97. for _, m := range *binfo {
  98. if b, _ := redis.Exists(config.Conf.DB.Redis.Pcode, util.ObjToString(m["_id"])); b {
  99. continue
  100. }
  101. if util.ObjToString(m["bidstatus"]) == "拟建" {
  102. eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
  103. continue
  104. }
  105. if judgeArea(tmp, m, wds) {
  106. eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
  107. continue
  108. }
  109. redis.PutCKV(config.Conf.DB.Redis.Pcode, util.ObjToString(m["_id"]), mongodb.BsonIdToSId(tmp["_id"]))
  110. mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 2})
  111. }
  112. }
  113. }
  114. //if mArr != nil && len(mArr) > 0 {
  115. save := make(map[string]interface{})
  116. save["_id"] = tmp["_id"]
  117. save["ids"] = mArr
  118. save["wds"] = wds
  119. save["err"] = eArr
  120. save["projectname"] = tmp["projectname"]
  121. save["esearch"] = q
  122. save["size_1"] = n1
  123. save["size_2"] = n2
  124. save["createtime"] = time.Now().Unix()
  125. savePpPool <- save
  126. //}
  127. }(tmp)
  128. tmp = make(map[string]interface{})
  129. }
  130. wg.Wait()
  131. log.Info(fmt.Sprintf("over --- %d", count))
  132. }
  133. func Method(pname string) string {
  134. pname = regNum.ReplaceAllString(pname, "")
  135. pname = regSymb.ReplaceAllString(pname, "")
  136. //wds := seg.CutStop(pname, true)
  137. util.Debug(pname)
  138. surl := config.Conf.DB.Es.Addr + "/projectset_v2/_analyze"
  139. URL, _ := url.Parse(surl)
  140. Q := URL.Query()
  141. Q.Add("pretty", "1")
  142. Q.Add("analyzer", "my_ngram_title")
  143. Q.Add("text", pname)
  144. URL.RawQuery = Q.Encode()
  145. resp, err := http.Get(URL.String())
  146. if err != nil {
  147. log.Info("")
  148. }
  149. result, err := ioutil.ReadAll(resp.Body)
  150. if err != nil {
  151. log.Info("")
  152. }
  153. var resMap map[string]interface{}
  154. json.Unmarshal(result, &resMap)
  155. if resMap == nil || len(resMap["tokens"].([]interface{})) == 0 {
  156. log.Info("")
  157. }
  158. tokens := util.ObjArrToMapArr(resMap["tokens"].([]interface{}))
  159. var should []interface{}
  160. for _, t := range tokens {
  161. wd := util.ObjToString(t["token"])
  162. if in(wd) {
  163. // 删除词 跳过
  164. continue
  165. }
  166. mm := &ShouldObject{MultiMatch: &MultiMatch{
  167. Query: wd,
  168. Type: "phrase",
  169. Fields: field,
  170. }}
  171. should = append(should, mm)
  172. }
  173. query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Must: should}}}
  174. jsr, err := json.Marshal(query)
  175. if err != nil {
  176. fmt.Printf("Err %v", err)
  177. os.Exit(1)
  178. }
  179. return string(jsr)
  180. }
  181. //@Description jieba分词查询
  182. // @Desc projectname
  183. // @Desc minimum_should_match 当should分支总数小于等于指定的数量时,则必须匹配所有should分支,当should分支总数大于指定的数量时,则应用指定的说明符
  184. // @Author J 2023/3/7 17:13
  185. func Method1(pname string) ([]string, string) {
  186. if pname == "" {
  187. return nil, ""
  188. }
  189. var wds []string
  190. p1 := regSymb.ReplaceAllString(pname, "")
  191. p1 = regNum.ReplaceAllString(p1, "")
  192. if utf8.RuneCountInString(p1) < 12 {
  193. wds = append(wds, pname)
  194. } else {
  195. wds = seg.CutStop(pname, true)
  196. }
  197. if len(wds) > 0 {
  198. wds = combArr(wds)
  199. var should []interface{}
  200. for _, t := range wds {
  201. //if utf8.RuneCountInString(t) == 1 {
  202. // continue
  203. //}
  204. mm := &ShouldObject{MultiMatch: &MultiMatch{
  205. Query: t,
  206. Type: "phrase",
  207. Fields: field,
  208. }}
  209. should = append(should, mm)
  210. }
  211. if should == nil || len(should) <= 0 {
  212. return nil, ""
  213. }
  214. query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Should: should, MinSdMatch: config.Conf.DB.Es.MinSdMh}}, Source: sField, Size: 1000}
  215. jsr, err := json.Marshal(query)
  216. if err != nil {
  217. fmt.Printf("Err %v", err)
  218. os.Exit(1)
  219. }
  220. return wds, strings.Replace(string(jsr), "\\u003c", "<", -1)
  221. } else {
  222. return nil, ""
  223. }
  224. }
  225. // @Description
  226. // @Author J 2023/3/20 14:39
  227. func Method2(acode, anumb string) string {
  228. if acode == "" && anumb == "" {
  229. return ""
  230. }
  231. var should []interface{}
  232. if acode != "" {
  233. should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
  234. Query: acode,
  235. Type: "phrase",
  236. Fields: []string{"detail"},
  237. }})
  238. }
  239. if anumb != "" {
  240. should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
  241. Query: anumb,
  242. Type: "phrase",
  243. Fields: []string{"detail"},
  244. }})
  245. }
  246. query := &QueryObject1{Query: &BoolObject1{Bool: &MMSObject{Should: should}}, Source: sField, Size: 500}
  247. jsr, err := json.Marshal(query)
  248. if err != nil {
  249. fmt.Printf("Err %v", err)
  250. os.Exit(1)
  251. }
  252. return string(jsr)
  253. }
  254. func judgeArea(tmp, btmp map[string]interface{}, wds []string) bool {
  255. if util.ObjToString(tmp["area"]) == "全国" || util.ObjToString(btmp["area"]) == "全国" {
  256. pname := util.ObjToString(btmp["projectname"])
  257. for _, wd := range wds {
  258. if !strings.Contains(strings.ToLower(pname), strings.ToLower(wd)) {
  259. return true
  260. }
  261. }
  262. return false
  263. }
  264. if tmp["district"] != nil && btmp["district"] != nil {
  265. if util.ObjToString(tmp["district"]) == util.ObjToString(btmp["district"]) {
  266. return false
  267. } else {
  268. return true
  269. }
  270. } else if tmp["city"] != nil && btmp["city"] != nil {
  271. if util.ObjToString(tmp["city"]) == util.ObjToString(btmp["city"]) {
  272. return false
  273. } else {
  274. return true
  275. }
  276. } else if tmp["area"] != nil && btmp["area"] != nil {
  277. if util.ObjToString(tmp["area"]) == util.ObjToString(btmp["area"]) {
  278. return false
  279. } else {
  280. return true
  281. }
  282. }
  283. return false
  284. }
  // regs lists patterns that cause filterMethod to drop a project name when any
  // of them matches the raw name.
  var regs = []*regexp.Regexp{
  	// Up to ten CJK ideographs followed by "私宅". The previous pattern used
  	// doubled backslashes, which compiled to a class containing a literal
  	// backslash and ASCII characters instead of the U+4E00..U+9FA5 range.
  	regexp.MustCompile(`[\x{4e00}-\x{9fa5}]{0,10}私宅`),
  	regexp.MustCompile("自然人|分布式光伏发电"),
  }
  289. // @Description 过滤数据
  290. // @Author J 2023/3/24 09:30
  291. func filterMethod(tmp map[string]interface{}) bool {
  292. pname := regSymb.ReplaceAllString(util.ObjToString(tmp["projectname"]), "")
  293. pname = regNum.ReplaceAllString(pname, "")
  294. p1 := regDel.ReplaceAllString(pname, "")
  295. p1 = regSymb.ReplaceAllString(p1, "")
  296. if utf8.RuneCountInString(p1) <= 5 {
  297. return true
  298. }
  299. if len(reg1.FindAllString(pname, -1)) > 1 {
  300. return true
  301. }
  302. for _, reg := range regs {
  303. if reg.MatchString(util.ObjToString(tmp["projectname"])) {
  304. return true
  305. }
  306. }
  307. return false
  308. }
  // combArr merges single-rune fragments of a segmented phrase into their
  // neighbours and returns the resulting list.
  //
  // Rules, applied left to right:
  //   - a single-rune element is appended to the previous output element, or
  //     starts the output when there is none yet;
  //   - a multi-rune element that is not the last input element is appended to
  //     the previous output element if that element is still a single rune,
  //     otherwise it starts a new output element;
  //   - a multi-rune last element always starts a new output element.
  //
  // A nil or empty input yields nil. An input consisting of one single-rune
  // element is returned as is; it previously indexed an empty slice and
  // panicked.
  func combArr(arr []string) []string {
  	var merged []string
  	last := len(arr) - 1
  	for i, word := range arr {
  		var joinPrev bool
  		switch {
  		case len(merged) == 0:
  			// Nothing to join onto yet.
  			joinPrev = false
  		case utf8.RuneCountInString(word) == 1:
  			joinPrev = true
  		case i == last:
  			joinPrev = false
  		default:
  			joinPrev = utf8.RuneCountInString(merged[len(merged)-1]) == 1
  		}
  		if joinPrev {
  			merged[len(merged)-1] += word
  		} else {
  			merged = append(merged, word)
  		}
  	}
  	return merged
  }
  345. func SavePpMethod() {
  346. arru := make([]map[string]interface{}, saveSize)
  347. indexu := 0
  348. for {
  349. select {
  350. case v := <-savePpPool:
  351. arru[indexu] = v
  352. indexu++
  353. if indexu == saveSize {
  354. savePpSp <- true
  355. go func(arru []map[string]interface{}) {
  356. defer func() {
  357. <-savePpSp
  358. }()
  359. MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
  360. }(arru)
  361. arru = make([]map[string]interface{}, saveSize)
  362. indexu = 0
  363. }
  364. case <-time.After(1000 * time.Millisecond):
  365. if indexu > 0 {
  366. savePpSp <- true
  367. go func(arru []map[string]interface{}) {
  368. defer func() {
  369. <-savePpSp
  370. }()
  371. MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
  372. }(arru[:indexu])
  373. arru = make([]map[string]interface{}, saveSize)
  374. indexu = 0
  375. }
  376. }
  377. }
  378. }
  379. var StageCode []TagMatching
  380. func initStage() {
  381. info, _ := MgoBid.Find(config.Conf.Serve.TagRule, bson.M{"label_name": "project_stage"}, `{"_id": 1}`, nil, false, -1, -1)
  382. for _, m := range *info {
  383. tag := TagMatching{}
  384. tag.tagName = util.ObjToString(m["label_name"])
  385. tag.tagCode = util.ObjToString(m["code"])
  386. // 关键词
  387. tag.matchField = []string{"title", "project"}
  388. if v := util.ObjToString(m["keyword"]); v != "" {
  389. tag.matchKey = util.ObjToString(m["keyword"])
  390. tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
  391. }
  392. // 附件词
  393. if f := util.ObjToString(m["match_fjword"]); f != "" {
  394. tag.addField = strings.Split(f, ",")
  395. for _, s := range tag.addField {
  396. SelectF[s] = 1
  397. }
  398. if v := util.ObjToString(m["fjword"]); v != "" {
  399. tag.addKey = util.ObjToString(m["fjword"])
  400. tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
  401. }
  402. }
  403. // 排除词
  404. if f := util.ObjToString(m["match_pcword"]); f != "" {
  405. tag.excludeField = strings.Split(f, ",")
  406. for _, s := range tag.excludeField {
  407. SelectF[s] = 1
  408. }
  409. if v := util.ObjToString(m["pcword"]); v != "" {
  410. tag.excludeKey = util.ObjToString(m["pcword"])
  411. tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
  412. }
  413. }
  414. // 清理词
  415. if v := util.ObjToString(m["qlword"]); v != "" {
  416. tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
  417. }
  418. StageCode = append(StageCode, tag)
  419. }
  420. log.Info("initStage", zap.Int("StageCode", len(StageCode)))
  421. }
  422. func taskD() {
  423. sess := MgoPro.GetMgoConn()
  424. defer MgoPro.DestoryMongoConn(sess)
  425. ch := make(chan bool, config.Conf.Serve.Thread)
  426. wg := &sync.WaitGroup{}
  427. //q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
  428. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
  429. count := 0
  430. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  431. if count%2000 == 0 {
  432. log.Info(fmt.Sprintf("current --- %d", count))
  433. }
  434. ch <- true
  435. wg.Add(1)
  436. go func(tmp map[string]interface{}) {
  437. defer func() {
  438. <-ch
  439. wg.Done()
  440. }()
  441. if ids, ok := tmp["ids"].([]interface{}); ok {
  442. //id := mongodb.BsonIdToSId(tmp["_id"])
  443. for _, p := range ids {
  444. p1 := p.(map[string]interface{})
  445. info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
  446. if list, ok1 := (*info)["list"].([]interface{}); ok1 {
  447. for _, l := range list {
  448. l1 := l.(map[string]interface{})
  449. m := make(map[string]interface{})
  450. m["project_stage_code"] = tagFunc(l1)
  451. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  452. m["title"] = util.ObjToString(l1["title"])
  453. if t := util.Int64All(l1["publishtime"]); t > 0 {
  454. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  455. }
  456. m["infoid"] = util.ObjToString(l1["infoid"])
  457. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  458. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  459. MgoPro.Save("projectset_comb_temp1", m)
  460. //MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  461. }
  462. }
  463. //if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
  464. // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
  465. // if s <= 0 {
  466. // saveEnt := make(map[string]interface{})
  467. // saveEnt["proposed_id"] = id
  468. // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  469. // saveEnt["name"] = buyer
  470. // if eid := redis.GetStr("ent_id", buyer); eid != "" {
  471. // arr := strings.Split(eid, "_")
  472. // saveEnt["name_id"] = arr[0]
  473. // if len(arr) == 2 {
  474. // saveEnt["area_code"] = arr[1]
  475. // } else if len(arr) == 3 {
  476. // saveEnt["city_code"] = arr[2]
  477. // }
  478. // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  479. // if info != nil && len(*info) > 0 {
  480. // saveEnt["address"] = (*info)[0]["address"]
  481. // }
  482. // }
  483. // saveEnt["identity_type"] = 2
  484. // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  485. // saveEntPool1 <- saveEnt
  486. // }
  487. //}
  488. //if winner := util.ObjToString((*info)["buyer"]); winner != "" {
  489. // for _, w := range strings.Split(winner, ",") {
  490. // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
  491. // if s <= 0 {
  492. // saveEnt := make(map[string]interface{})
  493. // saveEnt["proposed_id"] = id
  494. // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  495. // saveEnt["name"] = w
  496. // if eid := redis.GetStr("ent_id", w); eid != "" {
  497. // arr := strings.Split(eid, "_")
  498. // saveEnt["name_id"] = arr[0]
  499. // if len(arr) == 2 {
  500. // saveEnt["area_code"] = arr[1]
  501. // } else if len(arr) == 3 {
  502. // saveEnt["city_code"] = arr[2]
  503. // }
  504. // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  505. // if info != nil && len(*info) > 0 {
  506. // saveEnt["address"] = (*info)[0]["address"]
  507. // }
  508. // }
  509. // saveEnt["identity_type"] = 3
  510. // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  511. // saveEntPool1 <- saveEnt
  512. // }
  513. // }
  514. //}
  515. }
  516. //size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  517. //info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  518. //MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  519. }
  520. }(tmp)
  521. tmp = make(map[string]interface{})
  522. }
  523. wg.Wait()
  524. log.Info(fmt.Sprintf("over --- %d", count))
  525. }
  526. // @Description 施工准备(06)、施工(07)、设计(05)
  527. // @Author J 2023/4/21 14:45
  528. func tagFunc(info map[string]interface{}) string {
  529. tag := taskFuc1(info)
  530. if tag["project_stage"] != "" {
  531. return util.ObjToString(tag["project_stage"])
  532. }
  533. if util.ObjToString(info["toptype"]) == "招标" || util.ObjToString(info["toptype"]) == "预告" {
  534. return "06"
  535. }
  536. return "00"
  537. }
  538. // @Description 在建项目增量
  539. // @Author J 2023/4/24 13:58
  540. func taskAA() {
  541. sess := MgoPro.GetMgoConn()
  542. defer MgoPro.DestoryMongoConn(sess)
  543. ch := make(chan bool, config.Conf.Serve.Thread)
  544. wg := &sync.WaitGroup{}
  545. q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}}
  546. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProjectColl).Find(q).Iter()
  547. count := 0
  548. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  549. if count%2000 == 0 {
  550. log.Info(fmt.Sprintf("current --- %d", count))
  551. }
  552. if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici {
  553. config.Conf.Serve.Pici = pc
  554. }
  555. ch <- true
  556. wg.Add(1)
  557. go func(tmp map[string]interface{}) {
  558. defer func() {
  559. <-ch
  560. wg.Done()
  561. }()
  562. id := mongodb.BsonIdToSId(tmp["_id"])
  563. if str := redis.GetStr(config.Conf.DB.Redis.Pcode, id); str != "" {
  564. strs := strings.Split(str, "-")
  565. if len(tmp["list"].([]interface{})) != util.IntAll(strs[1]) {
  566. for _, info := range tmp["list"].([]interface{}) {
  567. info1 := info.(map[string]interface{})
  568. }
  569. }
  570. } else {
  571. }
  572. }(tmp)
  573. tmp = make(map[string]interface{})
  574. }
  575. wg.Wait()
  576. log.Info(fmt.Sprintf("over --- %d, pici ---%d", count, config.Conf.Serve.Pici))
  577. }
  578. // @Description 拟建项目增量
  579. // @Author J 2023/4/24 13:59
  580. func taskBB() {
  581. sess := MgoPro.GetMgoConn()
  582. defer MgoPro.DestoryMongoConn(sess)
  583. ch := make(chan bool, config.Conf.Serve.Thread)
  584. wg := &sync.WaitGroup{}
  585. //q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
  586. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
  587. count := 0
  588. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  589. if count%2000 == 0 {
  590. log.Info(fmt.Sprintf("current --- %d", count))
  591. }
  592. ch <- true
  593. wg.Add(1)
  594. go func(tmp map[string]interface{}) {
  595. defer func() {
  596. <-ch
  597. wg.Done()
  598. }()
  599. if ids, ok := tmp["ids"].([]interface{}); ok {
  600. //id := mongodb.BsonIdToSId(tmp["_id"])
  601. for _, p := range ids {
  602. p1 := p.(map[string]interface{})
  603. info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
  604. if list, ok1 := (*info)["list"].([]interface{}); ok1 {
  605. for _, l := range list {
  606. l1 := l.(map[string]interface{})
  607. m := make(map[string]interface{})
  608. m["project_stage_code"] = tagFunc(l1)
  609. m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  610. m["title"] = util.ObjToString(l1["title"])
  611. if t := util.Int64All(l1["publishtime"]); t > 0 {
  612. m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  613. }
  614. m["infoid"] = util.ObjToString(l1["infoid"])
  615. m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
  616. m["createtime"] = time.Now().Format(util.Date_Full_Layout)
  617. MgoPro.Save("projectset_comb_temp1", m)
  618. //MysqlTool.Insert("dwd_f_nzj_follw_record", m)
  619. }
  620. }
  621. //if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
  622. // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
  623. // if s <= 0 {
  624. // saveEnt := make(map[string]interface{})
  625. // saveEnt["proposed_id"] = id
  626. // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  627. // saveEnt["name"] = buyer
  628. // if eid := redis.GetStr("ent_id", buyer); eid != "" {
  629. // arr := strings.Split(eid, "_")
  630. // saveEnt["name_id"] = arr[0]
  631. // if len(arr) == 2 {
  632. // saveEnt["area_code"] = arr[1]
  633. // } else if len(arr) == 3 {
  634. // saveEnt["city_code"] = arr[2]
  635. // }
  636. // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  637. // if info != nil && len(*info) > 0 {
  638. // saveEnt["address"] = (*info)[0]["address"]
  639. // }
  640. // }
  641. // saveEnt["identity_type"] = 2
  642. // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  643. // saveEntPool1 <- saveEnt
  644. // }
  645. //}
  646. //if winner := util.ObjToString((*info)["buyer"]); winner != "" {
  647. // for _, w := range strings.Split(winner, ",") {
  648. // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
  649. // if s <= 0 {
  650. // saveEnt := make(map[string]interface{})
  651. // saveEnt["proposed_id"] = id
  652. // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
  653. // saveEnt["name"] = w
  654. // if eid := redis.GetStr("ent_id", w); eid != "" {
  655. // arr := strings.Split(eid, "_")
  656. // saveEnt["name_id"] = arr[0]
  657. // if len(arr) == 2 {
  658. // saveEnt["area_code"] = arr[1]
  659. // } else if len(arr) == 3 {
  660. // saveEnt["city_code"] = arr[2]
  661. // }
  662. // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
  663. // if info != nil && len(*info) > 0 {
  664. // saveEnt["address"] = (*info)[0]["address"]
  665. // }
  666. // }
  667. // saveEnt["identity_type"] = 3
  668. // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  669. // saveEntPool1 <- saveEnt
  670. // }
  671. // }
  672. //}
  673. }
  674. //size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
  675. //info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
  676. //MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
  677. }
  678. }(tmp)
  679. tmp = make(map[string]interface{})
  680. }
  681. wg.Wait()
  682. log.Info(fmt.Sprintf("over --- %d", count))
  683. }
  684. var saveEntPool1 = make(chan map[string]interface{}, 5000)
  685. var saveEntSp1 = make(chan bool, 1)
  686. func SaveEntFunc1(table string, arr []string) {
  687. arru := make([]map[string]interface{}, saveSize)
  688. indexu := 0
  689. for {
  690. select {
  691. case v := <-saveEntPool1:
  692. arru[indexu] = v
  693. indexu++
  694. if indexu == saveSize {
  695. saveEntSp1 <- true
  696. go func(arru []map[string]interface{}) {
  697. defer func() {
  698. <-saveEntSp1
  699. }()
  700. MysqlTool.InsertBulk(table, arr, arru...)
  701. }(arru)
  702. arru = make([]map[string]interface{}, saveSize)
  703. indexu = 0
  704. }
  705. case <-time.After(1000 * time.Millisecond):
  706. if indexu > 0 {
  707. saveEntSp1 <- true
  708. go func(arru []map[string]interface{}) {
  709. defer func() {
  710. <-saveEntSp1
  711. }()
  712. MysqlTool.InsertBulk(table, arr, arru...)
  713. }(arru[:indexu])
  714. arru = make([]map[string]interface{}, saveSize)
  715. indexu = 0
  716. }
  717. }
  718. }
  719. }