101101 % % which members have joined and left during a membership update
102102 all_members :: ordsets :ordset (nodename ()) | undefined
103103 }).
104+ -type state () :: # state {}.
104105
105106% %%===================================================================
106107% %% API
118119start_link () ->
119120 PeerService = application :get_env (plumtree , peer_service , partisan_peer_service ),
120121 {ok , Members } = PeerService :members (),
121- {InitEagers , InitLazys } = init_peers (Members ),
122+ plumtree_util :log (debug , " peer sampling service members: ~p " , [Members ]),
123+ % % the peer service has already sampled the members, we start off
124+ % % with pure gossip (ie. all members are in the eager push list and lazy
125+ % % list is empty)
126+ InitEagers = Members ,
127+ InitLazys = [],
128+ plumtree_util :log (debug , " init peers, eager: ~p , lazy: ~p " ,
129+ [InitEagers , InitLazys ]),
122130 Mods = app_helper :get_env (plumtree , broadcast_mods , []),
123131 Res = start_link (Members , InitEagers , InitLazys , Mods ),
124132 PeerService :add_sup_callback (fun ? MODULE :update /1 ),
@@ -226,7 +234,7 @@ debug_get_tree(Root, Nodes) ->
226234% %%===================================================================
227235
228236% % @private
229- -spec init ([[any ()],...]) -> {ok , # state {} }.
237+ -spec init ([[any ()], ...]) -> {ok , state () }.
230238init ([AllMembers , InitEagers , InitLazys , Mods ]) ->
231239 schedule_lazy_tick (),
232240 schedule_exchange_tick (),
@@ -239,7 +247,7 @@ init([AllMembers, InitEagers, InitLazys, Mods]) ->
239247 {ok , State2 }.
240248
241249% % @private
242- -spec handle_call (term (), {pid (), term ()}, # state {}) -> {reply , term (), # state {} }.
250+ -spec handle_call (term (), {pid (), term ()}, state ()) -> {reply , term (), state () }.
243251handle_call ({get_peers , Root }, _From , State ) ->
244252 EagerPeers = all_peers (Root , State # state .eager_sets , State # state .common_eagers ),
245253 LazyPeers = all_peers (Root , State # state .lazy_sets , State # state .common_lazys ),
@@ -253,67 +261,66 @@ handle_call({cancel_exchanges, WhichExchanges}, _From, State) ->
253261 {reply , Cancelled , State }.
254262
255263% % @private
256- -spec handle_cast (term (), # state {}) -> {noreply , # state {} }.
264+ -spec handle_cast (term (), state ()) -> {noreply , state () }.
257265handle_cast ({broadcast , MessageId , Message , Mod }, State ) ->
258- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
259- % lager:info("broadcast/3 messaged processed; messages remaining: ~p",
260- % [MessageQueueLen]),
266+ plumtree_util :log (debug , " received {broadcast, ~p , Msg, ~p }" ,
267+ [MessageId , Mod ]),
261268 State1 = eager_push (MessageId , Message , Mod , State ),
262269 State2 = schedule_lazy_push (MessageId , Mod , State1 ),
263270 {noreply , State2 };
264271handle_cast ({broadcast , MessageId , Message , Mod , Round , Root , From }, State ) ->
265- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
266- % lager:info("broadcast/6 messaged processed; messages remaining: ~p",
267- % [MessageQueueLen]),
272+ plumtree_util :log (debug , " received {broadcast, ~p , Msg, ~p , ~p , ~p , ~p }" ,
273+ [MessageId , Mod , Round , Root , From ]),
268274 Valid = Mod :merge (MessageId , Message ),
269275 State1 = handle_broadcast (Valid , MessageId , Message , Mod , Round , Root , From , State ),
270276 {noreply , State1 };
271277handle_cast ({prune , Root , From }, State ) ->
272- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
273- % lager:info("prune/2 messaged processed; messages remaining: ~p",
274- % [MessageQueueLen]),
278+ plumtree_util :log (debug , " received ~p " , [{prune , Root , From }]),
279+ plumtree_util :log (debug , " moving peer ~p from eager to lazy" , [From ]),
275280 State1 = add_lazy (From , Root , State ),
276281 {noreply , State1 };
277282handle_cast ({i_have , MessageId , Mod , Round , Root , From }, State ) ->
278- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
279- % lager:info("i_have/5 messaged processed; messages remaining: ~p",
280- % [MessageQueueLen]),
283+ plumtree_util :log (debug , " received ~p " , [{i_have , MessageId , Mod , Round , Root , From }]),
281284 Stale = Mod :is_stale (MessageId ),
282285 State1 = handle_ihave (Stale , MessageId , Mod , Round , Root , From , State ),
283286 {noreply , State1 };
284287handle_cast ({ignored_i_have , MessageId , Mod , Round , Root , From }, State ) ->
285- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
286- % lager:info("ignored_i_have/5 messaged processed; messages remaining: ~p",
287- % [MessageQueueLen]),
288+ plumtree_util :log (debug , " received ~p " , [{ignored_i_have , MessageId , Mod , Round , Root , From }]),
288289 State1 = ack_outstanding (MessageId , Mod , Round , Root , From , State ),
289290 {noreply , State1 };
290291handle_cast ({graft , MessageId , Mod , Round , Root , From }, State ) ->
291- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
292- % lager:info("graft/5 messaged processed; messages remaining: ~p",
293- % [MessageQueueLen]),
292+ plumtree_util :log (debug , " received ~p " , [{graft , MessageId , Mod , Round , Root , From }]),
294293 Result = Mod :graft (MessageId ),
294+ plumtree_util :log (debug , " graft(~p ): ~p " , [MessageId , Result ]),
295295 State1 = handle_graft (Result , MessageId , Mod , Round , Root , From , State ),
296296 {noreply , State1 };
297- handle_cast ({update , Members }, State = # state {all_members = BroadcastMembers }) ->
298- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len) ,
299- % lager:info("update/1 messaged processed; messages remaining: ~p",
300- % [MessageQueueLen ]),
297+ handle_cast ({update , Members }, State = # state {all_members = BroadcastMembers ,
298+ common_eagers = EagerPeers0 ,
299+ common_lazys = LazyPeers }) ->
300+ plumtree_util : log ( debug , " received ~p " , [{ update , Members } ]),
301301 CurrentMembers = ordsets :from_list (Members ),
302302 New = ordsets :subtract (CurrentMembers , BroadcastMembers ),
303303 Removed = ordsets :subtract (BroadcastMembers , CurrentMembers ),
304+ plumtree_util :log (debug , " new members: ~p " , [ordsets :to_list (New )]),
305+ plumtree_util :log (debug , " removed members: ~p " , [ordsets :to_list (Removed )]),
304306 State1 = case ordsets :size (New ) > 0 of
305307 false ->
306308 State ;
307309 true ->
308- {EagerPeers , LazyPeers } = init_peers (CurrentMembers ),
310+ % % as per the paper (page 9):
311+ % % "When a new member is detected, it is simply added to the set
312+ % % of eagerPushPeers"
313+ EagerPeers = ordsets :union (EagerPeers0 , New ),
314+ plumtree_util :log (debug , " new peers, eager: ~p , lazy: ~p " ,
315+ [EagerPeers , LazyPeers ]),
309316 reset_peers (CurrentMembers , EagerPeers , LazyPeers , State )
310317 end ,
311318 State2 = neighbors_down (Removed , State1 ),
312319 {noreply , State2 }.
313320
314321% % @private
315- -spec handle_info ('exchange_tick' | 'lazy_tick' | {'DOWN' , _ , 'process' , _ , _ }, # state {} ) ->
316- {noreply , # state {} }.
322+ -spec handle_info ('exchange_tick' | 'lazy_tick' | {'DOWN' , _ , 'process' , _ , _ }, state () ) ->
323+ {noreply , state () }.
317324handle_info (lazy_tick , State ) ->
318325 schedule_lazy_tick (),
319326 _ = send_lazy (State ),
@@ -327,23 +334,26 @@ handle_info({'DOWN', Ref, process, _Pid, _Reason}, State=#state{exchanges=Exchan
327334 {noreply , State # state {exchanges = Exchanges1 }}.
328335
329336% % @private
330- -spec terminate (term (), # state {} ) -> term ().
337+ -spec terminate (term (), state () ) -> term ().
331338terminate (_Reason , _State ) ->
332339 ok .
333340
334341% % @private
335- -spec code_change (term () | {down , term ()}, # state {} , term ()) -> {ok , # state {} }.
342+ -spec code_change (term () | {down , term ()}, state () , term ()) -> {ok , state () }.
336343code_change (_OldVsn , State , _Extra ) ->
337344 {ok , State }.
338345
339346% %%===================================================================
340347% %% Internal functions
341348% %%===================================================================
342349handle_broadcast (false , _MessageId , _Message , Mod , _Round , Root , From , State ) -> % % stale msg
350+ % % remove sender from eager and set as lazy
351+ plumtree_util :log (debug , " moving peer ~p from eager to lazy" , [From ]),
343352 State1 = add_lazy (From , Root , State ),
344353 _ = send ({prune , Root , myself ()}, Mod , From ),
345354 State1 ;
346355handle_broadcast (true , MessageId , Message , Mod , Round , Root , From , State ) -> % % valid msg
356+ % % remove sender from lazy and set as eager
347357 State1 = add_eager (From , Root , State ),
348358 State2 = eager_push (MessageId , Message , Mod , Round + 1 , Root , From , State1 ),
349359 schedule_lazy_push (MessageId , Mod , Round + 1 , Root , From , State2 ).
@@ -372,8 +382,8 @@ handle_graft({error, Reason}, _MessageId, Mod, _Round, _Root, _From, State) ->
372382 lager :error (" unable to graft message from ~p . reason: ~p " , [Mod , Reason ]),
373383 State .
374384
375- neighbors_down (Removed , State = # state {common_eagers = CommonEagers ,eager_sets = EagerSets ,
376- common_lazys = CommonLazys ,lazy_sets = LazySets ,
385+ neighbors_down (Removed , State = # state {common_eagers = CommonEagers , eager_sets = EagerSets ,
386+ common_lazys = CommonLazys , lazy_sets = LazySets ,
377387 outstanding = Outstanding }) ->
378388 NewCommonEagers = ordsets :subtract (CommonEagers , Removed ),
379389 NewCommonLazys = ordsets :subtract (CommonLazys , Removed ),
@@ -398,6 +408,7 @@ eager_push(MessageId, Message, Mod, State) ->
398408
399409eager_push (MessageId , Message , Mod , Round , Root , From , State ) ->
400410 Peers = eager_peers (Root , From , State ),
411+ plumtree_util :log (debug , " eager push to peers: ~p " , [Peers ]),
401412 _ = send ({broadcast , MessageId , Message , Mod , Round , Root , myself ()}, Mod , Peers ),
402413 State .
403414
@@ -406,6 +417,8 @@ schedule_lazy_push(MessageId, Mod, State) ->
406417
407418schedule_lazy_push (MessageId , Mod , Round , Root , From , State ) ->
408419 Peers = lazy_peers (Root , From , State ),
420+ plumtree_util :log (debug , " scheduling lazy push to peers ~p : ~p " ,
421+ [Peers , {MessageId , Mod , Round , Root , From }]),
409422 add_all_outstanding (MessageId , Mod , Round , Root , Peers , State ).
410423
411424send_lazy (# state {outstanding = Outstanding }) ->
@@ -416,6 +429,8 @@ send_lazy(Peer, Messages) ->
416429 {MessageId , Mod , Round , Root } <- ordsets :to_list (Messages )].
417430
418431send_lazy (MessageId , Mod , Round , Root , Peer ) ->
432+ plumtree_util :log (debug , " sending lazy push ~p " ,
433+ [{i_have , MessageId , Mod , Round , Root , myself ()}]),
419434 send ({i_have , MessageId , Mod , Round , Root , myself ()}, Mod , Peer ).
420435
421436maybe_exchange (State ) ->
@@ -425,7 +440,7 @@ maybe_exchange(State) ->
425440
426441maybe_exchange (undefined , State ) ->
427442 State ;
428- maybe_exchange (Peer , State = # state {mods = [Mod | _ ],exchanges = Exchanges }) ->
443+ maybe_exchange (Peer , State = # state {mods = [Mod | _ ], exchanges = Exchanges }) ->
429444 % % limit the number of exchanges this node can start concurrently.
430445 % % the exchange must (currently?) implement any "inbound" concurrency limits
431446 ExchangeLimit = app_helper :get_env (plumtree , broadcast_start_exchange_limit , 1 ),
@@ -439,10 +454,10 @@ maybe_exchange(_Peer, State=#state{mods=[]}) ->
439454 % % No registered handler.
440455 State .
441456
442- exchange (Peer , State = # state {mods = [Mod | Mods ],exchanges = Exchanges }) ->
457+ exchange (Peer , State = # state {mods = [Mod | Mods ], exchanges = Exchanges }) ->
443458 State1 = case Mod :exchange (Peer ) of
444459 {ok , Pid } ->
445- lager : debug ( " started ~p exchange with ~p (~p )" , [Mod , Peer , Pid ]),
460+ plumtree_util : log ( debug , " started ~p exchange with ~p (~p )" , [Mod , Peer , Pid ]),
446461 Ref = monitor (process , Pid ),
447462 State # state {exchanges = [{Mod , Peer , Ref , Pid } | Exchanges ]};
448463 {error , _Reason } ->
@@ -569,7 +584,7 @@ update_peers(From, Root, EagerUpdate, LazyUpdate, State) ->
569584 NewLazys = LazyUpdate (From , CurrentLazys ),
570585 set_peers (Root , NewEagers , NewLazys , State ).
571586
572- set_peers (Root , Eagers , Lazys , State = # state {eager_sets = EagerSets ,lazy_sets = LazySets }) ->
587+ set_peers (Root , Eagers , Lazys , State = # state {eager_sets = EagerSets , lazy_sets = LazySets }) ->
573588 NewEagers = orddict :store (Root , Eagers , EagerSets ),
574589 NewLazys = orddict :store (Root , Lazys , LazySets ),
575590 State # state {eager_sets = NewEagers , lazy_sets = NewLazys }.
@@ -627,38 +642,6 @@ reset_peers(AllMembers, EagerPeers, LazyPeers, State) ->
627642 all_members = ordsets :from_list (AllMembers )
628643 }.
629644
630- init_peers (Members ) ->
631- case length (Members ) of
632- 1 ->
633- % % Single member, must be ourselves
634- InitEagers = [],
635- InitLazys = [];
636- 2 ->
637- % % Two members, just eager push to the other
638- InitEagers = Members -- [myself ()],
639- InitLazys = [];
640- N when N < 5 ->
641- % % 2 to 4 members, start with a fully connected tree
642- % % with cycles. it will be adjusted as needed
643- Tree = plumtree_util :build_tree (1 , Members , [cycles ]),
644- InitEagers = orddict :fetch (myself (), Tree ),
645- InitLazys = [lists :nth (rand_compat :uniform (N - 2 ), Members -- [myself () | InitEagers ])];
646- N when N < 10 ->
647- % % 5 to 9 members, start with gossip tree used by
648- % % riak_core_gossip. it will be adjusted as needed
649- Tree = plumtree_util :build_tree (2 , Members , [cycles ]),
650- InitEagers = orddict :fetch (myself (), Tree ),
651- InitLazys = [lists :nth (rand_compat :uniform (N - 3 ), Members -- [myself () | InitEagers ])];
652- N ->
653- % % 10 or more members, use a tree similar to riak_core_gossip
654- % % but with higher fanout (larger initial eager set size)
655- NEagers = round (math :log (N ) + 1 ),
656- Tree = plumtree_util :build_tree (NEagers , Members , [cycles ]),
657- InitEagers = orddict :fetch (myself (), Tree ),
658- InitLazys = [lists :nth (rand_compat :uniform (N - (NEagers + 1 )), Members -- [myself () | InitEagers ])]
659- end ,
660- {InitEagers , InitLazys }.
661-
662645% % @private
663646myself () ->
664647 node ().
0 commit comments