diff --git a/src/emqttd_backend.erl b/src/emqttd_backend.erl index 39c1a8c2d..338a6097f 100644 --- a/src/emqttd_backend.erl +++ b/src/emqttd_backend.erl @@ -25,15 +25,15 @@ -copy_mnesia({mnesia, [copy]}). %% API. --export([add_static_subscription/1, lookup_static_subscriptions/1, - del_static_subscriptions/1, del_static_subscription/2]). +-export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1, + del_subscription/2]). %%-------------------------------------------------------------------- %% Mnesia callbacks %%-------------------------------------------------------------------- mnesia(boot) -> - ok = emqttd_mnesia:create_table(static_subscription, [ + ok = emqttd_mnesia:create_table(backend_subscription, [ {type, bag}, {disc_copies, [node()]}, {record_name, mqtt_subscription}, @@ -42,48 +42,48 @@ mnesia(boot) -> {dets, [{auto_save, 5000}]}]}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(static_subscription). + ok = emqttd_mnesia:copy_table(backend_subscription). %%-------------------------------------------------------------------- %% Static Subscriptions %%-------------------------------------------------------------------- %% @doc Add a static subscription manually. --spec add_static_subscription(mqtt_subscription()) -> {atom, ok}. -add_static_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) -> +-spec add_subscription(mqtt_subscription()) -> {atom, ok}. +add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) -> Pattern = match_pattern(SubId, Topic), mnesia:transaction( fun() -> - case mnesia:match_object(static_subscription, Pattern, write) of + case mnesia:match_object(backend_subscription, Pattern, write) of [] -> - mnesia:write(static_subscription, Subscription, write); + mnesia:write(backend_subscription, Subscription, write); [Subscription] -> mnesia:abort({error, existed}); [Subscription1] -> %% QoS is different - mnesia:delete_object(static_subscription, Subscription1, write), - mnesia:write(static_subscription, Subscription, write) + mnesia:delete_object(backend_subscription, Subscription1, write), + mnesia:write(backend_subscription, Subscription, write) end end). %% @doc Lookup static subscriptions. --spec lookup_static_subscriptions(binary()) -> list(mqtt_subscription()). -lookup_static_subscriptions(ClientId) when is_binary(ClientId) -> - mnesia:dirty_read(static_subscription, ClientId). +-spec lookup_subscriptions(binary()) -> list(mqtt_subscription()). +lookup_subscriptions(ClientId) when is_binary(ClientId) -> + mnesia:dirty_read(backend_subscription, ClientId). %% @doc Delete static subscriptions by ClientId manually. --spec del_static_subscriptions(binary()) -> ok. -del_static_subscriptions(ClientId) when is_binary(ClientId) -> - mnesia:transaction(fun mnesia:delete/1, [{static_subscription, ClientId}]). +-spec del_subscriptions(binary()) -> ok. +del_subscriptions(ClientId) when is_binary(ClientId) -> + mnesia:transaction(fun mnesia:delete/1, [{backend_subscription, ClientId}]). %% @doc Delete a static subscription manually. --spec del_static_subscription(binary(), binary()) -> ok. -del_static_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> - mnesia:transaction(fun del_static_subscription_/1, [match_pattern(ClientId, Topic)]). +-spec del_subscription(binary(), binary()) -> ok. +del_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> + mnesia:transaction(fun del_subscription_/1, [match_pattern(ClientId, Topic)]). -del_static_subscription_(Pattern) -> +del_subscription_(Pattern) -> lists:foreach(fun(Subscription) -> - mnesia:delete_object(static_subscription, Subscription, write) - end, mnesia:match_object(static_subscription, Pattern, write)). + mnesia:delete_object(backend_subscription, Subscription, write) + end, mnesia:match_object(backend_subscription, Pattern, write)). match_pattern(SubId, Topic) -> #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}. diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 6bfaf104b..17225ac4a 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -228,7 +228,7 @@ subscriptions(["list"]) -> subscriptions(["list", "static"]) -> Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, - if_could_print(static_subscription, Print); + if_could_print(backend_subscription, Print); subscriptions(["show", ClientId]) -> case mnesia:dirty_read(subscription, bin(ClientId)) of @@ -241,7 +241,7 @@ subscriptions(["add", ClientId, Topic, QoS]) -> Subscription = #mqtt_subscription{subid = bin(ClientId), topic = bin(Topic), qos = IntQos}, - case emqttd_backend:add_static_subscription(Subscription) of + case emqttd_backend:add_subscription(Subscription) of {atomic, ok} -> ?PRINT_MSG("ok~n"); {aborted, {error, existed}} -> @@ -253,11 +253,11 @@ subscriptions(["add", ClientId, Topic, QoS]) -> if_valid_qos(QoS, Add); subscriptions(["del", ClientId]) -> - Ok = emqttd_backend:del_static_subscriptions(bin(ClientId)), + Ok = emqttd_backend:del_subscriptions(bin(ClientId)), ?PRINT("~p~n", [Ok]); subscriptions(["del", ClientId, Topic]) -> - Ok = emqttd_backend:del_static_subscription(bin(ClientId), bin(Topic)), + Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)), ?PRINT("~p~n", [Ok]); subscriptions(_) -> diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl index 098fc592d..b8d31b436 100644 --- a/src/emqttd_mod_subscription.erl +++ b/src/emqttd_mod_subscription.erl @@ -25,11 +25,11 @@ -export([load/1, client_connected/3, unload/1]). --record(state, {topics, stored = false}). +-record(state, {topics, backend = false}). load(Opts) -> Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], - State = #state{topics = Topics, stored = lists:member(stored, Opts)}, + State = #state{topics = Topics, backend = lists:member(backend, Opts)}, emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [State]}), ok. @@ -37,18 +37,18 @@ load(Opts) -> client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid, username = Username}, - #state{topics = Topics, stored = Stored}) -> + #state{topics = Topics, backend = Backend}) -> Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, - TopicTable = with_stored(Stored, ClientId, [{Replace(Topic), Qos} || {Topic, Qos} <- Topics]), + TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- with_backend(Backend, ClientId, Topics)], emqttd_client:subscribe(ClientPid, TopicTable); client_connected(_ConnAck, _Client, _State) -> ok. -with_stored(false, _ClientId, TopicTable) -> +with_backend(false, _ClientId, TopicTable) -> TopicTable; -with_stored(true, ClientId, TopicTable) -> +with_backend(true, ClientId, TopicTable) -> Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end, - emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_pubsub:lookup(subscription, ClientId)], TopicTable). + emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_backend:lookup_subscriptions(ClientId)], TopicTable). unload(_Opts) -> emqttd_broker:unhook('client.connected', {?MODULE, client_connected}).