From d5b62eead0152e5221bd667571ea7116987ddd14 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 14 Nov 2023 17:16:09 +0100 Subject: [PATCH] feat: split pgsql, matrix and timescale into connector action This commit splits the bridges pgsql, matrix and timescale into connector and action. Fixes: https://emqx.atlassian.net/browse/EMQX-11155 --- apps/emqx_bridge/src/emqx_action_info.erl | 6 +- apps/emqx_bridge/src/emqx_bridge.erl | 6 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 55 ++++- .../src/emqx_bridge_matrix.app.src | 2 +- .../src/emqx_bridge_matrix.erl | 53 ++++- .../src/emqx_bridge_matrix_action_info.erl | 22 ++ .../src/emqx_bridge_pgsql_action_info.erl | 22 ++ .../src/schema/emqx_bridge_pgsql_schema.erl | 172 +++++++++++++++ .../test/emqx_bridge_pgsql_SUITE.erl | 50 +++-- .../src/emqx_bridge_timescale.app.src | 2 +- .../src/emqx_bridge_timescale.erl | 53 ++++- .../src/emqx_bridge_timescale_action_info.erl | 22 ++ .../src/schema/emqx_connector_ee_schema.erl | 40 +++- .../src/schema/emqx_connector_schema.erl | 5 +- apps/emqx_postgresql/src/emqx_postgresql.erl | 178 ++++++++++++++-- .../emqx_postgresql_connector_schema.erl | 201 ++++++++++++++++++ 16 files changed, 832 insertions(+), 57 deletions(-) create mode 100644 apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl create mode 100644 apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl create mode 100644 apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl create mode 100644 apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl create mode 100644 apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 129142f24..74fd38811 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -78,7 +78,11 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, emqx_bridge_mongodb_action_info, - emqx_bridge_syskeeper_action_info + emqx_bridge_syskeeper_action_info, + emqx_bridge_syskeeper_action_info, + emqx_bridge_pgsql_action_info, + emqx_bridge_timescale_action_info, + emqx_bridge_matrix_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index f557210ed..9e14c0c9a 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -240,8 +240,8 @@ send_message(BridgeId, Message) -> {BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of true -> - BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), - emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{}); + ActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeV1Type), + emqx_bridge_v2:send_message(ActionType, BridgeName, Message, #{}); false -> ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName), send_message(BridgeV1Type, BridgeName, ResId, Message, #{}) @@ -414,7 +414,7 @@ remove(BridgeType0, BridgeName) -> }), case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - emqx_bridge_v2:remove(BridgeType, BridgeName); + emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName); false -> remove_v1(BridgeType, BridgeName) end. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 54ccf1b24..63874d67e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -55,6 +55,7 @@ disable_enable/3, health_check/2, send_message/4, + query/4, start/2, reset_metrics/2, create_dry_run/2, @@ -116,7 +117,9 @@ bridge_v1_enable_disable/3, bridge_v1_restart/2, bridge_v1_stop/2, - bridge_v1_start/2 + bridge_v1_start/2, + %% For test cases only + bridge_v1_remove/2 ]). %%==================================================================== @@ -547,25 +550,25 @@ get_query_mode(BridgeV2Type, Config) -> ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), emqx_resource:query_mode(ResourceType, Config, CreationOpts). --spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> +-spec query(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> term() | {error, term()}. -send_message(BridgeType, BridgeName, Message, QueryOpts0) -> +query(BridgeType, BridgeName, Message, QueryOpts0) -> case lookup_conf(BridgeType, BridgeName) of #{enable := true} = Config0 -> Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0), - do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); + do_query_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); #{enable := false} -> {error, bridge_stopped}; _Error -> {error, bridge_not_found} end. -do_send_msg_with_enabled_config( +do_query_with_enabled_config( _BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error ) -> ?SLOG(error, Reason), Error; -do_send_msg_with_enabled_config( +do_query_with_enabled_config( BridgeType, BridgeName, Message, QueryOpts0, Config ) -> QueryMode = get_query_mode(BridgeType, Config), @@ -579,7 +582,17 @@ do_send_msg_with_enabled_config( } ), BridgeV2Id = id(BridgeType, BridgeName), - emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). + case Message of + {send_message, Msg} -> + emqx_resource:query(BridgeV2Id, {BridgeV2Id, Msg}, QueryOpts); + Msg -> + emqx_resource:query(BridgeV2Id, Msg, QueryOpts) + end. + +-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> + term() | {error, term()}. +send_message(BridgeType, BridgeName, Message, QueryOpts0) -> + query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0). -spec health_check(BridgeType :: term(), BridgeName :: term()) -> #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}. @@ -1325,6 +1338,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf). +%% Only called by test cases (may create broken references) +bridge_v1_remove(BridgeV1Type, BridgeName) -> + ActionType = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + bridge_v1_remove( + ActionType, + BridgeName, + lookup_conf(ActionType, BridgeName) + ). + +bridge_v1_remove( + ActionType, + Name, + #{connector := ConnectorName} +) -> + case remove(ActionType, Name) of + ok -> + ConnectorType = connector_type(ActionType), + emqx_connector:remove(ConnectorType, ConnectorName); + Error -> + Error + end; +bridge_v1_remove( + _ActionType, + _Name, + Error +) -> + Error. + bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), bridge_v1_check_deps_and_remove( diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 14aca1f75..e2a63a01e 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_matrix, [ {description, "EMQX Enterprise MatrixDB Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl index abd98adb6..acfd86ded 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl @@ -14,6 +14,12 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + connector_examples/1 +]). + %% ------------------------------------------------------------------------------------------------- %% api @@ -22,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"matrix">> => #{ summary => <<"Matrix Bridge">>, - value => emqx_bridge_pgsql:values(Method, matrix) + value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, matrix) } } ]. @@ -35,8 +41,53 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", matrix); +fields("config_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields(action) -> + {matrix, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"Matrix Action Config">>, + required => false + } + )}; +fields("put_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("get_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("post_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("put_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("get_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("post_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). desc(_) -> undefined. + +%% Examples + +connector_examples(Method) -> + [ + #{ + <<"matrix">> => #{ + summary => <<"Matrix Connector">>, + value => emqx_postgresql_connector_schema:values({Method, connector}) + } + } + ]. + +bridge_v2_examples(Method) -> + [ + #{ + <<"matrix">> => #{ + summary => <<"Matrix Action">>, + value => emqx_bridge_pgsql_schema:values({Method, matrix}) + } + } + ]. diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl new file mode 100644 index 000000000..4eae13415 --- /dev/null +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_matrix_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> matrix. + +action_type_name() -> matrix. + +connector_type_name() -> matrix. + +schema_module() -> emqx_bridge_matrix. diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl new file mode 100644 index 000000000..e353eb440 --- /dev/null +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pgsql_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> pgsql. + +action_type_name() -> pgsql. + +connector_type_name() -> pgsql. + +schema_module() -> emqx_bridge_pgsql_schema. diff --git a/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl b/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl new file mode 100644 index 000000000..a3ad4bb11 --- /dev/null +++ b/apps/emqx_bridge_pgsql/src/schema/emqx_bridge_pgsql_schema.erl @@ -0,0 +1,172 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pgsql_schema). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("epgsql/include/epgsql.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-export([roots/0, fields/1]). + +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1 +]). + +%% Exported for timescale and matrix bridges +-export([ + values/1, + values_conn_bridge_examples/2 +]). + +-define(PGSQL_HOST_OPTIONS, #{ + default_port => ?PGSQL_DEFAULT_PORT +}). + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields("config_connector") -> + emqx_postgresql_connector_schema:fields("config_connector"); +fields(config) -> + fields("config_connector") ++ + fields(action); +fields(action) -> + {pgsql, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"PostgreSQL Action Config">>, + required => false + } + )}; +fields(action_parameters) -> + [ + {sql, + hoconsc:mk( + binary(), + #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} + )} + ] ++ + emqx_connector_schema_lib:prepare_statement_fields(); +fields(pgsql_action) -> + emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); +%% TODO: All of these needs to be fixed +fields("put_bridge_v2") -> + fields(pgsql_action); +fields("get_bridge_v2") -> + fields(pgsql_action); +fields("post_bridge_v2") -> + fields(pgsql_action). + +default_sql() -> + << + "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" + >>. + +%% Examples + +bridge_v2_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Action">>, + value => values({Method, bridge_v2_producer}) + } + } + ]. + +conn_bridge_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Bridge">>, + value => values_conn_bridge_examples(Method, pgsql) + } + } + ]. + +values({get, PostgreSQLType}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, PostgreSQLType}) + ); +values({post, PostgreSQLType}) -> + maps:merge( + #{ + name => <<"my_pgsql_action">>, + type => PostgreSQLType + }, + values({put, PostgreSQLType}) + ); +values({put, PostgreSQLType}) -> + maps:merge( + #{ + enable => true, + connector => <<"my_pgsql_connector">>, + resource_opts => #{ + health_check_interval => "32s" + } + }, + values({producer, PostgreSQLType}) + ); +values({producer, _PostgreSQLType}) -> + #{ + <<"enable">> => true, + <<"connector">> => <<"connector_pgsql_test">>, + <<"parameters">> => #{ + <<"sql">> => + <<"INSERT INTO client_events(clientid, event, created_at) VALUES (\n ${clientid},\n ${event},\n TO_TIMESTAMP((${timestamp} :: bigint))\n)">> + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"async">>, + <<"request_ttl">> => <<"45s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">>, + <<"worker_pool_size">> => 16 + } + }. + +values_conn_bridge_examples(_Method, Type) -> + #{ + enable => true, + type => Type, + name => <<"foo">>, + server => <<"127.0.0.1:5432">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"******">>, + sql => default_sql(), + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 722489ba6..58aaa7d71 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -114,7 +114,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]), ok. init_per_testcase(_Testcase, Config) -> @@ -147,7 +147,7 @@ common_init(Config0) -> ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), % Ensure enterprise bridge module is loaded - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]), _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % Connect to pgsql directly and create the table @@ -259,17 +259,16 @@ send_message(Config, Payload) -> BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), emqx_bridge:send_message(BridgeID, Payload). -query_resource(Config, Request) -> +query_resource(Config, Msg = _Request) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + emqx_bridge_v2:query(BridgeType, Name, Msg, #{timeout => 1_000}). query_resource_sync(Config, Request) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request). + ActionId = emqx_bridge_v2:id(BridgeType, Name), + emqx_resource_buffer_worker:simple_sync_query(ActionId, Request). query_resource_async(Config, Request) -> query_resource_async(Config, Request, _Opts = #{}). @@ -279,9 +278,8 @@ query_resource_async(Config, Request, Opts) -> BridgeType = ?config(pgsql_bridge_type, Config), Ref = alias([reply]), AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), Timeout = maps:get(timeout, Opts, 500), - Return = emqx_resource:query(ResourceID, Request, #{ + Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{ timeout => Timeout, async_reply_fun => {AsyncReplyFun, []} }), @@ -441,13 +439,12 @@ t_get_status(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)), emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertMatch( - {ok, Status} when Status =:= disconnected orelse Status =:= connecting, - emqx_resource_manager:health_check(ResourceID) + #{status := Status} when Status =:= disconnected orelse Status =:= connecting, + emqx_bridge_v2:health_check(BridgeType, Name) ) end), ok. @@ -655,7 +652,7 @@ t_nasty_sql_string(Config) -> t_missing_table(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + % ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin @@ -665,21 +662,20 @@ t_missing_table(Config) -> _Sleep = 1_000, _Attempts = 20, ?assertMatch( - {ok, Status} when Status == connecting orelse Status == disconnected, - emqx_resource_manager:health_check(ResourceID) + #{status := Status} when Status == connecting orelse Status == disconnected, + emqx_bridge_v2:health_check(BridgeType, Name) ) ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 1000, ?assertMatch( {error, {resource_error, #{reason := unhealthy_target}}}, - query_resource(Config, {send_message, SentData, [], Timeout}) + query_resource(Config, {send_message, SentData}) ), ok end, fun(Trace) -> - ?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)), + ?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, Trace)), ok end ), @@ -689,7 +685,7 @@ t_missing_table(Config) -> t_table_removed(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + %%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin connect_and_create_table(Config), @@ -697,13 +693,14 @@ t_table_removed(Config) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)) ), connect_and_drop_table(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - case query_resource_sync(Config, {send_message, SentData, []}) of - {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} -> + ActionId = emqx_bridge_v2:id(BridgeType, Name), + case query_resource_sync(Config, {ActionId, SentData}) of + {error, {unrecoverable_error, _}} -> ok; ?RESOURCE_ERROR_M(not_connected, _) -> ok; @@ -720,7 +717,6 @@ t_table_removed(Config) -> t_concurrent_health_checks(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin connect_and_create_table(Config), @@ -728,11 +724,13 @@ t_concurrent_health_checks(Config) -> ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)) ), emqx_utils:pmap( fun(_) -> - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ?assertMatch( + #{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name) + ) end, lists:seq(1, 20) ), diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index adb024591..4842f5c93 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_timescale, [ {description, "EMQX Enterprise TimescaleDB Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource]}, {env, []}, diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl index c4dedf07c..edeef26d4 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl @@ -14,6 +14,12 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + connector_examples/1 +]). + %% ------------------------------------------------------------------------------------------------- %% api @@ -22,7 +28,7 @@ conn_bridge_examples(Method) -> #{ <<"timescale">> => #{ summary => <<"Timescale Bridge">>, - value => emqx_bridge_pgsql:values(Method, timescale) + value => emqx_bridge_pgsql_schema:values_conn_bridge_examples(Method, timescale) } } ]. @@ -35,8 +41,53 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", timescale); +fields("config_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields(action) -> + {timescale, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"Timescale Action Config">>, + required => false + } + )}; +fields("put_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("get_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("post_bridge_v2") -> + emqx_bridge_pgsql_schema:fields(pgsql_action); +fields("put_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("get_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); +fields("post_connector") -> + emqx_bridge_pgsql_schema:fields("config_connector"); fields(Method) -> emqx_bridge_pgsql:fields(Method). desc(_) -> undefined. + +%% Examples + +connector_examples(Method) -> + [ + #{ + <<"timescale">> => #{ + summary => <<"Timescale Connector">>, + value => emqx_postgresql_connector_schema:values({Method, connector}) + } + } + ]. + +bridge_v2_examples(Method) -> + [ + #{ + <<"timescale">> => #{ + summary => <<"Timescale Action">>, + value => emqx_bridge_pgsql_schema:values({Method, timescale}) + } + } + ]. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl new file mode 100644 index 000000000..fff74b578 --- /dev/null +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_timescale_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> timescale. + +action_type_name() -> timescale. + +connector_type_name() -> timescale. + +schema_module() -> emqx_bridge_timescale. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 535917e4e..1ffe306e9 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -35,6 +35,12 @@ resource_type(syskeeper_forwarder) -> emqx_bridge_syskeeper_connector; resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; +resource_type(pgsql) -> + emqx_postgresql; +resource_type(timescale) -> + emqx_postgresql; +resource_type(matrix) -> + emqx_postgresql; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -108,6 +114,30 @@ connector_structs() -> desc => <<"Syskeeper Proxy Connector Config">>, required => false } + )}, + {pgsql, + mk( + hoconsc:map(name, ref(emqx_postgresql_connector_schema, "config_connector")), + #{ + desc => <<"PostgreSQL Connector Config">>, + required => false + } + )}, + {timescale, + mk( + hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")), + #{ + desc => <<"Timescale Connector Config">>, + required => false + } + )}, + {matrix, + mk( + hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")), + #{ + desc => <<"Matrix Connector Config">>, + required => false + } )} ]. @@ -131,7 +161,10 @@ schema_modules() -> emqx_bridge_kafka, emqx_bridge_mongodb, emqx_bridge_syskeeper_connector, - emqx_bridge_syskeeper_proxy + emqx_bridge_syskeeper_proxy, + emqx_postgresql_connector_schema, + emqx_bridge_timescale, + emqx_bridge_matrix ]. api_schemas(Method) -> @@ -152,7 +185,10 @@ api_schemas(Method) -> api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), - api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) + api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), + api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), + api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), + api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 765a693e2..cd99f0fe6 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -72,7 +72,10 @@ connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_p connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; -connector_type_to_bridge_types(syskeeper_proxy) -> []. +connector_type_to_bridge_types(syskeeper_proxy) -> []; +connector_type_to_bridge_types(pgsql) -> [pgsql]; +connector_type_to_bridge_types(timescale) -> [timescale]; +connector_type_to_bridge_types(matrix) -> [matrix]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ba1ad4be5..68fa1da00 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -34,7 +34,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([connect/1]). @@ -136,10 +140,11 @@ on_start( {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} ], - State = parse_prepare_sql(Config), + State1 = parse_prepare_sql(Config, <<"send_message">>), + State2 = State1#{installed_channels => #{}}, case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State#{pool_name => InstId, prepares => #{}})}; + {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})}; {error, Reason} -> ?tp( pgsql_connector_start_failed, @@ -148,13 +153,140 @@ on_start( {error, Reason} end. -on_stop(InstId, _State) -> +on_stop(InstId, State) -> ?SLOG(info, #{ msg => "stopping_postgresql_connector", connector => InstId }), + close_connections(State), emqx_resource_pool:stop(InstId). +close_connections(#{pool_name := PoolName} = _State) -> + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + close_connections_with_worker_pids(WorkerPids), + ok. + +close_connections_with_worker_pids([WorkerPid | Rest]) -> + %% We ignore errors since any error probably means that the + %% connection is closed already. + try ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + _ = epgsql:close(Conn), + close_connections_with_worker_pids(Rest); + _ -> + close_connections_with_worker_pids(Rest) + catch + _:_ -> + close_connections_with_worker_pids(Rest) + end; +close_connections_with_worker_pids([]) -> + ok. + +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + %% The following will throw an exception if the bridge producers fails to start + {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig), + case ChannelState of + #{prepares := {error, Reason}} -> + {error, {unhealthy_target, Reason}}; + _ -> + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState} + end. + +create_channel_state( + ChannelId, + #{pool_name := PoolName} = _ConnectorState, + #{parameters := Parameters} = _ChannelConfig +) -> + State1 = parse_prepare_sql(Parameters, ChannelId), + {ok, + init_prepare(State1#{ + pool_name => PoolName, + prepare_statement => #{} + })}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + %% Close prepared statements + ok = close_prepared_statement(ChannelId, OldState), + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + close_prepared_statement(WorkerPids, ChannelId, State), + ok. + +close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> + %% We ignore errors since any error probably means that the + %% prepared statement doesn't exist. + try ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + Statement = get_prepared_statement(ChannelId, State), + _ = epgsql:close(Conn, Statement), + close_prepared_statement(Rest, ChannelId, State); + _ -> + close_prepared_statement(Rest, ChannelId, State) + catch + _:_ -> + close_prepared_statement(Rest, ChannelId, State) + end; +close_prepared_statement([], _ChannelId, _State) -> + ok. + +on_get_channel_status( + _ResId, + ChannelId, + #{ + pool_name := PoolName, + installed_channels := Channels + } = _State +) -> + ChannelState = maps:get(ChannelId, Channels), + case + do_check_channel_sql( + PoolName, + ChannelId, + ChannelState + ) + of + ok -> + connected; + {error, undefined_table} -> + {error, {unhealthy_target, <<"Table does not exist">>}}; + {error, _Reason} -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting + end. + +do_check_channel_sql( + PoolName, + ChannelId, + #{query_templates := ChannelQueryTemplates} = _ChannelState +) -> + {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates), + WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + validate_table_existence(WorkerPids, SQL). + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); on_query( @@ -187,10 +319,10 @@ pgsql_query_type(_) -> on_batch_query( InstId, [{Key, _} = Request | _] = BatchReq, - #{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State + #{pool_name := PoolName} = State ) -> BinKey = to_bin(Key), - case maps:get(BinKey, Templates, undefined) of + case get_template(BinKey, State) of undefined -> Log = #{ connector => InstId, @@ -201,7 +333,7 @@ on_batch_query( ?SLOG(error, Log), {error, {unrecoverable_error, batch_prepare_not_implemented}}; {_Statement, RowTemplate} -> - PrepStatement = maps:get(BinKey, PrepStatements), + PrepStatement = get_prepared_statement(BinKey, State), Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of {error, _Error} = Result -> @@ -223,15 +355,35 @@ proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; -proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -> - Key = to_bin(TypeOrKey), - case maps:get(Key, Templates, undefined) of +proc_sql_params(TypeOrKey, SQLOrData, Params, State) -> + BinKey = to_bin(TypeOrKey), + case get_template(BinKey, State) of undefined -> {SQLOrData, Params}; {_Statement, RowTemplate} -> - {Key, render_prepare_sql_row(RowTemplate, SQLOrData)} + {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)} end. +get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> + BinKey = to_bin(Key), + ChannelState = maps:get(BinKey, Channels), + ChannelQueryTemplates = maps:get(query_templates, ChannelState), + maps:get(BinKey, ChannelQueryTemplates); +get_template(Key, #{query_templates := Templates}) -> + BinKey = to_bin(Key), + maps:get(BinKey, Templates, undefined). + +get_prepared_statement(Key, #{installed_channels := Channels} = _State) when + is_map_key(Key, Channels) +-> + BinKey = to_bin(Key), + ChannelState = maps:get(BinKey, Channels), + ChannelPreparedStatements = maps:get(prepares, ChannelState), + maps:get(BinKey, ChannelPreparedStatements); +get_prepared_statement(Key, #{prepares := PrepStatements}) -> + BinKey = to_bin(Key), + maps:get(BinKey, PrepStatements). + on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of {error, Reason} = Result -> @@ -415,13 +567,13 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). -parse_prepare_sql(Config) -> +parse_prepare_sql(Config, SQLID) -> Queries = case Config of #{prepare_statement := Qs} -> Qs; #{sql := Query} -> - #{<<"send_message">> => Query}; + #{SQLID => Query}; #{} -> #{} end, diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl new file mode 100644 index 000000000..d709f87c3 --- /dev/null +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -0,0 +1,201 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_postgresql_connector_schema). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_postgresql/include/emqx_postgresql.hrl"). + +-define(PGSQL_HOST_OPTIONS, #{ + default_port => ?PGSQL_DEFAULT_PORT +}). + +-export([ + roots/0, + fields/1 +]). + +%% Examples +-export([ + connector_examples/1, + values/1 +]). + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields("config_connector") -> + [{server, server()}] ++ + adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ + emqx_connector_schema_lib:ssl_fields(); +fields(config) -> + fields("config_connector") ++ + fields(action); +fields(action) -> + {pgsql, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql_schema, pgsql_action)), + #{ + desc => <<"PostgreSQL Action Config">>, + required => false + } + )}; +fields(pgsql_action) -> + emqx_bridge_v2_schema:make_producer_action_schema(hoconsc:ref(?MODULE, action_parameters)); +%% TODO: All of these needs to be fixed +fields("put_bridge_v2") -> + fields(pgsql_action); +fields("get_bridge_v2") -> + fields(pgsql_action); +fields("post_bridge_v2") -> + fields(pgsql_action); +fields("put_connector") -> + fields("config_connector"); +fields("get_connector") -> + fields("config_connector"); +fields("post_connector") -> + fields("config_connector"). + +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS). + +adjust_fields(Fields) -> + lists:map( + fun + ({username, Sc}) -> + %% to please dialyzer... + Override = #{type => hocon_schema:field_schema(Sc, type), required => true}, + {username, hocon_schema:override(Sc, Override)}; + (Field) -> + Field + end, + Fields + ). + +%% Examples +connector_examples(Method) -> + [ + #{ + <<"pgsql">> => #{ + summary => <<"PostgreSQL Producer Connector">>, + value => values({Method, connector}) + } + } + ]. + +%% TODO: All of these needs to be adjusted from Kafka to PostgreSQL +values({get, PostgreSQLType}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, PostgreSQLType}) + ); +values({post, connector}) -> + maps:merge( + #{ + name => <<"my_pgsql_connector">>, + type => <<"pgsql">> + }, + values(common_config) + ); +values({post, PostgreSQLType}) -> + maps:merge( + #{ + name => <<"my_pgsql_action">>, + type => <<"pgsql">> + }, + values({put, PostgreSQLType}) + ); +values({put, bridge_v2_producer}) -> + values(bridge_v2_producer); +values({put, connector}) -> + values(common_config); +values({put, PostgreSQLType}) -> + maps:merge(values(common_config), values(PostgreSQLType)); +values(bridge_v2_producer) -> + maps:merge( + #{ + enable => true, + connector => <<"my_pgsql_connector">>, + resource_opts => #{ + health_check_interval => "32s" + } + }, + values(producer) + ); +values(common_config) -> + #{ + authentication => #{ + mechanism => <<"plain">>, + username => <<"username">>, + password => <<"******">> + }, + bootstrap_hosts => <<"localhost:9092">>, + connect_timeout => <<"5s">>, + enable => true, + metadata_request_timeout => <<"4s">>, + min_metadata_refresh_interval => <<"3s">>, + socket_opts => #{ + sndbuf => <<"1024KB">>, + recbuf => <<"1024KB">>, + nodelay => true, + tcp_keepalive => <<"none">> + } + }; +values(producer) -> + #{ + kafka => #{ + topic => <<"kafka-topic">>, + message => #{ + key => <<"${.clientid}">>, + value => <<"${.}">>, + timestamp => <<"${.timestamp}">> + }, + max_batch_bytes => <<"896KB">>, + compression => <<"no_compression">>, + partition_strategy => <<"random">>, + required_acks => <<"all_isr">>, + partition_count_refresh_interval => <<"60s">>, + kafka_headers => <<"${pub_props}">>, + kafka_ext_headers => [ + #{ + kafka_ext_header_key => <<"clientid">>, + kafka_ext_header_value => <<"${clientid}">> + }, + #{ + kafka_ext_header_key => <<"topic">>, + kafka_ext_header_value => <<"${topic}">> + } + ], + kafka_header_value_encode_mode => none, + max_inflight => 10, + buffer => #{ + mode => <<"hybrid">>, + per_partition_limit => <<"2GB">>, + segment_bytes => <<"100MB">>, + memory_overload_protection => true + } + }, + local_topic => <<"mqtt/local/topic">> + }.