Merge pull request #13546 from thalesmg/20240730-r58-pulsar-query-mode

feat: expose `resource_opts.query_mode` for pulsar action
This commit is contained in:
Thales Macedo Garitezi 2024-08-01 14:19:16 -03:00 committed by GitHub
commit 4250d01363
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 182 additions and 94 deletions

View File

@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) ->
), ),
?force_ordering( ?force_ordering(
#{?snk_kind := call_query}, #{?snk_kind := SNKKind} when
SNKKind =:= call_query orelse SNKKind =:= simple_query_enter,
#{?snk_kind := cut_connection, ?snk_span := start} #{?snk_kind := cut_connection, ?snk_span := start}
), ),
%% Note: order of arguments here is reversed compared to `?force_ordering'. %% Note: order of arguments here is reversed compared to `?force_ordering'.
@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) ->
emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort) emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
) )
end), end),
?tp("publishing_message", #{}),
try try
{_, {ok, _}} = {_, {ok, _}} =
snabbkaffe:wait_async_action( snabbkaffe:wait_async_action(
@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) ->
infinity infinity
) )
after after
?tp("healing_failure", #{}),
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort) emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
end, end,
{ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity), {ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [ {application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"}, {description, "EMQX Pulsar Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -11,7 +11,8 @@
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0, schema_module/0,
is_action/1 is_action/1,
connector_action_config_to_bridge_v1_config/2
]). ]).
is_action(_) -> true. is_action(_) -> true.
@ -23,3 +24,28 @@ action_type_name() -> pulsar.
connector_type_name() -> pulsar. connector_type_name() -> pulsar.
schema_module() -> emqx_bridge_pulsar_pubsub_schema. schema_module() -> emqx_bridge_pulsar_pubsub_schema.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig, ActionConfig
),
BridgeV1Config = maps:with(v1_fields(pulsar_producer), BridgeV1Config1),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun(RO) -> maps:with(v1_fields(producer_resource_opts), RO) end,
BridgeV1Config
).
%%------------------------------------------------------------------------------------------
%% Internal helper functions
%%------------------------------------------------------------------------------------------
v1_fields(Struct) ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_pulsar:fields(Struct)
].
to_bin(B) when is_binary(B) -> B;
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

View File

@ -58,6 +58,8 @@
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
query_mode(#{resource_opts := #{query_mode := sync}}) ->
simple_sync_internal_buffer;
query_mode(_Config) -> query_mode(_Config) ->
simple_async_internal_buffer. simple_async_internal_buffer.
@ -202,12 +204,17 @@ on_query(_InstanceId, {ChannelId, Message}, State) ->
sync_timeout => SyncTimeout, sync_timeout => SyncTimeout,
is_async => false is_async => false
}), }),
try ?tp_span(
pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) "pulsar_producer_query_enter",
catch #{instance_id => _InstanceId, message => Message, mode => sync},
error:timeout -> try
{error, timeout} ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => sync}),
end pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
catch
error:timeout ->
{error, timeout}
end
)
end. end.
-spec on_query_async( -spec on_query_async(
@ -218,11 +225,11 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
#{channels := Channels} = State, #{channels := Channels} = State,
case maps:find(ChannelId, Channels) of case maps:find(ChannelId, Channels) of
error -> error ->
{error, channel_not_found}; {error, {unrecoverable_error, channel_not_found}};
{ok, #{message := MessageTmpl, producers := Producers}} -> {ok, #{message := MessageTmpl, producers := Producers}} ->
?tp_span( ?tp_span(
pulsar_producer_on_query_async, "pulsar_producer_query_enter",
#{instance_id => _InstanceId, message => Message}, #{instance_id => _InstanceId, message => Message, mode => async},
on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn)
) )
end. end.
@ -233,6 +240,7 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
message => PulsarMessage, message => PulsarMessage,
is_async => true is_async => true
}), }),
?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => async}),
pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
on_format_query_result({ok, Info}) -> on_format_query_result({ok, Info}) ->

View File

@ -66,10 +66,8 @@ fields(action_resource_opts) ->
batch_size, batch_size,
batch_time, batch_time,
worker_pool_size, worker_pool_size,
request_ttl,
inflight_window, inflight_window,
max_buffer_bytes, max_buffer_bytes
query_mode
], ],
lists:filter( lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,

View File

@ -843,7 +843,8 @@ do_t_send_with_failure(Config, FailureType) ->
?wait_async_action( ?wait_async_action(
emqx:publish(Message0), emqx:publish(Message0),
#{ #{
?snk_kind := pulsar_producer_on_query_async, ?snk_kind := "pulsar_producer_query_enter",
mode := async,
?snk_span := {complete, _} ?snk_span := {complete, _}
}, },
5_000 5_000
@ -970,7 +971,11 @@ t_producer_process_crash(Config) ->
{_, {ok, _}} = {_, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx:publish(Message0), emqx:publish(Message0),
#{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}}, #{
?snk_kind := "pulsar_producer_query_enter",
mode := async,
?snk_span := {complete, _}
},
5_000 5_000
), ),
Data0 = receive_consumed(20_000), Data0 = receive_consumed(20_000),

View File

@ -23,31 +23,25 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
[ All0 = emqx_common_test_helpers:all(?MODULE),
{group, plain}, All = All0 -- matrix_cases(),
{group, tls} Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
]. Groups ++ All.
groups() -> groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE), emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
[
{plain, AllTCs}, matrix_cases() ->
{tls, AllTCs} emqx_common_test_helpers:all(?MODULE).
].
init_per_suite(Config) -> init_per_suite(Config) ->
%% Ensure enterprise bridge module is loaded
_ = emqx_bridge_enterprise:module_info(),
{ok, Cwd} = file:get_cwd(),
PrivDir = ?config(priv_dir, Config),
WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd),
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
lists:flatten([ lists:flatten([
?APPS, ?APPS,
emqx_management, emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard() emqx_mgmt_api_test_util:emqx_dashboard()
]), ]),
#{work_dir => WorkDir} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
[{suite_apps, Apps} | Config]. [{suite_apps, Apps} | Config].
@ -61,6 +55,7 @@ init_per_group(plain = Type, Config) ->
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
true -> true ->
Config1 = common_init_per_group(), Config1 = common_init_per_group(),
ConnectorName = ?MODULE,
NewConfig = NewConfig =
[ [
{proxy_name, ProxyName}, {proxy_name, ProxyName},
@ -70,7 +65,7 @@ init_per_group(plain = Type, Config) ->
{use_tls, false} {use_tls, false}
| Config1 ++ Config | Config1 ++ Config
], ],
create_connector(?MODULE, NewConfig), create_connector(ConnectorName, NewConfig),
NewConfig; NewConfig;
false -> false ->
maybe_skip_without_ci() maybe_skip_without_ci()
@ -82,6 +77,7 @@ init_per_group(tls = Type, Config) ->
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
true -> true ->
Config1 = common_init_per_group(), Config1 = common_init_per_group(),
ConnectorName = ?MODULE,
NewConfig = NewConfig =
[ [
{proxy_name, ProxyName}, {proxy_name, ProxyName},
@ -91,17 +87,21 @@ init_per_group(tls = Type, Config) ->
{use_tls, true} {use_tls, true}
| Config1 ++ Config | Config1 ++ Config
], ],
create_connector(?MODULE, NewConfig), create_connector(ConnectorName, NewConfig),
NewConfig; NewConfig;
false -> false ->
maybe_skip_without_ci() maybe_skip_without_ci()
end. end;
init_per_group(_Group, Config) ->
Config.
end_per_group(Group, Config) when end_per_group(Group, Config) when
Group =:= plain; Group =:= plain;
Group =:= tls Group =:= tls
-> ->
common_end_per_group(Config), common_end_per_group(Config),
ok;
end_per_group(_Group, _Config) ->
ok. ok.
common_init_per_group() -> common_init_per_group() ->
@ -189,66 +189,49 @@ pulsar_connector(Config) ->
":", ":",
integer_to_binary(PulsarPort) integer_to_binary(PulsarPort)
]), ]),
Connector = #{ InnerConfigMap = #{
<<"connectors">> => #{ <<"enable">> => true,
<<"pulsar">> => #{ <<"ssl">> => #{
Name => #{ <<"enable">> => UseTLS,
<<"enable">> => true, <<"verify">> => <<"verify_none">>,
<<"ssl">> => #{ <<"server_name_indication">> => <<"auto">>
<<"enable">> => UseTLS, },
<<"verify">> => <<"verify_none">>, <<"authentication">> => <<"none">>,
<<"server_name_indication">> => <<"auto">> <<"servers">> => ServerURL
},
<<"authentication">> => <<"none">>,
<<"servers">> => ServerURL
}
}
}
}, },
parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap).
pulsar_action(Config) -> pulsar_action(Config) ->
QueryMode = proplists:get_value(query_mode, Config, <<"sync">>),
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
Action = #{ InnerConfigMap = #{
<<"actions">> => #{ <<"connector">> => Name,
<<"pulsar">> => #{ <<"enable">> => true,
Name => #{ <<"parameters">> => #{
<<"connector">> => Name, <<"retention_period">> => <<"infinity">>,
<<"enable">> => true, <<"max_batch_bytes">> => <<"1MB">>,
<<"parameters">> => #{ <<"batch_size">> => 100,
<<"retention_period">> => <<"infinity">>, <<"strategy">> => <<"random">>,
<<"max_batch_bytes">> => <<"1MB">>, <<"buffer">> => #{
<<"batch_size">> => 100, <<"mode">> => <<"memory">>,
<<"strategy">> => <<"random">>, <<"per_partition_limit">> => <<"10MB">>,
<<"buffer">> => #{ <<"segment_bytes">> => <<"5MB">>,
<<"mode">> => <<"memory">>, <<"memory_overload_protection">> => true
<<"per_partition_limit">> => <<"10MB">>, },
<<"segment_bytes">> => <<"5MB">>, <<"message">> => #{
<<"memory_overload_protection">> => true <<"key">> => <<"${.clientid}">>,
}, <<"value">> => <<"${.}">>
<<"message">> => #{ },
<<"key">> => <<"${.clientid}">>, <<"pulsar_topic">> => ?config(pulsar_topic, Config)
<<"value">> => <<"${.}">> },
}, <<"resource_opts">> => #{
<<"pulsar_topic">> => ?config(pulsar_topic, Config) <<"query_mode">> => QueryMode,
}, <<"request_ttl">> => <<"1s">>,
<<"resource_opts">> => #{ <<"health_check_interval">> => <<"1s">>,
<<"health_check_interval">> => <<"1s">>, <<"metrics_flush_interval">> => <<"300ms">>
<<"metrics_flush_interval">> => <<"300ms">>
}
}
}
} }
}, },
parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap).
parse_and_check(Key, Mod, Conf, Name) ->
ConfStr = hocon_pp:do(Conf, #{}),
ct:pal(ConfStr),
{ok, RawConf} = hocon:binary(ConfStr, #{format => map}),
hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
#{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf,
RetConf.
instance_id(Type, Name) -> instance_id(Type, Name) ->
ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
@ -404,20 +387,44 @@ assert_status_api(Line, Type, Name, Status) ->
). ).
-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
proplists_with(Keys, PList) ->
lists:filter(fun({K, _}) -> lists:member(K, Keys) end, PList).
group_path(Config) ->
case emqx_common_test_helpers:group_path(Config) of
[] ->
undefined;
Path ->
Path
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_action_probe(Config) -> t_action_probe(matrix) ->
[[plain], [tls]];
t_action_probe(Config) when is_list(Config) ->
Name = atom_to_binary(?FUNCTION_NAME), Name = atom_to_binary(?FUNCTION_NAME),
Action = pulsar_action(Config), Action = pulsar_action(Config),
{ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
?assertMatch({{_, 204, _}, _, _}, Res0), ?assertMatch({{_, 204, _}, _, _}, Res0),
ok. ok.
t_action(Config) -> t_action(matrix) ->
[
[plain, async],
[plain, sync],
[tls, async]
];
t_action(Config) when is_list(Config) ->
QueryMode =
case group_path(Config) of
[_, QM | _] -> atom_to_binary(QM);
_ -> <<"async">>
end,
Name = atom_to_binary(?FUNCTION_NAME), Name = atom_to_binary(?FUNCTION_NAME),
create_action(Name, Config), create_action(Name, [{query_mode, QueryMode} | Config]),
Actions = emqx_bridge_v2:list(actions), Actions = emqx_bridge_v2:list(actions),
Any = fun(#{name := BName}) -> BName =:= Name end, Any = fun(#{name := BName}) -> BName =:= Name end,
?assert(lists:any(Any, Actions), Actions), ?assert(lists:any(Any, Actions), Actions),
@ -465,7 +472,9 @@ t_action(Config) ->
%% Tests that deleting/disabling an action that share the same Pulsar topic with other %% Tests that deleting/disabling an action that share the same Pulsar topic with other
%% actions do not disturb the latter. %% actions do not disturb the latter.
t_multiple_actions_sharing_topic(Config) -> t_multiple_actions_sharing_topic(matrix) ->
[[plain], [tls]];
t_multiple_actions_sharing_topic(Config) when is_list(Config) ->
Type = ?TYPE, Type = ?TYPE,
ConnectorName = <<"c">>, ConnectorName = <<"c">>,
ConnectorConfig = pulsar_connector(Config), ConnectorConfig = pulsar_connector(Config),
@ -546,3 +555,31 @@ t_multiple_actions_sharing_topic(Config) ->
[] []
), ),
ok. ok.
t_sync_query_down(matrix) ->
[[plain]];
t_sync_query_down(Config0) when is_list(Config0) ->
ct:timetrap({seconds, 15}),
Payload = #{<<"x">> => <<"some data">>},
PayloadBin = emqx_utils_json:encode(Payload),
ClientId = <<"some_client">>,
Opts = #{
make_message_fn => fun(Topic) -> emqx_message:make(ClientId, Topic, PayloadBin) end,
enter_tp_filter =>
?match_event(#{?snk_kind := "pulsar_producer_send"}),
error_tp_filter =>
?match_event(#{?snk_kind := "resource_simple_sync_internal_buffer_query_timeout"}),
success_tp_filter =>
?match_event(#{?snk_kind := pulsar_echo_consumer_message})
},
Config = [
{connector_type, ?TYPE},
{connector_name, ?FUNCTION_NAME},
{connector_config, pulsar_connector(Config0)},
{action_type, ?TYPE},
{action_name, ?FUNCTION_NAME},
{action_config, pulsar_action(Config0)}
| proplists_with([proxy_name, proxy_host, proxy_port], Config0)
],
emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
ok.

View File

@ -198,6 +198,9 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1), QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
case simple_async_query(Id, Request, QueryOpts) of case simple_async_query(Id, Request, QueryOpts) of
{error, _} = Error -> {error, _} = Error ->
?tp("resource_simple_sync_internal_buffer_query_error", #{
id => Id, request => Request
}),
Error; Error;
{async_return, {error, _} = Error} -> {async_return, {error, _} = Error} ->
Error; Error;
@ -210,7 +213,11 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
receive receive
{ReplyAlias, Response} -> {ReplyAlias, Response} ->
Response Response
after 0 -> {error, timeout} after 0 ->
?tp("resource_simple_sync_internal_buffer_query_timeout", #{
id => Id, request => Request
}),
{error, timeout}
end end
end end
end end
@ -1302,6 +1309,7 @@ do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Res
?tp(simple_query_override, #{query_mode => ReqQM}), ?tp(simple_query_override, #{query_mode => ReqQM}),
#{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
CallMode = call_mode(QM, CBM), CallMode = call_mode(QM, CBM),
?tp(simple_query_enter, #{}),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
@ -1309,6 +1317,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Res
%% The connector supports buffer, send even in disconnected state %% The connector supports buffer, send even in disconnected state
#{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
CallMode = call_mode(QM, CBM), CallMode = call_mode(QM, CBM),
?tp(simple_query_enter, #{}),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) -> do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
%% when calling from the buffer worker or other simple queries, %% when calling from the buffer worker or other simple queries,
@ -2297,6 +2306,7 @@ reply_call(Alias, Response) ->
%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to' %% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
%% callbacks. %% callbacks.
reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) -> reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
?tp("reply_call_internal_buffer", #{}),
?MODULE:reply_call(ReplyAlias, Response), ?MODULE:reply_call(ReplyAlias, Response),
do_reply_caller(MaybeReplyTo, Response). do_reply_caller(MaybeReplyTo, Response).

View File

@ -0,0 +1 @@
Added the option to configure the query mode for Pulsar Producer action.