subject_add.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package subject
  2. import (
  3. log "github.com/donnie4w/go-logger/logger"
  4. "github.com/uuid"
  5. qu "qfw/util"
  6. "strings"
  7. su "subject_util"
  8. "sync"
  9. "time"
  10. "unicode/utf8"
  11. ul "util"
  12. )
  13. var (
  14. updateLock sync.Mutex
  15. )
  16. // 正产增量主体数据服务
  17. func RunSubjectAddDataInfo(gtid string, lteid string) {
  18. log.Debug("增量~~~")
  19. SeoUnique = map[string]string{}
  20. sess := su.SourceMgo.GetMgoConn()
  21. defer su.SourceMgo.DestoryMongoConn(sess)
  22. q := map[string]interface{}{
  23. "_id": map[string]interface{}{
  24. "$gt": su.StringTOBsonId(gtid),
  25. "$lte": su.StringTOBsonId(lteid),
  26. },
  27. }
  28. log.Debug("查询语句 ~ ", q)
  29. it := sess.DB(su.SourceMgo.DbName).C(su.S_Coll_Name).Find(&q).Sort("_id").Select(BidFields).Iter()
  30. pool := make(chan bool, 8)
  31. wg := &sync.WaitGroup{}
  32. total := 0
  33. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  34. if total%1000 == 0 {
  35. log.Debug("cur index ", total)
  36. }
  37. pool <- true
  38. wg.Add(1)
  39. go func(tmp map[string]interface{}) {
  40. defer func() {
  41. <-pool
  42. wg.Done()
  43. }()
  44. if qu.IntAll(tmp["extracttype"]) != 1 {
  45. return
  46. }
  47. if qu.ObjToString(tmp["buyer"]) == "" &&
  48. qu.ObjToString(tmp["agency"]) == "" &&
  49. qu.ObjToString(tmp["winner"]) == "" &&
  50. qu.ObjToString(tmp["owner"]) == "" {
  51. return
  52. }
  53. dealWithAddSubjectInfo(tmp)
  54. }(tmp)
  55. tmp = make(map[string]interface{})
  56. }
  57. wg.Wait()
  58. log.Debug("is over ~ ", total)
  59. }
  60. func dealWithAddSubjectInfo(tmp map[string]interface{}) {
  61. buyer := qu.ObjToString(tmp["buyer"])
  62. agency := qu.ObjToString(tmp["agency"])
  63. winner := qu.ObjToString(tmp["winner"])
  64. owner := qu.ObjToString(tmp["owner"])
  65. s_winner := qu.ObjToString(tmp["s_winner"])
  66. b_per := qu.ObjToString(tmp["buyerperson"])
  67. b_tel := qu.ObjToString(tmp["buyertel"])
  68. if utf8.RuneCountInString(b_tel) > 60 {
  69. b_tel = ""
  70. }
  71. a_per := qu.ObjToString(tmp["agencyperson"])
  72. a_tel := qu.ObjToString(tmp["agencytel"])
  73. if utf8.RuneCountInString(a_tel) > 60 {
  74. a_tel = ""
  75. }
  76. w_per := qu.ObjToString(tmp["winnerperson"])
  77. w_tel := qu.ObjToString(tmp["winnertel"])
  78. if utf8.RuneCountInString(w_tel) > 60 {
  79. w_tel = ""
  80. }
  81. o_per := qu.ObjToString(tmp["project_person"])
  82. o_tel := qu.ObjToString(tmp["project_phone"])
  83. if utf8.RuneCountInString(o_tel) > 60 {
  84. o_tel = ""
  85. }
  86. buyerclass := qu.ObjToString(tmp["buyerclass"])
  87. publishtime := qu.Int64All(tmp["publishtime"])
  88. tmpid := su.BsonTOStringId(tmp["_id"])
  89. winner_arr, winner_bool := SegmentationEntName(winner, s_winner)
  90. updateLock.Lock()
  91. if buyer != "" && utf8.RuneCountInString(buyer) < 30 {
  92. dealWithUpdateContact(buyer, buyerclass, &Contact{b_per, b_tel, true, false, false, false, publishtime}, "0001", tmpid)
  93. }
  94. //中标单位
  95. for k, v := range winner_arr {
  96. b := winner_bool[k]
  97. c := Contact{"", "", false, false, true, false, publishtime}
  98. if b {
  99. c.Per = w_per
  100. c.Tel = w_tel
  101. }
  102. dealWithUpdateContact(v, "", &c, "0010", tmpid)
  103. }
  104. if agency != "" && utf8.RuneCountInString(agency) < 30 {
  105. dealWithUpdateContact(agency, "", &Contact{a_per, a_tel, false, true, false, false, publishtime}, "0100", tmpid)
  106. }
  107. if owner != "" && utf8.RuneCountInString(owner) < 30 {
  108. dealWithUpdateContact(owner, "", &Contact{o_per, o_tel, false, false, false, true, publishtime}, "1000", tmpid)
  109. }
  110. updateLock.Unlock()
  111. }
  112. func dealWithUpdateContact(name string, buyerclass string, contact *Contact, identity string, tmpid string) {
  113. info := su.MysqlGlobalTool.FindOne(su.G_Units_Baseinfo, map[string]interface{}{"name": name}, "", "-id")
  114. isNewEnt := false
  115. isNewEntInfo := map[string]interface{}{}
  116. isLast := false //是否需要更新lastime
  117. if identity == "0010" || identity == "0001" {
  118. isLast = true
  119. }
  120. if info == nil {
  121. } else { //判断企业库最新的company_id是否与当前保持一致
  122. qyxy_info := CreateQyxyInfo(name)
  123. if len(qyxy_info) > 0 {
  124. new_company_id := qu.ObjToString(qyxy_info["_id"])
  125. old_company_id := qu.ObjToString((*info)["company_id"])
  126. if old_company_id != "" && new_company_id != "" && old_company_id != new_company_id {
  127. isNewEnt = true
  128. isNewEntInfo["name"] = name
  129. isNewEntInfo["name_id"] = qu.ObjToString((*info)["name_id"])
  130. isNewEntInfo["company_id"] = old_company_id
  131. }
  132. }
  133. }
  134. if info == nil || isNewEnt { //新增主体信息~企业补充~年报补充
  135. base_info := map[string]interface{}{}
  136. base_info["name"] = name
  137. qyxy_info := CreateQyxyInfo(name)
  138. //通讯录数据结构
  139. contact_arr := []map[string]interface{}{}
  140. area, city, district := "", "", ""
  141. name_id := uuid.New().String()
  142. name_id = strings.ReplaceAll(name_id, "-", "")
  143. base_info["name_id"] = name_id
  144. base_info["identity_type"] = qu.IntAll(Str2DEC(identity))
  145. base_info["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  146. base_info["enterprise_type"] = su.GetCenterEnTypeValue(qyxy_info, name) //新增字段...
  147. if isLast && contact.Publishtime > 0 {
  148. base_info["latest_time"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
  149. }
  150. //创建通讯录信息 调整仅有联系电话即可
  151. if contact.Tel != "" {
  152. date := ""
  153. if contact.Publishtime > 0 {
  154. date = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
  155. }
  156. contact_arr = append(contact_arr, map[string]interface{}{
  157. "person": contact.Per,
  158. "tel": contact.Tel,
  159. "date": date,
  160. "email": "",
  161. "type": 1,
  162. "identity": base_info["identity_type"],
  163. })
  164. }
  165. if len(qyxy_info) > 0 { //含企业信息
  166. info_id := qu.ObjToString(qyxy_info["_id"])
  167. base_info["company_id"] = info_id
  168. address := qu.ObjToString(qyxy_info["company_address"])
  169. if utf8.RuneCountInString(address) > 300 {
  170. address = ""
  171. }
  172. base_info["address"] = address
  173. area = qu.ObjToString(qyxy_info["company_area"])
  174. city = qu.ObjToString(qyxy_info["company_city"])
  175. district = qu.ObjToString(qyxy_info["company_district"])
  176. legal_person := qu.ObjToString(qyxy_info["legal_person"])
  177. if legal_person != "" { //年报信息
  178. CreateAnnualInfo(info_id, legal_person, &contact_arr)
  179. }
  180. }
  181. base_info["area_code"], base_info["city_code"], base_info["district_code"] = CalculateRegionCode(area, city, district)
  182. base_info["seo_id"] = GetRandomSeoId(true)
  183. //新增主体信息表
  184. b := su.InsertMysqlData(su.G_Units_Baseinfo, base_info, tmpid)
  185. if b > 0 { //存通讯录
  186. for _, v := range contact_arr {
  187. info_book := map[string]interface{}{}
  188. info_book["name_id"] = name_id
  189. info_book["contact_name"] = qu.ObjToString(v["person"])
  190. info_book["contact_tel"] = qu.ObjToString(v["tel"])
  191. info_book["contact_email"] = qu.ObjToString(v["email"])
  192. date := qu.ObjToString(v["date"])
  193. if date != "" {
  194. info_book["publishtime"] = date
  195. }
  196. info_book["source_type"] = qu.IntAll(v["type"])
  197. info_book["identity_type"] = qu.IntAll(v["identity"])
  198. info_book["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  199. su.InsertMysqlData(su.G_Units_Contact, info_book, tmpid)
  200. }
  201. //存主体标签记录~暂时只进采购单位的主体
  202. if qu.IntAll(identity)%10 == 1 {
  203. info_tag := map[string]interface{}{}
  204. info_tag["name_id"] = name_id
  205. info_tag["labelcode"] = "1"
  206. labelvalues := "00"
  207. if su.BuyerClassData[buyerclass] != "" {
  208. labelvalues = su.BuyerClassData[buyerclass]
  209. }
  210. info_tag["labelvalues"] = labelvalues //代码表
  211. info_tag["identity_type"] = 1
  212. info_tag["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  213. su.InsertMysqlData(su.G_Units_Tags, info_tag, tmpid)
  214. }
  215. if isNewEnt {
  216. isNewEntInfo["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  217. su.InsertMysqlData(su.G_Units_Warning, isNewEntInfo, tmpid)
  218. }
  219. }
  220. return
  221. }
  222. //更新~主体基本信息表
  223. name_id := qu.ObjToString((*info)["name_id"])
  224. update_info := map[string]interface{}{}
  225. info_identity := ConvertToBin(Str2DEC(qu.ObjToString((*info)["identity_type"])))
  226. info_identity = SupplementIdentityType(info_identity)
  227. p_l_time := int64(0)
  228. p_l_time_str := qu.ObjToString((*info)["latest_time"])
  229. if p_l_time_str != "" {
  230. t, _ := time.ParseInLocation(su.TimeLayout, p_l_time_str, time.Local)
  231. p_l_time = t.Unix()
  232. }
  233. if info_identity != identity {
  234. new_identity := CalculateIdentityType(info_identity, identity)
  235. if new_identity != info_identity {
  236. update_info["identity_type"] = qu.IntAll(Str2DEC(new_identity))
  237. }
  238. }
  239. if isLast && contact.Publishtime > p_l_time {
  240. update_info["latest_time"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
  241. }
  242. if len(update_info) > 0 {
  243. update_info["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  244. su.MysqlGlobalTool.Update(su.G_Units_Baseinfo, map[string]interface{}{"name_id": name_id}, update_info)
  245. }
  246. //更新~主体标签记录表
  247. if buyerclass != "" && contact.Buyer {
  248. info_tag := su.MysqlGlobalTool.FindOne(su.G_Units_Tags, map[string]interface{}{"name_id": name_id}, "", "")
  249. if info_tag != nil {
  250. update_tag := map[string]interface{}{}
  251. cur_code := "00"
  252. if su.BuyerClassData[buyerclass] != "" {
  253. cur_code = su.BuyerClassData[buyerclass]
  254. }
  255. old_code := qu.ObjToString((*info_tag)["labelvalues"])
  256. if cur_code != old_code {
  257. update_tag["labelvalues"] = cur_code
  258. update_tag["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  259. su.MysqlGlobalTool.Update(su.G_Units_Tags, map[string]interface{}{"name_id": name_id}, update_tag)
  260. }
  261. }
  262. }
  263. if contact.Tel != "" {
  264. contact_datas := su.MysqlGlobalTool.Find(su.G_Units_Contact, map[string]interface{}{"name_id": name_id}, "", "", -1, -1)
  265. isExists := false
  266. if contact_datas != nil {
  267. for _, v := range *contact_datas {
  268. update_id := qu.IntAll(v["id"])
  269. person := qu.ObjToString(v["contact_name"])
  270. tel := qu.ObjToString(v["contact_tel"])
  271. pt_str := qu.ObjToString(v["publishtime"])
  272. pt := int64(0)
  273. if pt_str != "" {
  274. t, _ := time.ParseInLocation(su.TimeLayout, pt_str, time.Local)
  275. pt = t.Unix()
  276. }
  277. if person+"~"+tel == contact.Per+"~"+contact.Tel {
  278. isExists = true
  279. old_contact_identity := ConvertToBin(Str2DEC(qu.ObjToString(v["identity_type"])))
  280. old_contact_identity = SupplementIdentityType(old_contact_identity)
  281. if old_contact_identity != identity || qu.IntAll(v["source_type"]) == 2 || (contact.Publishtime > pt && contact.Publishtime > 0) {
  282. new_identity := CalculateIdentityType(old_contact_identity, identity)
  283. update_contact := map[string]interface{}{}
  284. update_contact["source_type"] = 1
  285. update_contact["identity_type"] = qu.IntAll(Str2DEC(new_identity))
  286. update_contact["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  287. if contact.Publishtime > pt {
  288. update_contact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
  289. }
  290. //更新通讯录
  291. su.MysqlGlobalTool.Update(su.G_Units_Contact, map[string]interface{}{"id": update_id}, update_contact)
  292. }
  293. break
  294. }
  295. }
  296. }
  297. if !isExists {
  298. add_contact := map[string]interface{}{}
  299. add_contact["name_id"] = name_id
  300. add_contact["contact_name"] = contact.Per
  301. add_contact["contact_tel"] = contact.Tel
  302. add_contact["contact_email"] = ""
  303. add_contact["source_type"] = 1
  304. if contact.Publishtime > 0 {
  305. add_contact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
  306. }
  307. add_contact["identity_type"] = qu.IntAll(Str2DEC(identity))
  308. add_contact["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
  309. //新增~通讯录表
  310. su.InsertMysqlData(su.G_Units_Contact, add_contact, tmpid)
  311. }
  312. }
  313. }
  314. // 创建年报字段~与现有的通讯录
  315. func CreateAnnualInfo(company_id string, legal_person string, contact_arr *[]map[string]interface{}) {
  316. keys := map[string]string{}
  317. for _, v := range *contact_arr {
  318. key := qu.ObjToString(v["person"]) + "~" + qu.ObjToString(v["tel"])
  319. keys[key] = key
  320. }
  321. dataArr, _ := su.SpiMgo.Find("annual_report_base", map[string]interface{}{"company_id": company_id}, nil, map[string]interface{}{
  322. "company_phone": 1,
  323. "company_email": 1,
  324. "report_date": 1,
  325. })
  326. for _, v := range dataArr {
  327. company_phone := qu.ObjToString(v["company_phone"])
  328. company_email := qu.ObjToString(v["company_email"])
  329. if company_phone != "" {
  330. report_date := qu.ObjToString(v["report_date"])
  331. key := legal_person + "~" + company_phone
  332. if keys[key] == "" {
  333. dict := map[string]interface{}{
  334. "person": legal_person,
  335. "tel": company_phone,
  336. "date": report_date,
  337. "email": company_email,
  338. "type": 2,
  339. "identity": 0,
  340. }
  341. (*contact_arr) = append((*contact_arr), dict)
  342. keys[key] = key
  343. }
  344. }
  345. }
  346. }
  347. // 创建企业信息
  348. func CreateQyxyInfo(name string) map[string]interface{} {
  349. //查询企业库
  350. qyxy_info := map[string]interface{}{}
  351. dataArr, _ := su.QyxyMgo.Find("qyxy_std", map[string]interface{}{"company_name": name}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{
  352. "_id": 1,
  353. "company_name": 1,
  354. "company_address": 1,
  355. "company_area": 1,
  356. "company_city": 1,
  357. "company_district": 1,
  358. "legal_person": 1,
  359. "company_phone": 1,
  360. "company_email": 1,
  361. "company_type_old": 1,
  362. })
  363. if len(dataArr) > 0 {
  364. qyxy_info = dataArr[0] //补充企业信息
  365. } else {
  366. data := su.SpiMgo.FindOne("company_history_name", map[string]interface{}{
  367. "history_name": name,
  368. })
  369. if len(data) > 0 {
  370. company_id := qu.ObjToString(data["company_id"])
  371. qyxy_info = su.QyxyMgo.FindOne("qyxy_std", map[string]interface{}{"_id": company_id})
  372. }
  373. }
  374. return qyxy_info
  375. }