123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package db
- import (
- "encoding/json"
- "errors"
- "github.com/boltdb/bolt"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- be "spider_creator/backend"
- )
- // 在新增表的时候,需要在这里增加实体清单
- type Entity interface {
- be.SpiderConfig | be.Job
- }
- var (
- // 在新增表的时候,需要在这里增加表名
- DB_TABLES = []string{"myBucket", "jobs", "spiderconfig", "systemconfig"}
- Db *SpiderDb
- )
- type (
- //SpiderDB 爬虫库,这里模拟真实数据库
- SpiderDb struct {
- db *bolt.DB
- enf be.EventNotifyFace
- }
- )
- // NewSpiderDb
- func NewSpiderDb(dbfile string, enf be.EventNotifyFace) *SpiderDb {
- db, err := bolt.Open(dbfile, 0600, nil)
- if err != nil {
- qu.Debug("db error", err.Error())
- }
- for _, t := range DB_TABLES {
- err = db.Update(func(tx *bolt.Tx) error {
- _, err := tx.CreateBucketIfNotExists([]byte(t))
- return err
- })
- if err != nil {
- qu.Debug("db error", err.Error())
- }
- }
- return &SpiderDb{
- db, enf,
- }
- }
- // Close
- func (s *SpiderDb) Close() {
- s.db.Close()
- }
- // 支持泛型的通用方法
- // LoadEntity
- func LoadEntity[T any](table, key string) (*T, error) {
- var ret *T = new(T)
- err := Db.db.View(func(tx *bolt.Tx) error {
- bucket := tx.Bucket([]byte(table))
- value := bucket.Get([]byte(key))
- if value != nil && len(value) > 0 {
- _ = json.Unmarshal(value, ret)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return ret, nil
- }
- // LoadEntities
- func LoadEntities[T1 Entity](table string) ([]*T1, error) {
- ret := make([]*T1, 0)
- // 开始读取事务
- err := Db.db.View(func(tx *bolt.Tx) error {
- // 遍历数据库中的所有桶
- bucket := tx.Bucket([]byte(table))
- if bucket == nil {
- return errors.New("桶不存在")
- }
- // 遍历桶中的所有键/值对
- return bucket.ForEach(func(k, v []byte) error {
- var sf = new(T1)
- json.Unmarshal(v, sf)
- if sf != nil {
- ret = append(ret, sf)
- }
- return nil
- })
- })
- if err != nil {
- return ret, err
- }
- return ret, nil
- }
- // DeleteEntity
- func DeleteEntity[T any](table, key string) error {
- return Db.db.Update(func(tx *bolt.Tx) error {
- bucket := tx.Bucket([]byte(table))
- return bucket.Delete([]byte(key))
- })
- }
- // SaveEntity
- func SaveEntity[T any](table, key string, obj *T) error {
- //加载原始数据
- value, _ := json.Marshal(obj)
- return Db.db.Update(func(tx *bolt.Tx) error {
- bucket := tx.Bucket([]byte(table))
- err := bucket.Put([]byte(key), value)
- return err
- })
- }
|