123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- package subject
- import (
- log "github.com/donnie4w/go-logger/logger"
- "github.com/uuid"
- qu "qfw/util"
- "strings"
- su "subject_util"
- "sync"
- "time"
- "unicode/utf8"
- ul "util"
- )
- var (
- updateLock sync.Mutex
- )
- // 正产增量主体数据服务
- func RunSubjectAddDataInfo(gtid string, lteid string) {
- log.Debug("增量~~~")
- SeoUnique = map[string]string{}
- sess := su.SourceMgo.GetMgoConn()
- defer su.SourceMgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": su.StringTOBsonId(gtid),
- "$lte": su.StringTOBsonId(lteid),
- },
- }
- log.Debug("查询语句 ~ ", q)
- it := sess.DB(su.SourceMgo.DbName).C(su.S_Coll_Name).Find(&q).Sort("_id").Select(BidFields).Iter()
- pool := make(chan bool, 8)
- wg := &sync.WaitGroup{}
- total := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%1000 == 0 {
- log.Debug("cur index ", total)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- if qu.IntAll(tmp["extracttype"]) != 1 {
- return
- }
- if qu.ObjToString(tmp["buyer"]) == "" &&
- qu.ObjToString(tmp["agency"]) == "" &&
- qu.ObjToString(tmp["winner"]) == "" &&
- qu.ObjToString(tmp["owner"]) == "" {
- return
- }
- dealWithAddSubjectInfo(tmp)
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Debug("is over ~ ", total)
- }
- func dealWithAddSubjectInfo(tmp map[string]interface{}) {
- buyer := qu.ObjToString(tmp["buyer"])
- agency := qu.ObjToString(tmp["agency"])
- winner := qu.ObjToString(tmp["winner"])
- owner := qu.ObjToString(tmp["owner"])
- s_winner := qu.ObjToString(tmp["s_winner"])
- b_per := qu.ObjToString(tmp["buyerperson"])
- b_tel := qu.ObjToString(tmp["buyertel"])
- if utf8.RuneCountInString(b_tel) > 60 {
- b_tel = ""
- }
- a_per := qu.ObjToString(tmp["agencyperson"])
- a_tel := qu.ObjToString(tmp["agencytel"])
- if utf8.RuneCountInString(a_tel) > 60 {
- a_tel = ""
- }
- w_per := qu.ObjToString(tmp["winnerperson"])
- w_tel := qu.ObjToString(tmp["winnertel"])
- if utf8.RuneCountInString(w_tel) > 60 {
- w_tel = ""
- }
- o_per := qu.ObjToString(tmp["project_person"])
- o_tel := qu.ObjToString(tmp["project_phone"])
- if utf8.RuneCountInString(o_tel) > 60 {
- o_tel = ""
- }
- buyerclass := qu.ObjToString(tmp["buyerclass"])
- publishtime := qu.Int64All(tmp["publishtime"])
- tmpid := su.BsonTOStringId(tmp["_id"])
- winner_arr, winner_bool := SegmentationEntName(winner, s_winner)
- updateLock.Lock()
- if buyer != "" && utf8.RuneCountInString(buyer) < 30 {
- dealWithUpdateContact(buyer, buyerclass, &Contact{b_per, b_tel, true, false, false, false, publishtime}, "0001", tmpid)
- }
- //中标单位
- for k, v := range winner_arr {
- b := winner_bool[k]
- c := Contact{"", "", false, false, true, false, publishtime}
- if b {
- c.Per = w_per
- c.Tel = w_tel
- }
- dealWithUpdateContact(v, "", &c, "0010", tmpid)
- }
- if agency != "" && utf8.RuneCountInString(agency) < 30 {
- dealWithUpdateContact(agency, "", &Contact{a_per, a_tel, false, true, false, false, publishtime}, "0100", tmpid)
- }
- if owner != "" && utf8.RuneCountInString(owner) < 30 {
- dealWithUpdateContact(owner, "", &Contact{o_per, o_tel, false, false, false, true, publishtime}, "1000", tmpid)
- }
- updateLock.Unlock()
- }
- func dealWithUpdateContact(name string, buyerclass string, contact *Contact, identity string, tmpid string) {
- info := su.MysqlGlobalTool.FindOne(su.G_Units_Baseinfo, map[string]interface{}{"name": name}, "", "-id")
- isNewEnt := false
- isNewEntInfo := map[string]interface{}{}
- isLast := false //是否需要更新lastime
- if identity == "0010" || identity == "0001" {
- isLast = true
- }
- if info == nil {
- } else { //判断企业库最新的company_id是否与当前保持一致
- qyxy_info := CreateQyxyInfo(name)
- if len(qyxy_info) > 0 {
- new_company_id := qu.ObjToString(qyxy_info["_id"])
- old_company_id := qu.ObjToString((*info)["company_id"])
- if old_company_id != "" && new_company_id != "" && old_company_id != new_company_id {
- isNewEnt = true
- isNewEntInfo["name"] = name
- isNewEntInfo["name_id"] = qu.ObjToString((*info)["name_id"])
- isNewEntInfo["company_id"] = old_company_id
- }
- }
- }
- if info == nil || isNewEnt { //新增主体信息~企业补充~年报补充
- base_info := map[string]interface{}{}
- base_info["name"] = name
- qyxy_info := CreateQyxyInfo(name)
- //通讯录数据结构
- contact_arr := []map[string]interface{}{}
- area, city, district := "", "", ""
- name_id := uuid.New().String()
- name_id = strings.ReplaceAll(name_id, "-", "")
- base_info["name_id"] = name_id
- base_info["identity_type"] = qu.IntAll(Str2DEC(identity))
- base_info["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- base_info["enterprise_type"] = su.GetCenterEnTypeValue(qyxy_info, name) //新增字段...
- if isLast && contact.Publishtime > 0 {
- base_info["latest_time"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
- }
- //创建通讯录信息 调整仅有联系电话即可
- if contact.Tel != "" {
- date := ""
- if contact.Publishtime > 0 {
- date = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
- }
- contact_arr = append(contact_arr, map[string]interface{}{
- "person": contact.Per,
- "tel": contact.Tel,
- "date": date,
- "email": "",
- "type": 1,
- "identity": base_info["identity_type"],
- })
- }
- if len(qyxy_info) > 0 { //含企业信息
- info_id := qu.ObjToString(qyxy_info["_id"])
- base_info["company_id"] = info_id
- address := qu.ObjToString(qyxy_info["company_address"])
- if utf8.RuneCountInString(address) > 300 {
- address = ""
- }
- base_info["address"] = address
- area = qu.ObjToString(qyxy_info["company_area"])
- city = qu.ObjToString(qyxy_info["company_city"])
- district = qu.ObjToString(qyxy_info["company_district"])
- legal_person := qu.ObjToString(qyxy_info["legal_person"])
- if legal_person != "" { //年报信息
- CreateAnnualInfo(info_id, legal_person, &contact_arr)
- }
- }
- base_info["area_code"], base_info["city_code"], base_info["district_code"] = CalculateRegionCode(area, city, district)
- base_info["seo_id"] = GetRandomSeoId(true)
- //新增主体信息表
- b := su.InsertMysqlData(su.G_Units_Baseinfo, base_info, tmpid)
- if b > 0 { //存通讯录
- for _, v := range contact_arr {
- info_book := map[string]interface{}{}
- info_book["name_id"] = name_id
- info_book["contact_name"] = qu.ObjToString(v["person"])
- info_book["contact_tel"] = qu.ObjToString(v["tel"])
- info_book["contact_email"] = qu.ObjToString(v["email"])
- date := qu.ObjToString(v["date"])
- if date != "" {
- info_book["publishtime"] = date
- }
- info_book["source_type"] = qu.IntAll(v["type"])
- info_book["identity_type"] = qu.IntAll(v["identity"])
- info_book["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- su.InsertMysqlData(su.G_Units_Contact, info_book, tmpid)
- }
- //存主体标签记录~暂时只进采购单位的主体
- if qu.IntAll(identity)%10 == 1 {
- info_tag := map[string]interface{}{}
- info_tag["name_id"] = name_id
- info_tag["labelcode"] = "1"
- labelvalues := "00"
- if su.BuyerClassData[buyerclass] != "" {
- labelvalues = su.BuyerClassData[buyerclass]
- }
- info_tag["labelvalues"] = labelvalues //代码表
- info_tag["identity_type"] = 1
- info_tag["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- su.InsertMysqlData(su.G_Units_Tags, info_tag, tmpid)
- }
- if isNewEnt {
- isNewEntInfo["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- su.InsertMysqlData(su.G_Units_Warning, isNewEntInfo, tmpid)
- }
- }
- return
- }
- //更新~主体基本信息表
- name_id := qu.ObjToString((*info)["name_id"])
- update_info := map[string]interface{}{}
- info_identity := ConvertToBin(Str2DEC(qu.ObjToString((*info)["identity_type"])))
- info_identity = SupplementIdentityType(info_identity)
- p_l_time := int64(0)
- p_l_time_str := qu.ObjToString((*info)["latest_time"])
- if p_l_time_str != "" {
- t, _ := time.ParseInLocation(su.TimeLayout, p_l_time_str, time.Local)
- p_l_time = t.Unix()
- }
- if info_identity != identity {
- new_identity := CalculateIdentityType(info_identity, identity)
- if new_identity != info_identity {
- update_info["identity_type"] = qu.IntAll(Str2DEC(new_identity))
- }
- }
- if isLast && contact.Publishtime > p_l_time {
- update_info["latest_time"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
- }
- if len(update_info) > 0 {
- update_info["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- su.MysqlGlobalTool.Update(su.G_Units_Baseinfo, map[string]interface{}{"name_id": name_id}, update_info)
- }
- //更新~主体标签记录表
- if buyerclass != "" && contact.Buyer {
- info_tag := su.MysqlGlobalTool.FindOne(su.G_Units_Tags, map[string]interface{}{"name_id": name_id}, "", "")
- if info_tag != nil {
- update_tag := map[string]interface{}{}
- cur_code := "00"
- if su.BuyerClassData[buyerclass] != "" {
- cur_code = su.BuyerClassData[buyerclass]
- }
- old_code := qu.ObjToString((*info_tag)["labelvalues"])
- if cur_code != old_code {
- update_tag["labelvalues"] = cur_code
- update_tag["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- su.MysqlGlobalTool.Update(su.G_Units_Tags, map[string]interface{}{"name_id": name_id}, update_tag)
- }
- }
- }
- if contact.Tel != "" {
- contact_datas := su.MysqlGlobalTool.Find(su.G_Units_Contact, map[string]interface{}{"name_id": name_id}, "", "", -1, -1)
- isExists := false
- if contact_datas != nil {
- for _, v := range *contact_datas {
- update_id := qu.IntAll(v["id"])
- person := qu.ObjToString(v["contact_name"])
- tel := qu.ObjToString(v["contact_tel"])
- pt_str := qu.ObjToString(v["publishtime"])
- pt := int64(0)
- if pt_str != "" {
- t, _ := time.ParseInLocation(su.TimeLayout, pt_str, time.Local)
- pt = t.Unix()
- }
- if person+"~"+tel == contact.Per+"~"+contact.Tel {
- isExists = true
- old_contact_identity := ConvertToBin(Str2DEC(qu.ObjToString(v["identity_type"])))
- old_contact_identity = SupplementIdentityType(old_contact_identity)
- if old_contact_identity != identity || qu.IntAll(v["source_type"]) == 2 || (contact.Publishtime > pt && contact.Publishtime > 0) {
- new_identity := CalculateIdentityType(old_contact_identity, identity)
- update_contact := map[string]interface{}{}
- update_contact["source_type"] = 1
- update_contact["identity_type"] = qu.IntAll(Str2DEC(new_identity))
- update_contact["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- if contact.Publishtime > pt {
- update_contact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
- }
- //更新通讯录
- su.MysqlGlobalTool.Update(su.G_Units_Contact, map[string]interface{}{"id": update_id}, update_contact)
- }
- break
- }
- }
- }
- if !isExists {
- add_contact := map[string]interface{}{}
- add_contact["name_id"] = name_id
- add_contact["contact_name"] = contact.Per
- add_contact["contact_tel"] = contact.Tel
- add_contact["contact_email"] = ""
- add_contact["source_type"] = 1
- if contact.Publishtime > 0 {
- add_contact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
- }
- add_contact["identity_type"] = qu.IntAll(Str2DEC(identity))
- add_contact["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
- //新增~通讯录表
- su.InsertMysqlData(su.G_Units_Contact, add_contact, tmpid)
- }
- }
- }
- // 创建年报字段~与现有的通讯录
- func CreateAnnualInfo(company_id string, legal_person string, contact_arr *[]map[string]interface{}) {
- keys := map[string]string{}
- for _, v := range *contact_arr {
- key := qu.ObjToString(v["person"]) + "~" + qu.ObjToString(v["tel"])
- keys[key] = key
- }
- dataArr, _ := su.SpiMgo.Find("annual_report_base", map[string]interface{}{"company_id": company_id}, nil, map[string]interface{}{
- "company_phone": 1,
- "company_email": 1,
- "report_date": 1,
- })
- for _, v := range dataArr {
- company_phone := qu.ObjToString(v["company_phone"])
- company_email := qu.ObjToString(v["company_email"])
- if company_phone != "" {
- report_date := qu.ObjToString(v["report_date"])
- key := legal_person + "~" + company_phone
- if keys[key] == "" {
- dict := map[string]interface{}{
- "person": legal_person,
- "tel": company_phone,
- "date": report_date,
- "email": company_email,
- "type": 2,
- "identity": 0,
- }
- (*contact_arr) = append((*contact_arr), dict)
- keys[key] = key
- }
- }
- }
- }
- // 创建企业信息
- func CreateQyxyInfo(name string) map[string]interface{} {
- //查询企业库
- qyxy_info := map[string]interface{}{}
- dataArr, _ := su.QyxyMgo.Find("qyxy_std", map[string]interface{}{"company_name": name}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{
- "_id": 1,
- "company_name": 1,
- "company_address": 1,
- "company_area": 1,
- "company_city": 1,
- "company_district": 1,
- "legal_person": 1,
- "company_phone": 1,
- "company_email": 1,
- "company_type_old": 1,
- })
- if len(dataArr) > 0 {
- qyxy_info = dataArr[0] //补充企业信息
- } else {
- data := su.SpiMgo.FindOne("company_history_name", map[string]interface{}{
- "history_name": name,
- })
- if len(data) > 0 {
- company_id := qu.ObjToString(data["company_id"])
- qyxy_info = su.QyxyMgo.FindOne("qyxy_std", map[string]interface{}{"_id": company_id})
- }
- }
- return qyxy_info
- }
|