merge.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "math/rand"
  9. "reflect"
  10. "sort"
  11. "strconv"
  12. "time"
  13. "unicode/utf8"
  14. )
  15. // @Description 参与合并流程
  16. // @Author J 2022/10/20 13:57
  17. func startProjectMerge(tmp map[string]interface{}, info *Info) {
  18. IDArr := getCompareIds(info.ProjectName, info.Site)
  19. isMerge := false
  20. var mergeProject *Project
  21. for _, v := range IDArr {
  22. mergeProject = v.P
  23. if info.Area != "" && mergeProject.Area != "" {
  24. if info.Area != "全国" && mergeProject.Area != "全国" && info.Area != mergeProject.Area {
  25. continue
  26. } else {
  27. isMerge = true
  28. break
  29. }
  30. } else {
  31. isMerge = true
  32. break
  33. }
  34. }
  35. if isMerge && mergeProject != nil {
  36. updateProject(tmp, *info, mergeProject)
  37. } else {
  38. _, pinfo := newProject(tmp, *info)
  39. AllPidMapLock.Lock()
  40. AllPidMap[info.Id] = &ID{P: pinfo}
  41. AllPidMapLock.Unlock()
  42. AllPnMapLock.Lock()
  43. res := AllPnMap[info.ProjectName]
  44. if res != nil {
  45. res.Id[info.Id] = info.ProjectName
  46. } else {
  47. res = &Pname{
  48. Id: map[string]string{info.Id: info.ProjectName},
  49. }
  50. }
  51. AllPnMapLock.Unlock()
  52. }
  53. }
  54. // @Description 通过项目名称找可以与之合并的项目集合
  55. // @Author J 2022/10/20 14:22
  56. func getCompareIds(pname, site string) (IDArr []*ID) {
  57. AllPnMapLock.Lock()
  58. res := AllPnMap[site]
  59. defer AllPnMapLock.Unlock()
  60. pid := ""
  61. IDArr = []*ID{} //项目信息
  62. if res != nil {
  63. res.Lock.Lock()
  64. pid = res.Id[pname]
  65. res.Lock.Unlock()
  66. AllPidMapLock.Lock()
  67. ID := AllPidMap[pid]
  68. AllPidMapLock.Unlock()
  69. if ID != nil {
  70. IDArr = append(IDArr, ID)
  71. }
  72. }
  73. return IDArr
  74. }
  75. func newProject(tmp map[string]interface{}, info Info) (string, *Project) {
  76. pId := mongodb.StringTOBsonId(info.Id)
  77. set := make(map[string]interface{})
  78. set["_id"] = pId
  79. for _, f := range FIELDS {
  80. if tmp[f] != nil && tmp[f] != "" {
  81. if reflect.TypeOf(tmp[f]).String() == "string" {
  82. if utf8.RuneCountInString(util.ObjToString(tmp[f])) < 1000 {
  83. set[f] = tmp[f]
  84. }
  85. } else {
  86. set[f] = tmp[f]
  87. }
  88. }
  89. }
  90. p1 := NewCachePinfo(info)
  91. set["firsttime"] = util.IntAll(info.Publishtime)
  92. set["lasttime"] = util.IntAll(info.Publishtime)
  93. set["ids"] = []string{info.Id}
  94. set["sourceinfourl"] = tmp["href"]
  95. set["follow_num"] = 1
  96. set["pici"] = time.Now().Unix()
  97. p1.FollowNum = 1
  98. now := time.Now().Unix()
  99. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  100. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  101. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  102. set["proposed_number"] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  103. push := PushListInfo(tmp, info.Id)
  104. push["follow_num"] = 1
  105. set["list"] = []map[string]interface{}{
  106. push,
  107. }
  108. updatePool <- []map[string]interface{}{
  109. {
  110. "_id": pId,
  111. },
  112. {
  113. "$set": set,
  114. },
  115. }
  116. return info.Id, &p1
  117. }
  118. // @Description 合并字段信息
  119. // @Author J 2022/10/20 13:57
  120. func updateProject(tmp map[string]interface{}, info Info, pInfo *Project) {
  121. if BinarySearch(pInfo.Ids, info.Id) > -1 {
  122. log.Error("updateProject", zap.String("repeat", info.Id), zap.String("pid", mongodb.BsonIdToSId(pInfo.Id)))
  123. return
  124. }
  125. set := make(map[string]interface{})
  126. pInfo.Ids = append(pInfo.Ids, info.Id)
  127. pInfo.LastTime = info.Publishtime
  128. set["lasttime"] = info.Publishtime
  129. pInfo.FollowNum += 1
  130. set["follow_num"] = pInfo.FollowNum
  131. if pInfo.Owner == "" && info.Owner != "" {
  132. pInfo.Owner = info.Owner
  133. set["owner"] = info.Owner
  134. }
  135. if pInfo.ApproveCode == "" && info.ApproveCode != "" {
  136. pInfo.ApproveCode = info.ApproveCode
  137. set["approvecode"] = info.ApproveCode
  138. }
  139. if pInfo.ApproveNumber == "" && info.ApproveNumber != "" {
  140. pInfo.ApproveNumber = info.ApproveNumber
  141. set["approvenumber"] = info.ApproveNumber
  142. }
  143. if pInfo.TotalInvestment == "" && info.TotalInvestment != "" && utf8.RuneCountInString(info.TotalInvestment) < 1000 {
  144. pInfo.TotalInvestment = info.TotalInvestment
  145. set["total_investment"] = info.TotalInvestment
  146. }
  147. if pInfo.Funds == "" && info.Funds != "" && utf8.RuneCountInString(info.Funds) < 1000 {
  148. pInfo.Funds = info.Funds
  149. set["funds"] = info.Funds
  150. }
  151. if pInfo.ProjectAddr == "" && info.ProjectAddr != "" && utf8.RuneCountInString(info.ProjectAddr) < 1000 {
  152. pInfo.ProjectAddr = info.ProjectAddr
  153. set["projectaddr"] = info.ProjectAddr
  154. }
  155. if pInfo.ProjectPeriod == "" && info.ProjectPeriod != "" && utf8.RuneCountInString(info.ProjectPeriod) < 1000 {
  156. pInfo.ProjectPeriod = info.ProjectPeriod
  157. set["projectperiod"] = info.ProjectPeriod
  158. }
  159. if pInfo.ProjectScale == "" && info.ProjectScale != "" && utf8.RuneCountInString(info.ProjectScale) < 1000 {
  160. pInfo.ProjectPeriod = info.ProjectScale
  161. set["project_scale"] = info.ProjectScale
  162. }
  163. if pInfo.ProjectStartDate == 0 && info.ProjectStartDate > 0 {
  164. pInfo.ProjectStartDate = info.ProjectStartDate
  165. set["project_startdate"] = info.ProjectStartDate
  166. }
  167. if pInfo.ProjectCompleteDate == 0 && info.ProjectCompleteDate > 0 {
  168. pInfo.ProjectCompleteDate = info.ProjectCompleteDate
  169. set["project_completedate"] = info.ProjectCompleteDate
  170. }
  171. if pInfo.ConstructionArea == "" && info.ConstructionArea != "" && utf8.RuneCountInString(info.ConstructionArea) < 1000 {
  172. pInfo.ConstructionArea = info.ConstructionArea
  173. set["construction_area"] = info.ConstructionArea
  174. }
  175. if pInfo.FloorArea == "" && info.FloorArea != "" && utf8.RuneCountInString(info.FloorArea) < 1000 {
  176. pInfo.FloorArea = info.FloorArea
  177. set["floor_area"] = info.FloorArea
  178. }
  179. if pInfo.ProjectPerson == "" && info.ProjectPerson != "" {
  180. pInfo.ProjectPerson = info.ProjectPerson
  181. set["project_person"] = info.ProjectPerson
  182. }
  183. if pInfo.ProjectPhone == "" && info.ProjectPhone != "" {
  184. pInfo.ProjectPhone = info.ProjectPhone
  185. set["project_phone"] = info.ProjectPhone
  186. }
  187. for _, f := range []string{"ownerclass_code", "category_code", "nature_code", "project_stage_code"} {
  188. if c := util.ObjToString(tmp[f]); c != "" {
  189. set[f] = c
  190. }
  191. }
  192. set["pici"] = time.Now().Unix()
  193. push := PushListInfo(tmp, info.Id)
  194. push["follow_num"] = pInfo.FollowNum
  195. update := map[string]interface{}{}
  196. set["updatetime"] = time.Now().Unix()
  197. update["$set"] = set
  198. update["$push"] = map[string]interface{}{
  199. "list": push,
  200. "ids": info.Id,
  201. }
  202. updateInfo := []map[string]interface{}{
  203. {
  204. "_id": mongodb.StringTOBsonId(pInfo.Id),
  205. },
  206. update,
  207. }
  208. updatePool <- updateInfo
  209. }
  210. // @Description list中存放的信息
  211. // @Author J 2022/10/23 10:16
  212. func PushListInfo(tmp map[string]interface{}, infoid string) map[string]interface{} {
  213. res := map[string]interface{}{
  214. "infoid": infoid,
  215. }
  216. for _, k := range INFOFIELDS {
  217. if tmp[k] != nil {
  218. res[k] = tmp[k]
  219. }
  220. }
  221. return res
  222. }
  223. func NewCachePinfo(info Info) Project {
  224. p1 := Project{
  225. Id: info.Id,
  226. Ids: []string{info.Id},
  227. ProjectName: info.ProjectName,
  228. ApproveCode: info.ApproveCode,
  229. Area: info.Area,
  230. City: info.City,
  231. District: info.District,
  232. }
  233. return p1
  234. }
  235. //二分字符串查找
  236. func BinarySearch(s []string, k string) int {
  237. sort.Strings(s)
  238. lo, hi := 0, len(s)-1
  239. for lo <= hi {
  240. m := (lo + hi) >> 1
  241. if s[m] < k {
  242. lo = m + 1
  243. } else if s[m] > k {
  244. hi = m - 1
  245. } else {
  246. return m
  247. }
  248. }
  249. return -1
  250. }