retained messages

This commit is contained in:
Ery Lee 2015-04-08 16:02:55 +08:00
parent 826ca7afca
commit 03806557ef
9 changed files with 135 additions and 175 deletions

View File

@ -79,12 +79,4 @@ is_running(Node) ->
Pid when is_pid(Pid) -> true
end.
%% TODO: publish chain...
publish(FromClient, Topic, Message) ->
emqttd_router:route(Message).
%% TODO: subscribe: subscribe chain...
subscribe(FromClient, Topic) ->
emqttd_pubsub:subscribe(Topic).

View File

@ -35,7 +35,6 @@
-define(SERVICES, [config,
event,
retained,
client,
session,
pubsub,
@ -62,6 +61,7 @@
Reason :: term().
start(_StartType, _StartArgs) ->
print_banner(),
emqttd_mnesia:init(),
{ok, Sup} = emqttd_sup:start_link(),
start_services(Sup),
ok = emqttd_mnesia:wait(),
@ -101,10 +101,6 @@ service(config) ->
service(event) ->
{"emqttd event", emqttd_event};
service(retained) ->
{ok, RetainOpts} = application:get_env(retain),
{"emqttd server", emqttd_server, RetainOpts};
service(client) ->
{"emqttd client manager", emqttd_cm};

View File

@ -28,21 +28,28 @@
-author('feng@emqtt.io').
-export([init/0, wait/0, stop/0]).
-include("emqttd.hrl").
-export([init/0, wait/0]).
init() ->
case mnesia:system_info(extra_db_nodes) of
[] -> mnesia:create_schema([node()]);
_ -> ok
case mnesia:system_info(extra_db_nodes) of
[] ->
mnesia:stop(),
mnesia:create_schema([node()]);
_ ->
ok
end,
ok = mnesia:start(),
create_tables().
create_tables() ->
mnesia:create_table(mqtt_retained, [
{type, ordered_set},
{ram_copies, [node()]},
{attributes, record_info(fields, mqtt_retained)}]),
mnesia:add_table_copy(mqtt_retained, node(), ram_copies).
wait() ->
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
%TODO: timeout should be configured?
wait() ->
mnesia:wait_for_tables([topic, topic_trie, topic_trie_node], 30000).
stop() ->
mnesia:stop().

View File

@ -0,0 +1,106 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd retained messages.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_retained).
-author('feng@slimpp.io').
-include("emqttd.hrl").
-include("emqttd_topic.hrl").
-include("emqttd_packet.hrl").
%% API Function Exports
-export([retain/1, dispatch/2]).
%% @doc retain message.
-spec retain(mqtt_message()) -> ok | ignore.
retain(#mqtt_message{retain = false}) -> ignore;
%% RETAIN flag set to 1 and payload containing zero bytes
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
mnesia:transaction(fun() -> mnesia:delete({mqtt_retained, Topic}) end);
retain(Msg = #mqtt_message{retain = true,
topic = Topic,
qos = Qos,
payload = Payload}) ->
TabSize = mnesia:table_info(mqtt_retained, size),
case {TabSize < limit(table), size(Payload) < limit(payload)} of
{true, true} ->
lager:debug("Retained: store message: ~p", [Msg]),
mnesia:transaction(
fun() ->
mnesia:write(#mqtt_retained{topic = Topic,
qos = Qos,
payload = Payload})
end),
emqttd_metrics:set('messages/retained/count',
mnesia:table_info(mqtt_retained, size));
{false, _}->
lager:error("Retained: dropped message(topic=~s) for table is full!", [Topic]);
{_, false}->
lager:error("Retained: dropped message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)])
end.
limit(table) ->
proplists:get_value(max_message_num, env());
limit(payload) ->
proplists:get_value(max_playload_size, env()).
env() ->
case get({env, retained}) of
undefined ->
{ok, Env} = application:get_env(emqttd, retained),
put({env, retained}, Env), Env;
Env ->
Env
end.
%% @doc dispatch retained messages to subscribed client.
-spec dispatch(Topics, CPid) -> any() when
Topics :: list(binary()),
CPid :: pid().
dispatch(Topics, CPid) when is_pid(CPid) ->
Msgs = lists:flatten([mnesia:dirty_read(mqtt_retained, Topic) || Topic <- match(Topics)]),
lists:foreach(fun(Msg) -> CPid ! {dispatch, {self(), mqtt_msg(Msg)}} end, Msgs).
match(Topics) ->
RetainedTopics = mnesia:dirty_all_keys(mqtt_retained),
lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]).
match(Topic, RetainedTopics) ->
case emqttd_topic:type(#topic{name=Topic}) of
direct -> %% FIXME
[Topic];
wildcard ->
[T || T <- RetainedTopics, emqttd_topic:match(T, Topic)]
end.
mqtt_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) ->
#mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}.

View File

@ -65,7 +65,7 @@ start_link() ->
route(Msg) ->
lager:debug("Route ~s", [emqttd_message:dump(Msg)]),
% TODO: need to retain?
emqttd_server:retain(Msg),
emqttd_retained:retain(Msg),
% unset flag and pubsub
emqttd_pubsub:publish(emqttd_message:unset_flag(Msg)).

View File

@ -1,142 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd server. retain messages???
%%% TODO: redesign...
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_server).
-author('feng@slimpp.io').
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-include("emqttd.hrl").
-include("emqttd_topic.hrl").
-include("emqttd_packet.hrl").
-record(state, {store_limit}).
-define(RETAINED_TAB, mqtt_retained).
-define(STORE_LIMIT, 1000000).
%% API Function Exports
-export([start_link/1, retain/1, subscribe/2]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%%%=============================================================================
%%% API
%%%=============================================================================
-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
start_link(Opts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
retain(#mqtt_message{retain = false}) -> ignore;
%% RETAIN flag set to 1 and payload containing zero bytes
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
mnesia:dirty_delete(?RETAINED_TAB, Topic);
retain(Msg = #mqtt_message{retain = true}) ->
gen_server:cast(?SERVER, {retain, Msg}).
%% TODO: this is not right???
subscribe(Topics, CPid) when is_pid(CPid) ->
RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]),
lists:foreach(fun(Msg) ->
CPid ! {dispatch, {self(), retained_msg(Msg)}}
end, RetainedMsgs).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Opts]) ->
mnesia:create_table(?RETAINED_TAB, [
{type, ordered_set},
{ram_copies, [node()]},
{attributes, record_info(fields, mqtt_retained)}]),
mnesia:add_table_copy(?RETAINED_TAB, node(), ram_copies),
Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT),
{ok, #state{store_limit = Limit}}.
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({retain, Msg = #mqtt_message{topic = Topic,
qos = Qos,
payload = Payload}},
State = #state{store_limit = Limit}) ->
case mnesia:table_info(?RETAINED_TAB, size) of
Size when Size >= Limit ->
lager:error("Dropped message(retain) for table is full: ~p", [Msg]);
_ ->
lager:debug("Retained message: ~p", [Msg]),
mnesia:dirty_write(#mqtt_retained{topic = Topic,
qos = Qos,
payload = Payload}),
emqttd_metrics:set('messages/retained/count',
mnesia:table_info(?RETAINED_TAB, size))
end,
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
match(Topics) ->
RetainedTopics = mnesia:dirty_all_keys(?RETAINED_TAB),
lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]).
match(Topic, RetainedTopics) ->
case emqttd_topic:type(#topic{name=Topic}) of
direct -> %% FIXME
[Topic];
wildcard ->
[T || T <- RetainedTopics, emqttd_topic:match(T, Topic)]
end.
retained_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) ->
#mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}.

View File

@ -187,7 +187,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
%%TODO: should be gen_event and notification...
emqttd_server:subscribe([ Name || {Name, _} <- Topics ], self()),
emqttd_retained:dispatch([ Name || {Name, _} <- Topics ], self()),
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) ->

View File

@ -8,7 +8,7 @@
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
]},
{mnesia, [
{dir, "data"}
{dir, "data/mnesia"}
]},
{ssl, [
%{versions, ['tlsv1.2', 'tlsv1.1']}
@ -59,13 +59,14 @@
]},
%% Session
{session, [
{expires, 1},
{expires, 1}, %hour
{max_queue, 1000},
{store_qos0, false}
]},
%% Retain messages
{retain, [
{store_limit, 100000}
{retained, [
{max_message_num, 100000},
{max_playload_size, 16#ffff}
]},
%% Broker
{broker, [

View File

@ -2,7 +2,7 @@
{lib_dirs, ["../apps", "../deps", "../plugins"]},
{erts, [{mod_cond, derived}, {app_file, strip}]},
{app_file, strip},
{rel, "emqttd", "0.5.4",
{rel, "emqttd", "0.6.0",
[
kernel,
stdlib,
@ -10,7 +10,7 @@
syntax_tools,
ssl,
crypto,
mnesia,
%mnesia,
os_mon,
inets,
goldrush,