package subject

import (
	log "github.com/donnie4w/go-logger/logger"
	"github.com/uuid"
	qu "qfw/util"
	"strings"
	su "subject_util"
	"sync"
	"time"
	"unicode/utf8"
	ul "util"
)

var (
	// updateLock serializes every subject create/update performed by
	// dealWithAddSubjectInfo across the worker goroutines.
	updateLock sync.Mutex
)

// RunSubjectAddDataInfo runs the incremental subject-data job: it scans the
// source collection for documents whose _id lies in (gtid, lteid], ordered by
// _id, and feeds every eligible document to dealWithAddSubjectInfo using at
// most 8 concurrent workers.
//
// A document is eligible when its "extracttype" is 1 and at least one of
// "buyer", "agency", "winner" or "owner" is non-empty.
func RunSubjectAddDataInfo(gtid string, lteid string) {
	log.Debug("增量~~~")
	SeoUnique = map[string]string{}

	sess := su.SourceMgo.GetMgoConn()
	defer su.SourceMgo.DestoryMongoConn(sess)

	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  su.StringTOBsonId(gtid),
			"$lte": su.StringTOBsonId(lteid),
		},
	}
	log.Debug("查询语句 ~ ", q)

	it := sess.DB(su.SourceMgo.DbName).C(su.S_Coll_Name).Find(&q).Sort("_id").Select(BidFields).Iter()

	pool := make(chan bool, 8) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}
	total := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
		if total%1000 == 0 {
			log.Debug("cur index ", total)
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			if qu.IntAll(tmp["extracttype"]) != 1 {
				return
			}
			if qu.ObjToString(tmp["buyer"]) == "" &&
				qu.ObjToString(tmp["agency"]) == "" &&
				qu.ObjToString(tmp["winner"]) == "" &&
				qu.ObjToString(tmp["owner"]) == "" {
				return
			}
			dealWithAddSubjectInfo(tmp)
		}(tmp)
		// Each worker owns its document, so decode the next one into a fresh map.
		tmp = make(map[string]interface{})
	}
	// Release the cursor and surface any error that ended the iteration early;
	// without this a failed scan is indistinguishable from a complete one.
	if err := it.Close(); err != nil {
		log.Error("iterator error ~ ", err)
	}
	wg.Wait()
	log.Debug("is over ~ ", total)
}

// dealWithAddSubjectInfo extracts the buyer, winner(s), agency and owner of a
// single source document and creates or updates one subject per role through
// dealWithUpdateContact. Role identity codes: "0001" buyer, "0010" winner,
// "0100" agency, "1000" owner.
//
// Phone numbers longer than 60 runes are discarded; buyer, agency and owner
// names of 30 runes or more are skipped. All database work happens while
// holding updateLock.
func dealWithAddSubjectInfo(tmp map[string]interface{}) {
	// telOf reads a phone field and blanks it when it exceeds 60 runes.
	telOf := func(key string) string {
		tel := qu.ObjToString(tmp[key])
		if utf8.RuneCountInString(tel) > 60 {
			return ""
		}
		return tel
	}
	// validName reports whether an entity name is non-empty and under 30 runes.
	validName := func(s string) bool {
		return s != "" && utf8.RuneCountInString(s) < 30
	}

	buyer := qu.ObjToString(tmp["buyer"])
	agency := qu.ObjToString(tmp["agency"])
	winner := qu.ObjToString(tmp["winner"])
	owner := qu.ObjToString(tmp["owner"])
	sWinner := qu.ObjToString(tmp["s_winner"])

	bPer, bTel := qu.ObjToString(tmp["buyerperson"]), telOf("buyertel")
	aPer, aTel := qu.ObjToString(tmp["agencyperson"]), telOf("agencytel")
	wPer, wTel := qu.ObjToString(tmp["winnerperson"]), telOf("winnertel")
	oPer, oTel := qu.ObjToString(tmp["project_person"]), telOf("project_phone")

	buyerclass := qu.ObjToString(tmp["buyerclass"])
	publishtime := qu.Int64All(tmp["publishtime"])
	tmpid := su.BsonTOStringId(tmp["_id"])
	winnerArr, winnerBool := SegmentationEntName(winner, sWinner)

	updateLock.Lock()
	// Deferred so that a panic in any helper cannot leave the lock held and
	// deadlock the remaining workers.
	defer updateLock.Unlock()

	if validName(buyer) {
		dealWithUpdateContact(buyer, buyerclass, &Contact{bPer, bTel, true, false, false, false, publishtime}, "0001", tmpid)
	}
	// Winning bidders: the winner contact person/phone is attached only to the
	// entries flagged in winnerBool.
	for k, v := range winnerArr {
		c := Contact{"", "", false, false, true, false, publishtime}
		if winnerBool[k] {
			c.Per = wPer
			c.Tel = wTel
		}
		dealWithUpdateContact(v, "", &c, "0010", tmpid)
	}
	if validName(agency) {
		dealWithUpdateContact(agency, "", &Contact{aPer, aTel, false, true, false, false, publishtime}, "0100", tmpid)
	}
	if validName(owner) {
		dealWithUpdateContact(owner, "", &Contact{oPer, oTel, false, false, false, true, publishtime}, "1000", tmpid)
	}
}

// dealWithUpdateContact creates or updates the subject called name.
//
// When no base-info row exists for name, or when the enterprise library now
// reports a different company_id than the stored one, a new base-info row is
// inserted together with its contact rows, a tag row (buyer identity only)
// and, in the company_id-changed case, a warning row. Otherwise the existing
// row's identity_type / latest_time, its buyer-class tag and its contact list
// are updated in place.
//
// identity is the role code of this occurrence ("0001", "0010", "0100" or
// "1000"); latest_time is only maintained for "0001" and "0010". tmpid is the
// id of the source document and is forwarded to su.InsertMysqlData.
func dealWithUpdateContact(name string, buyerclass string, contact *Contact, identity string, tmpid string) {
	// nowStr formats the current time, truncated to whole seconds.
	nowStr := func() string {
		return time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
	}
	// parseUnix converts a stored timestamp string to Unix seconds. Empty or
	// unparseable values yield 0; previously a parse failure produced the
	// zero time (a large negative number), which made every publish time,
	// including 0, look newer and overwrote stored dates with the epoch.
	// NOTE(review): parsing uses su.TimeLayout while formatting uses
	// ul.TimeLayout — confirm the two layouts are identical.
	parseUnix := func(s string) int64 {
		if s == "" {
			return 0
		}
		t, err := time.ParseInLocation(su.TimeLayout, s, time.Local)
		if err != nil {
			return 0
		}
		return t.Unix()
	}

	info := su.MysqlGlobalTool.FindOne(su.G_Units_Baseinfo, map[string]interface{}{"name": name}, "", "-id")
	// Looked up once and shared by the company_id check and the create path.
	qyxyInfo := CreateQyxyInfo(name)

	// latest_time is only maintained for winner and buyer identities.
	isLast := identity == "0010" || identity == "0001"

	isNewEnt := false
	isNewEntInfo := map[string]interface{}{}
	if info != nil && len(qyxyInfo) > 0 {
		// Check whether the enterprise library's current company_id still
		// matches the one stored on the subject.
		newCompanyID := qu.ObjToString(qyxyInfo["_id"])
		oldCompanyID := qu.ObjToString((*info)["company_id"])
		if oldCompanyID != "" && newCompanyID != "" && oldCompanyID != newCompanyID {
			isNewEnt = true
			isNewEntInfo["name"] = name
			isNewEntInfo["name_id"] = qu.ObjToString((*info)["name_id"])
			isNewEntInfo["company_id"] = oldCompanyID
		}
	}

	if info == nil || isNewEnt {
		// ---- Create a new subject: base info + enterprise data + annual reports ----
		nameID := strings.ReplaceAll(uuid.New().String(), "-", "")
		identityType := qu.IntAll(Str2DEC(identity))

		baseInfo := map[string]interface{}{}
		baseInfo["name"] = name
		baseInfo["name_id"] = nameID
		baseInfo["identity_type"] = identityType
		baseInfo["createtime"] = nowStr()
		baseInfo["enterprise_type"] = su.GetCenterEnTypeValue(qyxyInfo, name)
		if isLast && contact.Publishtime > 0 {
			baseInfo["latest_time"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
		}

		// Address-book entries; a phone number alone is enough to create one.
		contactArr := []map[string]interface{}{}
		if contact.Tel != "" {
			date := ""
			if contact.Publishtime > 0 {
				date = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
			}
			contactArr = append(contactArr, map[string]interface{}{
				"person":   contact.Per,
				"tel":      contact.Tel,
				"date":     date,
				"email":    "",
				"type":     1,
				"identity": identityType,
			})
		}

		area, city, district := "", "", ""
		if len(qyxyInfo) > 0 {
			// Enterprise record available: copy id, address and region.
			companyID := qu.ObjToString(qyxyInfo["_id"])
			baseInfo["company_id"] = companyID
			address := qu.ObjToString(qyxyInfo["company_address"])
			if utf8.RuneCountInString(address) > 300 {
				address = ""
			}
			baseInfo["address"] = address
			area = qu.ObjToString(qyxyInfo["company_area"])
			city = qu.ObjToString(qyxyInfo["company_city"])
			district = qu.ObjToString(qyxyInfo["company_district"])
			if legalPerson := qu.ObjToString(qyxyInfo["legal_person"]); legalPerson != "" {
				// Append contacts taken from annual reports.
				CreateAnnualInfo(companyID, legalPerson, &contactArr)
			}
		}
		baseInfo["area_code"], baseInfo["city_code"], baseInfo["district_code"] = CalculateRegionCode(area, city, district)
		baseInfo["seo_id"] = GetRandomSeoId(true)

		// Insert the base-info row; dependent rows are written only on success.
		if su.InsertMysqlData(su.G_Units_Baseinfo, baseInfo, tmpid) <= 0 {
			return
		}

		// Address book.
		for _, v := range contactArr {
			infoBook := map[string]interface{}{}
			infoBook["name_id"] = nameID
			infoBook["contact_name"] = qu.ObjToString(v["person"])
			infoBook["contact_tel"] = qu.ObjToString(v["tel"])
			infoBook["contact_email"] = qu.ObjToString(v["email"])
			if date := qu.ObjToString(v["date"]); date != "" {
				infoBook["publishtime"] = date
			}
			infoBook["source_type"] = qu.IntAll(v["type"])
			infoBook["identity_type"] = qu.IntAll(v["identity"])
			infoBook["createtime"] = nowStr()
			su.InsertMysqlData(su.G_Units_Contact, infoBook, tmpid)
		}

		// Tag record; for now only written for buyer subjects.
		if qu.IntAll(identity)%10 == 1 {
			labelvalues := "00"
			if code := su.BuyerClassData[buyerclass]; code != "" {
				labelvalues = code
			}
			infoTag := map[string]interface{}{}
			infoTag["name_id"] = nameID
			infoTag["labelcode"] = "1"
			infoTag["labelvalues"] = labelvalues // value from the code table
			infoTag["identity_type"] = 1
			infoTag["createtime"] = nowStr()
			su.InsertMysqlData(su.G_Units_Tags, infoTag, tmpid)
		}

		// Record the company_id change against the previous name_id.
		if isNewEnt {
			isNewEntInfo["createtime"] = nowStr()
			su.InsertMysqlData(su.G_Units_Warning, isNewEntInfo, tmpid)
		}
		return
	}

	// ---- Existing subject: update the base-info row ----
	nameID := qu.ObjToString((*info)["name_id"])
	updateInfo := map[string]interface{}{}
	infoIdentity := SupplementIdentityType(ConvertToBin(Str2DEC(qu.ObjToString((*info)["identity_type"]))))
	prevLatest := parseUnix(qu.ObjToString((*info)["latest_time"]))

	if infoIdentity != identity {
		if merged := CalculateIdentityType(infoIdentity, identity); merged != infoIdentity {
			updateInfo["identity_type"] = qu.IntAll(Str2DEC(merged))
		}
	}
	if isLast && contact.Publishtime > prevLatest {
		updateInfo["latest_time"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
	}
	if len(updateInfo) > 0 {
		updateInfo["updatetime"] = nowStr()
		su.MysqlGlobalTool.Update(su.G_Units_Baseinfo, map[string]interface{}{"name_id": nameID}, updateInfo)
	}

	// ---- Existing subject: update the tag row ----
	if buyerclass != "" && contact.Buyer {
		infoTag := su.MysqlGlobalTool.FindOne(su.G_Units_Tags, map[string]interface{}{"name_id": nameID}, "", "")
		if infoTag != nil {
			curCode := "00"
			if code := su.BuyerClassData[buyerclass]; code != "" {
				curCode = code
			}
			if curCode != qu.ObjToString((*infoTag)["labelvalues"]) {
				updateTag := map[string]interface{}{}
				updateTag["labelvalues"] = curCode
				updateTag["updatetime"] = nowStr()
				su.MysqlGlobalTool.Update(su.G_Units_Tags, map[string]interface{}{"name_id": nameID}, updateTag)
			}
		}
	}

	// ---- Existing subject: update the address book ----
	if contact.Tel == "" {
		return
	}
	contactDatas := su.MysqlGlobalTool.Find(su.G_Units_Contact, map[string]interface{}{"name_id": nameID}, "", "", -1, -1)
	isExists := false
	if contactDatas != nil {
		for _, v := range *contactDatas {
			person := qu.ObjToString(v["contact_name"])
			tel := qu.ObjToString(v["contact_tel"])
			if person+"~"+tel != contact.Per+"~"+contact.Tel {
				continue
			}
			isExists = true

			pt := parseUnix(qu.ObjToString(v["publishtime"]))
			oldIdentity := SupplementIdentityType(ConvertToBin(Str2DEC(qu.ObjToString(v["identity_type"]))))
			// Refresh the row when the identity differs, when it currently
			// has source_type 2 (the value CreateAnnualInfo assigns), or when
			// this document is newer.
			if oldIdentity != identity || qu.IntAll(v["source_type"]) == 2 ||
				(contact.Publishtime > pt && contact.Publishtime > 0) {
				updateContact := map[string]interface{}{}
				updateContact["source_type"] = 1
				updateContact["identity_type"] = qu.IntAll(Str2DEC(CalculateIdentityType(oldIdentity, identity)))
				updateContact["updatetime"] = nowStr()
				if contact.Publishtime > pt {
					updateContact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
				}
				su.MysqlGlobalTool.Update(su.G_Units_Contact, map[string]interface{}{"id": qu.IntAll(v["id"])}, updateContact)
			}
			break // only the first matching row is considered
		}
	}
	if !isExists {
		// No matching person/phone pair yet: add a new address-book row.
		addContact := map[string]interface{}{}
		addContact["name_id"] = nameID
		addContact["contact_name"] = contact.Per
		addContact["contact_tel"] = contact.Tel
		addContact["contact_email"] = ""
		addContact["source_type"] = 1
		if contact.Publishtime > 0 {
			addContact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
		}
		addContact["identity_type"] = qu.IntAll(Str2DEC(identity))
		addContact["createtime"] = nowStr()
		su.InsertMysqlData(su.G_Units_Contact, addContact, tmpid)
	}
}

// CreateAnnualInfo appends contacts derived from the "annual_report_base"
// records of company_id to *contact_arr. Each report with a non-empty
// company_phone yields one entry that pairs legal_person with that phone
// (type 2, identity 0), unless the same person/phone pair is already present
// in *contact_arr or was added by an earlier report.
func CreateAnnualInfo(company_id string, legal_person string, contact_arr *[]map[string]interface{}) {
	// Person~phone pairs already present, used for de-duplication.
	seen := make(map[string]bool, len(*contact_arr))
	for _, v := range *contact_arr {
		seen[qu.ObjToString(v["person"])+"~"+qu.ObjToString(v["tel"])] = true
	}

	dataArr, _ := su.SpiMgo.Find("annual_report_base", map[string]interface{}{"company_id": company_id}, nil, map[string]interface{}{
		"company_phone": 1,
		"company_email": 1,
		"report_date":   1,
	})
	for _, v := range dataArr {
		phone := qu.ObjToString(v["company_phone"])
		if phone == "" {
			continue
		}
		key := legal_person + "~" + phone
		if seen[key] {
			continue
		}
		seen[key] = true
		*contact_arr = append(*contact_arr, map[string]interface{}{
			"person":   legal_person,
			"tel":      phone,
			"date":     qu.ObjToString(v["report_date"]),
			"email":    qu.ObjToString(v["company_email"]),
			"type":     2,
			"identity": 0,
		})
	}
}

// CreateQyxyInfo looks up the enterprise record for name in "qyxy_std".
// It first matches on company_name (sorted by updatetime descending, first
// hit wins); if nothing matches it resolves name through
// "company_history_name" and loads the record by the company_id found there.
// When neither lookup matches, an empty map is returned.
func CreateQyxyInfo(name string) map[string]interface{} {
	// Query the enterprise library by current name.
	dataArr, _ := su.QyxyMgo.Find("qyxy_std", map[string]interface{}{"company_name": name}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{
		"_id":              1,
		"company_name":     1,
		"company_address":  1,
		"company_area":     1,
		"company_city":     1,
		"company_district": 1,
		"legal_person":     1,
		"company_phone":    1,
		"company_email":    1,
		"company_type_old": 1,
	})
	if len(dataArr) > 0 {
		return dataArr[0]
	}

	// Fall back to the historical-name table.
	data := su.SpiMgo.FindOne("company_history_name", map[string]interface{}{
		"history_name": name,
	})
	if len(data) > 0 {
		companyID := qu.ObjToString(data["company_id"])
		return su.QyxyMgo.FindOne("qyxy_std", map[string]interface{}{"_id": companyID})
	}
	return map[string]interface{}{}
}