merge.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "math/rand"
  9. "reflect"
  10. "sort"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "unicode/utf8"
  15. )
  16. // @Description 参与合并流程
  17. // @Author J 2022/10/20 13:57
  18. func startProjectMerge(tmp map[string]interface{}, info *Info) {
  19. IDArr := getCompareIds(info.ProjectName, info.Site)
  20. isMerge := false
  21. var mergeProject *Project
  22. for _, v := range IDArr {
  23. mergeProject = v.P
  24. if info.Area != "" && mergeProject.Area != "" {
  25. if info.Area != "全国" && mergeProject.Area != "全国" && info.Area != mergeProject.Area {
  26. continue
  27. } else {
  28. isMerge = true
  29. break
  30. }
  31. } else {
  32. isMerge = true
  33. break
  34. }
  35. }
  36. if isMerge && mergeProject != nil {
  37. updateProject(tmp, *info, mergeProject)
  38. } else {
  39. _, pinfo := newProject(tmp, *info)
  40. AllPidMapLock.Lock()
  41. AllPidMap[info.Id] = &ID{P: pinfo}
  42. AllPidMapLock.Unlock()
  43. AllPnMapLock.Lock()
  44. res := AllPnMap[info.Site]
  45. if res != nil {
  46. res.Lock.Lock()
  47. res.Id[info.ProjectName] = info.Id
  48. res.Lock.Unlock()
  49. } else {
  50. res = &Pname{
  51. Id: map[string]string{info.ProjectName: info.Id},
  52. Lock: sync.Mutex{},
  53. }
  54. }
  55. AllPnMap[info.Site] = res
  56. AllPnMapLock.Unlock()
  57. }
  58. }
  59. // @Description 通过项目名称找可以与之合并的项目集合
  60. // @Author J 2022/10/20 14:22
  61. func getCompareIds(pname, site string) (IDArr []*ID) {
  62. AllPnMapLock.Lock()
  63. res := AllPnMap[site]
  64. defer AllPnMapLock.Unlock()
  65. pid := ""
  66. IDArr = []*ID{} //项目信息
  67. if res != nil {
  68. res.Lock.Lock()
  69. pid = res.Id[pname]
  70. res.Lock.Unlock()
  71. AllPidMapLock.Lock()
  72. ID := AllPidMap[pid]
  73. AllPidMapLock.Unlock()
  74. if ID != nil {
  75. IDArr = append(IDArr, ID)
  76. }
  77. }
  78. return IDArr
  79. }
  80. func newProject(tmp map[string]interface{}, info Info) (string, *Project) {
  81. pId := mongodb.StringTOBsonId(info.Id)
  82. set := make(map[string]interface{})
  83. set["_id"] = pId
  84. for _, f := range FIELDS {
  85. if tmp[f] != nil && tmp[f] != "" {
  86. if reflect.TypeOf(tmp[f]).String() == "string" {
  87. if utf8.RuneCountInString(util.ObjToString(tmp[f])) < 1000 {
  88. set[f] = tmp[f]
  89. }
  90. } else {
  91. set[f] = tmp[f]
  92. }
  93. }
  94. }
  95. p1 := NewCachePinfo(info)
  96. set["firsttime"] = util.IntAll(info.Publishtime)
  97. set["lasttime"] = util.IntAll(info.Publishtime)
  98. set["ids"] = []string{info.Id}
  99. set["sourceinfourl"] = tmp["href"]
  100. set["follow_num"] = 1
  101. set["pici"] = time.Now().Unix()
  102. p1.FollowNum = 1
  103. now := time.Now().Unix()
  104. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  105. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  106. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  107. set["proposed_number"] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  108. push := PushListInfo(tmp, info.Id)
  109. push["follow_num"] = 1
  110. set["list"] = []map[string]interface{}{
  111. push,
  112. }
  113. updatePool <- []map[string]interface{}{
  114. {
  115. "_id": pId,
  116. },
  117. {
  118. "$set": set,
  119. },
  120. }
  121. return info.Id, &p1
  122. }
  123. // @Description 合并字段信息
  124. // @Author J 2022/10/20 13:57
  125. func updateProject(tmp map[string]interface{}, info Info, pInfo *Project) {
  126. if BinarySearch(pInfo.Ids, info.Id) > -1 {
  127. log.Error("updateProject", zap.String("repeat", info.Id), zap.String("pid", mongodb.BsonIdToSId(pInfo.Id)))
  128. return
  129. }
  130. set := make(map[string]interface{})
  131. pInfo.Ids = append(pInfo.Ids, info.Id)
  132. pInfo.LastTime = info.Publishtime
  133. set["lasttime"] = info.Publishtime
  134. pInfo.FollowNum += 1
  135. set["follow_num"] = pInfo.FollowNum
  136. if pInfo.Owner == "" && info.Owner != "" {
  137. pInfo.Owner = info.Owner
  138. set["owner"] = info.Owner
  139. }
  140. if pInfo.ApproveCode == "" && info.ApproveCode != "" {
  141. pInfo.ApproveCode = info.ApproveCode
  142. set["approvecode"] = info.ApproveCode
  143. }
  144. if pInfo.ApproveNumber == "" && info.ApproveNumber != "" {
  145. pInfo.ApproveNumber = info.ApproveNumber
  146. set["approvenumber"] = info.ApproveNumber
  147. }
  148. if pInfo.TotalInvestment == "" && info.TotalInvestment != "" && utf8.RuneCountInString(info.TotalInvestment) < 1000 {
  149. pInfo.TotalInvestment = info.TotalInvestment
  150. set["total_investment"] = info.TotalInvestment
  151. }
  152. if pInfo.Funds == "" && info.Funds != "" && utf8.RuneCountInString(info.Funds) < 1000 {
  153. pInfo.Funds = info.Funds
  154. set["funds"] = info.Funds
  155. }
  156. if pInfo.ProjectAddr == "" && info.ProjectAddr != "" && utf8.RuneCountInString(info.ProjectAddr) < 1000 {
  157. pInfo.ProjectAddr = info.ProjectAddr
  158. set["projectaddr"] = info.ProjectAddr
  159. }
  160. if pInfo.ProjectPeriod == "" && info.ProjectPeriod != "" && utf8.RuneCountInString(info.ProjectPeriod) < 1000 {
  161. pInfo.ProjectPeriod = info.ProjectPeriod
  162. set["projectperiod"] = info.ProjectPeriod
  163. }
  164. if pInfo.ProjectScale == "" && info.ProjectScale != "" && utf8.RuneCountInString(info.ProjectScale) < 1000 {
  165. pInfo.ProjectPeriod = info.ProjectScale
  166. set["project_scale"] = info.ProjectScale
  167. }
  168. if pInfo.ProjectStartDate == 0 && info.ProjectStartDate > 0 {
  169. pInfo.ProjectStartDate = info.ProjectStartDate
  170. set["project_startdate"] = info.ProjectStartDate
  171. }
  172. if pInfo.ProjectCompleteDate == 0 && info.ProjectCompleteDate > 0 {
  173. pInfo.ProjectCompleteDate = info.ProjectCompleteDate
  174. set["project_completedate"] = info.ProjectCompleteDate
  175. }
  176. if pInfo.ConstructionArea == "" && info.ConstructionArea != "" && utf8.RuneCountInString(info.ConstructionArea) < 1000 {
  177. pInfo.ConstructionArea = info.ConstructionArea
  178. set["construction_area"] = info.ConstructionArea
  179. }
  180. if pInfo.FloorArea == "" && info.FloorArea != "" && utf8.RuneCountInString(info.FloorArea) < 1000 {
  181. pInfo.FloorArea = info.FloorArea
  182. set["floor_area"] = info.FloorArea
  183. }
  184. if pInfo.ProjectPerson == "" && info.ProjectPerson != "" {
  185. pInfo.ProjectPerson = info.ProjectPerson
  186. set["project_person"] = info.ProjectPerson
  187. }
  188. if pInfo.ProjectPhone == "" && info.ProjectPhone != "" {
  189. pInfo.ProjectPhone = info.ProjectPhone
  190. set["project_phone"] = info.ProjectPhone
  191. }
  192. for _, f := range []string{"ownerclass_code", "category_code", "nature_code", "project_stage_code"} {
  193. if c := util.ObjToString(tmp[f]); c != "" {
  194. set[f] = c
  195. }
  196. }
  197. set["pici"] = time.Now().Unix()
  198. push := PushListInfo(tmp, info.Id)
  199. push["follow_num"] = pInfo.FollowNum
  200. update := map[string]interface{}{}
  201. set["updatetime"] = time.Now().Unix()
  202. update["$set"] = set
  203. update["$push"] = map[string]interface{}{
  204. "list": push,
  205. "ids": info.Id,
  206. }
  207. updateInfo := []map[string]interface{}{
  208. {
  209. "_id": mongodb.StringTOBsonId(pInfo.Id),
  210. },
  211. update,
  212. }
  213. updatePool <- updateInfo
  214. }
  215. // @Description list中存放的信息
  216. // @Author J 2022/10/23 10:16
  217. func PushListInfo(tmp map[string]interface{}, infoid string) map[string]interface{} {
  218. res := map[string]interface{}{
  219. "infoid": infoid,
  220. }
  221. for _, k := range INFOFIELDS {
  222. if tmp[k] != nil {
  223. res[k] = tmp[k]
  224. }
  225. }
  226. return res
  227. }
  228. func NewCachePinfo(info Info) Project {
  229. p1 := Project{
  230. Id: info.Id,
  231. Ids: []string{info.Id},
  232. ProjectName: info.ProjectName,
  233. ApproveCode: info.ApproveCode,
  234. Area: info.Area,
  235. City: info.City,
  236. District: info.District,
  237. }
  238. return p1
  239. }
  240. //二分字符串查找
  241. func BinarySearch(s []string, k string) int {
  242. sort.Strings(s)
  243. lo, hi := 0, len(s)-1
  244. for lo <= hi {
  245. m := (lo + hi) >> 1
  246. if s[m] < k {
  247. lo = m + 1
  248. } else if s[m] > k {
  249. hi = m - 1
  250. } else {
  251. return m
  252. }
  253. }
  254. return -1
  255. }