retained messages

This commit is contained in:
Ery Lee 2015-01-16 15:48:33 +08:00
parent 953df5f9f7
commit eedfd41a45
2 changed files with 10 additions and 8 deletions

View File

@ -66,7 +66,7 @@ start_link() ->
route(Msg) ->
% need to retain?
emqtt_retained:retain(Message),
emqtt_retained:retain(Msg),
% unset flag and pubsub
emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ).

View File

@ -62,17 +62,17 @@ start_link(RetainOpts) ->
retain(#mqtt_message{ retain = false }) -> ignore;
%% RETAIN flag set to 1 and payload containing zero bytes
retain(Msg = #mqtt_message{ retain = true, topic = Topic, payload = <<>> }) ->
retain(#mqtt_message{ retain = true, topic = Topic, payload = <<>> }) ->
mnesia:dirty_delete(?RETAINED_TAB, Topic);
retain(Msg = #mqtt_message{retain = true}) ->
gen_server:cast(?SERVER, {retain, Msg}), Msg;
gen_server:cast(?SERVER, {retain, Msg}).
%%
subscribe(Topics, CPid) when is_pid(CPid) ->
RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)])
RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]),
lists:foreach(fun(Msg) ->
CPid ! {dispatch, {self(), retained_msg(Msg}}
CPid ! {dispatch, {self(), retained_msg(Msg)}}
end, RetainedMsgs).
%% ------------------------------------------------------------------
@ -91,11 +91,13 @@ init([RetainOpts]) ->
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({retain, Msg}, State = #state{store_limit = Limit}) ->
handle_cast({retain, Msg = #mqtt_message{ qos = Qos,
topic = Topic,
payload = Payload }}, State = #state{store_limit = Limit}) ->
case mnesia:table_info(?RETAINED_TAB, size) of
Size >= Limit ->
Size when Size >= Limit ->
lager:error("Server dropped message(retain) for table is full: ~p", [Msg]);
true ->
_ ->
lager:info("Server retained message: ~p", [Msg]),
mnesia:dirty_write(#mqtt_retained{ topic = Topic,
qos = Qos,