commit
c34ae3aa80
|
@ -18,13 +18,19 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
%% Mnesia Callbacks
|
%% Mnesia Callbacks
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
-copy_mnesia({mnesia, [copy]}).
|
-copy_mnesia({mnesia, [copy]}).
|
||||||
|
|
||||||
%% API.
|
%% Retained Message API
|
||||||
|
-export([retain_message/1, read_messages/1, match_messages/1, delete_message/1,
|
||||||
|
expire_messages/1, retained_count/0]).
|
||||||
|
|
||||||
|
%% Static Subscription API
|
||||||
-export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1,
|
-export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1,
|
||||||
del_subscription/2]).
|
del_subscription/2]).
|
||||||
|
|
||||||
|
@ -33,6 +39,14 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
mnesia(boot) ->
|
mnesia(boot) ->
|
||||||
|
ok = emqttd_mnesia:create_table(retained_message, [
|
||||||
|
{type, ordered_set},
|
||||||
|
{disc_copies, [node()]},
|
||||||
|
{record_name, mqtt_message},
|
||||||
|
{index, [#mqtt_message.topic]},
|
||||||
|
{attributes, record_info(fields, mqtt_message)},
|
||||||
|
{storage_properties, [{ets, [compressed]},
|
||||||
|
{dets, [{auto_save, 1000}]}]}]),
|
||||||
ok = emqttd_mnesia:create_table(backend_subscription, [
|
ok = emqttd_mnesia:create_table(backend_subscription, [
|
||||||
{type, bag},
|
{type, bag},
|
||||||
{disc_copies, [node()]},
|
{disc_copies, [node()]},
|
||||||
|
@ -42,14 +56,59 @@ mnesia(boot) ->
|
||||||
{dets, [{auto_save, 5000}]}]}]);
|
{dets, [{auto_save, 5000}]}]}]);
|
||||||
|
|
||||||
mnesia(copy) ->
|
mnesia(copy) ->
|
||||||
|
ok = emqttd_mnesia:copy_table(retained_message),
|
||||||
ok = emqttd_mnesia:copy_table(backend_subscription).
|
ok = emqttd_mnesia:copy_table(backend_subscription).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Retained Message
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(retain_message(mqtt_message()) -> ok).
|
||||||
|
retain_message(Msg) when is_record(Msg, mqtt_message) ->
|
||||||
|
mnesia:dirty_write(retained_message, Msg).
|
||||||
|
|
||||||
|
-spec(read_messages(binary()) -> [mqtt_message()]).
|
||||||
|
read_messages(Topic) ->
|
||||||
|
mnesia:dirty_index_read(retained_message, Topic, #mqtt_message.topic).
|
||||||
|
|
||||||
|
-spec(match_messages(binary()) -> [mqtt_message()]).
|
||||||
|
match_messages(Filter) ->
|
||||||
|
%% TODO: optimize later...
|
||||||
|
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
|
||||||
|
case emqttd_topic:match(Name, Filter) of
|
||||||
|
true -> [Msg|Acc];
|
||||||
|
false -> Acc
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
|
||||||
|
|
||||||
|
-spec(delete_message(binary()) -> ok).
|
||||||
|
delete_message(Topic) ->
|
||||||
|
%%TODO: no transaction???
|
||||||
|
[mnesia:dirty_delete_object(retained_message, Msg) || Msg <- read_messages(Topic)].
|
||||||
|
|
||||||
|
-spec(expire_messages(pos_integer()) -> any()).
|
||||||
|
expire_messages(Time) when is_integer(Time) ->
|
||||||
|
mnesia:transaction(
|
||||||
|
fun() ->
|
||||||
|
Match = ets:fun2ms(
|
||||||
|
fun(#mqtt_message{msgid = MsgId, timestamp = {MegaSecs, Secs, _}})
|
||||||
|
when Time > (MegaSecs * 1000000 + Secs) -> MsgId
|
||||||
|
end),
|
||||||
|
MsgIds = mnesia:select(retained_message, Match, write),
|
||||||
|
lists:foreach(fun(MsgId) -> mnesia:delete({retained_message, MsgId}) end, MsgIds)
|
||||||
|
end).
|
||||||
|
|
||||||
|
-spec(retained_count() -> non_neg_integer()).
|
||||||
|
retained_count() ->
|
||||||
|
mnesia:table_info(retained_message, size).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Static Subscriptions
|
%% Static Subscriptions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Add a static subscription manually.
|
%% @doc Add a static subscription manually.
|
||||||
-spec add_subscription(mqtt_subscription()) -> ok | {error, already_existed}.
|
-spec(add_subscription(mqtt_subscription()) -> ok | {error, already_existed}).
|
||||||
add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) ->
|
add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) ->
|
||||||
Pattern = match_pattern(SubId, Topic),
|
Pattern = match_pattern(SubId, Topic),
|
||||||
return(mnesia:transaction(fun() ->
|
return(mnesia:transaction(fun() ->
|
||||||
|
@ -89,3 +148,4 @@ match_pattern(SubId, Topic) ->
|
||||||
|
|
||||||
return({atomic, ok}) -> ok;
|
return({atomic, ok}) -> ok;
|
||||||
return({aborted, Reason}) -> {error, Reason}.
|
return({aborted, Reason}) -> {error, Reason}.
|
||||||
|
|
||||||
|
|
|
@ -23,66 +23,41 @@
|
||||||
|
|
||||||
-include("emqttd_internal.hrl").
|
-include("emqttd_internal.hrl").
|
||||||
|
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
|
||||||
|
|
||||||
%% Mnesia callbacks
|
|
||||||
-export([mnesia/1]).
|
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
|
||||||
-copy_mnesia({mnesia, [copy]}).
|
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([retain/1, dispatch/2]).
|
-export([retain/1, dispatch/2]).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/0, expire/1]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(mqtt_retained, {topic, message}).
|
|
||||||
|
|
||||||
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
|
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Mnesia callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
mnesia(boot) ->
|
|
||||||
ok = emqttd_mnesia:create_table(retained, [
|
|
||||||
{type, ordered_set},
|
|
||||||
{disc_copies, [node()]},
|
|
||||||
{record_name, mqtt_retained},
|
|
||||||
{attributes, record_info(fields, mqtt_retained)}]);
|
|
||||||
mnesia(copy) ->
|
|
||||||
ok = emqttd_mnesia:copy_table(retained).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Start a retained server
|
%% @doc Start the retainer
|
||||||
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
%% @doc Retain message
|
%% @doc Retain a message
|
||||||
-spec retain(mqtt_message()) -> ok | ignore.
|
-spec(retain(mqtt_message()) -> ok | ignore).
|
||||||
retain(#mqtt_message{retain = false}) -> ignore;
|
retain(#mqtt_message{retain = false}) -> ignore;
|
||||||
|
|
||||||
%% RETAIN flag set to 1 and payload containing zero bytes
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
||||||
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
||||||
mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]);
|
emqttd_backend:delete_message(Topic);
|
||||||
|
|
||||||
retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
|
retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
|
||||||
TabSize = mnesia:table_info(retained, size),
|
TabSize = emqttd_backend:retained_count(),
|
||||||
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
Retained = #mqtt_retained{topic = Topic, message = Msg},
|
emqttd_backend:retain_message(Msg),
|
||||||
lager:debug("RETAIN ~s", [emqttd_message:format(Msg)]),
|
emqttd_metrics:set('messages/retained', emqttd_backend:retained_count());
|
||||||
mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
|
|
||||||
emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
|
|
||||||
{false, _}->
|
{false, _}->
|
||||||
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
|
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
|
||||||
{_, false}->
|
{_, false}->
|
||||||
|
@ -103,23 +78,12 @@ env(Key) ->
|
||||||
Val
|
Val
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Deliver retained messages to subscribed client
|
%% @doc Deliver retained messages to the subscriber
|
||||||
-spec dispatch(Topic, CPid) -> any() when
|
-spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()).
|
||||||
Topic :: binary(),
|
|
||||||
CPid :: pid().
|
|
||||||
dispatch(Topic, CPid) when is_binary(Topic) ->
|
dispatch(Topic, CPid) when is_binary(Topic) ->
|
||||||
Msgs =
|
Msgs = case emqttd_topic:wildcard(Topic) of
|
||||||
case emqttd_topic:wildcard(Topic) of
|
false -> emqttd_backend:read_messages(Topic);
|
||||||
false ->
|
true -> emqttd_backend:match_messages(Topic)
|
||||||
[Msg || #mqtt_retained{message = Msg} <- mnesia:dirty_read(retained, Topic)];
|
|
||||||
true ->
|
|
||||||
Fun = fun(#mqtt_retained{topic = Name, message = Msg}, Acc) ->
|
|
||||||
case emqttd_topic:match(Name, Topic) of
|
|
||||||
true -> [Msg|Acc];
|
|
||||||
false -> Acc
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained])
|
|
||||||
end,
|
end,
|
||||||
lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
|
lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
|
||||||
|
|
||||||
|
@ -145,7 +109,7 @@ handle_cast(Msg, State) ->
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
||||||
StatsFun(mnesia:table_info(retained, size)),
|
StatsFun(emqttd_backend:retained_count()),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(expire, State = #state{expired_after = Never})
|
handle_info(expire, State = #state{expired_after = Never})
|
||||||
|
@ -153,7 +117,7 @@ handle_info(expire, State = #state{expired_after = Never})
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
|
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
|
||||||
expire(emqttd_time:now_to_secs() - ExpiredAfter),
|
emqttd_backend:expire_messages(emqttd_time:now_to_secs() - ExpiredAfter),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -166,18 +130,3 @@ terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
expire(Time) ->
|
|
||||||
mnesia:async_dirty(
|
|
||||||
fun() ->
|
|
||||||
Match = ets:fun2ms(
|
|
||||||
fun(#mqtt_retained{topic = Topic, message = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
|
|
||||||
when Time > (MegaSecs * 1000000 + Secs) -> Topic
|
|
||||||
end),
|
|
||||||
Topics = mnesia:select(retained, Match, write),
|
|
||||||
lists:foreach(fun(Topic) -> mnesia:delete({retained, Topic}) end, Topics)
|
|
||||||
end).
|
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ all() ->
|
||||||
{group, metrics},
|
{group, metrics},
|
||||||
{group, stats},
|
{group, stats},
|
||||||
{group, hook},
|
{group, hook},
|
||||||
|
{group, backend},
|
||||||
{group, cli}].
|
{group, cli}].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
|
@ -44,8 +45,6 @@ groups() ->
|
||||||
router_unused]},
|
router_unused]},
|
||||||
{session, [sequence],
|
{session, [sequence],
|
||||||
[start_session]},
|
[start_session]},
|
||||||
{retainer, [sequence],
|
|
||||||
[retain_message]},
|
|
||||||
{broker, [sequence],
|
{broker, [sequence],
|
||||||
[hook_unhook]},
|
[hook_unhook]},
|
||||||
{metrics, [sequence],
|
{metrics, [sequence],
|
||||||
|
@ -55,6 +54,12 @@ groups() ->
|
||||||
{hook, [sequence],
|
{hook, [sequence],
|
||||||
[add_delete_hook,
|
[add_delete_hook,
|
||||||
run_hooks]},
|
run_hooks]},
|
||||||
|
{retainer, [sequence],
|
||||||
|
[retain_messages,
|
||||||
|
dispatch_retained_messages,
|
||||||
|
expire_retained_messages]},
|
||||||
|
{backend, [sequence],
|
||||||
|
[backend_subscription]},
|
||||||
{cli, [sequence],
|
{cli, [sequence],
|
||||||
[ctl_register_cmd,
|
[ctl_register_cmd,
|
||||||
cli_status,
|
cli_status,
|
||||||
|
@ -207,19 +212,6 @@ start_session(_) ->
|
||||||
emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]),
|
emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]),
|
||||||
emqttd_mock_client:stop(ClientPid).
|
emqttd_mock_client:stop(ClientPid).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Retainer Group
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
retain_message(_) ->
|
|
||||||
Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
|
|
||||||
payload = <<"payload">>},
|
|
||||||
emqttd_retainer:retain(Msg),
|
|
||||||
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
|
|
||||||
true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
|
|
||||||
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
|
|
||||||
[] = mnesia:dirty_read({retained, <<"a/b/c">>}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Broker Group
|
%% Broker Group
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -280,6 +272,51 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok.
|
||||||
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
|
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
|
||||||
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
|
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Retainer Test
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
retain_messages(_) ->
|
||||||
|
Msg = emqttd_message:make(<<"clientId">>, <<"topic">>, <<"payload">>),
|
||||||
|
emqttd_backend:retain_message(Msg),
|
||||||
|
[Msg] = emqttd_backend:read_messages(<<"topic">>),
|
||||||
|
[Msg] = emqttd_backend:match_messages(<<"topic/#">>),
|
||||||
|
emqttd_backend:delete_message(<<"topic">>),
|
||||||
|
0 = emqttd_backend:retained_count().
|
||||||
|
|
||||||
|
dispatch_retained_messages(_) ->
|
||||||
|
Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
|
||||||
|
payload = <<"payload">>},
|
||||||
|
emqttd_retainer:retain(Msg),
|
||||||
|
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
|
||||||
|
true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
|
||||||
|
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
|
||||||
|
[] = emqttd_backend:read_messages(<<"a/b/c">>).
|
||||||
|
|
||||||
|
expire_retained_messages(_) ->
|
||||||
|
Msg1 = emqttd_message:make(<<"clientId1">>, qos1, <<"topic/1">>, <<"payload1">>),
|
||||||
|
Msg2 = emqttd_message:make(<<"clientId2">>, qos2, <<"topic/2">>, <<"payload2">>),
|
||||||
|
emqttd_backend:retain_message(Msg1),
|
||||||
|
emqttd_backend:retain_message(Msg2),
|
||||||
|
timer:sleep(2000),
|
||||||
|
emqttd_backend:expire_messages(emqttd_time:now_to_secs()),
|
||||||
|
0 = emqttd_backend:retained_count().
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Backend Test
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
backend_subscription(_) ->
|
||||||
|
Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2},
|
||||||
|
Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"#">>, qos = 2},
|
||||||
|
emqttd_backend:add_subscription(Sub1),
|
||||||
|
emqttd_backend:add_subscription(Sub2),
|
||||||
|
[Sub1, Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
|
||||||
|
emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>),
|
||||||
|
[Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
|
||||||
|
emqttd_backend:del_subscriptions(<<"clientId">>),
|
||||||
|
[] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% CLI Group
|
%% CLI Group
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -16,21 +16,30 @@
|
||||||
|
|
||||||
-module(emqttd_backend_SUITE).
|
-module(emqttd_backend_SUITE).
|
||||||
|
|
||||||
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
all() -> [{group, retainer}].
|
all() -> [{group, subscription}].
|
||||||
|
|
||||||
groups() -> [{retainer, [], [t_retain]}].
|
groups() -> [{subscription, [], [add_del_subscription]}].
|
||||||
|
|
||||||
init_per_group(retainer, _Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqttd_mnesia:ensure_started(),
|
ok = emqttd_mnesia:ensure_started(),
|
||||||
emqttd_retainer:mnesia(boot),
|
emqttd_backend:mnesia(boot),
|
||||||
emqttd_retainer:mnesia(copy).
|
emqttd_backend:mnesia(copy),
|
||||||
|
Config.
|
||||||
|
|
||||||
end_per_group(retainer, _Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok;
|
emqttd_mnesia:ensure_stopped().
|
||||||
end_per_group(_Group, _Config) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_retain(_) -> ok.
|
add_del_subscription(_) ->
|
||||||
|
Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2},
|
||||||
|
Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 1},
|
||||||
|
ok = emqttd_backend:add_subscription(Sub1),
|
||||||
|
{error, already_existed} = emqttd_backend:add_subscription(Sub1),
|
||||||
|
ok = emqttd_backend:add_subscription(Sub2),
|
||||||
|
[Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>),
|
||||||
|
emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>),
|
||||||
|
[] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue