怎么实现一个分布式kv系统-2-静态分区
作者:互联网
摘要
本节要实现的有3点
- 解析toml文件
- 计算key的hash值
- 将请求路由到对应的shard
编程实现
1. 定义&解析toml规则文件
定义sharding.toml文件
# Static shard table shared by every node.
#   id      - zero-based shard index; a key belongs to shard hash(key) % <number of shards>.
#   address - host:port of the node serving the shard; requests for keys owned by
#             another shard are forwarded to "http://<address>".
[[shards]]
name = "Moscow"
id = 0
address = "127.0.0.1:8000"
[[shards]]
name = "Minsk"
id = 1
address = "127.0.0.1:8001"
[[shards]]
name = "Kiev"
id = 2
address = "127.0.0.1:8002"
导入解析toml的包
添加configFile参数解析
在main中读取配置文件
import (
"github.com/BurntSushi/toml"
)
// configFile is the -config-file flag: the path of the TOML configuration
// file to load. It defaults to "sharding.toml".
var configFile = flag.String("config-file", "sharding.toml", "The configuration file")
// main parses the command-line flags and decodes the TOML file named by
// -config-file into a config.Config. If decoding fails the process is
// terminated with a log message that includes the file name and the error.
func main() {
	flag.Parse()

	var c config.Config
	_, err := toml.DecodeFile(*configFile, &c)
	if err != nil {
		log.Fatalf("failed to decode config file(%q):%v", *configFile, err)
	}
}
2. 添加config模块
创建config/config.go
定义Config & Shard结构
package config
// Shard describes one [[shards]] entry of the sharding configuration file.
type Shard struct {
	// Name is the human-readable shard name (for example "Moscow").
	Name string
	// Id is the numeric shard index. It must be an int: the configuration
	// stores it as a TOML integer (id = 0), which cannot be decoded into a
	// string, and it is used as an int map key by GetAddress.
	Id int
	// Address is the host:port of the node serving this shard; it is read by
	// GetAddress. It stays empty when the configuration omits the key.
	Address string
}
// Config is the Go representation of the sharding configuration file; main
// decodes the TOML file into a value of this type.
type Config struct {
// Shards holds one element per [[shards]] table in the file.
Shards []Shard
}
3. 指定shard
先不自动shard,先手动指定shard
添加shard参数解析
检查shard是否存在
确认shard的id
// main.go
// shardName is the -shardName flag: the name of the shard to look up in the
// configuration. It defaults to "Moscow".
var shardName = flag.String("shardName", "Moscow", "The name of the shard")
// Look up the shard selected by -shardName; abort if the configuration does
// not define a shard with that name.
ok, shard := c.ExistsShard(*shardName)
if !ok {
	// Dereference the flag here: shardName is a *string, so passing it
	// directly would log a pointer address instead of the shard name.
	log.Fatalf("shard %v not exists", *shardName)
}
// config.go
// ExistsShard searches the configured shards for one whose Name equals name.
// It returns true and the first matching shard, or false and a zero-value
// Shard when no shard has that name.
func (c *Config) ExistsShard(name string) (bool, Shard) {
	for i := range c.Shards {
		if candidate := c.Shards[i]; candidate.Name == name {
			return true, candidate
		}
	}
	return false, Shard{}
}
4. 写入数据到指定shard
写入规则:hash(key) % shardCounter 就是需要写入的分片
更新Server结构,添加shardCounter&shardIndex字段
// web.go
// Server bundles the local database with the sharding parameters the HTTP
// handlers need.
type Server struct {
	db *db.Database
	// shardCounter is the divisor used when mapping a key to a shard
	// (hash(key) % shardCounter), i.e. the total number of shards.
	shardCounter int
	// shardIndex is the index of the shard this server is responsible for.
	shardIndex int
}

// NewServer returns a Server that uses db for storage, shardCounter as the
// total number of shards and shardIndex as its own shard index.
//
// The field and the parameter were previously misspelled "shardIdex" while
// the composite literal used "shardIndex", which did not compile; both now
// use the same spelling.
func NewServer(db *db.Database, shardCounter, shardIndex int) *Server {
	return &Server{
		db:           db,
		shardCounter: shardCounter,
		shardIndex:   shardIndex,
	}
}
导入计算hash的包: hash/fnv包,计算shard
更新GetHandler、SetHandler,调用getShard来获取应该写入数据的分片。
// web.go
// getShard maps key to a shard number: the 64-bit FNV-1 hash of the key's
// bytes, reduced modulo s.shardCounter.
func (s *Server) getShard(key string) uint64 {
	hasher := fnv.New64()
	// Write on a hash.Hash never returns an error, so the result is not checked.
	hasher.Write([]byte(key))
	sum := hasher.Sum64()
	return sum % uint64(s.shardCounter)
}
// GetHandler handles a read request. It parses the request form, looks up
// the "key" form value in the database and writes the key, the value, the
// shard computed for the key and the lookup error to the response.
func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")

	value, err := s.db.GetKey(key)
	target := s.getShard(key)

	fmt.Fprintf(w, "%q:%q; shard: %d; %v Get Called\n", key, value, target, err)
}
// SetHandler handles a write request. It parses the request form, stores the
// "value" form value under the "key" form value in the database and writes
// the key, the value, the shard computed for the key and the store error to
// the response.
func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	form := r.Form
	key := form.Get("key")
	value := form.Get("value")

	err := s.db.SetKey(key, []byte(value))
	target := s.getShard(key)

	fmt.Fprintf(w, "%q:%q; shard: %d; %v; Set called\n", key, value, target, err)
}
4.获取所有节点的地址
以上虽然可以计算出分片,但是还没有办法把请求路由给其他的分片。
首先,需要知道其他分片的地址。
其次,将信息传入到Server对象中。
接着,定义redirect函数,请求转发。
最后,改造GetHandler、SetHandler转发请求。
// config.go
// GetAddress builds a lookup table from shard Id to shard Address covering
// all configured shards. If two shards share an Id, the later one wins. The
// returned map is never nil.
func (c *Config) GetAddress() map[int]string {
	table := map[int]string{}
	for i := range c.Shards {
		s := c.Shards[i]
		table[s.Id] = s.Address
	}
	return table
}
// web.go
// Server bundles the local database with the sharding parameters the HTTP
// handlers need.
type Server struct {
	// db is the local key-value store.
	db *db.Database

	// shardCounter is the divisor used to map a key to a shard; shardIndex
	// is the shard the handlers treat as local.
	shardCounter, shardIndex int

	// addrs maps a shard number to the address requests for that shard are
	// forwarded to.
	addrs map[int]string
}
// main.go
// Build the shard-id -> address table from the configuration and hand it to
// the server together with the number of shards and the id of the shard
// selected on the command line.
addrs := c.GetAddress()
svr := web.NewServer(db, len(c.Shards), shard.Id, addrs)
// web.go
// redirect forwards the request to the node that owns shard and relays that
// node's response (status code and body) to w.
//
// The request is re-issued as a GET to "http://<address of shard>" followed
// by the original request URI. If shard has no known address, or the upstream
// request fails, a 500 response describing the problem is written to w and
// the error is returned. An error while copying the upstream body is returned
// as well; by then the status line has already been sent.
func (s *Server) redirect(w http.ResponseWriter, r *http.Request, shard int) error {
	addr, ok := s.addrs[shard]
	if !ok {
		// Without this check the URL would be "http://" + "" + URI, which
		// fails later with a much less helpful error.
		err := fmt.Errorf("no address configured for shard %d", shard)
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error redirect request:%v", err)
		return err
	}

	resp, err := http.Get("http://" + addr + r.RequestURI)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error redirect request:%v", err)
		return err
	}
	defer resp.Body.Close()

	// Relay the upstream status; otherwise every forwarded response would be
	// reported as 200 OK regardless of what the owning shard answered.
	w.WriteHeader(resp.StatusCode)
	if _, err := io.Copy(w, resp.Body); err != nil {
		return fmt.Errorf("relaying response from shard %d: %w", shard, err)
	}
	return nil
}
// GetHandler handles a read request for the "key" form value. If the key
// hashes to another shard the request is forwarded to that shard; otherwise
// the value is read from the local database and written to the response
// together with the target shard, the current shard and the lookup error.
func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")

	shard := s.getShard(key)
	// getShard returns a uint64, so shardIndex must be converted to uint64;
	// comparing against uint32(...) is a mismatched-types compile error.
	if shard != uint64(s.shardIndex) {
		// redirect reports its own failures to the client, so there is
		// nothing left to do with the returned error here.
		_ = s.redirect(w, r, int(shard))
		return
	}

	value, err := s.db.GetKey(key)
	fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Get Called\n", key, value, shard, s.shardIndex, err)
}
// SetHandler handles a write request storing the "value" form value under
// the "key" form value. If the key hashes to another shard the request is
// forwarded to that shard; otherwise the pair is written to the local
// database and the key, value, target shard, current shard and store error
// are written to the response.
func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	value := r.Form.Get("value")

	shard := s.getShard(key)
	// getShard returns a uint64, so shardIndex must be converted to uint64;
	// comparing against uint32(...) is a mismatched-types compile error.
	if shard != uint64(s.shardIndex) {
		// redirect reports its own failures to the client, so there is
		// nothing left to do with the returned error here.
		_ = s.redirect(w, r, int(shard))
		return
	}

	err := s.db.SetKey(key, []byte(value))
	fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Set Called\n", key, value, shard, s.shardIndex, err)
}
5.测试
# Smoke test: write keys a, b and c through every node, then read them back
# through every node (nodes are expected on 127.0.0.1 ports 8000-8002).
#
# The helpers are named get_keys/set_keys rather than get/set because "set"
# is a shell special built-in; POSIX does not allow a function with that name.

# get_keys requests each key from each node.
get_keys() {
    for key in a b c; do
        printf '\n========= get %s=========\n' "$key"
        for port in 8000 8001 8002; do
            curl "http://127.0.0.1:$port/get?key=$key"
        done
    done
}

# set_keys stores each key (with the key itself as the value) via each node.
set_keys() {
    for key in a b c; do
        printf '\n========= set %s=========\n' "$key"
        for port in 8000 8001 8002; do
            curl "http://127.0.0.1:$port/set?key=$key&value=$key"
        done
    done
}

set_keys
get_keys
参考资料
本节完整代码:https://github.com/YuriyNasretdinov/distribkv/tree/part2
youtube视频:https://www.youtube.com/watch?v=5VK5tAyZDxQ&list=PLWwSgbaBp9XrMkjEhmTIC37WX2JfwZp7I&index=3
B站视频:https://www.bilibili.com/video/BV1nR4y177YM?p=2
标签:http,err,分区,db,shard,value,kv,key,分布式 来源: https://blog.csdn.net/luzhangting/article/details/123222101