3030 is_stale /1 ,
3131 graft /1 ,
3232 exchange /1 ]).
33+
3334% % gen_server callbacks
3435-export ([init /1 ,
3536 handle_call /3 ,
4041
4142% % API
4243-export ([start_link /0 ,
43- read /1 ]).
44+ get /1 ,
45+ put /2 ]).
4446
4547-record (state , {}).
46- -type state () :: # state {}.
48+ -type state () :: # state {}.
4749
4850-spec start_link () -> ok .
4951start_link () ->
5052 {ok , _ } = gen_server :start_link ({local , ? SERVER }, ? MODULE ,
5153 [], []),
5254 ok .
5355
54- -spec read (Key :: any ()) -> {ok , any ()} | {error , not_found }.
55- read (Key ) ->
56- case ets :lookup (? MODULE , Key ) of
57- [{Key , Value }] ->
58- % lager:info("read key ~p: ~p",
59- % [Key, Value]),
60- {ok , Value };
61- _ ->
62- lager :info (" unable to find key: ~p " ,
63- [Key ]),
64- {error , not_found }
56+ -spec get (Key :: any ()) -> {error , not_found } | {ok , any ()}.
57+ get (Key ) ->
58+ case dbread (Key ) of
59+ undefined -> {error , not_found };
60+ Obj ->
61+ {ok , plumtree_test_object :value (Obj )}
6562 end .
6663
64+ -spec put (Key :: any (),
65+ Value :: any ()) -> ok .
66+ put (Key , Value ) ->
67+ Existing = dbread (Key ),
68+ UpdatedObj = plumtree_test_object :modify (Existing , Value , this_server_id ()),
69+ dbwrite (Key , UpdatedObj ),
70+ plumtree_broadcast :broadcast ({Key , UpdatedObj }, plumtree_test_broadcast_handler ),
71+ ok .
72+
6773% %%===================================================================
6874% %% gen_server callbacks
6975% %%===================================================================
7076
7177% % @private
7278-spec init ([[any ()], ...]) -> {ok , state ()}.
7379init ([]) ->
74- msgs_seen = ets :new (msgs_seen , [named_table , set , public ,
75- {keypos , 1 },
76- {read_concurrency , true }]),
7780 ? MODULE = ets :new (? MODULE , [named_table , set , public ,
7881 {keypos , 1 },
7982 {read_concurrency , true }]),
@@ -111,61 +114,50 @@ code_change(_OldVsn, State, _Extra) ->
111114
112115% % Return a two-tuple of message id and payload from a given broadcast
113116-spec broadcast_data (any ()) -> {any (), any ()}.
114- broadcast_data ({Key , _Value } = Data ) ->
115- MsgId = erlang : phash2 ( Data ) ,
117+ broadcast_data ({Key , Object } ) ->
118+ MsgId = { Key , plumtree_test_object : context ( Object )} ,
116119 lager :info (" broadcast_data(~p ), msg id: ~p " ,
117- [Data , MsgId ]),
118- true = ets :insert (msgs_seen , {MsgId , Key }),
119- true = ets :insert (? MODULE , Data ),
120- {MsgId , Data }.
120+ [Object , MsgId ]),
121+ {MsgId , Object }.
121122
122123% % Given the message id and payload, merge the message in the local state.
123124% % If the message has already been received return `false', otherwise return `true'
124125-spec merge (any (), any ()) -> boolean ().
125- merge (MsgId , {Key , _Value } = Payload ) ->
126- case ets :lookup (msgs_seen , MsgId ) of
127- [{MsgId , _ }] ->
128- lager :info (" msg with id ~p has already been seen" ,
129- [MsgId ]),
130- false ;
131- _ ->
132- lager :info (" merging(~p , ~p ) in local state" ,
133- [MsgId , Payload ]),
134- % % insert the message in the local state
135- true = ets :insert (? MODULE , Payload ),
136- % % mark this message as been seen
137- true = ets :insert_new (msgs_seen , {MsgId , Key }),
126+ merge ({Key , _Context } = MsgId , RemoteObj ) ->
127+ lager :info (" merge msg id ~p , object: ~p " ,
128+ [MsgId , RemoteObj ]),
129+ Existing = dbread (Key ),
130+ case plumtree_test_object :reconcile (RemoteObj , Existing ) of
131+ false -> false ;
132+ {true , Reconciled } ->
133+ dbwrite (Key , Reconciled ),
138134 true
139135 end .
140136
141137% % Return true if the message (given the message id) has already been received.
142138% % `false' otherwise
143139-spec is_stale (any ()) -> boolean ().
144- is_stale (MsgId ) ->
145- case ets :lookup (msgs_seen , MsgId ) of
146- [{MsgId , _ }] ->
147- lager :info (" is_stale(~p ): ~p " ,
148- [MsgId , true ]),
149- true ;
150- _ ->
151- lager :info (" is_stale(~p ): ~p " ,
152- [MsgId , false ]),
153- false
154- end .
140+ is_stale ({Key , Context }) ->
141+ Existing = dbread (Key ),
142+ plumtree_test_object :is_stale (Context , Existing ).
155143
156144% % Return the message associated with the given message id. In some cases a message
157145% % has already been sent with information that subsumes the message associated with the given
158146% % message id. In this case, `stale' is returned.
159147-spec graft (any ()) -> stale | {ok , any ()} | {error , any ()}.
160- graft (MsgId ) ->
161- % lager:info("graft(~p)",
162- % [MsgId]),
163- case ets :lookup (msgs_seen , MsgId ) of
164- [{MsgId , Key }] ->
165- [{Key ,Msg }] = ets :lookup (? MODULE , Key ),
166- {ok , {Key , Msg }};
167- _ ->
168- {error , not_found }
148+ graft ({Key , Context }) ->
149+ case dbread (Key ) of
150+ undefined ->
151+ % % this *really* should not happen
152+ lager :alert (" unable to graft key ~p , could not find it" ,
153+ [Key ]),
154+ {error , not_found };
155+ Object ->
156+ LocalContext = plumtree_test_object :context (Object ),
157+ case LocalContext =:= Context of
158+ true -> {ok , Object };
159+ false -> stale
160+ end
169161 end .
170162
171163% % Trigger an exchange between the local handler and the handler on the given node.
@@ -176,3 +168,24 @@ graft(MsgId) ->
176168-spec exchange (node ()) -> {ok , pid ()} | {error , term ()}.
177169exchange (_Node ) ->
178170 {ok , self ()}.
171+
172+ % % @private
173+ -spec dbread (Key :: any ()) -> any () | undefined .
174+ dbread (Key ) ->
175+ case ets :lookup (? MODULE , Key ) of
176+ [{Key , Object }] ->
177+ Object ;
178+ _ ->
179+ undefined
180+ end .
181+
182+ % % @private
183+ -spec dbwrite (Key :: any (),
184+ Value :: any ()) -> any ().
185+ dbwrite (Key , Object ) ->
186+ ets :insert (? MODULE , {Key , Object }),
187+ Object .
188+
189+ % % @private
190+ this_server_id () -> node ().
191+
0 commit comments