Merge pull request #12895 from lafirest/fix/dyndb

fix(dynamo): Added missing keys for DynamoDB
This commit is contained in:
lafirest 2024-04-22 23:03:00 +08:00 committed by GitHub
commit 0d1c13661f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 113 additions and 18 deletions

View File

@ -87,6 +87,7 @@ connector_values() ->
<<"url">> => <<"http://127.0.0.1:8000">>, <<"url">> => <<"http://127.0.0.1:8000">>,
<<"aws_access_key_id">> => <<"root">>, <<"aws_access_key_id">> => <<"root">>,
<<"aws_secret_access_key">> => <<"******">>, <<"aws_secret_access_key">> => <<"******">>,
<<"region">> => <<"us-west-2">>,
<<"pool_size">> => 8, <<"pool_size">> => 8,
<<"resource_opts">> => <<"resource_opts">> =>
#{ #{
@ -113,7 +114,8 @@ action_values() ->
<<"parameters">> => <<"parameters">> =>
#{ #{
<<"table">> => <<"mqtt_msg">>, <<"table">> => <<"mqtt_msg">>,
<<"template">> => ?DEFAULT_TEMPLATE <<"template">> => ?DEFAULT_TEMPLATE,
<<"hash_key">> => <<"clientid">>
} }
}. }.
@ -160,7 +162,19 @@ fields(dynamo_action) ->
); );
fields(action_parameters) -> fields(action_parameters) ->
Parameters = Parameters =
[{template, template_field_schema()}] ++ emqx_bridge_dynamo_connector:fields(config), [
{template, template_field_schema()},
{hash_key,
mk(
binary(),
#{desc => ?DESC("hash_key"), required => true}
)},
{range_key,
mk(
binary(),
#{desc => ?DESC("range_key"), required => false}
)}
] ++ emqx_bridge_dynamo_connector:fields(config),
lists:foldl( lists:foldl(
fun(Key, Acc) -> fun(Key, Acc) ->
proplists:delete(Key, Acc) proplists:delete(Key, Acc)
@ -168,6 +182,7 @@ fields(action_parameters) ->
Parameters, Parameters,
[ [
url, url,
region,
aws_access_key_id, aws_access_key_id,
aws_secret_access_key, aws_secret_access_key,
pool_size, pool_size,
@ -199,6 +214,16 @@ fields("config") ->
binary(), binary(),
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)}, )},
{hash_key,
mk(
binary(),
#{desc => ?DESC("hash_key"), required => true}
)},
{range_key,
mk(
binary(),
#{desc => ?DESC("range_key"), required => false}
)},
{resource_opts, {resource_opts,
mk( mk(
ref(?MODULE, "creation_opts"), ref(?MODULE, "creation_opts"),

View File

@ -45,6 +45,7 @@ roots() ->
fields(config) -> fields(config) ->
[ [
{url, mk(binary(), #{required => true, desc => ?DESC("url")})}, {url, mk(binary(), #{required => true, desc => ?DESC("url")})},
{region, mk(binary(), #{required => true, desc => ?DESC("region")})},
{table, mk(binary(), #{required => true, desc => ?DESC("table")})}, {table, mk(binary(), #{required => true, desc => ?DESC("table")})},
{aws_access_key_id, {aws_access_key_id,
mk( mk(
@ -102,6 +103,12 @@ on_start(
pool_name => InstanceId, pool_name => InstanceId,
installed_channels => #{} installed_channels => #{}
}, },
case Config of
#{region := Region} ->
application:set_env(erlcloud, aws_region, to_str(Region));
_ ->
ok
end,
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -126,12 +133,20 @@ on_add_channel(
create_channel_state( create_channel_state(
#{parameters := Conf} = _ChannelConfig #{parameters := Conf} = _ChannelConfig
) -> ) ->
#{ Keys = maps:with([hash_key, range_key], Conf),
table := Table Keys1 = maps:fold(
} = Conf, fun(K, V, Acc) ->
Acc#{K := erlang:binary_to_existing_atom(V)}
end,
Keys,
Keys
),
Base = maps:without([template, hash_key, range_key], Conf),
Base1 = maps:merge(Base, Keys1),
Templates = parse_template_from_conf(Conf), Templates = parse_template_from_conf(Conf),
State = #{ State = Base1#{
table => Table,
templates => Templates templates => Templates
}, },
{ok, State}. {ok, State}.
@ -232,11 +247,16 @@ do_query(
templates := Templates templates := Templates
} = ChannelState, } = ChannelState,
Result = Result =
case ensuare_dynamo_keys(Query, ChannelState) of
true ->
ecpool:pick_and_do( ecpool:pick_and_do(
PoolName, PoolName,
{emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]}, {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]},
no_handover no_handover
), );
_ ->
{error, missing_filter_or_range_key}
end,
case Result of case Result of
{error, Reason} -> {error, Reason} ->
@ -288,6 +308,25 @@ get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
get_query_tuple([InsertQuery | _]) -> get_query_tuple([InsertQuery | _]) ->
get_query_tuple(InsertQuery). get_query_tuple(InsertQuery).
ensuare_dynamo_keys({_, Data} = Query, State) when is_map(Data) ->
ensuare_dynamo_keys([Query], State);
ensuare_dynamo_keys([{_, Data} | _] = Queries, State) when is_map(Data) ->
Keys = maps:to_list(maps:with([hash_key, range_key], State)),
lists:all(
fun({_, Query}) ->
lists:all(
fun({_, Key}) ->
maps:is_key(Key, Query)
end,
Keys
)
end,
Queries
);
%% this is not a insert query
ensuare_dynamo_keys(_Query, _State) ->
true.
connect(Opts) -> connect(Opts) ->
Config = proplists:get_value(config, Opts), Config = proplists:get_value(config, Opts),
{ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config). {ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config).

View File

@ -16,6 +16,7 @@
-define(TABLE_BIN, to_bin(?TABLE)). -define(TABLE_BIN, to_bin(?TABLE)).
-define(ACCESS_KEY_ID, "root"). -define(ACCESS_KEY_ID, "root").
-define(SECRET_ACCESS_KEY, "public"). -define(SECRET_ACCESS_KEY, "public").
-define(REGION, "us-west-2").
-define(HOST, "dynamo"). -define(HOST, "dynamo").
-define(PORT, 8000). -define(PORT, 8000).
-define(SCHEMA, "http://"). -define(SCHEMA, "http://").
@ -177,7 +178,9 @@ dynamo_config(BridgeType, Config) ->
"bridges.~s.~s {" "bridges.~s.~s {"
"\n enable = true" "\n enable = true"
"\n url = \"http://~s:~p\"" "\n url = \"http://~s:~p\""
"\n region = ~p"
"\n table = ~p" "\n table = ~p"
"\n hash_key =\"clientid\""
"\n aws_access_key_id = ~p" "\n aws_access_key_id = ~p"
"\n aws_secret_access_key = ~p" "\n aws_secret_access_key = ~p"
"\n resource_opts = {" "\n resource_opts = {"
@ -191,6 +194,7 @@ dynamo_config(BridgeType, Config) ->
Name, Name,
Host, Host,
Port, Port,
?REGION,
?TABLE, ?TABLE,
?ACCESS_KEY_ID, ?ACCESS_KEY_ID,
%% NOTE: using file-based secrets with HOCON configs %% NOTE: using file-based secrets with HOCON configs
@ -210,7 +214,8 @@ action_config(Config) ->
<<"enable">> => true, <<"enable">> => true,
<<"parameters">> => <<"parameters">> =>
#{ #{
<<"table">> => ?TABLE <<"table">> => ?TABLE,
<<"hash_key">> => <<"clientid">>
}, },
<<"resource_opts">> => <<"resource_opts">> =>
#{ #{
@ -234,6 +239,7 @@ connector_config(Config) ->
<<"url">> => URL, <<"url">> => URL,
<<"aws_access_key_id">> => ?ACCESS_KEY_ID, <<"aws_access_key_id">> => ?ACCESS_KEY_ID,
<<"aws_secret_access_key">> => AccessKey, <<"aws_secret_access_key">> => AccessKey,
<<"region">> => ?REGION,
<<"enable">> => true, <<"enable">> => true,
<<"pool_size">> => 8, <<"pool_size">> => 8,
<<"resource_opts">> => <<"resource_opts">> =>
@ -355,7 +361,7 @@ t_setup_via_config_and_publish(Config) ->
create_bridge(Config) create_bridge(Config)
), ),
MsgId = emqx_utils:gen_id(), MsgId = emqx_utils:gen_id(),
SentData = #{id => MsgId, payload => ?PAYLOAD}, SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD},
?check_trace( ?check_trace(
begin begin
?wait_async_action( ?wait_async_action(
@ -421,7 +427,7 @@ t_setup_via_http_api_and_publish(Config) ->
create_bridge_http(PgsqlConfig) create_bridge_http(PgsqlConfig)
), ),
MsgId = emqx_utils:gen_id(), MsgId = emqx_utils:gen_id(),
SentData = #{id => MsgId, payload => ?PAYLOAD}, SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD},
?check_trace( ?check_trace(
begin begin
?wait_async_action( ?wait_async_action(
@ -486,7 +492,7 @@ t_write_failure(Config) ->
#{?snk_kind := resource_connected_enter}, #{?snk_kind := resource_connected_enter},
20_000 20_000
), ),
SentData = #{id => emqx_utils:gen_id(), payload => ?PAYLOAD}, SentData = #{clientid => <<"clientid">>, id => emqx_utils:gen_id(), payload => ?PAYLOAD},
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch( ?assertMatch(
{error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData) {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData)
@ -513,12 +519,21 @@ t_simple_query(Config) ->
ok. ok.
t_missing_data(Config) -> t_missing_data(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Result = send_message(Config, #{clientid => <<"clientid">>}),
?assertMatch({error, {<<"ValidationException">>, <<>>}}, Result),
ok.
t_missing_hash_key(Config) ->
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
Result = send_message(Config, #{}), Result = send_message(Config, #{}),
?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result), ?assertMatch({error, missing_filter_or_range_key}, Result),
ok. ok.
t_bad_parameter(Config) -> t_bad_parameter(Config) ->
@ -543,7 +558,9 @@ t_action_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config). emqx_bridge_v2_testlib:t_create_via_http(Config).
t_action_sync_query(Config) -> t_action_sync_query(Config) ->
MakeMessageFun = fun() -> #{id => <<"the_message_id">>, payload => ?PAYLOAD} end, MakeMessageFun = fun() ->
#{clientid => <<"clientid">>, id => <<"the_message_id">>, payload => ?PAYLOAD}
end,
IsSuccessCheck = fun(Result) -> ?assertEqual({ok, []}, Result) end, IsSuccessCheck = fun(Result) -> ?assertEqual({ok, []}, Result) end,
TracePoint = dynamo_connector_query_return, TracePoint = dynamo_connector_query_return,
emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint). emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint).

View File

@ -0,0 +1,6 @@
Complemented some necessary but missed keys for the DynamoDB connector and the action.
## Breaking changes
* The old configuration no longer works, although it actually didn't work properly until this fix.
* For DynamoDB connector, a new key `region` is necessary.
* `hash_key` and `range_key` are now supported in the DynamoDB action, and `hash_key` is required.

View File

@ -60,4 +60,9 @@ config_connector.desc:
config_connector.label: config_connector.label:
"""DynamoDB Connector Configuration""" """DynamoDB Connector Configuration"""
hash_key.desc:
"""DynamoDB Hash Key"""
range_key.desc:
"""DynamoDB Range Key"""
} }

View File

@ -18,6 +18,9 @@ table.desc:
table.label: table.label:
"""Table """ """Table """
region.desc:
"""Region of AWS Dynamo"""
url.desc: url.desc:
"""The url of DynamoDB endpoint.""" """The url of DynamoDB endpoint."""