#[tokio::main]
async fn main() {
// 1. ํ์ํ ์ค์ , ๋ฐ์ดํฐ ๋ฑ์ ์ธํ
ํจ.
let state = StateManager::new();
let config_handler = ConfigHandler::new(state.get_db(), state.get_config(), state.get_replication_config());
config_handler.load_config().await;
config_handler.configure_db().await;
config_handler.configure_replication().await;
let port = config_handler.get_port().await;
let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await.unwrap();
println!("Listening on port {}", port);
loop {
match listener.accept().await {
// 2. ์คํธ๋ฆผ์ ๋ฐ์, ์คํธ๋ฆผ์ handle_client๋ก ์ ๋ฌ
Ok((stream, _)) => {
let db = state.get_db();
let config = state.get_config();
let replication_config = state.get_replication_config();
task::spawn(async move {
// 3. ๋ฑ ๋ด๋ ์ ์ ์๊ฒ ์ง๋ง, ์ผ๋จ ๊ฒฝํฉ์์์ด๊ณ ๋ญ๊ณ ๋ค ๋๊ฒจ๋ฒ๋ฆฐ๋ค.
handle_client(stream, db, config, replication_config).await;
});
}
Err(e) => {
println!("Error accepting connection: {}", e);
}
}
}
}
// === Below: handler.rs ===
/// Serves one client connection: reads a request, parses it into a command,
/// and executes it, repeating until the peer closes the stream or a read fails.
///
/// Requests that are not valid UTF-8 or that fail to parse are logged and
/// skipped; the connection stays open.
pub async fn handle_client(mut stream: TcpStream, db: Db, config: Config, replication_config: ReplicationConfig) {
    let mut buffer = [0; 512];
    loop {
        buffer.fill(0);
        // 1. Read the request message from the stream into the buffer.
        let bytes_read = match stream.read(&mut buffer).await {
            // A zero-length read means the peer closed the connection.
            Ok(0) => break,
            Ok(n) => n,
            Err(e) => {
                println!("Error reading from stream: {}", e);
                break;
            }
        };
        let message = match std::str::from_utf8(&buffer[..bytes_read]) {
            Ok(text) => text,
            Err(_) => {
                println!("Failed to parse message as UTF-8");
                continue;
            }
        };
        println!("Received message: {:?}", message);
        // 2. Parse the message into a Command value (an enum, to be precise).
        let command = match CommandParser::parse_message(message) {
            Ok(parsed) => parsed,
            Err(e) => {
                println!("Failed to parse command: {}", e);
                continue;
            }
        };
        // 3. Execute the command; the method drags the contended resources
        //    (stream, db, ...) along with it.
        if let Err(e) = command.handle_command(&mut stream, Arc::clone(&db), Arc::clone(&config), replication_config.clone()).await {
            println!("Failed to send response: {}", e);
        }
    }
}
// === Below: command.rs ===
/// Executes this command and writes its responses to `stream`.
///
/// If the peer address cannot be retrieved, or if `execute` fails, an `-ERR`
/// line is written to the client instead and `Ok(())` is returned.
///
/// # Errors
/// Returns the underlying I/O error when writing to `stream` fails.
pub async fn handle_command(
    &self,
    stream: &mut TcpStream,
    db: Db,
    config: Config,
    replication_config: ReplicationConfig,
) -> std::io::Result<()> {
    let peer_addr = match stream.peer_addr() {
        Ok(addr) => addr,
        Err(_) => {
            stream.write_all(b"-ERR Failed to retrieve client address\r\n").await?;
            return Ok(());
        }
    };
    // 1. Problem: the result produced by the command is written directly to
    //    `stream`, which is a contended resource.
    let responses = match self.execute(db, config, replication_config, peer_addr).await {
        Ok(responses) => responses,
        Err(e) => {
            let err_response = format!("-ERR {}\r\n", e);
            return stream.write_all(err_response.as_bytes()).await;
        }
    };
    for response in responses {
        match response {
            CommandResponse::Simple(text) => stream.write_all(text.as_bytes()).await?,
            CommandResponse::Bulk(data) => {
                // Length header first, then the raw payload bytes.
                let header = format!("${}{}", data.len(), CRLF);
                stream.write_all(header.as_bytes()).await?;
                stream.write_all(&data).await?;
            }
            // Nothing after this marker is sent.
            CommandResponse::EndStream => break,
        }
    }
    Ok(())
}
/// Runs this command and returns the responses to send back to the client.
///
/// Only the `SET` arm is visible in this excerpt; the remaining arms are elided.
///
/// # Errors
/// Returns a human-readable message describing why the command failed.
pub async fn execute(
    &self,
    db: Db,
    config: Config,
    replication_config: ReplicationConfig,
    peer_addr: SocketAddr,
) -> Result<Vec<CommandResponse>, String> {
    match self {
        // Dispatch to the handler function that corresponds to each command.
        Command::SET { key, value, ex, px } => {
            let reply = Self::execute_set(key, value, *ex, *px, db).await;
            Ok(vec![CommandResponse::Simple(reply)])
        }
        // (omitted ...)
    }
}
async fn execute_set(key: &String, value: &String, ex: Option<u64>, px: Option<u64>, db: Db) -> String {
let expiration_ms = match (px, ex) {
(Some(ms), _) => Some(ms),
(None, Some(s)) => Some(s * 1000),
_ => None,
};
// 2. ์ค์ ์ค๋ ๋์์ ๊ฒฝํฉ์์์ ์ง์ ์ ์ผ๋ก ์ด์ฉํ๋ค.
db.write().await.insert(key.clone(), ValueEntry::new_relative(value.clone(), expiration_ms));
// 3. ์ฌ์ง์ด ๋ ํ๋ฆฌ์ผ์ด์
์ ํ์ ๊ฐ์ ๋ก์ง์ด ๋์ด๋๋ค๋ฉด ๋ํ๋์ ๊ฒฝํฉ์์์ ์์ฑํ๋ค.
format!("{}OK{}", SIMPLE_STRING_PREFIX, CRLF)
}
// ====== Below: the refactoring currently in progress =======
/// Entry point of the in-progress, event-driven refactoring.
///
/// A spawned accept task registers every client and spawns one reader task per
/// connection; readers publish what they read as events. This function then
/// drains the event channel and hands each event to the `EventHandler`.
///
/// # Panics
/// Panics if the listener cannot be bound to `127.0.0.1:6379`.
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379")
        .await
        .expect("Failed to bind 127.0.0.1:6379");
    let client_manager = ClientManager::new();
    let (tx, mut rx) = mpsc::channel::<RedisEvent>(32);
    let db = Arc::new(tokio::sync::RwLock::new(Default::default()));
    let config = Arc::new(tokio::sync::RwLock::new(Default::default()));
    let replication_config = Arc::new(tokio::sync::RwLock::new(Default::default()));
    //# Pass the client manager itself, and stop using the client manager in the spawn below.
    //# Add client registration/removal as extra RedisEvent variants and send those instead
    //# (RedisEvent::AddClient(...), RemoveClient(...)).
    //# db, config, replication_config and client_manager then end up being used only by the
    //# event handler, so the Arc, RwLock, etc. wrappers can be removed.
    let event_handler = EventHandler::new(
        db.clone(),
        config.clone(),
        replication_config.clone(),
        client_manager.clients.clone(), //# pass client_manager itself instead
    );
    let event_publisher = EventPublisher::new(tx);
    tokio::spawn(async move {
        // Only this task hands out ids, so a plain counter keeps them unique.
        // A peer port is not unique: it can repeat across source addresses and
        // be reused while an old connection is still being torn down.
        let mut next_client_id: u64 = 0;
        loop {
            // A failed accept must not stop the server from accepting later
            // connections; log it and keep going.
            let (stream, _) = match listener.accept().await {
                Ok(connection) => connection,
                Err(e) => {
                    eprintln!("Error accepting connection: {}", e);
                    continue;
                }
            };
            let client_id = next_client_id;
            next_client_id += 1;
            //# While switching this to an AddClient event,
            //# split the stream to separate the read half from the write half.
            //# Use the read half inside the spawn below and the write half in the client.
            //# https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.split
            // NOTE(review): `add_client` as declared takes a `Client`, not a stream — confirm
            // once the split described above is implemented.
            client_manager.add_client(client_id, stream.try_clone().unwrap()).await;
            let publisher = event_publisher.clone();
            //# The manager should exist only inside event_handler.
            let manager = client_manager.clone();
            tokio::spawn(async move {
                //# Use the reader half of the split stream here.
                let mut stream = manager.get_stream(client_id).await.unwrap().write().await;
                let mut buffer = [0; 1024];
                loop {
                    let bytes_read = match stream.read(&mut buffer).await {
                        // A zero-length read means the peer closed the connection.
                        Ok(0) => break,
                        Ok(n) => n,
                        Err(_) => {
                            eprintln!("Failed to read from client {}", client_id);
                            break;
                        }
                    };
                    //# There is no guarantee that everything the peer sent arrives in one read,
                    //# so the sender should put something like a byte length in a header,
                    //# and the reader must keep looping and filling the buffer until that many
                    //# bytes have been read.
                    let input = String::from_utf8_lossy(&buffer[..bytes_read]).into_owned();
                    if let Err(e) = publisher.publish(client_id, input).await {
                        eprintln!("Error publishing event: {}", e);
                    }
                }
                manager.remove_client(client_id).await;
            });
        }
    });
    while let Some(event) = rx.recv().await {
        event_handler.handle_event(event).await;
    }
}
//# Locks and Arcs like the ones below should all be removed if they turn out to be unnecessary.
/// Reference-counted, lock-protected map from client id to a shared `Client`.
pub type SharedClients = Arc<RwLock<HashMap<u64, Arc<Client>>>>;
/// Registry of connected clients, keyed by client id.
///
/// `Clone` is derived because the accept loop clones the manager into each
/// per-client task. A clone is cheap and refers to the same underlying map,
/// since `clients` is an `Arc`.
#[derive(Clone)]
pub struct ClientManager {
    clients: SharedClients,
}
impl ClientManager {
    /// Creates a manager with an empty client map.
    pub fn new() -> Self {
        let clients = Arc::new(RwLock::new(HashMap::new()));
        Self { clients }
    }

    /// Registers `client` under `client_id`, replacing any client already
    /// stored under that id.
    pub async fn add_client(&self, client_id: u64, client: Client) {
        self.clients.write().await.insert(client_id, Arc::new(client));
    }

    /// Removes the client stored under `client_id`, if there is one.
    pub async fn remove_client(&self, client_id: u64) {
        self.clients.write().await.remove(&client_id);
    }

    /// Returns a shared handle to the client stored under `client_id`, or
    /// `None` when no such client is registered.
    pub async fn get_client(&self, client_id: u64) -> Option<Arc<Client>> {
        self.clients.read().await.get(&client_id).map(Arc::clone)
    }
}
/// A connected client together with its per-connection bookkeeping.
#[derive(Debug)]
pub struct Client {
// Identifier passed to `Client::new` by the caller.
pub id: u64,
// The client's TCP stream, wrapped in a reference-counted lock.
pub stream: Arc<RwLock<TcpStream>>,
// Set to `Instant::now()` when `Client::new` runs.
pub connected_at: Instant,
// Starts at 0; bumped by `increment_request_count`.
pub request_count: RwLock<u64>,
}
impl Client {
    /// Wraps `stream` in a new client record with a zeroed request counter
    /// and the current instant as its connection time.
    pub fn new(id: u64, stream: TcpStream) -> Self {
        let stream = Arc::new(RwLock::new(stream));
        let connected_at = Instant::now();
        let request_count = RwLock::new(0);
        Self {
            id,
            stream,
            connected_at,
            request_count,
        }
    }

    /// Adds one to the request counter.
    pub async fn increment_request_count(&self) {
        *self.request_count.write().await += 1;
    }

    /// Returns the current value of the request counter.
    pub async fn get_request_count(&self) -> u64 {
        let current = self.request_count.read().await;
        *current
    }
}