2323
2424% % API
2525-export ([start_link /0 ,
26- start_link /4 ,
26+ start_link /5 ,
2727 broadcast /2 ,
2828 update /1 ,
2929 broadcast_members /0 ,
9999
100100 % % Set of all known members. Used to determine
101101 % % which members have joined and left during a membership update
102- all_members :: ordsets :ordset (nodename ()) | undefined
102+ all_members :: ordsets :ordset (nodename ()) | undefined ,
103+
104+ % % Lazy tick period in milliseconds. On every tick all outstanding
105+ % % lazy pushes are sent out
106+ lazy_tick_period :: non_neg_integer (),
107+
108+ % % Exchange tick period in milliseconds that may or may not occur
109+ exchange_tick_period :: non_neg_integer ()
110+
103111 }).
104112-type state () :: # state {}.
105113
117125% % to generate membership updates as the ring changes.
118126-spec start_link () -> {ok , pid ()} | ignore | {error , term ()}.
119127start_link () ->
128+ LazyTickPeriod = application :get_env (plumtree , lazy_tick_period ,
129+ ? DEFAULT_LAZY_TICK_PERIOD ),
130+ ExchangeTickPeriod = application :get_env (plumtree , exchange_tick_period ,
131+ ? DEFAULT_EXCHANGE_TICK_PERIOD ),
120132 PeerService = application :get_env (plumtree , peer_service , partisan_peer_service ),
121133 {ok , Members } = PeerService :members (),
122134 plumtree_util :log (debug , " peer sampling service members: ~p " , [Members ]),
@@ -128,7 +140,9 @@ start_link() ->
128140 plumtree_util :log (debug , " init peers, eager: ~p , lazy: ~p " ,
129141 [InitEagers , InitLazys ]),
130142 Mods = app_helper :get_env (plumtree , broadcast_mods , []),
131- Res = start_link (Members , InitEagers , InitLazys , Mods ),
143+ Res = start_link (Members , InitEagers , InitLazys , Mods ,
144+ [{lazy_tick_period , LazyTickPeriod },
145+ {exchange_tick_period , ExchangeTickPeriod }]),
132146 PeerService :add_sup_callback (fun ? MODULE :update /1 ),
133147 Res .
134148
@@ -141,14 +155,20 @@ start_link() ->
141155% % `InitEagers' and `InitLazys' must also be subsets of `InitMembers'. `Mods' is
142156% % a list of modules that may be handlers for broadcasted messages. All modules in
143157% % `Mods' should implement the `plumtree_broadcast_handler' behaviour.
158+ % % `Opts' is a proplist with the following possible options:
159+ % % Flush all outstanding lazy pushes period (in milliseconds)
160+ % % {`lazy_tick_period', non_neg_integer()}
161+ % % Possibly perform an exchange period (in milliseconds)
162+ % % {`exchange_tick_period', non_neg_integer()}
144163% %
145164% % NOTE: When starting the server using start_link/2 no automatic membership update from
146165% % ring_events is registered. Use start_link/0.
147- -spec start_link ([nodename ()], [nodename ()], [nodename ()], [module ()]) ->
166+ -spec start_link ([nodename ()], [nodename ()], [nodename ()], [module ()],
167+ proplists :proplist ()) ->
148168 {ok , pid ()} | ignore | {error , term ()}.
149- start_link (InitMembers , InitEagers , InitLazys , Mods ) ->
169+ start_link (InitMembers , InitEagers , InitLazys , Mods , Opts ) ->
150170 gen_server :start_link ({local , ? SERVER }, ? MODULE ,
151- [InitMembers , InitEagers , InitLazys , Mods ], []).
171+ [InitMembers , InitEagers , InitLazys , Mods , Opts ], []).
152172
153173% % @doc Broadcasts a message originating from this node. The message will be delivered to
154174% % each node at least once. The `Mod' passed is responsible for handling the message on remote
@@ -235,13 +255,17 @@ debug_get_tree(Root, Nodes) ->
235255
236256% % @private
237257-spec init ([[any ()], ...]) -> {ok , state ()}.
238- init ([AllMembers , InitEagers , InitLazys , Mods ]) ->
239- schedule_lazy_tick (),
240- schedule_exchange_tick (),
258+ init ([AllMembers , InitEagers , InitLazys , Mods , Opts ]) ->
259+ LazyTickPeriod = proplists :get_value (lazy_tick_period , Opts ),
260+ ExchangeTickPeriod = proplists :get_value (exchange_tick_period , Opts ),
261+ schedule_lazy_tick (LazyTickPeriod ),
262+ schedule_exchange_tick (ExchangeTickPeriod ),
241263 State1 = # state {
242264 outstanding = orddict :new (),
243265 mods = lists :usort (Mods ),
244- exchanges = []
266+ exchanges = [],
267+ lazy_tick_period = LazyTickPeriod ,
268+ exchange_tick_period = ExchangeTickPeriod
245269 },
246270 State2 = reset_peers (AllMembers , InitEagers , InitLazys , State1 ),
247271 {ok , State2 }.
@@ -321,13 +345,15 @@ handle_cast({update, Members}, State=#state{all_members=BroadcastMembers,
321345% % @private
322346-spec handle_info ('exchange_tick' | 'lazy_tick' | {'DOWN' , _ , 'process' , _ , _ }, state ()) ->
323347 {noreply , state ()}.
324- handle_info (lazy_tick , State ) ->
325- schedule_lazy_tick (),
348+ handle_info (lazy_tick ,
349+ # state { lazy_tick_period = Period } = State ) ->
326350 _ = send_lazy (State ),
351+ schedule_lazy_tick (Period ),
327352 {noreply , State };
328- handle_info (exchange_tick , State ) ->
329- schedule_exchange_tick (),
353+ handle_info (exchange_tick ,
354+ # state { exchange_tick_period = Period } = State ) ->
330355 State1 = maybe_exchange (State ),
356+ schedule_exchange_tick (Period ),
331357 {noreply , State1 };
332358handle_info ({'DOWN' , Ref , process , _Pid , _Reason }, State = # state {exchanges = Exchanges }) ->
333359 Exchanges1 = lists :keydelete (Ref , 3 , Exchanges ),
@@ -623,11 +649,11 @@ send(Msg, Mod, P) ->
623649 % % TODO: add debug logging
624650 % % gen_server:cast({?SERVER, P}, Msg).
625651
626- schedule_lazy_tick () ->
627- schedule_tick (lazy_tick , broadcast_lazy_timer , 1000 ).
652+ schedule_lazy_tick (Period ) ->
653+ schedule_tick (lazy_tick , broadcast_lazy_timer , Period ).
628654
629- schedule_exchange_tick () ->
630- schedule_tick (exchange_tick , broadcast_exchange_timer , 10000 ).
655+ schedule_exchange_tick (Period ) ->
656+ schedule_tick (exchange_tick , broadcast_exchange_timer , Period ).
631657
632658schedule_tick (Message , Timer , Default ) ->
633659 TickMs = app_helper :get_env (plumtree , Timer , Default ),
0 commit comments