diff --git a/Makefile b/Makefile index 0112776bb..7c9638dd5 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 export EMQX_DASHBOARD_VERSION ?= v1.5.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.3.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2-beta.1 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 011d52595..2f9254d70 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.3.1"). +-define(EMQX_RELEASE_CE, "5.3.2"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.3.1"). +-define(EMQX_RELEASE_EE, "5.3.2-alpha.1"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 81e01e1bd..306341700 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1216,8 +1216,10 @@ handle_info( {ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3}; Shutdown -> Shutdown end; -handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}), +handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> + %% This can happen as a race: + %% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed' + %% is already in process mailbox {ok, Channel}; handle_info(clean_authz_cache, Channel) -> ok = emqx_authz_cache:empty_authz_cache(), diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index db36fbea9..11d42f9dd 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -552,13 +552,13 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) -> inc_counter(incoming_bytes, Len), ok = emqx_metrics:inc('bytes.received', Len), when_bytes_in(Len, Data, State); -handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> - case queue:peek(Cache) of +handle_msg(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) -> + case queue:peek(Buffer) of empty -> - activate_socket(State); + handle_info(activate_socket, State); {value, #pending_req{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_buffer = queue:drop(Cache)}, - check_limiter(Needs, Data, Next, [check_cache], State2) + State2 = State#state{limiter_buffer = queue:drop(Buffer)}, + check_limiter(Needs, Data, Next, [check_limiter_buffer], State2) end; handle_msg( {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, @@ -1036,13 +1036,13 @@ check_limiter( Data, WhenOk, _Msgs, - #state{limiter_buffer = Cache} = State + #state{limiter_buffer = Buffer} = State ) -> %% if there has a retry timer, - %% cache the operation and execute it after the retry is over - %% the maximum length of the cache queue is equal to the active_n + %% Buffer the operation and execute it after the retry is over + %% the maximum length of the buffer queue is equal to the active_n New = #pending_req{need = Needs, data = Data, next = WhenOk}, - {ok, State#state{limiter_buffer = queue:in(New, Cache)}}. + {ok, State#state{limiter_buffer = queue:in(New, Buffer)}}. %% try to perform a retry -spec retry_limiter(state()) -> _. @@ -1053,7 +1053,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> {ok, Limiter2} -> Next( Data, - [check_cache], + [check_limiter_buffer], State#state{ limiter = Limiter2, limiter_timer = undefined diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 37ce72d74..07329721a 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -94,7 +94,7 @@ limiter :: container(), %% cache operation when overload - limiter_cache :: queue:queue(cache()), + limiter_buffer :: queue:queue(cache()), %% limiter timers limiter_timer :: undefined | reference() @@ -326,7 +326,7 @@ websocket_init([Req, Opts]) -> zone = Zone, listener = {Type, Listener}, limiter_timer = undefined, - limiter_cache = queue:new() + limiter_buffer = queue:new() }, hibernate}; {denny, Reason} -> @@ -462,13 +462,13 @@ websocket_info( State ) -> return(retry_limiter(State)); -websocket_info(check_cache, #state{limiter_cache = Cache} = State) -> - case queue:peek(Cache) of +websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) -> + case queue:peek(Buffer) of empty -> return(enqueue({active, true}, State#state{sockstate = running})); {value, #cache{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_cache = queue:drop(Cache)}, - return(check_limiter(Needs, Data, Next, [check_cache], State2)) + State2 = State#state{limiter_buffer = queue:drop(Buffer)}, + return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2)) end; websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); @@ -630,10 +630,10 @@ check_limiter( Data, WhenOk, _Msgs, - #state{limiter_cache = Cache} = State + #state{limiter_buffer = Buffer} = State ) -> New = #cache{need = Needs, data = Data, next = WhenOk}, - State#state{limiter_cache = queue:in(New, Cache)}. + State#state{limiter_buffer = queue:in(New, Buffer)}. -spec retry_limiter(state()) -> state(). retry_limiter(#state{limiter = Limiter} = State) -> @@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> {ok, Limiter2} -> Next( Data, - [check_cache], + [check_limiter_buffer], State#state{ limiter = Limiter2, limiter_timer = undefined diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index d5fd09631..cb1f7cc62 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -791,6 +791,8 @@ do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> PreOrPostConfigUpdate =:= pre_config_update; PreOrPostConfigUpdate =:= post_config_update -> + ?BAD_REQUEST(map_to_json(redact(Reason))); + {error, Reason} when is_map(Reason) -> ?BAD_REQUEST(map_to_json(redact(Reason))) end. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 7346ae6c7..25619d99a 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -39,6 +39,7 @@ ]). -export([types/0, types_sc/0]). +-export([resource_opts_fields/0, resource_opts_fields/1]). -export([ make_producer_action_schema/1, @@ -147,6 +148,31 @@ types() -> types_sc() -> hoconsc:enum(types()). +resource_opts_fields() -> + resource_opts_fields(_Overrides = []). + +resource_opts_fields(Overrides) -> + ActionROFields = [ + batch_size, + batch_time, + buffer_mode, + buffer_seg_bytes, + health_check_interval, + inflight_window, + max_buffer_bytes, + metrics_flush_interval, + query_mode, + request_ttl, + resume_interval, + start_after_created, + start_timeout, + worker_pool_size + ], + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end, + emqx_resource_schema:create_opts(Overrides) + ). + examples(Method) -> MergeFun = fun(Example, Examples) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index b29ba154e..bc8be5476 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -199,7 +199,7 @@ t_create_with_bad_name(_Config) -> ?assertMatch( {error, {pre_config_update, emqx_bridge_app, #{ - reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>, + reason := <<"Invalid name format.", _/binary>>, kind := validation_error }}}, emqx:update_config(Path, Conf) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 99a2bc8cd..ccc944572 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1365,7 +1365,7 @@ t_create_with_bad_name(Config) -> ?assertMatch( #{ <<"kind">> := <<"validation_error">>, - <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + <<"reason">> := <<"Invalid name format.", _/binary>> }, Msg ), diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index 26fd99256..b5c0ec9f2 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -821,7 +821,7 @@ t_create_with_bad_name(_Config) -> <<"code">> := <<"BAD_REQUEST">>, <<"message">> := #{ <<"kind">> := <<"validation_error">>, - <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + <<"reason">> := <<"Invalid name format.", _/binary>> } }}} = create_bridge_http_api_v1(Opts), ok. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index b99a462b4..cf58eefde 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -1021,6 +1021,26 @@ t_action_types(Config) -> ?assert(lists:all(fun is_binary/1, Types), #{types => Types}), ok. +t_bad_name(Config) -> + Name = <<"_bad_name">>, + Res = request_json( + post, + uri([?ROOT]), + ?KAFKA_BRIDGE(Name), + Config + ), + ?assertMatch({ok, 400, #{<<"message">> := _}}, Res), + {ok, 400, #{<<"message">> := Msg0}} = Res, + Msg = emqx_utils_json:decode(Msg0, [return_maps]), + ?assertMatch( + #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"Invalid name format.", _/binary>> + }, + Msg + ), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 5a2b6b000..6c48f5663 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -312,6 +312,25 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> Error end. +api_spec_schemas(Root) -> + Method = get, + Path = emqx_mgmt_api_test_util:api_path(["schemas", Root]), + Params = [], + AuthHeader = [], + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 200, _}, _, Res0}} -> + #{<<"components">> := #{<<"schemas">> := Schemas}} = + emqx_utils_json:decode(Res0, [return_maps]), + Schemas + end. + +bridges_api_spec_schemas() -> + api_spec_schemas("bridges"). + +actions_api_spec_schemas() -> + api_spec_schemas("actions"). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl new file mode 100644 index 000000000..4e28f3d88 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_v2_tests). + +-include_lib("eunit/include/eunit.hrl"). + +resource_opts_union_connector_actions_test() -> + %% The purpose of this test is to ensure we have split `resource_opts' fields + %% consciouly between connector and actions, in particular when/if we introduce new + %% fields there. + AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])), + ActionROFields = non_deprecated_fields(emqx_bridge_v2_schema:resource_opts_fields()), + ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()), + UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields), + ?assertEqual( + lists:usort(AllROFields), + UnionROFields, + #{ + missing_fields => AllROFields -- UnionROFields, + unexpected_fields => UnionROFields -- AllROFields, + action_fields => ActionROFields, + connector_fields => ConnectorROFields + } + ), + ok. + +non_deprecated_fields(Fields) -> + [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)]. diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index e196aac30..569725a34 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -126,7 +126,7 @@ fields(action) -> fields(actions) -> Fields = override( - emqx_bridge_kafka:producer_opts(), + emqx_bridge_kafka:producer_opts(action), bridge_v2_overrides() ) ++ [ diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 206cc08e0..4d441ea0b 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -272,6 +272,22 @@ make_message() -> timestamp => Time }. +bridge_api_spec_props_for_get() -> + #{ + <<"bridge_azure_event_hub.get_producer">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:bridges_api_spec_schemas(), + Props. + +action_api_spec_props_for_get() -> + #{ + <<"bridge_azure_event_hub.get_bridge_v2">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:actions_api_spec_schemas(), + Props. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -341,3 +357,14 @@ t_same_name_azure_kafka_bridges(Config) -> end ), ok. + +t_parameters_key_api_spec(_Config) -> + BridgeProps = bridge_api_spec_props_for_get(), + ?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}), + ?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}), + + ActionProps = action_api_spec_props_for_get(), + ?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}), + ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}), + + ok. diff --git a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl index 7714b0b2e..8742d7ccf 100644 --- a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl +++ b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl @@ -113,7 +113,7 @@ fields(action) -> fields(actions) -> Fields = override( - emqx_bridge_kafka:producer_opts(), + emqx_bridge_kafka:producer_opts(action), bridge_v2_overrides() ) ++ [ diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 9709bb174..5b3e3ca01 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -29,7 +29,7 @@ desc/1, host_opts/0, ssl_client_opts_fields/0, - producer_opts/0 + producer_opts/1 ]). -export([ @@ -261,7 +261,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> - connector_config_fields() ++ producer_opts(); + connector_config_fields() ++ producer_opts(v1); fields(kafka_producer_action) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -270,7 +270,7 @@ fields(kafka_producer_action) -> desc => ?DESC(emqx_connector_schema, "connector_field"), required => true })}, {description, emqx_schema:description_schema()} - ] ++ producer_opts(); + ] ++ producer_opts(action); fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -523,7 +523,7 @@ fields(consumer_kafka_opts) -> ]; fields(resource_opts) -> SupportedFields = [health_check_interval], - CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), + CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(), lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); fields(action_field) -> {kafka_producer, @@ -599,25 +599,25 @@ connector_config_fields() -> {ssl, mk(ref(ssl_client_opts), #{})} ]. -producer_opts() -> +producer_opts(ActionOrBridgeV1) -> [ %% Note: there's an implicit convention in `emqx_bridge' that, %% for egress bridges with this config, the published messages %% will be forwarded to such bridges. {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}, - parameters_field(), + parameters_field(ActionOrBridgeV1), {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})} ]. %% Since e5.3.1, we want to rename the field 'kafka' to 'parameters' %% However we need to keep it backward compatible for generated schema json (version 0.1.0) %% since schema is data for the 'schemas' API. -parameters_field() -> +parameters_field(ActionOrBridgeV1) -> {Name, Alias} = - case get(emqx_bridge_schema_version) of - <<"0.1.0">> -> + case ActionOrBridgeV1 of + v1 -> {kafka, parameters}; - _ -> + action -> {parameters, kafka} end, {Name, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl index 7b6a946d0..31efc7c11 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -32,9 +32,8 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( BridgeV1Conf, ConnectorName, schema_module(), kafka_producer ), - KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}), - Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0), - Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}), + KafkaMap = maps:get(<<"kafka">>, BridgeV1Conf, #{}), + Config2 = emqx_utils_maps:deep_merge(Config0, #{<<"parameters">> => KafkaMap}), maps:with(producer_action_field_keys(), Config2). %%------------------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index ff4334a85..64871bf6d 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -25,7 +25,7 @@ kafka_producer_test() -> <<"kafka_producer">> := #{ <<"myproducer">> := - #{<<"parameters">> := #{}} + #{<<"kafka">> := #{}} } } }, @@ -52,7 +52,7 @@ kafka_producer_test() -> #{ <<"myproducer">> := #{ - <<"parameters">> := #{}, + <<"kafka">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } @@ -68,7 +68,7 @@ kafka_producer_test() -> #{ <<"myproducer">> := #{ - <<"parameters">> := #{}, + <<"kafka">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } @@ -166,7 +166,7 @@ message_key_dispatch_validations_test() -> ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.parameters", + path := "bridges.kafka_producer.myproducer.kafka", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, @@ -175,7 +175,7 @@ message_key_dispatch_validations_test() -> ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.parameters", + path := "bridges.kafka_producer.myproducer.kafka", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6c48146cd..8ce3b7f6b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -182,6 +182,22 @@ create_action(Name, Config) -> on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), Res. +bridge_api_spec_props_for_get() -> + #{ + <<"bridge_kafka.get_producer">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:bridges_api_spec_schemas(), + Props. + +action_api_spec_props_for_get() -> + #{ + <<"bridge_kafka.get_bridge_v2">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:actions_api_spec_schemas(), + Props. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -342,3 +358,14 @@ t_bad_url(_Config) -> ), ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), ok. + +t_parameters_key_api_spec(_Config) -> + BridgeProps = bridge_api_spec_props_for_get(), + ?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}), + ?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}), + + ActionProps = action_api_spec_props_for_get(), + ?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}), + ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}), + + ok. diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl index 29299dcc9..5492bb2a8 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl @@ -10,8 +10,11 @@ %% Test cases %%=========================================================================== +atoms() -> + [my_producer]. + pulsar_producer_validations_test() -> - Name = list_to_atom("my_producer"), + Name = hd(atoms()), Conf0 = pulsar_producer_hocon(), Conf1 = Conf0 ++ diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index c986a65ee..7ff06b0ef 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -193,12 +193,7 @@ hotconf_schema_json() -> bridge_schema_json() -> Version = <<"0.1.0">>, SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => Version}, - put(emqx_bridge_schema_version, Version), - try - gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo) - after - erase(emqx_bridge_schema_version) - end. + gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo). %% TODO: remove it and also remove hocon_md.erl and friends. %% markdown generation from schema is a failure and we are moving to an interactive diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index b2267539b..f6e0c0f95 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -372,7 +372,7 @@ schema("/connectors_probe") -> case emqx_connector:remove(ConnectorType, ConnectorName) of ok -> ?NO_CONTENT; - {error, {active_channels, Channels}} -> + {error, {post_config_update, _HandlerMod, {active_channels, Channels}}} -> ?BAD_REQUEST( {<<"Cannot delete connector while there are active channels defined for this connector">>, Channels} diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 6382d2bcd..2330e5491 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -35,6 +35,8 @@ -export([connector_type_to_bridge_types/1]). -export([common_fields/0]). +-export([resource_opts_fields/0, resource_opts_fields/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -364,6 +366,24 @@ common_fields() -> {description, emqx_schema:description_schema()} ]. +resource_opts_fields() -> + resource_opts_fields(_Overrides = []). + +resource_opts_fields(Overrides) -> + %% Note: these don't include buffer-related configurations because buffer workers are + %% tied to the action. + ConnectorROFields = [ + health_check_interval, + query_mode, + request_ttl, + start_after_created, + start_timeout + ], + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, ConnectorROFields) end, + emqx_resource_schema:create_opts(Overrides) + ). + %%====================================================================================== %% Helper Functions %%====================================================================================== diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index ee7e29741..669d05442 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -229,7 +229,7 @@ t_create_with_bad_name_direct_path(_Config) -> {error, {pre_config_update, _ConfigHandlerMod, #{ kind := validation_error, - reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + reason := <<"Invalid name format.", _/binary>> }}}, emqx:update_config(Path, ConnConfig) ), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index f6609808f..bd8aa9ddf 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -25,7 +25,7 @@ -include_lib("snabbkaffe/include/test_macros.hrl"). -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))). --define(CONNECTOR(NAME, TYPE), #{ +-define(RESOURCE(NAME, TYPE), #{ %<<"ssl">> => #{<<"enable">> => false}, <<"type">> => TYPE, <<"name">> => NAME @@ -52,12 +52,57 @@ -define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)). -define(KAFKA_CONNECTOR(Name, BootstrapHosts), maps:merge( - ?CONNECTOR(Name, ?CONNECTOR_TYPE), + ?RESOURCE(Name, ?CONNECTOR_TYPE), ?KAFKA_CONNECTOR_BASE(BootstrapHosts) ) ). -define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). +-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). +-define(BRIDGE_TYPE_STR, "kafka_producer"). +-define(BRIDGE_TYPE, <>). +-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{ + <<"enable">> => true, + <<"connector">> => Connector, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => true, + <<"mode">> => <<"hybrid">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_ext_headers">> => [ + #{ + <<"kafka_ext_header_key">> => <<"clientid">>, + <<"kafka_ext_header_value">> => <<"${clientid}">> + }, + #{ + <<"kafka_ext_header_key">> => <<"topic">>, + <<"kafka_ext_header_value">> => <<"${topic}">> + } + ], + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"kafka_headers">> => <<"${pub_props}">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"required_acks">> => <<"all_isr">>, + <<"topic">> => <<"kafka-topic">> + }, + <<"local_topic">> => <<"mqtt/local/topic">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"32s">> + } +}). +-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)). + %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). %% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{ %% <<"server">> => SERVER, @@ -105,7 +150,8 @@ emqx, emqx_auth, emqx_management, - {emqx_connector, "connectors {}"} + {emqx_connector, "connectors {}"}, + {emqx_bridge, "actions {}"} ]). -define(APPSPEC_DASHBOARD, @@ -128,7 +174,8 @@ all() -> groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ - t_connectors_probe + t_connectors_probe, + t_fail_delete_with_action ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics @@ -187,29 +234,38 @@ end_per_group(_, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), ok. -init_per_testcase(_TestCase, Config) -> +init_per_testcase(TestCase, Config) -> case ?config(cluster_nodes, Config) of undefined -> - init_mocks(); + init_mocks(TestCase); Nodes -> - [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] + [erpc:call(Node, ?MODULE, init_mocks, [TestCase]) || Node <- Nodes] end, Config. -end_per_testcase(_TestCase, Config) -> +end_per_testcase(TestCase, Config) -> + Node = ?config(node, Config), + ok = erpc:call(Node, ?MODULE, clear_resources, [TestCase]), case ?config(cluster_nodes, Config) of undefined -> meck:unload(); Nodes -> - [erpc:call(Node, meck, unload, []) || Node <- Nodes] + [erpc:call(N, meck, unload, []) || N <- Nodes] end, - Node = ?config(node, Config), ok = emqx_common_test_helpers:call_janitor(), - ok = erpc:call(Node, fun clear_resources/0), ok. -define(CONNECTOR_IMPL, dummy_connector_impl). -init_mocks() -> +init_mocks(t_fail_delete_with_action) -> + init_mocks(common), + meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId) + end), + ok; +init_mocks(_TestCase) -> meck:new(emqx_connector_ee_schema, [passthrough, no_link]), meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), @@ -235,7 +291,15 @@ init_mocks() -> ), [?CONNECTOR_IMPL, emqx_connector_ee_schema]. -clear_resources() -> +clear_resources(t_fail_delete_with_action) -> + lists:foreach( + fun(#{type := Type, name := Name}) -> + ok = emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + clear_resources(common); +clear_resources(_) -> lists:foreach( fun(#{type := Type, name := Name}) -> ok = emqx_connector:remove(Type, Name) @@ -646,7 +710,7 @@ t_connectors_probe(Config) -> request_json( post, uri(["connectors_probe"]), - ?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>), + ?RESOURCE(<<"broken_connector">>, <<"unknown_type">>), Config ) ), @@ -674,6 +738,57 @@ t_create_with_bad_name(Config) -> ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg), ok. +t_fail_delete_with_action(Config) -> + Name = ?CONNECTOR_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(Name), + Config + ) + ), + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + BridgeName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"connector">> := Name, + <<"kafka">> := #{}, + <<"local_topic">> := _, + <<"resource_opts">> := _ + }}, + request_json( + post, + uri(["actions"]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + + %% delete the connector + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := + <<"{<<\"Cannot delete connector while there are active channels", + " defined for this connector\">>,", _/binary>> + }}, + request_json(delete, uri(["connectors", ConnectorID]), Config) + ), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 453fa9fd2..10d081e57 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -963,13 +963,12 @@ handle_info( NChannel = ensure_disconnected(Reason, Channel), shutdown(Reason, NChannel); handle_info( - {sock_closed, Reason}, + {sock_closed, _Reason}, Channel = #channel{conn_state = disconnected} ) -> - ?SLOG(error, #{ - msg => "unexpected_sock_closed", - reason => Reason - }), + %% This can happen as a race: + %% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed' + %% is already in process mailbox {ok, Channel}; handle_info(clean_authz_cache, Channel) -> ok = emqx_authz_cache:empty_authz_cache(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 90df229e4..0bc1eb615 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -812,11 +812,11 @@ validate_name(Name) -> ok. validate_name(<<>>, _Opts) -> - invalid_data("name cannot be empty string"); + invalid_data("Name cannot be empty string"); validate_name(Name, _Opts) when size(Name) >= 255 -> - invalid_data("name length must be less than 255"); + invalid_data("Name length must be less than 255"); validate_name(Name, Opts) -> - case re:run(Name, <<"^[-0-9a-zA-Z_]+$">>, [{capture, none}]) of + case re:run(Name, <<"^[0-9a-zA-Z][-0-9a-zA-Z_]*$">>, [{capture, none}]) of match -> case maps:get(atom_name, Opts, true) of %% NOTE @@ -827,7 +827,12 @@ validate_name(Name, Opts) -> end; nomatch -> invalid_data( - <<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name/binary>> + << + "Invalid name format. The name must begin with a letter or number " + "(0-9, a-z, A-Z) and can only include underscores and hyphens as " + "non-initial characters. Got: ", + Name/binary + >> ) end. diff --git a/changes/ce/fix-11975.en.md b/changes/ce/fix-11975.en.md new file mode 100644 index 000000000..cba1c3a17 --- /dev/null +++ b/changes/ce/fix-11975.en.md @@ -0,0 +1,5 @@ +Resolve redundant error logging on socket closure + +Addressed a race condition causing duplicate error logs when a socket is closed by both a peer and the server. +Dual socket close events from the OS and EMQX previously led to excessive error logging. +The fix improves event handling to avoid redundant error-level logging. diff --git a/changes/ce/fix-11987.en.md b/changes/ce/fix-11987.en.md new file mode 100644 index 000000000..4d85cff41 --- /dev/null +++ b/changes/ce/fix-11987.en.md @@ -0,0 +1,3 @@ +Fix connection crash when trying to set TCP/SSL socket `active_n` option. + +Prior to this fix, if a socket is already closed when connection process tries to set `active_n` option, it causes a `case_clause` crash. diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index d9ad72611..aed38cd63 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.3.1 +version: 5.3.2-alpha.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.3.1 +appVersion: 5.3.2-alpha.1 diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index 76bcd3aaa..9444fe14c 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.3.1 +version: 5.3.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.3.1 +appVersion: 5.3.2