Merge pull request #5937 from k32/mria

feat(mria): Replace ekka_mnesia with mria
This commit is contained in:
k32 2021-10-20 13:15:23 +02:00 committed by GitHub
commit 8e58699ff5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
60 changed files with 269 additions and 356 deletions

View File

@ -15,7 +15,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}

View File

@ -5,7 +5,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually! {vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]}, {applications, [kernel,stdlib,gproc,gen_rpc,mria,esockd,cowboy,sasl,os_mon,jiffy,lc]},
{mod, {emqx_app,[]}}, {mod, {emqx_app,[]}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -26,7 +26,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([post_config_update/4]). -export([post_config_update/4]).
@ -95,21 +94,18 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ACTIVATED_ALARM, ok = mria:create_table(?ACTIVATED_ALARM,
[{type, set}, [{type, set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, activated_alarm}, {record_name, activated_alarm},
{attributes, record_info(fields, activated_alarm)}]), {attributes, record_info(fields, activated_alarm)}]),
ok = ekka_mnesia:create_table(?DEACTIVATED_ALARM, ok = mria:create_table(?DEACTIVATED_ALARM,
[{type, ordered_set}, [{type, ordered_set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, deactivated_alarm}, {record_name, deactivated_alarm},
{attributes, record_info(fields, deactivated_alarm)}]); {attributes, record_info(fields, deactivated_alarm)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ACTIVATED_ALARM, disc_copies),
ok = ekka_mnesia:copy_table(?DEACTIVATED_ALARM, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -184,6 +180,7 @@ to_rfc3339(Timestamp) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
_ = mria:wait_for_tables([?ACTIVATED_ALARM, ?DEACTIVATED_ALARM]),
deactivate_all_alarms(), deactivate_all_alarms(),
ok = emqx_config_handler:add_handler([alarm], ?MODULE), ok = emqx_config_handler:add_handler([alarm], ?MODULE),
{ok, #state{timer = ensure_timer(undefined, get_validity_period())}}. {ok, #state{timer = ensure_timer(undefined, get_validity_period())}}.
@ -201,7 +198,7 @@ handle_call({activate_alarm, Name, Details}, _From, State) ->
details = Details, details = Details,
message = normalize_message(Name, Details), message = normalize_message(Name, Details),
activate_at = erlang:system_time(microsecond)}, activate_at = erlang:system_time(microsecond)},
ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), mria:dirty_write(?ACTIVATED_ALARM, Alarm),
do_actions(activate, Alarm, emqx:get_config([alarm, actions])), do_actions(activate, Alarm, emqx:get_config([alarm, actions])),
{reply, ok, State} {reply, ok, State}
end; end;
@ -221,7 +218,7 @@ handle_call(delete_all_deactivated_alarms, _From, State) ->
handle_call({get_alarms, all}, _From, State) -> handle_call({get_alarms, all}, _From, State) ->
{atomic, Alarms} = {atomic, Alarms} =
ekka_mnesia:ro_transaction( mria:ro_transaction(
?COMMON_SHARD, ?COMMON_SHARD,
fun() -> fun() ->
[normalize(Alarm) || [normalize(Alarm) ||
@ -282,7 +279,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name
case mnesia:dirty_first(?DEACTIVATED_ALARM) of case mnesia:dirty_first(?DEACTIVATED_ALARM) of
'$end_of_table' -> ok; '$end_of_table' -> ok;
ActivateAt2 -> ActivateAt2 ->
ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) mria:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
end; end;
false -> ok false -> ok
end, end,
@ -291,8 +288,8 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name
DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details,
normalize_message(Name, Details), normalize_message(Name, Details),
erlang:system_time(microsecond)), erlang:system_time(microsecond)),
ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name), mria:dirty_delete(?ACTIVATED_ALARM, Name),
do_actions(deactivate, DeActAlarm, emqx:get_config([alarm, actions])). do_actions(deactivate, DeActAlarm, emqx:get_config([alarm, actions])).
make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
@ -309,7 +306,7 @@ deactivate_all_alarms() ->
details = Details, details = Details,
message = Message, message = Message,
activate_at = ActivateAt}) -> activate_at = ActivateAt}) ->
ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, mria:dirty_write(?DEACTIVATED_ALARM,
#deactivated_alarm{ #deactivated_alarm{
activate_at = ActivateAt, activate_at = ActivateAt,
name = Name, name = Name,
@ -321,7 +318,7 @@ deactivate_all_alarms() ->
%% Delete all records from the given table, ignore result. %% Delete all records from the given table, ignore result.
clear_table(TableName) -> clear_table(TableName) ->
case ekka_mnesia:clear_table(TableName) of case mria:clear_table(TableName) of
{aborted, Reason} -> {aborted, Reason} ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "fail_to_clear_table", msg => "fail_to_clear_table",
@ -347,7 +344,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) ->
delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) ->
case ActivatedAt =< Checkpoint of case ActivatedAt =< Checkpoint of
true -> true ->
ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), mria:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt),
delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); delete_expired_deactivated_alarms(NActivatedAt, Checkpoint);
false -> false ->

View File

@ -57,7 +57,7 @@ stop(_State) -> ok.
ensure_ekka_started() -> ensure_ekka_started() ->
ekka:start(), ekka:start(),
ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity). ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity).
%% @doc Call this function to make emqx boot without loading config, %% @doc Call this function to make emqx boot without loading config,
%% in case we want to delegate the config load to a higher level app %% in case we want to delegate the config load to a higher level app

View File

@ -27,7 +27,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([start_link/0, stop/0]). -export([start_link/0, stop/0]).
@ -58,16 +57,13 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?BANNED_TAB, [ ok = mria:create_table(?BANNED_TAB, [
{type, set}, {type, set},
{rlog_shard, ?COMMON_SHARD}, {rlog_shard, ?COMMON_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, banned}, {record_name, banned},
{attributes, record_info(fields, banned)}, {attributes, record_info(fields, banned)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?BANNED_TAB, disc_copies).
%% @doc Start the banned server. %% @doc Start the banned server.
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
@ -155,13 +151,13 @@ create(#{who := Who,
reason := Reason, reason := Reason,
at := At, at := At,
until := Until}) -> until := Until}) ->
ekka_mnesia:dirty_write(?BANNED_TAB, #banned{who = Who, mria:dirty_write(?BANNED_TAB, #banned{who = Who,
by = By, by = By,
reason = Reason, reason = Reason,
at = At, at = At,
until = Until}); until = Until});
create(Banned) when is_record(Banned, banned) -> create(Banned) when is_record(Banned, banned) ->
ekka_mnesia:dirty_write(?BANNED_TAB, Banned). mria:dirty_write(?BANNED_TAB, Banned).
look_up(Who) when is_map(Who) -> look_up(Who) when is_map(Who) ->
look_up(pares_who(Who)); look_up(pares_who(Who));
@ -174,7 +170,7 @@ look_up(Who) ->
delete(Who) when is_map(Who)-> delete(Who) when is_map(Who)->
delete(pares_who(Who)); delete(pares_who(Who));
delete(Who) -> delete(Who) ->
ekka_mnesia:dirty_delete(?BANNED_TAB, Who). mria:dirty_delete(?BANNED_TAB, Who).
info(InfoKey) -> info(InfoKey) ->
mnesia:table_info(?BANNED_TAB, InfoKey). mnesia:table_info(?BANNED_TAB, InfoKey).
@ -195,7 +191,7 @@ handle_cast(Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
ekka_mnesia:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), _ = mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]),
{noreply, ensure_expiry_timer(State), hibernate}; {noreply, ensure_expiry_timer(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -71,7 +71,7 @@ register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of case is_enabled() of
true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid)); true -> mria:dirty_write(?TAB, record(ClientId, ChanPid));
false -> ok false -> ok
end. end.
@ -83,7 +83,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of case is_enabled() of
true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid)); true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid));
false -> ok false -> ok
end. end.
@ -100,16 +100,15 @@ record(ClientId, ChanPid) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, bag}, {type, bag},
{rlog_shard, ?CM_SHARD}, {rlog_shard, ?CM_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, channel}, {record_name, channel},
{attributes, record_info(fields, channel)}, {attributes, record_info(fields, channel)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]), {write_concurrency, true}]}]}]),
ok = ekka_mnesia:copy_table(?TAB, ram_copies), ok = mria_rlog:wait_for_shards([?CM_SHARD], infinity),
ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
ok = ekka:monitor(membership), ok = ekka:monitor(membership),
{ok, #{}}. {ok, #{}}.
@ -124,7 +123,7 @@ handle_cast(Msg, State) ->
handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({membership, {mnesia, down, Node}}, State) ->
global:trans({?LOCK, self()}, global:trans({?LOCK, self()},
fun() -> fun() ->
ekka_mnesia:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) mria:transaction(?CM_SHARD, fun cleanup_channels/1, [Node])
end), end),
{noreply, State}; {noreply, State};

View File

@ -28,7 +28,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([start_link/2]). -export([start_link/2]).
@ -74,16 +73,14 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ROUTE_TAB, [ ok = mria:create_table(?ROUTE_TAB, [
{type, bag}, {type, bag},
{rlog_shard, ?ROUTE_SHARD}, {rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, route}, {record_name, route},
{attributes, record_info(fields, route)}, {attributes, record_info(fields, route)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]); {write_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTE_TAB, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Start a router %% Start a router
@ -225,7 +222,7 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
insert_direct_route(Route) -> insert_direct_route(Route) ->
ekka_mnesia:dirty_write(?ROUTE_TAB, Route). mria:dirty_write(?ROUTE_TAB, Route).
insert_trie_route(Route = #route{topic = Topic}) -> insert_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of case mnesia:wread({?ROUTE_TAB, Topic}) of
@ -235,7 +232,7 @@ insert_trie_route(Route = #route{topic = Topic}) ->
mnesia:write(?ROUTE_TAB, Route, sticky_write). mnesia:write(?ROUTE_TAB, Route, sticky_write).
delete_direct_route(Route) -> delete_direct_route(Route) ->
ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route). mria:dirty_delete_object(?ROUTE_TAB, Route).
delete_trie_route(Route = #route{topic = Topic}) -> delete_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of case mnesia:wread({?ROUTE_TAB, Topic}) of
@ -255,7 +252,7 @@ maybe_trans(Fun, Args) ->
trans(Fun, Args); trans(Fun, Args);
global -> global ->
%% Assert: %% Assert:
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash
lock_router(), lock_router(),
try mnesia:sync_dirty(Fun, Args) try mnesia:sync_dirty(Fun, Args)
after after
@ -276,11 +273,11 @@ trans(Fun, Args) ->
{WPid, RefMon} = {WPid, RefMon} =
spawn_monitor( spawn_monitor(
%% NOTE: this is under the assumption that crashes in Fun %% NOTE: this is under the assumption that crashes in Fun
%% are caught by mnesia:transaction/2. %% are caught by mria:transaction/2.
%% Future changes should keep in mind that this process %% Future changes should keep in mind that this process
%% always exit with database write result. %% always exit with database write result.
fun() -> fun() ->
Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of
{atomic, Ok} -> Ok; {atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end, end,

View File

@ -27,7 +27,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% API %% API
-export([ start_link/0 -export([ start_link/0
@ -59,16 +58,13 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ROUTING_NODE, [ ok = mria:create_table(?ROUTING_NODE, [
{type, set}, {type, set},
{rlog_shard, ?ROUTE_SHARD}, {rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, routing_node}, {record_name, routing_node},
{attributes, record_info(fields, routing_node)}, {attributes, record_info(fields, routing_node)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTING_NODE, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -87,7 +83,7 @@ monitor(Node) when is_atom(Node) ->
case ekka:is_member(Node) case ekka:is_member(Node)
orelse ets:member(?ROUTING_NODE, Node) of orelse ets:member(?ROUTING_NODE, Node) of
true -> ok; true -> ok;
false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -136,9 +132,9 @@ handle_info({mnesia_table_event, Event}, State) ->
handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
global:trans({?LOCK, self()}, global:trans({?LOCK, self()},
fun() -> fun() ->
ekka_mnesia:transaction(fun cleanup_routes/1, [Node]) mria:transaction(?ROUTE_SHARD, fun cleanup_routes/1, [Node])
end), end),
ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node), ok = mria:dirty_delete(?ROUTING_NODE, Node),
{noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({membership, {mnesia, down, Node}}, State) ->

View File

@ -28,7 +28,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% APIs %% APIs
-export([start_link/0]). -export([start_link/0]).
@ -85,15 +84,12 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, bag}, {type, bag},
{rlog_shard, ?SHARED_SUB_SHARD}, {rlog_shard, ?SHARED_SUB_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, emqx_shared_subscription}, {record_name, emqx_shared_subscription},
{attributes, record_info(fields, emqx_shared_subscription)}]); {attributes, record_info(fields, emqx_shared_subscription)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -297,7 +293,7 @@ subscribers(Group, Topic) ->
init([]) -> init([]) ->
{ok, _} = mnesia:subscribe({table, ?TAB, simple}), {ok, _} = mnesia:subscribe({table, ?TAB, simple}),
{atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0),
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
{ok, update_stats(#state{pmon = PMon})}. {ok, update_stats(#state{pmon = PMon})}.
@ -309,7 +305,7 @@ init_monitors() ->
end, emqx_pmon:new(), ?TAB). end, emqx_pmon:new(), ?TAB).
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), mria:dirty_write(?TAB, record(Group, Topic, SubPid)),
case ets:member(?SHARED_SUBS, {Group, Topic}) of case ets:member(?SHARED_SUBS, {Group, Topic}) of
true -> ok; true -> ok;
false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
@ -319,7 +315,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
delete_route_if_needed({Group, Topic}), delete_route_if_needed({Group, Topic}),
{reply, ok, State}; {reply, ok, State};
@ -373,7 +369,7 @@ cleanup_down(SubPid) ->
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
lists:foreach( lists:foreach(
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
ok = ekka_mnesia:dirty_delete_object(?TAB, Record), ok = mria:dirty_delete_object(?TAB, Record),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
delete_route_if_needed({Group, Topic}) delete_route_if_needed({Group, Topic})
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).

View File

@ -150,7 +150,7 @@ handle_info({timeout, TRef, tick},
State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> State = #state{ticker = TRef, version = Version, sysdescr = Descr}) ->
publish_any(version, Version), publish_any(version, Version),
publish_any(sysdescr, Descr), publish_any(sysdescr, Descr),
publish_any(brokers, ekka_mnesia:running_nodes()), publish_any(brokers, mria_mnesia:running_nodes()),
publish_any(stats, emqx_stats:getstats()), publish_any(stats, emqx_stats:getstats()),
publish_any(metrics, emqx_metrics:all()), publish_any(metrics, emqx_metrics:all()),
{noreply, tick(State), hibernate}; {noreply, tick(State), hibernate};

View File

@ -22,7 +22,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% Trie APIs %% Trie APIs
-export([ insert/1 -export([ insert/1
@ -61,16 +60,13 @@ mnesia(boot) ->
StoreProps = [{ets, [{read_concurrency, true}, StoreProps = [{ets, [{read_concurrency, true},
{write_concurrency, true} {write_concurrency, true}
]}], ]}],
ok = ekka_mnesia:create_table(?TRIE, [ ok = mria:create_table(?TRIE, [
{rlog_shard, ?ROUTE_SHARD}, {rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, ?TRIE}, {record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)}, {attributes, record_info(fields, ?TRIE)},
{type, ordered_set}, {type, ordered_set},
{storage_properties, StoreProps}]); {storage_properties, StoreProps}]).
mnesia(copy) ->
%% Copy topics table
ok = ekka_mnesia:copy_table(?TRIE, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Topics APIs %% Topics APIs
@ -329,6 +325,6 @@ do_compact_test() ->
do_compact(words(<<"a/+/+/+/+/b">>))), do_compact(words(<<"a/+/+/+/+/b">>))),
ok. ok.
clear_tables() -> ekka_mnesia:clear_table(?TRIE). clear_tables() -> mria:clear_table(?TRIE).
-endif. % TEST -endif. % TEST

View File

@ -27,14 +27,12 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx), application:load(emqx),
ok = ekka:start(), ok = ekka:start(),
%% for coverage
ok = emqx_banned:mnesia(copy),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ekka:stop(), ekka:stop(),
ekka_mnesia:ensure_stopped(), mria:stop(),
ekka_mnesia:delete_schema(). mria_mnesia:delete_schema().
t_add_delete(_) -> t_add_delete(_) ->
Banned = #banned{who = {clientid, <<"TestClient">>}, Banned = #banned{who = {clientid, <<"TestClient">>},
@ -92,4 +90,3 @@ t_unused(_) ->
?assertEqual(ok, Banned ! ok), ?assertEqual(ok, Banned ! ok),
timer:sleep(500), %% expiry timer timer:sleep(500), %% expiry timer
ok = emqx_banned:stop(). ok = emqx_banned:stop().

View File

@ -195,7 +195,8 @@ generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
-spec(stop_apps(list()) -> ok). -spec(stop_apps(list()) -> ok).
stop_apps(Apps) -> stop_apps(Apps) ->
[application:stop(App) || App <- Apps ++ [emqx, mnesia]]. [application:stop(App) || App <- Apps ++ [emqx, mria, mnesia]],
ok.
%% backward compatible %% backward compatible
deps_path(App, RelativePath) -> app_path(App, RelativePath). deps_path(App, RelativePath) -> app_path(App, RelativePath).
@ -269,8 +270,8 @@ reload(App, SpecAppConfigHandler) ->
application:start(App). application:start(App).
ensure_mnesia_stopped() -> ensure_mnesia_stopped() ->
ekka_mnesia:ensure_stopped(), mria:stop(),
ekka_mnesia:delete_schema(). mria_mnesia:delete_schema().
%% Help function to wait for Fun to yield 'true'. %% Help function to wait for Fun to yield 'true'.
wait_for(Fn, Ln, F, Timeout) -> wait_for(Fn, Ln, F, Timeout) ->

View File

@ -35,7 +35,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]), emqx_common_test_helpers:stop_apps([]),
ekka_mnesia:delete_schema(), %% Clean emqx_banned table mria_mnesia:delete_schema(), %% Clean emqx_banned table
ok. ok.
t_detect_check(_) -> t_detect_check(_) ->

View File

@ -41,10 +41,6 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
clear_tables(). clear_tables().
t_mnesia(_) ->
%% for coverage
ok = emqx_router:mnesia(copy).
% t_add_route(_) -> % t_add_route(_) ->
% error('TODO'). % error('TODO').
@ -117,4 +113,3 @@ t_unexpected(_) ->
clear_tables() -> clear_tables() ->
lists:foreach(fun mnesia:clear_table/1, lists:foreach(fun mnesia:clear_table/1,
[emqx_route, emqx_trie, emqx_trie_node]). [emqx_route, emqx_trie, emqx_trie_node]).

View File

@ -50,8 +50,8 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
ekka:stop(), ekka:stop(),
ekka_mnesia:ensure_stopped(), mria:stop(),
ekka_mnesia:delete_schema(). mria_mnesia:delete_schema().
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
clear_tables(), clear_tables(),
@ -60,9 +60,6 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
clear_tables(). clear_tables().
t_mnesia(_) ->
ok = ?TRIE:mnesia(copy).
t_insert(_) -> t_insert(_) ->
Fun = fun() -> Fun = fun() ->
?TRIE:insert(<<"sensor/1/metric/2">>), ?TRIE:insert(<<"sensor/1/metric/2">>),
@ -191,7 +188,6 @@ t_delete3(_) ->
clear_tables() -> emqx_trie:clear_tables(). clear_tables() -> emqx_trie:clear_tables().
trans(Fun) -> trans(Fun) ->
mnesia:transaction(Fun). mria:transaction(?ROUTE_SHARD, Fun).
trans(Fun, Args) -> trans(Fun, Args) ->
mnesia:transaction(Fun, Args). mria:transaction(?ROUTE_SHARD, Fun, Args).

View File

@ -29,7 +29,7 @@
[ emqx_metrics [ emqx_metrics
, emqx_stats , emqx_stats
, emqx_broker , emqx_broker
, ekka_mnesia , mria_mnesia
]). ]).
-define(ALL(Vars, Types, Exprs), -define(ALL(Vars, Types, Exprs),
@ -80,8 +80,8 @@ do_mock(emqx_broker) ->
fun(Msg) -> {node(), <<"test">>, Msg} end); fun(Msg) -> {node(), <<"test">>, Msg} end);
do_mock(emqx_stats) -> do_mock(emqx_stats) ->
meck:expect(emqx_stats, getstats, fun() -> [0] end); meck:expect(emqx_stats, getstats, fun() -> [0] end);
do_mock(ekka_mnesia) -> do_mock(mria_mnesia) ->
meck:expect(ekka_mnesia, running_nodes, fun() -> [node()] end); meck:expect(mria_mnesia, running_nodes, fun() -> [node()] end);
do_mock(emqx_metrics) -> do_mock(emqx_metrics) ->
meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end).
@ -129,4 +129,3 @@ postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) ->
next_state(State, _Res, {call, _Mod, _Fun, _Args}) -> next_state(State, _Res, {call, _Mod, _Fun, _Args}) ->
NewState = State, NewState = State,
NewState. NewState.

View File

@ -30,7 +30,7 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
ok = ekka_rlog:wait_for_shards([?AUTH_SHARD], infinity), ok = mria_rlog:wait_for_shards([?AUTH_SHARD], infinity),
{ok, Sup} = emqx_authn_sup:start_link(), {ok, Sup} = emqx_authn_sup:start_link(),
ok = ?AUTHN:register_providers(providers()), ok = ?AUTHN:register_providers(providers()),
ok = initialize(), ok = initialize(),

View File

@ -46,7 +46,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-record(user_info, -record(user_info,
{ user_id { user_id
@ -63,15 +62,12 @@
%% @doc Create or replicate tables. %% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{rlog_shard, ?AUTH_SHARD}, {rlog_shard, ?AUTH_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, user_info}, {record_name, user_info},
{attributes, record_info(fields, user_info)}, {attributes, record_info(fields, user_info)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Hocon Schema %% Hocon Schema
@ -268,7 +264,7 @@ trans(Fun) ->
trans(Fun, []). trans(Fun, []).
trans(Fun, Args) -> trans(Fun, Args) ->
case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of case mria:transaction(?AUTH_SHARD, Fun, Args) of
{atomic, Res} -> Res; {atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.

View File

@ -59,7 +59,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
@ -70,15 +69,12 @@
%% @doc Create or replicate tables. %% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{rlog_shard, ?AUTH_SHARD}, {rlog_shard, ?AUTH_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, user_info}, {record_name, user_info},
{attributes, record_info(fields, user_info)}, {attributes, record_info(fields, user_info)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Hocon Schema %% Hocon Schema
@ -390,7 +386,7 @@ trans(Fun) ->
trans(Fun, []). trans(Fun, []).
trans(Fun, Args) -> trans(Fun, Args) ->
case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of case mria:transaction(?AUTH_SHARD, Fun, Args) of
{atomic, Res} -> Res; {atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.

View File

@ -528,10 +528,10 @@ users(get, #{query_string := Qs}) ->
end; end;
users(post, #{body := Body}) when is_list(Body) -> users(post, #{body := Body}) when is_list(Body) ->
lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) -> lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) ->
ekka_mnesia:dirty_write(#emqx_acl{ mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username}, who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules) rules = format_rules(Rules)
}) })
end, Body), end, Body),
{204}. {204}.
@ -561,10 +561,10 @@ clients(get, #{query_string := Qs}) ->
end; end;
clients(post, #{body := Body}) when is_list(Body) -> clients(post, #{body := Body}) when is_list(Body) ->
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) -> lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) ->
ekka_mnesia:dirty_write(#emqx_acl{ mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid}, who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules) rules = format_rules(Rules)
}) })
end, Body), end, Body),
{204}. {204}.
@ -581,13 +581,13 @@ user(get, #{bindings := #{username := Username}}) ->
end; end;
user(put, #{bindings := #{username := Username}, user(put, #{bindings := #{username := Username},
body := #{<<"username">> := Username, <<"rules">> := Rules}}) -> body := #{<<"username">> := Username, <<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{ mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username}, who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules) rules = format_rules(Rules)
}), }),
{204}; {204};
user(delete, #{bindings := #{username := Username}}) -> user(delete, #{bindings := #{username := Username}}) ->
ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}), mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}),
{204}. {204}.
client(get, #{bindings := #{clientid := Clientid}}) -> client(get, #{bindings := #{clientid := Clientid}}) ->
@ -603,13 +603,13 @@ client(get, #{bindings := #{clientid := Clientid}}) ->
end; end;
client(put, #{bindings := #{clientid := Clientid}, client(put, #{bindings := #{clientid := Clientid},
body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) -> body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{ mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid}, who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules) rules = format_rules(Rules)
}), }),
{204}; {204};
client(delete, #{bindings := #{clientid := Clientid}}) -> client(delete, #{bindings := #{clientid := Clientid}}) ->
ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}), mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}),
{204}. {204}.
all(get, _) -> all(get, _) ->
@ -624,17 +624,17 @@ all(get, _) ->
} }
end; end;
all(put, #{body := #{<<"rules">> := Rules}}) -> all(put, #{body := #{<<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{ mria:dirty_write(#emqx_acl{
who = ?ACL_TABLE_ALL, who = ?ACL_TABLE_ALL,
rules = format_rules(Rules) rules = format_rules(Rules)
}), }),
{204}. {204}.
purge(delete, _) -> purge(delete, _) ->
case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of
[#{<<"enable">> := false}] -> [#{<<"enable">> := false}] ->
ok = lists:foreach(fun(Key) -> ok = lists:foreach(fun(Key) ->
ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key) ok = mria:dirty_delete(?ACL_TABLE, Key)
end, mnesia:dirty_all_keys(?ACL_TABLE)), end, mnesia:dirty_all_keys(?ACL_TABLE)),
{204}; {204};
[#{<<"enable">> := true}] -> [#{<<"enable">> := true}] ->

View File

@ -12,7 +12,7 @@
-export([start/2, stop/1]). -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
ok = ekka_rlog:wait_for_shards([?ACL_SHARDED], infinity), ok = mria_rlog:wait_for_shards([?ACL_SHARDED], infinity),
{ok, Sup} = emqx_authz_sup:start_link(), {ok, Sup} = emqx_authz_sup:start_link(),
ok = emqx_authz:init(), ok = emqx_authz:init(),
{ok, Sup}. {ok, Sup}.

View File

@ -32,18 +32,15 @@
-endif. -endif.
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ACL_TABLE, [ ok = mria:create_table(?ACL_TABLE, [
{type, ordered_set}, {type, ordered_set},
{rlog_shard, ?ACL_SHARDED}, {rlog_shard, ?ACL_SHARDED},
{disc_copies, [node()]}, {storage, disc_copies},
{attributes, record_info(fields, ?ACL_TABLE)}, {attributes, record_info(fields, ?ACL_TABLE)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies).
description() -> description() ->
"AuthZ with Mnesia". "AuthZ with Mnesia".

View File

@ -54,24 +54,24 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(t_authz, Config) -> init_per_testcase(t_authz, Config) ->
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, mria:dirty_write(#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>},
rules = [{allow, publish, <<"test/%u">>}, rules = [{allow, publish, <<"test/%u">>},
{allow, subscribe, <<"eq #">>} {allow, subscribe, <<"eq #">>}
] ]
}]), }),
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, mria:dirty_write(#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>},
rules = [{allow, publish, <<"test/%c">>}, rules = [{allow, publish, <<"test/%c">>},
{deny, subscribe, <<"eq #">>} {deny, subscribe, <<"eq #">>}
] ]
}]), }),
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, mria:dirty_write(#emqx_acl{who = ?ACL_TABLE_ALL,
rules = [{deny, all, <<"#">>}] rules = [{deny, all, <<"#">>}]
}]), }),
Config; Config;
init_per_testcase(_, Config) -> Config. init_per_testcase(_, Config) -> Config.
end_per_testcase(t_authz, Config) -> end_per_testcase(t_authz, Config) ->
[ ekka_mnesia:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], [ mria:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)],
Config; Config;
end_per_testcase(_, Config) -> Config. end_per_testcase(_, Config) -> Config.
@ -106,4 +106,3 @@ t_authz(_) ->
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)), ?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)),
ok. ok.

View File

@ -42,7 +42,7 @@ all() ->
[t_auto_subscribe, t_update]. [t_auto_subscribe, t_update].
init_per_suite(Config) -> init_per_suite(Config) ->
ekka_mnesia:start(), mria:start(),
application:stop(?APP), application:stop(?APP),
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
@ -159,4 +159,3 @@ check_subs([{{_, Topic}, #{subid := ?CLIENT_ID}} | Subs], List) ->
check_subs(Subs, lists:delete(Topic, List)); check_subs(Subs, lists:delete(Topic, List));
check_subs([_ | Subs], List) -> check_subs([_ | Subs], List) ->
check_subs(Subs, List). check_subs(Subs, List).

View File

@ -153,7 +153,7 @@ param_path_operation()->
}. }.
list_bridges(get, _Params) -> list_bridges(get, _Params) ->
{200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}. {200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_local_bridges(Node) when Node =:= node() -> list_local_bridges(Node) when Node =:= node() ->
[format_resp(Data) || Data <- emqx_bridge:list_bridges()]; [format_resp(Data) || Data <- emqx_bridge:list_bridges()];
@ -161,7 +161,7 @@ list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]). rpc_call(Node, list_local_bridges, [Node]).
crud_bridges_cluster(Method, Params) -> crud_bridges_cluster(Method, Params) ->
Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()], Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of
[] -> [] ->
case Results of case Results of

View File

@ -21,7 +21,6 @@
-include("emqx_dashboard.hrl"). -include("emqx_dashboard.hrl").
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% Mnesia bootstrap %% Mnesia bootstrap
-export([mnesia/1]). -export([mnesia/1]).
@ -50,16 +49,14 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(mqtt_admin, [ ok = mria:create_table(mqtt_admin, [
{type, set}, {type, set},
{rlog_shard, ?DASHBOARD_SHARD}, {rlog_shard, ?DASHBOARD_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, mqtt_admin}, {record_name, mqtt_admin},
{attributes, record_info(fields, mqtt_admin)}, {attributes, record_info(fields, mqtt_admin)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]); {write_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(mqtt_admin, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -68,7 +65,7 @@ mnesia(copy) ->
-spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}).
add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) ->
Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags},
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])). return(mria:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])).
force_add_user(Username, Password, Tags) -> force_add_user(Username, Password, Tags) ->
AddFun = fun() -> AddFun = fun() ->
@ -76,7 +73,7 @@ force_add_user(Username, Password, Tags) ->
password = Password, password = Password,
tags = Tags}) tags = Tags})
end, end,
case ekka_mnesia:transaction(?DASHBOARD_SHARD, AddFun) of case mria:transaction(?DASHBOARD_SHARD, AddFun) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.
@ -98,11 +95,11 @@ remove_user(Username) when is_binary(Username) ->
end, end,
mnesia:delete({mqtt_admin, Username}) mnesia:delete({mqtt_admin, Username})
end, end,
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). return(mria:transaction(?DASHBOARD_SHARD, Trans)).
-spec(update_user(binary(), binary()) -> ok | {error, term()}). -spec(update_user(binary(), binary()) -> ok | {error, term()}).
update_user(Username, Tags) when is_binary(Username) -> update_user(Username, Tags) when is_binary(Username) ->
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])). return(mria:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])).
%% @private %% @private
update_user_(Username, Tags) -> update_user_(Username, Tags) ->
@ -135,13 +132,13 @@ update_pwd(Username, Fun) ->
end, end,
mnesia:write(Fun(User)) mnesia:write(Fun(User))
end, end,
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). return(mria:transaction(?DASHBOARD_SHARD, Trans)).
-spec(lookup_user(binary()) -> [mqtt_admin()]). -spec(lookup_user(binary()) -> [mqtt_admin()]).
lookup_user(Username) when is_binary(Username) -> lookup_user(Username) when is_binary(Username) ->
Fun = fun() -> mnesia:read(mqtt_admin, Username) end, Fun = fun() -> mnesia:read(mqtt_admin, Username) end,
{atomic, User} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun), {atomic, User} = mria:ro_transaction(?DASHBOARD_SHARD, Fun),
User. User.
-spec(all_users() -> [#mqtt_admin{}]). -spec(all_users() -> [#mqtt_admin{}]).

View File

@ -26,7 +26,7 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_dashboard_sup:start_link(), {ok, Sup} = emqx_dashboard_sup:start_link(),
ok = ekka_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity), ok = mria_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity),
_ = emqx_dashboard:start_listeners(), _ = emqx_dashboard:start_listeners(),
emqx_dashboard_cli:load(), emqx_dashboard_cli:load(),
ok = emqx_dashboard_admin:add_default_user(), ok = emqx_dashboard_admin:add_default_user(),

View File

@ -25,7 +25,6 @@
-export([get_local_time/0]). -export([get_local_time/0]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% Mnesia bootstrap %% Mnesia bootstrap
-export([mnesia/1]). -export([mnesia/1]).
@ -41,14 +40,12 @@
-define(EXPIRE_INTERVAL, 86400000 * 7). -define(EXPIRE_INTERVAL, 86400000 * 7).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(emqx_collect, [ ok = mria:create_table(emqx_collect, [
{type, set}, {type, set},
{local_content, true}, {local_content, true},
{disc_only_copies, [node()]}, {storage, disc_only_copies},
{record_name, mqtt_collect}, {record_name, mqtt_collect},
{attributes, record_info(fields, mqtt_collect)}]); {attributes, record_info(fields, mqtt_collect)}]).
mnesia(copy) ->
mnesia:add_table_copy(emqx_collect, node(), disc_only_copies).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -162,8 +159,8 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
diff(Sent, Sent0), diff(Sent, Sent0),
diff(Dropped, Dropped0)}, diff(Dropped, Dropped0)},
Ts = get_local_time(), Ts = get_local_time(),
ekka_mnesia:transaction(ekka_mnesia:local_content_shard(), _ = mria:transaction(mria:local_content_shard(),
fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]),
{Received, Sent, Dropped}. {Received, Sent, Dropped}.
avg(Items) -> avg(Items) ->

View File

@ -158,8 +158,8 @@ counters(get, #{bindings := #{counter := Counter}}) ->
lookup([{<<"counter">>, Counter}]). lookup([{<<"counter">>, Counter}]).
current_counters(get, _Params) -> current_counters(get, _Params) ->
Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()], Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()],
Nodes = length(ekka_mnesia:running_nodes()), Nodes = length(mria_mnesia:running_nodes()),
{Received, Sent, Sub, Conn} = format_current_metrics(Data), {Received, Sent, Sub, Conn} = format_current_metrics(Data),
Response = #{ Response = #{
nodes => Nodes, nodes => Nodes,
@ -194,16 +194,16 @@ lookup_(#{node := Node, counter := Counter}) ->
lookup_(#{node := Node}) -> lookup_(#{node := Node}) ->
{200, sampling(Node)}; {200, sampling(Node)};
lookup_(#{counter := Counter}) -> lookup_(#{counter := Counter}) ->
CounterData = merger_counters([sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()]), CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]),
Data = hd(maps:values(CounterData)), Data = hd(maps:values(CounterData)),
{200, Data}. {200, Data}.
list_collect(Aggregate) -> list_collect(Aggregate) ->
case Aggregate of case Aggregate of
<<"true">> -> <<"true">> ->
[maps:put(node, Node, sampling(Node)) || Node <- ekka_mnesia:running_nodes()]; [maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()];
_ -> _ ->
Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()],
merger_counters(Counters) merger_counters(Counters)
end. end.

View File

@ -28,7 +28,6 @@
]). ]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([mnesia/1]). -export([mnesia/1]).
@ -77,16 +76,14 @@ destroy_by_username(Username) ->
do_destroy_by_username(Username). do_destroy_by_username(Username).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, set}, {type, set},
{rlog_shard, ?DASHBOARD_SHARD}, {rlog_shard, ?DASHBOARD_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, mqtt_admin_jwt}, {record_name, mqtt_admin_jwt},
{attributes, record_info(fields, mqtt_admin_jwt)}, {attributes, record_info(fields, mqtt_admin_jwt)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]); {write_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% jwt apply %% jwt apply
@ -104,7 +101,7 @@ do_sign(Username, Password) ->
Signed = jose_jwt:sign(JWK, JWS, JWT), Signed = jose_jwt:sign(JWK, JWS, JWT),
{_, Token} = jose_jws:compact(Signed), {_, Token} = jose_jws:compact(Signed),
JWTRec = format(Token, Username, ExpTime), JWTRec = format(Token, Username, ExpTime),
ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), _ = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
{ok, Token}. {ok, Token}.
do_verify(Token)-> do_verify(Token)->
@ -113,7 +110,7 @@ do_verify(Token)->
case ExpTime > erlang:system_time(millisecond) of case ExpTime > erlang:system_time(millisecond) of
true -> true ->
NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()}, NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()},
{atomic, Res} = ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]), {atomic, Res} = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]),
Res; Res;
_ -> _ ->
{error, token_timeout} {error, token_timeout}
@ -124,7 +121,7 @@ do_verify(Token)->
do_destroy(Token) -> do_destroy(Token) ->
Fun = fun mnesia:delete/1, Fun = fun mnesia:delete/1,
{atomic, ok} = ekka_mnesia:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]), {atomic, ok} = mria:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]),
ok. ok.
do_destroy_by_username(Username) -> do_destroy_by_username(Username) ->
@ -135,7 +132,7 @@ do_destroy_by_username(Username) ->
-spec(lookup(Token :: binary()) -> {ok, #mqtt_admin_jwt{}} | {error, not_found}). -spec(lookup(Token :: binary()) -> {ok, #mqtt_admin_jwt{}} | {error, not_found}).
lookup(Token) -> lookup(Token) ->
Fun = fun() -> mnesia:read(?TAB, Token) end, Fun = fun() -> mnesia:read(?TAB, Token) end,
case ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun) of case mria:ro_transaction(?DASHBOARD_SHARD, Fun) of
{atomic, [JWT]} -> {ok, JWT}; {atomic, [JWT]} -> {ok, JWT};
{atomic, []} -> {error, not_found} {atomic, []} -> {error, not_found}
end. end.
@ -143,7 +140,7 @@ lookup(Token) ->
lookup_by_username(Username) -> lookup_by_username(Username) ->
Spec = [{{mqtt_admin_jwt, '_', Username, '_'}, [], ['$_']}], Spec = [{{mqtt_admin_jwt, '_', Username, '_'}, [], ['$_']}],
Fun = fun() -> mnesia:select(?TAB, Spec) end, Fun = fun() -> mnesia:select(?TAB, Spec) end,
{atomic, List} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun), {atomic, List} = mria:ro_transaction(?DASHBOARD_SHARD, Fun),
List. List.
@ -193,7 +190,7 @@ handle_info(clean_jwt, State) ->
timer_clean(self()), timer_clean(self()),
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
Spec = [{{mqtt_admin_jwt, '_', '_', '$1'}, [{'<', '$1', Now}], ['$_']}], Spec = [{{mqtt_admin_jwt, '_', '_', '$1'}, [{'<', '$1', Now}], ['$_']}],
{atomic, JWTList} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, {atomic, JWTList} = mria:ro_transaction(?DASHBOARD_SHARD,
fun() -> mnesia:select(?TAB, Spec) end), fun() -> mnesia:select(?TAB, Spec) end),
destroy(JWTList), destroy(JWTList),
{noreply, State}; {noreply, State};

View File

@ -50,7 +50,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_management]), emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_management]),
ekka_mnesia:ensure_stopped(). mria:stop().
set_special_configs(emqx_management) -> set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
@ -102,7 +102,7 @@ t_rest_api(_Config) ->
ok. ok.
t_cli(_Config) -> t_cli(_Config) ->
[ekka_mnesia:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)], [mria:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)],
emqx_dashboard_cli:admins(["add", "username", "password"]), emqx_dashboard_cli:admins(["add", "username", "password"]),
[{mqtt_admin, <<"username">>, <<Salt:4/binary, Hash/binary>>, _}] = [{mqtt_admin, <<"username">>, <<Salt:4/binary, Hash/binary>>, _}] =
emqx_dashboard_admin:lookup_user(<<"username">>), emqx_dashboard_admin:lookup_user(<<"username">>),

View File

@ -65,7 +65,7 @@ register_channel(Type, ClientId) when is_binary(ClientId) ->
register_channel(Type, {ClientId, self()}); register_channel(Type, {ClientId, self()});
register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
ekka_mnesia:dirty_write(tabname(Type), record(ClientId, ChanPid)). mria:dirty_write(tabname(Type), record(ClientId, ChanPid)).
%% @doc Unregister a global channel. %% @doc Unregister a global channel.
-spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok. -spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok.
@ -73,7 +73,7 @@ unregister_channel(Type, ClientId) when is_binary(ClientId) ->
unregister_channel(Type, {ClientId, self()}); unregister_channel(Type, {ClientId, self()});
unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
ekka_mnesia:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)).
%% @doc Lookup the global channels. %% @doc Lookup the global channels.
-spec lookup_channels(atom(), binary()) -> list(pid()). -spec lookup_channels(atom(), binary()) -> list(pid()).
@ -89,16 +89,15 @@ record(ClientId, ChanPid) ->
init([Type]) -> init([Type]) ->
Tab = tabname(Type), Tab = tabname(Type),
ok = ekka_mnesia:create_table(Tab, [ ok = mria:create_table(Tab, [
{type, bag}, {type, bag},
{rlog_shard, ?CM_SHARD}, {rlog_shard, ?CM_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, channel}, {record_name, channel},
{attributes, record_info(fields, channel)}, {attributes, record_info(fields, channel)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]), {write_concurrency, true}]}]}]),
ok = ekka_mnesia:copy_table(Tab, ram_copies), ok = mria:wait_for_tables([Tab]),
%%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
ok = ekka:monitor(membership), ok = ekka:monitor(membership),
{ok, #{type => Type}}. {ok, #{type => Type}}.
@ -115,7 +114,7 @@ handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) ->
global:trans({?LOCK, self()}, global:trans({?LOCK, self()},
fun() -> fun() ->
%% FIXME: The shard name should be fixed later %% FIXME: The shard name should be fixed later
ekka_mnesia:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab]) mria:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab])
end), end),
{noreply, State}; {noreply, State};

View File

@ -209,7 +209,7 @@ confexp({error, already_exist}) ->
emqx_type:clientid(), {atom(), atom()}) -> list(). emqx_type:clientid(), {atom(), atom()}) -> list().
lookup_client(GwName, ClientId, FormatFun) -> lookup_client(GwName, ClientId, FormatFun) ->
lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
|| Node <- ekka_mnesia:running_nodes()]). || Node <- mria_mnesia:running_nodes()]).
lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() ->
ChanTab = emqx_gateway_cm:tabname(chan, GwName), ChanTab = emqx_gateway_cm:tabname(chan, GwName),
@ -229,7 +229,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
| ok. | ok.
kickout_client(GwName, ClientId) -> kickout_client(GwName, ClientId) ->
Results = [kickout_client(Node, GwName, ClientId) Results = [kickout_client(Node, GwName, ClientId)
|| Node <- ekka_mnesia:running_nodes()], || Node <- mria_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok; true -> ok;
false -> lists:last(Results) false -> lists:last(Results)

View File

@ -56,20 +56,16 @@
%-export([mnesia/1]). %-export([mnesia/1]).
%-boot_mnesia({mnesia, [boot]}). %-boot_mnesia({mnesia, [boot]}).
%-copy_mnesia({mnesia, [copy]}).
%%% @doc Create or replicate tables. %%% @doc Create or replicate tables.
%-spec(mnesia(boot | copy) -> ok). %-spec(mnesia(boot | copy) -> ok).
%mnesia(boot) -> %mnesia(boot) ->
% %% Optimize storage % %% Optimize storage
% StoreProps = [{ets, [{read_concurrency, true}]}], % StoreProps = [{ets, [{read_concurrency, true}]}],
% ok = ekka_mnesia:create_table(?MODULE, [ % ok = mria:create_table(?MODULE, [
% {attributes, record_info(fields, emqx_sn_registry)}, % {attributes, record_info(fields, emqx_sn_registry)},
% {ram_copies, [node()]}, % {ram_copies, [node()]},
% {storage_properties, StoreProps}]); % {storage_properties, StoreProps}]).
%
%mnesia(copy) ->
% ok = ekka_mnesia:copy_table(?MODULE, ram_copies).
-type registry() :: {Tab :: atom(), -type registry() :: {Tab :: atom(),
RegistryPid :: pid()}. RegistryPid :: pid()}.
@ -141,28 +137,27 @@ init([InstaId, PredefTopics]) ->
%% {ClientId, TopicId} -> TopicName %% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId %% {ClientId, TopicName} -> TopicId
Tab = name(InstaId), Tab = name(InstaId),
ok = ekka_mnesia:create_table(Tab, [ ok = mria:create_table(Tab, [
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, emqx_sn_registry}, {record_name, emqx_sn_registry},
{attributes, record_info(fields, emqx_sn_registry)}, {attributes, record_info(fields, emqx_sn_registry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}, {storage_properties, [{ets, [{read_concurrency, true}]}]},
{rlog_shard, ?SN_SHARD} {rlog_shard, ?SN_SHARD}
]), ]),
ok = ekka_mnesia:copy_table(Tab, ram_copies), ok = mria:wait_for_tables([Tab]),
ok = ekka_rlog:wait_for_shards([?SN_SHARD], infinity),
% FIXME: % FIXME:
%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), %ok = mria_rlog:wait_for_shards([?CM_SHARD], infinity),
MaxPredefId = lists:foldl( MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName0}, AccId) -> fun(#{id := TopicId, topic := TopicName0}, AccId) ->
TopicName = iolist_to_binary(TopicName0), TopicName = iolist_to_binary(TopicName0),
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ mria:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicId}, key = {predef, TopicId},
value = TopicName} value = TopicName}
), ),
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ mria:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicName}, key = {predef, TopicName},
value = TopicId} value = TopicId}
), ),
if TopicId > AccId -> TopicId; true -> AccId end if TopicId > AccId -> TopicId; true -> AccId end
end, 0, PredefTopics), end, 0, PredefTopics),
{ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}.
@ -190,7 +185,7 @@ handle_call({register, ClientId, TopicName}, _From,
key = {ClientId, TopicId}, key = {ClientId, TopicId},
value = TopicName}, write) value = TopicName}, write)
end, end,
case ekka_mnesia:transaction(?SN_SHARD, Fun) of case mria:transaction(?SN_SHARD, Fun) of
{atomic, ok} -> {atomic, ok} ->
{reply, TopicId, State}; {reply, TopicId, State};
{aborted, Error} -> {aborted, Error} ->
@ -205,7 +200,7 @@ handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) ->
{emqx_sn_registry, {ClientId, '_'}, '_'} {emqx_sn_registry, {ClientId, '_'}, '_'}
), ),
lists:foreach(fun(R) -> lists:foreach(fun(R) ->
ekka_mnesia:dirty_delete_object(Tab, R) mria:dirty_delete_object(Tab, R)
end, Registry), end, Registry),
{reply, ok, State}; {reply, ok, State};

View File

@ -35,7 +35,7 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
application:ensure_all_started(ekka), application:ensure_all_started(ekka),
ekka_mnesia:start(), mria:start(),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
@ -49,7 +49,7 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
{Tab, _Pid} = proplists:get_value(reg, Config), {Tab, _Pid} = proplists:get_value(reg, Config),
ekka_mnesia:clear_table(Tab), mria:clear_table(Tab),
Config. Config.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -30,14 +30,10 @@
-endif. -endif.
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_machine.hrl"). -include("emqx_machine.hrl").
-rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_MFA}).
-rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_COMMIT}).
-define(CATCH_UP, catch_up). -define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)). -define(TIMEOUT, timer:minutes(1)).
@ -45,21 +41,18 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ ok = mria:create_table(?CLUSTER_MFA, [
{type, ordered_set}, {type, ordered_set},
{rlog_shard, ?EMQX_MACHINE_SHARD}, {rlog_shard, ?EMQX_MACHINE_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, cluster_rpc_mfa}, {record_name, cluster_rpc_mfa},
{attributes, record_info(fields, cluster_rpc_mfa)}]), {attributes, record_info(fields, cluster_rpc_mfa)}]),
ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ ok = mria:create_table(?CLUSTER_COMMIT, [
{type, set}, {type, set},
{rlog_shard, ?EMQX_MACHINE_SHARD}, {rlog_shard, ?EMQX_MACHINE_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, cluster_rpc_commit}, {record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}]); {attributes, record_info(fields, cluster_rpc_commit)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies),
ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies).
start_link() -> start_link() ->
start_link(node(), ?MODULE, get_retry_ms()). start_link(node(), ?MODULE, get_retry_ms()).
@ -88,13 +81,13 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
MFA = {initiate, {M, F, A}}, MFA = {initiate, {M, F, A}},
Begin = erlang:monotonic_time(), Begin = erlang:monotonic_time(),
InitRes = InitRes =
case ekka_rlog:role() of case mria_rlog:role() of
core -> gen_server:call(?MODULE, MFA, Timeout); core -> gen_server:call(?MODULE, MFA, Timeout);
replicant -> replicant ->
%% the initiate transaction must happened on core node %% the initiate transaction must happened on core node
%% make sure MFA(in the transaction) and the transaction on the same node %% make sure MFA(in the transaction) and the transaction on the same node
%% don't need rpc again inside transaction. %% don't need rpc again inside transaction.
case ekka_rlog_status:upstream_node(?EMQX_MACHINE_SHARD) of case mria_status:upstream_node(?EMQX_MACHINE_SHARD) of
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
disconnected -> {error, disconnected} disconnected -> {error, disconnected}
end end
@ -150,8 +143,8 @@ handle_continue(?CATCH_UP, State) ->
{noreply, State, catch_up(State)}. {noreply, State, catch_up(State)}.
handle_call(reset, _From, State) -> handle_call(reset, _From, State) ->
_ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), _ = mria:clear_table(?CLUSTER_COMMIT),
_ = ekka_mnesia:clear_table(?CLUSTER_MFA), _ = mria:clear_table(?CLUSTER_MFA),
{reply, ok, State, {continue, ?CATCH_UP}}; {reply, ok, State, {continue, ?CATCH_UP}};
handle_call({initiate, MFA}, _From, State = #{node := Node}) -> handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
@ -280,7 +273,7 @@ do_catch_up_in_one_trans(LatestId, Node) ->
end. end.
transaction(Func, Args) -> transaction(Func, Args) ->
ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, Func, Args). mria:transaction(?EMQX_MACHINE_SHARD, Func, Args).
trans_status() -> trans_status() ->
mnesia:foldl(fun(Rec, Acc) -> mnesia:foldl(fun(Rec, Acc) ->

View File

@ -49,7 +49,7 @@ handle_cast(Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
case ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of case mria:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error])
end, end,

View File

@ -36,7 +36,7 @@ start() ->
ok = print_otp_version_warning(), ok = print_otp_version_warning(),
ok = load_config_files(), ok = load_config_files(),
ekka:start(), ekka:start(),
ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity),
ok. ok.
graceful_shutdown() -> graceful_shutdown() ->

View File

@ -147,7 +147,7 @@ fields("cluster") ->
#{})} #{})}
, {"db_backend", , {"db_backend",
sc(hoconsc:enum([mnesia, rlog]), sc(hoconsc:enum([mnesia, rlog]),
#{ mapping => "ekka.db_backend" #{ mapping => "mria.db_backend"
, default => mnesia , default => mnesia
})} })}
, {"rlog", , {"rlog",
@ -253,12 +253,12 @@ fields(cluster_k8s) ->
fields("rlog") -> fields("rlog") ->
[ {"role", [ {"role",
sc(hoconsc:enum([core, replicant]), sc(hoconsc:enum([core, replicant]),
#{ mapping => "ekka.node_role" #{ mapping => "mria.node_role"
, default => core , default => core
})} })}
, {"core_nodes", , {"core_nodes",
sc(emqx_schema:comma_separated_atoms(), sc(emqx_schema:comma_separated_atoms(),
#{ mapping => "ekka.core_nodes" #{ mapping => "mria.core_nodes"
, default => [] , default => []
})} })}
]; ];

View File

@ -43,7 +43,7 @@ init_per_suite(Config) ->
application:load(emqx), application:load(emqx),
application:load(emqx_machine), application:load(emqx_machine),
ok = ekka:start(), ok = ekka:start(),
ok = ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity),
application:set_env(emqx_machine, cluster_call_max_history, 100), application:set_env(emqx_machine, cluster_call_max_history, 100),
application:set_env(emqx_machine, cluster_call_clean_interval, 1000), application:set_env(emqx_machine, cluster_call_clean_interval, 1000),
application:set_env(emqx_machine, cluster_call_retry_interval, 900), application:set_env(emqx_machine, cluster_call_retry_interval, 900),
@ -54,8 +54,8 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
ekka:stop(), ekka:stop(),
ekka_mnesia:ensure_stopped(), mria:stop(),
ekka_mnesia:delete_schema(), mria_mnesia:delete_schema(),
meck:unload(emqx_alarm), meck:unload(emqx_alarm),
ok. ok.

View File

@ -164,7 +164,7 @@ stopped_node_info(Node) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
list_brokers() -> list_brokers() ->
[{Node, broker_info(Node)} || Node <- ekka_mnesia:running_nodes()]. [{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()].
lookup_broker(Node) -> lookup_broker(Node) ->
broker_info(Node). broker_info(Node).
@ -181,7 +181,7 @@ broker_info(Node) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
get_metrics() -> get_metrics() ->
nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]). nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
get_metrics(Node) when Node =:= node() -> get_metrics(Node) when Node =:= node() ->
emqx_metrics:all(); emqx_metrics:all();
@ -201,7 +201,7 @@ get_stats() ->
begin begin
Stats = get_stats(Node), Stats = get_stats(Node),
delete_keys(Stats, GlobalStatsKeys) delete_keys(Stats, GlobalStatsKeys)
end || Node <- ekka_mnesia:running_nodes()]), end || Node <- mria_mnesia:running_nodes()]),
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))), GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
maps:merge(CountStats, GlobalStats). maps:merge(CountStats, GlobalStats).
@ -232,10 +232,10 @@ nodes_info_count(PropList) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
lookup_client({clientid, ClientId}, FormatFun) -> lookup_client({clientid, ClientId}, FormatFun) ->
lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- ekka_mnesia:running_nodes()]); lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]);
lookup_client({username, Username}, FormatFun) -> lookup_client({username, Username}, FormatFun) ->
lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]).
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
lists:append(lists:map( lists:append(lists:map(
@ -257,7 +257,7 @@ lookup_client(Node, {username, Username}, FormatFun) ->
rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
kickout_client(ClientId) -> kickout_client(ClientId) ->
Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok; true -> ok;
false -> lists:last(Results) false -> lists:last(Results)
@ -273,7 +273,7 @@ list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache). call_client(ClientId, list_authz_cache).
list_client_subscriptions(ClientId) -> list_client_subscriptions(ClientId) ->
Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
Expected = lists:filter(fun({error, _}) -> false; Expected = lists:filter(fun({error, _}) -> false;
([]) -> false; ([]) -> false;
(_) -> true (_) -> true
@ -290,7 +290,7 @@ client_subscriptions(Node, ClientId) ->
rpc_call(Node, client_subscriptions, [Node, ClientId]). rpc_call(Node, client_subscriptions, [Node, ClientId]).
clean_authz_cache(ClientId) -> clean_authz_cache(ClientId) ->
Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok; true -> ok;
false -> lists:last(Results) false -> lists:last(Results)
@ -308,7 +308,7 @@ clean_authz_cache(Node, ClientId) ->
rpc_call(Node, clean_authz_cache, [Node, ClientId]). rpc_call(Node, clean_authz_cache, [Node, ClientId]).
clean_authz_cache_all() -> clean_authz_cache_all() ->
Results = [{Node, clean_authz_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()], Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
[] -> ok; [] -> ok;
BadNodes -> {error, BadNodes} BadNodes -> {error, BadNodes}
@ -328,7 +328,7 @@ set_quota_policy(ClientId, Policy) ->
%% @private %% @private
call_client(ClientId, Req) -> call_client(ClientId, Req) ->
Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()], Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()],
Expected = lists:filter(fun({error, _}) -> false; Expected = lists:filter(fun({error, _}) -> false;
(_) -> true (_) -> true
end, Results), end, Results),
@ -366,7 +366,7 @@ list_subscriptions(Node) ->
rpc_call(Node, list_subscriptions, [Node]). rpc_call(Node, list_subscriptions, [Node]).
list_subscriptions_via_topic(Topic, FormatFun) -> list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]). lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]).
list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
@ -376,7 +376,7 @@ list_subscriptions_via_topic(Node, Topic, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
lookup_subscriptions(ClientId) -> lookup_subscriptions(ClientId) ->
lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]). lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
lookup_subscriptions(Node, ClientId) when Node =:= node() -> lookup_subscriptions(Node, ClientId) when Node =:= node() ->
case ets:lookup(emqx_subid, ClientId) of case ets:lookup(emqx_subid, ClientId) of
@ -400,7 +400,7 @@ lookup_routes(Topic) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
subscribe(ClientId, TopicTables) -> subscribe(ClientId, TopicTables) ->
subscribe(ekka_mnesia:running_nodes(), ClientId, TopicTables). subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
subscribe([Node | Nodes], ClientId, TopicTables) -> subscribe([Node | Nodes], ClientId, TopicTables) ->
case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
@ -424,7 +424,7 @@ publish(Msg) ->
emqx:publish(Msg). emqx:publish(Msg).
unsubscribe(ClientId, Topic) -> unsubscribe(ClientId, Topic) ->
unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic). unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic).
unsubscribe([Node | Nodes], ClientId, Topic) -> unsubscribe([Node | Nodes], ClientId, Topic) ->
case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
@ -447,7 +447,7 @@ do_unsubscribe(ClientId, Topic) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
list_plugins() -> list_plugins() ->
[{Node, list_plugins(Node)} || Node <- ekka_mnesia:running_nodes()]. [{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()].
list_plugins(Node) when Node =:= node() -> list_plugins(Node) when Node =:= node() ->
emqx_plugins:list(); emqx_plugins:list();
@ -474,7 +474,7 @@ reload_plugin(Node, Plugin) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
list_listeners() -> list_listeners() ->
lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
list_listeners(Node) when Node =:= node() -> list_listeners(Node) when Node =:= node() ->
[Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()]; [Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()];
@ -505,7 +505,7 @@ manage_listener(Operation, Param = #{node := Node}) ->
rpc_call(Node, manage_listener, [Operation, Param]). rpc_call(Node, manage_listener, [Operation, Param]).
update_listener(Id, Config) -> update_listener(Id, Config) ->
[update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()]. [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
update_listener(Node, Id, Config) when Node =:= node() -> update_listener(Node, Id, Config) when Node =:= node() ->
case emqx_listeners:parse_listener_id(Id) of case emqx_listeners:parse_listener_id(Id) of
@ -523,7 +523,7 @@ update_listener(Node, Id, Config) ->
rpc_call(Node, update_listener, [Node, Id, Config]). rpc_call(Node, update_listener, [Node, Id, Config]).
remove_listener(Id) -> remove_listener(Id) ->
[remove_listener(Node, Id) || Node <- ekka_mnesia:running_nodes()]. [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
remove_listener(Node, Id) when Node =:= node() -> remove_listener(Node, Id) when Node =:= node() ->
{Type, Name} = emqx_listeners:parse_listener_id(Id), {Type, Name} = emqx_listeners:parse_listener_id(Id),
@ -540,7 +540,7 @@ remove_listener(Node, Id) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
get_alarms(Type) -> get_alarms(Type) ->
[{Node, get_alarms(Node, Type)} || Node <- ekka_mnesia:running_nodes()]. [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
get_alarms(Node, Type) when Node =:= node() -> get_alarms(Node, Type) when Node =:= node() ->
add_duration_field(emqx_alarm:get_alarms(Type)); add_duration_field(emqx_alarm:get_alarms(Type));
@ -553,7 +553,7 @@ deactivate(Node, Name) ->
rpc_call(Node, deactivate, [Node, Name]). rpc_call(Node, deactivate, [Node, Name]).
delete_all_deactivated_alarms() -> delete_all_deactivated_alarms() ->
[delete_all_deactivated_alarms(Node) || Node <- ekka_mnesia:running_nodes()]. [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
delete_all_deactivated_alarms(Node) when Node =:= node() -> delete_all_deactivated_alarms(Node) when Node =:= node() ->
emqx_alarm:delete_all_deactivated_alarms(); emqx_alarm:delete_all_deactivated_alarms();
@ -621,5 +621,3 @@ max_row_limit() ->
?MAX_ROW_LIMIT. ?MAX_ROW_LIMIT.
table_size(Tab) -> ets:info(Tab, size). table_size(Tab) -> ets:info(Tab, size).

View File

@ -167,7 +167,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) ->
{_CodCnt, Qs} = params2qs(Params, QsSchema), {_CodCnt, Qs} = params2qs(Params, QsSchema),
Limit = b2i(limit(Params)), Limit = b2i(limit(Params)),
Page = b2i(page(Params)), Page = b2i(page(Params)),
Nodes = ekka_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
Meta = #{page => Page, limit => Limit, count => 0}, Meta = #{page => Page, limit => Limit, count => 0},
page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}). page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}).

View File

@ -300,7 +300,7 @@ manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}})
Result; Result;
manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) -> manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) ->
Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()], Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of
[] -> {200}; [] -> {200};
Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}}

View File

@ -154,6 +154,6 @@ list(get, #{query_string := Qs}) ->
{200, emqx_mgmt:get_metrics()}; {200, emqx_mgmt:get_metrics()};
_ -> _ ->
Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) || Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) ||
Node <- ekka_mnesia:running_nodes()], Node <- mria_mnesia:running_nodes()],
{200, Data} {200, Data}
end. end.

View File

@ -96,6 +96,6 @@ list(get, #{query_string := Qs}) ->
{200, emqx_mgmt:get_stats()}; {200, emqx_mgmt:get_stats()};
_ -> _ ->
Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) || Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) ||
Node <- ekka_mnesia:running_nodes()], Node <- mria_mnesia:running_nodes()],
{200, Data} {200, Data}
end. end.

View File

@ -28,7 +28,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_mgmt_sup:start_link(), {ok, Sup} = emqx_mgmt_sup:start_link(),
ok = ekka_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity), ok = mria_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity),
emqx_mgmt_cli:load(), emqx_mgmt_cli:load(),
{ok, Sup}. {ok, Sup}.

View File

@ -24,7 +24,7 @@ init_suite() ->
init_suite([]). init_suite([]).
init_suite(Apps) -> init_suite(Apps) ->
ekka_mnesia:start(), mria:start(),
application:load(emqx_management), application:load(emqx_management),
emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], fun set_special_configs/1). emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], fun set_special_configs/1).

View File

@ -25,7 +25,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([ start_link/0 -export([ start_link/0
, on_message_publish/1 , on_message_publish/1
@ -64,14 +63,12 @@
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, ordered_set}, {type, ordered_set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, delayed_message}, {record_name, delayed_message},
{attributes, record_info(fields, delayed_message)}]); {attributes, record_info(fields, delayed_message)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Hooks %% Hooks
@ -184,7 +181,7 @@ delete_delayed_message(Id0) ->
{error, not_found}; {error, not_found};
Rows -> Rows ->
Timestamp = hd(Rows), Timestamp = hd(Rows),
ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id}) mria:dirty_delete(?TAB, {Timestamp, Id})
end. end.
update_config(Config) -> update_config(Config) ->
{ok, _} = emqx:update_config([delayed], Config). {ok, _} = emqx:update_config([delayed], Config).
@ -205,7 +202,7 @@ handle_call({set_max_delayed_messages, Max}, _From, State) ->
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, handle_call({store, DelayedMsg = #delayed_message{key = Key}},
_From, State = #{max_delayed_messages := 0}) -> _From, State = #{max_delayed_messages := 0}) ->
ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), ok = mria:dirty_write(?TAB, DelayedMsg),
emqx_metrics:inc('messages.delayed'), emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)}; {reply, ok, ensure_publish_timer(Key, State)};
@ -216,7 +213,7 @@ handle_call({store, DelayedMsg = #delayed_message{key = Key}},
true -> true ->
{reply, {error, max_delayed_messages_full}, State}; {reply, {error, max_delayed_messages_full}, State};
false -> false ->
ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), ok = mria:dirty_write(?TAB, DelayedMsg),
emqx_metrics:inc('messages.delayed'), emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)} {reply, ok, ensure_publish_timer(Key, State)}
end; end;
@ -240,7 +237,7 @@ handle_cast(Msg, State) ->
%% Do Publish... %% Do Publish...
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
lists:foreach(fun(Key) -> ekka_mnesia:dirty_delete(?TAB, Key) end, DeletedKeys), lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
handle_info(stats, State = #{stats_fun := StatsFun}) -> handle_info(stats, State = #{stats_fun := StatsFun}) ->

View File

@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) ->
update_config_(Config) -> update_config_(Config) ->
lists:foreach(fun(Node) -> lists:foreach(fun(Node) ->
update_config_(Node, Config) update_config_(Node, Config)
end, ekka_mnesia:running_nodes()). end, mria_mnesia:running_nodes()).
update_config_(Node, Config) when Node =:= node() -> update_config_(Node, Config) when Node =:= node() ->
_ = emqx_delayed:update_config(Config), _ = emqx_delayed:update_config(Config),

View File

@ -30,7 +30,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([ start_link/0 -export([ start_link/0
, stop/0 , stop/0
@ -91,14 +90,12 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TELEMETRY, ok = mria:create_table(?TELEMETRY,
[{type, set}, [{type, set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, telemetry}, {record_name, telemetry},
{attributes, record_info(fields, telemetry)}]); {attributes, record_info(fields, telemetry)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TELEMETRY, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -130,7 +127,7 @@ get_telemetry() ->
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% This is to suppress dialyzer warnings for mnesia:dirty_write and %% This is to suppress dialyzer warnings for mria:dirty_write and
%% dirty_read race condition. Given that the init function is not evaluated %% dirty_read race condition. Given that the init function is not evaluated
%% concurrently in one node, it should be free of race condition. %% concurrently in one node, it should be free of race condition.
%% Given the chance of having two nodes bootstraping with the write %% Given the chance of having two nodes bootstraping with the write
@ -140,8 +137,8 @@ init(_Opts) ->
UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
[] -> [] ->
UUID = generate_uuid(), UUID = generate_uuid(),
ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, mria:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
uuid = UUID}), uuid = UUID}),
UUID; UUID;
[#telemetry{uuid = UUID} | _] -> [#telemetry{uuid = UUID} | _] ->
UUID UUID
@ -268,7 +265,7 @@ uptime() ->
element(1, erlang:statistics(wall_clock)). element(1, erlang:statistics(wall_clock)).
nodes_uuid() -> nodes_uuid() ->
Nodes = lists:delete(node(), ekka_mnesia:running_nodes()), Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
lists:foldl(fun(Node, Acc) -> lists:foldl(fun(Node, Acc) ->
case rpc:call(Node, ?MODULE, get_uuid, []) of case rpc:call(Node, ?MODULE, get_uuid, []) of
{badrpc, _Reason} -> {badrpc, _Reason} ->

View File

@ -152,7 +152,7 @@ data(get, _Request) ->
enable_telemetry(Enable) -> enable_telemetry(Enable) ->
lists:foreach(fun(Node) -> lists:foreach(fun(Node) ->
enable_telemetry(Node, Enable) enable_telemetry(Node, Enable)
end, ekka_mnesia:running_nodes()). end, mria_mnesia:running_nodes()).
enable_telemetry(Node, Enable) when Node =:= node() -> enable_telemetry(Node, Enable) when Node =:= node() ->
case Enable of case Enable of

View File

@ -35,7 +35,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ekka_mnesia:start(), mria:start(),
ok = emqx_delayed:mnesia(boot), ok = emqx_delayed:mnesia(boot),
emqx_common_test_helpers:start_apps([emqx_modules]), emqx_common_test_helpers:start_apps([emqx_modules]),
Config. Config.

View File

@ -28,7 +28,7 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = ekka_mnesia:start(), ok = mria:start(),
ok = emqx_telemetry:mnesia(boot), ok = emqx_telemetry:mnesia(boot),
emqx_common_test_helpers:start_apps([emqx_modules]), emqx_common_test_helpers:start_apps([emqx_modules]),
Config. Config.

View File

@ -590,6 +590,6 @@ emqx_cluster() ->
]. ].
emqx_cluster_data() -> emqx_cluster_data() ->
#{running_nodes := Running, stopped_nodes := Stopped} = ekka_mnesia:cluster_info(), #{running_nodes := Running, stopped_nodes := Stopped} = mria_mnesia:cluster_info(),
[{nodes_running, length(Running)}, [{nodes_running, length(Running)},
{nodes_stopped, length(Stopped)}]. {nodes_stopped, length(Stopped)}].

View File

@ -47,7 +47,6 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(PSK_SHARD, emqx_psk_shard). -define(PSK_SHARD, emqx_psk_shard).
@ -64,16 +63,13 @@
%% @doc Create or replicate tables. %% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{rlog_shard, ?PSK_SHARD}, {rlog_shard, ?PSK_SHARD},
{type, ordered_set}, {type, ordered_set},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, psk_entry}, {record_name, psk_entry},
{attributes, record_info(fields, psk_entry)}, {attributes, record_info(fields, psk_entry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
@ -237,7 +233,7 @@ trim_crlf(Bin) ->
end. end.
trans(Fun, Args) -> trans(Fun, Args) ->
case ekka_mnesia:transaction(?PSK_SHARD, Fun, Args) of case mria:transaction(?PSK_SHARD, Fun, Args) of
{atomic, Res} -> Res; {atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.

View File

@ -52,15 +52,14 @@ create_resource(#{storage_type := StorageType}) ->
{read_concurrency, true}, {read_concurrency, true},
{write_concurrency, true}]}, {write_concurrency, true}]},
{dets, [{auto_save, 1000}]}], {dets, [{auto_save, 1000}]}],
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, set}, {type, set},
{rlog_shard, ?RETAINER_SHARD}, {rlog_shard, ?RETAINER_SHARD},
{Copies, [node()]}, {storage, Copies},
{record_name, retained}, {record_name, retained},
{attributes, record_info(fields, retained)}, {attributes, record_info(fields, retained)},
{storage_properties, StoreProps}]), {storage_properties, StoreProps}]),
ok = ekka_mnesia:copy_table(?TAB, Copies), ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
case mnesia:table_info(?TAB, storage_type) of case mnesia:table_info(?TAB, storage_type) of
Copies -> ok; Copies -> ok;
_Other -> _Other ->
@ -73,10 +72,10 @@ store_retained(_, Msg =#message{topic = Topic}) ->
case is_table_full() of case is_table_full() of
false -> false ->
ok = emqx_metrics:inc('messages.retained'), ok = emqx_metrics:inc('messages.retained'),
ekka_mnesia:dirty_write(?TAB, mria:dirty_write(?TAB,
#retained{topic = topic2tokens(Topic), #retained{topic = topic2tokens(Topic),
msg = Msg, msg = Msg,
expiry_time = ExpiryTime}); expiry_time = ExpiryTime});
_ -> _ ->
Tokens = topic2tokens(Topic), Tokens = topic2tokens(Topic),
Fun = fun() -> Fun = fun() ->
@ -94,7 +93,7 @@ store_retained(_, Msg =#message{topic = Topic}) ->
ok ok
end end
end, end,
{atomic, ok} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), {atomic, ok} = mria:transaction(?RETAINER_SHARD, Fun),
ok ok
end. end.
@ -106,7 +105,7 @@ clear_expired(_) ->
Keys = mnesia:select(?TAB, Ms, write), Keys = mnesia:select(?TAB, Ms, write),
lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys)
end, end,
{atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun),
ok. ok.
delete_message(_, Topic) -> delete_message(_, Topic) ->
@ -117,12 +116,8 @@ delete_message(_, Topic) ->
Fun = fun() -> Fun = fun() ->
mnesia:delete({?TAB, Tokens}) mnesia:delete({?TAB, Tokens})
end, end,
case ekka_mnesia:transaction(?RETAINER_SHARD, Fun) of _ = mria:transaction(?RETAINER_SHARD, Fun),
{atomic, Result} -> ok
Result;
ok ->
ok
end
end, end,
ok. ok.
@ -157,7 +152,7 @@ match_messages(_, Topic, Cursor) ->
end. end.
clean(_) -> clean(_) ->
ekka_mnesia:clear_table(?TAB), _ = mria:clear_table(?TAB),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
@ -214,7 +209,7 @@ match_delete_messages(Filter) ->
MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'},
Ms = [{MsHd, [], ['$_']}], Ms = [{MsHd, [], ['$_']}],
Rs = mnesia:dirty_select(?TAB, Ms), Rs = mnesia:dirty_select(?TAB, Ms),
lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs). lists:foreach(fun(R) -> mria:dirty_delete_object(?TAB, R) end, Rs).
%% @private %% @private
condition(Ws) -> condition(Ws) ->

View File

@ -338,4 +338,4 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
get_rule_metrics(Id) -> get_rule_metrics(Id) ->
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
|| Node <- ekka_mnesia:running_nodes()]. || Node <- mria_mnesia:running_nodes()].

View File

@ -50,7 +50,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}

View File

@ -60,7 +60,7 @@ community_plugin_overrides() ->
%% Temporary workaround for a rebar3 erl_opts duplication %% Temporary workaround for a rebar3 erl_opts duplication
%% bug. Ideally, we want to set this define globally %% bug. Ideally, we want to set this define globally
snabbkaffe_overrides() -> snabbkaffe_overrides() ->
Apps = [snabbkaffe, ekka], Apps = [snabbkaffe, ekka, mria],
[{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps]. [{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps].
config(HasElixir) -> config(HasElixir) ->