@@ -120,6 +120,8 @@ start_link() ->
120120 PeerService = application :get_env (plumtree , peer_service , partisan_peer_service ),
121121 {ok , Members } = PeerService :members (),
122122 {InitEagers , InitLazys } = init_peers (Members ),
123+ plumtree_util :log (debug , " init peers, eager: ~p , lazy: ~p " ,
124+ [InitEagers , InitLazys ]),
123125 Mods = app_helper :get_env (plumtree , broadcast_mods , []),
124126 Res = start_link (Members , InitEagers , InitLazys , Mods ),
125127 PeerService :add_sup_callback (fun ? MODULE :update /1 ),
@@ -256,57 +258,51 @@ handle_call({cancel_exchanges, WhichExchanges}, _From, State) ->
256258% % @private
257259-spec handle_cast (term (), state ()) -> {noreply , state ()}.
258260handle_cast ({broadcast , MessageId , Message , Mod }, State ) ->
259- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
260- % lager:info("broadcast/3 messaged processed; messages remaining: ~p",
261- % [MessageQueueLen]),
261+ plumtree_util :log (debug , " received {broadcast, ~p , Msg, ~p }" ,
262+ [MessageId , Mod ]),
262263 State1 = eager_push (MessageId , Message , Mod , State ),
263264 State2 = schedule_lazy_push (MessageId , Mod , State1 ),
264265 {noreply , State2 };
265266handle_cast ({broadcast , MessageId , Message , Mod , Round , Root , From }, State ) ->
266- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
267- % lager:info("broadcast/6 messaged processed; messages remaining: ~p",
268- % [MessageQueueLen]),
267+ plumtree_util :log (debug , " received {broadcast, ~p , Msg, ~p , ~p , ~p , ~p }" ,
268+ [MessageId , Mod , Round , Root , From ]),
269269 Valid = Mod :merge (MessageId , Message ),
270270 State1 = handle_broadcast (Valid , MessageId , Message , Mod , Round , Root , From , State ),
271271 {noreply , State1 };
272272handle_cast ({prune , Root , From }, State ) ->
273- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
274- % lager:info("prune/2 messaged processed; messages remaining: ~p",
275- % [MessageQueueLen]),
273+ plumtree_util :log (debug , " received ~p " , [{prune , Root , From }]),
274+ plumtree_util :log (debug , " moving peer ~p from eager to lazy" , [From ]),
276275 State1 = add_lazy (From , Root , State ),
277276 {noreply , State1 };
278277handle_cast ({i_have , MessageId , Mod , Round , Root , From }, State ) ->
279- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
280- % lager:info("i_have/5 messaged processed; messages remaining: ~p",
281- % [MessageQueueLen]),
278+ plumtree_util :log (debug , " received ~p " , [{i_have , MessageId , Mod , Round , Root , From }]),
282279 Stale = Mod :is_stale (MessageId ),
283280 State1 = handle_ihave (Stale , MessageId , Mod , Round , Root , From , State ),
284281 {noreply , State1 };
285282handle_cast ({ignored_i_have , MessageId , Mod , Round , Root , From }, State ) ->
286- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
287- % lager:info("ignored_i_have/5 messaged processed; messages remaining: ~p",
288- % [MessageQueueLen]),
283+ plumtree_util :log (debug , " received ~p " , [{ignored_i_have , MessageId , Mod , Round , Root , From }]),
289284 State1 = ack_outstanding (MessageId , Mod , Round , Root , From , State ),
290285 {noreply , State1 };
291286handle_cast ({graft , MessageId , Mod , Round , Root , From }, State ) ->
292- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
293- % lager:info("graft/5 messaged processed; messages remaining: ~p",
294- % [MessageQueueLen]),
287+ plumtree_util :log (debug , " received ~p " , [{graft , MessageId , Mod , Round , Root , From }]),
295288 Result = Mod :graft (MessageId ),
289+ plumtree_util :log (debug , " graft(~p ): ~p " , [MessageId , Result ]),
296290 State1 = handle_graft (Result , MessageId , Mod , Round , Root , From , State ),
297291 {noreply , State1 };
298292handle_cast ({update , Members }, State = # state {all_members = BroadcastMembers }) ->
299- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
300- % lager:info("update/1 messaged processed; messages remaining: ~p",
301- % [MessageQueueLen]),
293+ plumtree_util :log (debug , " received ~p " , [{update , Members }]),
302294 CurrentMembers = ordsets :from_list (Members ),
303295 New = ordsets :subtract (CurrentMembers , BroadcastMembers ),
304296 Removed = ordsets :subtract (BroadcastMembers , CurrentMembers ),
297+ plumtree_util :log (debug , " new members: ~p " , [ordsets :to_list (New )]),
298+ plumtree_util :log (debug , " removed members: ~p " , [ordsets :to_list (Removed )]),
305299 State1 = case ordsets :size (New ) > 0 of
306300 false ->
307301 State ;
308302 true ->
309303 {EagerPeers , LazyPeers } = init_peers (CurrentMembers ),
304+ plumtree_util :log (debug , " new peers, eager: ~p , lazy: ~p " ,
305+ [EagerPeers , LazyPeers ]),
310306 reset_peers (CurrentMembers , EagerPeers , LazyPeers , State )
311307 end ,
312308 State2 = neighbors_down (Removed , State1 ),
@@ -341,10 +337,13 @@ code_change(_OldVsn, State, _Extra) ->
341337% %% Internal functions
342338% %%===================================================================
343339handle_broadcast (false , _MessageId , _Message , Mod , _Round , Root , From , State ) -> % % stale msg
340+ % % remove sender from eager and set as lazy
341+ plumtree_util :log (debug , " moving peer ~p from eager to lazy" , [From ]),
344342 State1 = add_lazy (From , Root , State ),
345343 _ = send ({prune , Root , myself ()}, Mod , From ),
346344 State1 ;
347345handle_broadcast (true , MessageId , Message , Mod , Round , Root , From , State ) -> % % valid msg
346+ % % remove sender from lazy and set as eager
348347 State1 = add_eager (From , Root , State ),
349348 State2 = eager_push (MessageId , Message , Mod , Round + 1 , Root , From , State1 ),
350349 schedule_lazy_push (MessageId , Mod , Round + 1 , Root , From , State2 ).
@@ -399,6 +398,7 @@ eager_push(MessageId, Message, Mod, State) ->
399398
400399eager_push (MessageId , Message , Mod , Round , Root , From , State ) ->
401400 Peers = eager_peers (Root , From , State ),
401+ plumtree_util :log (debug , " eager push to peers: ~p " , [Peers ]),
402402 _ = send ({broadcast , MessageId , Message , Mod , Round , Root , myself ()}, Mod , Peers ),
403403 State .
404404
@@ -407,6 +407,8 @@ schedule_lazy_push(MessageId, Mod, State) ->
407407
408408schedule_lazy_push (MessageId , Mod , Round , Root , From , State ) ->
409409 Peers = lazy_peers (Root , From , State ),
410+ plumtree_util :log (debug , " scheduling lazy push to peers ~p : ~p " ,
411+ [Peers , {MessageId , Mod , Round , Root , From }]),
410412 add_all_outstanding (MessageId , Mod , Round , Root , Peers , State ).
411413
412414send_lazy (# state {outstanding = Outstanding }) ->
@@ -417,6 +419,8 @@ send_lazy(Peer, Messages) ->
417419 {MessageId , Mod , Round , Root } <- ordsets :to_list (Messages )].
418420
419421send_lazy (MessageId , Mod , Round , Root , Peer ) ->
422+ plumtree_util :log (debug , " sending lazy push ~p " ,
423+ [{i_have , MessageId , Mod , Round , Root , myself ()}]),
420424 send ({i_have , MessageId , Mod , Round , Root , myself ()}, Mod , Peer ).
421425
422426maybe_exchange (State ) ->
@@ -443,7 +447,7 @@ maybe_exchange(_Peer, State=#state{mods=[]}) ->
443447exchange (Peer , State = # state {mods = [Mod | Mods ], exchanges = Exchanges }) ->
444448 State1 = case Mod :exchange (Peer ) of
445449 {ok , Pid } ->
446- lager : debug ( " started ~p exchange with ~p (~p )" , [Mod , Peer , Pid ]),
450+ plumtree_util : log ( debug , " started ~p exchange with ~p (~p )" , [Mod , Peer , Pid ]),
447451 Ref = monitor (process , Pid ),
448452 State # state {exchanges = [{Mod , Peer , Ref , Pid } | Exchanges ]};
449453 {error , _Reason } ->
0 commit comments