Skip to content

Commit d40b6d9

Browse files
Bernard Dugganbernardd
authored andcommitted
Subscription prime support
1 parent 9539806 commit d40b6d9

File tree

11 files changed

+188
-22
lines changed

11 files changed

+188
-22
lines changed

lib/absinthe.ex

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@ defmodule Absinthe do
4444
%{message: String.t()}
4545
| %{message: String.t(), locations: [%{line: pos_integer, column: integer}]}
4646

47+
@type continuation_t :: nil | [Continuation.t()]
48+
4749
@type result_t ::
48-
%{data: nil | result_selection_t}
49-
| %{data: nil | result_selection_t, errors: [result_error_t]}
50+
%{required(:data) => nil | result_selection_t,
51+
optional(:continuation) => continuation_t,
52+
optional(:errors) => [result_error_t]}
5053
| %{errors: [result_error_t]}
5154

5255
@doc """
@@ -95,7 +98,7 @@ defmodule Absinthe do
9598
max_complexity: non_neg_integer | :infinity
9699
]
97100

98-
@type run_result :: {:ok, result_t} | {:error, String.t()}
101+
@type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()}
99102

100103
@spec run(
101104
binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(),
@@ -107,7 +110,23 @@ defmodule Absinthe do
107110
schema
108111
|> Absinthe.Pipeline.for_document(options)
109112

110-
case Absinthe.Pipeline.run(document, pipeline) do
113+
document
114+
|> Absinthe.Pipeline.run(pipeline)
115+
|> build_result()
116+
end
117+
118+
@spec continue([Continuation.t()]) :: run_result()
119+
def continue(continuation) do
120+
continuation
121+
|> Absinthe.Pipeline.continue()
122+
|> build_result()
123+
end
124+
125+
defp build_result(output) do
126+
case output do
127+
{:ok, %{result: %{continuation: c} = result}, _phases} when c != [] ->
128+
{:more, result}
129+
111130
{:ok, %{result: result}, _phases} ->
112131
{:ok, result}
113132

@@ -131,6 +150,7 @@ defmodule Absinthe do
131150
def run!(input, schema, options \\ []) do
132151
case run(input, schema, options) do
133152
{:ok, result} -> result
153+
{:more, result} -> result
134154
{:error, err} -> raise ExecutionError, message: err
135155
end
136156
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
defmodule Absinthe.Blueprint.Continuation do
2+
@moduledoc false
3+
4+
# Continuations allow further resolutions after the initial result is
5+
# returned
6+
7+
alias Absinthe.Pipeline
8+
9+
defstruct [
10+
:phase_input,
11+
:pipeline
12+
]
13+
14+
@type t :: %__MODULE__{
15+
phase_input: Pipeline.data_t,
16+
pipeline: Pipeline.t()
17+
}
18+
19+
end

lib/absinthe/blueprint/result/list.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@ defmodule Absinthe.Blueprint.Result.List do
99
:values,
1010
errors: [],
1111
flags: %{},
12-
extensions: %{}
12+
extensions: %{},
13+
continuations: []
1314
]
1415

1516
@type t :: %__MODULE__{
1617
emitter: Blueprint.Document.Field.t(),
1718
values: [Blueprint.Execution.node_t()],
1819
errors: [Phase.Error.t()],
1920
flags: Blueprint.flags_t(),
20-
extensions: %{any => any}
21+
extensions: %{any => any},
22+
continuations: [Continuation.t()]
2123
}
2224
end

lib/absinthe/blueprint/result/object.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ defmodule Absinthe.Blueprint.Result.Object do
1010
:fields,
1111
errors: [],
1212
flags: %{},
13-
extensions: %{}
13+
extensions: %{},
14+
continuations: []
1415
]
1516

1617
@type t :: %__MODULE__{
1718
emitter: Blueprint.Document.Field.t(),
1819
fields: [Blueprint.Execution.node_t()],
1920
errors: [Phase.Error.t()],
2021
flags: Blueprint.flags_t(),
21-
extensions: %{any => any}
22+
extensions: %{any => any},
23+
continuations: [Continuation.t()]
2224
}
2325
end

lib/absinthe/phase/document/result.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ defmodule Absinthe.Phase.Document.Result do
2525
{:validation_failed, errors}
2626
end
2727

28-
format_result(result)
28+
result
29+
|> format_result()
30+
|> maybe_add_continuations(blueprint.execution.result)
2931
end
3032

3133
defp format_result({:ok, {data, []}}) do
@@ -134,4 +136,9 @@ defmodule Absinthe.Phase.Document.Result do
134136
end
135137

136138
defp format_location(_), do: []
139+
140+
defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [],
141+
do: Map.put(result, :continuation, continuations)
142+
143+
defp maybe_add_continuations(result, _), do: result
137144
end
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
defmodule Absinthe.Phase.Subscription.Prime do
2+
@moduledoc false
3+
4+
@spec run(any(), Keyword.t()) :: Phase.result_t()
5+
def run(blueprint, [prime_result: cr]) do
6+
{:ok, put_in(blueprint.execution.root_value, cr)}
7+
end
8+
end

lib/absinthe/phase/subscription/result.ex

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,47 @@ defmodule Absinthe.Phase.Subscription.Result do
55
# subscription
66

77
alias Absinthe.Blueprint
8+
alias Absinthe.Blueprint.Continuation
89

910
@spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
10-
def run(blueprint, topic: topic) do
11+
def run(blueprint, options) do
12+
topic = Keyword.get(options, :topic)
13+
prime = Keyword.get(options, :prime)
1114
result = %{"subscribed" => topic}
12-
{:ok, put_in(blueprint.result, result)}
15+
case prime do
16+
nil ->
17+
{:ok, put_in(blueprint.result, result)}
18+
19+
prime_fun when is_function(prime_fun, 0) ->
20+
{:ok, prime_results} = prime_fun.()
21+
22+
result =
23+
if prime_results != [] do
24+
continuations =
25+
Enum.map(prime_results, fn cr ->
26+
%Continuation{
27+
phase_input: blueprint,
28+
pipeline: [
29+
{Absinthe.Phase.Subscription.Prime, [prime_result: cr]},
30+
{Absinthe.Phase.Document.Execution.Resolution, options},
31+
Absinthe.Phase.Document.Result
32+
]
33+
}
34+
end)
35+
36+
Map.put(result, :continuation, continuations)
37+
else
38+
result
39+
end
40+
41+
{:ok, put_in(blueprint.result, result)}
42+
43+
val ->
44+
raise """
45+
Invalid prime function. Must be a function of arity 0.
46+
47+
#{inspect(val)}
48+
"""
49+
end
1350
end
1451
end

lib/absinthe/phase/subscription/subscribe_self.ex

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
2222
%{selections: [field]} = op
2323

2424
with {:ok, config} <- get_config(field, context, blueprint) do
25-
field_keys = get_field_keys(field, config)
25+
{field_keys, prime} = get_field_keys(field, config)
2626
subscription_id = get_subscription_id(config, blueprint, options)
2727

2828
for field_key <- field_keys,
2929
do: Absinthe.Subscription.subscribe(pubsub, field_key, subscription_id, blueprint)
3030

3131
{:replace, blueprint,
3232
[
33-
{Phase.Subscription.Result, topic: subscription_id},
33+
{Phase.Subscription.Result, topic: subscription_id, prime: prime},
3434
{Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
3535
]}
3636
else
@@ -96,8 +96,9 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
9696
defp get_field_keys(%{schema_node: schema_node} = _field, config) do
9797
name = schema_node.identifier
9898

99-
find_field_keys!(config)
100-
|> Enum.map(fn key -> {name, key} end)
99+
{keys, prime} = find_field_keys!(config)
100+
field_keys = Enum.map(keys, fn key -> {name, key} end)
101+
{field_keys, prime}
101102
end
102103

103104
defp ensure_pubsub!(context) do
@@ -132,8 +133,12 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
132133
"""
133134

134135
val ->
135-
List.wrap(val)
136-
|> Enum.map(&to_string/1)
136+
topics = List.wrap(val)
137+
|> Enum.map(&to_string/1)
138+
139+
prime = config[:prime] || nil
140+
141+
{topics, prime}
137142
end
138143
end
139144

lib/absinthe/pipeline.ex

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,39 @@ defmodule Absinthe.Pipeline do
1212
* See `Absinthe.Schema` on adjusting the schema pipeline for schema manipulation.
1313
"""
1414

15+
alias Absinthe.Blueprint.Continuation
1516
alias Absinthe.Phase
1617

1718
@type data_t :: any
1819

20+
@type run_result_t :: {:ok, data_t, [Phase.t()]} | {:error, String.t(), [Phase.t()]}
21+
1922
@type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()}
2023

2124
@type t :: [phase_config_t | [phase_config_t]]
2225

23-
@spec run(data_t, t) :: {:ok, data_t, [Phase.t()]} | {:error, String.t(), [Phase.t()]}
26+
@spec run(data_t, t) :: run_result_t
2427
def run(input, pipeline) do
2528
pipeline
2629
|> List.flatten()
2730
|> run_phase(input)
2831
end
2932

33+
@spec continue([Continuation.t()]) :: run_result_t
34+
def continue([continuation | rest]) do
35+
result = run_phase(continuation.pipeline, continuation.phase_input)
36+
37+
case result do
38+
{:ok, blueprint, phases} when rest == [] ->
39+
{:ok, blueprint, phases}
40+
{:ok, blueprint, phases} ->
41+
bp_result = Map.put(blueprint.result, :continuation, rest)
42+
blueprint = Map.put(blueprint, :result, bp_result)
43+
{:ok, blueprint, phases}
44+
error -> error
45+
end
46+
end
47+
3048
@defaults [
3149
adapter: Absinthe.Adapter.LanguageConventions,
3250
operation_name: nil,
@@ -388,8 +406,8 @@ defmodule Absinthe.Pipeline do
388406
end)
389407
end
390408

391-
@spec run_phase(t, data_t, [Phase.t()]) ::
392-
{:ok, data_t, [Phase.t()]} | {:error, String.t(), [Phase.t()]}
409+
@spec run_phase(t, data_t, [Phase.t()]) :: run_result_t
410+
393411
def run_phase(pipeline, input, done \\ [])
394412

395413
def run_phase([], input, done) do

lib/absinthe/subscription.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ defmodule Absinthe.Subscription do
4747

4848
@type subscription_field_spec :: {atom, term | (term -> term)}
4949

50+
@type prime_fun :: (-> {:ok, [map()]})
51+
5052
@doc """
5153
Publish a mutation
5254

0 commit comments

Comments
 (0)