Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/DiskQueue/IPersistentQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;

namespace DiskQueue
{
Expand All @@ -21,10 +22,10 @@ public interface IPersistentQueue : IDisposable
/// </summary>
int EstimatedCountOfItemsInQueue { get; }

/// <summary>
/// Advanced adjustable settings. Use with caution. Read the source code.
/// </summary>
IPersistentQueueImpl Internals { get; }
/// <summary>
/// Advanced adjustable settings. Use with caution. Read the source code.
/// </summary>
IPersistentQueueImpl Internals { get; }

/// <summary>
/// Maximum size of files in queue. New files will be rolled-out if this is exceeded.
Expand Down
29 changes: 25 additions & 4 deletions src/DiskQueue/Implementation/PersistentQueueImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,31 @@ private IFileStream WaitForTransactionLog(byte[] transactionBuffer)
}
}

/// <summary>
/// Assumes that entries has at least one entry. Should be called inside a lock.
/// </summary>
private bool ReadAhead()
public List<Entry> ToList()
{
lock (_entries)
{
var last = _entries.Last;
if (last == null) return new List<Entry>();

var entry = last.Value;
if (entry == null) throw new Exception("Entry queue was in an invalid state: null entry");

if (entry.Data == null)
{
var ok = ReadAhead();
if (!ok) return new List<Entry>();
}

var entryList = _entries.ToList();
return entryList;
}
}

/// <summary>
/// Assumes that entries has at least one entry. Should be called inside a lock.
/// </summary>
private bool ReadAhead()
{
long currentBufferSize = 0;

Expand Down
17 changes: 17 additions & 0 deletions src/DiskQueue/Implementation/PersistentQueueSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,23 @@ private void OnReplaceStream(IFileStream newStream)
return entry.Data;
}

/// <summary>
/// Try to pull data all from the queue. Data is not removed from the queue
/// </summary>
public List<byte[]> ToList()
{
var retList = new List<byte[]>();
var entries = _queue.ToList();
foreach(var entry in entries)
{
if( entry.Data != null )
{
retList.Add(entry.Data);
}
}
return retList;
}

/// <summary>
/// Commit actions taken in this session since last flush.
/// If the session is disposed with no flush, actions are not persisted
Expand Down
22 changes: 21 additions & 1 deletion src/DiskQueue/Implementation/PersistentQueueSessionT.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace DiskQueue.Implementation
using System.Collections.Generic;

namespace DiskQueue.Implementation
{
/// <inheritdoc cref="IPersistentQueueSession{T}"/>
public class PersistentQueueSession<T> : PersistentQueueSession, IPersistentQueueSession<T>
Expand Down Expand Up @@ -30,5 +32,23 @@ public void Enqueue(T data)
Enqueue(bytes);
}
}

/// <summary>
/// Try to pull data all from the queue. Data is not removed from the queue
/// </summary>
public new List<T> ToList()
{
var typedList = new List<T>();
List<byte[]> dataList = base.ToList();
foreach (byte[] data in dataList)
{
T? obj = SerializationStrategy.Deserialize(data);
if( obj != null )
{
typedList.Add(obj);
}
}
return typedList;
}
}
}
5 changes: 5 additions & 0 deletions src/DiskQueue/PublicInterfaces/IPersistentQueueImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public interface IPersistentQueueImpl : IDisposable
/// </summary>
Entry? Dequeue();

/// <summary>
/// List data, returning all storage entry as list
/// </summary>
List<Entry> ToList();

/// <summary>
/// <para>UNSAFE. Incorrect use will result in data loss.</para>
/// <para>Undo Enqueue and Dequeue operations.</para>
Expand Down
14 changes: 10 additions & 4 deletions src/DiskQueue/PublicInterfaces/IPersistentQueueSession.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;

namespace DiskQueue
{
Expand All @@ -20,10 +21,15 @@ public interface IPersistentQueueSession : IDisposable
byte[]? Dequeue();

/// <summary>
/// Commit actions taken in this session since last flush.
/// If the session is disposed with no flush, actions are not persisted
/// to the queue (Enqueues are not written, dequeues are left on the queue)
/// Try to pull data all from the queue. Data is not removed from the queue
/// </summary>
void Flush();
public List<byte[]> ToList();

/// <summary>
/// Commit actions taken in this session since last flush.
/// If the session is disposed with no flush, actions are not persisted
/// to the queue (Enqueues are not written, dequeues are left on the queue)
/// </summary>
void Flush();
}
}
8 changes: 7 additions & 1 deletion src/DiskQueue/PublicInterfaces/IPersistentQueueSessionT.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using DiskQueue.Implementation;
using System.Collections.Generic;
using DiskQueue.Implementation;

namespace DiskQueue
{
Expand All @@ -15,6 +16,11 @@ public interface IPersistentQueueSession<T> : IPersistentQueueSession
/// </summary>
new T? Dequeue();

/// <summary>
/// Try to pull data all from the queue. Data is not removed from the queue
/// </summary>
new List<T> ToList();

/// <summary>
/// This class performs the serialization of the object to be queued into a byte array suitable for queueing.
/// It defaults to <see cref="DefaultSerializationStrategy{T}"/>, but you are free to implement your own and inject it in.
Expand Down