Merge remote-tracking branch 'upstream/release-54' into 0105-sync-r54

This commit is contained in:
Ivan Dyachkov 2024-01-05 14:16:23 +01:00
commit 06117c3a33
54 changed files with 461 additions and 205 deletions

View File

@ -30,11 +30,10 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
# - 8083 for WebSocket/HTTP
# - 8084 for WSS/HTTPS
# - 8883 port for MQTT(SSL)
# - 11883 port for internal MQTT/TCP
# - 18083 for dashboard and API
# - 4370 default Erlang distrbution port
# - 5369 for backplain gen_rpc
EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369
EXPOSE 1883 8083 8084 8883 18083 4370 5369
ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]

View File

@ -21,7 +21,7 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.6.0
export EMQX_EE_DASHBOARD_VERSION ?= e1.4.0
export EMQX_EE_DASHBOARD_VERSION ?= e1.4.1
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.4.0").
-define(EMQX_RELEASE_CE, "5.4.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.4.0").
-define(EMQX_RELEASE_EE, "5.4.1").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -2,7 +2,7 @@
{application, emqx, [
{id, "emqx"},
{description, "EMQX Core"},
{vsn, "5.1.18"},
{vsn, "5.1.19"},
{modules, []},
{registered, []},
{applications, [

View File

@ -141,4 +141,5 @@ pbkdf2(MacFun, Password, Salt, Iterations, DKLength) ->
pbkdf2:pbkdf2(MacFun, Password, Salt, Iterations, DKLength).
hex(X) when is_binary(X) ->
pbkdf2:to_hex(X).
%% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
string:lowercase(binary:encode_hex(X)).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_http, [
{description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
{env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},

View File

@ -97,7 +97,10 @@ fields("http_action") ->
required => true,
desc => ?DESC("config_parameters_opts")
})}
] ++ emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts);
] ++
emqx_connector_schema:resource_opts_ref(
?MODULE, action_resource_opts, fun legacy_action_resource_opts_converter/2
);
fields(action_resource_opts) ->
UnsupportedOpts = [batch_size, batch_time],
lists:filter(
@ -342,6 +345,13 @@ mark_request_field_deperecated(Fields) ->
Fields
).
legacy_action_resource_opts_converter(Conf, _Opts) when is_map(Conf) ->
%% In e5.3.0, we accidentally added `start_after_created` and `start_timeout` to the action resource opts.
%% Since e5.4.0, we have removed them. This function is used to convert the old config to the new one.
maps:without([<<"start_after_created">>, <<"start_timeout">>], Conf);
legacy_action_resource_opts_converter(Conf, _Opts) ->
Conf.
%%--------------------------------------------------------------------
%% Examples

View File

@ -1,5 +1,6 @@
%% -*- mode: erlang; -*-
{deps, [
{emqx, {path, "../../apps/emqx"}}
{emqx, {path, "../../apps/emqx"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,

View File

@ -17,6 +17,7 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-behaviour(emqx_resource).
@ -35,6 +36,8 @@
-export([on_async_result/2]).
-define(HEALTH_CHECK_TIMEOUT, 1000).
-define(INGRESS, "I").
-define(EGRESS, "E").
%% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received will be called
@ -66,7 +69,7 @@ on_start(ResourceId, Conf) ->
end.
start_ingress(ResourceId, Conf) ->
ClientOpts = mk_client_opts(ResourceId, "ingress", Conf),
ClientOpts = mk_client_opts(ResourceId, ?INGRESS, Conf),
case mk_ingress_config(ResourceId, Conf) of
Ingress = #{} ->
start_ingress(ResourceId, Ingress, ClientOpts);
@ -91,6 +94,8 @@ start_ingress(ResourceId, Ingress, ClientOpts) ->
{error, Reason}
end.
choose_ingress_pool_size(<<?TEST_ID_PREFIX, _/binary>>, _) ->
1;
choose_ingress_pool_size(
ResourceId,
#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
@ -119,7 +124,7 @@ start_egress(ResourceId, Conf) ->
% NOTE
% We are ignoring the user configuration here because there's currently no reliable way
% to ensure proper session recovery according to the MQTT spec.
ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)),
ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, ?EGRESS, Conf)),
case mk_egress_config(Conf) of
Egress = #{} ->
start_egress(ResourceId, Egress, ClientOpts);
@ -326,9 +331,10 @@ mk_client_opts(
],
Config
),
Name = parse_id_to_name(ResourceId),
mk_client_opt_password(Options#{
hosts => [HostPort],
clientid => clientid(ResourceId, ClientScope, Config),
clientid => clientid(Name, ClientScope, Config),
connect_timeout => 30,
keepalive => ms_to_s(KeepAlive),
force_ping => true,
@ -336,6 +342,12 @@ mk_client_opts(
ssl_opts => maps:to_list(maps:remove(enable, Ssl))
}).
parse_id_to_name(<<?TEST_ID_PREFIX, Name/binary>>) ->
Name;
parse_id_to_name(Id) ->
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}),
Name.
mk_client_opt_password(Options = #{password := Secret}) ->
%% TODO: Teach `emqtt` to accept 0-arity closures as passwords.
Options#{password := emqx_secret:unwrap(Secret)};
@ -345,7 +357,9 @@ mk_client_opt_password(Options) ->
ms_to_s(Ms) ->
erlang:ceil(Ms / 1000).
clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) ->
iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]);
clientid(Id, ClientScope, _Conf) ->
iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]).
clientid(Name, ClientScope, _Conf = #{clientid_prefix := Prefix}) when
is_binary(Prefix) andalso Prefix =/= <<>>
->
emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name, ClientScope]);
clientid(Name, ClientScope, _Conf) ->
emqx_bridge_mqtt_lib:clientid_base([Name, ClientScope]).

View File

@ -81,7 +81,7 @@ mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) ->
ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}.
mk_clientid(WorkerId, ClientId) ->
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
connect(Pid, Name) ->
case emqtt:connect(Pid) of

View File

@ -81,7 +81,7 @@ mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
}.
mk_clientid(WorkerId, ClientId) ->
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
mk_client_event_handler(Name, Ingress = #{}) ->
IngressVars = maps:with([server], Ingress),

View File

@ -0,0 +1,57 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_mqtt_lib).
-export([clientid_base/1, bytes23/2]).
%% @doc Make the base ID of client IDs.
%% A base ID is used to concatenate with pool worker ID to build a
%% full ID.
%% In order to avoid client ID clashing when EMQX is clustered,
%% the base ID is the resource name concatenated with
%% broker node name SHA-hash and truncated to 8 hex characters.
clientid_base(Name) ->
bin([Name, shortener(atom_to_list(node()), 8)]).
%% @doc Limit the number of bytes for client ID under 23 bytes.
%% If Prefix and suffix concatenated is longer than 23 bytes
%% it hashes the concatenation and replace the non-random suffix.
bytes23(Prefix, SeqNo) ->
Suffix = integer_to_binary(SeqNo),
Concat = bin([Prefix, $:, Suffix]),
case size(Concat) =< 23 of
true ->
Concat;
false ->
shortener(Concat, 23)
end.
%% @private SHA hash a string and return the prefix of
%% the given length as hex string in binary format.
shortener(Str, Length) when is_list(Str) ->
shortener(bin(Str), Length);
shortener(Str, Length) when is_binary(Str) ->
true = size(Str) > 0,
true = (Length > 0 andalso Length =< 40),
Sha = crypto:hash(sha, Str),
%% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
Hex = string:lowercase(binary:encode_hex(Sha)),
<<UniqueEnough:Length/binary, _/binary>> = Hex,
UniqueEnough.
bin(IoList) ->
iolist_to_binary(IoList).

View File

@ -494,7 +494,6 @@ t_mqtt_conn_bridge_egress(_) ->
<<"egress">> => ?EGRESS_CONF
}
),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
@ -509,11 +508,6 @@ t_mqtt_conn_bridge_egress(_) ->
#{?snk_kind := buffer_worker_flush_ack}
),
%% we should receive a message on the "remote" broker, with specified topic
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
Size = byte_size(ResourceID),
?assertMatch(<<ResourceID:Size/binary, _/binary>>, Msg#message.from),
%% verify the metrics of the bridge
?retry(
_Interval = 200,
@ -537,7 +531,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
<<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
}
),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
@ -554,8 +547,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
%% we should receive a message on the "remote" broker, with specified topic
Msg = assert_mqtt_msg_received(RemoteTopic),
%% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
?assertMatch(<<ResourceID:(byte_size(ResourceID))/binary, _/binary>>, Msg#message.from),
?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)),
%% verify the metrics of the bridge
@ -574,15 +565,29 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
ok.
t_egress_custom_clientid_prefix(_Config) ->
t_egress_short_clientid(_Config) ->
%% Name is short, expect the actual client ID in use is hashed from
%% <name>E<nodename-hash>:<pool_worker_id>
Name = "abc01234",
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]),
test_egress_clientid(Name, ExpectedClientId).
t_egress_long_clientid(_Config) ->
%% Expect the actual client ID in use is hashed from
%% <name>E<nodename-hash>:<pool_worker_id>
Name = "abc01234567890123456789",
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1),
test_egress_clientid(Name, ExpectedClientId).
test_egress_clientid(Name, ExpectedClientId) ->
BridgeIDEgress = create_bridge(
?SERVER_CONF#{
<<"clientid_prefix">> => <<"my-custom-prefix">>,
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF
<<"name">> => Name,
<<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1}
}
),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
Payload = <<"hello">>,
@ -591,8 +596,7 @@ t_egress_custom_clientid_prefix(_Config) ->
emqx:publish(emqx_message:make(LocalTopic, Payload)),
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
Size = byte_size(ResourceID),
?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from),
?assertEqual(ExpectedClientId, Msg#message.from),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
ok.
@ -842,7 +846,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF,
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>,
%% using a long time so we can test recovery
<<"request_ttl">> => <<"15s">>,
@ -948,7 +951,6 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF,
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"async">>,
%% using a long time so we can test recovery
<<"request_ttl">> => <<"15s">>,

View File

@ -112,8 +112,6 @@ end_per_group(_Group, _Config) ->
ok.
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Config.
end_per_suite(_Config) ->

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.33"},
{vsn, "0.1.34"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -35,7 +35,7 @@
-define(CLUSTER_CALL, cluster_call).
-define(CONF, conf).
-define(AUDIT_MOD, audit).
-define(UPDATE_READONLY_KEYS_PROHIBITED, "update_readonly_keys_prohibited").
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"update_readonly_keys_prohibited">>).
-dialyzer({no_match, [load/0]}).

View File

@ -77,8 +77,7 @@
%% Callback to upgrade config after loaded from config file but before validation.
upgrade_raw_conf(RawConf) ->
RawConf1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf),
emqx_otel_schema:upgrade_legacy_metrics(RawConf1).
emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf).
namespace() -> emqx.

View File

@ -165,7 +165,7 @@ t_load_readonly(Config) ->
ConfBin0 = hocon_pp:do(Base1#{KeyBin => Conf}, #{}),
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
?assertEqual(
{error, "update_readonly_keys_prohibited"},
{error, <<"update_readonly_keys_prohibited">>},
emqx_conf_cli:conf(["load", ConfFile0])
),
%% reload etc/emqx.conf changed readonly keys

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.37"},
{vsn, "0.1.38"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [

View File

@ -51,7 +51,8 @@
common_resource_opts_subfields_bin/0,
resource_opts_fields/0,
resource_opts_fields/1,
resource_opts_ref/2
resource_opts_ref/2,
resource_opts_ref/3
]).
-export([examples/1]).
@ -530,13 +531,23 @@ status_and_actions_fields() ->
}
)}
].
resource_opts_ref(Module, RefName) ->
resource_opts_ref(Module, RefName, undefined).
resource_opts_ref(Module, RefName, ConverterFun) ->
Meta =
case ConverterFun of
undefined ->
emqx_resource_schema:resource_opts_meta();
_ ->
M = emqx_resource_schema:resource_opts_meta(),
M#{converter => ConverterFun}
end,
[
{resource_opts,
mk(
ref(Module, RefName),
emqx_resource_schema:resource_opts_meta()
Meta
)}
].

View File

@ -1,6 +1,6 @@
{application, emqx_ft, [
{description, "EMQX file transfer over MQTT"},
{vsn, "0.1.12"},
{vsn, "0.1.13"},
{registered, []},
{mod, {emqx_ft_app, []}},
{applications, [

View File

@ -394,6 +394,7 @@ test_configure(Uri, Config) ->
<<"host">> => <<"localhost">>,
<<"port">> => 9000,
<<"bucket">> => <<"emqx">>,
<<"url_expire_time">> => <<"2h">>,
<<"secret_access_key">> => ?SECRET_ACCESS_KEY,
<<"transport_options">> => #{
<<"ssl">> => #{
@ -433,6 +434,8 @@ test_configure(Uri, Config) ->
<<"keyfile">> := <<"/", _KeyFilepath/bytes>>
}
},
%% ensure 2h is unchanged
<<"url_expire_time">> := <<"2h">>,
<<"secret_access_key">> := <<"******">>
}
}

View File

@ -331,7 +331,7 @@ t_configs_key(_Config) ->
Log
),
Log1 = emqx_utils_maps:deep_put([<<"log">>, <<"console">>, <<"level">>], Log, <<"error">>),
?assertEqual([], update_configs_with_binary(iolist_to_binary(hocon_pp:do(Log1, #{})))),
?assertEqual(<<>>, update_configs_with_binary(iolist_to_binary(hocon_pp:do(Log1, #{})))),
?assertEqual(<<"error">>, read_conf([<<"log">>, <<"console">>, <<"level">>])),
BadLog = emqx_utils_maps:deep_put([<<"log">>, <<"console">>, <<"level">>], Log, <<"erro1r">>),
{error, Error} = update_configs_with_binary(iolist_to_binary(hocon_pp:do(BadLog, #{}))),
@ -345,6 +345,17 @@ t_configs_key(_Config) ->
}
},
?assertEqual(ExpectError, emqx_utils_json:decode(Error, [return_maps])),
ReadOnlyConf = #{
<<"cluster">> =>
#{
<<"autoclean">> => <<"23h">>,
<<"autoheal">> => true,
<<"discovery_strategy">> => <<"manual">>
}
},
ReadOnlyBin = iolist_to_binary(hocon_pp:do(ReadOnlyConf, #{})),
{error, ReadOnlyError} = update_configs_with_binary(ReadOnlyBin),
?assertEqual(<<"update_readonly_keys_prohibited">>, ReadOnlyError),
ok.
t_get_configs_in_different_accept(_Config) ->
@ -394,7 +405,7 @@ t_create_webhook_v1_bridges_api(Config) ->
WebHookFile = filename:join(?config(data_dir, Config), "webhook_v1.conf"),
?assertMatch({ok, _}, hocon:files([WebHookFile])),
{ok, WebHookBin} = file:read_file(WebHookFile),
?assertEqual([], update_configs_with_binary(WebHookBin)),
?assertEqual(<<>>, update_configs_with_binary(WebHookBin)),
Actions =
#{
<<"http">> =>
@ -531,7 +542,7 @@ update_configs_with_binary(Bin) ->
Path = emqx_mgmt_api_test_util:api_path(["configs"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Headers = [{"accept", "text/plain"}, Auth],
case httpc:request(put, {Path, Headers, "text/plain", Bin}, [], []) of
case httpc:request(put, {Path, Headers, "text/plain", Bin}, [], [{body_format, binary}]) of
{ok, {{"HTTP/1.1", Code, _}, _Headers, Body}} when
Code >= 200 andalso Code =< 299
->

View File

@ -1,6 +1,6 @@
{application, emqx_mysql, [
{description, "EMQX MySQL Database Connector"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,

View File

@ -289,10 +289,17 @@ do_check_prepares(_NoTemplates) ->
connect(Options) ->
%% TODO: teach `tdengine` to accept 0-arity closures as passwords.
{value, {password, Secret}, Rest} = lists:keytake(password, 1, Options),
NOptions = [{password, emqx_secret:unwrap(Secret)} | Rest],
NOptions = init_connect_opts(Options),
mysql:start_link(NOptions).
init_connect_opts(Options) ->
case lists:keytake(password, 1, Options) of
{value, {password, Secret}, Rest} ->
[{password, emqx_secret:unwrap(Secret)} | Rest];
false ->
Options
end.
init_prepare(State = #{query_templates := Templates}) ->
case maps:size(Templates) of
0 ->

View File

@ -1,17 +1,17 @@
% %%--------------------------------------------------------------------
% %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
% %%
% %% Licensed under the Apache License, Version 2.0 (the "License");
% %% you may not use this file except in compliance with the License.
% %% You may obtain a copy of the License at
% %% http://www.apache.org/licenses/LICENSE-2.0
% %%
% %% Unless required by applicable law or agreed to in writing, software
% %% distributed under the License is distributed on an "AS IS" BASIS,
% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% %% See the License for the specific language governing permissions and
% %% limitations under the License.
% %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mysql_SUITE).
@ -19,40 +19,30 @@
-compile(export_all).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(MYSQL_HOST, "mysql").
-define(MYSQL_USER, "root").
-define(MYSQL_PASSWORD, "public").
-define(MYSQL_RESOURCE_MOD, emqx_mysql).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
Config;
Apps = emqx_cth_suite:start(
[emqx_conf, emqx_connector],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config];
false ->
{skip, no_mysql}
end.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok.
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(proplists:get_value(apps, Config)).
% %%------------------------------------------------------------------------------
% %% Testcases
@ -64,6 +54,12 @@ t_lifecycle(_Config) ->
mysql_config()
).
t_lifecycle_passwordless(_Config) ->
perform_lifecycle_check(
<<"emqx_mysql_SUITE:passwordless">>,
mysql_config(passwordless)
).
perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig),
@ -136,25 +132,55 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
% %%------------------------------------------------------------------------------
mysql_config() ->
RawConfig = list_to_binary(
io_lib:format(
""
"\n"
" auto_reconnect = true\n"
" database = mqtt\n"
" username= root\n"
" password = public\n"
" pool_size = 8\n"
" server = \"~s:~b\"\n"
" "
"",
[?MYSQL_HOST, ?MYSQL_DEFAULT_PORT]
)
),
mysql_config(default).
{ok, Config} = hocon:binary(RawConfig),
mysql_config(default) ->
parse_mysql_config(
"\n auto_reconnect = true"
"\n database = mqtt"
"\n username = ~p"
"\n password = ~p"
"\n pool_size = 8"
"\n server = \"~s:~b\""
"\n",
[?MYSQL_USER, ?MYSQL_PASSWORD, ?MYSQL_HOST, ?MYSQL_DEFAULT_PORT]
);
mysql_config(passwordless) ->
ok = run_admin_query("CREATE USER IF NOT EXISTS 'nopwd'@'%'"),
ok = run_admin_query("GRANT ALL ON mqtt.* TO 'nopwd'@'%'"),
parse_mysql_config(
"\n auto_reconnect = true"
"\n database = mqtt"
"\n username = nopwd"
"\n pool_size = 8"
"\n server = \"~s:~b\""
"\n",
[?MYSQL_HOST, ?MYSQL_DEFAULT_PORT]
).
parse_mysql_config(FormatString, Args) ->
{ok, Config} = hocon:binary(io_lib:format(FormatString, Args)),
#{<<"config">> => Config}.
run_admin_query(Query) ->
Pid = connect_mysql(),
try
mysql:query(Pid, Query)
after
mysql:stop(Pid)
end.
connect_mysql() ->
Opts = [
{host, ?MYSQL_HOST},
{port, ?MYSQL_DEFAULT_PORT},
{user, ?MYSQL_USER},
{password, ?MYSQL_PASSWORD},
{database, "mysql"}
],
{ok, Pid} = mysql:start_link(Opts),
Pid.
test_query_no_params() ->
{sql, <<"SELECT 1">>}.

View File

@ -1,6 +1,6 @@
{application, emqx_opentelemetry, [
{description, "OpenTelemetry for EMQX Broker"},
{vsn, "0.2.2"},
{vsn, "0.2.3"},
{registered, []},
{mod, {emqx_otel_app, []}},
{applications, [

View File

@ -24,40 +24,15 @@
desc/1
]).
-export([upgrade_legacy_metrics/1]).
%% Compatibility with the previous schema that defined only metric fields
upgrade_legacy_metrics(RawConf) ->
case RawConf of
#{<<"opentelemetry">> := Otel} ->
Otel1 =
case maps:take(<<"enable">>, Otel) of
{MetricsEnable, OtelConf} ->
emqx_utils_maps:deep_put(
[<<"metrics">>, <<"enable">>], OtelConf, MetricsEnable
);
error ->
Otel
end,
Otel2 =
case Otel1 of
#{<<"exporter">> := #{<<"interval">> := Interval} = Exporter} ->
emqx_utils_maps:deep_put(
[<<"metrics">>, <<"interval">>],
Otel1#{<<"exporter">> => maps:remove(<<"interval">>, Exporter)},
Interval
);
_ ->
Otel1
end,
RawConf#{<<"opentelemetry">> => Otel2};
_ ->
RawConf
end.
namespace() -> opentelemetry.
roots() -> ["opentelemetry"].
roots() ->
[
{"opentelemetry",
?HOCON(?R_REF("opentelemetry"), #{
converter => fun legacy_metrics_converter/2
})}
].
fields("opentelemetry") ->
[
@ -259,3 +234,27 @@ desc("otel_metrics") -> ?DESC(otel_metrics);
desc("otel_traces") -> ?DESC(otel_traces);
desc("trace_filter") -> ?DESC(trace_filter);
desc(_) -> undefined.
%% Compatibility with the previous schema that defined only metrics fields
legacy_metrics_converter(OtelConf, _Opts) when is_map(OtelConf) ->
Otel1 =
case maps:take(<<"enable">>, OtelConf) of
{MetricsEnable, OtelConf1} ->
emqx_utils_maps:deep_put(
[<<"metrics">>, <<"enable">>], OtelConf1, MetricsEnable
);
error ->
OtelConf
end,
case Otel1 of
#{<<"exporter">> := #{<<"interval">> := Interval} = Exporter} ->
emqx_utils_maps:deep_put(
[<<"metrics">>, <<"interval">>],
Otel1#{<<"exporter">> => maps:remove(<<"interval">>, Exporter)},
Interval
);
_ ->
Otel1
end;
legacy_metrics_converter(Conf, _Opts) ->
Conf.

View File

@ -22,8 +22,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Backward compatibility suite for `upgrade_raw_conf/1`,
%% expected callback is `emqx_otel_schema:upgrade_legacy_metrics/1`
%% Backward compatibility suite for legacy metrics converter
-define(OLD_CONF_ENABLED, <<
"\n"

View File

@ -1,6 +1,6 @@
{application, emqx_s3, [
{description, "EMQX S3"},
{vsn, "5.0.12"},
{vsn, "5.0.13"},
{modules, []},
{registered, [emqx_s3_sup]},
{applications, [
@ -8,8 +8,7 @@
stdlib,
gproc,
erlcloud,
ehttpc,
emqx_bridge_http
ehttpc
]},
{mod, {emqx_s3_app, []}}
]}.

View File

@ -388,7 +388,7 @@ with_path_and_query_only(Url, Fun) ->
%% Users provide headers as a map, but erlcloud expects a list of tuples with string keys and values.
headers_user_to_erlcloud_request(UserHeaders) ->
[{to_list_string(K), V} || {K, V} <- maps:to_list(UserHeaders)].
[{string:to_lower(to_list_string(K)), V} || {K, V} <- maps:to_list(UserHeaders)].
%% Ehttpc returns operates on headers as a list of tuples with binary keys.
%% Erlcloud expects a list of tuples with string values and lowcase string keys
@ -409,6 +409,8 @@ to_binary(Val) when is_binary(Val) -> Val.
to_list_string(Val) when is_binary(Val) ->
binary_to_list(Val);
to_list_string(Val) when is_atom(Val) ->
atom_to_list(Val);
to_list_string(Val) when is_list(Val) ->
Val.

View File

@ -142,6 +142,22 @@ t_no_acl(Config) ->
ok = emqx_s3_client:put_object(Client, Key, <<"data">>).
t_extra_headers(Config0) ->
Config = [{extra_headers, #{'Content-Type' => <<"application/json">>}} | Config0],
Key = ?config(key, Config),
Client = client(Config),
Data = #{foo => bar},
ok = emqx_s3_client:put_object(Client, Key, emqx_utils_json:encode(Data)),
Url = emqx_s3_client:uri(Client, Key),
{ok, {{_StatusLine, 200, "OK"}, _Headers, Content}} = httpc:request(Url),
?_assertEqual(
Data,
emqx_utils_json:decode(Content)
).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
@ -154,17 +170,22 @@ client(Config) ->
profile_config(Config) ->
ProfileConfig0 = emqx_s3_test_helpers:base_config(?config(conn_type, Config)),
ProfileConfig1 = maps:put(
bucket,
?config(bucket, Config),
ProfileConfig0
),
ProfileConfig2 = emqx_utils_maps:deep_put(
[transport_options, pool_type],
ProfileConfig1,
?config(pool_type, Config)
),
ProfileConfig2.
maps:fold(
fun inject_config/3,
ProfileConfig0,
#{
bucket => ?config(bucket, Config),
[transport_options, pool_type] => ?config(pool_type, Config),
[transport_options, headers] => ?config(extra_headers, Config)
}
).
inject_config(_Key, undefined, ProfileConfig) ->
ProfileConfig;
inject_config(KeyPath, Value, ProfileConfig) when is_list(KeyPath) ->
emqx_utils_maps:deep_put(KeyPath, ProfileConfig, Value);
inject_config(Key, Value, ProfileConfig) ->
maps:put(Key, Value, ProfileConfig).
data(Size) ->
iolist_to_binary([$a || _ <- lists:seq(1, Size)]).

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually!
{vsn, "5.0.14"},
{vsn, "5.0.15"},
{modules, [
emqx_utils,
emqx_utils_api,

View File

@ -761,7 +761,11 @@ deobfuscate(NewConf, OldConf) ->
fun(K, V, Acc) ->
case maps:find(K, OldConf) of
error ->
Acc#{K => V};
case is_redacted(K, V) of
%% don't put redacted value into new config
true -> Acc;
false -> Acc#{K => V}
end;
{ok, OldV} when is_map(V), is_map(OldV) ->
Acc#{K => deobfuscate(V, OldV)};
{ok, OldV} ->
@ -880,6 +884,25 @@ redact2_test_() ->
Keys = [secret, passcode],
[{case_name(atom, Key), fun() -> Case(Key, Checker) end} || Key <- Keys].
deobfuscate_test() ->
NewConf0 = #{foo => <<"bar0">>, password => <<"123456">>},
?assertEqual(NewConf0, deobfuscate(NewConf0, #{foo => <<"bar">>, password => <<"654321">>})),
NewConf1 = #{foo => <<"bar1">>, password => <<?REDACT_VAL>>},
?assertEqual(
#{foo => <<"bar1">>, password => <<"654321">>},
deobfuscate(NewConf1, #{foo => <<"bar">>, password => <<"654321">>})
),
%% Don't have password before and ignore to put redact_val into new config
NewConf2 = #{foo => <<"bar2">>, password => ?REDACT_VAL},
?assertEqual(#{foo => <<"bar2">>}, deobfuscate(NewConf2, #{foo => <<"bar">>})),
%% Don't have password before and should allow put non-redact-val into new config
NewConf3 = #{foo => <<"bar3">>, password => <<"123456">>},
?assertEqual(NewConf3, deobfuscate(NewConf3, #{foo => <<"bar">>})),
ok.
redact_is_authorization_test_() ->
Types = [string, binary],
Keys = ["auThorization", "Authorization", "authorizaTion"],

View File

@ -0,0 +1 @@
Fix old (prior to EMQX 5.4.0) Open Telemetry configuration incompatibility when the config is defined in emqx.conf.

View File

@ -0,0 +1 @@
Ensure short client ID for MQTT bridges.

View File

@ -0,0 +1 @@
Compatible with the configuration of error formats introduced by HTTP Action in 5.3.2.

21
changes/e5.4.1.en.md Normal file
View File

@ -0,0 +1,21 @@
# e5.4.1
## Bug Fixes
- [#12234](https://github.com/emqx/emqx/pull/12234) Resolved compatibility issues with Open Telemetry configurations defined in `emqx.conf` from versions before EMQX 5.4.0, ensuring smooth integration of legacy configurations with the latest EMQX release.
- [#12236](https://github.com/emqx/emqx/pull/12236) Fixed client ID generation in MQTT broker data integration to comply with MQTT 3.1 specification of 23-byte limit. Client ID is now prefixed with user-assigned connector name, followed by the first 8 bytes of node name's SHA hash and pool member ID. If the resulting ID exceeds 23 bytes, additional SHA hash and truncation are applied to ensure compliance.
- [#12238](https://github.com/emqx/emqx/pull/12238) Resolved compatibility issue with the error format configurations introduced in the HTTP Action feature of EMQX version 5.3.2.
- [#12240](https://github.com/emqx/emqx/pull/12240) Modified the `/file_transfer` API to return file transfer configurations in their original raw format. This change prevents the conversion of time units, such as "1h", to seconds, ensuring that callers receive the initially configured values. This modification aligns with other getter APIs, maintaining consistency in data representation.
- [#12241](https://github.com/emqx/emqx/pull/12241) Fixed a bug where configuring additional HTTP headers for S3 API interactions disrupted file transfers using the S3 storage backend, ensuring stable and uninterrupted file transfer operations.
- [#12246](https://github.com/emqx/emqx/pull/12246) Stopped exposing port 11883 by default in Docker and removed it from Helm charts, as this port is no longer in use.
- [#12249](https://github.com/emqx/emqx/pull/12249) Fixed an issue in the `/configs` API where attempting to modify a read-only configuration value resulted in a garbled response message.
- [#12250](https://github.com/emqx/emqx/pull/12250) Resolved an issue where the `file_transfer` configuration's `secret_access_key` value was erroneously being updated to masked stars (`*****`), ensuring that the original key value remains unaltered and secure.
- [#12256](https://github.com/emqx/emqx/pull/12256) Fixed an issue that prevented establishing connections to MySQL resources without a password.

View File

@ -0,0 +1 @@
Modified the /file_transfer API to return the file transfer configuration in raw format rather than converting time units like "1h" to seconds, providing callers with the original configured values for consistency with other getter APIs

View File

@ -0,0 +1 @@
Fix an issue where setting up extra HTTP headers for communication with S3 API would break File Transfers using S3 storage backend.

View File

@ -0,0 +1 @@
Do not expose 11883 port by default in docker and remove it from helm chart since this port is no longer in use.

View File

@ -0,0 +1 @@
Fixed issue where the response message from the /configs API would be garbled when attempting to update a read-only configuration value

View File

@ -0,0 +1 @@
Fixed incorrect attempt to update the file_transfer configuration's secret_access_key value to masked stars ('*****')

View File

@ -0,0 +1 @@
Fix an issue where connections to passwordless MySQL resources could not be established.

13
changes/v5.4.1.en.md Normal file
View File

@ -0,0 +1,13 @@
# v5.4.1
## Bug Fixes
- [#12234](https://github.com/emqx/emqx/pull/12234) Resolved compatibility issues with Open Telemetry configurations defined in `emqx.conf` from versions before EMQX 5.4.0, ensuring smooth integration of legacy configurations with the latest EMQX release.
- [#12236](https://github.com/emqx/emqx/pull/12236) Fixed client ID generation in MQTT broker data integration to comply with MQTT 3.1 specification of 23-byte limit. Client ID is now prefixed with user-assigned connector name, followed by the first 8 bytes of node name's SHA hash and pool member ID. If the resulting ID exceeds 23 bytes, additional SHA hash and truncation are applied to ensure compliance.
- [#12238](https://github.com/emqx/emqx/pull/12238) Resolved compatibility issue with the error format configurations introduced in the HTTP Action feature of EMQX version 5.3.2.
- [#12246](https://github.com/emqx/emqx/pull/12246) Stopped exposing port 11883 by default in Docker and removed it from Helm charts, as this port is no longer in use.
- [#12249](https://github.com/emqx/emqx/pull/12249) Fixed an issue in the `/configs` API where attempting to modify a read-only configuration value resulted in a garbled response message.

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.4.0
version: 5.4.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.4.0
appVersion: 5.4.1

View File

@ -111,10 +111,6 @@ spec:
containerPort: {{ .Values.emqxConfig.EMQX_LISTENERS__WSS__DEFAULT__BIND | default 8084 }}
- name: dashboard
containerPort: {{ .Values.emqxConfig.EMQX_DASHBOARD__LISTENERS__HTTP__BIND | default 18083 }}
{{- if not (empty .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND) }}
- name: internalmqtt
containerPort: {{ .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND }}
{{- end }}
{{- if not (empty .Values.emqxConfig.EMQX_DASHBOARD__LISTENERS__HTTPS__BIND) }}
- name: dashboardtls
containerPort: {{ .Values.emqxConfig.EMQX_DASHBOARD__LISTENERS__HTTPS__BIND }}

View File

@ -44,17 +44,6 @@ spec:
{{- else if eq .Values.service.type "ClusterIP" }}
nodePort: null
{{- end }}
{{- if not (empty .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND) }}
- name: internalmqtt
port: {{ .Values.service.internalmqtt | default 11883 }}
protocol: TCP
targetPort: internalmqtt
{{- if and (or (eq .Values.service.type "NodePort") (eq .Values.service.type "LoadBalancer")) (not (empty .Values.service.nodePorts.internalmqtt)) }}
nodePort: {{ .Values.service.nodePorts.internalmqtt }}
{{- else if eq .Values.service.type "ClusterIP" }}
nodePort: null
{{- end }}
{{ end }}
- name: mqttssl
port: {{ .Values.service.mqttssl | default 8883 }}
protocol: TCP
@ -127,12 +116,6 @@ spec:
port: {{ .Values.service.mqtt | default 1883 }}
protocol: TCP
targetPort: mqtt
{{- if not (empty .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND) }}
- name: internalmqtt
port: {{ .Values.service.internalmqtt | default 11883 }}
protocol: TCP
targetPort: internalmqtt
{{ end }}
- name: mqttssl
port: {{ .Values.service.mqttssl | default 8883 }}
protocol: TCP

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.4.0
version: 5.4.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.4.0
appVersion: 5.4.1

View File

@ -111,10 +111,6 @@ spec:
containerPort: {{ splitList ":" ( .Values.emqxConfig.EMQX_LISTENERS__WSS__DEFAULT__BIND | default "8084" ) | last }}
- name: dashboard
containerPort: {{ splitList ":" ( .Values.emqxConfig.EMQX_DASHBOARD__LISTENERS__HTTP__BIND | default "18083" ) | last }}
{{- if not (empty .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND) }}
- name: internalmqtt
containerPort: {{ splitList ":" .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND | last }}
{{- end }}
{{- if not (empty .Values.emqxConfig.EMQX_DASHBOARD__LISTENERS__HTTPS__BIND) }}
- name: dashboardtls
containerPort: {{ splitList ":" .Values.emqxConfig.EMQX_DASHBOARD__LISTENERS__HTTPS__BIND | last }}

View File

@ -44,17 +44,6 @@ spec:
{{- else if eq .Values.service.type "ClusterIP" }}
nodePort: null
{{- end }}
{{- if not (empty .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND) }}
- name: internalmqtt
port: {{ .Values.service.internalmqtt | default 11883 }}
protocol: TCP
targetPort: internalmqtt
{{- if and (or (eq .Values.service.type "NodePort") (eq .Values.service.type "LoadBalancer")) (not (empty .Values.service.nodePorts.internalmqtt)) }}
nodePort: {{ .Values.service.nodePorts.internalmqtt }}
{{- else if eq .Values.service.type "ClusterIP" }}
nodePort: null
{{- end }}
{{ end }}
- name: mqttssl
port: {{ .Values.service.mqttssl | default 8883 }}
protocol: TCP
@ -127,12 +116,6 @@ spec:
port: {{ .Values.service.mqtt | default 1883 }}
protocol: TCP
targetPort: mqtt
{{- if not (empty .Values.emqxConfig.EMQX_LISTENERS__TCP__INTERNAL__BIND) }}
- name: internalmqtt
port: {{ .Values.service.internalmqtt | default 11883 }}
protocol: TCP
targetPort: internalmqtt
{{ end }}
- name: mqttssl
port: {{ .Values.service.mqttssl | default 8883 }}
protocol: TCP

View File

@ -64,11 +64,10 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
# - 8083 for WebSocket/HTTP
# - 8084 for WSS/HTTPS
# - 8883 port for MQTT(SSL)
# - 11883 port for internal MQTT/TCP
# - 18083 for dashboard and API
# - 4370 default Erlang distribution port
# - 5369 for backplane gen_rpc
EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369
EXPOSE 1883 8083 8084 8883 18083 4370 5369
ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]

View File

@ -0,0 +1,74 @@
node {
name = "emqx@127.0.0.1"
cookie = "emqxsecretcookie"
data_dir = "data"
}
cluster {
name = emqxcl
discovery_strategy = manual
}
actions {
http {
x_WH_D {
connector = connector_x_WH_D
enable = true
parameters {
body = "${clientid}"
headers {}
max_retries = 2
method = post
path = ""
}
resource_opts {
health_check_interval = 15s
inflight_window = 100
max_buffer_bytes = 1GB
query_mode = async
request_ttl = 45s
start_after_created = true
start_timeout = 5s
worker_pool_size = 4
}
}
}
}
connectors {
http {
connector_x_WH_D {
connect_timeout = 15s
enable = true
enable_pipelining = 100
headers {content-type = "application/json"}
pool_size = 8
pool_type = hash
ssl {
ciphers = []
depth = 10
enable = false
hibernate_after = 5s
log_level = notice
reuse_sessions = true
secure_renegotiate = true
verify = verify_peer
versions = [tlsv1.3, tlsv1.2]
}
url = "http://127.0.0.1:18083"
}
}
}
rule_engine {
rules {
x_WH_D {
actions = ["webhook:x_WH_D"]
description = x
enable = true
metadata {created_at = 1699341635802}
name = ""
sql = "SELECT\n *\nFROM\n \"#\",\n \"$events/message_delivered\",\n \"$events/message_acked\""
}
}
}