diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b851700b9..0c8f5e7ba 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -15,7 +15,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 2df126ea7..412880bad 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -201,7 +201,7 @@ handle_call({activate_alarm, Name, Details}, _From, State) -> details = Details, message = normalize_message(Name, Details), activate_at = erlang:system_time(microsecond)}, - ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), + mria:dirty_write(?ACTIVATED_ALARM, Alarm), do_actions(activate, Alarm, emqx:get_config([alarm, actions])), {reply, ok, State} end; @@ -282,7 +282,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name case mnesia:dirty_first(?DEACTIVATED_ALARM) of '$end_of_table' -> ok; ActivateAt2 -> - ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + mria:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) end; false -> ok end, @@ -291,8 +291,8 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, normalize_message(Name, Details), erlang:system_time(microsecond)), - ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), - ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), + mria:dirty_delete(?ACTIVATED_ALARM, Name), do_actions(deactivate, DeActAlarm, emqx:get_config([alarm, actions])). make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> @@ -309,7 +309,7 @@ deactivate_all_alarms() -> details = Details, message = Message, activate_at = ActivateAt}) -> - ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, + mria:dirty_write(?DEACTIVATED_ALARM, #deactivated_alarm{ activate_at = ActivateAt, name = Name, @@ -347,7 +347,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) -> delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> case ActivatedAt =< Checkpoint of true -> - ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), + mria:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); false -> diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 759c9f955..86f79ede8 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -155,13 +155,13 @@ create(#{who := Who, reason := Reason, at := At, until := Until}) -> - ekka_mnesia:dirty_write(?BANNED_TAB, #banned{who = Who, - by = By, - reason = Reason, - at = At, - until = Until}); + mria:dirty_write(?BANNED_TAB, #banned{who = Who, + by = By, + reason = Reason, + at = At, + until = Until}); create(Banned) when is_record(Banned, banned) -> - ekka_mnesia:dirty_write(?BANNED_TAB, Banned). + mria:dirty_write(?BANNED_TAB, Banned). look_up(Who) when is_map(Who) -> look_up(pares_who(Who)); @@ -174,7 +174,7 @@ look_up(Who) -> delete(Who) when is_map(Who)-> delete(pares_who(Who)); delete(Who) -> - ekka_mnesia:dirty_delete(?BANNED_TAB, Who). + mria:dirty_delete(?BANNED_TAB, Who). info(InfoKey) -> mnesia:table_info(?BANNED_TAB, InfoKey). @@ -195,7 +195,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - ekka_mnesia:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), + mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index ef7ad6131..65b89a884 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -71,7 +71,7 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_write(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -83,7 +83,7 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -124,7 +124,7 @@ handle_cast(Msg, State) -> handle_info({membership, {mnesia, down, Node}}, State) -> global:trans({?LOCK, self()}, fun() -> - ekka_mnesia:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) + mria:transaction(?CM_SHARD, fun cleanup_channels/1, [Node]) end), {noreply, State}; diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3e5475c13..4793100ba 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -225,7 +225,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- insert_direct_route(Route) -> - ekka_mnesia:dirty_write(?ROUTE_TAB, Route). + mria:dirty_write(?ROUTE_TAB, Route). insert_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -235,7 +235,7 @@ insert_trie_route(Route = #route{topic = Topic}) -> mnesia:write(?ROUTE_TAB, Route, sticky_write). delete_direct_route(Route) -> - ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route). + mria:dirty_delete_object(?ROUTE_TAB, Route). delete_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -280,7 +280,7 @@ trans(Fun, Args) -> %% Future changes should keep in mind that this process %% always exit with database write result. fun() -> - Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of + Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of {atomic, Ok} -> Ok; {aborted, Reason} -> {error, Reason} end, diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index a88e82d8d..6542e3983 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -87,7 +87,7 @@ monitor(Node) when is_atom(Node) -> case ekka:is_member(Node) orelse ets:member(?ROUTING_NODE, Node) of true -> ok; - false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) + false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) end. %%-------------------------------------------------------------------- @@ -136,9 +136,9 @@ handle_info({mnesia_table_event, Event}, State) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans({?LOCK, self()}, fun() -> - ekka_mnesia:transaction(fun cleanup_routes/1, [Node]) + mria:transaction(fun cleanup_routes/1, [Node]) end), - ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node), + ok = mria:dirty_delete(?ROUTING_NODE, Node), {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ef8e3d288..299c4df38 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -297,7 +297,7 @@ subscribers(Group, Topic) -> init([]) -> {ok, _} = mnesia:subscribe({table, ?TAB, simple}), - {atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), + {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), {ok, update_stats(#state{pmon = PMon})}. @@ -309,7 +309,7 @@ init_monitors() -> end, emqx_pmon:new(), ?TAB). handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> - ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), + mria:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) @@ -319,7 +319,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> - ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), + mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), {reply, ok, State}; @@ -373,7 +373,7 @@ cleanup_down(SubPid) -> ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> - ok = ekka_mnesia:dirty_delete_object(?TAB, Record), + ok = mria:dirty_delete_object(?TAB, Record), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}) end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index 3b2b8b94c..b6a0e0e38 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -29,7 +29,7 @@ [ emqx_metrics , emqx_stats , emqx_broker - , ekka_mnesia + , mria_mnesia ]). -define(ALL(Vars, Types, Exprs), diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 3daece601..4f8b38b1e 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -268,7 +268,7 @@ trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> - case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of + case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index c3b6badf7..314c3d578 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -390,7 +390,7 @@ trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> - case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of + case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index a6bd0f7b8..da9b552c0 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -528,10 +528,10 @@ users(get, #{query_string := Qs}) -> end; users(post, #{body := Body}) when is_list(Body) -> lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_USERNAME, Username}, - rules = format_rules(Rules) - }) + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }) end, Body), {204}. @@ -561,10 +561,10 @@ clients(get, #{query_string := Qs}) -> end; clients(post, #{body := Body}) when is_list(Body) -> lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_CLIENTID, Clientid}, - rules = format_rules(Rules) - }) + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }) end, Body), {204}. @@ -581,13 +581,13 @@ user(get, #{bindings := #{username := Username}}) -> end; user(put, #{bindings := #{username := Username}, body := #{<<"username">> := Username, <<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_USERNAME, Username}, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_USERNAME, Username}, + rules = format_rules(Rules) + }), {204}; user(delete, #{bindings := #{username := Username}}) -> - ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}), + mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}), {204}. client(get, #{bindings := #{clientid := Clientid}}) -> @@ -603,13 +603,13 @@ client(get, #{bindings := #{clientid := Clientid}}) -> end; client(put, #{bindings := #{clientid := Clientid}, body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = {?ACL_TABLE_CLIENTID, Clientid}, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = {?ACL_TABLE_CLIENTID, Clientid}, + rules = format_rules(Rules) + }), {204}; client(delete, #{bindings := #{clientid := Clientid}}) -> - ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}), + mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}), {204}. all(get, _) -> @@ -624,17 +624,17 @@ all(get, _) -> } end; all(put, #{body := #{<<"rules">> := Rules}}) -> - ekka_mnesia:dirty_write(#emqx_acl{ - who = ?ACL_TABLE_ALL, - rules = format_rules(Rules) - }), + mria:dirty_write(#emqx_acl{ + who = ?ACL_TABLE_ALL, + rules = format_rules(Rules) + }), {204}. purge(delete, _) -> case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of [#{<<"enable">> := false}] -> ok = lists:foreach(fun(Key) -> - ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key) + ok = mria:dirty_delete(?ACL_TABLE, Key) end, mnesia:dirty_all_keys(?ACL_TABLE)), {204}; [#{<<"enable">> := true}] -> diff --git a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl index 947f46a82..ed82e9b1d 100644 --- a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl @@ -54,24 +54,24 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_authz, Config) -> - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, - rules = [{allow, publish, <<"test/%u">>}, - {allow, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, - rules = [{allow, publish, <<"test/%c">>}, - {deny, subscribe, <<"eq #">>} - ] - }]), - mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, - rules = [{deny, all, <<"#">>}] + mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>}, + rules = [{allow, publish, <<"test/%u">>}, + {allow, subscribe, <<"eq #">>} + ] }]), + mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>}, + rules = [{allow, publish, <<"test/%c">>}, + {deny, subscribe, <<"eq #">>} + ] + }]), + mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL, + rules = [{deny, all, <<"#">>}] + }]), Config; init_per_testcase(_, Config) -> Config. end_per_testcase(t_authz, Config) -> - [ ekka_mnesia:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], + [ mria:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)], Config; end_per_testcase(_, Config) -> Config. @@ -96,7 +96,7 @@ t_authz(_) -> listener => {tcp, default} }, - ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), + ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), ?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), @@ -106,4 +106,3 @@ t_authz(_) -> ?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)), ok. - diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 5af983b4d..72115a80b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -68,7 +68,7 @@ mnesia(copy) -> -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])). + return(mria:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])). force_add_user(Username, Password, Tags) -> AddFun = fun() -> @@ -76,7 +76,7 @@ force_add_user(Username, Password, Tags) -> password = Password, tags = Tags}) end, - case ekka_mnesia:transaction(?DASHBOARD_SHARD, AddFun) of + case mria:transaction(?DASHBOARD_SHARD, AddFun) of {atomic, ok} -> ok; {aborted, Reason} -> {error, Reason} end. @@ -98,11 +98,11 @@ remove_user(Username) when is_binary(Username) -> end, mnesia:delete({mqtt_admin, Username}) end, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). + return(mria:transaction(?DASHBOARD_SHARD, Trans)). -spec(update_user(binary(), binary()) -> ok | {error, term()}). update_user(Username, Tags) when is_binary(Username) -> - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])). + return(mria:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])). %% @private update_user_(Username, Tags) -> @@ -135,7 +135,7 @@ update_pwd(Username, Fun) -> end, mnesia:write(Fun(User)) end, - return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)). + return(mria:transaction(?DASHBOARD_SHARD, Trans)). -spec(lookup_user(binary()) -> [mqtt_admin()]). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 0e2adf7c3..7ef5e0972 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -162,7 +162,7 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> diff(Sent, Sent0), diff(Dropped, Dropped0)}, Ts = get_local_time(), - ekka_mnesia:transaction(ekka_mnesia:local_content_shard(), + mria:transaction(ekka_mnesia:local_content_shard(), fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), {Received, Sent, Dropped}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index c1ca15cb3..7d53ee06b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -104,7 +104,7 @@ do_sign(Username, Password) -> Signed = jose_jwt:sign(JWK, JWS, JWT), {_, Token} = jose_jws:compact(Signed), JWTRec = format(Token, Username, ExpTime), - ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), + mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]), {ok, Token}. do_verify(Token)-> @@ -113,7 +113,7 @@ do_verify(Token)-> case ExpTime > erlang:system_time(millisecond) of true -> NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()}, - {atomic, Res} = ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]), + {atomic, Res} = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]), Res; _ -> {error, token_timeout} @@ -124,7 +124,7 @@ do_verify(Token)-> do_destroy(Token) -> Fun = fun mnesia:delete/1, - {atomic, ok} = ekka_mnesia:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]), + {atomic, ok} = mria:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]), ok. do_destroy_by_username(Username) -> diff --git a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl index 6d245c9bc..cb947b995 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -102,7 +102,7 @@ t_rest_api(_Config) -> ok. t_cli(_Config) -> - [ekka_mnesia:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)], + [mria:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)], emqx_dashboard_cli:admins(["add", "username", "password"]), [{mqtt_admin, <<"username">>, <>, _}] = emqx_dashboard_admin:lookup_user(<<"username">>), diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 1d9daa637..51c4e95c2 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -65,7 +65,7 @@ register_channel(Type, ClientId) when is_binary(ClientId) -> register_channel(Type, {ClientId, self()}); register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - ekka_mnesia:dirty_write(tabname(Type), record(ClientId, ChanPid)). + mria:dirty_write(tabname(Type), record(ClientId, ChanPid)). %% @doc Unregister a global channel. -spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok. @@ -73,7 +73,7 @@ unregister_channel(Type, ClientId) when is_binary(ClientId) -> unregister_channel(Type, {ClientId, self()}); unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - ekka_mnesia:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). + mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). %% @doc Lookup the global channels. -spec lookup_channels(atom(), binary()) -> list(pid()). @@ -115,7 +115,7 @@ handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) -> global:trans({?LOCK, self()}, fun() -> %% FIXME: The shard name should be fixed later - ekka_mnesia:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab]) + mria:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab]) end), {noreply, State}; diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index f1a92299c..e51c34dbf 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -155,14 +155,14 @@ init([InstaId, PredefTopics]) -> MaxPredefId = lists:foldl( fun(#{id := TopicId, topic := TopicName0}, AccId) -> TopicName = iolist_to_binary(TopicName0), - ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ - key = {predef, TopicId}, - value = TopicName} - ), - ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ - key = {predef, TopicName}, - value = TopicId} - ), + mria:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicId}, + value = TopicName} + ), + mria:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicName}, + value = TopicId} + ), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. @@ -190,7 +190,7 @@ handle_call({register, ClientId, TopicName}, _From, key = {ClientId, TopicId}, value = TopicName}, write) end, - case ekka_mnesia:transaction(?SN_SHARD, Fun) of + case mria:transaction(?SN_SHARD, Fun) of {atomic, ok} -> {reply, TopicId, State}; {aborted, Error} -> @@ -205,7 +205,7 @@ handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> {emqx_sn_registry, {ClientId, '_'}, '_'} ), lists:foreach(fun(R) -> - ekka_mnesia:dirty_delete_object(Tab, R) + mria:dirty_delete_object(Tab, R) end, Registry), {reply, ok, State}; diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 66616f3ea..0d963a2de 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -280,7 +280,7 @@ do_catch_up_in_one_trans(LatestId, Node) -> end. transaction(Func, Args) -> - ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, Func, Args). + mria:transaction(?EMQX_MACHINE_SHARD, Func, Args). trans_status() -> mnesia:foldl(fun(Rec, Acc) -> diff --git a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl index 6dcbd3d25..2b1242fa7 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl @@ -49,7 +49,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> - case ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + case mria:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) end, diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 86ef97dac..f2eef3f39 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -184,7 +184,7 @@ delete_delayed_message(Id0) -> {error, not_found}; Rows -> Timestamp = hd(Rows), - ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id}) + mria:dirty_delete(?TAB, {Timestamp, Id}) end. update_config(Config) -> {ok, _} = emqx:update_config([delayed], Config). @@ -205,7 +205,7 @@ handle_call({set_max_delayed_messages, Max}, _From, State) -> handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := 0}) -> - ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), + ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; @@ -216,7 +216,7 @@ handle_call({store, DelayedMsg = #delayed_message{key = Key}}, true -> {reply, {error, max_delayed_messages_full}, State}; false -> - ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), + ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)} end; @@ -240,7 +240,7 @@ handle_cast(Msg, State) -> %% Do Publish... handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), - lists:foreach(fun(Key) -> ekka_mnesia:dirty_delete(?TAB, Key) end, DeletedKeys), + lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys), {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; handle_info(stats, State = #{stats_fun := StatsFun}) -> diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 65852df93..04586e8fc 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -140,8 +140,8 @@ init(_Opts) -> UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of [] -> UUID = generate_uuid(), - ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID}), + mria:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID}), UUID; [#telemetry{uuid = UUID} | _] -> UUID diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 9ea25cdeb..65ce1025b 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -237,7 +237,7 @@ trim_crlf(Bin) -> end. trans(Fun, Args) -> - case ekka_mnesia:transaction(?PSK_SHARD, Fun, Args) of + case mria:transaction(?PSK_SHARD, Fun, Args) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 0fc0a4615..e3d90fe7d 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -73,10 +73,10 @@ store_retained(_, Msg =#message{topic = Topic}) -> case is_table_full() of false -> ok = emqx_metrics:inc('messages.retained'), - ekka_mnesia:dirty_write(?TAB, - #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = ExpiryTime}); + mria:dirty_write(?TAB, + #retained{topic = topic2tokens(Topic), + msg = Msg, + expiry_time = ExpiryTime}); _ -> Tokens = topic2tokens(Topic), Fun = fun() -> @@ -94,7 +94,7 @@ store_retained(_, Msg =#message{topic = Topic}) -> ok end end, - {atomic, ok} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + {atomic, ok} = mria:transaction(?RETAINER_SHARD, Fun), ok end. @@ -106,7 +106,7 @@ clear_expired(_) -> Keys = mnesia:select(?TAB, Ms, write), lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) end, - {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), + {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun), ok. delete_message(_, Topic) -> @@ -117,7 +117,7 @@ delete_message(_, Topic) -> Fun = fun() -> mnesia:delete({?TAB, Tokens}) end, - case ekka_mnesia:transaction(?RETAINER_SHARD, Fun) of + case mria:transaction(?RETAINER_SHARD, Fun) of {atomic, Result} -> Result; ok -> @@ -214,7 +214,7 @@ match_delete_messages(Filter) -> MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, Ms = [{MsHd, [], ['$_']}], Rs = mnesia:dirty_select(?TAB, Ms), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs). + lists:foreach(fun(R) -> mria:dirty_delete_object(?TAB, R) end, Rs). %% @private condition(Ws) -> diff --git a/rebar.config b/rebar.config index 89b8e3a9f..91c0196c2 100644 --- a/rebar.config +++ b/rebar.config @@ -50,7 +50,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}