mpsc๋?
Rust์ mpsc ์ฑ๋์ “์ฌ๋ฌ ์์ฐ์ (Multiple Producer)“์ “ํ๋์ ์๋น์ (Single Consumer)“๋ก ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ณ ์ฒ๋ฆฌ ํ ์ ์๋ ๋น๋๊ธฐ ํต์ ๋๊ตฌ์ด๋ค. ์์ด๋์ด๋ ์์ด๋์ด์ง๋ง, ๊ธฐ๋ณธ์ ์ผ๋ก ์ค๊ณ์ ๋์์ด Rust์ ์์ ๊ถ๊ณผ ๋์์ฑ ๋ชจ๋ธ์ ๊ธฐ๋ฐ์ ๋๊ณ ์๋ค.
๊ธฐ๋ณธ์ ์ธ ์ฌ์ฉ ๋ฐฉ์์ ์๋์ ๊ฐ๋ค.
mpsc::channel
- tx (์์ฐ์), rx(์๋น์) ๋ฅผ ๋ฐํ๋ฐ๋๋ค.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// ์ฑ๋ ์์ฑ (๋ฒํผ ํฌ๊ธฐ: 32)
let (tx, mut rx) = mpsc::channel(32);
// ์์ฐ์ (Producer)
tokio::spawn(async move {
for i in 1..=5 {
// tx.send()๋ก ์ฑ๋์ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ธ๋ค.
if let Err(_) = tx.send(format!("Message {}", i)).await {
println!("Receiver dropped");
return;
}
println!("Sent: Message {}", i);
}
});
// ์๋น์ (Consumer) : rx.recv()๋ก ์์ ํ ๋ฉ์ธ์ง๋ฅผ ์ฒ๋ฆฌํ๋ค.
while let Some(message) = rx.recv().await {
println!("Received: {}", message);
}
}
Sent: Message 1
Received: Message 1
Sent: Message 2
Received: Message 2
...
Sender
์ ๋ณต์
- tx์๋
clone()
์ด ๊ตฌํ๋์ด ์์ด, ์ฌ๋ฌ ์์ฐ์๋ฅผ ๋ณต์ ํ ์ ์๋ค.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
// Sender๋ฅผ ๋ณต์
let tx1 = tx.clone();
let tx2 = tx.clone();
// ์ฒซ ๋ฒ์งธ ์์ฐ์
tokio::spawn(async move {
tx1.send("From Producer 1").await.unwrap();
});
// ๋ ๋ฒ์งธ ์์ฐ์
tokio::spawn(async move {
tx2.send("From Producer 2").await.unwrap();
});
// ์๋น์
while let Some(message) = rx.recv().await {
println!("Received: {}", message);
}
}
Received: From Producer 1
Received: From Producer 2
- ๊ทธ ์ธ์๋ ๋ฒํผ์ ํฌ๊ธฐ๋ฅผ ์ค์ ํ ์ ์๊ณ , ๋ฒํผ๊ฐ ๊ฐ๋ ์ฐจ๋ฉด send ํธ์ถ์ด ๋ธ๋ก์ํ๊ฐ ๋๋ค.(์ฆ ์ปจ์๋จธ์ ์ฒ๋ฆฌ ์๋๊ฐ ๊ฑฑ์ ๋๋ค๋ฉด, ์ ๋นํ ๋ฒํผ์ ํฌ๊ธฐ๋ก ์กฐ์ ํ ์ ์๋ค.)
- ๋ํ ๋ฃจํ์ ์ํ๋ฅผ ๋ณด๊ณ send๋ฅผ ํ๊ฑฐ๋ ๋๊ธฐ๋ฅผ ํ๋๊ฒ๋ send, try_send์ ๊ฐ์ ๋ฉ์๋๋ค๋ก ๊ตฌ๋ถ ํ ์ ์๋ค.
๊ทธ๋ฆฌ๊ณ ๋งค์ฐ ์ผ๋ฐ์ ์ธ ๋ฐฉ๋ฒ์ธ ๊ฒ ๊ฐ์๋ฐ, ๋ฒํผ์ ํฌ๊ธฐ๋ฅผ ์กฐ์ ํ๊ธฐ๋ ํ์ง๋ง ํ ์์ฒด๋ฅผ ๋๋ ์ ๋ถ๋ฐฐ ํ ์ ๋ ์๋ค.
let (client_tx, mut client_rx) = tokio::sync::mpsc::channel::<RedisEvent>(32);
let (replication_tx, mut replication_rx) = tokio::sync::mpsc::channel::<RedisEvent>(32);
tokio::spawn(async move {
while let Some(event) = client_rx.recv().await {
// ํด๋ผ์ด์ธํธ ์์ฒญ ์ฒ๋ฆฌ
}
});
tokio::spawn(async move {
while let Some(event) = replication_rx.recv().await {
// ๋ ํ๋ฆฌ์ผ์ด์
์์
์ฒ๋ฆฌ
}
});
mpsc์ ์ฅ์
- ์ค๋ ๋ ๊ฐ ์์ ํ ๋ฐ์ดํฐ ๊ตํ
- mpsc ์ฑ๋์ ์ ์ค๊ณํ๋ฉด Rust์ ์์ ๊ถ๊ณผ ๋์์ฑ ๋ชจ๋ธ์ ๊ธฐ๋ฐ์ ๋๊ณ ์์ด, ์์ฐ์์ ์๋น์๊ฐ ๋ค๋ฅธ ์ค๋ ๋์์ ๋์ํด๋ ๋ฐ์ดํฐ ๊ฒฝํฉ ์์ด ์์ ํ๋ค.
- ์ฌ์ง์ด ๋ด๋ถ์ ์ผ๋ก Arc๋ Mutex์์ด ์ค๊ณ๋์ด ๊ณ ์ฑ๋ฅ์ ๋ณด์ฅํ๋ค๊ณ ํ๋ค.
- ์์ฐ์ ํ์ฅ์ฑ
- mpsc์ Sender๋ฅผ ํด๋ก ํ์ฌ ์ฌ๋ฌ ์์ฐ์๋ฅผ ์์ฑ ํ ์ ์๋ค.
- ์์
๋์ปคํ๋ง
- ๊ตฌ์กฐ๋๋ก ์์ฑํ๋ค๋ฉด, ์์ฐ์์ ์๋น์๋ ์๋ก ๋ ๋ฆฝ์ ์ผ๋ก ๋์ํ๋ค.
- ์์ฐ์๋ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ด๋๊ณ , ๋ค์ ์์ ์ผ๋ก ๋์ด๊ฐ ์ ์๊ณ ์๋น์๋ ํ์์ ๋ฐ์ดํฐ๋ฅผ ๊บผ๋ด ์ฒ๋ฆฌ ํ ๋ฟ์ด๋ค.
- ๋ฐฑํ๋ ์
๊ด๋ฆฌ
- mpsc๋ ๋ฒํผ๋ง๊ธฐ๋ฅ์ ์์ฒด์ ์ผ๋ก ์ง์ํ๊ณ , ๊ด๋ จํ ์ธํฐํ์ด์ค๋ ์ ๋น ์ ธ์๋ค.
- ๋ฒํผ๊ฐ ๊ฐ๋ ์ฐจ๋ ์ ๋, ์๋น์์ ์ฒ๋ฆฌ์๋๋ฅผ ๊ณ ๋ คํด์ ์ฌ์ฉ์๊ฐ ์ ๋์ ์ผ๋ก ์กฐ์ ํ ์ ์๊ณ
- ๋ฐ๋๋ก ์ฌ๋ฌ ์๋น์๋ฅผ ๋ฌ์ ๋ถ๋ฐฐ๋ฅผ ํ ์ ์๋ค.
๊ฒฐ๋ก
- ์ด์์ ์ผ๋ก ์ ์ค๊ณ๋ง ํ๋ค๋ฉด ๋ฉํฐ์ค๋ ๋์ ์ฅ์ ๊ณผ (์ด๋ฒคํธ๋ฃจํ ๊ธฐ๋ฐ์)์ฑ๊ธ์ค๋ ๋ ํ๋ก๊ทธ๋จ์ ์ฅ์ ์ ๋์์ ์ทจํ ์ ์์ ๊ฒ ๊ฐ๋ค.
- ์๋ง๋ ๋ฝ๊ฐ์ ๊ฒฝํฉ ์์๋ค์ ์ปจ์๋จธ์ชฝ์ ๋ชฐ๋นตํ๋ ๊ตฌ์กฐ๊ฐ ๋ ๊ฒ ๊ฐ์๋ฐ, ์ด์ ๊ฐ์ ๊ธฐ๋ฐ์ผ๋ก ์ค๊ณ๋ฅผ ์ฒ์๋ถํฐ ์ ํ๊ณ ๋ค์ด๊ฐ์ผ ์ฌ์ฉ ํ ์ ์์ ๊ฒ ๊ฐ๋ค.