From 03806557ef27f0f43b9d5cb2a3d61101b843eff4 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Wed, 8 Apr 2015 16:02:55 +0800 Subject: [PATCH] retained messages --- apps/emqttd/src/emqttd.erl | 8 -- apps/emqttd/src/emqttd_app.erl | 6 +- apps/emqttd/src/emqttd_mnesia.erl | 31 +++--- apps/emqttd/src/emqttd_retained.erl | 106 +++++++++++++++++++++ apps/emqttd/src/emqttd_router.erl | 2 +- apps/emqttd/src/emqttd_server.erl | 142 ---------------------------- apps/emqttd/src/emqttd_session.erl | 2 +- rel/files/app.config | 9 +- rel/reltool.config | 4 +- 9 files changed, 135 insertions(+), 175 deletions(-) create mode 100644 apps/emqttd/src/emqttd_retained.erl delete mode 100644 apps/emqttd/src/emqttd_server.erl diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index 578280c42..f9d70fc04 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -79,12 +79,4 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. -%% TODO: publish chain... -publish(FromClient, Topic, Message) -> - emqttd_router:route(Message). - -%% TODO: subscribe: subscribe chain... -subscribe(FromClient, Topic) -> - emqttd_pubsub:subscribe(Topic). - diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 6b084c071..08667cc0d 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -35,7 +35,6 @@ -define(SERVICES, [config, event, - retained, client, session, pubsub, @@ -62,6 +61,7 @@ Reason :: term(). start(_StartType, _StartArgs) -> print_banner(), + emqttd_mnesia:init(), {ok, Sup} = emqttd_sup:start_link(), start_services(Sup), ok = emqttd_mnesia:wait(), @@ -101,10 +101,6 @@ service(config) -> service(event) -> {"emqttd event", emqttd_event}; -service(retained) -> - {ok, RetainOpts} = application:get_env(retain), - {"emqttd server", emqttd_server, RetainOpts}; - service(client) -> {"emqttd client manager", emqttd_cm}; diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index 7fbd084d4..27889dcd0 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -28,21 +28,28 @@ -author('feng@emqtt.io'). --export([init/0, wait/0, stop/0]). +-include("emqttd.hrl"). + +-export([init/0, wait/0]). init() -> - case mnesia:system_info(extra_db_nodes) of - [] -> mnesia:create_schema([node()]); - _ -> ok + case mnesia:system_info(extra_db_nodes) of + [] -> + mnesia:stop(), + mnesia:create_schema([node()]); + _ -> + ok end, ok = mnesia:start(), + create_tables(). + +create_tables() -> + mnesia:create_table(mqtt_retained, [ + {type, ordered_set}, + {ram_copies, [node()]}, + {attributes, record_info(fields, mqtt_retained)}]), + mnesia:add_table_copy(mqtt_retained, node(), ram_copies). + +wait() -> mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity). -%TODO: timeout should be configured? -wait() -> - mnesia:wait_for_tables([topic, topic_trie, topic_trie_node], 30000). - -stop() -> - mnesia:stop(). - - diff --git a/apps/emqttd/src/emqttd_retained.erl b/apps/emqttd/src/emqttd_retained.erl new file mode 100644 index 000000000..286cc3721 --- /dev/null +++ b/apps/emqttd/src/emqttd_retained.erl @@ -0,0 +1,106 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd retained messages. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_retained). + +-author('feng@slimpp.io'). + +-include("emqttd.hrl"). + +-include("emqttd_topic.hrl"). + +-include("emqttd_packet.hrl"). + +%% API Function Exports +-export([retain/1, dispatch/2]). + +%% @doc retain message. +-spec retain(mqtt_message()) -> ok | ignore. +retain(#mqtt_message{retain = false}) -> ignore; + +%% RETAIN flag set to 1 and payload containing zero bytes +retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> + mnesia:transaction(fun() -> mnesia:delete({mqtt_retained, Topic}) end); + +retain(Msg = #mqtt_message{retain = true, + topic = Topic, + qos = Qos, + payload = Payload}) -> + TabSize = mnesia:table_info(mqtt_retained, size), + case {TabSize < limit(table), size(Payload) < limit(payload)} of + {true, true} -> + lager:debug("Retained: store message: ~p", [Msg]), + mnesia:transaction( + fun() -> + mnesia:write(#mqtt_retained{topic = Topic, + qos = Qos, + payload = Payload}) + end), + emqttd_metrics:set('messages/retained/count', + mnesia:table_info(mqtt_retained, size)); + {false, _}-> + lager:error("Retained: dropped message(topic=~s) for table is full!", [Topic]); + {_, false}-> + lager:error("Retained: dropped message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)]) + end. + +limit(table) -> + proplists:get_value(max_message_num, env()); +limit(payload) -> + proplists:get_value(max_playload_size, env()). + +env() -> + case get({env, retained}) of + undefined -> + {ok, Env} = application:get_env(emqttd, retained), + put({env, retained}, Env), Env; + Env -> + Env + end. + +%% @doc dispatch retained messages to subscribed client. +-spec dispatch(Topics, CPid) -> any() when + Topics :: list(binary()), + CPid :: pid(). +dispatch(Topics, CPid) when is_pid(CPid) -> + Msgs = lists:flatten([mnesia:dirty_read(mqtt_retained, Topic) || Topic <- match(Topics)]), + lists:foreach(fun(Msg) -> CPid ! {dispatch, {self(), mqtt_msg(Msg)}} end, Msgs). + +match(Topics) -> + RetainedTopics = mnesia:dirty_all_keys(mqtt_retained), + lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]). + +match(Topic, RetainedTopics) -> + case emqttd_topic:type(#topic{name=Topic}) of + direct -> %% FIXME + [Topic]; + wildcard -> + [T || T <- RetainedTopics, emqttd_topic:match(T, Topic)] + end. + +mqtt_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) -> + #mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}. + diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl index d32172299..5006e8f59 100644 --- a/apps/emqttd/src/emqttd_router.erl +++ b/apps/emqttd/src/emqttd_router.erl @@ -65,7 +65,7 @@ start_link() -> route(Msg) -> lager:debug("Route ~s", [emqttd_message:dump(Msg)]), % TODO: need to retain? - emqttd_server:retain(Msg), + emqttd_retained:retain(Msg), % unset flag and pubsub emqttd_pubsub:publish(emqttd_message:unset_flag(Msg)). diff --git a/apps/emqttd/src/emqttd_server.erl b/apps/emqttd/src/emqttd_server.erl deleted file mode 100644 index d70cebca1..000000000 --- a/apps/emqttd/src/emqttd_server.erl +++ /dev/null @@ -1,142 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd server. retain messages??? -%%% TODO: redesign... -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_server). - --author('feng@slimpp.io'). - --behaviour(gen_server). - --define(SERVER, ?MODULE). - --include("emqttd.hrl"). - --include("emqttd_topic.hrl"). - --include("emqttd_packet.hrl"). - --record(state, {store_limit}). - --define(RETAINED_TAB, mqtt_retained). - --define(STORE_LIMIT, 1000000). - -%% API Function Exports --export([start_link/1, retain/1, subscribe/2]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%%%============================================================================= -%%% API -%%%============================================================================= - --spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}. -start_link(Opts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). - -retain(#mqtt_message{retain = false}) -> ignore; - -%% RETAIN flag set to 1 and payload containing zero bytes -retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - mnesia:dirty_delete(?RETAINED_TAB, Topic); - -retain(Msg = #mqtt_message{retain = true}) -> - gen_server:cast(?SERVER, {retain, Msg}). - -%% TODO: this is not right??? -subscribe(Topics, CPid) when is_pid(CPid) -> - RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]), - lists:foreach(fun(Msg) -> - CPid ! {dispatch, {self(), retained_msg(Msg)}} - end, RetainedMsgs). - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= - -init([Opts]) -> - mnesia:create_table(?RETAINED_TAB, [ - {type, ordered_set}, - {ram_copies, [node()]}, - {attributes, record_info(fields, mqtt_retained)}]), - mnesia:add_table_copy(?RETAINED_TAB, node(), ram_copies), - Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT), - {ok, #state{store_limit = Limit}}. - -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({retain, Msg = #mqtt_message{topic = Topic, - qos = Qos, - payload = Payload}}, - State = #state{store_limit = Limit}) -> - case mnesia:table_info(?RETAINED_TAB, size) of - Size when Size >= Limit -> - lager:error("Dropped message(retain) for table is full: ~p", [Msg]); - _ -> - lager:debug("Retained message: ~p", [Msg]), - mnesia:dirty_write(#mqtt_retained{topic = Topic, - qos = Qos, - payload = Payload}), - emqttd_metrics:set('messages/retained/count', - mnesia:table_info(?RETAINED_TAB, size)) - end, - {noreply, State}; - -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. - -handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%============================================================================= -%%% Internal functions -%%%============================================================================= - -match(Topics) -> - RetainedTopics = mnesia:dirty_all_keys(?RETAINED_TAB), - lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]). - -match(Topic, RetainedTopics) -> - case emqttd_topic:type(#topic{name=Topic}) of - direct -> %% FIXME - [Topic]; - wildcard -> - [T || T <- RetainedTopics, emqttd_topic:match(T, Topic)] - end. - -retained_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) -> - #mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}. - - diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index bf9831135..11075610a 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -187,7 +187,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), %%TODO: should be gen_event and notification... - emqttd_server:subscribe([ Name || {Name, _} <- Topics ], self()), + emqttd_retained:dispatch([ Name || {Name, _} <- Topics ], self()), {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) -> diff --git a/rel/files/app.config b/rel/files/app.config index f21c25096..b63d4170b 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -8,7 +8,7 @@ {sasl_error_logger, {file, "log/emqttd_sasl.log"}} ]}, {mnesia, [ - {dir, "data"} + {dir, "data/mnesia"} ]}, {ssl, [ %{versions, ['tlsv1.2', 'tlsv1.1']} @@ -59,13 +59,14 @@ ]}, %% Session {session, [ - {expires, 1}, + {expires, 1}, %hour {max_queue, 1000}, {store_qos0, false} ]}, %% Retain messages - {retain, [ - {store_limit, 100000} + {retained, [ + {max_message_num, 100000}, + {max_playload_size, 16#ffff} ]}, %% Broker {broker, [ diff --git a/rel/reltool.config b/rel/reltool.config index 1b9849f48..fd226f03a 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -2,7 +2,7 @@ {lib_dirs, ["../apps", "../deps", "../plugins"]}, {erts, [{mod_cond, derived}, {app_file, strip}]}, {app_file, strip}, - {rel, "emqttd", "0.5.4", + {rel, "emqttd", "0.6.0", [ kernel, stdlib, @@ -10,7 +10,7 @@ syntax_tools, ssl, crypto, - mnesia, + %mnesia, os_mon, inets, goldrush,