๋ฐ์ดํฐ ์ค์ฌ ์ ํ๋ฆฌ์ผ์ด์ ์ค๊ณ 10์ฅ
์ผ๊ด ์ฒ๋ฆฌ
์ผ๊ด ์ฒ๋ฆฌ
์ค๋น!
#[tokio::main] async fn main() { // 1. ํ์ํ ์ค์ , ๋ฐ์ดํฐ ๋ฑ์ ์ธํ ํจ. let state = StateManager::new(); let config_handler = ConfigHandler::new(state.get_db(), state.get_config(), state.get_replication_config()); config_handler.load_config().await; config_handler.configure_db().await; config_handler.configure_replication().await; let port = config_handler.get_port().await; let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await.unwrap(); println!("Listening on port {}", port); loop { match listener.accept().await { // 2. ์คํธ๋ฆผ์ ๋ฐ์, ์คํธ๋ฆผ์ handle_client๋ก ์ ๋ฌ Ok((stream, _)) => { let db = state.get_db(); let config = state.get_config(); let replication_config = state.get_replication_config(); task::spawn(async move { // 3....
Prerequisite ๋ด๊ฐ ์ง๋ฉดํ ๋ฌธ์ ๋ ์๋์ ๋ฌธ์์ ์๋ค. ๐ [[Redis-Stream-Issue]] ์์ฃผ ๊ฐ๋จํ๊ฒ ์์ฝํ์๋ฉด, ๊ฐ ์ค๋ ๋๊ฐ ์คํธ๋ฆผ์ ๋ฌผ๊ณ ๋ค๋๋ฉด์ ํ์คํฌ๊ฐ ์ฒ๋ฆฌ๋๊ณ , ๋ฆฌ์์ค๋ค์ lock์ผ๋ก ๊ด๋ฆฌํ๋ค. ์ง๊ธ๊น์ง๋ ๊ทธ๋๋ง ๊ด์ฐฎ์๋๋ฐ, ๋ ํ๋ฆฌ์ผ์ด์ ์ ์ฐ๊ธฐ ์ ํ๋ฅผ ์๊ฐํด๋ดค์ ๋, ๋ค์์ ์ฌ๋ ์ด๋ธ ์คํธ๋ฆผ์ ๋ฝ์ผ๋ก ๊ด๋ฆฌํ๋ ๊ตฌ์กฐ์์๋ ๊ฐ๋จํ ์ฐ๊ธฐ ์์ฒญ์์๋ ์ฌ๋ ์ด๋ธ๋ค์ ์คํธ๋ฆผ์ ์ฌ์ฉํ๋ ์ฐ๊ธฐ ๋ฝ ๋๋ฌธ์ ์ง๋์น ๋ณ๋ชฉ์ด ๋ฐ์ํ ๊ตฌ์กฐ๋ผ๋ ๊ฒ์ด๋ค. ์์กฐ ๋ ๋์ค๋ ์ฑ๊ธ์ค๋ ๋์ ์ด๋ฒคํธ๋ฃจํ๋ก ๊ตฌํ๋๋ค. ๊ณผ์ ์์ ์ข์ ๋ฐฉ๋ฒ์ด ์์๊น ํ๋ค๊ฐ ์ฐพ๊ฒ๋๊ฑด mpsc์ด๊ณ mpsc์ ๋ํ ๊ฐ๋จํ ์กฐ์ฌ๋ฅผ ์๋์ ๋ฌธ์์ ์ ๋ฆฌํ๋ค. ๐ [[rust-mpsc]]...
mpsc๋? Rust์ mpsc ์ฑ๋์ โ์ฌ๋ฌ ์์ฐ์ (Multiple Producer)โ์ โํ๋์ ์๋น์ (Single Consumer)โ๋ก ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ณ ์ฒ๋ฆฌ ํ ์ ์๋ ๋น๋๊ธฐ ๋๊ตฌ์ด๋ค. ์์ด๋์ด๋ ์์ด๋์ด์ง๋ง, ๊ธฐ๋ณธ์ ์ผ๋ก ์ค๊ณ์ ๋์์ด Rust์ ์์ ๊ถ๊ณผ ๋์์ฑ ๋ชจ๋ธ์ ๊ธฐ๋ฐ์ ๋๊ณ ์๋ค. ๊ธฐ๋ณธ์ ์ธ ์ฌ์ฉ ๋ฐฉ์์ ์๋์ ๊ฐ๋ค. mpsc::channel tx (์์ฐ์), rx(์๋น์) ๋ฅผ ๋ฐํ๋ฐ๋๋ค. use tokio::sync::mpsc; #[tokio::main] async fn main() { // ์ฑ๋ ์์ฑ (๋ฒํผ ํฌ๊ธฐ: 32) let (tx, mut rx) = mpsc::channel(32); // ์์ฐ์ (Producer) tokio::spawn(async move { for i in 1....
์ฐ๋์ด ์ฐ..
๋ ๋์ค ๋ ํ๋ฆฌ์นด๋ฅผ ๋ฑ๋กํ๋ ๊ณผ์ ์ค replconf, psync๋ฅผ ์ฒ๋ฆฌํ๋ ์ค ๋ฐ์ํ ๋ฌธ์ .
07์ฅ ํธ๋์ญ์ ์ด๋ค ์ ์๋ค์ 2๋จ๊ณ ์ปค๋ฐ์์ ์ ๋ฐ๋๋ ์ฑ๋ฅ์ด๋ ๊ฐ์ฉ์ฑ ๋ฌธ์ ๋๋ฌธ์ ์๊ธฐ๋ ๋น์ฉ์ด ๋๋ฌด ์ปค์ ์ด๋ฅผ ์ง์ํ ์ ์๋ค๊ณ ์ฃผ์ฅํ๋ค. ์ฐ๋ฆฌ๋ ํญ์ ํธ๋์ญ์ ์์ด ์ฝ๋ฉํ๋ ๊ฒ๋ณด๋ค ํธ๋์ญ์ ์ ๊ณผ์ฉํด์ ๋ณ๋ชฉ์ง์ ์ด ์๊ธฐ๋ ์ฑ๋ฅ ๋ฌธ์ ๋ฅผ ์ ํ๋ฆฌ์ผ์ด์ ํ๋ก๊ทธ๋๋จธ๊ฐ ์ฒ๋ฆฌํ๋๊ฒ ๋ซ๋ค๊ณ ์๊ฐํ๋ค. ๋ํนํ ํ์ค ์ธ๊ณ์์ ๋ฐ์ดํฐ ์์คํ ์ ์ฌ๋ฌ ๊ฐ์ง ๋ฌธ์ ๊ฐ ์๊ธธ ์ ์๋ค. ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ํํธ์จ์ด๋ ํ๋์จ์ด๋ ์ธ์ ๋ผ๋ ์คํจํ ์ ์๋ค. ์ ํ๋ฆฌ์ผ์ด์ ์ ์ธ์ ๋ผ๋ ์ฃฝ์ ์ ์๋ฐ. ๋คํธ์ํฌ๊ฐ ๋๊ธฐ๋ฉด ์ ํ๋ฆฌ์ผ์ด์ ๊ณผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ์ด ๊ฐ์๊ธฐ ๋๊ธฐ๊ฑฐ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ ธ๋ ์ฌ์ด์ ํต์ ์ด ์๋ ์ ์๋ค....
ํํฐ์ ๋ ๋ฐ์ดํฐ์ ์ด ๋งค์ฐ ํฌ๊ฑฐ๋ ์ง์ ์ฒ๋ฆฌ๋์ด ๋งค์ฐ ๋๋ค๋ฉด ๋ณต์ ๋ง์ผ๋ก๋ ๋ถ์กฑํ๊ณ , ๋ฐ์ดํฐ๋ฅผ ํํฐ์ ์ผ๋ก ์ชผ๊ฐค ํ์๊ฐ ์๋ค. ์ด ์์ ์ ์ค๋ฉ์ด๋ผ๊ณ ํ๋ค. ๋ฐ์ดํฐ๋ฅผ ํํฐ์ ๋ ํ๋ ์ฃผ๋ ์ด์ ๋ ํ์ฅ์ฑ์ด๋ค. ํํฐ์ ๋๊ณผ ๋ณต์ ํํฐ์ ๋์ ํด๋, ํํฐ์ ๋ ๋จ์๋ฅผ ๊ธฐ์ค์ผ๋ก๋ ๋ณต์ ๋ฅผ ํ ์ ์๋ค. ํ ๋ ธ๋์ ์ฌ๋ฌ ํํฐ์ ์ ๋ณต์ ํ ์๋ ์์ด์, ๋ ธ๋ ํ๋์ ํน์ ํํฐ์ ์ ๋ฆฌ๋์ ๋๋จธ์ง ํํฐ์ ์ ๋ณต์ ๋ฅผ ๊ฐ์ง๋์์ผ๋ก ๊ต์ฐจํด ๋๊ธฐ๋ ํ๋ค. ํค-๊ฐ ๋ฐ์ดํฐ ํํฐ์ ๋ ํํฐ์ ๋์ ๋ชฉ์ ์ ๋ฐ์ดํฐ์ ์ง์ ๋ถํ๋ฅผ ๋ถ์ฐ์ํค๋ ๊ฒ์ด๋ค. ์ด๊ฒ์ด ๋ฌ์ฑ๋์ง ์์์ ๋ ์ ๋ ธ๋ค(skewed) ๋ผ๊ณ ํ๋ค....
Rust-Redis-D4 Code ๐ฅธ use crate::protocol_constants::{MAGIC_NUMBER, OPCODE_EOF, OPCODE_META, OPCODE_START_DB}; use crate::{Config, Db, ValueEntry}; use byteorder::{LittleEndian, ReadBytesExt}; use crc::{Crc, CRC_64_ECMA_182}; use std::fs::File; use std::io::{self, BufReader, Read, Seek, SeekFrom}; fn bytes_to_hex(bytes: &[u8]) -> String { bytes.iter().map(|b| format!("{:02X}", b)).collect::<Vec<String>>().join(" ") } fn read_length_or_integer<R: Read>(reader: &mut R, first_byte: u8) -> io::Result<usize> { match first_byte >> 6 { 0b00 => Ok((first_byte & 0x3F) as usize), 0b01 => { let second_byte = reader.read_u8()?; Ok((((first_byte & 0x3F) as usize) << 8) | (second_byte as usize)) } 0b10 => reader....