diff --git a/src/DiskQueue/IPersistentQueue.cs b/src/DiskQueue/IPersistentQueue.cs index 276c2cb..29be126 100644 --- a/src/DiskQueue/IPersistentQueue.cs +++ b/src/DiskQueue/IPersistentQueue.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; namespace DiskQueue { @@ -21,10 +22,10 @@ public interface IPersistentQueue : IDisposable /// int EstimatedCountOfItemsInQueue { get; } - /// - /// Advanced adjustable settings. Use with caution. Read the source code. - /// - IPersistentQueueImpl Internals { get; } + /// + /// Advanced adjustable settings. Use with caution. Read the source code. + /// + IPersistentQueueImpl Internals { get; } /// /// Maximum size of files in queue. New files will be rolled-out if this is exceeded. diff --git a/src/DiskQueue/Implementation/PersistentQueueImpl.cs b/src/DiskQueue/Implementation/PersistentQueueImpl.cs index 62ae266..a0054b9 100644 --- a/src/DiskQueue/Implementation/PersistentQueueImpl.cs +++ b/src/DiskQueue/Implementation/PersistentQueueImpl.cs @@ -348,10 +348,31 @@ private IFileStream WaitForTransactionLog(byte[] transactionBuffer) } } - /// - /// Assumes that entries has at least one entry. Should be called inside a lock. - /// - private bool ReadAhead() + public List ToList() + { + lock (_entries) + { + var last = _entries.Last; + if (last == null) return new List(); + + var entry = last.Value; + if (entry == null) throw new Exception("Entry queue was in an invalid state: null entry"); + + if (entry.Data == null) + { + var ok = ReadAhead(); + if (!ok) return new List(); + } + + var entryList = _entries.ToList(); + return entryList; + } + } + + /// + /// Assumes that entries has at least one entry. Should be called inside a lock. + /// + private bool ReadAhead() { long currentBufferSize = 0; diff --git a/src/DiskQueue/Implementation/PersistentQueueSession.cs b/src/DiskQueue/Implementation/PersistentQueueSession.cs index c7172f7..ac710da 100644 --- a/src/DiskQueue/Implementation/PersistentQueueSession.cs +++ b/src/DiskQueue/Implementation/PersistentQueueSession.cs @@ -139,6 +139,23 @@ private void OnReplaceStream(IFileStream newStream) return entry.Data; } + /// + /// Try to pull data all from the queue. Data is not removed from the queue + /// + public List ToList() + { + var retList = new List(); + var entries = _queue.ToList(); + foreach(var entry in entries) + { + if( entry.Data != null ) + { + retList.Add(entry.Data); + } + } + return retList; + } + /// /// Commit actions taken in this session since last flush. /// If the session is disposed with no flush, actions are not persisted diff --git a/src/DiskQueue/Implementation/PersistentQueueSessionT.cs b/src/DiskQueue/Implementation/PersistentQueueSessionT.cs index c453c20..d22c2b3 100644 --- a/src/DiskQueue/Implementation/PersistentQueueSessionT.cs +++ b/src/DiskQueue/Implementation/PersistentQueueSessionT.cs @@ -1,4 +1,6 @@ -namespace DiskQueue.Implementation +using System.Collections.Generic; + +namespace DiskQueue.Implementation { /// public class PersistentQueueSession : PersistentQueueSession, IPersistentQueueSession @@ -30,5 +32,23 @@ public void Enqueue(T data) Enqueue(bytes); } } + + /// + /// Try to pull data all from the queue. Data is not removed from the queue + /// + public new List ToList() + { + var typedList = new List(); + List dataList = base.ToList(); + foreach (byte[] data in dataList) + { + T? obj = SerializationStrategy.Deserialize(data); + if( obj != null ) + { + typedList.Add(obj); + } + } + return typedList; + } } } diff --git a/src/DiskQueue/PublicInterfaces/IPersistentQueueImpl.cs b/src/DiskQueue/PublicInterfaces/IPersistentQueueImpl.cs index d87106a..6095f21 100644 --- a/src/DiskQueue/PublicInterfaces/IPersistentQueueImpl.cs +++ b/src/DiskQueue/PublicInterfaces/IPersistentQueueImpl.cs @@ -34,6 +34,11 @@ public interface IPersistentQueueImpl : IDisposable /// Entry? Dequeue(); + /// + /// List data, returning all storage entry as list + /// + List ToList(); + /// /// UNSAFE. Incorrect use will result in data loss. /// Undo Enqueue and Dequeue operations. diff --git a/src/DiskQueue/PublicInterfaces/IPersistentQueueSession.cs b/src/DiskQueue/PublicInterfaces/IPersistentQueueSession.cs index 5bffb57..0286dea 100644 --- a/src/DiskQueue/PublicInterfaces/IPersistentQueueSession.cs +++ b/src/DiskQueue/PublicInterfaces/IPersistentQueueSession.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; namespace DiskQueue { @@ -20,10 +21,15 @@ public interface IPersistentQueueSession : IDisposable byte[]? Dequeue(); /// - /// Commit actions taken in this session since last flush. - /// If the session is disposed with no flush, actions are not persisted - /// to the queue (Enqueues are not written, dequeues are left on the queue) + /// Try to pull data all from the queue. Data is not removed from the queue /// - void Flush(); + public List ToList(); + + /// + /// Commit actions taken in this session since last flush. + /// If the session is disposed with no flush, actions are not persisted + /// to the queue (Enqueues are not written, dequeues are left on the queue) + /// + void Flush(); } } diff --git a/src/DiskQueue/PublicInterfaces/IPersistentQueueSessionT.cs b/src/DiskQueue/PublicInterfaces/IPersistentQueueSessionT.cs index 7905537..a9231f7 100644 --- a/src/DiskQueue/PublicInterfaces/IPersistentQueueSessionT.cs +++ b/src/DiskQueue/PublicInterfaces/IPersistentQueueSessionT.cs @@ -1,4 +1,5 @@ -using DiskQueue.Implementation; +using System.Collections.Generic; +using DiskQueue.Implementation; namespace DiskQueue { @@ -15,6 +16,11 @@ public interface IPersistentQueueSession : IPersistentQueueSession /// new T? Dequeue(); + /// + /// Try to pull data all from the queue. Data is not removed from the queue + /// + new List ToList(); + /// /// This class performs the serialization of the object to be queued into a byte array suitable for queueing. /// It defaults to , but you are free to implement your own and inject it in.