/// Server entry point: applies the configuration, binds the listener on
/// localhost and serves every accepted connection on its own task.
#[tokio::main]
async fn main() {
    // 1. Set up the required configuration, data, and so on.
    let state = StateManager::new();
    let config_handler = ConfigHandler::new(
        state.get_db(),
        state.get_config(),
        state.get_replication_config(),
    );

    config_handler.load_config().await;
    config_handler.configure_db().await;
    config_handler.configure_replication().await;

    let port = config_handler.get_port().await;
    let bind_addr = format!("127.0.0.1:{}", port);
    let listener = TcpListener::bind(bind_addr).await.unwrap();

    println!("Listening on port {}", port);

    loop {
        // 2. Take the next stream; a failed accept is only logged and the
        //    loop keeps accepting.
        let stream = match listener.accept().await {
            Ok((stream, _)) => stream,
            Err(e) => {
                println!("Error accepting connection: {}", e);
                continue;
            }
        };

        // 3. As is obvious at a glance, every shared (contended) resource is
        //    simply handed over to the per-connection task with the stream.
        let db = state.get_db();
        let config = state.get_config();
        let replication_config = state.get_replication_config();
        task::spawn(handle_client(stream, db, config, replication_config));
    }
}

// === handler.rs below ===
/// Serves one client connection until the peer closes it or a read fails.
///
/// Every successful read is decoded as UTF-8, parsed into a command and
/// executed; the command writes its own reply to `stream`. Decode and parse
/// failures are logged and the loop moves on to the next read.
pub async fn handle_client(mut stream: TcpStream, db: Db, config: Config, replication_config: ReplicationConfig) {
    let mut buffer = [0; 512];
    loop {
        buffer.fill(0);

        // 1. Read the request message from the stream into the buffer.
        let len = match stream.read(&mut buffer).await {
            // Zero bytes read: stop serving this connection.
            Ok(0) => break,
            Ok(n) => n,
            Err(e) => {
                println!("Error reading from stream: {}", e);
                break;
            }
        };

        let message = match std::str::from_utf8(&buffer[..len]) {
            Ok(text) => text,
            Err(_) => {
                println!("Failed to parse message as UTF-8");
                continue;
            }
        };
        println!("Received message: {:?}", message);

        // 2. Parse the message into a Command value (an enum, to be precise).
        let command = match CommandParser::parse_message(message) {
            Ok(command) => command,
            Err(e) => {
                println!("Failed to parse command: {}", e);
                continue;
            }
        };

        // 3. The method that runs the command drags the contended resources
        //    (stream, db, ...) along with it.
        let outcome = command
            .handle_command(&mut stream, Arc::clone(&db), Arc::clone(&config), replication_config.clone())
            .await;
        if let Err(e) = outcome {
            println!("Failed to send response: {}", e);
        }
    }
}


// === command.rs below ===
 /// Runs this command and writes its reply, or an `-ERR` line, to `stream`.
 ///
 /// Returns `Err` only when writing to `stream` fails; a failure reported by
 /// `execute` is sent to the client as an error reply and yields `Ok(())`.
 pub async fn handle_command(
        &self,
        stream: &mut TcpStream,
        db: Db,
        config: Config,
        replication_config: ReplicationConfig,
    ) -> std::io::Result<()> {
        let peer_addr = if let Ok(addr) = stream.peer_addr() {
            addr
        } else {
            let err_response = "-ERR Failed to retrieve client address\r\n".to_string();
            stream.write_all(err_response.as_bytes()).await?;
            return Ok(());
        };

        // 1. Known issue: the reply produced by the command is written
        //    directly to `stream`, which is a contended resource.
        let responses = match self.execute(db, config, replication_config, peer_addr).await {
            Ok(responses) => responses,
            Err(e) => {
                let err_response = format!("-ERR {}\r\n", e);
                stream.write_all(err_response.as_bytes()).await?;
                return Ok(());
            }
        };

        for response in responses {
            match response {
                CommandResponse::Simple(text) => stream.write_all(text.as_bytes()).await?,
                CommandResponse::Bulk(data) => {
                    // Length header first, then the raw payload; nothing is
                    // appended after the payload here.
                    let header = format!("${}{}", data.len(), CRLF);
                    stream.write_all(header.as_bytes()).await?;
                    stream.write_all(&data).await?;
                }
                // Any responses after this marker are not written.
                CommandResponse::EndStream => break,
            }
        }

        Ok(())
    }
    /// Dispatches this command to its handler and returns the responses to
    /// send back, or an error message on failure.
    ///
    /// Only the `SET` arm is shown in this listing; the remaining arms are
    /// omitted.
    pub async fn execute(
        &self,
        db: Db,
        config: Config,
        replication_config: ReplicationConfig,
        peer_addr: SocketAddr,
    ) -> Result<Vec<CommandResponse>, String> {
        match self {
          // Call the handler function that corresponds to each command.
          Command::SET { key, value, ex, px } => Ok(vec![CommandResponse::Simple(
             Self::execute_set(key, value, *ex, *px, db).await,
           )]),
          // (omitted ...)
        }
    }

    /// Stores `value` under `key` in `db` and returns the reply string
    /// (`SIMPLE_STRING_PREFIX`, then `OK`, then `CRLF`).
    ///
    /// `px` is taken as milliseconds and wins over `ex` when both are given;
    /// `ex` is taken as seconds and converted to milliseconds. With neither,
    /// the entry is created without a relative expiration.
    async fn execute_set(key: &String, value: &String, ex: Option<u64>, px: Option<u64>, db: Db) -> String {
        // `ex` is client-supplied: a plain `s * 1000` overflows `u64` for huge
        // values (panic in debug builds, silent wrap-around to a much shorter
        // expiry in release builds). Saturate at `u64::MAX` instead.
        let expiration_ms = px.or_else(|| ex.map(|secs| secs.saturating_mul(1000)));

        // 2. The task that serves the connection uses the contended resource
        //    (`db`) directly.
        db.write().await.insert(key.clone(), ValueEntry::new_relative(value.clone(), expiration_ms));
        // 3. If logic such as replication propagation is added here, it will
        //    introduce yet another contended resource.
        format!("{}OK{}", SIMPLE_STRING_PREFIX, CRLF)
    }



// ====== Refactoring in progress below =======
// Event-driven entry point (refactoring in progress).
//
// Binds 127.0.0.1:6379 and spawns an accept loop. Each accepted connection
// gets its own task that reads from the client and publishes every chunk it
// reads through `EventPublisher`. This task drains the channel and passes
// each received event to `EventHandler::handle_event`.
// Lines starting with `//#` are reviewer notes describing planned changes.
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    let client_manager = ClientManager::new();
    let (tx, mut rx) = mpsc::channel::<RedisEvent>(32);

    let db = Arc::new(tokio::sync::RwLock::new(Default::default()));
    let config = Arc::new(tokio::sync::RwLock::new(Default::default()));
    let replication_config = Arc::new(tokio::sync::RwLock::new(Default::default()));

    //# Pass the client manager itself, and stop using the client manager in the spawn below;
    //# instead add client add/remove to RedisEvent and send those (RedisEvent::AddClient(...), RemoveClient(...)).
    //# db, config, replication_config and client_manager will then be used only by the event handler,
    //# so Arc, RwLock, etc. can be removed.
    let event_handler = EventHandler::new(
        db.clone(),
        config.clone(),
        replication_config.clone(),
        client_manager.clients.clone(), //# pass client_manager itself
    );

    let event_publisher = EventPublisher::new(tx);

    tokio::spawn(async move {
        while let Ok((stream, addr)) = listener.accept().await {
            //# Guarantee that the id is unique by using something like an atomic counter.
            let client_id = addr.port() as u64;
            
            //# While changing this to an AddClient event,
            //# split the stream to separate the reading side from the writing side.
            //# The reading side is used inside the spawn below; the writing side is used by the client.
            //# https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.split
            // NOTE(review): `ClientManager::add_client` is declared to take a `Client`,
            // but a stream is passed here — confirm the intended signature.
            client_manager.add_client(client_id, stream.try_clone().unwrap()).await;

            let publisher = event_publisher.clone();
            //# The manager should exist only in event_handler.
            let manager = client_manager.clone();

            tokio::spawn(async move {
                //# Use the reader half of the split stream.
                // NOTE(review): `get_stream` is not among the `ClientManager` methods
                // visible in this file — confirm it exists.
                let mut stream = manager.get_stream(client_id).await.unwrap().write().await;
                let mut buffer = [0; 1024];

                loop {
                    let bytes_read = match stream.read(&mut buffer).await {
                        Ok(0) => break,
                        Ok(n) => n,
                        Err(_) => {
                            eprintln!("Failed to read from client {}", client_id);
                            break;
                        }
                    };

                    //# There is no guarantee that the sent data is read all at once,
                    //# so the sender should put something like a byte length in a header,
                    //# and the reader must keep looping and filling the buffer until all of those bytes are read.
                    let input = String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
                    if let Err(e) = publisher.publish(client_id, input.clone()).await {
                        eprintln!("Error publishing event: {}", e);
                    }
                }

                // Reached once the read loop ends (zero-byte read or read error).
                manager.remove_client(client_id).await;
            });
        }
    });

    while let Some(event) = rx.recv().await {
        event_handler.handle_event(event).await;
    }
}

//# Remove all of the various Locks and Arcs below if they are not actually needed.
/// Shared, `RwLock`-guarded map from a `u64` client id to its `Arc<Client>`.
pub type SharedClients = Arc<RwLock<HashMap<u64, Arc<Client>>>>;

pub struct ClientManager {
    clients: SharedClients,
}

impl ClientManager {
    pub fn new() -> Self {
        Self {
            clients: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn add_client(&self, client_id: u64, client: Client) {
        let mut clients = self.clients.write().await;
        clients.insert(client_id, Arc::new(client));
    }

    pub async fn remove_client(&self, client_id: u64) {
        let mut clients = self.clients.write().await;
        clients.remove(&client_id);
    }

    pub async fn get_client(&self, client_id: u64) -> Option<Arc<Client>> {
        let clients = self.clients.read().await;
        clients.get(&client_id).cloned()
    }
}

/// State kept for one connected client.
#[derive(Debug)]
pub struct Client {
    /// Identifier this client was created with.
    pub id: u64,
    /// The client's TCP stream, shared behind a lock.
    pub stream: Arc<RwLock<TcpStream>>,
    /// Time at which this `Client` value was constructed.
    pub connected_at: Instant,
    /// Counter advanced by `increment_request_count`.
    pub request_count: RwLock<u64>,
}

impl Client {
    /// Wraps `stream` for the client `id`, stamping the current time and
    /// starting the request counter at zero.
    pub fn new(id: u64, stream: TcpStream) -> Self {
        let stream = Arc::new(RwLock::new(stream));
        let connected_at = Instant::now();
        let request_count = RwLock::new(0);
        Self {
            id,
            stream,
            connected_at,
            request_count,
        }
    }

    /// Adds one to the request counter.
    pub async fn increment_request_count(&self) {
        *self.request_count.write().await += 1;
    }

    /// Returns the current value of the request counter.
    pub async fn get_request_count(&self) -> u64 {
        let count = self.request_count.read().await;
        *count
    }
}