Merge pull request #12540 from zhongwencool/bridge-pulsar-v2

feat: pulsar bridge v2
This commit is contained in:
zhongwencool 2024-02-29 21:25:11 +08:00 committed by GitHub
commit bc9de20024
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1143 additions and 359 deletions

View File

@ -110,6 +110,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_es_action_info,
emqx_bridge_opents_action_info,
emqx_bridge_rabbitmq_action_info,
emqx_bridge_pulsar_action_info,
emqx_bridge_greptimedb_action_info,
emqx_bridge_tdengine_action_info,
emqx_bridge_s3_action_info

View File

@ -761,7 +761,7 @@ is_bridge_enabled(BridgeType, BridgeName) ->
end.
is_bridge_enabled_v1(BridgeType, BridgeName) ->
%% we read from the transalted config because the defaults are populated here.
%% we read from the translated config because the defaults are populated here.
try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of
ConfMap ->
maps:get(enable, ConfMap, false)

View File

@ -1659,8 +1659,11 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
connector_conf := ConnectorRawConf,
bridge_v2_type := BridgeV2Type,
bridge_v2_name := _BridgeName,
bridge_v2_conf := BridgeV2RawConf
bridge_v2_conf := BridgeV2RawConf0
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
BridgeV2RawConf = emqx_action_info:action_convert_from_connector(
BridgeType, ConnectorRawConf, BridgeV2RawConf0
),
create_dry_run_helper(
ensure_atom_root_key(ConfRootKey), BridgeV2Type, ConnectorRawConf, BridgeV2RawConf
)
@ -1928,7 +1931,8 @@ convert_from_connectors(ConfRootKey, Conf) ->
convert_from_connector(ConfRootKey, Type, Name, Action = #{<<"connector">> := ConnectorName}) ->
case get_connector_info(ConnectorName, Type) of
{ok, Connector} ->
Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action),
TypeAtom = to_existing_atom(Type),
Action1 = emqx_action_info:action_convert_from_connector(TypeAtom, Connector, Action),
{ok, Action1};
{error, not_found} ->
{error, #{

View File

@ -123,7 +123,7 @@ resource_type(dynamo) -> emqx_bridge_dynamo_connector;
resource_type(rocketmq) -> emqx_bridge_rocketmq_connector;
resource_type(sqlserver) -> emqx_bridge_sqlserver_connector;
resource_type(opents) -> emqx_bridge_opents_connector;
resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
resource_type(pulsar_producer) -> emqx_bridge_pulsar_connector;
resource_type(oracle) -> emqx_oracle;
resource_type(iotdb) -> emqx_bridge_iotdb_connector;
resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;

View File

@ -308,7 +308,7 @@ fields(Field) when
Fields = fields("specific_connector_config"),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(What) ->
error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}).
error({?MODULE, missing_field_handler, What}).
ingress_pool_size(desc) ->
?DESC("ingress_pool_size");

View File

@ -124,7 +124,7 @@ fields(Field) when
->
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields("mqtt_subscriber_source"));
fields(What) ->
error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
error({?MODULE, missing_field_handler, What}).
%% v2: api schema
%% The parameter equls to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -31,7 +31,20 @@ roots() ->
[].
fields(pulsar_producer) ->
fields(config) ++ fields(producer_opts);
fields(config) ++
emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++
[
{local_topic,
mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
{resource_opts,
mk(
ref(producer_resource_opts),
#{
required => false,
desc => ?DESC(emqx_resource_schema, "creation_opts")
}
)}
];
fields(config) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -71,6 +84,7 @@ fields(config) ->
] ++ emqx_connector_schema_lib:ssl_fields();
fields(producer_opts) ->
[
{pulsar_topic, mk(string(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
{batch_size,
mk(
pos_integer(),
@ -85,10 +99,6 @@ fields(producer_opts) ->
mk(emqx_schema:bytesize(), #{
default => <<"1MB">>, desc => ?DESC("producer_send_buffer")
})},
{sync_timeout,
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})},
{retention_period,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
@ -100,26 +110,12 @@ fields(producer_opts) ->
emqx_schema:bytesize(),
#{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")}
)},
{local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
{pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
{strategy,
mk(
hoconsc:enum([random, roundrobin, key_dispatch]),
#{default => random, desc => ?DESC("producer_strategy")}
)},
{buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})},
{message,
mk(ref(producer_pulsar_message), #{
required => false, desc => ?DESC("producer_message_opts")
})},
{resource_opts,
mk(
ref(producer_resource_opts),
#{
required => false,
desc => ?DESC(emqx_resource_schema, "creation_opts")
}
)}
{buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})}
];
fields(producer_buffer) ->
[
@ -144,12 +140,6 @@ fields(producer_buffer) ->
desc => ?DESC("buffer_memory_overload_protection")
})}
];
fields(producer_pulsar_message) ->
[
{key,
mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})},
{value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})}
];
fields(producer_resource_opts) ->
SupportedOpts = [
health_check_interval,
@ -211,7 +201,37 @@ conn_bridge_examples(_Method) ->
#{
<<"pulsar_producer">> => #{
summary => <<"Pulsar Producer Bridge">>,
value => #{todo => true}
value => #{
<<"authentication">> => <<"none">>,
<<"batch_size">> => 1,
<<"buffer">> =>
#{
<<"memory_overload_protection">> => true,
<<"mode">> => <<"memory">>,
<<"per_partition_limit">> => <<"10MB">>,
<<"segment_bytes">> => <<"5MB">>
},
<<"compression">> => <<"no_compression">>,
<<"enable">> => true,
<<"local_topic">> => <<"mqtt/topic/-576460752303423482">>,
<<"max_batch_bytes">> => <<"900KB">>,
<<"message">> =>
#{<<"key">> => <<"${.clientid}">>, <<"value">> => <<"${.}">>},
<<"name">> => <<"pulsar_example_name">>,
<<"pulsar_topic">> => <<"pulsar_example_topic">>,
<<"retention_period">> => <<"infinity">>,
<<"send_buffer">> => <<"1MB">>,
<<"servers">> => <<"pulsar://127.0.0.1:6650">>,
<<"ssl">> =>
#{
<<"enable">> => false,
<<"server_name_indication">> => <<"auto">>,
<<"verify">> => <<"verify_none">>
},
<<"strategy">> => <<"key_dispatch">>,
<<"sync_timeout">> => <<"5s">>,
<<"type">> => <<"pulsar_producer">>
}
}
}
].
@ -225,8 +245,8 @@ producer_strategy_key_validator(
producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
producer_strategy_key_validator(#{
<<"strategy">> := key_dispatch,
<<"message">> := #{<<"key">> := ""}
}) ->
<<"message">> := #{<<"key">> := Key}
}) when Key =:= "" orelse Key =:= <<>> ->
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
producer_strategy_key_validator(_) ->
ok.
@ -248,8 +268,7 @@ struct_names() ->
[
auth_basic,
auth_token,
producer_buffer,
producer_pulsar_message
producer_buffer
].
override_default(OriginalFn, NewDefault) ->

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
is_action/1
]).
is_action(_) -> true.
bridge_v1_type_name() -> pulsar_producer.
action_type_name() -> pulsar.
connector_type_name() -> pulsar.
schema_module() -> emqx_bridge_pulsar_pubsub_schema.

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_impl_producer).
-module(emqx_bridge_pulsar_connector).
-include("emqx_bridge_pulsar.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
@ -13,22 +13,23 @@
callback_mode/0,
query_mode/1,
on_start/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_stop/2,
on_get_status/2,
on_get_channel_status/3,
on_query/3,
on_query_async/4
]).
-type pulsar_client_id() :: atom().
-type state() :: #{
pulsar_client_id := pulsar_client_id(),
producers := pulsar_producers:producers(),
sync_timeout := erlang:timeout(),
message_template := message_template()
client_id := pulsar_client_id(),
channels := map(),
client_opts := map()
}.
-type buffer_mode() :: memory | disk | hybrid.
-type compression_mode() :: no_compression | snappy | zlib.
-type partition_strategy() :: random | roundrobin | key_dispatch.
-type message_template_raw() :: #{
key := binary(),
value := binary()
@ -39,25 +40,9 @@
}.
-type config() :: #{
authentication := _,
batch_size := pos_integer(),
bridge_name := atom(),
buffer := #{
mode := buffer_mode(),
per_partition_limit := emqx_schema:bytesize(),
segment_bytes := emqx_schema:bytesize(),
memory_overload_protection := boolean()
},
compression := compression_mode(),
connect_timeout := emqx_schema:duration_ms(),
max_batch_bytes := emqx_schema:bytesize(),
message := message_template_raw(),
pulsar_topic := binary(),
retention_period := infinity | emqx_schema:duration_ms(),
send_buffer := emqx_schema:bytesize(),
servers := binary(),
ssl := _,
strategy := partition_strategy(),
sync_timeout := emqx_schema:duration_ms()
ssl := _
}.
%% Allocatable resources
@ -77,16 +62,12 @@ query_mode(_Config) ->
-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) ->
#{
bridge_name := BridgeName,
servers := Servers0,
ssl := SSL
} = Config,
#{servers := Servers0, ssl := SSL} = Config,
Servers = format_servers(Servers0),
ClientId = make_client_id(InstanceId, BridgeName),
ClientId = make_client_id(InstanceId),
ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(10)),
ClientOpts = #{
connect_timeout => ConnectTimeout,
ssl_opts => SSLOpts,
@ -117,30 +98,57 @@ on_start(InstanceId, Config) ->
end,
throw(Message)
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
{ok, #{channels => #{}, client_id => ClientId, client_opts => ClientOpts}}.
on_add_channel(
InstanceId,
#{channels := Channels, client_id := ClientId, client_opts := ClientOpts} = State,
ChannelId,
#{parameters := #{message := Message, sync_timeout := SyncTimeout} = Params}
) ->
case maps:is_key(ChannelId, Channels) of
true ->
{error, channel_already_exists};
false ->
{ok, Producers} = start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params),
Parameters = #{
message => compile_message_template(Message),
sync_timeout => SyncTimeout,
producers => Producers
},
NewChannels = maps:put(ChannelId, Parameters, Channels),
{ok, State#{channels => NewChannels}}
end.
on_remove_channel(InstanceId, State, ChannelId) ->
#{channels := Channels, client_id := ClientId} = State,
case maps:find(ChannelId, Channels) of
{ok, #{producers := Producers}} ->
stop_producers(ClientId, Producers),
emqx_resource:deallocate_resource(InstanceId, {?pulsar_producers, ChannelId}),
{ok, State#{channels => maps:remove(ChannelId, Channels)}};
error ->
{ok, State}
end.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
-spec on_stop(resource_id(), state()) -> ok.
on_stop(InstanceId, _State) ->
case emqx_resource:get_allocated_resources(InstanceId) of
#{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} ->
stop_producers(ClientId, Producers),
Resources0 = emqx_resource:get_allocated_resources(InstanceId),
case maps:take(?pulsar_client_id, Resources0) of
{ClientId, Resources} ->
maps:foreach(
fun({?pulsar_producers, _BridgeV2Id}, Producers) ->
stop_producers(ClientId, Producers)
end,
Resources
),
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{
instance_id => InstanceId,
pulsar_client_id => ClientId,
pulsar_producers => Producers
}),
ok;
#{?pulsar_client_id := ClientId} ->
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{
instance_id => InstanceId,
pulsar_client_id => ClientId,
pulsar_producers => undefined
}),
ok;
_ ->
?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}),
ok;
error ->
ok
end.
@ -149,101 +157,99 @@ on_stop(InstanceId, _State) ->
%% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost.
-spec on_get_status(resource_id(), state()) -> connected | connecting.
on_get_status(_InstanceId, State = #{}) ->
#{
pulsar_client_id := ClientId,
producers := Producers
} = State,
#{client_id := ClientId} = State,
case pulsar_client_sup:find_client(ClientId) of
{ok, Pid} ->
try pulsar_client:get_status(Pid) of
true ->
get_producer_status(Producers);
false ->
connecting
true -> ?status_connected;
false -> ?status_connecting
catch
error:timeout ->
connecting;
?status_connecting;
exit:{noproc, _} ->
connecting
?status_connecting
end;
{error, _} ->
connecting
?status_connecting
end;
on_get_status(_InstanceId, _State) ->
%% If a health check happens just after a concurrent request to
%% create the bridge is not quite finished, `State = undefined'.
connecting.
?status_connecting.
-spec on_query(resource_id(), {send_message, map()}, state()) ->
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
case maps:find(ChannelId, Channels) of
{ok, #{producers := Producers}} ->
get_producer_status(Producers);
error ->
{error, channel_not_found}
end.
-spec on_query(resource_id(), tuple(), state()) ->
{ok, term()}
| {error, timeout}
| {error, term()}.
on_query(_InstanceId, {send_message, Message}, State) ->
#{
producers := Producers,
sync_timeout := SyncTimeout,
message_template := MessageTemplate
} = State,
PulsarMessage = render_message(Message, MessageTemplate),
on_query(_InstanceId, {ChannelId, Message}, State) ->
#{channels := Channels} = State,
case maps:find(ChannelId, Channels) of
error ->
{error, channel_not_found};
{ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} ->
PulsarMessage = render_message(Message, MessageTmpl),
try
pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
catch
error:timeout ->
{error, timeout}
end
end.
-spec on_query_async(
resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
resource_id(), tuple(), {ReplyFun :: function(), Args :: list()}, state()
) ->
{ok, pid()}.
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
#{channels := Channels} = State,
case maps:find(ChannelId, Channels) of
error ->
{error, channel_not_found};
{ok, #{message := MessageTmpl, producers := Producers}} ->
?tp_span(
pulsar_producer_on_query_async,
#{instance_id => _InstanceId, message => Message},
do_on_query_async(Message, AsyncReplyFn, State)
).
on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn)
)
end.
do_on_query_async(Message, AsyncReplyFn, State) ->
#{
producers := Producers,
message_template := MessageTemplate
} = State,
PulsarMessage = render_message(Message, MessageTemplate),
on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) ->
PulsarMessage = render_message(Message, MessageTmpl),
pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
%%-------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------
-spec to_bin(atom() | string() | binary()) -> binary().
to_bin(A) when is_atom(A) ->
atom_to_binary(A);
to_bin(L) when is_list(L) ->
list_to_binary(L);
to_bin(B) when is_binary(B) ->
B.
-spec format_servers(binary()) -> [string()].
format_servers(Servers0) ->
Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS),
lists:map(
fun(#{scheme := Scheme, hostname := Host, port := Port}) ->
Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port)
end,
Servers1
emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS)
).
-spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id().
make_client_id(InstanceId, BridgeName) ->
-spec make_client_id(resource_id()) -> pulsar_client_id().
make_client_id(InstanceId) ->
case is_dry_run(InstanceId) of
true ->
pulsar_producer_probe;
false ->
{pulsar, Name} = emqx_connector_resource:parse_connector_id(InstanceId),
ClientIdBin = iolist_to_binary([
<<"pulsar_producer:">>,
to_bin(BridgeName),
<<"pulsar:">>,
emqx_utils_conv:bin(Name),
<<":">>,
to_bin(node())
emqx_utils_conv:bin(node())
]),
binary_to_atom(ClientIdBin)
end.
@ -252,10 +258,8 @@ make_client_id(InstanceId, BridgeName) ->
is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, InstanceId)
nomatch -> false;
_ -> string:equal(TestIdStart, InstanceId)
end.
conn_opts(#{authentication := none}) ->
@ -275,20 +279,24 @@ conn_opts(#{authentication := #{jwt := JWT}}) ->
-spec replayq_dir(pulsar_client_id()) -> string().
replayq_dir(ClientId) ->
filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]).
filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]).
-spec producer_name(pulsar_client_id()) -> atom().
producer_name(ClientId) ->
ClientIdBin = to_bin(ClientId),
producer_name(InstanceId, ChannelId) ->
case is_dry_run(InstanceId) of
%% do not create more atom
true ->
pulsar_producer_probe_worker;
false ->
ChannelIdBin = emqx_utils_conv:bin(ChannelId),
binary_to_atom(
iolist_to_binary([
<<"producer-">>,
ClientIdBin
ChannelIdBin
])
).
)
end.
-spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}.
start_producer(Config, InstanceId, ClientId, ClientOpts) ->
start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
#{
conn_opts := ConnOpts,
ssl_opts := SSLOpts
@ -303,18 +311,16 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
},
compression := Compression,
max_batch_bytes := MaxBatchBytes,
message := MessageTemplateOpts,
pulsar_topic := PulsarTopic0,
pulsar_topic := PulsarTopic,
retention_period := RetentionPeriod,
send_buffer := SendBuffer,
strategy := Strategy,
sync_timeout := SyncTimeout
} = Config,
strategy := Strategy
} = Params,
{OffloadMode, ReplayQDir} =
case BufferMode of
memory -> {false, false};
disk -> {false, replayq_dir(ClientId)};
hybrid -> {true, replayq_dir(ClientId)}
disk -> {false, replayq_dir(ChannelId)};
hybrid -> {true, replayq_dir(ChannelId)}
end,
MemOLP =
case os:type() of
@ -328,9 +334,8 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
replayq_seg_bytes => SegmentBytes,
drop_if_highmem => MemOLP
},
ProducerName = producer_name(ClientId),
ProducerName = producer_name(InstanceId, ChannelId),
?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}),
MessageTemplate = compile_message_template(MessageTemplateOpts),
ProducerOpts0 =
#{
batch_size => BatchSize,
@ -344,20 +349,17 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
tcp_opts => [{sndbuf, SendBuffer}]
},
ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
PulsarTopic = binary_to_list(PulsarTopic0),
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
{ok, Producers} ->
ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers),
ok = emqx_resource:allocate_resource(
InstanceId,
{?pulsar_producers, ChannelId},
Producers
),
?tp(pulsar_producer_producers_allocated, #{}),
State = #{
pulsar_client_id => ClientId,
producers => Producers,
sync_timeout => SyncTimeout,
message_template => MessageTemplate
},
?tp(pulsar_producer_bridge_started, #{}),
{ok, State}
{ok, Producers}
catch
Kind:Error:Stacktrace ->
?tp(
@ -370,7 +372,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
stacktrace => Stacktrace
}
),
stop_client(ClientId),
?tp(pulsar_bridge_producer_stopped, #{
pulsar_client_id => ClientId,
producers => undefined
}),
throw(failed_to_start_pulsar_producer)
end.
@ -394,7 +399,10 @@ stop_producers(ClientId, Producers) ->
_ = log_when_error(
fun() ->
ok = pulsar:stop_and_delete_supervised_producers(Producers),
?tp(pulsar_bridge_producer_stopped, #{pulsar_client_id => ClientId}),
?tp(pulsar_bridge_producer_stopped, #{
pulsar_client_id => ClientId,
producers => Producers
}),
ok
end,
#{
@ -449,15 +457,19 @@ get_producer_status(Producers) ->
do_get_producer_status(Producers, 0).
do_get_producer_status(_Producers, TimeSpent) when TimeSpent > ?HEALTH_CHECK_RETRY_TIMEOUT ->
connecting;
?status_connecting;
do_get_producer_status(Producers, TimeSpent) ->
case pulsar_producers:all_connected(Producers) of
try pulsar_producers:all_connected(Producers) of
true ->
connected;
?status_connected;
false ->
Sleep = 200,
timer:sleep(Sleep),
do_get_producer_status(Producers, TimeSpent + Sleep)
%% producer crashed with badarg. will recover later
catch
error:badarg ->
?status_connecting
end.
partition_strategy(key_dispatch) -> first_key_dispatch;
@ -467,17 +479,17 @@ is_sensitive_key(auth_data) -> true;
is_sensitive_key(_) -> false.
get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
Iter = maps:iterator(BrokerErrorMap),
do_get_error_message(Iter);
Iterator = maps:iterator(BrokerErrorMap),
do_get_error_message(Iterator);
get_error_message(_Error) ->
error.
do_get_error_message(Iter) ->
case maps:next(Iter) of
{{_Broker, _Port}, #{message := Message}, _NIter} ->
do_get_error_message(Iterator) ->
case maps:next(Iterator) of
{{_Broker, _Port}, #{message := Message}, _NIterator} ->
{ok, Message};
{_K, _V, NIter} ->
do_get_error_message(NIter);
{_K, _V, NIterator} ->
do_get_error_message(NIterator);
none ->
error
end.

View File

@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_connector_schema).
-export([namespace/0, roots/0, fields/1, desc/1]).
-export([connector_examples/1, connector_example_values/0]).
-include("emqx_bridge_pulsar.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-define(TYPE, pulsar).
namespace() -> ?TYPE.
roots() -> [].
fields("config_connector") ->
emqx_bridge_schema:common_bridge_fields() ++
lists:keydelete(enable, 1, emqx_bridge_pulsar:fields(config)) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("post") ->
emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector");
fields("put") ->
fields("config_connector");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("config_connector").
desc("config_connector") ->
?DESC(emqx_bridge_pulsar, "config_connector");
desc(connector_resource_opts) ->
?DESC(emqx_bridge_pulsar, connector_resource_opts);
desc(_) ->
undefined.
connector_examples(Method) ->
[
#{
<<"pulsar">> =>
#{
summary => <<"Pulsar Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?TYPE, connector_example_values()
)
}
}
].
connector_example_values() ->
#{
name => <<"pulsar_connector">>,
type => ?TYPE,
enable => true,
servers => <<"pulsar://127.0.0.1:6650">>,
authentication => none,
connect_timeout => <<"5s">>,
ssl => #{enable => false}
}.

View File

@ -0,0 +1,124 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_pubsub_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-export([roots/0, fields/1, desc/1, namespace/0]).
-export([bridge_v2_examples/1]).
-define(ACTION_TYPE, pulsar).
namespace() -> "pulsar".
roots() -> [].
fields(action) ->
{pulsar,
?HOCON(
?MAP(name, ?R_REF(publisher_action)),
#{
desc => <<"Pulsar Action Config">>,
required => false
}
)};
fields(publisher_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
?HOCON(
?R_REF(action_parameters),
#{
required => true,
desc => ?DESC(action_parameters),
validator => fun emqx_bridge_pulsar:producer_strategy_key_validator/1
}
),
#{resource_opts_ref => ?R_REF(action_resource_opts)}
);
fields(action_parameters) ->
[
{message,
?HOCON(?R_REF(producer_pulsar_message), #{
required => false, desc => ?DESC("producer_message_opts")
})},
{sync_timeout,
?HOCON(emqx_schema:timeout_duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})}
] ++ emqx_bridge_pulsar:fields(producer_opts);
fields(producer_pulsar_message) ->
[
{key,
?HOCON(string(), #{
default => <<"${.clientid}">>,
desc => ?DESC("producer_key_template")
})},
{value,
?HOCON(string(), #{
default => <<"${.}">>,
desc => ?DESC("producer_value_template")
})}
];
fields(action_resource_opts) ->
UnsupportedOpts = [
batch_size,
batch_time,
worker_pool_size,
request_ttl,
inflight_window,
max_buffer_bytes,
query_mode
],
lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_bridge_v2_schema:action_resource_opts_fields()
);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action));
fields(What) ->
error({?MODULE, missing_field_handler, What}).
desc("config") ->
?DESC("desc_config");
desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "creation_opts");
desc(action_parameters) ->
?DESC(action_parameters);
desc(producer_pulsar_message) ->
?DESC("producer_message_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Pulsar Producer using `", string:to_upper(Method), "` method."];
desc(publisher_action) ->
?DESC(publisher_action);
desc(_) ->
undefined.
bridge_v2_examples(Method) ->
[
#{
<<"pulsar">> => #{
summary => <<"Pulsar Producer Action">>,
value => emqx_bridge_v2_schema:action_values(
Method,
_ActionType = ?ACTION_TYPE,
_ConnectorType = pulsar,
#{
parameters => #{
sync_timeout => <<"5s">>,
message => #{
key => <<"${.clientid}">>,
value => <<"${.}">>
},
pulsar_topic => <<"test_topic">>
}
}
)
}
}
].

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_impl_producer_SUITE).
-module(emqx_bridge_pulsar_connector_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
@ -550,7 +550,6 @@ kill_resource_managers() ->
t_start_and_produce_ok(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
@ -600,6 +599,7 @@ t_start_and_produce_ok(Config) ->
_Sleep = 100,
_Attempts0 = 20,
begin
Id = get_channel_id(Config),
?assertMatch(
#{
counters := #{
@ -612,7 +612,7 @@ t_start_and_produce_ok(Config) ->
success := 2
}
},
emqx_resource_manager:get_metrics(ResourceId)
emqx_resource:get_metrics(Id)
),
?assertEqual(
1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')
@ -628,20 +628,34 @@ t_start_and_produce_ok(Config) ->
),
ok.
get_channel_id(Config) ->
BridgeId = emqx_bridge_resource:bridge_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
ConnectorId = emqx_bridge_resource:resource_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
<<"action:", BridgeId/binary, ":", ConnectorId/binary>>.
%% Under normal operations, the bridge will be called async via
%% `simple_async_query'.
t_sync_query(Config) ->
ResourceId = resource_id(Config),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
Message = {send_message, #{payload => Payload}},
BridgeId = emqx_bridge_resource:bridge_id(<<"pulsar">>, ?config(pulsar_name, Config)),
ConnectorId = emqx_bridge_resource:resource_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>,
Message = {Id, #{payload => Payload}},
?assertMatch(
{ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message)
),
@ -688,13 +702,13 @@ t_create_via_http(Config) ->
t_start_stop(Config) ->
PulsarName = ?config(pulsar_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch(
{ok, _},
create_bridge(Config)
),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
@ -745,11 +759,11 @@ t_on_get_status(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
@ -777,7 +791,6 @@ t_start_when_down(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
@ -787,6 +800,7 @@ t_start_when_down(Config) ->
),
ok
end),
ResourceId = resource_id(Config),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
@ -889,7 +903,7 @@ t_failure_to_start_producer(Config) ->
{{ok, _}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_client_stopped},
#{?snk_kind := pulsar_bridge_producer_stopped},
20_000
),
ok
@ -902,7 +916,6 @@ t_failure_to_start_producer(Config) ->
%% die for whatever reason.
t_producer_process_crash(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
QoS = 0,
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
@ -918,6 +931,8 @@ t_producer_process_crash(Config) ->
#{?snk_kind := pulsar_producer_bridge_started},
10_000
),
ResourceId = resource_id(Config),
ChannelId = get_channel_id(Config),
[ProducerPid | _] = [
Pid
|| {_Name, PS, _Type, _Mods} <- supervisor:which_children(pulsar_producers_sup),
@ -937,13 +952,20 @@ t_producer_process_crash(Config) ->
?retry(
_Sleep0 = 50,
_Attempts0 = 50,
?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
?assertEqual(
#{error => <<"Not connected for unknown reason">>, status => connecting},
emqx_resource_manager:channel_health_check(ResourceId, ChannelId)
)
),
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
?assertEqual(
#{error => undefined, status => connected},
emqx_resource_manager:channel_health_check(ResourceId, ChannelId)
)
),
{_, {ok, _}} =
?wait_async_action(
@ -991,12 +1013,12 @@ t_resource_manager_crash_after_producers_started(Config) ->
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when
Producers =/= undefined,
#{?snk_kind := pulsar_bridge_stopped, instance_id := InstanceId} when
InstanceId =/= undefined,
10_000
),
?assertMatch(ok, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
?assertMatch({error, bridge_not_found}, delete_bridge(Config)),
ok
end,
[]
@ -1025,11 +1047,11 @@ t_resource_manager_crash_before_producers_started(Config) ->
{{error, {config_update_crashed, _}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
#{?snk_kind := pulsar_bridge_stopped},
10_000
),
?assertMatch(ok, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
?assertMatch({error, bridge_not_found}, delete_bridge(Config)),
ok
end,
[]
@ -1046,7 +1068,7 @@ t_strategy_key_validation(Config) ->
<<"reason">> := <<"Message key cannot be empty", _/binary>>
}
}}},
probe_bridge_api(
create_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
@ -1060,7 +1082,7 @@ t_strategy_key_validation(Config) ->
<<"reason">> := <<"Message key cannot be empty", _/binary>>
}
}}},
create_bridge_api(
probe_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
@ -1075,7 +1097,6 @@ do_t_cluster(Config) ->
?check_trace(
begin
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
Nodes = [N1, N2 | _] = cluster(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
@ -1095,6 +1116,7 @@ do_t_cluster(Config) ->
),
25_000
),
ResourceId = erpc:call(N1, ?MODULE, resource_id, [Config]),
lists:foreach(
fun(N) ->
?retry(
@ -1147,12 +1169,12 @@ t_resilience(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
ResourceId = resource_id(Config),
?retry(
_Sleep0 = 1_000,
_Attempts0 = 20,
@ -1225,3 +1247,19 @@ t_resilience(Config) ->
[]
),
ok.
get_producers_config(ConnectorId, ChannelId) ->
[
#{
state :=
#{
channels :=
#{ChannelId := #{producers := Producers}}
}
}
] =
lists:filter(
fun(#{id := Id}) -> Id =:= ConnectorId end,
emqx_resource_manager:list_all()
),
Producers.

View File

@ -0,0 +1,451 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_v2_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(TYPE, <<"pulsar">>).
-define(APPS, [emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]).
-define(RULE_TOPIC, "pulsar/rule").
-define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
[
{group, plain},
{group, tls}
].
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
{plain, AllTCs},
{tls, AllTCs}
].
init_per_suite(Config) ->
%% Ensure enterprise bridge module is loaded
_ = emqx_bridge_enterprise:module_info(),
{ok, Cwd} = file:get_cwd(),
PrivDir = ?config(priv_dir, Config),
WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd),
Apps = emqx_cth_suite:start(
lists:flatten([
?APPS,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
]),
#{work_dir => WorkDir}
),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_group(plain = Type, Config) ->
PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"),
PulsarPort = list_to_integer(os:getenv("PULSAR_PLAIN_PORT", "6652")),
ProxyName = "pulsar_plain",
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
true ->
Config1 = common_init_per_group(),
NewConfig =
[
{proxy_name, ProxyName},
{pulsar_host, PulsarHost},
{pulsar_port, PulsarPort},
{pulsar_type, Type},
{use_tls, false}
| Config1 ++ Config
],
create_connector(?MODULE, NewConfig),
NewConfig;
false ->
maybe_skip_without_ci()
end;
init_per_group(tls = Type, Config) ->
PulsarHost = os:getenv("PULSAR_TLS_HOST", "toxiproxy"),
PulsarPort = list_to_integer(os:getenv("PULSAR_TLS_PORT", "6653")),
ProxyName = "pulsar_tls",
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
true ->
Config1 = common_init_per_group(),
NewConfig =
[
{proxy_name, ProxyName},
{pulsar_host, PulsarHost},
{pulsar_port, PulsarPort},
{pulsar_type, Type},
{use_tls, true}
| Config1 ++ Config
],
create_connector(?MODULE, NewConfig),
NewConfig;
false ->
maybe_skip_without_ci()
end.
end_per_group(Group, Config) when
Group =:= plain;
Group =:= tls
->
common_end_per_group(Config),
ok.
common_init_per_group() ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
UniqueNum = integer_to_binary(erlang:unique_integer()),
MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
[
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{mqtt_topic, MQTTTopic}
].
common_end_per_group(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
ok = emqx_config:delete_override_conf_files(),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges(),
stop_consumer(Config),
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
flush_consumed(),
ok
end.
common_init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
PulsarTopic =
<<
(atom_to_binary(TestCase))/binary,
UniqueNum/binary
>>,
Config1 = [{pulsar_topic, PulsarTopic} | Config0],
ConsumerConfig = start_consumer(TestCase, Config1),
Config = ConsumerConfig ++ Config1,
ok = snabbkaffe:start_trace(),
Config.
create_connector(Name, Config) ->
Connector = pulsar_connector(Config),
{ok, _} = emqx_connector:create(?TYPE, Name, Connector).
delete_connector(Name) ->
ok = emqx_connector:remove(?TYPE, Name).
create_action(Name, Config) ->
Action = pulsar_action(Config),
{ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action).
delete_action(Name) ->
ok = emqx_bridge_v2:remove(actions, ?TYPE, Name).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_action_probe(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
Action = pulsar_action(Config),
{ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
?assertMatch({{_, 204, _}, _, _}, Res0),
ok.
t_action(Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
create_action(Name, Config),
Actions = emqx_bridge_v2:list(actions),
Any = fun(#{name := BName}) -> BName =:= Name end,
?assert(lists:any(Any, Actions), Actions),
Topic = <<"lkadfdaction">>,
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
#{
sql => <<"select * from \"", Topic/binary, "\"">>,
id => atom_to_binary(?FUNCTION_NAME),
actions => [<<"pulsar:", Name/binary>>],
description => <<"bridge_v2 send msg to pulsar action">>
}
),
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
MQTTClientID = <<"pulsar_mqtt_clientid">>,
{ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]),
{ok, _} = emqtt:connect(C1),
ReqPayload = payload(),
ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
{ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]),
[#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000),
?assertEqual(MQTTClientID, ClientID),
?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)),
ok = emqtt:disconnect(C1),
InstanceId = instance_id(actions, Name),
#{counters := Counters} = emqx_resource:get_metrics(InstanceId),
ok = delete_action(Name),
ActionsAfterDelete = emqx_bridge_v2:list(actions),
?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
?assertMatch(
#{
dropped := 0,
success := 1,
matched := 1,
failed := 0,
received := 0
},
Counters
),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
pulsar_connector(Config) ->
PulsarHost = ?config(pulsar_host, Config),
PulsarPort = ?config(pulsar_port, Config),
UseTLS = proplists:get_value(use_tls, Config, false),
Name = atom_to_binary(?MODULE),
Prefix =
case UseTLS of
true -> <<"pulsar+ssl://">>;
false -> <<"pulsar://">>
end,
ServerURL = iolist_to_binary([
Prefix,
PulsarHost,
":",
integer_to_binary(PulsarPort)
]),
Connector = #{
<<"connectors">> => #{
<<"pulsar">> => #{
Name => #{
<<"enable">> => true,
<<"ssl">> => #{
<<"enable">> => UseTLS,
<<"verify">> => <<"verify_none">>,
<<"server_name_indication">> => <<"auto">>
},
<<"authentication">> => <<"none">>,
<<"servers">> => ServerURL
}
}
}
},
parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name).
pulsar_action(Config) ->
Name = atom_to_binary(?MODULE),
Action = #{
<<"actions">> => #{
<<"pulsar">> => #{
Name => #{
<<"connector">> => Name,
<<"enable">> => true,
<<"parameters">> => #{
<<"retention_period">> => <<"infinity">>,
<<"max_batch_bytes">> => <<"1MB">>,
<<"batch_size">> => 100,
<<"strategy">> => <<"random">>,
<<"buffer">> => #{
<<"mode">> => <<"memory">>,
<<"per_partition_limit">> => <<"10MB">>,
<<"segment_bytes">> => <<"5MB">>,
<<"memory_overload_protection">> => true
},
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"value">> => <<"${.}">>
},
<<"pulsar_topic">> => ?config(pulsar_topic, Config)
},
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"1s">>
}
}
}
}
},
parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name).
parse_and_check(Key, Mod, Conf, Name) ->
ConfStr = hocon_pp:do(Conf, #{}),
ct:pal(ConfStr),
{ok, RawConf} = hocon:binary(ConfStr, #{format => map}),
hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
#{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf,
RetConf.
instance_id(Type, Name) ->
ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name),
TypeBin =
case Type of
sources -> <<"source:">>;
actions -> <<"action:">>
end,
<<TypeBin/binary, BridgeId/binary, ":", ConnectorId/binary>>.
start_consumer(TestCase, Config) ->
PulsarHost = ?config(pulsar_host, Config),
PulsarPort = ?config(pulsar_port, Config),
PulsarTopic = ?config(pulsar_topic, Config),
UseTLS = ?config(use_tls, Config),
Scheme =
case UseTLS of
true -> <<"pulsar+ssl://">>;
false -> <<"pulsar://">>
end,
URL =
binary_to_list(
<<Scheme/binary, (list_to_binary(PulsarHost))/binary, ":",
(integer_to_binary(PulsarPort))/binary>>
),
ConsumerClientId = list_to_atom(
atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer())
),
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
SSLOpts = #{
enable => UseTLS,
keyfile => filename:join([CertsPath, "key.pem"]),
certfile => filename:join([CertsPath, "cert.pem"]),
cacertfile => filename:join([CertsPath, "cacert.pem"])
},
Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)},
{ok, _ClientPid} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts),
ConsumerOpts = Opts#{
cb_init_args => #{send_to => self()},
cb_module => pulsar_echo_consumer,
sub_type => 'Shared',
subscription => atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()),
max_consumer_num => 1,
%% Note! This must not coincide with the client
%% id, or else weird bugs will happen, like the
%% consumer never starts...
name => list_to_atom("test_consumer" ++ integer_to_list(erlang:unique_integer())),
consumer_id => 1
},
{ok, Consumer} = pulsar:ensure_supervised_consumers(
ConsumerClientId,
PulsarTopic,
ConsumerOpts
),
%% since connection is async, and there's currently no way to
%% specify the subscription initial position as `Earliest', we
%% need to wait until the consumer is connected to avoid
%% flakiness.
ok = wait_until_consumer_connected(Consumer),
[
{consumer_client_id, ConsumerClientId},
{pulsar_consumer, Consumer}
].
stop_consumer(Config) ->
ConsumerClientId = ?config(consumer_client_id, Config),
Consumer = ?config(pulsar_consumer, Config),
ok = pulsar:stop_and_delete_supervised_consumers(Consumer),
ok = pulsar:stop_and_delete_supervised_client(ConsumerClientId),
ok.
wait_until_consumer_connected(Consumer) ->
?retry(
_Sleep = 300,
_Attempts0 = 20,
true = pulsar_consumers:all_connected(Consumer)
),
ok.
wait_until_producer_connected() ->
wait_until_connected(pulsar_producers_sup, pulsar_producer).
wait_until_connected(SupMod, Mod) ->
Pids = get_pids(SupMod, Mod),
?retry(
_Sleep = 300,
_Attempts0 = 20,
begin
true = length(Pids) > 0,
lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
end
),
ok.
get_pulsar_producers() ->
get_pids(pulsar_producers_sup, pulsar_producer).
get_pids(SupMod, Mod) ->
[
P
|| {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod),
P <- element(2, process_info(SupPid, links)),
case proc_lib:initial_call(P) of
{Mod, init, _} -> true;
_ -> false
end
].
receive_consumed(Timeout) ->
receive
{pulsar_message, #{payloads := Payloads}} ->
lists:map(fun try_decode_json/1, Payloads)
after Timeout ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
ct:fail("no message consumed")
end.
flush_consumed() ->
receive
{pulsar_message, _} -> flush_consumed()
after 0 -> ok
end.
try_decode_json(Payload) ->
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
{error, _} ->
Payload;
{ok, JSON} ->
JSON
end.
payload() ->
#{<<"key">> => 42, <<"data">> => <<"pulsar">>, <<"timestamp">> => 10000}.
maybe_skip_without_ci() ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_pulsar);
_ ->
{skip, no_pulsar}
end.

View File

@ -170,7 +170,7 @@ fields(Field) when
->
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source));
fields(What) ->
error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
error({?MODULE, missing_field_handler, What}).
%% v2: api schema
%% The parameter equals to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1

View File

@ -74,6 +74,8 @@ resource_type(greptimedb) ->
emqx_bridge_greptimedb_connector;
resource_type(tdengine) ->
emqx_bridge_tdengine_connector;
resource_type(pulsar) ->
emqx_bridge_pulsar_connector;
resource_type(rabbitmq) ->
emqx_bridge_rabbitmq_connector;
resource_type(s3) ->
@ -94,6 +96,8 @@ connector_impl_module(elasticsearch) ->
emqx_bridge_es_connector;
connector_impl_module(opents) ->
emqx_bridge_opents_connector;
connector_impl_module(pulsar) ->
emqx_bridge_pulsar_connector;
connector_impl_module(tdengine) ->
emqx_bridge_tdengine_connector;
connector_impl_module(rabbitmq) ->
@ -317,6 +321,14 @@ connector_structs() ->
required => false
}
)},
{pulsar,
mk(
hoconsc:map(name, ref(emqx_bridge_pulsar_connector_schema, "config_connector")),
#{
desc => <<"Pulsar Connector Config">>,
required => false
}
)},
{rabbitmq,
mk(
hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")),
@ -361,6 +373,7 @@ schema_modules() ->
emqx_bridge_iotdb_connector,
emqx_bridge_es_connector,
emqx_bridge_rabbitmq_connector_schema,
emqx_bridge_pulsar_connector_schema,
emqx_bridge_opents_connector,
emqx_bridge_greptimedb,
emqx_bridge_tdengine_connector,
@ -410,6 +423,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method),
api_ref(emqx_bridge_pulsar_connector_schema, <<"pulsar">>, Method),
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method),
api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector")

View File

@ -174,6 +174,8 @@ connector_type_to_bridge_types(opents) ->
[opents];
connector_type_to_bridge_types(greptimedb) ->
[greptimedb];
connector_type_to_bridge_types(pulsar) ->
[pulsar_producer, pulsar];
connector_type_to_bridge_types(tdengine) ->
[tdengine];
connector_type_to_bridge_types(rabbitmq) ->
@ -269,6 +271,7 @@ split_bridge_to_connector_and_action(
#{<<"connector">> := ConnectorName0} -> ConnectorName0;
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
end,
OrgActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
{ActionMap, ActionType, ActionOrSource} =
case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of

View File

@ -1,180 +1,173 @@
emqx_bridge_pulsar {
auth_basic {
desc = "Parameters for basic authentication."
label = "Basic auth params"
}
auth_basic_password {
desc = "Basic authentication password."
label = "Password"
}
config_connector.desc:
"""Pulsar connector config"""
config_connector.label:
"""Pulsar Connector"""
auth_basic_username {
desc = "Basic authentication username."
label = "Username"
}
connector_resource_opts.desc:
"""Pulsar connector resource options"""
connector_resource_opts.label:
"""Resource Options"""
auth_token {
desc = "Parameters for token authentication."
label = "Token auth params"
}
auth_basic.desc:
"""Parameters for basic authentication."""
auth_basic.label:
"""Basic auth params"""
auth_token_jwt {
desc = "JWT authentication token."
label = "JWT"
}
auth_basic_password.desc:
"""Basic authentication password."""
auth_basic_password.label:
"""Password"""
authentication {
desc = "Authentication configs."
label = "Authentication"
}
auth_basic_username.desc:
"""Basic authentication username."""
auth_basic_username.label:
"""Username"""
buffer_memory_overload_protection {
desc = "Applicable when buffer mode is set to <code>memory</code>\n"
"EMQX will drop old buffered messages under high memory pressure."
" The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>."
" NOTE: This config only works on Linux."
label = "Memory Overload Protection"
}
auth_token.desc:
"""Parameters for token authentication."""
auth_token.label:
"""Token auth params"""
buffer_mode {
desc = "Message buffer mode.\n"
"<code>memory</code>: Buffer all messages in memory. The messages will be lost"
" in case of EMQX node restart\n<code>disk</code>: Buffer all messages on disk."
" The messages on disk are able to survive EMQX node restart.\n"
"<code>hybrid</code>: Buffer message in memory first, when up to certain limit"
" (see <code>segment_bytes</code> config for more information), then start offloading"
" messages to disk, Like <code>memory</code> mode, the messages will be lost in"
" case of EMQX node restart."
label = "Buffer Mode"
}
auth_token_jwt.desc:
"""JWT authentication token."""
auth_token_jwt.label:
"""JWT"""
buffer_per_partition_limit {
desc = "Number of bytes allowed to buffer for each Pulsar partition."
" When this limit is exceeded, old messages will be dropped in a trade for credits"
" for new messages to be buffered."
label = "Per-partition Buffer Limit"
}
authentication.desc:
"""Authentication configs."""
authentication.label:
"""Authentication"""
buffer_segment_bytes {
desc = "Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.\n"
"This value is to specify the size of each on-disk buffer file."
label = "Segment File Bytes"
}
buffer_memory_overload_protection.desc:
"""Applicable when buffer mode is set to <code>memory</code>
EMQX will drop old buffered messages under high memory pressure.
The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>.
NOTE: This config only works on Linux."""
buffer_memory_overload_protection.label:
"""Memory Overload Protection"""
config_enable {
desc = "Enable (true) or disable (false) this Pulsar bridge."
label = "Enable or Disable"
}
buffer_mode.desc:
"""Message buffer mode.
<code>memory</code>: Buffer all messages in memory. The messages will be lost
in case of EMQX node restart\n<code>disk</code>: Buffer all messages on disk.
The messages on disk are able to survive EMQX node restart.
<code>hybrid</code>: Buffer message in memory first, when up to certain limit
(see <code>segment_bytes</code> config for more information), then start offloading
messages to disk, Like <code>memory</code> mode, the messages will be lost in
case of EMQX node restart."""
buffer_mode.label:
"""Buffer Mode"""
connect_timeout {
desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
label = "Connect Timeout"
}
buffer_per_partition_limit.desc:
"""Number of bytes allowed to buffer for each Pulsar partition.
When this limit is exceeded, old messages will be dropped in a trade for credits
for new messages to be buffered."""
buffer_per_partition_limit.label:
"""Per-partition Buffer Limit"""
desc_name {
desc = "Action name, a human-readable identifier."
label = "Action Name"
}
desc_name.desc:
"""Action name, a human-readable identifier."""
desc_name.label:
"""Action Name"""
desc_type {
desc = "The Bridge Type"
label = "Bridge Type"
}
buffer_segment_bytes.desc:
"""Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.
This value is to specify the size of each on-disk buffer file."""
buffer_segment_bytes.label:
"""Segment File Bytes"""
producer_batch_size {
desc = "Maximum number of individual requests to batch in a Pulsar message."
label = "Batch size"
}
config_enable.desc:
"""Enable (true) or disable (false) this Pulsar bridge."""
config_enable.label:
"""Enable or Disable"""
producer_buffer {
desc = "Configure producer message buffer.\n\n"
"Tell Pulsar producer how to buffer messages when EMQX has more messages to"
" send than Pulsar can keep up, or when Pulsar is down."
label = "Message Buffer"
}
connect_timeout.desc:
"""Maximum wait time for TCP connection establishment (including authentication time if enabled)."""
connect_timeout.label:
"""Connect Timeout"""
producer_compression {
desc = "Compression method."
label = "Compression"
}
desc_name.desc:
"""Bridge name, used as a human-readable description of the bridge."""
desc_name.label:
"""Bridge Name"""
producer_key_template {
desc = "Template to render Pulsar message key."
label = "Message Key"
}
desc_type.desc:
"""The Bridge Type"""
desc_type.label:
"""Bridge Type"""
producer_local_topic {
desc = "MQTT topic or topic filter as data source (bridge input)."
" If rule action is used as data source, this config should be left empty,"
" otherwise messages will be duplicated in Pulsar."
label = "Source MQTT Topic"
}
producer_batch_size.desc:
"""Maximum number of individual requests to batch in a Pulsar message."""
producer_batch_size.label:
"""Batch size"""
producer_max_batch_bytes {
desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers"
" default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in"
" order to compensate Pulsar message encoding overheads (especially when each individual"
" message is very small). When a single message is over the limit, it is still"
" sent (as a single element batch)."
label = "Max Batch Bytes"
}
producer_buffer.desc:
"""Configure producer message buffer."
Tell Pulsar producer how to buffer messages when EMQX has more messages to"
send than Pulsar can keep up, or when Pulsar is down."""
producer_buffer.label:
"""Message Buffer"""
producer_message_opts {
desc = "Template to render a Pulsar message."
label = "Pulsar Message Template"
}
producer_compression.desc:
"""Compression method."""
producer_compression.label:
"""Compression"""
producer_pulsar_message {
desc = "Template to render a Pulsar message."
label = "Pulsar Message Template"
}
producer_local_topic.desc:
"""MQTT topic or topic filter as data source (bridge input)
If rule action is used as data source, this config should be left empty,
otherwise messages will be duplicated in Pulsar."""
producer_local_topic.label:
"""Source MQTT Topic"""
producer_pulsar_topic {
desc = "Pulsar topic name"
label = "Pulsar topic name"
}
producer_max_batch_bytes.desc:
"""Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers
default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in
order to compensate Pulsar message encoding overheads (especially when each individual
message is very small). When a single message is over the limit, it is still
sent (as a single element batch)."""
producer_max_batch_bytes.label:
"""Max Batch Bytes"""
producer_retention_period {
desc = "The amount of time messages will be buffered while there is no connection to"
" the Pulsar broker. Longer times mean that more memory/disk will be used"
label = "Retention Period"
}
producer_send_buffer {
desc = "Fine tune the socket send buffer. The default value is tuned for high throughput."
label = "Socket Send Buffer Size"
}
producer_pulsar_topic.desc:
"""Pulsar topic name"""
producer_pulsar_topic.label:
"""Pulsar topic name"""
producer_strategy {
desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n"
"\n"
"<code>random</code>: Randomly pick a partition for each message.\n"
"<code>roundrobin</code>: Pick each available producer in turn for each message.\n"
"<code>key_dispatch</code>: Hash Pulsar message key of the first message in a batch"
" to a partition number."
label = "Partition Strategy"
}
producer_retention_period.desc:
"""The amount of time messages will be buffered while there is no connection to
the Pulsar broker. Longer times mean that more memory/disk will be used"""
producer_retention_period.label:
"""Retention Period"""
producer_sync_timeout {
desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."
label = "Sync publish timeout"
}
producer_send_buffer.desc:
"""Fine tune the socket send buffer. The default value is tuned for high throughput."""
producer_send_buffer.label:
"""Socket Send Buffer Size"""
producer_value_template {
desc = "Template to render Pulsar message value."
label = "Message Value"
}
producer_strategy.desc:
"""Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.
pulsar_producer_struct {
desc = "Configuration for a Pulsar bridge."
label = "Pulsar Bridge Configuration"
}
<code>random</code>: Randomly pick a partition for each message.
<code>roundrobin</code>: Pick each available producer in turn for each message.
<code>key_dispatch</code>: Hash Pulsar message key of the first message in a batch
to a partition number."""
producer_strategy.label:
"""Partition Strategy"""
pulsar_producer_struct.desc:
"""Configuration for a Pulsar bridge."""
pulsar_producer_struct.label:
"""Pulsar Bridge Configuration"""
servers.desc:
"""A comma separated list of Pulsar URLs in the form <code>scheme://host[:port]</code>
for the client to connect to. The supported schemes are <code>pulsar://</code> (default)
and <code>pulsar+ssl://</code>. The default port is 6650."""
servers.label:
"""Servers"""
servers {
desc = "A comma separated list of Pulsar URLs in the form <code>scheme://host[:port]</code>"
" for the client to connect to. The supported schemes are <code>pulsar://</code> (default)"
" and <code>pulsar+ssl://</code>. The default port is 6650."
label = "Servers"
}
}

View File

@ -0,0 +1,38 @@
emqx_bridge_pulsar_pubsub_schema {
action_parameters.desc:
"""Action specific configs."""
action_parameters.label:
"""Action"""
publisher_action.desc:
"""Publish message to Pulsar topic"""
publisher_action.label:
"""Publish Action """
producer_sync_timeout.desc:
"""Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."""
producer_sync_timeout.label:
"""Sync publish timeout"""
producer_key_template.desc:
"""Template to render Pulsar message key."""
producer_key_template.label:
"""Message Key"""
producer_value_template.desc:
"""Template to render Pulsar message value."""
producer_value_template.label:
"""Message Value"""
producer_message_opts.desc:
"""Template to render a Pulsar message."""
producer_message_opts.label:
"""Pulsar Message Template"""
producer_pulsar_message.desc:
"""Template to render a Pulsar message."""
producer_pulsar_message.label:
"""Pulsar Message Template"""
}