Merge pull request #10378 from thalesmg/pulsar-producer-e50

feat: implement Pulsar Producer bridge (e5.0)
This commit is contained in:
Thales Macedo Garitezi 2023-04-25 18:01:43 -03:00 committed by GitHub
commit 79cf5cad19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 2562 additions and 95 deletions

View File

@ -0,0 +1,32 @@
version: '3'
services:
pulsar:
container_name: pulsar
image: apachepulsar/pulsar:2.11.0
# ports:
# - 6650:6650
# - 8080:8080
networks:
emqx_bridge:
volumes:
- ../../apps/emqx/etc/certs/cert.pem:/etc/certs/server.pem
- ../../apps/emqx/etc/certs/key.pem:/etc/certs/key.pem
- ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca.pem
restart: always
command:
- bash
- "-c"
- |
sed -i 's/^advertisedAddress=/#advertisedAddress=/' conf/standalone.conf
sed -ie 's/^brokerServicePort=.*/brokerServicePort=6649/' conf/standalone.conf
sed -i 's/^bindAddress=/#bindAddress=/' conf/standalone.conf
sed -i 's#^bindAddresses=#bindAddresses=plain:pulsar://0.0.0.0:6650,ssl:pulsar+ssl://0.0.0.0:6651,toxiproxy:pulsar://0.0.0.0:6652,toxiproxy_ssl:pulsar+ssl://0.0.0.0:6653#' conf/standalone.conf
sed -i 's#^advertisedAddress=#advertisedAddress=plain:pulsar://pulsar:6650,ssl:pulsar+ssl://pulsar:6651,toxiproxy:pulsar://toxiproxy:6652,toxiproxy_ssl:pulsar+ssl://toxiproxy:6653#' conf/standalone.conf
sed -i 's#^tlsCertificateFilePath=#tlsCertificateFilePath=/etc/certs/server.pem#' conf/standalone.conf
sed -i 's#^tlsTrustCertsFilePath=#tlsTrustCertsFilePath=/etc/certs/ca.pem#' conf/standalone.conf
sed -i 's#^tlsKeyFilePath=#tlsKeyFilePath=/etc/certs/key.pem#' conf/standalone.conf
sed -i 's#^tlsProtocols=#tlsProtocols=TLSv1.3,TLSv1.2#' conf/standalone.conf
sed -i 's#^tlsCiphers=#tlsCiphers=TLS_AES_256_GCM_SHA384#' conf/standalone.conf
echo 'advertisedListeners=plain:pulsar://pulsar:6650,ssl:pulsar+ssl://pulsar:6651,toxiproxy:pulsar://toxiproxy:6652,toxiproxy_ssl:pulsar+ssl://toxiproxy:6653' >> conf/standalone.conf
bin/pulsar standalone -nfw -nss

View File

@ -107,5 +107,17 @@
"listen": "0.0.0.0:4242",
"upstream": "opents:4242",
"enabled": true
},
{
"name": "pulsar_plain",
"listen": "0.0.0.0:6652",
"upstream": "pulsar:6652",
"enabled": true
},
{
"name": "pulsar_tls",
"listen": "0.0.0.0:6653",
"upstream": "pulsar:6653",
"enabled": true
}
]

View File

@ -1,7 +1,7 @@
Source code in this repository is variously licensed under below licenses.
For EMQX: Apache License 2.0, see APL.txt,
which applies to all source files except for lib-ee sub-directory.
For Default: Apache License 2.0, see APL.txt,
which applies to all source files except for folders applied with Business Source License.
For EMQX Enterprise (since version 5.0): Business Source License 1.1,
see lib-ee/BSL.txt, which applies to source code in lib-ee sub-directory.
see apps/emqx_bridge_kafka/BSL.txt as an example, please check license files under sub directory of apps.

View File

@ -42,7 +42,12 @@
-type ip_port() :: tuple() | integer().
-type cipher() :: map().
-type port_number() :: 1..65536.
-type server_parse_option() :: #{default_port => port_number(), no_port => boolean()}.
-type server_parse_option() :: #{
default_port => port_number(),
no_port => boolean(),
supported_schemes => [string()],
default_scheme => string()
}.
-type url() :: binary().
-type json_binary() :: binary().
@ -61,6 +66,12 @@
-typerefl_from_string({url/0, emqx_schema, to_url}).
-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}).
-type parsed_server() :: #{
hostname := string(),
port => port_number(),
scheme => string()
}.
-export([
validate_heap_size/1,
user_lookup_fun_tr/2,
@ -2896,7 +2907,7 @@ servers_validator(Opts, Required) ->
%% `no_port': by default it's `false', when set to `true',
%% a `throw' exception is raised if the port is found.
-spec parse_server(undefined | string() | binary(), server_parse_option()) ->
{string(), port_number()}.
undefined | parsed_server().
parse_server(Str, Opts) ->
case parse_servers(Str, Opts) of
undefined ->
@ -2910,7 +2921,7 @@ parse_server(Str, Opts) ->
%% @doc Parse comma separated `host[:port][,host[:port]]' endpoints
%% into a list of `{Host, Port}' tuples or just `Host' string.
-spec parse_servers(undefined | string() | binary(), server_parse_option()) ->
[{string(), port_number()}].
undefined | [parsed_server()].
parse_servers(undefined, _Opts) ->
%% should not parse 'undefined' as string,
%% not to throw exception either,
@ -2956,6 +2967,9 @@ split_host_port(Str) ->
do_parse_server(Str, Opts) ->
DefaultPort = maps:get(default_port, Opts, undefined),
NotExpectingPort = maps:get(no_port, Opts, false),
DefaultScheme = maps:get(default_scheme, Opts, undefined),
SupportedSchemes = maps:get(supported_schemes, Opts, []),
NotExpectingScheme = (not is_list(DefaultScheme)) andalso length(SupportedSchemes) =:= 0,
case is_integer(DefaultPort) andalso NotExpectingPort of
true ->
%% either provide a default port from schema,
@ -2964,22 +2978,129 @@ do_parse_server(Str, Opts) ->
false ->
ok
end,
case is_list(DefaultScheme) andalso (not lists:member(DefaultScheme, SupportedSchemes)) of
true ->
%% inconsistent schema
error("bad_schema");
false ->
ok
end,
%% do not split with space, there should be no space allowed between host and port
case string:tokens(Str, ":") of
[Hostname, Port] ->
NotExpectingPort andalso throw("not_expecting_port_number"),
{check_hostname(Hostname), parse_port(Port)};
[Hostname] ->
case is_integer(DefaultPort) of
true ->
{check_hostname(Hostname), DefaultPort};
false when NotExpectingPort ->
check_hostname(Hostname);
false ->
throw("missing_port_number")
end;
_ ->
throw("bad_host_port")
Tokens = string:tokens(Str, ":"),
Context = #{
not_expecting_port => NotExpectingPort,
not_expecting_scheme => NotExpectingScheme,
default_port => DefaultPort,
default_scheme => DefaultScheme,
opts => Opts
},
check_server_parts(Tokens, Context).
check_server_parts([Scheme, "//" ++ Hostname, Port], Context) ->
#{
not_expecting_scheme := NotExpectingScheme,
not_expecting_port := NotExpectingPort,
opts := Opts
} = Context,
NotExpectingPort andalso throw("not_expecting_port_number"),
NotExpectingScheme andalso throw("not_expecting_scheme"),
#{
scheme => check_scheme(Scheme, Opts),
hostname => check_hostname(Hostname),
port => parse_port(Port)
};
check_server_parts([Scheme, "//" ++ Hostname], Context) ->
#{
not_expecting_scheme := NotExpectingScheme,
not_expecting_port := NotExpectingPort,
default_port := DefaultPort,
opts := Opts
} = Context,
NotExpectingScheme andalso throw("not_expecting_scheme"),
case is_integer(DefaultPort) of
true ->
#{
scheme => check_scheme(Scheme, Opts),
hostname => check_hostname(Hostname),
port => DefaultPort
};
false when NotExpectingPort ->
#{
scheme => check_scheme(Scheme, Opts),
hostname => check_hostname(Hostname)
};
false ->
throw("missing_port_number")
end;
check_server_parts([Hostname, Port], Context) ->
#{
not_expecting_port := NotExpectingPort,
default_scheme := DefaultScheme
} = Context,
NotExpectingPort andalso throw("not_expecting_port_number"),
case is_list(DefaultScheme) of
false ->
#{
hostname => check_hostname(Hostname),
port => parse_port(Port)
};
true ->
#{
scheme => DefaultScheme,
hostname => check_hostname(Hostname),
port => parse_port(Port)
}
end;
check_server_parts([Hostname], Context) ->
#{
not_expecting_scheme := NotExpectingScheme,
not_expecting_port := NotExpectingPort,
default_port := DefaultPort,
default_scheme := DefaultScheme
} = Context,
case is_integer(DefaultPort) orelse NotExpectingPort of
true ->
ok;
false ->
throw("missing_port_number")
end,
case is_list(DefaultScheme) orelse NotExpectingScheme of
true ->
ok;
false ->
throw("missing_scheme")
end,
case {is_integer(DefaultPort), is_list(DefaultScheme)} of
{true, true} ->
#{
scheme => DefaultScheme,
hostname => check_hostname(Hostname),
port => DefaultPort
};
{true, false} ->
#{
hostname => check_hostname(Hostname),
port => DefaultPort
};
{false, true} ->
#{
scheme => DefaultScheme,
hostname => check_hostname(Hostname)
};
{false, false} ->
#{hostname => check_hostname(Hostname)}
end;
check_server_parts(_Tokens, _Context) ->
throw("bad_host_port").
check_scheme(Str, Opts) ->
SupportedSchemes = maps:get(supported_schemes, Opts, []),
IsSupported = lists:member(Str, SupportedSchemes),
case IsSupported of
true ->
Str;
false ->
throw("unsupported_scheme")
end.
check_hostname(Str) ->

View File

@ -500,6 +500,7 @@ t_get_conn_info(_) ->
end).
t_oom_shutdown(init, Config) ->
ok = snabbkaffe:stop(),
ok = snabbkaffe:start_trace(),
ok = meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]),
meck:expect(

View File

@ -219,112 +219,124 @@ parse_server_test_() ->
?T(
"single server, binary, no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse(<<"localhost">>)
)
),
?T(
"single server, string, no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse("localhost")
)
),
?T(
"single server, list(string), no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse(["localhost"])
)
),
?T(
"single server, list(binary), no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse([<<"localhost">>])
)
),
?T(
"single server, binary, with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse(<<"localhost:9999">>)
)
),
?T(
"single server, list(string), with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse(["localhost:9999"])
)
),
?T(
"single server, string, with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse("localhost:9999")
)
),
?T(
"single server, list(binary), with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse([<<"localhost:9999">>])
)
),
?T(
"multiple servers, string, no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse("host1, host2")
)
),
?T(
"multiple servers, binary, no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse(<<"host1, host2,,,">>)
)
),
?T(
"multiple servers, list(string), no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse(["host1", "host2"])
)
),
?T(
"multiple servers, list(binary), no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse([<<"host1">>, <<"host2">>])
)
),
?T(
"multiple servers, string, with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse("host1:1234, host2:2345")
)
),
?T(
"multiple servers, binary, with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse(<<"host1:1234, host2:2345, ">>)
)
),
?T(
"multiple servers, list(string), with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse([" host1:1234 ", "host2:2345"])
)
),
?T(
"multiple servers, list(binary), with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse([<<"host1:1234">>, <<"host2:2345">>])
)
),
@ -350,9 +362,9 @@ parse_server_test_() ->
)
),
?T(
"multiple servers wihtout port, mixed list(binary|string)",
"multiple servers without port, mixed list(binary|string)",
?assertEqual(
["host1", "host2"],
[#{hostname => "host1"}, #{hostname => "host2"}],
Parse2([<<"host1">>, "host2"], #{no_port => true})
)
),
@ -394,14 +406,18 @@ parse_server_test_() ->
?T(
"single server map",
?assertEqual(
[{"host1.domain", 1234}],
[#{hostname => "host1.domain", port => 1234}],
HoconParse("host1.domain:1234")
)
),
?T(
"multiple servers map",
?assertEqual(
[{"host1.domain", 1234}, {"host2.domain", 2345}, {"host3.domain", 3456}],
[
#{hostname => "host1.domain", port => 1234},
#{hostname => "host2.domain", port => 2345},
#{hostname => "host3.domain", port => 3456}
],
HoconParse("host1.domain:1234,host2.domain:2345,host3.domain:3456")
)
),
@ -447,6 +463,171 @@ parse_server_test_() ->
"bad_schema",
emqx_schema:parse_server("whatever", #{default_port => 10, no_port => true})
)
),
?T(
"scheme, hostname and port",
?assertEqual(
#{scheme => "pulsar+ssl", hostname => "host", port => 6651},
emqx_schema:parse_server(
"pulsar+ssl://host:6651",
#{
default_port => 6650,
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"scheme and hostname, default port",
?assertEqual(
#{scheme => "pulsar", hostname => "host", port => 6650},
emqx_schema:parse_server(
"pulsar://host",
#{
default_port => 6650,
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"scheme and hostname, no port",
?assertEqual(
#{scheme => "pulsar", hostname => "host"},
emqx_schema:parse_server(
"pulsar://host",
#{
no_port => true,
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"scheme and hostname, missing port",
?assertThrow(
"missing_port_number",
emqx_schema:parse_server(
"pulsar://host",
#{
no_port => false,
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"hostname, default scheme, no default port",
?assertEqual(
#{scheme => "pulsar", hostname => "host"},
emqx_schema:parse_server(
"host",
#{
default_scheme => "pulsar",
no_port => true,
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"hostname, default scheme, default port",
?assertEqual(
#{scheme => "pulsar", hostname => "host", port => 6650},
emqx_schema:parse_server(
"host",
#{
default_port => 6650,
default_scheme => "pulsar",
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"just hostname, expecting missing scheme",
?assertThrow(
"missing_scheme",
emqx_schema:parse_server(
"host",
#{
no_port => true,
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"hostname, default scheme, defined port",
?assertEqual(
#{scheme => "pulsar", hostname => "host", port => 6651},
emqx_schema:parse_server(
"host:6651",
#{
default_port => 6650,
default_scheme => "pulsar",
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"inconsistent scheme opts",
?assertError(
"bad_schema",
emqx_schema:parse_server(
"pulsar+ssl://host:6651",
#{
default_port => 6650,
default_scheme => "something",
supported_schemes => ["not", "supported"]
}
)
)
),
?T(
"hostname, default scheme, defined port",
?assertEqual(
#{scheme => "pulsar", hostname => "host", port => 6651},
emqx_schema:parse_server(
"host:6651",
#{
default_port => 6650,
default_scheme => "pulsar",
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
),
?T(
"unsupported scheme",
?assertThrow(
"unsupported_scheme",
emqx_schema:parse_server(
"pulsar+quic://host:6651",
#{
default_port => 6650,
supported_schemes => ["pulsar"]
}
)
)
),
?T(
"multiple hostnames with schemes (1)",
?assertEqual(
[
#{scheme => "pulsar", hostname => "host", port => 6649},
#{scheme => "pulsar+ssl", hostname => "other.host", port => 6651},
#{scheme => "pulsar", hostname => "yet.another", port => 6650}
],
emqx_schema:parse_servers(
"pulsar://host:6649, pulsar+ssl://other.host:6651,pulsar://yet.another",
#{
default_port => 6650,
supported_schemes => ["pulsar", "pulsar+ssl"]
}
)
)
)
].

View File

@ -60,12 +60,12 @@ init(Parent) ->
{ok, #{callbacks => [], owner => Parent}}.
terminate(_Reason, #{callbacks := Callbacks}) ->
lists:foreach(fun(Fun) -> catch Fun() end, Callbacks).
do_terminate(Callbacks).
handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
{reply, ok, State#{callbacks := [Callback | Callbacks]}};
handle_call(terminate, _From, State = #{callbacks := Callbacks}) ->
lists:foreach(fun(Fun) -> catch Fun() end, Callbacks),
do_terminate(Callbacks),
{stop, normal, ok, State};
handle_call(_Req, _From, State) ->
{reply, error, State}.
@ -77,3 +77,23 @@ handle_info({'EXIT', Parent, _Reason}, State = #{owner := Parent}) ->
{stop, normal, State};
handle_info(_Msg, State) ->
{noreply, State}.
%%----------------------------------------------------------------------------------
%% Internal fns
%%----------------------------------------------------------------------------------
do_terminate(Callbacks) ->
lists:foreach(
fun(Fun) ->
try
Fun()
catch
K:E:S ->
ct:pal("error executing callback ~p: ~p", [Fun, {K, E}]),
ct:pal("stacktrace: ~p", [S]),
ok
end
end,
Callbacks
),
ok.

View File

@ -70,7 +70,8 @@
T == dynamo;
T == rocketmq;
T == cassandra;
T == sqlserver
T == sqlserver;
T == pulsar_producer
).
load() ->

View File

@ -340,6 +340,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
%% to hocon; keeping this as just `kafka' for backwards compatibility.
parse_confs(<<"kafka">> = _Type, Name, Conf) ->
Conf#{bridge_name => Name};
parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) ->
Conf#{bridge_name => Name};
parse_confs(_Type, _Name, Conf) ->
Conf.

View File

@ -92,7 +92,7 @@ callback_mode() -> async_if_possible.
on_start(
InstId,
#{
servers := Servers,
servers := Servers0,
keyspace := Keyspace,
username := Username,
pool_size := PoolSize,
@ -104,9 +104,16 @@ on_start(
connector => InstId,
config => emqx_utils:redact(Config)
}),
Servers =
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION)
),
Options = [
{nodes, emqx_schema:parse_servers(Servers, ?DEFAULT_SERVER_OPTION)},
{nodes, Servers},
{username, Username},
{password, emqx_secret:wrap(maps:get(password, Config, ""))},
{keyspace, Keyspace},

View File

@ -38,9 +38,14 @@ groups() ->
[].
cassandra_servers() ->
emqx_schema:parse_servers(
iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
#{default_port => ?CASSANDRA_DEFAULT_PORT}
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(
iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
#{default_port => ?CASSANDRA_DEFAULT_PORT}
)
).
init_per_suite(Config) ->

View File

@ -81,7 +81,7 @@ on_start(
%% emulating the emulator behavior
%% https://cloud.google.com/pubsub/docs/emulator
HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"),
{Host, Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
#{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
PoolType = random,
Transport = tls,
TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),

View File

@ -10,10 +10,21 @@ workers from `emqx_resource`. It implements the connection management
and interaction without need for a separate connector app, since it's
not used by authentication and authorization applications.
## Contributing
# Documentation links
For more information on Apache Kafka, please see its [official
site](https://kafka.apache.org/).
# Configurations
Please see [our official
documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-kafka.html)
for more detailed info.
# Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
# License
See [BSL](./BSL.txt).
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,

View File

@ -179,7 +179,12 @@ on_get_status(_InstanceID, State) ->
kafka_client_id := ClientID,
kafka_topics := KafkaTopics
} = State,
do_get_status(State, ClientID, KafkaTopics, SubscriberId).
case do_get_status(ClientID, KafkaTopics, SubscriberId) of
{disconnected, Message} ->
{disconnected, State, Message};
Res ->
Res
end.
%%-------------------------------------------------------------------------------------
%% `brod_group_subscriber' API
@ -376,41 +381,41 @@ stop_client(ClientID) ->
),
ok.
do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
case brod:get_partitions_count(ClientID, KafkaTopic) of
{ok, NPartitions} ->
case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of
connected -> do_get_status(State, ClientID, RestTopics, SubscriberId);
case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
connected -> do_get_status(ClientID, RestTopics, SubscriberId);
disconnected -> disconnected
end;
{error, {client_down, Context}} ->
case infer_client_error(Context) of
auth_error ->
Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{disconnected, Message};
{auth_error, Message0} ->
Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{disconnected, Message};
connection_refused ->
Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{disconnected, Message};
_ ->
{disconnected, State, ?CLIENT_DOWN_MESSAGE}
{disconnected, ?CLIENT_DOWN_MESSAGE}
end;
{error, leader_not_available} ->
Message =
"Leader connection not available. Please check the Kafka topic used,"
" the connection parameters and Kafka cluster health",
{disconnected, State, Message};
{disconnected, Message};
_ ->
disconnected
end;
do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) ->
do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
connected.
-spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
-spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
connected | disconnected.
do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
Results =
lists:map(
fun(N) ->

View File

@ -1156,11 +1156,12 @@ t_start_and_consume_ok(Config) ->
),
%% Check that the bridge probe API doesn't leak atoms.
ProbeRes = probe_bridge_api(Config),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes),
ProbeRes0 = probe_bridge_api(Config),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes),
ProbeRes1 = probe_bridge_api(Config),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
@ -1259,11 +1260,12 @@ t_multiple_topic_mappings(Config) ->
{ok, _} = snabbkaffe:receive_events(SRef0),
%% Check that the bridge probe API doesn't leak atoms.
ProbeRes = probe_bridge_api(Config),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes),
ProbeRes0 = probe_bridge_api(Config),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes),
ProbeRes1 = probe_bridge_api(Config),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),

19
apps/emqx_bridge_pulsar/.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2027-02-01
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,30 @@
# Pulsar Data Integration Bridge
This application houses the Pulsar Producer data integration bridge
for EMQX Enterprise Edition. It provides the means to connect to
Pulsar and publish messages to it.
Currently, our Pulsar Producer library has its own `replayq` buffering
implementation, so this bridge does not require buffer workers from
`emqx_resource`. It implements the connection management and
interaction without need for a separate connector app, since it's not
used by authentication and authorization applications.
# Documentation links
For more information on Apache Pulsar, please see its [official
site](https://pulsar.apache.org/).
# Configurations
Please see [our official
documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-pulsar.html)
for more detailed info.
# Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
# License
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@ -0,0 +1,2 @@
toxiproxy
pulsar

View File

@ -0,0 +1,14 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(EMQX_BRIDGE_PULSAR_HRL).
-define(EMQX_BRIDGE_PULSAR_HRL, true).
-define(PULSAR_HOST_OPTIONS, #{
default_port => 6650,
default_scheme => "pulsar",
supported_schemes => ["pulsar", "pulsar+ssl"]
}).
-endif.

View File

@ -0,0 +1,14 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.0"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_bridge_pulsar]}
]}.

View File

@ -0,0 +1,14 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
pulsar
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,228 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar).
-include("emqx_bridge_pulsar.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
%% hocon_schema API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% emqx_ee_bridge "unofficial" API
-export([conn_bridge_examples/1]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"bridge_pulsar".
roots() ->
[].
fields(pulsar_producer) ->
fields(config) ++ fields(producer_opts);
fields(config) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{servers,
mk(
binary(),
#{
required => true,
desc => ?DESC("servers"),
validator => emqx_schema:servers_validator(
?PULSAR_HOST_OPTIONS, _Required = true
)
}
)},
{authentication,
mk(hoconsc:union([none, ref(auth_basic), ref(auth_token)]), #{
default => none, desc => ?DESC("authentication")
})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields(producer_opts) ->
[
{batch_size,
mk(
pos_integer(),
#{default => 100, desc => ?DESC("producer_batch_size")}
)},
{compression,
mk(
hoconsc:enum([no_compression, snappy, zlib]),
#{default => no_compression, desc => ?DESC("producer_compression")}
)},
{send_buffer,
mk(emqx_schema:bytesize(), #{
default => <<"1MB">>, desc => ?DESC("producer_send_buffer")
})},
{sync_timeout,
mk(emqx_schema:duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})},
{retention_period,
mk(
hoconsc:union([infinity, emqx_schema:duration_ms()]),
#{default => infinity, desc => ?DESC("producer_retention_period")}
)},
{max_batch_bytes,
mk(
emqx_schema:bytesize(),
#{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")}
)},
{local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
{pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
{strategy,
mk(
hoconsc:enum([random, roundrobin, key_dispatch]),
#{default => random, desc => ?DESC("producer_strategy")}
)},
{buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})},
{message,
mk(ref(producer_pulsar_message), #{
required => false, desc => ?DESC("producer_message_opts")
})},
{resource_opts,
mk(
ref(producer_resource_opts),
#{
required => false,
desc => ?DESC(emqx_resource_schema, "creation_opts")
}
)}
];
fields(producer_buffer) ->
[
{mode,
mk(
hoconsc:enum([memory, disk, hybrid]),
#{default => memory, desc => ?DESC("buffer_mode")}
)},
{per_partition_limit,
mk(
emqx_schema:bytesize(),
#{default => <<"2GB">>, desc => ?DESC("buffer_per_partition_limit")}
)},
{segment_bytes,
mk(
emqx_schema:bytesize(),
#{default => <<"100MB">>, desc => ?DESC("buffer_segment_bytes")}
)},
{memory_overload_protection,
mk(boolean(), #{
default => false,
desc => ?DESC("buffer_memory_overload_protection")
})}
];
fields(producer_pulsar_message) ->
[
{key,
mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})},
{value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})}
];
fields(producer_resource_opts) ->
SupportedOpts = [
health_check_interval,
resume_interval,
start_after_created,
start_timeout,
auto_restart_interval
],
lists:filtermap(
fun
({health_check_interval = Field, MetaFn}) ->
{true, {Field, override_default(MetaFn, 1_000)}};
({Field, _Meta}) ->
lists:member(Field, SupportedOpts)
end,
emqx_resource_schema:fields("creation_opts")
);
fields(auth_basic) ->
[
{username, mk(binary(), #{required => true, desc => ?DESC("auth_basic_username")})},
{password,
mk(binary(), #{
required => true,
desc => ?DESC("auth_basic_password"),
sensitive => true,
converter => fun emqx_schema:password_converter/2
})}
];
fields(auth_token) ->
[
{jwt,
mk(binary(), #{
required => true,
desc => ?DESC("auth_token_jwt"),
sensitive => true,
converter => fun emqx_schema:password_converter/2
})}
];
fields("get_" ++ Type) ->
emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
fields("put_" ++ Type) ->
fields("config_" ++ Type);
fields("post_" ++ Type) ->
[type_field(), name_field() | fields("config_" ++ Type)];
fields("config_producer") ->
fields(pulsar_producer).
desc(pulsar_producer) ->
?DESC(pulsar_producer_struct);
desc(producer_resource_opts) ->
?DESC(emqx_resource_schema, "creation_opts");
desc("get_" ++ Type) when Type =:= "producer" ->
["Configuration for Pulsar using `GET` method."];
desc("put_" ++ Type) when Type =:= "producer" ->
["Configuration for Pulsar using `PUT` method."];
desc("post_" ++ Type) when Type =:= "producer" ->
["Configuration for Pulsar using `POST` method."];
desc(Name) ->
lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
?DESC(Name).
conn_bridge_examples(_Method) ->
[
#{
<<"pulsar_producer">> => #{
summary => <<"Pulsar Producer Bridge">>,
value => #{todo => true}
}
}
].
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Name) -> hoconsc:ref(?MODULE, Name).
type_field() ->
{type, mk(hoconsc:enum([pulsar_producer]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
struct_names() ->
[
auth_basic,
auth_token,
producer_buffer,
producer_pulsar_message
].
override_default(OriginalFn, NewDefault) ->
fun
(default) -> NewDefault;
(Field) -> OriginalFn(Field)
end.

View File

@ -0,0 +1,402 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_impl_producer).
-include("emqx_bridge_pulsar.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_get_status/2,
on_query/3,
on_query_async/4
]).
-type pulsar_client_id() :: atom().
-type state() :: #{
pulsar_client_id := pulsar_client_id(),
producers := pulsar_producers:producers(),
sync_timeout := infinity | time:time(),
message_template := message_template()
}.
-type buffer_mode() :: memory | disk | hybrid.
-type compression_mode() :: no_compression | snappy | zlib.
-type partition_strategy() :: random | roundrobin | key_dispatch.
-type message_template_raw() :: #{
key := binary(),
value := binary()
}.
-type message_template() :: #{
key := emqx_plugin_libs_rule:tmpl_token(),
value := emqx_plugin_libs_rule:tmpl_token()
}.
-type config() :: #{
authentication := _,
batch_size := pos_integer(),
bridge_name := atom(),
buffer := #{
mode := buffer_mode(),
per_partition_limit := emqx_schema:byte_size(),
segment_bytes := emqx_schema:byte_size(),
memory_overload_protection := boolean()
},
compression := compression_mode(),
max_batch_bytes := emqx_schema:bytesize(),
message := message_template_raw(),
pulsar_topic := binary(),
retention_period := infinity | emqx_schema:duration_ms(),
send_buffer := emqx_schema:bytesize(),
servers := binary(),
ssl := _,
strategy := partition_strategy(),
sync_timeout := emqx_schema:duration_ms()
}.
%%-------------------------------------------------------------------------------------
%% `emqx_resource' API
%%-------------------------------------------------------------------------------------
callback_mode() -> async_if_possible.
%% there are no queries to be made to this bridge, so we say that
%% buffer is supported so we don't spawn unused resource buffer
%% workers.
is_buffer_supported() -> true.
-spec on_start(manager_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) ->
#{
authentication := _Auth,
bridge_name := BridgeName,
servers := Servers0,
ssl := SSL
} = Config,
Servers = format_servers(Servers0),
ClientId = make_client_id(InstanceId, BridgeName),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
ClientOpts = #{
ssl_opts => SSLOpts,
conn_opts => conn_opts(Config)
},
case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of
{ok, _Pid} ->
?SLOG(info, #{
msg => "pulsar_client_started",
instance_id => InstanceId,
pulsar_hosts => Servers
});
{error, Reason} ->
?SLOG(error, #{
msg => "failed_to_start_pulsar_client",
instance_id => InstanceId,
pulsar_hosts => Servers,
reason => Reason
}),
throw(failed_to_start_pulsar_client)
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
-spec on_stop(manager_id(), state()) -> ok.
on_stop(_InstanceId, State) ->
#{
pulsar_client_id := ClientId,
producers := Producers
} = State,
stop_producers(ClientId, Producers),
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
ok.
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
on_get_status(_InstanceId, State) ->
#{
pulsar_client_id := ClientId,
producers := Producers
} = State,
case pulsar_client_sup:find_client(ClientId) of
{ok, Pid} ->
try pulsar_client:get_status(Pid) of
true ->
get_producer_status(Producers);
false ->
disconnected
catch
error:timeout ->
disconnected;
exit:{noproc, _} ->
disconnected
end;
{error, _} ->
disconnected
end.
-spec on_query(manager_id(), {send_message, map()}, state()) ->
{ok, term()}
| {error, timeout}
| {error, term()}.
on_query(_InstanceId, {send_message, Message}, State) ->
#{
producers := Producers,
sync_timeout := SyncTimeout,
message_template := MessageTemplate
} = State,
PulsarMessage = render_message(Message, MessageTemplate),
try
pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
catch
error:timeout ->
{error, timeout}
end.
-spec on_query_async(
manager_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
) ->
{ok, pid()}.
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
#{
producers := Producers,
message_template := MessageTemplate
} = State,
PulsarMessage = render_message(Message, MessageTemplate),
pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
%%-------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------
-spec to_bin(atom() | string() | binary()) -> binary().
to_bin(A) when is_atom(A) ->
atom_to_binary(A);
to_bin(L) when is_list(L) ->
list_to_binary(L);
to_bin(B) when is_binary(B) ->
B.
-spec format_servers(binary()) -> [string()].
format_servers(Servers0) ->
Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS),
lists:map(
fun(#{scheme := Scheme, hostname := Host, port := Port}) ->
Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port)
end,
Servers1
).
-spec make_client_id(manager_id(), atom() | binary()) -> pulsar_client_id().
make_client_id(InstanceId, BridgeName) ->
case is_dry_run(InstanceId) of
true ->
pulsar_producer_probe;
false ->
ClientIdBin = iolist_to_binary([
<<"pulsar_producer:">>,
to_bin(BridgeName),
<<":">>,
to_bin(node())
]),
binary_to_atom(ClientIdBin)
end.
-spec is_dry_run(manager_id()) -> boolean().
is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, InstanceId)
end.
conn_opts(#{authentication := none}) ->
#{};
conn_opts(#{authentication := #{username := Username, password := Password}}) ->
#{
auth_data => iolist_to_binary([Username, <<":">>, Password]),
auth_method_name => <<"basic">>
};
conn_opts(#{authentication := #{jwt := JWT}}) ->
#{
auth_data => JWT,
auth_method_name => <<"token">>
}.
-spec replayq_dir(pulsar_client_id()) -> string().
replayq_dir(ClientId) ->
filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]).
-spec producer_name(pulsar_client_id()) -> atom().
producer_name(ClientId) ->
ClientIdBin = to_bin(ClientId),
binary_to_atom(
iolist_to_binary([
<<"producer-">>,
ClientIdBin
])
).
-spec start_producer(config(), manager_id(), pulsar_client_id(), map()) -> {ok, state()}.
start_producer(Config, InstanceId, ClientId, ClientOpts) ->
#{
conn_opts := ConnOpts,
ssl_opts := SSLOpts
} = ClientOpts,
#{
batch_size := BatchSize,
buffer := #{
mode := BufferMode,
per_partition_limit := PerPartitionLimit,
segment_bytes := SegmentBytes,
memory_overload_protection := MemOLP0
},
compression := Compression,
max_batch_bytes := MaxBatchBytes,
message := MessageTemplateOpts,
pulsar_topic := PulsarTopic0,
retention_period := RetentionPeriod,
send_buffer := SendBuffer,
strategy := Strategy,
sync_timeout := SyncTimeout
} = Config,
{OffloadMode, ReplayQDir} =
case BufferMode of
memory -> {false, false};
disk -> {false, replayq_dir(ClientId)};
hybrid -> {true, replayq_dir(ClientId)}
end,
MemOLP =
case os:type() of
{unix, linux} -> MemOLP0;
_ -> false
end,
ReplayQOpts = #{
replayq_dir => ReplayQDir,
replayq_offload_mode => OffloadMode,
replayq_max_total_bytes => PerPartitionLimit,
replayq_seg_bytes => SegmentBytes,
drop_if_highmem => MemOLP
},
ProducerName = producer_name(ClientId),
MessageTemplate = compile_message_template(MessageTemplateOpts),
ProducerOpts0 =
#{
batch_size => BatchSize,
compression => Compression,
conn_opts => ConnOpts,
max_batch_bytes => MaxBatchBytes,
name => ProducerName,
retention_period => RetentionPeriod,
ssl_opts => SSLOpts,
strategy => partition_strategy(Strategy),
tcp_opts => [{sndbuf, SendBuffer}]
},
ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
PulsarTopic = binary_to_list(PulsarTopic0),
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
{ok, Producers} ->
State = #{
pulsar_client_id => ClientId,
producers => Producers,
sync_timeout => SyncTimeout,
message_template => MessageTemplate
},
?tp(pulsar_producer_bridge_started, #{}),
{ok, State}
catch
Kind:Error:Stacktrace ->
?SLOG(error, #{
msg => "failed_to_start_pulsar_producer",
instance_id => InstanceId,
kind => Kind,
reason => Error,
stacktrace => Stacktrace
}),
stop_client(ClientId),
throw(failed_to_start_pulsar_producer)
end.
-spec stop_client(pulsar_client_id()) -> ok.
stop_client(ClientId) ->
_ = log_when_error(
fun() ->
ok = pulsar:stop_and_delete_supervised_client(ClientId),
?tp(pulsar_bridge_client_stopped, #{pulsar_client_id => ClientId}),
ok
end,
#{
msg => "failed_to_delete_pulsar_client",
pulsar_client_id => ClientId
}
),
ok.
-spec stop_producers(pulsar_client_id(), pulsar_producers:producers()) -> ok.
stop_producers(ClientId, Producers) ->
_ = log_when_error(
fun() ->
ok = pulsar:stop_and_delete_supervised_producers(Producers),
?tp(pulsar_bridge_producer_stopped, #{pulsar_client_id => ClientId}),
ok
end,
#{
msg => "failed_to_delete_pulsar_producer",
pulsar_client_id => ClientId
}
),
ok.
log_when_error(Fun, Log) ->
try
Fun()
catch
C:E ->
?SLOG(error, Log#{
exception => C,
reason => E
})
end.
-spec compile_message_template(message_template_raw()) -> message_template().
compile_message_template(TemplateOpts) ->
KeyTemplate = maps:get(key, TemplateOpts, <<"${.clientid}">>),
ValueTemplate = maps:get(value, TemplateOpts, <<"${.}">>),
#{
key => preproc_tmpl(KeyTemplate),
value => preproc_tmpl(ValueTemplate)
}.
preproc_tmpl(Template) ->
emqx_plugin_libs_rule:preproc_tmpl(Template).
render_message(
Message, #{key := KeyTemplate, value := ValueTemplate}
) ->
#{
key => render(Message, KeyTemplate),
value => render(Message, ValueTemplate)
}.
render(Message, Template) ->
Opts = #{
var_trans => fun
(undefined) -> <<"">>;
(X) -> emqx_plugin_libs_rule:bin(X)
end,
return => full_binary
},
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
get_producer_status(Producers) ->
case pulsar_producers:all_connected(Producers) of
true -> connected;
false -> connecting
end.
partition_strategy(key_dispatch) -> first_key_dispatch;
partition_strategy(Strategy) -> Strategy.

View File

@ -0,0 +1,820 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_impl_producer_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>).
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_pulsar]).
-define(RULE_TOPIC, "mqtt/rule").
-define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
[
{group, plain},
{group, tls}
].
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
OnlyOnceTCs = only_once_tests(),
TCs = AllTCs -- OnlyOnceTCs,
[
{plain, AllTCs},
{tls, TCs}
].
only_once_tests() ->
[t_create_via_http].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
_ = application:stop(emqx_connector),
ok.
init_per_group(plain = Type, Config) ->
PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"),
PulsarPort = list_to_integer(os:getenv("PULSAR_PLAIN_PORT", "6652")),
ProxyName = "pulsar_plain",
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
true ->
Config1 = common_init_per_group(),
[
{proxy_name, ProxyName},
{pulsar_host, PulsarHost},
{pulsar_port, PulsarPort},
{pulsar_type, Type},
{use_tls, false}
| Config1 ++ Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_pulsar);
_ ->
{skip, no_pulsar}
end
end;
init_per_group(tls = Type, Config) ->
PulsarHost = os:getenv("PULSAR_TLS_HOST", "toxiproxy"),
PulsarPort = list_to_integer(os:getenv("PULSAR_TLS_PORT", "6653")),
ProxyName = "pulsar_tls",
case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
true ->
Config1 = common_init_per_group(),
[
{proxy_name, ProxyName},
{pulsar_host, PulsarHost},
{pulsar_port, PulsarPort},
{pulsar_type, Type},
{use_tls, true}
| Config1 ++ Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_pulsar);
_ ->
{skip, no_pulsar}
end
end;
init_per_group(_Group, Config) ->
Config.
end_per_group(Group, Config) when
Group =:= plain;
Group =:= tls
->
common_end_per_group(Config),
ok;
end_per_group(_Group, _Config) ->
ok.
common_init_per_group() ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
application:load(emqx_bridge),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps(?APPS),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
[
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{mqtt_topic, MQTTTopic}
].
common_end_per_group(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_bridges(),
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_bridges(),
stop_consumer(Config),
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
common_init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(60)),
delete_all_bridges(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
PulsarTopic =
<<
(atom_to_binary(TestCase))/binary,
UniqueNum/binary
>>,
PulsarType = ?config(pulsar_type, Config0),
Config1 = [{pulsar_topic, PulsarTopic} | Config0],
{Name, ConfigString, PulsarConfig} = pulsar_config(
TestCase, PulsarType, Config1
),
ConsumerConfig = start_consumer(TestCase, Config1),
Config = ConsumerConfig ++ Config1,
ok = snabbkaffe:start_trace(),
[
{pulsar_name, Name},
{pulsar_config_string, ConfigString},
{pulsar_config, PulsarConfig}
| Config
].
delete_all_bridges() ->
lists:foreach(
fun(#{name := Name, type := Type}) ->
emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
).
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
pulsar_config(TestCase, _PulsarType, Config) ->
UniqueNum = integer_to_binary(erlang:unique_integer()),
PulsarHost = ?config(pulsar_host, Config),
PulsarPort = ?config(pulsar_port, Config),
PulsarTopic = ?config(pulsar_topic, Config),
AuthType = proplists:get_value(sasl_auth_mechanism, Config, none),
UseTLS = proplists:get_value(use_tls, Config, false),
Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary
>>,
MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
Prefix =
case UseTLS of
true -> <<"pulsar+ssl://">>;
false -> <<"pulsar://">>
end,
ServerURL = iolist_to_binary([
Prefix,
PulsarHost,
":",
integer_to_binary(PulsarPort)
]),
ConfigString =
io_lib:format(
"bridges.pulsar_producer.~s {\n"
" enable = true\n"
" servers = \"~s\"\n"
" sync_timeout = 5s\n"
" compression = no_compression\n"
" send_buffer = 1MB\n"
" retention_period = infinity\n"
" max_batch_bytes = 900KB\n"
" batch_size = 1\n"
" strategy = random\n"
" buffer {\n"
" mode = memory\n"
" per_partition_limit = 10MB\n"
" segment_bytes = 5MB\n"
" memory_overload_protection = true\n"
" }\n"
" message {\n"
" key = \"${.clientid}\"\n"
" value = \"${.}\"\n"
" }\n"
"~s"
" ssl {\n"
" enable = ~p\n"
" verify = verify_none\n"
" server_name_indication = \"auto\"\n"
" }\n"
" pulsar_topic = \"~s\"\n"
" local_topic = \"~s\"\n"
"}\n",
[
Name,
ServerURL,
authentication(AuthType),
UseTLS,
PulsarTopic,
MQTTTopic
]
),
{Name, ConfigString, parse_and_check(ConfigString, Name)}.
parse_and_check(ConfigString, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
TypeBin = ?BRIDGE_TYPE_BIN,
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
Config.
authentication(_) ->
" authentication = none\n".
resource_id(Config) ->
Type = ?BRIDGE_TYPE_BIN,
Name = ?config(pulsar_name, Config),
emqx_bridge_resource:resource_id(Type, Name).
create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
Type = ?BRIDGE_TYPE_BIN,
Name = ?config(pulsar_name, Config),
PulsarConfig0 = ?config(pulsar_config, Config),
PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
emqx_bridge:create(Type, Name, PulsarConfig).
create_bridge_api(Config) ->
create_bridge_api(Config, _Overrides = #{}).
create_bridge_api(Config, Overrides) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(pulsar_name, Config),
PulsarConfig0 = ?config(pulsar_config, Config),
PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
Params = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("creating bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
Error ->
Error
end,
ct:pal("bridge create result: ~p", [Res]),
Res.
update_bridge_api(Config) ->
update_bridge_api(Config, _Overrides = #{}).
update_bridge_api(Config, Overrides) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(pulsar_name, Config),
PulsarConfig0 = ?config(pulsar_config, Config),
PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
BridgeId = emqx_bridge_resource:bridge_id(TypeBin, Name),
Params = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("updating bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of
{ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
Error -> Error
end,
ct:pal("bridge update result: ~p", [Res]),
Res.
probe_bridge_api(Config) ->
probe_bridge_api(Config, _Overrides = #{}).
probe_bridge_api(Config, Overrides) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(pulsar_name, Config),
PulsarConfig = ?config(pulsar_config, Config),
Params0 = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Params = maps:merge(Params0, Overrides),
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("probing bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
Error -> Error
end,
ct:pal("bridge probe result: ~p", [Res]),
Res.
start_consumer(TestCase, Config) ->
PulsarHost = ?config(pulsar_host, Config),
PulsarPort = ?config(pulsar_port, Config),
PulsarTopic = ?config(pulsar_topic, Config),
UseTLS = ?config(use_tls, Config),
%% FIXME: patch pulsar to accept binary urls...
Scheme =
case UseTLS of
true -> <<"pulsar+ssl://">>;
false -> <<"pulsar://">>
end,
URL =
binary_to_list(
<<Scheme/binary, (list_to_binary(PulsarHost))/binary, ":",
(integer_to_binary(PulsarPort))/binary>>
),
ConnOpts = #{},
ConsumerClientId = TestCase,
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
SSLOpts = #{
enable => UseTLS,
keyfile => filename:join([CertsPath, "key.pem"]),
certfile => filename:join([CertsPath, "cert.pem"]),
cacertfile => filename:join([CertsPath, "cacert.pem"])
},
{ok, _ClientPid} = pulsar:ensure_supervised_client(
ConsumerClientId,
[URL],
#{
conn_opts => ConnOpts,
ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)
}
),
ConsumerOpts = #{
cb_init_args => #{send_to => self()},
cb_module => pulsar_echo_consumer,
sub_type => 'Shared',
subscription => atom_to_list(TestCase),
max_consumer_num => 1,
%% Note! This must not coincide with the client
%% id, or else weird bugs will happen, like the
%% consumer never starts...
name => test_consumer,
consumer_id => 1,
conn_opts => ConnOpts
},
{ok, Consumer} = pulsar:ensure_supervised_consumers(
ConsumerClientId,
PulsarTopic,
ConsumerOpts
),
%% since connection is async, and there's currently no way to
%% specify the subscription initial position as `Earliest', we
%% need to wait until the consumer is connected to avoid
%% flakiness.
ok = wait_until_consumer_connected(Consumer),
[
{consumer_client_id, ConsumerClientId},
{pulsar_consumer, Consumer}
].
stop_consumer(Config) ->
ConsumerClientId = ?config(consumer_client_id, Config),
Consumer = ?config(pulsar_consumer, Config),
ok = pulsar:stop_and_delete_supervised_consumers(Consumer),
ok = pulsar:stop_and_delete_supervised_client(ConsumerClientId),
ok.
wait_until_consumer_connected(Consumer) ->
?retry(
_Sleep = 300,
_Attempts0 = 20,
true = pulsar_consumers:all_connected(Consumer)
),
ok.
wait_until_producer_connected() ->
wait_until_connected(pulsar_producers_sup, pulsar_producer).
wait_until_connected(SupMod, Mod) ->
Pids = [
P
|| {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod),
P <- element(2, process_info(SupPid, links)),
case proc_lib:initial_call(P) of
{Mod, init, _} -> true;
_ -> false
end
],
?retry(
_Sleep = 300,
_Attempts0 = 20,
lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
),
ok.
create_rule_and_action_http(Config) ->
PulsarName = ?config(pulsar_name, Config),
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, PulsarName),
Params = #{
enable => true,
sql => <<"SELECT * FROM \"", ?RULE_TOPIC, "\"">>,
actions => [BridgeId]
},
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
ct:pal("rule action params: ~p", [Params]),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
Error -> Error
end.
receive_consumed(Timeout) ->
receive
{pulsar_message, #{payloads := Payloads}} ->
lists:map(fun try_decode_json/1, Payloads)
after Timeout ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
ct:fail("no message consumed")
end.
try_decode_json(Payload) ->
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
{error, _} ->
Payload;
{ok, JSON} ->
JSON
end.
cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config),
PeerModule =
case os:getenv("IS_CI") of
false ->
slave;
_ ->
ct_slave
end,
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
{apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]},
{listener_ports, []},
{peer_mod, PeerModule},
{priv_data_dir, PrivDataDir},
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_ee_conf_schema},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),
ok;
(emqx_conf) ->
ok;
(_) ->
ok
end}
]
),
ct:pal("cluster: ~p", [Cluster]),
Cluster.
start_cluster(Cluster) ->
Nodes =
[
emqx_common_test_helpers:start_slave(Name, Opts)
|| {Name, Opts} <- Cluster
],
on_exit(fun() ->
emqx_utils:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N)
end,
Nodes
)
end),
Nodes.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_and_produce_ok(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
?check_trace(
begin
?assertMatch(
{ok, _},
create_bridge(Config)
),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
%% Publish using local topic.
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
emqx:publish(Message0),
%% Publish using rule engine.
Message1 = emqx_message:make(ClientId, QoS, ?RULE_TOPIC_BIN, Payload),
emqx:publish(Message1),
#{rule_id => RuleId}
end,
fun(#{rule_id := RuleId}, _Trace) ->
Data0 = receive_consumed(5_000),
?assertMatch(
[
#{
<<"clientid">> := ClientId,
<<"event">> := <<"message.publish">>,
<<"payload">> := Payload,
<<"topic">> := MQTTTopic
}
],
Data0
),
Data1 = receive_consumed(5_000),
?assertMatch(
[
#{
<<"clientid">> := ClientId,
<<"event">> := <<"message.publish">>,
<<"payload">> := Payload,
<<"topic">> := ?RULE_TOPIC_BIN
}
],
Data1
),
?retry(
_Sleep = 100,
_Attempts0 = 20,
begin
?assertMatch(
#{
counters := #{
dropped := 0,
failed := 0,
late_reply := 0,
matched := 2,
received := 0,
retried := 0,
success := 2
}
},
emqx_resource_manager:get_metrics(ResourceId)
),
?assertEqual(
1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')
),
?assertEqual(
0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')
),
ok
end
),
ok
end
),
ok.
%% Under normal operations, the bridge will be called async via
%% `simple_async_query'.
t_sync_query(Config) ->
ResourceId = resource_id(Config),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
Message = {send_message, #{payload => Payload}},
?assertMatch(
{ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message)
),
ok
end,
[]
),
ok.
t_create_via_http(Config) ->
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
%% lightweight matrix testing some configs
?assertMatch(
{ok, _},
update_bridge_api(
Config,
#{
<<"buffer">> =>
#{<<"mode">> => <<"disk">>}
}
)
),
?assertMatch(
{ok, _},
update_bridge_api(
Config,
#{
<<"buffer">> =>
#{
<<"mode">> => <<"hybrid">>,
<<"memory_overload_protection">> => true
}
}
)
),
ok
end,
[]
),
ok.
t_start_stop(Config) ->
PulsarName = ?config(pulsar_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch(
{ok, _},
create_bridge(Config)
),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
%% Check that the bridge probe API doesn't leak atoms.
redbug:start(
[
"emqx_resource_manager:health_check_interval -> return",
"emqx_resource_manager:with_health_check -> return"
],
[{msgs, 100}, {time, 30_000}]
),
ProbeRes0 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
ProbeRes1 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
%% Now stop the bridge.
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE_BIN, PulsarName),
#{?snk_kind := pulsar_bridge_stopped},
5_000
)
),
ok
end,
fun(Trace) ->
%% one for each probe, one for real
?assertMatch([_, _, _], ?of_kind(pulsar_bridge_producer_stopped, Trace)),
?assertMatch([_, _, _], ?of_kind(pulsar_bridge_client_stopped, Trace)),
?assertMatch([_, _, _], ?of_kind(pulsar_bridge_stopped, Trace)),
ok
end
),
ok.
t_on_get_status(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
ct:sleep(500),
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
end),
%% Check that it recovers itself.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
ok.
t_cluster(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
Cluster = cluster(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
?check_trace(
begin
Nodes = [N1, N2 | _] = start_cluster(Cluster),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := pulsar_producer_bridge_started}),
length(Nodes),
15_000
),
{ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
{ok, _} = snabbkaffe:receive_events(SRef0),
lists:foreach(
fun(N) ->
?retry(
_Sleep = 1_000,
_Attempts0 = 20,
?assertEqual(
{ok, connected},
erpc:call(N, emqx_resource_manager, health_check, [ResourceId]),
#{node => N}
)
)
end,
Nodes
),
erpc:multicall(Nodes, fun wait_until_producer_connected/0),
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
erpc:call(N2, emqx, publish, [Message0]),
lists:foreach(
fun(N) ->
?assertEqual(
{ok, connected},
erpc:call(N, emqx_resource_manager, health_check, [ResourceId]),
#{node => N}
)
end,
Nodes
),
ok
end,
fun(_Trace) ->
Data0 = receive_consumed(10_000),
?assertMatch(
[
#{
<<"clientid">> := ClientId,
<<"event">> := <<"message.publish">>,
<<"payload">> := Payload,
<<"topic">> := MQTTTopic
}
],
Data0
),
ok
end
),
ok.

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(pulsar_echo_consumer).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% pulsar consumer API
-export([init/2, handle_message/3]).
init(Topic, Args) ->
ct:pal("consumer init: ~p", [#{topic => Topic, args => Args}]),
SendTo = maps:get(send_to, Args),
?tp(pulsar_echo_consumer_init, #{topic => Topic}),
{ok, #{topic => Topic, send_to => SendTo}}.
handle_message(Message, Payloads, State) ->
#{send_to := SendTo, topic := Topic} = State,
ct:pal(
"pulsar consumer received:\n ~p",
[#{message => Message, payloads => Payloads}]
),
SendTo ! {pulsar_message, #{topic => Topic, message => Message, payloads => Payloads}},
?tp(pulsar_echo_consumer_message, #{topic => Topic, message => Message, payloads => Payloads}),
{ok, 'Individual', State}.

View File

@ -67,7 +67,17 @@ on_start(
connector => InstId,
config => emqx_utils:redact(Config)
}),
Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS),
Servers1 = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS),
Servers =
lists:map(
fun
(#{hostname := Host, port := Port0}) ->
{Host, Port0};
(#{hostname := Host}) ->
Host
end,
Servers1
),
SslOpts =
case maps:get(enable, SSL) of
true ->

View File

@ -537,4 +537,9 @@ format_hosts(Hosts) ->
lists:map(fun format_host/1, Hosts).
parse_servers(HoconValue) ->
emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS).
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS)
).

View File

@ -98,7 +98,7 @@ on_start(
ssl := SSL
} = Config
) ->
{Host, Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS),
?SLOG(info, #{
msg => "starting_mysql_connector",
connector => InstId,

View File

@ -91,7 +91,7 @@ on_start(
ssl := SSL
} = Config
) ->
{Host, Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
?SLOG(info, #{
msg => "starting_postgresql_connector",
connector => InstId,

View File

@ -131,7 +131,13 @@ on_start(
_ -> servers
end,
Servers0 = maps:get(ConfKey, Config),
Servers = [{servers, emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)}],
Servers1 = lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)
),
Servers = [{servers, Servers1}],
Database =
case Type of
cluster -> [];

View File

@ -293,4 +293,5 @@ qos() ->
hoconsc:union([emqx_schema:qos(), binary()]).
parse_server(Str) ->
emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS).
#{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS),
{Host, Port}.

View File

@ -165,8 +165,13 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
create_dry_run(ResourceType, Config) ->
ResId = make_test_id(),
MgrId = set_new_owner(ResId),
Opts =
case is_map(Config) of
true -> maps:get(resource_opts, Config, #{});
false -> #{}
end,
ok = emqx_resource_manager_sup:ensure_child(
MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{}
MgrId, ResId, <<"dry_run">>, ResourceType, Config, Opts
),
case wait_for_ready(ResId, 5000) of
ok ->

View File

@ -80,7 +80,7 @@ init(Conf) ->
flush_time_interval := FlushTimeInterval
} = Conf,
FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval),
{Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],
{ok, Pid} = estatsd:start_link(Opts),

View File

@ -0,0 +1 @@
Implement Pulsar Producer Bridge, which supports publishing messages to Pulsar from MQTT topics.

View File

@ -9,7 +9,9 @@
telemetry,
emqx_bridge_kafka,
emqx_bridge_gcp_pubsub,
emqx_bridge_opents
emqx_bridge_cassandra,
emqx_bridge_opents,
emqx_bridge_pulsar
]},
{env, []},
{modules, []},

View File

@ -36,7 +36,8 @@ api_schemas(Method) ->
ref(emqx_ee_bridge_dynamo, Method),
ref(emqx_ee_bridge_rocketmq, Method),
ref(emqx_ee_bridge_sqlserver, Method),
ref(emqx_bridge_opents, Method)
ref(emqx_bridge_opents, Method),
ref(emqx_bridge_pulsar, Method ++ "_producer")
].
schema_modules() ->
@ -57,7 +58,8 @@ schema_modules() ->
emqx_ee_bridge_dynamo,
emqx_ee_bridge_rocketmq,
emqx_ee_bridge_sqlserver,
emqx_bridge_opents
emqx_bridge_opents,
emqx_bridge_pulsar
].
examples(Method) ->
@ -97,7 +99,8 @@ resource_type(clickhouse) -> emqx_ee_connector_clickhouse;
resource_type(dynamo) -> emqx_ee_connector_dynamo;
resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
resource_type(sqlserver) -> emqx_ee_connector_sqlserver;
resource_type(opents) -> emqx_bridge_opents_connector.
resource_type(opents) -> emqx_bridge_opents_connector;
resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer.
fields(bridges) ->
[
@ -165,7 +168,8 @@ fields(bridges) ->
required => false
}
)}
] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
redis_structs() ++
pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs().
mongodb_structs() ->
@ -202,6 +206,18 @@ kafka_structs() ->
)}
].
pulsar_structs() ->
[
{pulsar_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_pulsar, pulsar_producer)),
#{
desc => <<"Pulsar Producer Bridge Config">>,
required => false
}
)}
].
influxdb_structs() ->
[
{Protocol,

View File

@ -449,9 +449,14 @@ all_test_hosts() ->
).
parse_servers(Servers) ->
emqx_schema:parse_servers(Servers, #{
default_port => 6379
}).
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers, #{
default_port => 6379
})
).
redis_connect_ssl_opts(Type) ->
maps:merge(

View File

@ -92,7 +92,7 @@ on_start(
}),
{Schema, Server} = get_host_schema(to_str(Url)),
{Host, Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS),
Options = [
{config, #{

View File

@ -294,7 +294,7 @@ client_config(
server := Server
}
) ->
{Host, Port} = emqx_schema:parse_server(Server, ?INFLUXDB_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?INFLUXDB_HOST_OPTIONS),
[
{host, str(Host)},
{port, Port},

View File

@ -105,7 +105,7 @@ on_start(
config => redact(Config1)
}),
Config = maps:merge(default_security_info(), Config1),
{Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS),
Server1 = [{Host, Port}],
ClientId = client_id(InstanceId),

View File

@ -355,7 +355,7 @@ conn_str([], Acc) ->
conn_str([{driver, Driver} | Opts], Acc) ->
conn_str(Opts, ["Driver=" ++ str(Driver) | Acc]);
conn_str([{server, Server} | Opts], Acc) ->
{Host, Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
conn_str(Opts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]);
conn_str([{database, Database} | Opts], Acc) ->
conn_str(Opts, ["Database=" ++ str(Database) | Acc]);

View File

@ -96,7 +96,7 @@ on_start(
config => emqx_utils:redact(Config)
}),
{Host, Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS),
Options = [
{host, to_bin(Host)},
{port, Port},

View File

@ -169,7 +169,8 @@ defmodule EMQXUmbrella.MixProject do
:emqx_bridge_redis,
:emqx_bridge_rocketmq,
:emqx_bridge_tdengine,
:emqx_bridge_timescale
:emqx_bridge_timescale,
:emqx_bridge_pulsar
])
end
@ -360,6 +361,7 @@ defmodule EMQXUmbrella.MixProject do
emqx_ee_connector: :permanent,
emqx_ee_bridge: :permanent,
emqx_bridge_kafka: :permanent,
emqx_bridge_pulsar: :permanent,
emqx_bridge_gcp_pubsub: :permanent,
emqx_bridge_cassandra: :permanent,
emqx_bridge_opents: :permanent,

View File

@ -454,6 +454,7 @@ relx_apps_per_edition(ee) ->
emqx_ee_connector,
emqx_ee_bridge,
emqx_bridge_kafka,
emqx_bridge_pulsar,
emqx_bridge_gcp_pubsub,
emqx_bridge_cassandra,
emqx_bridge_opents,

View File

@ -0,0 +1,175 @@
emqx_bridge_pulsar {
auth_basic {
desc = "Parameters for basic authentication."
label = "Basic auth params"
}
auth_basic_password {
desc = "Basic authentication password."
label = "Password"
}
auth_basic_username {
desc = "Basic authentication username."
label = "Username"
}
auth_token {
desc = "Parameters for token authentication."
label = "Token auth params"
}
auth_token_jwt {
desc = "JWT authentication token."
label = "JWT"
}
authentication {
desc = "Authentication configs."
label = "Authentication"
}
buffer_memory_overload_protection {
desc = "Applicable when buffer mode is set to <code>memory</code>\n"
"EMQX will drop old buffered messages under high memory pressure."
" The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>."
" NOTE: This config only works on Linux."
label = "Memory Overload Protection"
}
buffer_mode {
desc = "Message buffer mode.\n"
"<code>memory</code>: Buffer all messages in memory. The messages will be lost"
" in case of EMQX node restart\n<code>disk</code>: Buffer all messages on disk."
" The messages on disk are able to survive EMQX node restart.\n"
"<code>hybrid</code>: Buffer message in memory first, when up to certain limit"
" (see <code>segment_bytes</code> config for more information), then start offloading"
" messages to disk, Like <code>memory</code> mode, the messages will be lost in"
" case of EMQX node restart."
label = "Buffer Mode"
}
buffer_per_partition_limit {
desc = "Number of bytes allowed to buffer for each Pulsar partition."
" When this limit is exceeded, old messages will be dropped in a trade for credits"
" for new messages to be buffered."
label = "Per-partition Buffer Limit"
}
buffer_segment_bytes {
desc = "Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.\n"
"This value is to specify the size of each on-disk buffer file."
label = "Segment File Bytes"
}
config_enable {
desc = "Enable (true) or disable (false) this Pulsar bridge."
label = "Enable or Disable"
}
desc_name {
desc = "Bridge name, used as a human-readable description of the bridge."
label = "Bridge Name"
}
desc_type {
desc = "The Bridge Type"
label = "Bridge Type"
}
producer_batch_size {
desc = "Maximum number of individual requests to batch in a Pulsar message."
label = "Batch size"
}
producer_buffer {
desc = "Configure producer message buffer.\n\n"
"Tell Pulsar producer how to buffer messages when EMQX has more messages to"
" send than Pulsar can keep up, or when Pulsar is down."
label = "Message Buffer"
}
producer_compression {
desc = "Compression method."
label = "Compression"
}
producer_key_template {
desc = "Template to render Pulsar message key."
label = "Message Key"
}
producer_local_topic {
desc = "MQTT topic or topic filter as data source (bridge input)."
" If rule action is used as data source, this config should be left empty,"
" otherwise messages will be duplicated in Pulsar."
label = "Source MQTT Topic"
}
producer_max_batch_bytes {
desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers"
" default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in"
" order to compensate Pulsar message encoding overheads (especially when each individual"
" message is very small). When a single message is over the limit, it is still"
" sent (as a single element batch)."
label = "Max Batch Bytes"
}
producer_message_opts {
desc = "Template to render a Pulsar message."
label = "Pulsar Message Template"
}
producer_pulsar_message {
desc = "Template to render a Pulsar message."
label = "Pulsar Message Template"
}
producer_pulsar_topic {
desc = "Pulsar topic name"
label = "Pulsar topic name"
}
producer_retention_period {
desc = "The amount of time messages will be buffered while there is no connection to"
" the Pulsar broker. Longer times mean that more memory/disk will be used"
label = "Retention Period"
}
producer_send_buffer {
desc = "Fine tune the socket send buffer. The default value is tuned for high throughput."
label = "Socket Send Buffer Size"
}
producer_strategy {
desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n"
"\n"
"<code>random</code>: Randomly pick a partition for each message.\n"
"<code>roundrobin</code>: Pick each available producer in turn for each message.\n"
"<code>key_dispatch</code>: Hash Pulsar message key of the first message in a batch"
" to a partition number."
label = "Partition Strategy"
}
producer_sync_timeout {
desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."
label = "Sync publish timeout"
}
producer_value_template {
desc = "Template to render Pulsar message value."
label = "Message Value"
}
pulsar_producer_struct {
desc = "Configuration for a Pulsar bridge."
label = "Pulsar Bridge Configuration"
}
servers {
desc = "A comma separated list of Pulsar URLs in the form <code>scheme://host[:port]</code>"
" for the client to connect to. The supported schemes are <code>pulsar://</code> (default)"
" and <code>pulsar+ssl://</code>. The default port is 6650."
label = "Servers"
}
}

View File

@ -0,0 +1,173 @@
emqx_bridge_pulsar {
pulsar_producer_struct {
desc = "Pulsar 桥接配置"
label = "Pulsar 桥接配置"
}
desc_type {
desc = "桥接类型"
label = "桥接类型"
}
desc_name {
desc = "桥接名字,可读描述"
label = "桥接名字"
}
config_enable {
desc = "启用true或停用false该 Pulsar 数据桥接。"
label = "启用或停用"
}
servers {
desc = "以逗号分隔的 <code>scheme://host[:port]</code> 格式的 Pulsar URL 列表,"
"支持的 scheme 有 <code>pulsar://</code> (默认)"
"和<code>pulsar+ssl://</code>。默认的端口是6650。"
label = "服务员"
}
authentication {
desc = "认证参数。"
label = "认证"
}
producer_batch_size {
desc = "在一个Pulsar消息中批处理的单个请求的最大数量。"
label = "批量大小"
}
producer_compression {
desc = "压缩方法。"
label = "压缩"
}
producer_send_buffer {
desc = "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。"
label = "Socket 发送缓存大小"
}
producer_sync_timeout {
desc = "同步发布时从Pulsar接收发送回执的最长等待时间。"
label = "同步发布超时"
}
auth_basic_username {
desc = "基本认证用户名。"
label = "用户名"
}
auth_basic_password {
desc = "基本认证密码。"
label = "密码"
}
auth_token_jwt {
desc = "JWT认证令牌。"
label = "JWT"
}
producer_max_batch_bytes {
desc = "最大消息批量字节数。"
"大多数 Pulsar 环境的默认最低值是 5 MBEMQX 的默认值比 5 MB 更小是因为需要"
"补偿 Pulsar 消息编码所需要的额外字节(尤其是当每条消息都很小的情况下)。"
"当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。"
label = "最大批量字节数"
}
producer_retention_period {
desc = "当没有连接到Pulsar代理时信息将被缓冲的时间。 较长的时间意味着将使用更多的内存/磁盘"
label = "保留期"
}
producer_local_topic {
desc = "MQTT 主题数据源由桥接指定,或留空由规则动作指定。"
label = "源 MQTT 主题"
}
producer_pulsar_topic {
desc = "Pulsar 主题名称"
label = "Pulsar 主题名称"
}
producer_strategy {
desc = "设置消息发布时应该如何选择 Pulsar 分区。\n\n"
"<code>random</code>: 为每个消息随机选择一个分区。\n"
"<code>roundrobin</code>: 依次为每条信息挑选可用的生产商。\n"
"<code>key_dispatch</code>: 将一批信息中的第一条信息的Pulsar信息密钥哈希到一个分区编号。"
label = "分区选择策略"
}
producer_buffer {
desc = "配置消息缓存的相关参数。\n\n"
"当 EMQX 需要发送的消息超过 Pulsar 处理能力,或者当 Pulsar 临时下线时EMQX 内部会将消息缓存起来。"
label = "消息缓存"
}
buffer_mode {
desc = "消息缓存模式。\n"
"<code>memory</code>: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n"
"<code>disk</code>: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n"
"<code>hybrid</code>: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
"(配置项 <code>segment_bytes</code> 描述了该限制)后,后续的消息会缓存到磁盘上。"
"与 <code>memory</code> 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。"
label = "缓存模式"
}
buffer_per_partition_limit {
desc = "为每个 Pulsar 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃,"
"为新的消息腾出空间。"
label = "Pulsar 分区缓存上限"
}
buffer_segment_bytes {
desc = "当缓存模式是 <code>disk</code> 或 <code>hybrid</code> 时适用。"
"该配置用于指定缓存到磁盘上的文件的大小。"
label = "缓存文件大小"
}
buffer_memory_overload_protection {
desc = "缓存模式是 <code>memory</code> 或 <code>hybrid</code> 时适用。"
"当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。"
"内存压力值由配置项 <code>sysmon.os.sysmem_high_watermark</code> 决定。"
"注意,该配置仅在 Linux 系统中有效。"
label = "内存过载保护"
}
producer_message_opts {
desc = "用于生成 Pulsar 消息的模版。"
label = "Pulsar 消息模版"
}
producer_key_template {
desc = "生成 Pulsar 消息 Key 的模版。"
label = "消息的 Key"
}
producer_value_template {
desc = "生成 Pulsar 消息 Value 的模版。"
label = "消息的 Value"
}
auth_basic {
desc = "基本认证的参数。"
label = "基本认证参数"
}
auth_token {
desc = "令牌认证的参数。"
label = "Token auth params"
}
producer_buffer {
desc = "配置消息缓存的相关参数。\n\n"
"当 EMQX 需要发送的消息超过 Pulsar 处理能力,或者当 Pulsar 临时下线时EMQX 内部会将消息缓存起来。"
label = "消息缓存"
}
producer_pulsar_message {
desc = "用于生成 Pulsar 消息的模版。"
label = "Pulsar 消息模版"
}
}

View File

@ -191,6 +191,9 @@ for dep in ${CT_DEPS}; do
opents)
FILES+=( '.ci/docker-compose-file/docker-compose-opents.yaml' )
;;
pulsar)
FILES+=( '.ci/docker-compose-file/docker-compose-pulsar.yaml' )
;;
*)
echo "unknown_ct_dependency $dep"
exit 1