基于Golang实现内存数据库的示例详解

作者:CSGOPHER 时间:2024-01-20 05:23:18 

GO实现内存数据库

实现Redis的database层(核心层:处理命令并返回)

https://github.com/csgopher/go-redis

本文涉及以下文件:

  • dict:定义字典的一些方法

  • sync_dict:实现dict

  • db:分数据库

  • command:定义指令

  • ping,keys,string:指令的具体处理逻辑

  • database:单机版数据库

datastruct/dict/dict.go

// Consumer is the callback type accepted by Dict.ForEach; it receives one
// key/value pair per call. Its boolean result is intended as a traversal
// signal: true to keep iterating, false to stop.
type Consumer func(key string, val interface{}) bool

// Dict is the key/value dictionary abstraction the database layer stores its
// data in. SyncDict is the implementation provided next to it; the result
// conventions noted below are the ones SyncDict follows.
type Dict interface {
  // Get returns the value stored under key and whether the key was present.
  Get(key string) (val interface{}, exists bool)
  // Len reports the number of entries.
  Len() int
  // Put stores val under key unconditionally. SyncDict returns 1 when the
  // key is new and 0 when an existing key was overwritten.
  Put(key string, val interface{}) (result int)
  // PutIfAbsent stores val only when key is missing. SyncDict returns 1 if
  // the value was stored and 0 otherwise.
  PutIfAbsent(key string, val interface{}) (result int)
  // PutIfExists stores val only when key is already present. SyncDict
  // returns 1 if the value was stored and 0 otherwise.
  PutIfExists(key string, val interface{}) (result int)
  // Remove deletes key. SyncDict returns 1 if the key existed and 0 otherwise.
  Remove(key string) (result int)
  // ForEach passes the entries to consumer one at a time.
  ForEach(consumer Consumer)
  // Keys returns all keys.
  Keys() []string
  // RandomKeys returns limit keys; per its name, presumably duplicates are
  // allowed (compare RandomDistinctKeys).
  RandomKeys(limit int) []string
  // RandomDistinctKeys returns up to limit keys without duplicates.
  RandomDistinctKeys(limit int) []string
  // Clear removes every entry.
  Clear()
}

Dict接口:Redis数据结构的接口。这里我们使用sync.Map作为字典的实现,如果想用别的数据结构,换一个实现即可

Consumer:遍历字典所有的键值对,返回值是布尔,true继续遍历,false停止遍历

datastruct/dict/sync_dict.go

type SyncDict struct {
  m sync.Map
}

func MakeSyncDict() *SyncDict {
  return &SyncDict{}
}

func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {
  val, ok := dict.m.Load(key)
  return val, ok
}

func (dict *SyncDict) Len() int {
  length := 0
  dict.m.Range(func(k, v interface{}) bool {
     length++
     return true
  })
  return length
}

func (dict *SyncDict) Put(key string, val interface{}) (result int) {
  _, existed := dict.m.Load(key)
  dict.m.Store(key, val)
  if existed {
     return 0
  }
  return 1
}

func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
  _, existed := dict.m.Load(key)
  if existed {
     return 0
  }
  dict.m.Store(key, val)
  return 1
}

func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
  _, existed := dict.m.Load(key)
  if existed {
     dict.m.Store(key, val)
     return 1
  }
  return 0
}

func (dict *SyncDict) Remove(key string) (result int) {
  _, existed := dict.m.Load(key)
  dict.m.Delete(key)
  if existed {
     return 1
  }
  return 0
}

func (dict *SyncDict) ForEach(consumer Consumer) {
  dict.m.Range(func(key, value interface{}) bool {
     consumer(key.(string), value)
     return true
  })
}

func (dict *SyncDict) Keys() []string {
  result := make([]string, dict.Len())
  i := 0
  dict.m.Range(func(key, value interface{}) bool {
     result[i] = key.(string)
     i++
     return true
  })
  return result
}

func (dict *SyncDict) RandomKeys(limit int) []string {
  result := make([]string, limit)
  for i := 0; i < limit; i++ {
     dict.m.Range(func(key, value interface{}) bool {
        result[i] = key.(string)
        return false
     })
  }
  return result
}

func (dict *SyncDict) RandomDistinctKeys(limit int) []string {
  result := make([]string, limit)
  i := 0
  dict.m.Range(func(key, value interface{}) bool {
     result[i] = key.(string)
     i++
     if i == limit {
        return false
     }
     return true
  })
  return result
}

func (dict *SyncDict) Clear() {
  *dict = *MakeSyncDict()
}

使用sync.Map实现Dict接口

database/db.go

// DB is one logical database: a numbered slot holding its own dictionary.
type DB struct {
// index is this database's position in the owning Database's dbSet.
index int
// data holds the key/value entries of this database.
data  dict.Dict
}

// ExecFunc is the signature every command implementation has. DB.Exec calls
// it with the command name already stripped, so args holds only the
// arguments.
type ExecFunc func(db *DB, args [][]byte) resp.Reply

// CmdLine is an alias for a full command line: the command name followed by
// its arguments, each as raw bytes.
type CmdLine = [][]byte

// makeDB builds a DB backed by a fresh SyncDict. The index field keeps its
// zero value; the caller assigns it.
func makeDB() *DB {
	return &DB{data: dict.MakeSyncDict()}
}

// Exec looks up the command named by cmdLine[0] (case-insensitively) in
// cmdTable, validates the argument count against the command's arity and
// runs its executor on this database.
//
// It returns an error reply when cmdLine is empty, when the command is
// unknown, or when the number of arguments does not satisfy the arity.
// The connection c is not used here.
func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {
	// Guard against an empty command line; indexing cmdLine[0] would panic.
	if len(cmdLine) == 0 {
		return reply.MakeErrReply("ERR empty command")
	}
	cmdName := strings.ToLower(string(cmdLine[0]))
	cmd, ok := cmdTable[cmdName]
	if !ok {
		return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
	}
	if !validateArity(cmd.arity, cmdLine) {
		return reply.MakeArgNumErrReply(cmdName)
	}
	// Drop the command name: for "set k v" the executor only sees "k v".
	return cmd.executor(db, cmdLine[1:])
}

// validateArity reports whether cmdArgs (the whole command line, command
// name included) satisfies arity. A non-negative arity demands exactly that
// many elements; a negative arity demands at least -arity elements.
func validateArity(arity int, cmdArgs [][]byte) bool {
	n := len(cmdArgs)
	if arity < 0 {
		// Variable-length command: -arity is the minimum.
		return n >= -arity
	}
	return n == arity
}

// GetEntity returns the DataEntity stored under key. The boolean is false
// when the key is missing or when the stored value is not a
// *database.DataEntity, so a true result always comes with a usable entity
// pointer of the expected type.
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
	raw, ok := db.data.Get(key)
	if !ok {
		return nil, false
	}
	// Report a failed assertion as "not found" instead of handing callers a
	// nil entity together with true, which they would dereference.
	entity, ok := raw.(*database.DataEntity)
	if !ok {
		return nil, false
	}
	return entity, true
}

// PutEntity stores entity under key by delegating to the dictionary's Put
// and returns Put's result unchanged.
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
return db.data.Put(key, entity)
}

// PutIfExists delegates to the dictionary's PutIfExists and returns its
// result unchanged.
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
return db.data.PutIfExists(key, entity)
}

// PutIfAbsent delegates to the dictionary's PutIfAbsent and returns its
// result unchanged.
func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
return db.data.PutIfAbsent(key, entity)
}

// Remove deletes key from this database. The dictionary's result is
// discarded, so callers cannot tell whether the key existed.
func (db *DB) Remove(key string) {
db.data.Remove(key)
}

// Removes deletes every given key and returns how many of them were actually
// present.
//
// It relies on Dict.Remove returning 1 for a key that existed and 0
// otherwise (as SyncDict does), which avoids a separate existence lookup
// before each delete.
func (db *DB) Removes(keys ...string) (deleted int) {
	for _, key := range keys {
		deleted += db.data.Remove(key)
	}
	return deleted
}

// Flush empties this database by clearing its dictionary.
func (db *DB) Flush() {
db.data.Clear()
}

实现Redis中的分数据库

ExecFunc:所有Redis的指令都写成这样的类型

validateArity方法:

  • 定长:set k v => arity=3;

  • 变长:exists k1 k2 k3 ... => arity=-2,表示参数>=2个

database/command.go

// cmdTable maps a lower-cased command name to its registered command.
// RegisterCommand writes to it and DB.Exec reads from it.
var cmdTable = make(map[string]*command)

// command describes one registered command.
type command struct {
  // executor is the function that carries out the command.
  executor ExecFunc
  // arity is the expected length of the full command line (name included):
  // a non-negative value means exactly that many elements, a negative value
  // means at least -arity elements (see validateArity).
  arity    int
}

func RegisterCommand(name string, executor ExecFunc, arity int) {
  name = strings.ToLower(name)
  cmdTable[name] = &command{
     executor: executor,
     arity:    arity,
  }
}

command:每一个command结构体都是一个指令,例如ping,keys等等

  • arity:参数数量

  • cmdTable:记录所有指令和command结构体的关系

  • RegisterCommand:注册指令的实现,在程序启动时由各指令文件的init方法调用

database/ping.go

func Ping(db *DB, args [][]byte) resp.Reply {
   if len(args) == 0 {
       return &reply.PongReply{}
   } else if len(args) == 1 {
       return reply.MakeStatusReply(string(args[0]))
   } else {
       return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
   }
}

func init() {
   RegisterCommand("ping", Ping, 1)
}

init方法:在启动程序时就会调用这个方法,用于初始化

database/keys.go

func execDel(db *DB, args [][]byte) resp.Reply {
  keys := make([]string, len(args))
  for i, v := range args {
     keys[i] = string(v)
  }

deleted := db.Removes(keys...)
  return reply.MakeIntReply(int64(deleted))
}

func execExists(db *DB, args [][]byte) resp.Reply {
  result := int64(0)
  for _, arg := range args {
     key := string(arg)
     _, exists := db.GetEntity(key)
     if exists {
        result++
     }
  }
  return reply.MakeIntReply(result)
}

// execFlushDB implements FLUSHDB: it empties the database and replies OK.
// args is ignored.
func execFlushDB(db *DB, args [][]byte) resp.Reply {
  db.Flush()
  return &reply.OkReply{}
}

func execType(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  entity, exists := db.GetEntity(key)
  if !exists {
     return reply.MakeStatusReply("none")
  }
  switch entity.Data.(type) {
  case []byte:
     return reply.MakeStatusReply("string")
  }
  return &reply.UnknownErrReply{}
}

func execRename(db *DB, args [][]byte) resp.Reply {
  if len(args) != 2 {
     return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
  }
  src := string(args[0])
  dest := string(args[1])

entity, ok := db.GetEntity(src)
  if !ok {
     return reply.MakeErrReply("no such key")
  }
  db.PutEntity(dest, entity)
  db.Remove(src)
  return &reply.OkReply{}
}

func execRenameNx(db *DB, args [][]byte) resp.Reply {
  src := string(args[0])
  dest := string(args[1])

_, exist := db.GetEntity(dest)
  if exist {
     return reply.MakeIntReply(0)
  }

entity, ok := db.GetEntity(src)
  if !ok {
     return reply.MakeErrReply("no such key")
  }
  db.Removes(src, dest)
  db.PutEntity(dest, entity)
  return reply.MakeIntReply(1)
}

func execKeys(db *DB, args [][]byte) resp.Reply {
  pattern := wildcard.CompilePattern(string(args[0]))
  result := make([][]byte, 0)
  db.data.ForEach(func(key string, val interface{}) bool {
     if pattern.IsMatch(key) {
        result = append(result, []byte(key))
     }
     return true
  })
  return reply.MakeMultiBulkReply(result)
}

func init() {
  RegisterCommand("Del", execDel, -2)
  RegisterCommand("Exists", execExists, -2)
  RegisterCommand("Keys", execKeys, 2)
  RegisterCommand("FlushDB", execFlushDB, -1)
  RegisterCommand("Type", execType, 2)
  RegisterCommand("Rename", execRename, 3)
  RegisterCommand("RenameNx", execRenameNx, 3)
}

keys.go实现以下指令:

  • execDel:del k1 k2 k3 ...

  • execExists:exists k1 k2 k3 ...

  • execFlushDB:flushdb

  • execType:type k1

  • execRename:rename k1 k2

  • execRenameNx:renamenx k1 k2

  • execKeys:keys pattern(依赖lib包的工具类wildcard.go进行通配符匹配)

database/string.go

func execGet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  bytes, err := db.getAsString(key)
  if err != nil {
     return err
  }
  if bytes == nil {
     return &reply.NullBulkReply{}
  }
  return reply.MakeBulkReply(bytes)
}

func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {
  entity, ok := db.GetEntity(key)
  if !ok {
     return nil, nil
  }
  bytes, ok := entity.Data.([]byte)
  if !ok {
     return nil, &reply.WrongTypeErrReply{}
  }
  return bytes, nil
}

func execSet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]
  entity := &database.DataEntity{
     Data: value,
  }
  db.PutEntity(key, entity)
  return &reply.OkReply{}
}

func execSetNX(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]
  entity := &database.DataEntity{
     Data: value,
  }
  result := db.PutIfAbsent(key, entity)
  return reply.MakeIntReply(int64(result))
}

func execGetSet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]

entity, exists := db.GetEntity(key)
  db.PutEntity(key, &database.DataEntity{Data: value})
  if !exists {
     return reply.MakeNullBulkReply()
  }
  old := entity.Data.([]byte)
  return reply.MakeBulkReply(old)
}

func execStrLen(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  entity, exists := db.GetEntity(key)
  if !exists {
     return reply.MakeNullBulkReply()
  }
  old := entity.Data.([]byte)
  return reply.MakeIntReply(int64(len(old)))
}

func init() {
  RegisterCommand("Get", execGet, 2)
  RegisterCommand("Set", execSet, -3)
  RegisterCommand("SetNx", execSetNX, 3)
  RegisterCommand("GetSet", execGetSet, 3)
  RegisterCommand("StrLen", execStrLen, 2)
}

string.go实现以下指令:

  • execGet:get k1

  • execSet:set k v

  • execSetNX:setnx k v

  • execGetSet:getset k v 返回旧值

  • execStrLen:strlen k

database/database.go

// Database is the standalone server's set of logical databases.
type Database struct {
  // dbSet holds the databases, indexed by the number a connection selects.
  dbSet []*DB
}

func NewDatabase() *Database {
  mdb := &Database{}
  if config.Properties.Databases == 0 {
     config.Properties.Databases = 16
  }
  mdb.dbSet = make([]*DB, config.Properties.Databases)
  for i := range mdb.dbSet {
     singleDB := makeDB()
     singleDB.index = i
     mdb.dbSet[i] = singleDB
  }
  return mdb
}

func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
  defer func() {
     if err := recover(); err != nil {
        logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
     }
  }()

cmdName := strings.ToLower(string(cmdLine[0]))
  if cmdName == "select" {
     if len(cmdLine) != 2 {
        return reply.MakeArgNumErrReply("select")
     }
     return execSelect(c, mdb, cmdLine[1:])
  }
  dbIndex := c.GetDBIndex()
  selectedDB := mdb.dbSet[dbIndex]
  return selectedDB.Exec(c, cmdLine)
}

func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {
  dbIndex, err := strconv.Atoi(string(args[0]))
  if err != nil {
     return reply.MakeErrReply("ERR invalid DB index")
  }
  if dbIndex >= len(mdb.dbSet) {
     return reply.MakeErrReply("ERR DB index is out of range")
  }
  c.SelectDB(dbIndex)
  return reply.MakeOkReply()
}

// Close is a no-op in this implementation.
func (mdb *Database) Close() {
}

// AfterClientClose is a no-op in this implementation; the connection c is
// not used.
func (mdb *Database) AfterClientClose(c resp.Connection) {
}
  • Database:一组db的集合

  • Exec:执行切换db指令或者其他指令

  • execSelect方法:选择db(指令:select 2)

resp/handler/handler.go

import (
database2 "go-redis/database"
)

func MakeHandler() *RespHandler {
  var db database.Database
  db = database2.NewDatabase()
  return &RespHandler{
     db: db,
  }
}

修改实现协议层handler的database实现

架构小结

TCP层服务TCP的连接,然后将连接交给RESP协议层的handler,handler监听客户端的连接,将指令解析后发给管道,管道转给database层(database/database.go),核心层根据命令类型执行不同的方法,然后返回。

来源:https://www.cnblogs.com/csgopher/p/17249335.html

标签:Golang,内存,数据库
0
投稿

猜你喜欢

  • python基础之内置函数

    2022-02-28 09:15:58
  • 网页绿色系配色应用实例

    2008-08-26 11:51:00
  • python实战之90行代码写个猜数字游戏

    2023-05-24 21:14:49
  • Python设计模式之备忘录模式原理与用法详解

    2022-12-27 11:12:01
  • Python爬虫headers处理及网络超时问题解决方案

    2022-11-19 23:38:34
  • 跨浏览器实现float:center,No CSS hacks

    2008-08-22 12:59:00
  • PyTorch 对应点相乘、矩阵相乘实例

    2021-10-23 16:33:29
  • 使用Python标准库中的wave模块绘制乐谱的简单教程

    2023-11-20 14:21:35
  • 6个卓越Web设计细节[译]

    2010-03-24 18:34:00
  • python操作mysql代码总结

    2024-01-24 07:36:19
  • 关联的 script 标签

    2009-11-02 10:40:00
  • 40个网页设计常用小代码

    2008-01-01 19:27:00
  • python 下载文件的多种方法汇总

    2023-08-11 16:50:05
  • tensorflow使用L2 regularization正则化修正overfitting过拟合方式

    2023-11-26 04:07:32
  • python 使用Tensorflow训练BP神经网络实现鸢尾花分类

    2023-04-15 13:29:00
  • 利用Python pyecharts绘制饼图

    2023-06-13 08:56:21
  • js经验分享 JavaScript反调试技巧

    2023-07-13 07:53:04
  • 如何基于Python爬取隐秘的角落评论

    2022-02-17 05:31:43
  • Java正则表达式循环匹配字符串方式

    2023-07-25 03:29:48
  • django 实现编写控制登录和访问权限控制的中间件方法

    2021-04-26 21:08:53
  • asp之家 网络编程 m.aspxhome.com