diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b851700b9..0c8f5e7ba 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -15,7 +15,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 3f167d093..f3ea37b67 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -5,7 +5,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]}, + {applications, [kernel,stdlib,gproc,gen_rpc,mria,esockd,cowboy,sasl,os_mon,jiffy,lc]}, {mod, {emqx_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 2df126ea7..2585494eb 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -26,7 +26,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([post_config_update/4]). @@ -95,21 +94,18 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?ACTIVATED_ALARM, + ok = mria:create_table(?ACTIVATED_ALARM, [{type, set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, activated_alarm}, {attributes, record_info(fields, activated_alarm)}]), - ok = ekka_mnesia:create_table(?DEACTIVATED_ALARM, + ok = mria:create_table(?DEACTIVATED_ALARM, [{type, ordered_set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, deactivated_alarm}, - {attributes, record_info(fields, deactivated_alarm)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ACTIVATED_ALARM, disc_copies), - ok = ekka_mnesia:copy_table(?DEACTIVATED_ALARM, disc_copies). + {attributes, record_info(fields, deactivated_alarm)}]). %%-------------------------------------------------------------------- %% API @@ -184,6 +180,7 @@ to_rfc3339(Timestamp) -> %%-------------------------------------------------------------------- init([]) -> + _ = mria:wait_for_tables([?ACTIVATED_ALARM, ?DEACTIVATED_ALARM]), deactivate_all_alarms(), ok = emqx_config_handler:add_handler([alarm], ?MODULE), {ok, #state{timer = ensure_timer(undefined, get_validity_period())}}. @@ -201,7 +198,7 @@ handle_call({activate_alarm, Name, Details}, _From, State) -> details = Details, message = normalize_message(Name, Details), activate_at = erlang:system_time(microsecond)}, - ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), + mria:dirty_write(?ACTIVATED_ALARM, Alarm), do_actions(activate, Alarm, emqx:get_config([alarm, actions])), {reply, ok, State} end; @@ -221,7 +218,7 @@ handle_call(delete_all_deactivated_alarms, _From, State) -> handle_call({get_alarms, all}, _From, State) -> {atomic, Alarms} = - ekka_mnesia:ro_transaction( + mria:ro_transaction( ?COMMON_SHARD, fun() -> [normalize(Alarm) || @@ -282,7 +279,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name case mnesia:dirty_first(?DEACTIVATED_ALARM) of '$end_of_table' -> ok; ActivateAt2 -> - ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + mria:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) end; false -> ok end, @@ -291,8 +288,8 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, normalize_message(Name, Details), erlang:system_time(microsecond)), - ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), - ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), + mria:dirty_delete(?ACTIVATED_ALARM, Name), do_actions(deactivate, DeActAlarm, emqx:get_config([alarm, actions])). make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> @@ -309,7 +306,7 @@ deactivate_all_alarms() -> details = Details, message = Message, activate_at = ActivateAt}) -> - ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, + mria:dirty_write(?DEACTIVATED_ALARM, #deactivated_alarm{ activate_at = ActivateAt, name = Name, @@ -321,7 +318,7 @@ deactivate_all_alarms() -> %% Delete all records from the given table, ignore result. clear_table(TableName) -> - case ekka_mnesia:clear_table(TableName) of + case mria:clear_table(TableName) of {aborted, Reason} -> ?SLOG(warning, #{ msg => "fail_to_clear_table", @@ -347,7 +344,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) -> delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> case ActivatedAt =< Checkpoint of true -> - ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), + mria:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); false -> diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index cd435b864..662439397 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -57,7 +57,7 @@ stop(_State) -> ok. ensure_ekka_started() -> ekka:start(), - ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity). + ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity). %% @doc Call this function to make emqx boot without loading config, %% in case we want to delegate the config load to a higher level app diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 759c9f955..dfd299d90 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -27,7 +27,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([start_link/0, stop/0]). @@ -58,16 +57,13 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?BANNED_TAB, [ + ok = mria:create_table(?BANNED_TAB, [ {type, set}, {rlog_shard, ?COMMON_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, banned}, {attributes, record_info(fields, banned)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?BANNED_TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %% @doc Start the banned server. -spec(start_link() -> startlink_ret()). @@ -155,13 +151,13 @@ create(#{who := Who, reason := Reason, at := At, until := Until}) -> - ekka_mnesia:dirty_write(?BANNED_TAB, #banned{who = Who, - by = By, - reason = Reason, - at = At, - until = Until}); + mria:dirty_write(?BANNED_TAB, #banned{who = Who, + by = By, + reason = Reason, + at = At, + until = Until}); create(Banned) when is_record(Banned, banned) -> - ekka_mnesia:dirty_write(?BANNED_TAB, Banned). + mria:dirty_write(?BANNED_TAB, Banned). look_up(Who) when is_map(Who) -> look_up(pares_who(Who)); @@ -174,7 +170,7 @@ look_up(Who) -> delete(Who) when is_map(Who)-> delete(pares_who(Who)); delete(Who) -> - ekka_mnesia:dirty_delete(?BANNED_TAB, Who). + mria:dirty_delete(?BANNED_TAB, Who). info(InfoKey) -> mnesia:table_info(?BANNED_TAB, InfoKey). @@ -195,7 +191,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - ekka_mnesia:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), + _ = mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index ef7ad6131..492c6dacd 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -71,7 +71,7 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_write(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -83,7 +83,7 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -100,16 +100,15 @@ record(ClientId, ChanPid) -> %%-------------------------------------------------------------------- init([]) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, bag}, {rlog_shard, ?CM_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, channel}, {attributes, record_info(fields, channel)}, {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), - ok = ekka_mnesia:copy_table(?TAB, ram_copies), - ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?CM_SHARD], infinity), ok = ekka:monitor(membership), {ok, #{}}. @@ -124,7 +123,7 @@ handle_cast(Msg, State) -> handle_info({membership, {mnesia, down, Node}}, State) -> global:trans({?LOCK, self()}, fun() -> - ekka_mnesia:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) + mria:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) end), {noreply, State}; diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3e5475c13..3337b57c8 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -28,7 +28,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([start_link/2]). @@ -74,16 +73,14 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?ROUTE_TAB, [ + ok = mria:create_table(?ROUTE_TAB, [ {type, bag}, {rlog_shard, ?ROUTE_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, route}, {attributes, record_info(fields, route)}, {storage_properties, [{ets, [{read_concurrency, true}, - {write_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ROUTE_TAB, ram_copies). + {write_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% Start a router @@ -225,7 +222,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- insert_direct_route(Route) -> - ekka_mnesia:dirty_write(?ROUTE_TAB, Route). + mria:dirty_write(?ROUTE_TAB, Route). insert_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -235,7 +232,7 @@ insert_trie_route(Route = #route{topic = Topic}) -> mnesia:write(?ROUTE_TAB, Route, sticky_write). delete_direct_route(Route) -> - ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route). + mria:dirty_delete_object(?ROUTE_TAB, Route). delete_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -255,7 +252,7 @@ maybe_trans(Fun, Args) -> trans(Fun, Args); global -> %% Assert: - mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash + mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash lock_router(), try mnesia:sync_dirty(Fun, Args) after @@ -276,11 +273,11 @@ trans(Fun, Args) -> {WPid, RefMon} = spawn_monitor( %% NOTE: this is under the assumption that crashes in Fun - %% are caught by mnesia:transaction/2. + %% are caught by mria:transaction/2. %% Future changes should keep in mind that this process %% always exit with database write result. fun() -> - Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of + Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of {atomic, Ok} -> Ok; {aborted, Reason} -> {error, Reason} end, diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index a88e82d8d..aecce70ac 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -27,7 +27,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% API -export([ start_link/0 @@ -59,16 +58,13 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?ROUTING_NODE, [ + ok = mria:create_table(?ROUTING_NODE, [ {type, set}, {rlog_shard, ?ROUTE_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, routing_node}, {attributes, record_info(fields, routing_node)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ROUTING_NODE, ram_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% API @@ -87,7 +83,7 @@ monitor(Node) when is_atom(Node) -> case ekka:is_member(Node) orelse ets:member(?ROUTING_NODE, Node) of true -> ok; - false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) + false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) end. %%-------------------------------------------------------------------- @@ -136,9 +132,9 @@ handle_info({mnesia_table_event, Event}, State) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans({?LOCK, self()}, fun() -> - ekka_mnesia:transaction(fun cleanup_routes/1, [Node]) + mria:transaction(?ROUTE_SHARD, fun cleanup_routes/1, [Node]) end), - ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node), + ok = mria:dirty_delete(?ROUTING_NODE, Node), {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ef8e3d288..79a7d5522 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -28,7 +28,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% APIs -export([start_link/0]). @@ -85,15 +84,12 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, bag}, {rlog_shard, ?SHARED_SUB_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, emqx_shared_subscription}, - {attributes, record_info(fields, emqx_shared_subscription)}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, ram_copies). + {attributes, record_info(fields, emqx_shared_subscription)}]). %%-------------------------------------------------------------------- %% API @@ -297,7 +293,7 @@ subscribers(Group, Topic) -> init([]) -> {ok, _} = mnesia:subscribe({table, ?TAB, simple}), - {atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), + {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), {ok, update_stats(#state{pmon = PMon})}. @@ -309,7 +305,7 @@ init_monitors() -> end, emqx_pmon:new(), ?TAB). handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> - ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), + mria:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) @@ -319,7 +315,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> - ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), + mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), {reply, ok, State}; @@ -373,7 +369,7 @@ cleanup_down(SubPid) -> ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> - ok = ekka_mnesia:dirty_delete_object(?TAB, Record), + ok = mria:dirty_delete_object(?TAB, Record), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}) end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 692d2bd0a..7882abc19 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -150,7 +150,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> publish_any(version, Version), publish_any(sysdescr, Descr), - publish_any(brokers, ekka_mnesia:running_nodes()), + publish_any(brokers, mria_mnesia:running_nodes()), publish_any(stats, emqx_stats:getstats()), publish_any(metrics, emqx_metrics:all()), {noreply, tick(State), hibernate}; diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 1e3a0e5a5..4354a2bab 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -22,7 +22,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% Trie APIs -export([ insert/1 @@ -61,16 +60,13 @@ mnesia(boot) -> StoreProps = [{ets, [{read_concurrency, true}, {write_concurrency, true} ]}], - ok = ekka_mnesia:create_table(?TRIE, [ + ok = mria:create_table(?TRIE, [ {rlog_shard, ?ROUTE_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, {type, ordered_set}, - {storage_properties, StoreProps}]); -mnesia(copy) -> - %% Copy topics table - ok = ekka_mnesia:copy_table(?TRIE, ram_copies). + {storage_properties, StoreProps}]). %%-------------------------------------------------------------------- %% Topics APIs @@ -329,6 +325,6 @@ do_compact_test() -> do_compact(words(<<"a/+/+/+/+/b">>))), ok. -clear_tables() -> ekka_mnesia:clear_table(?TRIE). +clear_tables() -> mria:clear_table(?TRIE). -endif. % TEST diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index b215b5055..de117ab00 100644 --- a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -27,14 +27,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> application:load(emqx), ok = ekka:start(), - %% for coverage - ok = emqx_banned:mnesia(copy), Config. end_per_suite(_Config) -> ekka:stop(), - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). + mria:stop(), + mria_mnesia:delete_schema(). t_add_delete(_) -> Banned = #banned{who = {clientid, <<"TestClient">>}, @@ -92,4 +90,3 @@ t_unused(_) -> ?assertEqual(ok, Banned ! ok), timer:sleep(500), %% expiry timer ok = emqx_banned:stop(). - diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 20ce756a0..a470370e6 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -195,7 +195,8 @@ generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) -> -spec(stop_apps(list()) -> ok). stop_apps(Apps) -> - [application:stop(App) || App <- Apps ++ [emqx, mnesia]]. + [application:stop(App) || App <- Apps ++ [emqx, mria, mnesia]], + ok. %% backward compatible deps_path(App, RelativePath) -> app_path(App, RelativePath). @@ -269,8 +270,8 @@ reload(App, SpecAppConfigHandler) -> application:start(App). ensure_mnesia_stopped() -> - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). + mria:stop(), + mria_mnesia:delete_schema(). %% Help function to wait for Fun to yield 'true'. wait_for(Fn, Ln, F, Timeout) -> diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 88a4d8ee0..b5a5314d7 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -35,7 +35,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]), - ekka_mnesia:delete_schema(), %% Clean emqx_banned table + mria_mnesia:delete_schema(), %% Clean emqx_banned table ok. t_detect_check(_) -> diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index aeb60fc2c..0d14a7992 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -41,10 +41,6 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -t_mnesia(_) -> - %% for coverage - ok = emqx_router:mnesia(copy). - % t_add_route(_) -> % error('TODO'). @@ -117,4 +113,3 @@ t_unexpected(_) -> clear_tables() -> lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]). - diff --git a/apps/emqx/test/emqx_trie_SUITE.erl b/apps/emqx/test/emqx_trie_SUITE.erl index e22e643b0..00d64877c 100644 --- a/apps/emqx/test/emqx_trie_SUITE.erl +++ b/apps/emqx/test/emqx_trie_SUITE.erl @@ -50,8 +50,8 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ekka:stop(), - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). + mria:stop(), + mria_mnesia:delete_schema(). init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -60,9 +60,6 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -t_mnesia(_) -> - ok = ?TRIE:mnesia(copy). - t_insert(_) -> Fun = fun() -> ?TRIE:insert(<<"sensor/1/metric/2">>), @@ -191,7 +188,6 @@ t_delete3(_) -> clear_tables() -> emqx_trie:clear_tables(). trans(Fun) -> - mnesia:transaction(Fun). + mria:transaction(?ROUTE_SHARD, Fun). trans(Fun, Args) -> - mnesia:transaction(Fun, Args). - + mria:transaction(?ROUTE_SHARD, Fun, Args). diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index 3b2b8b94c..3f0373d39 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -29,7 +29,7 @@ [ emqx_metrics , emqx_stats , emqx_broker - , ekka_mnesia + , mria_mnesia ]). -define(ALL(Vars, Types, Exprs), @@ -80,8 +80,8 @@ do_mock(emqx_broker) -> fun(Msg) -> {node(), <<"test">>, Msg} end); do_mock(emqx_stats) -> meck:expect(emqx_stats, getstats, fun() -> [0] end); -do_mock(ekka_mnesia) -> - meck:expect(ekka_mnesia, running_nodes, fun() -> [node()] end); +do_mock(mria_mnesia) -> + meck:expect(mria_mnesia, running_nodes, fun() -> [node()] end); do_mock(emqx_metrics) -> meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). @@ -129,4 +129,3 @@ postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) -> next_state(State, _Res, {call, _Mod, _Fun, _Args}) -> NewState = State, NewState. - diff --git a/apps/emqx_authn/src/emqx_authn_app.erl b/apps/emqx_authn/src/emqx_authn_app.erl index d297c9042..b71a3e16b 100644 --- a/apps/emqx_authn/src/emqx_authn_app.erl +++ b/apps/emqx_authn/src/emqx_authn_app.erl @@ -30,7 +30,7 @@ %%------------------------------------------------------------------------------ start(_StartType, _StartArgs) -> - ok = ekka_rlog:wait_for_shards([?AUTH_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?AUTH_SHARD], infinity), {ok, Sup} = emqx_authn_sup:start_link(), ok = ?AUTHN:register_providers(providers()), ok = initialize(), diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 3daece601..92bdf0619 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -46,7 +46,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -record(user_info, { user_id @@ -63,15 +62,12 @@ %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {rlog_shard, ?AUTH_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%------------------------------------------------------------------------------ %% Hocon Schema @@ -268,7 +264,7 @@ trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> - case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of + case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index c3b6badf7..095dc6ef8 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -59,7 +59,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -define(TAB, ?MODULE). @@ -70,15 +69,12 @@ %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {rlog_shard, ?AUTH_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%------------------------------------------------------------------------------ %% Hocon Schema @@ -390,7 +386,7 @@ trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> - case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of + case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index a6bd0f7b8..da9b552c0 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -528,10 +528,10 @@ users(get, #{query_string := Qs}) -> end; users(post, #{body := Body}) when is_list(Body) -> lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_USERNAME, Username}, - rules = format_rules(Rules) - }) + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }) end, Body), {204}. @@ -561,10 +561,10 @@ clients(get, #{query_string := Qs}) -> end; clients(post, #{body := Body}) when is_list(Body) -> lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_CLIENTID, Clientid}, - rules = format_rules(Rules) - }) + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }) end, Body), {204}. @@ -581,13 +581,13 @@ user(get, #{bindings := #{username := Username}}) -> end; user(put, #{bindings := #{username := Username}, body := #{<<"username">> := Username, <<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_USERNAME, Username}, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }), {204}; user(delete, #{bindings := #{username := Username}}) -> - ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}), + mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}), {204}. client(get, #{bindings := #{clientid := Clientid}}) -> @@ -603,13 +603,13 @@ client(get, #{bindings := #{clientid := Clientid}}) -> end; client(put, #{bindings := #{clientid := Clientid}, body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_CLIENTID, Clientid}, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }), {204}; client(delete, #{bindings := #{clientid := Clientid}}) -> - ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}), + mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}), {204}. all(get, _) -> @@ -624,17 +624,17 @@ all(get, _) -> } end; all(put, #{body := #{<<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = ?ACL_TABLE_ALL, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = ?ACL_TABLE_ALL, + rules = format_rules(Rules) + }), {204}. purge(delete, _) -> case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of [#{<<"enable">> := false}] -> ok = lists:foreach(fun(Key) -> - ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key) + ok = mria:dirty_delete(?ACL_TABLE, Key) end, mnesia:dirty_all_keys(?ACL_TABLE)), {204}; [#{<<"enable">> := true}] -> diff --git a/apps/emqx_authz/src/emqx_authz_app.erl b/apps/emqx_authz/src/emqx_authz_app.erl index f868ac342..a5044443b 100644 --- a/apps/emqx_authz/src/emqx_authz_app.erl +++ b/apps/emqx_authz/src/emqx_authz_app.erl @@ -12,7 +12,7 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> - ok = ekka_rlog:wait_for_shards([?ACL_SHARDED], infinity), + ok = mria_rlog:wait_for_shards([?ACL_SHARDED], infinity), {ok, Sup} = emqx_authz_sup:start_link(), ok = emqx_authz:init(), {ok, Sup}. diff --git a/apps/emqx_authz/src/emqx_authz_mnesia.erl b/apps/emqx_authz/src/emqx_authz_mnesia.erl index ab755403e..3851affed 100644 --- a/apps/emqx_authz/src/emqx_authz_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_mnesia.erl @@ -32,18 +32,15 @@ -endif. -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?ACL_TABLE, [ + ok = mria:create_table(?ACL_TABLE, [ {type, ordered_set}, {rlog_shard, ?ACL_SHARDED}, - {disc_copies, [node()]}, + {storage, disc_copies}, {attributes, record_info(fields, ?ACL_TABLE)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). description() -> "AuthZ with Mnesia". diff --git a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl index 947f46a82..f2562becc 100644 --- a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl @@ -54,24 +54,24 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_authz, Config) -> - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, - rules = [{allow, publish, <<"test/%u">>}, - {allow, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, - rules = [{allow, publish, <<"test/%c">>}, - {deny, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, - rules = [{deny, all, <<"#">>}] - }]), + mria:dirty_write(#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, + rules = [{allow, publish, <<"test/%u">>}, + {allow, subscribe, <<"eq #">>} + ] + }), + mria:dirty_write(#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, + rules = [{allow, publish, <<"test/%c">>}, + {deny, subscribe, <<"eq #">>} + ] + }), + mria:dirty_write(#emqx_acl{who = ?ACL_TABLE_ALL, + rules = [{deny, all, <<"#">>}] + }), Config; init_per_testcase(_, Config) -> Config. end_per_testcase(t_authz, Config) -> - [ ekka_mnesia:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], + [ mria:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], Config; end_per_testcase(_, Config) -> Config. @@ -96,7 +96,7 @@ t_authz(_) -> listener => {tcp, default} }, - ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), + ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), ?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), @@ -106,4 +106,3 @@ t_authz(_) -> ?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)), ok. - diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 58f64be90..744267e74 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -41,8 +41,8 @@ all() -> [t_auto_subscribe, t_update]. -init_per_suite(Config) -> - ekka_mnesia:start(), +init_per_suite(Config) -> + mria:start(), application:stop(?APP), meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), @@ -159,4 +159,3 @@ check_subs([{{_, Topic}, #{subid := ?CLIENT_ID}} | Subs], List) -> check_subs(Subs, lists:delete(Topic, List)); check_subs([_ | Subs], List) -> check_subs(Subs, List). - diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index e4805d7eb..40a101640 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -153,7 +153,7 @@ param_path_operation()-> }. list_bridges(get, _Params) -> - {200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}. + {200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}. list_local_bridges(Node) when Node =:= node() -> [format_resp(Data) || Data <- emqx_bridge:list_bridges()]; @@ -161,7 +161,7 @@ list_local_bridges(Node) -> rpc_call(Node, list_local_bridges, [Node]). crud_bridges_cluster(Method, Params) -> - Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()], + Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of [] -> case Results of diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 5af983b4d..2a7f2410a 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -21,7 +21,6 @@ -include("emqx_dashboard.hrl"). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% Mnesia bootstrap -export([mnesia/1]). @@ -50,16 +49,14 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(mqtt_admin, [ + ok = mria:create_table(mqtt_admin, [ {type, set}, {rlog_shard, ?DASHBOARD_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, mqtt_admin}, {attributes, record_info(fields, mqtt_admin)}, {storage_properties, [{ets, [{read_concurrency, true}, - {write_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(mqtt_admin, disc_copies). + {write_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% API @@ -68,7 +65,7 @@ mnesia(copy) -> -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])). + return(mria:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])). force_add_user(Username, Password, Tags) -> AddFun = fun() -> @@ -76,7 +73,7 @@ force_add_user(Username, Password, Tags) -> password = Password, tags = Tags}) end, - case ekka_mnesia:transaction(?DASHBOARD_SHARD, AddFun) of + case mria:transaction(?DASHBOARD_SHARD, AddFun) of {atomic, ok} -> ok; {aborted, Reason} -> {error, Reason} end. @@ -98,11 +95,11 @@ remove_user(Username) when is_binary(Username) -> end, mnesia:delete({mqtt_admin, Username}) end, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). + return(mria:transaction(?DASHBOARD_SHARD, Trans)). -spec(update_user(binary(), binary()) -> ok | {error, term()}). update_user(Username, Tags) when is_binary(Username) -> - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])). + return(mria:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])). %% @private update_user_(Username, Tags) -> @@ -135,13 +132,13 @@ update_pwd(Username, Fun) -> end, mnesia:write(Fun(User)) end, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). + return(mria:transaction(?DASHBOARD_SHARD, Trans)). -spec(lookup_user(binary()) -> [mqtt_admin()]). lookup_user(Username) when is_binary(Username) -> Fun = fun() -> mnesia:read(mqtt_admin, Username) end, - {atomic, User} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun), + {atomic, User} = mria:ro_transaction(?DASHBOARD_SHARD, Fun), User. -spec(all_users() -> [#mqtt_admin{}]). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_app.erl b/apps/emqx_dashboard/src/emqx_dashboard_app.erl index 4e1b0caec..16acdb182 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_app.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_app.erl @@ -26,7 +26,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_dashboard_sup:start_link(), - ok = ekka_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity), _ = emqx_dashboard:start_listeners(), emqx_dashboard_cli:load(), ok = emqx_dashboard_admin:add_default_user(), diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 0e2adf7c3..0eb1e033b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -25,7 +25,6 @@ -export([get_local_time/0]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% Mnesia bootstrap -export([mnesia/1]). @@ -41,14 +40,12 @@ -define(EXPIRE_INTERVAL, 86400000 * 7). mnesia(boot) -> - ok = ekka_mnesia:create_table(emqx_collect, [ + ok = mria:create_table(emqx_collect, [ {type, set}, {local_content, true}, - {disc_only_copies, [node()]}, + {storage, disc_only_copies}, {record_name, mqtt_collect}, - {attributes, record_info(fields, mqtt_collect)}]); -mnesia(copy) -> - mnesia:add_table_copy(emqx_collect, node(), disc_only_copies). + {attributes, record_info(fields, mqtt_collect)}]). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -162,8 +159,8 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> diff(Sent, Sent0), diff(Dropped, Dropped0)}, Ts = get_local_time(), - ekka_mnesia:transaction(ekka_mnesia:local_content_shard(), - fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), + _ = mria:transaction(mria:local_content_shard(), + fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), {Received, Sent, Dropped}. avg(Items) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index c00310211..ba2fc6dbe 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -158,8 +158,8 @@ counters(get, #{bindings := #{counter := Counter}}) -> lookup([{<<"counter">>, Counter}]). current_counters(get, _Params) -> - Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()], - Nodes = length(ekka_mnesia:running_nodes()), + Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()], + Nodes = length(mria_mnesia:running_nodes()), {Received, Sent, Sub, Conn} = format_current_metrics(Data), Response = #{ nodes => Nodes, @@ -194,16 +194,16 @@ lookup_(#{node := Node, counter := Counter}) -> lookup_(#{node := Node}) -> {200, sampling(Node)}; lookup_(#{counter := Counter}) -> - CounterData = merger_counters([sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()]), + CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]), Data = hd(maps:values(CounterData)), {200, Data}. list_collect(Aggregate) -> case Aggregate of <<"true">> -> - [maps:put(node, Node, sampling(Node)) || Node <- ekka_mnesia:running_nodes()]; + [maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()]; _ -> - Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], + Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()], merger_counters(Counters) end. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index c1ca15cb3..5a6771ae5 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -28,7 +28,6 @@ ]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([mnesia/1]). @@ -77,16 +76,14 @@ destroy_by_username(Username) -> do_destroy_by_username(Username). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, set}, {rlog_shard, ?DASHBOARD_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, mqtt_admin_jwt}, {attributes, record_info(fields, mqtt_admin_jwt)}, {storage_properties, [{ets, [{read_concurrency, true}, - {write_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {write_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% jwt apply @@ -104,7 +101,7 @@ do_sign(Username, Password) -> Signed = jose_jwt:sign(JWK, JWS, JWT), {_, Token} = jose_jws:compact(Signed), JWTRec = format(Token, Username, ExpTime), - ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), + _ = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), {ok, Token}. do_verify(Token)-> @@ -113,7 +110,7 @@ do_verify(Token)-> case ExpTime > erlang:system_time(millisecond) of true -> NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()}, - {atomic, Res} = ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]), + {atomic, Res} = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]), Res; _ -> {error, token_timeout} @@ -124,7 +121,7 @@ do_verify(Token)-> do_destroy(Token) -> Fun = fun mnesia:delete/1, - {atomic, ok} = ekka_mnesia:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]), + {atomic, ok} = mria:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]), ok. do_destroy_by_username(Username) -> @@ -135,7 +132,7 @@ do_destroy_by_username(Username) -> -spec(lookup(Token :: binary()) -> {ok, #mqtt_admin_jwt{}} | {error, not_found}). lookup(Token) -> Fun = fun() -> mnesia:read(?TAB, Token) end, - case ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun) of + case mria:ro_transaction(?DASHBOARD_SHARD, Fun) of {atomic, [JWT]} -> {ok, JWT}; {atomic, []} -> {error, not_found} end. @@ -143,7 +140,7 @@ lookup(Token) -> lookup_by_username(Username) -> Spec = [{{mqtt_admin_jwt, '_', Username, '_'}, [], ['$_']}], Fun = fun() -> mnesia:select(?TAB, Spec) end, - {atomic, List} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun), + {atomic, List} = mria:ro_transaction(?DASHBOARD_SHARD, Fun), List. @@ -193,7 +190,7 @@ handle_info(clean_jwt, State) -> timer_clean(self()), Now = erlang:system_time(millisecond), Spec = [{{mqtt_admin_jwt, '_', '_', '$1'}, [{'<', '$1', Now}], ['$_']}], - {atomic, JWTList} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, + {atomic, JWTList} = mria:ro_transaction(?DASHBOARD_SHARD, fun() -> mnesia:select(?TAB, Spec) end), destroy(JWTList), {noreply, State}; diff --git a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl index 6d245c9bc..9f5fb3526 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -50,7 +50,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_management]), - ekka_mnesia:ensure_stopped(). + mria:stop(). set_special_configs(emqx_management) -> emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], @@ -102,7 +102,7 @@ t_rest_api(_Config) -> ok. t_cli(_Config) -> - [ekka_mnesia:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)], + [mria:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)], emqx_dashboard_cli:admins(["add", "username", "password"]), [{mqtt_admin, <<"username">>, <>, _}] = emqx_dashboard_admin:lookup_user(<<"username">>), diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 1d9daa637..e82537285 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -65,7 +65,7 @@ register_channel(Type, ClientId) when is_binary(ClientId) -> register_channel(Type, {ClientId, self()}); register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - ekka_mnesia:dirty_write(tabname(Type), record(ClientId, ChanPid)). + mria:dirty_write(tabname(Type), record(ClientId, ChanPid)). %% @doc Unregister a global channel. -spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok. @@ -73,7 +73,7 @@ unregister_channel(Type, ClientId) when is_binary(ClientId) -> unregister_channel(Type, {ClientId, self()}); unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - ekka_mnesia:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). + mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). %% @doc Lookup the global channels. -spec lookup_channels(atom(), binary()) -> list(pid()). @@ -89,16 +89,15 @@ record(ClientId, ChanPid) -> init([Type]) -> Tab = tabname(Type), - ok = ekka_mnesia:create_table(Tab, [ + ok = mria:create_table(Tab, [ {type, bag}, {rlog_shard, ?CM_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, channel}, {attributes, record_info(fields, channel)}, {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), - ok = ekka_mnesia:copy_table(Tab, ram_copies), - %%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), + ok = mria:wait_for_tables([Tab]), ok = ekka:monitor(membership), {ok, #{type => Type}}. @@ -115,7 +114,7 @@ handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) -> global:trans({?LOCK, self()}, fun() -> %% FIXME: The shard name should be fixed later - ekka_mnesia:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab]) + mria:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab]) end), {noreply, State}; diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 07a5ae3c4..da440dba8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -209,7 +209,7 @@ confexp({error, already_exist}) -> emqx_type:clientid(), {atom(), atom()}) -> list(). lookup_client(GwName, ClientId, FormatFun) -> lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) - || Node <- ekka_mnesia:running_nodes()]). + || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> ChanTab = emqx_gateway_cm:tabname(chan, GwName), @@ -229,7 +229,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> | ok. kickout_client(GwName, ClientId) -> Results = [kickout_client(Node, GwName, ClientId) - || Node <- ekka_mnesia:running_nodes()], + || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index c1f76e95a..4f5734525 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -56,20 +56,16 @@ %-export([mnesia/1]). %-boot_mnesia({mnesia, [boot]}). -%-copy_mnesia({mnesia, [copy]}). %%% @doc Create or replicate tables. %-spec(mnesia(boot | copy) -> ok). %mnesia(boot) -> % %% Optimize storage % StoreProps = [{ets, [{read_concurrency, true}]}], -% ok = ekka_mnesia:create_table(?MODULE, [ +% ok = mria:create_table(?MODULE, [ % {attributes, record_info(fields, emqx_sn_registry)}, % {ram_copies, [node()]}, -% {storage_properties, StoreProps}]); -% -%mnesia(copy) -> -% ok = ekka_mnesia:copy_table(?MODULE, ram_copies). +% {storage_properties, StoreProps}]). -type registry() :: {Tab :: atom(), RegistryPid :: pid()}. @@ -141,28 +137,27 @@ init([InstaId, PredefTopics]) -> %% {ClientId, TopicId} -> TopicName %% {ClientId, TopicName} -> TopicId Tab = name(InstaId), - ok = ekka_mnesia:create_table(Tab, [ - {ram_copies, [node()]}, + ok = mria:create_table(Tab, [ + {storage, ram_copies}, {record_name, emqx_sn_registry}, {attributes, record_info(fields, emqx_sn_registry)}, {storage_properties, [{ets, [{read_concurrency, true}]}]}, {rlog_shard, ?SN_SHARD} ]), - ok = ekka_mnesia:copy_table(Tab, ram_copies), - ok = ekka_rlog:wait_for_shards([?SN_SHARD], infinity), + ok = mria:wait_for_tables([Tab]), % FIXME: - %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), + %ok = mria_rlog:wait_for_shards([?CM_SHARD], infinity), MaxPredefId = lists:foldl( fun(#{id := TopicId, topic := TopicName0}, AccId) -> TopicName = iolist_to_binary(TopicName0), - ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ - key = {predef, TopicId}, - value = TopicName} - ), - ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ - key = {predef, TopicName}, - value = TopicId} - ), + mria:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicId}, + value = TopicName} + ), + mria:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicName}, + value = TopicId} + ), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. @@ -190,7 +185,7 @@ handle_call({register, ClientId, TopicName}, _From, key = {ClientId, TopicId}, value = TopicName}, write) end, - case ekka_mnesia:transaction(?SN_SHARD, Fun) of + case mria:transaction(?SN_SHARD, Fun) of {atomic, ok} -> {reply, TopicId, State}; {aborted, Error} -> @@ -205,7 +200,7 @@ handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> {emqx_sn_registry, {ClientId, '_'}, '_'} ), lists:foreach(fun(R) -> - ekka_mnesia:dirty_delete_object(Tab, R) + mria:dirty_delete_object(Tab, R) end, Registry), {reply, ok, State}; diff --git a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl index e727c6c88..d7a8f1d1c 100644 --- a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl @@ -35,7 +35,7 @@ all() -> init_per_suite(Config) -> application:ensure_all_started(ekka), - ekka_mnesia:start(), + mria:start(), Config. end_per_suite(_Config) -> @@ -49,7 +49,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) -> {Tab, _Pid} = proplists:get_value(reg, Config), - ekka_mnesia:clear_table(Tab), + mria:clear_table(Tab), Config. %%-------------------------------------------------------------------- diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 66616f3ea..a55d17616 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -30,14 +30,10 @@ -endif. -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -include_lib("emqx/include/logger.hrl"). -include("emqx_machine.hrl"). --rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_MFA}). --rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_COMMIT}). - -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). @@ -45,21 +41,18 @@ %%% API %%%=================================================================== mnesia(boot) -> - ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ + ok = mria:create_table(?CLUSTER_MFA, [ {type, ordered_set}, {rlog_shard, ?EMQX_MACHINE_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, cluster_rpc_mfa}, {attributes, record_info(fields, cluster_rpc_mfa)}]), - ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ + ok = mria:create_table(?CLUSTER_COMMIT, [ {type, set}, {rlog_shard, ?EMQX_MACHINE_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, cluster_rpc_commit}, - {attributes, record_info(fields, cluster_rpc_commit)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies), - ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). + {attributes, record_info(fields, cluster_rpc_commit)}]). start_link() -> start_link(node(), ?MODULE, get_retry_ms()). @@ -88,13 +81,13 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu MFA = {initiate, {M, F, A}}, Begin = erlang:monotonic_time(), InitRes = - case ekka_rlog:role() of + case mria_rlog:role() of core -> gen_server:call(?MODULE, MFA, Timeout); replicant -> %% the initiate transaction must happened on core node %% make sure MFA(in the transaction) and the transaction on the same node %% don't need rpc again inside transaction. - case ekka_rlog_status:upstream_node(?EMQX_MACHINE_SHARD) of + case mria_status:upstream_node(?EMQX_MACHINE_SHARD) of {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); disconnected -> {error, disconnected} end @@ -150,8 +143,8 @@ handle_continue(?CATCH_UP, State) -> {noreply, State, catch_up(State)}. handle_call(reset, _From, State) -> - _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), - _ = ekka_mnesia:clear_table(?CLUSTER_MFA), + _ = mria:clear_table(?CLUSTER_COMMIT), + _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; handle_call({initiate, MFA}, _From, State = #{node := Node}) -> @@ -280,7 +273,7 @@ do_catch_up_in_one_trans(LatestId, Node) -> end. transaction(Func, Args) -> - ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, Func, Args). + mria:transaction(?EMQX_MACHINE_SHARD, Func, Args). trans_status() -> mnesia:foldl(fun(Rec, Acc) -> diff --git a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl index 6dcbd3d25..2b1242fa7 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl @@ -49,7 +49,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> - case ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + case mria:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) end, diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 0f8208b46..313f45dd3 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -36,7 +36,7 @@ start() -> ok = print_otp_version_warning(), ok = load_config_files(), ekka:start(), - ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok. graceful_shutdown() -> diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index 995de1f62..ee317d1bf 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -147,7 +147,7 @@ fields("cluster") -> #{})} , {"db_backend", sc(hoconsc:enum([mnesia, rlog]), - #{ mapping => "ekka.db_backend" + #{ mapping => "mria.db_backend" , default => mnesia })} , {"rlog", @@ -253,12 +253,12 @@ fields(cluster_k8s) -> fields("rlog") -> [ {"role", sc(hoconsc:enum([core, replicant]), - #{ mapping => "ekka.node_role" + #{ mapping => "mria.node_role" , default => core })} , {"core_nodes", sc(emqx_schema:comma_separated_atoms(), - #{ mapping => "ekka.core_nodes" + #{ mapping => "mria.core_nodes" , default => [] })} ]; diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index 4e3b2d2c2..cf6c794a7 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -43,7 +43,7 @@ init_per_suite(Config) -> application:load(emqx), application:load(emqx_machine), ok = ekka:start(), - ok = ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), application:set_env(emqx_machine, cluster_call_max_history, 100), application:set_env(emqx_machine, cluster_call_clean_interval, 1000), application:set_env(emqx_machine, cluster_call_retry_interval, 900), @@ -54,8 +54,8 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ekka:stop(), - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(), + mria:stop(), + mria_mnesia:delete_schema(), meck:unload(emqx_alarm), ok. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 3b1fd5903..eca5b3e56 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -164,7 +164,7 @@ stopped_node_info(Node) -> %%-------------------------------------------------------------------- list_brokers() -> - [{Node, broker_info(Node)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()]. lookup_broker(Node) -> broker_info(Node). @@ -181,7 +181,7 @@ broker_info(Node) -> %%-------------------------------------------------------------------- get_metrics() -> - nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]). + nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]). get_metrics(Node) when Node =:= node() -> emqx_metrics:all(); @@ -201,7 +201,7 @@ get_stats() -> begin Stats = get_stats(Node), delete_keys(Stats, GlobalStatsKeys) - end || Node <- ekka_mnesia:running_nodes()]), + end || Node <- mria_mnesia:running_nodes()]), GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))), maps:merge(CountStats, GlobalStats). @@ -232,10 +232,10 @@ nodes_info_count(PropList) -> %%-------------------------------------------------------------------- lookup_client({clientid, ClientId}, FormatFun) -> - lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- ekka_mnesia:running_nodes()]); + lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]); lookup_client({username, Username}, FormatFun) -> - lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). + lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> lists:append(lists:map( @@ -257,7 +257,7 @@ lookup_client(Node, {username, Username}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). kickout_client(ClientId) -> - Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) @@ -273,7 +273,7 @@ list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). list_client_subscriptions(ClientId) -> - Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()], Expected = lists:filter(fun({error, _}) -> false; ([]) -> false; (_) -> true @@ -290,7 +290,7 @@ client_subscriptions(Node, ClientId) -> rpc_call(Node, client_subscriptions, [Node, ClientId]). clean_authz_cache(ClientId) -> - Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) @@ -308,7 +308,7 @@ clean_authz_cache(Node, ClientId) -> rpc_call(Node, clean_authz_cache, [Node, ClientId]). clean_authz_cache_all() -> - Results = [{Node, clean_authz_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()], + Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of [] -> ok; BadNodes -> {error, BadNodes} @@ -328,7 +328,7 @@ set_quota_policy(ClientId, Policy) -> %% @private call_client(ClientId, Req) -> - Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()], + Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()], Expected = lists:filter(fun({error, _}) -> false; (_) -> true end, Results), @@ -366,7 +366,7 @@ list_subscriptions(Node) -> rpc_call(Node, list_subscriptions, [Node]). list_subscriptions_via_topic(Topic, FormatFun) -> - lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]). + lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]). list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], @@ -376,7 +376,7 @@ list_subscriptions_via_topic(Node, Topic, FormatFun) -> rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). lookup_subscriptions(ClientId) -> - lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]). + lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]). lookup_subscriptions(Node, ClientId) when Node =:= node() -> case ets:lookup(emqx_subid, ClientId) of @@ -400,7 +400,7 @@ lookup_routes(Topic) -> %%-------------------------------------------------------------------- subscribe(ClientId, TopicTables) -> - subscribe(ekka_mnesia:running_nodes(), ClientId, TopicTables). + subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of @@ -424,7 +424,7 @@ publish(Msg) -> emqx:publish(Msg). unsubscribe(ClientId, Topic) -> - unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic). + unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic). unsubscribe([Node | Nodes], ClientId, Topic) -> case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of @@ -447,7 +447,7 @@ do_unsubscribe(ClientId, Topic) -> %%-------------------------------------------------------------------- list_plugins() -> - [{Node, list_plugins(Node)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()]. list_plugins(Node) when Node =:= node() -> emqx_plugins:list(); @@ -474,7 +474,7 @@ reload_plugin(Node, Plugin) -> %%-------------------------------------------------------------------- list_listeners() -> - lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). + lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]). list_listeners(Node) when Node =:= node() -> [Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()]; @@ -505,7 +505,7 @@ manage_listener(Operation, Param = #{node := Node}) -> rpc_call(Node, manage_listener, [Operation, Param]). update_listener(Id, Config) -> - [update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()]. + [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()]. update_listener(Node, Id, Config) when Node =:= node() -> case emqx_listeners:parse_listener_id(Id) of @@ -523,7 +523,7 @@ update_listener(Node, Id, Config) -> rpc_call(Node, update_listener, [Node, Id, Config]). remove_listener(Id) -> - [remove_listener(Node, Id) || Node <- ekka_mnesia:running_nodes()]. + [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()]. remove_listener(Node, Id) when Node =:= node() -> {Type, Name} = emqx_listeners:parse_listener_id(Id), @@ -540,7 +540,7 @@ remove_listener(Node, Id) -> %%-------------------------------------------------------------------- get_alarms(Type) -> - [{Node, get_alarms(Node, Type)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()]. get_alarms(Node, Type) when Node =:= node() -> add_duration_field(emqx_alarm:get_alarms(Type)); @@ -553,7 +553,7 @@ deactivate(Node, Name) -> rpc_call(Node, deactivate, [Node, Name]). delete_all_deactivated_alarms() -> - [delete_all_deactivated_alarms(Node) || Node <- ekka_mnesia:running_nodes()]. + [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()]. delete_all_deactivated_alarms(Node) when Node =:= node() -> emqx_alarm:delete_all_deactivated_alarms(); @@ -621,5 +621,3 @@ max_row_limit() -> ?MAX_ROW_LIMIT. table_size(Tab) -> ets:info(Tab, size). - - diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 9afb6090e..d79eab587 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -167,7 +167,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) -> {_CodCnt, Qs} = params2qs(Params, QsSchema), Limit = b2i(limit(Params)), Page = b2i(page(Params)), - Nodes = ekka_mnesia:running_nodes(), + Nodes = mria_mnesia:running_nodes(), Meta = #{page => Page, limit => Limit, count => 0}, page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}). diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 9e12cbe3a..52c7a8709 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -300,7 +300,7 @@ manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) Result; manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) -> - Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()], + Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of [] -> {200}; Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 2795fe342..8c0b364c0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -154,6 +154,6 @@ list(get, #{query_string := Qs}) -> {200, emqx_mgmt:get_metrics()}; _ -> Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) || - Node <- ekka_mnesia:running_nodes()], + Node <- mria_mnesia:running_nodes()], {200, Data} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index 470b5fda1..da8e643d8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -96,6 +96,6 @@ list(get, #{query_string := Qs}) -> {200, emqx_mgmt:get_stats()}; _ -> Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) || - Node <- ekka_mnesia:running_nodes()], + Node <- mria_mnesia:running_nodes()], {200, Data} end. diff --git a/apps/emqx_management/src/emqx_mgmt_app.erl b/apps/emqx_management/src/emqx_mgmt_app.erl index 8dc1651da..4dae30a97 100644 --- a/apps/emqx_management/src/emqx_mgmt_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_app.erl @@ -28,7 +28,7 @@ start(_Type, _Args) -> {ok, Sup} = emqx_mgmt_sup:start_link(), - ok = ekka_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity), emqx_mgmt_cli:load(), {ok, Sup}. diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index c023267a6..cdad91b0c 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -24,7 +24,7 @@ init_suite() -> init_suite([]). init_suite(Apps) -> - ekka_mnesia:start(), + mria:start(), application:load(emqx_management), emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], fun set_special_configs/1). diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 86ef97dac..401e2a10c 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -25,7 +25,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([ start_link/0 , on_message_publish/1 @@ -64,14 +63,12 @@ %% Mnesia bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, ordered_set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, delayed_message}, - {attributes, record_info(fields, delayed_message)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {attributes, record_info(fields, delayed_message)}]). %%-------------------------------------------------------------------- %% Hooks @@ -184,7 +181,7 @@ delete_delayed_message(Id0) -> {error, not_found}; Rows -> Timestamp = hd(Rows), - ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id}) + mria:dirty_delete(?TAB, {Timestamp, Id}) end. update_config(Config) -> {ok, _} = emqx:update_config([delayed], Config). @@ -205,7 +202,7 @@ handle_call({set_max_delayed_messages, Max}, _From, State) -> handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := 0}) -> - ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), + ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; @@ -216,7 +213,7 @@ handle_call({store, DelayedMsg = #delayed_message{key = Key}}, true -> {reply, {error, max_delayed_messages_full}, State}; false -> - ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), + ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)} end; @@ -240,7 +237,7 @@ handle_cast(Msg, State) -> %% Do Publish... handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), - lists:foreach(fun(Key) -> ekka_mnesia:dirty_delete(?TAB, Key) end, DeletedKeys), + lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys), {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; handle_info(stats, State = #{stats_fun := StatsFun}) -> diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 94a388767..768ef4590 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) -> update_config_(Config) -> lists:foreach(fun(Node) -> update_config_(Node, Config) - end, ekka_mnesia:running_nodes()). + end, mria_mnesia:running_nodes()). update_config_(Node, Config) when Node =:= node() -> _ = emqx_delayed:update_config(Config), diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 65852df93..17f32ee58 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -30,7 +30,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([ start_link/0 , stop/0 @@ -91,14 +90,12 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TELEMETRY, + ok = mria:create_table(?TELEMETRY, [{type, set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, telemetry}, - {attributes, record_info(fields, telemetry)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TELEMETRY, disc_copies). + {attributes, record_info(fields, telemetry)}]). %%-------------------------------------------------------------------- %% API @@ -130,7 +127,7 @@ get_telemetry() -> %% gen_server callbacks %%-------------------------------------------------------------------- -%% This is to suppress dialyzer warnings for mnesia:dirty_write and +%% This is to suppress dialyzer warnings for mria:dirty_write and %% dirty_read race condition. Given that the init function is not evaluated %% concurrently in one node, it should be free of race condition. %% Given the chance of having two nodes bootstraping with the write @@ -140,8 +137,8 @@ init(_Opts) -> UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of [] -> UUID = generate_uuid(), - ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID}), + mria:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID}), UUID; [#telemetry{uuid = UUID} | _] -> UUID @@ -268,7 +265,7 @@ uptime() -> element(1, erlang:statistics(wall_clock)). nodes_uuid() -> - Nodes = lists:delete(node(), ekka_mnesia:running_nodes()), + Nodes = lists:delete(node(), mria_mnesia:running_nodes()), lists:foldl(fun(Node, Acc) -> case rpc:call(Node, ?MODULE, get_uuid, []) of {badrpc, _Reason} -> diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index 5d1cffdcd..7dd36654a 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -152,7 +152,7 @@ data(get, _Request) -> enable_telemetry(Enable) -> lists:foreach(fun(Node) -> enable_telemetry(Node, Enable) - end, ekka_mnesia:running_nodes()). + end, mria_mnesia:running_nodes()). enable_telemetry(Node, Enable) when Node =:= node() -> case Enable of diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 3a070a1e8..81c2c1577 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -35,7 +35,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ekka_mnesia:start(), + mria:start(), ok = emqx_delayed:mnesia(boot), emqx_common_test_helpers:start_apps([emqx_modules]), Config. diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index cf1ddff93..5380b55a6 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -28,7 +28,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = ekka_mnesia:start(), + ok = mria:start(), ok = emqx_telemetry:mnesia(boot), emqx_common_test_helpers:start_apps([emqx_modules]), Config. diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index bbdbab6b2..4667de24f 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -590,6 +590,6 @@ emqx_cluster() -> ]. emqx_cluster_data() -> - #{running_nodes := Running, stopped_nodes := Stopped} = ekka_mnesia:cluster_info(), + #{running_nodes := Running, stopped_nodes := Stopped} = mria_mnesia:cluster_info(), [{nodes_running, length(Running)}, {nodes_stopped, length(Stopped)}]. diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 9ea25cdeb..7de332cb9 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -47,7 +47,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -define(TAB, ?MODULE). -define(PSK_SHARD, emqx_psk_shard). @@ -64,16 +63,13 @@ %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {rlog_shard, ?PSK_SHARD}, {type, ordered_set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, psk_entry}, {attributes, record_info(fields, psk_entry)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%------------------------------------------------------------------------------ %% APIs @@ -237,7 +233,7 @@ trim_crlf(Bin) -> end. trans(Fun, Args) -> - case ekka_mnesia:transaction(?PSK_SHARD, Fun, Args) of + case mria:transaction(?PSK_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 0fc0a4615..6b5a9ac14 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -52,15 +52,14 @@ create_resource(#{storage_type := StorageType}) -> {read_concurrency, true}, {write_concurrency, true}]}, {dets, [{auto_save, 1000}]}], - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, set}, {rlog_shard, ?RETAINER_SHARD}, - {Copies, [node()]}, + {storage, Copies}, {record_name, retained}, {attributes, record_info(fields, retained)}, {storage_properties, StoreProps}]), - ok = ekka_mnesia:copy_table(?TAB, Copies), - ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity), case mnesia:table_info(?TAB, storage_type) of Copies -> ok; _Other -> @@ -73,10 +72,10 @@ store_retained(_, Msg =#message{topic = Topic}) -> case is_table_full() of false -> ok = emqx_metrics:inc('messages.retained'), - ekka_mnesia:dirty_write(?TAB, - #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = ExpiryTime}); + mria:dirty_write(?TAB, + #retained{topic = topic2tokens(Topic), + msg = Msg, + expiry_time = ExpiryTime}); _ -> Tokens = topic2tokens(Topic), Fun = fun() -> @@ -94,7 +93,7 @@ store_retained(_, Msg =#message{topic = Topic}) -> ok end end, - {atomic, ok} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + {atomic, ok} = mria:transaction(?RETAINER_SHARD, Fun), ok end. @@ -106,7 +105,7 @@ clear_expired(_) -> Keys = mnesia:select(?TAB, Ms, write), lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) end, - {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun), ok. delete_message(_, Topic) -> @@ -117,12 +116,8 @@ delete_message(_, Topic) -> Fun = fun() -> mnesia:delete({?TAB, Tokens}) end, - case ekka_mnesia:transaction(?RETAINER_SHARD, Fun) of - {atomic, Result} -> - Result; - ok -> - ok - end + _ = mria:transaction(?RETAINER_SHARD, Fun), + ok end, ok. @@ -157,7 +152,7 @@ match_messages(_, Topic, Cursor) -> end. clean(_) -> - ekka_mnesia:clear_table(?TAB), + _ = mria:clear_table(?TAB), ok. %%-------------------------------------------------------------------- %% Internal functions @@ -214,7 +209,7 @@ match_delete_messages(Filter) -> MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, Ms = [{MsHd, [], ['$_']}], Rs = mnesia:dirty_select(?TAB, Ms), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs). + lists:foreach(fun(R) -> mria:dirty_delete_object(?TAB, R) end, Rs). %% @private condition(Ws) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 5e6b28b61..b9a3b16f7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -338,4 +338,4 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> get_rule_metrics(Id) -> [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + || Node <- mria_mnesia:running_nodes()]. diff --git a/rebar.config b/rebar.config index 89b8e3a9f..91c0196c2 100644 --- a/rebar.config +++ b/rebar.config @@ -50,7 +50,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} diff --git a/rebar.config.erl b/rebar.config.erl index 86ab38285..aec39717e 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -60,7 +60,7 @@ community_plugin_overrides() -> %% Temporary workaround for a rebar3 erl_opts duplication %% bug. Ideally, we want to set this define globally snabbkaffe_overrides() -> - Apps = [snabbkaffe, ekka], + Apps = [snabbkaffe, ekka, mria], [{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps]. config(HasElixir) ->