feat(gcp_pubsub_bridges): make service account json a binary
Fixes https://emqx.atlassian.net/browse/EMQX-11384 Today, service_account_json config field is an embedded object (map()). This requires user to embed a JSON object into the config file instead of embedding it as a string. We should support binary() type as input, but keep supporting map() for backward compatibility.
This commit is contained in:
parent
5af01c041b
commit
f2c372d9ff
|
@ -21,12 +21,11 @@
|
||||||
service_account_json_converter/2
|
service_account_json_converter/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([upgrade_raw_conf/1]).
|
||||||
|
|
||||||
%% emqx_bridge_enterprise "unofficial" API
|
%% emqx_bridge_enterprise "unofficial" API
|
||||||
-export([conn_bridge_examples/1]).
|
-export([conn_bridge_examples/1]).
|
||||||
|
|
||||||
-type service_account_json() :: map().
|
|
||||||
-reflect_type([service_account_json/0]).
|
|
||||||
|
|
||||||
-define(DEFAULT_PIPELINE_SIZE, 100).
|
-define(DEFAULT_PIPELINE_SIZE, 100).
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
@ -101,7 +100,7 @@ fields(connector_config) ->
|
||||||
)},
|
)},
|
||||||
{service_account_json,
|
{service_account_json,
|
||||||
sc(
|
sc(
|
||||||
?MODULE:service_account_json(),
|
binary(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
validator => fun ?MODULE:service_account_json_validator/1,
|
validator => fun ?MODULE:service_account_json_validator/1,
|
||||||
|
@ -354,6 +353,22 @@ values(consumer, _Method) ->
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
upgrade_raw_conf(RawConf0) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun(Path, Acc) ->
|
||||||
|
deep_update(
|
||||||
|
Path,
|
||||||
|
fun ensure_binary_service_account_json/1,
|
||||||
|
Acc
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
RawConf0,
|
||||||
|
[
|
||||||
|
[<<"connectors">>, <<"gcp_pubsub_producer">>],
|
||||||
|
[<<"connectors">>, <<"gcp_pubsub_consumer">>]
|
||||||
|
]
|
||||||
|
).
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
%% Helper fns
|
%% Helper fns
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
@ -371,46 +386,53 @@ type_field_consumer() ->
|
||||||
name_field() ->
|
name_field() ->
|
||||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
||||||
|
|
||||||
-spec service_account_json_validator(map()) ->
|
-spec service_account_json_validator(binary()) ->
|
||||||
ok
|
ok
|
||||||
| {error, {wrong_type, term()}}
|
| {error, {wrong_type, term()}}
|
||||||
| {error, {missing_keys, [binary()]}}.
|
| {error, {missing_keys, [binary()]}}.
|
||||||
service_account_json_validator(Map) ->
|
service_account_json_validator(Val) ->
|
||||||
ExpectedKeys = [
|
case emqx_utils_json:safe_decode(Val, [return_maps]) of
|
||||||
<<"type">>,
|
{ok, Map} ->
|
||||||
<<"project_id">>,
|
Map = emqx_utils_json:decode(Val, [return_maps]),
|
||||||
<<"private_key_id">>,
|
ExpectedKeys = [
|
||||||
<<"private_key">>,
|
<<"type">>,
|
||||||
<<"client_email">>
|
<<"project_id">>,
|
||||||
],
|
<<"private_key_id">>,
|
||||||
MissingKeys = lists:sort([
|
<<"private_key">>,
|
||||||
K
|
<<"client_email">>
|
||||||
|| K <- ExpectedKeys,
|
],
|
||||||
not maps:is_key(K, Map)
|
MissingKeys = lists:sort([
|
||||||
]),
|
K
|
||||||
Type = maps:get(<<"type">>, Map, null),
|
|| K <- ExpectedKeys,
|
||||||
case {MissingKeys, Type} of
|
not maps:is_key(K, Map)
|
||||||
{[], <<"service_account">>} ->
|
]),
|
||||||
ok;
|
Type = maps:get(<<"type">>, Map, null),
|
||||||
{[], Type} ->
|
case {MissingKeys, Type} of
|
||||||
{error, #{wrong_type => Type}};
|
{[], <<"service_account">>} ->
|
||||||
{_, _} ->
|
ok;
|
||||||
{error, #{missing_keys => MissingKeys}}
|
{[], Type} ->
|
||||||
|
{error, #{wrong_type => Type}};
|
||||||
|
{_, _} ->
|
||||||
|
{error, #{missing_keys => MissingKeys}}
|
||||||
|
end;
|
||||||
|
{error, _} ->
|
||||||
|
{error, "not a json"}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
service_account_json_converter(Val, #{make_serializable := true}) ->
|
service_account_json_converter(Val, #{make_serializable := true}) ->
|
||||||
Val;
|
case is_map(Val) of
|
||||||
|
true -> emqx_utils_json:encode(Val);
|
||||||
|
false -> Val
|
||||||
|
end;
|
||||||
service_account_json_converter(Map, _Opts) when is_map(Map) ->
|
service_account_json_converter(Map, _Opts) when is_map(Map) ->
|
||||||
ExpectedKeys = [
|
emqx_utils_json:encode(Map);
|
||||||
<<"type">>,
|
|
||||||
<<"project_id">>,
|
|
||||||
<<"private_key_id">>,
|
|
||||||
<<"private_key">>,
|
|
||||||
<<"client_email">>
|
|
||||||
],
|
|
||||||
maps:with(ExpectedKeys, Map);
|
|
||||||
service_account_json_converter(Val, _Opts) ->
|
service_account_json_converter(Val, _Opts) ->
|
||||||
Val.
|
case emqx_utils_json:safe_decode(Val, [return_maps]) of
|
||||||
|
{ok, Str} when is_binary(Str) ->
|
||||||
|
emqx_utils_json:decode(Str, [return_maps]);
|
||||||
|
_ ->
|
||||||
|
Val
|
||||||
|
end.
|
||||||
|
|
||||||
consumer_topic_mapping_validator(_TopicMapping = []) ->
|
consumer_topic_mapping_validator(_TopicMapping = []) ->
|
||||||
{error, "There must be at least one GCP PubSub-MQTT topic mapping"};
|
{error, "There must be at least one GCP PubSub-MQTT topic mapping"};
|
||||||
|
@ -425,3 +447,29 @@ consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
|
||||||
false ->
|
false ->
|
||||||
{error, "GCP PubSub topics must not be repeated in a bridge"}
|
{error, "GCP PubSub topics must not be repeated in a bridge"}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
deep_update(Path, Fun, Map) ->
|
||||||
|
case emqx_utils_maps:deep_get(Path, Map, #{}) of
|
||||||
|
M when map_size(M) > 0 ->
|
||||||
|
NewM = Fun(M),
|
||||||
|
emqx_utils_maps:deep_put(Path, Map, NewM);
|
||||||
|
_ ->
|
||||||
|
Map
|
||||||
|
end.
|
||||||
|
|
||||||
|
ensure_binary_service_account_json(Connectors) ->
|
||||||
|
maps:map(
|
||||||
|
fun(_Name, Conf) ->
|
||||||
|
maps:update_with(
|
||||||
|
<<"service_account_json">>,
|
||||||
|
fun(JSON) ->
|
||||||
|
case is_map(JSON) of
|
||||||
|
true -> emqx_utils_json:encode(JSON);
|
||||||
|
false -> JSON
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
Conf
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
Connectors
|
||||||
|
).
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
|
|
||||||
-export([get_jwt_authorization_header/1]).
|
-export([get_jwt_authorization_header/1]).
|
||||||
|
|
||||||
-type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json().
|
-type service_account_json() :: map().
|
||||||
-type project_id() :: binary().
|
-type project_id() :: binary().
|
||||||
-type duration() :: non_neg_integer().
|
-type duration() :: non_neg_integer().
|
||||||
-type config() :: #{
|
-type config() :: #{
|
||||||
|
|
|
@ -94,8 +94,9 @@ query_mode(_Config) -> no_queries.
|
||||||
-spec on_start(connector_resource_id(), connector_config()) ->
|
-spec on_start(connector_resource_id(), connector_config()) ->
|
||||||
{ok, connector_state()} | {error, term()}.
|
{ok, connector_state()} | {error, term()}.
|
||||||
on_start(ConnectorResId, Config0) ->
|
on_start(ConnectorResId, Config0) ->
|
||||||
%% ensure it's a binary key map
|
Config = maps:update_with(
|
||||||
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
|
service_account_json, fun(X) -> emqx_utils_json:decode(X, [return_maps]) end, Config0
|
||||||
|
),
|
||||||
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
|
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
|
||||||
case emqx_bridge_gcp_pubsub_client:start(ConnectorResId, Config) of
|
case emqx_bridge_gcp_pubsub_client:start(ConnectorResId, Config) of
|
||||||
{ok, Client} ->
|
{ok, Client} ->
|
||||||
|
|
|
@ -73,7 +73,9 @@ on_start(InstanceId, Config0) ->
|
||||||
msg => "starting_gcp_pubsub_bridge",
|
msg => "starting_gcp_pubsub_bridge",
|
||||||
instance_id => InstanceId
|
instance_id => InstanceId
|
||||||
}),
|
}),
|
||||||
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
|
Config = maps:update_with(
|
||||||
|
service_account_json, fun(X) -> emqx_utils_json:decode(X, [return_maps]) end, Config0
|
||||||
|
),
|
||||||
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
|
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
|
||||||
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
|
case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
|
||||||
{ok, Client} ->
|
{ok, Client} ->
|
||||||
|
|
|
@ -909,7 +909,7 @@ t_not_a_json(Config) ->
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{error, #{
|
{error, #{
|
||||||
kind := validation_error,
|
kind := validation_error,
|
||||||
reason := #{exception := {error, {badmap, "not a json"}}},
|
reason := "not a json",
|
||||||
%% should be censored as it contains secrets
|
%% should be censored as it contains secrets
|
||||||
value := <<"******">>
|
value := <<"******">>
|
||||||
}},
|
}},
|
||||||
|
|
|
@ -110,6 +110,23 @@ source_config(Overrides0) ->
|
||||||
},
|
},
|
||||||
maps:merge(CommonConfig, Overrides).
|
maps:merge(CommonConfig, Overrides).
|
||||||
|
|
||||||
|
assert_persisted_service_account_json_is_binary(ConnectorName) ->
|
||||||
|
%% ensure cluster.hocon has a binary encoded json string as the value
|
||||||
|
{ok, Hocon} = hocon:files([application:get_env(emqx, cluster_hocon_file, undefined)]),
|
||||||
|
?assertMatch(
|
||||||
|
Bin when is_binary(Bin),
|
||||||
|
emqx_utils_maps:deep_get(
|
||||||
|
[
|
||||||
|
<<"connectors">>,
|
||||||
|
<<"gcp_pubsub_consumer">>,
|
||||||
|
ConnectorName,
|
||||||
|
<<"service_account_json">>
|
||||||
|
],
|
||||||
|
Hocon
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -122,6 +139,27 @@ t_create_via_http(Config) ->
|
||||||
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_create_via_http_json_object_service_account(Config0) ->
|
||||||
|
%% After the config goes through the roundtrip with `hocon_tconf:check_plain', service
|
||||||
|
%% account json comes back as a binary even if the input is a json object.
|
||||||
|
ConnectorName = ?config(connector_name, Config0),
|
||||||
|
ConnConfig0 = ?config(connector_config, Config0),
|
||||||
|
Config1 = proplists:delete(connector_config, Config0),
|
||||||
|
ConnConfig1 = maps:update_with(
|
||||||
|
<<"service_account_json">>,
|
||||||
|
fun(X) ->
|
||||||
|
?assert(is_binary(X), #{json => X}),
|
||||||
|
JSON = emqx_utils_json:decode(X, [return_maps]),
|
||||||
|
?assert(is_map(JSON)),
|
||||||
|
JSON
|
||||||
|
end,
|
||||||
|
ConnConfig0
|
||||||
|
),
|
||||||
|
Config = [{connector_config, ConnConfig1} | Config1],
|
||||||
|
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
||||||
|
assert_persisted_service_account_json_is_binary(ConnectorName),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_consume(Config) ->
|
t_consume(Config) ->
|
||||||
Topic = ?config(pubsub_topic, Config),
|
Topic = ?config(pubsub_topic, Config),
|
||||||
Payload = #{<<"key">> => <<"value">>},
|
Payload = #{<<"key">> => <<"value">>},
|
||||||
|
|
|
@ -261,20 +261,6 @@ readable("comma_separated_atoms()") ->
|
||||||
dashboard => #{type => comma_separated_string},
|
dashboard => #{type => comma_separated_string},
|
||||||
docgen => #{type => "String", example => <<"item1,item2">>}
|
docgen => #{type => "String", example => <<"item1,item2">>}
|
||||||
};
|
};
|
||||||
readable("service_account_json()") ->
|
|
||||||
%% This is a bit special,
|
|
||||||
%% service_account_josn in swagger spec is an object
|
|
||||||
%% the same in documenation.
|
|
||||||
%% However, dashboard wish it to be a string
|
|
||||||
%% TODO:
|
|
||||||
%% - Change type definition to stirng().
|
|
||||||
%% - Convert the embedded object to a escaped JSON string.
|
|
||||||
%% - Delete this function clause once the above is done.
|
|
||||||
#{
|
|
||||||
swagger => #{type => object},
|
|
||||||
dashboard => #{type => string},
|
|
||||||
docgen => #{type => "Map"}
|
|
||||||
};
|
|
||||||
readable("json_binary()") ->
|
readable("json_binary()") ->
|
||||||
#{
|
#{
|
||||||
swagger => #{type => string, example => <<"{\"a\": [1,true]}">>},
|
swagger => #{type => string, example => <<"{\"a\": [1,true]}">>},
|
||||||
|
|
|
@ -19,7 +19,8 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Callback to upgrade config after loaded from config file but before validation.
|
%% Callback to upgrade config after loaded from config file but before validation.
|
||||||
upgrade_raw_conf(RawConf) ->
|
upgrade_raw_conf(RawConf0) ->
|
||||||
|
RawConf = emqx_bridge_gcp_pubsub:upgrade_raw_conf(RawConf0),
|
||||||
emqx_conf_schema:upgrade_raw_conf(RawConf).
|
emqx_conf_schema:upgrade_raw_conf(RawConf).
|
||||||
|
|
||||||
namespace() ->
|
namespace() ->
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Changed the type of `service_account_json` of both GCP PubSub Producer and Consumer connectors to a string. Now, it's possible to set this field to a JSON-encoded string. Using the previous format (a HOCON map) is still supported but not encouraged.
|
Loading…
Reference in New Issue