mpsc๋ž€?


Rust์˜ mpsc ์ฑ„๋„์€ “์—ฌ๋Ÿฌ ์ƒ์‚ฐ์ž (Multiple Producer)“์™€ “ํ•˜๋‚˜์˜ ์†Œ๋น„์ž (Single Consumer)“๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๊ณ  ์ฒ˜๋ฆฌ ํ•  ์ˆ˜ ์žˆ๋Š” ๋น„๋™๊ธฐ ํ†ต์‹  ๋„๊ตฌ์ด๋‹ค. ์•„์ด๋””์–ด๋„ ์•„์ด๋””์–ด์ง€๋งŒ, ๊ธฐ๋ณธ์ ์œผ๋กœ ์„ค๊ณ„์™€ ๋™์ž‘์ด Rust์˜ ์†Œ์œ ๊ถŒ๊ณผ ๋™์‹œ์„ฑ ๋ชจ๋ธ์— ๊ธฐ๋ฐ˜์„ ๋‘๊ณ  ์žˆ๋‹ค.

๊ธฐ๋ณธ์ ์ธ ์‚ฌ์šฉ ๋ฐฉ์‹์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

mpsc::channel

  • tx (์ƒ์‚ฐ์ž), rx(์†Œ๋น„์ž) ๋ฅผ ๋ฐ˜ํ™˜๋ฐ›๋Š”๋‹ค.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // ์ฑ„๋„ ์ƒ์„ฑ (๋ฒ„ํผ ํฌ๊ธฐ: 32)
    let (tx, mut rx) = mpsc::channel(32);

    // ์ƒ์‚ฐ์ž (Producer)
    tokio::spawn(async move {
        for i in 1..=5 {
            // tx.send()๋กœ ์ฑ„๋„์— ๋ฉ”์„ธ์ง€๋ฅผ ๋ณด๋‚ธ๋‹ค.
            if let Err(_) = tx.send(format!("Message {}", i)).await {
                println!("Receiver dropped");
                return;
            }
            println!("Sent: Message {}", i);
        }
    });

    // ์†Œ๋น„์ž (Consumer) : rx.recv()๋กœ ์ˆ˜์‹ ํ•œ ๋ฉ”์„ธ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค.
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}
Sent: Message 1
Received: Message 1
Sent: Message 2
Received: Message 2
...

Sender์˜ ๋ณต์ œ

  • tx์—๋Š” clone()์ด ๊ตฌํ˜„๋˜์–ด ์žˆ์–ด, ์—ฌ๋Ÿฌ ์ƒ์‚ฐ์ž๋ฅผ ๋ณต์ œํ•  ์ˆ˜ ์žˆ๋‹ค.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    // Sender๋ฅผ ๋ณต์ œ
    let tx1 = tx.clone();
    let tx2 = tx.clone();

    // ์ฒซ ๋ฒˆ์งธ ์ƒ์‚ฐ์ž
    tokio::spawn(async move {
        tx1.send("From Producer 1").await.unwrap();
    });

    // ๋‘ ๋ฒˆ์งธ ์ƒ์‚ฐ์ž
    tokio::spawn(async move {
        tx2.send("From Producer 2").await.unwrap();
    });

    // ์†Œ๋น„์ž
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}
Received: From Producer 1
Received: From Producer 2
  • ๊ทธ ์™ธ์—๋„ ๋ฒ„ํผ์˜ ํฌ๊ธฐ๋ฅผ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๊ณ , ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด send ํ˜ธ์ถœ์ด ๋ธ”๋ก์ƒํƒœ๊ฐ€ ๋œ๋‹ค.(์ฆ‰ ์ปจ์Šˆ๋จธ์˜ ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ ๊ฑฑ์ •๋œ๋‹ค๋ฉด, ์ ๋‹นํ•œ ๋ฒ„ํผ์˜ ํฌ๊ธฐ๋กœ ์กฐ์ ˆ ํ•  ์ˆ˜ ์žˆ๋‹ค.)
  • ๋˜ํ•œ ๋ฃจํ”„์˜ ์ƒํƒœ๋ฅผ ๋ณด๊ณ  send๋ฅผ ํ•˜๊ฑฐ๋‚˜ ๋Œ€๊ธฐ๋ฅผ ํ•˜๋Š”๊ฒƒ๋„ send, try_send์™€ ๊ฐ™์€ ๋ฉ”์„œ๋“œ๋“ค๋กœ ๊ตฌ๋ถ„ ํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ทธ๋ฆฌ๊ณ  ๋งค์šฐ ์ผ๋ฐ˜์ ์ธ ๋ฐฉ๋ฒ•์ธ ๊ฒƒ ๊ฐ™์€๋ฐ, ๋ฒ„ํผ์˜ ํฌ๊ธฐ๋ฅผ ์กฐ์ ˆํ•˜๊ธฐ๋„ ํ•˜์ง€๋งŒ ํ ์ž์ฒด๋ฅผ ๋‚˜๋ˆ ์„œ ๋ถ„๋ฐฐ ํ•  ์ˆ˜ ๋„ ์žˆ๋‹ค.

let (client_tx, mut client_rx) = tokio::sync::mpsc::channel::<RedisEvent>(32);
let (replication_tx, mut replication_rx) = tokio::sync::mpsc::channel::<RedisEvent>(32);

tokio::spawn(async move {
    while let Some(event) = client_rx.recv().await {
        // ํด๋ผ์ด์–ธํŠธ ์š”์ฒญ ์ฒ˜๋ฆฌ
    }
});

tokio::spawn(async move {
    while let Some(event) = replication_rx.recv().await {
        // ๋ ˆํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ž‘์—… ์ฒ˜๋ฆฌ
    }
});

mpsc์˜ ์žฅ์ 


  • ์Šค๋ ˆ๋“œ ๊ฐ„ ์•ˆ์ „ํ•œ ๋ฐ์ดํ„ฐ ๊ตํ™˜
    • mpsc ์ฑ„๋„์€ ์ž˜ ์„ค๊ณ„ํ•˜๋ฉด Rust์˜ ์†Œ์œ ๊ถŒ๊ณผ ๋™์‹œ์„ฑ ๋ชจ๋ธ์— ๊ธฐ๋ฐ˜์„ ๋‘๊ณ  ์žˆ์–ด, ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž๊ฐ€ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์—์„œ ๋™์ž‘ํ•ด๋„ ๋ฐ์ดํ„ฐ ๊ฒฝํ•ฉ ์—†์ด ์•ˆ์ „ํ•˜๋‹ค.
    • ์‹ฌ์ง€์–ด ๋‚ด๋ถ€์ ์œผ๋กœ Arc๋‚˜ Mutex์—†์ด ์„ค๊ณ„๋˜์–ด ๊ณ ์„ฑ๋Šฅ์„ ๋ณด์žฅํ•œ๋‹ค๊ณ  ํ•œ๋‹ค.
  • ์ƒ์‚ฐ์ž ํ™•์žฅ์„ฑ
    • mpsc์˜ Sender๋ฅผ ํด๋ก ํ•˜์—ฌ ์—ฌ๋Ÿฌ ์ƒ์‚ฐ์ž๋ฅผ ์ƒ์„ฑ ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ์ž‘์—… ๋””์ปคํ”Œ๋ง
    • ๊ตฌ์กฐ๋Œ€๋กœ ์ž‘์„ฑํ•œ๋‹ค๋ฉด, ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž๋Š” ์„œ๋กœ ๋…๋ฆฝ์ ์œผ๋กœ ๋™์ž‘ํ•œ๋‹ค.
    • ์ƒ์‚ฐ์ž๋Š” ๋ฉ”์„ธ์ง€๋ฅผ ๋ณด๋‚ด๋†“๊ณ , ๋‹ค์Œ ์ž‘์—…์œผ๋กœ ๋„˜์–ด๊ฐˆ ์ˆ˜ ์žˆ๊ณ  ์†Œ๋น„์ž๋Š” ํ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊บผ๋‚ด ์ฒ˜๋ฆฌ ํ•  ๋ฟ์ด๋‹ค.
  • ๋ฐฑํ”„๋ ˆ์…” ๊ด€๋ฆฌ
    • mpsc๋Š” ๋ฒ„ํผ๋ง๊ธฐ๋Šฅ์„ ์ž์ฒด์ ์œผ๋กœ ์ง€์›ํ•˜๊ณ , ๊ด€๋ จํ•œ ์ธํ„ฐํŽ˜์ด์Šค๋„ ์ž˜ ๋น ์ ธ์žˆ๋‹ค.
    • ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋Š” ์ •๋„, ์†Œ๋น„์ž์˜ ์ฒ˜๋ฆฌ์†๋„๋ฅผ ๊ณ ๋ คํ•ด์„œ ์‚ฌ์šฉ์ž๊ฐ€ ์œ ๋™์ ์œผ๋กœ ์กฐ์ ˆ ํ•  ์ˆ˜ ์žˆ๊ณ 
    • ๋ฐ˜๋Œ€๋กœ ์—ฌ๋Ÿฌ ์†Œ๋น„์ž๋ฅผ ๋‘ฌ์„œ ๋ถ„๋ฐฐ๋ฅผ ํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ฒฐ๋ก 


  1. ์ด์ƒ์ ์œผ๋กœ ์ž˜ ์„ค๊ณ„๋งŒ ํ•œ๋‹ค๋ฉด ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ์˜ ์žฅ์ ๊ณผ (์ด๋ฒคํŠธ๋ฃจํ”„ ๊ธฐ๋ฐ˜์˜)์‹ฑ๊ธ€์Šค๋ ˆ๋“œ ํ”„๋กœ๊ทธ๋žจ์˜ ์žฅ์ ์„ ๋™์‹œ์— ์ทจํ•  ์ˆ˜ ์žˆ์„ ๊ฒƒ ๊ฐ™๋‹ค.
  2. ์•„๋งˆ๋„ ๋ฝ๊ฐ™์€ ๊ฒฝํ•ฉ ์ž์›๋“ค์„ ์ปจ์Šˆ๋จธ์ชฝ์— ๋ชฐ๋นตํ•˜๋Š” ๊ตฌ์กฐ๊ฐ€ ๋  ๊ฒƒ ๊ฐ™์€๋ฐ, ์ด์™€ ๊ฐ™์€ ๊ธฐ๋ฐ˜์œผ๋กœ ์„ค๊ณ„๋ฅผ ์ฒ˜์Œ๋ถ€ํ„ฐ ์ž˜ ํ•˜๊ณ  ๋“ค์–ด๊ฐ€์•ผ ์‚ฌ์šฉ ํ•  ์ˆ˜ ์žˆ์„ ๊ฒƒ ๊ฐ™๋‹ค.