feat(exclusive): refactor the implementation
This commit is contained in:
parent
6599b71b5e
commit
7084e510d6
|
@ -1,183 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_mod_exclusive).
|
|
||||||
|
|
||||||
-behaviour(emqx_gen_mod).
|
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
-logger_header("[exclusive]").
|
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
|
||||||
-export([mnesia/1]).
|
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
|
||||||
-copy_mnesia({mnesia, [copy]}).
|
|
||||||
|
|
||||||
%% emqx_gen_mod callbacks
|
|
||||||
-export([
|
|
||||||
load/1,
|
|
||||||
unload/1,
|
|
||||||
description/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
exclusive_subscribe/3,
|
|
||||||
exclusive_unsubscribe/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-record(exclusive_subscription, {
|
|
||||||
topic :: emqx_types:topic(),
|
|
||||||
clientid :: emqx_types:clientid()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-define(TAB, emqx_exclusive_subscription).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Mnesia bootstrap
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
mnesia(boot) ->
|
|
||||||
StoreProps = [
|
|
||||||
{ets, [
|
|
||||||
{read_concurrency, true},
|
|
||||||
{write_concurrency, true}
|
|
||||||
]}
|
|
||||||
],
|
|
||||||
ok = ekka_mnesia:create_table(?TAB, [
|
|
||||||
{type, set},
|
|
||||||
{ram_copies, [node()]},
|
|
||||||
{record_name, exclusive_subscription},
|
|
||||||
{attributes, record_info(fields, exclusive_subscription)},
|
|
||||||
{storage_properties, StoreProps}
|
|
||||||
]);
|
|
||||||
mnesia(copy) ->
|
|
||||||
ok = ekka_mnesia:copy_table(?TAB, ram_copies).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Load/Unload
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec load(list()) -> ok.
|
|
||||||
load(_Env) ->
|
|
||||||
emqx_hooks:put('client.subscribe', {?MODULE, exclusive_subscribe, []}),
|
|
||||||
emqx_hooks:put('client.unsubscribe', {?MODULE, exclusive_unsubscribe, []}).
|
|
||||||
|
|
||||||
-spec unload(list()) -> ok.
|
|
||||||
unload(_Env) ->
|
|
||||||
emqx_hooks:del('client.subscribe', {?MODULE, exclusive_subscribe}),
|
|
||||||
emqx_hooks:del('client.unsubscribe', {?MODULE, exclusive_unsubscribe}).
|
|
||||||
|
|
||||||
description() ->
|
|
||||||
"EMQ X Exclusive Subscription Module".
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Hooks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
exclusive_subscribe(ClientInfo, _Prop, TopicFilters) ->
|
|
||||||
{ok, check_is_enabled(ClientInfo, TopicFilters, fun on_subscribe/2)}.
|
|
||||||
|
|
||||||
exclusive_unsubscribe(ClientInfo, _Prop, TopicFilters) ->
|
|
||||||
check_is_enabled(ClientInfo, TopicFilters, fun on_unsubscribe/2).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
check_is_enabled(#{zone := Zone} = ClientInfo, TopicFilters, Cont) ->
|
|
||||||
case emqx_zone:get_env(Zone, exclusive_subscription) of
|
|
||||||
false ->
|
|
||||||
TopicFilters;
|
|
||||||
_ ->
|
|
||||||
case lists:any(fun is_exclusive_subscribe/1, TopicFilters) of
|
|
||||||
false ->
|
|
||||||
TopicFilters;
|
|
||||||
_ ->
|
|
||||||
Cont(ClientInfo, TopicFilters)
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
on_subscribe(#{clientid := ClientId}, TopicFilters) ->
|
|
||||||
Fun = fun() ->
|
|
||||||
try_subscribe(ClientId, TopicFilters)
|
|
||||||
end,
|
|
||||||
case mnesia:transaction(Fun) of
|
|
||||||
{atomic, Res} ->
|
|
||||||
Res;
|
|
||||||
{aborted, Reason} ->
|
|
||||||
?LOG(warning, "Cannot check subscribe ~p due to ~p.", [TopicFilters, Reason]),
|
|
||||||
lists:map(
|
|
||||||
fun({Filter, SubOpts} = TopicFilter) ->
|
|
||||||
case is_exclusive_subscribe(Filter) of
|
|
||||||
false ->
|
|
||||||
TopicFilter;
|
|
||||||
_ ->
|
|
||||||
{Filter, SubOpts#{is_exclusive => true}}
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
TopicFilters
|
|
||||||
)
|
|
||||||
end.
|
|
||||||
|
|
||||||
try_subscribe(ClientId, TopicFilters) ->
|
|
||||||
try_subscribe(TopicFilters, ClientId, []).
|
|
||||||
|
|
||||||
try_subscribe([{<<"$exclusive/", _/binary>> = Topic, SubOpts} = TopicFilters | T], ClientId, Acc) ->
|
|
||||||
try_subscribe(
|
|
||||||
T,
|
|
||||||
ClientId,
|
|
||||||
case mnesia:wread({?TAB, Topic}) of
|
|
||||||
[] ->
|
|
||||||
mnesia:write(
|
|
||||||
?TAB,
|
|
||||||
#exclusive_subscription{
|
|
||||||
clientid = ClientId,
|
|
||||||
topic = Topic
|
|
||||||
},
|
|
||||||
write
|
|
||||||
),
|
|
||||||
[TopicFilters | Acc];
|
|
||||||
[_] ->
|
|
||||||
[{Topic, SubOpts#{is_exclusive => true}} | Acc]
|
|
||||||
end
|
|
||||||
);
|
|
||||||
try_subscribe([H | T], ClientId, Acc) ->
|
|
||||||
try_subscribe(T, ClientId, [H | Acc]);
|
|
||||||
try_subscribe([], _ClientId, Acc) ->
|
|
||||||
lists:reverse(Acc).
|
|
||||||
|
|
||||||
on_unsubscribe(#{clientid := ClientId}, TopicFilters) ->
|
|
||||||
_ = mnesia:transaction(fun() -> try_unsubscribe(TopicFilters, ClientId) end).
|
|
||||||
|
|
||||||
try_unsubscribe([{<<"$exclusive/", _/binary>> = Topic, _} | T], ClientId) ->
|
|
||||||
case mnesia:wread({?TAB, Topic}) of
|
|
||||||
[#exclusive_subscription{clientid = ClientId}] ->
|
|
||||||
mnesia:delete({?TAB, Topic});
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
try_unsubscribe(T, ClientId);
|
|
||||||
try_unsubscribe([H | T], ClientId) ->
|
|
||||||
try_unsubscribe(T, ClientId);
|
|
||||||
try_unsubscribe([], _) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
is_exclusive_subscribe({<<"$exclusive/", Rest/binary>>, _SubOpt}) when Rest =/= <<>> ->
|
|
||||||
true;
|
|
||||||
is_exclusive_subscribe(<<"$exclusive/", Rest/binary>>) when Rest =/= <<>> ->
|
|
||||||
true;
|
|
||||||
is_exclusive_subscribe(_) ->
|
|
||||||
false.
|
|
|
@ -180,7 +180,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
|
||||||
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
||||||
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
||||||
Group = maps:get(share, SubOpts, undefined),
|
Group = maps:get(share, SubOpts, undefined),
|
||||||
do_unsubscribe(Group, Topic, SubPid, SubOpts).
|
do_unsubscribe(Group, Topic, SubPid, SubOpts),
|
||||||
|
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
|
||||||
|
|
||||||
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
|
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
|
||||||
case maps:get(shard, SubOpts, 0) of
|
case maps:get(shard, SubOpts, 0) of
|
||||||
|
@ -498,4 +499,3 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -1483,8 +1483,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Check Sub Caps
|
%% Check Sub Caps
|
||||||
|
|
||||||
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
|
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
|
||||||
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
|
emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enrich SubId
|
%% Enrich SubId
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_exclusive_subscription).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-logger_header("[exclusive]").
|
||||||
|
|
||||||
|
%% Mnesia bootstrap
|
||||||
|
-export([mnesia/1]).
|
||||||
|
|
||||||
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
-copy_mnesia({mnesia, [copy]}).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
check_subscribe/2,
|
||||||
|
unsubscribe/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-record(exclusive_subscription, {
|
||||||
|
topic :: emqx_types:topic(),
|
||||||
|
clientid :: emqx_types:clientid()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(TAB, emqx_exclusive_subscription).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Mnesia bootstrap
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
mnesia(boot) ->
|
||||||
|
StoreProps = [
|
||||||
|
{ets, [
|
||||||
|
{read_concurrency, true},
|
||||||
|
{write_concurrency, true}
|
||||||
|
]}
|
||||||
|
],
|
||||||
|
ok = ekka_mnesia:create_table(?TAB, [
|
||||||
|
{type, set},
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, exclusive_subscription},
|
||||||
|
{attributes, record_info(fields, exclusive_subscription)},
|
||||||
|
{storage_properties, StoreProps}
|
||||||
|
]);
|
||||||
|
mnesia(copy) ->
|
||||||
|
ok = ekka_mnesia:copy_table(?TAB, ram_copies).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% APIs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
|
||||||
|
allow | deny.
|
||||||
|
check_subscribe(#{clientid := ClientId}, Topic) ->
|
||||||
|
Fun = fun() ->
|
||||||
|
try_subscribe(ClientId, Topic)
|
||||||
|
end,
|
||||||
|
case mnesia:transaction(Fun) of
|
||||||
|
{atomic, Res} ->
|
||||||
|
Res;
|
||||||
|
{aborted, Reason} ->
|
||||||
|
?LOG(warning, "Cannot check subscribe ~p due to ~p.", [Topic, Reason]),
|
||||||
|
deny
|
||||||
|
end.
|
||||||
|
|
||||||
|
unsubscribe(Topic, #{is_exclusive := true}) ->
|
||||||
|
_ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end),
|
||||||
|
ok;
|
||||||
|
unsubscribe(_Topic, _SubOpts) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
try_subscribe(Topic, ClientId) ->
|
||||||
|
case mnesia:wread({?TAB, Topic}) of
|
||||||
|
[] ->
|
||||||
|
mnesia:write(
|
||||||
|
?TAB,
|
||||||
|
#exclusive_subscription{
|
||||||
|
clientid = ClientId,
|
||||||
|
topic = Topic
|
||||||
|
},
|
||||||
|
write
|
||||||
|
),
|
||||||
|
allow;
|
||||||
|
[_] ->
|
||||||
|
deny
|
||||||
|
end.
|
|
@ -43,7 +43,7 @@
|
||||||
retain_available => boolean(),
|
retain_available => boolean(),
|
||||||
wildcard_subscription => boolean(),
|
wildcard_subscription => boolean(),
|
||||||
subscription_identifiers => boolean(),
|
subscription_identifiers => boolean(),
|
||||||
shared_subscription => boolean()
|
shared_subscription => boolean(),
|
||||||
exclusive_subscription => boolean()
|
exclusive_subscription => boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -96,11 +96,11 @@ do_check_pub(#{retain := true}, #{retain_available := false}) ->
|
||||||
{error, ?RC_RETAIN_NOT_SUPPORTED};
|
{error, ?RC_RETAIN_NOT_SUPPORTED};
|
||||||
do_check_pub(_Flags, _Caps) -> ok.
|
do_check_pub(_Flags, _Caps) -> ok.
|
||||||
|
|
||||||
-spec(check_sub(emqx_types:zone(),
|
-spec(check_sub(emqx_types:clientinfo(),
|
||||||
emqx_types:topic(),
|
emqx_types:topic(),
|
||||||
emqx_types:subopts())
|
emqx_types:subopts())
|
||||||
-> ok_or_error(emqx_types:reason_code())).
|
-> ok_or_error(emqx_types:reason_code())).
|
||||||
check_sub(Zone, Topic, SubOpts) ->
|
check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) ->
|
||||||
Caps = get_caps(Zone, subscribe),
|
Caps = get_caps(Zone, subscribe),
|
||||||
Flags = lists:foldl(
|
Flags = lists:foldl(
|
||||||
fun(max_topic_levels, Map) ->
|
fun(max_topic_levels, Map) ->
|
||||||
|
@ -113,18 +113,25 @@ check_sub(Zone, Topic, SubOpts) ->
|
||||||
Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)};
|
Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)};
|
||||||
(_Key, Map) -> Map %% Ignore
|
(_Key, Map) -> Map %% Ignore
|
||||||
end, #{}, maps:keys(Caps)),
|
end, #{}, maps:keys(Caps)),
|
||||||
do_check_sub(Flags, Caps).
|
do_check_sub(Flags, Caps, ClientInfo, Topic).
|
||||||
|
|
||||||
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
|
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _)
|
||||||
when Limit > 0, Levels > Limit ->
|
when Limit > 0, Levels > Limit ->
|
||||||
{error, ?RC_TOPIC_FILTER_INVALID};
|
{error, ?RC_TOPIC_FILTER_INVALID};
|
||||||
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
|
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) ->
|
||||||
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
|
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
|
||||||
do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
|
do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) ->
|
||||||
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
|
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
|
||||||
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}) ->
|
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) ->
|
||||||
{error, ?RC_QUOTA_EXCEEDED};
|
{error, ?RC_TOPIC_FILTER_INVALID};
|
||||||
do_check_sub(_Flags, _Caps) -> ok.
|
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) ->
|
||||||
|
case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of
|
||||||
|
deny ->
|
||||||
|
{error, ?RC_QUOTA_EXCEEDED};
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
do_check_sub(_Flags, _Caps, _, _) -> ok.
|
||||||
|
|
||||||
default_caps() ->
|
default_caps() ->
|
||||||
?DEFAULT_CAPS.
|
?DEFAULT_CAPS.
|
||||||
|
|
|
@ -216,6 +216,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
|
||||||
_ -> error({invalid_topic_filter, TopicFilter})
|
_ -> error({invalid_topic_filter, TopicFilter})
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) ->
|
||||||
|
case Topic of
|
||||||
|
<<>> ->
|
||||||
|
error({invalid_topic_filter, TopicFilter});
|
||||||
|
_ ->
|
||||||
|
{Topic, Options#{is_exclusive => true}}
|
||||||
|
end;
|
||||||
parse(TopicFilter, Options) ->
|
parse(TopicFilter, Options) ->
|
||||||
{TopicFilter, Options}.
|
{TopicFilter, Options}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue