From e73bf716ae113008ffb936b30b27816ce775ce1d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sun, 19 Nov 2023 22:08:42 +0100 Subject: [PATCH 01/19] fix(emqx_channel): do not log stale sock_close event as error In some cases, EMQX may decide to close socket and mark connection at 'disconnected' state, for example, when DISCONNECTE packet is received, or, when failed to write data to socket. However, by the time EMQX decided to close the socket, the socket might have already been closed by peer, and the `tcp_closed` envet is already delivered to the process mailbox -- causing EMQX to handle sock_close event at 'disconnected' state. --- apps/emqx/src/emqx_channel.erl | 6 ++++-- apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl | 9 ++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 4f6d5ac6f..c2f62c840 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1246,8 +1246,10 @@ handle_info( {ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3}; Shutdown -> Shutdown end; -handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}), +handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> + %% This can happen as a race: + %% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed' + %% is already in process mailbox {ok, Channel}; handle_info(clean_authz_cache, Channel) -> ok = emqx_authz_cache:empty_authz_cache(), diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 453fa9fd2..10d081e57 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -963,13 +963,12 @@ handle_info( NChannel = ensure_disconnected(Reason, Channel), shutdown(Reason, NChannel); handle_info( - {sock_closed, Reason}, + {sock_closed, _Reason}, Channel = #channel{conn_state = disconnected} ) -> - ?SLOG(error, #{ - msg => "unexpected_sock_closed", - reason => Reason - }), + %% This can happen as a race: + %% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed' + %% is already in process mailbox {ok, Channel}; handle_info(clean_authz_cache, Channel) -> ok = emqx_authz_cache:empty_authz_cache(), From 0939b66af5512f582de7811017466923a93c69d4 Mon Sep 17 00:00:00 2001 From: Kinplemelon Date: Mon, 20 Nov 2023 17:28:01 +0800 Subject: [PATCH 02/19] chore: upgrade dashboard to e1.3.2-beta.1 for ee --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 0112776bb..7c9638dd5 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 export EMQX_DASHBOARD_VERSION ?= v1.5.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.3.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2-beta.1 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise From 1f1d9e58c6bd3da70c95c82da282715fca027e0f Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 20 Nov 2023 16:23:46 +0100 Subject: [PATCH 03/19] fix(emqx_connector): don't crash in API on delete with active channels --- .../emqx_connector/src/emqx_connector_api.erl | 2 +- .../test/emqx_connector_api_SUITE.erl | 143 ++++++++++++++++-- 2 files changed, 130 insertions(+), 15 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index b2267539b..f6e0c0f95 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -372,7 +372,7 @@ schema("/connectors_probe") -> case emqx_connector:remove(ConnectorType, ConnectorName) of ok -> ?NO_CONTENT; - {error, {active_channels, Channels}} -> + {error, {post_config_update, _HandlerMod, {active_channels, Channels}}} -> ?BAD_REQUEST( {<<"Cannot delete connector while there are active channels defined for this connector">>, Channels} diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index f6609808f..bd8aa9ddf 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -25,7 +25,7 @@ -include_lib("snabbkaffe/include/test_macros.hrl"). -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))). --define(CONNECTOR(NAME, TYPE), #{ +-define(RESOURCE(NAME, TYPE), #{ %<<"ssl">> => #{<<"enable">> => false}, <<"type">> => TYPE, <<"name">> => NAME @@ -52,12 +52,57 @@ -define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)). -define(KAFKA_CONNECTOR(Name, BootstrapHosts), maps:merge( - ?CONNECTOR(Name, ?CONNECTOR_TYPE), + ?RESOURCE(Name, ?CONNECTOR_TYPE), ?KAFKA_CONNECTOR_BASE(BootstrapHosts) ) ). -define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). +-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). +-define(BRIDGE_TYPE_STR, "kafka_producer"). +-define(BRIDGE_TYPE, <>). +-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{ + <<"enable">> => true, + <<"connector">> => Connector, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => true, + <<"mode">> => <<"hybrid">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_ext_headers">> => [ + #{ + <<"kafka_ext_header_key">> => <<"clientid">>, + <<"kafka_ext_header_value">> => <<"${clientid}">> + }, + #{ + <<"kafka_ext_header_key">> => <<"topic">>, + <<"kafka_ext_header_value">> => <<"${topic}">> + } + ], + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"kafka_headers">> => <<"${pub_props}">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"required_acks">> => <<"all_isr">>, + <<"topic">> => <<"kafka-topic">> + }, + <<"local_topic">> => <<"mqtt/local/topic">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"32s">> + } +}). +-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)). + %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). %% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{ %% <<"server">> => SERVER, @@ -105,7 +150,8 @@ emqx, emqx_auth, emqx_management, - {emqx_connector, "connectors {}"} + {emqx_connector, "connectors {}"}, + {emqx_bridge, "actions {}"} ]). -define(APPSPEC_DASHBOARD, @@ -128,7 +174,8 @@ all() -> groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ - t_connectors_probe + t_connectors_probe, + t_fail_delete_with_action ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics @@ -187,29 +234,38 @@ end_per_group(_, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), ok. -init_per_testcase(_TestCase, Config) -> +init_per_testcase(TestCase, Config) -> case ?config(cluster_nodes, Config) of undefined -> - init_mocks(); + init_mocks(TestCase); Nodes -> - [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] + [erpc:call(Node, ?MODULE, init_mocks, [TestCase]) || Node <- Nodes] end, Config. -end_per_testcase(_TestCase, Config) -> +end_per_testcase(TestCase, Config) -> + Node = ?config(node, Config), + ok = erpc:call(Node, ?MODULE, clear_resources, [TestCase]), case ?config(cluster_nodes, Config) of undefined -> meck:unload(); Nodes -> - [erpc:call(Node, meck, unload, []) || Node <- Nodes] + [erpc:call(N, meck, unload, []) || N <- Nodes] end, - Node = ?config(node, Config), ok = emqx_common_test_helpers:call_janitor(), - ok = erpc:call(Node, fun clear_resources/0), ok. -define(CONNECTOR_IMPL, dummy_connector_impl). -init_mocks() -> +init_mocks(t_fail_delete_with_action) -> + init_mocks(common), + meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId) + end), + ok; +init_mocks(_TestCase) -> meck:new(emqx_connector_ee_schema, [passthrough, no_link]), meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), @@ -235,7 +291,15 @@ init_mocks() -> ), [?CONNECTOR_IMPL, emqx_connector_ee_schema]. -clear_resources() -> +clear_resources(t_fail_delete_with_action) -> + lists:foreach( + fun(#{type := Type, name := Name}) -> + ok = emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + clear_resources(common); +clear_resources(_) -> lists:foreach( fun(#{type := Type, name := Name}) -> ok = emqx_connector:remove(Type, Name) @@ -646,7 +710,7 @@ t_connectors_probe(Config) -> request_json( post, uri(["connectors_probe"]), - ?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>), + ?RESOURCE(<<"broken_connector">>, <<"unknown_type">>), Config ) ), @@ -674,6 +738,57 @@ t_create_with_bad_name(Config) -> ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg), ok. +t_fail_delete_with_action(Config) -> + Name = ?CONNECTOR_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(Name), + Config + ) + ), + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + BridgeName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"connector">> := Name, + <<"kafka">> := #{}, + <<"local_topic">> := _, + <<"resource_opts">> := _ + }}, + request_json( + post, + uri(["actions"]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + + %% delete the connector + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := + <<"{<<\"Cannot delete connector while there are active channels", + " defined for this connector\">>,", _/binary>> + }}, + request_json(delete, uri(["connectors", ConnectorID]), Config) + ), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], From f40f6bc5ddfb14b7d3a08c7a0cd3662da56fe4e4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 20 Nov 2023 12:37:36 -0300 Subject: [PATCH 04/19] refactor: split `resource_opts` fields between connector and actions --- .../src/schema/emqx_bridge_v2_schema.erl | 26 ++++++++++++ .../emqx_bridge/test/emqx_bridge_v2_tests.erl | 41 +++++++++++++++++++ .../src/emqx_bridge_kafka.erl | 2 +- .../src/schema/emqx_connector_schema.erl | 20 +++++++++ 4 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_bridge/test/emqx_bridge_v2_tests.erl diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index ede783e97..b0ac870e7 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -39,6 +39,7 @@ ]). -export([types/0, types_sc/0]). +-export([resource_opts_fields/0, resource_opts_fields/1]). -export_type([action_type/0]). @@ -137,6 +138,31 @@ types() -> types_sc() -> hoconsc:enum(types()). +resource_opts_fields() -> + resource_opts_fields(_Overrides = []). + +resource_opts_fields(Overrides) -> + ActionROFields = [ + batch_size, + batch_time, + buffer_mode, + buffer_seg_bytes, + health_check_interval, + inflight_window, + max_buffer_bytes, + metrics_flush_interval, + query_mode, + request_ttl, + resume_interval, + start_after_created, + start_timeout, + worker_pool_size + ], + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end, + emqx_resource_schema:create_opts(Overrides) + ). + examples(Method) -> MergeFun = fun(Example, Examples) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl new file mode 100644 index 000000000..4e28f3d88 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_v2_tests). + +-include_lib("eunit/include/eunit.hrl"). + +resource_opts_union_connector_actions_test() -> + %% The purpose of this test is to ensure we have split `resource_opts' fields + %% consciouly between connector and actions, in particular when/if we introduce new + %% fields there. + AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])), + ActionROFields = non_deprecated_fields(emqx_bridge_v2_schema:resource_opts_fields()), + ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()), + UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields), + ?assertEqual( + lists:usort(AllROFields), + UnionROFields, + #{ + missing_fields => AllROFields -- UnionROFields, + unexpected_fields => UnionROFields -- AllROFields, + action_fields => ActionROFields, + connector_fields => ConnectorROFields + } + ), + ok. + +non_deprecated_fields(Fields) -> + [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)]. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 0eb015cd3..b3934c7bb 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -525,7 +525,7 @@ fields(consumer_kafka_opts) -> ]; fields(resource_opts) -> SupportedFields = [health_check_interval], - CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), + CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(), lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); fields(action_field) -> {kafka_producer, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 22eb523be..070c1a165 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -30,6 +30,8 @@ -export([connector_type_to_bridge_types/1]). +-export([resource_opts_fields/0, resource_opts_fields/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -296,6 +298,24 @@ desc(connectors) -> desc(_) -> undefined. +resource_opts_fields() -> + resource_opts_fields(_Overrides = []). + +resource_opts_fields(Overrides) -> + %% Note: these don't include buffer-related configurations because buffer workers are + %% tied to the action. + ConnectorROFields = [ + health_check_interval, + query_mode, + request_ttl, + start_after_created, + start_timeout + ], + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, ConnectorROFields) end, + emqx_resource_schema:create_opts(Overrides) + ). + %%====================================================================================== %% Helper Functions %%====================================================================================== From 8ec3b1db5df25dc5d024944eed15acc8d87af8ee Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 16:51:39 +0100 Subject: [PATCH 05/19] fix(emqx_connection): handle socket activation error return --- apps/emqx/src/emqx_connection.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index db36fbea9..31281b8c2 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -555,7 +555,7 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) -> handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> case queue:peek(Cache) of empty -> - activate_socket(State); + handle_info(activate_socket, State); {value, #pending_req{need = Needs, data = Data, next = Next}} -> State2 = State#state{limiter_buffer = queue:drop(Cache)}, check_limiter(Needs, Data, Next, [check_cache], State2) From ec19247271aac074d67f8a9e81b0e9542a4ff0be Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 16:55:26 +0100 Subject: [PATCH 06/19] refactor: rename limiter buffer related messages and var names --- apps/emqx/src/emqx_connection.erl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 31281b8c2..11d42f9dd 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -552,13 +552,13 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) -> inc_counter(incoming_bytes, Len), ok = emqx_metrics:inc('bytes.received', Len), when_bytes_in(Len, Data, State); -handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> - case queue:peek(Cache) of +handle_msg(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) -> + case queue:peek(Buffer) of empty -> handle_info(activate_socket, State); {value, #pending_req{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_buffer = queue:drop(Cache)}, - check_limiter(Needs, Data, Next, [check_cache], State2) + State2 = State#state{limiter_buffer = queue:drop(Buffer)}, + check_limiter(Needs, Data, Next, [check_limiter_buffer], State2) end; handle_msg( {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, @@ -1036,13 +1036,13 @@ check_limiter( Data, WhenOk, _Msgs, - #state{limiter_buffer = Cache} = State + #state{limiter_buffer = Buffer} = State ) -> %% if there has a retry timer, - %% cache the operation and execute it after the retry is over - %% the maximum length of the cache queue is equal to the active_n + %% Buffer the operation and execute it after the retry is over + %% the maximum length of the buffer queue is equal to the active_n New = #pending_req{need = Needs, data = Data, next = WhenOk}, - {ok, State#state{limiter_buffer = queue:in(New, Cache)}}. + {ok, State#state{limiter_buffer = queue:in(New, Buffer)}}. %% try to perform a retry -spec retry_limiter(state()) -> _. @@ -1053,7 +1053,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> {ok, Limiter2} -> Next( Data, - [check_cache], + [check_limiter_buffer], State#state{ limiter = Limiter2, limiter_timer = undefined From 7f078295c1d3a60803c885bd4b38af63702cf646 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 17:44:39 +0100 Subject: [PATCH 07/19] docs: add changelog for PR 11987 --- changes/ce/fix-11987.en.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ce/fix-11987.en.md diff --git a/changes/ce/fix-11987.en.md b/changes/ce/fix-11987.en.md new file mode 100644 index 000000000..4d85cff41 --- /dev/null +++ b/changes/ce/fix-11987.en.md @@ -0,0 +1,3 @@ +Fix connection crash when trying to set TCP/SSL socket `active_n` option. + +Prior to this fix, if a socket is already closed when connection process tries to set `active_n` option, it causes a `case_clause` crash. From b02711af79d6b914beb55f6aefaa18bf60860c75 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 17:47:20 +0100 Subject: [PATCH 08/19] refactor(emqx_ws_connection): rename cache to buffer for limiter --- apps/emqx/src/emqx_ws_connection.erl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 37ce72d74..07329721a 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -94,7 +94,7 @@ limiter :: container(), %% cache operation when overload - limiter_cache :: queue:queue(cache()), + limiter_buffer :: queue:queue(cache()), %% limiter timers limiter_timer :: undefined | reference() @@ -326,7 +326,7 @@ websocket_init([Req, Opts]) -> zone = Zone, listener = {Type, Listener}, limiter_timer = undefined, - limiter_cache = queue:new() + limiter_buffer = queue:new() }, hibernate}; {denny, Reason} -> @@ -462,13 +462,13 @@ websocket_info( State ) -> return(retry_limiter(State)); -websocket_info(check_cache, #state{limiter_cache = Cache} = State) -> - case queue:peek(Cache) of +websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) -> + case queue:peek(Buffer) of empty -> return(enqueue({active, true}, State#state{sockstate = running})); {value, #cache{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_cache = queue:drop(Cache)}, - return(check_limiter(Needs, Data, Next, [check_cache], State2)) + State2 = State#state{limiter_buffer = queue:drop(Buffer)}, + return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2)) end; websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); @@ -630,10 +630,10 @@ check_limiter( Data, WhenOk, _Msgs, - #state{limiter_cache = Cache} = State + #state{limiter_buffer = Buffer} = State ) -> New = #cache{need = Needs, data = Data, next = WhenOk}, - State#state{limiter_cache = queue:in(New, Cache)}. + State#state{limiter_buffer = queue:in(New, Buffer)}. -spec retry_limiter(state()) -> state(). retry_limiter(#state{limiter = Limiter} = State) -> @@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> {ok, Limiter2} -> Next( Data, - [check_cache], + [check_limiter_buffer], State#state{ limiter = Limiter2, limiter_timer = undefined From 3909e0cc089ec270395b9fbf54c0eb9af72c1d95 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 18:06:27 +0100 Subject: [PATCH 09/19] docs: add changelog for PR 11975 --- changes/ce/fix-11975.en.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ce/fix-11975.en.md diff --git a/changes/ce/fix-11975.en.md b/changes/ce/fix-11975.en.md new file mode 100644 index 000000000..cba1c3a17 --- /dev/null +++ b/changes/ce/fix-11975.en.md @@ -0,0 +1,5 @@ +Resolve redundant error logging on socket closure + +Addressed a race condition causing duplicate error logs when a socket is closed by both a peer and the server. +Dual socket close events from the OS and EMQX previously led to excessive error logging. +The fix improves event handling to avoid redundant error-level logging. From 6030bf6fa53edd27c1dcb25205563c294dbda07e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 20 Nov 2023 12:06:21 +0100 Subject: [PATCH 10/19] fix(action): upgrade and downgrade strategy This commit adds upgrade and downgrade hooks that are called when upgrading from a bridge V1 to connector and action or the other way around. The automatic translation is used if the callback is not defined. NOTE: Backported from master --- apps/emqx_bridge/src/emqx_action_info.erl | 67 ++++++++- apps/emqx_bridge/src/emqx_bridge_v2.erl | 20 ++- ...mqx_bridge_azure_event_hub_action_info.erl | 12 +- .../src/emqx_bridge_kafka_action_info.erl | 32 ++++- .../src/schema/emqx_connector_schema.erl | 129 ++++++++++++------ 5 files changed, 215 insertions(+), 45 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 8e8d51aff..1cdf61dfd 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -25,15 +25,39 @@ action_type_to_bridge_v1_type/1, bridge_v1_type_to_action_type/1, is_action_type/1, - registered_schema_modules/0 + registered_schema_modules/0, + connector_action_config_to_bridge_v1_config/3, + has_custom_connector_action_config_to_bridge_v1_config/1, + bridge_v1_config_to_connector_config/2, + has_custom_bridge_v1_config_to_connector_config/1, + bridge_v1_config_to_action_config/3, + has_custom_bridge_v1_config_to_action_config/1, + transform_bridge_v1_config_to_action_config/4 ]). -callback bridge_v1_type_name() -> atom(). -callback action_type_name() -> atom(). -callback connector_type_name() -> atom(). -callback schema_module() -> atom(). +%% Define this if the automatic config downgrade is not enough for the bridge. +-callback connector_action_config_to_bridge_v1_config( + ConnectorConfig :: map(), ActionConfig :: map() +) -> map(). +%% Define this if the automatic config upgrade is not enough for the connector. +-callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) -> map(). +%% Define this if the automatic config upgrade is not enough for the bridge. +%% If you want to make use of the automatic config upgrade, you can call +%% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your +%% implementation and do some adjustments on the result. +-callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) -> + map(). --optional_callbacks([bridge_v1_type_name/0]). +-optional_callbacks([ + bridge_v1_type_name/0, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_config_to_action_config/2 +]). %% ==================================================================== %% Hadcoded list of info modules for actions @@ -110,10 +134,49 @@ registered_schema_modules() -> Schemas = maps:get(action_type_to_schema_module, InfoMap), maps:to_list(Schemas). +has_custom_connector_action_config_to_bridge_v1_config(ActionOrBridgeType) -> + Module = get_action_info_module(ActionOrBridgeType), + erlang:function_exported(Module, connector_action_config_to_bridge_v1_config, 2). + +connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) -> + Module = get_action_info_module(ActionOrBridgeType), + %% should only be called if defined + Module:connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig). + +has_custom_bridge_v1_config_to_connector_config(ActionOrBridgeType) -> + Module = get_action_info_module(ActionOrBridgeType), + erlang:function_exported(Module, bridge_v1_config_to_connector_config, 1). + +bridge_v1_config_to_connector_config(ActionOrBridgeType, BridgeV1Config) -> + Module = get_action_info_module(ActionOrBridgeType), + %% should only be called if defined + Module:bridge_v1_config_to_connector_config(BridgeV1Config). + +has_custom_bridge_v1_config_to_action_config(ActionOrBridgeType) -> + Module = get_action_info_module(ActionOrBridgeType), + erlang:function_exported(Module, bridge_v1_config_to_action_config, 2). + +bridge_v1_config_to_action_config(ActionOrBridgeType, BridgeV1Config, ConnectorName) -> + Module = get_action_info_module(ActionOrBridgeType), + %% should only be called if defined + Module:bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName). + +transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName +) -> + emqx_connector_schema:transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName + ). + %% ==================================================================== %% Internal functions for building the info map and accessing it %% ==================================================================== +get_action_info_module(ActionOrBridgeType) -> + InfoMap = info_map(), + ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap), + maps:get(ActionOrBridgeType, ActionInfoModuleMap, undefined). + internal_emqx_action_persistent_term_info_key() -> ?FUNCTION_NAME. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7ce266922..706849965 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1105,9 +1105,23 @@ bridge_v1_lookup_and_transform_helper( <<"actions">>, emqx_bridge_v2_schema ), - BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2), - BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2), - BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2), + BridgeV1ConfigFinal = + case + emqx_action_info:has_custom_connector_action_config_to_bridge_v1_config(BridgeV1Type) + of + false -> + BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2), + %% Move parameters to the top level + ParametersMap = maps:get(<<"parameters">>, BridgeV1Config1, #{}), + BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1), + BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap), + emqx_utils_maps:deep_merge(ConnectorRawConfig2, BridgeV1Config3); + true -> + emqx_action_info:connector_action_config_to_bridge_v1_config( + BridgeV1Type, ConnectorRawConfig2, BridgeV2RawConfig2 + ) + end, + BridgeV1Tmp = maps:put(raw_config, BridgeV1ConfigFinal, BridgeV2), BridgeV1 = maps:remove(status, BridgeV1Tmp), BridgeV2Status = maps:get(status, BridgeV2, undefined), BridgeV2Error = maps:get(error, BridgeV2, undefined), diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl index 8ebdb2435..c4f395041 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl @@ -10,7 +10,9 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_config_to_action_config/2 ]). bridge_v1_type_name() -> azure_event_hub_producer. @@ -20,3 +22,11 @@ action_type_name() -> azure_event_hub_producer. connector_type_name() -> azure_event_hub_producer. schema_module() -> emqx_bridge_azure_event_hub. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + emqx_bridge_kafka_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, ActionConfig + ). + +bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> + emqx_bridge_kafka_action_info:bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl index 50d4f0c63..7b6a946d0 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -10,7 +10,9 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_config_to_action_config/2 ]). bridge_v1_type_name() -> kafka. @@ -20,3 +22,31 @@ action_type_name() -> kafka_producer. connector_type_name() -> kafka_producer. schema_module() -> emqx_bridge_kafka. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), + BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), + emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2). + +bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> + Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, schema_module(), kafka_producer + ), + KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}), + Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0), + Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}), + maps:with(producer_action_field_keys(), Config2). + +%%------------------------------------------------------------------------------------------ +%% Internal helper functions +%%------------------------------------------------------------------------------------------ + +producer_action_field_keys() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_kafka:fields(kafka_producer_action) + ]. + +to_bin(B) when is_binary(B) -> B; +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 22eb523be..9cb0bc931 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -22,7 +22,10 @@ -import(hoconsc, [mk/2, ref/2]). --export([transform_bridges_v1_to_connectors_and_bridges_v2/1]). +-export([ + transform_bridges_v1_to_connectors_and_bridges_v2/1, + transform_bridge_v1_config_to_action_config/4 +]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]). @@ -96,53 +99,103 @@ bridge_configs_to_transform( end. split_bridge_to_connector_and_action( - {ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}} + {ConnectorsMap, {BridgeType, BridgeName, BridgeV1Conf, ConnectorFields, PreviousRawConfig}} ) -> - %% Get connector fields from bridge config - ConnectorMap = lists:foldl( - fun({ConnectorFieldName, _Spec}, ToTransformSoFar) -> - case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of - true -> - NewToTransform = maps:put( - to_bin(ConnectorFieldName), - maps:get(to_bin(ConnectorFieldName), BridgeConf), - ToTransformSoFar - ), - NewToTransform; - false -> - ToTransformSoFar - end + ConnectorMap = + case emqx_action_info:has_custom_bridge_v1_config_to_connector_config(BridgeType) of + true -> + emqx_action_info:bridge_v1_config_to_connector_config( + BridgeType, BridgeV1Conf + ); + false -> + %% We do an automatic transfomation to get the connector config + %% if the callback is not defined. + %% Get connector fields from bridge config + lists:foldl( + fun({ConnectorFieldName, _Spec}, ToTransformSoFar) -> + case maps:is_key(to_bin(ConnectorFieldName), BridgeV1Conf) of + true -> + NewToTransform = maps:put( + to_bin(ConnectorFieldName), + maps:get(to_bin(ConnectorFieldName), BridgeV1Conf), + ToTransformSoFar + ), + NewToTransform; + false -> + ToTransformSoFar + end + end, + #{}, + ConnectorFields + ) end, - #{}, - ConnectorFields - ), - %% Remove connector fields from bridge config to create Action - ActionMap0 = lists:foldl( - fun - ({enable, _Spec}, ToTransformSoFar) -> - %% Enable filed is used in both - ToTransformSoFar; - ({ConnectorFieldName, _Spec}, ToTransformSoFar) -> - case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of - true -> - maps:remove(to_bin(ConnectorFieldName), ToTransformSoFar); - false -> - ToTransformSoFar - end - end, - BridgeConf, - ConnectorFields - ), %% Generate a connector name, if needed. Avoid doing so if there was a previous config. ConnectorName = case PreviousRawConfig of #{<<"connector">> := ConnectorName0} -> ConnectorName0; _ -> generate_connector_name(ConnectorsMap, BridgeName, 0) end, - %% Add connector field to action map - ActionMap = maps:put(<<"connector">>, ConnectorName, ActionMap0), + ActionMap = + case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of + true -> + emqx_action_info:bridge_v1_config_to_action_config( + BridgeType, BridgeV1Conf, ConnectorName + ); + false -> + transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorFields + ) + end, {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}. +transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName +) -> + ConnectorFields = ConnectorConfSchemaMod:fields(ConnectorConfSchemaName), + transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorFields + ). + +transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorFields +) -> + TopKeys = [ + <<"enable">>, + <<"connector">>, + <<"local_topic">>, + <<"resource_opts">>, + <<"description">>, + <<"parameters">> + ], + TopKeysMap = maps:from_keys(TopKeys, true), + %% Remove connector fields + ActionMap0 = lists:foldl( + fun + ({enable, _Spec}, ToTransformSoFar) -> + %% Enable filed is used in both + ToTransformSoFar; + ({ConnectorFieldName, _Spec}, ToTransformSoFar) -> + ConnectorFieldNameBin = to_bin(ConnectorFieldName), + case + maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) andalso + (not maps:is_key(ConnectorFieldNameBin, TopKeysMap)) + of + true -> + maps:remove(ConnectorFieldNameBin, ToTransformSoFar); + false -> + ToTransformSoFar + end + end, + BridgeV1Conf, + ConnectorFields + ), + %% Add the connector field + ActionMap1 = maps:put(<<"connector">>, ConnectorName, ActionMap0), + TopMap = maps:with(TopKeys, ActionMap1), + RestMap = maps:without(TopKeys, ActionMap1), + %% Other parameters should be stuffed into `parameters' + emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}). + generate_connector_name(ConnectorsMap, BridgeName, Attempt) -> ConnectorNameList = case Attempt of From cd72dc11dda97e49576b4c8b35f9f28465691255 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 21 Nov 2023 13:19:58 +0100 Subject: [PATCH 11/19] fix: missing emqx_action_info module mapping --- apps/emqx_bridge/src/emqx_action_info.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 1cdf61dfd..b236558e1 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -224,7 +224,8 @@ initial_info_map() -> bridge_v1_type_to_action_type => #{}, action_type_to_bridge_v1_type => #{}, action_type_to_connector_type => #{}, - action_type_to_schema_module => #{} + action_type_to_schema_module => #{}, + action_type_to_info_module => #{} }. get_info_map(Module) -> @@ -258,5 +259,10 @@ get_info_map(Module) -> }, action_type_to_schema_module => #{ ActionType => Module:schema_module() + }, + action_type_to_info_module => #{ + ActionType => Module, + %% Alias the bridge V1 type to the action type + BridgeV1Type => Module } }. From fa7151f255d928d78b05058a87b9625198caa331 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 21 Nov 2023 15:18:22 +0100 Subject: [PATCH 12/19] fix: port emqx_utils_maps:rename function from master The emqx_utils_maps:rename function is needed by action upgrade/downgrade hoos. --- apps/emqx_utils/src/emqx_utils_maps.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl index 3945b7201..a3b6961f0 100644 --- a/apps/emqx_utils/src/emqx_utils_maps.erl +++ b/apps/emqx_utils/src/emqx_utils_maps.erl @@ -34,7 +34,8 @@ best_effort_recursive_sum/3, if_only_to_toggle_enable/2, update_if_present/3, - put_if/4 + put_if/4, + rename/3 ]). -export_type([config_key/0, config_key_path/0]). @@ -309,3 +310,11 @@ put_if(Acc, K, V, true) -> Acc#{K => V}; put_if(Acc, _K, _V, false) -> Acc. + +rename(OldKey, NewKey, Map) -> + case maps:find(OldKey, Map) of + {ok, Value} -> + maps:put(NewKey, Value, maps:remove(OldKey, Map)); + error -> + Map + end. From b3dffa4390eb1cc25a6d1db41fc81889252128b5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 17 Nov 2023 10:26:20 -0300 Subject: [PATCH 13/19] fix(kafka): don't return `parameters` from `/bridges` API Fixes https://emqx.atlassian.net/browse/EMQX-11412 --- .../test/emqx_bridge_v2_testlib.erl | 19 +++++++++++++ .../src/emqx_bridge_azure_event_hub.erl | 2 +- .../emqx_bridge_azure_event_hub_v2_SUITE.erl | 27 +++++++++++++++++++ .../src/emqx_bridge_kafka.erl | 21 ++++++++------- .../src/emqx_bridge_kafka_action_info.erl | 5 ++-- .../test/emqx_bridge_kafka_tests.erl | 10 +++---- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 27 +++++++++++++++++++ 7 files changed, 93 insertions(+), 18 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 5a2b6b000..6c48f5663 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -312,6 +312,25 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> Error end. +api_spec_schemas(Root) -> + Method = get, + Path = emqx_mgmt_api_test_util:api_path(["schemas", Root]), + Params = [], + AuthHeader = [], + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 200, _}, _, Res0}} -> + #{<<"components">> := #{<<"schemas">> := Schemas}} = + emqx_utils_json:decode(Res0, [return_maps]), + Schemas + end. + +bridges_api_spec_schemas() -> + api_spec_schemas("bridges"). + +actions_api_spec_schemas() -> + api_spec_schemas("actions"). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index eb364bdff..553d77326 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -126,7 +126,7 @@ fields(action) -> fields(actions) -> Fields = override( - emqx_bridge_kafka:producer_opts(), + emqx_bridge_kafka:producer_opts(action), bridge_v2_overrides() ) ++ [ diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 206cc08e0..4d441ea0b 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -272,6 +272,22 @@ make_message() -> timestamp => Time }. +bridge_api_spec_props_for_get() -> + #{ + <<"bridge_azure_event_hub.get_producer">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:bridges_api_spec_schemas(), + Props. + +action_api_spec_props_for_get() -> + #{ + <<"bridge_azure_event_hub.get_bridge_v2">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:actions_api_spec_schemas(), + Props. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -341,3 +357,14 @@ t_same_name_azure_kafka_bridges(Config) -> end ), ok. + +t_parameters_key_api_spec(_Config) -> + BridgeProps = bridge_api_spec_props_for_get(), + ?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}), + ?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}), + + ActionProps = action_api_spec_props_for_get(), + ?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}), + ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}), + + ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index b3934c7bb..f7205e6ae 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -29,7 +29,7 @@ desc/1, host_opts/0, ssl_client_opts_fields/0, - producer_opts/0 + producer_opts/1 ]). -export([ @@ -261,7 +261,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> - connector_config_fields() ++ producer_opts(); + connector_config_fields() ++ producer_opts(v1); fields(kafka_producer_action) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -270,7 +270,7 @@ fields(kafka_producer_action) -> desc => ?DESC(emqx_connector_schema, "connector_field"), required => true })}, {description, emqx_schema:description_schema()} - ] ++ producer_opts(); + ] ++ producer_opts(action); fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -601,25 +601,28 @@ connector_config_fields() -> {ssl, mk(ref(ssl_client_opts), #{})} ]. -producer_opts() -> +producer_opts(ActionOrBridgeV1) -> [ %% Note: there's an implicit convention in `emqx_bridge' that, %% for egress bridges with this config, the published messages %% will be forwarded to such bridges. {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}, - parameters_field(), + parameters_field(ActionOrBridgeV1), {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})} ]. %% Since e5.3.1, we want to rename the field 'kafka' to 'parameters' %% Hoever we need to keep it backward compatible for generated schema json (version 0.1.0) %% since schema is data for the 'schemas' API. -parameters_field() -> +parameters_field(ActionOrBridgeV1) -> + OverriddenV1 = <<"0.1.0">> =:= get(emqx_bridge_schema_version), {Name, Alias} = - case get(emqx_bridge_schema_version) of - <<"0.1.0">> -> + case {OverriddenV1, ActionOrBridgeV1} of + {true, _} -> {kafka, parameters}; - _ -> + {_, v1} -> + {kafka, parameters}; + {_, action} -> {parameters, kafka} end, {Name, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl index 7b6a946d0..31efc7c11 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -32,9 +32,8 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( BridgeV1Conf, ConnectorName, schema_module(), kafka_producer ), - KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}), - Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0), - Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}), + KafkaMap = maps:get(<<"kafka">>, BridgeV1Conf, #{}), + Config2 = emqx_utils_maps:deep_merge(Config0, #{<<"parameters">> => KafkaMap}), maps:with(producer_action_field_keys(), Config2). %%------------------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 1d9682b9b..69794f2b9 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -25,7 +25,7 @@ kafka_producer_test() -> <<"kafka_producer">> := #{ <<"myproducer">> := - #{<<"parameters">> := #{}} + #{<<"kafka">> := #{}} } } }, @@ -52,7 +52,7 @@ kafka_producer_test() -> #{ <<"myproducer">> := #{ - <<"parameters">> := #{}, + <<"kafka">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } @@ -68,7 +68,7 @@ kafka_producer_test() -> #{ <<"myproducer">> := #{ - <<"parameters">> := #{}, + <<"kafka">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } @@ -166,7 +166,7 @@ message_key_dispatch_validations_test() -> ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.parameters", + path := "bridges.kafka_producer.myproducer.kafka", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, @@ -175,7 +175,7 @@ message_key_dispatch_validations_test() -> ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.parameters", + path := "bridges.kafka_producer.myproducer.kafka", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6c48146cd..8ce3b7f6b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -182,6 +182,22 @@ create_action(Name, Config) -> on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), Res. +bridge_api_spec_props_for_get() -> + #{ + <<"bridge_kafka.get_producer">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:bridges_api_spec_schemas(), + Props. + +action_api_spec_props_for_get() -> + #{ + <<"bridge_kafka.get_bridge_v2">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:actions_api_spec_schemas(), + Props. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -342,3 +358,14 @@ t_bad_url(_Config) -> ), ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), ok. + +t_parameters_key_api_spec(_Config) -> + BridgeProps = bridge_api_spec_props_for_get(), + ?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}), + ?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}), + + ActionProps = action_api_spec_props_for_get(), + ?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}), + ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}), + + ok. From 11ec1a30a0a2d33617b008e46d51af2a11afbcb2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 21 Nov 2023 16:00:19 -0300 Subject: [PATCH 14/19] test(flaky): fix flaky pulsar test --- apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl index 29299dcc9..5492bb2a8 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl @@ -10,8 +10,11 @@ %% Test cases %%=========================================================================== +atoms() -> + [my_producer]. + pulsar_producer_validations_test() -> - Name = list_to_atom("my_producer"), + Name = hd(atoms()), Conf0 = pulsar_producer_hocon(), Conf1 = Conf0 ++ From 3a8c33280548f63e016f5a88d22bb165ef178be5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 21 Nov 2023 16:27:54 -0300 Subject: [PATCH 15/19] fix(actions_api): don't crash on validation errors Fixes https://emqx.atlassian.net/browse/EMQX-11394 --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 2 ++ .../test/emqx_bridge_v2_api_SUITE.erl | 22 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index d5fd09631..cb1f7cc62 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -791,6 +791,8 @@ do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> PreOrPostConfigUpdate =:= pre_config_update; PreOrPostConfigUpdate =:= post_config_update -> + ?BAD_REQUEST(map_to_json(redact(Reason))); + {error, Reason} when is_map(Reason) -> ?BAD_REQUEST(map_to_json(redact(Reason))) end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index b99a462b4..ed6ef9eb9 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -1021,6 +1021,28 @@ t_action_types(Config) -> ?assert(lists:all(fun is_binary/1, Types), #{types => Types}), ok. +t_bad_name(Config) -> + Name = <<"_bad_name">>, + Res = request_json( + post, + uri([?ROOT]), + ?KAFKA_BRIDGE(Name), + Config + ), + ?assertMatch({ok, 400, #{<<"message">> := _}}, Res), + {ok, 400, #{<<"message">> := Msg0}} = Res, + Msg = emqx_utils_json:decode(Msg0, [return_maps]), + ?assertMatch( + #{ + <<"got">> := [<<"_bad_name">>], + <<"kind">> := <<"validation_error">>, + <<"path">> := <<"actions.kafka_producer">>, + <<"reason">> := <<"invalid_map_key">> + }, + Msg + ), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], From 5ebd954b166ba7c2731d17d4944c06a6921b2e80 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 21 Nov 2023 21:17:57 +0100 Subject: [PATCH 16/19] chore: bump release version to e5.3.2-alpha.1 --- apps/emqx/include/emqx_release.hrl | 4 ++-- deploy/charts/emqx-enterprise/Chart.yaml | 4 ++-- deploy/charts/emqx/Chart.yaml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 011d52595..2f9254d70 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.3.1"). +-define(EMQX_RELEASE_CE, "5.3.2"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.3.1"). +-define(EMQX_RELEASE_EE, "5.3.2-alpha.1"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index d9ad72611..aed38cd63 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.3.1 +version: 5.3.2-alpha.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.3.1 +appVersion: 5.3.2-alpha.1 diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index 76bcd3aaa..9444fe14c 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.3.1 +version: 5.3.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.3.1 +appVersion: 5.3.2 From 39791511fcab8f7ebaedad9eb54433abbfad4ebc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 21 Nov 2023 17:38:13 -0300 Subject: [PATCH 17/19] chore: remove obsolete workaround --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 9 +++------ apps/emqx_conf/src/emqx_conf.erl | 7 +------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index f7205e6ae..8c90e0896 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -615,14 +615,11 @@ producer_opts(ActionOrBridgeV1) -> %% Hoever we need to keep it backward compatible for generated schema json (version 0.1.0) %% since schema is data for the 'schemas' API. parameters_field(ActionOrBridgeV1) -> - OverriddenV1 = <<"0.1.0">> =:= get(emqx_bridge_schema_version), {Name, Alias} = - case {OverriddenV1, ActionOrBridgeV1} of - {true, _} -> + case ActionOrBridgeV1 of + v1 -> {kafka, parameters}; - {_, v1} -> - {kafka, parameters}; - {_, action} -> + action -> {parameters, kafka} end, {Name, diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index c986a65ee..7ff06b0ef 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -193,12 +193,7 @@ hotconf_schema_json() -> bridge_schema_json() -> Version = <<"0.1.0">>, SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => Version}, - put(emqx_bridge_schema_version, Version), - try - gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo) - after - erase(emqx_bridge_schema_version) - end. + gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo). %% TODO: remove it and also remove hocon_md.erl and friends. %% markdown generation from schema is a failure and we are moving to an interactive From 3261a12140ddbae7564e4f54ddc2cb6435f98563 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 21 Nov 2023 21:42:55 +0100 Subject: [PATCH 18/19] fix(emqx_resource): do not allow leading _ or - as resource name --- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 2 +- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 2 +- .../emqx_bridge_v1_compatibility_layer_SUITE.erl | 2 +- apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl | 4 +--- apps/emqx_connector/test/emqx_connector_SUITE.erl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 13 +++++++++---- 6 files changed, 14 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index b29ba154e..bc8be5476 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -199,7 +199,7 @@ t_create_with_bad_name(_Config) -> ?assertMatch( {error, {pre_config_update, emqx_bridge_app, #{ - reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>, + reason := <<"Invalid name format.", _/binary>>, kind := validation_error }}}, emqx:update_config(Path, Conf) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 99a2bc8cd..ccc944572 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1365,7 +1365,7 @@ t_create_with_bad_name(Config) -> ?assertMatch( #{ <<"kind">> := <<"validation_error">>, - <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + <<"reason">> := <<"Invalid name format.", _/binary>> }, Msg ), diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index c714b858a..aa564aa9c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -829,7 +829,7 @@ t_create_with_bad_name(_Config) -> <<"code">> := <<"BAD_REQUEST">>, <<"message">> := #{ <<"kind">> := <<"validation_error">>, - <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + <<"reason">> := <<"Invalid name format.", _/binary>> } }}} = create_bridge_http_api_v1(Opts), ok. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index ed6ef9eb9..cf58eefde 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -1034,10 +1034,8 @@ t_bad_name(Config) -> Msg = emqx_utils_json:decode(Msg0, [return_maps]), ?assertMatch( #{ - <<"got">> := [<<"_bad_name">>], <<"kind">> := <<"validation_error">>, - <<"path">> := <<"actions.kafka_producer">>, - <<"reason">> := <<"invalid_map_key">> + <<"reason">> := <<"Invalid name format.", _/binary>> }, Msg ), diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index ee7e29741..669d05442 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -229,7 +229,7 @@ t_create_with_bad_name_direct_path(_Config) -> {error, {pre_config_update, _ConfigHandlerMod, #{ kind := validation_error, - reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + reason := <<"Invalid name format.", _/binary>> }}}, emqx:update_config(Path, ConnConfig) ), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 90df229e4..0bc1eb615 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -812,11 +812,11 @@ validate_name(Name) -> ok. validate_name(<<>>, _Opts) -> - invalid_data("name cannot be empty string"); + invalid_data("Name cannot be empty string"); validate_name(Name, _Opts) when size(Name) >= 255 -> - invalid_data("name length must be less than 255"); + invalid_data("Name length must be less than 255"); validate_name(Name, Opts) -> - case re:run(Name, <<"^[-0-9a-zA-Z_]+$">>, [{capture, none}]) of + case re:run(Name, <<"^[0-9a-zA-Z][-0-9a-zA-Z_]*$">>, [{capture, none}]) of match -> case maps:get(atom_name, Opts, true) of %% NOTE @@ -827,7 +827,12 @@ validate_name(Name, Opts) -> end; nomatch -> invalid_data( - <<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name/binary>> + << + "Invalid name format. The name must begin with a letter or number " + "(0-9, a-z, A-Z) and can only include underscores and hyphens as " + "non-initial characters. Got: ", + Name/binary + >> ) end. From 38d3a1d7d0254b3bc90650def178b22600438bbe Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 22 Nov 2023 09:25:28 -0300 Subject: [PATCH 19/19] feat(actions): allow multiple action info modules per application --- apps/emqx_bridge/src/emqx_action_info.erl | 6 +++--- .../src/emqx_bridge_azure_event_hub.app.src | 2 +- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index b236558e1..7c246a797 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -211,9 +211,9 @@ action_info_modules() -> lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()). action_info_modules(App) -> - case application:get_env(App, emqx_action_info_module) of - {ok, Module} -> - [Module]; + case application:get_env(App, emqx_action_info_modules) of + {ok, Modules} -> + Modules; _ -> [] end. diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index 40ea79334..f1c097d29 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -9,7 +9,7 @@ telemetry, wolff ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_azure_event_hub_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 00b9d8968..da8df2ddc 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -12,7 +12,7 @@ brod, brod_gssapi ]}, - {env, [{emqx_action_info_module, emqx_bridge_kafka_action_info}]}, + {env, [{emqx_action_info_modules, [emqx_bridge_kafka_action_info]}]}, {modules, []}, {links, []}