Prerequisite


내가 직면한 문제는 아래의 문서에 있다.

👉 [[Redis-Stream-Issue]]

  • 아주 간단하게 요약하자면, 각 스레드가 스트림을 물고다니면서 태스크가 처리되고, 리소스들을 lock으로 관리한다.

  • 지금까지는 그나마 괜찮았는데, 레플리케이션의 쓰기 전파를 생각해봤을 때, 다수의 슬레이브 스트림을 락으로 관리하는 구조에서는 간단한 쓰기 요청에서도 슬레이브들의 스트림을 사용하는 쓰기 락 때문에 지나친 병목이 발생한 구조라는 것이다.

  • 원조 레디스는 싱글스레드와 이벤트루프로 구현된다.

  • 과정에서 좋은 방법이 없을까 하다가 찾게된건 mpsc이고 mpsc에 대한 간단한 조사를 아래의 문서에 정리했다.

👉 [[rust-mpsc]]

  • 아주 좋은 구조이자 해결책이라는 생각이 들었지만, 동시에 스트레스가 차오른다.

mpsc는 좋지만 도입을 위해 해결해야 할 문제들


지금까지는 상쾌하게, 아무런 설계에 대한 고민을 하지 않고 진행을 했고, 그게 내가 코드크래프터스에 돈을 지불하는 이유라고 생각했다. 스테이지별 처리해야할 요구사항만을 기준으로 생각하고 구현했고, 그 결과는 위와 같았던 것이다.

지금까지의 나의 코드의 구조는 이렇다.

main.rs

#[tokio::main]
async fn main() {
    // 1. 필요한 설정, 데이터 등을 세팅함.
    let state = StateManager::new();
    let config_handler = ConfigHandler::new(state.get_db(), state.get_config(), state.get_replication_config());

    config_handler.load_config().await;
    config_handler.configure_db().await;
    config_handler.configure_replication().await;

    let port = config_handler.get_port().await;
    let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await.unwrap();

    println!("Listening on port {}", port);

    loop {
        match listener.accept().await {
            // 2. 스트림을 따서, 스트림을 handle_client로 전달
            Ok((stream, _)) => {
                let db = state.get_db();
                let config = state.get_config();
                let replication_config = state.get_replication_config();
                task::spawn(async move {
                    // 3. 딱 봐도 알 수 있겠지만, 일단 경합자원이고 뭐고 다 넘겨버린다.
                    handle_client(stream, db, config, replication_config).await;
                });
            }
            Err(e) => {
                println!("Error accepting connection: {}", e);
            }
        }
    }
}
  • main.rs에서 하는 일은 1,2,3으로 정리 될 수 있다.
  • lock으로 관리하는 자원들을 넘기는게 일단 잘못되어있다.

handle_client.rs

pub async fn handle_client(mut stream: TcpStream, db: Db, config: Config, replication_config: ReplicationConfig) {
    let mut buffer = [0; 512];
    loop {
        buffer.fill(0);
        // 1. (직접 받아온 스트림에서) 버퍼로 요청 메세지를 읽는다.
        match stream.read(&mut buffer).await {
            Ok(0) => break,
            Ok(n) => {
                let message = match std::str::from_utf8(&buffer[..n]) {
                    Ok(msg) => msg,
                    Err(_) => {
                        println!("Failed to parse message as UTF-8");
                        continue;
                    }
                };

                println!("Received message: {:?}", message);
                // 2. 메세지를 파싱해서, Command 객체를 (정확히는 enum)을 만들어준다.
                match CommandParser::parse_message(message) {
                    Ok(command) => {
                        // 3. 커맨드를 실행하는, 메서드는 다음과 같이 경합자원들을 물고 들어간다(stream, db, 등등...)
                        if let Err(e) = command.handle_command(&mut stream, Arc::clone(&db), Arc::clone(&config), replication_config.clone()).await {
                            println!("Failed to send response: {}", e);
                        }
                    }
                    Err(e) => {
                        println!("Failed to parse command: {}", e);
                    }
                }
            }
            Err(e) => {
                println!("Error reading from stream: {}", e);
                break;
            }
        }
    }
}
  • 여기도 마찬가지로 1,2,3 순서로 주석을 작성했다.

command.rs

    pub async fn handle_command(
        &self,
        stream: &mut TcpStream,
        db: Db,
        config: Config,
        replication_config: ReplicationConfig,
    ) -> std::io::Result<()> {
        let peer_addr = match stream.peer_addr() {
            Ok(addr) => addr,
            Err(_) => {
                let err_response = "-ERR Failed to retrieve client address\r\n".to_string();
                stream.write_all(err_response.as_bytes()).await?;
                return Ok(());
            }
        };

        // 1. 커맨드에서 받아온 처리 응답 결과를 경합자원인 stream에 직접 쓰는 문제.
        match self.execute(db, config, replication_config, peer_addr).await {
            Ok(responses) => {
                for response in responses {
                    match response {
                        CommandResponse::Simple(response) => {
                            stream.write_all(response.as_bytes()).await?;
                        }
                        CommandResponse::Bulk(data) => {
                            let header = format!("${}{}", data.len(), CRLF);
                            stream.write_all(header.as_bytes()).await?;
                            stream.write_all(&data).await?;
                        }
                        CommandResponse::EndStream => break,
                    }
                }
            }
            Err(e) => {
                let err_response = format!("-ERR {}\r\n", e);
                stream.write_all(err_response.as_bytes()).await?;
            }
        }

        Ok(())
    }
    pub async fn execute(
        &self,
        db: Db,
        config: Config,
        replication_config: ReplicationConfig,
        peer_addr: SocketAddr,
    ) -> Result<Vec<CommandResponse>, String> {
        match self {
          // 각 커맨드에 해당하는 처리 함수를 호출한다.
          Command::SET { key, value, ex, px } => Ok(vec![CommandResponse::Simple(
             Self::execute_set(key, value, *ex, *px, db).await,
           )]),
          // (중략 ... )
        }
    }

    async fn execute_set(key: &String, value: &String, ex: Option<u64>, px: Option<u64>, db: Db) -> String {
        let expiration_ms = match (px, ex) {
            (Some(ms), _) => Some(ms),
            (None, Some(s)) => Some(s * 1000),
            _ => None,
        };

        // 2. 실제 스레드에서 경합자원을 직접적으로 이용한다.
        db.write().await.insert(key.clone(), ValueEntry::new_relative(value.clone(), expiration_ms));
        // 3. 심지어 레플리케이션 전파와 같은 로직이 늘어난다면 또하나의 경합자원을 생성한다.
        format!("{}OK{}", SIMPLE_STRING_PREFIX, CRLF)
    }
    
  • 마찬가지로 작성된, 나의 코드의 문제를 주석으로 작성했다.

  • 즉 이런 구조로는 스레드가 직접적으로 경합 자원을 가져다 쓴다는 이슈이다.

  • 더 어려운건 이 상태를 변경하려면 구조를 심각하게 변경해야 한다는 것이다.

다시 설계를 진행해야 한다.


먼저 생각해본 개편 이후의 흐름.

먼저 새로 이벤트를 정의한다. 아래는 아직 간단하게 생각하는 redis_event.rs

#[derive(Debug)]
pub enum RedisEvent {
    ClientRequest {
        client: Client,
        command: Command,
    },
    Replication {
        slave: Slave,
        data: Vec<u8>,
    },
    PubSub {
        channel: String,
        message: String,
    },
    ClientConnect {
        client: Client,
    },
    ClientDisconnect {
        client: Client,
    },
}

참고, 원조 레디스는 클라이언트를 별도의 주소값만으로 식별한다.

use std::net::SocketAddr;

#[derive(Debug)]
pub struct Client {
    pub addr: SocketAddr,
}

마찬가지로 이벤트 핸들러. event_handler.rs

pub struct EventHandler {
    // 아마도 여기에 경합 자원들을 전부 보관.
    // State 객체를 여기서 들고있어도 좋을 것 같다.
    // 거기에 추가적으로 Sender도 하나 들고 있어야 한다(아마도 replication_rx). 
    // (이벤트를 처리하는 도중, 새로운 이벤트를 발행할수 있음. 예를들어 set을 처리하면서 db에 쓰고, 새로운 replication 전파 이벤트 발행)
}

impl EventHandler {
    pub async fn handle_event(event: RedisEvent) {
        match event {
            RedisEvent::ClientRequest { client, command, params } => {
                // 명령 실행 로직
            }
            RedisEvent::Replication { slave_id, data } => {
                // 레플리케이션 로직
            }
            RedisEvent::PubSub { channel, message } => {
                // Pub/Sub 메시지 처리
            }
            RedisEvent::ClientConnect { client } => {
                // 클라이언트 연결 처리
            }
            RedisEvent::ClientDisconnect { client } => {
                // 클라이언트 해제 처리
            }
        }
    }
}

이부분까지 진행을 했을 때, 리팩토링을 하는 부분에 있어서 질문이 생겼고 커뮤니티에 질문을 올리고 답변을 받았다. [[mpsc-질문-답변-백업]]