diff --git a/apps/emqx_conf/etc/emqx_conf.conf b/apps/emqx_conf/etc/emqx_conf.conf
index c6bf76901..4869c2297 100644
--- a/apps/emqx_conf/etc/emqx_conf.conf
+++ b/apps/emqx_conf/etc/emqx_conf.conf
@@ -319,8 +319,8 @@ db {
##
## @doc db.backend
## ValueType: mnesia | rlog
- ## Default: mnesia
- backend = mnesia
+ ## Default: rlog
+ backend = rlog
## RLOG role
##
diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl
index fe06a8ce3..0af6bb97c 100644
--- a/apps/emqx_conf/src/emqx_conf_schema.erl
+++ b/apps/emqx_conf/src/emqx_conf_schema.erl
@@ -475,6 +475,28 @@ In sync mode the core node waits for an ack from the replicant nodes before send
transaction log entry.
"""
})}
+ , {"default_shard_transport",
+ sc(hoconsc:enum([gen_rpc, distr]),
+ #{ mapping => "mria.shard_transport"
+ , default => gen_rpc
+ , desc =>
+ "Defines the default transport for pushing transaction logs.
"
+ "This may be overridden on a per-shard basis in db.shard_transports
."
+ "gen_rpc
uses the gen_rpc
library, "
+ "distr
uses the Erlang distribution.
"
+ })}
+ , {"shard_transports",
+ sc(map(shard, hoconsc:enum([gen_rpc, distr])),
+ #{ desc =>
+ "Allows to tune the transport method used for transaction log replication, "
+ "on a per-shard basis.
"
+ "gen_rpc
uses the gen_rpc
library, "
+ "distr
uses the Erlang distribution.
"
+ "If not specified, the default is to use the value "
+ "set in db.default_shard_transport
."
+ , mapping => "emqx_machine.custom_shard_transports"
+ , default => #{}
+ })}
];
fields("cluster_call") ->
diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl
index 27a720a8d..0285e5141 100644
--- a/apps/emqx_machine/src/emqx_machine.erl
+++ b/apps/emqx_machine/src/emqx_machine.erl
@@ -30,12 +30,13 @@
start() ->
case os:type() of
{win32, nt} -> ok;
- _nix ->
+ _Nix ->
os:set_signal(sighup, ignore),
os:set_signal(sigterm, handle) %% default is handle
end,
ok = set_backtrace_depth(),
start_sysmon(),
+ configure_shard_transports(),
ekka:start(),
ok = print_otp_version_warning().
@@ -64,7 +65,7 @@ start_sysmon() ->
application:set_env(system_monitor, node_status_fun, {?MODULE, node_status}),
application:set_env(system_monitor, status_checks, [{?MODULE, update_vips, false, 10}]),
case application:get_env(system_monitor, db_hostname) of
- {ok, [_|_]} ->
+ {ok, [_ | _]} ->
application:set_env(system_monitor, callback_mod, system_monitor_pg),
_ = application:ensure_all_started(system_monitor, temporary),
ok;
@@ -81,3 +82,12 @@ node_status() ->
update_vips() ->
system_monitor:add_vip(mria_status:shards_up()).
+
+configure_shard_transports() ->
+ ShardTransports = application:get_env(emqx_machine, custom_shard_transports, #{}),
+ maps:foreach(
+ fun(ShardBin, Transport) ->
+ ShardName = binary_to_existing_atom(ShardBin),
+ mria_config:set_shard_transport(ShardName, Transport)
+ end,
+ ShardTransports).
diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl
index 800aa221b..01b2d59fc 100644
--- a/apps/emqx_machine/test/emqx_machine_SUITE.erl
+++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl
@@ -34,7 +34,7 @@ init_per_suite(Config) ->
%% emqx_machine_SUITE.erl
%%
%% Reason:
- %% the `emqx_machine_boot:ensure_apps_started()` will crashed
+ %% the `emqx_machine_boot:ensure_apps_started()` will crash
%% on starting `emqx_authz` with dirty confs, which caused the file
%% `.._build/test/lib/emqx_conf/etc/acl.conf` could not be found
%%
@@ -65,6 +65,25 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
+init_per_testcase(t_custom_shard_transports, Config) ->
+ OldConfig = application:get_env(emqx_machine, custom_shard_transports),
+ [{old_config, OldConfig} | Config];
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(t_custom_shard_transports, Config) ->
+ OldConfig0 = ?config(old_config, Config),
+ application:stop(ekka),
+ case OldConfig0 of
+ {ok, OldConfig} ->
+ application:set_env(emqx_machine, custom_shard_transports, OldConfig);
+ undefined ->
+ application:unset_env(emqx_machine, custom_shard_transports)
+ end,
+ ok;
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
t_shutdown_reboot(_Config) ->
emqx_machine_boot:stop_apps(),
false = emqx:is_running(node()),
@@ -72,3 +91,15 @@ t_shutdown_reboot(_Config) ->
true = emqx:is_running(node()),
ok = emqx_machine_boot:stop_apps(),
false = emqx:is_running(node()).
+
+t_custom_shard_transports(_Config) ->
+ %% used to ensure the atom exists
+ Shard = test_shard,
+ %% the config keys are binaries
+ ShardBin = atom_to_binary(Shard),
+ DefaultTransport = gen_rpc,
+ ?assertEqual(DefaultTransport, mria_config:shard_transport(Shard)),
+ application:set_env(emqx_machine, custom_shard_transports, #{ShardBin => distr}),
+ emqx_machine:start(),
+ ?assertEqual(distr, mria_config:shard_transport(Shard)),
+ ok.