From 6505340cb81011fd1d76c3266a1887d12f474ce4 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Tue, 15 Jun 2021 11:42:57 +0200 Subject: [PATCH] feat(rlog): Introduce routing RLOG shard --- apps/emqx/include/emqx.hrl | 2 ++ apps/emqx/rebar.config | 2 +- apps/emqx/src/emqx_app.erl | 2 +- apps/emqx/src/emqx_cm_registry.erl | 9 +++++---- apps/emqx/src/emqx_router.erl | 9 ++++++--- apps/emqx/src/emqx_router_helper.erl | 9 +++++---- apps/emqx/src/emqx_schema.erl | 17 ++++++++++++----- apps/emqx/src/emqx_trie.erl | 4 +++- rebar.config | 2 +- 9 files changed, 36 insertions(+), 20 deletions(-) diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 8d0b61c21..ba72a47b5 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -84,6 +84,8 @@ %% Route %%-------------------------------------------------------------------- +-define(ROUTE_SHARD, route_shard). + -record(route, { topic :: binary(), dest :: node() | {binary(), node()} diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 7e649ae80..a33aa2e64 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -13,7 +13,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.0"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.5.1"}}} diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 26b81c8e7..fe21edff5 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -28,7 +28,7 @@ -define(APP, emqx). --define(EMQX_SHARDS, []). +-define(EMQX_SHARDS, [route_shard]). -include("emqx_release.hrl"). diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 50a55c1bf..d8095b445 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -48,6 +48,8 @@ -define(TAB, emqx_channel_registry). -define(LOCK, {?MODULE, cleanup_down}). +-rlog_shard({?ROUTE_SHARD, ?TAB}). + -record(channel, {chid, pid}). %% @doc Start the global channel registry. @@ -72,7 +74,7 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mnesia:dirty_write(?TAB, record(ClientId, ChanPid)); + true -> ekka_mnesia:dirty_write(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -84,7 +86,7 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid)); + true -> ekka_mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid)); false -> ok end. @@ -123,7 +125,7 @@ handle_cast(Msg, State) -> handle_info({membership, {mnesia, down, Node}}, State) -> global:trans({?LOCK, self()}, fun() -> - mnesia:transaction(fun cleanup_channels/1, [Node]) + ekka_mnesia:transaction(?ROUTE_SHARD, fun cleanup_channels/1, [Node]) end), {noreply, State}; @@ -150,4 +152,3 @@ cleanup_channels(Node) -> delete_channel(Chan) -> mnesia:delete_object(?TAB, Chan, write). - diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 75b567666..3641c49ff 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -69,6 +69,7 @@ -type(dest() :: node() | {group(), node()}). -define(ROUTE_TAB, emqx_route). +-rlog_shard({?ROUTE_SHARD, ?ROUTE_TAB}). %%-------------------------------------------------------------------- %% Mnesia bootstrap @@ -225,7 +226,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- insert_direct_route(Route) -> - mnesia:async_dirty(fun mnesia:write/3, [?ROUTE_TAB, Route, sticky_write]). + ekka_mnesia:dirty_write(?ROUTE_TAB, Route). insert_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -235,7 +236,7 @@ insert_trie_route(Route = #route{topic = Topic}) -> mnesia:write(?ROUTE_TAB, Route, sticky_write). delete_direct_route(Route) -> - mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE_TAB, Route, sticky_write]). + ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route). delete_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE_TAB, Topic}) of @@ -254,6 +255,8 @@ maybe_trans(Fun, Args) -> key -> trans(Fun, Args); global -> + %% Assert: + mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash lock_router(), try mnesia:sync_dirty(Fun, Args) after @@ -278,7 +281,7 @@ trans(Fun, Args) -> %% Future changes should keep in mind that this process %% always exit with database write result. fun() -> - Res = case mnesia:transaction(Fun, Args) of + Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of {atomic, Ok} -> Ok; {aborted, Reason} -> {error, Reason} end, diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index f05220791..8761a250e 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -53,6 +53,8 @@ -define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). +-rlog_shard({?ROUTE_SHARD, ?ROUTING_NODE}). + -dialyzer({nowarn_function, [cleanup_routes/1]}). %%-------------------------------------------------------------------- @@ -87,7 +89,7 @@ monitor(Node) when is_atom(Node) -> case ekka:is_member(Node) orelse ets:member(?ROUTING_NODE, Node) of true -> ok; - false -> mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) + false -> ekka_mnesia:dirty_write(?ROUTING_NODE, #routing_node{name = Node}) end. %%-------------------------------------------------------------------- @@ -136,9 +138,9 @@ handle_info({mnesia_table_event, Event}, State) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans({?LOCK, self()}, fun() -> - mnesia:transaction(fun cleanup_routes/1, [Node]) + ekka_mnesia:transaction(fun cleanup_routes/1, [Node]) end), - ok = mnesia:dirty_delete(?ROUTING_NODE, Node), + ok = ekka_mnesia:dirty_delete(?ROUTING_NODE, Node), {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> @@ -176,4 +178,3 @@ cleanup_routes(Node) -> #route{_ = '_', dest = {'_', Node}}], [mnesia:delete_object(?ROUTE, Route, write) || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)]. - diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ae6f1d547..bee052926 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -17,6 +17,7 @@ -type percent() :: float(). -type file() :: string(). -type comma_separated_list() :: list(). +-type comma_separated_atoms() :: [atom()]. -type bar_separated_list() :: list(). -type ip_port() :: tuple(). @@ -29,17 +30,20 @@ -typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}). -typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}). -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). +-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). % workaround: prevent being recognized as unused functions -export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1, to_flag/1, to_percent/1, to_comma_separated_list/1, - to_bar_separated_list/1, to_ip_port/1]). + to_bar_separated_list/1, to_ip_port/1, + to_comma_separated_atoms/1]). -behaviour(hocon_schema). -reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0, bytesize/0, percent/0, file/0, - comma_separated_list/0, bar_separated_list/0, ip_port/0]). + comma_separated_list/0, bar_separated_list/0, ip_port/0, + comma_separated_atoms/0]). -export([structs/0, fields/1, translations/0, translation/1]). -export([t/1, t/3, t/4, ref/1]). @@ -61,6 +65,7 @@ fields("cluster") -> , {"dns", ref("dns")} , {"etcd", ref("etcd")} , {"k8s", ref("k8s")} + , {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)} , {"rlog", ref("rlog")} ]; @@ -101,9 +106,8 @@ fields("k8s") -> ]; fields("rlog") -> - [ {"backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)} - , {"role", t(union([core, replicant]), "ekka.node_role", core)} - , {"core_nodes", t(comma_separated_list(), "ekka.core_nodes", [])} + [ {"role", t(union([core, replicant]), "ekka.node_role", core)} + , {"core_nodes", t(comma_separated_atoms(), "ekka.core_nodes", [])} ]; fields("node") -> @@ -1228,6 +1232,9 @@ to_percent(Str) -> to_comma_separated_list(Str) -> {ok, string:tokens(Str, ", ")}. +to_comma_separated_atoms(Str) -> + {ok, lists:map(fun list_to_atom/1, string:tokens(Str, ", "))}. + to_bar_separated_list(Str) -> {ok, string:tokens(Str, "| ")}. diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 7146feb74..bb5e171b1 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -52,6 +52,8 @@ -define(IS_COMPACT, true). +-rlog_shard({?ROUTE_SHARD, ?TRIE}). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -342,6 +344,6 @@ do_compact_test() -> do_compact(words(<<"a/+/+/+/+/b">>))), ok. -clear_tables() -> mnesia:clear_table(?TRIE). +clear_tables() -> ekka_mnesia:clear_table(?TRIE). -endif. % TEST diff --git a/rebar.config b/rebar.config index 9afedcf02..03a36570f 100644 --- a/rebar.config +++ b/rebar.config @@ -39,7 +39,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.0"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}