Merge pull request #11365 from thalesmg/fix-bridge-probe-atom-conversion-20230728

fix(http_bridge): don't attempt to convert headers to atoms
This commit is contained in:
Thales Macedo Garitezi 2023-08-07 15:08:46 -03:00 committed by GitHub
commit 250b87d884
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 511 additions and 68 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge, [ {application, emqx_bridge, [
{description, "EMQX bridges"}, {description, "EMQX bridges"},
{vsn, "0.1.25"}, {vsn, "0.1.26"},
{registered, [emqx_bridge_sup]}, {registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}}, {mod, {emqx_bridge_app, []}},
{applications, [ {applications, [

View File

@ -544,18 +544,20 @@ schema("/bridges_probe") ->
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of
ok -> ok ->
?NO_CONTENT; ?NO_CONTENT;
{error, #{kind := validation_error} = Reason} -> {error, #{kind := validation_error} = Reason0} ->
Reason = redact(Reason0),
?BAD_REQUEST('TEST_FAILED', map_to_json(Reason)); ?BAD_REQUEST('TEST_FAILED', map_to_json(Reason));
{error, Reason0} when not is_tuple(Reason0); element(1, Reason0) =/= 'exit' -> {error, Reason0} when not is_tuple(Reason0); element(1, Reason0) =/= 'exit' ->
Reason = Reason1 =
case Reason0 of case Reason0 of
{unhealthy_target, Message} -> Message; {unhealthy_target, Message} -> Message;
_ -> Reason0 _ -> Reason0
end, end,
Reason = redact(Reason1),
?BAD_REQUEST('TEST_FAILED', Reason) ?BAD_REQUEST('TEST_FAILED', Reason)
end; end;
BadRequest -> BadRequest ->
BadRequest redact(BadRequest)
end. end.
maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) -> maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) ->
@ -608,7 +610,7 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
{ok, _} -> {ok, _} ->
lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode); lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode);
{error, Reason} when is_map(Reason) -> {error, Reason} when is_map(Reason) ->
?BAD_REQUEST(map_to_json(emqx_utils:redact(Reason))) ?BAD_REQUEST(map_to_json(redact(Reason)))
end. end.
get_metrics_from_local_node(BridgeType, BridgeName) -> get_metrics_from_local_node(BridgeType, BridgeName) ->
@ -990,7 +992,9 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
{error, timeout} -> {error, timeout} ->
?BAD_REQUEST(<<"Request timeout">>); ?BAD_REQUEST(<<"Request timeout">>);
{error, {start_pool_failed, Name, Reason}} -> {error, {start_pool_failed, Name, Reason}} ->
Msg = bin(io_lib:format("Failed to start ~p pool for reason ~p", [Name, Reason])), Msg = bin(
io_lib:format("Failed to start ~p pool for reason ~p", [Name, redact(Reason)])
),
?BAD_REQUEST(Msg); ?BAD_REQUEST(Msg);
{error, not_found} -> {error, not_found} ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
@ -1007,7 +1011,7 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
{error, {unhealthy_target, Message}} -> {error, {unhealthy_target, Message}} ->
?BAD_REQUEST(Message); ?BAD_REQUEST(Message);
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
?BAD_REQUEST(Reason) ?BAD_REQUEST(redact(Reason))
end. end.
maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
@ -1071,7 +1075,15 @@ deobfuscate(NewConf, OldConf) ->
NewConf NewConf
). ).
map_to_json(M) -> map_to_json(M0) ->
emqx_utils_json:encode( %% When dealing with Hocon validation errors, `value' might contain non-serializable
emqx_utils_maps:jsonable_map(M, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end) %% values (e.g.: user_lookup_fun), so we try again without that key if serialization
). %% fails as a best effort.
M1 = emqx_utils_maps:jsonable_map(M0, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end),
try
emqx_utils_json:encode(M1)
catch
error:_ ->
M2 = maps:without([value, <<"value">>], M1),
emqx_utils_json:encode(M2)
end.

View File

@ -261,21 +261,31 @@ recreate(Type, Name, Conf, Opts) ->
create_dry_run(Type, Conf0) -> create_dry_run(Type, Conf0) ->
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
TmpPath = emqx_utils:safe_filename(TmpName), TmpPath = emqx_utils:safe_filename(TmpName),
Conf = emqx_utils_maps:safe_atom_key_map(Conf0), %% Already typechecked, no need to catch errors
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of TypeBin = bin(Type),
{error, Reason} -> TypeAtom = safe_atom(Type),
{error, Reason}; Conf1 = maps:without([<<"name">>], Conf0),
{ok, ConfNew} -> RawConf = #{<<"bridges">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
try try
#{bridges := #{TypeAtom := #{temp_name := Conf}}} =
hocon_tconf:check_plain(
emqx_bridge_schema,
RawConf,
#{atom_key => true, required => false}
),
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
ParseConf = parse_confs(bin(Type), TmpName, ConfNew), ParseConf = parse_confs(bin(Type), TmpName, ConfNew),
emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf) emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf)
catch end
%% validation errors catch
throw:Reason -> %% validation errors
{error, Reason} throw:Reason1 ->
after {error, Reason1}
_ = file:del_dir_r(emqx_tls_lib:pem_dir(TmpPath)) after
end _ = file:del_dir_r(emqx_tls_lib:pem_dir(TmpPath))
end. end.
remove(BridgeId) -> remove(BridgeId) ->
@ -415,6 +425,9 @@ bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
safe_atom(Atom) when is_atom(Atom) -> Atom.
parse_opts(Conf, Opts0) -> parse_opts(Conf, Opts0) ->
override_start_after_created(Conf, Opts0). override_start_after_created(Conf, Opts0).

View File

@ -212,6 +212,19 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
ct:pal("bridge probe result: ~p", [Res]), ct:pal("bridge probe result: ~p", [Res]),
Res. Res.
try_decode_error(Body0) ->
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
{ok, #{<<"message">> := Msg0} = Body1} ->
case emqx_utils_json:safe_decode(Msg0, [return_maps]) of
{ok, Msg1} -> Body1#{<<"message">> := Msg1};
{error, _} -> Body1
end;
{ok, Body1} ->
Body1;
{error, _} ->
Body0
end.
create_rule_and_action_http(BridgeType, RuleTopic, Config) -> create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [ {application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"}, {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -363,9 +363,9 @@ service_account_json_validator(Map) ->
{[], <<"service_account">>} -> {[], <<"service_account">>} ->
ok; ok;
{[], Type} -> {[], Type} ->
{error, {wrong_type, Type}}; {error, #{wrong_type => Type}};
{_, _} -> {_, _} ->
{error, {missing_keys, MissingKeys}} {error, #{missing_keys => MissingKeys}}
end. end.
service_account_json_converter(Map) when is_map(Map) -> service_account_json_converter(Map) when is_map(Map) ->
@ -382,7 +382,8 @@ service_account_json_converter(Val) ->
consumer_topic_mapping_validator(_TopicMapping = []) -> consumer_topic_mapping_validator(_TopicMapping = []) ->
{error, "There must be at least one GCP PubSub-MQTT topic mapping"}; {error, "There must be at least one GCP PubSub-MQTT topic mapping"};
consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
TopicMapping = [emqx_utils_maps:binary_key_map(TM) || TM <- TopicMapping0],
NumEntries = length(TopicMapping), NumEntries = length(TopicMapping),
PubSubTopics = [KT || #{<<"pubsub_topic">> := KT} <- TopicMapping], PubSubTopics = [KT || #{<<"pubsub_topic">> := KT} <- TopicMapping],
DistinctPubSubTopics = length(lists:usort(PubSubTopics)), DistinctPubSubTopics = length(lists:usort(PubSubTopics)),

View File

@ -220,10 +220,10 @@ parse_jwt_config(ResourceId, #{
service_account_json := ServiceAccountJSON service_account_json := ServiceAccountJSON
}) -> }) ->
#{ #{
project_id := ProjectId, <<"project_id">> := ProjectId,
private_key_id := KId, <<"private_key_id">> := KId,
private_key := PrivateKeyPEM, <<"private_key">> := PrivateKeyPEM,
client_email := ServiceAccountEmail <<"client_email">> := ServiceAccountEmail
} = ServiceAccountJSON, } = ServiceAccountJSON,
%% fixed for pubsub; trailing slash is important. %% fixed for pubsub; trailing slash is important.
Aud = <<"https://pubsub.googleapis.com/">>, Aud = <<"https://pubsub.googleapis.com/">>,

View File

@ -64,7 +64,9 @@ callback_mode() -> async_if_possible.
query_mode(_Config) -> no_queries. query_mode(_Config) -> no_queries.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
on_start(InstanceId, Config) -> on_start(InstanceId, Config0) ->
%% ensure it's a binary key map
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
{ok, Client} -> {ok, Client} ->
start_consumers(InstanceId, Client, Config); start_consumers(InstanceId, Client, Config);
@ -125,7 +127,7 @@ start_consumers(InstanceId, Client, Config) ->
consumer := ConsumerConfig0, consumer := ConsumerConfig0,
hookpoint := Hookpoint, hookpoint := Hookpoint,
resource_opts := #{request_ttl := RequestTTL}, resource_opts := #{request_ttl := RequestTTL},
service_account_json := #{project_id := ProjectId} service_account_json := #{<<"project_id">> := ProjectId}
} = Config, } = Config,
ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
TopicMapping = maps:get(topic_mapping, ConsumerConfig1), TopicMapping = maps:get(topic_mapping, ConsumerConfig1),

View File

@ -50,15 +50,16 @@ callback_mode() -> async_if_possible.
query_mode(_Config) -> async. query_mode(_Config) -> async.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
on_start(InstanceId, Config) -> on_start(InstanceId, Config0) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_gcp_pubsub_bridge", msg => "starting_gcp_pubsub_bridge",
config => Config config => Config0
}), }),
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
#{ #{
payload_template := PayloadTemplate, payload_template := PayloadTemplate,
pubsub_topic := PubSubTopic, pubsub_topic := PubSubTopic,
service_account_json := #{project_id := ProjectId} service_account_json := #{<<"project_id">> := ProjectId}
} = Config, } = Config,
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
{ok, Client} -> {ok, Client} ->

View File

@ -275,14 +275,13 @@ ensure_topic(Config, Topic) ->
start_control_client() -> start_control_client() ->
RawServiceAccount = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(), RawServiceAccount = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
ServiceAccount = emqx_utils_maps:unsafe_atom_key_map(RawServiceAccount),
ConnectorConfig = ConnectorConfig =
#{ #{
connect_timeout => 5_000, connect_timeout => 5_000,
max_retries => 0, max_retries => 0,
pool_size => 1, pool_size => 1,
resource_opts => #{request_ttl => 5_000}, resource_opts => #{request_ttl => 5_000},
service_account_json => ServiceAccount service_account_json => RawServiceAccount
}, },
PoolName = <<"control_connector">>, PoolName = <<"control_connector">>,
{ok, Client} = emqx_bridge_gcp_pubsub_client:start(PoolName, ConnectorConfig), {ok, Client} = emqx_bridge_gcp_pubsub_client:start(PoolName, ConnectorConfig),

View File

@ -196,16 +196,27 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) ->
Path = emqx_mgmt_api_test_util:api_path(["bridges"]), Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
ProbeResult = emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params), Opts = #{return_all => true},
ProbeResult = emqx_mgmt_api_test_util:request_api(
post, ProbePath, "", AuthHeader, Params, Opts
),
ct:pal("creating bridge (via http): ~p", [Params]), ct:pal("creating bridge (via http): ~p", [Params]),
ct:pal("probe result: ~p", [ProbeResult]), ct:pal("probe result: ~p", [ProbeResult]),
Res = Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, Res0} -> {ok, emqx_utils_json:decode(Res0, [return_maps])}; {ok, {Status, Headhers, Res0}} ->
Error -> Error {ok, {Status, Headhers, emqx_utils_json:decode(Res0, [return_maps])}};
{error, {Status, Headers, Body0}} ->
{error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
Error ->
Error
end, end,
ct:pal("bridge creation result: ~p", [Res]), ct:pal("bridge creation result: ~p", [Res]),
?assertEqual(element(1, ProbeResult), element(1, Res)), ?assertEqual(element(1, ProbeResult), element(1, Res)),
case ProbeResult of
{error, {{_, 500, _}, _, _}} -> error({bad_probe_result, ProbeResult});
_ -> ok
end,
Res. Res.
create_rule_and_action_http(Config) -> create_rule_and_action_http(Config) ->
@ -821,7 +832,7 @@ t_not_of_service_account_type(Config) ->
?assertMatch( ?assertMatch(
{error, #{ {error, #{
kind := validation_error, kind := validation_error,
reason := {wrong_type, <<"not a service account">>}, reason := #{wrong_type := <<"not a service account">>},
%% should be censored as it contains secrets %% should be censored as it contains secrets
value := <<"******">> value := <<"******">>
}}, }},
@ -832,6 +843,23 @@ t_not_of_service_account_type(Config) ->
} }
) )
), ),
?assertMatch(
{error,
{{_, 400, _}, _, #{
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> := #{<<"wrong_type">> := <<"not a service account">>},
%% should be censored as it contains secrets
<<"value">> := <<"******">>
}
}}},
create_bridge_http(
Config,
#{
<<"service_account_json">> => #{<<"type">> => <<"not a service account">>}
}
)
),
ok. ok.
t_json_missing_fields(Config) -> t_json_missing_fields(Config) ->
@ -840,13 +868,15 @@ t_json_missing_fields(Config) ->
{error, #{ {error, #{
kind := validation_error, kind := validation_error,
reason := reason :=
{missing_keys, [ #{
<<"client_email">>, missing_keys := [
<<"private_key">>, <<"client_email">>,
<<"private_key_id">>, <<"private_key">>,
<<"project_id">>, <<"private_key_id">>,
<<"type">> <<"project_id">>,
]}, <<"type">>
]
},
%% should be censored as it contains secrets %% should be censored as it contains secrets
value := <<"******">> value := <<"******">>
}}, }},
@ -855,6 +885,30 @@ t_json_missing_fields(Config) ->
| Config | Config
]) ])
), ),
?assertMatch(
{error,
{{_, 400, _}, _, #{
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> :=
#{
<<"missing_keys">> := [
<<"client_email">>,
<<"private_key">>,
<<"private_key_id">>,
<<"project_id">>,
<<"type">>
]
},
%% should be censored as it contains secrets
<<"value">> := <<"******">>
}
}}},
create_bridge_http([
{gcp_pubsub_config, GCPPubSubConfig0#{<<"service_account_json">> := #{}}}
| Config
])
),
ok. ok.
t_invalid_private_key(Config) -> t_invalid_private_key(Config) ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_http, [ {application, emqx_bridge_http, [
{description, "EMQX HTTP Bridge and Connector Application"}, {description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
{env, []}, {env, []},

View File

@ -155,7 +155,16 @@ desc("request") ->
desc(_) -> desc(_) ->
undefined. undefined.
validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> -> validate_method(M) when
M =:= <<"post">>;
M =:= <<"put">>;
M =:= <<"get">>;
M =:= <<"delete">>;
M =:= post;
M =:= put;
M =:= get;
M =:= delete
->
ok; ok;
validate_method(M) -> validate_method(M) ->
case string:find(M, "${") of case string:find(M, "${") of

View File

@ -82,6 +82,14 @@ init_per_testcase(t_rule_action_expired, Config) ->
{bridge_name, ?BRIDGE_NAME} {bridge_name, ?BRIDGE_NAME}
| Config | Config
]; ];
init_per_testcase(t_bridge_probes_header_atoms, Config) ->
HTTPPath = <<"/path">>,
ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts
),
ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Server = start_http_server(#{response_delay_ms => 0}), Server = start_http_server(#{response_delay_ms => 0}),
[{http_server, Server} | Config]. [{http_server, Server} | Config].
@ -89,7 +97,8 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(TestCase, _Config) when end_per_testcase(TestCase, _Config) when
TestCase =:= t_path_not_found; TestCase =:= t_path_not_found;
TestCase =:= t_too_many_requests; TestCase =:= t_too_many_requests;
TestCase =:= t_rule_action_expired TestCase =:= t_rule_action_expired;
TestCase =:= t_bridge_probes_header_atoms
-> ->
ok = emqx_bridge_http_connector_test_server:stop(), ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}), persistent_term:erase({?MODULE, times_called}),
@ -292,6 +301,22 @@ make_bridge(Config) ->
), ),
emqx_bridge_resource:bridge_id(Type, Name). emqx_bridge_resource:bridge_id(Type, Name).
success_http_handler() ->
TestPid = self(),
fun(Req0, State) ->
{ok, Body, Req} = cowboy_req:read_body(Req0),
Headers = cowboy_req:headers(Req),
ct:pal("http request received: ~p", [#{body => Body, headers => Headers}]),
TestPid ! {http, Headers, Body},
Rep = cowboy_req:reply(
200,
#{<<"content-type">> => <<"text/plain">>},
<<"hello">>,
Req
),
{ok, Rep, State}
end.
not_found_http_handler() -> not_found_http_handler() ->
TestPid = self(), TestPid = self(),
fun(Req0, State) -> fun(Req0, State) ->
@ -613,6 +638,55 @@ t_rule_action_expired(Config) ->
), ),
ok. ok.
t_bridge_probes_header_atoms(Config) ->
#{port := Port, path := Path} = ?config(http_server, Config),
?check_trace(
begin
LocalTopic = <<"t/local/topic">>,
BridgeConfig0 = bridge_async_config(#{
type => ?BRIDGE_TYPE,
name => ?BRIDGE_NAME,
port => Port,
path => Path,
resume_interval => "100ms",
connect_timeout => "1s",
request_timeout => "100ms",
resource_request_ttl => "100ms",
local_topic => LocalTopic
}),
BridgeConfig = BridgeConfig0#{
<<"headers">> => #{
<<"some-non-existent-atom">> => <<"x">>
}
},
?assertMatch(
{ok, {{_, 204, _}, _Headers, _Body}},
probe_bridge_api(BridgeConfig)
),
?assertMatch(
{ok, {{_, 201, _}, _Headers, _Body}},
emqx_bridge_testlib:create_bridge_api(
?BRIDGE_TYPE,
?BRIDGE_NAME,
BridgeConfig
)
),
Msg = emqx_message:make(LocalTopic, <<"hi">>),
emqx:publish(Msg),
receive
{http, Headers, _Body} ->
?assertMatch(#{<<"some-non-existent-atom">> := <<"x">>}, Headers),
ok
after 5_000 ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
ct:fail("request not made")
end,
ok
end,
[]
),
ok.
%% helpers %% helpers
do_t_async_retries(TestContext, Error, Fn) -> do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext, #{error_attempts := ErrorAttempts} = TestContext,
@ -659,3 +733,17 @@ remove_message_id(MessageIDs, #{body := IDBin}) ->
ID = erlang:binary_to_integer(IDBin), ID = erlang:binary_to_integer(IDBin),
%% It is acceptable to get the same message more than once %% It is acceptable to get the same message more than once
maps:without([ID], MessageIDs). maps:without([ID], MessageIDs).
probe_bridge_api(BridgeConfig) ->
Params = BridgeConfig#{<<"type">> => ?BRIDGE_TYPE, <<"name">> => ?BRIDGE_NAME},
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("probing bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
Error -> Error
end,
ct:pal("bridge probe result: ~p", [Res]),
Res.

View File

@ -91,3 +91,121 @@ is_unwrapped_headers(Headers) ->
is_unwrapped_header({_, V}) when is_function(V) -> false; is_unwrapped_header({_, V}) when is_function(V) -> false;
is_unwrapped_header({_, [{str, _V}]}) -> throw(unexpected_tmpl_token); is_unwrapped_header({_, [{str, _V}]}) -> throw(unexpected_tmpl_token);
is_unwrapped_header(_) -> true. is_unwrapped_header(_) -> true.
method_validator_test() ->
Conf0 = parse(webhook_config_hocon()),
?assertMatch(
#{<<"method">> := _},
emqx_utils_maps:deep_get([<<"bridges">>, <<"webhook">>, <<"a">>], Conf0)
),
lists:foreach(
fun(Method) ->
Conf1 = emqx_utils_maps:deep_put(
[<<"bridges">>, <<"webhook">>, <<"a">>, <<"method">>],
Conf0,
Method
),
?assertMatch(
#{},
check(Conf1),
#{method => Method}
),
?assertMatch(
#{},
check_atom_key(Conf1),
#{method => Method}
),
ok
end,
[<<"post">>, <<"put">>, <<"get">>, <<"delete">>]
),
lists:foreach(
fun(Method) ->
Conf1 = emqx_utils_maps:deep_put(
[<<"bridges">>, <<"webhook">>, <<"a">>, <<"method">>],
Conf0,
Method
),
?assertThrow(
{_, [
#{
kind := validation_error,
reason := not_a_enum_symbol
}
]},
check(Conf1),
#{method => Method}
),
?assertThrow(
{_, [
#{
kind := validation_error,
reason := not_a_enum_symbol
}
]},
check_atom_key(Conf1),
#{method => Method}
),
ok
end,
[<<"x">>, <<"patch">>, <<"options">>]
),
ok.
%%===========================================================================
%% Helper functions
%%===========================================================================
parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon),
Conf.
%% what bridge creation does
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%% what bridge probe does
check_atom_key(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).
%%===========================================================================
%% Data section
%%===========================================================================
%% erlfmt-ignore
webhook_config_hocon() ->
"""
bridges.webhook.a {
body = \"${.}\"
connect_timeout = 15s
enable = false
enable_pipelining = 100
headers {content-type = \"application/json\", jjjjjjjjjjjjjjjjjjj = jjjjjjj}
max_retries = 2
method = post
pool_size = 8
pool_type = random
resource_opts {
health_check_interval = 15s
inflight_window = 100
max_buffer_bytes = 1GB
query_mode = async
request_ttl = 45s
start_after_created = true
start_timeout = 5s
worker_pool_size = 4
}
ssl {
ciphers = []
depth = 10
enable = false
hibernate_after = 5s
log_level = notice
reuse_sessions = true
secure_renegotiate = true
verify = verify_peer
versions = [tlsv1.3, tlsv1.2]
}
url = \"http://some.host:4000/api/echo\"
}
""".

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [ {application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"}, {description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, [emqx_bridge_kafka_consumer_sup]}, {registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [ {applications, [
kernel, kernel,

View File

@ -528,7 +528,8 @@ kafka_producer_converter(Config, _HoconOpts) ->
consumer_topic_mapping_validator(_TopicMapping = []) -> consumer_topic_mapping_validator(_TopicMapping = []) ->
{error, "There must be at least one Kafka-MQTT topic mapping"}; {error, "There must be at least one Kafka-MQTT topic mapping"};
consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
TopicMapping = [emqx_utils_maps:binary_key_map(TM) || TM <- TopicMapping0],
NumEntries = length(TopicMapping), NumEntries = length(TopicMapping),
KafkaTopics = [KT || #{<<"kafka_topic">> := KT} <- TopicMapping], KafkaTopics = [KT || #{<<"kafka_topic">> := KT} <- TopicMapping],
DistinctKafkaTopics = length(lists:usort(KafkaTopics)), DistinctKafkaTopics = length(lists:usort(KafkaTopics)),
@ -539,6 +540,13 @@ consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
{error, "Kafka topics must not be repeated in a bridge"} {error, "Kafka topics must not be repeated in a bridge"}
end. end.
producer_strategy_key_validator(
#{
partition_strategy := _,
message := #{key := _}
} = Conf
) ->
producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
producer_strategy_key_validator(#{ producer_strategy_key_validator(#{
<<"partition_strategy">> := key_dispatch, <<"partition_strategy">> := key_dispatch,
<<"message">> := #{<<"key">> := ""} <<"message">> := #{<<"key">> := ""}

View File

@ -166,11 +166,24 @@ message_key_dispatch_validations_test() ->
]}, ]},
check(Conf) check(Conf)
), ),
%% ensure atoms exist
_ = [myproducer],
?assertThrow(
{_, [
#{
path := "bridges.kafka.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},
check_atom_key(Conf)
),
ok. ok.
tcp_keepalive_validation_test_() -> tcp_keepalive_validation_test_() ->
ProducerConf = parse(kafka_producer_new_hocon()), ProducerConf = parse(kafka_producer_new_hocon()),
ConsumerConf = parse(kafka_consumer_hocon()), ConsumerConf = parse(kafka_consumer_hocon()),
%% ensure atoms exist
_ = [my_producer, my_consumer],
test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++ test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf). test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).
@ -184,7 +197,9 @@ test_keepalive_validation(Name, Conf) ->
InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>), InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>),
InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2], InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2],
[?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++ [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++
[?_assertThrow(_, check(C)) || C <- InvalidConfs]. [?_assertMatch(#{bridges := _}, check_atom_key(C)) || C <- ValidConfs] ++
[?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
[?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].
%%=========================================================================== %%===========================================================================
%% Helper functions %% Helper functions
@ -194,9 +209,14 @@ parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon), {ok, Conf} = hocon:binary(Hocon),
Conf. Conf.
%% what bridge creation does
check(Conf) when is_map(Conf) -> check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf). hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%% what bridge probe does
check_atom_key(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).
%%=========================================================================== %%===========================================================================
%% Data section %% Data section
%%=========================================================================== %%===========================================================================

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_oracle, [ {application, emqx_bridge_oracle, [
{description, "EMQX Enterprise Oracle Database Bridge"}, {description, "EMQX Enterprise Oracle Database Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -108,6 +108,8 @@ type_field(Type) ->
name_field() -> name_field() ->
{name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
config_validator(#{server := _} = Config) ->
config_validator(emqx_utils_maps:binary_key_map(Config));
config_validator(#{<<"server">> := Server} = Config) when config_validator(#{<<"server">> := Server} = Config) when
not is_map(Server) andalso not is_map(Server) andalso
not is_map_key(<<"sid">>, Config) andalso not is_map_key(<<"sid">>, Config) andalso

View File

@ -305,6 +305,8 @@ create_bridge_api(Config, Overrides) ->
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} -> {ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
{error, {Status, Headers, Body0}} ->
{error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
Error -> Error ->
Error Error
end, end,
@ -348,8 +350,12 @@ probe_bridge_api(Config, Overrides) ->
ct:pal("probing bridge (via http): ~p", [Params]), ct:pal("probing bridge (via http): ~p", [Params]),
Res = Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; {ok, {{_, 204, _}, _Headers, _Body0} = Res0} ->
Error -> Error {ok, Res0};
{error, {Status, Headers, Body0}} ->
{error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
Error ->
Error
end, end,
ct:pal("bridge probe result: ~p", [Res]), ct:pal("bridge probe result: ~p", [Res]),
Res. Res.
@ -630,6 +636,30 @@ t_no_sid_nor_service_name(Config0) ->
{error, #{kind := validation_error, reason := "neither SID nor Service Name was set"}}, {error, #{kind := validation_error, reason := "neither SID nor Service Name was set"}},
create_bridge(Config) create_bridge(Config)
), ),
?assertMatch(
{error,
{{_, 400, _}, _, #{
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"neither SID nor Service Name was set">>,
%% should be censored as it contains secrets
<<"value">> := #{<<"password">> := <<"******">>}
}
}}},
create_bridge_api(Config)
),
?assertMatch(
{error,
{{_, 400, _}, _, #{
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"neither SID nor Service Name was set">>,
%% should be censored as it contains secrets
<<"value">> := #{<<"password">> := <<"******">>}
}
}}},
probe_bridge_api(Config)
),
ok. ok.
t_missing_table(Config) -> t_missing_table(Config) ->

View File

@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_resource_errors.hrl").
% SQL definitions % SQL definitions
-define(SQL_BRIDGE, -define(SQL_BRIDGE,
@ -690,10 +691,14 @@ t_table_removed(Config) ->
connect_and_drop_table(Config), connect_and_drop_table(Config),
Val = integer_to_binary(erlang:unique_integer()), Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
?assertMatch( case query_resource_sync(Config, {send_message, SentData, []}) of
{error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}}, {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
query_resource_sync(Config, {send_message, SentData, []}) ok;
), ?RESOURCE_ERROR_M(not_connected, _) ->
ok;
Res ->
ct:fail("unexpected result: ~p", [Res])
end,
ok ok
end, end,
[] []

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [ {application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"}, {description, "EMQX Pulsar Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -220,6 +220,13 @@ conn_bridge_examples(_Method) ->
} }
]. ].
producer_strategy_key_validator(
#{
strategy := _,
message := #{key := _}
} = Conf
) ->
producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
producer_strategy_key_validator(#{ producer_strategy_key_validator(#{
<<"strategy">> := key_dispatch, <<"strategy">> := key_dispatch,
<<"message">> := #{<<"key">> := ""} <<"message">> := #{<<"key">> := ""}
@ -257,7 +264,12 @@ override_default(OriginalFn, NewDefault) ->
auth_union_member_selector(all_union_members) -> auth_union_member_selector(all_union_members) ->
[none, ref(auth_basic), ref(auth_token)]; [none, ref(auth_basic), ref(auth_token)];
auth_union_member_selector({value, V}) -> auth_union_member_selector({value, V0}) ->
V =
case is_map(V0) of
true -> emqx_utils_maps:binary_key_map(V0);
false -> V0
end,
case V of case V of
#{<<"password">> := _} -> #{<<"password">> := _} ->
[ref(auth_basic)]; [ref(auth_basic)];
@ -265,6 +277,8 @@ auth_union_member_selector({value, V}) ->
[ref(auth_token)]; [ref(auth_token)];
<<"none">> -> <<"none">> ->
[none]; [none];
none ->
[none];
_ -> _ ->
Expected = "none | basic | token", Expected = "none | basic | token",
throw(#{ throw(#{

View File

@ -40,6 +40,7 @@ groups() ->
only_once_tests() -> only_once_tests() ->
[ [
t_create_via_http, t_create_via_http,
t_strategy_key_validation,
t_start_when_down, t_start_when_down,
t_send_when_down, t_send_when_down,
t_send_when_timeout, t_send_when_timeout,
@ -313,6 +314,8 @@ create_bridge_api(Config, Overrides) ->
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} -> {ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
{error, {Status, Headers, Body0}} ->
{error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
Error -> Error ->
Error Error
end, end,
@ -356,8 +359,12 @@ probe_bridge_api(Config, Overrides) ->
ct:pal("probing bridge (via http): ~p", [Params]), ct:pal("probing bridge (via http): ~p", [Params]),
Res = Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; {ok, {{_, 204, _}, _Headers, _Body0} = Res0} ->
Error -> Error {ok, Res0};
{error, {Status, Headers, Body0}} ->
{error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
Error ->
Error
end, end,
ct:pal("bridge probe result: ~p", [Res]), ct:pal("bridge probe result: ~p", [Res]),
Res. Res.
@ -1074,6 +1081,37 @@ t_resource_manager_crash_before_producers_started(Config) ->
), ),
ok. ok.
t_strategy_key_validation(Config) ->
?assertMatch(
{error,
{{_, 400, _}, _, #{
<<"message">> :=
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"Message key cannot be empty", _/binary>>
} = Msg
}}},
probe_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
),
?assertMatch(
{error,
{{_, 400, _}, _, #{
<<"message">> :=
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"Message key cannot be empty", _/binary>>
} = Msg
}}},
create_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
),
ok.
t_cluster(Config0) -> t_cluster(Config0) ->
ct:timetrap({seconds, 120}), ct:timetrap({seconds, 120}),
?retrying(Config0, 3, fun do_t_cluster/1). ?retrying(Config0, 3, fun do_t_cluster/1).

View File

@ -35,6 +35,17 @@ pulsar_producer_validations_test() ->
]}, ]},
check(Conf) check(Conf)
), ),
%% ensure atoms exist
_ = [my_producer],
?assertThrow(
{_, [
#{
path := "bridges.pulsar_producer.my_producer",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},
check_atom_key(Conf)
),
ok. ok.
@ -46,9 +57,14 @@ parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon), {ok, Conf} = hocon:binary(Hocon),
Conf. Conf.
%% what bridge creation does
check(Conf) when is_map(Conf) -> check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf). hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%% what bridge probe does
check_atom_key(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).
%%=========================================================================== %%===========================================================================
%% Data section %% Data section
%%=========================================================================== %%===========================================================================