feat(rlog): Introduce routing RLOG shard

This commit is contained in:
k32 2021-06-15 11:42:57 +02:00
parent 56c6cf560a
commit 6505340cb8
9 changed files with 36 additions and 20 deletions

View File

@ -84,6 +84,8 @@
%% Route
%%--------------------------------------------------------------------
-define(ROUTE_SHARD, route_shard).
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()}

View File

@ -13,7 +13,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.5.1"}}}

View File

@ -28,7 +28,7 @@
-define(APP, emqx).
-define(EMQX_SHARDS, []).
-define(EMQX_SHARDS, [route_shard]).
-include("emqx_release.hrl").

View File

@ -48,6 +48,8 @@
-define(TAB, emqx_channel_registry).
-define(LOCK, {?MODULE, cleanup_down}).
-rlog_shard({?ROUTE_SHARD, ?TAB}).
-record(channel, {chid, pid}).
%% @doc Start the global channel registry.
@ -72,7 +74,7 @@ register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mnesia:dirty_write(?TAB, record(ClientId, ChanPid));
true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid));
false -> ok
end.
@ -84,7 +86,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid));
true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid));
false -> ok
end.
@ -123,7 +125,7 @@ handle_cast(Msg, State) ->
handle_info({membership, {mnesia, down, Node}}, State) ->
global:trans({?LOCK, self()},
fun() ->
mnesia:transaction(fun cleanup_channels/1, [Node])
ekka_mnesia:transaction(?ROUTE_SHARD, fun cleanup_channels/1, [Node])
end),
{noreply, State};
@ -150,4 +152,3 @@ cleanup_channels(Node) ->
delete_channel(Chan) ->
mnesia:delete_object(?TAB, Chan, write).

View File

@ -69,6 +69,7 @@
-type(dest() :: node() | {group(), node()}).
-define(ROUTE_TAB, emqx_route).
-rlog_shard({?ROUTE_SHARD, ?ROUTE_TAB}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
@ -225,7 +226,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
insert_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE_TAB, Route, sticky_write]).
ekka_mnesia:dirty_write(?ROUTE_TAB, Route).
insert_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
@ -235,7 +236,7 @@ insert_trie_route(Route = #route{topic = Topic}) ->
mnesia:write(?ROUTE_TAB, Route, sticky_write).
delete_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE_TAB, Route, sticky_write]).
ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route).
delete_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE_TAB, Topic}) of
@ -254,6 +255,8 @@ maybe_trans(Fun, Args) ->
key ->
trans(Fun, Args);
global ->
%% Assert:
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
lock_router(),
try mnesia:sync_dirty(Fun, Args)
after
@ -278,7 +281,7 @@ trans(Fun, Args) ->
%% Future changes should keep in mind that this process
%% always exit with database write result.
fun() ->
Res = case mnesia:transaction(Fun, Args) of
Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,

View File

@ -53,6 +53,8 @@
-define(ROUTING_NODE, emqx_routing_node).
-define(LOCK, {?MODULE, cleanup_routes}).
-rlog_shard({?ROUTE_SHARD, ?ROUTING_NODE}).
-dialyzer({nowarn_function, [cleanup_routes/1]}).
%%--------------------------------------------------------------------
@ -87,7 +89,7 @@ monitor(Node) when is_atom(Node) ->
case ekka:is_member(Node)
orelse ets:member(?ROUTING_NODE, Node) of
true -> ok;
false -> mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
end.
%%--------------------------------------------------------------------
@ -136,9 +138,9 @@ handle_info({mnesia_table_event, Event}, State) ->
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
global:trans({?LOCK, self()},
fun() ->
mnesia:transaction(fun cleanup_routes/1, [Node])
ekka_mnesia:transaction(fun cleanup_routes/1, [Node])
end),
ok = mnesia:dirty_delete(?ROUTING_NODE, Node),
ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node),
{noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
handle_info({membership, {mnesia, down, Node}}, State) ->
@ -176,4 +178,3 @@ cleanup_routes(Node) ->
#route{_ = '_', dest = {'_', Node}}],
[mnesia:delete_object(?ROUTE, Route, write)
|| Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)].

View File

@ -17,6 +17,7 @@
-type percent() :: float().
-type file() :: string().
-type comma_separated_list() :: list().
-type comma_separated_atoms() :: [atom()].
-type bar_separated_list() :: list().
-type ip_port() :: tuple().
@ -29,17 +30,20 @@
-typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}).
-typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}).
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
% workaround: prevent being recognized as unused functions
-export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1,
to_flag/1, to_percent/1, to_comma_separated_list/1,
to_bar_separated_list/1, to_ip_port/1]).
to_bar_separated_list/1, to_ip_port/1,
to_comma_separated_atoms/1]).
-behaviour(hocon_schema).
-reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0,
bytesize/0, percent/0, file/0,
comma_separated_list/0, bar_separated_list/0, ip_port/0]).
comma_separated_list/0, bar_separated_list/0, ip_port/0,
comma_separated_atoms/0]).
-export([structs/0, fields/1, translations/0, translation/1]).
-export([t/1, t/3, t/4, ref/1]).
@ -61,6 +65,7 @@ fields("cluster") ->
, {"dns", ref("dns")}
, {"etcd", ref("etcd")}
, {"k8s", ref("k8s")}
, {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)}
, {"rlog", ref("rlog")}
];
@ -101,9 +106,8 @@ fields("k8s") ->
];
fields("rlog") ->
[ {"backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)}
, {"role", t(union([core, replicant]), "ekka.node_role", core)}
, {"core_nodes", t(comma_separated_list(), "ekka.core_nodes", [])}
[ {"role", t(union([core, replicant]), "ekka.node_role", core)}
, {"core_nodes", t(comma_separated_atoms(), "ekka.core_nodes", [])}
];
fields("node") ->
@ -1228,6 +1232,9 @@ to_percent(Str) ->
to_comma_separated_list(Str) ->
{ok, string:tokens(Str, ", ")}.
to_comma_separated_atoms(Str) ->
{ok, lists:map(fun list_to_atom/1, string:tokens(Str, ", "))}.
to_bar_separated_list(Str) ->
{ok, string:tokens(Str, "| ")}.

View File

@ -52,6 +52,8 @@
-define(IS_COMPACT, true).
-rlog_shard({?ROUTE_SHARD, ?TRIE}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@ -342,6 +344,6 @@ do_compact_test() ->
do_compact(words(<<"a/+/+/+/+/b">>))),
ok.
clear_tables() -> mnesia:clear_table(?TRIE).
clear_tables() -> ekka_mnesia:clear_table(?TRIE).
-endif. % TEST

View File

@ -39,7 +39,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}