From eedfd41a4562bb8410388aea2b665503afd16b02 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Fri, 16 Jan 2015 15:48:33 +0800 Subject: [PATCH] retained messages --- apps/emqtt/src/emqtt_router.erl | 2 +- apps/emqtt/src/emqtt_server.erl | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index 9da53ceb7..3026f98fa 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -66,7 +66,7 @@ start_link() -> route(Msg) -> % need to retain? - emqtt_retained:retain(Message), + emqtt_retained:retain(Msg), % unset flag and pubsub emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ). diff --git a/apps/emqtt/src/emqtt_server.erl b/apps/emqtt/src/emqtt_server.erl index b4cf8ada4..f61993784 100644 --- a/apps/emqtt/src/emqtt_server.erl +++ b/apps/emqtt/src/emqtt_server.erl @@ -62,17 +62,17 @@ start_link(RetainOpts) -> retain(#mqtt_message{ retain = false }) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes -retain(Msg = #mqtt_message{ retain = true, topic = Topic, payload = <<>> }) -> +retain(#mqtt_message{ retain = true, topic = Topic, payload = <<>> }) -> mnesia:dirty_delete(?RETAINED_TAB, Topic); retain(Msg = #mqtt_message{retain = true}) -> - gen_server:cast(?SERVER, {retain, Msg}), Msg; + gen_server:cast(?SERVER, {retain, Msg}). %% subscribe(Topics, CPid) when is_pid(CPid) -> - RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]) + RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]), lists:foreach(fun(Msg) -> - CPid ! {dispatch, {self(), retained_msg(Msg}} + CPid ! {dispatch, {self(), retained_msg(Msg)}} end, RetainedMsgs). %% ------------------------------------------------------------------ @@ -91,11 +91,13 @@ init([RetainOpts]) -> handle_call(_Request, _From, State) -> {reply, ok, State}. -handle_cast({retain, Msg}, State = #state{store_limit = Limit}) -> +handle_cast({retain, Msg = #mqtt_message{ qos = Qos, + topic = Topic, + payload = Payload }}, State = #state{store_limit = Limit}) -> case mnesia:table_info(?RETAINED_TAB, size) of - Size >= Limit -> + Size when Size >= Limit -> lager:error("Server dropped message(retain) for table is full: ~p", [Msg]); - true -> + _ -> lager:info("Server retained message: ~p", [Msg]), mnesia:dirty_write(#mqtt_retained{ topic = Topic, qos = Qos,