|
@@ -0,0 +1,352 @@
|
|
|
+package subject
|
|
|
+
|
|
|
+import (
|
|
|
+ log "github.com/donnie4w/go-logger/logger"
|
|
|
+ "github.com/uuid"
|
|
|
+ qu "qfw/util"
|
|
|
+ "strings"
|
|
|
+ su "subject_util"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+ "unicode/utf8"
|
|
|
+ ul "util"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ updateLock sync.Mutex
|
|
|
+)
|
|
|
+
|
|
|
+//正产增量主体数据服务
|
|
|
+func RunSubjectAddDataInfo(gtid string, lteid string) {
|
|
|
+ log.Debug("增量~~~")
|
|
|
+ sess := su.SourceMgo.GetMgoConn()
|
|
|
+ defer su.SourceMgo.DestoryMongoConn(sess)
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": su.StringTOBsonId(gtid),
|
|
|
+ "$lte": su.StringTOBsonId(lteid),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ log.Debug("查询语句 ~ ", q)
|
|
|
+ it := sess.DB(su.SourceMgo.DbName).C(su.S_Coll_Name).Find(&q).Sort("_id").Select(BidFields).Iter()
|
|
|
+ pool := make(chan bool, 8)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ total := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
|
+ if total%1000 == 0 {
|
|
|
+ log.Debug("cur index ", total)
|
|
|
+ }
|
|
|
+ pool <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-pool
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ if qu.IntAll(tmp["extracttype"]) != 1 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if qu.ObjToString(tmp["buyer"]) == "" &&
|
|
|
+ qu.ObjToString(tmp["agency"]) == "" &&
|
|
|
+ qu.ObjToString(tmp["winner"]) == "" {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ dealWithAddSubjectInfo(tmp)
|
|
|
+ }(tmp)
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ log.Debug("is over ~ ", total)
|
|
|
+}
|
|
|
+
|
|
|
+func dealWithAddSubjectInfo(tmp map[string]interface{}) {
|
|
|
+ buyer := qu.ObjToString(tmp["buyer"])
|
|
|
+ agency := qu.ObjToString(tmp["agency"])
|
|
|
+ winner := qu.ObjToString(tmp["winner"])
|
|
|
+ s_winner := qu.ObjToString(tmp["s_winner"])
|
|
|
+ b_per := qu.ObjToString(tmp["buyerperson"])
|
|
|
+ b_tel := qu.ObjToString(tmp["buyertel"])
|
|
|
+ if utf8.RuneCountInString(b_tel) > 60 {
|
|
|
+ b_tel = ""
|
|
|
+ }
|
|
|
+ a_per := qu.ObjToString(tmp["agencyperson"])
|
|
|
+ a_tel := qu.ObjToString(tmp["agencytel"])
|
|
|
+ if utf8.RuneCountInString(a_tel) > 60 {
|
|
|
+ a_tel = ""
|
|
|
+ }
|
|
|
+ w_per := qu.ObjToString(tmp["winnerperson"])
|
|
|
+ w_tel := qu.ObjToString(tmp["winnertel"])
|
|
|
+ if utf8.RuneCountInString(w_tel) > 60 {
|
|
|
+ w_tel = ""
|
|
|
+ }
|
|
|
+ buyerclass := qu.ObjToString(tmp["buyerclass"])
|
|
|
+ publishtime := qu.Int64All(tmp["publishtime"])
|
|
|
+ tmpid := su.BsonTOStringId(tmp["_id"])
|
|
|
+ winner_arr, winner_bool := SegmentationEntName(winner, s_winner)
|
|
|
+
|
|
|
+ updateLock.Lock()
|
|
|
+ if buyer != "" && utf8.RuneCountInString(buyer) < 30 {
|
|
|
+ dealWithUpdateContact(buyer, buyerclass, &Contact{b_per, b_tel, true, false, false, publishtime}, "001", tmpid)
|
|
|
+ }
|
|
|
+
|
|
|
+ //中标单位
|
|
|
+ for k, v := range winner_arr {
|
|
|
+ b := winner_bool[k]
|
|
|
+ c := Contact{"", "", false, false, true, publishtime}
|
|
|
+ if b {
|
|
|
+ c.Per = w_per
|
|
|
+ c.Tel = w_tel
|
|
|
+ }
|
|
|
+ dealWithUpdateContact(v, "", &c, "010", tmpid)
|
|
|
+ }
|
|
|
+
|
|
|
+ if agency != "" && utf8.RuneCountInString(agency) < 30 {
|
|
|
+ dealWithUpdateContact(agency, "", &Contact{a_per, a_tel, false, true, false, publishtime}, "100", tmpid)
|
|
|
+ }
|
|
|
+ updateLock.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+func dealWithUpdateContact(name string, buyerclass string, contact *Contact, identity string, tmpid string) {
|
|
|
+ info := su.MysqlGlobalTool.FindOne(su.G_Units_Baseinfo, map[string]interface{}{"name": name}, "", "")
|
|
|
+ isNewEnt := false
|
|
|
+ isNewEntInfo := map[string]interface{}{}
|
|
|
+ if info == nil {
|
|
|
+ } else { //判断企业库最新的company_id是否与当前保持一致
|
|
|
+ qyxy_info := CreateQyxyInfo(name)
|
|
|
+ if len(qyxy_info) > 0 {
|
|
|
+ new_company_id := qu.ObjToString(qyxy_info["_id"])
|
|
|
+ old_company_id := qu.ObjToString((*info)["company_id"])
|
|
|
+ if old_company_id != "" && new_company_id != "" && old_company_id != new_company_id {
|
|
|
+ isNewEnt = true
|
|
|
+ isNewEntInfo["name"] = name
|
|
|
+ isNewEntInfo["name_id"] = qu.ObjToString((*info)["name_id"])
|
|
|
+ isNewEntInfo["company_id"] = old_company_id
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if info == nil || isNewEnt { //新增主体信息~企业补充~年报补充
|
|
|
+ base_info := map[string]interface{}{}
|
|
|
+ base_info["name"] = name
|
|
|
+ qyxy_info := CreateQyxyInfo(name)
|
|
|
+ //通讯录数据结构
|
|
|
+ contact_arr := []map[string]interface{}{}
|
|
|
+ area, city, district := "", "", ""
|
|
|
+ name_id := uuid.New().String()
|
|
|
+ name_id = strings.ReplaceAll(name_id, "-", "")
|
|
|
+ base_info["name_id"] = name_id
|
|
|
+ base_info["identity_type"] = qu.IntAll(Str2DEC(identity))
|
|
|
+ base_info["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ //创建通讯录信息
|
|
|
+ if contact.Per != "" && contact.Tel != "" {
|
|
|
+ date := ""
|
|
|
+ if contact.Publishtime > 0 {
|
|
|
+ date = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
|
|
|
+ }
|
|
|
+ contact_arr = append(contact_arr, map[string]interface{}{
|
|
|
+ "person": contact.Per,
|
|
|
+ "tel": contact.Tel,
|
|
|
+ "date": date,
|
|
|
+ "email": "",
|
|
|
+ "type": 1,
|
|
|
+ "identity": base_info["identity_type"],
|
|
|
+ })
|
|
|
+ }
|
|
|
+ if len(qyxy_info) > 0 { //含企业信息
|
|
|
+ info_id := qu.ObjToString(qyxy_info["_id"])
|
|
|
+ base_info["company_id"] = info_id
|
|
|
+ address := qu.ObjToString(qyxy_info["company_address"])
|
|
|
+ if utf8.RuneCountInString(address) > 300 {
|
|
|
+ address = ""
|
|
|
+ }
|
|
|
+ base_info["address"] = address
|
|
|
+ area = qu.ObjToString(qyxy_info["company_area"])
|
|
|
+ city = qu.ObjToString(qyxy_info["company_city"])
|
|
|
+ district = qu.ObjToString(qyxy_info["company_district"])
|
|
|
+ legal_person := qu.ObjToString(qyxy_info["legal_person"])
|
|
|
+ if legal_person != "" { //年报信息
|
|
|
+ CreateAnnualInfo(info_id, legal_person, &contact_arr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ base_info["area_code"], base_info["city_code"], base_info["district_code"] = CalculateRegionCode(area, city, district)
|
|
|
+ //新增主体信息表
|
|
|
+ b := su.InsertMysqlData(su.G_Units_Baseinfo, base_info, tmpid)
|
|
|
+ if b > 0 { //存通讯录
|
|
|
+ for _, v := range contact_arr {
|
|
|
+ info_book := map[string]interface{}{}
|
|
|
+ info_book["name_id"] = name_id
|
|
|
+ info_book["contact_name"] = qu.ObjToString(v["person"])
|
|
|
+ info_book["contact_tel"] = qu.ObjToString(v["tel"])
|
|
|
+ info_book["contact_email"] = qu.ObjToString(v["email"])
|
|
|
+ date := qu.ObjToString(v["date"])
|
|
|
+ if date != "" {
|
|
|
+ info_book["publishtime"] = date
|
|
|
+ }
|
|
|
+ info_book["source_type"] = qu.IntAll(v["type"])
|
|
|
+ info_book["identity_type"] = qu.IntAll(v["identity"])
|
|
|
+ info_book["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ su.InsertMysqlData(su.G_Units_Contact, info_book, tmpid)
|
|
|
+ }
|
|
|
+ //存主体标签记录~暂时只进采购单位的主体
|
|
|
+ if qu.IntAll(identity)%10 == 1 {
|
|
|
+ info_tag := map[string]interface{}{}
|
|
|
+ info_tag["name_id"] = name_id
|
|
|
+ info_tag["labelcode"] = "1"
|
|
|
+ labelvalues := "00"
|
|
|
+ if su.BuyerClassData[buyerclass] != "" {
|
|
|
+ labelvalues = su.BuyerClassData[buyerclass]
|
|
|
+ }
|
|
|
+ info_tag["labelvalues"] = labelvalues //代码表
|
|
|
+ info_tag["identity_type"] = 1
|
|
|
+ info_tag["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ su.InsertMysqlData(su.G_Units_Tags, info_tag, tmpid)
|
|
|
+ }
|
|
|
+ if isNewEnt {
|
|
|
+ isNewEntInfo["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ su.InsertMysqlData(su.G_Units_Warning, isNewEntInfo, tmpid)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //更新~主体基本信息表
|
|
|
+ name_id := qu.ObjToString((*info)["name_id"])
|
|
|
+ update_info := map[string]interface{}{}
|
|
|
+ info_identity := ConvertToBin(Str2DEC(qu.ObjToString((*info)["identity_type"])))
|
|
|
+ info_identity = SupplementIdentityType(info_identity)
|
|
|
+ if info_identity != identity {
|
|
|
+ new_identity := CalculateIdentityType(info_identity, identity)
|
|
|
+ if new_identity != info_identity {
|
|
|
+ update_info["identity_type"] = qu.IntAll(Str2DEC(new_identity))
|
|
|
+ update_info["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ su.MysqlGlobalTool.Update(su.G_Units_Baseinfo, map[string]interface{}{"name_id": name_id}, update_info)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //更新~主体标签记录表
|
|
|
+ if buyerclass != "" && contact.Buyer {
|
|
|
+ info_tag := su.MysqlGlobalTool.FindOne(su.G_Units_Tags, map[string]interface{}{"name_id": name_id}, "", "")
|
|
|
+ if info_tag != nil {
|
|
|
+ update_tag := map[string]interface{}{}
|
|
|
+ cur_code := "00"
|
|
|
+ if su.BuyerClassData[buyerclass] != "" {
|
|
|
+ cur_code = su.BuyerClassData[buyerclass]
|
|
|
+ }
|
|
|
+ old_code := qu.ObjToString((*info_tag)["labelvalues"])
|
|
|
+ if cur_code != old_code {
|
|
|
+ update_tag["labelvalues"] = cur_code
|
|
|
+ update_tag["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ su.MysqlGlobalTool.Update(su.G_Units_Tags, map[string]interface{}{"name_id": name_id}, update_tag)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if contact.Per != "" && contact.Tel != "" {
|
|
|
+ contact_datas := su.MysqlGlobalTool.Find(su.G_Units_Contact, map[string]interface{}{"name_id": name_id}, "", "", -1, -1)
|
|
|
+ isExists := false
|
|
|
+ for _, v := range *contact_datas {
|
|
|
+ update_id := qu.IntAll(v["id"])
|
|
|
+ person := qu.ObjToString(v["contact_name"])
|
|
|
+ tel := qu.ObjToString(v["contact_tel"])
|
|
|
+ pt_str := qu.ObjToString(v["publishtime"])
|
|
|
+ pt := int64(0)
|
|
|
+ if pt_str != "" {
|
|
|
+ t, _ := time.ParseInLocation(su.TimeLayout, pt_str, time.Local)
|
|
|
+ pt = t.Unix()
|
|
|
+ }
|
|
|
+ if person+"~"+tel == contact.Per+"~"+contact.Tel {
|
|
|
+ isExists = true
|
|
|
+ old_contact_identity := ConvertToBin(Str2DEC(qu.ObjToString(v["identity_type"])))
|
|
|
+ old_contact_identity = SupplementIdentityType(old_contact_identity)
|
|
|
+ if old_contact_identity != identity || qu.IntAll(v["source_type"]) == 2 || (contact.Publishtime > pt && contact.Publishtime > 0) {
|
|
|
+ new_identity := CalculateIdentityType(old_contact_identity, identity)
|
|
|
+ update_contact := map[string]interface{}{}
|
|
|
+ update_contact["source_type"] = 1
|
|
|
+ update_contact["identity_type"] = qu.IntAll(Str2DEC(new_identity))
|
|
|
+ update_contact["updatetime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ if contact.Publishtime > pt {
|
|
|
+ update_contact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
|
|
|
+ }
|
|
|
+ //更新通讯录
|
|
|
+ su.MysqlGlobalTool.Update(su.G_Units_Contact, map[string]interface{}{"id": update_id}, update_contact)
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if !isExists {
|
|
|
+ add_contact := map[string]interface{}{}
|
|
|
+ add_contact["name_id"] = name_id
|
|
|
+ add_contact["contact_name"] = contact.Per
|
|
|
+ add_contact["contact_tel"] = contact.Tel
|
|
|
+ add_contact["contact_email"] = ""
|
|
|
+ add_contact["source_type"] = 1
|
|
|
+ if contact.Publishtime > 0 {
|
|
|
+ add_contact["publishtime"] = time.Unix(contact.Publishtime, 0).Format(ul.TimeLayout)
|
|
|
+ }
|
|
|
+ add_contact["identity_type"] = qu.IntAll(Str2DEC(identity))
|
|
|
+ add_contact["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
|
|
|
+ //新增~通讯录表
|
|
|
+ su.InsertMysqlData(su.G_Units_Contact, add_contact, tmpid)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+//创建年报字段~与现有的通讯录
|
|
|
+func CreateAnnualInfo(company_id string, legal_person string, contact_arr *[]map[string]interface{}) {
|
|
|
+ keys := map[string]string{}
|
|
|
+ for _, v := range *contact_arr {
|
|
|
+ key := qu.ObjToString(v["person"]) + "~" + qu.ObjToString(v["tel"])
|
|
|
+ keys[key] = key
|
|
|
+ }
|
|
|
+ dataArr, _ := su.SpiMgo.Find("annual_report_base", map[string]interface{}{"company_id": company_id}, nil, map[string]interface{}{
|
|
|
+ "company_phone": 1,
|
|
|
+ "company_email": 1,
|
|
|
+ "report_date": 1,
|
|
|
+ })
|
|
|
+ for _, v := range dataArr {
|
|
|
+ company_phone := qu.ObjToString(v["company_phone"])
|
|
|
+ company_email := qu.ObjToString(v["company_email"])
|
|
|
+ if company_phone != "" {
|
|
|
+ report_date := qu.ObjToString(v["report_date"])
|
|
|
+ key := legal_person + "~" + company_phone
|
|
|
+ if keys[key] == "" {
|
|
|
+ dict := map[string]interface{}{
|
|
|
+ "person": legal_person,
|
|
|
+ "tel": company_phone,
|
|
|
+ "date": report_date,
|
|
|
+ "email": company_email,
|
|
|
+ "type": 2,
|
|
|
+ "identity": 0,
|
|
|
+ }
|
|
|
+ (*contact_arr) = append((*contact_arr), dict)
|
|
|
+ keys[key] = key
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//创建企业信息
|
|
|
+func CreateQyxyInfo(name string) map[string]interface{} {
|
|
|
+ //查询企业库
|
|
|
+ qyxy_info := map[string]interface{}{}
|
|
|
+ dataArr, _ := su.QyxyMgo.Find("qyxy_std", map[string]interface{}{"company_name": name}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{
|
|
|
+ "_id": 1,
|
|
|
+ "company_name": 1,
|
|
|
+ "company_address": 1,
|
|
|
+ "company_area": 1,
|
|
|
+ "company_city": 1,
|
|
|
+ "company_district": 1,
|
|
|
+ "legal_person": 1,
|
|
|
+ "company_phone": 1,
|
|
|
+ "company_email": 1,
|
|
|
+ })
|
|
|
+ if len(dataArr) > 0 {
|
|
|
+ qyxy_info = dataArr[0] //补充企业信息
|
|
|
+ } else {
|
|
|
+ data := su.SpiMgo.FindOne("company_history_name", map[string]interface{}{
|
|
|
+ "history_name": name,
|
|
|
+ })
|
|
|
+ if len(data) > 0 {
|
|
|
+ company_id := qu.ObjToString(data["company_id"])
|
|
|
+ qyxy_info = su.QyxyMgo.FindOne("qyxy_std", map[string]interface{}{"_id": company_id})
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return qyxy_info
|
|
|
+}
|