Merge pull request #11983 from thalesmg/sync-r53-to-m-20231120

sync `release-53` to `master`
This commit is contained in:
Thales Macedo Garitezi 2023-11-21 09:02:51 -03:00 committed by GitHub
commit 068d151b14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 535 additions and 182 deletions

View File

@ -100,7 +100,7 @@ mk_config_listeners(N) ->
t_cluster_routing(Config) ->
Cluster = ?config(cluster, Config),
Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster],
Clients = [C1, C2, C3] = lists:sort([start_client(N) || N <- Cluster]),
Commands = [
{fun publish/3, [C1, <<"a/b/c">>, <<"wontsee">>]},
{fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]},

View File

@ -55,7 +55,6 @@
]).
-export([config_key_path/0]).
-export([validate_bridge_name/1]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
@ -269,7 +268,12 @@ config_key_path() ->
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
{ok, convert_certs(NewConf)}.
case multi_validate_bridge_names(NewConf) of
ok ->
{ok, convert_certs(NewConf)};
Error ->
Error
end.
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
@ -658,17 +662,13 @@ get_basic_usage_info() ->
InitialAcc
end.
validate_bridge_name(BridgeName0) ->
BridgeName = to_bin(BridgeName0),
case re:run(BridgeName, ?MAP_KEY_RE, [{capture, none}]) of
match ->
ok;
nomatch ->
{error, #{
kind => validation_error,
reason => bad_bridge_name,
value => BridgeName
}}
validate_bridge_name(BridgeName) ->
try
_ = emqx_resource:validate_name(to_bin(BridgeName)),
ok
catch
throw:Error ->
{error, Error}
end.
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
@ -676,3 +676,31 @@ to_bin(B) when is_binary(B) -> B.
upgrade_type(Type) ->
emqx_bridge_lib:upgrade_type(Type).
multi_validate_bridge_names(Conf) ->
BridgeTypeAndNames =
[
{Type, Name}
|| {Type, NameToConf} <- maps:to_list(Conf),
{Name, _Conf} <- maps:to_list(NameToConf)
],
BadBridges =
lists:filtermap(
fun({Type, Name}) ->
case validate_bridge_name(Name) of
ok -> false;
_Error -> {true, #{type => Type, name => Name}}
end
end,
BridgeTypeAndNames
),
case BadBridges of
[] ->
ok;
[_ | _] ->
{error, #{
kind => validation_error,
reason => bad_bridge_names,
bad_bridges => BadBridges
}}
end.

View File

@ -63,7 +63,7 @@ pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
%% to save the 'enable' to the config files
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
case validate_bridge_name(Path) of
case validate_bridge_name_in_config(Path) of
ok ->
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
{error, Reason} ->
@ -104,11 +104,23 @@ post_config_update([bridges, BridgeType, BridgeName], _Req, NewConf, OldConf, _A
operation_to_enable(disable) -> false;
operation_to_enable(enable) -> true.
validate_bridge_name(Path) ->
validate_bridge_name_in_config(Path) ->
[RootKey] = emqx_bridge:config_key_path(),
case Path of
[RootKey, _BridgeType, BridgeName] ->
emqx_bridge:validate_bridge_name(BridgeName);
validate_bridge_name(BridgeName);
_ ->
ok
end.
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
to_bin(B) when is_binary(B) -> B.
validate_bridge_name(BridgeName) ->
try
_ = emqx_resource:validate_name(to_bin(BridgeName)),
ok
catch
throw:Error ->
{error, Error}
end.

View File

@ -202,33 +202,36 @@ lookup(Type, Name) ->
%% The connector should always exist
%% ... but, in theory, there might be no channels associated to it when we try
%% to delete the connector, and then this reference will become dangling...
InstanceData =
ConnectorData =
case emqx_resource:get_instance(ConnectorId) of
{ok, _, Data} ->
Data;
{error, not_found} ->
#{}
end,
%% Find the Bridge V2 status from the InstanceData
Channels = maps:get(added_channels, InstanceData, #{}),
%% Find the Bridge V2 status from the ConnectorData
ConnectorStatus = maps:get(status, ConnectorData, undefined),
Channels = maps:get(added_channels, ConnectorData, #{}),
BridgeV2Id = id(Type, Name, BridgeConnector),
ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
{DisplayBridgeV2Status, ErrorMsg} =
case ChannelStatus of
#{status := connected} ->
{connected, <<"">>};
#{status := Status, error := undefined} ->
case {ChannelStatus, ConnectorStatus} of
{#{status := ?status_connected}, _} ->
{?status_connected, <<"">>};
{#{error := resource_not_operational}, ?status_connecting} ->
{?status_connecting, <<"Not installed">>};
{#{status := Status, error := undefined}, _} ->
{Status, <<"Unknown reason">>};
#{status := Status, error := Error} ->
{#{status := Status, error := Error}, _} ->
{Status, emqx_utils:readable_error_msg(Error)};
undefined ->
{disconnected, <<"Pending installation">>}
{undefined, _} ->
{?status_disconnected, <<"Not installed">>}
end,
{ok, #{
type => bin(Type),
name => bin(Name),
raw_config => RawConf,
resource_data => InstanceData,
resource_data => ConnectorData,
status => DisplayBridgeV2Status,
error => ErrorMsg
}}

View File

@ -82,9 +82,7 @@ schema_modules() ->
].
examples(Method) ->
ActionExamples = emqx_bridge_v2_schema:examples(Method),
RegisteredExamples = registered_examples(Method),
maps:merge(ActionExamples, RegisteredExamples).
registered_examples(Method).
registered_examples(Method) ->
MergeFun =

View File

@ -199,13 +199,41 @@ t_create_with_bad_name(_Config) ->
?assertMatch(
{error,
{pre_config_update, emqx_bridge_app, #{
reason := bad_bridge_name,
reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>,
kind := validation_error
}}},
emqx:update_config(Path, Conf)
),
ok.
t_create_with_bad_name_root(_Config) ->
BadBridgeName = <<"test_哈哈">>,
BridgeConf = #{
<<"bridge_mode">> => false,
<<"clean_start">> => true,
<<"keepalive">> => <<"60s">>,
<<"proto_ver">> => <<"v4">>,
<<"server">> => <<"127.0.0.1:1883">>,
<<"ssl">> =>
#{
%% needed to trigger pre_config_update
<<"certfile">> => cert_file("certfile"),
<<"enable">> => true
}
},
Conf = #{<<"mqtt">> => #{BadBridgeName => BridgeConf}},
Path = [bridges],
?assertMatch(
{error,
{pre_config_update, _ConfigHandlerMod, #{
kind := validation_error,
reason := bad_bridge_names,
bad_bridges := [#{type := <<"mqtt">>, name := BadBridgeName}]
}}},
emqx:update_config(Path, Conf)
),
ok.
data_file(Name) ->
Dir = code:lib_dir(emqx_bridge, test),
{ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),

View File

@ -1362,7 +1362,13 @@ t_create_with_bad_name(Config) ->
Config
),
Msg = emqx_utils_json:decode(Msg0, [return_maps]),
?assertMatch(#{<<"reason">> := <<"bad_bridge_name">>}, Msg),
?assertMatch(
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
},
Msg
),
ok.
validate_resource_request_ttl(single, Timeout, Name) ->

View File

@ -142,7 +142,8 @@ con_schema() ->
fields("connector") ->
[
{enable, hoconsc:mk(any(), #{})},
{resource_opts, hoconsc:mk(map(), #{})}
{resource_opts, hoconsc:mk(map(), #{})},
{ssl, hoconsc:ref(ssl)}
];
fields("api_post") ->
[
@ -151,7 +152,9 @@ fields("api_post") ->
{type, hoconsc:mk(bridge_type(), #{})},
{send_to, hoconsc:mk(atom(), #{})}
| fields("connector")
].
];
fields(ssl) ->
emqx_schema:client_ssl_opts_schema(#{required => false}).
con_config() ->
#{
@ -798,3 +801,27 @@ t_scenario_2(Config) ->
?assert(is_rule_enabled(RuleId2)),
ok.
t_create_with_bad_name(_Config) ->
BadBridgeName = <<"test_哈哈">>,
%% Note: must contain SSL options to trigger bug.
Cacertfile = emqx_common_test_helpers:app_path(
emqx,
filename:join(["etc", "certs", "cacert.pem"])
),
Opts = #{
name => BadBridgeName,
overrides => #{
<<"ssl">> =>
#{<<"cacertfile">> => Cacertfile}
}
},
{error,
{{_, 400, _}, _, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
}
}}} = create_bridge_http_api_v1(Opts),
ok.

View File

@ -20,6 +20,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
@ -43,7 +44,7 @@ con_schema() ->
{
con_type(),
hoconsc:mk(
hoconsc:map(name, typerefl:map()),
hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)),
#{
desc => <<"Test Connector Config">>,
required => false
@ -52,6 +53,15 @@ con_schema() ->
}
].
fields(connector_config) ->
[
{enable, hoconsc:mk(typerefl:boolean(), #{})},
{resource_opts, hoconsc:mk(typerefl:map(), #{})},
{on_start_fun, hoconsc:mk(typerefl:binary(), #{})},
{on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})},
{on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})}
].
con_config() ->
#{
<<"enable">> => true,
@ -112,6 +122,7 @@ setup_mocks() ->
catch meck:new(emqx_connector_schema, MeckOpts),
meck:expect(emqx_connector_schema, fields, 1, con_schema()),
meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]),
catch meck:new(emqx_connector_resource, MeckOpts),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
@ -159,15 +170,7 @@ init_per_testcase(_TestCase, Config) ->
ets:new(fun_table_name(), [named_table, public]),
%% Create a fake connector
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
[
{mocked_mods, [
emqx_connector_schema,
emqx_connector_resource,
emqx_bridge_v2
]}
| Config
].
Config.
end_per_testcase(_TestCase, _Config) ->
ets:delete(fun_table_name()),
@ -846,6 +849,51 @@ t_start_operation_when_on_add_channel_gives_error(_Config) ->
),
ok.
t_lookup_status_when_connecting(_Config) ->
ResponseETS = ets:new(response_ets, [public]),
ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
OnGetStatusFun = wrap_fun(fun() ->
ets:lookup_element(ResponseETS, on_get_status_value, 2)
end),
ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
<<"on_get_status_fun">> => OnGetStatusFun,
<<"resource_opts">> => #{<<"start_timeout">> => 100}
}),
ConnectorName = ?FUNCTION_NAME,
ct:pal("connector config:\n ~p", [ConnectorConfig]),
{ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
ActionName = my_test_action,
ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
ActionConfig = (bridge_config())#{
<<"on_get_channel_status_fun">> => ChanStatusFun,
<<"connector">> => atom_to_binary(ConnectorName)
},
ct:pal("action config:\n ~p", [ActionConfig]),
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
%% Top-level status is connecting if the connector status is connecting, but the
%% channel is not yet installed. `resource_data.added_channels.$channel_id.status'
%% contains true internal status.
{ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName),
?assertMatch(
#{
%% This is the action's public status
status := ?status_connecting,
resource_data :=
#{
%% This is the connector's status
status := ?status_connecting
}
},
Res
),
#{resource_data := #{added_channels := Channels}} = Res,
[{_Id, ChannelData}] = maps:to_list(Channels),
?assertMatch(#{status := ?status_disconnected}, ChannelData),
ok.
%% Helper Functions
wait_until(Fun) ->

View File

@ -587,7 +587,7 @@ t_broken_bridge_config(Config) ->
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
<<"error">> := <<"Not installed">>
}
]},
request_json(get, uri([?ROOT]), Config)
@ -640,7 +640,7 @@ t_fix_broken_bridge_config(Config) ->
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
<<"error">> := <<"Not installed">>
}
]},
request_json(get, uri([?ROOT]), Config)

View File

@ -43,8 +43,8 @@ on_start(
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun(Conf);
on_start(_InstId, _Config) ->
{ok, #{}}.
on_start(_InstId, Config) ->
{ok, Config}.
on_add_channel(
_InstId,

View File

@ -481,11 +481,11 @@ on_get_status(
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
case wolff_client:check_connectivity(Pid) of
ok -> connected;
{error, Error} -> {connecting, State, Error}
ok -> ?status_connected;
{error, Error} -> {?status_connecting, State, Error}
end;
{error, _Reason} ->
connecting
?status_connecting
end.
on_get_channel_status(
@ -499,10 +499,10 @@ on_get_channel_status(
#{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
try
ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
connected
?status_connected
catch
throw:#{reason := restarting} ->
conneting
?status_connecting
end.
check_topic_and_leader_connections(ClientId, KafkaTopic) ->

View File

@ -23,8 +23,14 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("brod/include/brod.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(TYPE, kafka_producer).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -51,6 +57,135 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(60_000),
ok.
%%-------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------
check_send_message_with_bridge(BridgeName) ->
%% ######################################
%% Create Kafka message
%% ######################################
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Payload = list_to_binary("payload" ++ integer_to_list(Time)),
Msg = #{
clientid => BinTime,
payload => Payload,
timestamp => Time
},
Offset = resolve_kafka_offset(),
%% ######################################
%% Send message
%% ######################################
emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
%% ######################################
%% Check if message is sent to Kafka
%% ######################################
check_kafka_message_payload(Offset, Payload).
resolve_kafka_offset() ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
Hosts, KafkaTopic, Partition
),
Offset0.
check_kafka_message_payload(Offset, ExpectedPayload) ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
bridge_v2_config(ConnectorName) ->
#{
<<"connector">> => ConnectorName,
<<"enable">> => true,
<<"kafka">> => #{
<<"buffer">> => #{
<<"memory_overload_protection">> => false,
<<"mode">> => <<"memory">>,
<<"per_partition_limit">> => <<"2GB">>,
<<"segment_bytes">> => <<"100MB">>
},
<<"compression">> => <<"no_compression">>,
<<"kafka_header_value_encode_mode">> => <<"none">>,
<<"max_batch_bytes">> => <<"896KB">>,
<<"max_inflight">> => 10,
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"timestamp">> => <<"${.timestamp}">>,
<<"value">> => <<"${.payload}">>
},
<<"partition_count_refresh_interval">> => <<"60s">>,
<<"partition_strategy">> => <<"random">>,
<<"query_mode">> => <<"sync">>,
<<"required_acks">> => <<"all_isr">>,
<<"sync_query_timeout">> => <<"5s">>,
<<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
},
<<"local_topic">> => <<"kafka_t/#">>,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>
}
}.
connector_config() ->
#{
<<"authentication">> => <<"none">>,
<<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
<<"connect_timeout">> => <<"5s">>,
<<"enable">> => true,
<<"metadata_request_timeout">> => <<"5s">>,
<<"min_metadata_refresh_interval">> => <<"3s">>,
<<"socket_opts">> =>
#{
<<"recbuf">> => <<"1024KB">>,
<<"sndbuf">> => <<"1024KB">>,
<<"tcp_keepalive">> => <<"none">>
},
<<"ssl">> =>
#{
<<"ciphers">> => [],
<<"depth">> => 10,
<<"enable">> => false,
<<"hibernate_after">> => <<"5s">>,
<<"log_level">> => <<"notice">>,
<<"reuse_sessions">> => true,
<<"secure_renegotiate">> => true,
<<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
}
}.
kafka_hosts_string() ->
KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
KafkaHost ++ ":" ++ KafkaPort.
create_connector(Name, Config) ->
Res = emqx_connector:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
Res.
create_action(Name, Config) ->
Res = emqx_bridge_v2:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
Res.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_create_remove_list(_) ->
[] = emqx_bridge_v2:list(),
ConnectorConfig = connector_config(),
@ -187,106 +322,23 @@ t_unknown_topic(_Config) ->
),
ok.
check_send_message_with_bridge(BridgeName) ->
%% ######################################
%% Create Kafka message
%% ######################################
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Payload = list_to_binary("payload" ++ integer_to_list(Time)),
Msg = #{
clientid => BinTime,
payload => Payload,
timestamp => Time
},
Offset = resolve_kafka_offset(),
%% ######################################
%% Send message
%% ######################################
emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
%% ######################################
%% Check if message is sent to Kafka
%% ######################################
check_kafka_message_payload(Offset, Payload).
resolve_kafka_offset() ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
Hosts, KafkaTopic, Partition
t_bad_url(_Config) ->
ConnectorName = <<"test_connector">>,
ActionName = <<"test_action">>,
ActionConfig = bridge_v2_config(<<"test_connector">>),
ConnectorConfig0 = connector_config(),
ConnectorConfig = ConnectorConfig0#{<<"bootstrap_hosts">> := <<"bad_host:9092">>},
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
?assertMatch(
{ok, #{
resource_data :=
#{
status := connecting,
error := [#{reason := unresolvable_hostname}]
}
}},
emqx_connector:lookup(?TYPE, ConnectorName)
),
Offset0.
check_kafka_message_payload(Offset, ExpectedPayload) ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
bridge_v2_config(ConnectorName) ->
#{
<<"connector">> => ConnectorName,
<<"enable">> => true,
<<"kafka">> => #{
<<"buffer">> => #{
<<"memory_overload_protection">> => false,
<<"mode">> => <<"memory">>,
<<"per_partition_limit">> => <<"2GB">>,
<<"segment_bytes">> => <<"100MB">>
},
<<"compression">> => <<"no_compression">>,
<<"kafka_header_value_encode_mode">> => <<"none">>,
<<"max_batch_bytes">> => <<"896KB">>,
<<"max_inflight">> => 10,
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"timestamp">> => <<"${.timestamp}">>,
<<"value">> => <<"${.payload}">>
},
<<"partition_count_refresh_interval">> => <<"60s">>,
<<"partition_strategy">> => <<"random">>,
<<"query_mode">> => <<"sync">>,
<<"required_acks">> => <<"all_isr">>,
<<"sync_query_timeout">> => <<"5s">>,
<<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
},
<<"local_topic">> => <<"kafka_t/#">>,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>
}
}.
connector_config() ->
#{
<<"authentication">> => <<"none">>,
<<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
<<"connect_timeout">> => <<"5s">>,
<<"enable">> => true,
<<"metadata_request_timeout">> => <<"5s">>,
<<"min_metadata_refresh_interval">> => <<"3s">>,
<<"socket_opts">> =>
#{
<<"recbuf">> => <<"1024KB">>,
<<"sndbuf">> => <<"1024KB">>,
<<"tcp_keepalive">> => <<"none">>
},
<<"ssl">> =>
#{
<<"ciphers">> => [],
<<"depth">> => 10,
<<"enable">> => false,
<<"hibernate_after">> => <<"5s">>,
<<"log_level">> => <<"notice">>,
<<"reuse_sessions">> => true,
<<"secure_renegotiate">> => true,
<<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
}
}.
kafka_hosts_string() ->
KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
KafkaHost ++ ":" ++ KafkaPort.
?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
ok.

View File

@ -108,18 +108,28 @@ config_key_path() ->
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
{ok, convert_certs(NewConf)};
case multi_validate_connector_names(NewConf) of
ok ->
{ok, convert_certs(NewConf)};
Error ->
Error
end;
pre_config_update(_, {_Oper, _, _}, undefined) ->
{error, connector_not_found};
pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
%% to save the 'enable' to the config files
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
{ok, ConfNew}
case validate_connector_name_in_config(Path) of
ok ->
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
{ok, ConfNew}
end;
Error ->
Error
end.
operation_to_enable(disable) -> false;
@ -458,3 +468,51 @@ ensure_no_channels(Configs) ->
{error, Reason, _State} ->
{error, Reason}
end.
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
to_bin(B) when is_binary(B) -> B.
validate_connector_name(ConnectorName) ->
try
_ = emqx_resource:validate_name(to_bin(ConnectorName)),
ok
catch
throw:Error ->
{error, Error}
end.
validate_connector_name_in_config(Path) ->
case Path of
[?ROOT_KEY, _ConnectorType, ConnectorName] ->
validate_connector_name(ConnectorName);
_ ->
ok
end.
multi_validate_connector_names(Conf) ->
ConnectorTypeAndNames =
[
{Type, Name}
|| {Type, NameToConf} <- maps:to_list(Conf),
{Name, _Conf} <- maps:to_list(NameToConf)
],
BadConnectors =
lists:filtermap(
fun({Type, Name}) ->
case validate_connector_name(Name) of
ok -> false;
_Error -> {true, #{type => Type, name => Name}}
end
end,
ConnectorTypeAndNames
),
case BadConnectors of
[] ->
ok;
[_ | _] ->
{error, #{
kind => validation_error,
reason => bad_connector_names,
bad_connectors => BadConnectors
}}
end.

View File

@ -204,6 +204,71 @@ t_remove_fail(_Config) ->
),
ok.
t_create_with_bad_name_direct_path({init, Config}) ->
meck:new(emqx_connector_ee_schema, [passthrough]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok),
meck:expect(?CONNECTOR, on_get_status, 2, connected),
Config;
t_create_with_bad_name_direct_path({'end', _Config}) ->
meck:unload(),
ok;
t_create_with_bad_name_direct_path(_Config) ->
Path = [connectors, kafka_producer, 'test_哈哈'],
ConnConfig0 = connector_config(),
%% Note: must contain SSL options to trigger original bug.
Cacertfile = emqx_common_test_helpers:app_path(
emqx,
filename:join(["etc", "certs", "cacert.pem"])
),
ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
?assertMatch(
{error,
{pre_config_update, _ConfigHandlerMod, #{
kind := validation_error,
reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
}}},
emqx:update_config(Path, ConnConfig)
),
ok.
t_create_with_bad_name_root_path({init, Config}) ->
meck:new(emqx_connector_ee_schema, [passthrough]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok),
meck:expect(?CONNECTOR, on_get_status, 2, connected),
Config;
t_create_with_bad_name_root_path({'end', _Config}) ->
meck:unload(),
ok;
t_create_with_bad_name_root_path(_Config) ->
Path = [connectors],
BadConnectorName = <<"test_哈哈">>,
ConnConfig0 = connector_config(),
%% Note: must contain SSL options to trigger original bug.
Cacertfile = emqx_common_test_helpers:app_path(
emqx,
filename:join(["etc", "certs", "cacert.pem"])
),
ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
Conf = #{<<"kafka_producer">> => #{BadConnectorName => ConnConfig}},
?assertMatch(
{error,
{pre_config_update, _ConfigHandlerMod, #{
kind := validation_error,
reason := bad_connector_names,
bad_connectors := [#{type := <<"kafka_producer">>, name := BadConnectorName}]
}}},
emqx:update_config(Path, Conf)
),
ok.
%% helpers
connector_config() ->

View File

@ -652,6 +652,28 @@ t_connectors_probe(Config) ->
),
ok.
t_create_with_bad_name(Config) ->
ConnectorName = <<"test_哈哈">>,
Conf0 = ?KAFKA_CONNECTOR(ConnectorName),
%% Note: must contain SSL options to trigger original bug.
Cacertfile = emqx_common_test_helpers:app_path(
emqx,
filename:join(["etc", "certs", "cacert.pem"])
),
Conf = Conf0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := Msg0
}} = request_json(
post,
uri(["connectors"]),
Conf,
Config
),
Msg = emqx_utils_json:decode(Msg0, [return_maps]),
?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
ok.
%%% helpers
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

View File

@ -13,6 +13,16 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% bridge/connector/action status
-define(status_connected, connected).
-define(status_connecting, connecting).
-define(status_disconnected, disconnected).
%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules
%% implementing `emqx_resource' behavior should not return it. The `rm_' prefix is to
%% remind us of that.
-define(rm_status_stopped, stopped).
-type resource_type() :: module().
-type resource_id() :: binary().
-type channel_id() :: binary().
@ -21,8 +31,12 @@
-type resource_config() :: term().
-type resource_spec() :: map().
-type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped.
-type channel_status() :: connected | connecting | disconnected.
%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules
%% implementing `emqx_resource' behavior should not return it.
-type resource_status() ::
?status_connected | ?status_disconnected | ?status_connecting | ?rm_status_stopped.
-type health_check_status() :: ?status_connected | ?status_disconnected | ?status_connecting.
-type channel_status() :: ?status_connected | ?status_connecting | ?status_disconnected.
-type callback_mode() :: always_sync | async_if_possible.
-type query_mode() ::
simple_sync

View File

@ -815,29 +815,21 @@ validate_name(<<>>, _Opts) ->
invalid_data("name cannot be empty string");
validate_name(Name, _Opts) when size(Name) >= 255 ->
invalid_data("name length must be less than 255");
validate_name(Name0, Opts) ->
Name = unicode:characters_to_list(Name0, utf8),
case lists:all(fun is_id_char/1, Name) of
true ->
validate_name(Name, Opts) ->
case re:run(Name, <<"^[-0-9a-zA-Z_]+$">>, [{capture, none}]) of
match ->
case maps:get(atom_name, Opts, true) of
% NOTE
% Rule may be created before bridge, thus not `list_to_existing_atom/1`,
% also it is infrequent user input anyway.
true -> list_to_atom(Name);
false -> Name0
%% NOTE
%% Rule may be created before bridge, thus not `list_to_existing_atom/1`,
%% also it is infrequent user input anyway.
true -> binary_to_atom(Name, utf8);
false -> Name
end;
false ->
nomatch ->
invalid_data(
<<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name0/binary>>
<<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name/binary>>
)
end.
-spec invalid_data(binary()) -> no_return().
invalid_data(Reason) -> throw(#{kind => validation_error, reason => Reason}).
is_id_char(C) when C >= $0 andalso C =< $9 -> true;
is_id_char(C) when C >= $a andalso C =< $z -> true;
is_id_char(C) when C >= $A andalso C =< $Z -> true;
is_id_char($_) -> true;
is_id_char($-) -> true;
is_id_char(_) -> false.