diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl index d568fee25..8dafa3922 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl @@ -87,6 +87,7 @@ connector_values() -> <<"url">> => <<"http://127.0.0.1:8000">>, <<"aws_access_key_id">> => <<"root">>, <<"aws_secret_access_key">> => <<"******">>, + <<"region">> => <<"us-west-2">>, <<"pool_size">> => 8, <<"resource_opts">> => #{ @@ -113,7 +114,8 @@ action_values() -> <<"parameters">> => #{ <<"table">> => <<"mqtt_msg">>, - <<"template">> => ?DEFAULT_TEMPLATE + <<"template">> => ?DEFAULT_TEMPLATE, + <<"hash_key">> => <<"clientid">> } }. @@ -160,7 +162,19 @@ fields(dynamo_action) -> ); fields(action_parameters) -> Parameters = - [{template, template_field_schema()}] ++ emqx_bridge_dynamo_connector:fields(config), + [ + {template, template_field_schema()}, + {hash_key, + mk( + binary(), + #{desc => ?DESC("hash_key"), required => true} + )}, + {range_key, + mk( + binary(), + #{desc => ?DESC("range_key"), required => false} + )} + ] ++ emqx_bridge_dynamo_connector:fields(config), lists:foldl( fun(Key, Acc) -> proplists:delete(Key, Acc) @@ -168,6 +182,7 @@ fields(action_parameters) -> Parameters, [ url, + region, aws_access_key_id, aws_secret_access_key, pool_size, @@ -199,6 +214,16 @@ fields("config") -> binary(), #{desc => ?DESC("local_topic"), default => undefined} )}, + {hash_key, + mk( + binary(), + #{desc => ?DESC("hash_key"), required => true} + )}, + {range_key, + mk( + binary(), + #{desc => ?DESC("range_key"), required => false} + )}, {resource_opts, mk( ref(?MODULE, "creation_opts"), diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 36f54a63f..372472dda 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -45,6 +45,7 @@ roots() -> fields(config) -> [ {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, + {region, mk(binary(), #{required => true, desc => ?DESC("region")})}, {table, mk(binary(), #{required => true, desc => ?DESC("table")})}, {aws_access_key_id, mk( @@ -102,6 +103,12 @@ on_start( pool_name => InstanceId, installed_channels => #{} }, + case Config of + #{region := Region} -> + application:set_env(erlcloud, aws_region, to_str(Region)); + _ -> + ok + end, case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -126,12 +133,20 @@ on_add_channel( create_channel_state( #{parameters := Conf} = _ChannelConfig ) -> - #{ - table := Table - } = Conf, + Keys = maps:with([hash_key, range_key], Conf), + Keys1 = maps:fold( + fun(K, V, Acc) -> + Acc#{K := erlang:binary_to_existing_atom(V)} + end, + Keys, + Keys + ), + + Base = maps:without([template, hash_key, range_key], Conf), + Base1 = maps:merge(Base, Keys1), + Templates = parse_template_from_conf(Conf), - State = #{ - table => Table, + State = Base1#{ templates => Templates }, {ok, State}. @@ -232,11 +247,16 @@ do_query( templates := Templates } = ChannelState, Result = - ecpool:pick_and_do( - PoolName, - {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]}, - no_handover - ), + case ensuare_dynamo_keys(Query, ChannelState) of + true -> + ecpool:pick_and_do( + PoolName, + {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]}, + no_handover + ); + _ -> + {error, missing_filter_or_range_key} + end, case Result of {error, Reason} -> @@ -288,6 +308,25 @@ get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) -> get_query_tuple([InsertQuery | _]) -> get_query_tuple(InsertQuery). +ensuare_dynamo_keys({_, Data} = Query, State) when is_map(Data) -> + ensuare_dynamo_keys([Query], State); +ensuare_dynamo_keys([{_, Data} | _] = Queries, State) when is_map(Data) -> + Keys = maps:to_list(maps:with([hash_key, range_key], State)), + lists:all( + fun({_, Query}) -> + lists:all( + fun({_, Key}) -> + maps:is_key(Key, Query) + end, + Keys + ) + end, + Queries + ); +%% this is not a insert query +ensuare_dynamo_keys(_Query, _State) -> + true. + connect(Opts) -> Config = proplists:get_value(config, Opts), {ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config). diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index dab7b21f0..ff3d5824e 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -16,6 +16,7 @@ -define(TABLE_BIN, to_bin(?TABLE)). -define(ACCESS_KEY_ID, "root"). -define(SECRET_ACCESS_KEY, "public"). +-define(REGION, "us-west-2"). -define(HOST, "dynamo"). -define(PORT, 8000). -define(SCHEMA, "http://"). @@ -177,7 +178,9 @@ dynamo_config(BridgeType, Config) -> "bridges.~s.~s {" "\n enable = true" "\n url = \"http://~s:~p\"" + "\n region = ~p" "\n table = ~p" + "\n hash_key =\"clientid\"" "\n aws_access_key_id = ~p" "\n aws_secret_access_key = ~p" "\n resource_opts = {" @@ -191,6 +194,7 @@ dynamo_config(BridgeType, Config) -> Name, Host, Port, + ?REGION, ?TABLE, ?ACCESS_KEY_ID, %% NOTE: using file-based secrets with HOCON configs @@ -210,7 +214,8 @@ action_config(Config) -> <<"enable">> => true, <<"parameters">> => #{ - <<"table">> => ?TABLE + <<"table">> => ?TABLE, + <<"hash_key">> => <<"clientid">> }, <<"resource_opts">> => #{ @@ -234,6 +239,7 @@ connector_config(Config) -> <<"url">> => URL, <<"aws_access_key_id">> => ?ACCESS_KEY_ID, <<"aws_secret_access_key">> => AccessKey, + <<"region">> => ?REGION, <<"enable">> => true, <<"pool_size">> => 8, <<"resource_opts">> => @@ -355,7 +361,7 @@ t_setup_via_config_and_publish(Config) -> create_bridge(Config) ), MsgId = emqx_utils:gen_id(), - SentData = #{id => MsgId, payload => ?PAYLOAD}, + SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD}, ?check_trace( begin ?wait_async_action( @@ -421,7 +427,7 @@ t_setup_via_http_api_and_publish(Config) -> create_bridge_http(PgsqlConfig) ), MsgId = emqx_utils:gen_id(), - SentData = #{id => MsgId, payload => ?PAYLOAD}, + SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD}, ?check_trace( begin ?wait_async_action( @@ -486,7 +492,7 @@ t_write_failure(Config) -> #{?snk_kind := resource_connected_enter}, 20_000 ), - SentData = #{id => emqx_utils:gen_id(), payload => ?PAYLOAD}, + SentData = #{clientid => <<"clientid">>, id => emqx_utils:gen_id(), payload => ?PAYLOAD}, emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertMatch( {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData) @@ -513,12 +519,21 @@ t_simple_query(Config) -> ok. t_missing_data(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Result = send_message(Config, #{clientid => <<"clientid">>}), + ?assertMatch({error, {<<"ValidationException">>, <<>>}}, Result), + ok. + +t_missing_hash_key(Config) -> ?assertMatch( {ok, _}, create_bridge(Config) ), Result = send_message(Config, #{}), - ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result), + ?assertMatch({error, missing_filter_or_range_key}, Result), ok. t_bad_parameter(Config) -> @@ -543,7 +558,9 @@ t_action_create_via_http(Config) -> emqx_bridge_v2_testlib:t_create_via_http(Config). t_action_sync_query(Config) -> - MakeMessageFun = fun() -> #{id => <<"the_message_id">>, payload => ?PAYLOAD} end, + MakeMessageFun = fun() -> + #{clientid => <<"clientid">>, id => <<"the_message_id">>, payload => ?PAYLOAD} + end, IsSuccessCheck = fun(Result) -> ?assertEqual({ok, []}, Result) end, TracePoint = dynamo_connector_query_return, emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint). diff --git a/changes/ee/fix-12895.en.md b/changes/ee/fix-12895.en.md new file mode 100644 index 000000000..dbfd52e2b --- /dev/null +++ b/changes/ee/fix-12895.en.md @@ -0,0 +1,6 @@ +Complemented some necessary but missed keys for the DynamoDB connector and the action. + +## Breaking changes +* The old configuration no longer works, although it actually didn't work properly until this fix. +* For DynamoDB connector, a new key `region` is necessary. +* `hash_key` and `range_key` are now supported in the DynamoDB action, and `hash_key` is required. diff --git a/rel/i18n/emqx_bridge_dynamo.hocon b/rel/i18n/emqx_bridge_dynamo.hocon index 0d3bcd3f9..31771832a 100644 --- a/rel/i18n/emqx_bridge_dynamo.hocon +++ b/rel/i18n/emqx_bridge_dynamo.hocon @@ -60,4 +60,9 @@ config_connector.desc: config_connector.label: """DynamoDB Connector Configuration""" +hash_key.desc: +"""DynamoDB Hash Key""" + +range_key.desc: +"""DynamoDB Range Key""" } diff --git a/rel/i18n/emqx_bridge_dynamo_connector.hocon b/rel/i18n/emqx_bridge_dynamo_connector.hocon index 7c37676b5..18c3670aa 100644 --- a/rel/i18n/emqx_bridge_dynamo_connector.hocon +++ b/rel/i18n/emqx_bridge_dynamo_connector.hocon @@ -18,6 +18,9 @@ table.desc: table.label: """Table """ +region.desc: +"""Region of AWS Dynamo""" + url.desc: """The url of DynamoDB endpoint."""