Merge remote-tracking branch 'origin/release-57' into 0619-sync-release-57-to-master

This commit is contained in:
zmstone 2024-06-19 14:54:48 +02:00
commit 213e4785e7
27 changed files with 445 additions and 100 deletions

View File

@ -0,0 +1,41 @@
version: '3.9'
services:
mqnamesrvssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_namesrv_ssl
# ports:
# - 9876:9876
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
environment:
JAVA_OPT: "-Dtls.server.mode=enforcing"
command: ./mqnamesrv
networks:
- emqx_bridge
mqbrokerssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_broker_ssl
# ports:
# - 10909:10909
# - 10911:10911
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
- ./rocketmq/conf_ssl/broker.conf:/etc/rocketmq/broker.conf
- ./rocketmq/conf_ssl/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml
environment:
NAMESRV_ADDR: "rocketmq_namesrv_ssl:9876"
JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99"
JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn512m -Dtls.server.mode=enforcing"
command: ./mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- mqnamesrvssl
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,24 @@
brokerClusterName=DefaultClusterSSL
brokerName=broker-a
brokerId=0
brokerIP1=rocketmq_broker_ssl
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
aclEnable=true

View File

@ -0,0 +1,12 @@
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: PUB|SUB
topicPerms:
- TopicTest=PUB|SUB
- Topic2=PUB|SUB

View File

@ -18,6 +18,8 @@
-type traverse_break_reason() :: over | migrate. -type traverse_break_reason() :: over | migrate.
-type opts() :: #{print_fun => fun((io:format(), [term()]) -> ok)}.
-callback backup_tables() -> [mria:table()]. -callback backup_tables() -> [mria:table()].
%% validate the backup %% validate the backup
@ -31,6 +33,9 @@
-callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}. -callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]). %% NOTE: currently, this is called only when the table has been restored successfully.
-callback on_backup_table_imported(mria:table(), opts()) -> ok | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1, on_backup_table_imported/2]).
-export_type([traverse_break_reason/0]). -export_type([traverse_break_reason/0]).

View File

@ -24,11 +24,18 @@
version/0, version/0,
version_with_prefix/0, version_with_prefix/0,
vsn_compare/1, vsn_compare/1,
vsn_compare/2 vsn_compare/2,
on_load/0
]). ]).
-on_load(on_load/0).
-include("emqx_release.hrl"). -include("emqx_release.hrl").
-ifndef(EMQX_RELEASE_EDITION).
-define(EMQX_RELEASE_EDITION, ce).
-endif.
-define(EMQX_DESCS, #{ -define(EMQX_DESCS, #{
ee => "EMQX Enterprise", ee => "EMQX Enterprise",
ce => "EMQX" ce => "EMQX"
@ -49,6 +56,11 @@
ce => "v" ce => "v"
}). }).
%% @hidden Initialize edition. Almost static. use persistent_term to trick compiler.
-spec on_load() -> ok.
on_load() ->
persistent_term:put('EMQX_RELEASE_EDITION', ?EMQX_RELEASE_EDITION).
%% @doc Return EMQX description. %% @doc Return EMQX description.
description() -> description() ->
maps:get(edition(), ?EMQX_DESCS). maps:get(edition(), ?EMQX_DESCS).
@ -57,11 +69,8 @@ description() ->
%% Read info from persistent_term at runtime. %% Read info from persistent_term at runtime.
%% Or meck this function to run tests for another edition. %% Or meck this function to run tests for another edition.
-spec edition() -> ce | ee. -spec edition() -> ce | ee.
-ifdef(EMQX_RELEASE_EDITION). edition() ->
edition() -> ?EMQX_RELEASE_EDITION. persistent_term:get('EMQX_RELEASE_EDITION').
-else.
edition() -> ce.
-endif.
%% @doc Return EMQX version prefix string. %% @doc Return EMQX version prefix string.
edition_vsn_prefix() -> edition_vsn_prefix() ->

View File

@ -191,8 +191,6 @@
-define(DEFAULT_MULTIPLIER, 1.5). -define(DEFAULT_MULTIPLIER, 1.5).
-define(DEFAULT_BACKOFF, 0.75). -define(DEFAULT_BACKOFF, 0.75).
-define(INJECTING_CONFIGS, [?AUTH_EXT_SCHEMA_MODS]).
namespace() -> emqx. namespace() -> emqx.
tags() -> tags() ->

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.3"}}}, {rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.6.1"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}} {emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [ {application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [ {env, [

View File

@ -178,6 +178,7 @@ fields(action_parameters) ->
Parameters, Parameters,
[ [
servers, servers,
ssl,
namespace, namespace,
pool_size, pool_size,
auto_reconnect, auto_reconnect,

View File

@ -80,7 +80,7 @@ fields(config) ->
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ] ++ emqx_connector_schema_lib:ssl_fields().
servers() -> servers() ->
Meta = #{desc => ?DESC("servers")}, Meta = #{desc => ?DESC("servers")},
@ -98,7 +98,8 @@ on_start(
servers := BinServers, servers := BinServers,
access_key := AccessKey, access_key := AccessKey,
secret_key := SecretKey, secret_key := SecretKey,
security_token := SecurityToken security_token := SecurityToken,
ssl := SSLOptsMap
} = Config } = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
@ -113,13 +114,15 @@ on_start(
ClientId = client_id(InstanceId), ClientId = client_id(InstanceId),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
Namespace = maps:get(namespace, Config, <<>>), Namespace = maps:get(namespace, Config, <<>>),
ClientCfg = #{acl_info => ACLInfo, namespace => Namespace}, ClientCfg0 = #{acl_info => ACLInfo, namespace => Namespace},
SSLOpts = emqx_tls_lib:to_client_opts(SSLOptsMap),
ClientCfg = emqx_utils_maps:put_if(ClientCfg0, ssl_opts, SSLOpts, SSLOpts =/= []),
State = #{ State = #{
client_id => ClientId, client_id => ClientId,
acl_info => ACLInfo, acl_info => ACLInfo,
namespace => Namespace, namespace => Namespace,
installed_channels => #{} installed_channels => #{},
ssl_opts => SSLOpts
}, },
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId), ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
@ -142,12 +145,13 @@ on_add_channel(
#{ #{
installed_channels := InstalledChannels, installed_channels := InstalledChannels,
namespace := Namespace, namespace := Namespace,
acl_info := ACLInfo acl_info := ACLInfo,
ssl_opts := SSLOpts
} = OldState, } = OldState,
ChannelId, ChannelId,
ChannelConfig ChannelConfig
) -> ) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace), {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace, SSLOpts),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state %% Update state
NewState = OldState#{installed_channels => NewInstalledChannels}, NewState = OldState#{installed_channels => NewInstalledChannels},
@ -156,7 +160,8 @@ on_add_channel(
create_channel_state( create_channel_state(
#{parameters := Conf} = _ChannelConfig, #{parameters := Conf} = _ChannelConfig,
ACLInfo, ACLInfo,
Namespace Namespace,
SSLOpts
) -> ) ->
#{ #{
topic := Topic, topic := Topic,
@ -164,7 +169,7 @@ create_channel_state(
strategy := Strategy strategy := Strategy
} = Conf, } = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic), TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy), ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy, SSLOpts),
Templates = parse_template(Conf), Templates = parse_template(Conf),
DispatchStrategy = parse_dispatch_strategy(Strategy), DispatchStrategy = parse_dispatch_strategy(Strategy),
State = #{ State = #{
@ -407,9 +412,10 @@ make_producer_opts(
}, },
ACLInfo, ACLInfo,
Namespace, Namespace,
Strategy Strategy,
SSLOpts
) -> ) ->
#{ ProducerOpts = #{
tcp_opts => [{sndbuf, SendBuff}], tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval, ref_topic_route_interval => RefreshInterval,
acl_info => emqx_secret:wrap(ACLInfo), acl_info => emqx_secret:wrap(ACLInfo),
@ -419,7 +425,8 @@ make_producer_opts(
roundrobin -> roundrobin; roundrobin -> roundrobin;
_ -> key_dispatch _ -> key_dispatch
end end
}. },
emqx_utils_maps:put_if(ProducerOpts, ssl_opts, SSLOpts, SSLOpts =/= []).
acl_info(<<>>, _, _) -> acl_info(<<>>, _, _) ->
#{}; #{};

View File

@ -116,9 +116,17 @@ common_init(ConfigT) ->
_ = emqx_bridge_enterprise:module_info(), _ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(), emqx_mgmt_api_test_util:init_suite(),
{Name, RocketMQConf} = rocketmq_config(BridgeType, Config0), {Name, RocketMQConf} = rocketmq_config(BridgeType, Config0),
RocketMQSSLConf = RocketMQConf#{
<<"servers">> => <<"rocketmq_namesrv_ssl:9876">>,
<<"ssl">> => #{
<<"enable">> => true,
<<"verify">> => verify_none
}
},
Config = Config =
[ [
{rocketmq_config, RocketMQConf}, {rocketmq_config, RocketMQConf},
{rocketmq_config_ssl, RocketMQSSLConf},
{rocketmq_bridge_type, BridgeType}, {rocketmq_bridge_type, BridgeType},
{rocketmq_name, Name}, {rocketmq_name, Name},
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
@ -180,6 +188,28 @@ create_bridge(Config) ->
RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), RocketMQConf = ?GET_CONFIG(rocketmq_config, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf). emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl_bad_ssl_opts(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf0 = ?GET_CONFIG(rocketmq_config_ssl, Config),
%% This config is wrong because we use verify_peer without
%% a cert that can be used in the verification.
RocketMQConf1 = maps:put(
<<"ssl">>,
#{
<<"enable">> => true,
<<"verify">> => verify_peer
},
RocketMQConf0
),
emqx_bridge:create(BridgeType, Name, RocketMQConf1).
delete_bridge(Config) -> delete_bridge(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config), Name = ?GET_CONFIG(rocketmq_name, Config),
@ -233,6 +263,44 @@ t_setup_via_config_and_publish(Config) ->
), ),
ok. ok.
t_setup_via_config_and_publish_ssl(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl(Config)
),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
ok.
%% Check that we can not connect to the SSL only RocketMQ instance
%% with incorrect SSL options
t_setup_via_config_ssl_host_bad_ssl_opts(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl_bad_ssl_opts(Config)
),
Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := disconnected}, emqx_bridge_v2:health_check(BridgeType, Name)),
ok.
t_setup_via_http_api_and_publish(Config) -> t_setup_via_http_api_and_publish(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config), Name = ?GET_CONFIG(rocketmq_name, Config),

View File

@ -37,60 +37,4 @@
-define(READONLY_KEYS, [cluster, rpc, node]). -define(READONLY_KEYS, [cluster, rpc, node]).
-define(CE_AUTHZ_SOURCE_SCHEMA_MODS, [
emqx_authz_file_schema,
emqx_authz_mnesia_schema,
emqx_authz_http_schema,
emqx_authz_redis_schema,
emqx_authz_mysql_schema,
emqx_authz_postgresql_schema,
emqx_authz_mongodb_schema,
emqx_authz_ldap_schema
]).
-define(EE_AUTHZ_SOURCE_SCHEMA_MODS, []).
-define(CE_AUTHN_PROVIDER_SCHEMA_MODS, [
emqx_authn_mnesia_schema,
emqx_authn_mysql_schema,
emqx_authn_postgresql_schema,
emqx_authn_mongodb_schema,
emqx_authn_redis_schema,
emqx_authn_http_schema,
emqx_authn_jwt_schema,
emqx_authn_scram_mnesia_schema,
emqx_authn_ldap_schema
]).
-define(EE_AUTHN_PROVIDER_SCHEMA_MODS, [
emqx_gcp_device_authn_schema
]).
-if(?EMQX_RELEASE_EDITION == ee).
-define(AUTH_EXT_SCHEMA_MODS, [emqx_auth_ext_schema]).
-define(AUTHZ_SOURCE_SCHEMA_MODS, ?CE_AUTHZ_SOURCE_SCHEMA_MODS ++ ?EE_AUTHZ_SOURCE_SCHEMA_MODS).
-define(AUTHN_PROVIDER_SCHEMA_MODS,
(?CE_AUTHN_PROVIDER_SCHEMA_MODS ++ ?EE_AUTHN_PROVIDER_SCHEMA_MODS)
).
-define(CLUSTER_LINKING_SCHEMA_MODS, [emqx_cluster_link_schema]).
-define(OTHER_INJECTING_CONFIGS, ?AUTH_EXT_SCHEMA_MODS ++ ?CLUSTER_LINKING_SCHEMA_MODS).
-else.
-define(AUTHZ_SOURCE_SCHEMA_MODS, ?CE_AUTHZ_SOURCE_SCHEMA_MODS).
-define(AUTHN_PROVIDER_SCHEMA_MODS, ?CE_AUTHN_PROVIDER_SCHEMA_MODS).
-define(OTHER_INJECTING_CONFIGS, []).
-endif.
-define(INJECTING_CONFIGS, [
{emqx_authn_schema, ?AUTHN_PROVIDER_SCHEMA_MODS},
{emqx_authz_schema, ?AUTHZ_SOURCE_SCHEMA_MODS}
| ?OTHER_INJECTING_CONFIGS
]).
-endif. -endif.

View File

@ -97,7 +97,8 @@ tags() ->
[<<"EMQX">>]. [<<"EMQX">>].
roots() -> roots() ->
ok = emqx_schema_hooks:inject_from_modules(?INJECTING_CONFIGS), Injections = emqx_conf_schema_inject:schemas(),
ok = emqx_schema_hooks:inject_from_modules(Injections),
emqx_schema_high_prio_roots() ++ emqx_schema_high_prio_roots() ++
[ [
{node, {node,

View File

@ -0,0 +1,77 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_schema_inject).
-export([schemas/0]).
schemas() ->
schemas(emqx_release:edition()).
schemas(Edition) ->
auth_ext(Edition) ++
cluster_linking(Edition) ++
authn(Edition) ++
authz() ++
customized().
auth_ext(ce) ->
[];
auth_ext(ee) ->
[emqx_auth_ext_schema].
cluster_linking(ce) ->
[];
cluster_linking(ee) ->
[emqx_cluster_link_schema].
authn(Edition) ->
[{emqx_authn_schema, authn_mods(Edition)}].
authn_mods(ce) ->
[
emqx_authn_mnesia_schema,
emqx_authn_mysql_schema,
emqx_authn_postgresql_schema,
emqx_authn_mongodb_schema,
emqx_authn_redis_schema,
emqx_authn_http_schema,
emqx_authn_jwt_schema,
emqx_authn_scram_mnesia_schema,
emqx_authn_ldap_schema
];
authn_mods(ee) ->
authn_mods(ce) ++
[emqx_gcp_device_authn_schema].
authz() ->
[{emqx_authz_schema, authz_mods()}].
authz_mods() ->
[
emqx_authz_file_schema,
emqx_authz_mnesia_schema,
emqx_authz_http_schema,
emqx_authz_redis_schema,
emqx_authz_mysql_schema,
emqx_authz_postgresql_schema,
emqx_authz_mongodb_schema,
emqx_authz_ldap_schema
].
%% Add more schemas here.
customized() ->
[].

View File

@ -273,7 +273,7 @@ shards(DB) ->
[Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs]. [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs].
-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> -spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
#{replica_set := #{site() => #{status => up | joining}}} #{replica_set := #{site() => #{status => up | down}}}
| undefined. | undefined.
shard_info(DB, Shard) -> shard_info(DB, Shard) ->
case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
@ -282,8 +282,13 @@ shard_info(DB, Shard) ->
[#?SHARD_TAB{replica_set = Replicas}] -> [#?SHARD_TAB{replica_set = Replicas}] ->
ReplicaSet = maps:from_list([ ReplicaSet = maps:from_list([
begin begin
%% TODO: Status =
ReplInfo = #{status => up}, case mria:cluster_status(?MODULE:node(I)) of
running -> up;
stopped -> down;
false -> down
end,
ReplInfo = #{status => Status},
{I, ReplInfo} {I, ReplInfo}
end end
|| I <- Replicas || I <- Replicas

View File

@ -408,9 +408,13 @@ export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) ->
do_export_mnesia_tab(TabName, BackupName) -> do_export_mnesia_tab(TabName, BackupName) ->
Node = node(), Node = node(),
try try
{ok, TabName, [Node]} = mnesia:activate_checkpoint( Opts0 = [{name, TabName}, {min, [TabName]}, {allow_remote, false}],
[{name, TabName}, {min, [TabName]}, {allow_remote, false}] Opts =
), case mnesia:table_info(TabName, storage_type) of
ram_copies -> [{ram_overrides_dump, true} | Opts0];
_ -> Opts0
end,
{ok, TabName, [Node]} = mnesia:activate_checkpoint(Opts),
MnesiaBackupName = mnesia_backup_name(BackupName, TabName), MnesiaBackupName = mnesia_backup_name(BackupName, TabName),
ok = filelib:ensure_dir(MnesiaBackupName), ok = filelib:ensure_dir(MnesiaBackupName),
ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName), ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName),
@ -549,6 +553,8 @@ import_mnesia_tabs(BackupDir, Opts) ->
) )
). ).
-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) ->
ok | {ok, no_backup_file} | {error, term()} | no_return().
import_mnesia_tab(BackupDir, Mod, TabName, Opts) -> import_mnesia_tab(BackupDir, Mod, TabName, Opts) ->
MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName), MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
case filelib:is_regular(MnesiaBackupFileName) of case filelib:is_regular(MnesiaBackupFileName) of
@ -572,7 +578,7 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]), Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]),
case Restored of case Restored of
{atomic, [TabName]} -> {atomic, [TabName]} ->
ok; on_table_imported(Mod, TabName, Opts);
RestoreErr -> RestoreErr ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_restore_mnesia_backup", msg => "failed_to_restore_mnesia_backup",
@ -598,6 +604,27 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
_ = file:delete(MnesiaBackupFileName) _ = file:delete(MnesiaBackupFileName)
end. end.
on_table_imported(Mod, Tab, Opts) ->
case erlang:function_exported(Mod, on_backup_table_imported, 2) of
true ->
try
Mod:on_backup_table_imported(Tab, Opts)
catch
Class:Reason:Stack ->
?SLOG(error, #{
msg => "post_database_import_callback_failed",
table => Tab,
module => Mod,
exception => Class,
reason => Reason,
stacktrace => Stack
}),
{error, Reason}
end;
false ->
ok
end.
%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. %% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
%% Looks like there is no clean way to abort traversal without triggering any error reporting, %% Looks like there is no clean way to abort traversal without triggering any error reporting,
%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... %% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -114,6 +115,64 @@ t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) ->
end, end,
ok. ok.
t_import_retained_messages(Config) ->
FName = "emqx-export-ce-retained-msgs-test.tar.gz",
BackupFile = filename:join(?config(data_dir, Config), FName),
Exp = {ok, #{db_errors => #{}, config_errors => #{}}},
?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)),
%% verify that retainer messages are imported
?assertMatch(
{ok, [#message{payload = <<"Hi 1!!!">>}]},
emqx_retainer:read_message(<<"t/backup-retainer/test1">>)
),
?assertMatch(
{ok, [#message{payload = <<"Hi 5!!!">>}]},
emqx_retainer:read_message(<<"t/backup-retainer/test5">>)
),
%% verify that messages are re-indexed
?assertMatch(
{ok, _, [
#message{payload = <<"Hi 5!!!">>},
#message{payload = <<"Hi 4!!!">>},
#message{payload = <<"Hi 3!!!">>},
#message{payload = <<"Hi 2!!!">>},
#message{payload = <<"Hi 1!!!">>}
]},
emqx_retainer:page_read(<<"t/backup-retainer/#">>, 1, 5)
),
%% Export and import again
{ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)).
t_export_ram_retained_messages(_Config) ->
{ok, _} = emqx_retainer:update_config(
#{
<<"enable">> => true,
<<"backend">> => #{<<"storage_type">> => <<"ram">>}
}
),
?assertEqual(ram_copies, mnesia:table_info(emqx_retainer_message, storage_type)),
Topic = <<"t/backup_test_export_retained_ram/1">>,
Payload = <<"backup_test_retained_ram">>,
Msg = emqx_message:make(
<<"backup_test">>,
?QOS_0,
Topic,
Payload,
#{retain => true},
#{}
),
_ = emqx_broker:publish(Msg),
{ok, #{filename := BackupFileName}} = emqx_mgmt_data_backup:export(),
ok = emqx_retainer:delete(Topic),
?assertEqual({ok, []}, emqx_retainer:read_message(Topic)),
?assertEqual(
{ok, #{db_errors => #{}, config_errors => #{}}},
emqx_mgmt_data_backup:import(BackupFileName)
),
?assertMatch({ok, [#message{payload = Payload}]}, emqx_retainer:read_message(Topic)).
t_cluster_hocon_export_import(Config) -> t_cluster_hocon_export_import(Config) ->
RawConfBeforeImport = emqx:get_raw_config([]), RawConfBeforeImport = emqx:get_raw_config([]),
BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),

View File

@ -986,14 +986,7 @@ start_app(App) ->
%% but not the ones shared with others. %% but not the ones shared with others.
ensure_apps_stopped(#{<<"rel_apps">> := Apps}) -> ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
%% load plugin apps and beam code %% load plugin apps and beam code
AppsToStop = AppsToStop = lists:filtermap(fun parse_name_vsn_for_stopping/1, Apps),
lists:map(
fun(NameVsn) ->
{ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
AppName
end,
Apps
),
case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
{ok, []} -> {ok, []} ->
%% all apps stopped %% all apps stopped
@ -1009,6 +1002,30 @@ ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
{error, Reason} {error, Reason}
end. end.
%% On one hand, Elixir plugins might include Elixir itself, when targetting a non-Elixir
%% EMQX release. If, on the other hand, the EMQX release already includes Elixir, we
%% shouldn't stop Elixir nor IEx.
-ifdef(EMQX_ELIXIR).
is_protected_app(elixir) -> true;
is_protected_app(iex) -> true;
is_protected_app(_) -> false.
parse_name_vsn_for_stopping(NameVsn) ->
{ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
case is_protected_app(AppName) of
true ->
false;
false ->
{true, AppName}
end.
%% ELSE ifdef(EMQX_ELIXIR)
-else.
parse_name_vsn_for_stopping(NameVsn) ->
{ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
{true, AppName}.
%% END ifdef(EMQX_ELIXIR)
-endif.
stop_apps(Apps) -> stop_apps(Apps) ->
RunningApps = running_apps(), RunningApps = running_apps(),
case do_stop_apps(Apps, [], RunningApps) of case do_stop_apps(Apps, [], RunningApps) of
@ -1045,8 +1062,10 @@ stop_app(App) ->
unload_moudle_and_app(App) -> unload_moudle_and_app(App) ->
case application:get_key(App, modules) of case application:get_key(App, modules) of
{ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules); {ok, Modules} ->
_ -> ok lists:foreach(fun code:soft_purge/1, Modules);
_ ->
ok
end, end,
_ = application:unload(App), _ = application:unload(App),
ok. ok.

View File

@ -55,7 +55,10 @@
-export([populate_index_meta/0]). -export([populate_index_meta/0]).
-export([reindex/3]). -export([reindex/3]).
-export([backup_tables/0]). -export([
backup_tables/0,
on_backup_table_imported/2
]).
-record(retained_message, {topic, msg, expiry_time}). -record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}). -record(retained_index, {key, expiry_time}).
@ -80,8 +83,45 @@ topics() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Data backup %% Data backup
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
backup_tables() -> backup_tables() ->
[?TAB_MESSAGE]. [?TAB_MESSAGE || is_enabled()].
on_backup_table_imported(?TAB_MESSAGE, Opts) ->
case is_enabled() of
true ->
maybe_print("Starting reindexing retained messages ~n", [], Opts),
Res = reindex(false, mk_status_fun(Opts)),
maybe_print("Reindexing retained messages finished~n", [], Opts),
Res;
false ->
ok
end;
on_backup_table_imported(_Tab, _Opts) ->
ok.
mk_status_fun(Opts) ->
fun(Done) ->
log_status(Done),
maybe_print("Reindexed ~p messages~n", [Done], Opts)
end.
maybe_print(Fmt, Args, #{print_fun := Fun}) when is_function(Fun, 2) ->
Fun(Fmt, Args);
maybe_print(_Fmt, _Args, _Opts) ->
ok.
log_status(Done) ->
?SLOG(
info,
#{
msg => "retainer_message_record_reindexing_progress",
done => Done
}
).
is_enabled() ->
emqx_retainer:enabled() andalso emqx_retainer:backend_module() =:= ?MODULE.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% emqx_retainer callbacks %% emqx_retainer callbacks

View File

@ -0,0 +1 @@
Fixed an issue where durable storage sites that were down being reported as up.

View File

@ -0,0 +1,4 @@
- Automatically re-index imported retained messages during restoring a data backup file. Previously, it was needed to manually trigger re-indexing with `emqx ctl retainer reindex start` CLI
after the data backup file is imported.
- Allow exporting retained messages to a backup file if the configured storage_type (`retainer.backend.storage_type`) is `ram`. Previously, retained messages could be exported only if `disc` storage_type was configured.

View File

@ -0,0 +1 @@
The RocketMQ connector has got support for configuring SSL settings.

View File

@ -273,6 +273,7 @@ defmodule EMQXUmbrella.MixProject do
:debug_info, :debug_info,
{:compile_info, [{:emqx_vsn, String.to_charlist(version)}]}, {:compile_info, [{:emqx_vsn, String.to_charlist(version)}]},
{:d, :EMQX_RELEASE_EDITION, erlang_edition(edition_type)}, {:d, :EMQX_RELEASE_EDITION, erlang_edition(edition_type)},
{:d, :EMQX_ELIXIR},
{:d, :snk_kind, :msg} {:d, :snk_kind, :msg}
] ]
end end

View File

@ -200,7 +200,8 @@ for dep in ${CT_DEPS}; do
FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' )
;; ;;
rocketmq) rocketmq)
FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml'
'.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml' )
;; ;;
cassandra) cassandra)
FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' )