feat: pulsar bridge v2

This commit is contained in:
zhongwencool 2024-02-20 09:36:01 +08:00
parent 21e0ecfcce
commit 7f1b4cef27
18 changed files with 584 additions and 264 deletions

View File

@ -110,6 +110,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_es_action_info,
emqx_bridge_opents_action_info,
emqx_bridge_rabbitmq_action_info,
emqx_bridge_pulsar_action_info,
emqx_bridge_greptimedb_action_info,
emqx_bridge_tdengine_action_info,
emqx_bridge_s3_action_info

View File

@ -761,7 +761,7 @@ is_bridge_enabled(BridgeType, BridgeName) ->
end.
is_bridge_enabled_v1(BridgeType, BridgeName) ->
%% we read from the transalted config because the defaults are populated here.
%% we read from the translated config because the defaults are populated here.
try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of
ConfMap ->
maps:get(enable, ConfMap, false)

View File

@ -1659,8 +1659,11 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
connector_conf := ConnectorRawConf,
bridge_v2_type := BridgeV2Type,
bridge_v2_name := _BridgeName,
bridge_v2_conf := BridgeV2RawConf
bridge_v2_conf := BridgeV2RawConf0
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
BridgeV2RawConf = emqx_action_info:action_convert_from_connector(
BridgeType, ConnectorRawConf, BridgeV2RawConf0
),
create_dry_run_helper(
ensure_atom_root_key(ConfRootKey), BridgeV2Type, ConnectorRawConf, BridgeV2RawConf
)
@ -1928,7 +1931,8 @@ convert_from_connectors(ConfRootKey, Conf) ->
convert_from_connector(ConfRootKey, Type, Name, Action = #{<<"connector">> := ConnectorName}) ->
case get_connector_info(ConnectorName, Type) of
{ok, Connector} ->
Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action),
TypeAtom = to_existing_atom(Type),
Action1 = emqx_action_info:action_convert_from_connector(TypeAtom, Connector, Action),
{ok, Action1};
{error, not_found} ->
{error, #{

View File

@ -123,7 +123,7 @@ resource_type(dynamo) -> emqx_bridge_dynamo_connector;
resource_type(rocketmq) -> emqx_bridge_rocketmq_connector;
resource_type(sqlserver) -> emqx_bridge_sqlserver_connector;
resource_type(opents) -> emqx_bridge_opents_connector;
resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
resource_type(pulsar_producer) -> emqx_bridge_pulsar_connector;
resource_type(oracle) -> emqx_oracle;
resource_type(iotdb) -> emqx_bridge_iotdb_connector;
resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;

View File

@ -308,7 +308,7 @@ fields(Field) when
Fields = fields("specific_connector_config"),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(What) ->
error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}).
error({?MODULE, missing_field_handler, What}).
ingress_pool_size(desc) ->
?DESC("ingress_pool_size");

View File

@ -124,7 +124,7 @@ fields(Field) when
->
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields("mqtt_subscriber_source"));
fields(What) ->
error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
error({?MODULE, missing_field_handler, What}).
%% v2: api schema
%% The parameter equls to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -31,7 +31,21 @@ roots() ->
[].
fields(pulsar_producer) ->
fields(config) ++ fields(producer_opts);
fields(config) ++
emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++
fields(producer_opts) ++
[
{local_topic,
mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
{resource_opts,
mk(
ref(producer_resource_opts),
#{
required => false,
desc => ?DESC(emqx_resource_schema, "creation_opts")
}
)}
];
fields(config) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -85,10 +99,6 @@ fields(producer_opts) ->
mk(emqx_schema:bytesize(), #{
default => <<"1MB">>, desc => ?DESC("producer_send_buffer")
})},
{sync_timeout,
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})},
{retention_period,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
@ -100,26 +110,13 @@ fields(producer_opts) ->
emqx_schema:bytesize(),
#{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")}
)},
{local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
{pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
{strategy,
mk(
hoconsc:enum([random, roundrobin, key_dispatch]),
#{default => random, desc => ?DESC("producer_strategy")}
)},
{buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})},
{message,
mk(ref(producer_pulsar_message), #{
required => false, desc => ?DESC("producer_message_opts")
})},
{resource_opts,
mk(
ref(producer_resource_opts),
#{
required => false,
desc => ?DESC(emqx_resource_schema, "creation_opts")
}
)}
{buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})}
];
fields(producer_buffer) ->
[
@ -144,12 +141,6 @@ fields(producer_buffer) ->
desc => ?DESC("buffer_memory_overload_protection")
})}
];
fields(producer_pulsar_message) ->
[
{key,
mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})},
{value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})}
];
fields(producer_resource_opts) ->
SupportedOpts = [
health_check_interval,
@ -225,8 +216,8 @@ producer_strategy_key_validator(
producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
producer_strategy_key_validator(#{
<<"strategy">> := key_dispatch,
<<"message">> := #{<<"key">> := ""}
}) ->
<<"message">> := #{<<"key">> := Key}
}) when Key =:= "" orelse Key =:= <<>> ->
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
producer_strategy_key_validator(_) ->
ok.
@ -248,8 +239,7 @@ struct_names() ->
[
auth_basic,
auth_token,
producer_buffer,
producer_pulsar_message
producer_buffer
].
override_default(OriginalFn, NewDefault) ->

View File

@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
is_action/1,
action_convert_from_connector/2
]).
is_action(_) -> true.
bridge_v1_type_name() -> pulsar_producer.
action_type_name() -> pulsar.
connector_type_name() -> pulsar.
schema_module() -> emqx_bridge_pulsar_pubsub_schema.
action_convert_from_connector(ConnectorConfig, ActionConfig) ->
Dispatch = emqx_utils_conv:bin(maps:get(<<"strategy">>, ConnectorConfig, <<>>)),
case Dispatch of
<<"key_dispatch">> ->
case emqx_utils_maps:deep_find([<<"parameters">>, <<"message">>], ActionConfig) of
{ok, Message} ->
Validator =
#{
<<"strategy">> => key_dispatch,
<<"message">> => emqx_utils_maps:binary_key_map(Message)
},
case emqx_bridge_pulsar:producer_strategy_key_validator(Validator) of
ok ->
ActionConfig;
{error, Reason} ->
throw(#{
reason => Reason,
kind => validation_error
})
end;
{not_found, _, _} ->
%% no message field, use the default message template
ActionConfig
end;
_ ->
ActionConfig
end.

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_impl_producer).
-module(emqx_bridge_pulsar_connector).
-include("emqx_bridge_pulsar.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
@ -13,8 +13,12 @@
callback_mode/0,
query_mode/1,
on_start/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_stop/2,
on_get_status/2,
on_get_channel_status/3,
on_query/3,
on_query_async/4
]).
@ -23,8 +27,7 @@
-type state() :: #{
pulsar_client_id := pulsar_client_id(),
producers := pulsar_producers:producers(),
sync_timeout := erlang:timeout(),
message_template := message_template()
channels := map()
}.
-type buffer_mode() :: memory | disk | hybrid.
-type compression_mode() :: no_compression | snappy | zlib.
@ -77,16 +80,12 @@ query_mode(_Config) ->
-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) ->
#{
bridge_name := BridgeName,
servers := Servers0,
ssl := SSL
} = Config,
#{servers := Servers0, ssl := SSL} = Config,
Servers = format_servers(Servers0),
ClientId = make_client_id(InstanceId, BridgeName),
ClientId = make_client_id(InstanceId),
ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(10)),
ClientOpts = #{
connect_timeout => ConnectTimeout,
ssl_opts => SSLOpts,
@ -119,6 +118,30 @@ on_start(InstanceId, Config) ->
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
on_add_channel(
_InstanceId,
#{channels := Channels} = State,
ChannelId,
#{parameters := #{message := Message, sync_timeout := SyncTimeout}}
) ->
case maps:is_key(ChannelId, Channels) of
true ->
{error, already_exists};
false ->
Parameters = #{
message => compile_message_template(Message),
sync_timeout => SyncTimeout
},
NewChannels = maps:put(ChannelId, Parameters, Channels),
{ok, State#{channels => NewChannels}}
end.
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
{ok, State#{channels => maps:remove(ChannelId, Channels)}}.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
-spec on_stop(resource_id(), state()) -> ok.
on_stop(InstanceId, _State) ->
case emqx_resource:get_allocated_resources(InstanceId) of
@ -174,76 +197,77 @@ on_get_status(_InstanceId, _State) ->
%% create the bridge is not quite finished, `State = undefined'.
connecting.
-spec on_query(resource_id(), {send_message, map()}, state()) ->
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
case maps:is_key(ChannelId, Channels) of
true -> connected;
false -> {error, channel_not_exists}
end.
-spec on_query(resource_id(), tuple(), state()) ->
{ok, term()}
| {error, timeout}
| {error, term()}.
on_query(_InstanceId, {send_message, Message}, State) ->
#{
producers := Producers,
sync_timeout := SyncTimeout,
message_template := MessageTemplate
} = State,
PulsarMessage = render_message(Message, MessageTemplate),
try
pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
catch
error:timeout ->
{error, timeout}
on_query(_InstanceId, {ChannelId, Message}, State) ->
#{producers := Producers, channels := Channels} = State,
case maps:find(ChannelId, Channels) of
error ->
{error, channel_not_exists};
{ok, #{message := MessageTmpl, sync_timeout := SyncTimeout}} ->
PulsarMessage = render_message(Message, MessageTmpl),
try
pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
catch
error:timeout ->
{error, timeout}
end
end.
-spec on_query_async(
resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
resource_id(), tuple(), {ReplyFun :: function(), Args :: list()}, state()
) ->
{ok, pid()}.
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
?tp_span(
pulsar_producer_on_query_async,
#{instance_id => _InstanceId, message => Message},
do_on_query_async(Message, AsyncReplyFn, State)
).
on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
#{producers := Producers, channels := Channels} = State,
case maps:find(ChannelId, Channels) of
error ->
{error, channel_not_exists};
{ok, #{message := MessageTmpl}} ->
?tp_span(
pulsar_producer_on_query_async,
#{instance_id => _InstanceId, message => Message},
on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn)
)
end.
do_on_query_async(Message, AsyncReplyFn, State) ->
#{
producers := Producers,
message_template := MessageTemplate
} = State,
PulsarMessage = render_message(Message, MessageTemplate),
on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) ->
PulsarMessage = render_message(Message, MessageTmpl),
pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
%%-------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------
-spec to_bin(atom() | string() | binary()) -> binary().
to_bin(A) when is_atom(A) ->
atom_to_binary(A);
to_bin(L) when is_list(L) ->
list_to_binary(L);
to_bin(B) when is_binary(B) ->
B.
-spec format_servers(binary()) -> [string()].
format_servers(Servers0) ->
Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS),
lists:map(
fun(#{scheme := Scheme, hostname := Host, port := Port}) ->
Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port)
end,
Servers1
emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS)
).
-spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id().
make_client_id(InstanceId, BridgeName) ->
-spec make_client_id(resource_id()) -> pulsar_client_id().
make_client_id(InstanceId) ->
case is_dry_run(InstanceId) of
true ->
pulsar_producer_probe;
false ->
{pulsar, Name} = emqx_connector_resource:parse_connector_id(InstanceId),
ClientIdBin = iolist_to_binary([
<<"pulsar_producer:">>,
to_bin(BridgeName),
<<"pulsar:">>,
emqx_utils_conv:bin(Name),
<<":">>,
to_bin(node())
emqx_utils_conv:bin(node())
]),
binary_to_atom(ClientIdBin)
end.
@ -252,10 +276,8 @@ make_client_id(InstanceId, BridgeName) ->
is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, InstanceId)
nomatch -> false;
_ -> string:equal(TestIdStart, InstanceId)
end.
conn_opts(#{authentication := none}) ->
@ -275,11 +297,11 @@ conn_opts(#{authentication := #{jwt := JWT}}) ->
-spec replayq_dir(pulsar_client_id()) -> string().
replayq_dir(ClientId) ->
filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]).
filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]).
-spec producer_name(pulsar_client_id()) -> atom().
producer_name(ClientId) ->
ClientIdBin = to_bin(ClientId),
ClientIdBin = emqx_utils_conv:bin(ClientId),
binary_to_atom(
iolist_to_binary([
<<"producer-">>,
@ -303,12 +325,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
},
compression := Compression,
max_batch_bytes := MaxBatchBytes,
message := MessageTemplateOpts,
pulsar_topic := PulsarTopic0,
retention_period := RetentionPeriod,
send_buffer := SendBuffer,
strategy := Strategy,
sync_timeout := SyncTimeout
strategy := Strategy
} = Config,
{OffloadMode, ReplayQDir} =
case BufferMode of
@ -330,7 +350,6 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
},
ProducerName = producer_name(ClientId),
?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}),
MessageTemplate = compile_message_template(MessageTemplateOpts),
ProducerOpts0 =
#{
batch_size => BatchSize,
@ -353,8 +372,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
State = #{
pulsar_client_id => ClientId,
producers => Producers,
sync_timeout => SyncTimeout,
message_template => MessageTemplate
channels => #{}
},
?tp(pulsar_producer_bridge_started, #{}),
{ok, State}

View File

@ -0,0 +1,71 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_connector_schema).
-export([namespace/0, roots/0, fields/1, desc/1]).
-export([connector_examples/1, connector_example_values/0]).
-include("emqx_bridge_pulsar.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-define(TYPE, pulsar).
namespace() -> ?TYPE.
roots() -> [].
fields("config_connector") ->
lists:keydelete(enable, 1, emqx_bridge_schema:common_bridge_fields()) ++
emqx_bridge_pulsar:fields(config) ++
emqx_bridge_pulsar:fields(producer_opts) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("post") ->
emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector");
fields("put") ->
fields("config_connector");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("config_connector").
desc("config_connector") ->
?DESC(emqx_bridge_pulsar, "config_connector");
desc(connector_resource_opts) ->
?DESC(emqx_bridge_pulsar, connector_resource_opts);
desc(_) ->
undefined.
connector_examples(Method) ->
[
#{
<<"pulsar">> =>
#{
summary => <<"Pulsar Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?TYPE, connector_example_values()
)
}
}
].
connector_example_values() ->
#{
name => <<"pulsar_connector">>,
type => ?TYPE,
enable => true,
servers => <<"pulsar://127.0.0.1:6650">>,
authentication => none,
connect_timeout => <<"5s">>,
batch_size => 10,
compression => no_compression,
send_buffer => <<"1MB">>,
retention_period => <<"100s">>,
max_batch_bytes => <<"32MB">>,
pulsar_topic => <<"test_topic">>,
strategy => random,
buffer => #{mode => memory},
ssl => #{enable => false}
}.

View File

@ -0,0 +1,123 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_pubsub_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-export([roots/0, fields/1, desc/1, namespace/0]).
-export([bridge_v2_examples/1]).
-define(ACTION_TYPE, pulsar).
-define(CONNECTOR_SCHEMA, emqx_bridge_rabbitmq_connector_schema).
namespace() -> "pulsar".
roots() -> [].
fields(action) ->
{pulsar,
?HOCON(
?MAP(name, ?R_REF(publisher_action)),
#{
desc => <<"Pulsar Action Config">>,
required => false
}
)};
fields(publisher_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
?HOCON(
?R_REF(action_parameters),
#{
required => true,
desc => ?DESC(action_parameters)
}
),
#{resource_opts_ref => ?R_REF(action_resource_opts)}
);
fields(action_parameters) ->
[
{sync_timeout,
?HOCON(emqx_schema:timeout_duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})},
{message,
?HOCON(?R_REF(producer_pulsar_message), #{
required => false, desc => ?DESC("producer_message_opts")
})}
];
fields(producer_pulsar_message) ->
[
{key,
?HOCON(string(), #{
default => <<"${.clientid}">>,
desc => ?DESC("producer_key_template")
})},
{value,
?HOCON(string(), #{
default => <<"${.}">>,
desc => ?DESC("producer_value_template")
})}
];
fields(action_resource_opts) ->
UnsupportedOpts = [
batch_size,
batch_time,
worker_pool_size,
request_ttl,
inflight_window,
max_buffer_bytes,
query_mode
],
lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_bridge_v2_schema:action_resource_opts_fields()
);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action));
fields(What) ->
error({?MODULE, missing_field_handler, What}).
desc("config") ->
?DESC("desc_config");
desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "creation_opts");
desc(action_parameters) ->
?DESC(action_parameters);
desc(producer_pulsar_message) ->
?DESC("producer_message_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc(publisher_action) ->
?DESC(publisher_action);
desc(_) ->
undefined.
bridge_v2_examples(Method) ->
[
#{
<<"pulsar">> => #{
summary => <<"Pulsar Producer Action">>,
value => emqx_bridge_v2_schema:action_values(
Method,
_ActionType = ?ACTION_TYPE,
_ConnectorType = pulsar,
#{
parameters => #{
sync_timeout => <<"5s">>,
message => #{
key => <<"${.clientid}">>,
value => <<"${.}">>
}
}
}
)
}
}
].

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_impl_producer_SUITE).
-module(emqx_bridge_pulsar_connector_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
@ -550,7 +550,6 @@ kill_resource_managers() ->
t_start_and_produce_ok(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
@ -600,6 +599,13 @@ t_start_and_produce_ok(Config) ->
_Sleep = 100,
_Attempts0 = 20,
begin
BridgeId = emqx_bridge_resource:bridge_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
ConnectorId = emqx_bridge_resource:resource_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>,
?assertMatch(
#{
counters := #{
@ -612,7 +618,7 @@ t_start_and_produce_ok(Config) ->
success := 2
}
},
emqx_resource_manager:get_metrics(ResourceId)
emqx_resource:get_metrics(Id)
),
?assertEqual(
1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')
@ -631,17 +637,22 @@ t_start_and_produce_ok(Config) ->
%% Under normal operations, the bridge will be called async via
%% `simple_async_query'.
t_sync_query(Config) ->
ResourceId = resource_id(Config),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
Message = {send_message, #{payload => Payload}},
BridgeId = emqx_bridge_resource:bridge_id(<<"pulsar">>, ?config(pulsar_name, Config)),
ConnectorId = emqx_bridge_resource:resource_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>,
Message = {Id, #{payload => Payload}},
?assertMatch(
{ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message)
),
@ -688,13 +699,13 @@ t_create_via_http(Config) ->
t_start_stop(Config) ->
PulsarName = ?config(pulsar_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch(
{ok, _},
create_bridge(Config)
),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
@ -745,11 +756,11 @@ t_on_get_status(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
@ -777,7 +788,6 @@ t_start_when_down(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
@ -787,6 +797,7 @@ t_start_when_down(Config) ->
),
ok
end),
ResourceId = resource_id(Config),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
@ -902,7 +913,6 @@ t_failure_to_start_producer(Config) ->
%% die for whatever reason.
t_producer_process_crash(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
QoS = 0,
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
@ -934,6 +944,7 @@ t_producer_process_crash(Config) ->
ok
after 1_000 -> ct:fail("pid didn't die")
end,
ResourceId = resource_id(Config),
?retry(
_Sleep0 = 50,
_Attempts0 = 50,
@ -995,8 +1006,8 @@ t_resource_manager_crash_after_producers_started(Config) ->
Producers =/= undefined,
10_000
),
?assertMatch(ok, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
?assertMatch({error, bridge_not_found}, delete_bridge(Config)),
ok
end,
[]
@ -1028,8 +1039,8 @@ t_resource_manager_crash_before_producers_started(Config) ->
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
10_000
),
?assertMatch(ok, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
?assertMatch({error, bridge_not_found}, delete_bridge(Config)),
ok
end,
[]
@ -1046,7 +1057,7 @@ t_strategy_key_validation(Config) ->
<<"reason">> := <<"Message key cannot be empty", _/binary>>
}
}}},
probe_bridge_api(
create_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
@ -1060,7 +1071,7 @@ t_strategy_key_validation(Config) ->
<<"reason">> := <<"Message key cannot be empty", _/binary>>
}
}}},
create_bridge_api(
probe_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
@ -1075,7 +1086,6 @@ do_t_cluster(Config) ->
?check_trace(
begin
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
Nodes = [N1, N2 | _] = cluster(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
@ -1095,6 +1105,7 @@ do_t_cluster(Config) ->
),
25_000
),
ResourceId = erpc:call(N1, ?MODULE, resource_id, [Config]),
lists:foreach(
fun(N) ->
?retry(
@ -1147,12 +1158,12 @@ t_resilience(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
ResourceId = resource_id(Config),
?retry(
_Sleep0 = 1_000,
_Attempts0 = 20,

View File

@ -170,7 +170,7 @@ fields(Field) when
->
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source));
fields(What) ->
error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
error({?MODULE, missing_field_handler, What}).
%% v2: api schema
%% The parameter equals to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1

View File

@ -74,6 +74,8 @@ resource_type(greptimedb) ->
emqx_bridge_greptimedb_connector;
resource_type(tdengine) ->
emqx_bridge_tdengine_connector;
resource_type(pulsar) ->
emqx_bridge_pulsar_connector;
resource_type(rabbitmq) ->
emqx_bridge_rabbitmq_connector;
resource_type(s3) ->
@ -94,6 +96,8 @@ connector_impl_module(elasticsearch) ->
emqx_bridge_es_connector;
connector_impl_module(opents) ->
emqx_bridge_opents_connector;
connector_impl_module(pulsar) ->
emqx_bridge_pulsar_connector;
connector_impl_module(tdengine) ->
emqx_bridge_tdengine_connector;
connector_impl_module(rabbitmq) ->
@ -317,6 +321,14 @@ connector_structs() ->
required => false
}
)},
{pulsar,
mk(
hoconsc:map(name, ref(emqx_bridge_pulsar_connector_schema, "config_connector")),
#{
desc => <<"Pulsar Connector Config">>,
required => false
}
)},
{rabbitmq,
mk(
hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")),
@ -361,6 +373,7 @@ schema_modules() ->
emqx_bridge_iotdb_connector,
emqx_bridge_es_connector,
emqx_bridge_rabbitmq_connector_schema,
emqx_bridge_pulsar_connector_schema,
emqx_bridge_opents_connector,
emqx_bridge_greptimedb,
emqx_bridge_tdengine_connector,
@ -410,6 +423,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method),
api_ref(emqx_bridge_pulsar_connector_schema, <<"pulsar">>, Method),
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method),
api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector")

View File

@ -174,6 +174,8 @@ connector_type_to_bridge_types(opents) ->
[opents];
connector_type_to_bridge_types(greptimedb) ->
[greptimedb];
connector_type_to_bridge_types(pulsar) ->
[pulsar_producer, pulsar];
connector_type_to_bridge_types(tdengine) ->
[tdengine];
connector_type_to_bridge_types(rabbitmq) ->
@ -269,6 +271,7 @@ split_bridge_to_connector_and_action(
#{<<"connector">> := ConnectorName0} -> ConnectorName0;
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
end,
OrgActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
{ActionMap, ActionType, ActionOrSource} =
case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of

View File

@ -1,180 +1,173 @@
emqx_bridge_pulsar {
auth_basic {
desc = "Parameters for basic authentication."
label = "Basic auth params"
}
auth_basic_password {
desc = "Basic authentication password."
label = "Password"
}
config_connector.desc:
"""Pulsar connector config"""
config_connector.label:
"""Pulsar Connector"""
auth_basic_username {
desc = "Basic authentication username."
label = "Username"
}
connector_resource_opts.desc:
"""Pulsar connector resource options"""
connector_resource_opts.label:
"""Resource Options"""
auth_token {
desc = "Parameters for token authentication."
label = "Token auth params"
}
auth_basic.desc:
"""Parameters for basic authentication."""
auth_basic.label:
"""Basic auth params"""
auth_token_jwt {
desc = "JWT authentication token."
label = "JWT"
}
auth_basic_password.desc:
"""Basic authentication password."""
auth_basic_password.label:
"""Password"""
authentication {
desc = "Authentication configs."
label = "Authentication"
}
auth_basic_username.desc:
"""Basic authentication username."""
auth_basic_username.label:
"""Username"""
buffer_memory_overload_protection {
desc = "Applicable when buffer mode is set to <code>memory</code>\n"
"EMQX will drop old buffered messages under high memory pressure."
" The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>."
" NOTE: This config only works on Linux."
label = "Memory Overload Protection"
}
auth_token.desc:
"""Parameters for token authentication."""
auth_token.label:
"""Token auth params"""
buffer_mode {
desc = "Message buffer mode.\n"
"<code>memory</code>: Buffer all messages in memory. The messages will be lost"
" in case of EMQX node restart\n<code>disk</code>: Buffer all messages on disk."
" The messages on disk are able to survive EMQX node restart.\n"
"<code>hybrid</code>: Buffer message in memory first, when up to certain limit"
" (see <code>segment_bytes</code> config for more information), then start offloading"
" messages to disk, Like <code>memory</code> mode, the messages will be lost in"
" case of EMQX node restart."
label = "Buffer Mode"
}
auth_token_jwt.desc:
"""JWT authentication token."""
auth_token_jwt.label:
"""JWT"""
buffer_per_partition_limit {
desc = "Number of bytes allowed to buffer for each Pulsar partition."
" When this limit is exceeded, old messages will be dropped in a trade for credits"
" for new messages to be buffered."
label = "Per-partition Buffer Limit"
}
authentication.desc:
"""Authentication configs."""
authentication.label:
"""Authentication"""
buffer_segment_bytes {
desc = "Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.\n"
"This value is to specify the size of each on-disk buffer file."
label = "Segment File Bytes"
}
buffer_memory_overload_protection.desc:
"""Applicable when buffer mode is set to <code>memory</code>
EMQX will drop old buffered messages under high memory pressure.
The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>.
NOTE: This config only works on Linux."""
buffer_memory_overload_protection.label:
"""Memory Overload Protection"""
config_enable {
desc = "Enable (true) or disable (false) this Pulsar bridge."
label = "Enable or Disable"
}
buffer_mode.desc:
"""Message buffer mode.
<code>memory</code>: Buffer all messages in memory. The messages will be lost
in case of EMQX node restart\n<code>disk</code>: Buffer all messages on disk.
The messages on disk are able to survive EMQX node restart.
<code>hybrid</code>: Buffer message in memory first, when up to certain limit
(see <code>segment_bytes</code> config for more information), then start offloading
messages to disk, Like <code>memory</code> mode, the messages will be lost in
case of EMQX node restart."""
buffer_mode.label:
"""Buffer Mode"""
connect_timeout {
desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
label = "Connect Timeout"
}
buffer_per_partition_limit.desc:
"""Number of bytes allowed to buffer for each Pulsar partition.
When this limit is exceeded, old messages will be dropped in a trade for credits
for new messages to be buffered."""
buffer_per_partition_limit.label:
"""Per-partition Buffer Limit"""
desc_name {
desc = "Action name, a human-readable identifier."
label = "Action Name"
}
desc_name.desc:
"""Action name, a human-readable identifier."""
desc_name.label:
"""Action Name"""
desc_type {
desc = "The Bridge Type"
label = "Bridge Type"
}
buffer_segment_bytes.desc:
"""Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.
This value is to specify the size of each on-disk buffer file."""
buffer_segment_bytes.label:
"""Segment File Bytes"""
producer_batch_size {
desc = "Maximum number of individual requests to batch in a Pulsar message."
label = "Batch size"
}
config_enable.desc:
"""Enable (true) or disable (false) this Pulsar bridge."""
config_enable.label:
"""Enable or Disable"""
producer_buffer {
desc = "Configure producer message buffer.\n\n"
"Tell Pulsar producer how to buffer messages when EMQX has more messages to"
" send than Pulsar can keep up, or when Pulsar is down."
label = "Message Buffer"
}
connect_timeout.desc:
"""Maximum wait time for TCP connection establishment (including authentication time if enabled)."""
connect_timeout.label:
"""Connect Timeout"""
producer_compression {
desc = "Compression method."
label = "Compression"
}
desc_name.desc:
"""Bridge name, used as a human-readable description of the bridge."""
desc_name.label:
"""Bridge Name"""
producer_key_template {
desc = "Template to render Pulsar message key."
label = "Message Key"
}
desc_type.desc:
"""The Bridge Type"""
desc_type.label:
"""Bridge Type"""
producer_local_topic {
desc = "MQTT topic or topic filter as data source (bridge input)."
" If rule action is used as data source, this config should be left empty,"
" otherwise messages will be duplicated in Pulsar."
label = "Source MQTT Topic"
}
producer_batch_size.desc:
"""Maximum number of individual requests to batch in a Pulsar message."""
producer_batch_size.label:
"""Batch size"""
producer_max_batch_bytes {
desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers"
" default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in"
" order to compensate Pulsar message encoding overheads (especially when each individual"
" message is very small). When a single message is over the limit, it is still"
" sent (as a single element batch)."
label = "Max Batch Bytes"
}
producer_buffer.desc:
"""Configure producer message buffer."
Tell Pulsar producer how to buffer messages when EMQX has more messages to"
send than Pulsar can keep up, or when Pulsar is down."""
producer_buffer.label:
"""Message Buffer"""
producer_message_opts {
desc = "Template to render a Pulsar message."
label = "Pulsar Message Template"
}
producer_compression.desc:
"""Compression method."""
producer_compression.label:
"""Compression"""
producer_pulsar_message {
desc = "Template to render a Pulsar message."
label = "Pulsar Message Template"
}
producer_local_topic.desc:
"""MQTT topic or topic filter as data source (bridge input)
If rule action is used as data source, this config should be left empty,
otherwise messages will be duplicated in Pulsar."""
producer_local_topic.label:
"""Source MQTT Topic"""
producer_pulsar_topic {
desc = "Pulsar topic name"
label = "Pulsar topic name"
}
producer_max_batch_bytes.desc:
"""Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers
default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in
order to compensate Pulsar message encoding overheads (especially when each individual
message is very small). When a single message is over the limit, it is still
sent (as a single element batch)."""
producer_max_batch_bytes.label:
"""Max Batch Bytes"""
producer_retention_period {
desc = "The amount of time messages will be buffered while there is no connection to"
" the Pulsar broker. Longer times mean that more memory/disk will be used"
label = "Retention Period"
}
producer_send_buffer {
desc = "Fine tune the socket send buffer. The default value is tuned for high throughput."
label = "Socket Send Buffer Size"
}
producer_pulsar_topic.desc:
"""Pulsar topic name"""
producer_pulsar_topic.label:
"""Pulsar topic name"""
producer_strategy {
desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n"
"\n"
"<code>random</code>: Randomly pick a partition for each message.\n"
"<code>roundrobin</code>: Pick each available producer in turn for each message.\n"
"<code>key_dispatch</code>: Hash Pulsar message key of the first message in a batch"
" to a partition number."
label = "Partition Strategy"
}
producer_retention_period.desc:
"""The amount of time messages will be buffered while there is no connection to
the Pulsar broker. Longer times mean that more memory/disk will be used"""
producer_retention_period.label:
"""Retention Period"""
producer_sync_timeout {
desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."
label = "Sync publish timeout"
}
producer_send_buffer.desc:
"""Fine tune the socket send buffer. The default value is tuned for high throughput."""
producer_send_buffer.label:
"""Socket Send Buffer Size"""
producer_value_template {
desc = "Template to render Pulsar message value."
label = "Message Value"
}
producer_strategy.desc:
"""Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.
pulsar_producer_struct {
desc = "Configuration for a Pulsar bridge."
label = "Pulsar Bridge Configuration"
}
<code>random</code>: Randomly pick a partition for each message.
<code>roundrobin</code>: Pick each available producer in turn for each message.
<code>key_dispatch</code>: Hash Pulsar message key of the first message in a batch
to a partition number."""
producer_strategy.label:
"""Partition Strategy"""
pulsar_producer_struct.desc:
"""Configuration for a Pulsar bridge."""
pulsar_producer_struct.label:
"""Pulsar Bridge Configuration"""
servers.desc:
"""A comma separated list of Pulsar URLs in the form <code>scheme://host[:port]</code>
for the client to connect to. The supported schemes are <code>pulsar://</code> (default)
and <code>pulsar+ssl://</code>. The default port is 6650."""
servers.label:
"""Servers"""
servers {
desc = "A comma separated list of Pulsar URLs in the form <code>scheme://host[:port]</code>"
" for the client to connect to. The supported schemes are <code>pulsar://</code> (default)"
" and <code>pulsar+ssl://</code>. The default port is 6650."
label = "Servers"
}
}

View File

@ -0,0 +1,38 @@
emqx_bridge_pulsar_pubsub_schema {
action_parameters.desc:
"""Action specific configs."""
action_parameters.label:
"""Action"""
publisher_action.desc:
"""Publish message to pulsar topic"""
publisher_action.label:
"""Publish Action """
producer_sync_timeout.desc:
"""Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."""
producer_sync_timeout.label:
"""Sync publish timeout"""
producer_key_template.desc:
"""Template to render Pulsar message key."""
producer_key_template.label:
"""Message Key"""
producer_value_template.desc:
"""Template to render Pulsar message value."""
producer_value_template.label:
"""Message Value"""
producer_message_opts.desc:
"""Template to render a Pulsar message."""
producer_message_opts.label:
"""Pulsar Message Template"""
producer_pulsar_message.desc:
"""Template to render a Pulsar message."""
producer_pulsar_message.label:
"""Pulsar Message Template"""
}