From 7f1b4cef271d85a1470e6742dfa991822c4632c0 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 20 Feb 2024 09:36:01 +0800 Subject: [PATCH] feat: pulsar bridge v2 --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 8 +- .../src/schema/emqx_bridge_enterprise.erl | 2 +- .../src/emqx_bridge_mqtt_connector_schema.erl | 2 +- .../src/emqx_bridge_mqtt_pubsub_schema.erl | 2 +- .../src/emqx_bridge_pulsar.app.src | 2 +- .../src/emqx_bridge_pulsar.erl | 48 ++- .../src/emqx_bridge_pulsar_action_info.erl | 54 ++++ ...r.erl => emqx_bridge_pulsar_connector.erl} | 144 +++++---- .../emqx_bridge_pulsar_connector_schema.erl | 71 +++++ .../src/emqx_bridge_pulsar_pubsub_schema.erl | 123 ++++++++ ...=> emqx_bridge_pulsar_connector_SUITE.erl} | 41 ++- .../emqx_bridge_rabbitmq_pubsub_schema.erl | 2 +- .../src/schema/emqx_connector_ee_schema.erl | 14 + .../src/schema/emqx_connector_schema.erl | 3 + rel/i18n/emqx_bridge_pulsar.hocon | 291 +++++++++--------- .../emqx_bridge_pulsar_pubsub_schema.hocon | 38 +++ 18 files changed, 584 insertions(+), 264 deletions(-) create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl rename apps/emqx_bridge_pulsar/src/{emqx_bridge_pulsar_impl_producer.erl => emqx_bridge_pulsar_connector.erl} (81%) create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl rename apps/emqx_bridge_pulsar/test/{emqx_bridge_pulsar_impl_producer_SUITE.erl => emqx_bridge_pulsar_connector_SUITE.erl} (97%) create mode 100644 rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 12fda5d51..36ac10716 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -110,6 +110,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_es_action_info, emqx_bridge_opents_action_info, emqx_bridge_rabbitmq_action_info, + emqx_bridge_pulsar_action_info, emqx_bridge_greptimedb_action_info, emqx_bridge_tdengine_action_info, emqx_bridge_s3_action_info diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 35b964b83..69b17a843 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -761,7 +761,7 @@ is_bridge_enabled(BridgeType, BridgeName) -> end. is_bridge_enabled_v1(BridgeType, BridgeName) -> - %% we read from the transalted config because the defaults are populated here. + %% we read from the translated config because the defaults are populated here. try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of ConfMap -> maps:get(enable, ConfMap, false) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 56fe0029a..be20b9a7b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1659,8 +1659,11 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> connector_conf := ConnectorRawConf, bridge_v2_type := BridgeV2Type, bridge_v2_name := _BridgeName, - bridge_v2_conf := BridgeV2RawConf + bridge_v2_conf := BridgeV2RawConf0 } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), + BridgeV2RawConf = emqx_action_info:action_convert_from_connector( + BridgeType, ConnectorRawConf, BridgeV2RawConf0 + ), create_dry_run_helper( ensure_atom_root_key(ConfRootKey), BridgeV2Type, ConnectorRawConf, BridgeV2RawConf ) @@ -1928,7 +1931,8 @@ convert_from_connectors(ConfRootKey, Conf) -> convert_from_connector(ConfRootKey, Type, Name, Action = #{<<"connector">> := ConnectorName}) -> case get_connector_info(ConnectorName, Type) of {ok, Connector} -> - Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action), + TypeAtom = to_existing_atom(Type), + Action1 = emqx_action_info:action_convert_from_connector(TypeAtom, Connector, Action), {ok, Action1}; {error, not_found} -> {error, #{ diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index faac94dcb..233d87fa1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -123,7 +123,7 @@ resource_type(dynamo) -> emqx_bridge_dynamo_connector; resource_type(rocketmq) -> emqx_bridge_rocketmq_connector; resource_type(sqlserver) -> emqx_bridge_sqlserver_connector; resource_type(opents) -> emqx_bridge_opents_connector; -resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; +resource_type(pulsar_producer) -> emqx_bridge_pulsar_connector; resource_type(oracle) -> emqx_oracle; resource_type(iotdb) -> emqx_bridge_iotdb_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl index 6d5ae4f4c..7233e9e6c 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl @@ -308,7 +308,7 @@ fields(Field) when Fields = fields("specific_connector_config"), emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); fields(What) -> - error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}). + error({?MODULE, missing_field_handler, What}). ingress_pool_size(desc) -> ?DESC("ingress_pool_size"); diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl index 2b9bb05bd..60cf634c4 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl @@ -124,7 +124,7 @@ fields(Field) when -> emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields("mqtt_subscriber_source")); fields(What) -> - error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}). + error({?MODULE, missing_field_handler, What}). %% v2: api schema %% The parameter equls to %% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1 diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index c9abebf8b..ce7c313ae 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.1.8"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index f9f37846e..291c656ef 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -31,7 +31,21 @@ roots() -> []. fields(pulsar_producer) -> - fields(config) ++ fields(producer_opts); + fields(config) ++ + emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++ + fields(producer_opts) ++ + [ + {local_topic, + mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})}, + {resource_opts, + mk( + ref(producer_resource_opts), + #{ + required => false, + desc => ?DESC(emqx_resource_schema, "creation_opts") + } + )} + ]; fields(config) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -85,10 +99,6 @@ fields(producer_opts) -> mk(emqx_schema:bytesize(), #{ default => <<"1MB">>, desc => ?DESC("producer_send_buffer") })}, - {sync_timeout, - mk(emqx_schema:timeout_duration_ms(), #{ - default => <<"3s">>, desc => ?DESC("producer_sync_timeout") - })}, {retention_period, mk( %% not used in a `receive ... after' block, just timestamp comparison @@ -100,26 +110,13 @@ fields(producer_opts) -> emqx_schema:bytesize(), #{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")} )}, - {local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})}, {pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, {strategy, mk( hoconsc:enum([random, roundrobin, key_dispatch]), #{default => random, desc => ?DESC("producer_strategy")} )}, - {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})}, - {message, - mk(ref(producer_pulsar_message), #{ - required => false, desc => ?DESC("producer_message_opts") - })}, - {resource_opts, - mk( - ref(producer_resource_opts), - #{ - required => false, - desc => ?DESC(emqx_resource_schema, "creation_opts") - } - )} + {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})} ]; fields(producer_buffer) -> [ @@ -144,12 +141,6 @@ fields(producer_buffer) -> desc => ?DESC("buffer_memory_overload_protection") })} ]; -fields(producer_pulsar_message) -> - [ - {key, - mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})}, - {value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})} - ]; fields(producer_resource_opts) -> SupportedOpts = [ health_check_interval, @@ -225,8 +216,8 @@ producer_strategy_key_validator( producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf)); producer_strategy_key_validator(#{ <<"strategy">> := key_dispatch, - <<"message">> := #{<<"key">> := ""} -}) -> + <<"message">> := #{<<"key">> := Key} +}) when Key =:= "" orelse Key =:= <<>> -> {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; producer_strategy_key_validator(_) -> ok. @@ -248,8 +239,7 @@ struct_names() -> [ auth_basic, auth_token, - producer_buffer, - producer_pulsar_message + producer_buffer ]. override_default(OriginalFn, NewDefault) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl new file mode 100644 index 000000000..f51ed7884 --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pulsar_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + is_action/1, + action_convert_from_connector/2 +]). + +is_action(_) -> true. + +bridge_v1_type_name() -> pulsar_producer. + +action_type_name() -> pulsar. + +connector_type_name() -> pulsar. + +schema_module() -> emqx_bridge_pulsar_pubsub_schema. + +action_convert_from_connector(ConnectorConfig, ActionConfig) -> + Dispatch = emqx_utils_conv:bin(maps:get(<<"strategy">>, ConnectorConfig, <<>>)), + case Dispatch of + <<"key_dispatch">> -> + case emqx_utils_maps:deep_find([<<"parameters">>, <<"message">>], ActionConfig) of + {ok, Message} -> + Validator = + #{ + <<"strategy">> => key_dispatch, + <<"message">> => emqx_utils_maps:binary_key_map(Message) + }, + case emqx_bridge_pulsar:producer_strategy_key_validator(Validator) of + ok -> + ActionConfig; + {error, Reason} -> + throw(#{ + reason => Reason, + kind => validation_error + }) + end; + {not_found, _, _} -> + %% no message field, use the default message template + ActionConfig + end; + _ -> + ActionConfig + end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl similarity index 81% rename from apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl rename to apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 2098cfeba..7b080d0e6 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_pulsar_impl_producer). +-module(emqx_bridge_pulsar_connector). -include("emqx_bridge_pulsar.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -13,8 +13,12 @@ callback_mode/0, query_mode/1, on_start/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, on_stop/2, on_get_status/2, + on_get_channel_status/3, on_query/3, on_query_async/4 ]). @@ -23,8 +27,7 @@ -type state() :: #{ pulsar_client_id := pulsar_client_id(), producers := pulsar_producers:producers(), - sync_timeout := erlang:timeout(), - message_template := message_template() + channels := map() }. -type buffer_mode() :: memory | disk | hybrid. -type compression_mode() :: no_compression | snappy | zlib. @@ -77,16 +80,12 @@ query_mode(_Config) -> -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> - #{ - bridge_name := BridgeName, - servers := Servers0, - ssl := SSL - } = Config, + #{servers := Servers0, ssl := SSL} = Config, Servers = format_servers(Servers0), - ClientId = make_client_id(InstanceId, BridgeName), + ClientId = make_client_id(InstanceId), ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId), SSLOpts = emqx_tls_lib:to_client_opts(SSL), - ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), + ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(10)), ClientOpts = #{ connect_timeout => ConnectTimeout, ssl_opts => SSLOpts, @@ -119,6 +118,30 @@ on_start(InstanceId, Config) -> end, start_producer(Config, InstanceId, ClientId, ClientOpts). +on_add_channel( + _InstanceId, + #{channels := Channels} = State, + ChannelId, + #{parameters := #{message := Message, sync_timeout := SyncTimeout}} +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + false -> + Parameters = #{ + message => compile_message_template(Message), + sync_timeout => SyncTimeout + }, + NewChannels = maps:put(ChannelId, Parameters, Channels), + {ok, State#{channels => NewChannels}} + end. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + {ok, State#{channels => maps:remove(ChannelId, Channels)}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + -spec on_stop(resource_id(), state()) -> ok. on_stop(InstanceId, _State) -> case emqx_resource:get_allocated_resources(InstanceId) of @@ -174,76 +197,77 @@ on_get_status(_InstanceId, _State) -> %% create the bridge is not quite finished, `State = undefined'. connecting. --spec on_query(resource_id(), {send_message, map()}, state()) -> +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> connected; + false -> {error, channel_not_exists} + end. + +-spec on_query(resource_id(), tuple(), state()) -> {ok, term()} | {error, timeout} | {error, term()}. -on_query(_InstanceId, {send_message, Message}, State) -> - #{ - producers := Producers, - sync_timeout := SyncTimeout, - message_template := MessageTemplate - } = State, - PulsarMessage = render_message(Message, MessageTemplate), - try - pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) - catch - error:timeout -> - {error, timeout} +on_query(_InstanceId, {ChannelId, Message}, State) -> + #{producers := Producers, channels := Channels} = State, + case maps:find(ChannelId, Channels) of + error -> + {error, channel_not_exists}; + {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout}} -> + PulsarMessage = render_message(Message, MessageTmpl), + try + pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) + catch + error:timeout -> + {error, timeout} + end end. -spec on_query_async( - resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() + resource_id(), tuple(), {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. -on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> - ?tp_span( - pulsar_producer_on_query_async, - #{instance_id => _InstanceId, message => Message}, - do_on_query_async(Message, AsyncReplyFn, State) - ). +on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> + #{producers := Producers, channels := Channels} = State, + case maps:find(ChannelId, Channels) of + error -> + {error, channel_not_exists}; + {ok, #{message := MessageTmpl}} -> + ?tp_span( + pulsar_producer_on_query_async, + #{instance_id => _InstanceId, message => Message}, + on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) + ) + end. -do_on_query_async(Message, AsyncReplyFn, State) -> - #{ - producers := Producers, - message_template := MessageTemplate - } = State, - PulsarMessage = render_message(Message, MessageTemplate), +on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) -> + PulsarMessage = render_message(Message, MessageTmpl), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). %%------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------- --spec to_bin(atom() | string() | binary()) -> binary(). -to_bin(A) when is_atom(A) -> - atom_to_binary(A); -to_bin(L) when is_list(L) -> - list_to_binary(L); -to_bin(B) when is_binary(B) -> - B. - -spec format_servers(binary()) -> [string()]. format_servers(Servers0) -> - Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS), lists:map( fun(#{scheme := Scheme, hostname := Host, port := Port}) -> Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port) end, - Servers1 + emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS) ). --spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id(). -make_client_id(InstanceId, BridgeName) -> +-spec make_client_id(resource_id()) -> pulsar_client_id(). +make_client_id(InstanceId) -> case is_dry_run(InstanceId) of true -> pulsar_producer_probe; false -> + {pulsar, Name} = emqx_connector_resource:parse_connector_id(InstanceId), ClientIdBin = iolist_to_binary([ - <<"pulsar_producer:">>, - to_bin(BridgeName), + <<"pulsar:">>, + emqx_utils_conv:bin(Name), <<":">>, - to_bin(node()) + emqx_utils_conv:bin(node()) ]), binary_to_atom(ClientIdBin) end. @@ -252,10 +276,8 @@ make_client_id(InstanceId, BridgeName) -> is_dry_run(InstanceId) -> TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, InstanceId) + nomatch -> false; + _ -> string:equal(TestIdStart, InstanceId) end. conn_opts(#{authentication := none}) -> @@ -275,11 +297,11 @@ conn_opts(#{authentication := #{jwt := JWT}}) -> -spec replayq_dir(pulsar_client_id()) -> string(). replayq_dir(ClientId) -> - filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]). + filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). -spec producer_name(pulsar_client_id()) -> atom(). producer_name(ClientId) -> - ClientIdBin = to_bin(ClientId), + ClientIdBin = emqx_utils_conv:bin(ClientId), binary_to_atom( iolist_to_binary([ <<"producer-">>, @@ -303,12 +325,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> }, compression := Compression, max_batch_bytes := MaxBatchBytes, - message := MessageTemplateOpts, pulsar_topic := PulsarTopic0, retention_period := RetentionPeriod, send_buffer := SendBuffer, - strategy := Strategy, - sync_timeout := SyncTimeout + strategy := Strategy } = Config, {OffloadMode, ReplayQDir} = case BufferMode of @@ -330,7 +350,6 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> }, ProducerName = producer_name(ClientId), ?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}), - MessageTemplate = compile_message_template(MessageTemplateOpts), ProducerOpts0 = #{ batch_size => BatchSize, @@ -353,8 +372,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> State = #{ pulsar_client_id => ClientId, producers => Producers, - sync_timeout => SyncTimeout, - message_template => MessageTemplate + channels => #{} }, ?tp(pulsar_producer_bridge_started, #{}), {ok, State} diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl new file mode 100644 index 000000000..953318e0a --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_connector_schema). + +-export([namespace/0, roots/0, fields/1, desc/1]). +-export([connector_examples/1, connector_example_values/0]). + +-include("emqx_bridge_pulsar.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-define(TYPE, pulsar). + +namespace() -> ?TYPE. + +roots() -> []. + +fields("config_connector") -> + lists:keydelete(enable, 1, emqx_bridge_schema:common_bridge_fields()) ++ + emqx_bridge_pulsar:fields(config) ++ + emqx_bridge_pulsar:fields(producer_opts) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields("post") -> + emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector"); +fields("put") -> + fields("config_connector"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("config_connector"). + +desc("config_connector") -> + ?DESC(emqx_bridge_pulsar, "config_connector"); +desc(connector_resource_opts) -> + ?DESC(emqx_bridge_pulsar, connector_resource_opts); +desc(_) -> + undefined. + +connector_examples(Method) -> + [ + #{ + <<"pulsar">> => + #{ + summary => <<"Pulsar Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"pulsar_connector">>, + type => ?TYPE, + enable => true, + servers => <<"pulsar://127.0.0.1:6650">>, + authentication => none, + connect_timeout => <<"5s">>, + batch_size => 10, + compression => no_compression, + send_buffer => <<"1MB">>, + retention_period => <<"100s">>, + max_batch_bytes => <<"32MB">>, + pulsar_topic => <<"test_topic">>, + strategy => random, + buffer => #{mode => memory}, + ssl => #{enable => false} + }. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl new file mode 100644 index 000000000..a705ed560 --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -0,0 +1,123 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_pubsub_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1, desc/1, namespace/0]). + +-export([bridge_v2_examples/1]). + +-define(ACTION_TYPE, pulsar). +-define(CONNECTOR_SCHEMA, emqx_bridge_rabbitmq_connector_schema). + +namespace() -> "pulsar". + +roots() -> []. + +fields(action) -> + {pulsar, + ?HOCON( + ?MAP(name, ?R_REF(publisher_action)), + #{ + desc => <<"Pulsar Action Config">>, + required => false + } + )}; +fields(publisher_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + ?HOCON( + ?R_REF(action_parameters), + #{ + required => true, + desc => ?DESC(action_parameters) + } + ), + #{resource_opts_ref => ?R_REF(action_resource_opts)} + ); +fields(action_parameters) -> + [ + {sync_timeout, + ?HOCON(emqx_schema:timeout_duration_ms(), #{ + default => <<"3s">>, desc => ?DESC("producer_sync_timeout") + })}, + {message, + ?HOCON(?R_REF(producer_pulsar_message), #{ + required => false, desc => ?DESC("producer_message_opts") + })} + ]; +fields(producer_pulsar_message) -> + [ + {key, + ?HOCON(string(), #{ + default => <<"${.clientid}">>, + desc => ?DESC("producer_key_template") + })}, + {value, + ?HOCON(string(), #{ + default => <<"${.}">>, + desc => ?DESC("producer_value_template") + })} + ]; +fields(action_resource_opts) -> + UnsupportedOpts = [ + batch_size, + batch_time, + worker_pool_size, + request_ttl, + inflight_window, + max_buffer_bytes, + query_mode + ], + lists:filter( + fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, + emqx_bridge_v2_schema:action_resource_opts_fields() + ); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action)); +fields(What) -> + error({?MODULE, missing_field_handler, What}). + +desc("config") -> + ?DESC("desc_config"); +desc(action_resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc(action_parameters) -> + ?DESC(action_parameters); +desc(producer_pulsar_message) -> + ?DESC("producer_message_opts"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; +desc(publisher_action) -> + ?DESC(publisher_action); +desc(_) -> + undefined. + +bridge_v2_examples(Method) -> + [ + #{ + <<"pulsar">> => #{ + summary => <<"Pulsar Producer Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, + _ActionType = ?ACTION_TYPE, + _ConnectorType = pulsar, + #{ + parameters => #{ + sync_timeout => <<"5s">>, + message => #{ + key => <<"${.clientid}">>, + value => <<"${.}">> + } + } + } + ) + } + } + ]. diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl similarity index 97% rename from apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl rename to apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index dfc5af3a7..c9b25cc71 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_pulsar_impl_producer_SUITE). +-module(emqx_bridge_pulsar_connector_SUITE). -compile(nowarn_export_all). -compile(export_all). @@ -550,7 +550,6 @@ kill_resource_managers() -> t_start_and_produce_ok(Config) -> MQTTTopic = ?config(mqtt_topic, Config), - ResourceId = resource_id(Config), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), QoS = 0, Payload = emqx_guid:to_hexstr(emqx_guid:gen()), @@ -600,6 +599,13 @@ t_start_and_produce_ok(Config) -> _Sleep = 100, _Attempts0 = 20, begin + BridgeId = emqx_bridge_resource:bridge_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + ConnectorId = emqx_bridge_resource:resource_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>, ?assertMatch( #{ counters := #{ @@ -612,7 +618,7 @@ t_start_and_produce_ok(Config) -> success := 2 } }, - emqx_resource_manager:get_metrics(ResourceId) + emqx_resource:get_metrics(Id) ), ?assertEqual( 1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success') @@ -631,17 +637,22 @@ t_start_and_produce_ok(Config) -> %% Under normal operations, the bridge will be called async via %% `simple_async_query'. t_sync_query(Config) -> - ResourceId = resource_id(Config), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), ?check_trace( begin ?assertMatch({ok, _}, create_bridge_api(Config)), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - Message = {send_message, #{payload => Payload}}, + BridgeId = emqx_bridge_resource:bridge_id(<<"pulsar">>, ?config(pulsar_name, Config)), + ConnectorId = emqx_bridge_resource:resource_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>, + Message = {Id, #{payload => Payload}}, ?assertMatch( {ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message) ), @@ -688,13 +699,13 @@ t_create_via_http(Config) -> t_start_stop(Config) -> PulsarName = ?config(pulsar_name, Config), - ResourceId = resource_id(Config), ?check_trace( begin ?assertMatch( {ok, _}, create_bridge(Config) ), + ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -745,11 +756,11 @@ t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), ?assertMatch( {ok, _}, create_bridge(Config) ), + ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -777,7 +788,6 @@ t_start_when_down(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), ?check_trace( begin emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> @@ -787,6 +797,7 @@ t_start_when_down(Config) -> ), ok end), + ResourceId = resource_id(Config), %% Should recover given enough time. ?retry( _Sleep = 1_000, @@ -902,7 +913,6 @@ t_failure_to_start_producer(Config) -> %% die for whatever reason. t_producer_process_crash(Config) -> MQTTTopic = ?config(mqtt_topic, Config), - ResourceId = resource_id(Config), QoS = 0, ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), @@ -934,6 +944,7 @@ t_producer_process_crash(Config) -> ok after 1_000 -> ct:fail("pid didn't die") end, + ResourceId = resource_id(Config), ?retry( _Sleep0 = 50, _Attempts0 = 50, @@ -995,8 +1006,8 @@ t_resource_manager_crash_after_producers_started(Config) -> Producers =/= undefined, 10_000 ), - ?assertMatch(ok, delete_bridge(Config)), ?assertEqual([], get_pulsar_producers()), + ?assertMatch({error, bridge_not_found}, delete_bridge(Config)), ok end, [] @@ -1028,8 +1039,8 @@ t_resource_manager_crash_before_producers_started(Config) -> #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, 10_000 ), - ?assertMatch(ok, delete_bridge(Config)), ?assertEqual([], get_pulsar_producers()), + ?assertMatch({error, bridge_not_found}, delete_bridge(Config)), ok end, [] @@ -1046,7 +1057,7 @@ t_strategy_key_validation(Config) -> <<"reason">> := <<"Message key cannot be empty", _/binary>> } }}}, - probe_bridge_api( + create_bridge_api( Config, #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}} ) @@ -1060,7 +1071,7 @@ t_strategy_key_validation(Config) -> <<"reason">> := <<"Message key cannot be empty", _/binary>> } }}}, - create_bridge_api( + probe_bridge_api( Config, #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}} ) @@ -1075,7 +1086,6 @@ do_t_cluster(Config) -> ?check_trace( begin MQTTTopic = ?config(mqtt_topic, Config), - ResourceId = resource_id(Config), Nodes = [N1, N2 | _] = cluster(Config), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), QoS = 0, @@ -1095,6 +1105,7 @@ do_t_cluster(Config) -> ), 25_000 ), + ResourceId = erpc:call(N1, ?MODULE, resource_id, [Config]), lists:foreach( fun(N) -> ?retry( @@ -1147,12 +1158,12 @@ t_resilience(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), ?check_trace( begin {ok, _} = create_bridge(Config), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + ResourceId = resource_id(Config), ?retry( _Sleep0 = 1_000, _Attempts0 = 20, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl index 3fb00632c..9a9741226 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl @@ -170,7 +170,7 @@ fields(Field) when -> emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source)); fields(What) -> - error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}). + error({?MODULE, missing_field_handler, What}). %% v2: api schema %% The parameter equals to %% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1 diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 23bc5a8b4..15b196af4 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -74,6 +74,8 @@ resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; resource_type(tdengine) -> emqx_bridge_tdengine_connector; +resource_type(pulsar) -> + emqx_bridge_pulsar_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(s3) -> @@ -94,6 +96,8 @@ connector_impl_module(elasticsearch) -> emqx_bridge_es_connector; connector_impl_module(opents) -> emqx_bridge_opents_connector; +connector_impl_module(pulsar) -> + emqx_bridge_pulsar_connector; connector_impl_module(tdengine) -> emqx_bridge_tdengine_connector; connector_impl_module(rabbitmq) -> @@ -317,6 +321,14 @@ connector_structs() -> required => false } )}, + {pulsar, + mk( + hoconsc:map(name, ref(emqx_bridge_pulsar_connector_schema, "config_connector")), + #{ + desc => <<"Pulsar Connector Config">>, + required => false + } + )}, {rabbitmq, mk( hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")), @@ -361,6 +373,7 @@ schema_modules() -> emqx_bridge_iotdb_connector, emqx_bridge_es_connector, emqx_bridge_rabbitmq_connector_schema, + emqx_bridge_pulsar_connector_schema, emqx_bridge_opents_connector, emqx_bridge_greptimedb, emqx_bridge_tdengine_connector, @@ -410,6 +423,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method), + api_ref(emqx_bridge_pulsar_connector_schema, <<"pulsar">>, Method), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"), api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method), api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector") diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index fc68bbd9d..430a74bdb 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -174,6 +174,8 @@ connector_type_to_bridge_types(opents) -> [opents]; connector_type_to_bridge_types(greptimedb) -> [greptimedb]; +connector_type_to_bridge_types(pulsar) -> + [pulsar_producer, pulsar]; connector_type_to_bridge_types(tdengine) -> [tdengine]; connector_type_to_bridge_types(rabbitmq) -> @@ -269,6 +271,7 @@ split_bridge_to_connector_and_action( #{<<"connector">> := ConnectorName0} -> ConnectorName0; _ -> generate_connector_name(ConnectorsMap, BridgeName, 0) end, + OrgActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeType), {ActionMap, ActionType, ActionOrSource} = case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon index e1b6153d3..913ab8d2a 100644 --- a/rel/i18n/emqx_bridge_pulsar.hocon +++ b/rel/i18n/emqx_bridge_pulsar.hocon @@ -1,180 +1,173 @@ emqx_bridge_pulsar { - auth_basic { - desc = "Parameters for basic authentication." - label = "Basic auth params" - } - auth_basic_password { - desc = "Basic authentication password." - label = "Password" - } +config_connector.desc: +"""Pulsar connector config""" +config_connector.label: +"""Pulsar Connector""" - auth_basic_username { - desc = "Basic authentication username." - label = "Username" - } +connector_resource_opts.desc: +"""Pulsar connector resource options""" +connector_resource_opts.label: +"""Resource Options""" - auth_token { - desc = "Parameters for token authentication." - label = "Token auth params" - } +auth_basic.desc: + """Parameters for basic authentication.""" +auth_basic.label: +"""Basic auth params""" - auth_token_jwt { - desc = "JWT authentication token." - label = "JWT" - } +auth_basic_password.desc: +"""Basic authentication password.""" +auth_basic_password.label: +"""Password""" - authentication { - desc = "Authentication configs." - label = "Authentication" - } +auth_basic_username.desc: +"""Basic authentication username.""" +auth_basic_username.label: +"""Username""" - buffer_memory_overload_protection { - desc = "Applicable when buffer mode is set to memory\n" - "EMQX will drop old buffered messages under high memory pressure." - " The high memory threshold is defined in config sysmon.os.sysmem_high_watermark." - " NOTE: This config only works on Linux." - label = "Memory Overload Protection" - } +auth_token.desc: +"""Parameters for token authentication.""" +auth_token.label: +"""Token auth params""" - buffer_mode { - desc = "Message buffer mode.\n" - "memory: Buffer all messages in memory. The messages will be lost" - " in case of EMQX node restart\ndisk: Buffer all messages on disk." - " The messages on disk are able to survive EMQX node restart.\n" - "hybrid: Buffer message in memory first, when up to certain limit" - " (see segment_bytes config for more information), then start offloading" - " messages to disk, Like memory mode, the messages will be lost in" - " case of EMQX node restart." - label = "Buffer Mode" - } +auth_token_jwt.desc: +"""JWT authentication token.""" +auth_token_jwt.label: +"""JWT""" - buffer_per_partition_limit { - desc = "Number of bytes allowed to buffer for each Pulsar partition." - " When this limit is exceeded, old messages will be dropped in a trade for credits" - " for new messages to be buffered." - label = "Per-partition Buffer Limit" - } +authentication.desc: +"""Authentication configs.""" +authentication.label: +"""Authentication""" - buffer_segment_bytes { - desc = "Applicable when buffer mode is set to disk or hybrid.\n" - "This value is to specify the size of each on-disk buffer file." - label = "Segment File Bytes" - } +buffer_memory_overload_protection.desc: +"""Applicable when buffer mode is set to memory +EMQX will drop old buffered messages under high memory pressure. +The high memory threshold is defined in config sysmon.os.sysmem_high_watermark. + NOTE: This config only works on Linux.""" +buffer_memory_overload_protection.label: +"""Memory Overload Protection""" - config_enable { - desc = "Enable (true) or disable (false) this Pulsar bridge." - label = "Enable or Disable" - } +buffer_mode.desc: +"""Message buffer mode. +memory: Buffer all messages in memory. The messages will be lost + in case of EMQX node restart\ndisk: Buffer all messages on disk. + The messages on disk are able to survive EMQX node restart. +hybrid: Buffer message in memory first, when up to certain limit + (see segment_bytes config for more information), then start offloading + messages to disk, Like memory mode, the messages will be lost in + case of EMQX node restart.""" +buffer_mode.label: +"""Buffer Mode""" - connect_timeout { - desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)." - label = "Connect Timeout" - } +buffer_per_partition_limit.desc: +"""Number of bytes allowed to buffer for each Pulsar partition. + When this limit is exceeded, old messages will be dropped in a trade for credits + for new messages to be buffered.""" + buffer_per_partition_limit.label: +"""Per-partition Buffer Limit""" - desc_name { - desc = "Action name, a human-readable identifier." - label = "Action Name" - } +desc_name.desc: +"""Action name, a human-readable identifier.""" +desc_name.label: +"""Action Name""" - desc_type { - desc = "The Bridge Type" - label = "Bridge Type" - } +buffer_segment_bytes.desc: +"""Applicable when buffer mode is set to disk or hybrid. +This value is to specify the size of each on-disk buffer file.""" +buffer_segment_bytes.label: +"""Segment File Bytes""" - producer_batch_size { - desc = "Maximum number of individual requests to batch in a Pulsar message." - label = "Batch size" - } +config_enable.desc: +"""Enable (true) or disable (false) this Pulsar bridge.""" +config_enable.label: +"""Enable or Disable""" - producer_buffer { - desc = "Configure producer message buffer.\n\n" - "Tell Pulsar producer how to buffer messages when EMQX has more messages to" - " send than Pulsar can keep up, or when Pulsar is down." - label = "Message Buffer" - } +connect_timeout.desc: +"""Maximum wait time for TCP connection establishment (including authentication time if enabled).""" +connect_timeout.label: +"""Connect Timeout""" - producer_compression { - desc = "Compression method." - label = "Compression" - } +desc_name.desc: +"""Bridge name, used as a human-readable description of the bridge.""" +desc_name.label: +"""Bridge Name""" - producer_key_template { - desc = "Template to render Pulsar message key." - label = "Message Key" - } +desc_type.desc: +"""The Bridge Type""" +desc_type.label: +"""Bridge Type""" - producer_local_topic { - desc = "MQTT topic or topic filter as data source (bridge input)." - " If rule action is used as data source, this config should be left empty," - " otherwise messages will be duplicated in Pulsar." - label = "Source MQTT Topic" - } +producer_batch_size.desc: +"""Maximum number of individual requests to batch in a Pulsar message.""" +producer_batch_size.label: +"""Batch size""" - producer_max_batch_bytes { - desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers" - " default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in" - " order to compensate Pulsar message encoding overheads (especially when each individual" - " message is very small). When a single message is over the limit, it is still" - " sent (as a single element batch)." - label = "Max Batch Bytes" - } +producer_buffer.desc: +"""Configure producer message buffer." +Tell Pulsar producer how to buffer messages when EMQX has more messages to" + send than Pulsar can keep up, or when Pulsar is down.""" +producer_buffer.label: +"""Message Buffer""" - producer_message_opts { - desc = "Template to render a Pulsar message." - label = "Pulsar Message Template" - } +producer_compression.desc: +"""Compression method.""" +producer_compression.label: +"""Compression""" - producer_pulsar_message { - desc = "Template to render a Pulsar message." - label = "Pulsar Message Template" - } +producer_local_topic.desc: +"""MQTT topic or topic filter as data source (bridge input) + If rule action is used as data source, this config should be left empty, + otherwise messages will be duplicated in Pulsar.""" +producer_local_topic.label: +"""Source MQTT Topic""" - producer_pulsar_topic { - desc = "Pulsar topic name" - label = "Pulsar topic name" - } +producer_max_batch_bytes.desc: +"""Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers + default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in + order to compensate Pulsar message encoding overheads (especially when each individual + message is very small). When a single message is over the limit, it is still + sent (as a single element batch).""" +producer_max_batch_bytes.label: +"""Max Batch Bytes""" - producer_retention_period { - desc = "The amount of time messages will be buffered while there is no connection to" - " the Pulsar broker. Longer times mean that more memory/disk will be used" - label = "Retention Period" - } - producer_send_buffer { - desc = "Fine tune the socket send buffer. The default value is tuned for high throughput." - label = "Socket Send Buffer Size" - } +producer_pulsar_topic.desc: +"""Pulsar topic name""" +producer_pulsar_topic.label: +"""Pulsar topic name""" - producer_strategy { - desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n" - "\n" - "random: Randomly pick a partition for each message.\n" - "roundrobin: Pick each available producer in turn for each message.\n" - "key_dispatch: Hash Pulsar message key of the first message in a batch" - " to a partition number." - label = "Partition Strategy" - } +producer_retention_period.desc: +"""The amount of time messages will be buffered while there is no connection to + the Pulsar broker. Longer times mean that more memory/disk will be used""" +producer_retention_period.label: +"""Retention Period""" - producer_sync_timeout { - desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously." - label = "Sync publish timeout" - } +producer_send_buffer.desc: +"""Fine tune the socket send buffer. The default value is tuned for high throughput.""" +producer_send_buffer.label: +"""Socket Send Buffer Size""" - producer_value_template { - desc = "Template to render Pulsar message value." - label = "Message Value" - } +producer_strategy.desc: +"""Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions. - pulsar_producer_struct { - desc = "Configuration for a Pulsar bridge." - label = "Pulsar Bridge Configuration" - } +random: Randomly pick a partition for each message. +roundrobin: Pick each available producer in turn for each message. +key_dispatch: Hash Pulsar message key of the first message in a batch + to a partition number.""" +producer_strategy.label: +"""Partition Strategy""" + +pulsar_producer_struct.desc: +"""Configuration for a Pulsar bridge.""" +pulsar_producer_struct.label: +"""Pulsar Bridge Configuration""" + +servers.desc: +"""A comma separated list of Pulsar URLs in the form scheme://host[:port] + for the client to connect to. The supported schemes are pulsar:// (default) + and pulsar+ssl://. The default port is 6650.""" +servers.label: +"""Servers""" - servers { - desc = "A comma separated list of Pulsar URLs in the form scheme://host[:port]" - " for the client to connect to. The supported schemes are pulsar:// (default)" - " and pulsar+ssl://. The default port is 6650." - label = "Servers" - } } diff --git a/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon new file mode 100644 index 000000000..a359bc755 --- /dev/null +++ b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon @@ -0,0 +1,38 @@ +emqx_bridge_pulsar_pubsub_schema { + +action_parameters.desc: +"""Action specific configs.""" +action_parameters.label: +"""Action""" + +publisher_action.desc: +"""Publish message to pulsar topic""" +publisher_action.label: +"""Publish Action """ + +producer_sync_timeout.desc: +"""Maximum wait time for receiving a receipt from Pulsar when publishing synchronously.""" +producer_sync_timeout.label: +"""Sync publish timeout""" + +producer_key_template.desc: +"""Template to render Pulsar message key.""" +producer_key_template.label: +"""Message Key""" + +producer_value_template.desc: +"""Template to render Pulsar message value.""" +producer_value_template.label: +"""Message Value""" + +producer_message_opts.desc: +"""Template to render a Pulsar message.""" +producer_message_opts.label: +"""Pulsar Message Template""" + +producer_pulsar_message.desc: +"""Template to render a Pulsar message.""" +producer_pulsar_message.label: +"""Pulsar Message Template""" + +}