merge.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package main
  2. import (
  3. "fmt"
  4. "go.uber.org/zap"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "math/rand"
  9. "reflect"
  10. "sort"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "unicode/utf8"
  15. )
  16. // @Description 参与合并流程
  17. // @Author J 2022/10/20 13:57
  18. func startProjectMerge(tmp map[string]interface{}, info *Info) string {
  19. IDArr := getCompareIds(info.ProjectName, info.Site)
  20. isMerge := false
  21. var mergeProject *Project
  22. for _, v := range IDArr {
  23. mergeProject = v.P
  24. if info.Area != "" && mergeProject.Area != "" {
  25. if info.Area != "全国" && mergeProject.Area != "全国" && info.Area != mergeProject.Area {
  26. continue
  27. } else {
  28. isMerge = true
  29. break
  30. }
  31. } else {
  32. isMerge = true
  33. break
  34. }
  35. }
  36. if isMerge && mergeProject != nil {
  37. updateProject(tmp, *info, mergeProject)
  38. return mergeProject.Id
  39. } else {
  40. _, pinfo := newProject(tmp, *info)
  41. AllPidMapLock.Lock()
  42. AllPidMap[info.Id] = &ID{P: pinfo}
  43. AllPidMapLock.Unlock()
  44. AllPnMapLock.Lock()
  45. res := AllPnMap[info.Site]
  46. if res != nil {
  47. res.Lock.Lock()
  48. res.Id[info.ProjectName] = info.Id
  49. res.Lock.Unlock()
  50. } else {
  51. res = &Pname{
  52. Id: map[string]string{info.ProjectName: info.Id},
  53. Lock: sync.Mutex{},
  54. }
  55. }
  56. AllPnMap[info.Site] = res
  57. AllPnMapLock.Unlock()
  58. }
  59. return info.Id
  60. }
  61. // @Description 通过项目名称找可以与之合并的项目集合
  62. // @Author J 2022/10/20 14:22
  63. func getCompareIds(pname, site string) (IDArr []*ID) {
  64. AllPnMapLock.Lock()
  65. res := AllPnMap[site]
  66. defer AllPnMapLock.Unlock()
  67. pid := ""
  68. IDArr = []*ID{} //项目信息
  69. if res != nil {
  70. res.Lock.Lock()
  71. pid = res.Id[pname]
  72. res.Lock.Unlock()
  73. AllPidMapLock.Lock()
  74. ID := AllPidMap[pid]
  75. AllPidMapLock.Unlock()
  76. if ID != nil {
  77. IDArr = append(IDArr, ID)
  78. }
  79. }
  80. return IDArr
  81. }
  82. func newProject(tmp map[string]interface{}, info Info) (string, *Project) {
  83. pId := mongodb.StringTOBsonId(info.Id)
  84. set := make(map[string]interface{})
  85. set["_id"] = pId
  86. for _, f := range FIELDS {
  87. if tmp[f] != nil && tmp[f] != "" {
  88. if reflect.TypeOf(tmp[f]).String() == "string" {
  89. if utf8.RuneCountInString(util.ObjToString(tmp[f])) < 1000 {
  90. set[f] = tmp[f]
  91. }
  92. } else {
  93. set[f] = tmp[f]
  94. }
  95. }
  96. }
  97. p1 := NewCachePinfo(info)
  98. set["firsttime"] = util.IntAll(info.Publishtime)
  99. set["lasttime"] = util.IntAll(info.Publishtime)
  100. set["ids"] = []string{info.Id}
  101. set["sourceinfourl"] = tmp["href"]
  102. set["follow_num"] = 1
  103. set["pici"] = time.Now().Unix()
  104. p1.FollowNum = 1
  105. now := time.Now().Unix()
  106. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  107. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  108. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  109. set["proposed_number"] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  110. push := PushListInfo(tmp, info.Id)
  111. push["follow_num"] = 1
  112. set["list"] = []map[string]interface{}{
  113. push,
  114. }
  115. updatePool <- []map[string]interface{}{
  116. {
  117. "_id": pId,
  118. },
  119. {
  120. "$set": set,
  121. },
  122. }
  123. return info.Id, &p1
  124. }
  125. // @Description 合并字段信息
  126. // @Author J 2022/10/20 13:57
  127. func updateProject(tmp map[string]interface{}, info Info, pInfo *Project) {
  128. if BinarySearch(pInfo.Ids, info.Id) > -1 {
  129. log.Error("updateProject", zap.String("repeat", info.Id), zap.String("pid", mongodb.BsonIdToSId(pInfo.Id)))
  130. return
  131. }
  132. set := make(map[string]interface{})
  133. pInfo.Ids = append(pInfo.Ids, info.Id)
  134. pInfo.LastTime = info.Publishtime
  135. set["lasttime"] = info.Publishtime
  136. pInfo.FollowNum += 1
  137. set["follow_num"] = pInfo.FollowNum
  138. if pInfo.Owner == "" && info.Owner != "" {
  139. pInfo.Owner = info.Owner
  140. set["owner"] = info.Owner
  141. }
  142. if pInfo.ApproveCode == "" && info.ApproveCode != "" {
  143. pInfo.ApproveCode = info.ApproveCode
  144. set["approvecode"] = info.ApproveCode
  145. }
  146. if pInfo.ApproveNumber == "" && info.ApproveNumber != "" {
  147. pInfo.ApproveNumber = info.ApproveNumber
  148. set["approvenumber"] = info.ApproveNumber
  149. }
  150. if pInfo.ApproveDept == "" && info.ApproveDept != "" {
  151. pInfo.ApproveDept = info.ApproveDept
  152. set["approvedept"] = info.ApproveDept
  153. }
  154. if pInfo.TotalInvestment == "" && info.TotalInvestment != "" && utf8.RuneCountInString(info.TotalInvestment) < 1000 {
  155. pInfo.TotalInvestment = info.TotalInvestment
  156. set["total_investment"] = info.TotalInvestment
  157. }
  158. if pInfo.Funds == "" && info.Funds != "" && utf8.RuneCountInString(info.Funds) < 1000 {
  159. pInfo.Funds = info.Funds
  160. set["funds"] = info.Funds
  161. }
  162. if pInfo.ProjectAddr == "" && info.ProjectAddr != "" && utf8.RuneCountInString(info.ProjectAddr) < 1000 {
  163. pInfo.ProjectAddr = info.ProjectAddr
  164. set["projectaddr"] = info.ProjectAddr
  165. }
  166. if pInfo.ProjectPeriod == "" && info.ProjectPeriod != "" && utf8.RuneCountInString(info.ProjectPeriod) < 1000 {
  167. pInfo.ProjectPeriod = info.ProjectPeriod
  168. set["projectperiod"] = info.ProjectPeriod
  169. }
  170. if pInfo.ProjectScale == "" && info.ProjectScale != "" && utf8.RuneCountInString(info.ProjectScale) < 1000 {
  171. pInfo.ProjectPeriod = info.ProjectScale
  172. set["project_scale"] = info.ProjectScale
  173. }
  174. if pInfo.ProjectStartDate == 0 && info.ProjectStartDate > 0 {
  175. pInfo.ProjectStartDate = info.ProjectStartDate
  176. set["project_startdate"] = info.ProjectStartDate
  177. }
  178. if pInfo.ProjectCompleteDate == 0 && info.ProjectCompleteDate > 0 {
  179. pInfo.ProjectCompleteDate = info.ProjectCompleteDate
  180. set["project_completedate"] = info.ProjectCompleteDate
  181. }
  182. if pInfo.ConstructionArea == "" && info.ConstructionArea != "" && utf8.RuneCountInString(info.ConstructionArea) < 1000 {
  183. pInfo.ConstructionArea = info.ConstructionArea
  184. set["construction_area"] = info.ConstructionArea
  185. }
  186. if pInfo.FloorArea == "" && info.FloorArea != "" && utf8.RuneCountInString(info.FloorArea) < 1000 {
  187. pInfo.FloorArea = info.FloorArea
  188. set["floor_area"] = info.FloorArea
  189. }
  190. if pInfo.ProjectPerson == "" && info.ProjectPerson != "" {
  191. pInfo.ProjectPerson = info.ProjectPerson
  192. set["project_person"] = info.ProjectPerson
  193. }
  194. if pInfo.ProjectPhone == "" && info.ProjectPhone != "" {
  195. pInfo.ProjectPhone = info.ProjectPhone
  196. set["project_phone"] = info.ProjectPhone
  197. }
  198. for _, f := range []string{"ownerclass_code", "category_code", "nature_code", "project_stage_code"} {
  199. if c := util.ObjToString(tmp[f]); c != "" {
  200. set[f] = c
  201. }
  202. }
  203. set["pici"] = time.Now().Unix()
  204. push := PushListInfo(tmp, info.Id)
  205. push["follow_num"] = pInfo.FollowNum
  206. update := map[string]interface{}{}
  207. set["updatetime"] = time.Now().Unix()
  208. update["$set"] = set
  209. update["$push"] = map[string]interface{}{
  210. "list": push,
  211. "ids": info.Id,
  212. }
  213. updateInfo := []map[string]interface{}{
  214. {
  215. "_id": mongodb.StringTOBsonId(pInfo.Id),
  216. },
  217. update,
  218. }
  219. updatePool <- updateInfo
  220. }
  221. // @Description list中存放的信息
  222. // @Author J 2022/10/23 10:16
  223. func PushListInfo(tmp map[string]interface{}, infoid string) map[string]interface{} {
  224. res := map[string]interface{}{
  225. "infoid": infoid,
  226. }
  227. for _, k := range INFOFIELDS {
  228. if tmp[k] != nil {
  229. res[k] = tmp[k]
  230. }
  231. }
  232. return res
  233. }
  234. func NewCachePinfo(info Info) Project {
  235. p1 := Project{
  236. Id: info.Id,
  237. Ids: []string{info.Id},
  238. ProjectName: info.ProjectName,
  239. ApproveCode: info.ApproveCode,
  240. Area: info.Area,
  241. City: info.City,
  242. District: info.District,
  243. }
  244. return p1
  245. }
  // BinarySearch returns the index of k in s, or -1 when k is absent.
  //
  // s is sorted in place (ascending) before searching. The caller's slice
  // order therefore changes, and the returned index refers to the sorted
  // order. The sort is skipped when s is already sorted. When k occurs more
  // than once, the index of its first occurrence is returned.
  func BinarySearch(s []string, k string) int {
  	if !sort.StringsAreSorted(s) {
  		sort.Strings(s)
  	}
  	// SearchStrings returns the insertion point, which may be len(s) or the
  	// position of a different string when k is absent.
  	i := sort.SearchStrings(s, k)
  	if i < len(s) && s[i] == k {
  		return i
  	}
  	return -1
  }