From 9f97bff7d0f83b58d7b6ac71c1458d48e0a480e4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 30 Jul 2024 18:21:53 -0300 Subject: [PATCH] feat: expose `resource_opts.query_mode` for pulsar action Fixes https://emqx.atlassian.net/browse/EMQX-12782 --- .../test/emqx_bridge_v2_testlib.erl | 5 +- .../src/emqx_bridge_pulsar.app.src | 2 +- .../src/emqx_bridge_pulsar_action_info.erl | 28 ++- .../src/emqx_bridge_pulsar_connector.erl | 26 ++- .../src/emqx_bridge_pulsar_pubsub_schema.erl | 4 +- .../emqx_bridge_pulsar_connector_SUITE.erl | 9 +- .../test/emqx_bridge_pulsar_v2_SUITE.erl | 189 +++++++++++------- .../src/emqx_resource_buffer_worker.erl | 12 +- changes/ee/feat-13546.en.md | 1 + 9 files changed, 182 insertions(+), 94 deletions(-) create mode 100644 changes/ee/feat-13546.en.md diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index d98f4f926..46d1883bd 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) -> ), ?force_ordering( - #{?snk_kind := call_query}, + #{?snk_kind := SNKKind} when + SNKKind =:= call_query orelse SNKKind =:= simple_query_enter, #{?snk_kind := cut_connection, ?snk_span := start} ), %% Note: order of arguments here is reversed compared to `?force_ordering'. @@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) -> emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort) ) end), + ?tp("publishing_message", #{}), try {_, {ok, _}} = snabbkaffe:wait_async_action( @@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) -> infinity ) after + ?tp("healing_failure", #{}), emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort) end, {ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity), diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index a8eeba483..dcb86a3ca 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl index 6d15687f6..fb9a38cc6 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl @@ -11,7 +11,8 @@ action_type_name/0, connector_type_name/0, schema_module/0, - is_action/1 + is_action/1, + connector_action_config_to_bridge_v1_config/2 ]). is_action(_) -> true. @@ -23,3 +24,28 @@ action_type_name() -> pulsar. connector_type_name() -> pulsar. schema_module() -> emqx_bridge_pulsar_pubsub_schema. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, ActionConfig + ), + BridgeV1Config = maps:with(v1_fields(pulsar_producer), BridgeV1Config1), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun(RO) -> maps:with(v1_fields(producer_resource_opts), RO) end, + BridgeV1Config + ). + +%%------------------------------------------------------------------------------------------ +%% Internal helper functions +%%------------------------------------------------------------------------------------------ + +v1_fields(Struct) -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_pulsar:fields(Struct) + ]. + +to_bin(B) when is_binary(B) -> B; +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 9d269493d..c98dd19ed 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -58,6 +58,8 @@ callback_mode() -> async_if_possible. +query_mode(#{resource_opts := #{query_mode := sync}}) -> + simple_sync_internal_buffer; query_mode(_Config) -> simple_async_internal_buffer. @@ -202,12 +204,17 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> sync_timeout => SyncTimeout, is_async => false }), - try - pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) - catch - error:timeout -> - {error, timeout} - end + ?tp_span( + "pulsar_producer_query_enter", + #{instance_id => _InstanceId, message => Message, mode => sync}, + try + ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => sync}), + pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) + catch + error:timeout -> + {error, timeout} + end + ) end. -spec on_query_async( @@ -218,11 +225,11 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> - {error, channel_not_found}; + {error, {unrecoverable_error, channel_not_found}}; {ok, #{message := MessageTmpl, producers := Producers}} -> ?tp_span( - pulsar_producer_on_query_async, - #{instance_id => _InstanceId, message => Message}, + "pulsar_producer_query_enter", + #{instance_id => _InstanceId, message => Message, mode => async}, on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ) end. @@ -233,6 +240,7 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> message => PulsarMessage, is_async => true }), + ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => async}), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). on_format_query_result({ok, Info}) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl index dff62843e..515fcdb5a 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -66,10 +66,8 @@ fields(action_resource_opts) -> batch_size, batch_time, worker_pool_size, - request_ttl, inflight_window, - max_buffer_bytes, - query_mode + max_buffer_bytes ], lists:filter( fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index cd54e2194..0a908f5be 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -843,7 +843,8 @@ do_t_send_with_failure(Config, FailureType) -> ?wait_async_action( emqx:publish(Message0), #{ - ?snk_kind := pulsar_producer_on_query_async, + ?snk_kind := "pulsar_producer_query_enter", + mode := async, ?snk_span := {complete, _} }, 5_000 @@ -970,7 +971,11 @@ t_producer_process_crash(Config) -> {_, {ok, _}} = ?wait_async_action( emqx:publish(Message0), - #{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}}, + #{ + ?snk_kind := "pulsar_producer_query_enter", + mode := async, + ?snk_span := {complete, _} + }, 5_000 ), Data0 = receive_consumed(20_000), diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl index 11caa15c6..94534fafd 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -23,31 +23,25 @@ %%------------------------------------------------------------------------------ all() -> - [ - {group, plain}, - {group, tls} - ]. + All0 = emqx_common_test_helpers:all(?MODULE), + All = All0 -- matrix_cases(), + Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), + Groups ++ All. groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - [ - {plain, AllTCs}, - {tls, AllTCs} - ]. + emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). + +matrix_cases() -> + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - %% Ensure enterprise bridge module is loaded - _ = emqx_bridge_enterprise:module_info(), - {ok, Cwd} = file:get_cwd(), - PrivDir = ?config(priv_dir, Config), - WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd), Apps = emqx_cth_suite:start( lists:flatten([ ?APPS, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard() ]), - #{work_dir => WorkDir} + #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{suite_apps, Apps} | Config]. @@ -61,6 +55,7 @@ init_per_group(plain = Type, Config) -> case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of true -> Config1 = common_init_per_group(), + ConnectorName = ?MODULE, NewConfig = [ {proxy_name, ProxyName}, @@ -70,7 +65,7 @@ init_per_group(plain = Type, Config) -> {use_tls, false} | Config1 ++ Config ], - create_connector(?MODULE, NewConfig), + create_connector(ConnectorName, NewConfig), NewConfig; false -> maybe_skip_without_ci() @@ -82,6 +77,7 @@ init_per_group(tls = Type, Config) -> case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of true -> Config1 = common_init_per_group(), + ConnectorName = ?MODULE, NewConfig = [ {proxy_name, ProxyName}, @@ -91,17 +87,21 @@ init_per_group(tls = Type, Config) -> {use_tls, true} | Config1 ++ Config ], - create_connector(?MODULE, NewConfig), + create_connector(ConnectorName, NewConfig), NewConfig; false -> maybe_skip_without_ci() - end. + end; +init_per_group(_Group, Config) -> + Config. end_per_group(Group, Config) when Group =:= plain; Group =:= tls -> common_end_per_group(Config), + ok; +end_per_group(_Group, _Config) -> ok. common_init_per_group() -> @@ -189,66 +189,49 @@ pulsar_connector(Config) -> ":", integer_to_binary(PulsarPort) ]), - Connector = #{ - <<"connectors">> => #{ - <<"pulsar">> => #{ - Name => #{ - <<"enable">> => true, - <<"ssl">> => #{ - <<"enable">> => UseTLS, - <<"verify">> => <<"verify_none">>, - <<"server_name_indication">> => <<"auto">> - }, - <<"authentication">> => <<"none">>, - <<"servers">> => ServerURL - } - } - } + InnerConfigMap = #{ + <<"enable">> => true, + <<"ssl">> => #{ + <<"enable">> => UseTLS, + <<"verify">> => <<"verify_none">>, + <<"server_name_indication">> => <<"auto">> + }, + <<"authentication">> => <<"none">>, + <<"servers">> => ServerURL }, - parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). + emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap). pulsar_action(Config) -> + QueryMode = proplists:get_value(query_mode, Config, <<"sync">>), Name = atom_to_binary(?MODULE), - Action = #{ - <<"actions">> => #{ - <<"pulsar">> => #{ - Name => #{ - <<"connector">> => Name, - <<"enable">> => true, - <<"parameters">> => #{ - <<"retention_period">> => <<"infinity">>, - <<"max_batch_bytes">> => <<"1MB">>, - <<"batch_size">> => 100, - <<"strategy">> => <<"random">>, - <<"buffer">> => #{ - <<"mode">> => <<"memory">>, - <<"per_partition_limit">> => <<"10MB">>, - <<"segment_bytes">> => <<"5MB">>, - <<"memory_overload_protection">> => true - }, - <<"message">> => #{ - <<"key">> => <<"${.clientid}">>, - <<"value">> => <<"${.}">> - }, - <<"pulsar_topic">> => ?config(pulsar_topic, Config) - }, - <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"1s">>, - <<"metrics_flush_interval">> => <<"300ms">> - } - } - } + InnerConfigMap = #{ + <<"connector">> => Name, + <<"enable">> => true, + <<"parameters">> => #{ + <<"retention_period">> => <<"infinity">>, + <<"max_batch_bytes">> => <<"1MB">>, + <<"batch_size">> => 100, + <<"strategy">> => <<"random">>, + <<"buffer">> => #{ + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"10MB">>, + <<"segment_bytes">> => <<"5MB">>, + <<"memory_overload_protection">> => true + }, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.}">> + }, + <<"pulsar_topic">> => ?config(pulsar_topic, Config) + }, + <<"resource_opts">> => #{ + <<"query_mode">> => QueryMode, + <<"request_ttl">> => <<"1s">>, + <<"health_check_interval">> => <<"1s">>, + <<"metrics_flush_interval">> => <<"300ms">> } }, - parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). - -parse_and_check(Key, Mod, Conf, Name) -> - ConfStr = hocon_pp:do(Conf, #{}), - ct:pal(ConfStr), - {ok, RawConf} = hocon:binary(ConfStr, #{format => map}), - hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), - #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf, - RetConf. + emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap). instance_id(Type, Name) -> ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), @@ -404,20 +387,44 @@ assert_status_api(Line, Type, Name, Status) -> ). -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). +proplists_with(Keys, PList) -> + lists:filter(fun({K, _}) -> lists:member(K, Keys) end, PList). + +group_path(Config) -> + case emqx_common_test_helpers:group_path(Config) of + [] -> + undefined; + Path -> + Path + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -t_action_probe(Config) -> +t_action_probe(matrix) -> + [[plain], [tls]]; +t_action_probe(Config) when is_list(Config) -> Name = atom_to_binary(?FUNCTION_NAME), Action = pulsar_action(Config), {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), ?assertMatch({{_, 204, _}, _, _}, Res0), ok. -t_action(Config) -> +t_action(matrix) -> + [ + [plain, async], + [plain, sync], + [tls, async] + ]; +t_action(Config) when is_list(Config) -> + QueryMode = + case group_path(Config) of + [_, QM | _] -> atom_to_binary(QM); + _ -> <<"async">> + end, Name = atom_to_binary(?FUNCTION_NAME), - create_action(Name, Config), + create_action(Name, [{query_mode, QueryMode} | Config]), Actions = emqx_bridge_v2:list(actions), Any = fun(#{name := BName}) -> BName =:= Name end, ?assert(lists:any(Any, Actions), Actions), @@ -465,7 +472,9 @@ t_action(Config) -> %% Tests that deleting/disabling an action that share the same Pulsar topic with other %% actions do not disturb the latter. -t_multiple_actions_sharing_topic(Config) -> +t_multiple_actions_sharing_topic(matrix) -> + [[plain], [tls]]; +t_multiple_actions_sharing_topic(Config) when is_list(Config) -> Type = ?TYPE, ConnectorName = <<"c">>, ConnectorConfig = pulsar_connector(Config), @@ -546,3 +555,31 @@ t_multiple_actions_sharing_topic(Config) -> [] ), ok. + +t_sync_query_down(matrix) -> + [[plain]]; +t_sync_query_down(Config0) when is_list(Config0) -> + ct:timetrap({seconds, 15}), + Payload = #{<<"x">> => <<"some data">>}, + PayloadBin = emqx_utils_json:encode(Payload), + ClientId = <<"some_client">>, + Opts = #{ + make_message_fn => fun(Topic) -> emqx_message:make(ClientId, Topic, PayloadBin) end, + enter_tp_filter => + ?match_event(#{?snk_kind := "pulsar_producer_send"}), + error_tp_filter => + ?match_event(#{?snk_kind := "resource_simple_sync_internal_buffer_query_timeout"}), + success_tp_filter => + ?match_event(#{?snk_kind := pulsar_echo_consumer_message}) + }, + Config = [ + {connector_type, ?TYPE}, + {connector_name, ?FUNCTION_NAME}, + {connector_config, pulsar_connector(Config0)}, + {action_type, ?TYPE}, + {action_name, ?FUNCTION_NAME}, + {action_config, pulsar_action(Config0)} + | proplists_with([proxy_name, proxy_host, proxy_port], Config0) + ], + emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts), + ok. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 516795f39..8eb7b373d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -198,6 +198,9 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1), case simple_async_query(Id, Request, QueryOpts) of {error, _} = Error -> + ?tp("resource_simple_sync_internal_buffer_query_error", #{ + id => Id, request => Request + }), Error; {async_return, {error, _} = Error} -> Error; @@ -210,7 +213,11 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> receive {ReplyAlias, Response} -> Response - after 0 -> {error, timeout} + after 0 -> + ?tp("resource_simple_sync_internal_buffer_query_timeout", #{ + id => Id, request => Request + }), + {error, timeout} end end end @@ -1302,6 +1309,7 @@ do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Res ?tp(simple_query_override, #{query_mode => ReqQM}), #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, CallMode = call_mode(QM, CBM), + ?tp(simple_query_enter, #{}), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer @@ -1309,6 +1317,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Res %% The connector supports buffer, send even in disconnected state #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, CallMode = call_mode(QM, CBM), + ?tp(simple_query_enter, #{}), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) -> %% when calling from the buffer worker or other simple queries, @@ -2297,6 +2306,7 @@ reply_call(Alias, Response) -> %% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to' %% callbacks. reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) -> + ?tp("reply_call_internal_buffer", #{}), ?MODULE:reply_call(ReplyAlias, Response), do_reply_caller(MaybeReplyTo, Response). diff --git a/changes/ee/feat-13546.en.md b/changes/ee/feat-13546.en.md new file mode 100644 index 000000000..c403409ac --- /dev/null +++ b/changes/ee/feat-13546.en.md @@ -0,0 +1 @@ +Added the option to configure the query mode for Pulsar Producer action.