project.go 14 KB


  1. package main
  2. import (
  3. "data_tidb/config"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.uber.org/zap"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. func taskP() {
  16. sess := MongoP.GetMgoConn()
  17. defer MongoP.DestoryMongoConn(sess)
  18. ch := make(chan bool, 20)
  19. wg := &sync.WaitGroup{}
  20. //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("64e5a63855d5406905c574e6")}
  21. query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(nil).Sort("-_id").Skip(100000).Iter()
  22. count := 0
  23. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  24. if count%20000 == 0 {
  25. log.Info(fmt.Sprintf("current --- %d", count))
  26. }
  27. ch <- true
  28. wg.Add(1)
  29. go func(tmp map[string]interface{}) {
  30. defer func() {
  31. <-ch
  32. wg.Done()
  33. }()
  34. //taskPro(tmp)
  35. //taskBusiness(tmp)
  36. //taskProTag(tmp)
  37. taskRelation(tmp)
  38. }(tmp)
  39. tmp = make(map[string]interface{})
  40. }
  41. wg.Wait()
  42. log.Info(fmt.Sprintf("over --- %d", count))
  43. }
  44. func taskPAdd(pici int64) {
  45. sess := MongoP.GetMgoConn()
  46. defer MongoP.DestoryMongoConn(sess)
  47. ch := make(chan bool, 20)
  48. wg := &sync.WaitGroup{}
  49. q := bson.M{"pici": bson.M{"$gt": pici}}
  50. query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(q).Iter()
  51. count := 0
  52. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  53. if count%20000 == 0 {
  54. log.Info(fmt.Sprintf("current --- %d", count))
  55. }
  56. ch <- true
  57. wg.Add(1)
  58. go func(tmp map[string]interface{}) {
  59. defer func() {
  60. <-ch
  61. wg.Done()
  62. }()
  63. //taskPro(tmp)
  64. //taskBusiness(tmp)
  65. //taskProTag(tmp)
  66. taskRelation2(tmp)
  67. }(tmp)
  68. tmp = make(map[string]interface{})
  69. }
  70. wg.Wait()
  71. log.Info(fmt.Sprintf("over --- %d", count))
  72. }
  73. var BidStatus = map[string]int{
  74. "预告": 0,
  75. "拟建": 1,
  76. "招标": 2,
  77. "中标": 3,
  78. "成交": 4,
  79. "废标": 5,
  80. "流标": 6,
  81. "合同": 7,
  82. "其它": 8,
  83. }
  84. var BidType = map[string]int{
  85. "招标": 0,
  86. "邀标": 1,
  87. "单一": 2,
  88. "竞价": 3,
  89. "竞谈": 4,
  90. "询价": 5,
  91. }
  92. // @Description 基础信息
  93. // @Author J 2022/9/22 18:32
  94. func taskPro(tmp map[string]interface{}) {
  95. saveM := make(map[string]interface{})
  96. for _, f := range ProField {
  97. if f == "projectid" {
  98. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  99. } else if f == "area_code" {
  100. if tmp["area"] != nil {
  101. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  102. }
  103. } else if f == "city_code" {
  104. if tmp["area"] != nil && tmp["city"] != nil {
  105. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  106. saveM[f] = AreaCode[c]
  107. }
  108. } else if f == "district_code" {
  109. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  110. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  111. saveM[f] = AreaCode[c]
  112. }
  113. } else if f == "updatetime" {
  114. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  115. } else if f == "buyerclass_code" {
  116. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  117. saveM[f] = BuyerCode[obj]
  118. }
  119. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  120. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  121. t := util.Int64All(tmp[f])
  122. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  123. }
  124. } else if f == "bidstatus" {
  125. if b := util.ObjToString(tmp[f]); b != "" {
  126. tmp[f] = BidStatus[b]
  127. }
  128. } else if f == "bidtype" {
  129. if b := util.ObjToString(tmp[f]); b != "" {
  130. tmp[f] = BidType[b]
  131. }
  132. } else if f == "multipackage" {
  133. if tmp[f] == nil {
  134. saveM[f] = 0
  135. } else {
  136. saveM[f] = tmp[f]
  137. }
  138. } else if f == "buyer_id" {
  139. if b := util.ObjToString(tmp["buyer"]); b != "" {
  140. if code := redis.GetStr("qyxy_id", b); code != "" {
  141. saveM[f] = code
  142. }
  143. }
  144. } else if f == "agency_id" {
  145. if b := util.ObjToString(tmp["agency"]); b != "" {
  146. if code := redis.GetStr("qyxy_id", b); code != "" {
  147. saveM[f] = code
  148. }
  149. }
  150. } else {
  151. if tmp[f] != nil {
  152. if ProVMap[f] != nil {
  153. saveM[f], _ = verifyF(f, tmp[f], ProVMap[f])
  154. } else {
  155. saveM[f] = tmp[f]
  156. }
  157. }
  158. }
  159. }
  160. saveProPool <- saveM
  161. }
  162. // @Description 项目业务表
  163. // @Author J 2022/9/30 13:40
  164. func taskBusiness(tmp map[string]interface{}) {
  165. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  166. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  167. warr = append(warr, util.ObjToString(tmp["winner"]))
  168. }
  169. for _, s := range warr {
  170. saveM := make(map[string]interface{})
  171. for _, f := range ProBusField {
  172. if f == "projectid" {
  173. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  174. } else if f == "area_code" {
  175. if tmp["area"] != nil {
  176. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  177. }
  178. } else if f == "city_code" {
  179. if tmp["area"] != nil && tmp["city"] != nil {
  180. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  181. saveM[f] = AreaCode[c]
  182. }
  183. } else if f == "district_code" {
  184. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  185. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  186. saveM[f] = AreaCode[c]
  187. }
  188. } else if f == "updatetime" {
  189. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  190. } else if f == "buyerclass_code" {
  191. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  192. saveM[f] = BuyerCode[obj]
  193. }
  194. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  195. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  196. t := util.Int64All(tmp[f])
  197. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  198. }
  199. } else if f == "bidstatus" {
  200. if b := util.ObjToString(tmp[f]); b != "" {
  201. tmp[f] = BidStatus[b]
  202. }
  203. } else if f == "bidtype" {
  204. if b := util.ObjToString(tmp[f]); b != "" {
  205. tmp[f] = BidType[b]
  206. }
  207. } else if f == "buyer_id" {
  208. if b := util.ObjToString(tmp["buyer"]); b != "" {
  209. saveM["buyer"] = b
  210. if code := redis.GetStr("qyxy_id", b); code != "" {
  211. saveM[f] = code
  212. }
  213. }
  214. } else if f == "agency_id" {
  215. if b := util.ObjToString(tmp["agency"]); b != "" {
  216. saveM["agency"] = b
  217. if code := redis.GetStr("qyxy_id", b); code != "" {
  218. saveM[f] = code
  219. }
  220. }
  221. } else if f == "winner_id" {
  222. if s != "" {
  223. saveM["winner"] = s
  224. if code := redis.GetStr("qyxy_id", s); code != "" {
  225. saveM[f] = code
  226. }
  227. }
  228. } else {
  229. if tmp[f] != nil {
  230. if ProBusVMap[f] != nil {
  231. saveM[f], _ = verifyF(f, tmp[f], ProBusVMap[f])
  232. } else {
  233. saveM[f] = tmp[f]
  234. }
  235. }
  236. }
  237. }
  238. saveProbPool <- saveM
  239. }
  240. }
  241. // @Description 项目信息标签
  242. // @Author J 2022/9/30 13:54
  243. func taskProTag(tmp map[string]interface{}) {
  244. id := mongodb.BsonIdToSId(tmp["_id"])
  245. if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
  246. for _, i2 := range topArr {
  247. tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
  248. code := TopScopeCode[tclass]
  249. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  250. }
  251. }
  252. if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
  253. for _, i2 := range subArr {
  254. sc := strings.Split(util.ObjToString(i2), "_")
  255. code := SubScopeCode[sc[1]]
  256. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  257. }
  258. }
  259. }
  260. // @Description 关系表
  261. // @Author J 2022/9/30 13:56
  262. func taskRelation(tmp map[string]interface{}) {
  263. pid := mongodb.BsonIdToSId(tmp["_id"])
  264. if tmp["ids"] == nil {
  265. log.Info("taskRelation ids err", zap.Any("id", pid))
  266. return
  267. }
  268. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  269. lid := ids[len(ids)-1]
  270. //if b := util.ObjToString(tmp["buyer"]); b != "" {
  271. // saveM := make(map[string]interface{})
  272. //
  273. // saveM["projectid"] = pid
  274. // saveM["infoid"] = lid
  275. // saveM["identity_type"] = 1
  276. // saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  277. // if code := redis.GetStr("qyxy_id", b); code != "" {
  278. // saveM["name_id"] = code
  279. // if util.ObjToString(tmp["buyertel"]) != "" {
  280. // q := make(map[string]interface{})
  281. // q["name_id"] = code
  282. // q["identity_type"] = 1
  283. // q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  284. // if util.ObjToString(tmp["buyerperson"]) != "" {
  285. // q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  286. // }
  287. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  288. // if cinfo != nil && len(*cinfo) > 0 {
  289. // saveM["contact_id"] = (*cinfo)["id"]
  290. // saveRelationPool <- saveM
  291. // }
  292. // }
  293. // }
  294. //}
  295. //if a := util.ObjToString(tmp["agency"]); a != "" {
  296. // saveM := make(map[string]interface{})
  297. // saveM["projectid"] = pid
  298. // saveM["infoid"] = lid
  299. // saveM["identity_type"] = 4
  300. // saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  301. // if code := redis.GetStr("qyxy_id", a); code != "" {
  302. // saveM["name_id"] = code
  303. // if util.ObjToString(tmp["agencytel"]) != "" {
  304. // q := make(map[string]interface{})
  305. // q["name_id"] = code
  306. // q["identity_type"] = 4 // 100
  307. // q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  308. // if util.ObjToString(tmp["agencyperson"]) != "" {
  309. // q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  310. // }
  311. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  312. // if cinfo != nil && len(*cinfo) > 0 {
  313. // saveM["contact_id"] = (*cinfo)["id"]
  314. // saveRelationPool <- saveM
  315. // }
  316. // }
  317. // }
  318. //}
  319. for _, item := range tmp["list"].([]interface{}) {
  320. item1 := item.(map[string]interface{})
  321. sw := util.ObjToString(item1["s_winner"])
  322. if !strings.Contains(sw, ",") {
  323. if code := redis.GetStr("qyxy_id", sw); code != "" {
  324. saveM := make(map[string]interface{})
  325. saveM["projectid"] = pid
  326. saveM["infoid"] = lid
  327. saveM["identity_type"] = 2
  328. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  329. saveM["name_id"] = code
  330. if util.ObjToString(item1["winnertel"]) != "" {
  331. q := make(map[string]interface{})
  332. q["name_id"] = code
  333. q["identity_type"] = 2 // 010
  334. q["contact_tel"] = util.ObjToString(item1["winnertel"])
  335. if util.ObjToString(item1["winnerperson"]) != "" {
  336. q["contact_name"] = util.ObjToString(item1["winnerperson"])
  337. }
  338. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  339. if cinfo != nil && len(*cinfo) > 0 {
  340. saveM["contact_id"] = (*cinfo)["id"]
  341. saveRelationPool <- saveM
  342. }
  343. }
  344. }
  345. }
  346. }
  347. }
  348. func taskRelation2(tmp map[string]interface{}) {
  349. pid := mongodb.BsonIdToSId(tmp["_id"])
  350. if tmp["ids"] == nil {
  351. log.Info("taskRelation ids err", zap.Any("id", pid))
  352. return
  353. }
  354. info := MysqlTool.Find("dws_f_bpmc_relation", bson.M{"projectid": pid}, "", "", -1, -1)
  355. if len(*info) > 0 {
  356. } else {
  357. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  358. lid := ids[len(ids)-1]
  359. if b := util.ObjToString(tmp["buyer"]); b != "" {
  360. saveM := make(map[string]interface{})
  361. for _, f := range RelationField {
  362. if f == "projectid" {
  363. saveM[f] = pid
  364. } else if f == "infoid" {
  365. saveM[f] = lid
  366. } else if f == "name_id" {
  367. if code := redis.GetStr("qyxy_id", b); code != "" {
  368. saveM[f] = code
  369. if util.ObjToString(tmp["buyertel"]) != "" {
  370. q := make(map[string]interface{})
  371. q["name_id"] = code
  372. q["identity_type"] = 1
  373. q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  374. if util.ObjToString(tmp["buyerperson"]) != "" {
  375. q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  376. }
  377. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  378. if cinfo != nil && len(*cinfo) > 0 {
  379. saveM["contact_id"] = (*cinfo)["id"]
  380. }
  381. }
  382. }
  383. } else if f == "identity_type" {
  384. saveM[f] = 1 // 001
  385. } else if f == "createtime" {
  386. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  387. }
  388. }
  389. saveRelationPool <- saveM
  390. }
  391. if a := util.ObjToString(tmp["agency"]); a != "" {
  392. saveM := make(map[string]interface{})
  393. for _, f := range RelationField {
  394. if f == "projectid" {
  395. saveM[f] = pid
  396. } else if f == "infoid" {
  397. saveM[f] = lid
  398. } else if f == "name_id" {
  399. if code := redis.GetStr("qyxy_id", a); code != "" {
  400. saveM[f] = code
  401. if util.ObjToString(tmp["buyertel"]) != "" {
  402. q := make(map[string]interface{})
  403. q["name_id"] = code
  404. q["identity_type"] = 4
  405. q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  406. if util.ObjToString(tmp["agencyperson"]) != "" {
  407. q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  408. }
  409. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  410. if cinfo != nil && len(*cinfo) > 0 {
  411. saveM["contact_id"] = (*cinfo)["id"]
  412. }
  413. }
  414. }
  415. } else if f == "identity_type" {
  416. saveM[f] = 4 // 100
  417. } else if f == "createtime" {
  418. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  419. }
  420. }
  421. saveRelationPool <- saveM
  422. }
  423. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  424. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  425. warr = append(warr, util.ObjToString(tmp["winner"]))
  426. }
  427. for _, ws := range warr {
  428. saveM := make(map[string]interface{})
  429. for _, f := range RelationField {
  430. if f == "projectid" {
  431. saveM[f] = pid
  432. } else if f == "infoid" {
  433. saveM[f] = lid
  434. } else if f == "name_id" {
  435. if code := redis.GetStr("qyxy_id", ws); code != "" {
  436. saveM[f] = code
  437. if util.ObjToString(tmp["buyertel"]) != "" {
  438. q := make(map[string]interface{})
  439. q["name_id"] = code
  440. q["identity_type"] = 2
  441. q["contact_tel"] = util.ObjToString(tmp["winnertel"])
  442. if util.ObjToString(tmp["winnerperson"]) != "" {
  443. q["contact_name"] = util.ObjToString(tmp["winnerperson"])
  444. }
  445. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  446. if cinfo != nil && len(*cinfo) > 0 {
  447. saveM["contact_id"] = (*cinfo)["id"]
  448. }
  449. }
  450. }
  451. } else if f == "identity_type" {
  452. saveM[f] = 2 // 010
  453. } else if f == "createtime" {
  454. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  455. }
  456. }
  457. saveRelationPool <- saveM
  458. }
  459. }
  460. }