Merge pull request #13296 from zmstone/0619-sync-release-57-to-master

0619 sync release 57 to master
This commit is contained in:
zmstone 2024-06-20 10:33:52 +02:00 committed by GitHub
commit f969a4ef5e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 461 additions and 101 deletions

View File

@ -0,0 +1,41 @@
version: '3.9'
services:
mqnamesrvssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_namesrv_ssl
# ports:
# - 9876:9876
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
environment:
JAVA_OPT: "-Dtls.server.mode=enforcing"
command: ./mqnamesrv
networks:
- emqx_bridge
mqbrokerssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_broker_ssl
# ports:
# - 10909:10909
# - 10911:10911
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
- ./rocketmq/conf_ssl/broker.conf:/etc/rocketmq/broker.conf
- ./rocketmq/conf_ssl/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml
environment:
NAMESRV_ADDR: "rocketmq_namesrv_ssl:9876"
JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99"
JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn512m -Dtls.server.mode=enforcing"
command: ./mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- mqnamesrvssl
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,24 @@
brokerClusterName=DefaultClusterSSL
brokerName=broker-a
brokerId=0
brokerIP1=rocketmq_broker_ssl
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
aclEnable=true

View File

@ -0,0 +1,12 @@
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: PUB|SUB
topicPerms:
- TopicTest=PUB|SUB
- Topic2=PUB|SUB

View File

@ -18,6 +18,8 @@
-type traverse_break_reason() :: over | migrate.
-type opts() :: #{print_fun => fun((io:format(), [term()]) -> ok)}.
-callback backup_tables() -> [mria:table()].
%% validate the backup
@ -31,6 +33,9 @@
-callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]).
%% NOTE: currently, this is called only when the table has been restored successfully.
-callback on_backup_table_imported(mria:table(), opts()) -> ok | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1, on_backup_table_imported/2]).
-export_type([traverse_break_reason/0]).

View File

@ -24,11 +24,18 @@
version/0,
version_with_prefix/0,
vsn_compare/1,
vsn_compare/2
vsn_compare/2,
on_load/0
]).
-on_load(on_load/0).
-include("emqx_release.hrl").
-ifndef(EMQX_RELEASE_EDITION).
-define(EMQX_RELEASE_EDITION, ce).
-endif.
-define(EMQX_DESCS, #{
ee => "EMQX Enterprise",
ce => "EMQX"
@ -49,6 +56,11 @@
ce => "v"
}).
%% @hidden Initialize edition. Almost static. use persistent_term to trick compiler.
-spec on_load() -> ok.
on_load() ->
persistent_term:put('EMQX_RELEASE_EDITION', ?EMQX_RELEASE_EDITION).
%% @doc Return EMQX description.
description() ->
maps:get(edition(), ?EMQX_DESCS).
@ -57,11 +69,8 @@ description() ->
%% Read info from persistent_term at runtime.
%% Or meck this function to run tests for another edition.
-spec edition() -> ce | ee.
-ifdef(EMQX_RELEASE_EDITION).
edition() -> ?EMQX_RELEASE_EDITION.
-else.
edition() -> ce.
-endif.
edition() ->
persistent_term:get('EMQX_RELEASE_EDITION').
%% @doc Return EMQX version prefix string.
edition_vsn_prefix() ->

View File

@ -191,8 +191,6 @@
-define(DEFAULT_MULTIPLIER, 1.5).
-define(DEFAULT_BACKOFF, 0.75).
-define(INJECTING_CONFIGS, [?AUTH_EXT_SCHEMA_MODS]).
namespace() -> emqx.
tags() ->

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}.
{deps, [
{rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.3"}}},
{rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.6.1"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [

View File

@ -178,6 +178,7 @@ fields(action_parameters) ->
Parameters,
[
servers,
ssl,
namespace,
pool_size,
auto_reconnect,

View File

@ -80,7 +80,7 @@ fields(config) ->
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
].
] ++ emqx_connector_schema_lib:ssl_fields().
servers() ->
Meta = #{desc => ?DESC("servers")},
@ -98,7 +98,8 @@ on_start(
servers := BinServers,
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken
security_token := SecurityToken,
ssl := SSLOptsMap
} = Config
) ->
?SLOG(info, #{
@ -113,13 +114,15 @@ on_start(
ClientId = client_id(InstanceId),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
Namespace = maps:get(namespace, Config, <<>>),
ClientCfg = #{acl_info => ACLInfo, namespace => Namespace},
ClientCfg0 = #{acl_info => ACLInfo, namespace => Namespace},
SSLOpts = emqx_tls_lib:to_client_opts(SSLOptsMap),
ClientCfg = emqx_utils_maps:put_if(ClientCfg0, ssl_opts, SSLOpts, SSLOpts =/= []),
State = #{
client_id => ClientId,
acl_info => ACLInfo,
namespace => Namespace,
installed_channels => #{}
installed_channels => #{},
ssl_opts => SSLOpts
},
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
@ -142,12 +145,13 @@ on_add_channel(
#{
installed_channels := InstalledChannels,
namespace := Namespace,
acl_info := ACLInfo
acl_info := ACLInfo,
ssl_opts := SSLOpts
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace),
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace, SSLOpts),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
@ -156,7 +160,8 @@ on_add_channel(
create_channel_state(
#{parameters := Conf} = _ChannelConfig,
ACLInfo,
Namespace
Namespace,
SSLOpts
) ->
#{
topic := Topic,
@ -164,7 +169,7 @@ create_channel_state(
strategy := Strategy
} = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy, SSLOpts),
Templates = parse_template(Conf),
DispatchStrategy = parse_dispatch_strategy(Strategy),
State = #{
@ -407,9 +412,10 @@ make_producer_opts(
},
ACLInfo,
Namespace,
Strategy
Strategy,
SSLOpts
) ->
#{
ProducerOpts = #{
tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval,
acl_info => emqx_secret:wrap(ACLInfo),
@ -419,7 +425,8 @@ make_producer_opts(
roundrobin -> roundrobin;
_ -> key_dispatch
end
}.
},
emqx_utils_maps:put_if(ProducerOpts, ssl_opts, SSLOpts, SSLOpts =/= []).
acl_info(<<>>, _, _) ->
#{};

View File

@ -116,9 +116,17 @@ common_init(ConfigT) ->
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
{Name, RocketMQConf} = rocketmq_config(BridgeType, Config0),
RocketMQSSLConf = RocketMQConf#{
<<"servers">> => <<"rocketmq_namesrv_ssl:9876">>,
<<"ssl">> => #{
<<"enable">> => true,
<<"verify">> => verify_none
}
},
Config =
[
{rocketmq_config, RocketMQConf},
{rocketmq_config_ssl, RocketMQSSLConf},
{rocketmq_bridge_type, BridgeType},
{rocketmq_name, Name},
{proxy_host, ProxyHost},
@ -180,6 +188,28 @@ create_bridge(Config) ->
RocketMQConf = ?GET_CONFIG(rocketmq_config, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl_bad_ssl_opts(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf0 = ?GET_CONFIG(rocketmq_config_ssl, Config),
%% This config is wrong because we use verify_peer without
%% a cert that can be used in the verification.
RocketMQConf1 = maps:put(
<<"ssl">>,
#{
<<"enable">> => true,
<<"verify">> => verify_peer
},
RocketMQConf0
),
emqx_bridge:create(BridgeType, Name, RocketMQConf1).
delete_bridge(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
@ -233,6 +263,44 @@ t_setup_via_config_and_publish(Config) ->
),
ok.
t_setup_via_config_and_publish_ssl(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl(Config)
),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
ok.
%% Check that we can not connect to the SSL only RocketMQ instance
%% with incorrect SSL options
t_setup_via_config_ssl_host_bad_ssl_opts(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl_bad_ssl_opts(Config)
),
Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := disconnected}, emqx_bridge_v2:health_check(BridgeType, Name)),
ok.
t_setup_via_http_api_and_publish(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),

View File

@ -37,60 +37,4 @@
-define(READONLY_KEYS, [cluster, rpc, node]).
-define(CE_AUTHZ_SOURCE_SCHEMA_MODS, [
emqx_authz_file_schema,
emqx_authz_mnesia_schema,
emqx_authz_http_schema,
emqx_authz_redis_schema,
emqx_authz_mysql_schema,
emqx_authz_postgresql_schema,
emqx_authz_mongodb_schema,
emqx_authz_ldap_schema
]).
-define(EE_AUTHZ_SOURCE_SCHEMA_MODS, []).
-define(CE_AUTHN_PROVIDER_SCHEMA_MODS, [
emqx_authn_mnesia_schema,
emqx_authn_mysql_schema,
emqx_authn_postgresql_schema,
emqx_authn_mongodb_schema,
emqx_authn_redis_schema,
emqx_authn_http_schema,
emqx_authn_jwt_schema,
emqx_authn_scram_mnesia_schema,
emqx_authn_ldap_schema
]).
-define(EE_AUTHN_PROVIDER_SCHEMA_MODS, [
emqx_gcp_device_authn_schema
]).
-if(?EMQX_RELEASE_EDITION == ee).
-define(AUTH_EXT_SCHEMA_MODS, [emqx_auth_ext_schema]).
-define(AUTHZ_SOURCE_SCHEMA_MODS, ?CE_AUTHZ_SOURCE_SCHEMA_MODS ++ ?EE_AUTHZ_SOURCE_SCHEMA_MODS).
-define(AUTHN_PROVIDER_SCHEMA_MODS,
(?CE_AUTHN_PROVIDER_SCHEMA_MODS ++ ?EE_AUTHN_PROVIDER_SCHEMA_MODS)
).
-define(CLUSTER_LINKING_SCHEMA_MODS, [emqx_cluster_link_schema]).
-define(OTHER_INJECTING_CONFIGS, ?AUTH_EXT_SCHEMA_MODS ++ ?CLUSTER_LINKING_SCHEMA_MODS).
-else.
-define(AUTHZ_SOURCE_SCHEMA_MODS, ?CE_AUTHZ_SOURCE_SCHEMA_MODS).
-define(AUTHN_PROVIDER_SCHEMA_MODS, ?CE_AUTHN_PROVIDER_SCHEMA_MODS).
-define(OTHER_INJECTING_CONFIGS, []).
-endif.
-define(INJECTING_CONFIGS, [
{emqx_authn_schema, ?AUTHN_PROVIDER_SCHEMA_MODS},
{emqx_authz_schema, ?AUTHZ_SOURCE_SCHEMA_MODS}
| ?OTHER_INJECTING_CONFIGS
]).
-endif.

View File

@ -97,7 +97,8 @@ tags() ->
[<<"EMQX">>].
roots() ->
ok = emqx_schema_hooks:inject_from_modules(?INJECTING_CONFIGS),
Injections = emqx_conf_schema_inject:schemas(),
ok = emqx_schema_hooks:inject_from_modules(Injections),
emqx_schema_high_prio_roots() ++
[
{node,

View File

@ -0,0 +1,77 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_schema_inject).
-export([schemas/0]).
schemas() ->
schemas(emqx_release:edition()).
schemas(Edition) ->
auth_ext(Edition) ++
cluster_linking(Edition) ++
authn(Edition) ++
authz() ++
customized().
auth_ext(ce) ->
[];
auth_ext(ee) ->
[emqx_auth_ext_schema].
cluster_linking(ce) ->
[];
cluster_linking(ee) ->
[emqx_cluster_link_schema].
authn(Edition) ->
[{emqx_authn_schema, authn_mods(Edition)}].
authn_mods(ce) ->
[
emqx_authn_mnesia_schema,
emqx_authn_mysql_schema,
emqx_authn_postgresql_schema,
emqx_authn_mongodb_schema,
emqx_authn_redis_schema,
emqx_authn_http_schema,
emqx_authn_jwt_schema,
emqx_authn_scram_mnesia_schema,
emqx_authn_ldap_schema
];
authn_mods(ee) ->
authn_mods(ce) ++
[emqx_gcp_device_authn_schema].
authz() ->
[{emqx_authz_schema, authz_mods()}].
authz_mods() ->
[
emqx_authz_file_schema,
emqx_authz_mnesia_schema,
emqx_authz_http_schema,
emqx_authz_redis_schema,
emqx_authz_mysql_schema,
emqx_authz_postgresql_schema,
emqx_authz_mongodb_schema,
emqx_authz_ldap_schema
].
%% Add more schemas here.
customized() ->
[].

View File

@ -273,7 +273,7 @@ shards(DB) ->
[Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs].
-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
#{replica_set := #{site() => #{status => up | joining}}}
#{replica_set := #{site() => #{status => up | down}}}
| undefined.
shard_info(DB, Shard) ->
case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
@ -282,8 +282,13 @@ shard_info(DB, Shard) ->
[#?SHARD_TAB{replica_set = Replicas}] ->
ReplicaSet = maps:from_list([
begin
%% TODO:
ReplInfo = #{status => up},
Status =
case mria:cluster_status(?MODULE:node(I)) of
running -> up;
stopped -> down;
false -> down
end,
ReplInfo = #{status => Status},
{I, ReplInfo}
end
|| I <- Replicas

View File

@ -408,9 +408,13 @@ export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) ->
do_export_mnesia_tab(TabName, BackupName) ->
Node = node(),
try
{ok, TabName, [Node]} = mnesia:activate_checkpoint(
[{name, TabName}, {min, [TabName]}, {allow_remote, false}]
),
Opts0 = [{name, TabName}, {min, [TabName]}, {allow_remote, false}],
Opts =
case mnesia:table_info(TabName, storage_type) of
ram_copies -> [{ram_overrides_dump, true} | Opts0];
_ -> Opts0
end,
{ok, TabName, [Node]} = mnesia:activate_checkpoint(Opts),
MnesiaBackupName = mnesia_backup_name(BackupName, TabName),
ok = filelib:ensure_dir(MnesiaBackupName),
ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName),
@ -549,6 +553,8 @@ import_mnesia_tabs(BackupDir, Opts) ->
)
).
-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) ->
ok | {ok, no_backup_file} | {error, term()} | no_return().
import_mnesia_tab(BackupDir, Mod, TabName, Opts) ->
MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
case filelib:is_regular(MnesiaBackupFileName) of
@ -572,7 +578,7 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]),
case Restored of
{atomic, [TabName]} ->
ok;
on_table_imported(Mod, TabName, Opts);
RestoreErr ->
?SLOG(error, #{
msg => "failed_to_restore_mnesia_backup",
@ -598,6 +604,27 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
_ = file:delete(MnesiaBackupFileName)
end.
on_table_imported(Mod, Tab, Opts) ->
case erlang:function_exported(Mod, on_backup_table_imported, 2) of
true ->
try
Mod:on_backup_table_imported(Tab, Opts)
catch
Class:Reason:Stack ->
?SLOG(error, #{
msg => "post_database_import_callback_failed",
table => Tab,
module => Mod,
exception => Class,
reason => Reason,
stacktrace => Stack
}),
{error, Reason}
end;
false ->
ok
end.
%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
%% Looks like there is no clean way to abort traversal without triggering any error reporting,
%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -114,6 +115,64 @@ t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) ->
end,
ok.
t_import_retained_messages(Config) ->
FName = "emqx-export-ce-retained-msgs-test.tar.gz",
BackupFile = filename:join(?config(data_dir, Config), FName),
Exp = {ok, #{db_errors => #{}, config_errors => #{}}},
?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)),
%% verify that retainer messages are imported
?assertMatch(
{ok, [#message{payload = <<"Hi 1!!!">>}]},
emqx_retainer:read_message(<<"t/backup-retainer/test1">>)
),
?assertMatch(
{ok, [#message{payload = <<"Hi 5!!!">>}]},
emqx_retainer:read_message(<<"t/backup-retainer/test5">>)
),
%% verify that messages are re-indexed
?assertMatch(
{ok, _, [
#message{payload = <<"Hi 5!!!">>},
#message{payload = <<"Hi 4!!!">>},
#message{payload = <<"Hi 3!!!">>},
#message{payload = <<"Hi 2!!!">>},
#message{payload = <<"Hi 1!!!">>}
]},
emqx_retainer:page_read(<<"t/backup-retainer/#">>, 1, 5)
),
%% Export and import again
{ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)).
t_export_ram_retained_messages(_Config) ->
{ok, _} = emqx_retainer:update_config(
#{
<<"enable">> => true,
<<"backend">> => #{<<"storage_type">> => <<"ram">>}
}
),
?assertEqual(ram_copies, mnesia:table_info(emqx_retainer_message, storage_type)),
Topic = <<"t/backup_test_export_retained_ram/1">>,
Payload = <<"backup_test_retained_ram">>,
Msg = emqx_message:make(
<<"backup_test">>,
?QOS_0,
Topic,
Payload,
#{retain => true},
#{}
),
_ = emqx_broker:publish(Msg),
{ok, #{filename := BackupFileName}} = emqx_mgmt_data_backup:export(),
ok = emqx_retainer:delete(Topic),
?assertEqual({ok, []}, emqx_retainer:read_message(Topic)),
?assertEqual(
{ok, #{db_errors => #{}, config_errors => #{}}},
emqx_mgmt_data_backup:import(BackupFileName)
),
?assertMatch({ok, [#message{payload = Payload}]}, emqx_retainer:read_message(Topic)).
t_cluster_hocon_export_import(Config) ->
RawConfBeforeImport = emqx:get_raw_config([]),
BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),

View File

@ -986,14 +986,7 @@ start_app(App) ->
%% but not the ones shared with others.
ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
%% load plugin apps and beam code
AppsToStop =
lists:map(
fun(NameVsn) ->
{ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
AppName
end,
Apps
),
AppsToStop = lists:filtermap(fun parse_name_vsn_for_stopping/1, Apps),
case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
{ok, []} ->
%% all apps stopped
@ -1009,6 +1002,30 @@ ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
{error, Reason}
end.
%% On one hand, Elixir plugins might include Elixir itself, when targetting a non-Elixir
%% EMQX release. If, on the other hand, the EMQX release already includes Elixir, we
%% shouldn't stop Elixir nor IEx.
-ifdef(EMQX_ELIXIR).
is_protected_app(elixir) -> true;
is_protected_app(iex) -> true;
is_protected_app(_) -> false.
parse_name_vsn_for_stopping(NameVsn) ->
{ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
case is_protected_app(AppName) of
true ->
false;
false ->
{true, AppName}
end.
%% ELSE ifdef(EMQX_ELIXIR)
-else.
parse_name_vsn_for_stopping(NameVsn) ->
{ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
{true, AppName}.
%% END ifdef(EMQX_ELIXIR)
-endif.
stop_apps(Apps) ->
RunningApps = running_apps(),
case do_stop_apps(Apps, [], RunningApps) of
@ -1045,8 +1062,10 @@ stop_app(App) ->
unload_moudle_and_app(App) ->
case application:get_key(App, modules) of
{ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
_ -> ok
{ok, Modules} ->
lists:foreach(fun code:soft_purge/1, Modules);
_ ->
ok
end,
_ = application:unload(App),
ok.

View File

@ -55,7 +55,10 @@
-export([populate_index_meta/0]).
-export([reindex/3]).
-export([backup_tables/0]).
-export([
backup_tables/0,
on_backup_table_imported/2
]).
-record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}).
@ -80,8 +83,45 @@ topics() ->
%%--------------------------------------------------------------------
%% Data backup
%%--------------------------------------------------------------------
backup_tables() ->
[?TAB_MESSAGE].
[?TAB_MESSAGE || is_enabled()].
on_backup_table_imported(?TAB_MESSAGE, Opts) ->
case is_enabled() of
true ->
maybe_print("Starting reindexing retained messages ~n", [], Opts),
Res = reindex(false, mk_status_fun(Opts)),
maybe_print("Reindexing retained messages finished~n", [], Opts),
Res;
false ->
ok
end;
on_backup_table_imported(_Tab, _Opts) ->
ok.
mk_status_fun(Opts) ->
fun(Done) ->
log_status(Done),
maybe_print("Reindexed ~p messages~n", [Done], Opts)
end.
maybe_print(Fmt, Args, #{print_fun := Fun}) when is_function(Fun, 2) ->
Fun(Fmt, Args);
maybe_print(_Fmt, _Args, _Opts) ->
ok.
log_status(Done) ->
?SLOG(
info,
#{
msg => "retainer_message_record_reindexing_progress",
done => Done
}
).
is_enabled() ->
emqx_retainer:enabled() andalso emqx_retainer:backend_module() =:= ?MODULE.
%%--------------------------------------------------------------------
%% emqx_retainer callbacks

View File

@ -825,8 +825,21 @@ join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List).
%% - String values are always quoted
%% - No escape sequence for keys and values
%% - Float point values are formatted with fixed (6) decimal point compact-formatting
map_to_redis_hset_args(Payload) when erlang:is_binary(Payload) ->
try
Map = json_decode(Payload),
map_to_redis_hset_args(Map)
catch
_:_ ->
%% Discard invalid JSON
[map_to_redis_hset_args]
end;
map_to_redis_hset_args(Map) when erlang:is_map(Map) ->
[map_to_redis_hset_args | maps:fold(fun redis_hset_acc/3, [], Map)].
Fields = maps:fold(fun redis_hset_acc/3, [], Map),
%% Fields can be [], the final template may have other fields for concatenation
[map_to_redis_hset_args | Fields];
map_to_redis_hset_args(_Other) ->
[map_to_redis_hset_args].
redis_hset_acc(K, V, IoData) ->
try

View File

@ -1395,6 +1395,8 @@ t_map_to_redis_hset_args(_Config) ->
true
end
),
?assertEqual([], Do(<<"not json">>)),
?assertEqual([], Do([<<"not map">>, <<"not json either">>])),
ok.
%%------------------------------------------------------------------------------

View File

@ -0,0 +1 @@
Fixed an issue where durable storage sites that were down being reported as up.

View File

@ -0,0 +1,4 @@
- Automatically re-index imported retained messages during restoring a data backup file. Previously, it was needed to manually trigger re-indexing with `emqx ctl retainer reindex start` CLI
after the data backup file is imported.
- Allow exporting retained messages to a backup file if the configured storage_type (`retainer.backend.storage_type`) is `ram`. Previously, retained messages could be exported only if `disc` storage_type was configured.

View File

@ -0,0 +1 @@
The RocketMQ connector has got support for configuring SSL settings.

View File

@ -273,6 +273,7 @@ defmodule EMQXUmbrella.MixProject do
:debug_info,
{:compile_info, [{:emqx_vsn, String.to_charlist(version)}]},
{:d, :EMQX_RELEASE_EDITION, erlang_edition(edition_type)},
{:d, :EMQX_ELIXIR},
{:d, :snk_kind, :msg}
]
end

View File

@ -200,7 +200,8 @@ for dep in ${CT_DEPS}; do
FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' )
;;
rocketmq)
FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' )
FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml'
'.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml' )
;;
cassandra)
FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' )