From b08bd5a578f55fa711da2f6130f6119c8d093e5f Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 12 Mar 2016 16:38:30 +0800 Subject: [PATCH] retained_message --- src/emqttd_backend.erl | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/emqttd_backend.erl b/src/emqttd_backend.erl index df75866cc..9ad03334a 100644 --- a/src/emqttd_backend.erl +++ b/src/emqttd_backend.erl @@ -34,6 +34,8 @@ -export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1, del_subscription/2]). +-record(retained_message, {topic, msg}). + %%-------------------------------------------------------------------- %% Mnesia callbacks %%-------------------------------------------------------------------- @@ -42,9 +44,8 @@ mnesia(boot) -> ok = emqttd_mnesia:create_table(retained_message, [ {type, ordered_set}, {disc_copies, [node()]}, - {record_name, mqtt_message}, - {index, [#mqtt_message.topic]}, - {attributes, record_info(fields, mqtt_message)}, + {record_name, retained_message}, + {attributes, record_info(fields, retained_message)}, {storage_properties, [{ets, [compressed]}, {dets, [{auto_save, 1000}]}]}]), ok = emqttd_mnesia:create_table(backend_subscription, [ @@ -64,17 +65,17 @@ mnesia(copy) -> %%-------------------------------------------------------------------- -spec(retain_message(mqtt_message()) -> ok). -retain_message(Msg) when is_record(Msg, mqtt_message) -> - mnesia:dirty_write(retained_message, Msg). +retain_message(Msg = #mqtt_message{topic = Topic}) -> + mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}). -spec(read_messages(binary()) -> [mqtt_message()]). read_messages(Topic) -> - mnesia:dirty_index_read(retained_message, Topic, #mqtt_message.topic). + [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)]. -spec(match_messages(binary()) -> [mqtt_message()]). match_messages(Filter) -> %% TODO: optimize later... - Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> + Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) -> case emqttd_topic:match(Name, Filter) of true -> [Msg|Acc]; false -> Acc @@ -84,19 +85,18 @@ match_messages(Filter) -> -spec(delete_message(binary()) -> ok). delete_message(Topic) -> - %%TODO: no transaction??? - [mnesia:dirty_delete_object(retained_message, Msg) || Msg <- read_messages(Topic)]. + mnesia:dirty_delete(retained_message, Topic). -spec(expire_messages(pos_integer()) -> any()). expire_messages(Time) when is_integer(Time) -> mnesia:transaction( fun() -> Match = ets:fun2ms( - fun(#mqtt_message{msgid = MsgId, timestamp = {MegaSecs, Secs, _}}) - when Time > (MegaSecs * 1000000 + Secs) -> MsgId + fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}}) + when Time > (MegaSecs * 1000000 + Secs) -> Topic end), - MsgIds = mnesia:select(retained_message, Match, write), - lists:foreach(fun(MsgId) -> mnesia:delete({retained_message, MsgId}) end, MsgIds) + Topics = mnesia:select(retained_message, Match, write), + lists:foreach(fun(Topic) -> mnesia:delete({retained_message, Topic}) end, Topics) end). -spec(retained_count() -> non_neg_integer()).