diff --git a/apps/emqx/src/bpapi/emqx_bpapi.erl b/apps/emqx/src/bpapi/emqx_bpapi.erl index 616462d94..07f12a2db 100644 --- a/apps/emqx/src/bpapi/emqx_bpapi.erl +++ b/apps/emqx/src/bpapi/emqx_bpapi.erl @@ -23,6 +23,11 @@ versions_file/1 ]). +%% Internal exports (RPC) +-export([ + announce_fun/1 +]). + -export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]). -include("emqx.hrl"). @@ -77,7 +82,7 @@ supported_version(API) -> -spec announce(atom()) -> ok. announce(App) -> {ok, Data} = file:consult(?MODULE:versions_file(App)), - {atomic, ok} = mria:transaction(?COMMON_SHARD, fun announce_fun/1, [Data]), + {atomic, ok} = mria:transaction(?COMMON_SHARD, fun ?MODULE:announce_fun/1, [Data]), ok. -spec versions_file(atom()) -> file:filename_all(). diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index a9419b27e..a89063870 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -54,6 +54,12 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + create_activate_alarm/3, + do_get_alarms/0 +]). + -record(activated_alarm, { name :: binary() | atom(), details :: map() | list(), @@ -210,7 +216,7 @@ init([]) -> handle_call({activate_alarm, Name, Details, Message}, _From, State) -> Res = mria:transaction( mria:local_content_shard(), - fun create_activate_alarm/3, + fun ?MODULE:create_activate_alarm/3, [Name, Details, Message] ), case Res of @@ -234,15 +240,7 @@ handle_call(delete_all_deactivated_alarms, _From, State) -> handle_call({get_alarms, all}, _From, State) -> {atomic, Alarms} = mria:ro_transaction( - mria:local_content_shard(), - fun() -> - [ - normalize(Alarm) - || Alarm <- - ets:tab2list(?ACTIVATED_ALARM) ++ - ets:tab2list(?DEACTIVATED_ALARM) - ] - end + mria:local_content_shard(), fun ?MODULE:do_get_alarms/0 ), {reply, Alarms, State, get_validity_period()}; handle_call({get_alarms, activated}, _From, State) -> @@ -295,6 +293,14 @@ create_activate_alarm(Name, Details, Message) -> Alarm end. +do_get_alarms() -> + [ + normalize(Alarm) + || Alarm <- + ets:tab2list(?ACTIVATED_ALARM) ++ + ets:tab2list(?DEACTIVATED_ALARM) + ]. + deactivate_alarm( #activated_alarm{ activate_at = ActivateAt, diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 67fa283b0..cf81c735b 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -49,6 +49,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + expire_banned_items/1 +]). + -elvis([{elvis_style, state_record_and_type, disable}]). -define(BANNED_TAB, ?MODULE). @@ -224,7 +229,9 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - _ = mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), + _ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [ + erlang:system_time(second) + ]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl index f419740d3..7a2d65472 100644 --- a/apps/emqx/src/emqx_exclusive_subscription.erl +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -35,6 +35,11 @@ unsubscribe/2 ]). +%% Internal exports (RPC) +-export([ + try_subscribe/2 +]). + -record(exclusive_subscription, { topic :: emqx_types:topic(), clientid :: emqx_types:clientid() @@ -80,10 +85,7 @@ on_delete_module() -> -spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) -> allow | deny. check_subscribe(#{clientid := ClientId}, Topic) -> - Fun = fun() -> - try_subscribe(ClientId, Topic) - end, - case mria:transaction(?EXCLUSIVE_SHARD, Fun) of + case mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/2, [ClientId, Topic]) of {atomic, Res} -> Res; {aborted, Reason} -> @@ -94,7 +96,7 @@ check_subscribe(#{clientid := ClientId}, Topic) -> end. unsubscribe(Topic, #{is_exclusive := true}) -> - _ = mria:transaction(?EXCLUSIVE_SHARD, fun() -> mnesia:delete({?TAB, Topic}) end), + _ = mria:transaction(?EXCLUSIVE_SHARD, fun mnesia:delete/1, [{?TAB, Topic}]), ok; unsubscribe(_Topic, _SubOpts) -> ok. diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 1340848ec..3f712bf4a 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -47,6 +47,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + cleanup_routes/1 +]). + -record(routing_node, {name, const = unused}). -define(ROUTE, emqx_route). @@ -145,7 +150,7 @@ handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans( {?LOCK, self()}, fun() -> - mria:transaction(?ROUTE_SHARD, fun cleanup_routes/1, [Node]) + mria:transaction(?ROUTE_SHARD, fun ?MODULE:cleanup_routes/1, [Node]) end ), ok = mria:dirty_delete(?ROUTING_NODE, Node), diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index bd9c16206..3d14dddd0 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -67,6 +67,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + init_monitors/0 +]). + -export_type([strategy/0]). -type strategy() :: @@ -336,7 +341,7 @@ subscribers(Group, Topic) -> init([]) -> ok = mria:wait_for_tables([?TAB]), {ok, _} = mnesia:subscribe({table, ?TAB, simple}), - {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), + {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), ok = emqx_tables:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [public, set, {write_concurrency, true}]), diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index e6ee4260f..e9866fb16 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -51,10 +51,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(TRACE, ?MODULE). --define(SHARD, ?COMMON_SHARD). --define(MAX_SIZE, 30). --define(OWN_KEYS, [level, filters, filter_default, handlers]). +-include("emqx_trace.hrl"). -ifdef(TEST). -export([ @@ -66,15 +63,6 @@ -export_type([ip_address/0]). -type ip_address() :: string(). --record(?TRACE, { - name :: binary() | undefined | '_', - type :: clientid | topic | ip_address | undefined | '_', - filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_', - enable = true :: boolean() | '_', - start_at :: integer() | undefined | '_', - end_at :: integer() | undefined | '_' -}). - publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when @@ -172,13 +160,7 @@ create(Trace) -> -spec delete(Name :: binary()) -> ok | {error, not_found}. delete(Name) -> - Tran = fun() -> - case mnesia:read(?TRACE, Name) of - [_] -> mnesia:delete(?TRACE, Name, write); - [] -> mnesia:abort(not_found) - end - end, - transaction(Tran). + transaction(fun emqx_trace_dl:delete/1, [Name]). -spec clear() -> ok | {error, Reason :: term()}. clear() -> @@ -190,20 +172,7 @@ clear() -> -spec update(Name :: binary(), Enable :: boolean()) -> ok | {error, not_found | finished}. update(Name, Enable) -> - Tran = fun() -> - case mnesia:read(?TRACE, Name) of - [] -> - mnesia:abort(not_found); - [#?TRACE{enable = Enable}] -> - ok; - [Rec] -> - case erlang:system_time(second) >= Rec#?TRACE.end_at of - false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write); - true -> mnesia:abort(finished) - end - end - end, - transaction(Tran). + transaction(fun emqx_trace_dl:update/2, [Name, Enable]). check() -> gen_server:call(?MODULE, check). @@ -211,13 +180,7 @@ check() -> -spec get_trace_filename(Name :: binary()) -> {ok, FileName :: string()} | {error, not_found}. get_trace_filename(Name) -> - Tran = fun() -> - case mnesia:read(?TRACE, Name, read) of - [] -> mnesia:abort(not_found); - [#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)} - end - end, - transaction(Tran). + transaction(fun emqx_trace_dl:get_trace_filename/1, [Name]). -spec trace_file(File :: file:filename_all()) -> {ok, Node :: list(), Binary :: binary()} @@ -309,23 +272,7 @@ code_change(_, State, _Extra) -> {ok, State}. insert_new_trace(Trace) -> - Tran = fun() -> - case mnesia:read(?TRACE, Trace#?TRACE.name) of - [] -> - #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace, - Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter}, - case mnesia:match_object(?TRACE, Match, read) of - [] -> - ok = mnesia:write(?TRACE, Trace, write), - {ok, Trace}; - [#?TRACE{name = Name}] -> - mnesia:abort({duplicate_condition, Name}) - end; - [#?TRACE{name = Name}] -> - mnesia:abort({already_existed, Name}) - end - end, - transaction(Tran). + transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]). update_trace(Traces) -> Now = erlang:system_time(second), @@ -347,9 +294,7 @@ stop_all_trace_handler() -> get_enabled_trace() -> {atomic, Traces} = - mria:ro_transaction(?SHARD, fun() -> - mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) - end), + mria:ro_transaction(?SHARD, fun emqx_trace_dl:get_enabled_trace/0), Traces. find_closest_time(Traces, Now) -> @@ -372,17 +317,7 @@ closest(Time, Now, Closest) -> min(Time - Now, Closest). disable_finished([]) -> ok; disable_finished(Traces) -> - transaction(fun() -> - lists:map( - fun(#?TRACE{name = Name}) -> - case mnesia:read(?TRACE, Name, write) of - [] -> ok; - [Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write) - end - end, - Traces - ) - end). + transaction(fun emqx_trace_dl:delete_finished/1, [Traces]). start_trace(Traces, Started0) -> Started = lists:map(fun(#{name := Name}) -> Name end, Started0), @@ -586,8 +521,8 @@ filename(Name, Start) -> [Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading), lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]). -transaction(Tran) -> - case mria:transaction(?COMMON_SHARD, Tran) of +transaction(Fun, Args) -> + case mria:transaction(?COMMON_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx/src/emqx_trace/emqx_trace.hrl b/apps/emqx/src/emqx_trace/emqx_trace.hrl new file mode 100644 index 000000000..a00a37132 --- /dev/null +++ b/apps/emqx/src/emqx_trace/emqx_trace.hrl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_TRACE_HRL). +-define(EMQX_TRACE_HRL, true). + +-define(TRACE, emqx_trace). + +-record(?TRACE, { + name :: binary() | undefined | '_', + type :: clientid | topic | ip_address | undefined | '_', + filter :: + emqx_types:topic() | emqx_types:clientid() | emqx_trace:ip_address() | undefined | '_', + enable = true :: boolean() | '_', + start_at :: integer() | undefined | '_', + end_at :: integer() | undefined | '_' +}). + +-define(SHARD, ?COMMON_SHARD). +-define(MAX_SIZE, 30). +-define(OWN_KEYS, [level, filters, filter_default, handlers]). + +-endif. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_dl.erl b/apps/emqx/src/emqx_trace/emqx_trace_dl.erl new file mode 100644 index 000000000..3f96e1531 --- /dev/null +++ b/apps/emqx/src/emqx_trace/emqx_trace_dl.erl @@ -0,0 +1,103 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% Data layer for emqx_trace +-module(emqx_trace_dl). + +%% API: +-export([ + update/2, + insert_new_trace/1, + delete/1, + get_trace_filename/1, + delete_finished/1, + get_enabled_trace/0 +]). + +-include("emqx_trace.hrl"). + +%%================================================================================ +%% API funcions +%%================================================================================ + +%% Introduced in 5.0 +-spec update(Name :: binary(), Enable :: boolean()) -> + ok. +update(Name, Enable) -> + case mnesia:read(?TRACE, Name) of + [] -> + mnesia:abort(not_found); + [#?TRACE{enable = Enable}] -> + ok; + [Rec] -> + case erlang:system_time(second) >= Rec#?TRACE.end_at of + false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write); + true -> mnesia:abort(finished) + end + end. + +%% Introduced in 5.0 +insert_new_trace(Trace) -> + case mnesia:read(?TRACE, Trace#?TRACE.name) of + [] -> + #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace, + Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter}, + case mnesia:match_object(?TRACE, Match, read) of + [] -> + ok = mnesia:write(?TRACE, Trace, write), + {ok, Trace}; + [#?TRACE{name = Name}] -> + mnesia:abort({duplicate_condition, Name}) + end; + [#?TRACE{name = Name}] -> + mnesia:abort({already_existed, Name}) + end. + +%% Introduced in 5.0 +-spec delete(Name :: binary()) -> ok. +delete(Name) -> + case mnesia:read(?TRACE, Name) of + [_] -> mnesia:delete(?TRACE, Name, write); + [] -> mnesia:abort(not_found) + end. + +%% Introduced in 5.0 +-spec get_trace_filename(Name :: binary()) -> {ok, string()}. +get_trace_filename(Name) -> + case mnesia:read(?TRACE, Name, read) of + [] -> mnesia:abort(not_found); + [#?TRACE{start_at = Start}] -> {ok, emqx_trace:filename(Name, Start)} + end. + +%% Introduced in 5.0 +delete_finished(Traces) -> + lists:map( + fun(#?TRACE{name = Name}) -> + case mnesia:read(?TRACE, Name, write) of + [] -> ok; + [Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write) + end + end, + Traces + ). + +%% Introduced in 5.0 +get_enabled_trace() -> + mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read). + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index 70f07e0e3..875ec8b6c 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -72,7 +72,7 @@ end_per_testcase(TestCase, Config) when -> Slave = ?config(slave, Config), emqx_common_test_helpers:stop_slave(Slave), - mria:transaction(?ROUTE_SHARD, fun() -> mnesia:clear_table(?ROUTE_TAB) end), + mria:clear_table(?ROUTE_TAB), snabbkaffe:stop(), ok; end_per_testcase(_TestCase, _Config) ->