diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl index 7e503cebb..82efd609e 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl @@ -102,6 +102,18 @@ init_per_group(Group, Config) when {connector_type, group_to_type(Group)} | Config ]; +init_per_group(batch_enabled, Config) -> + [ + {batch_size, 10}, + {batch_time, <<"10ms">>} + | Config + ]; +init_per_group(batch_disabled, Config) -> + [ + {batch_size, 1}, + {batch_time, <<"0ms">>} + | Config + ]; init_per_group(_Group, Config) -> Config. @@ -262,17 +274,68 @@ t_start_action_or_source_with_disabled_connector(Config) -> ok. t_disable_prepared_statements(matrix) -> - [[postgres], [timescale], [matrix]]; + [ + [postgres, batch_disabled], + [postgres, batch_enabled], + [timescale, batch_disabled], + [timescale, batch_enabled], + [matrix, batch_disabled], + [matrix, batch_enabled] + ]; t_disable_prepared_statements(Config0) -> + BatchSize = ?config(batch_size, Config0), + BatchTime = ?config(batch_time, Config0), ConnectorConfig0 = ?config(connector_config, Config0), ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}), - Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}), - ok = emqx_bridge_v2_testlib:t_sync_query( - Config, - fun make_message/0, - fun(Res) -> ?assertMatch({ok, _}, Res) end, - postgres_bridge_connector_on_query_return + BridgeConfig0 = ?config(bridge_config, Config0), + BridgeConfig = emqx_utils_maps:deep_merge( + BridgeConfig0, + #{ + <<"resource_opts">> => #{ + <<"batch_size">> => BatchSize, + <<"batch_time">> => BatchTime, + <<"query_mode">> => <<"async">> + } + } ), + Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}), + Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}), + ?check_trace( + #{timetrap => 5_000}, + begin + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)), + RuleTopic = <<"t/postgres">>, + Type = ?config(bridge_type, Config), + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + lists:foreach( + fun(N) -> + emqtt:publish(C, RuleTopic, integer_to_binary(N)) + end, + lists:seq(1, BatchSize) + ), + case BatchSize > 1 of + true -> + ?block_until(#{ + ?snk_kind := "postgres_success_batch_result", + row_count := BatchSize + }), + ok; + false -> + ok + end, + ok + end, + [] + ), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok = emqx_bridge_v2_testlib:t_create_via_http(Config), diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 54bce1006..01b07cc86 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -420,8 +420,12 @@ get_templated_statement(Key, #{installed_channels := Channels} = _State) when -> BinKey = to_bin(Key), ChannelState = maps:get(BinKey, Channels), - ChannelPreparedStatements = maps:get(prepares, ChannelState), - maps:get(BinKey, ChannelPreparedStatements); + case ChannelState of + #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} -> + ExprTemplate; + #{prepares := #{BinKey := ExprTemplate}} -> + ExprTemplate + end; get_templated_statement(Key, #{prepares := PrepStatements}) -> BinKey = to_bin(Key), maps:get(BinKey, PrepStatements). @@ -785,6 +789,7 @@ handle_batch_result([{error, Error} | _Rest], _Acc) -> TranslatedError = translate_to_log_context(Error), {error, {unrecoverable_error, export_error(TranslatedError)}}; handle_batch_result([], Acc) -> + ?tp("postgres_success_batch_result", #{row_count => Acc}), {ok, Acc}. translate_to_log_context({error, Reason}) ->