From 5c8dc092a18cbc387ebeee27b12eed0b91e698d4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 1 Aug 2023 09:38:35 -0300 Subject: [PATCH] fix(http_bridge): don't attempt to convert headers to atoms Fixes https://emqx.atlassian.net/browse/EMQX-10653 --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 29 ++++- apps/emqx_bridge/src/emqx_bridge_resource.erl | 15 ++- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 13 ++ .../src/emqx_bridge_gcp_pubsub.app.src | 2 +- .../src/emqx_bridge_gcp_pubsub.erl | 7 +- .../src/emqx_bridge_gcp_pubsub_client.erl | 8 +- .../emqx_bridge_gcp_pubsub_impl_consumer.erl | 6 +- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 7 +- .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 3 +- .../emqx_bridge_gcp_pubsub_producer_SUITE.erl | 78 ++++++++++-- .../src/emqx_bridge_http.app.src | 2 +- .../src/emqx_bridge_http_connector.erl | 11 +- .../test/emqx_bridge_http_SUITE.erl | 90 ++++++++++++- .../test/emqx_bridge_http_connector_tests.erl | 118 ++++++++++++++++++ .../src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka.erl | 10 +- .../test/emqx_bridge_kafka_tests.erl | 22 +++- .../src/emqx_bridge_oracle.app.src | 2 +- .../src/emqx_bridge_oracle.erl | 2 + .../test/emqx_bridge_oracle_SUITE.erl | 34 ++++- .../src/emqx_bridge_pulsar.app.src | 2 +- .../src/emqx_bridge_pulsar.erl | 16 ++- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 42 ++++++- .../test/emqx_bridge_pulsar_tests.erl | 16 +++ 25 files changed, 491 insertions(+), 48 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index c9ea7d6bc..96d953e34 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.25"}, + {vsn, "0.1.26"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index e1c3ee987..5ba04a166 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -541,7 +541,7 @@ schema("/bridges_probe") -> case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of {ok, #{body := #{<<"type">> := ConnType} = Params}} -> Params1 = maybe_deobfuscate_bridge_probe(Params), - case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of + try emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of ok -> ?NO_CONTENT; {error, #{kind := validation_error} = Reason} -> @@ -553,6 +553,15 @@ schema("/bridges_probe") -> _ -> Reason0 end, ?BAD_REQUEST('TEST_FAILED', Reason) + catch + %% We need to catch hocon validation errors here as well because, + %% currently, when defining the API union member selector, we can only use + %% references to fields, and they don't share whole-bridge validators if + %% they exist. Such validators will only be triggered by + %% `create_dry_run'... + throw:{_Schema, [#{kind := validation_error} = Reason0]} -> + Reason = redact(Reason0), + ?BAD_REQUEST('TEST_FAILED', map_to_json(Reason)) end; BadRequest -> BadRequest @@ -608,7 +617,7 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> {ok, _} -> lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode); {error, Reason} when is_map(Reason) -> - ?BAD_REQUEST(map_to_json(emqx_utils:redact(Reason))) + ?BAD_REQUEST(map_to_json(redact(Reason))) end. get_metrics_from_local_node(BridgeType, BridgeName) -> @@ -1071,7 +1080,15 @@ deobfuscate(NewConf, OldConf) -> NewConf ). -map_to_json(M) -> - emqx_utils_json:encode( - emqx_utils_maps:jsonable_map(M, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end) - ). +map_to_json(M0) -> + %% When dealing with Hocon validation errors, `value' might contain non-serializable + %% values (e.g.: user_lookup_fun), so we try again without that key if serialization + %% fails as a best effort. + M1 = emqx_utils_maps:jsonable_map(M0, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end), + try + emqx_utils_json:encode(M1) + catch + error:_ -> + M2 = maps:without([value], M1), + emqx_utils_json:encode(M2) + end. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index cd5fd2d24..9a124c8c1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -261,7 +261,17 @@ recreate(Type, Name, Conf, Opts) -> create_dry_run(Type, Conf0) -> TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpPath = emqx_utils:safe_filename(TmpName), - Conf = emqx_utils_maps:safe_atom_key_map(Conf0), + %% Already typechecked, no need to catch errors + TypeBin = bin(Type), + TypeAtom = safe_atom(Type), + Conf1 = maps:without([<<"name">>], Conf0), + RawConf = #{<<"bridges">> => #{TypeBin => #{<<"a">> => Conf1}}}, + #{bridges := #{TypeAtom := #{a := Conf}}} = + hocon_tconf:check_plain( + emqx_bridge_schema, + RawConf, + #{atom_key => true, required => false} + ), case emqx_connector_ssl:convert_certs(TmpPath, Conf) of {error, Reason} -> {error, Reason}; @@ -415,6 +425,9 @@ bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). +safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8); +safe_atom(Atom) when is_atom(Atom) -> Atom. + parse_opts(Conf, Opts0) -> override_start_after_created(Conf, Opts0). diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index fc35449a7..1c0a3957a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -212,6 +212,19 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> ct:pal("bridge probe result: ~p", [Res]), Res. +try_decode_error(Body0) -> + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, #{<<"message">> := Msg0} = Body1} -> + case emqx_utils_json:safe_decode(Msg0, [return_maps]) of + {ok, Msg1} -> Body1#{<<"message">> := Msg1}; + {error, _} -> Body1 + end; + {ok, Body1} -> + Body1; + {error, _} -> + Body0 + end. + create_rule_and_action_http(BridgeType, RuleTopic, Config) -> create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index ba3e86eac..9faf65860 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index 8ef369068..b3792da71 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -363,9 +363,9 @@ service_account_json_validator(Map) -> {[], <<"service_account">>} -> ok; {[], Type} -> - {error, {wrong_type, Type}}; + {error, #{wrong_type => Type}}; {_, _} -> - {error, {missing_keys, MissingKeys}} + {error, #{missing_keys => MissingKeys}} end. service_account_json_converter(Map) when is_map(Map) -> @@ -382,7 +382,8 @@ service_account_json_converter(Val) -> consumer_topic_mapping_validator(_TopicMapping = []) -> {error, "There must be at least one GCP PubSub-MQTT topic mapping"}; -consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> +consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) -> + TopicMapping = [emqx_utils_maps:binary_key_map(TM) || TM <- TopicMapping0], NumEntries = length(TopicMapping), PubSubTopics = [KT || #{<<"pubsub_topic">> := KT} <- TopicMapping], DistinctPubSubTopics = length(lists:usort(PubSubTopics)), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index 80283ee73..cb4aa853c 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -220,10 +220,10 @@ parse_jwt_config(ResourceId, #{ service_account_json := ServiceAccountJSON }) -> #{ - project_id := ProjectId, - private_key_id := KId, - private_key := PrivateKeyPEM, - client_email := ServiceAccountEmail + <<"project_id">> := ProjectId, + <<"private_key_id">> := KId, + <<"private_key">> := PrivateKeyPEM, + <<"client_email">> := ServiceAccountEmail } = ServiceAccountJSON, %% fixed for pubsub; trailing slash is important. Aud = <<"https://pubsub.googleapis.com/">>, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 8f67d2678..74ee941ec 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -64,7 +64,9 @@ callback_mode() -> async_if_possible. query_mode(_Config) -> no_queries. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. -on_start(InstanceId, Config) -> +on_start(InstanceId, Config0) -> + %% ensure it's a binary key map + Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of {ok, Client} -> start_consumers(InstanceId, Client, Config); @@ -125,7 +127,7 @@ start_consumers(InstanceId, Client, Config) -> consumer := ConsumerConfig0, hookpoint := Hookpoint, resource_opts := #{request_ttl := RequestTTL}, - service_account_json := #{project_id := ProjectId} + service_account_json := #{<<"project_id">> := ProjectId} } = Config, ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), TopicMapping = maps:get(topic_mapping, ConsumerConfig1), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 1f87d8343..b1ded2121 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -50,15 +50,16 @@ callback_mode() -> async_if_possible. query_mode(_Config) -> async. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. -on_start(InstanceId, Config) -> +on_start(InstanceId, Config0) -> ?SLOG(info, #{ msg => "starting_gcp_pubsub_bridge", - config => Config + config => Config0 }), + Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), #{ payload_template := PayloadTemplate, pubsub_topic := PubSubTopic, - service_account_json := #{project_id := ProjectId} + service_account_json := #{<<"project_id">> := ProjectId} } = Config, case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of {ok, Client} -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 7d50304b1..8cb0ef2f9 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -275,14 +275,13 @@ ensure_topic(Config, Topic) -> start_control_client() -> RawServiceAccount = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(), - ServiceAccount = emqx_utils_maps:unsafe_atom_key_map(RawServiceAccount), ConnectorConfig = #{ connect_timeout => 5_000, max_retries => 0, pool_size => 1, resource_opts => #{request_ttl => 5_000}, - service_account_json => ServiceAccount + service_account_json => RawServiceAccount }, PoolName = <<"control_connector">>, {ok, Client} = emqx_bridge_gcp_pubsub_client:start(PoolName, ConnectorConfig), diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index cf992bb23..a9bbf6178 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -196,16 +196,27 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) -> Path = emqx_mgmt_api_test_util:api_path(["bridges"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), - ProbeResult = emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params), + Opts = #{return_all => true}, + ProbeResult = emqx_mgmt_api_test_util:request_api( + post, ProbePath, "", AuthHeader, Params, Opts + ), ct:pal("creating bridge (via http): ~p", [Params]), ct:pal("probe result: ~p", [ProbeResult]), Res = - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res0} -> {ok, emqx_utils_json:decode(Res0, [return_maps])}; - Error -> Error + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headhers, Res0}} -> + {ok, {Status, Headhers, emqx_utils_json:decode(Res0, [return_maps])}}; + {error, {Status, Headers, Body0}} -> + {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}}; + Error -> + Error end, ct:pal("bridge creation result: ~p", [Res]), ?assertEqual(element(1, ProbeResult), element(1, Res)), + case ProbeResult of + {error, {{_, 500, _}, _, _}} -> error({bad_probe_result, ProbeResult}); + _ -> ok + end, Res. create_rule_and_action_http(Config) -> @@ -821,7 +832,7 @@ t_not_of_service_account_type(Config) -> ?assertMatch( {error, #{ kind := validation_error, - reason := {wrong_type, <<"not a service account">>}, + reason := #{wrong_type := <<"not a service account">>}, %% should be censored as it contains secrets value := <<"******">> }}, @@ -832,6 +843,23 @@ t_not_of_service_account_type(Config) -> } ) ), + ?assertMatch( + {error, + {{_, 400, _}, _, #{ + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := #{<<"wrong_type">> := <<"not a service account">>}, + %% should be censored as it contains secrets + <<"value">> := <<"******">> + } + }}}, + create_bridge_http( + Config, + #{ + <<"service_account_json">> => #{<<"type">> => <<"not a service account">>} + } + ) + ), ok. t_json_missing_fields(Config) -> @@ -840,13 +868,15 @@ t_json_missing_fields(Config) -> {error, #{ kind := validation_error, reason := - {missing_keys, [ - <<"client_email">>, - <<"private_key">>, - <<"private_key_id">>, - <<"project_id">>, - <<"type">> - ]}, + #{ + missing_keys := [ + <<"client_email">>, + <<"private_key">>, + <<"private_key_id">>, + <<"project_id">>, + <<"type">> + ] + }, %% should be censored as it contains secrets value := <<"******">> }}, @@ -855,6 +885,30 @@ t_json_missing_fields(Config) -> | Config ]) ), + ?assertMatch( + {error, + {{_, 400, _}, _, #{ + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := + #{ + <<"missing_keys">> := [ + <<"client_email">>, + <<"private_key">>, + <<"private_key_id">>, + <<"project_id">>, + <<"type">> + ] + }, + %% should be censored as it contains secrets + <<"value">> := <<"******">> + } + }}}, + create_bridge_http([ + {gcp_pubsub_config, GCPPubSubConfig0#{<<"service_account_json">> := #{}}} + | Config + ]) + ), ok. t_invalid_private_key(Config) -> diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index 859f80f53..3849747c7 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_http, [ {description, "EMQX HTTP Bridge and Connector Application"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {env, []}, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 6e58505c4..42eddbeef 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -155,7 +155,16 @@ desc("request") -> desc(_) -> undefined. -validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> -> +validate_method(M) when + M =:= <<"post">>; + M =:= <<"put">>; + M =:= <<"get">>; + M =:= <<"delete">>; + M =:= post; + M =:= put; + M =:= get; + M =:= delete +-> ok; validate_method(M) -> case string:find(M, "${") of diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 3f3a3f62e..5395460b8 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -82,6 +82,14 @@ init_per_testcase(t_rule_action_expired, Config) -> {bridge_name, ?BRIDGE_NAME} | Config ]; +init_per_testcase(t_bridge_probes_header_atoms, Config) -> + HTTPPath = <<"/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()), + [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), [{http_server, Server} | Config]. @@ -89,7 +97,8 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(TestCase, _Config) when TestCase =:= t_path_not_found; TestCase =:= t_too_many_requests; - TestCase =:= t_rule_action_expired + TestCase =:= t_rule_action_expired; + TestCase =:= t_bridge_probes_header_atoms -> ok = emqx_bridge_http_connector_test_server:stop(), persistent_term:erase({?MODULE, times_called}), @@ -292,6 +301,22 @@ make_bridge(Config) -> ), emqx_bridge_resource:bridge_id(Type, Name). +success_http_handler() -> + TestPid = self(), + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + Headers = cowboy_req:headers(Req), + ct:pal("http request received: ~p", [#{body => Body, headers => Headers}]), + TestPid ! {http, Headers, Body}, + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"text/plain">>}, + <<"hello">>, + Req + ), + {ok, Rep, State} + end. + not_found_http_handler() -> TestPid = self(), fun(Req0, State) -> @@ -613,6 +638,55 @@ t_rule_action_expired(Config) -> ), ok. +t_bridge_probes_header_atoms(Config) -> + #{port := Port, path := Path} = ?config(http_server, Config), + ?check_trace( + begin + LocalTopic = <<"t/local/topic">>, + BridgeConfig0 = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + port => Port, + path => Path, + resume_interval => "100ms", + connect_timeout => "1s", + request_timeout => "100ms", + resource_request_ttl => "100ms", + local_topic => LocalTopic + }), + BridgeConfig = BridgeConfig0#{ + <<"headers">> => #{ + <<"some-non-existent-atom">> => <<"x">> + } + }, + ?assertMatch( + {ok, {{_, 204, _}, _Headers, _Body}}, + probe_bridge_api(BridgeConfig) + ), + ?assertMatch( + {ok, {{_, 201, _}, _Headers, _Body}}, + emqx_bridge_testlib:create_bridge_api( + ?BRIDGE_TYPE, + ?BRIDGE_NAME, + BridgeConfig + ) + ), + Msg = emqx_message:make(LocalTopic, <<"hi">>), + emqx:publish(Msg), + receive + {http, Headers, _Body} -> + ?assertMatch(#{<<"some-non-existent-atom">> := <<"x">>}, Headers), + ok + after 5_000 -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("request not made") + end, + ok + end, + [] + ), + ok. + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, @@ -659,3 +733,17 @@ remove_message_id(MessageIDs, #{body := IDBin}) -> ID = erlang:binary_to_integer(IDBin), %% It is acceptable to get the same message more than once maps:without([ID], MessageIDs). + +probe_bridge_api(BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => ?BRIDGE_TYPE, <<"name">> => ?BRIDGE_NAME}, + Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("probing bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; + Error -> Error + end, + ct:pal("bridge probe result: ~p", [Res]), + Res. diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl index f5b6b1f46..6b5c2b0cd 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl @@ -91,3 +91,121 @@ is_unwrapped_headers(Headers) -> is_unwrapped_header({_, V}) when is_function(V) -> false; is_unwrapped_header({_, [{str, _V}]}) -> throw(unexpected_tmpl_token); is_unwrapped_header(_) -> true. + +method_validator_test() -> + Conf0 = parse(webhook_config_hocon()), + ?assertMatch( + #{<<"method">> := _}, + emqx_utils_maps:deep_get([<<"bridges">>, <<"webhook">>, <<"a">>], Conf0) + ), + lists:foreach( + fun(Method) -> + Conf1 = emqx_utils_maps:deep_put( + [<<"bridges">>, <<"webhook">>, <<"a">>, <<"method">>], + Conf0, + Method + ), + ?assertMatch( + #{}, + check(Conf1), + #{method => Method} + ), + ?assertMatch( + #{}, + check_atom_key(Conf1), + #{method => Method} + ), + ok + end, + [<<"post">>, <<"put">>, <<"get">>, <<"delete">>] + ), + lists:foreach( + fun(Method) -> + Conf1 = emqx_utils_maps:deep_put( + [<<"bridges">>, <<"webhook">>, <<"a">>, <<"method">>], + Conf0, + Method + ), + ?assertThrow( + {_, [ + #{ + kind := validation_error, + reason := not_a_enum_symbol + } + ]}, + check(Conf1), + #{method => Method} + ), + ?assertThrow( + {_, [ + #{ + kind := validation_error, + reason := not_a_enum_symbol + } + ]}, + check_atom_key(Conf1), + #{method => Method} + ), + ok + end, + [<<"x">>, <<"patch">>, <<"options">>] + ), + ok. + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +%% what bridge creation does +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf). + +%% what bridge probe does +check_atom_key(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}). + +%%=========================================================================== +%% Data section +%%=========================================================================== + +%% erlfmt-ignore +webhook_config_hocon() -> +""" +bridges.webhook.a { + body = \"${.}\" + connect_timeout = 15s + enable = false + enable_pipelining = 100 + headers {content-type = \"application/json\", jjjjjjjjjjjjjjjjjjj = jjjjjjj} + max_retries = 2 + method = post + pool_size = 8 + pool_type = random + resource_opts { + health_check_interval = 15s + inflight_window = 100 + max_buffer_bytes = 1GB + query_mode = async + request_ttl = 45s + start_after_created = true + start_timeout = 5s + worker_pool_size = 4 + } + ssl { + ciphers = [] + depth = 10 + enable = false + hibernate_after = 5s + log_level = notice + reuse_sessions = true + secure_renegotiate = true + verify = verify_peer + versions = [tlsv1.3, tlsv1.2] + } + url = \"http://some.host:4000/api/echo\" +} +""". diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 7157a1580..3792409c6 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index eeaa7d4b7..9bf2e9950 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -528,7 +528,8 @@ kafka_producer_converter(Config, _HoconOpts) -> consumer_topic_mapping_validator(_TopicMapping = []) -> {error, "There must be at least one Kafka-MQTT topic mapping"}; -consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> +consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) -> + TopicMapping = [emqx_utils_maps:binary_key_map(TM) || TM <- TopicMapping0], NumEntries = length(TopicMapping), KafkaTopics = [KT || #{<<"kafka_topic">> := KT} <- TopicMapping], DistinctKafkaTopics = length(lists:usort(KafkaTopics)), @@ -539,6 +540,13 @@ consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> {error, "Kafka topics must not be repeated in a bridge"} end. +producer_strategy_key_validator( + #{ + partition_strategy := _, + message := #{key := _} + } = Conf +) -> + producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf)); producer_strategy_key_validator(#{ <<"partition_strategy">> := key_dispatch, <<"message">> := #{<<"key">> := ""} diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 3b558200c..367423cd4 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -166,11 +166,24 @@ message_key_dispatch_validations_test() -> ]}, check(Conf) ), + %% ensure atoms exist + _ = [myproducer], + ?assertThrow( + {_, [ + #{ + path := "bridges.kafka.myproducer.kafka", + reason := "Message key cannot be empty when `key_dispatch` strategy is used" + } + ]}, + check_atom_key(Conf) + ), ok. tcp_keepalive_validation_test_() -> ProducerConf = parse(kafka_producer_new_hocon()), ConsumerConf = parse(kafka_consumer_hocon()), + %% ensure atoms exist + _ = [my_producer, my_consumer], test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++ test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf). @@ -184,7 +197,9 @@ test_keepalive_validation(Name, Conf) -> InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>), InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2], [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++ - [?_assertThrow(_, check(C)) || C <- InvalidConfs]. + [?_assertMatch(#{bridges := _}, check_atom_key(C)) || C <- ValidConfs] ++ + [?_assertThrow(_, check(C)) || C <- InvalidConfs] ++ + [?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs]. %%=========================================================================== %% Helper functions @@ -194,9 +209,14 @@ parse(Hocon) -> {ok, Conf} = hocon:binary(Hocon), Conf. +%% what bridge creation does check(Conf) when is_map(Conf) -> hocon_tconf:check_plain(emqx_bridge_schema, Conf). +%% what bridge probe does +check_atom_key(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}). + %%=========================================================================== %% Data section %%=========================================================================== diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src index 4f46ce464..d68c6ca9a 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_oracle, [ {description, "EMQX Enterprise Oracle Database Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl index 46e118c69..15b2be575 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl @@ -108,6 +108,8 @@ type_field(Type) -> name_field() -> {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. +config_validator(#{server := _} = Config) -> + config_validator(emqx_utils_maps:binary_key_map(Config)); config_validator(#{<<"server">> := Server} = Config) when not is_map(Server) andalso not is_map_key(<<"sid">>, Config) andalso diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index bd3ac289c..6b949b047 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -305,6 +305,8 @@ create_bridge_api(Config, Overrides) -> case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of {ok, {Status, Headers, Body0}} -> {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + {error, {Status, Headers, Body0}} -> + {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}}; Error -> Error end, @@ -348,8 +350,12 @@ probe_bridge_api(Config, Overrides) -> ct:pal("probing bridge (via http): ~p", [Params]), Res = case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of - {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; - Error -> Error + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> + {ok, Res0}; + {error, {Status, Headers, Body0}} -> + {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}}; + Error -> + Error end, ct:pal("bridge probe result: ~p", [Res]), Res. @@ -630,6 +636,30 @@ t_no_sid_nor_service_name(Config0) -> {error, #{kind := validation_error, reason := "neither SID nor Service Name was set"}}, create_bridge(Config) ), + ?assertMatch( + {error, + {{_, 400, _}, _, #{ + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"neither SID nor Service Name was set">>, + %% should be censored as it contains secrets + <<"value">> := #{<<"password">> := <<"******">>} + } + }}}, + create_bridge_api(Config) + ), + ?assertMatch( + {error, + {{_, 400, _}, _, #{ + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"neither SID nor Service Name was set">>, + %% should be censored as it contains secrets + <<"value">> := #{<<"password">> := <<"******">>} + } + }}}, + probe_bridge_api(Config) + ), ok. t_missing_table(Config) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index ed468a833..16c9ce59f 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 2fa5d70cf..beb8452b2 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -220,6 +220,13 @@ conn_bridge_examples(_Method) -> } ]. +producer_strategy_key_validator( + #{ + strategy := _, + message := #{key := _} + } = Conf +) -> + producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf)); producer_strategy_key_validator(#{ <<"strategy">> := key_dispatch, <<"message">> := #{<<"key">> := ""} @@ -257,7 +264,12 @@ override_default(OriginalFn, NewDefault) -> auth_union_member_selector(all_union_members) -> [none, ref(auth_basic), ref(auth_token)]; -auth_union_member_selector({value, V}) -> +auth_union_member_selector({value, V0}) -> + V = + case is_map(V0) of + true -> emqx_utils_maps:binary_key_map(V0); + false -> V0 + end, case V of #{<<"password">> := _} -> [ref(auth_basic)]; @@ -265,6 +277,8 @@ auth_union_member_selector({value, V}) -> [ref(auth_token)]; <<"none">> -> [none]; + none -> + [none]; _ -> Expected = "none | basic | token", throw(#{ diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 4f0f73732..fb2e867ef 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -40,6 +40,7 @@ groups() -> only_once_tests() -> [ t_create_via_http, + t_strategy_key_validation, t_start_when_down, t_send_when_down, t_send_when_timeout, @@ -313,6 +314,8 @@ create_bridge_api(Config, Overrides) -> case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of {ok, {Status, Headers, Body0}} -> {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + {error, {Status, Headers, Body0}} -> + {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}}; Error -> Error end, @@ -356,8 +359,12 @@ probe_bridge_api(Config, Overrides) -> ct:pal("probing bridge (via http): ~p", [Params]), Res = case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of - {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; - Error -> Error + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> + {ok, Res0}; + {error, {Status, Headers, Body0}} -> + {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}}; + Error -> + Error end, ct:pal("bridge probe result: ~p", [Res]), Res. @@ -1074,6 +1081,37 @@ t_resource_manager_crash_before_producers_started(Config) -> ), ok. +t_strategy_key_validation(Config) -> + ?assertMatch( + {error, + {{_, 400, _}, _, #{ + <<"message">> := + #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"Message key cannot be empty", _/binary>> + } = Msg + }}} when not is_map_key(<<"value">>, Msg), + probe_bridge_api( + Config, + #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}} + ) + ), + ?assertMatch( + {error, + {{_, 400, _}, _, #{ + <<"message">> := + #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"Message key cannot be empty", _/binary>> + } = Msg + }}} when not is_map_key(<<"value">>, Msg), + create_bridge_api( + Config, + #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}} + ) + ), + ok. + t_cluster(Config0) -> ct:timetrap({seconds, 120}), ?retrying(Config0, 3, fun do_t_cluster/1). diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl index d46f2af6f..031767063 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl @@ -35,6 +35,17 @@ pulsar_producer_validations_test() -> ]}, check(Conf) ), + %% ensure atoms exist + _ = [my_producer], + ?assertThrow( + {_, [ + #{ + path := "bridges.pulsar_producer.my_producer", + reason := "Message key cannot be empty when `key_dispatch` strategy is used" + } + ]}, + check_atom_key(Conf) + ), ok. @@ -46,9 +57,14 @@ parse(Hocon) -> {ok, Conf} = hocon:binary(Hocon), Conf. +%% what bridge creation does check(Conf) when is_map(Conf) -> hocon_tconf:check_plain(emqx_bridge_schema, Conf). +%% what bridge probe does +check_atom_key(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}). + %%=========================================================================== %% Data section %%===========================================================================