Merge remote-tracking branch 'origin/release-53' into sync-r53-m-20231122

This commit is contained in:
Thales Macedo Garitezi 2023-11-22 11:50:40 -03:00
commit 6c9417efe0
32 changed files with 391 additions and 83 deletions

View File

@ -21,7 +21,7 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.5.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.3.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2-beta.1
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.3.1").
-define(EMQX_RELEASE_CE, "5.3.2").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.3.1").
-define(EMQX_RELEASE_EE, "5.3.2-alpha.1").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -1216,8 +1216,10 @@ handle_info(
{ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
Shutdown -> Shutdown
end;
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}),
handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
%% This can happen as a race:
%% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed'
%% is already in process mailbox
{ok, Channel};
handle_info(clean_authz_cache, Channel) ->
ok = emqx_authz_cache:empty_authz_cache(),

View File

@ -552,13 +552,13 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) ->
inc_counter(incoming_bytes, Len),
ok = emqx_metrics:inc('bytes.received', Len),
when_bytes_in(Len, Data, State);
handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
case queue:peek(Cache) of
handle_msg(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
case queue:peek(Buffer) of
empty ->
activate_socket(State);
handle_info(activate_socket, State);
{value, #pending_req{need = Needs, data = Data, next = Next}} ->
State2 = State#state{limiter_buffer = queue:drop(Cache)},
check_limiter(Needs, Data, Next, [check_cache], State2)
State2 = State#state{limiter_buffer = queue:drop(Buffer)},
check_limiter(Needs, Data, Next, [check_limiter_buffer], State2)
end;
handle_msg(
{incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
@ -1036,13 +1036,13 @@ check_limiter(
Data,
WhenOk,
_Msgs,
#state{limiter_buffer = Cache} = State
#state{limiter_buffer = Buffer} = State
) ->
%% if there has a retry timer,
%% cache the operation and execute it after the retry is over
%% the maximum length of the cache queue is equal to the active_n
%% Buffer the operation and execute it after the retry is over
%% the maximum length of the buffer queue is equal to the active_n
New = #pending_req{need = Needs, data = Data, next = WhenOk},
{ok, State#state{limiter_buffer = queue:in(New, Cache)}}.
{ok, State#state{limiter_buffer = queue:in(New, Buffer)}}.
%% try to perform a retry
-spec retry_limiter(state()) -> _.
@ -1053,7 +1053,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
{ok, Limiter2} ->
Next(
Data,
[check_cache],
[check_limiter_buffer],
State#state{
limiter = Limiter2,
limiter_timer = undefined

View File

@ -94,7 +94,7 @@
limiter :: container(),
%% cache operation when overload
limiter_cache :: queue:queue(cache()),
limiter_buffer :: queue:queue(cache()),
%% limiter timers
limiter_timer :: undefined | reference()
@ -326,7 +326,7 @@ websocket_init([Req, Opts]) ->
zone = Zone,
listener = {Type, Listener},
limiter_timer = undefined,
limiter_cache = queue:new()
limiter_buffer = queue:new()
},
hibernate};
{denny, Reason} ->
@ -462,13 +462,13 @@ websocket_info(
State
) ->
return(retry_limiter(State));
websocket_info(check_cache, #state{limiter_cache = Cache} = State) ->
case queue:peek(Cache) of
websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
case queue:peek(Buffer) of
empty ->
return(enqueue({active, true}, State#state{sockstate = running}));
{value, #cache{need = Needs, data = Data, next = Next}} ->
State2 = State#state{limiter_cache = queue:drop(Cache)},
return(check_limiter(Needs, Data, Next, [check_cache], State2))
State2 = State#state{limiter_buffer = queue:drop(Buffer)},
return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2))
end;
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
handle_timeout(TRef, Msg, State);
@ -630,10 +630,10 @@ check_limiter(
Data,
WhenOk,
_Msgs,
#state{limiter_cache = Cache} = State
#state{limiter_buffer = Buffer} = State
) ->
New = #cache{need = Needs, data = Data, next = WhenOk},
State#state{limiter_cache = queue:in(New, Cache)}.
State#state{limiter_buffer = queue:in(New, Buffer)}.
-spec retry_limiter(state()) -> state().
retry_limiter(#state{limiter = Limiter} = State) ->
@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
{ok, Limiter2} ->
Next(
Data,
[check_cache],
[check_limiter_buffer],
State#state{
limiter = Limiter2,
limiter_timer = undefined

View File

@ -791,6 +791,8 @@ do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
PreOrPostConfigUpdate =:= pre_config_update;
PreOrPostConfigUpdate =:= post_config_update
->
?BAD_REQUEST(map_to_json(redact(Reason)));
{error, Reason} when is_map(Reason) ->
?BAD_REQUEST(map_to_json(redact(Reason)))
end.

View File

@ -39,6 +39,7 @@
]).
-export([types/0, types_sc/0]).
-export([resource_opts_fields/0, resource_opts_fields/1]).
-export([
make_producer_action_schema/1,
@ -147,6 +148,31 @@ types() ->
types_sc() ->
hoconsc:enum(types()).
resource_opts_fields() ->
resource_opts_fields(_Overrides = []).
resource_opts_fields(Overrides) ->
ActionROFields = [
batch_size,
batch_time,
buffer_mode,
buffer_seg_bytes,
health_check_interval,
inflight_window,
max_buffer_bytes,
metrics_flush_interval,
query_mode,
request_ttl,
resume_interval,
start_after_created,
start_timeout,
worker_pool_size
],
lists:filter(
fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end,
emqx_resource_schema:create_opts(Overrides)
).
examples(Method) ->
MergeFun =
fun(Example, Examples) ->

View File

@ -199,7 +199,7 @@ t_create_with_bad_name(_Config) ->
?assertMatch(
{error,
{pre_config_update, emqx_bridge_app, #{
reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>,
reason := <<"Invalid name format.", _/binary>>,
kind := validation_error
}}},
emqx:update_config(Path, Conf)

View File

@ -1365,7 +1365,7 @@ t_create_with_bad_name(Config) ->
?assertMatch(
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
<<"reason">> := <<"Invalid name format.", _/binary>>
},
Msg
),

View File

@ -821,7 +821,7 @@ t_create_with_bad_name(_Config) ->
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
<<"reason">> := <<"Invalid name format.", _/binary>>
}
}}} = create_bridge_http_api_v1(Opts),
ok.

View File

@ -1021,6 +1021,26 @@ t_action_types(Config) ->
?assert(lists:all(fun is_binary/1, Types), #{types => Types}),
ok.
t_bad_name(Config) ->
Name = <<"_bad_name">>,
Res = request_json(
post,
uri([?ROOT]),
?KAFKA_BRIDGE(Name),
Config
),
?assertMatch({ok, 400, #{<<"message">> := _}}, Res),
{ok, 400, #{<<"message">> := Msg0}} = Res,
Msg = emqx_utils_json:decode(Msg0, [return_maps]),
?assertMatch(
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"Invalid name format.", _/binary>>
},
Msg
),
ok.
%%% helpers
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

View File

@ -312,6 +312,25 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
Error
end.
api_spec_schemas(Root) ->
Method = get,
Path = emqx_mgmt_api_test_util:api_path(["schemas", Root]),
Params = [],
AuthHeader = [],
Opts = #{return_all => true},
case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 200, _}, _, Res0}} ->
#{<<"components">> := #{<<"schemas">> := Schemas}} =
emqx_utils_json:decode(Res0, [return_maps]),
Schemas
end.
bridges_api_spec_schemas() ->
api_spec_schemas("bridges").
actions_api_spec_schemas() ->
api_spec_schemas("actions").
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_tests).
-include_lib("eunit/include/eunit.hrl").
resource_opts_union_connector_actions_test() ->
%% The purpose of this test is to ensure we have split `resource_opts' fields
%% consciouly between connector and actions, in particular when/if we introduce new
%% fields there.
AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])),
ActionROFields = non_deprecated_fields(emqx_bridge_v2_schema:resource_opts_fields()),
ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()),
UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields),
?assertEqual(
lists:usort(AllROFields),
UnionROFields,
#{
missing_fields => AllROFields -- UnionROFields,
unexpected_fields => UnionROFields -- AllROFields,
action_fields => ActionROFields,
connector_fields => ConnectorROFields
}
),
ok.
non_deprecated_fields(Fields) ->
[K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)].

View File

@ -126,7 +126,7 @@ fields(action) ->
fields(actions) ->
Fields =
override(
emqx_bridge_kafka:producer_opts(),
emqx_bridge_kafka:producer_opts(action),
bridge_v2_overrides()
) ++
[

View File

@ -272,6 +272,22 @@ make_message() ->
timestamp => Time
}.
bridge_api_spec_props_for_get() ->
#{
<<"bridge_azure_event_hub.get_producer">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:bridges_api_spec_schemas(),
Props.
action_api_spec_props_for_get() ->
#{
<<"bridge_azure_event_hub.get_bridge_v2">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:actions_api_spec_schemas(),
Props.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -341,3 +357,14 @@ t_same_name_azure_kafka_bridges(Config) ->
end
),
ok.
t_parameters_key_api_spec(_Config) ->
BridgeProps = bridge_api_spec_props_for_get(),
?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}),
?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}),
ActionProps = action_api_spec_props_for_get(),
?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}),
?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
ok.

View File

@ -113,7 +113,7 @@ fields(action) ->
fields(actions) ->
Fields =
override(
emqx_bridge_kafka:producer_opts(),
emqx_bridge_kafka:producer_opts(action),
bridge_v2_overrides()
) ++
[

View File

@ -29,7 +29,7 @@
desc/1,
host_opts/0,
ssl_client_opts_fields/0,
producer_opts/0
producer_opts/1
]).
-export([
@ -261,7 +261,7 @@ fields("config_producer") ->
fields("config_consumer") ->
fields(kafka_consumer);
fields(kafka_producer) ->
connector_config_fields() ++ producer_opts();
connector_config_fields() ++ producer_opts(v1);
fields(kafka_producer_action) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -270,7 +270,7 @@ fields(kafka_producer_action) ->
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()}
] ++ producer_opts();
] ++ producer_opts(action);
fields(kafka_consumer) ->
connector_config_fields() ++ fields(consumer_opts);
fields(ssl_client_opts) ->
@ -523,7 +523,7 @@ fields(consumer_kafka_opts) ->
];
fields(resource_opts) ->
SupportedFields = [health_check_interval],
CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(),
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts);
fields(action_field) ->
{kafka_producer,
@ -599,25 +599,25 @@ connector_config_fields() ->
{ssl, mk(ref(ssl_client_opts), #{})}
].
producer_opts() ->
producer_opts(ActionOrBridgeV1) ->
[
%% Note: there's an implicit convention in `emqx_bridge' that,
%% for egress bridges with this config, the published messages
%% will be forwarded to such bridges.
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
parameters_field(),
parameters_field(ActionOrBridgeV1),
{resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
].
%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
%% However we need to keep it backward compatible for generated schema json (version 0.1.0)
%% since schema is data for the 'schemas' API.
parameters_field() ->
parameters_field(ActionOrBridgeV1) ->
{Name, Alias} =
case get(emqx_bridge_schema_version) of
<<"0.1.0">> ->
case ActionOrBridgeV1 of
v1 ->
{kafka, parameters};
_ ->
action ->
{parameters, kafka}
end,
{Name,

View File

@ -32,9 +32,8 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, schema_module(), kafka_producer
),
KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}),
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0),
Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}),
KafkaMap = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
Config2 = emqx_utils_maps:deep_merge(Config0, #{<<"parameters">> => KafkaMap}),
maps:with(producer_action_field_keys(), Config2).
%%------------------------------------------------------------------------------------------

View File

@ -25,7 +25,7 @@ kafka_producer_test() ->
<<"kafka_producer">> :=
#{
<<"myproducer">> :=
#{<<"parameters">> := #{}}
#{<<"kafka">> := #{}}
}
}
},
@ -52,7 +52,7 @@ kafka_producer_test() ->
#{
<<"myproducer">> :=
#{
<<"parameters">> := #{},
<<"kafka">> := #{},
<<"local_topic">> := <<"mqtt/local">>
}
}
@ -68,7 +68,7 @@ kafka_producer_test() ->
#{
<<"myproducer">> :=
#{
<<"parameters">> := #{},
<<"kafka">> := #{},
<<"local_topic">> := <<"mqtt/local">>
}
}
@ -166,7 +166,7 @@ message_key_dispatch_validations_test() ->
?assertThrow(
{_, [
#{
path := "bridges.kafka_producer.myproducer.parameters",
path := "bridges.kafka_producer.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},
@ -175,7 +175,7 @@ message_key_dispatch_validations_test() ->
?assertThrow(
{_, [
#{
path := "bridges.kafka_producer.myproducer.parameters",
path := "bridges.kafka_producer.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},

View File

@ -182,6 +182,22 @@ create_action(Name, Config) ->
on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
Res.
bridge_api_spec_props_for_get() ->
#{
<<"bridge_kafka.get_producer">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:bridges_api_spec_schemas(),
Props.
action_api_spec_props_for_get() ->
#{
<<"bridge_kafka.get_bridge_v2">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:actions_api_spec_schemas(),
Props.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -342,3 +358,14 @@ t_bad_url(_Config) ->
),
?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
ok.
t_parameters_key_api_spec(_Config) ->
BridgeProps = bridge_api_spec_props_for_get(),
?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}),
?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}),
ActionProps = action_api_spec_props_for_get(),
?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}),
?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
ok.

View File

@ -10,8 +10,11 @@
%% Test cases
%%===========================================================================
atoms() ->
[my_producer].
pulsar_producer_validations_test() ->
Name = list_to_atom("my_producer"),
Name = hd(atoms()),
Conf0 = pulsar_producer_hocon(),
Conf1 =
Conf0 ++

View File

@ -193,12 +193,7 @@ hotconf_schema_json() ->
bridge_schema_json() ->
Version = <<"0.1.0">>,
SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => Version},
put(emqx_bridge_schema_version, Version),
try
gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo)
after
erase(emqx_bridge_schema_version)
end.
gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo).
%% TODO: remove it and also remove hocon_md.erl and friends.
%% markdown generation from schema is a failure and we are moving to an interactive

View File

@ -372,7 +372,7 @@ schema("/connectors_probe") ->
case emqx_connector:remove(ConnectorType, ConnectorName) of
ok ->
?NO_CONTENT;
{error, {active_channels, Channels}} ->
{error, {post_config_update, _HandlerMod, {active_channels, Channels}}} ->
?BAD_REQUEST(
{<<"Cannot delete connector while there are active channels defined for this connector">>,
Channels}

View File

@ -35,6 +35,8 @@
-export([connector_type_to_bridge_types/1]).
-export([common_fields/0]).
-export([resource_opts_fields/0, resource_opts_fields/1]).
-if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
@ -364,6 +366,24 @@ common_fields() ->
{description, emqx_schema:description_schema()}
].
resource_opts_fields() ->
resource_opts_fields(_Overrides = []).
resource_opts_fields(Overrides) ->
%% Note: these don't include buffer-related configurations because buffer workers are
%% tied to the action.
ConnectorROFields = [
health_check_interval,
query_mode,
request_ttl,
start_after_created,
start_timeout
],
lists:filter(
fun({Key, _Sc}) -> lists:member(Key, ConnectorROFields) end,
emqx_resource_schema:create_opts(Overrides)
).
%%======================================================================================
%% Helper Functions
%%======================================================================================

View File

@ -229,7 +229,7 @@ t_create_with_bad_name_direct_path(_Config) ->
{error,
{pre_config_update, _ConfigHandlerMod, #{
kind := validation_error,
reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
reason := <<"Invalid name format.", _/binary>>
}}},
emqx:update_config(Path, ConnConfig)
),

View File

@ -25,7 +25,7 @@
-include_lib("snabbkaffe/include/test_macros.hrl").
-define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(CONNECTOR(NAME, TYPE), #{
-define(RESOURCE(NAME, TYPE), #{
%<<"ssl">> => #{<<"enable">> => false},
<<"type">> => TYPE,
<<"name">> => NAME
@ -52,12 +52,57 @@
-define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)).
-define(KAFKA_CONNECTOR(Name, BootstrapHosts),
maps:merge(
?CONNECTOR(Name, ?CONNECTOR_TYPE),
?RESOURCE(Name, ?CONNECTOR_TYPE),
?KAFKA_CONNECTOR_BASE(BootstrapHosts)
)
).
-define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(BRIDGE_TYPE_STR, "kafka_producer").
-define(BRIDGE_TYPE, <<?BRIDGE_TYPE_STR>>).
-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{
<<"enable">> => true,
<<"connector">> => Connector,
<<"kafka">> => #{
<<"buffer">> => #{
<<"memory_overload_protection">> => true,
<<"mode">> => <<"hybrid">>,
<<"per_partition_limit">> => <<"2GB">>,
<<"segment_bytes">> => <<"100MB">>
},
<<"compression">> => <<"no_compression">>,
<<"kafka_ext_headers">> => [
#{
<<"kafka_ext_header_key">> => <<"clientid">>,
<<"kafka_ext_header_value">> => <<"${clientid}">>
},
#{
<<"kafka_ext_header_key">> => <<"topic">>,
<<"kafka_ext_header_value">> => <<"${topic}">>
}
],
<<"kafka_header_value_encode_mode">> => <<"none">>,
<<"kafka_headers">> => <<"${pub_props}">>,
<<"max_batch_bytes">> => <<"896KB">>,
<<"max_inflight">> => 10,
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"timestamp">> => <<"${.timestamp}">>,
<<"value">> => <<"${.}">>
},
<<"partition_count_refresh_interval">> => <<"60s">>,
<<"partition_strategy">> => <<"random">>,
<<"required_acks">> => <<"all_isr">>,
<<"topic">> => <<"kafka-topic">>
},
<<"local_topic">> => <<"mqtt/local/topic">>,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"32s">>
}
}).
-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)).
%% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>).
%% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{
%% <<"server">> => SERVER,
@ -105,7 +150,8 @@
emqx,
emqx_auth,
emqx_management,
{emqx_connector, "connectors {}"}
{emqx_connector, "connectors {}"},
{emqx_bridge, "actions {}"}
]).
-define(APPSPEC_DASHBOARD,
@ -128,7 +174,8 @@ all() ->
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
SingleOnlyTests = [
t_connectors_probe
t_connectors_probe,
t_fail_delete_with_action
],
ClusterLaterJoinOnlyTCs = [
% t_cluster_later_join_metrics
@ -187,29 +234,38 @@ end_per_group(_, Config) ->
emqx_cth_suite:stop(?config(group_apps, Config)),
ok.
init_per_testcase(_TestCase, Config) ->
init_per_testcase(TestCase, Config) ->
case ?config(cluster_nodes, Config) of
undefined ->
init_mocks();
init_mocks(TestCase);
Nodes ->
[erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
[erpc:call(Node, ?MODULE, init_mocks, [TestCase]) || Node <- Nodes]
end,
Config.
end_per_testcase(_TestCase, Config) ->
end_per_testcase(TestCase, Config) ->
Node = ?config(node, Config),
ok = erpc:call(Node, ?MODULE, clear_resources, [TestCase]),
case ?config(cluster_nodes, Config) of
undefined ->
meck:unload();
Nodes ->
[erpc:call(Node, meck, unload, []) || Node <- Nodes]
[erpc:call(N, meck, unload, []) || N <- Nodes]
end,
Node = ?config(node, Config),
ok = emqx_common_test_helpers:call_janitor(),
ok = erpc:call(Node, fun clear_resources/0),
ok.
-define(CONNECTOR_IMPL, dummy_connector_impl).
init_mocks() ->
init_mocks(t_fail_delete_with_action) ->
init_mocks(common),
meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId)
end),
ok;
init_mocks(_TestCase) ->
meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
@ -235,7 +291,15 @@ init_mocks() ->
),
[?CONNECTOR_IMPL, emqx_connector_ee_schema].
clear_resources() ->
clear_resources(t_fail_delete_with_action) ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_bridge_v2:remove(Type, Name)
end,
emqx_bridge_v2:list()
),
clear_resources(common);
clear_resources(_) ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_connector:remove(Type, Name)
@ -646,7 +710,7 @@ t_connectors_probe(Config) ->
request_json(
post,
uri(["connectors_probe"]),
?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>),
?RESOURCE(<<"broken_connector">>, <<"unknown_type">>),
Config
)
),
@ -674,6 +738,57 @@ t_create_with_bad_name(Config) ->
?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
ok.
t_fail_delete_with_action(Config) ->
Name = ?CONNECTOR_NAME,
?assertMatch(
{ok, 201, #{
<<"type">> := ?CONNECTOR_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _]
}},
request_json(
post,
uri(["connectors"]),
?KAFKA_CONNECTOR(Name),
Config
)
),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
BridgeName = ?BRIDGE_NAME,
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := BridgeName,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"connector">> := Name,
<<"kafka">> := #{},
<<"local_topic">> := _,
<<"resource_opts">> := _
}},
request_json(
post,
uri(["actions"]),
?KAFKA_BRIDGE(?BRIDGE_NAME),
Config
)
),
%% delete the connector
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
<<"{<<\"Cannot delete connector while there are active channels",
" defined for this connector\">>,", _/binary>>
}},
request_json(delete, uri(["connectors", ConnectorID]), Config)
),
ok.
%%% helpers
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

View File

@ -963,13 +963,12 @@ handle_info(
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, NChannel);
handle_info(
{sock_closed, Reason},
{sock_closed, _Reason},
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(error, #{
msg => "unexpected_sock_closed",
reason => Reason
}),
%% This can happen as a race:
%% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed'
%% is already in process mailbox
{ok, Channel};
handle_info(clean_authz_cache, Channel) ->
ok = emqx_authz_cache:empty_authz_cache(),

View File

@ -812,11 +812,11 @@ validate_name(Name) ->
ok.
validate_name(<<>>, _Opts) ->
invalid_data("name cannot be empty string");
invalid_data("Name cannot be empty string");
validate_name(Name, _Opts) when size(Name) >= 255 ->
invalid_data("name length must be less than 255");
invalid_data("Name length must be less than 255");
validate_name(Name, Opts) ->
case re:run(Name, <<"^[-0-9a-zA-Z_]+$">>, [{capture, none}]) of
case re:run(Name, <<"^[0-9a-zA-Z][-0-9a-zA-Z_]*$">>, [{capture, none}]) of
match ->
case maps:get(atom_name, Opts, true) of
%% NOTE
@ -827,7 +827,12 @@ validate_name(Name, Opts) ->
end;
nomatch ->
invalid_data(
<<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name/binary>>
<<
"Invalid name format. The name must begin with a letter or number "
"(0-9, a-z, A-Z) and can only include underscores and hyphens as "
"non-initial characters. Got: ",
Name/binary
>>
)
end.

View File

@ -0,0 +1,5 @@
Resolve redundant error logging on socket closure
Addressed a race condition causing duplicate error logs when a socket is closed by both a peer and the server.
Dual socket close events from the OS and EMQX previously led to excessive error logging.
The fix improves event handling to avoid redundant error-level logging.

View File

@ -0,0 +1,3 @@
Fix connection crash when trying to set TCP/SSL socket `active_n` option.
Prior to this fix, if a socket is already closed when connection process tries to set `active_n` option, it causes a `case_clause` crash.

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.3.1
version: 5.3.2-alpha.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.3.1
appVersion: 5.3.2-alpha.1

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.3.1
version: 5.3.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.3.1
appVersion: 5.3.2