瀏覽代碼

备份~文件

zhengkun 2 年之前
父節點
當前提交
a9fd1d1a41

+ 21 - 0
fieldproject_medical/data_subject/src/main.go

@@ -0,0 +1,21 @@
+package main
+
+import (
+	"subject_sql"
+	su "subject_util"
+)
+
+func init() {
+	su.IsLocal = true
+	su.InitClass()
+	//s_udp.InitUdpMsg()
+}
+
+func main() {
+	//subject.RunSubjectFullDataInfo("632892000000000000000000")
+	//subject_heal.HealSubjectInfoData()
+	subject_sql.ExportSqlInfoData()
+
+	lock := make(chan bool)
+	<-lock
+}

+ 14 - 0
fieldproject_medical/data_subject/src/subject/info.go

@@ -0,0 +1,14 @@
+package subject
+
+type Info struct {
+	identity   string
+	buyerclass string
+	contact    map[string]*Contact
+}
+type Contact struct {
+	per    string
+	tel    string
+	buyer  bool
+	agency bool
+	winner bool
+}

+ 28 - 0
fieldproject_medical/data_subject/src/subject/mem.go

@@ -0,0 +1,28 @@
+package subject
+
+import (
+	"fmt"
+	"runtime"
+)
+
+func toMegaBytes(bytes uint64) float64 {
+	return float64(bytes) / 1024 / 1024
+}
+
+func traceMemStats() {
+	var ms runtime.MemStats
+	runtime.ReadMemStats(&ms)
+	var result = make([]float64, 7)
+	result[0] = float64(ms.HeapObjects)
+	result[1] = toMegaBytes(ms.HeapAlloc)
+	result[2] = toMegaBytes(ms.TotalAlloc)
+	result[3] = toMegaBytes(ms.HeapSys)
+	result[4] = toMegaBytes(ms.HeapIdle)
+	result[5] = toMegaBytes(ms.HeapReleased)
+	result[6] = toMegaBytes(ms.HeapIdle - ms.HeapReleased)
+
+	for _, v := range result {
+		fmt.Printf("%.2f\t", v)
+	}
+	fmt.Printf("\n")
+}

+ 179 - 0
fieldproject_medical/data_subject/src/subject/subject.go

@@ -0,0 +1,179 @@
+package subject
+
+import (
+	log "github.com/donnie4w/go-logger/logger"
+	qu "qfw/util"
+	su "subject_util"
+	"sync"
+)
+
+var (
+	dataLock     sync.Mutex
+	SubjectInfos = map[string]*Info{}
+)
+
+var BidFields = map[string]interface{}{
+	"extracttype": 1, "buyerclass": 1,
+	"buyer": 1, "agency": 1, "winner": 1,
+	"agencytel": 1, "winnertel": 1, "buyertel": 1,
+	"agencyperson": 1, "winnerperson": 1, "buyerperson": 1,
+}
+
+//全量数据准备
+func RunSubjectFullDataInfo(lteid string) {
+	log.Debug("开始准备全量主体~~~")
+	sess := su.SaveMgo.GetMgoConn()
+	defer su.SaveMgo.DestoryMongoConn(sess)
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$lte": su.StringTOBsonId(lteid),
+		},
+	}
+	log.Debug("查询语句 ~ ", q)
+	it := sess.DB(su.SaveMgo.DbName).C(su.S_Coll_Name).Find(&q).Sort("_id").Select(BidFields).Iter()
+	pool := make(chan bool, 8)
+	wg := &sync.WaitGroup{}
+	total := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%10000 == 0 {
+			log.Debug("cur index ", total, "~", len(SubjectInfos))
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			if qu.IntAll(tmp["extracttype"]) != 1 {
+				return
+			}
+			if qu.ObjToString(tmp["buyer"]) == "" &&
+				qu.ObjToString(tmp["agency"]) == "" &&
+				qu.ObjToString(tmp["winner"]) == "" {
+				return
+			}
+			createBaseInfo(tmp) //处理数据
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Debug("is over ~ ", total, "~", len(SubjectInfos))
+	saveMgoInfo()
+}
+
+func saveMgoInfo() {
+	//多线程保存数据
+	log.Debug("开始保存~临时表数据~")
+	index := 0
+	pool := make(chan bool, 8)
+	wg := &sync.WaitGroup{}
+	for k, v := range SubjectInfos {
+		index++
+		if index%10000 == 0 {
+			log.Debug("save index ~ ", index)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(k string, v *Info) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			data := map[string]interface{}{}
+			data["name"] = k
+			data["buyerclass"] = v.buyerclass
+			data["identity"] = qu.IntAll(v.identity)
+			arr := []map[string]interface{}{}
+			for _, v1 := range v.contact {
+				arr = append(arr, map[string]interface{}{
+					"person": v1.per,
+					"tel":    v1.tel,
+					"buyer":  v1.buyer,
+					"agency": v1.agency,
+					"winner": v1.winner,
+				})
+			}
+			data["contact"] = arr
+			su.SaveMgo.Save(su.O_Coll_Name, data)
+		}(k, v)
+	}
+	wg.Wait()
+	log.Debug("save over ~ ", index)
+}
+
+func createBaseInfo(tmp map[string]interface{}) {
+	buyer := qu.ObjToString(tmp["buyer"])
+	agency := qu.ObjToString(tmp["agency"])
+	winner := qu.ObjToString(tmp["winner"])
+	b_per := qu.ObjToString(tmp["buyerperson"])
+	b_tel := qu.ObjToString(tmp["buyertel"])
+	a_per := qu.ObjToString(tmp["agencyperson"])
+	a_tel := qu.ObjToString(tmp["agencytel"])
+	w_per := qu.ObjToString(tmp["winnerperson"])
+	w_tel := qu.ObjToString(tmp["winnertel"])
+	buyerclass := qu.ObjToString(tmp["buyerclass"])
+	dataLock.Lock()
+	if buyer != "" {
+		dealWithContact(buyer, buyerclass, &Contact{b_per, b_tel, true, false, false}, "001")
+	}
+	if winner != "" {
+		dealWithContact(winner, "", &Contact{w_per, w_tel, false, false, true}, "010")
+	}
+	if agency != "" {
+		dealWithContact(agency, "", &Contact{a_per, a_tel, false, true, false}, "100")
+	}
+	dataLock.Unlock()
+}
+
+func dealWithContact(name string, buyerclass string, contact *Contact, identity string) {
+	info := SubjectInfos[name]
+	if info != nil {
+		if info.identity != identity {
+			info.identity = calculateIdentityType(info.identity, identity)
+		}
+		if buyerclass != "" {
+			info.buyerclass = buyerclass
+		}
+		if contact.per != "" && contact.tel != "" {
+			key := contact.per + "~" + contact.tel
+			old_contact := info.contact[key]
+			if old_contact != nil {
+				if old_contact.buyer {
+					contact.buyer = true
+				}
+				if old_contact.agency {
+					contact.agency = true
+				}
+				if old_contact.winner {
+					contact.winner = true
+				}
+			}
+			info.contact[key] = contact
+		}
+	} else {
+		data := map[string]*Contact{}
+		if contact.per != "" && contact.tel != "" {
+			key := contact.per + "~" + contact.tel
+			data[key] = contact
+		}
+		SubjectInfos[name] = &Info{identity, buyerclass, data}
+	}
+}
+
+//计算身份类型
+func calculateIdentityType(identity string, new_identity string) string {
+	a := identity[:1]
+	b := identity[1:2]
+	c := identity[2:3]
+	if new_identity == "001" {
+		c = "1"
+	} else if new_identity == "010" {
+		b = "1"
+	} else if new_identity == "100" {
+		a = "1"
+	} else {
+
+	}
+	return a + b + c
+}

+ 114 - 0
fieldproject_medical/data_subject/src/subject_heal/heal.go

@@ -0,0 +1,114 @@
+package subject_heal
+
+import (
+	log "github.com/donnie4w/go-logger/logger"
+	qu "qfw/util"
+	su "subject_util"
+	"sync"
+)
+
+var (
+	numLock sync.Mutex
+)
+
+//治愈数据
+func HealSubjectInfoData() {
+	log.Debug("开始治愈数据~~~企业相关信息~~~")
+	sess := su.SaveMgo.GetMgoConn()
+	defer su.SaveMgo.DestoryMongoConn(sess)
+	q := map[string]interface{}{}
+	log.Debug("查询语句 ~ ", q)
+	it := sess.DB(su.SaveMgo.DbName).C(su.O_Coll_Name).Find(&q).Sort("_id").Iter()
+	pool := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+	total, isok := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%1000 == 0 {
+			log.Debug("cur index ", total, "~", isok, "~", tmp["_id"])
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			b := updateInfoData(tmp)
+			if b {
+				numLock.Lock()
+				isok++
+				numLock.Unlock()
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Debug("is over ~ ", total, "~", isok)
+}
+
+func updateInfoData(tmp map[string]interface{}) bool {
+	update := map[string]interface{}{}
+	name := qu.ObjToString(tmp["name"])
+	contact := su.IsMarkInterfaceMap(tmp["contact"])
+	dataArr, _ := su.SaveMgo.Find("qyxy_std", map[string]interface{}{"company_name": name}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{
+		"_id":              1,
+		"company_name":     1,
+		"company_address":  1,
+		"company_area":     1,
+		"company_city":     1,
+		"company_district": 1,
+		"legal_person":     1,
+		"company_phone":    1,
+	})
+	if len(dataArr) > 0 {
+		info := dataArr[0]
+		dealWithUpdateInfo(contact, info, &update)
+	} else {
+		data := su.SpiMgo.FindOne("company_history_name", map[string]interface{}{
+			"history_name": name,
+		})
+		if len(data) > 0 {
+			company_id := qu.ObjToString(data["company_id"])
+			if company_id != "" {
+				info := su.SaveMgo.FindOne("qyxy_std", map[string]interface{}{"_id": company_id})
+				if len(info) > 0 && info != nil {
+					dealWithUpdateInfo(contact, info, &update)
+				}
+			}
+		}
+	}
+	if len(update) > 0 { //更新操作
+		tmpid := su.BsonTOStringId(tmp["_id"])
+		su.SaveMgo.UpdateById(su.O_Coll_Name, tmpid, map[string]interface{}{
+			"$set": update,
+		})
+		return true
+	}
+	return false
+}
+
+//处理待更新信息
+func dealWithUpdateInfo(contact []map[string]interface{}, info map[string]interface{}, update *map[string]interface{}) {
+	(*update)["company_address"] = qu.ObjToString(info["company_address"])
+	(*update)["company_id"] = qu.ObjToString(info["_id"])
+	(*update)["area"] = qu.ObjToString(info["company_area"])
+	(*update)["city"] = qu.ObjToString(info["company_city"])
+	(*update)["district"] = qu.ObjToString(info["company_district"])
+	legal_person := qu.ObjToString(info["legal_person"])
+	company_phone := qu.ObjToString(info["company_phone"])
+	if legal_person != "" && company_phone != "" {
+		is_R := false
+		for _, v := range contact {
+			person := qu.ObjToString(v["person"])
+			tel := qu.ObjToString(v["tel"])
+			if person == legal_person && company_phone == tel {
+				is_R = true
+				break
+			}
+		}
+		if !is_R {
+			(*update)["legal_person"] = legal_person
+			(*update)["legal_tel"] = company_phone
+		}
+	}
+}

+ 48 - 0
fieldproject_medical/data_subject/src/subject_sql/savesql.go

@@ -0,0 +1,48 @@
+package subject_sql
+
+import (
+	log "github.com/donnie4w/go-logger/logger"
+	qu "qfw/util"
+	su "subject_util"
+	"sync"
+)
+
+//导出sql数据
+func ExportSqlInfoData() {
+	log.Debug("开始治愈数据~~~企业相关信息~~~")
+	sess := su.SaveMgo.GetMgoConn()
+	defer su.SaveMgo.DestoryMongoConn(sess)
+	q := map[string]interface{}{}
+	log.Debug("查询语句 ~ ", q)
+	it := sess.DB(su.SaveMgo.DbName).C(su.O_Coll_Name).Find(&q).Sort("_id").Iter()
+	pool := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+	total, isok := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%1000 == 0 {
+			log.Debug("cur index ", total, "~", isok, "~", tmp["_id"])
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+
+			//处理信息
+			dealWithSqlInfo(tmp)
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Debug("is over ~ ", total, "~", isok)
+}
+
+//处理信息
+func dealWithSqlInfo(tmp map[string]interface{}) {
+	info := map[string]interface{}{}
+	info["name"] = qu.ObjToString(tmp["name"])
+
+}

+ 61 - 0
fieldproject_medical/data_subject/src/subject_udp/udp.go

@@ -0,0 +1,61 @@
+package subject_udp
+
+import (
+	"encoding/json"
+	log "github.com/donnie4w/go-logger/logger"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"sync"
+)
+
+var (
+	udpclient mu.UdpClient
+	udpLock   sync.Mutex
+)
+
+func InitUdpMsg() {
+	port := ":1555"
+	udpclient = mu.UdpClient{Local: port, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Debug("监听~", port)
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		if err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
+			if sid == "" || eid == "" {
+				log.Debug("异常~", sid, "~", eid)
+			} else {
+				key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
+				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+				udpLock.Lock()
+				udpLock.Unlock()
+			}
+		}
+	case mu.OP_NOOP: //下个节点回应
+		ok := string(data)
+		if ok != "" {
+			log.Debug("接收~", ok)
+		}
+	}
+}
+
+func processNextNode(sid string, eid string, key string) {
+	//by, _ := json.Marshal(map[string]interface{}{
+	//	"gtid":  sid,
+	//	"lteid": eid,
+	//	"key":   key,
+	//})
+	//addr := &net.UDPAddr{
+	//	IP:   net.ParseIP("172.17.4.189"),
+	//	Port: 152322,
+	//}
+	//udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+}

+ 86 - 0
fieldproject_medical/data_subject/src/subject_util/initcfg.go

@@ -0,0 +1,86 @@
+package subject_util
+
+import (
+	log "github.com/donnie4w/go-logger/logger"
+)
+
+var (
+	SaveMgo, SpiMgo *MongodbSim
+	TimeLayout      = "2006-01-02 15:04:05"
+	MysqlTool       *Mysql
+	IsLocal         bool
+	S_Coll_Name     = "bidding"
+	O_Coll_Name     = "zktest_subject_data"
+)
+
+func InitClass() {
+	initMgo()
+	initMysql()
+}
+
+//初始化mgo
+func initMgo() {
+	if IsLocal {
+		SaveMgo = &MongodbSim{
+			MongodbAddr: "127.0.0.1:27017",
+			DbName:      "mixdata",
+			Size:        10,
+			UserName:    "",
+			Password:    "",
+		}
+		SaveMgo.InitPool()
+
+		SpiMgo = &MongodbSim{
+			MongodbAddr: "127.0.0.1:27017",
+			DbName:      "mixdata",
+			Size:        10,
+			UserName:    "",
+			Password:    "",
+		}
+		SpiMgo.InitPool()
+	} else {
+		SaveMgo = &MongodbSim{
+			MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
+			DbName:      "mixdata",
+			Size:        10,
+			UserName:    "zhengkun",
+			Password:    "zk@123123",
+		}
+		SaveMgo.InitPool()
+
+		SpiMgo = &MongodbSim{
+			MongodbAddr: "172.17.4.181:27001",
+			DbName:      "mixdata",
+			Size:        10,
+			UserName:    "",
+			Password:    "",
+		}
+		SpiMgo.InitPool()
+	}
+}
+
+func initMysql() {
+	username, password := "root", "=PDT49#80Z!RVv52_z"
+	address := "192.168.3.217:4000"
+	if !IsLocal {
+		username = "zhengkun"
+		password = "Zk#20220824"
+		address = "172.17.4.242:4000"
+	}
+	MysqlTool = &Mysql{
+		Address:  address,
+		UserName: username,
+		PassWord: password,
+		DBName:   "global_common_data",
+	}
+	MysqlTool.Init()
+}
+
+//插入数据
+func InsertMysqlData(name string, data map[string]interface{}, mark string) int64 {
+	inb := MysqlTool.Insert(name, data)
+	if inb == -1 {
+		log.Debug("插入数据异常...", name, "~", mark)
+	}
+	return inb
+}

+ 354 - 0
fieldproject_medical/data_subject/src/subject_util/mgo.go

@@ -0,0 +1,354 @@
+package subject_util
+
+import (
+	"context"
+	"log"
+	qu "qfw/util"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName != "" && m.Password != "" {
+		cre := options.Credential{
+			Username: m.UserName,
+			Password: m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}
+
+//return
+func IsMarkInterfaceMap(t interface{}) []map[string]interface{} {
+	p_list := []map[string]interface{}{}
+	if yl_list_1, ok_1 := t.(primitive.A); ok_1 {
+		p_list = qu.ObjArrToMapArr(yl_list_1)
+	} else {
+		if yl_list_2, ok_2 := t.([]interface{}); ok_2 {
+			p_list = qu.ObjArrToMapArr(yl_list_2)
+		}
+	}
+	return p_list
+}
+
+//return
+func IsMarkInterfaceArr(t interface{}) []string {
+	sub_list := []string{}
+	if list_1, ok_1 := t.(primitive.A); ok_1 {
+		sub_list = qu.ObjArrToStringArr(list_1)
+	} else {
+		if list_2, ok_2 := t.([]interface{}); ok_2 {
+			sub_list = qu.ObjArrToStringArr(list_2)
+		}
+	}
+	return sub_list
+}

+ 505 - 0
fieldproject_medical/data_subject/src/subject_util/mysql.go

@@ -0,0 +1,505 @@
+package subject_util
+
+import (
+	"bytes"
+	"database/sql"
+	"fmt"
+	"log"
+	"reflect"
+	"strings"
+	"time"
+
+	_ "github.com/go-sql-driver/mysql"
+)
+
+type Mysql struct {
+	Address      string  //数据库地址:端口
+	UserName     string  //用户名
+	PassWord     string  //密码
+	DBName       string  //数据库名
+	DB           *sql.DB //数据库连接池对象
+	MaxOpenConns int     //用于设置最大打开的连接数,默认值为0表示不限制。
+	MaxIdleConns int     //用于设置闲置的连接数。
+}
+
+func (m *Mysql) Init() {
+	if m.MaxOpenConns <= 0 {
+		m.MaxOpenConns = 20
+	}
+	if m.MaxIdleConns <= 0 {
+		m.MaxIdleConns = 20
+	}
+	var err error //utf8mb4
+	m.DB, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", m.UserName, m.PassWord, m.Address, m.DBName))
+	if err != nil {
+		log.Println(err)
+		return
+	}
+	m.DB.SetMaxOpenConns(m.MaxOpenConns)
+	m.DB.SetMaxIdleConns(m.MaxIdleConns)
+	m.DB.SetConnMaxLifetime(time.Minute * 3)
+	err = m.DB.Ping()
+	if err != nil {
+		log.Println(err)
+	}
+}
+
+//新增
+func (m *Mysql) Insert(tableName string, data map[string]interface{}) int64 {
+	return m.InsertByTx(nil, tableName, data)
+}
+
+//带有事务的新增
+func (m *Mysql) InsertByTx(tx *sql.Tx, tableName string, data map[string]interface{}) int64 {
+	fields := []string{}
+	values := []interface{}{}
+	placeholders := []string{}
+	if tableName == "dataexport_order" {
+		if _, ok := data["user_nickname"]; ok {
+			data["user_nickname"] = ""
+		}
+	}
+	for k, v := range data {
+		fields = append(fields, k)
+		values = append(values, v)
+		placeholders = append(placeholders, "?")
+	}
+	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, strings.Join(fields, ","), strings.Join(placeholders, ","))
+	//log.Println("mysql", q, values)
+	return m.InsertBySqlByTx(tx, q, values...)
+}
+
+//sql语句新增
+func (m *Mysql) InsertBySql(q string, args ...interface{}) int64 {
+	return m.InsertBySqlByTx(nil, q, args...)
+}
+
+//带有事务的sql语句新增
+func (m *Mysql) InsertBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64 {
+	result, _ := m.ExecBySqlByTx(tx, q, args...)
+	if result == nil {
+		return -1
+	}
+	id, err := result.LastInsertId()
+	if err != nil {
+		log.Println(err)
+		return -1
+	}
+	return id
+}
+
+//批量新增
+func (m *Mysql) InsertIgnoreBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
+	return m.InsertIgnoreBatchByTx(nil, tableName, fields, values)
+}
+
+//带事务的批量新增
+func (m *Mysql) InsertIgnoreBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
+	return m.insertOrReplaceBatchByTx(tx, "INSERT", "IGNORE", tableName, fields, values)
+}
+
+//批量新增
+func (m *Mysql) InsertBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
+	return m.InsertBatchByTx(nil, tableName, fields, values)
+}
+
+//带事务的批量新增
+func (m *Mysql) InsertBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
+	return m.insertOrReplaceBatchByTx(tx, "INSERT", "", tableName, fields, values)
+}
+
+//批量更新
+func (m *Mysql) ReplaceBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
+	return m.ReplaceBatchByTx(nil, tableName, fields, values)
+}
+
+//带事务的批量更新
+func (m *Mysql) ReplaceBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
+	return m.insertOrReplaceBatchByTx(tx, "REPLACE", "", tableName, fields, values)
+}
+
+func (m *Mysql) insertOrReplaceBatchByTx(tx *sql.Tx, tp string, afterInsert, tableName string, fields []string, values []interface{}) (int64, int64) {
+	placeholders := []string{}
+	for range fields {
+		placeholders = append(placeholders, "?")
+	}
+	placeholder := strings.Join(placeholders, ",")
+	array := []string{}
+	for i := 0; i < len(values)/len(fields); i++ {
+		array = append(array, fmt.Sprintf("(%s)", placeholder))
+	}
+	q := fmt.Sprintf("%s %s INTO %s (%s) VALUES %s", tp, afterInsert, tableName, strings.Join(fields, ","), strings.Join(array, ","))
+	result, _ := m.ExecBySqlByTx(tx, q, values...)
+	if result == nil {
+		return -1, -1
+	}
+	v1, e1 := result.RowsAffected()
+	if e1 != nil {
+		log.Println(e1)
+		return -1, -1
+	}
+	v2, e2 := result.LastInsertId()
+	if e2 != nil {
+		log.Println(e2)
+		return -1, -1
+	}
+	return v1, v2
+}
+
+//sql语句执行
+func (m *Mysql) ExecBySql(q string, args ...interface{}) (sql.Result, error) {
+	return m.ExecBySqlByTx(nil, q, args...)
+}
+
+//sql语句执行,带有事务
+func (m *Mysql) ExecBySqlByTx(tx *sql.Tx, q string, args ...interface{}) (sql.Result, error) {
+	var stmtIns *sql.Stmt
+	var err error
+	if tx == nil {
+		stmtIns, err = m.DB.Prepare(q)
+	} else {
+		stmtIns, err = tx.Prepare(q)
+	}
+	if err != nil {
+		log.Println(err)
+		return nil, err
+	}
+	defer stmtIns.Close()
+	result, err := stmtIns.Exec(args...)
+	if err != nil {
+		//log.Println(args, err)
+		log.Println(err)
+		return nil, err
+	}
+	return result, nil
+}
+
+/*不等于 map[string]string{"ne":"1"}
+ *不等于多个 map[string]string{"notin":[]interface{}{1,2}}
+ *字段为空 map[string]string{"name":"$isNull"}
+ *字段不为空 map[string]string{"name":"$isNotNull"}
+ */
+func (m *Mysql) Find(tableName string, query map[string]interface{}, fields, order string, start, pageSize int) *[]map[string]interface{} {
+	fs := []string{}
+	vs := []interface{}{}
+	for k, v := range query {
+		rt := reflect.TypeOf(v)
+		rv := reflect.ValueOf(v)
+		if rt.Kind() == reflect.Map {
+			for _, rv_k := range rv.MapKeys() {
+				if rv_k.String() == "ne" {
+					fs = append(fs, fmt.Sprintf("%s!=?", k))
+					vs = append(vs, rv.MapIndex(rv_k).Interface())
+				}
+				if rv_k.String() == "notin" {
+					if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
+						for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
+							fs = append(fs, fmt.Sprintf("%s!=?", k))
+							vs = append(vs, v)
+						}
+					}
+				}
+				if rv_k.String() == "in" {
+					if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
+						_fs := fmt.Sprintf("%s in (?", k)
+						for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
+							if k > 0 {
+								_fs += ",?"
+							}
+							vs = append(vs, v)
+						}
+						_fs += ")"
+						fs = append(fs, _fs)
+					}
+				}
+			}
+		} else {
+			if v == "$isNull" {
+				fs = append(fs, fmt.Sprintf("%s is null", k))
+			} else if v == "$isNotNull" {
+				fs = append(fs, fmt.Sprintf("%s is not null", k))
+			} else {
+				fs = append(fs, fmt.Sprintf("%s=?", k))
+				vs = append(vs, v)
+			}
+		}
+	}
+	var buffer bytes.Buffer
+	buffer.WriteString("select ")
+	if fields == "" {
+		buffer.WriteString("*")
+	} else {
+		buffer.WriteString(fields)
+	}
+	buffer.WriteString(" from ")
+	buffer.WriteString(tableName)
+	if len(fs) > 0 {
+		buffer.WriteString(" where ")
+		buffer.WriteString(strings.Join(fs, " and "))
+	}
+	if order != "" {
+		buffer.WriteString(" order by ")
+		buffer.WriteString(order)
+	}
+	if start > -1 && pageSize > 0 {
+		buffer.WriteString(" limit ")
+		buffer.WriteString(fmt.Sprint(start))
+		buffer.WriteString(",")
+		buffer.WriteString(fmt.Sprint(pageSize))
+	}
+	q := buffer.String()
+	//log.Println(q, vs)
+	return m.SelectBySql(q, vs...)
+}
+
+//sql语句查询
+func (m *Mysql) SelectBySql(q string, args ...interface{}) *[]map[string]interface{} {
+	return m.SelectBySqlByTx(nil, q, args...)
+}
+func (m *Mysql) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
+	return m.Select(0, nil, tx, q, args...)
+}
+func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
+	var stmtOut *sql.Stmt
+	var err error
+	if tx == nil {
+		stmtOut, err = m.DB.Prepare(q)
+	} else {
+		stmtOut, err = tx.Prepare(q)
+	}
+	if err != nil {
+		log.Println(err)
+		return nil
+	}
+	defer stmtOut.Close()
+	rows, err := stmtOut.Query(args...)
+	if err != nil {
+		log.Println(err)
+		return nil
+	}
+	if rows != nil {
+		defer rows.Close()
+	}
+	columns, err := rows.Columns()
+	if err != nil {
+		log.Println(err)
+		return nil
+	}
+	list := []map[string]interface{}{}
+	for rows.Next() {
+		scanArgs := make([]interface{}, len(columns))
+		values := make([]interface{}, len(columns))
+		ret := make(map[string]interface{})
+		for k, _ := range values {
+			scanArgs[k] = &values[k]
+		}
+		err = rows.Scan(scanArgs...)
+		if err != nil {
+			log.Println(err)
+			break
+		}
+		for i, col := range values {
+			if v, ok := col.([]uint8); ok {
+				ret[columns[i]] = string(v)
+			} else {
+				ret[columns[i]] = col
+			}
+		}
+		list = append(list, ret)
+		if bath > 0 && len(list) == bath {
+			f(&list)
+			list = []map[string]interface{}{}
+		}
+	}
+	if bath > 0 && len(list) > 0 {
+		f(&list)
+		list = []map[string]interface{}{}
+	}
+	return &list
+}
+func (m *Mysql) SelectByBath(bath int, f func(l *[]map[string]interface{}), q string, args ...interface{}) {
+	m.SelectByBathByTx(bath, f, nil, q, args...)
+}
+func (m *Mysql) SelectByBathByTx(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) {
+	m.Select(bath, f, tx, q, args...)
+}
+func (m *Mysql) FindOne(tableName string, query map[string]interface{}, fields, order string) *map[string]interface{} {
+	list := m.Find(tableName, query, fields, order, 0, 1)
+	if list != nil && len(*list) == 1 {
+		temp := (*list)[0]
+		return &temp
+	}
+	return nil
+}
+
+//修改
+func (m *Mysql) Update(tableName string, query, update map[string]interface{}) bool {
+	return m.UpdateByTx(nil, tableName, query, update)
+}
+
+//带事务的修改
+func (m *Mysql) UpdateByTx(tx *sql.Tx, tableName string, query, update map[string]interface{}) bool {
+	q_fs := []string{}
+	u_fs := []string{}
+	values := []interface{}{}
+	for k, v := range update {
+		q_fs = append(q_fs, fmt.Sprintf("%s=?", k))
+		values = append(values, v)
+	}
+	for k, v := range query {
+		u_fs = append(u_fs, fmt.Sprintf("%s=?", k))
+		values = append(values, v)
+	}
+	q := fmt.Sprintf("update %s set %s where %s", tableName, strings.Join(q_fs, ","), strings.Join(u_fs, " and "))
+	//log.Println(q, values)
+	return m.UpdateOrDeleteBySqlByTx(tx, q, values...) >= 0
+}
+
+//删除
+func (m *Mysql) Delete(tableName string, query map[string]interface{}) bool {
+	return m.DeleteByTx(nil, tableName, query)
+}
+func (m *Mysql) DeleteByTx(tx *sql.Tx, tableName string, query map[string]interface{}) bool {
+	fields := []string{}
+	values := []interface{}{}
+	for k, v := range query {
+		fields = append(fields, fmt.Sprintf("%s=?", k))
+		values = append(values, v)
+	}
+	q := fmt.Sprintf("delete from %s where %s", tableName, strings.Join(fields, " and "))
+	log.Println(q, values)
+	return m.UpdateOrDeleteBySqlByTx(tx, q, values...) > 0
+}
+
+//修改或删除
+func (m *Mysql) UpdateOrDeleteBySql(q string, args ...interface{}) int64 {
+	return m.UpdateOrDeleteBySqlByTx(nil, q, args...)
+}
+
+//带事务的修改或删除
+func (m *Mysql) UpdateOrDeleteBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64 {
+	result, err := m.ExecBySqlByTx(tx, q, args...)
+	if err != nil {
+		log.Println(err)
+		return -1
+	}
+	count, err := result.RowsAffected()
+	if err != nil {
+		log.Println(err)
+		return -1
+	}
+	return count
+}
+
+//总数
+func (m *Mysql) Count(tableName string, query map[string]interface{}) int64 {
+	fields := []string{}
+	values := []interface{}{}
+	for k, v := range query {
+		rt := reflect.TypeOf(v)
+		rv := reflect.ValueOf(v)
+		if rt.Kind() == reflect.Map {
+			for _, rv_k := range rv.MapKeys() {
+				if rv_k.String() == "ne" {
+					fields = append(fields, fmt.Sprintf("%s!=?", k))
+					values = append(values, rv.MapIndex(rv_k).Interface())
+				}
+				if rv_k.String() == "notin" {
+					if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
+						for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
+							fields = append(fields, fmt.Sprintf("%s!=?", k))
+							values = append(values, v)
+						}
+					}
+				}
+				if rv_k.String() == "in" {
+					if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
+						_fs := fmt.Sprintf("%s in (?", k)
+						for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
+							if k > 0 {
+								_fs += ",?"
+							}
+							values = append(values, v)
+						}
+						_fs += ")"
+						fields = append(fields, _fs)
+					}
+				}
+			}
+		} else if v == "$isNull" {
+			fields = append(fields, fmt.Sprintf("%s is null", k))
+		} else if v == "$isNotNull" {
+			fields = append(fields, fmt.Sprintf("%s is not null", k))
+		} else {
+			fields = append(fields, fmt.Sprintf("%s=?", k))
+			values = append(values, v)
+		}
+	}
+	q := fmt.Sprintf("select count(1) as count from %s", tableName)
+	if len(query) > 0 {
+		q += fmt.Sprintf(" where %s", strings.Join(fields, " and "))
+	}
+	log.Println(q, values)
+	return m.CountBySql(q, values...)
+}
+func (m *Mysql) CountBySql(q string, args ...interface{}) int64 {
+	stmtIns, err := m.DB.Prepare(q)
+	if err != nil {
+		log.Println(err)
+		return -1
+	}
+	defer stmtIns.Close()
+
+	rows, err := stmtIns.Query(args...)
+	if err != nil {
+		log.Println(err)
+		return -1
+	}
+	if rows != nil {
+		defer rows.Close()
+	}
+	var count int64 = -1
+	if rows.Next() {
+		err = rows.Scan(&count)
+		if err != nil {
+			log.Println(err)
+		}
+	}
+	return count
+}
+
+//执行事务
+func (m *Mysql) ExecTx(msg string, f func(tx *sql.Tx) bool) bool {
+	tx, err := m.DB.Begin()
+	if err != nil {
+		log.Println(msg, "获取事务错误", err)
+	} else {
+		if f(tx) {
+			if err := tx.Commit(); err != nil {
+				log.Println(msg, "提交事务错误", err)
+			} else {
+				return true
+			}
+		} else {
+			if err := tx.Rollback(); err != nil {
+				log.Println(msg, "事务回滚错误", err)
+			}
+		}
+	}
+	return false
+}
+
+/*************方法命名不规范,上面有替代方法*************/
+func (m *Mysql) Query(query string, args ...interface{}) *[]map[string]interface{} {
+	return m.SelectBySql(query, args...)
+}
+
+func (m *Mysql) QueryCount(query string, args ...interface{}) (count int) {
+	count = -1
+	if !strings.Contains(strings.ToLower(query), "count(*)") {
+		fmt.Println("QueryCount need query like < select count(*) from ..... >")
+		return
+	}
+	count = int(m.CountBySql(query, args...))
+	return
+}