backend_subscription

This commit is contained in:
Feng 2016-03-08 13:50:41 +08:00
parent 0f1347a495
commit f1f58818d5
3 changed files with 33 additions and 33 deletions

View File

@ -25,15 +25,15 @@
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% API. %% API.
-export([add_static_subscription/1, lookup_static_subscriptions/1, -export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1,
del_static_subscriptions/1, del_static_subscription/2]). del_subscription/2]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia callbacks %% Mnesia callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = emqttd_mnesia:create_table(static_subscription, [ ok = emqttd_mnesia:create_table(backend_subscription, [
{type, bag}, {type, bag},
{disc_copies, [node()]}, {disc_copies, [node()]},
{record_name, mqtt_subscription}, {record_name, mqtt_subscription},
@ -42,48 +42,48 @@ mnesia(boot) ->
{dets, [{auto_save, 5000}]}]}]); {dets, [{auto_save, 5000}]}]}]);
mnesia(copy) -> mnesia(copy) ->
ok = emqttd_mnesia:copy_table(static_subscription). ok = emqttd_mnesia:copy_table(backend_subscription).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Static Subscriptions %% Static Subscriptions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Add a static subscription manually. %% @doc Add a static subscription manually.
-spec add_static_subscription(mqtt_subscription()) -> {atom, ok}. -spec add_subscription(mqtt_subscription()) -> {atom, ok}.
add_static_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) -> add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) ->
Pattern = match_pattern(SubId, Topic), Pattern = match_pattern(SubId, Topic),
mnesia:transaction( mnesia:transaction(
fun() -> fun() ->
case mnesia:match_object(static_subscription, Pattern, write) of case mnesia:match_object(backend_subscription, Pattern, write) of
[] -> [] ->
mnesia:write(static_subscription, Subscription, write); mnesia:write(backend_subscription, Subscription, write);
[Subscription] -> [Subscription] ->
mnesia:abort({error, existed}); mnesia:abort({error, existed});
[Subscription1] -> %% QoS is different [Subscription1] -> %% QoS is different
mnesia:delete_object(static_subscription, Subscription1, write), mnesia:delete_object(backend_subscription, Subscription1, write),
mnesia:write(static_subscription, Subscription, write) mnesia:write(backend_subscription, Subscription, write)
end end
end). end).
%% @doc Lookup static subscriptions. %% @doc Lookup static subscriptions.
-spec lookup_static_subscriptions(binary()) -> list(mqtt_subscription()). -spec lookup_subscriptions(binary()) -> list(mqtt_subscription()).
lookup_static_subscriptions(ClientId) when is_binary(ClientId) -> lookup_subscriptions(ClientId) when is_binary(ClientId) ->
mnesia:dirty_read(static_subscription, ClientId). mnesia:dirty_read(backend_subscription, ClientId).
%% @doc Delete static subscriptions by ClientId manually. %% @doc Delete static subscriptions by ClientId manually.
-spec del_static_subscriptions(binary()) -> ok. -spec del_subscriptions(binary()) -> ok.
del_static_subscriptions(ClientId) when is_binary(ClientId) -> del_subscriptions(ClientId) when is_binary(ClientId) ->
mnesia:transaction(fun mnesia:delete/1, [{static_subscription, ClientId}]). mnesia:transaction(fun mnesia:delete/1, [{backend_subscription, ClientId}]).
%% @doc Delete a static subscription manually. %% @doc Delete a static subscription manually.
-spec del_static_subscription(binary(), binary()) -> ok. -spec del_subscription(binary(), binary()) -> ok.
del_static_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> del_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) ->
mnesia:transaction(fun del_static_subscription_/1, [match_pattern(ClientId, Topic)]). mnesia:transaction(fun del_subscription_/1, [match_pattern(ClientId, Topic)]).
del_static_subscription_(Pattern) -> del_subscription_(Pattern) ->
lists:foreach(fun(Subscription) -> lists:foreach(fun(Subscription) ->
mnesia:delete_object(static_subscription, Subscription, write) mnesia:delete_object(backend_subscription, Subscription, write)
end, mnesia:match_object(static_subscription, Pattern, write)). end, mnesia:match_object(backend_subscription, Pattern, write)).
match_pattern(SubId, Topic) -> match_pattern(SubId, Topic) ->
#mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}. #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}.

View File

@ -228,7 +228,7 @@ subscriptions(["list"]) ->
subscriptions(["list", "static"]) -> subscriptions(["list", "static"]) ->
Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end,
if_could_print(static_subscription, Print); if_could_print(backend_subscription, Print);
subscriptions(["show", ClientId]) -> subscriptions(["show", ClientId]) ->
case mnesia:dirty_read(subscription, bin(ClientId)) of case mnesia:dirty_read(subscription, bin(ClientId)) of
@ -241,7 +241,7 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
Subscription = #mqtt_subscription{subid = bin(ClientId), Subscription = #mqtt_subscription{subid = bin(ClientId),
topic = bin(Topic), topic = bin(Topic),
qos = IntQos}, qos = IntQos},
case emqttd_backend:add_static_subscription(Subscription) of case emqttd_backend:add_subscription(Subscription) of
{atomic, ok} -> {atomic, ok} ->
?PRINT_MSG("ok~n"); ?PRINT_MSG("ok~n");
{aborted, {error, existed}} -> {aborted, {error, existed}} ->
@ -253,11 +253,11 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
if_valid_qos(QoS, Add); if_valid_qos(QoS, Add);
subscriptions(["del", ClientId]) -> subscriptions(["del", ClientId]) ->
Ok = emqttd_backend:del_static_subscriptions(bin(ClientId)), Ok = emqttd_backend:del_subscriptions(bin(ClientId)),
?PRINT("~p~n", [Ok]); ?PRINT("~p~n", [Ok]);
subscriptions(["del", ClientId, Topic]) -> subscriptions(["del", ClientId, Topic]) ->
Ok = emqttd_backend:del_static_subscription(bin(ClientId), bin(Topic)), Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)),
?PRINT("~p~n", [Ok]); ?PRINT("~p~n", [Ok]);
subscriptions(_) -> subscriptions(_) ->

View File

@ -25,11 +25,11 @@
-export([load/1, client_connected/3, unload/1]). -export([load/1, client_connected/3, unload/1]).
-record(state, {topics, stored = false}). -record(state, {topics, backend = false}).
load(Opts) -> load(Opts) ->
Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)],
State = #state{topics = Topics, stored = lists:member(stored, Opts)}, State = #state{topics = Topics, backend = lists:member(backend, Opts)},
emqttd_broker:hook('client.connected', {?MODULE, client_connected}, emqttd_broker:hook('client.connected', {?MODULE, client_connected},
{?MODULE, client_connected, [State]}), {?MODULE, client_connected, [State]}),
ok. ok.
@ -37,18 +37,18 @@ load(Opts) ->
client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
username = Username}, username = Username},
#state{topics = Topics, stored = Stored}) -> #state{topics = Topics, backend = Backend}) ->
Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end,
TopicTable = with_stored(Stored, ClientId, [{Replace(Topic), Qos} || {Topic, Qos} <- Topics]), TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- with_backend(Backend, ClientId, Topics)],
emqttd_client:subscribe(ClientPid, TopicTable); emqttd_client:subscribe(ClientPid, TopicTable);
client_connected(_ConnAck, _Client, _State) -> ok. client_connected(_ConnAck, _Client, _State) -> ok.
with_stored(false, _ClientId, TopicTable) -> with_backend(false, _ClientId, TopicTable) ->
TopicTable; TopicTable;
with_stored(true, ClientId, TopicTable) -> with_backend(true, ClientId, TopicTable) ->
Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end, Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end,
emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_pubsub:lookup(subscription, ClientId)], TopicTable). emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_backend:lookup_subscriptions(ClientId)], TopicTable).
unload(_Opts) -> unload(_Opts) ->
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}). emqttd_broker:unhook('client.connected', {?MODULE, client_connected}).