From 953df5f9f71218a08fe3f321637b8b144a2e2123 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Fri, 16 Jan 2015 15:45:15 +0800 Subject: [PATCH] retained messages --- apps/emqtt/src/emqtt_router.erl | 12 +-- apps/emqtt/src/emqtt_server.erl | 127 +++++++++++++++++++++++++++++++ apps/emqtt/src/emqtt_session.erl | 2 + apps/emqtt/src/x.erl | 50 ++++++++++++ rel/files/app.config | 2 +- 5 files changed, 186 insertions(+), 7 deletions(-) create mode 100644 apps/emqtt/src/emqtt_server.erl create mode 100644 apps/emqtt/src/x.erl diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index 09eeaf0ce..9da53ceb7 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -64,18 +64,18 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -route(Message) -> - %%TODO: hooks later. - emqtt_pubsub:publish( - emqtt_message:unset_flag( - emqtt_retained:retain(Message))). +route(Msg) -> + % need to retain? + emqtt_retained:retain(Message), + % unset flag and pubsub + emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ). %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ init(Args) -> - {ok, Args}. + {ok, Args, hibernate}. handle_call(_Request, _From, State) -> {reply, ok, State}. diff --git a/apps/emqtt/src/emqtt_server.erl b/apps/emqtt/src/emqtt_server.erl new file mode 100644 index 000000000..b4cf8ada4 --- /dev/null +++ b/apps/emqtt/src/emqtt_server.erl @@ -0,0 +1,127 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2012-2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_server). + +-author('feng@slimpp.io'). + +-include("emqtt.hrl"). + +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +-define(RETAINED_TAB, mqtt_retained). + +-define(STORE_LIMIT, 100000). + +-record(mqtt_retained, {topic, qos, payload}). + +-record(state, {store_limit}). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ + +%%TODO: subscribe +-export([start_link/1, retain/1, subscribe/2]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +start_link(RetainOpts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [RetainOpts], []). + +retain(#mqtt_message{ retain = false }) -> ignore; + +%% RETAIN flag set to 1 and payload containing zero bytes +retain(Msg = #mqtt_message{ retain = true, topic = Topic, payload = <<>> }) -> + mnesia:dirty_delete(?RETAINED_TAB, Topic); + +retain(Msg = #mqtt_message{retain = true}) -> + gen_server:cast(?SERVER, {retain, Msg}), Msg; + +%% +subscribe(Topics, CPid) when is_pid(CPid) -> + RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]) + lists:foreach(fun(Msg) -> + CPid ! {dispatch, {self(), retained_msg(Msg}} + end, RetainedMsgs). + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init([RetainOpts]) -> + mnesia:create_table(mqtt_retained, [ + {type, ordered_set}, + {ram_copies, [node()]}, + {attributes, record_info(fields, mqtt_retained)}]), + mnesia:add_table_copy(mqtt_retained, node(), ram_copies), + Limit = proplists:get_value(store_limit, RetainOpts, ?STORE_LIMIT), + {ok, #state{store_limit = Limit}}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast({retain, Msg}, State = #state{store_limit = Limit}) -> + case mnesia:table_info(?RETAINED_TAB, size) of + Size >= Limit -> + lager:error("Server dropped message(retain) for table is full: ~p", [Msg]); + true -> + lager:info("Server retained message: ~p", [Msg]), + mnesia:dirty_write(#mqtt_retained{ topic = Topic, + qos = Qos, + payload = Payload }) + end, + {noreply, State}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ +match(Topics) -> + %%TODO: dirty_all_keys.... + Topics. + +retained_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) -> + #mqtt_message { qos = Qos, retain = true, topic = Topic, payload = Payload }. + diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index 430f82ae9..e52890a1a 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -147,6 +147,8 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top end, SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), {ok, GrantedQos} = emqtt_pubsub:subscribe(Topics, self()), + %%TODO: should be gen_event and notification... + emqtt_server:subscribe([ Name || {Name, _} <- Topics ], self()), {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) -> diff --git a/apps/emqtt/src/x.erl b/apps/emqtt/src/x.erl new file mode 100644 index 000000000..06e3e515d --- /dev/null +++ b/apps/emqtt/src/x.erl @@ -0,0 +1,50 @@ +-module(x). +-behaviour(gen_server). +-define(SERVER, ?MODULE). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ + +-export([start_link/0]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init(Args) -> + {ok, Args}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ + diff --git a/rel/files/app.config b/rel/files/app.config index 3102ebb27..f6ae0f077 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -39,7 +39,7 @@ {store_qos0, false} ]}, {retain, [ - + {store_limit, 100000} ]}, {listen, [ {mqtt, 1883, [