Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,6 @@ async fn handle_connection(stream: TcpStream, db: Db) -> Result<()> {
loop {
// Check for pubsub messages if subscribed
if client.is_subscribed() {
// First, process any remaining data in the buffer before waiting
if !buffer.is_empty() {
process_buffer(&mut buffer, &mut writer, &db, &mut client).await?;
writer.flush().await?;
continue;
}
tokio::select! {
// Wait for pubsub messages
Some(msg) = client.pubsub_rx.recv() => {
Expand Down Expand Up @@ -186,11 +180,6 @@ async fn handle_connection(stream: TcpStream, db: Db) -> Result<()> {
}
} else {
// Not subscribed - simple read loop
// First, process any remaining data in the buffer
if !buffer.is_empty() {
process_buffer(&mut buffer, &mut writer, &db, &mut client).await?;
continue;
}
let n = reader.read_buf(&mut buffer).await?;
if n == 0 {
println!("Connection closed");
Expand Down
22 changes: 17 additions & 5 deletions src/resp/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,25 @@ fn parse_bulk_string(buffer: BytesMut) -> Result<(Value, usize)> {
return Err(anyhow::anyhow!("Invalid bulk string format {:?}", buffer));
};

let end_of_bulk_str = bytes_consumed + bulk_str_len as usize;
if bulk_str_len == -1 {
return Ok((Value::NullBulk, bytes_consumed));
}
if bulk_str_len < 0 {
return Err(anyhow::anyhow!("Invalid bulk string length"));
}

let bulk_str_len = bulk_str_len as usize;
let end_of_bulk_str = bytes_consumed + bulk_str_len;
let total_parsed = end_of_bulk_str + 2;

Ok((
Value::BulkString(String::from_utf8(buffer[bytes_consumed..end_of_bulk_str].to_vec())?),
total_parsed,
))
if buffer.len() < total_parsed {
return Err(anyhow::anyhow!("Incomplete bulk string data"));
}

let data = &buffer[bytes_consumed..end_of_bulk_str];
let s = String::from_utf8(data.to_vec())?;

Ok((Value::BulkString(s), total_parsed))
Comment on lines +61 to +71
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate trailing CRLF before accepting bulk payload.

Line 64 only checks length, but not that the two trailer bytes are \r\n. A malformed frame can be parsed as valid and consume unrelated bytes, which can corrupt subsequent message boundaries.

Suggested fix
     let bulk_str_len = bulk_str_len as usize;
     let end_of_bulk_str = bytes_consumed + bulk_str_len;
     let total_parsed = end_of_bulk_str + 2;

     if buffer.len() < total_parsed {
         return Err(anyhow::anyhow!("Incomplete bulk string data"));
     }
+    if buffer[end_of_bulk_str] != b'\r' || buffer[end_of_bulk_str + 1] != b'\n' {
+        return Err(anyhow::anyhow!("Invalid bulk string terminator"));
+    }

     let data = &buffer[bytes_consumed..end_of_bulk_str];
     let s = String::from_utf8(data.to_vec())?;

     Ok((Value::BulkString(s), total_parsed))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let end_of_bulk_str = bytes_consumed + bulk_str_len;
let total_parsed = end_of_bulk_str + 2;
Ok((
Value::BulkString(String::from_utf8(buffer[bytes_consumed..end_of_bulk_str].to_vec())?),
total_parsed,
))
if buffer.len() < total_parsed {
return Err(anyhow::anyhow!("Incomplete bulk string data"));
}
let data = &buffer[bytes_consumed..end_of_bulk_str];
let s = String::from_utf8(data.to_vec())?;
Ok((Value::BulkString(s), total_parsed))
let end_of_bulk_str = bytes_consumed + bulk_str_len;
let total_parsed = end_of_bulk_str + 2;
if buffer.len() < total_parsed {
return Err(anyhow::anyhow!("Incomplete bulk string data"));
}
if buffer[end_of_bulk_str] != b'\r' || buffer[end_of_bulk_str + 1] != b'\n' {
return Err(anyhow::anyhow!("Invalid bulk string terminator"));
}
let data = &buffer[bytes_consumed..end_of_bulk_str];
let s = String::from_utf8(data.to_vec())?;
Ok((Value::BulkString(s), total_parsed))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/resp/parser.rs` around lines 61 - 71, Before returning the BulkString,
validate that the two trailer bytes are CRLF: after computing end_of_bulk_str
and total_parsed, check buffer[end_of_bulk_str] == b'\r' and
buffer[end_of_bulk_str + 1] == b'\n' (using bytes_consumed, bulk_str_len to
locate them); if they are not CRLF return an error (e.g., Incomplete or Invalid
bulk string terminator) instead of accepting the payload and converting data to
String for Value::BulkString.

}

fn read_until_crlf(buffer: &[u8]) -> Option<(&[u8], usize)> {
Expand Down