From f2c372d9ff9a3887576aacc5afb6a350cae79676 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 23 Feb 2024 15:36:39 -0300 Subject: [PATCH] feat(gcp_pubsub_bridges): make service account json a binary Fixes https://emqx.atlassian.net/browse/EMQX-11384 Today, service_account_json config field is an embedded object (map()). This requires user to embed a JSON object into the config file instead of embedding it as a string. We should support binary() type as input, but keep supporting map() for backward compatibility. --- .../src/emqx_bridge_gcp_pubsub.erl | 120 ++++++++++++------ .../src/emqx_bridge_gcp_pubsub_client.erl | 2 +- .../emqx_bridge_gcp_pubsub_impl_consumer.erl | 5 +- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 4 +- .../emqx_bridge_gcp_pubsub_producer_SUITE.erl | 2 +- ...qx_bridge_v2_gcp_pubsub_consumer_SUITE.erl | 38 ++++++ apps/emqx_conf/src/emqx_conf_schema_types.erl | 14 -- .../src/emqx_enterprise_schema.erl | 3 +- changes/ee/feat-12577.en.md | 1 + 9 files changed, 133 insertions(+), 56 deletions(-) create mode 100644 changes/ee/feat-12577.en.md diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index 445acd51d..007bbc1a0 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -21,12 +21,11 @@ service_account_json_converter/2 ]). +-export([upgrade_raw_conf/1]). + %% emqx_bridge_enterprise "unofficial" API -export([conn_bridge_examples/1]). --type service_account_json() :: map(). --reflect_type([service_account_json/0]). - -define(DEFAULT_PIPELINE_SIZE, 100). %%------------------------------------------------------------------------------------------------- @@ -101,7 +100,7 @@ fields(connector_config) -> )}, {service_account_json, sc( - ?MODULE:service_account_json(), + binary(), #{ required => true, validator => fun ?MODULE:service_account_json_validator/1, @@ -354,6 +353,22 @@ values(consumer, _Method) -> } }. +upgrade_raw_conf(RawConf0) -> + lists:foldl( + fun(Path, Acc) -> + deep_update( + Path, + fun ensure_binary_service_account_json/1, + Acc + ) + end, + RawConf0, + [ + [<<"connectors">>, <<"gcp_pubsub_producer">>], + [<<"connectors">>, <<"gcp_pubsub_consumer">>] + ] + ). + %%------------------------------------------------------------------------------------------------- %% Helper fns %%------------------------------------------------------------------------------------------------- @@ -371,46 +386,53 @@ type_field_consumer() -> name_field() -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. --spec service_account_json_validator(map()) -> +-spec service_account_json_validator(binary()) -> ok | {error, {wrong_type, term()}} | {error, {missing_keys, [binary()]}}. -service_account_json_validator(Map) -> - ExpectedKeys = [ - <<"type">>, - <<"project_id">>, - <<"private_key_id">>, - <<"private_key">>, - <<"client_email">> - ], - MissingKeys = lists:sort([ - K - || K <- ExpectedKeys, - not maps:is_key(K, Map) - ]), - Type = maps:get(<<"type">>, Map, null), - case {MissingKeys, Type} of - {[], <<"service_account">>} -> - ok; - {[], Type} -> - {error, #{wrong_type => Type}}; - {_, _} -> - {error, #{missing_keys => MissingKeys}} +service_account_json_validator(Val) -> + case emqx_utils_json:safe_decode(Val, [return_maps]) of + {ok, Map} -> + Map = emqx_utils_json:decode(Val, [return_maps]), + ExpectedKeys = [ + <<"type">>, + <<"project_id">>, + <<"private_key_id">>, + <<"private_key">>, + <<"client_email">> + ], + MissingKeys = lists:sort([ + K + || K <- ExpectedKeys, + not maps:is_key(K, Map) + ]), + Type = maps:get(<<"type">>, Map, null), + case {MissingKeys, Type} of + {[], <<"service_account">>} -> + ok; + {[], Type} -> + {error, #{wrong_type => Type}}; + {_, _} -> + {error, #{missing_keys => MissingKeys}} + end; + {error, _} -> + {error, "not a json"} end. service_account_json_converter(Val, #{make_serializable := true}) -> - Val; + case is_map(Val) of + true -> emqx_utils_json:encode(Val); + false -> Val + end; service_account_json_converter(Map, _Opts) when is_map(Map) -> - ExpectedKeys = [ - <<"type">>, - <<"project_id">>, - <<"private_key_id">>, - <<"private_key">>, - <<"client_email">> - ], - maps:with(ExpectedKeys, Map); + emqx_utils_json:encode(Map); service_account_json_converter(Val, _Opts) -> - Val. + case emqx_utils_json:safe_decode(Val, [return_maps]) of + {ok, Str} when is_binary(Str) -> + emqx_utils_json:decode(Str, [return_maps]); + _ -> + Val + end. consumer_topic_mapping_validator(_TopicMapping = []) -> {error, "There must be at least one GCP PubSub-MQTT topic mapping"}; @@ -425,3 +447,29 @@ consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) -> false -> {error, "GCP PubSub topics must not be repeated in a bridge"} end. + +deep_update(Path, Fun, Map) -> + case emqx_utils_maps:deep_get(Path, Map, #{}) of + M when map_size(M) > 0 -> + NewM = Fun(M), + emqx_utils_maps:deep_put(Path, Map, NewM); + _ -> + Map + end. + +ensure_binary_service_account_json(Connectors) -> + maps:map( + fun(_Name, Conf) -> + maps:update_with( + <<"service_account_json">>, + fun(JSON) -> + case is_map(JSON) of + true -> emqx_utils_json:encode(JSON); + false -> JSON + end + end, + Conf + ) + end, + Connectors + ). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index c3607467f..f27aab422 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -25,7 +25,7 @@ -export([get_jwt_authorization_header/1]). --type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json(). +-type service_account_json() :: map(). -type project_id() :: binary(). -type duration() :: non_neg_integer(). -type config() :: #{ diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index bd02a84be..5c51cd2d9 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -94,8 +94,9 @@ query_mode(_Config) -> no_queries. -spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()} | {error, term()}. on_start(ConnectorResId, Config0) -> - %% ensure it's a binary key map - Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), + Config = maps:update_with( + service_account_json, fun(X) -> emqx_utils_json:decode(X, [return_maps]) end, Config0 + ), #{service_account_json := #{<<"project_id">> := ProjectId}} = Config, case emqx_bridge_gcp_pubsub_client:start(ConnectorResId, Config) of {ok, Client} -> diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index ef2501647..299b90226 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -73,7 +73,9 @@ on_start(InstanceId, Config0) -> msg => "starting_gcp_pubsub_bridge", instance_id => InstanceId }), - Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), + Config = maps:update_with( + service_account_json, fun(X) -> emqx_utils_json:decode(X, [return_maps]) end, Config0 + ), #{service_account_json := #{<<"project_id">> := ProjectId}} = Config, case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of {ok, Client} -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 1ff182cf9..4acc5ff3c 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -909,7 +909,7 @@ t_not_a_json(Config) -> ?assertMatch( {error, #{ kind := validation_error, - reason := #{exception := {error, {badmap, "not a json"}}}, + reason := "not a json", %% should be censored as it contains secrets value := <<"******">> }}, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_consumer_SUITE.erl index 77c59b8af..34ee4599e 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_consumer_SUITE.erl @@ -110,6 +110,23 @@ source_config(Overrides0) -> }, maps:merge(CommonConfig, Overrides). +assert_persisted_service_account_json_is_binary(ConnectorName) -> + %% ensure cluster.hocon has a binary encoded json string as the value + {ok, Hocon} = hocon:files([application:get_env(emqx, cluster_hocon_file, undefined)]), + ?assertMatch( + Bin when is_binary(Bin), + emqx_utils_maps:deep_get( + [ + <<"connectors">>, + <<"gcp_pubsub_consumer">>, + ConnectorName, + <<"service_account_json">> + ], + Hocon + ) + ), + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -122,6 +139,27 @@ t_create_via_http(Config) -> ok = emqx_bridge_v2_testlib:t_create_via_http(Config), ok. +t_create_via_http_json_object_service_account(Config0) -> + %% After the config goes through the roundtrip with `hocon_tconf:check_plain', service + %% account json comes back as a binary even if the input is a json object. + ConnectorName = ?config(connector_name, Config0), + ConnConfig0 = ?config(connector_config, Config0), + Config1 = proplists:delete(connector_config, Config0), + ConnConfig1 = maps:update_with( + <<"service_account_json">>, + fun(X) -> + ?assert(is_binary(X), #{json => X}), + JSON = emqx_utils_json:decode(X, [return_maps]), + ?assert(is_map(JSON)), + JSON + end, + ConnConfig0 + ), + Config = [{connector_config, ConnConfig1} | Config1], + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + assert_persisted_service_account_json_is_binary(ConnectorName), + ok. + t_consume(Config) -> Topic = ?config(pubsub_topic, Config), Payload = #{<<"key">> => <<"value">>}, diff --git a/apps/emqx_conf/src/emqx_conf_schema_types.erl b/apps/emqx_conf/src/emqx_conf_schema_types.erl index 1688284ca..19ecb765d 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_types.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_types.erl @@ -261,20 +261,6 @@ readable("comma_separated_atoms()") -> dashboard => #{type => comma_separated_string}, docgen => #{type => "String", example => <<"item1,item2">>} }; -readable("service_account_json()") -> - %% This is a bit special, - %% service_account_josn in swagger spec is an object - %% the same in documenation. - %% However, dashboard wish it to be a string - %% TODO: - %% - Change type definition to stirng(). - %% - Convert the embedded object to a escaped JSON string. - %% - Delete this function clause once the above is done. - #{ - swagger => #{type => object}, - dashboard => #{type => string}, - docgen => #{type => "Map"} - }; readable("json_binary()") -> #{ swagger => #{type => string, example => <<"{\"a\": [1,true]}">>}, diff --git a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl index d99efb887..316100b67 100644 --- a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl +++ b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl @@ -19,7 +19,8 @@ ]). %% Callback to upgrade config after loaded from config file but before validation. -upgrade_raw_conf(RawConf) -> +upgrade_raw_conf(RawConf0) -> + RawConf = emqx_bridge_gcp_pubsub:upgrade_raw_conf(RawConf0), emqx_conf_schema:upgrade_raw_conf(RawConf). namespace() -> diff --git a/changes/ee/feat-12577.en.md b/changes/ee/feat-12577.en.md new file mode 100644 index 000000000..0b5d61666 --- /dev/null +++ b/changes/ee/feat-12577.en.md @@ -0,0 +1 @@ +Changed the type of `service_account_json` of both GCP PubSub Producer and Consumer connectors to a string. Now, it's possible to set this field to a JSON-encoded string. Using the previous format (a HOCON map) is still supported but not encouraged.