Skip to content

Commit 4bdcf15

Browse files
authored
Merge pull request #178 from njlr/feature/issue-170-channels
Adds channel interop functions
2 parents b10efcd + 2cb2809 commit 4bdcf15

File tree

5 files changed

+253
-77
lines changed

5 files changed

+253
-77
lines changed

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ open System.Threading.Tasks
1212
open System.Runtime.ExceptionServices
1313
#if !FABLE_COMPILER
1414
open System.Linq
15+
open System.Threading.Channels
1516
#endif
1617

1718
#nowarn "40" "3218"
@@ -374,6 +375,17 @@ module AsyncSeq =
374375
member x.MoveNext() = async { return None }
375376
member x.Dispose() = () } }
376377

378+
let emptyAsync<'T> (action : Async<unit>) : AsyncSeq<'T> =
379+
{ new IAsyncEnumerable<'T> with
380+
member x.GetEnumerator() =
381+
{ new IAsyncEnumerator<'T> with
382+
member x.MoveNext() =
383+
async {
384+
do! action
385+
return None
386+
}
387+
member x.Dispose() = () } }
388+
377389
let singleton (v:'T) : AsyncSeq<'T> =
378390
{ new IAsyncEnumerable<'T> with
379391
member x.GetEnumerator() =
@@ -1946,6 +1958,75 @@ module AsyncSeq =
19461958
#endif
19471959

19481960

1961+
#if !FABLE_COMPILER
1962+
open System.Threading.Channels
1963+
1964+
let toChannel (writer : ChannelWriter<'a>) (xs : AsyncSeq<'a>) : Async<unit> =
1965+
async {
1966+
try
1967+
do!
1968+
xs
1969+
|> iterAsync
1970+
(fun x ->
1971+
async {
1972+
if not (writer.TryWrite(x)) then
1973+
let! ct = Async.CancellationToken
1974+
1975+
do!
1976+
writer.WriteAsync(x, ct).AsTask()
1977+
|> Async.AwaitTask
1978+
})
1979+
1980+
writer.Complete()
1981+
with exn ->
1982+
writer.Complete(error = exn)
1983+
}
1984+
1985+
let fromChannel (reader : ChannelReader<'a>) : AsyncSeq<'a> =
1986+
asyncSeq {
1987+
let mutable keepGoing = true
1988+
1989+
while keepGoing do
1990+
let mutable item = Unchecked.defaultof<'a>
1991+
1992+
if reader.TryRead(&item) then
1993+
yield item
1994+
else
1995+
let! ct = Async.CancellationToken
1996+
1997+
let! hasMoreData =
1998+
reader.WaitToReadAsync(ct).AsTask()
1999+
|> Async.AwaitTask
2000+
2001+
if not hasMoreData then
2002+
keepGoing <- false
2003+
}
2004+
2005+
let prefetch (numberToPrefetch : int) (xs : AsyncSeq<'a>) : AsyncSeq<'a> =
2006+
if numberToPrefetch = 0 then
2007+
xs
2008+
else
2009+
if numberToPrefetch < 1 then
2010+
invalidArg (nameof numberToPrefetch) "must be at least zero"
2011+
asyncSeq {
2012+
let opts = BoundedChannelOptions(numberToPrefetch)
2013+
opts.SingleWriter <- true
2014+
opts.SingleReader <- true
2015+
2016+
let channel = Channel.CreateBounded(opts)
2017+
2018+
let! fillChannelTask =
2019+
toChannel channel.Writer xs
2020+
|> Async.StartChild
2021+
2022+
yield!
2023+
append
2024+
(fromChannel channel.Reader)
2025+
(emptyAsync fillChannelTask)
2026+
}
2027+
2028+
#endif
2029+
19492030

19502031
[<AutoOpen>]
19512032
module AsyncSeqExtensions =

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,25 @@ module AsyncSeq =
557557
#endif
558558
#endif
559559

560+
#if !FABLE_COMPILER
561+
562+
open System.Threading.Channels
563+
564+
/// Fills a channel writer with the values from an async seq.
565+
/// The writer will be closed when the async seq completes or raises an error.
566+
val toChannel<'T> : writer: ChannelWriter<'T> -> source: AsyncSeq<'T> -> Async<unit>
567+
568+
/// Creates an async seq from a channel reader.
569+
/// The async seq will read values from the channel reader until it is closed.
570+
/// If the reader raises an error than the sequence will raise it.
571+
val fromChannel<'T> : reader: ChannelReader<'T> -> AsyncSeq<'T>
572+
573+
/// Transforms an async seq to a new one that fetches values ahead of time to improve throughput.
574+
val prefetch<'T> : numberToPrefetch: int -> source: AsyncSeq<'T> -> AsyncSeq<'T>
575+
576+
#endif
577+
578+
560579
/// An automatically-opened module that contains the `asyncSeq` builder and an extension method
561580
[<AutoOpen>]
562581
module AsyncSeqExtensions =

src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<ItemGroup>
2424
<PackageReference Update="FSharp.Core" Version="4.7.2" />
2525
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0" />
26+
<PackageReference Include="System.Threading.Channels" Version="*" />
2627
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
2728
</ItemGroup>
2829
</Project>

0 commit comments

Comments
 (0)