From b515a45a4f7ba70573420491961d994682f2f70c Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 14 Oct 2021 18:24:59 +0200 Subject: [PATCH 01/14] chore(mria): Rename module: ekka_mnesia -> mria --- apps/emqx/rebar.config | 2 +- apps/emqx/src/emqx_alarm.erl | 12 ++--- apps/emqx/src/emqx_banned.erl | 16 +++---- apps/emqx/src/emqx_cm_registry.erl | 6 +-- apps/emqx/src/emqx_router.erl | 6 +-- apps/emqx/src/emqx_router_helper.erl | 6 +-- apps/emqx/src/emqx_shared_sub.erl | 8 ++-- apps/emqx/test/props/prop_emqx_sys.erl | 2 +- .../emqx_enhanced_authn_scram_mnesia.erl | 2 +- .../src/simple_authn/emqx_authn_mnesia.erl | 2 +- apps/emqx_authz/src/emqx_authz_api_mnesia.erl | 46 +++++++++---------- .../test/emqx_authz_mnesia_SUITE.erl | 29 ++++++------ .../src/emqx_dashboard_admin.erl | 10 ++-- .../src/emqx_dashboard_collection.erl | 2 +- .../src/emqx_dashboard_token.erl | 6 +-- .../test/emqx_dashboard_SUITE.erl | 2 +- .../src/emqx_gateway_cm_registry.erl | 6 +-- .../src/mqttsn/emqx_sn_registry.erl | 20 ++++---- apps/emqx_machine/src/emqx_cluster_rpc.erl | 2 +- .../src/emqx_cluster_rpc_handler.erl | 2 +- apps/emqx_modules/src/emqx_delayed.erl | 8 ++-- apps/emqx_modules/src/emqx_telemetry.erl | 4 +- apps/emqx_psk/src/emqx_psk.erl | 2 +- .../src/emqx_retainer_mnesia.erl | 16 +++---- rebar.config | 2 +- 25 files changed, 109 insertions(+), 110 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b851700b9..0c8f5e7ba 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -15,7 +15,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 2df126ea7..412880bad 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -201,7 +201,7 @@ handle_call({activate_alarm, Name, Details}, _From, State) -> details = Details, message = normalize_message(Name, Details), activate_at = erlang:system_time(microsecond)}, - ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), + mria:dirty_write(?ACTIVATED_ALARM, Alarm), do_actions(activate, Alarm, emqx:get_config([alarm, actions])), {reply, ok, State} end; @@ -282,7 +282,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name case mnesia:dirty_first(?DEACTIVATED_ALARM) of '$end_of_table' -> ok; ActivateAt2 -> - ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + mria:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) end; false -> ok end, @@ -291,8 +291,8 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, normalize_message(Name, Details), erlang:system_time(microsecond)), - ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), - ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), + mria:dirty_delete(?ACTIVATED_ALARM, Name), do_actions(deactivate, DeActAlarm, emqx:get_config([alarm, actions])). make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> @@ -309,7 +309,7 @@ deactivate_all_alarms() -> details = Details, message = Message, activate_at = ActivateAt}) -> - ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, + mria:dirty_write(?DEACTIVATED_ALARM, #deactivated_alarm{ activate_at = ActivateAt, name = Name, @@ -347,7 +347,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) -> delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> case ActivatedAt =< Checkpoint of true -> - ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), + mria:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); false -> diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 759c9f955..86f79ede8 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -155,13 +155,13 @@ create(#{who := Who, reason := Reason, at := At, until := Until}) -> - ekka_mnesia:dirty_write(?BANNED_TAB, #banned{who = Who, - by = By, - reason = Reason, - at = At, - until = Until}); + mria:dirty_write(?BANNED_TAB, #banned{who = Who, + by = By, + reason = Reason, + at = At, + until = Until}); create(Banned) when is_record(Banned, banned) -> - ekka_mnesia:dirty_write(?BANNED_TAB, Banned). + mria:dirty_write(?BANNED_TAB, Banned). look_up(Who) when is_map(Who) -> look_up(pares_who(Who)); @@ -174,7 +174,7 @@ look_up(Who) -> delete(Who) when is_map(Who)-> delete(pares_who(Who)); delete(Who) -> - ekka_mnesia:dirty_delete(?BANNED_TAB, Who). + mria:dirty_delete(?BANNED_TAB, Who). info(InfoKey) -> mnesia:table_info(?BANNED_TAB, InfoKey). @@ -195,7 +195,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - ekka_mnesia:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), + mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index ef7ad6131..65b89a884 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -71,7 +71,7 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_write(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -83,7 +83,7 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -124,7 +124,7 @@ handle_cast(Msg, State) -> handle_info({membership, {mnesia, down, Node}}, State) -> global:trans({?LOCK, self()}, fun() -> - ekka_mnesia:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) + mria:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) end), {noreply, State}; diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3e5475c13..4793100ba 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -225,7 +225,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- insert_direct_route(Route) -> - ekka_mnesia:dirty_write(?ROUTE_TAB, Route). + mria:dirty_write(?ROUTE_TAB, Route). insert_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -235,7 +235,7 @@ insert_trie_route(Route = #route{topic = Topic}) -> mnesia:write(?ROUTE_TAB, Route, sticky_write). delete_direct_route(Route) -> - ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route). + mria:dirty_delete_object(?ROUTE_TAB, Route). delete_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -280,7 +280,7 @@ trans(Fun, Args) -> %% Future changes should keep in mind that this process %% always exit with database write result. fun() -> - Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of + Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of {atomic, Ok} -> Ok; {aborted, Reason} -> {error, Reason} end, diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index a88e82d8d..6542e3983 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -87,7 +87,7 @@ monitor(Node) when is_atom(Node) -> case ekka:is_member(Node) orelse ets:member(?ROUTING_NODE, Node) of true -> ok; - false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) + false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) end. %%-------------------------------------------------------------------- @@ -136,9 +136,9 @@ handle_info({mnesia_table_event, Event}, State) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans({?LOCK, self()}, fun() -> - ekka_mnesia:transaction(fun cleanup_routes/1, [Node]) + mria:transaction(fun cleanup_routes/1, [Node]) end), - ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node), + ok = mria:dirty_delete(?ROUTING_NODE, Node), {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ef8e3d288..299c4df38 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -297,7 +297,7 @@ subscribers(Group, Topic) -> init([]) -> {ok, _} = mnesia:subscribe({table, ?TAB, simple}), - {atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), + {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), {ok, update_stats(#state{pmon = PMon})}. @@ -309,7 +309,7 @@ init_monitors() -> end, emqx_pmon:new(), ?TAB). handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> - ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), + mria:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) @@ -319,7 +319,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> - ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), + mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), {reply, ok, State}; @@ -373,7 +373,7 @@ cleanup_down(SubPid) -> ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> - ok = ekka_mnesia:dirty_delete_object(?TAB, Record), + ok = mria:dirty_delete_object(?TAB, Record), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}) end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index 3b2b8b94c..b6a0e0e38 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -29,7 +29,7 @@ [ emqx_metrics , emqx_stats , emqx_broker - , ekka_mnesia + , mria_mnesia ]). -define(ALL(Vars, Types, Exprs), diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 3daece601..4f8b38b1e 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -268,7 +268,7 @@ trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> - case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of + case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index c3b6badf7..314c3d578 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -390,7 +390,7 @@ trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> - case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of + case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index c29f90f43..78a08f570 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -528,10 +528,10 @@ users(get, #{query_string := Qs}) -> end; users(post, #{body := Body}) when is_list(Body) -> lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_USERNAME, Username}, - rules = format_rules(Rules) - }) + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }) end, Body), {204}. @@ -561,10 +561,10 @@ clients(get, #{query_string := Qs}) -> end; clients(post, #{body := Body}) when is_list(Body) -> lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_CLIENTID, Clientid}, - rules = format_rules(Rules) - }) + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }) end, Body), {204}. @@ -581,13 +581,13 @@ user(get, #{bindings := #{username := Username}}) -> end; user(put, #{bindings := #{username := Username}, body := #{<<"username">> := Username, <<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_USERNAME, Username}, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }), {204}; user(delete, #{bindings := #{username := Username}}) -> - ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}), + mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}), {204}. client(get, #{bindings := #{clientid := Clientid}}) -> @@ -603,13 +603,13 @@ client(get, #{bindings := #{clientid := Clientid}}) -> end; client(put, #{bindings := #{clientid := Clientid}, body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_CLIENTID, Clientid}, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }), {204}; client(delete, #{bindings := #{clientid := Clientid}}) -> - ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}), + mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}), {204}. all(get, _) -> @@ -624,17 +624,17 @@ all(get, _) -> } end; all(put, #{body := #{<<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = ?ACL_TABLE_ALL, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = ?ACL_TABLE_ALL, + rules = format_rules(Rules) + }), {204}. purge(delete, _) -> case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of [#{enable := false}] -> ok = lists:foreach(fun(Key) -> - ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key) + ok = mria:dirty_delete(?ACL_TABLE, Key) end, mnesia:dirty_all_keys(?ACL_TABLE)), {204}; _ -> diff --git a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl index 947f46a82..ed82e9b1d 100644 --- a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl @@ -54,24 +54,24 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_authz, Config) -> - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, - rules = [{allow, publish, <<"test/%u">>}, - {allow, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, - rules = [{allow, publish, <<"test/%c">>}, - {deny, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, - rules = [{deny, all, <<"#">>}] + mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, + rules = [{allow, publish, <<"test/%u">>}, + {allow, subscribe, <<"eq #">>} + ] }]), + mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, + rules = [{allow, publish, <<"test/%c">>}, + {deny, subscribe, <<"eq #">>} + ] + }]), + mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, + rules = [{deny, all, <<"#">>}] + }]), Config; init_per_testcase(_, Config) -> Config. end_per_testcase(t_authz, Config) -> - [ ekka_mnesia:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], + [ mria:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], Config; end_per_testcase(_, Config) -> Config. @@ -96,7 +96,7 @@ t_authz(_) -> listener => {tcp, default} }, - ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), + ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), ?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), @@ -106,4 +106,3 @@ t_authz(_) -> ?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)), ok. - diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 5af983b4d..72115a80b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -68,7 +68,7 @@ mnesia(copy) -> -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])). + return(mria:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])). force_add_user(Username, Password, Tags) -> AddFun = fun() -> @@ -76,7 +76,7 @@ force_add_user(Username, Password, Tags) -> password = Password, tags = Tags}) end, - case ekka_mnesia:transaction(?DASHBOARD_SHARD, AddFun) of + case mria:transaction(?DASHBOARD_SHARD, AddFun) of {atomic, ok} -> ok; {aborted, Reason} -> {error, Reason} end. @@ -98,11 +98,11 @@ remove_user(Username) when is_binary(Username) -> end, mnesia:delete({mqtt_admin, Username}) end, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). + return(mria:transaction(?DASHBOARD_SHARD, Trans)). -spec(update_user(binary(), binary()) -> ok | {error, term()}). update_user(Username, Tags) when is_binary(Username) -> - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])). + return(mria:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])). %% @private update_user_(Username, Tags) -> @@ -135,7 +135,7 @@ update_pwd(Username, Fun) -> end, mnesia:write(Fun(User)) end, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). + return(mria:transaction(?DASHBOARD_SHARD, Trans)). -spec(lookup_user(binary()) -> [mqtt_admin()]). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 0e2adf7c3..7ef5e0972 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -162,7 +162,7 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> diff(Sent, Sent0), diff(Dropped, Dropped0)}, Ts = get_local_time(), - ekka_mnesia:transaction(ekka_mnesia:local_content_shard(), + mria:transaction(ekka_mnesia:local_content_shard(), fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), {Received, Sent, Dropped}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index c1ca15cb3..7d53ee06b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -104,7 +104,7 @@ do_sign(Username, Password) -> Signed = jose_jwt:sign(JWK, JWS, JWT), {_, Token} = jose_jws:compact(Signed), JWTRec = format(Token, Username, ExpTime), - ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), + mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), {ok, Token}. do_verify(Token)-> @@ -113,7 +113,7 @@ do_verify(Token)-> case ExpTime > erlang:system_time(millisecond) of true -> NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()}, - {atomic, Res} = ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]), + {atomic, Res} = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]), Res; _ -> {error, token_timeout} @@ -124,7 +124,7 @@ do_verify(Token)-> do_destroy(Token) -> Fun = fun mnesia:delete/1, - {atomic, ok} = ekka_mnesia:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]), + {atomic, ok} = mria:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]), ok. do_destroy_by_username(Username) -> diff --git a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl index 6d245c9bc..cb947b995 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -102,7 +102,7 @@ t_rest_api(_Config) -> ok. t_cli(_Config) -> - [ekka_mnesia:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)], + [mria:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)], emqx_dashboard_cli:admins(["add", "username", "password"]), [{mqtt_admin, <<"username">>, <>, _}] = emqx_dashboard_admin:lookup_user(<<"username">>), diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 1d9daa637..51c4e95c2 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -65,7 +65,7 @@ register_channel(Type, ClientId) when is_binary(ClientId) -> register_channel(Type, {ClientId, self()}); register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - ekka_mnesia:dirty_write(tabname(Type), record(ClientId, ChanPid)). + mria:dirty_write(tabname(Type), record(ClientId, ChanPid)). %% @doc Unregister a global channel. -spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok. @@ -73,7 +73,7 @@ unregister_channel(Type, ClientId) when is_binary(ClientId) -> unregister_channel(Type, {ClientId, self()}); unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - ekka_mnesia:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). + mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). %% @doc Lookup the global channels. -spec lookup_channels(atom(), binary()) -> list(pid()). @@ -115,7 +115,7 @@ handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) -> global:trans({?LOCK, self()}, fun() -> %% FIXME: The shard name should be fixed later - ekka_mnesia:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab]) + mria:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab]) end), {noreply, State}; diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 2534eee26..863ea2be7 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -157,14 +157,14 @@ init([InstaId, PredefTopics]) -> MaxPredefId = lists:foldl( fun(#{id := TopicId, topic := TopicName0}, AccId) -> TopicName = iolist_to_binary(TopicName0), - ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ - key = {predef, TopicId}, - value = TopicName} - ), - ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ - key = {predef, TopicName}, - value = TopicId} - ), + mria:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicId}, + value = TopicName} + ), + mria:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicName}, + value = TopicId} + ), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. @@ -192,7 +192,7 @@ handle_call({register, ClientId, TopicName}, _From, key = {ClientId, TopicId}, value = TopicName}, write) end, - case ekka_mnesia:transaction(?SN_SHARD, Fun) of + case mria:transaction(?SN_SHARD, Fun) of {atomic, ok} -> {reply, TopicId, State}; {aborted, Error} -> @@ -207,7 +207,7 @@ handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> {emqx_sn_registry, {ClientId, '_'}, '_'} ), lists:foreach(fun(R) -> - ekka_mnesia:dirty_delete_object(Tab, R) + mria:dirty_delete_object(Tab, R) end, Registry), {reply, ok, State}; diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 66616f3ea..0d963a2de 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -280,7 +280,7 @@ do_catch_up_in_one_trans(LatestId, Node) -> end. transaction(Func, Args) -> - ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, Func, Args). + mria:transaction(?EMQX_MACHINE_SHARD, Func, Args). trans_status() -> mnesia:foldl(fun(Rec, Acc) -> diff --git a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl index 6dcbd3d25..2b1242fa7 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl @@ -49,7 +49,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> - case ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + case mria:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) end, diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 86ef97dac..f2eef3f39 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -184,7 +184,7 @@ delete_delayed_message(Id0) -> {error, not_found}; Rows -> Timestamp = hd(Rows), - ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id}) + mria:dirty_delete(?TAB, {Timestamp, Id}) end. update_config(Config) -> {ok, _} = emqx:update_config([delayed], Config). @@ -205,7 +205,7 @@ handle_call({set_max_delayed_messages, Max}, _From, State) -> handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := 0}) -> - ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), + ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; @@ -216,7 +216,7 @@ handle_call({store, DelayedMsg = #delayed_message{key = Key}}, true -> {reply, {error, max_delayed_messages_full}, State}; false -> - ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), + ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)} end; @@ -240,7 +240,7 @@ handle_cast(Msg, State) -> %% Do Publish... handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), - lists:foreach(fun(Key) -> ekka_mnesia:dirty_delete(?TAB, Key) end, DeletedKeys), + lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys), {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; handle_info(stats, State = #{stats_fun := StatsFun}) -> diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 65852df93..04586e8fc 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -140,8 +140,8 @@ init(_Opts) -> UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of [] -> UUID = generate_uuid(), - ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID}), + mria:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID}), UUID; [#telemetry{uuid = UUID} | _] -> UUID diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 9ea25cdeb..65ce1025b 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -237,7 +237,7 @@ trim_crlf(Bin) -> end. trans(Fun, Args) -> - case ekka_mnesia:transaction(?PSK_SHARD, Fun, Args) of + case mria:transaction(?PSK_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 0fc0a4615..e3d90fe7d 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -73,10 +73,10 @@ store_retained(_, Msg =#message{topic = Topic}) -> case is_table_full() of false -> ok = emqx_metrics:inc('messages.retained'), - ekka_mnesia:dirty_write(?TAB, - #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = ExpiryTime}); + mria:dirty_write(?TAB, + #retained{topic = topic2tokens(Topic), + msg = Msg, + expiry_time = ExpiryTime}); _ -> Tokens = topic2tokens(Topic), Fun = fun() -> @@ -94,7 +94,7 @@ store_retained(_, Msg =#message{topic = Topic}) -> ok end end, - {atomic, ok} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + {atomic, ok} = mria:transaction(?RETAINER_SHARD, Fun), ok end. @@ -106,7 +106,7 @@ clear_expired(_) -> Keys = mnesia:select(?TAB, Ms, write), lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) end, - {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun), ok. delete_message(_, Topic) -> @@ -117,7 +117,7 @@ delete_message(_, Topic) -> Fun = fun() -> mnesia:delete({?TAB, Tokens}) end, - case ekka_mnesia:transaction(?RETAINER_SHARD, Fun) of + case mria:transaction(?RETAINER_SHARD, Fun) of {atomic, Result} -> Result; ok -> @@ -214,7 +214,7 @@ match_delete_messages(Filter) -> MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, Ms = [{MsHd, [], ['$_']}], Rs = mnesia:dirty_select(?TAB, Ms), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs). + lists:foreach(fun(R) -> mria:dirty_delete_object(?TAB, R) end, Rs). %% @private condition(Ws) -> diff --git a/rebar.config b/rebar.config index 89b8e3a9f..91c0196c2 100644 --- a/rebar.config +++ b/rebar.config @@ -50,7 +50,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} From ff48322e0c75175b74e79626eab37f4867f15d6c Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 14 Oct 2021 18:30:22 +0200 Subject: [PATCH 02/14] chore(mria): ekka_mnesia:running_nodes -> mria:running_nodes --- apps/emqx/src/emqx_sys.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 +- .../src/emqx_dashboard_monitor_api.erl | 10 ++--- apps/emqx_gateway/src/emqx_gateway_http.erl | 4 +- apps/emqx_management/src/emqx_mgmt.erl | 42 +++++++++---------- apps/emqx_management/src/emqx_mgmt_api.erl | 2 +- .../src/emqx_mgmt_api_listeners.erl | 2 +- .../src/emqx_mgmt_api_metrics.erl | 2 +- .../src/emqx_mgmt_api_stats.erl | 2 +- apps/emqx_modules/src/emqx_delayed_api.erl | 2 +- apps/emqx_modules/src/emqx_telemetry.erl | 2 +- apps/emqx_modules/src/emqx_telemetry_api.erl | 2 +- .../src/emqx_rule_engine_api.erl | 2 +- 13 files changed, 38 insertions(+), 40 deletions(-) diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 692d2bd0a..7882abc19 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -150,7 +150,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> publish_any(version, Version), publish_any(sysdescr, Descr), - publish_any(brokers, ekka_mnesia:running_nodes()), + publish_any(brokers, mria_mnesia:running_nodes()), publish_any(stats, emqx_stats:getstats()), publish_any(metrics, emqx_metrics:all()), {noreply, tick(State), hibernate}; diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index e4805d7eb..40a101640 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -153,7 +153,7 @@ param_path_operation()-> }. list_bridges(get, _Params) -> - {200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}. + {200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}. list_local_bridges(Node) when Node =:= node() -> [format_resp(Data) || Data <- emqx_bridge:list_bridges()]; @@ -161,7 +161,7 @@ list_local_bridges(Node) -> rpc_call(Node, list_local_bridges, [Node]). crud_bridges_cluster(Method, Params) -> - Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()], + Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of [] -> case Results of diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index c00310211..ba2fc6dbe 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -158,8 +158,8 @@ counters(get, #{bindings := #{counter := Counter}}) -> lookup([{<<"counter">>, Counter}]). current_counters(get, _Params) -> - Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()], - Nodes = length(ekka_mnesia:running_nodes()), + Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()], + Nodes = length(mria_mnesia:running_nodes()), {Received, Sent, Sub, Conn} = format_current_metrics(Data), Response = #{ nodes => Nodes, @@ -194,16 +194,16 @@ lookup_(#{node := Node, counter := Counter}) -> lookup_(#{node := Node}) -> {200, sampling(Node)}; lookup_(#{counter := Counter}) -> - CounterData = merger_counters([sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()]), + CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]), Data = hd(maps:values(CounterData)), {200, Data}. list_collect(Aggregate) -> case Aggregate of <<"true">> -> - [maps:put(node, Node, sampling(Node)) || Node <- ekka_mnesia:running_nodes()]; + [maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()]; _ -> - Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], + Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()], merger_counters(Counters) end. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index e5c927a1a..6b6e8b636 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -209,7 +209,7 @@ confexp({error, already_exist}) -> emqx_type:clientid(), {atom(), atom()}) -> list(). lookup_client(GwName, ClientId, FormatFun) -> lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) - || Node <- ekka_mnesia:running_nodes()]). + || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> ChanTab = emqx_gateway_cm:tabname(chan, GwName), @@ -229,7 +229,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> | ok. kickout_client(GwName, ClientId) -> Results = [kickout_client(Node, GwName, ClientId) - || Node <- ekka_mnesia:running_nodes()], + || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 3b1fd5903..eca5b3e56 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -164,7 +164,7 @@ stopped_node_info(Node) -> %%-------------------------------------------------------------------- list_brokers() -> - [{Node, broker_info(Node)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()]. lookup_broker(Node) -> broker_info(Node). @@ -181,7 +181,7 @@ broker_info(Node) -> %%-------------------------------------------------------------------- get_metrics() -> - nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]). + nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]). get_metrics(Node) when Node =:= node() -> emqx_metrics:all(); @@ -201,7 +201,7 @@ get_stats() -> begin Stats = get_stats(Node), delete_keys(Stats, GlobalStatsKeys) - end || Node <- ekka_mnesia:running_nodes()]), + end || Node <- mria_mnesia:running_nodes()]), GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))), maps:merge(CountStats, GlobalStats). @@ -232,10 +232,10 @@ nodes_info_count(PropList) -> %%-------------------------------------------------------------------- lookup_client({clientid, ClientId}, FormatFun) -> - lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- ekka_mnesia:running_nodes()]); + lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]); lookup_client({username, Username}, FormatFun) -> - lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). + lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> lists:append(lists:map( @@ -257,7 +257,7 @@ lookup_client(Node, {username, Username}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). kickout_client(ClientId) -> - Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) @@ -273,7 +273,7 @@ list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). list_client_subscriptions(ClientId) -> - Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()], Expected = lists:filter(fun({error, _}) -> false; ([]) -> false; (_) -> true @@ -290,7 +290,7 @@ client_subscriptions(Node, ClientId) -> rpc_call(Node, client_subscriptions, [Node, ClientId]). clean_authz_cache(ClientId) -> - Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) @@ -308,7 +308,7 @@ clean_authz_cache(Node, ClientId) -> rpc_call(Node, clean_authz_cache, [Node, ClientId]). clean_authz_cache_all() -> - Results = [{Node, clean_authz_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()], + Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of [] -> ok; BadNodes -> {error, BadNodes} @@ -328,7 +328,7 @@ set_quota_policy(ClientId, Policy) -> %% @private call_client(ClientId, Req) -> - Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()], + Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()], Expected = lists:filter(fun({error, _}) -> false; (_) -> true end, Results), @@ -366,7 +366,7 @@ list_subscriptions(Node) -> rpc_call(Node, list_subscriptions, [Node]). list_subscriptions_via_topic(Topic, FormatFun) -> - lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]). + lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]). list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], @@ -376,7 +376,7 @@ list_subscriptions_via_topic(Node, Topic, FormatFun) -> rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). lookup_subscriptions(ClientId) -> - lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]). + lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]). lookup_subscriptions(Node, ClientId) when Node =:= node() -> case ets:lookup(emqx_subid, ClientId) of @@ -400,7 +400,7 @@ lookup_routes(Topic) -> %%-------------------------------------------------------------------- subscribe(ClientId, TopicTables) -> - subscribe(ekka_mnesia:running_nodes(), ClientId, TopicTables). + subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of @@ -424,7 +424,7 @@ publish(Msg) -> emqx:publish(Msg). unsubscribe(ClientId, Topic) -> - unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic). + unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic). unsubscribe([Node | Nodes], ClientId, Topic) -> case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of @@ -447,7 +447,7 @@ do_unsubscribe(ClientId, Topic) -> %%-------------------------------------------------------------------- list_plugins() -> - [{Node, list_plugins(Node)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()]. list_plugins(Node) when Node =:= node() -> emqx_plugins:list(); @@ -474,7 +474,7 @@ reload_plugin(Node, Plugin) -> %%-------------------------------------------------------------------- list_listeners() -> - lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). + lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]). list_listeners(Node) when Node =:= node() -> [Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()]; @@ -505,7 +505,7 @@ manage_listener(Operation, Param = #{node := Node}) -> rpc_call(Node, manage_listener, [Operation, Param]). update_listener(Id, Config) -> - [update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()]. + [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()]. update_listener(Node, Id, Config) when Node =:= node() -> case emqx_listeners:parse_listener_id(Id) of @@ -523,7 +523,7 @@ update_listener(Node, Id, Config) -> rpc_call(Node, update_listener, [Node, Id, Config]). remove_listener(Id) -> - [remove_listener(Node, Id) || Node <- ekka_mnesia:running_nodes()]. + [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()]. remove_listener(Node, Id) when Node =:= node() -> {Type, Name} = emqx_listeners:parse_listener_id(Id), @@ -540,7 +540,7 @@ remove_listener(Node, Id) -> %%-------------------------------------------------------------------- get_alarms(Type) -> - [{Node, get_alarms(Node, Type)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()]. get_alarms(Node, Type) when Node =:= node() -> add_duration_field(emqx_alarm:get_alarms(Type)); @@ -553,7 +553,7 @@ deactivate(Node, Name) -> rpc_call(Node, deactivate, [Node, Name]). delete_all_deactivated_alarms() -> - [delete_all_deactivated_alarms(Node) || Node <- ekka_mnesia:running_nodes()]. + [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()]. delete_all_deactivated_alarms(Node) when Node =:= node() -> emqx_alarm:delete_all_deactivated_alarms(); @@ -621,5 +621,3 @@ max_row_limit() -> ?MAX_ROW_LIMIT. table_size(Tab) -> ets:info(Tab, size). - - diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index c7da2e752..783f0b2f6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -167,7 +167,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) -> {_CodCnt, Qs} = params2qs(Params, QsSchema), Limit = b2i(limit(Params)), Page = b2i(page(Params)), - Nodes = ekka_mnesia:running_nodes(), + Nodes = mria_mnesia:running_nodes(), Meta = #{page => Page, limit => Limit, count => 0}, page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}). diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 9e12cbe3a..52c7a8709 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -300,7 +300,7 @@ manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) Result; manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) -> - Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()], + Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of [] -> {200}; Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 2795fe342..8c0b364c0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -154,6 +154,6 @@ list(get, #{query_string := Qs}) -> {200, emqx_mgmt:get_metrics()}; _ -> Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) || - Node <- ekka_mnesia:running_nodes()], + Node <- mria_mnesia:running_nodes()], {200, Data} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index 470b5fda1..da8e643d8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -96,6 +96,6 @@ list(get, #{query_string := Qs}) -> {200, emqx_mgmt:get_stats()}; _ -> Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) || - Node <- ekka_mnesia:running_nodes()], + Node <- mria_mnesia:running_nodes()], {200, Data} end. diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 94a388767..768ef4590 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) -> update_config_(Config) -> lists:foreach(fun(Node) -> update_config_(Node, Config) - end, ekka_mnesia:running_nodes()). + end, mria_mnesia:running_nodes()). update_config_(Node, Config) when Node =:= node() -> _ = emqx_delayed:update_config(Config), diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 04586e8fc..3c3f9bd6a 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -268,7 +268,7 @@ uptime() -> element(1, erlang:statistics(wall_clock)). nodes_uuid() -> - Nodes = lists:delete(node(), ekka_mnesia:running_nodes()), + Nodes = lists:delete(node(), mria_mnesia:running_nodes()), lists:foldl(fun(Node, Acc) -> case rpc:call(Node, ?MODULE, get_uuid, []) of {badrpc, _Reason} -> diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index 5d1cffdcd..7dd36654a 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -152,7 +152,7 @@ data(get, _Request) -> enable_telemetry(Enable) -> lists:foreach(fun(Node) -> enable_telemetry(Node, Enable) - end, ekka_mnesia:running_nodes()). + end, mria_mnesia:running_nodes()). enable_telemetry(Node, Enable) when Node =:= node() -> case Enable of diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 5e6b28b61..b9a3b16f7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -338,4 +338,4 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> get_rule_metrics(Id) -> [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + || Node <- mria_mnesia:running_nodes()]. From ae2056da1b60f5e8644cd1d631090bc00c750501 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 14 Oct 2021 19:41:19 +0200 Subject: [PATCH 03/14] chore(mria): Fix clear_table and ro_transaction calls --- apps/emqx/src/emqx_alarm.erl | 4 ++-- apps/emqx/src/emqx_trie.erl | 2 +- apps/emqx_dashboard/src/emqx_dashboard_admin.erl | 2 +- apps/emqx_dashboard/src/emqx_dashboard_token.erl | 6 +++--- apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl | 2 +- apps/emqx_machine/src/emqx_cluster_rpc.erl | 4 ++-- apps/emqx_retainer/src/emqx_retainer_mnesia.erl | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 412880bad..39d13cacd 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -221,7 +221,7 @@ handle_call(delete_all_deactivated_alarms, _From, State) -> handle_call({get_alarms, all}, _From, State) -> {atomic, Alarms} = - ekka_mnesia:ro_transaction( + mria:ro_transaction( ?COMMON_SHARD, fun() -> [normalize(Alarm) || @@ -321,7 +321,7 @@ deactivate_all_alarms() -> %% Delete all records from the given table, ignore result. clear_table(TableName) -> - case ekka_mnesia:clear_table(TableName) of + case mria:clear_table(TableName) of {aborted, Reason} -> ?SLOG(warning, #{ msg => "fail_to_clear_table", diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 1e3a0e5a5..327db16c0 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -329,6 +329,6 @@ do_compact_test() -> do_compact(words(<<"a/+/+/+/+/b">>))), ok. -clear_tables() -> ekka_mnesia:clear_table(?TRIE). +clear_tables() -> mria:clear_table(?TRIE). -endif. % TEST diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 72115a80b..55cb8ffe5 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -141,7 +141,7 @@ update_pwd(Username, Fun) -> -spec(lookup_user(binary()) -> [mqtt_admin()]). lookup_user(Username) when is_binary(Username) -> Fun = fun() -> mnesia:read(mqtt_admin, Username) end, - {atomic, User} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun), + {atomic, User} = mria:ro_transaction(?DASHBOARD_SHARD, Fun), User. -spec(all_users() -> [#mqtt_admin{}]). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index 7d53ee06b..611ef44a2 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -135,7 +135,7 @@ do_destroy_by_username(Username) -> -spec(lookup(Token :: binary()) -> {ok, #mqtt_admin_jwt{}} | {error, not_found}). lookup(Token) -> Fun = fun() -> mnesia:read(?TAB, Token) end, - case ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun) of + case mria:ro_transaction(?DASHBOARD_SHARD, Fun) of {atomic, [JWT]} -> {ok, JWT}; {atomic, []} -> {error, not_found} end. @@ -143,7 +143,7 @@ lookup(Token) -> lookup_by_username(Username) -> Spec = [{{mqtt_admin_jwt, '_', Username, '_'}, [], ['$_']}], Fun = fun() -> mnesia:select(?TAB, Spec) end, - {atomic, List} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun), + {atomic, List} = mria:ro_transaction(?DASHBOARD_SHARD, Fun), List. @@ -193,7 +193,7 @@ handle_info(clean_jwt, State) -> timer_clean(self()), Now = erlang:system_time(millisecond), Spec = [{{mqtt_admin_jwt, '_', '_', '$1'}, [{'<', '$1', Now}], ['$_']}], - {atomic, JWTList} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, + {atomic, JWTList} = mria:ro_transaction(?DASHBOARD_SHARD, fun() -> mnesia:select(?TAB, Spec) end), destroy(JWTList), {noreply, State}; diff --git a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl index e727c6c88..04a3d8024 100644 --- a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl @@ -49,7 +49,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) -> {Tab, _Pid} = proplists:get_value(reg, Config), - ekka_mnesia:clear_table(Tab), + mria:clear_table(Tab), Config. %%-------------------------------------------------------------------- diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 0d963a2de..0d93805b7 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -150,8 +150,8 @@ handle_continue(?CATCH_UP, State) -> {noreply, State, catch_up(State)}. handle_call(reset, _From, State) -> - _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), - _ = ekka_mnesia:clear_table(?CLUSTER_MFA), + _ = mria:clear_table(?CLUSTER_COMMIT), + _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; handle_call({initiate, MFA}, _From, State = #{node := Node}) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index e3d90fe7d..456ee6a9a 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -157,7 +157,7 @@ match_messages(_, Topic, Cursor) -> end. clean(_) -> - ekka_mnesia:clear_table(?TAB), + mria:clear_table(?TAB), ok. %%-------------------------------------------------------------------- %% Internal functions From 9965d6e0285e9261ebef5dee7e4ba5796cb46bc2 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 14 Oct 2021 20:51:36 +0200 Subject: [PATCH 04/14] chore(mria): ekka_mnesia:create_table -> mria:create_table --- apps/emqx/src/emqx_alarm.erl | 13 +++++-------- apps/emqx/src/emqx_banned.erl | 9 +++------ apps/emqx/src/emqx_cm_registry.erl | 5 ++--- apps/emqx/src/emqx_router.erl | 8 +++----- apps/emqx/src/emqx_router_helper.erl | 9 +++------ apps/emqx/src/emqx_shared_sub.erl | 9 +++------ apps/emqx/src/emqx_trie.erl | 9 +++------ .../emqx_enhanced_authn_scram_mnesia.erl | 9 +++------ .../src/simple_authn/emqx_authn_mnesia.erl | 9 +++------ apps/emqx_authz/src/emqx_authz_mnesia.erl | 8 +++----- apps/emqx_dashboard/src/emqx_dashboard_admin.erl | 8 +++----- .../src/emqx_dashboard_collection.erl | 8 +++----- apps/emqx_dashboard/src/emqx_dashboard_token.erl | 8 +++----- apps/emqx_gateway/src/emqx_gateway_cm_registry.erl | 7 +++---- apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl | 14 +++++--------- apps/emqx_machine/src/emqx_cluster_rpc.erl | 13 +++++-------- apps/emqx_modules/src/emqx_delayed.erl | 8 +++----- apps/emqx_modules/src/emqx_telemetry.erl | 8 +++----- apps/emqx_modules/test/emqx_delayed_SUITE.erl | 2 +- apps/emqx_modules/test/emqx_telemetry_SUITE.erl | 2 +- apps/emqx_prometheus/src/emqx_prometheus.erl | 2 +- apps/emqx_psk/src/emqx_psk.erl | 9 +++------ apps/emqx_retainer/src/emqx_retainer_mnesia.erl | 5 ++--- 23 files changed, 67 insertions(+), 115 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 39d13cacd..6c4e348c4 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -95,21 +95,18 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?ACTIVATED_ALARM, + ok = mria:create_table(?ACTIVATED_ALARM, [{type, set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, activated_alarm}, {attributes, record_info(fields, activated_alarm)}]), - ok = ekka_mnesia:create_table(?DEACTIVATED_ALARM, + ok = mria:create_table(?DEACTIVATED_ALARM, [{type, ordered_set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, deactivated_alarm}, - {attributes, record_info(fields, deactivated_alarm)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ACTIVATED_ALARM, disc_copies), - ok = ekka_mnesia:copy_table(?DEACTIVATED_ALARM, disc_copies). + {attributes, record_info(fields, deactivated_alarm)}]). %%-------------------------------------------------------------------- %% API diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 86f79ede8..30f91da6e 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -58,16 +58,13 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?BANNED_TAB, [ + ok = mria:create_table(?BANNED_TAB, [ {type, set}, {rlog_shard, ?COMMON_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, banned}, {attributes, record_info(fields, banned)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?BANNED_TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %% @doc Start the banned server. -spec(start_link() -> startlink_ret()). diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 65b89a884..c51660353 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -100,15 +100,14 @@ record(ClientId, ChanPid) -> %%-------------------------------------------------------------------- init([]) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, bag}, {rlog_shard, ?CM_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, channel}, {attributes, record_info(fields, channel)}, {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), - ok = ekka_mnesia:copy_table(?TAB, ram_copies), ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), ok = ekka:monitor(membership), {ok, #{}}. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 4793100ba..0416d6d72 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -74,16 +74,14 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?ROUTE_TAB, [ + ok = mria:create_table(?ROUTE_TAB, [ {type, bag}, {rlog_shard, ?ROUTE_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, route}, {attributes, record_info(fields, route)}, {storage_properties, [{ets, [{read_concurrency, true}, - {write_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ROUTE_TAB, ram_copies). + {write_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% Start a router diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 6542e3983..3fc3f42ce 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -59,16 +59,13 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?ROUTING_NODE, [ + ok = mria:create_table(?ROUTING_NODE, [ {type, set}, {rlog_shard, ?ROUTE_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, routing_node}, {attributes, record_info(fields, routing_node)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ROUTING_NODE, ram_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% API diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 299c4df38..9a3c7c74d 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -85,15 +85,12 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, bag}, {rlog_shard, ?SHARED_SUB_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, emqx_shared_subscription}, - {attributes, record_info(fields, emqx_shared_subscription)}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, ram_copies). + {attributes, record_info(fields, emqx_shared_subscription)}]). %%-------------------------------------------------------------------- %% API diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 327db16c0..bf2afc910 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -61,16 +61,13 @@ mnesia(boot) -> StoreProps = [{ets, [{read_concurrency, true}, {write_concurrency, true} ]}], - ok = ekka_mnesia:create_table(?TRIE, [ + ok = mria:create_table(?TRIE, [ {rlog_shard, ?ROUTE_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, {type, ordered_set}, - {storage_properties, StoreProps}]); -mnesia(copy) -> - %% Copy topics table - ok = ekka_mnesia:copy_table(?TRIE, ram_copies). + {storage_properties, StoreProps}]). %%-------------------------------------------------------------------- %% Topics APIs diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 4f8b38b1e..d791d1aed 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -63,15 +63,12 @@ %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {rlog_shard, ?AUTH_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%------------------------------------------------------------------------------ %% Hocon Schema diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index 314c3d578..590152aaf 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -70,15 +70,12 @@ %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {rlog_shard, ?AUTH_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%------------------------------------------------------------------------------ %% Hocon Schema diff --git a/apps/emqx_authz/src/emqx_authz_mnesia.erl b/apps/emqx_authz/src/emqx_authz_mnesia.erl index ab755403e..9c5ca458b 100644 --- a/apps/emqx_authz/src/emqx_authz_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_mnesia.erl @@ -36,14 +36,12 @@ -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?ACL_TABLE, [ + ok = mria:create_table(?ACL_TABLE, [ {type, ordered_set}, {rlog_shard, ?ACL_SHARDED}, - {disc_copies, [node()]}, + {storage, disc_copies}, {attributes, record_info(fields, ?ACL_TABLE)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). description() -> "AuthZ with Mnesia". diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 55cb8ffe5..f79eb92e5 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -50,16 +50,14 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(mqtt_admin, [ + ok = mria:create_table(mqtt_admin, [ {type, set}, {rlog_shard, ?DASHBOARD_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, mqtt_admin}, {attributes, record_info(fields, mqtt_admin)}, {storage_properties, [{ets, [{read_concurrency, true}, - {write_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(mqtt_admin, disc_copies). + {write_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% API diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 7ef5e0972..03c5bfcb3 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -41,14 +41,12 @@ -define(EXPIRE_INTERVAL, 86400000 * 7). mnesia(boot) -> - ok = ekka_mnesia:create_table(emqx_collect, [ + ok = mria:create_table(emqx_collect, [ {type, set}, {local_content, true}, - {disc_only_copies, [node()]}, + {storage, disc_only_copies}, {record_name, mqtt_collect}, - {attributes, record_info(fields, mqtt_collect)}]); -mnesia(copy) -> - mnesia:add_table_copy(emqx_collect, node(), disc_only_copies). + {attributes, record_info(fields, mqtt_collect)}]). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index 611ef44a2..5b38a163f 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -77,16 +77,14 @@ destroy_by_username(Username) -> do_destroy_by_username(Username). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, set}, {rlog_shard, ?DASHBOARD_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, mqtt_admin_jwt}, {attributes, record_info(fields, mqtt_admin_jwt)}, {storage_properties, [{ets, [{read_concurrency, true}, - {write_concurrency, true}]}]}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {write_concurrency, true}]}]}]). %%-------------------------------------------------------------------- %% jwt apply diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 51c4e95c2..e82537285 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -89,16 +89,15 @@ record(ClientId, ChanPid) -> init([Type]) -> Tab = tabname(Type), - ok = ekka_mnesia:create_table(Tab, [ + ok = mria:create_table(Tab, [ {type, bag}, {rlog_shard, ?CM_SHARD}, - {ram_copies, [node()]}, + {storage, ram_copies}, {record_name, channel}, {attributes, record_info(fields, channel)}, {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), - ok = ekka_mnesia:copy_table(Tab, ram_copies), - %%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), + ok = mria:wait_for_tables([Tab]), ok = ekka:monitor(membership), {ok, #{type => Type}}. diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 863ea2be7..48f8e9b5f 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -65,13 +65,10 @@ %mnesia(boot) -> % %% Optimize storage % StoreProps = [{ets, [{read_concurrency, true}]}], -% ok = ekka_mnesia:create_table(?MODULE, [ +% ok = mria:create_table(?MODULE, [ % {attributes, record_info(fields, emqx_sn_registry)}, % {ram_copies, [node()]}, -% {storage_properties, StoreProps}]); -% -%mnesia(copy) -> -% ok = ekka_mnesia:copy_table(?MODULE, ram_copies). +% {storage_properties, StoreProps}]). -type registry() :: {Tab :: atom(), RegistryPid :: pid()}. @@ -143,15 +140,14 @@ init([InstaId, PredefTopics]) -> %% {ClientId, TopicId} -> TopicName %% {ClientId, TopicName} -> TopicId Tab = name(InstaId), - ok = ekka_mnesia:create_table(Tab, [ - {ram_copies, [node()]}, + ok = mria:create_table(Tab, [ + {storage, ram_copies}, {record_name, emqx_sn_registry}, {attributes, record_info(fields, emqx_sn_registry)}, {storage_properties, [{ets, [{read_concurrency, true}]}]}, {rlog_shard, ?SN_SHARD} ]), - ok = ekka_mnesia:copy_table(Tab, ram_copies), - ok = ekka_rlog:wait_for_shards([?SN_SHARD], infinity), + ok = mria:wait_for_tables([Tab]), % FIXME: %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), MaxPredefId = lists:foldl( diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 0d93805b7..ebebd19a4 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -45,21 +45,18 @@ %%% API %%%=================================================================== mnesia(boot) -> - ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ + ok = mria:create_table(?CLUSTER_MFA, [ {type, ordered_set}, {rlog_shard, ?EMQX_MACHINE_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, cluster_rpc_mfa}, {attributes, record_info(fields, cluster_rpc_mfa)}]), - ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ + ok = mria:create_table(?CLUSTER_COMMIT, [ {type, set}, {rlog_shard, ?EMQX_MACHINE_SHARD}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, cluster_rpc_commit}, - {attributes, record_info(fields, cluster_rpc_commit)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies), - ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). + {attributes, record_info(fields, cluster_rpc_commit)}]). start_link() -> start_link(node(), ?MODULE, get_retry_ms()). diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index f2eef3f39..15cafb614 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -64,14 +64,12 @@ %% Mnesia bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, ordered_set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, delayed_message}, - {attributes, record_info(fields, delayed_message)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {attributes, record_info(fields, delayed_message)}]). %%-------------------------------------------------------------------- %% Hooks diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 3c3f9bd6a..9a4881d9b 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -91,14 +91,12 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TELEMETRY, + ok = mria:create_table(?TELEMETRY, [{type, set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {local_content, true}, {record_name, telemetry}, - {attributes, record_info(fields, telemetry)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TELEMETRY, disc_copies). + {attributes, record_info(fields, telemetry)}]). %%-------------------------------------------------------------------- %% API diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 3a070a1e8..81c2c1577 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -35,7 +35,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ekka_mnesia:start(), + mria:start(), ok = emqx_delayed:mnesia(boot), emqx_common_test_helpers:start_apps([emqx_modules]), Config. diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index cf1ddff93..5380b55a6 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -28,7 +28,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = ekka_mnesia:start(), + ok = mria:start(), ok = emqx_telemetry:mnesia(boot), emqx_common_test_helpers:start_apps([emqx_modules]), Config. diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index bbdbab6b2..0a7aaa8e4 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -590,6 +590,6 @@ emqx_cluster() -> ]. emqx_cluster_data() -> - #{running_nodes := Running, stopped_nodes := Stopped} = ekka_mnesia:cluster_info(), + #{running_nodes := Running, stopped_nodes := Stopped} = mria:cluster_info(), [{nodes_running, length(Running)}, {nodes_stopped, length(Stopped)}]. diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 65ce1025b..871276c24 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -64,16 +64,13 @@ %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {rlog_shard, ?PSK_SHARD}, {type, ordered_set}, - {disc_copies, [node()]}, + {storage, disc_copies}, {record_name, psk_entry}, {attributes, record_info(fields, psk_entry)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, disc_copies). + {storage_properties, [{ets, [{read_concurrency, true}]}]}]). %%------------------------------------------------------------------------------ %% APIs diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 456ee6a9a..478234def 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -52,14 +52,13 @@ create_resource(#{storage_type := StorageType}) -> {read_concurrency, true}, {write_concurrency, true}]}, {dets, [{auto_save, 1000}]}], - ok = ekka_mnesia:create_table(?TAB, [ + ok = mria:create_table(?TAB, [ {type, set}, {rlog_shard, ?RETAINER_SHARD}, - {Copies, [node()]}, + {storage, Copies}, {record_name, retained}, {attributes, record_info(fields, retained)}, {storage_properties, StoreProps}]), - ok = ekka_mnesia:copy_table(?TAB, Copies), ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity), case mnesia:table_info(?TAB, storage_type) of Copies -> ok; From e14a62d4d6e76e0651b5637f1adafe185fe571fb Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Fri, 15 Oct 2021 17:47:49 +0200 Subject: [PATCH 05/14] chore(mria): ekka_mnesia:start/stop -> mria:start/stop --- apps/emqx/test/emqx_banned_SUITE.erl | 5 ++--- apps/emqx/test/emqx_flapping_SUITE.erl | 2 +- apps/emqx/test/emqx_trie_SUITE.erl | 5 ++--- apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl | 5 ++--- apps/emqx_dashboard/src/emqx_dashboard_collection.erl | 2 +- apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl | 2 +- apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl | 2 +- apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl | 4 ++-- apps/emqx_management/test/emqx_mgmt_api_test_util.erl | 2 +- 9 files changed, 13 insertions(+), 16 deletions(-) diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index b215b5055..258030513 100644 --- a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -33,8 +33,8 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ekka:stop(), - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). + mria:ensure_stopped(), + mria_mnesia:delete_schema(). t_add_delete(_) -> Banned = #banned{who = {clientid, <<"TestClient">>}, @@ -92,4 +92,3 @@ t_unused(_) -> ?assertEqual(ok, Banned ! ok), timer:sleep(500), %% expiry timer ok = emqx_banned:stop(). - diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 88a4d8ee0..b5a5314d7 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -35,7 +35,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]), - ekka_mnesia:delete_schema(), %% Clean emqx_banned table + mria_mnesia:delete_schema(), %% Clean emqx_banned table ok. t_detect_check(_) -> diff --git a/apps/emqx/test/emqx_trie_SUITE.erl b/apps/emqx/test/emqx_trie_SUITE.erl index e22e643b0..713671766 100644 --- a/apps/emqx/test/emqx_trie_SUITE.erl +++ b/apps/emqx/test/emqx_trie_SUITE.erl @@ -50,8 +50,8 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ekka:stop(), - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). + mria:ensure_stopped(), + mria_mnesia:delete_schema(). init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -194,4 +194,3 @@ trans(Fun) -> mnesia:transaction(Fun). trans(Fun, Args) -> mnesia:transaction(Fun, Args). - diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 58f64be90..744267e74 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -41,8 +41,8 @@ all() -> [t_auto_subscribe, t_update]. -init_per_suite(Config) -> - ekka_mnesia:start(), +init_per_suite(Config) -> + mria:start(), application:stop(?APP), meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), @@ -159,4 +159,3 @@ check_subs([{{_, Topic}, #{subid := ?CLIENT_ID}} | Subs], List) -> check_subs(Subs, lists:delete(Topic, List)); check_subs([_ | Subs], List) -> check_subs(Subs, List). - diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 03c5bfcb3..9f856c36d 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -160,7 +160,7 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> diff(Sent, Sent0), diff(Dropped, Dropped0)}, Ts = get_local_time(), - mria:transaction(ekka_mnesia:local_content_shard(), + mria:transaction(mria:local_content_shard(), fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), {Received, Sent, Dropped}. diff --git a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl index cb947b995..b9008d128 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -50,7 +50,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_management]), - ekka_mnesia:ensure_stopped(). + mria:ensure_stopped(). set_special_configs(emqx_management) -> emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], diff --git a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl index 04a3d8024..d7a8f1d1c 100644 --- a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl @@ -35,7 +35,7 @@ all() -> init_per_suite(Config) -> application:ensure_all_started(ekka), - ekka_mnesia:start(), + mria:start(), Config. end_per_suite(_Config) -> diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index 4e3b2d2c2..8da2dd107 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -54,8 +54,8 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ekka:stop(), - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(), + mria:stop(), + mria_mnesia:delete_schema(), meck:unload(emqx_alarm), ok. diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index c023267a6..cdad91b0c 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -24,7 +24,7 @@ init_suite() -> init_suite([]). init_suite(Apps) -> - ekka_mnesia:start(), + mria:start(), application:load(emqx_management), emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], fun set_special_configs/1). From 326923850c1a920bf4520b596da49d3ea7cebcde Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 14 Oct 2021 20:54:46 +0200 Subject: [PATCH 06/14] chore(mria): Get rid of copy_mnesia callback --- apps/emqx/src/emqx_alarm.erl | 1 - apps/emqx/src/emqx_banned.erl | 1 - apps/emqx/src/emqx_router.erl | 1 - apps/emqx/src/emqx_router_helper.erl | 1 - apps/emqx/src/emqx_shared_sub.erl | 1 - apps/emqx/src/emqx_trie.erl | 1 - .../src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl | 1 - apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl | 1 - apps/emqx_authz/src/emqx_authz_mnesia.erl | 1 - apps/emqx_dashboard/src/emqx_dashboard_admin.erl | 1 - apps/emqx_dashboard/src/emqx_dashboard_collection.erl | 1 - apps/emqx_dashboard/src/emqx_dashboard_token.erl | 1 - apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl | 1 - apps/emqx_machine/src/emqx_cluster_rpc.erl | 1 - apps/emqx_modules/src/emqx_delayed.erl | 1 - apps/emqx_modules/src/emqx_telemetry.erl | 1 - apps/emqx_psk/src/emqx_psk.erl | 1 - 17 files changed, 17 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 6c4e348c4..83a12472d 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -26,7 +26,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([post_config_update/4]). diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 30f91da6e..89442a6aa 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -27,7 +27,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([start_link/0, stop/0]). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 0416d6d72..915b01dd6 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -28,7 +28,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([start_link/2]). diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 3fc3f42ce..edec76d06 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -27,7 +27,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% API -export([ start_link/0 diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 9a3c7c74d..79a7d5522 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -28,7 +28,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% APIs -export([start_link/0]). diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index bf2afc910..4354a2bab 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -22,7 +22,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% Trie APIs -export([ insert/1 diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index d791d1aed..92bdf0619 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -46,7 +46,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -record(user_info, { user_id diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index 590152aaf..095dc6ef8 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -59,7 +59,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -define(TAB, ?MODULE). diff --git a/apps/emqx_authz/src/emqx_authz_mnesia.erl b/apps/emqx_authz/src/emqx_authz_mnesia.erl index 9c5ca458b..3851affed 100644 --- a/apps/emqx_authz/src/emqx_authz_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_mnesia.erl @@ -32,7 +32,6 @@ -endif. -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index f79eb92e5..2a7f2410a 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -21,7 +21,6 @@ -include("emqx_dashboard.hrl"). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% Mnesia bootstrap -export([mnesia/1]). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 9f856c36d..2108d0919 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -25,7 +25,6 @@ -export([get_local_time/0]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). %% Mnesia bootstrap -export([mnesia/1]). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index 5b38a163f..b1eb32d74 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -28,7 +28,6 @@ ]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([mnesia/1]). diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 48f8e9b5f..7be7a28e2 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -58,7 +58,6 @@ %-export([mnesia/1]). %-boot_mnesia({mnesia, [boot]}). -%-copy_mnesia({mnesia, [copy]}). %%% @doc Create or replicate tables. %-spec(mnesia(boot | copy) -> ok). diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index ebebd19a4..e64623bdc 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -30,7 +30,6 @@ -endif. -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -include_lib("emqx/include/logger.hrl"). -include("emqx_machine.hrl"). diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 15cafb614..401e2a10c 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -25,7 +25,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([ start_link/0 , on_message_publish/1 diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 9a4881d9b..f9903b845 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -30,7 +30,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -export([ start_link/0 , stop/0 diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 871276c24..7de332cb9 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -47,7 +47,6 @@ -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). -define(TAB, ?MODULE). -define(PSK_SHARD, emqx_psk_shard). From d14b8e7da9e86416c2c04e3094f44776d3d20aa3 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 14 Oct 2021 21:49:07 +0200 Subject: [PATCH 07/14] chore(mria): ekka_rlog -> mria_rlog --- apps/emqx/src/emqx_app.erl | 2 +- apps/emqx/src/emqx_cm_registry.erl | 2 +- apps/emqx/src/emqx_router.erl | 2 +- apps/emqx_authn/src/emqx_authn_app.erl | 2 +- apps/emqx_authz/src/emqx_authz_app.erl | 2 +- apps/emqx_dashboard/src/emqx_dashboard_app.erl | 2 +- apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl | 2 +- apps/emqx_machine/src/emqx_cluster_rpc.erl | 2 +- apps/emqx_machine/src/emqx_machine.erl | 2 +- apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl | 2 +- apps/emqx_management/src/emqx_mgmt_app.erl | 2 +- apps/emqx_retainer/src/emqx_retainer_mnesia.erl | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index cd435b864..662439397 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -57,7 +57,7 @@ stop(_State) -> ok. ensure_ekka_started() -> ekka:start(), - ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity). + ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity). %% @doc Call this function to make emqx boot without loading config, %% in case we want to delegate the config load to a higher level app diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index c51660353..492c6dacd 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -108,7 +108,7 @@ init([]) -> {attributes, record_info(fields, channel)}, {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), - ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?CM_SHARD], infinity), ok = ekka:monitor(membership), {ok, #{}}. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 915b01dd6..ad63c6657 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -252,7 +252,7 @@ maybe_trans(Fun, Args) -> trans(Fun, Args); global -> %% Assert: - mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash + mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash lock_router(), try mnesia:sync_dirty(Fun, Args) after diff --git a/apps/emqx_authn/src/emqx_authn_app.erl b/apps/emqx_authn/src/emqx_authn_app.erl index d297c9042..b71a3e16b 100644 --- a/apps/emqx_authn/src/emqx_authn_app.erl +++ b/apps/emqx_authn/src/emqx_authn_app.erl @@ -30,7 +30,7 @@ %%------------------------------------------------------------------------------ start(_StartType, _StartArgs) -> - ok = ekka_rlog:wait_for_shards([?AUTH_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?AUTH_SHARD], infinity), {ok, Sup} = emqx_authn_sup:start_link(), ok = ?AUTHN:register_providers(providers()), ok = initialize(), diff --git a/apps/emqx_authz/src/emqx_authz_app.erl b/apps/emqx_authz/src/emqx_authz_app.erl index f868ac342..a5044443b 100644 --- a/apps/emqx_authz/src/emqx_authz_app.erl +++ b/apps/emqx_authz/src/emqx_authz_app.erl @@ -12,7 +12,7 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> - ok = ekka_rlog:wait_for_shards([?ACL_SHARDED], infinity), + ok = mria_rlog:wait_for_shards([?ACL_SHARDED], infinity), {ok, Sup} = emqx_authz_sup:start_link(), ok = emqx_authz:init(), {ok, Sup}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_app.erl b/apps/emqx_dashboard/src/emqx_dashboard_app.erl index 4e1b0caec..16acdb182 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_app.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_app.erl @@ -26,7 +26,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_dashboard_sup:start_link(), - ok = ekka_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity), _ = emqx_dashboard:start_listeners(), emqx_dashboard_cli:load(), ok = emqx_dashboard_admin:add_default_user(), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 7be7a28e2..b52d9c7dc 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -148,7 +148,7 @@ init([InstaId, PredefTopics]) -> ]), ok = mria:wait_for_tables([Tab]), % FIXME: - %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), + %ok = mria_rlog:wait_for_shards([?CM_SHARD], infinity), MaxPredefId = lists:foldl( fun(#{id := TopicId, topic := TopicName0}, AccId) -> TopicName = iolist_to_binary(TopicName0), diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index e64623bdc..7ca6b3571 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -84,7 +84,7 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu MFA = {initiate, {M, F, A}}, Begin = erlang:monotonic_time(), InitRes = - case ekka_rlog:role() of + case mria_rlog:role() of core -> gen_server:call(?MODULE, MFA, Timeout); replicant -> %% the initiate transaction must happened on core node diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 0f8208b46..996f40bf7 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -36,7 +36,7 @@ start() -> ok = print_otp_version_warning(), ok = load_config_files(), ekka:start(), - ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), + mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok. graceful_shutdown() -> diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index 8da2dd107..cf6c794a7 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -43,7 +43,7 @@ init_per_suite(Config) -> application:load(emqx), application:load(emqx_machine), ok = ekka:start(), - ok = ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), application:set_env(emqx_machine, cluster_call_max_history, 100), application:set_env(emqx_machine, cluster_call_clean_interval, 1000), application:set_env(emqx_machine, cluster_call_retry_interval, 900), diff --git a/apps/emqx_management/src/emqx_mgmt_app.erl b/apps/emqx_management/src/emqx_mgmt_app.erl index 8dc1651da..4dae30a97 100644 --- a/apps/emqx_management/src/emqx_mgmt_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_app.erl @@ -28,7 +28,7 @@ start(_Type, _Args) -> {ok, Sup} = emqx_mgmt_sup:start_link(), - ok = ekka_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity), emqx_mgmt_cli:load(), {ok, Sup}. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 478234def..0d112995f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -59,7 +59,7 @@ create_resource(#{storage_type := StorageType}) -> {record_name, retained}, {attributes, record_info(fields, retained)}, {storage_properties, StoreProps}]), - ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity), case mnesia:table_info(?TAB, storage_type) of Copies -> ok; _Other -> From 7086135ec8480753b5e3aec905920b5b50b705f6 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Fri, 15 Oct 2021 17:40:22 +0200 Subject: [PATCH 08/14] chore(mria): Remove the obsolete annotiation --- apps/emqx_machine/src/emqx_cluster_rpc.erl | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 7ca6b3571..f7b755137 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -34,9 +34,6 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_machine.hrl"). --rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_MFA}). --rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_COMMIT}). - -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). From 73a4816d9cd68305aab9619eca2b23272191dc0c Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Fri, 15 Oct 2021 17:41:03 +0200 Subject: [PATCH 09/14] fix(mria): Hook up mria to emqx_machine --- apps/emqx_machine/src/emqx_cluster_rpc.erl | 2 +- apps/emqx_machine/src/emqx_machine.erl | 1 + apps/emqx_machine/src/emqx_machine_schema.erl | 6 +++--- apps/emqx_prometheus/src/emqx_prometheus.erl | 2 +- rebar.config.erl | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index f7b755137..a55d17616 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -87,7 +87,7 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu %% the initiate transaction must happened on core node %% make sure MFA(in the transaction) and the transaction on the same node %% don't need rpc again inside transaction. - case ekka_rlog_status:upstream_node(?EMQX_MACHINE_SHARD) of + case mria_status:upstream_node(?EMQX_MACHINE_SHARD) of {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); disconnected -> {error, disconnected} end diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 996f40bf7..1df7dba36 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -35,6 +35,7 @@ start() -> ok = set_backtrace_depth(), ok = print_otp_version_warning(), ok = load_config_files(), + mria:start(), ekka:start(), mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok. diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index a124166e5..05c7d3ffa 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -139,7 +139,7 @@ fields("cluster") -> #{})} , {"db_backend", sc(hoconsc:enum([mnesia, rlog]), - #{ mapping => "ekka.db_backend" + #{ mapping => "mria.db_backend" , default => mnesia })} , {"rlog", @@ -245,12 +245,12 @@ fields(cluster_k8s) -> fields("rlog") -> [ {"role", sc(hoconsc:enum([core, replicant]), - #{ mapping => "ekka.node_role" + #{ mapping => "mria.node_role" , default => core })} , {"core_nodes", sc(emqx_schema:comma_separated_atoms(), - #{ mapping => "ekka.core_nodes" + #{ mapping => "mria.core_nodes" , default => [] })} ]; diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 0a7aaa8e4..4667de24f 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -590,6 +590,6 @@ emqx_cluster() -> ]. emqx_cluster_data() -> - #{running_nodes := Running, stopped_nodes := Stopped} = mria:cluster_info(), + #{running_nodes := Running, stopped_nodes := Stopped} = mria_mnesia:cluster_info(), [{nodes_running, length(Running)}, {nodes_stopped, length(Stopped)}]. diff --git a/rebar.config.erl b/rebar.config.erl index a3f32d0c4..d5ff6de80 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -60,7 +60,7 @@ community_plugin_overrides() -> %% Temporary workaround for a rebar3 erl_opts duplication %% bug. Ideally, we want to set this define globally snabbkaffe_overrides() -> - Apps = [snabbkaffe, ekka], + Apps = [snabbkaffe, ekka, mria], [{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps]. config(HasElixir) -> From fcdf10080dcb39f09d546cd3b4d9512d6b895d36 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Fri, 15 Oct 2021 20:00:35 +0200 Subject: [PATCH 10/14] chore(ct): ekka_mnesia -> mria_mnesia --- apps/emqx/test/emqx_common_test_helpers.erl | 4 ++-- apps/emqx/test/props/prop_emqx_sys.erl | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 119f39ae2..e52aad3f2 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -290,8 +290,8 @@ reload(App, SpecAppConfigHandler) -> application:start(App). ensure_mnesia_stopped() -> - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). + mria:stop(), + mria_mnesia:delete_schema(). %% Help function to wait for Fun to yield 'true'. wait_for(Fn, Ln, F, Timeout) -> diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index b6a0e0e38..3f0373d39 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -80,8 +80,8 @@ do_mock(emqx_broker) -> fun(Msg) -> {node(), <<"test">>, Msg} end); do_mock(emqx_stats) -> meck:expect(emqx_stats, getstats, fun() -> [0] end); -do_mock(ekka_mnesia) -> - meck:expect(ekka_mnesia, running_nodes, fun() -> [node()] end); +do_mock(mria_mnesia) -> + meck:expect(mria_mnesia, running_nodes, fun() -> [node()] end); do_mock(emqx_metrics) -> meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). @@ -129,4 +129,3 @@ postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) -> next_state(State, _Res, {call, _Mod, _Fun, _Args}) -> NewState = State, NewState. - From 07ea6e56894866597b680a416ac5320524558f60 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 18 Oct 2021 11:42:00 +0200 Subject: [PATCH 11/14] fix(mria): Fix startup sequence --- apps/emqx/src/emqx.app.src | 2 +- apps/emqx/test/emqx_banned_SUITE.erl | 4 +--- apps/emqx/test/emqx_common_test_helpers.erl | 3 ++- apps/emqx/test/emqx_router_SUITE.erl | 5 ----- apps/emqx/test/emqx_trie_SUITE.erl | 5 +---- apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl | 2 +- apps/emqx_machine/src/emqx_machine.erl | 1 - 7 files changed, 6 insertions(+), 16 deletions(-) diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 3f167d093..f3ea37b67 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -5,7 +5,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]}, + {applications, [kernel,stdlib,gproc,gen_rpc,mria,esockd,cowboy,sasl,os_mon,jiffy,lc]}, {mod, {emqx_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index 258030513..de117ab00 100644 --- a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -27,13 +27,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> application:load(emqx), ok = ekka:start(), - %% for coverage - ok = emqx_banned:mnesia(copy), Config. end_per_suite(_Config) -> ekka:stop(), - mria:ensure_stopped(), + mria:stop(), mria_mnesia:delete_schema(). t_add_delete(_) -> diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index e52aad3f2..bc2fdb7c0 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -216,7 +216,8 @@ generate_config(SchemaFile, ConfigFile) -> -spec(stop_apps(list()) -> ok). stop_apps(Apps) -> - [application:stop(App) || App <- Apps ++ [emqx, mnesia]]. + [application:stop(App) || App <- Apps ++ [emqx, mria, mnesia]], + ok. %% backward compatible deps_path(App, RelativePath) -> app_path(App, RelativePath). diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index aeb60fc2c..0d14a7992 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -41,10 +41,6 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -t_mnesia(_) -> - %% for coverage - ok = emqx_router:mnesia(copy). - % t_add_route(_) -> % error('TODO'). @@ -117,4 +113,3 @@ t_unexpected(_) -> clear_tables() -> lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]). - diff --git a/apps/emqx/test/emqx_trie_SUITE.erl b/apps/emqx/test/emqx_trie_SUITE.erl index 713671766..9135a45dc 100644 --- a/apps/emqx/test/emqx_trie_SUITE.erl +++ b/apps/emqx/test/emqx_trie_SUITE.erl @@ -50,7 +50,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ekka:stop(), - mria:ensure_stopped(), + mria:stop(), mria_mnesia:delete_schema(). init_per_testcase(_TestCase, Config) -> @@ -60,9 +60,6 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -t_mnesia(_) -> - ok = ?TRIE:mnesia(copy). - t_insert(_) -> Fun = fun() -> ?TRIE:insert(<<"sensor/1/metric/2">>), diff --git a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl index b9008d128..9f5fb3526 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -50,7 +50,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_management]), - mria:ensure_stopped(). + mria:stop(). set_special_configs(emqx_management) -> emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 1df7dba36..996f40bf7 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -35,7 +35,6 @@ start() -> ok = set_backtrace_depth(), ok = print_otp_version_warning(), ok = load_config_files(), - mria:start(), ekka:start(), mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok. From 37a1c45af0e7c335ee7e6acd187f2146cb6c97e4 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 18 Oct 2021 17:13:11 +0200 Subject: [PATCH 12/14] fix(dialyzer): Fix problems found by dialyzer --- apps/emqx/src/emqx_banned.erl | 2 +- apps/emqx/src/emqx_router_helper.erl | 2 +- apps/emqx_dashboard/src/emqx_dashboard_collection.erl | 4 ++-- apps/emqx_dashboard/src/emqx_dashboard_token.erl | 2 +- apps/emqx_machine/src/emqx_machine.erl | 2 +- apps/emqx_retainer/src/emqx_retainer_mnesia.erl | 10 +++------- 6 files changed, 9 insertions(+), 13 deletions(-) diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 89442a6aa..dfd299d90 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -191,7 +191,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), + _ = mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index edec76d06..aecce70ac 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -132,7 +132,7 @@ handle_info({mnesia_table_event, Event}, State) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans({?LOCK, self()}, fun() -> - mria:transaction(fun cleanup_routes/1, [Node]) + mria:transaction(?ROUTE_SHARD, fun cleanup_routes/1, [Node]) end), ok = mria:dirty_delete(?ROUTING_NODE, Node), {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 2108d0919..0eb1e033b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -159,8 +159,8 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> diff(Sent, Sent0), diff(Dropped, Dropped0)}, Ts = get_local_time(), - mria:transaction(mria:local_content_shard(), - fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), + _ = mria:transaction(mria:local_content_shard(), + fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), {Received, Sent, Dropped}. avg(Items) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index b1eb32d74..5a6771ae5 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -101,7 +101,7 @@ do_sign(Username, Password) -> Signed = jose_jwt:sign(JWK, JWS, JWT), {_, Token} = jose_jws:compact(Signed), JWTRec = format(Token, Username, ExpTime), - mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), + _ = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), {ok, Token}. do_verify(Token)-> diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 996f40bf7..313f45dd3 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -36,7 +36,7 @@ start() -> ok = print_otp_version_warning(), ok = load_config_files(), ekka:start(), - mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), + ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok. graceful_shutdown() -> diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 0d112995f..6b5a9ac14 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -116,12 +116,8 @@ delete_message(_, Topic) -> Fun = fun() -> mnesia:delete({?TAB, Tokens}) end, - case mria:transaction(?RETAINER_SHARD, Fun) of - {atomic, Result} -> - Result; - ok -> - ok - end + _ = mria:transaction(?RETAINER_SHARD, Fun), + ok end, ok. @@ -156,7 +152,7 @@ match_messages(_, Topic, Cursor) -> end. clean(_) -> - mria:clear_table(?TAB), + _ = mria:clear_table(?TAB), ok. %%-------------------------------------------------------------------- %% Internal functions From 0d2d5f6bf98fddfc5b34d6e3cb205462666d4dfc Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 18 Oct 2021 22:09:48 +0200 Subject: [PATCH 13/14] fix(emqx_alarm): Wait for tables --- apps/emqx/src/emqx_alarm.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 83a12472d..2585494eb 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -180,6 +180,7 @@ to_rfc3339(Timestamp) -> %%-------------------------------------------------------------------- init([]) -> + _ = mria:wait_for_tables([?ACTIVATED_ALARM, ?DEACTIVATED_ALARM]), deactivate_all_alarms(), ok = emqx_config_handler:add_handler([alarm], ?MODULE), {ok, #state{timer = ensure_timer(undefined, get_validity_period())}}. From b3a87532194f5817f7eb28ba6aae8b6f76b1685e Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Tue, 19 Oct 2021 12:24:35 +0200 Subject: [PATCH 14/14] fix(mria): Replace mnesia calls with mria --- apps/emqx/src/emqx_router.erl | 2 +- apps/emqx/test/emqx_trie_SUITE.erl | 4 +-- .../test/emqx_authz_mnesia_SUITE.erl | 26 +++++++++---------- apps/emqx_modules/src/emqx_telemetry.erl | 2 +- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index ad63c6657..3337b57c8 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -273,7 +273,7 @@ trans(Fun, Args) -> {WPid, RefMon} = spawn_monitor( %% NOTE: this is under the assumption that crashes in Fun - %% are caught by mnesia:transaction/2. + %% are caught by mria:transaction/2. %% Future changes should keep in mind that this process %% always exit with database write result. fun() -> diff --git a/apps/emqx/test/emqx_trie_SUITE.erl b/apps/emqx/test/emqx_trie_SUITE.erl index 9135a45dc..00d64877c 100644 --- a/apps/emqx/test/emqx_trie_SUITE.erl +++ b/apps/emqx/test/emqx_trie_SUITE.erl @@ -188,6 +188,6 @@ t_delete3(_) -> clear_tables() -> emqx_trie:clear_tables(). trans(Fun) -> - mnesia:transaction(Fun). + mria:transaction(?ROUTE_SHARD, Fun). trans(Fun, Args) -> - mnesia:transaction(Fun, Args). + mria:transaction(?ROUTE_SHARD, Fun, Args). diff --git a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl index ed82e9b1d..f2562becc 100644 --- a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl @@ -54,19 +54,19 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_authz, Config) -> - mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, - rules = [{allow, publish, <<"test/%u">>}, - {allow, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, - rules = [{allow, publish, <<"test/%c">>}, - {deny, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, - rules = [{deny, all, <<"#">>}] - }]), + mria:dirty_write(#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, + rules = [{allow, publish, <<"test/%u">>}, + {allow, subscribe, <<"eq #">>} + ] + }), + mria:dirty_write(#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, + rules = [{allow, publish, <<"test/%c">>}, + {deny, subscribe, <<"eq #">>} + ] + }), + mria:dirty_write(#emqx_acl{who = ?ACL_TABLE_ALL, + rules = [{deny, all, <<"#">>}] + }), Config; init_per_testcase(_, Config) -> Config. diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index f9903b845..17f32ee58 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -127,7 +127,7 @@ get_telemetry() -> %% gen_server callbacks %%-------------------------------------------------------------------- -%% This is to suppress dialyzer warnings for mnesia:dirty_write and +%% This is to suppress dialyzer warnings for mria:dirty_write and %% dirty_read race condition. Given that the init function is not evaluated %% concurrently in one node, it should be free of race condition. %% Given the chance of having two nodes bootstraping with the write