Merge pull request #12423 from zhongwencool/rabbitmq-e560
feat: rabbitmq bridge v2 integration
This commit is contained in:
commit
eace8f2fcb
|
@ -9,10 +9,12 @@ services:
|
|||
expose:
|
||||
- "15672"
|
||||
- "5672"
|
||||
- "5671"
|
||||
# We don't want to take ports from the host
|
||||
# ports:
|
||||
#ports:
|
||||
# - "15672:15672"
|
||||
# - "5672:5672"
|
||||
# - "5671:5671"
|
||||
volumes:
|
||||
- ./certs/ca.crt:/opt/certs/ca.crt
|
||||
- ./certs/server.crt:/opt/certs/server.crt
|
||||
|
|
|
@ -584,7 +584,14 @@ is_tcp_server_available(Host, Port) ->
|
|||
Timeout :: integer()
|
||||
) -> boolean.
|
||||
is_tcp_server_available(Host, Port, Timeout) ->
|
||||
case gen_tcp:connect(Host, Port, [], Timeout) of
|
||||
case
|
||||
gen_tcp:connect(
|
||||
emqx_utils_conv:str(Host),
|
||||
emqx_utils_conv:int(Port),
|
||||
[],
|
||||
Timeout
|
||||
)
|
||||
of
|
||||
{ok, Socket} ->
|
||||
gen_tcp:close(Socket),
|
||||
true;
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
transform_bridge_v1_config_to_action_config/4,
|
||||
action_convert_from_connector/3
|
||||
]).
|
||||
-export([clean_cache/0]).
|
||||
|
||||
-callback bridge_v1_type_name() ->
|
||||
atom()
|
||||
|
@ -77,7 +78,7 @@
|
|||
]).
|
||||
|
||||
%% ====================================================================
|
||||
%% Hadcoded list of info modules for actions
|
||||
%% HardCoded list of info modules for actions
|
||||
%% TODO: Remove this list once we have made sure that all relevants
|
||||
%% apps are loaded before this module is called.
|
||||
%% ====================================================================
|
||||
|
@ -103,6 +104,7 @@ hard_coded_action_info_modules_ee() ->
|
|||
emqx_bridge_iotdb_action_info,
|
||||
emqx_bridge_es_action_info,
|
||||
emqx_bridge_opents_action_info,
|
||||
emqx_bridge_rabbitmq_action_info,
|
||||
emqx_bridge_greptimedb_action_info,
|
||||
emqx_bridge_tdengine_action_info
|
||||
].
|
||||
|
@ -313,6 +315,9 @@ build_cache() ->
|
|||
persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap),
|
||||
ActionInfoMap.
|
||||
|
||||
clean_cache() ->
|
||||
persistent_term:erase(internal_emqx_action_persistent_term_info_key()).
|
||||
|
||||
action_info_modules() ->
|
||||
ActionInfoModules = [
|
||||
action_info_modules(App)
|
||||
|
|
|
@ -44,6 +44,7 @@ stop(_State) ->
|
|||
emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH),
|
||||
ok = emqx_bridge:unload(),
|
||||
ok = emqx_bridge_v2:unload(),
|
||||
emqx_action_info:clean_cache(),
|
||||
ok.
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
%% -*- mode: erlang; -*-
|
||||
|
||||
{erl_opts, [debug_info]}.
|
||||
{erl_opts, [debug_info, {feature, maybe_expr, enable}]}.
|
||||
{deps, [
|
||||
%% The following two are dependencies of rabbit_common
|
||||
{thoas, {git, "https://github.com/emqx/thoas.git", {tag, "v1.0.0"}}},
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
{application, emqx_bridge_rabbitmq, [
|
||||
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
||||
{vsn, "0.1.7"},
|
||||
{vsn, "0.1.8"},
|
||||
{registered, []},
|
||||
{mod, {emqx_bridge_rabbitmq_app, []}},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_rabbitmq).
|
||||
|
||||
|
@ -22,7 +22,7 @@
|
|||
]).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Callback used by HTTP API
|
||||
%% Callback used by HTTP API v1
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
|
@ -78,7 +78,7 @@ fields("config") ->
|
|||
{local_topic,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("local_topic"), default => undefined}
|
||||
#{desc => ?DESC("local_topic")}
|
||||
)},
|
||||
{resource_opts,
|
||||
mk(
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_rabbitmq_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-export([
|
||||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0,
|
||||
bridge_v1_config_to_connector_config/1,
|
||||
bridge_v1_config_to_action_config/2,
|
||||
is_source/0,
|
||||
is_action/0
|
||||
]).
|
||||
|
||||
-define(SCHEMA_MODULE, emqx_bridge_rabbitmq_pubsub_schema).
|
||||
-import(emqx_utils_conv, [bin/1]).
|
||||
|
||||
bridge_v1_type_name() -> rabbitmq.
|
||||
|
||||
action_type_name() -> rabbitmq.
|
||||
|
||||
connector_type_name() -> rabbitmq.
|
||||
|
||||
schema_module() -> ?SCHEMA_MODULE.
|
||||
|
||||
is_source() -> true.
|
||||
is_action() -> true.
|
||||
|
||||
bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
||||
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(publisher_action)),
|
||||
ActionParametersKeys = schema_keys(?SCHEMA_MODULE:fields(action_parameters)),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ConnectorTopLevelKeys = schema_keys(
|
||||
emqx_bridge_rabbitmq_connector_schema:fields("config_connector")
|
||||
),
|
||||
ConnectorKeys = (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)),
|
||||
ConnectorConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_connector_schema:project_to_connector_resource_opts/1,
|
||||
ConnectorConfig0
|
||||
).
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(publisher_action)),
|
||||
ActionParametersKeys = schema_keys(?SCHEMA_MODULE:fields(action_parameters)),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ActionConfig0 = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
|
||||
ActionConfig0#{<<"connector">> => ConnectorName}
|
||||
).
|
||||
|
||||
schema_keys(Schema) ->
|
||||
[bin(Key) || {Key, _} <- Schema].
|
||||
|
||||
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||
Conf0 = maps:with(PickKeys, Config),
|
||||
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
|
|
@ -0,0 +1,26 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_rabbitmq_app).
|
||||
|
||||
-behaviour(application).
|
||||
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
emqx_bridge_rabbitmq_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
|
@ -1,9 +1,10 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_rabbitmq_connector).
|
||||
|
||||
-feature(maybe_expr, enable).
|
||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
|
@ -22,17 +23,16 @@
|
|||
%% hocon_schema callbacks
|
||||
-export([namespace/0, roots/0, fields/1]).
|
||||
|
||||
%% HTTP API callbacks
|
||||
-export([values/1]).
|
||||
|
||||
%% emqx_resource callbacks
|
||||
-export([
|
||||
%% Required callbacks
|
||||
on_start/2,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_stop/2,
|
||||
callback_mode/0,
|
||||
%% Optional callbacks
|
||||
on_get_status/2,
|
||||
on_get_channel_status/3,
|
||||
on_query/3,
|
||||
on_batch_query/3
|
||||
]).
|
||||
|
@ -41,142 +41,18 @@
|
|||
-export([connect/1]).
|
||||
|
||||
%% Internal callbacks
|
||||
-export([publish_messages/3]).
|
||||
-export([publish_messages/4]).
|
||||
|
||||
namespace() -> "rabbitmq".
|
||||
|
||||
%% bridge v1
|
||||
roots() ->
|
||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||
|
||||
%% bridge v1 called by emqx_bridge_rabbitmq
|
||||
fields(config) ->
|
||||
[
|
||||
{server,
|
||||
hoconsc:mk(
|
||||
typerefl:binary(),
|
||||
#{
|
||||
default => <<"localhost">>,
|
||||
desc => ?DESC("server")
|
||||
}
|
||||
)},
|
||||
{port,
|
||||
hoconsc:mk(
|
||||
emqx_schema:port_number(),
|
||||
#{
|
||||
default => 5672,
|
||||
desc => ?DESC("server")
|
||||
}
|
||||
)},
|
||||
{username,
|
||||
hoconsc:mk(
|
||||
typerefl:binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("username")
|
||||
}
|
||||
)},
|
||||
{password, emqx_connector_schema_lib:password_field(#{required => true})},
|
||||
{pool_size,
|
||||
hoconsc:mk(
|
||||
typerefl:pos_integer(),
|
||||
#{
|
||||
default => 8,
|
||||
desc => ?DESC("pool_size")
|
||||
}
|
||||
)},
|
||||
{timeout,
|
||||
hoconsc:mk(
|
||||
emqx_schema:timeout_duration_ms(),
|
||||
#{
|
||||
default => <<"5s">>,
|
||||
desc => ?DESC("timeout")
|
||||
}
|
||||
)},
|
||||
{wait_for_publish_confirmations,
|
||||
hoconsc:mk(
|
||||
boolean(),
|
||||
#{
|
||||
default => true,
|
||||
desc => ?DESC("wait_for_publish_confirmations")
|
||||
}
|
||||
)},
|
||||
{publish_confirmation_timeout,
|
||||
hoconsc:mk(
|
||||
emqx_schema:timeout_duration_ms(),
|
||||
#{
|
||||
default => <<"30s">>,
|
||||
desc => ?DESC("timeout")
|
||||
}
|
||||
)},
|
||||
|
||||
{virtual_host,
|
||||
hoconsc:mk(
|
||||
typerefl:binary(),
|
||||
#{
|
||||
default => <<"/">>,
|
||||
desc => ?DESC("virtual_host")
|
||||
}
|
||||
)},
|
||||
{heartbeat,
|
||||
hoconsc:mk(
|
||||
emqx_schema:timeout_duration_ms(),
|
||||
#{
|
||||
default => <<"30s">>,
|
||||
desc => ?DESC("heartbeat")
|
||||
}
|
||||
)},
|
||||
%% Things related to sending messages to RabbitMQ
|
||||
{exchange,
|
||||
hoconsc:mk(
|
||||
typerefl:binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("exchange")
|
||||
}
|
||||
)},
|
||||
{routing_key,
|
||||
hoconsc:mk(
|
||||
typerefl:binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("routing_key")
|
||||
}
|
||||
)},
|
||||
{delivery_mode,
|
||||
hoconsc:mk(
|
||||
hoconsc:enum([non_persistent, persistent]),
|
||||
#{
|
||||
default => non_persistent,
|
||||
desc => ?DESC("delivery_mode")
|
||||
}
|
||||
)},
|
||||
{payload_template,
|
||||
hoconsc:mk(
|
||||
binary(),
|
||||
#{
|
||||
default => <<"${.}">>,
|
||||
desc => ?DESC("payload_template")
|
||||
}
|
||||
)}
|
||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||
|
||||
values(post) ->
|
||||
maps:merge(values(put), #{name => <<"connector">>});
|
||||
values(get) ->
|
||||
values(post);
|
||||
values(put) ->
|
||||
#{
|
||||
server => <<"localhost">>,
|
||||
port => 5672,
|
||||
enable => true,
|
||||
pool_size => 8,
|
||||
type => rabbitmq,
|
||||
username => <<"guest">>,
|
||||
password => <<"******">>,
|
||||
routing_key => <<"my_routing_key">>,
|
||||
payload_template => <<"">>
|
||||
};
|
||||
values(_) ->
|
||||
#{}.
|
||||
emqx_bridge_rabbitmq_connector_schema:fields(connector) ++
|
||||
emqx_bridge_rabbitmq_pubsub_schema:fields(action_parameters).
|
||||
|
||||
%% ===================================================================
|
||||
%% Callbacks defined in emqx_resource
|
||||
|
@ -186,127 +62,84 @@ values(_) ->
|
|||
|
||||
callback_mode() -> always_sync.
|
||||
|
||||
%% emqx_resource callback
|
||||
|
||||
%% emqx_resource callback called when the resource is started
|
||||
|
||||
-spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}.
|
||||
on_start(
|
||||
InstanceID,
|
||||
#{
|
||||
pool_size := PoolSize,
|
||||
payload_template := PayloadTemplate,
|
||||
delivery_mode := InitialDeliveryMode
|
||||
} = InitialConfig
|
||||
) ->
|
||||
DeliveryMode =
|
||||
case InitialDeliveryMode of
|
||||
non_persistent -> 1;
|
||||
persistent -> 2
|
||||
end,
|
||||
Config = InitialConfig#{
|
||||
delivery_mode => DeliveryMode
|
||||
},
|
||||
on_start(InstanceID, Config) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_rabbitmq_connector",
|
||||
connector => InstanceID,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
init_secret(),
|
||||
Options = [
|
||||
{config, Config},
|
||||
%% The pool_size is read by ecpool and decides the number of workers in
|
||||
%% the pool
|
||||
{pool_size, PoolSize},
|
||||
{pool_size, maps:get(pool_size, Config)},
|
||||
{pool, InstanceID}
|
||||
],
|
||||
ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate),
|
||||
State = #{
|
||||
poolname => InstanceID,
|
||||
processed_payload_template => ProcessedTemplate,
|
||||
config => Config
|
||||
},
|
||||
%% Initialize RabbitMQ's secret library so that the password is encrypted
|
||||
%% in the log files.
|
||||
case credentials_obfuscation:secret() of
|
||||
?PENDING_SECRET ->
|
||||
Bytes = crypto:strong_rand_bytes(128),
|
||||
%% The password can appear in log files if we don't do this
|
||||
credentials_obfuscation:set_secret(Bytes);
|
||||
_ ->
|
||||
%% Already initialized
|
||||
ok
|
||||
end,
|
||||
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{ok, #{channels => #{}}};
|
||||
{error, Reason} ->
|
||||
?SLOG(info, #{
|
||||
?SLOG(error, #{
|
||||
msg => "rabbitmq_connector_start_failed",
|
||||
error_reason => Reason,
|
||||
reason => Reason,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% emqx_resource callback called when the resource is stopped
|
||||
|
||||
-spec on_stop(resource_id(), resource_state()) -> term().
|
||||
on_stop(
|
||||
ResourceID,
|
||||
_State
|
||||
on_add_channel(
|
||||
InstanceId,
|
||||
#{channels := Channels} = State,
|
||||
ChannelId,
|
||||
Config
|
||||
) ->
|
||||
case maps:is_key(ChannelId, Channels) of
|
||||
true ->
|
||||
{error, already_exists};
|
||||
false ->
|
||||
ProcParam = preproc_parameter(Config),
|
||||
case make_channel(InstanceId, ChannelId, ProcParam) of
|
||||
{ok, RabbitChannels} ->
|
||||
Channel = #{param => ProcParam, rabbitmq => RabbitChannels},
|
||||
NewChannels = maps:put(ChannelId, Channel, Channels),
|
||||
{ok, State#{channels => NewChannels}};
|
||||
{error, Error} ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_start_rabbitmq_channel",
|
||||
instance_id => InstanceId,
|
||||
params => emqx_utils:redact(Config),
|
||||
error => Error
|
||||
}),
|
||||
{error, Error}
|
||||
end
|
||||
end.
|
||||
|
||||
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
|
||||
try_unsubscribe(ChannelId, Channels),
|
||||
{ok, State#{channels => maps:remove(ChannelId, Channels)}}.
|
||||
|
||||
on_get_channels(InstanceId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
||||
|
||||
on_stop(ResourceID, _State) ->
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_rabbitmq_connector",
|
||||
connector => ResourceID
|
||||
}),
|
||||
stop_clients_and_pool(ResourceID).
|
||||
|
||||
stop_clients_and_pool(PoolName) ->
|
||||
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
||||
Clients = [
|
||||
begin
|
||||
{ok, Client} = ecpool_worker:client(Worker),
|
||||
Client
|
||||
end
|
||||
|| Worker <- Workers
|
||||
],
|
||||
%% We need to stop the pool before stopping the workers as the pool monitors the workers
|
||||
StopResult = emqx_resource_pool:stop(PoolName),
|
||||
lists:foreach(fun stop_worker/1, Clients),
|
||||
StopResult.
|
||||
|
||||
stop_worker({Channel, Connection}) ->
|
||||
amqp_channel:close(Channel),
|
||||
amqp_connection:close(Connection).
|
||||
|
||||
%% This is the callback function that is called by ecpool when the pool is
|
||||
%% started
|
||||
lists:foreach(
|
||||
fun({_Name, Worker}) ->
|
||||
case ecpool_worker:client(Worker) of
|
||||
{ok, Conn} -> amqp_connection:close(Conn);
|
||||
_ -> ok
|
||||
end
|
||||
end,
|
||||
ecpool:workers(ResourceID)
|
||||
),
|
||||
emqx_resource_pool:stop(ResourceID).
|
||||
|
||||
%% This is the callback function that is called by ecpool
|
||||
-spec connect(term()) -> {ok, {pid(), pid()}, map()} | {error, term()}.
|
||||
connect(Options) ->
|
||||
Config = proplists:get_value(config, Options),
|
||||
try
|
||||
create_rabbitmq_connection_and_channel(Config)
|
||||
catch
|
||||
_:{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "rabbitmq_connector_connection_failed",
|
||||
error_type => error,
|
||||
error_reason => Reason,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
{error, Reason};
|
||||
Type:Reason ->
|
||||
?SLOG(error, #{
|
||||
msg => "rabbitmq_connector_connection_failed",
|
||||
error_type => Type,
|
||||
error_reason => Reason,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
create_rabbitmq_connection_and_channel(Config) ->
|
||||
#{
|
||||
server := Host,
|
||||
port := Port,
|
||||
|
@ -314,237 +147,164 @@ create_rabbitmq_connection_and_channel(Config) ->
|
|||
password := WrappedPassword,
|
||||
timeout := Timeout,
|
||||
virtual_host := VirtualHost,
|
||||
heartbeat := Heartbeat,
|
||||
wait_for_publish_confirmations := WaitForPublishConfirmations
|
||||
heartbeat := Heartbeat
|
||||
} = Config,
|
||||
%% TODO: teach `amqp` to accept 0-arity closures as passwords.
|
||||
Password = emqx_secret:unwrap(WrappedPassword),
|
||||
SSLOptions =
|
||||
case maps:get(ssl, Config, #{}) of
|
||||
#{enable := true} = SSLOpts ->
|
||||
emqx_tls_lib:to_client_opts(SSLOpts);
|
||||
_ ->
|
||||
none
|
||||
end,
|
||||
RabbitMQConnectionOptions =
|
||||
RabbitMQConnOptions =
|
||||
#amqp_params_network{
|
||||
host = erlang:binary_to_list(Host),
|
||||
host = Host,
|
||||
port = Port,
|
||||
ssl_options = SSLOptions,
|
||||
ssl_options = to_ssl_options(Config),
|
||||
username = Username,
|
||||
password = Password,
|
||||
connection_timeout = Timeout,
|
||||
virtual_host = VirtualHost,
|
||||
heartbeat = Heartbeat
|
||||
},
|
||||
{ok, RabbitMQConnection} =
|
||||
case amqp_connection:start(RabbitMQConnectionOptions) of
|
||||
{ok, Connection} ->
|
||||
{ok, Connection};
|
||||
{error, Reason} ->
|
||||
erlang:error({error, Reason})
|
||||
end,
|
||||
{ok, RabbitMQChannel} =
|
||||
case amqp_connection:open_channel(RabbitMQConnection) of
|
||||
{ok, Channel} ->
|
||||
{ok, Channel};
|
||||
{error, OpenChannelErrorReason} ->
|
||||
erlang:error({error, OpenChannelErrorReason})
|
||||
end,
|
||||
%% We need to enable confirmations if we want to wait for them
|
||||
case WaitForPublishConfirmations of
|
||||
true ->
|
||||
case amqp_channel:call(RabbitMQChannel, #'confirm.select'{}) of
|
||||
#'confirm.select_ok'{} ->
|
||||
ok;
|
||||
Error ->
|
||||
ConfirmModeErrorReason =
|
||||
erlang:iolist_to_binary(
|
||||
io_lib:format(
|
||||
"Could not enable RabbitMQ confirmation mode ~p",
|
||||
[Error]
|
||||
)
|
||||
),
|
||||
erlang:error({error, ConfirmModeErrorReason})
|
||||
end;
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
{ok, {RabbitMQConnection, RabbitMQChannel}, #{
|
||||
supervisees => [RabbitMQConnection, RabbitMQChannel]
|
||||
}}.
|
||||
|
||||
%% emqx_resource callback called to check the status of the resource
|
||||
case amqp_connection:start(RabbitMQConnOptions) of
|
||||
{ok, RabbitMQConn} ->
|
||||
{ok, RabbitMQConn};
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "rabbitmq_connector_connection_failed",
|
||||
reason => Reason,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec on_get_status(resource_id(), term()) ->
|
||||
{connected, resource_state()} | {disconnected, resource_state(), binary()}.
|
||||
on_get_status(
|
||||
_InstId,
|
||||
#{
|
||||
poolname := PoolName
|
||||
} = State
|
||||
) ->
|
||||
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
||||
Clients = [
|
||||
begin
|
||||
{ok, Client} = ecpool_worker:client(Worker),
|
||||
Client
|
||||
end
|
||||
|| Worker <- Workers
|
||||
],
|
||||
CheckResults = [
|
||||
check_worker(Client)
|
||||
|| Client <- Clients
|
||||
],
|
||||
Connected = length(CheckResults) > 0 andalso lists:all(fun(R) -> R end, CheckResults),
|
||||
case Connected of
|
||||
true ->
|
||||
{connected, State};
|
||||
false ->
|
||||
{disconnected, State, <<"not_connected">>}
|
||||
end;
|
||||
on_get_status(
|
||||
_InstId,
|
||||
State
|
||||
) ->
|
||||
{disconnect, State, <<"not_connected: no connection pool in state">>}.
|
||||
on_get_status(PoolName, #{channels := Channels} = State) ->
|
||||
ChannelNum = maps:size(Channels),
|
||||
Conns = get_rabbitmq_connections(PoolName),
|
||||
Check =
|
||||
lists:all(
|
||||
fun(Conn) ->
|
||||
[{num_channels, ActualNum}] = amqp_connection:info(Conn, [num_channels]),
|
||||
ChannelNum >= ActualNum
|
||||
end,
|
||||
Conns
|
||||
),
|
||||
case Check andalso Conns =/= [] of
|
||||
true -> {connected, State};
|
||||
false -> {disconnected, State, <<"not_connected">>}
|
||||
end.
|
||||
|
||||
check_worker({Channel, Connection}) ->
|
||||
erlang:is_process_alive(Channel) andalso erlang:is_process_alive(Connection).
|
||||
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
|
||||
case emqx_utils_maps:deep_find([ChannelId, rabbitmq], Channels) of
|
||||
{ok, RabbitMQ} ->
|
||||
case lists:all(fun is_process_alive/1, maps:values(RabbitMQ)) of
|
||||
true -> connected;
|
||||
false -> {error, not_connected}
|
||||
end;
|
||||
_ ->
|
||||
{error, not_exists}
|
||||
end.
|
||||
|
||||
%% emqx_resource callback that is called when a non-batch query is received
|
||||
|
||||
-spec on_query(resource_id(), Request, resource_state()) -> query_result() when
|
||||
Request :: {RequestType, Data},
|
||||
RequestType :: send_message,
|
||||
Data :: map().
|
||||
on_query(
|
||||
ResourceID,
|
||||
{RequestType, Data},
|
||||
#{
|
||||
poolname := PoolName,
|
||||
processed_payload_template := PayloadTemplate,
|
||||
config := Config
|
||||
} = State
|
||||
) ->
|
||||
on_query(ResourceID, {ChannelId, Data} = MsgReq, State) ->
|
||||
?SLOG(debug, #{
|
||||
msg => "rabbitmq_connector_received_query",
|
||||
connector => ResourceID,
|
||||
type => RequestType,
|
||||
channel => ChannelId,
|
||||
data => Data,
|
||||
state => emqx_utils:redact(State)
|
||||
}),
|
||||
MessageData = format_data(PayloadTemplate, Data),
|
||||
Res = ecpool:pick_and_do(
|
||||
PoolName,
|
||||
{?MODULE, publish_messages, [Config, [MessageData]]},
|
||||
no_handover
|
||||
),
|
||||
handle_result(Res).
|
||||
#{channels := Channels} = State,
|
||||
case maps:find(ChannelId, Channels) of
|
||||
{ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
|
||||
Res = ecpool:pick_and_do(
|
||||
ResourceID,
|
||||
{?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]},
|
||||
no_handover
|
||||
),
|
||||
handle_result(Res);
|
||||
error ->
|
||||
{error, {unrecoverable_error, {invalid_message_tag, ChannelId}}}
|
||||
end.
|
||||
|
||||
%% emqx_resource callback that is called when a batch query is received
|
||||
|
||||
-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
|
||||
BatchReq :: nonempty_list({'send_message', map()}).
|
||||
on_batch_query(
|
||||
ResourceID,
|
||||
BatchReq,
|
||||
State
|
||||
) ->
|
||||
on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) ->
|
||||
?SLOG(debug, #{
|
||||
msg => "rabbitmq_connector_received_batch_query",
|
||||
connector => ResourceID,
|
||||
data => BatchReq,
|
||||
data => Batch,
|
||||
state => emqx_utils:redact(State)
|
||||
}),
|
||||
%% Currently we only support batch requests with the send_message key
|
||||
{Keys, MessagesToInsert} = lists:unzip(BatchReq),
|
||||
ensure_keys_are_of_type_send_message(Keys),
|
||||
%% Pick out the payload template
|
||||
#{
|
||||
processed_payload_template := PayloadTemplate,
|
||||
poolname := PoolName,
|
||||
config := Config
|
||||
} = State,
|
||||
%% Create batch payload
|
||||
FormattedMessages = [
|
||||
format_data(PayloadTemplate, Data)
|
||||
|| Data <- MessagesToInsert
|
||||
],
|
||||
%% Publish the messages
|
||||
Res = ecpool:pick_and_do(
|
||||
PoolName,
|
||||
{?MODULE, publish_messages, [Config, FormattedMessages]},
|
||||
no_handover
|
||||
),
|
||||
handle_result(Res).
|
||||
#{channels := Channels} = State,
|
||||
case maps:find(ChannelId, Channels) of
|
||||
{ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
|
||||
Res = ecpool:pick_and_do(
|
||||
ResourceID,
|
||||
{?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]},
|
||||
no_handover
|
||||
),
|
||||
handle_result(Res);
|
||||
error ->
|
||||
{error, {unrecoverable_error, {invalid_message_tag, ChannelId}}}
|
||||
end.
|
||||
|
||||
publish_messages(
|
||||
{_Connection, Channel},
|
||||
Conn,
|
||||
RabbitMQ,
|
||||
#{
|
||||
delivery_mode := DeliveryMode,
|
||||
payload_template := PayloadTmpl,
|
||||
routing_key := RoutingKey,
|
||||
exchange := Exchange,
|
||||
wait_for_publish_confirmations := WaitForPublishConfirmations,
|
||||
publish_confirmation_timeout := PublishConfirmationTimeout
|
||||
} = _Config,
|
||||
},
|
||||
Messages
|
||||
) ->
|
||||
MessageProperties = #'P_basic'{
|
||||
headers = [],
|
||||
delivery_mode = DeliveryMode
|
||||
},
|
||||
Method = #'basic.publish'{
|
||||
exchange = Exchange,
|
||||
routing_key = RoutingKey
|
||||
},
|
||||
_ = [
|
||||
amqp_channel:cast(
|
||||
Channel,
|
||||
Method,
|
||||
#amqp_msg{
|
||||
payload = Message,
|
||||
props = MessageProperties
|
||||
}
|
||||
)
|
||||
|| Message <- Messages
|
||||
],
|
||||
case WaitForPublishConfirmations of
|
||||
true ->
|
||||
case amqp_channel:wait_for_confirms(Channel, PublishConfirmationTimeout) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
erlang:error(
|
||||
{recoverable_error,
|
||||
<<"RabbitMQ: Got NACK when waiting for message acknowledgment.">>}
|
||||
);
|
||||
timeout ->
|
||||
erlang:error(
|
||||
{recoverable_error,
|
||||
<<"RabbitMQ: Timeout when waiting for message acknowledgment.">>}
|
||||
case maps:find(Conn, RabbitMQ) of
|
||||
{ok, Channel} ->
|
||||
MessageProperties = #'P_basic'{
|
||||
headers = [],
|
||||
delivery_mode = DeliveryMode
|
||||
},
|
||||
Method = #'basic.publish'{
|
||||
exchange = Exchange,
|
||||
routing_key = RoutingKey
|
||||
},
|
||||
lists:foreach(
|
||||
fun({_, MsgRaw}) ->
|
||||
amqp_channel:cast(
|
||||
Channel,
|
||||
Method,
|
||||
#amqp_msg{
|
||||
payload = format_data(PayloadTmpl, MsgRaw),
|
||||
props = MessageProperties
|
||||
}
|
||||
)
|
||||
end,
|
||||
Messages
|
||||
),
|
||||
case WaitForPublishConfirmations of
|
||||
true ->
|
||||
case amqp_channel:wait_for_confirms(Channel, PublishConfirmationTimeout) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
erlang:error(
|
||||
{recoverable_error,
|
||||
<<"RabbitMQ: Got NACK when waiting for message acknowledgment.">>}
|
||||
);
|
||||
timeout ->
|
||||
erlang:error(
|
||||
{recoverable_error,
|
||||
<<"RabbitMQ: Timeout when waiting for message acknowledgment.">>}
|
||||
)
|
||||
end;
|
||||
false ->
|
||||
ok
|
||||
end;
|
||||
false ->
|
||||
ok
|
||||
end.
|
||||
|
||||
ensure_keys_are_of_type_send_message(Keys) ->
|
||||
case lists:all(fun is_send_message_atom/1, Keys) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
error ->
|
||||
erlang:error(
|
||||
{unrecoverable_error,
|
||||
<<"Unexpected type for batch message (Expected send_message)">>}
|
||||
{recoverable_error, {<<"RabbitMQ: channel_not_found">>, Conn, RabbitMQ}}
|
||||
)
|
||||
end.
|
||||
|
||||
is_send_message_atom(send_message) ->
|
||||
true;
|
||||
is_send_message_atom(_) ->
|
||||
false.
|
||||
|
||||
format_data([], Msg) ->
|
||||
emqx_utils_json:encode(Msg);
|
||||
format_data(Tokens, Msg) ->
|
||||
|
@ -554,3 +314,119 @@ handle_result({error, ecpool_empty}) ->
|
|||
{error, {recoverable_error, ecpool_empty}};
|
||||
handle_result(Res) ->
|
||||
Res.
|
||||
|
||||
make_channel(PoolName, ChannelId, Params) ->
|
||||
Conns = get_rabbitmq_connections(PoolName),
|
||||
make_channel(Conns, PoolName, ChannelId, Params, #{}).
|
||||
|
||||
make_channel([], _PoolName, _ChannelId, _Param, Acc) ->
|
||||
{ok, Acc};
|
||||
make_channel([Conn | Conns], PoolName, ChannelId, Params, Acc) ->
|
||||
maybe
|
||||
{ok, RabbitMQChannel} ?= amqp_connection:open_channel(Conn),
|
||||
ok ?= try_confirm_channel(Params, RabbitMQChannel),
|
||||
ok ?= try_subscribe(Params, RabbitMQChannel, PoolName, ChannelId),
|
||||
NewAcc = Acc#{Conn => RabbitMQChannel},
|
||||
make_channel(Conns, PoolName, ChannelId, Params, NewAcc)
|
||||
end.
|
||||
|
||||
%% We need to enable confirmations if we want to wait for them
|
||||
try_confirm_channel(#{wait_for_publish_confirmations := true}, Channel) ->
|
||||
case amqp_channel:call(Channel, #'confirm.select'{}) of
|
||||
#'confirm.select_ok'{} ->
|
||||
ok;
|
||||
Error ->
|
||||
Reason =
|
||||
iolist_to_binary(
|
||||
io_lib:format(
|
||||
"Could not enable RabbitMQ confirmation mode ~p",
|
||||
[Error]
|
||||
)
|
||||
),
|
||||
{error, Reason}
|
||||
end;
|
||||
try_confirm_channel(#{wait_for_publish_confirmations := false}, _Channel) ->
|
||||
ok.
|
||||
|
||||
%% Initialize Rabbitmq's secret library so that the password is encrypted
|
||||
%% in the log files.
|
||||
init_secret() ->
|
||||
case credentials_obfuscation:secret() of
|
||||
?PENDING_SECRET ->
|
||||
Bytes = crypto:strong_rand_bytes(128),
|
||||
%% The password can appear in log files if we don't do this
|
||||
credentials_obfuscation:set_secret(Bytes);
|
||||
_ ->
|
||||
%% Already initialized
|
||||
ok
|
||||
end.
|
||||
|
||||
preproc_parameter(#{config_root := actions, parameters := Parameter}) ->
|
||||
#{
|
||||
payload_template := PayloadTemplate,
|
||||
delivery_mode := InitialDeliveryMode
|
||||
} = Parameter,
|
||||
Parameter#{
|
||||
delivery_mode => delivery_mode(InitialDeliveryMode),
|
||||
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
|
||||
config_root => actions
|
||||
};
|
||||
preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) ->
|
||||
#{
|
||||
payload_template := PayloadTmpl,
|
||||
qos := QosTmpl,
|
||||
topic := TopicTmpl
|
||||
} = Parameter,
|
||||
Parameter#{
|
||||
payload_template => emqx_placeholder:preproc_tmpl(PayloadTmpl),
|
||||
qos => preproc_qos(QosTmpl),
|
||||
topic => emqx_placeholder:preproc_tmpl(TopicTmpl),
|
||||
hookpoints => Hooks,
|
||||
config_root => sources
|
||||
}.
|
||||
|
||||
preproc_qos(Qos) when is_integer(Qos) -> Qos;
|
||||
preproc_qos(Qos) -> emqx_placeholder:preproc_tmpl(Qos).
|
||||
|
||||
delivery_mode(non_persistent) -> 1;
|
||||
delivery_mode(persistent) -> 2.
|
||||
|
||||
to_ssl_options(#{ssl := #{enable := true} = SSLOpts}) ->
|
||||
emqx_tls_lib:to_client_opts(SSLOpts);
|
||||
to_ssl_options(_) ->
|
||||
none.
|
||||
|
||||
get_rabbitmq_connections(PoolName) ->
|
||||
lists:filtermap(
|
||||
fun({_Name, Worker}) ->
|
||||
case ecpool_worker:client(Worker) of
|
||||
{ok, Conn} -> {true, Conn};
|
||||
_ -> false
|
||||
end
|
||||
end,
|
||||
ecpool:workers(PoolName)
|
||||
).
|
||||
|
||||
try_subscribe(
|
||||
#{queue := Queue, no_ack := NoAck, config_root := sources} = Params,
|
||||
RabbitChan,
|
||||
PoolName,
|
||||
ChannelId
|
||||
) ->
|
||||
WorkState = {RabbitChan, PoolName, Params},
|
||||
{ok, ConsumePid} = emqx_bridge_rabbitmq_sup:ensure_started(ChannelId, WorkState),
|
||||
BasicConsume = #'basic.consume'{queue = Queue, no_ack = NoAck},
|
||||
#'basic.consume_ok'{consumer_tag = _} =
|
||||
amqp_channel:subscribe(RabbitChan, BasicConsume, ConsumePid),
|
||||
ok;
|
||||
try_subscribe(#{config_root := actions}, _RabbitChan, _PoolName, _ChannelId) ->
|
||||
ok.
|
||||
|
||||
try_unsubscribe(ChannelId, Channels) ->
|
||||
case emqx_utils_maps:deep_find([ChannelId, rabbitmq], Channels) of
|
||||
{ok, RabbitMQ} ->
|
||||
lists:foreach(fun(Pid) -> catch amqp_channel:close(Pid) end, maps:values(RabbitMQ)),
|
||||
emqx_bridge_rabbitmq_sup:ensure_deleted(ChannelId);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_rabbitmq_connector_schema).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
-define(TYPE, rabbitmq).
|
||||
|
||||
-export([roots/0, fields/1, desc/1, namespace/0]).
|
||||
-export([connector_examples/1, connector_example_values/0]).
|
||||
|
||||
%%======================================================================================
|
||||
%% Hocon Schema Definitions
|
||||
namespace() -> ?TYPE.
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields("config_connector") ->
|
||||
emqx_bridge_schema:common_bridge_fields() ++
|
||||
fields(connector) ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||
fields(connector) ->
|
||||
[
|
||||
{server,
|
||||
?HOCON(
|
||||
string(),
|
||||
#{
|
||||
default => <<"localhost">>,
|
||||
desc => ?DESC("server")
|
||||
}
|
||||
)},
|
||||
{port,
|
||||
?HOCON(
|
||||
emqx_schema:port_number(),
|
||||
#{
|
||||
default => 5672,
|
||||
desc => ?DESC("server")
|
||||
}
|
||||
)},
|
||||
{username,
|
||||
?HOCON(
|
||||
binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("username")
|
||||
}
|
||||
)},
|
||||
{password, emqx_connector_schema_lib:password_field(#{required => true})},
|
||||
{pool_size,
|
||||
?HOCON(
|
||||
pos_integer(),
|
||||
#{
|
||||
default => 8,
|
||||
desc => ?DESC("pool_size")
|
||||
}
|
||||
)},
|
||||
{timeout,
|
||||
?HOCON(
|
||||
emqx_schema:timeout_duration_ms(),
|
||||
#{
|
||||
default => <<"5s">>,
|
||||
desc => ?DESC("timeout")
|
||||
}
|
||||
)},
|
||||
{virtual_host,
|
||||
?HOCON(
|
||||
binary(),
|
||||
#{
|
||||
default => <<"/">>,
|
||||
desc => ?DESC("virtual_host")
|
||||
}
|
||||
)},
|
||||
{heartbeat,
|
||||
?HOCON(
|
||||
emqx_schema:timeout_duration_ms(),
|
||||
#{
|
||||
default => <<"30s">>,
|
||||
desc => ?DESC("heartbeat")
|
||||
}
|
||||
)}
|
||||
] ++
|
||||
emqx_connector_schema_lib:ssl_fields();
|
||||
fields(connector_resource_opts) ->
|
||||
emqx_connector_schema:resource_opts_fields();
|
||||
fields("post") ->
|
||||
emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector");
|
||||
fields("put") ->
|
||||
fields("config_connector");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("config_connector").
|
||||
|
||||
desc("config_connector") ->
|
||||
?DESC("config_connector");
|
||||
desc(connector_resource_opts) ->
|
||||
?DESC(connector_resource_opts);
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"rabbitmq">> =>
|
||||
#{
|
||||
summary => <<"Rabbitmq Connector">>,
|
||||
value => emqx_connector_schema:connector_values(
|
||||
Method, ?TYPE, connector_example_values()
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_example_values() ->
|
||||
#{
|
||||
name => <<"rabbitmq_connector">>,
|
||||
type => rabbitmq,
|
||||
enable => true,
|
||||
server => <<"127.0.0.1">>,
|
||||
port => 5672,
|
||||
username => <<"guest">>,
|
||||
password => <<"******">>,
|
||||
pool_size => 8,
|
||||
timeout => <<"5s">>,
|
||||
virtual_host => <<"/">>,
|
||||
heartbeat => <<"30s">>,
|
||||
ssl => #{enable => false}
|
||||
}.
|
|
@ -0,0 +1,273 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_rabbitmq_pubsub_schema).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
-export([roots/0, fields/1, desc/1, namespace/0]).
|
||||
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
source_examples/1
|
||||
]).
|
||||
|
||||
-define(ACTION_TYPE, rabbitmq).
|
||||
-define(SOURCE_TYPE, rabbitmq).
|
||||
-define(CONNECTOR_SCHEMA, emqx_bridge_rabbitmq_connector_schema).
|
||||
|
||||
%%======================================================================================
|
||||
%% Hocon Schema Definitions
|
||||
namespace() -> "bridge_rabbitmq".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields(action) ->
|
||||
{rabbitmq,
|
||||
?HOCON(
|
||||
?MAP(name, ?R_REF(publisher_action)),
|
||||
#{
|
||||
desc => <<"RabbitMQ Action Config">>,
|
||||
required => false
|
||||
}
|
||||
)};
|
||||
fields(publisher_action) ->
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
?HOCON(
|
||||
?R_REF(action_parameters),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(action_parameters)
|
||||
}
|
||||
),
|
||||
#{resource_opts_ref => ?R_REF(action_resource_opts)}
|
||||
);
|
||||
fields(action_parameters) ->
|
||||
[
|
||||
{wait_for_publish_confirmations,
|
||||
hoconsc:mk(
|
||||
boolean(),
|
||||
#{
|
||||
default => true,
|
||||
desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations")
|
||||
}
|
||||
)},
|
||||
{publish_confirmation_timeout,
|
||||
hoconsc:mk(
|
||||
emqx_schema:timeout_duration_ms(),
|
||||
#{
|
||||
default => <<"30s">>,
|
||||
desc => ?DESC(?CONNECTOR_SCHEMA, "timeout")
|
||||
}
|
||||
)},
|
||||
{exchange,
|
||||
hoconsc:mk(
|
||||
typerefl:binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(?CONNECTOR_SCHEMA, "exchange")
|
||||
}
|
||||
)},
|
||||
{routing_key,
|
||||
hoconsc:mk(
|
||||
typerefl:binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(?CONNECTOR_SCHEMA, "routing_key")
|
||||
}
|
||||
)},
|
||||
{delivery_mode,
|
||||
hoconsc:mk(
|
||||
hoconsc:enum([non_persistent, persistent]),
|
||||
#{
|
||||
default => non_persistent,
|
||||
desc => ?DESC(?CONNECTOR_SCHEMA, "delivery_mode")
|
||||
}
|
||||
)},
|
||||
{payload_template,
|
||||
hoconsc:mk(
|
||||
binary(),
|
||||
#{
|
||||
desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template")
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields(source) ->
|
||||
{rabbitmq,
|
||||
?HOCON(
|
||||
hoconsc:map(name, ?R_REF(subscriber_source)),
|
||||
#{
|
||||
desc => <<"MQTT Subscriber Source Config">>,
|
||||
required => false
|
||||
}
|
||||
)};
|
||||
fields(subscriber_source) ->
|
||||
emqx_bridge_v2_schema:make_consumer_action_schema(
|
||||
?HOCON(
|
||||
?R_REF(source_parameters),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("source_parameters")
|
||||
}
|
||||
)
|
||||
);
|
||||
fields(source_parameters) ->
|
||||
[
|
||||
{wait_for_publish_confirmations,
|
||||
hoconsc:mk(
|
||||
boolean(),
|
||||
#{
|
||||
default => true,
|
||||
desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations")
|
||||
}
|
||||
)},
|
||||
{topic,
|
||||
?HOCON(
|
||||
binary(),
|
||||
#{
|
||||
required => true,
|
||||
validator => fun emqx_schema:non_empty_string/1,
|
||||
desc => ?DESC("source_topic")
|
||||
}
|
||||
)},
|
||||
{qos,
|
||||
?HOCON(
|
||||
?UNION([emqx_schema:qos(), binary()]),
|
||||
#{
|
||||
default => 0,
|
||||
desc => ?DESC("source_qos")
|
||||
}
|
||||
)},
|
||||
{payload_template,
|
||||
?HOCON(
|
||||
binary(),
|
||||
#{
|
||||
required => false,
|
||||
desc => ?DESC("source_payload_template")
|
||||
}
|
||||
)},
|
||||
{queue,
|
||||
?HOCON(
|
||||
binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("source_queue")
|
||||
}
|
||||
)},
|
||||
{no_ack,
|
||||
?HOCON(
|
||||
boolean(),
|
||||
#{
|
||||
required => false,
|
||||
default => true,
|
||||
desc => ?DESC("source_no_ack")
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields(action_resource_opts) ->
|
||||
emqx_bridge_v2_schema:action_resource_opts_fields();
|
||||
fields(source_resource_opts) ->
|
||||
emqx_bridge_v2_schema:source_resource_opts_fields();
|
||||
fields(Field) when
|
||||
Field == "get_bridge_v2";
|
||||
Field == "post_bridge_v2";
|
||||
Field == "put_bridge_v2"
|
||||
->
|
||||
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action));
|
||||
fields(Field) when
|
||||
Field == "get_source";
|
||||
Field == "post_source";
|
||||
Field == "put_source"
|
||||
->
|
||||
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source));
|
||||
fields(What) ->
|
||||
error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
|
||||
%% v2: api schema
|
||||
%% The parameter equals to
|
||||
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
|
||||
%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
|
||||
%%--------------------------------------------------------------------
|
||||
%% v1/v2
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(action_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, "creation_opts");
|
||||
desc(source_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, "creation_opts");
|
||||
desc(action_parameters) ->
|
||||
?DESC(action_parameters);
|
||||
desc(source_parameters) ->
|
||||
?DESC(source_parameters);
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for WebHook using `", string:to_upper(Method), "` method."];
|
||||
desc("http_action") ->
|
||||
?DESC("desc_config");
|
||||
desc("parameters_opts") ->
|
||||
?DESC("config_parameters_opts");
|
||||
desc(publisher_action) ->
|
||||
?DESC(publisher_action);
|
||||
desc(subscriber_source) ->
|
||||
?DESC(subscriber_source);
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"rabbitmq">> => #{
|
||||
summary => <<"RabbitMQ Producer Action">>,
|
||||
value => emqx_bridge_v2_schema:action_values(
|
||||
Method,
|
||||
_ActionType = ?ACTION_TYPE,
|
||||
_ConnectorType = rabbitmq,
|
||||
#{
|
||||
parameters => #{
|
||||
wait_for_publish_confirmations => true,
|
||||
publish_confirmation_timeout => <<"30s">>,
|
||||
exchange => <<"test_exchange">>,
|
||||
routing_key => <<"/">>,
|
||||
delivery_mode => <<"non_persistent">>,
|
||||
payload_template => <<"${.payload}">>
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
source_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"rabbitmq">> => #{
|
||||
summary => <<"RabbitMQ Subscriber Source">>,
|
||||
value => emqx_bridge_v2_schema:source_values(
|
||||
Method,
|
||||
_SourceType = ?SOURCE_TYPE,
|
||||
_ConnectorType = rabbitmq,
|
||||
#{
|
||||
parameters => #{
|
||||
topic => <<"${payload.mqtt_topic}">>,
|
||||
qos => <<"${payload.mqtt_qos}">>,
|
||||
payload_template => <<"${payload.mqtt_payload}">>,
|
||||
queue => <<"test_queue">>,
|
||||
no_ack => true
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
|
@ -0,0 +1,32 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_rabbitmq_source_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link(?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
SupFlags = #{
|
||||
strategy => simple_one_for_one,
|
||||
intensity => 100,
|
||||
period => 10
|
||||
},
|
||||
{ok, {SupFlags, [worker_spec()]}}.
|
||||
|
||||
worker_spec() ->
|
||||
Mod = emqx_bridge_rabbitmq_source_worker,
|
||||
#{
|
||||
id => Mod,
|
||||
start => {Mod, start_link, []},
|
||||
restart => transient,
|
||||
shutdown => brutal_kill,
|
||||
type => worker,
|
||||
modules => [Mod]
|
||||
}.
|
|
@ -0,0 +1,85 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_rabbitmq_source_worker).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/1]).
|
||||
-export([
|
||||
init/1,
|
||||
handle_continue/2,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2
|
||||
]).
|
||||
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
start_link(Args) ->
|
||||
gen_server:start_link(?MODULE, Args, []).
|
||||
|
||||
init({_RabbitChannel, _InstanceId, _Params} = State) ->
|
||||
{ok, State, {continue, confirm_ok}}.
|
||||
|
||||
handle_continue(confirm_ok, State) ->
|
||||
receive
|
||||
#'basic.consume_ok'{} -> {noreply, State}
|
||||
end.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast(_Request, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(
|
||||
{#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{
|
||||
payload = Payload,
|
||||
props = #'P_basic'{message_id = MessageId, headers = Headers}
|
||||
}},
|
||||
{Channel, InstanceId, Params} = State
|
||||
) ->
|
||||
#{
|
||||
hookpoints := Hooks,
|
||||
payload_template := PayloadTmpl,
|
||||
qos := QoSTmpl,
|
||||
topic := TopicTmpl,
|
||||
no_ack := NoAck
|
||||
} = Params,
|
||||
MQTTMsg = emqx_message:make(
|
||||
make_message_id(MessageId),
|
||||
InstanceId,
|
||||
render(Payload, QoSTmpl),
|
||||
render(Payload, TopicTmpl),
|
||||
render(Payload, PayloadTmpl),
|
||||
#{},
|
||||
make_headers(Headers)
|
||||
),
|
||||
_ = emqx:publish(MQTTMsg),
|
||||
lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [MQTTMsg]) end, Hooks),
|
||||
(NoAck =:= false) andalso
|
||||
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
|
||||
emqx_resource_metrics:received_inc(InstanceId),
|
||||
{noreply, State};
|
||||
handle_info(#'basic.cancel_ok'{}, State) ->
|
||||
{stop, normal, State};
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
render(_Message, QoS) when is_integer(QoS) -> QoS;
|
||||
render(Message, PayloadTmpl) ->
|
||||
Opts = #{return => full_binary},
|
||||
emqx_placeholder:proc_tmpl(PayloadTmpl, Message, Opts).
|
||||
|
||||
make_message_id(undefined) -> emqx_guid:gen();
|
||||
make_message_id(Id) -> Id.
|
||||
|
||||
make_headers(undefined) ->
|
||||
#{};
|
||||
make_headers(Headers) when is_list(Headers) ->
|
||||
maps:from_list([{Key, Value} || {Key, _Type, Value} <- Headers]).
|
|
@ -0,0 +1,76 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_rabbitmq_sup).
|
||||
|
||||
-feature(maybe_expr, enable).
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([ensure_started/2]).
|
||||
-export([ensure_deleted/1]).
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
-define(BRIDGE_SUP, ?MODULE).
|
||||
|
||||
ensure_started(SuperId, Config) ->
|
||||
{ok, SuperPid} = ensure_supervisor_started(SuperId),
|
||||
case supervisor:start_child(SuperPid, [Config]) of
|
||||
{ok, WorkPid} ->
|
||||
{ok, WorkPid};
|
||||
{error, {already_started, WorkPid}} ->
|
||||
{ok, WorkPid};
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
ensure_deleted(SuperId) ->
|
||||
maybe
|
||||
Pid = erlang:whereis(?BRIDGE_SUP),
|
||||
true ?= Pid =/= undefined,
|
||||
ok ?= supervisor:terminate_child(Pid, SuperId),
|
||||
ok ?= supervisor:delete_child(Pid, SuperId)
|
||||
else
|
||||
false -> ok;
|
||||
{error, not_found} -> ok;
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
ensure_supervisor_started(Id) ->
|
||||
SupervisorSpec =
|
||||
#{
|
||||
id => Id,
|
||||
start => {emqx_bridge_rabbitmq_source_sup, start_link, []},
|
||||
restart => permanent,
|
||||
type => supervisor
|
||||
},
|
||||
case supervisor:start_child(?BRIDGE_SUP, SupervisorSpec) of
|
||||
{ok, Pid} ->
|
||||
{ok, Pid};
|
||||
{error, {already_started, Pid}} ->
|
||||
{ok, Pid}
|
||||
end.
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?BRIDGE_SUP}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
SupFlags = #{
|
||||
strategy => one_for_one,
|
||||
intensity => 50,
|
||||
period => 10
|
||||
},
|
||||
ChildSpecs = [],
|
||||
{ok, {SupFlags, ChildSpecs}}.
|
|
@ -14,7 +14,8 @@
|
|||
|
||||
%% See comment in
|
||||
%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
|
||||
%% run this without bringing up the whole CI infrastucture
|
||||
%% run this without bringing up the whole CI infrastructure
|
||||
-define(TYPE, <<"rabbitmq">>).
|
||||
|
||||
rabbit_mq_host() ->
|
||||
<<"rabbitmq">>.
|
||||
|
@ -34,8 +35,14 @@ rabbit_mq_routing_key() ->
|
|||
get_channel_connection(Config) ->
|
||||
proplists:get_value(channel_connection, Config).
|
||||
|
||||
get_rabbitmq(Config) ->
|
||||
proplists:get_value(rabbitmq, Config).
|
||||
|
||||
get_tls(Config) ->
|
||||
proplists:get_value(tls, Config).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Common Test Setup, Teardown and Testcase List
|
||||
%% Common Test Setup, Tear down and Testcase List
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
|
@ -101,35 +108,25 @@ common_init_per_group(Opts) ->
|
|||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(amqp_client),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Opts),
|
||||
[{channel_connection, ChannelConnection}].
|
||||
#{host := Host, port := Port, tls := UseTLS} = Opts,
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS),
|
||||
[
|
||||
{channel_connection, ChannelConnection},
|
||||
{rabbitmq, #{server => Host, port => Port}},
|
||||
{tls, UseTLS}
|
||||
].
|
||||
|
||||
setup_rabbit_mq_exchange_and_queue(#{host := RabbitMQHost, port := RabbitMQPort, tls := UseTLS}) ->
|
||||
setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) ->
|
||||
SSLOptions =
|
||||
case UseTLS of
|
||||
false ->
|
||||
none;
|
||||
true ->
|
||||
CertsDir = filename:join([
|
||||
emqx_common_test_helpers:proj_root(),
|
||||
".ci",
|
||||
"docker-compose-file",
|
||||
"certs"
|
||||
]),
|
||||
emqx_tls_lib:to_client_opts(
|
||||
#{
|
||||
enable => true,
|
||||
cacertfile => filename:join([CertsDir, "ca.crt"]),
|
||||
certfile => filename:join([CertsDir, "client.pem"]),
|
||||
keyfile => filename:join([CertsDir, "client.key"])
|
||||
}
|
||||
)
|
||||
false -> none;
|
||||
true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS))
|
||||
end,
|
||||
%% Create an exachange and a queue
|
||||
%% Create an exchange and a queue
|
||||
{ok, Connection} =
|
||||
amqp_connection:start(#amqp_params_network{
|
||||
host = RabbitMQHost,
|
||||
port = RabbitMQPort,
|
||||
host = Host,
|
||||
port = Port,
|
||||
ssl_options = SSLOptions
|
||||
}),
|
||||
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||
|
@ -184,8 +181,7 @@ init_per_testcase(_, Config) ->
|
|||
end_per_testcase(_, _Config) ->
|
||||
ok.
|
||||
|
||||
rabbitmq_config(Config) ->
|
||||
%%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
|
||||
rabbitmq_config(UseTLS, Config) ->
|
||||
BatchSize = maps:get(batch_size, Config, 1),
|
||||
BatchTime = maps:get(batch_time_ms, Config, 0),
|
||||
Name = atom_to_binary(?MODULE),
|
||||
|
@ -196,6 +192,7 @@ rabbitmq_config(Config) ->
|
|||
io_lib:format(
|
||||
"bridges.rabbitmq.~s {\n"
|
||||
" enable = true\n"
|
||||
" ssl = ~s\n"
|
||||
" server = \"~s\"\n"
|
||||
" port = ~p\n"
|
||||
" username = \"guest\"\n"
|
||||
|
@ -210,6 +207,7 @@ rabbitmq_config(Config) ->
|
|||
"}\n",
|
||||
[
|
||||
Name,
|
||||
hocon_pp:do(ssl_options(UseTLS), #{embedded => true}),
|
||||
Server,
|
||||
Port,
|
||||
rabbit_mq_routing_key(),
|
||||
|
@ -222,80 +220,86 @@ rabbitmq_config(Config) ->
|
|||
ct:pal(ConfigString),
|
||||
parse_and_check(ConfigString, <<"rabbitmq">>, Name).
|
||||
|
||||
ssl_options(true) ->
|
||||
CertsDir = filename:join([
|
||||
emqx_common_test_helpers:proj_root(),
|
||||
".ci",
|
||||
"docker-compose-file",
|
||||
"certs"
|
||||
]),
|
||||
#{
|
||||
enable => true,
|
||||
cacertfile => filename:join([CertsDir, "ca.crt"]),
|
||||
certfile => filename:join([CertsDir, "client.pem"]),
|
||||
keyfile => filename:join([CertsDir, "client.key"])
|
||||
};
|
||||
ssl_options(false) ->
|
||||
#{
|
||||
enable => false
|
||||
}.
|
||||
|
||||
parse_and_check(ConfigString, BridgeType, Name) ->
|
||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
|
||||
RetConfig.
|
||||
|
||||
make_bridge(Config) ->
|
||||
Type = <<"rabbitmq">>,
|
||||
Name = atom_to_binary(?MODULE),
|
||||
BridgeConfig = rabbitmq_config(Config),
|
||||
{ok, _} = emqx_bridge:create(
|
||||
Type,
|
||||
Name,
|
||||
BridgeConfig
|
||||
),
|
||||
emqx_bridge_resource:bridge_id(Type, Name).
|
||||
create_bridge(Name, UseTLS, Config) ->
|
||||
BridgeConfig = rabbitmq_config(UseTLS, Config),
|
||||
{ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig),
|
||||
emqx_bridge_resource:bridge_id(?TYPE, Name).
|
||||
|
||||
delete_bridge() ->
|
||||
Type = <<"rabbitmq">>,
|
||||
Name = atom_to_binary(?MODULE),
|
||||
ok = emqx_bridge:remove(Type, Name).
|
||||
delete_bridge(Name) ->
|
||||
ok = emqx_bridge:remove(?TYPE, Name).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Test Cases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_make_delete_bridge(_Config) ->
|
||||
make_bridge(#{}),
|
||||
%% Check that the new brige is in the list of bridges
|
||||
t_create_delete_bridge(Config) ->
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
RabbitMQ = get_rabbitmq(Config),
|
||||
UseTLS = get_tls(Config),
|
||||
create_bridge(Name, UseTLS, RabbitMQ),
|
||||
Bridges = emqx_bridge:list(),
|
||||
Name = atom_to_binary(?MODULE),
|
||||
IsRightName =
|
||||
fun
|
||||
(#{name := BName}) when BName =:= Name ->
|
||||
true;
|
||||
(_) ->
|
||||
false
|
||||
end,
|
||||
?assert(lists:any(IsRightName, Bridges)),
|
||||
delete_bridge(),
|
||||
Any = fun(#{name := BName}) -> BName =:= Name end,
|
||||
?assert(lists:any(Any, Bridges), Bridges),
|
||||
ok = delete_bridge(Name),
|
||||
BridgesAfterDelete = emqx_bridge:list(),
|
||||
?assertNot(lists:any(IsRightName, BridgesAfterDelete)),
|
||||
?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete),
|
||||
ok.
|
||||
|
||||
t_make_delete_bridge_non_existing_server(_Config) ->
|
||||
make_bridge(#{server => <<"non_existing_server">>, port => 3174}),
|
||||
%% Check that the new brige is in the list of bridges
|
||||
t_create_delete_bridge_non_existing_server(Config) ->
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
UseTLS = get_tls(Config),
|
||||
create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}),
|
||||
%% Check that the new bridge is in the list of bridges
|
||||
Bridges = emqx_bridge:list(),
|
||||
Name = atom_to_binary(?MODULE),
|
||||
IsRightName =
|
||||
fun
|
||||
(#{name := BName}) when BName =:= Name ->
|
||||
true;
|
||||
(_) ->
|
||||
false
|
||||
end,
|
||||
?assert(lists:any(IsRightName, Bridges)),
|
||||
delete_bridge(),
|
||||
Any = fun(#{name := BName}) -> BName =:= Name end,
|
||||
?assert(lists:any(Any, Bridges)),
|
||||
ok = delete_bridge(Name),
|
||||
BridgesAfterDelete = emqx_bridge:list(),
|
||||
?assertNot(lists:any(IsRightName, BridgesAfterDelete)),
|
||||
?assertNot(lists:any(Any, BridgesAfterDelete)),
|
||||
ok.
|
||||
|
||||
t_send_message_query(Config) ->
|
||||
BridgeID = make_bridge(#{batch_size => 1}),
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
RabbitMQ = get_rabbitmq(Config),
|
||||
UseTLS = get_tls(Config),
|
||||
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}),
|
||||
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
||||
%% This will use the SQL template included in the bridge
|
||||
emqx_bridge:send_message(BridgeID, Payload),
|
||||
%% Check that the data got to the database
|
||||
?assertEqual(Payload, receive_simple_test_message(Config)),
|
||||
delete_bridge(),
|
||||
ok = delete_bridge(Name),
|
||||
ok.
|
||||
|
||||
t_send_message_query_with_template(Config) ->
|
||||
BridgeID = make_bridge(#{
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
RabbitMQ = get_rabbitmq(Config),
|
||||
UseTLS = get_tls(Config),
|
||||
BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{
|
||||
batch_size => 1,
|
||||
payload_template =>
|
||||
<<
|
||||
|
@ -318,24 +322,27 @@ t_send_message_query_with_template(Config) ->
|
|||
<<"secret">> => 42
|
||||
},
|
||||
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
||||
delete_bridge(),
|
||||
ok = delete_bridge(Name),
|
||||
ok.
|
||||
|
||||
t_send_simple_batch(Config) ->
|
||||
BridgeConf =
|
||||
#{
|
||||
batch_size => 100
|
||||
},
|
||||
BridgeID = make_bridge(BridgeConf),
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
RabbitMQ = get_rabbitmq(Config),
|
||||
BridgeConf = RabbitMQ#{batch_size => 100},
|
||||
UseTLS = get_tls(Config),
|
||||
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
||||
Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
|
||||
emqx_bridge:send_message(BridgeID, Payload),
|
||||
?assertEqual(Payload, receive_simple_test_message(Config)),
|
||||
delete_bridge(),
|
||||
ok = delete_bridge(Name),
|
||||
ok.
|
||||
|
||||
t_send_simple_batch_with_template(Config) ->
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
RabbitMQ = get_rabbitmq(Config),
|
||||
UseTLS = get_tls(Config),
|
||||
BridgeConf =
|
||||
#{
|
||||
RabbitMQ#{
|
||||
batch_size => 100,
|
||||
payload_template =>
|
||||
<<
|
||||
|
@ -347,7 +354,7 @@ t_send_simple_batch_with_template(Config) ->
|
|||
"}"
|
||||
>>
|
||||
},
|
||||
BridgeID = make_bridge(BridgeConf),
|
||||
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
||||
Payload = #{
|
||||
<<"key">> => 7,
|
||||
<<"data">> => <<"RabbitMQ">>,
|
||||
|
@ -358,20 +365,21 @@ t_send_simple_batch_with_template(Config) ->
|
|||
<<"secret">> => 42
|
||||
},
|
||||
?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
|
||||
delete_bridge(),
|
||||
ok = delete_bridge(Name),
|
||||
ok.
|
||||
|
||||
t_heavy_batching(Config) ->
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
NumberOfMessages = 20000,
|
||||
BridgeConf = #{
|
||||
RabbitMQ = get_rabbitmq(Config),
|
||||
UseTLS = get_tls(Config),
|
||||
BridgeConf = RabbitMQ#{
|
||||
batch_size => 10173,
|
||||
batch_time_ms => 50
|
||||
},
|
||||
BridgeID = make_bridge(BridgeConf),
|
||||
BridgeID = create_bridge(Name, UseTLS, BridgeConf),
|
||||
SendMessage = fun(Key) ->
|
||||
Payload = #{
|
||||
<<"key">> => Key
|
||||
},
|
||||
Payload = #{<<"key">> => Key},
|
||||
emqx_bridge:send_message(BridgeID, Payload)
|
||||
end,
|
||||
[SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
|
||||
|
@ -385,7 +393,7 @@ t_heavy_batching(Config) ->
|
|||
lists:seq(1, NumberOfMessages)
|
||||
),
|
||||
?assertEqual(NumberOfMessages, maps:size(AllMessages)),
|
||||
delete_bridge(),
|
||||
ok = delete_bridge(Name),
|
||||
ok.
|
||||
|
||||
receive_simple_test_message(Config) ->
|
||||
|
@ -410,17 +418,6 @@ receive_simple_test_message(Config) ->
|
|||
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
||||
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
||||
emqx_utils_json:decode(Content#amqp_msg.payload)
|
||||
after 5000 ->
|
||||
?assert(false, "Did not receive message within 5 second")
|
||||
end.
|
||||
|
||||
rabbitmq_config() ->
|
||||
Config =
|
||||
#{
|
||||
server => rabbit_mq_host(),
|
||||
port => 5672,
|
||||
exchange => rabbit_mq_exchange(),
|
||||
routing_key => rabbit_mq_routing_key()
|
||||
},
|
||||
#{<<"config">> => Config}.
|
||||
|
||||
test_data() ->
|
||||
#{<<"msg_field">> => <<"Hello">>}.
|
||||
|
|
|
@ -14,18 +14,18 @@
|
|||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
%% This test SUITE requires a running RabbitMQ instance. If you don't want to
|
||||
%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
|
||||
%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script
|
||||
%% you can create a clickhouse instance with the following command.
|
||||
%% 5672 is the default port for AMQP 0-9-1 and 15672 is the default port for
|
||||
%% the HTTP managament interface.
|
||||
%% the HTTP management interface.
|
||||
%%
|
||||
%% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management
|
||||
|
||||
rabbit_mq_host() ->
|
||||
<<"rabbitmq">>.
|
||||
list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")).
|
||||
|
||||
rabbit_mq_port() ->
|
||||
5672.
|
||||
list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")).
|
||||
|
||||
rabbit_mq_password() ->
|
||||
<<"guest">>.
|
||||
|
@ -43,17 +43,16 @@ all() ->
|
|||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
case
|
||||
emqx_common_test_helpers:is_tcp_server_available(
|
||||
erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port()
|
||||
)
|
||||
of
|
||||
Host = rabbit_mq_host(),
|
||||
Port = rabbit_mq_port(),
|
||||
ct:pal("rabbitmq:~p~n", [{Host, Port}]),
|
||||
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
|
||||
true ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[emqx_conf, emqx_connector, emqx_bridge_rabbitmq],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port),
|
||||
[{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config];
|
||||
false ->
|
||||
case os:getenv("IS_CI") of
|
||||
|
@ -64,12 +63,12 @@ init_per_suite(Config) ->
|
|||
end
|
||||
end.
|
||||
|
||||
setup_rabbit_mq_exchange_and_queue() ->
|
||||
%% Create an exachange and a queue
|
||||
setup_rabbit_mq_exchange_and_queue(Host, Port) ->
|
||||
%% Create an exchange and a queue
|
||||
{ok, Connection} =
|
||||
amqp_connection:start(#amqp_params_network{
|
||||
host = erlang:binary_to_list(rabbit_mq_host()),
|
||||
port = rabbit_mq_port()
|
||||
host = binary_to_list(Host),
|
||||
port = Port
|
||||
}),
|
||||
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||
%% Create an exchange
|
||||
|
@ -122,7 +121,7 @@ end_per_suite(Config) ->
|
|||
|
||||
t_lifecycle(Config) ->
|
||||
perform_lifecycle_check(
|
||||
erlang:atom_to_binary(?MODULE),
|
||||
erlang:atom_to_binary(?FUNCTION_NAME),
|
||||
rabbitmq_config(),
|
||||
Config
|
||||
).
|
||||
|
@ -144,12 +143,11 @@ t_start_passfile(Config) ->
|
|||
).
|
||||
|
||||
perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
|
||||
#{
|
||||
channel := Channel
|
||||
} = get_channel_connection(TestConfig),
|
||||
#{channel := Channel} = get_channel_connection(TestConfig),
|
||||
CheckedConfig = check_config(InitialConfig),
|
||||
#{
|
||||
state := #{poolname := PoolName} = State,
|
||||
id := PoolName,
|
||||
state := State,
|
||||
status := InitialStatus
|
||||
} = create_local_resource(ResourceID, CheckedConfig),
|
||||
?assertEqual(InitialStatus, connected),
|
||||
|
@ -211,9 +209,14 @@ create_local_resource(ResourceID, CheckedConfig) ->
|
|||
|
||||
perform_query(PoolName, Channel) ->
|
||||
%% Send message to queue:
|
||||
ok = emqx_resource:query(PoolName, {query, test_data()}),
|
||||
ActionConfig = rabbitmq_action_config(),
|
||||
ChannelId = <<"test_channel">>,
|
||||
?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)),
|
||||
ok = emqx_resource:query(PoolName, {ChannelId, payload()}),
|
||||
%% Get the message from queue:
|
||||
ok = receive_simple_test_message(Channel).
|
||||
ok = receive_simple_test_message(Channel),
|
||||
?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)),
|
||||
ok.
|
||||
|
||||
receive_simple_test_message(Channel) ->
|
||||
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
|
||||
|
@ -238,6 +241,8 @@ receive_simple_test_message(Channel) ->
|
|||
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
|
||||
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
|
||||
ok
|
||||
after 5000 ->
|
||||
?assert(false, "Did not receive message within 5 second")
|
||||
end.
|
||||
|
||||
rabbitmq_config() ->
|
||||
|
@ -255,5 +260,21 @@ rabbitmq_config(Overrides) ->
|
|||
},
|
||||
#{<<"config">> => maps:merge(Config, Overrides)}.
|
||||
|
||||
payload() ->
|
||||
#{<<"payload">> => test_data()}.
|
||||
|
||||
test_data() ->
|
||||
#{<<"msg_field">> => <<"Hello">>}.
|
||||
#{<<"Hello">> => <<"World">>}.
|
||||
|
||||
rabbitmq_action_config() ->
|
||||
#{
|
||||
config_root => actions,
|
||||
parameters => #{
|
||||
delivery_mode => non_persistent,
|
||||
exchange => rabbit_mq_exchange(),
|
||||
payload_template => <<"${.payload}">>,
|
||||
publish_confirmation_timeout => 30000,
|
||||
routing_key => rabbit_mq_routing_key(),
|
||||
wait_for_publish_confirmations => true
|
||||
}
|
||||
}.
|
||||
|
|
|
@ -64,6 +64,8 @@ resource_type(greptimedb) ->
|
|||
emqx_bridge_greptimedb_connector;
|
||||
resource_type(tdengine) ->
|
||||
emqx_bridge_tdengine_connector;
|
||||
resource_type(rabbitmq) ->
|
||||
emqx_bridge_rabbitmq_connector;
|
||||
resource_type(Type) ->
|
||||
error({unknown_connector_type, Type}).
|
||||
|
||||
|
@ -82,6 +84,8 @@ connector_impl_module(opents) ->
|
|||
emqx_bridge_opents_connector;
|
||||
connector_impl_module(tdengine) ->
|
||||
emqx_bridge_tdengine_connector;
|
||||
connector_impl_module(rabbitmq) ->
|
||||
emqx_bridge_rabbitmq_connector;
|
||||
connector_impl_module(_ConnectorType) ->
|
||||
undefined.
|
||||
|
||||
|
@ -258,6 +262,14 @@ connector_structs() ->
|
|||
desc => <<"TDengine Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{rabbitmq,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")),
|
||||
#{
|
||||
desc => <<"RabbitMQ Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
||||
|
@ -281,6 +293,7 @@ schema_modules() ->
|
|||
emqx_bridge_redis_schema,
|
||||
emqx_bridge_iotdb_connector,
|
||||
emqx_bridge_es_connector,
|
||||
emqx_bridge_rabbitmq_connector_schema,
|
||||
emqx_bridge_opents_connector,
|
||||
emqx_bridge_greptimedb,
|
||||
emqx_bridge_tdengine_connector
|
||||
|
@ -317,6 +330,7 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
|
||||
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
|
||||
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
|
||||
api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method),
|
||||
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method)
|
||||
].
|
||||
|
|
|
@ -91,7 +91,6 @@ api_schemas(Method) ->
|
|||
%% We need to map the `type' field of a request (binary) to a
|
||||
%% connector schema module.
|
||||
api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector"),
|
||||
% api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt_subscriber">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt">>, Method ++ "_connector")
|
||||
].
|
||||
|
||||
|
@ -166,7 +165,9 @@ connector_type_to_bridge_types(opents) ->
|
|||
connector_type_to_bridge_types(greptimedb) ->
|
||||
[greptimedb];
|
||||
connector_type_to_bridge_types(tdengine) ->
|
||||
[tdengine].
|
||||
[tdengine];
|
||||
connector_type_to_bridge_types(rabbitmq) ->
|
||||
[rabbitmq].
|
||||
|
||||
actions_config_name(action) -> <<"actions">>;
|
||||
actions_config_name(source) -> <<"sources">>.
|
||||
|
@ -730,17 +731,17 @@ schema_homogeneous_test() ->
|
|||
is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
|
||||
Fields = Module:fields(TypeName),
|
||||
ExpectedFieldNames = common_field_names(),
|
||||
MissingFileds = lists:filter(
|
||||
MissingFields = lists:filter(
|
||||
fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
|
||||
),
|
||||
case MissingFileds of
|
||||
case MissingFields of
|
||||
[] ->
|
||||
false;
|
||||
_ ->
|
||||
{true, #{
|
||||
schema_module => Module,
|
||||
type_name => TypeName,
|
||||
missing_fields => MissingFileds
|
||||
missing_fields => MissingFields
|
||||
}}
|
||||
end.
|
||||
|
||||
|
|
4
bin/emqx
4
bin/emqx
|
@ -1188,10 +1188,10 @@ case "${COMMAND}" in
|
|||
esac
|
||||
case "$COMMAND" in
|
||||
foreground)
|
||||
FOREGROUNDOPTIONS="-noshell -noinput +Bd"
|
||||
FOREGROUNDOPTIONS="-enable-feature maybe_expr -noshell -noinput +Bd"
|
||||
;;
|
||||
*)
|
||||
FOREGROUNDOPTIONS=''
|
||||
FOREGROUNDOPTIONS='-enable-feature maybe_expr'
|
||||
;;
|
||||
esac
|
||||
|
||||
|
|
2
build
2
build
|
@ -136,7 +136,7 @@ make_docs() {
|
|||
local docdir="_build/docgen/$PROFILE"
|
||||
mkdir -p "$docdir"
|
||||
# shellcheck disable=SC2086
|
||||
erl -noshell -eval \
|
||||
erl -enable-feature maybe_expr -noshell -eval \
|
||||
"ok = emqx_conf:dump_schema('$docdir', $SCHEMA_MODULE), \
|
||||
halt(0)."
|
||||
local desc="$docdir/desc.en.hocon"
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
Split RabbitMQ bridge into connector and action components.
|
||||
RabbitMQ support source api to sink RabbitMQ message to EMQX broker.
|
3
mix.exs
3
mix.exs
|
@ -680,7 +680,8 @@ defmodule EMQXUmbrella.MixProject do
|
|||
|
||||
# the elixir version of escript + start.boot required the boot_var
|
||||
# RELEASE_LIB to be defined.
|
||||
boot_var = "%%!-boot_var RELEASE_LIB $RUNNER_ROOT_DIR/lib"
|
||||
# enable-feature is not required when 1.6.x
|
||||
boot_var = "%%!-boot_var RELEASE_LIB $RUNNER_ROOT_DIR/lib -enable-feature maybe_expr"
|
||||
|
||||
# Files with the version appended are expected by the release
|
||||
# upgrade script `install_upgrade.escript`
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
emqx_bridge_rabbitmq_connector {
|
||||
emqx_bridge_rabbitmq_connector_schema {
|
||||
|
||||
server.desc:
|
||||
"""The RabbitMQ server address that you want to connect to (for example, localhost)."""
|
||||
|
@ -97,4 +97,15 @@ wait_for_publish_confirmations.desc:
|
|||
wait_for_publish_confirmations.label:
|
||||
"""Wait for Publish Confirmations"""
|
||||
|
||||
connector_resource_opts.desc:
|
||||
"""Connector resource options."""
|
||||
|
||||
connector_resource_opts.label:
|
||||
"""Connector Resource Options"""
|
||||
|
||||
config_connector.desc:
|
||||
"""The configuration for the RabbitMQ connector."""
|
||||
config_connector.label:
|
||||
"""RabbitMQ Connector Configuration"""
|
||||
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
emqx_bridge_rabbitmq_pubsub_schema {
|
||||
|
||||
publisher_action.desc:
|
||||
"""Action configs."""
|
||||
publisher_action.label:
|
||||
"""Action"""
|
||||
|
||||
subscriber_source.desc:
|
||||
"""Source configs."""
|
||||
subscriber_source.label:
|
||||
"""Source"""
|
||||
|
||||
|
||||
action_parameters.desc:
|
||||
"""The action config defines how this bridge send messages to the remote RabbitMQ broker"""
|
||||
action_parameters.label:
|
||||
"""Action Parameters"""
|
||||
|
||||
source_parameters.desc:
|
||||
"""The source config defines how this bridge receive messages from the remote RabbitMQ broker"""
|
||||
source_parameters.label:
|
||||
"""Source Parameters"""
|
||||
|
||||
source_topic.desc:
|
||||
"""Topic used for constructing MQTT messages, supporting templates."""
|
||||
source_topic.label:
|
||||
"""Source Topic"""
|
||||
|
||||
source_qos.desc:
|
||||
"""The QoS level of the MQTT message, supporting templates."""
|
||||
source_qos.label:
|
||||
"""QoS"""
|
||||
|
||||
source_payload_template.desc:
|
||||
"""The template used to construct the payload of the MQTT message."""
|
||||
source_payload_template.label:
|
||||
"""Source Payload Template"""
|
||||
|
||||
source_queue.desc:
|
||||
"""The queue name of the RabbitMQ broker."""
|
||||
source_queue.label:
|
||||
"""Source Queue"""
|
||||
|
||||
source_no_ack.desc:
|
||||
"""Whether to use no_ack mode when consuming messages from the RabbitMQ broker."""
|
||||
source_no_ack.label:
|
||||
"""Source No Ack"""
|
||||
|
||||
}
|
Loading…
Reference in New Issue