From 6561d989d67bb00c10fb506883b8f20f2ccdedad Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 15 Feb 2024 17:19:32 +0100 Subject: [PATCH 1/4] feat: refactor DynamoDB bridge to connector and action Fixes: https://emqx.atlassian.net/browse/EMQX-11456 --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + .../src/emqx_bridge_dynamo.app.src | 2 +- .../src/emqx_bridge_dynamo.erl | 141 +++++++++++++++++- .../src/emqx_bridge_dynamo_action_info.erl | 22 +++ .../src/emqx_bridge_dynamo_connector.erl | 128 +++++++++++++--- .../emqx_bridge_dynamo_connector_client.erl | 2 +- .../test/emqx_bridge_dynamo_SUITE.erl | 83 ++++++++++- .../src/schema/emqx_connector_ee_schema.erl | 14 ++ .../src/schema/emqx_connector_schema.erl | 2 + rel/i18n/emqx_bridge_dynamo.hocon | 18 +++ 10 files changed, 383 insertions(+), 30 deletions(-) create mode 100644 apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 724fee48e..e8edfcebb 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -88,6 +88,7 @@ hard_coded_action_info_modules_ee() -> [ emqx_bridge_azure_event_hub_action_info, emqx_bridge_confluent_producer_action_info, + emqx_bridge_dynamo_action_info, emqx_bridge_gcp_pubsub_consumer_action_info, emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index a0e8e2f19..580f4eebc 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -8,7 +8,7 @@ emqx_resource, erlcloud ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_dynamo_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl index 6ddae57f7..37fa92d7d 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl @@ -11,7 +11,6 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, values/1 ]). @@ -22,6 +21,14 @@ desc/1 ]). +-export([ + bridge_v2_examples/1, + connector_examples/1, + conn_bridge_examples/1 +]). + +-define(CONNECTOR_TYPE, dynamo). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). -define(DEFAULT_TEMPLATE, <<>>). %% ------------------------------------------------------------------------------------------------- @@ -59,12 +66,134 @@ values(_Method) -> } }. +connector_examples(Method) -> + [ + #{ + <<"dynamo">> => + #{ + summary => <<"DynamoDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + <<"enable">> => true, + <<"url">> => <<"http://127.0.0.1:8000">>, + <<"aws_access_key_id">> => <<"root">>, + <<"aws_secret_access_key">> => <<"******">>, + <<"pool_size">> => 8, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"dynamo">> => + #{ + summary => <<"DynamoDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + <<"parameters">> => + #{ + <<"table">> => <<"mqtt_msg">>, + <<"template">> => ?DEFAULT_TEMPLATE + } + }. + %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_dynamo". roots() -> []. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + fields("config_connector") -- emqx_connector_schema:common_fields() + ); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(dynamo_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, dynamo_action)), + #{ + desc => <<"DynamoDB Action Config">>, + required => false + } + )}; +fields(dynamo_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + Parameters = + [ + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + )} + ] ++ emqx_bridge_dynamo_connector:fields(config), + lists:foldl( + fun(Key, Acc) -> + proplists:delete(Key, Acc) + end, + Parameters, + [ + url, + aws_access_key_id, + aws_secret_access_key, + pool_size, + auto_reconnect + ] + ); +fields("config_connector") -> + Config = + emqx_connector_schema:common_fields() ++ + emqx_bridge_dynamo_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + lists:foldl( + fun(Key, Acc) -> + proplists:delete(Key, Acc) + end, + Config, + [ + table + ] + ); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -102,9 +231,17 @@ fields("get") -> desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; + ["Configuration for DynamoDB using `", string:to_upper(Method), "` method."]; desc("creation_opts" = Name) -> emqx_resource_schema:desc(Name); +desc("config_connector") -> + ?DESC("config_connector"); +desc(dynamo_action) -> + ?DESC("dynamo_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_action_info.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_action_info.erl new file mode 100644 index 000000000..4806deca9 --- /dev/null +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_dynamo_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> dynamo. + +action_type_name() -> dynamo. + +connector_type_name() -> dynamo. + +schema_module() -> emqx_bridge_dynamo. diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 0739df747..f1e90cb21 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -21,7 +21,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -70,7 +74,6 @@ on_start( url := Url, aws_access_key_id := AccessKeyID, aws_secret_access_key := SecretAccessKey, - table := Table, pool_size := PoolSize } = Config ) -> @@ -95,12 +98,9 @@ on_start( }}, {pool_size, PoolSize} ], - - Templates = parse_template(Config), State = #{ pool_name => InstanceId, - table => Table, - templates => Templates + installed_channels => #{} }, case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> @@ -109,32 +109,83 @@ on_start( Error end. +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + {ok, ChannelState} = create_channel_state(ChannelConfig), + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +create_channel_state( + #{parameters := Conf} = _ChannelConfig +) -> + #{ + table := Table + } = Conf, + Templates = parse_template_from_conf(Conf), + State = #{ + table => Table, + templates => Templates + }, + {ok, State}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + _ResId, + _ChannelId, + _State +) -> + ?status_connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_dynamo_connector", connector => InstanceId }), + ?tp( + dynamo_connector_on_stop, + #{instance_id => InstanceId} + ), emqx_resource_pool:stop(InstanceId). on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, State). %% we only support batch insert -on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> +on_batch_query(InstanceId, [{_ChannelId, _} | _] = Query, State) -> do_query(InstanceId, Query, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. -%% we only support batch insert - on_get_status(_InstanceId, #{pool_name := Pool}) -> Health = emqx_resource_pool:health_check_workers( Pool, {emqx_bridge_dynamo_connector_client, is_connected, []} ), status_result(Health). -status_result(_Status = true) -> connected; -status_result(_Status = false) -> connecting. +status_result(_Status = true) -> ?status_connected; +status_result(_Status = false) -> ?status_connecting. %%======================================================================================== %% Helper fns @@ -143,29 +194,44 @@ status_result(_Status = false) -> connecting. do_query( InstanceId, Query, - #{pool_name := PoolName, templates := Templates, table := Table} = State + #{ + pool_name := PoolName, + installed_channels := Channels + } = State ) -> ?TRACE( "QUERY", "dynamo_connector_received", #{connector => InstanceId, query => Query, state => State} ), - Result = ecpool:pick_and_do( - PoolName, - {emqx_bridge_dynamo_connector_client, query, [Table, Query, Templates]}, - no_handover - ), + ChannelId = get_channel_id(Query), + QueryTuple = get_query_tuple(Query), + ChannelState = maps:get(ChannelId, Channels), + #{ + table := Table, + templates := Templates + } = ChannelState, + Result = + ecpool:pick_and_do( + PoolName, + {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]}, + no_handover + ), case Result of {error, Reason} -> ?tp( dynamo_connector_query_return, - #{error => Reason} + #{ + error => Reason, + instance_id => InstanceId + } ), ?SLOG(error, #{ msg => "dynamo_connector_do_query_failed", connector => InstanceId, - query => Query, + channel => ChannelId, + query => QueryTuple, reason => Reason }), case Reason of @@ -177,16 +243,36 @@ do_query( _ -> ?tp( dynamo_connector_query_return, - #{result => Result} + #{ + result => Result, + instance_id => InstanceId + } ), Result end. +get_channel_id([{ChannelId, _Req} | _]) -> + ChannelId; +get_channel_id({ChannelId, _Req}) -> + ChannelId. + +get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) -> + {QueryType, Data}; +get_query_tuple({_ChannelId, Data} = _Query) -> + {send_message, Data}; +get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) -> + error( + {unrecoverable_error, + {invalid_request, <<"The only query type that support batching is insert.">>}} + ); +get_query_tuple([InsertQuery | _]) -> + get_query_tuple(InsertQuery). + connect(Opts) -> Config = proplists:get_value(config, Opts), {ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config). -parse_template(Config) -> +parse_template_from_conf(Config) -> Templates = case maps:get(template, Config, undefined) of undefined -> #{}; diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index 1cb326cf7..e0d4053bb 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -144,7 +144,7 @@ apply_template({Key, Msg} = Req, Templates) -> %% 1. we can simply replace the `send_message` to `put` %% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, %% so we can reduce some list map cost -apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> +apply_template([{_, _Msg} | _] = Msgs, Templates) -> lists:map( fun(Req) -> {_, Msg} = apply_template(Req, Templates), diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index 936d2d506..52e90328f 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -117,7 +117,11 @@ common_init(ConfigT) -> {host, Host}, {port, Port}, {query_mode, sync}, - {proxy_name, "dynamo"} + {proxy_name, "dynamo"}, + {bridge_type, <<"dynamo">>}, + {bridge_name, <<"my_dynamo_action">>}, + {connector_type, <<"dynamo">>}, + {connector_name, <<"my_dynamo_connector">>} | ConfigT ], @@ -143,6 +147,8 @@ common_init(ConfigT) -> {dynamo_config, TDConf}, {dynamo_bridge_type, BridgeType}, {dynamo_name, Name}, + {bridge_config, action_config(Config0)}, + {connector_config, connector_config(Config0)}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort} | Config0 @@ -193,6 +199,48 @@ dynamo_config(BridgeType, Config) -> ), {Name, parse_and_check(ConfigString, BridgeType, Name)}. +action_config(Config) -> + ConnectorName = ?config(connector_name, Config), + BatchSize = ?config(batch_size, Config), + QueryMode = ?config(query_mode, Config), + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"parameters">> => + #{ + <<"table">> => ?TABLE + }, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"request_ttl">> => <<"45s">>, + <<"worker_pool_size">> => 16, + <<"query_mode">> => QueryMode, + <<"batch_size">> => BatchSize + } + }. + +connector_config(Config) -> + Host = ?config(host, Config), + Port = ?config(port, Config), + URL = list_to_binary("http://" ++ Host ++ ":" ++ integer_to_list(Port)), + SecretFile = ?config(dynamo_secretfile, Config), + AccessKey = "file://" ++ SecretFile, + #{ + <<"url">> => URL, + <<"aws_access_key_id">> => ?ACCESS_KEY_ID, + <<"aws_secret_access_key">> => AccessKey, + <<"enable">> => true, + <<"pool_size">> => 8, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), @@ -234,8 +282,9 @@ send_message(Config, Payload) -> query_resource(Config, Request) -> Name = ?config(dynamo_name, Config), BridgeType = ?config(dynamo_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + ID = emqx_bridge_v2:id(BridgeType, Name), + ResID = emqx_connector_resource:resource_id(BridgeType, Name), + emqx_resource:query(ID, Request, #{timeout => 500, connector_resource_id => ResID}). %% create a table, use the apps/emqx_bridge_dynamo/priv/dynamo/mqtt_msg.json as template create_table(Config) -> @@ -403,7 +452,10 @@ t_simple_query(Config) -> {ok, _}, create_bridge(Config) ), - Request = {get_item, {<<"id">>, <<"not_exists">>}}, + BridgeType = ?config(dynamo_bridge_type, Config), + Name = ?config(dynamo_name, Config), + ActionID = emqx_bridge_v2:id(BridgeType, Name), + Request = {ActionID, {get_item, {<<"id">>, <<"not_exists">>}}}, Result = query_resource(Config, Request), case ?config(batch_size, Config) of ?BATCH_SIZE -> @@ -427,11 +479,32 @@ t_bad_parameter(Config) -> {ok, _}, create_bridge(Config) ), - Request = {insert_item, bad_parameter}, + BridgeType = ?config(dynamo_bridge_type, Config), + Name = ?config(dynamo_name, Config), + ActionID = emqx_bridge_v2:id(BridgeType, Name), + Request = {ActionID, {insert_item, bad_parameter}}, Result = query_resource(Config, Request), ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result), ok. +%% Connector Action Tests + +t_action_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}). + +t_action_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_action_sync_query(Config) -> + MakeMessageFun = fun() -> #{id => <<"the_message_id">>, payload => ?PAYLOAD} end, + IsSuccessCheck = fun(Result) -> ?assertEqual({ok, []}, Result) end, + TracePoint = dynamo_connector_query_return, + emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint). + +t_action_start_stop(Config) -> + StopTracePoint = dynamo_connector_on_stop, + emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint). + to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8); to_bin(Bin) when is_binary(Bin) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 628305172..561b9cc70 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -26,6 +26,8 @@ resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(dynamo) -> + emqx_bridge_dynamo_connector; resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer; resource_type(gcp_pubsub_producer) -> @@ -122,6 +124,14 @@ connector_structs() -> required => false } )}, + {dynamo, + mk( + hoconsc:map(name, ref(emqx_bridge_dynamo, "config_connector")), + #{ + desc => <<"DynamoDB Connector Config">>, + required => false + } + )}, {gcp_pubsub_consumer, mk( hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_consumer_schema, "config_connector")), @@ -329,6 +339,7 @@ schema_modules() -> [ emqx_bridge_azure_event_hub, emqx_bridge_confluent_producer, + emqx_bridge_dynamo, emqx_bridge_gcp_pubsub_consumer_schema, emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_hstreamdb, @@ -366,6 +377,9 @@ api_schemas(Method) -> api_ref( emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector" ), + api_ref( + emqx_bridge_dynamo, <<"dynamo">>, Method ++ "_connector" + ), api_ref( emqx_bridge_gcp_pubsub_consumer_schema, <<"gcp_pubsub_consumer">>, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 9f44f8ce2..e8ab571c0 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -126,6 +126,8 @@ connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; +connector_type_to_bridge_types(dynamo) -> + [dynamo]; connector_type_to_bridge_types(gcp_pubsub_consumer) -> [gcp_pubsub_consumer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> diff --git a/rel/i18n/emqx_bridge_dynamo.hocon b/rel/i18n/emqx_bridge_dynamo.hocon index a014aae9f..0d3bcd3f9 100644 --- a/rel/i18n/emqx_bridge_dynamo.hocon +++ b/rel/i18n/emqx_bridge_dynamo.hocon @@ -42,4 +42,22 @@ The template can be any valid JSON with placeholders and make sure all keys for template.label: """Template""" +action_parameters.desc: +"""Action specific configuration.""" + +action_parameters.label: +"""Action""" + +dynamo_action.desc: +"""Configuration for DynamoDB action.""" + +dynamo_action.label: +"""DynamoDB Action Configuration""" + +config_connector.desc: +"""Configuration for an DynamoDB connector.""" + +config_connector.label: +"""DynamoDB Connector Configuration""" + } From 72d340bd4b2fcb0d49e62fa335419380a2693d64 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 21 Feb 2024 14:19:42 +0100 Subject: [PATCH 2/4] docs: add change log entry for DynamoDB bridge refactoring --- changes/ee/feat-12543.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-12543.en.md diff --git a/changes/ee/feat-12543.en.md b/changes/ee/feat-12543.en.md new file mode 100644 index 000000000..841ae3d83 --- /dev/null +++ b/changes/ee/feat-12543.en.md @@ -0,0 +1 @@ +The DynamoDB bridge has been split into connector and action components. Old DynamoDB bridges will be upgraded automatically. From 4678df22290a468b8994e462c6cec082b27785cc Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 21 Feb 2024 14:36:04 +0100 Subject: [PATCH 3/4] build: bump emqx_bridge_gcp_pubsub version The CI check ./scripts/apps-version-check.sh requires that emqx_bridge_gcp_pubsub's version number is bumped. --- apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index f4af9445b..bd98c43d6 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, From 9b6511b3e7a622769e7868504188c1342aa30e88 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 21 Feb 2024 14:57:55 +0100 Subject: [PATCH 4/4] docs: fix spelling mistake in error message Co-authored-by: Thales Macedo Garitezi --- apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index f1e90cb21..0531fdb9a 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -263,7 +263,7 @@ get_query_tuple({_ChannelId, Data} = _Query) -> get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) -> error( {unrecoverable_error, - {invalid_request, <<"The only query type that support batching is insert.">>}} + {invalid_request, <<"The only query type that supports batching is insert.">>}} ); get_query_tuple([InsertQuery | _]) -> get_query_tuple(InsertQuery).