redesign retained message store and deliver
This commit is contained in:
parent
4fb90d1e19
commit
761a3b2a2c
|
@ -93,7 +93,8 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
%% MQTT Retained Message
|
||||
%%------------------------------------------------------------------------------
|
||||
-record(mqtt_retained, {topic, qos, payload}).
|
||||
-record(message_retained, {topic, qos, payload}).
|
||||
|
||||
-type message_retained() :: #message_retained{}.
|
||||
|
||||
-type mqtt_retained() :: #mqtt_retained{}.
|
||||
|
||||
|
|
|
@ -107,6 +107,13 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
|
|||
stop({shutdown, duplicate_id}, State);
|
||||
|
||||
%%TODO: ok??
|
||||
handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) ->
|
||||
ProtoState1 =
|
||||
lists:foldl(fun(Message, PState) ->
|
||||
{ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
|
||||
end, ProtoState, Messages),
|
||||
{noreply, State#state{proto_state = ProtoState1}};
|
||||
|
||||
handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
|
||||
{noreply, State#state{proto_state = ProtoState1}};
|
||||
|
|
|
@ -109,10 +109,10 @@ create_tables() ->
|
|||
{index, [subpid]},
|
||||
{local_content, true}]),
|
||||
%% TODO: retained messages, this table should not be copied...
|
||||
ok = create_table(mqtt_retained, [
|
||||
ok = create_table(message_retained, [
|
||||
{type, ordered_set},
|
||||
{ram_copies, [node()]},
|
||||
{attributes, record_info(fields, mqtt_retained)}]).
|
||||
{attributes, record_info(fields, message_retained)}]).
|
||||
|
||||
create_table(Table, Attrs) ->
|
||||
case mnesia:create_table(Table, Attrs) of
|
||||
|
@ -129,11 +129,18 @@ create_table(Table, Attrs) ->
|
|||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
copy_tables() ->
|
||||
{atomic, ok} = mnesia:add_table_copy(topic, node(), ram_copies),
|
||||
{atomic, ok} = mnesia:add_table_copy(topic_trie, node(), ram_copies),
|
||||
{atomic, ok} = mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
|
||||
{atomic, ok} = mnesia:add_table_copy(topic_subscriber, node(), ram_copies),
|
||||
{atomic, ok} = mnesia:add_table_copy(mqtt_retained, node(), ram_copies).
|
||||
ok = copy_ram_table(topic),
|
||||
ok = copy_ram_table(topic_trie),
|
||||
ok = copy_ram_table(topic_trie_node),
|
||||
ok = copy_ram_table(topic_subscriber),
|
||||
ok = copy_ram_table(message_retained).
|
||||
|
||||
copy_ram_table(Table) ->
|
||||
case mnesia:add_table_copy(Table, node(), ram_copies) of
|
||||
{atomic, ok} -> ok;
|
||||
{aborted, {already_exists, Table, _Node}} -> ok;
|
||||
{aborted, Error} -> Error
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -173,7 +180,7 @@ cluster(Node) ->
|
|||
end
|
||||
|
||||
end,
|
||||
init_tables(),
|
||||
copy_tables(),
|
||||
wait_for_tables().
|
||||
|
||||
wait_for_mnesia(stop) ->
|
||||
|
|
|
@ -34,37 +34,35 @@
|
|||
|
||||
-include("emqttd_packet.hrl").
|
||||
|
||||
-define(RETAINED_TABLE, message_retained).
|
||||
|
||||
%% API Function Exports
|
||||
-export([retain/1, dispatch/2]).
|
||||
-export([retain/1, redeliver/2]).
|
||||
|
||||
%% @doc retain message.
|
||||
-spec retain(mqtt_message()) -> ok | ignore.
|
||||
retain(#mqtt_message{retain = false}) -> ignore;
|
||||
|
||||
%% RETAIN flag set to 1 and payload containing zero bytes
|
||||
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
||||
mnesia:transaction(fun() -> mnesia:delete({mqtt_retained, Topic}) end);
|
||||
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
||||
mnesia:async_dirty(fun mnesia:delete/1, [{?RETAINED_TABLE, Topic}]);
|
||||
|
||||
retain(Msg = #mqtt_message{retain = true,
|
||||
topic = Topic,
|
||||
qos = Qos,
|
||||
payload = Payload}) ->
|
||||
TabSize = mnesia:table_info(mqtt_retained, size),
|
||||
TabSize = mnesia:table_info(?RETAINED_TABLE, size),
|
||||
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
||||
{true, true} ->
|
||||
lager:debug("Retained: store message: ~p", [Msg]),
|
||||
mnesia:transaction(
|
||||
fun() ->
|
||||
mnesia:write(#mqtt_retained{topic = Topic,
|
||||
qos = Qos,
|
||||
payload = Payload})
|
||||
end),
|
||||
RetainedMsg = #message_retained{topic = Topic, qos = Qos, payload = Payload},
|
||||
mnesia:async_dirty(fun mnesia:write/1, [RetainedMsg]),
|
||||
emqttd_metrics:set('messages/retained/count',
|
||||
mnesia:table_info(mqtt_retained, size));
|
||||
mnesia:table_info(?RETAINED_TABLE, size));
|
||||
{false, _}->
|
||||
lager:error("Retained: dropped message(topic=~s) for table is full!", [Topic]);
|
||||
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
|
||||
{_, false}->
|
||||
lager:error("Retained: dropped message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)])
|
||||
lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)])
|
||||
end.
|
||||
|
||||
limit(table) ->
|
||||
|
@ -81,26 +79,34 @@ env() ->
|
|||
Env
|
||||
end.
|
||||
|
||||
%% @doc dispatch retained messages to subscribed client.
|
||||
-spec dispatch(Topics, CPid) -> any() when
|
||||
%% @doc redeliver retained messages to subscribed client.
|
||||
-spec redeliver(Topics, CPid) -> any() when
|
||||
Topics :: list(binary()),
|
||||
CPid :: pid().
|
||||
dispatch(Topics, CPid) when is_pid(CPid) ->
|
||||
Msgs = lists:flatten([mnesia:dirty_read(mqtt_retained, Topic) || Topic <- match(Topics)]),
|
||||
lists:foreach(fun(Msg) -> CPid ! {dispatch, {self(), mqtt_msg(Msg)}} end, Msgs).
|
||||
redeliver(Topics, CPid) when is_pid(CPid) ->
|
||||
lists:foreach(fun(Topic) ->
|
||||
case emqttd_topic:type(#topic{name=Topic}) of
|
||||
direct ->
|
||||
dispatch(CPid, mnesia:dirty_read(message_retained, Topic));
|
||||
wildcard ->
|
||||
Fun = fun(Msg = #message_retained{topic = Name}, Acc) ->
|
||||
case emqttd_topic:match(Name, Topic) of
|
||||
true -> [Msg|Acc];
|
||||
false -> Acc
|
||||
end
|
||||
end,
|
||||
RetainedMsgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], ?RETAINED_TABLE]),
|
||||
dispatch(CPid, lists:reverse(RetainedMsgs))
|
||||
end
|
||||
end, Topics).
|
||||
|
||||
match(Topics) ->
|
||||
RetainedTopics = mnesia:dirty_all_keys(mqtt_retained),
|
||||
lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]).
|
||||
dispatch(_CPid, []) ->
|
||||
ignore;
|
||||
dispatch(CPid, RetainedMsgs) when is_list(RetainedMsgs) ->
|
||||
CPid ! {dispatch, {self(), [mqtt_msg(Msg) || Msg <- RetainedMsgs]}};
|
||||
dispatch(CPid, RetainedMsg) when is_record(RetainedMsg, message_retained) ->
|
||||
CPid ! {dispatch, {self(), mqtt_msg(RetainedMsg)}}.
|
||||
|
||||
match(Topic, RetainedTopics) ->
|
||||
case emqttd_topic:type(#topic{name=Topic}) of
|
||||
direct -> %% FIXME
|
||||
[Topic];
|
||||
wildcard ->
|
||||
[T || T <- RetainedTopics, emqttd_topic:match(T, Topic)]
|
||||
end.
|
||||
|
||||
mqtt_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) ->
|
||||
mqtt_msg(#message_retained{topic = Topic, qos = Qos, payload = Payload}) ->
|
||||
#mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}.
|
||||
|
||||
|
|
|
@ -187,7 +187,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top
|
|||
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
|
||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
||||
%%TODO: should be gen_event and notification...
|
||||
emqttd_retained:dispatch([ Name || {Name, _} <- Topics ], self()),
|
||||
emqttd_retained:redeliver([Name || {Name, _} <- Topics], self()),
|
||||
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
||||
|
||||
subscribe(SessPid, Topics) when is_pid(SessPid) ->
|
||||
|
@ -337,6 +337,10 @@ handle_cast({destroy, ClientId}, State = #session_state{client_id = ClientId}) -
|
|||
handle_cast(Msg, State) ->
|
||||
{stop, {badmsg, Msg}, State}.
|
||||
|
||||
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
|
||||
F = fun(Message, S) -> dispatch(Message, S) end,
|
||||
{noreply, lists:foldl(F, State, Messages)};
|
||||
|
||||
handle_info({dispatch, {_From, Message}}, State) ->
|
||||
{noreply, dispatch(Message, State)};
|
||||
|
||||
|
|
Loading…
Reference in New Issue