Learners High Week 3,4 Summary!
카프카를 헥사고날 아키텍처를 토대로 클론코딩 해봤다. 깃허브 주소
Week 3,4 Intro
인사이동이 있었다. 부서이동으로 인한 OJT가 완료되어야 다음 이슈를 받을 수 있는 상황이었고, 기존에 받았던 이슈들은 처리가 완료되었다.
일부 운영 이슈가 있었지만, 그것도 다 처리가 완료되어있어서 진짜 뭘 해야할지 모르겠는 상황이었다.
새로운 프로젝트가 열리고, 업무를 파악하며 개선하고 싶은 부분들을 찾아서 열심히 코드를 봤지만, 그래도 당장 진행한다고 하기가 어려웠다.
원래같았으면 공부나 발전을 하기에 오히려 좋았을 환경이었겠지만, 당장 나의 미션에 적합하지는 않다고 생각해서 굉장히 우울해 있다가, 털고 일어나서 지금 OJT기간을 가장 치열하게 보낼 수 있는 일들을 찾아봤다.
옮기는 부서의 특징은 다음과 같았다. 일단 파트너오피스로 고객사와 md들이 사용하는 백오피스를 개발하는 부서였고, 가장 주요한 업무는 거의 스트림과 연관이 되어 있었다. (상품 정보 연동, 수정과 같은 이벤트들의 스트림)
그리고 추가적으로 메인 프로덕트에 헥사고날 아키텍처를 도입해서 개발/유지/보수 하고 있었다.
헥사고날 아키텍처는 많이 생소했고, 실제로 하는걸 본 소스코드는 이번이 처음이었다.
그래서 이번 OJT 기간동안 헥사고날 아키텍처와 스트림(메세지브로커)를 공부하기 위해서 카프카를 헥사고날 아키텍처 형태로 클론코딩하기로 했다.
Start with Codecrafters
먼저 코드크래프터스라는 사이트의 도움을 받았다. 해당 사이트는
위처럼 특정 기술을 구현하는데, 단계를 나눠주고, 해당 구현 단계마다 통합테스트를 진행해주는 사이트이다. 그리고 구현을 위해 필요한 프로토콜등의 정보를 보기 쉽게 정리하고 요약해서 제공해주기도 한다.
다만 요청/응답을 실제 해당 기술이 사용하는 프로토콜을 통해서 진행되어야 한다. 요청 예시, 응답 예시와 같이 실제 요청도 바이트 버퍼로 오고, 응답도 바이트 버퍼로 인코딩해서 보내야한다.
다만 문제는
아직 카프카는 베타로 지원하는중이라, 절반정도만 구현이 되어있다는 것이었다. 이 시점까지의 github 링크
저기까지 구현했을 때, kraft 메타데이터 정보와 일부 consumer 인터페이스만 구현이 되어있었다.
가장 중요한 메세지 저장 로직과, 컨슈머 그룹관리, 오프셋 커밋하는 인터페이스들은 안되어 있었고, 레플리케이션 관련 로직도 아직 없었다.
레플리케이션 로직은 어렵다고 쳐도, 그래도 실제 카프카처럼 어느정도 동작한다고 하려면 기본적인 producer, consumser api들과 오프셋 커밋,등이 되어 있어야 한다고 생각했는데, 이부분이 너무 아쉬웠다.
내가 스스로 요구사항을 정의하고 추가 구현을 진행하기로
그래서 새로 레포를 파고, 추가 구현을 진행했다.
최대한 프로토콜을 지키면서 진행하려고 했고, 아직 구현되지 않은 요구사항은 통합테스트로 작성했다.
/tests/integration_test.rs
#[tokio::test]
async fn test_message_log_persistence() -> Result<()> {
print_test_header("Message Log Persistence Test");
let temp_dir = TempDir::new().expect("Failed to create temp dir");
let config = StoreConfig {
max_segment_size: 1024,
max_buffer_size: 2,
flush_interval: Duration::from_millis(100),
};
let store = DiskMessageStore::new(temp_dir.path().to_path_buf(), config);
// 테스트 메시지들 준비
let test_messages = vec![
KafkaMessage {
correlation_id: 1,
topic: "test-topic".to_string(),
partition: 0,
offset: 0,
timestamp: 1234567890,
payload: "First message".as_bytes().to_vec(),
}, KafkaMessage {
correlation_id: 2,
topic: "test-topic".to_string(),
partition: 0,
offset: 0,
timestamp: 1234567891,
payload: "Second message".as_bytes().to_vec(),
}, KafkaMessage {
correlation_id: 3,
topic: "test-topic".to_string(),
partition: 0,
offset: 0,
timestamp: 1234567892,
payload: "Third message".as_bytes().to_vec(),
}, ];
println!("\n=== 메시지 저장 시작 ===");
let mut stored_offsets = Vec::new();
for (i, message) in test_messages.iter().enumerate() {
print_message_details(&format!("저장할 메시지 {}", i + 1), message);
let offset = store.store_message(message.clone()).await?;
stored_offsets.push(offset);
println!("메시지 {} 저장됨, 오프셋: {}", i + 1, offset);
}
// 버퍼가 디스크에 플러시되도록 잠시 대기
println!("\n버퍼 플러시 대기 중...");
tokio::time::sleep(Duration::from_millis(200)).await;
// 로그 파일 확인
let topic_dir = temp_dir.path().join("test-topic-0");
let base_offset = (stored_offsets[0] / 1000) * 1000;
let log_file = topic_dir.join(format!("{:020}.log", base_offset));
let index_file = topic_dir.join(format!("{:020}.index", base_offset));
println!("\n=== 로그 파일 정보 ===");
println!("로그 파일 경로: {:?}", log_file);
println!("인덱스 파일 경로: {:?}", index_file);
assert!(log_file.exists(), "로그 파일이 존재해야 함");
assert!(index_file.exists(), "인덱스 파일이 존재해야 함");
let log_metadata = fs::metadata(&log_file).await?;
let index_metadata = fs::metadata(&index_file).await?;
println!("로그 파일 크기: {} bytes", log_metadata.len());
println!("인덱스 파일 크기: {} bytes", index_metadata.len());
// 저장된 메시지 읽기 및 검증
println!("\n=== 저장된 메시지 검증 ===");
for (i, offset) in stored_offsets.iter().enumerate() {
let read_result = store
.read_messages("test-topic", 0, *offset as i64)
.await?;
match read_result {
Some(data) => {
println!("\n메시지 {} (오프셋 {}):", i + 1, offset);
println!("원본 메시지: {:?}", String::from_utf8_lossy(&test_messages[i].payload));
println!("읽은 메시지: {:?}", String::from_utf8_lossy(&data));
assert_eq!(&data, &test_messages[i].payload, "메시지 내용이 일치해야 함");
} None => panic!("메시지를 찾을 수 없음: offset {}", offset),
} }
// 로그 파일 내용 직접 확인 (처음 100바이트만)
println!("\n=== 로그 파일 원시 내용 ===");
let mut log_file = File::open(&log_file).await?;
let mut buffer = vec![0u8; 100];
let n = log_file.read(&mut buffer).await?;
println!("처음 {} 바이트: {:?}", n, &buffer[..n]);
Ok(())
}
사실 원대한 목표는 통합 테스트 코드를 작성하고, 해당 통합 테스트 코드에 카프카 클라이언트 라이브러리를 붙여서 실제로 기능이 동작하는지를 보고싶었는데, 생각보다 더 많은 요구사항을 구현해둔 뒤에야 실제 카프카 클라이언트 라이브러리를 붙있 수 있었다.
사실 통합 테스트로 보기 어려운 부분은 하나 더 있었는데 바로 실제 입력 버퍼로 요청을 보내는 부분을 생략했다는 점이다. 테스트 코드를 위한 정확한 입력 버퍼를 바이트코드로 직접 인코딩을 해야하는데, ai툴을 이용해도 오류가 생각보다 많고 별 것 아닌 프로토콜 바이트 오류 때문에 디버깅하는시간이 너무 길어져서 본질과는 다른일을 하고있다는 생각이 들었다.
시작 전에 고민하던 것들
- 처음부터 어느정도의 추상화와 설계를 하고 진행해야 한다. 해당 사이트에서 레디스를 구현하는 프로젝트를 진행한적이 있는데, 중간에 구조적으로 잘못되어 있는 부분들을 리팩토링 하다가 너무 힘들어서 해맸던 경험이 있다. 결국 러스트 사용자 모임 디스코드에서 질문을 올리고 답변받아 진행했는데, 그와 같은 경험을 하고싶지는 않았다. 난개발된-레디스를-이벤트루프-기반으로-리팩토링하기
- 관련한 이야기인데, 도메인로직을 최대한 정의하고 시작해보려고 노력했다. 관련해서 ddd와 헥사고날과 관련한 공부를 미리 진행했다. 헥사고날-아키텍처
- 카프카 내부 구현에 대한 내용을 미리 알고 시작하면 좋을 것 같았다. 그래서 간단하게 카프카 관련 서적을 읽기는 했는데, 주로 설정과 운영노하우에 대한 이야기가 많아서 내 목적에는 조금 부합하지 않았던 것 같다.
Impl and TroubleShootings
01 어려운건 헥사고날이 아니라 DDD..
먼저 처음 시작 할 때는, 도메인 로직에 대한 정의가 어려웠고, 하던 중에는 도메인 로직인지 아닌지가 항상 헷갈렸고, 끝나고 나서는 내가 도메인 로직을 잘 발라냈는지가 아직도 의문이었다. 그만큼 도메인 로직 자체를 미리 바르는것도, 내가 구현해야할 프로젝트의 도메인 로직을 전부 알아야 하는것도 너무 어려웠다. 그래서 DDD에 대해서는 항상 생각이 많았지만, 도메인 로직이 확실히 격리, 분리되어있어야 하는 헥사고날 아키텍처에서는 이부분이 가장 어려웠다. 그래서 이게 헥사고날의 문제냐 하면 그게 맞는 것 같다고 느낀 이유는 작업 도중에 코드를 정리하는 난이도가 기존 layered arhcitecture에서는 어느정도 가능했지만, 헥사고날에서는 더 어려웠다는 생각이 들었기 때문이었다.
아무튼 구조를 잘 잡아보려고 노력했을 때 나의 프로젝트는 다음과 같은 구조가 나왔다.
/adapters/incoming/tcp_adapter.rs
pub struct TcpAdapter {
listener: TcpListener,
broker_incoming_port: Arc<dyn BrokerIncomingPort>,
protocol_parser: KafkaProtocolParser,
}
impl TcpAdapter {
pub async fn new(
addr: &str,
broker_incoming_port: Arc<dyn BrokerIncomingPort>,
protocol_parser: KafkaProtocolParser,
) -> Result<Self> {
let listener = TcpListener::bind(addr)
.await
.map_err(ApplicationError::Io)?;
Ok(Self {
listener,
broker_incoming_port,
protocol_parser,
})
}
pub async fn run(&self) -> Result<()> {
println!("Server listening on port 9092");
loop {
match self.listener.accept().await {
Ok((stream, _)) => {
let message_handler = Arc::clone(&self.broker_incoming_port);
let protocol_parser = self.protocol_parser.clone();
tokio::spawn(async move {
if let Err(e) =
handle_connection(stream, message_handler, protocol_parser).await
{
println!("Connection error: {}", e);
}
});
}
Err(e) => println!("Accept error: {}", e),
}
}
}
}
async fn handle_connection(
mut stream: TcpStream,
broker_incoming_port: Arc<dyn BrokerIncomingPort>,
protocol_parser: KafkaProtocolParser,
) -> Result<()> {
println!("Accepted new connection");
loop {
// 1. 요청 크기 읽기
let mut size_bytes = [0u8; 4];
if let Err(e) = stream.read_exact(&mut size_bytes).await {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
println!("Client closed connection");
return Ok(());
}
return Err(ApplicationError::Io(e));
}
let message_size = i32::from_be_bytes(size_bytes);
// 2. 요청 데이터 읽기
let mut request_data = vec![0; message_size as usize];
stream
.read_exact(&mut request_data)
.await
.map_err(ApplicationError::Io)?;
// 3. 프로토콜 파싱
let request = protocol_parser.parse_request(&request_data)?;
// 4. 비즈니스 로직 처리
let response = broker_incoming_port.handle_request(request).await?;
// 5. 응답 인코딩 및 전송
let encoded = protocol_parser.encode_response(response);
stream
.write_all(&encoded)
.await
.map_err(ApplicationError::Io)?;
}
}
/ports/incoming/broker_incoming_port.rs
#[async_trait]
pub trait BrokerIncomingPort: Send + Sync {
async fn handle_request(&self, request: KafkaRequest) -> Result<KafkaResponse>;
}
/application/broker_service.rs
#[allow(dead_code)]
pub struct BrokerService {
message_store: Box<dyn MessageOutgoingPort>,
metadata_store: Box<dyn MetadataOutgoingPort>,
}
impl BrokerService {
pub fn new(
message_store: Box<dyn MessageOutgoingPort>,
metadata_store: Box<dyn MetadataOutgoingPort>,
) -> Self {
Self {
message_store,
metadata_store,
}
}
#[async_trait]
impl BrokerIncomingPort for BrokerService {
async fn handle_request(&self, request: KafkaRequest) -> Result<KafkaResponse> {
if !request.header.is_supported_version() {
return Ok(KafkaResponse::new(
request.header.correlation_id,
UNSUPPORTED_VERSION,
ResponsePayload::ApiVersions(ApiVersionsResponse::default()),
));
}
match request.header.api_key {
API_VERSIONS_KEY => Ok(KafkaResponse::new(
request.header.correlation_id,
0,
ResponsePayload::ApiVersions(ApiVersionsResponse::default()),
)),
FETCH_KEY => self.handle_fetch_request(&request, &request.payload).await,
DESCRIBE_TOPIC_PARTITIONS_KEY => {
self.handle_describe_topic_partitions(&request, &request.payload)
.await
}
PRODUCE_KEY => {
self.handle_produce_request(&request, &request.payload)
.await
}
_ => Ok(KafkaResponse::new(
request.header.correlation_id,
0,
ResponsePayload::ApiVersions(ApiVersionsResponse::new(vec![])),
)),
}
}
}
뭔가 습관적으로 Controller를 도메인 단위로 분리하는게 생각나서 그렇게 할 까 했지만, 그것보다는 별도의 설정을 위한 cli나 새로운 입력 어댑터로 분류 할 수 있는 경우가 생겨야 분리하는게 낫다는 생각이 들어 모든 요청을 하나의 어댑터에서 처리했다.
02 프로토콜은 도메인 로직일까?
일단 내 프로젝트의 계층 구조는 아래와 같다.
src/
├── adapters/
│ ├── incoming/
│ │ ├── mod.rs
│ │ ├── tcp_adapter.rs
│ ├── outgoing/
│ │ ├── README.md
│ │ ├── disk_store.rs
│ │ ├── kraft_metadata_store.rs
│ │ ├── mod.rs
│ ├── protocol/
│ │ ├── dto/
│ │ ├── parser/
│ │ │ ├── constants.rs
│ │ │ ├── kraft_record_parser.rs
│ │ │ ├── mod.rs
│ │ │ ├── tcp_parser.rs
│ │ ├── mod.rs
├── application/
├── config/
├── domain/
├── ports/
├── lib.rs
└── main.rs
여기서 프로토콜 부분이 참 애매했는데, 내가 내린 결론은 위처럼 반영되어있다.
먼저 내가 카프카를 개발하는 개발자 관점이었다면 프로토콜이 가지는 도메인적 의의가 충분하다.
당연히 버퍼를 어떻게 해석해야 하는지와 같은 도메인 로직도 있고,
심지어 클라이언트와 핸드쉐이크(정확히는 핸드쉐이크로 통칭하지는 않지만) 과정에서 아래와 같은 일이 일어난다.
클라이언트 : 안녕 지원하는 버전좀 알려줘
카프카 :
{
"ErrorCode": 0,
"ApiKeys": [
{
"Index": 0,
"MinVersion": 0,
"MaxVersion": 8
},
{
"Index": 1,
"MinVersion": 0,
"MaxVersion": 12
},
// ... 다른 API 키들 ...
],
"ThrottleTimeMs": 0
}
클라이언트 : 카프카와 본인이 지원하는 가장 높은 버전으로 버퍼를 인코딩해서 요청
결국 내가 카프카를 개발하는 개발자 관점에서는 분명히 프로토콜은 도메인 로직이다.
내가 어떠한 버전을 지원하는지, 입력 버퍼를 어떻게 디코드/인코드 해야하는지에 대한 정보이며 유지보수 관리되어야 하기 때문이다.
문제는 그렇게 했을때, 어댑터 레이어에서는 프로토콜을 모른채로 포트를 통해 어댑터로 입련된 일련의 바이트 버퍼를 내려보내주고 서비스 레이어 이하에서 실제 프로토콜대로 버퍼를 파싱하는 로직이 붙게 된다.
그러면 그 구조는 자연스러운가 했을때 기본적으로 의아한점이 분명히 있기도 하고, 두 번째로 말이 쉽지 incoming에서 프로토콜을 전혀 모른채로 버퍼를 어디까지 읽어들인다음에 내려보내야하는지도 전혀 알 수 없다. (어댑터에서 프로토콜을 전혀 모른다면 어디까지가 요청인지에 대한 구분이 어려움)
그래서 어댑터 레이어에서 프로토콜을 알게 한다면 바로 헥사고날 아키텍처를 깨뜨린다 (어댑터의 도메인 로직 참조)
결론적으로 이 두가지 내용이 맞지 않다고 생각되어 adapters/
하위에 protocol을 두기는 했다.
이부분은 옮기는 팀에서 처음 프로젝트를 빌딩한 분께 여쭤봐서 같이 고민해봤는데도 “애매하네..” 와 같은 결론만 나왔다. 굉장히 묘한 문제는 맞는 것 같다.
03 메세지를 영속화 하는 문제
이건 그냥 실수인데, 바꾸기에는 해당 부분 코드가 너무 비대해졌다. 카프카가 메세지를 바로 디스크에 영속화 하는것은 알고 있지만 자세한 실제 구현사항을 나중에 알게되었다.
책에서 본 바로는 일단 기본적으로 바로 영속화를 하고, 지금 작업중인 세그먼트를 포함해서 최근의 세그먼트를 페이지캐시해서 속도를 유지한다고 한다. 그래서 실시간성이 중요한 프로젝트에서는 특히 프로듀서의 메세지 생성과 컨슈머의 소비 시간의 격차가 캐시할 수 있는 세그먼트 범주에 들어있어야 제성능이 나온다고 한다.
이걸 자세히 보기 전에 내 의식의 흐름은 아래와 같았다.
1. 카프카는 메세지를 디스크에 쓴다 -> 맞음
2. 카프카는 빠르다 -> 맞음
3. 그러면 분명히 메모리에 올릴거야 -> 맞음
4. 정확히는 컨슈머가 아직 안빼간 데이터를 메모리에 두고 있다가 디스크에 플러시하겠지? -> 틀림
5. 컨슈머가 늦게 빼가면 그걸 메모리에 올리는 별도의 로직이 있을꺼야! -> 틀림
이 치명적인 오류를 가졌음에도, ‘아니 이방법 말고 디스크에 쓰는놈이 어떻게 빠를 수 있는데’와 같은 마음을 먹고 당당하게 구현을 시작했다.
pub fn new(log_dir: PathBuf, config: StoreConfig) -> Self {
let (flush_sender, flush_receiver) = mpsc::channel(100);
let is_running = Arc::new(AtomicBool::new(true));
let segments = Arc::new(RwLock::new(HashMap::new()));
let store = Self {
log_dir,
segments: segments.clone(),
config: config.clone(),
flush_sender,
is_running: is_running.clone(),
};
// 플러시 작업을 처리할 백그라운드 태스크 시작
store.start_flush_task(flush_receiver, config.flush_interval);
store
}
// 대충 주기적인 플러시와 롤링이 일어날때 명시적인 플러시를 당당하게 구현해놓은 함수
fn start_flush_task(
&self,
mut flush_receiver: mpsc::Receiver<FlushMessage>,
interval_duration: Duration,
) {
let segments = self.segments.clone();
let is_running = self.is_running.clone();
tokio::spawn(async move {
let mut interval = interval(interval_duration);
while is_running.load(Ordering::SeqCst) {
tokio::select! {
_ = interval.tick() => {
// 주기적인 플러시
let segments_guard = segments.read().await;
for (_topic_partition, cache) in segments_guard.iter() {
for (_base_offset, segment) in cache.segments.iter() {
let mut segment = segment.write().await;
if let Err(e) = DiskMessageStore::flush_segment(&mut segment).await {
eprintln!("Error flushing segment: {:?}", e);
}
}
}
}
Some(msg) = flush_receiver.recv() => {
match msg {
FlushMessage::Flush(topic_partition, base_offset) => {
let segments_guard = segments.read().await;
if let Some(cache) = segments_guard.get(&topic_partition) {
if let Some(segment) = cache.segments.get(&base_offset) {
let mut segment = segment.write().await;
if let Err(e) = DiskMessageStore::flush_segment(&mut segment).await {
eprintln!("Error flushing segment: {:?}", e);
}
}
}
}
FlushMessage::Shutdown => break,
}
}
}
}
});
}
지난번 레디스를 구현 할 때 배웠던 mpsc까지 써가며 열심히 플러시 로직을 구현했다. 주기적으로 메모리 버퍼를 락걸고 플러시하거나, 쓰기요청때 정해진 버퍼가 가득 차면 플러시를 하도록 하는 코드를 구현하고 매우 뿌듯하게 만족하고 있었다.
#[async_trait]
impl MessageOutgoingPort for DiskMessageStore {
async fn store_message(&self, message: KafkaMessage) -> Result<u64> {
let segment = self
.get_or_create_segment(
&TopicPartition {
topic: message.topic.clone(),
partition: message.partition,
},
message.offset,
)
.await?;
let mut segment = segment.write().await;
let new_offset = segment.allocate_offset();
let mut message = message.clone();
message.offset = new_offset;
// 메모리 버퍼에 추가 시도
if !segment.buffer.add_message(new_offset, message.clone()) {
// 메모리 제한에 도달한 경우 먼저 플러시
DiskMessageStore::flush_segment(&mut segment).await?;
// 플러시 후 다시 시도
if !segment.buffer.add_message(new_offset, message) {
return Err(DomainError::StorageError("Message too large for buffer".to_string()).into());
}
}
// 버퍼가 가득 찼거나 설정된 크기에 도달하면 플러시
if segment.buffer.is_full() || segment.buffer.messages.len() >= self.config.max_buffer_size {
match DiskMessageStore::flush_segment(&mut segment).await {
Ok(_) => (),
Err(e) => {
eprintln!("Flush failed: {:?}, retrying...", e);
for _ in 0..3 { // 최대 3번 재시도
if DiskMessageStore::flush_segment(&mut segment).await.is_ok() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
Ok(new_offset)
}
// ...
그리고 위와 같은 로직을 작성하고 열심히 통합테스트를 돌리다 위화감을 느끼고 검색한 결과 실제 구현로직을 알게되었다. (놀랍게도 사전에 읽은 책에도 분명히 나와있던 내용이었다.)
즉, 메모리에 메시지를 올려두고 있다가 플러시를 수행하는 방식을 구현했지만, 실제 카프카는 다르게 동작한다는 것을 나중에 깨달았다.
카프카는 메시지를 바로 디스크에 기록하지만, OS의 페이지 캐시를 활용하여 성능을 최적화한다.
그렇기 때문에 애초에 내가 구현한 것처럼 직접 메모리에 유지하고 있다가 플러시하는 방식이 필요하지 않았다.
결국 아래와 같은 문제가 발생했다.
- 설계가 복잡해짐
• 불필요한 플러시 타이머와 명시적인 플러시 요청 로직이 추가됨.
• segments를 RwLock으로 보호해야 하는 등, 동기화 비용 증가.
- 성능이 예상보다 낮음
• 초반에는 성능이 좋아 보였지만, 실제로 메모리 소비가 많고, 특정 조건에서 성능이 급격히 저하됨.
• OS 페이지 캐시를 활용하는 것이 아니라, 직접 메모리를 관리하려다 보니 비효율적인 설계가 되어버림.
결국 이 부분을 수정하려 했지만, 이미 해당 구조를 전제로 한 많은 코드가 있었기 때문에 완전히 다시 짜야 하는 상황이었다.
그래서 당장은 그대로 두었고, 추후 개선 포인트로 남겨두었다.
결론
프로젝트 진행상태 리뷰
- 일단 codecrafters 사이트에서 나와있는 모든 카프카 관련 기능들은 클론을 마쳤다.
- 추가적으로 프로듀서와 컨슈머 관련 api를 프로토콜 기반해서 스스로 문제를 정의해서 구현을 했다.
- producer api - 토픽, 파티션에 메세지 저장, 오프셋 기반으로 저장하기, 세그먼트와 인덱스파일 저장하기
- consumer api - 특정 오프셋부터 읽어가기 등
- 아직 구현하지 못한 부분이지만 가장 아쉬운 것들 위주로 작성하면 아래와 같다.
- 리밸런싱, 혹은 키값을 지정해서 자동으로 파티션 할당 (메세지 입력순서 보장을 위한 핵심적인 기능이라고 생각한다.)
- 마찬가지로 컨슈머 그룹 관리 및 자동으로 파티션 할당 (라운드로빈)
- 위의 것들을 구현하면 뭔가 클라이언트를 실제로 붙일 수 있을 것 같기에 위의 과제를 가장 최우선적으로 보충해보고 싶다.
헥사고날 아키텍처에 익숙해졌나요?
반은 맞고 반은 틀린것같다. 상기한 이유로 애초에 잘 맞는지 모르겠으며 개인적인 “지금까지의 느낌"을 약간만 보충하면 헥사고날 아키텍처보다는 DDD가 어려운 것 같다. 사실 잘 어울리는 구조였는지는 모르겠다. 그리고 DDD를 잘 해야 헥사고날을 진짜로 잘 할 수 있을 것 같은 느낌이 들었고, DDD를 잘 하려면 처음부터 해당 도메인과 비즈니스 로직이 명확하게 잘 알고 있는 상태로 프로젝트의 첫 발을 놓아야 한다는 생각이 들었다.
구조 자체에 대한 이해는 어느정도 피상적으로 한 것 같지만, 본질적인 부분은 DDD인 것 같았다는 것 자체가 배움이었던 것 같다.
그래도 헥사고날 아키텍처에서 사용하는 관용구들과 컨벤션 이름들에 익숙해진 시간은 된 것 같다.
메세지 브로커에 대해서 잘 알게 되었나요?
이전의 나와 이 프로젝트를 진행 한 이후의 나를 비교하면 매우 그런것 같다. 실제 구현을 하고 실수를 해보면서 카프카의 구조에 대해서 훨씬 더 잘 알게 된 것 같고. “왜 이렇게 되어있지?” 와 같은 부분에 대한 답변을 조금이나마 더 할 수 있게 된 것 같다.