chore(mria): Rename module: ekka_mnesia -> mria

This commit is contained in:
k32 2021-10-14 18:24:59 +02:00 committed by x1001100011
parent 591b704f65
commit fd482e2ec0
25 changed files with 109 additions and 110 deletions

View File

@ -15,7 +15,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}

View File

@ -201,7 +201,7 @@ handle_call({activate_alarm, Name, Details}, _From, State) ->
details = Details,
message = normalize_message(Name, Details),
activate_at = erlang:system_time(microsecond)},
ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
mria:dirty_write(?ACTIVATED_ALARM, Alarm),
do_actions(activate, Alarm, emqx:get_config([alarm, actions])),
{reply, ok, State}
end;
@ -282,7 +282,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name
case mnesia:dirty_first(?DEACTIVATED_ALARM) of
'$end_of_table' -> ok;
ActivateAt2 ->
ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
mria:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
end;
false -> ok
end,
@ -291,8 +291,8 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name
DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details,
normalize_message(Name, Details),
erlang:system_time(microsecond)),
ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
mria:dirty_delete(?ACTIVATED_ALARM, Name),
do_actions(deactivate, DeActAlarm, emqx:get_config([alarm, actions])).
make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
@ -309,7 +309,7 @@ deactivate_all_alarms() ->
details = Details,
message = Message,
activate_at = ActivateAt}) ->
ekka_mnesia:dirty_write(?DEACTIVATED_ALARM,
mria:dirty_write(?DEACTIVATED_ALARM,
#deactivated_alarm{
activate_at = ActivateAt,
name = Name,
@ -347,7 +347,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) ->
delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) ->
case ActivatedAt =< Checkpoint of
true ->
ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
mria:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt),
delete_expired_deactivated_alarms(NActivatedAt, Checkpoint);
false ->

View File

@ -155,13 +155,13 @@ create(#{who := Who,
reason := Reason,
at := At,
until := Until}) ->
ekka_mnesia:dirty_write(?BANNED_TAB, #banned{who = Who,
by = By,
reason = Reason,
at = At,
until = Until});
mria:dirty_write(?BANNED_TAB, #banned{who = Who,
by = By,
reason = Reason,
at = At,
until = Until});
create(Banned) when is_record(Banned, banned) ->
ekka_mnesia:dirty_write(?BANNED_TAB, Banned).
mria:dirty_write(?BANNED_TAB, Banned).
look_up(Who) when is_map(Who) ->
look_up(pares_who(Who));
@ -174,7 +174,7 @@ look_up(Who) ->
delete(Who) when is_map(Who)->
delete(pares_who(Who));
delete(Who) ->
ekka_mnesia:dirty_delete(?BANNED_TAB, Who).
mria:dirty_delete(?BANNED_TAB, Who).
info(InfoKey) ->
mnesia:table_info(?BANNED_TAB, InfoKey).
@ -195,7 +195,7 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
ekka_mnesia:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]),
mria:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]),
{noreply, ensure_expiry_timer(State), hibernate};
handle_info(Info, State) ->

View File

@ -71,7 +71,7 @@ register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid));
true -> mria:dirty_write(?TAB, record(ClientId, ChanPid));
false -> ok
end.
@ -83,7 +83,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid));
true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid));
false -> ok
end.
@ -124,7 +124,7 @@ handle_cast(Msg, State) ->
handle_info({membership, {mnesia, down, Node}}, State) ->
global:trans({?LOCK, self()},
fun() ->
ekka_mnesia:transaction(?CM_SHARD, fun cleanup_channels/1, [Node])
mria:transaction(?CM_SHARD, fun cleanup_channels/1, [Node])
end),
{noreply, State};

View File

@ -225,7 +225,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
insert_direct_route(Route) ->
ekka_mnesia:dirty_write(?ROUTE_TAB, Route).
mria:dirty_write(?ROUTE_TAB, Route).
insert_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
@ -235,7 +235,7 @@ insert_trie_route(Route = #route{topic = Topic}) ->
mnesia:write(?ROUTE_TAB, Route, sticky_write).
delete_direct_route(Route) ->
ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route).
mria:dirty_delete_object(?ROUTE_TAB, Route).
delete_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
@ -280,7 +280,7 @@ trans(Fun, Args) ->
%% Future changes should keep in mind that this process
%% always exit with database write result.
fun() ->
Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of
Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,

View File

@ -87,7 +87,7 @@ monitor(Node) when is_atom(Node) ->
case ekka:is_member(Node)
orelse ets:member(?ROUTING_NODE, Node) of
true -> ok;
false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
end.
%%--------------------------------------------------------------------
@ -136,9 +136,9 @@ handle_info({mnesia_table_event, Event}, State) ->
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
global:trans({?LOCK, self()},
fun() ->
ekka_mnesia:transaction(fun cleanup_routes/1, [Node])
mria:transaction(fun cleanup_routes/1, [Node])
end),
ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node),
ok = mria:dirty_delete(?ROUTING_NODE, Node),
{noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
handle_info({membership, {mnesia, down, Node}}, State) ->

View File

@ -297,7 +297,7 @@ subscribers(Group, Topic) ->
init([]) ->
{ok, _} = mnesia:subscribe({table, ?TAB, simple}),
{atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0),
{atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0),
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
{ok, update_stats(#state{pmon = PMon})}.
@ -309,7 +309,7 @@ init_monitors() ->
end, emqx_pmon:new(), ?TAB).
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
mria:dirty_write(?TAB, record(Group, Topic, SubPid)),
case ets:member(?SHARED_SUBS, {Group, Topic}) of
true -> ok;
false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
@ -319,7 +319,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
delete_route_if_needed({Group, Topic}),
{reply, ok, State};
@ -373,7 +373,7 @@ cleanup_down(SubPid) ->
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
lists:foreach(
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
ok = ekka_mnesia:dirty_delete_object(?TAB, Record),
ok = mria:dirty_delete_object(?TAB, Record),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
delete_route_if_needed({Group, Topic})
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).

View File

@ -29,7 +29,7 @@
[ emqx_metrics
, emqx_stats
, emqx_broker
, ekka_mnesia
, mria_mnesia
]).
-define(ALL(Vars, Types, Exprs),

View File

@ -268,7 +268,7 @@ trans(Fun) ->
trans(Fun, []).
trans(Fun, Args) ->
case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of
case mria:transaction(?AUTH_SHARD, Fun, Args) of
{atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason}
end.

View File

@ -390,7 +390,7 @@ trans(Fun) ->
trans(Fun, []).
trans(Fun, Args) ->
case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of
case mria:transaction(?AUTH_SHARD, Fun, Args) of
{atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason}
end.

View File

@ -528,10 +528,10 @@ users(get, #{query_string := Qs}) ->
end;
users(post, #{body := Body}) when is_list(Body) ->
lists:foreach(fun(#{<<"username">> := Username, <<"rules">> := Rules}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules)
})
mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules)
})
end, Body),
{204}.
@ -561,10 +561,10 @@ clients(get, #{query_string := Qs}) ->
end;
clients(post, #{body := Body}) when is_list(Body) ->
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"rules">> := Rules}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules)
})
mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules)
})
end, Body),
{204}.
@ -581,13 +581,13 @@ user(get, #{bindings := #{username := Username}}) ->
end;
user(put, #{bindings := #{username := Username},
body := #{<<"username">> := Username, <<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules)
}),
mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_USERNAME, Username},
rules = format_rules(Rules)
}),
{204};
user(delete, #{bindings := #{username := Username}}) ->
ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}),
mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_USERNAME, Username}}),
{204}.
client(get, #{bindings := #{clientid := Clientid}}) ->
@ -603,13 +603,13 @@ client(get, #{bindings := #{clientid := Clientid}}) ->
end;
client(put, #{bindings := #{clientid := Clientid},
body := #{<<"clientid">> := Clientid, <<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules)
}),
mria:dirty_write(#emqx_acl{
who = {?ACL_TABLE_CLIENTID, Clientid},
rules = format_rules(Rules)
}),
{204};
client(delete, #{bindings := #{clientid := Clientid}}) ->
ekka_mnesia:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}),
mria:dirty_delete({?ACL_TABLE, {?ACL_TABLE_CLIENTID, Clientid}}),
{204}.
all(get, _) ->
@ -624,17 +624,17 @@ all(get, _) ->
}
end;
all(put, #{body := #{<<"rules">> := Rules}}) ->
ekka_mnesia:dirty_write(#emqx_acl{
who = ?ACL_TABLE_ALL,
rules = format_rules(Rules)
}),
mria:dirty_write(#emqx_acl{
who = ?ACL_TABLE_ALL,
rules = format_rules(Rules)
}),
{204}.
purge(delete, _) ->
case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of
[#{<<"enable">> := false}] ->
ok = lists:foreach(fun(Key) ->
ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key)
ok = mria:dirty_delete(?ACL_TABLE, Key)
end, mnesia:dirty_all_keys(?ACL_TABLE)),
{204};
[#{<<"enable">> := true}] ->

View File

@ -54,24 +54,24 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(t_authz, Config) ->
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>},
rules = [{allow, publish, <<"test/%u">>},
{allow, subscribe, <<"eq #">>}
]
}]),
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>},
rules = [{allow, publish, <<"test/%c">>},
{deny, subscribe, <<"eq #">>}
]
}]),
mnesia:transaction(fun ekka_mnesia:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL,
rules = [{deny, all, <<"#">>}]
mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_USERNAME, <<"test_username">>},
rules = [{allow, publish, <<"test/%u">>},
{allow, subscribe, <<"eq #">>}
]
}]),
mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = {?ACL_TABLE_CLIENTID, <<"test_clientid">>},
rules = [{allow, publish, <<"test/%c">>},
{deny, subscribe, <<"eq #">>}
]
}]),
mnesia:transaction(fun mria:dirty_write/1, [#emqx_acl{who = ?ACL_TABLE_ALL,
rules = [{deny, all, <<"#">>}]
}]),
Config;
init_per_testcase(_, Config) -> Config.
end_per_testcase(t_authz, Config) ->
[ ekka_mnesia:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)],
[ mria:dirty_delete(?ACL_TABLE, K) || K <- mnesia:dirty_all_keys(?ACL_TABLE)],
Config;
end_per_testcase(_, Config) -> Config.
@ -96,7 +96,7 @@ t_authz(_) ->
listener => {tcp, default}
},
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)),
@ -106,4 +106,3 @@ t_authz(_) ->
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)),
ok.

View File

@ -68,7 +68,7 @@ mnesia(copy) ->
-spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}).
add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) ->
Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags},
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])).
return(mria:transaction(?DASHBOARD_SHARD, fun add_user_/1, [Admin])).
force_add_user(Username, Password, Tags) ->
AddFun = fun() ->
@ -76,7 +76,7 @@ force_add_user(Username, Password, Tags) ->
password = Password,
tags = Tags})
end,
case ekka_mnesia:transaction(?DASHBOARD_SHARD, AddFun) of
case mria:transaction(?DASHBOARD_SHARD, AddFun) of
{atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason}
end.
@ -98,11 +98,11 @@ remove_user(Username) when is_binary(Username) ->
end,
mnesia:delete({mqtt_admin, Username})
end,
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)).
return(mria:transaction(?DASHBOARD_SHARD, Trans)).
-spec(update_user(binary(), binary()) -> ok | {error, term()}).
update_user(Username, Tags) when is_binary(Username) ->
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])).
return(mria:transaction(?DASHBOARD_SHARD, fun update_user_/2, [Username, Tags])).
%% @private
update_user_(Username, Tags) ->
@ -135,7 +135,7 @@ update_pwd(Username, Fun) ->
end,
mnesia:write(Fun(User))
end,
return(ekka_mnesia:transaction(?DASHBOARD_SHARD, Trans)).
return(mria:transaction(?DASHBOARD_SHARD, Trans)).
-spec(lookup_user(binary()) -> [mqtt_admin()]).

View File

@ -162,7 +162,7 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
diff(Sent, Sent0),
diff(Dropped, Dropped0)},
Ts = get_local_time(),
ekka_mnesia:transaction(ekka_mnesia:local_content_shard(),
mria:transaction(ekka_mnesia:local_content_shard(),
fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]),
{Received, Sent, Dropped}.

View File

@ -104,7 +104,7 @@ do_sign(Username, Password) ->
Signed = jose_jwt:sign(JWK, JWS, JWT),
{_, Token} = jose_jws:compact(Signed),
JWTRec = format(Token, Username, ExpTime),
ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
{ok, Token}.
do_verify(Token)->
@ -113,7 +113,7 @@ do_verify(Token)->
case ExpTime > erlang:system_time(millisecond) of
true ->
NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()},
{atomic, Res} = ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]),
{atomic, Res} = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]),
Res;
_ ->
{error, token_timeout}
@ -124,7 +124,7 @@ do_verify(Token)->
do_destroy(Token) ->
Fun = fun mnesia:delete/1,
{atomic, ok} = ekka_mnesia:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]),
{atomic, ok} = mria:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]),
ok.
do_destroy_by_username(Username) ->

View File

@ -102,7 +102,7 @@ t_rest_api(_Config) ->
ok.
t_cli(_Config) ->
[ekka_mnesia:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)],
[mria:dirty_delete(mqtt_admin, Admin) || Admin <- mnesia:dirty_all_keys(mqtt_admin)],
emqx_dashboard_cli:admins(["add", "username", "password"]),
[{mqtt_admin, <<"username">>, <<Salt:4/binary, Hash/binary>>, _}] =
emqx_dashboard_admin:lookup_user(<<"username">>),

View File

@ -65,7 +65,7 @@ register_channel(Type, ClientId) when is_binary(ClientId) ->
register_channel(Type, {ClientId, self()});
register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
ekka_mnesia:dirty_write(tabname(Type), record(ClientId, ChanPid)).
mria:dirty_write(tabname(Type), record(ClientId, ChanPid)).
%% @doc Unregister a global channel.
-spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok.
@ -73,7 +73,7 @@ unregister_channel(Type, ClientId) when is_binary(ClientId) ->
unregister_channel(Type, {ClientId, self()});
unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
ekka_mnesia:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)).
mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)).
%% @doc Lookup the global channels.
-spec lookup_channels(atom(), binary()) -> list(pid()).
@ -115,7 +115,7 @@ handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) ->
global:trans({?LOCK, self()},
fun() ->
%% FIXME: The shard name should be fixed later
ekka_mnesia:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab])
mria:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab])
end),
{noreply, State};

View File

@ -155,14 +155,14 @@ init([InstaId, PredefTopics]) ->
MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName0}, AccId) ->
TopicName = iolist_to_binary(TopicName0),
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicId},
value = TopicName}
),
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicName},
value = TopicId}
),
mria:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicId},
value = TopicName}
),
mria:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicName},
value = TopicId}
),
if TopicId > AccId -> TopicId; true -> AccId end
end, 0, PredefTopics),
{ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}.
@ -190,7 +190,7 @@ handle_call({register, ClientId, TopicName}, _From,
key = {ClientId, TopicId},
value = TopicName}, write)
end,
case ekka_mnesia:transaction(?SN_SHARD, Fun) of
case mria:transaction(?SN_SHARD, Fun) of
{atomic, ok} ->
{reply, TopicId, State};
{aborted, Error} ->
@ -205,7 +205,7 @@ handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) ->
{emqx_sn_registry, {ClientId, '_'}, '_'}
),
lists:foreach(fun(R) ->
ekka_mnesia:dirty_delete_object(Tab, R)
mria:dirty_delete_object(Tab, R)
end, Registry),
{reply, ok, State};

View File

@ -280,7 +280,7 @@ do_catch_up_in_one_trans(LatestId, Node) ->
end.
transaction(Func, Args) ->
ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, Func, Args).
mria:transaction(?EMQX_MACHINE_SHARD, Func, Args).
trans_status() ->
mnesia:foldl(fun(Rec, Acc) ->

View File

@ -49,7 +49,7 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
case ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
case mria:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
{atomic, ok} -> ok;
Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error])
end,

View File

@ -184,7 +184,7 @@ delete_delayed_message(Id0) ->
{error, not_found};
Rows ->
Timestamp = hd(Rows),
ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id})
mria:dirty_delete(?TAB, {Timestamp, Id})
end.
update_config(Config) ->
{ok, _} = emqx:update_config([delayed], Config).
@ -205,7 +205,7 @@ handle_call({set_max_delayed_messages, Max}, _From, State) ->
handle_call({store, DelayedMsg = #delayed_message{key = Key}},
_From, State = #{max_delayed_messages := 0}) ->
ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),
ok = mria:dirty_write(?TAB, DelayedMsg),
emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)};
@ -216,7 +216,7 @@ handle_call({store, DelayedMsg = #delayed_message{key = Key}},
true ->
{reply, {error, max_delayed_messages_full}, State};
false ->
ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),
ok = mria:dirty_write(?TAB, DelayedMsg),
emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)}
end;
@ -240,7 +240,7 @@ handle_cast(Msg, State) ->
%% Do Publish...
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
lists:foreach(fun(Key) -> ekka_mnesia:dirty_delete(?TAB, Key) end, DeletedKeys),
lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
handle_info(stats, State = #{stats_fun := StatsFun}) ->

View File

@ -140,8 +140,8 @@ init(_Opts) ->
UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
[] ->
UUID = generate_uuid(),
ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
uuid = UUID}),
mria:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
uuid = UUID}),
UUID;
[#telemetry{uuid = UUID} | _] ->
UUID

View File

@ -237,7 +237,7 @@ trim_crlf(Bin) ->
end.
trans(Fun, Args) ->
case ekka_mnesia:transaction(?PSK_SHARD, Fun, Args) of
case mria:transaction(?PSK_SHARD, Fun, Args) of
{atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason}
end.

View File

@ -73,10 +73,10 @@ store_retained(_, Msg =#message{topic = Topic}) ->
case is_table_full() of
false ->
ok = emqx_metrics:inc('messages.retained'),
ekka_mnesia:dirty_write(?TAB,
#retained{topic = topic2tokens(Topic),
msg = Msg,
expiry_time = ExpiryTime});
mria:dirty_write(?TAB,
#retained{topic = topic2tokens(Topic),
msg = Msg,
expiry_time = ExpiryTime});
_ ->
Tokens = topic2tokens(Topic),
Fun = fun() ->
@ -94,7 +94,7 @@ store_retained(_, Msg =#message{topic = Topic}) ->
ok
end
end,
{atomic, ok} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun),
{atomic, ok} = mria:transaction(?RETAINER_SHARD, Fun),
ok
end.
@ -106,7 +106,7 @@ clear_expired(_) ->
Keys = mnesia:select(?TAB, Ms, write),
lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys)
end,
{atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun),
{atomic, _} = mria:transaction(?RETAINER_SHARD, Fun),
ok.
delete_message(_, Topic) ->
@ -117,7 +117,7 @@ delete_message(_, Topic) ->
Fun = fun() ->
mnesia:delete({?TAB, Tokens})
end,
case ekka_mnesia:transaction(?RETAINER_SHARD, Fun) of
case mria:transaction(?RETAINER_SHARD, Fun) of
{atomic, Result} ->
Result;
ok ->
@ -214,7 +214,7 @@ match_delete_messages(Filter) ->
MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'},
Ms = [{MsHd, [], ['$_']}],
Rs = mnesia:dirty_select(?TAB, Ms),
lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs).
lists:foreach(fun(R) -> mria:dirty_delete_object(?TAB, R) end, Rs).
%% @private
condition(Ws) ->

View File

@ -50,7 +50,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}