main.go 13 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "math"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. MatchArr []TagMatching
  19. BrandArr []Brand
  20. KeyRegs []*RegexpInfo //关键词正则
  21. KeyWords = "课桌椅,办公桌,会议桌,讲桌,办公桌椅,木质课桌,电脑桌,折叠桌,课桌凳,学生桌,教师桌,乒乓球桌,幼儿桌子,午休椅,讲桌,木制桌,休闲桌椅,多功能桌,多功能椅,教室桌,培训桌,桌椅板凳,午休卓,办公椅,礼堂椅,会客椅"
  22. )
  23. type Brand struct {
  24. Name string `json:"name" bson:"name"`
  25. Tags []string `json:"tags" bson:"tags"`
  26. }
  27. var (
  28. MgoBid *mongodb.MongodbSim
  29. Thread int //配置项线程数
  30. saveSize int
  31. savePool chan map[string]interface{}
  32. saveSp chan bool
  33. saveTmpPool chan map[string]interface{} //存储itemname过滤的数据,存在临时表
  34. saveTmpSp chan bool
  35. )
  36. func InitDB() *mongo.Client {
  37. clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27017")
  38. MongoDBClient, err := mongo.Connect(context.TODO(), clientOptions) // 连接数据库
  39. if err != nil {
  40. fmt.Println("err =>", err)
  41. }
  42. err = MongoDBClient.Ping(context.TODO(), nil) // 检查数据库是否连接成功
  43. if err != nil {
  44. fmt.Println("err =>", err)
  45. }
  46. fmt.Println("链接success")
  47. return MongoDBClient
  48. }
  49. func InitMgo() {
  50. MgoBid = &mongodb.MongodbSim{
  51. MongodbAddr: "127.0.0.1:27017",
  52. DbName: "ldx",
  53. Size: 10,
  54. UserName: "",
  55. Password: "",
  56. }
  57. MgoBid.InitPool()
  58. Thread = 1
  59. saveSize = 200
  60. savePool = make(chan map[string]interface{}, 1000)
  61. saveSp = make(chan bool, 4)
  62. saveTmpPool = make(chan map[string]interface{}, 1000)
  63. saveTmpSp = make(chan bool, 2)
  64. }
  65. func main() {
  66. //client := InitDB()
  67. InitMgo()
  68. InitRule()
  69. InitBrand()
  70. go saveMethod()
  71. go saveTmpMethod()
  72. taskRun()
  73. c := make(chan bool, 1)
  74. <-c
  75. //
  76. //f, err := excelize.OpenFile("ldx-tmp.xlsx")
  77. //if err != nil {
  78. // fmt.Println(err)
  79. // return
  80. //}
  81. //defer func() {
  82. // if err := f.Close(); err != nil {
  83. // fmt.Println(err)
  84. // }
  85. //}()
  86. //
  87. //datas := []interface{}{}
  88. //// 获取 Sheet1 上所有单元格
  89. //rows, err := f.GetRows("ldx-tmp")
  90. //if err != nil {
  91. // fmt.Println(err)
  92. // return
  93. //}
  94. //
  95. //for k, row := range rows[1:] {
  96. // if len(row) < 3 {
  97. // continue
  98. // }
  99. // pinpai := PinPai{
  100. // Name: strings.TrimSpace(row[1]),
  101. // Tags: strings.Split(row[2], "/"),
  102. // }
  103. // fmt.Println("k ==>", k)
  104. //
  105. // datas = append(datas, pinpai)
  106. // //client.Database("ldx").Collection("pinpai").InsertOne(context.TODO(), pinpai)
  107. //
  108. //}
  109. //client.Database("ldx").Collection("pinpai").InsertMany(context.TODO(), datas)
  110. //
  111. //fmt.Println("success~~~~~~")
  112. }
  113. // InitRule 初始化规则
  114. func InitRule() {
  115. info, _ := MgoBid.Find("productclass_rule", nil, `{"_id": 1}`, nil, false, -1, -1)
  116. for _, m := range *info {
  117. tag := TagMatching{}
  118. tag.tagName = util.ObjToString(m["label_name"])
  119. tag.tagCode = util.ObjToString(m["label_code"])
  120. tag.buyerclass = util.ObjToString(m["buyerclass"])
  121. // 关键词
  122. if f := util.ObjToString(m["match_keyword"]); f != "" {
  123. tag.matchField = strings.Split(f, ",")
  124. if v := util.ObjToString(m["keyword"]); v != "" {
  125. tag.matchKey = util.ObjToString(m["keyword"])
  126. tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
  127. }
  128. }
  129. // 附件词
  130. if f := util.ObjToString(m["match_fjword"]); f != "" {
  131. tag.addField = strings.Split(f, ",")
  132. if v := util.ObjToString(m["fjword"]); v != "" {
  133. tag.addKey = util.ObjToString(m["fjword"])
  134. tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
  135. }
  136. }
  137. // 排除词
  138. if f := util.ObjToString(m["match_pcword"]); f != "" {
  139. tag.excludeField = strings.Split(f, ",")
  140. if v := util.ObjToString(m["pcword"]); v != "" {
  141. tag.excludeKey = util.ObjToString(m["pcword"])
  142. tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
  143. }
  144. }
  145. // 清理词
  146. if v := util.ObjToString(m["qlword"]); v != "" {
  147. tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
  148. }
  149. MatchArr = append(MatchArr, tag)
  150. }
  151. fmt.Println("len InitRule ==>", len(MatchArr))
  152. //log.Info("InitRule", zap.Any("MatchArr", len(MatchArr)))
  153. }
  154. func InitBrand() {
  155. info, _ := MgoBid.Find("brand_name", nil, `{"_id": 1}`, nil, false, -1, -1)
  156. brands, err := json.Marshal(*info)
  157. if err != nil {
  158. fmt.Println("InitBrand Marshal err =>", err)
  159. }
  160. err = json.Unmarshal(brands, &BrandArr)
  161. if err != nil {
  162. fmt.Println("InitBrand Unmarshal err =>", err)
  163. }
  164. fmt.Println("len brand ==>", len(BrandArr))
  165. //log.Info("InitBrand", zap.Int("BrandArr", len(BrandArr)))
  166. KeyRegs = GetRegex(KeyWords)
  167. fmt.Println("len(KeyRegs)->", len(KeyRegs))
  168. }
  169. func taskRun() {
  170. sess := MgoBid.GetMgoConn()
  171. defer MgoBid.DestoryMongoConn(sess)
  172. ch := make(chan bool, Thread)
  173. wg := &sync.WaitGroup{}
  174. //query := sess.DB("ldx").C("20221122Ldx_kzy").FindId("637c7cd709f6fc229744825b").Select(nil).Iter()
  175. f := bson.M{"purchasinglist": 1, "area": 1, "city": 1, "district": 1, "buyerclass": 1, "winner": 1, "buyer": 1,
  176. "agency": 1, "matchkey": 1, "publishtime": 1, "projectname": 1, "budget": 1, "bidamount": 1, "id": 1}
  177. query := sess.DB("ldx").C("20221122Ldx_kzy").Find(nil).Select(f).Iter()
  178. count := 0
  179. //tmp, _ := MgoBid.FindById("20221122Ldx_kzy", "637c7cd709f6fc2297448265", nil)
  180. //go func(tmp map[string]interface{}) {
  181. // defer func() {
  182. // <-ch
  183. // wg.Done()
  184. // }()
  185. // tag := taskFuc(tmp) //打标签, 返回 对应TagMatching,
  186. // //处理数据
  187. // start := time.Now()
  188. // dealTmp(tmp, tag)
  189. // elapsed := time.Since(start)
  190. // fmt.Println("dealTmp -- 该函数执行完成耗时:", elapsed)
  191. //
  192. //}(*tmp)
  193. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  194. if count%saveSize == 0 {
  195. //log.Info(fmt.Sprintf("current --- %d", count))
  196. log.Println("current ----->", count)
  197. //fmt.Println(fmt.Sprintf("current --- %d", count))
  198. }
  199. ch <- true
  200. wg.Add(1)
  201. go func(tmp map[string]interface{}) {
  202. defer func() {
  203. <-ch
  204. wg.Done()
  205. }()
  206. tag := TaskFuc(tmp) //打标签, 返回 对应TagMatching,
  207. //处理数据
  208. //start := time.Now()
  209. dealTmp(tmp, tag)
  210. //elapsed := time.Since(start)
  211. //fmt.Println("dealTmp -- 该函数执行完成耗时:", elapsed)
  212. }(tmp)
  213. tmp = make(map[string]interface{})
  214. }
  215. wg.Wait()
  216. fmt.Println("over ---> ", count)
  217. }
  218. func TaskFuc(tmp map[string]interface{}) (tag TagMatching) {
  219. L:
  220. for _, v := range MatchArr {
  221. // 同个类型的标签如果存在,就不需要再打
  222. if v.buyerclass == "" || (v.buyerclass != "" && strings.Contains(v.buyerclass, util.ObjToString(tmp["buyerclass"]))) {
  223. // 排除词
  224. if len(v.excludeField) > 0 && len(v.excludeKeyReg) > 0 {
  225. for _, f := range v.excludeField {
  226. if val := util.ObjToString(tmp[f]); val != "" {
  227. for _, e1 := range v.excludeKeyReg {
  228. if e1.regs != nil && e1.regs.MatchString(val) {
  229. break L
  230. } else {
  231. // && 特殊处理
  232. if strings.Contains(e1.keyStr, "&&") {
  233. flag := true
  234. for _, s := range strings.Split(e1.keyStr, "&&") {
  235. if !strings.Contains(val, s) {
  236. flag = false
  237. break
  238. }
  239. }
  240. if flag {
  241. break L
  242. }
  243. }
  244. }
  245. }
  246. }
  247. }
  248. }
  249. // 清理词
  250. if len(v.clearKey) > 0 && len(v.matchField) > 0 {
  251. for _, s := range v.clearKey {
  252. for _, f := range v.matchField {
  253. if val := util.ObjToString(tmp[f]); val != "" {
  254. tmp[f] = strings.ReplaceAll(val, s, "")
  255. }
  256. }
  257. }
  258. }
  259. // 关键词
  260. if len(v.matchField) > 0 && len(v.matchKeyReg) > 0 {
  261. for _, f := range v.matchField {
  262. if val := util.ObjToString(tmp[f]); val != "" {
  263. for _, r1 := range v.matchKeyReg {
  264. if r1.regs.MatchString(val) {
  265. if len(v.addField) > 0 && len(v.addKeyReg) > 0 {
  266. // 匹配附加词
  267. isCt := false
  268. for _, f1 := range v.addField {
  269. if v1 := util.ObjToString(tmp[f1]); v1 != "" {
  270. for _, r2 := range v.addKeyReg {
  271. if r2.regs != nil && r2.regs.MatchString(v1) {
  272. isCt = true
  273. } else {
  274. // && 特殊处理
  275. if strings.Contains(r2.keyStr, "&&") {
  276. flag := true
  277. for _, s := range strings.Split(r2.keyStr, "&&") {
  278. if !strings.Contains(v1, s) {
  279. flag = false
  280. break
  281. }
  282. }
  283. if flag {
  284. isCt = true
  285. }
  286. }
  287. }
  288. }
  289. }
  290. }
  291. if isCt {
  292. return v
  293. }
  294. } else {
  295. return v
  296. }
  297. }
  298. }
  299. }
  300. }
  301. }
  302. }
  303. }
  304. return tag
  305. }
  306. // dealTmp 处理tmp 新增字段,拆分purchasinglist,补充里面单价数量
  307. func dealTmp(tmp map[string]interface{}, tag TagMatching) {
  308. tmp["infoid"] = util.ObjToString(tmp["id"])
  309. //满足打标签,添加新字段
  310. if tag.tagName != "" {
  311. tmp["productclass_val"] = tag.tagName
  312. tmp[tag.tagName] = tag.tagCode
  313. }
  314. //包含二个字段时,进行计算补充第三个字段
  315. fields := []string{"number", "totalprice", "unitprice"}
  316. // 1.循环purchasinglist,匹配里面的brandname 值,然后替换
  317. //2.计算补充里 数量总价
  318. if purchasinglist, ok := tmp["purchasinglist"]; ok {
  319. if list, ok1 := purchasinglist.([]interface{}); ok1 {
  320. if len(list) > 0 {
  321. for _, item := range list {
  322. newTmp := util.DeepCopy(tmp).(map[string]interface{})
  323. skipTmp := util.DeepCopy(tmp).(map[string]interface{})
  324. var values []string
  325. item1 := item.(map[string]interface{})
  326. if brandname := util.ObjToString(item1["brandname"]); brandname != "" {
  327. values = append(values, util.ObjToString(brandname))
  328. }
  329. if itemname := util.ObjToString(item1["itemname"]); itemname != "" {
  330. if !MatchField(strings.Split(KeyWords, ","), []string{itemname}) {
  331. for kk, vv := range item1 {
  332. skipTmp[kk] = vv
  333. }
  334. delete(skipTmp, "purchasinglist")
  335. delete(skipTmp, "_id")
  336. saveTmpPool <- skipTmp
  337. continue
  338. }
  339. values = append(values, util.ObjToString(itemname))
  340. }
  341. if model := util.ObjToString(item1["model"]); model != "" {
  342. values = append(values, util.ObjToString(model))
  343. }
  344. var matched bool
  345. //匹配 brandname,匹配到就替换原来的值
  346. for _, brand := range BrandArr {
  347. matched = MatchField(brand.Tags, values)
  348. if matched {
  349. item1["brandname"] = brand.Name
  350. break
  351. }
  352. }
  353. //匹配不上,直接用其他
  354. if !matched && util.ObjToString(item1["brandname"]) != "" {
  355. item1["brandname"] = "其他"
  356. }
  357. // 判断补充 purchasinglist 节点里的数量关系
  358. if SumFields(fields, item1) > 1 {
  359. number, okn := item1["number"]
  360. unitprice, oku := item1["unitprice"]
  361. totalprice, okt := item1["totalprice"]
  362. if okn && oku && !okt { //只有数量和单价
  363. item1["totalprice"] = number.(float64) * unitprice.(float64)
  364. } else if okn && okt && !oku { // 只有数量和总价
  365. item1["unitprice"] = totalprice.(float64) / number.(float64)
  366. } else if okt && oku && !okn { //只有总价和单价
  367. number = totalprice.(float64) / unitprice.(float64)
  368. number = math.Ceil(number.(float64))
  369. item1["number"] = number
  370. }
  371. }
  372. for kk, vv := range item1 {
  373. newTmp[kk] = vv
  374. }
  375. delete(newTmp, "purchasinglist")
  376. delete(newTmp, "_id")
  377. savePool <- newTmp
  378. }
  379. }
  380. }
  381. }
  382. }
  383. func saveMethod() {
  384. arru := make([]map[string]interface{}, saveSize)
  385. indexu := 0
  386. for {
  387. select {
  388. case v := <-savePool:
  389. arru[indexu] = v
  390. indexu++
  391. if indexu == saveSize {
  392. saveSp <- true
  393. go func(arru []map[string]interface{}) {
  394. defer func() {
  395. <-saveSp
  396. }()
  397. MgoBid.SaveBulk("20221122Ldx_kzy_new", arru...)
  398. }(arru)
  399. arru = make([]map[string]interface{}, saveSize)
  400. indexu = 0
  401. }
  402. case <-time.After(1000 * time.Millisecond):
  403. if indexu > 0 {
  404. saveSp <- true
  405. go func(arru []map[string]interface{}) {
  406. defer func() {
  407. <-saveSp
  408. }()
  409. MgoBid.SaveBulk("20221122Ldx_kzy_new", arru...)
  410. }(arru[:indexu])
  411. arru = make([]map[string]interface{}, saveSize)
  412. indexu = 0
  413. }
  414. }
  415. }
  416. }
  417. // saveTmpMethod 过滤的数据进一个临时表
  418. func saveTmpMethod() {
  419. arru := make([]map[string]interface{}, saveSize)
  420. indexu := 0
  421. for {
  422. select {
  423. case v := <-saveTmpPool:
  424. arru[indexu] = v
  425. indexu++
  426. if indexu == saveSize {
  427. saveTmpSp <- true
  428. go func(arru []map[string]interface{}) {
  429. defer func() {
  430. <-saveTmpSp
  431. }()
  432. MgoBid.SaveBulk("20221122Ldx_kzy_skip", arru...)
  433. }(arru)
  434. arru = make([]map[string]interface{}, saveSize)
  435. indexu = 0
  436. }
  437. case <-time.After(1000 * time.Millisecond):
  438. if indexu > 0 {
  439. saveTmpSp <- true
  440. go func(arru []map[string]interface{}) {
  441. defer func() {
  442. <-saveTmpSp
  443. }()
  444. MgoBid.SaveBulk("20221122Ldx_kzy_skip", arru...)
  445. }(arru[:indexu])
  446. arru = make([]map[string]interface{}, saveSize)
  447. indexu = 0
  448. }
  449. }
  450. }
  451. }