From 761a3b2a2c5544cd31ac4f94e5ae13c757266eb8 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sun, 12 Apr 2015 22:52:29 +0800 Subject: [PATCH] redesign retained message store and deliver --- apps/emqttd/include/emqttd.hrl | 5 ++- apps/emqttd/src/emqttd_client.erl | 7 +++ apps/emqttd/src/emqttd_mnesia.erl | 23 ++++++---- apps/emqttd/src/emqttd_retained.erl | 66 ++++++++++++++++------------- apps/emqttd/src/emqttd_session.erl | 6 ++- 5 files changed, 66 insertions(+), 41 deletions(-) diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 112f86806..45d2a7330 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -93,7 +93,8 @@ %%------------------------------------------------------------------------------ %% MQTT Retained Message %%------------------------------------------------------------------------------ --record(mqtt_retained, {topic, qos, payload}). +-record(message_retained, {topic, qos, payload}). + +-type message_retained() :: #message_retained{}. --type mqtt_retained() :: #mqtt_retained{}. diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index a7401aeb7..30d1ed41a 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -107,6 +107,13 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState stop({shutdown, duplicate_id}, State); %%TODO: ok?? +handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) -> + ProtoState1 = + lists:foldl(fun(Message, PState) -> + {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1 + end, ProtoState, Messages), + {noreply, State#state{proto_state = ProtoState1}}; + handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index f7e5d5bab..97d6714a0 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -109,10 +109,10 @@ create_tables() -> {index, [subpid]}, {local_content, true}]), %% TODO: retained messages, this table should not be copied... - ok = create_table(mqtt_retained, [ + ok = create_table(message_retained, [ {type, ordered_set}, {ram_copies, [node()]}, - {attributes, record_info(fields, mqtt_retained)}]). + {attributes, record_info(fields, message_retained)}]). create_table(Table, Attrs) -> case mnesia:create_table(Table, Attrs) of @@ -129,11 +129,18 @@ create_table(Table, Attrs) -> %% @end %%------------------------------------------------------------------------------ copy_tables() -> - {atomic, ok} = mnesia:add_table_copy(topic, node(), ram_copies), - {atomic, ok} = mnesia:add_table_copy(topic_trie, node(), ram_copies), - {atomic, ok} = mnesia:add_table_copy(topic_trie_node, node(), ram_copies), - {atomic, ok} = mnesia:add_table_copy(topic_subscriber, node(), ram_copies), - {atomic, ok} = mnesia:add_table_copy(mqtt_retained, node(), ram_copies). + ok = copy_ram_table(topic), + ok = copy_ram_table(topic_trie), + ok = copy_ram_table(topic_trie_node), + ok = copy_ram_table(topic_subscriber), + ok = copy_ram_table(message_retained). + +copy_ram_table(Table) -> + case mnesia:add_table_copy(Table, node(), ram_copies) of + {atomic, ok} -> ok; + {aborted, {already_exists, Table, _Node}} -> ok; + {aborted, Error} -> Error + end. %%------------------------------------------------------------------------------ %% @doc @@ -173,7 +180,7 @@ cluster(Node) -> end end, - init_tables(), + copy_tables(), wait_for_tables(). wait_for_mnesia(stop) -> diff --git a/apps/emqttd/src/emqttd_retained.erl b/apps/emqttd/src/emqttd_retained.erl index 286cc3721..5a7a4743f 100644 --- a/apps/emqttd/src/emqttd_retained.erl +++ b/apps/emqttd/src/emqttd_retained.erl @@ -34,37 +34,35 @@ -include("emqttd_packet.hrl"). +-define(RETAINED_TABLE, message_retained). + %% API Function Exports --export([retain/1, dispatch/2]). +-export([retain/1, redeliver/2]). %% @doc retain message. -spec retain(mqtt_message()) -> ok | ignore. retain(#mqtt_message{retain = false}) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes -retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - mnesia:transaction(fun() -> mnesia:delete({mqtt_retained, Topic}) end); +retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> + mnesia:async_dirty(fun mnesia:delete/1, [{?RETAINED_TABLE, Topic}]); retain(Msg = #mqtt_message{retain = true, topic = Topic, qos = Qos, payload = Payload}) -> - TabSize = mnesia:table_info(mqtt_retained, size), + TabSize = mnesia:table_info(?RETAINED_TABLE, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> lager:debug("Retained: store message: ~p", [Msg]), - mnesia:transaction( - fun() -> - mnesia:write(#mqtt_retained{topic = Topic, - qos = Qos, - payload = Payload}) - end), + RetainedMsg = #message_retained{topic = Topic, qos = Qos, payload = Payload}, + mnesia:async_dirty(fun mnesia:write/1, [RetainedMsg]), emqttd_metrics:set('messages/retained/count', - mnesia:table_info(mqtt_retained, size)); + mnesia:table_info(?RETAINED_TABLE, size)); {false, _}-> - lager:error("Retained: dropped message(topic=~s) for table is full!", [Topic]); + lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); {_, false}-> - lager:error("Retained: dropped message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)]) + lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)]) end. limit(table) -> @@ -81,26 +79,34 @@ env() -> Env end. -%% @doc dispatch retained messages to subscribed client. --spec dispatch(Topics, CPid) -> any() when +%% @doc redeliver retained messages to subscribed client. +-spec redeliver(Topics, CPid) -> any() when Topics :: list(binary()), CPid :: pid(). -dispatch(Topics, CPid) when is_pid(CPid) -> - Msgs = lists:flatten([mnesia:dirty_read(mqtt_retained, Topic) || Topic <- match(Topics)]), - lists:foreach(fun(Msg) -> CPid ! {dispatch, {self(), mqtt_msg(Msg)}} end, Msgs). +redeliver(Topics, CPid) when is_pid(CPid) -> + lists:foreach(fun(Topic) -> + case emqttd_topic:type(#topic{name=Topic}) of + direct -> + dispatch(CPid, mnesia:dirty_read(message_retained, Topic)); + wildcard -> + Fun = fun(Msg = #message_retained{topic = Name}, Acc) -> + case emqttd_topic:match(Name, Topic) of + true -> [Msg|Acc]; + false -> Acc + end + end, + RetainedMsgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], ?RETAINED_TABLE]), + dispatch(CPid, lists:reverse(RetainedMsgs)) + end + end, Topics). -match(Topics) -> - RetainedTopics = mnesia:dirty_all_keys(mqtt_retained), - lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]). +dispatch(_CPid, []) -> + ignore; +dispatch(CPid, RetainedMsgs) when is_list(RetainedMsgs) -> + CPid ! {dispatch, {self(), [mqtt_msg(Msg) || Msg <- RetainedMsgs]}}; +dispatch(CPid, RetainedMsg) when is_record(RetainedMsg, message_retained) -> + CPid ! {dispatch, {self(), mqtt_msg(RetainedMsg)}}. -match(Topic, RetainedTopics) -> - case emqttd_topic:type(#topic{name=Topic}) of - direct -> %% FIXME - [Topic]; - wildcard -> - [T || T <- RetainedTopics, emqttd_topic:match(T, Topic)] - end. - -mqtt_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) -> +mqtt_msg(#message_retained{topic = Topic, qos = Qos, payload = Payload}) -> #mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}. diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 11075610a..30967e6f2 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -187,7 +187,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), %%TODO: should be gen_event and notification... - emqttd_retained:dispatch([ Name || {Name, _} <- Topics ], self()), + emqttd_retained:redeliver([Name || {Name, _} <- Topics], self()), {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) -> @@ -337,6 +337,10 @@ handle_cast({destroy, ClientId}, State = #session_state{client_id = ClientId}) - handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. +handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> + F = fun(Message, S) -> dispatch(Message, S) end, + {noreply, lists:foldl(F, State, Messages)}; + handle_info({dispatch, {_From, Message}}, State) -> {noreply, dispatch(Message, State)};