diff --git a/.ci/docker-compose-file/docker-compose-rabbitmq.yaml b/.ci/docker-compose-file/docker-compose-rabbitmq.yaml index d362eb4e0..03ff12f6f 100644 --- a/.ci/docker-compose-file/docker-compose-rabbitmq.yaml +++ b/.ci/docker-compose-file/docker-compose-rabbitmq.yaml @@ -9,10 +9,12 @@ services: expose: - "15672" - "5672" + - "5671" # We don't want to take ports from the host - # ports: + #ports: # - "15672:15672" # - "5672:5672" + # - "5671:5671" volumes: - ./certs/ca.crt:/opt/certs/ca.crt - ./certs/server.crt:/opt/certs/server.crt diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 81314ce23..8a0d31fa9 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -584,7 +584,14 @@ is_tcp_server_available(Host, Port) -> Timeout :: integer() ) -> boolean. is_tcp_server_available(Host, Port, Timeout) -> - case gen_tcp:connect(Host, Port, [], Timeout) of + case + gen_tcp:connect( + emqx_utils_conv:str(Host), + emqx_utils_conv:int(Port), + [], + Timeout + ) + of {ok, Socket} -> gen_tcp:close(Socket), true; diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 9a4de69ed..e74e6aa3e 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -39,6 +39,7 @@ transform_bridge_v1_config_to_action_config/4, action_convert_from_connector/3 ]). +-export([clean_cache/0]). -callback bridge_v1_type_name() -> atom() @@ -77,7 +78,7 @@ ]). %% ==================================================================== -%% Hadcoded list of info modules for actions +%% HardCoded list of info modules for actions %% TODO: Remove this list once we have made sure that all relevants %% apps are loaded before this module is called. %% ==================================================================== @@ -103,6 +104,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_iotdb_action_info, emqx_bridge_es_action_info, emqx_bridge_opents_action_info, + emqx_bridge_rabbitmq_action_info, emqx_bridge_greptimedb_action_info, emqx_bridge_tdengine_action_info ]. @@ -313,6 +315,9 @@ build_cache() -> persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap), ActionInfoMap. +clean_cache() -> + persistent_term:erase(internal_emqx_action_persistent_term_info_key()). + action_info_modules() -> ActionInfoModules = [ action_info_modules(App) diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 321f59f28..285102aa9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -44,6 +44,7 @@ stop(_State) -> emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH), ok = emqx_bridge:unload(), ok = emqx_bridge_v2:unload(), + emqx_action_info:clean_cache(), ok. -if(?EMQX_RELEASE_EDITION == ee). diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index 2e1ec3444..a885cc6bc 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,7 +1,8 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, + {mod, {emqx_bridge_rabbitmq_app, []}}, {applications, [ kernel, stdlib, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl index 608e0a669..6aa2cc038 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_rabbitmq). @@ -22,7 +22,7 @@ ]). %% ------------------------------------------------------------------------------------------------- -%% Callback used by HTTP API +%% Callback used by HTTP API v1 %% ------------------------------------------------------------------------------------------------- conn_bridge_examples(Method) -> diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_action_info.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_action_info.erl new file mode 100644 index 000000000..cd7d340de --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_action_info.erl @@ -0,0 +1,77 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_connector_config/1, + bridge_v1_config_to_action_config/2, + is_source/0, + is_action/0 +]). + +-define(SCHEMA_MODULE, emqx_bridge_rabbitmq_pubsub_schema). +-import(emqx_utils_conv, [bin/1]). + +bridge_v1_type_name() -> rabbitmq. + +action_type_name() -> rabbitmq. + +connector_type_name() -> rabbitmq. + +schema_module() -> ?SCHEMA_MODULE. + +is_source() -> true. +is_action() -> true. + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(publisher_action)), + ActionParametersKeys = schema_keys(?SCHEMA_MODULE:fields(action_parameters)), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys( + emqx_bridge_rabbitmq_connector_schema:fields("config_connector") + ), + ConnectorKeys = (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)), + ConnectorConfig0 = maps:with(ConnectorKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnectorConfig0 + ). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(publisher_action)), + ActionParametersKeys = schema_keys(?SCHEMA_MODULE:fields(action_parameters)), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig0 = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig0#{<<"connector">> => ConnectorName} + ). + +schema_keys(Schema) -> + [bin(Key) || {Key, _} <- Schema]. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_app.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_app.erl new file mode 100644 index 000000000..e43a70620 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_app.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_rabbitmq_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + emqx_bridge_rabbitmq_sup:start_link(). + +stop(_State) -> + ok. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 2e4074f79..45d21d8d0 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -1,9 +1,9 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_rabbitmq_connector). - +%-feature(maybe_expr, enable). -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). @@ -22,17 +22,16 @@ %% hocon_schema callbacks -export([namespace/0, roots/0, fields/1]). -%% HTTP API callbacks --export([values/1]). - %% emqx_resource callbacks -export([ - %% Required callbacks on_start/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, on_stop/2, callback_mode/0, - %% Optional callbacks on_get_status/2, + on_get_channel_status/3, on_query/3, on_batch_query/3 ]). @@ -41,142 +40,18 @@ -export([connect/1]). %% Internal callbacks --export([publish_messages/3]). +-export([publish_messages/4]). namespace() -> "rabbitmq". +%% bridge v1 roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. +%% bridge v1 called by emqx_bridge_rabbitmq fields(config) -> - [ - {server, - hoconsc:mk( - typerefl:binary(), - #{ - default => <<"localhost">>, - desc => ?DESC("server") - } - )}, - {port, - hoconsc:mk( - emqx_schema:port_number(), - #{ - default => 5672, - desc => ?DESC("server") - } - )}, - {username, - hoconsc:mk( - typerefl:binary(), - #{ - required => true, - desc => ?DESC("username") - } - )}, - {password, emqx_connector_schema_lib:password_field(#{required => true})}, - {pool_size, - hoconsc:mk( - typerefl:pos_integer(), - #{ - default => 8, - desc => ?DESC("pool_size") - } - )}, - {timeout, - hoconsc:mk( - emqx_schema:timeout_duration_ms(), - #{ - default => <<"5s">>, - desc => ?DESC("timeout") - } - )}, - {wait_for_publish_confirmations, - hoconsc:mk( - boolean(), - #{ - default => true, - desc => ?DESC("wait_for_publish_confirmations") - } - )}, - {publish_confirmation_timeout, - hoconsc:mk( - emqx_schema:timeout_duration_ms(), - #{ - default => <<"30s">>, - desc => ?DESC("timeout") - } - )}, - - {virtual_host, - hoconsc:mk( - typerefl:binary(), - #{ - default => <<"/">>, - desc => ?DESC("virtual_host") - } - )}, - {heartbeat, - hoconsc:mk( - emqx_schema:timeout_duration_ms(), - #{ - default => <<"30s">>, - desc => ?DESC("heartbeat") - } - )}, - %% Things related to sending messages to RabbitMQ - {exchange, - hoconsc:mk( - typerefl:binary(), - #{ - required => true, - desc => ?DESC("exchange") - } - )}, - {routing_key, - hoconsc:mk( - typerefl:binary(), - #{ - required => true, - desc => ?DESC("routing_key") - } - )}, - {delivery_mode, - hoconsc:mk( - hoconsc:enum([non_persistent, persistent]), - #{ - default => non_persistent, - desc => ?DESC("delivery_mode") - } - )}, - {payload_template, - hoconsc:mk( - binary(), - #{ - default => <<"${.}">>, - desc => ?DESC("payload_template") - } - )} - ] ++ emqx_connector_schema_lib:ssl_fields(). - -values(post) -> - maps:merge(values(put), #{name => <<"connector">>}); -values(get) -> - values(post); -values(put) -> - #{ - server => <<"localhost">>, - port => 5672, - enable => true, - pool_size => 8, - type => rabbitmq, - username => <<"guest">>, - password => <<"******">>, - routing_key => <<"my_routing_key">>, - payload_template => <<"">> - }; -values(_) -> - #{}. + emqx_bridge_rabbitmq_connector_schema:fields(connector) ++ + emqx_bridge_rabbitmq_pubsub_schema:fields(action_parameters). %% =================================================================== %% Callbacks defined in emqx_resource @@ -186,127 +61,84 @@ values(_) -> callback_mode() -> always_sync. -%% emqx_resource callback - -%% emqx_resource callback called when the resource is started - --spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}. -on_start( - InstanceID, - #{ - pool_size := PoolSize, - payload_template := PayloadTemplate, - delivery_mode := InitialDeliveryMode - } = InitialConfig -) -> - DeliveryMode = - case InitialDeliveryMode of - non_persistent -> 1; - persistent -> 2 - end, - Config = InitialConfig#{ - delivery_mode => DeliveryMode - }, +on_start(InstanceID, Config) -> ?SLOG(info, #{ msg => "starting_rabbitmq_connector", connector => InstanceID, config => emqx_utils:redact(Config) }), + init_secret(), Options = [ {config, Config}, - %% The pool_size is read by ecpool and decides the number of workers in - %% the pool - {pool_size, PoolSize}, + {pool_size, maps:get(pool_size, Config)}, {pool, InstanceID} ], - ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate), - State = #{ - poolname => InstanceID, - processed_payload_template => ProcessedTemplate, - config => Config - }, - %% Initialize RabbitMQ's secret library so that the password is encrypted - %% in the log files. - case credentials_obfuscation:secret() of - ?PENDING_SECRET -> - Bytes = crypto:strong_rand_bytes(128), - %% The password can appear in log files if we don't do this - credentials_obfuscation:set_secret(Bytes); - _ -> - %% Already initialized - ok - end, case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> - {ok, State}; + {ok, #{channels => #{}}}; {error, Reason} -> - ?SLOG(info, #{ + ?SLOG(error, #{ msg => "rabbitmq_connector_start_failed", - error_reason => Reason, + reason => Reason, config => emqx_utils:redact(Config) }), {error, Reason} end. -%% emqx_resource callback called when the resource is stopped - --spec on_stop(resource_id(), resource_state()) -> term(). -on_stop( - ResourceID, - _State +on_add_channel( + InstanceId, + #{channels := Channels} = State, + ChannelId, + Config ) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + false -> + ProcParam = preproc_parameter(Config), + case make_channel(InstanceId, ChannelId, ProcParam) of + {ok, RabbitChannels} -> + Channel = #{param => ProcParam, rabbitmq => RabbitChannels}, + NewChannels = maps:put(ChannelId, Channel, Channels), + {ok, State#{channels => NewChannels}}; + {error, Error} -> + ?SLOG(error, #{ + msg => "failed_to_start_rabbitmq_channel", + instance_id => InstanceId, + params => emqx_utils:redact(Config), + error => Error + }), + {error, Error} + end + end. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + try_unsubscribe(ChannelId, Channels), + {ok, State#{channels => maps:remove(ChannelId, Channels)}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_stop(ResourceID, _State) -> ?SLOG(info, #{ msg => "stopping_rabbitmq_connector", connector => ResourceID }), - stop_clients_and_pool(ResourceID). - -stop_clients_and_pool(PoolName) -> - Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - Clients = [ - begin - {ok, Client} = ecpool_worker:client(Worker), - Client - end - || Worker <- Workers - ], - %% We need to stop the pool before stopping the workers as the pool monitors the workers - StopResult = emqx_resource_pool:stop(PoolName), - lists:foreach(fun stop_worker/1, Clients), - StopResult. - -stop_worker({Channel, Connection}) -> - amqp_channel:close(Channel), - amqp_connection:close(Connection). - -%% This is the callback function that is called by ecpool when the pool is -%% started + lists:foreach( + fun({_Name, Worker}) -> + case ecpool_worker:client(Worker) of + {ok, Conn} -> amqp_connection:close(Conn); + _ -> ok + end + end, + ecpool:workers(ResourceID) + ), + emqx_resource_pool:stop(ResourceID). +%% This is the callback function that is called by ecpool -spec connect(term()) -> {ok, {pid(), pid()}, map()} | {error, term()}. connect(Options) -> Config = proplists:get_value(config, Options), - try - create_rabbitmq_connection_and_channel(Config) - catch - _:{error, Reason} -> - ?SLOG(error, #{ - msg => "rabbitmq_connector_connection_failed", - error_type => error, - error_reason => Reason, - config => emqx_utils:redact(Config) - }), - {error, Reason}; - Type:Reason -> - ?SLOG(error, #{ - msg => "rabbitmq_connector_connection_failed", - error_type => Type, - error_reason => Reason, - config => emqx_utils:redact(Config) - }), - {error, Reason} - end. - -create_rabbitmq_connection_and_channel(Config) -> #{ server := Host, port := Port, @@ -314,237 +146,164 @@ create_rabbitmq_connection_and_channel(Config) -> password := WrappedPassword, timeout := Timeout, virtual_host := VirtualHost, - heartbeat := Heartbeat, - wait_for_publish_confirmations := WaitForPublishConfirmations + heartbeat := Heartbeat } = Config, %% TODO: teach `amqp` to accept 0-arity closures as passwords. Password = emqx_secret:unwrap(WrappedPassword), - SSLOptions = - case maps:get(ssl, Config, #{}) of - #{enable := true} = SSLOpts -> - emqx_tls_lib:to_client_opts(SSLOpts); - _ -> - none - end, - RabbitMQConnectionOptions = + RabbitMQConnOptions = #amqp_params_network{ - host = erlang:binary_to_list(Host), + host = Host, port = Port, - ssl_options = SSLOptions, + ssl_options = to_ssl_options(Config), username = Username, password = Password, connection_timeout = Timeout, virtual_host = VirtualHost, heartbeat = Heartbeat }, - {ok, RabbitMQConnection} = - case amqp_connection:start(RabbitMQConnectionOptions) of - {ok, Connection} -> - {ok, Connection}; - {error, Reason} -> - erlang:error({error, Reason}) - end, - {ok, RabbitMQChannel} = - case amqp_connection:open_channel(RabbitMQConnection) of - {ok, Channel} -> - {ok, Channel}; - {error, OpenChannelErrorReason} -> - erlang:error({error, OpenChannelErrorReason}) - end, - %% We need to enable confirmations if we want to wait for them - case WaitForPublishConfirmations of - true -> - case amqp_channel:call(RabbitMQChannel, #'confirm.select'{}) of - #'confirm.select_ok'{} -> - ok; - Error -> - ConfirmModeErrorReason = - erlang:iolist_to_binary( - io_lib:format( - "Could not enable RabbitMQ confirmation mode ~p", - [Error] - ) - ), - erlang:error({error, ConfirmModeErrorReason}) - end; - false -> - ok - end, - {ok, {RabbitMQConnection, RabbitMQChannel}, #{ - supervisees => [RabbitMQConnection, RabbitMQChannel] - }}. - -%% emqx_resource callback called to check the status of the resource + case amqp_connection:start(RabbitMQConnOptions) of + {ok, RabbitMQConn} -> + {ok, RabbitMQConn}; + {error, Reason} -> + ?SLOG(error, #{ + msg => "rabbitmq_connector_connection_failed", + reason => Reason, + config => emqx_utils:redact(Config) + }), + {error, Reason} + end. -spec on_get_status(resource_id(), term()) -> {connected, resource_state()} | {disconnected, resource_state(), binary()}. -on_get_status( - _InstId, - #{ - poolname := PoolName - } = State -) -> - Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - Clients = [ - begin - {ok, Client} = ecpool_worker:client(Worker), - Client - end - || Worker <- Workers - ], - CheckResults = [ - check_worker(Client) - || Client <- Clients - ], - Connected = length(CheckResults) > 0 andalso lists:all(fun(R) -> R end, CheckResults), - case Connected of - true -> - {connected, State}; - false -> - {disconnected, State, <<"not_connected">>} - end; -on_get_status( - _InstId, - State -) -> - {disconnect, State, <<"not_connected: no connection pool in state">>}. +on_get_status(PoolName, #{channels := Channels} = State) -> + ChannelNum = maps:size(Channels), + Conns = get_rabbitmq_connections(PoolName), + Check = + lists:all( + fun(Conn) -> + [{num_channels, ActualNum}] = amqp_connection:info(Conn, [num_channels]), + ChannelNum >= ActualNum + end, + Conns + ), + case Check andalso Conns =/= [] of + true -> {connected, State}; + false -> {disconnected, State, <<"not_connected">>} + end. -check_worker({Channel, Connection}) -> - erlang:is_process_alive(Channel) andalso erlang:is_process_alive(Connection). +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case emqx_utils_maps:deep_find([ChannelId, rabbitmq], Channels) of + {ok, RabbitMQ} -> + case lists:all(fun is_process_alive/1, maps:values(RabbitMQ)) of + true -> connected; + false -> {error, not_connected} + end; + _ -> + {error, not_exists} + end. -%% emqx_resource callback that is called when a non-batch query is received - --spec on_query(resource_id(), Request, resource_state()) -> query_result() when - Request :: {RequestType, Data}, - RequestType :: send_message, - Data :: map(). -on_query( - ResourceID, - {RequestType, Data}, - #{ - poolname := PoolName, - processed_payload_template := PayloadTemplate, - config := Config - } = State -) -> +on_query(ResourceID, {ChannelId, Data} = MsgReq, State) -> ?SLOG(debug, #{ msg => "rabbitmq_connector_received_query", connector => ResourceID, - type => RequestType, + channel => ChannelId, data => Data, state => emqx_utils:redact(State) }), - MessageData = format_data(PayloadTemplate, Data), - Res = ecpool:pick_and_do( - PoolName, - {?MODULE, publish_messages, [Config, [MessageData]]}, - no_handover - ), - handle_result(Res). + #{channels := Channels} = State, + case maps:find(ChannelId, Channels) of + {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + Res = ecpool:pick_and_do( + ResourceID, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]}, + no_handover + ), + handle_result(Res); + error -> + {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}} + end. -%% emqx_resource callback that is called when a batch query is received - --spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when - BatchReq :: nonempty_list({'send_message', map()}). -on_batch_query( - ResourceID, - BatchReq, - State -) -> +on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> ?SLOG(debug, #{ msg => "rabbitmq_connector_received_batch_query", connector => ResourceID, - data => BatchReq, + data => Batch, state => emqx_utils:redact(State) }), - %% Currently we only support batch requests with the send_message key - {Keys, MessagesToInsert} = lists:unzip(BatchReq), - ensure_keys_are_of_type_send_message(Keys), - %% Pick out the payload template - #{ - processed_payload_template := PayloadTemplate, - poolname := PoolName, - config := Config - } = State, - %% Create batch payload - FormattedMessages = [ - format_data(PayloadTemplate, Data) - || Data <- MessagesToInsert - ], - %% Publish the messages - Res = ecpool:pick_and_do( - PoolName, - {?MODULE, publish_messages, [Config, FormattedMessages]}, - no_handover - ), - handle_result(Res). + #{channels := Channels} = State, + case maps:find(ChannelId, Channels) of + {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + Res = ecpool:pick_and_do( + ResourceID, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]}, + no_handover + ), + handle_result(Res); + error -> + {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}} + end. publish_messages( - {_Connection, Channel}, + Conn, + RabbitMQ, #{ delivery_mode := DeliveryMode, + payload_template := PayloadTmpl, routing_key := RoutingKey, exchange := Exchange, wait_for_publish_confirmations := WaitForPublishConfirmations, publish_confirmation_timeout := PublishConfirmationTimeout - } = _Config, + }, Messages ) -> - MessageProperties = #'P_basic'{ - headers = [], - delivery_mode = DeliveryMode - }, - Method = #'basic.publish'{ - exchange = Exchange, - routing_key = RoutingKey - }, - _ = [ - amqp_channel:cast( - Channel, - Method, - #amqp_msg{ - payload = Message, - props = MessageProperties - } - ) - || Message <- Messages - ], - case WaitForPublishConfirmations of - true -> - case amqp_channel:wait_for_confirms(Channel, PublishConfirmationTimeout) of - true -> - ok; - false -> - erlang:error( - {recoverable_error, - <<"RabbitMQ: Got NACK when waiting for message acknowledgment.">>} - ); - timeout -> - erlang:error( - {recoverable_error, - <<"RabbitMQ: Timeout when waiting for message acknowledgment.">>} + case maps:find(Conn, RabbitMQ) of + {ok, Channel} -> + MessageProperties = #'P_basic'{ + headers = [], + delivery_mode = DeliveryMode + }, + Method = #'basic.publish'{ + exchange = Exchange, + routing_key = RoutingKey + }, + lists:foreach( + fun({_, MsgRaw}) -> + amqp_channel:cast( + Channel, + Method, + #amqp_msg{ + payload = format_data(PayloadTmpl, MsgRaw), + props = MessageProperties + } ) + end, + Messages + ), + case WaitForPublishConfirmations of + true -> + case amqp_channel:wait_for_confirms(Channel, PublishConfirmationTimeout) of + true -> + ok; + false -> + erlang:error( + {recoverable_error, + <<"RabbitMQ: Got NACK when waiting for message acknowledgment.">>} + ); + timeout -> + erlang:error( + {recoverable_error, + <<"RabbitMQ: Timeout when waiting for message acknowledgment.">>} + ) + end; + false -> + ok end; - false -> - ok - end. - -ensure_keys_are_of_type_send_message(Keys) -> - case lists:all(fun is_send_message_atom/1, Keys) of - true -> - ok; - false -> + error -> erlang:error( - {unrecoverable_error, - <<"Unexpected type for batch message (Expected send_message)">>} + {recoverable_error, {<<"RabbitMQ: channel_not_found">>, Conn, RabbitMQ}} ) end. -is_send_message_atom(send_message) -> - true; -is_send_message_atom(_) -> - false. - format_data([], Msg) -> emqx_utils_json:encode(Msg); format_data(Tokens, Msg) -> @@ -554,3 +313,119 @@ handle_result({error, ecpool_empty}) -> {error, {recoverable_error, ecpool_empty}}; handle_result(Res) -> Res. + +make_channel(PoolName, ChannelId, Params) -> + Conns = get_rabbitmq_connections(PoolName), + make_channel(Conns, PoolName, ChannelId, Params, #{}). + +make_channel([], _PoolName, _ChannelId, _Param, Acc) -> + {ok, Acc}; +make_channel([Conn | Conns], PoolName, ChannelId, Params, Acc) -> + maybe + {ok, RabbitMQChannel} ?= amqp_connection:open_channel(Conn), + ok ?= try_confirm_channel(Params, RabbitMQChannel), + ok ?= try_subscribe(Params, RabbitMQChannel, PoolName, ChannelId), + NewAcc = Acc#{Conn => RabbitMQChannel}, + make_channel(Conns, PoolName, ChannelId, Params, NewAcc) + end. + +%% We need to enable confirmations if we want to wait for them +try_confirm_channel(#{wait_for_publish_confirmations := true}, Channel) -> + case amqp_channel:call(Channel, #'confirm.select'{}) of + #'confirm.select_ok'{} -> + ok; + Error -> + Reason = + iolist_to_binary( + io_lib:format( + "Could not enable RabbitMQ confirmation mode ~p", + [Error] + ) + ), + {error, Reason} + end; +try_confirm_channel(#{wait_for_publish_confirmations := false}, _Channel) -> + ok. + +%% Initialize Rabbitmq's secret library so that the password is encrypted +%% in the log files. +init_secret() -> + case credentials_obfuscation:secret() of + ?PENDING_SECRET -> + Bytes = crypto:strong_rand_bytes(128), + %% The password can appear in log files if we don't do this + credentials_obfuscation:set_secret(Bytes); + _ -> + %% Already initialized + ok + end. + +preproc_parameter(#{config_root := actions, parameters := Parameter}) -> + #{ + payload_template := PayloadTemplate, + delivery_mode := InitialDeliveryMode + } = Parameter, + Parameter#{ + delivery_mode => delivery_mode(InitialDeliveryMode), + payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), + config_root => actions + }; +preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) -> + #{ + payload_template := PayloadTmpl, + qos := QosTmpl, + topic := TopicTmpl + } = Parameter, + Parameter#{ + payload_template => emqx_placeholder:preproc_tmpl(PayloadTmpl), + qos => preproc_qos(QosTmpl), + topic => emqx_placeholder:preproc_tmpl(TopicTmpl), + hookpoints => Hooks, + config_root => sources + }. + +preproc_qos(Qos) when is_integer(Qos) -> Qos; +preproc_qos(Qos) -> emqx_placeholder:preproc_tmpl(Qos). + +delivery_mode(non_persistent) -> 1; +delivery_mode(persistent) -> 2. + +to_ssl_options(#{ssl := #{enable := true} = SSLOpts}) -> + emqx_tls_lib:to_client_opts(SSLOpts); +to_ssl_options(_) -> + none. + +get_rabbitmq_connections(PoolName) -> + lists:filtermap( + fun({_Name, Worker}) -> + case ecpool_worker:client(Worker) of + {ok, Conn} -> {true, Conn}; + _ -> false + end + end, + ecpool:workers(PoolName) + ). + +try_subscribe( + #{queue := Queue, no_ack := NoAck, config_root := sources} = Params, + RabbitChan, + PoolName, + ChannelId +) -> + WorkState = {RabbitChan, PoolName, Params}, + {ok, ConsumePid} = emqx_bridge_rabbitmq_sup:ensure_started(ChannelId, WorkState), + BasicConsume = #'basic.consume'{queue = Queue, no_ack = NoAck}, + #'basic.consume_ok'{consumer_tag = _} = + amqp_channel:subscribe(RabbitChan, BasicConsume, ConsumePid), + ok; +try_subscribe(#{config_root := actions}, _RabbitChan, _PoolName, _ChannelId) -> + ok. + +try_unsubscribe(ChannelId, Channels) -> + case emqx_utils_maps:deep_find([ChannelId, rabbitmq], Channels) of + {ok, RabbitMQ} -> + lists:foreach(fun(Pid) -> catch amqp_channel:close(Pid) end, maps:values(RabbitMQ)), + emqx_bridge_rabbitmq_sup:ensure_deleted(ChannelId); + _ -> + ok + end. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector_schema.erl new file mode 100644 index 000000000..d36eb463c --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector_schema.erl @@ -0,0 +1,139 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_connector_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-define(TYPE, rabbitmq). + +-export([roots/0, fields/1, desc/1, namespace/0]). +-export([connector_examples/1, connector_example_values/0]). + +%%====================================================================================== +%% Hocon Schema Definitions +namespace() -> ?TYPE. + +roots() -> []. + +fields("config_connector") -> + emqx_bridge_schema:common_bridge_fields() ++ + fields(connector) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector) -> + [ + {server, + ?HOCON( + string(), + #{ + default => <<"localhost">>, + desc => ?DESC("server") + } + )}, + {port, + ?HOCON( + emqx_schema:port_number(), + #{ + default => 5672, + desc => ?DESC("server") + } + )}, + {username, + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("username") + } + )}, + {password, emqx_connector_schema_lib:password_field(#{required => true})}, + {pool_size, + ?HOCON( + pos_integer(), + #{ + default => 8, + desc => ?DESC("pool_size") + } + )}, + {timeout, + ?HOCON( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC("timeout") + } + )}, + {virtual_host, + ?HOCON( + binary(), + #{ + default => <<"/">>, + desc => ?DESC("virtual_host") + } + )}, + {heartbeat, + ?HOCON( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"30s">>, + desc => ?DESC("heartbeat") + } + )} + ] ++ + emqx_connector_schema_lib:ssl_fields(); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields("post") -> + emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector"); +fields("put") -> + fields("config_connector"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("config_connector"). + +desc("config_connector") -> + ?DESC("config_connector"); +desc(_) -> + undefined. + +connector_examples(Method) -> + [ + #{ + <<"rabbitmq">> => + #{ + summary => <<"Rabbitmq Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"rabbitmq_connector">>, + type => rabbitmq, + enable => true, + server => <<"127.0.0.1">>, + port => 5672, + username => <<"guest">>, + password => <<"******">>, + pool_size => 8, + timeout => <<"5s">>, + virtual_host => <<"/">>, + heartbeat => <<"30s">>, + ssl => #{enable => false} + }. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl new file mode 100644 index 000000000..b4bb72c22 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl @@ -0,0 +1,273 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_rabbitmq_pubsub_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1, desc/1, namespace/0]). + +-export([ + bridge_v2_examples/1, + source_examples/1 +]). + +-define(ACTION_TYPE, rabbitmq). +-define(SOURCE_TYPE, rabbitmq). + +%%====================================================================================== +%% Hocon Schema Definitions +namespace() -> "bridge_rabbitmq". + +roots() -> []. + +fields(action) -> + {rabbitmq, + ?HOCON( + ?MAP(name, ?R_REF(publisher_action)), + #{ + desc => <<"RabbitMQ Action Config">>, + required => false + } + )}; +fields(publisher_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + ?HOCON( + ?R_REF(action_parameters), + #{ + required => true, + desc => ?DESC(action_parameters) + } + ), + #{resource_opts_ref => ?R_REF(action_resource_opts)} + ); +fields(action_parameters) -> + [ + {wait_for_publish_confirmations, + hoconsc:mk( + boolean(), + #{ + default => true, + desc => ?DESC("wait_for_publish_confirmations") + } + )}, + {publish_confirmation_timeout, + hoconsc:mk( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"30s">>, + desc => ?DESC("timeout") + } + )}, + {exchange, + hoconsc:mk( + typerefl:binary(), + #{ + required => true, + desc => ?DESC("exchange") + } + )}, + {routing_key, + hoconsc:mk( + typerefl:binary(), + #{ + required => true, + desc => ?DESC("routing_key") + } + )}, + {delivery_mode, + hoconsc:mk( + hoconsc:enum([non_persistent, persistent]), + #{ + default => non_persistent, + desc => ?DESC("delivery_mode") + } + )}, + {payload_template, + hoconsc:mk( + binary(), + #{ + default => <<"${.}">>, + desc => ?DESC("payload_template") + } + )} + ]; +fields(source) -> + {rabbitmq, + ?HOCON( + hoconsc:map(name, ?R_REF(subscriber_source)), + #{ + desc => <<"MQTT Subscriber Source Config">>, + required => false + } + )}; +fields(subscriber_source) -> + emqx_bridge_v2_schema:make_consumer_action_schema( + ?HOCON( + ?R_REF(ingress_parameters), + #{ + required => true, + desc => ?DESC("source_parameters") + } + ) + ); +fields(ingress_parameters) -> + [ + {wait_for_publish_confirmations, + hoconsc:mk( + boolean(), + #{ + default => true, + desc => ?DESC("wait_for_publish_confirmations") + } + )}, + {topic, + ?HOCON( + binary(), + #{ + required => true, + validator => fun emqx_schema:non_empty_string/1, + desc => ?DESC("ingress_topic") + } + )}, + {qos, + ?HOCON( + ?UNION([emqx_schema:qos(), binary()]), + #{ + default => 0, + desc => ?DESC("ingress_qos") + } + )}, + {payload_template, + ?HOCON( + binary(), + #{ + required => false, + desc => ?DESC("ingress_payload_template") + } + )}, + {queue, + ?HOCON( + binary(), + #{ + required => true, + desc => ?DESC("ingress_queue") + } + )}, + {no_ack, + ?HOCON( + boolean(), + #{ + required => false, + default => true, + desc => ?DESC("ingress_no_ack") + } + )} + ]; +fields(action_resource_opts) -> + emqx_bridge_v2_schema:action_resource_opts_fields(); +fields(source_resource_opts) -> + emqx_bridge_v2_schema:source_resource_opts_fields(); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action)); +fields(Field) when + Field == "get_source"; + Field == "post_source"; + Field == "put_source" +-> + emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source)); +fields(What) -> + error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}). +%% v2: api schema +%% The parameter equals to +%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1 +%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1 +%%-------------------------------------------------------------------- +%% v1/v2 + +desc("config") -> + ?DESC("desc_config"); +desc(action_resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc(source_resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc(action_parameters) -> + ?DESC(action_parameters); +desc(ingress_parameters) -> + ?DESC(ingress_parameters); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; +desc("http_action") -> + ?DESC("desc_config"); +desc("parameters_opts") -> + ?DESC("config_parameters_opts"); +desc(publisher_action) -> + ?DESC(publisher_action); +desc(subscriber_source) -> + ?DESC(subscriber_source); +desc(_) -> + undefined. + +bridge_v2_examples(Method) -> + [ + #{ + <<"rabbitmq">> => #{ + summary => <<"RabbitMQ Producer Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, + _ActionType = ?ACTION_TYPE, + _ConnectorType = rabbitmq, + #{ + parameters => #{ + wait_for_publish_confirmations => true, + publish_confirmation_timeout => <<"30s">>, + exchange => <<"test_exchange">>, + routing_key => <<"/">>, + delivery_mode => <<"non_persistent">>, + payload_template => <<"${.payload}">> + } + } + ) + } + } + ]. + +source_examples(Method) -> + [ + #{ + <<"rabbitmq">> => #{ + summary => <<"RabbitMQ Subscriber Source">>, + value => emqx_bridge_v2_schema:source_values( + Method, + _SourceType = ?SOURCE_TYPE, + _ConnectorType = rabbitmq, + #{ + parameters => #{ + topic => <<"${payload.mqtt_topic}">>, + qos => <<"${payload.mqtt_qos}">>, + payload_template => <<"${payload.mqtt_payload}">>, + queue => <<"test_queue">>, + no_ack => true + } + } + ) + } + } + ]. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_sup.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_sup.erl new file mode 100644 index 000000000..6497929f5 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_sup.erl @@ -0,0 +1,28 @@ +-module(emqx_bridge_rabbitmq_source_sup). + +-behaviour(supervisor). +%% API +-export([start_link/0]). +-export([init/1]). + +start_link() -> + supervisor:start_link(?MODULE, []). + +init([]) -> + SupFlags = #{ + strategy => simple_one_for_one, + intensity => 100, + period => 10 + }, + {ok, {SupFlags, [worker_spec()]}}. + +worker_spec() -> + Mod = emqx_bridge_rabbitmq_source_worker, + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => transient, + shutdown => brutal_kill, + type => worker, + modules => [Mod] + }. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl new file mode 100644 index 000000000..b296887eb --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl @@ -0,0 +1,82 @@ +-module(emqx_bridge_rabbitmq_source_worker). + +-behaviour(gen_server). + +-export([start_link/1]). +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +start_link(Args) -> + gen_server:start_link(?MODULE, Args, []). + +init({_RabbitChannel, _InstanceId, _Params} = State) -> + {ok, State, {continue, confirm_ok}}. + +handle_continue(confirm_ok, State) -> + receive + #'basic.consume_ok'{} -> {noreply, State} + end. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info( + {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{ + payload = Payload, + props = #'P_basic'{message_id = MessageId, headers = Headers} + }}, + {Channel, InstanceId, Params} = State +) -> + #{ + hookpoints := Hooks, + payload_template := PayloadTmpl, + qos := QoSTmpl, + topic := TopicTmpl, + no_ack := NoAck + } = Params, + MQTTMsg = emqx_message:make( + make_message_id(MessageId), + InstanceId, + render(Payload, QoSTmpl), + render(Payload, TopicTmpl), + render(Payload, PayloadTmpl), + #{}, + make_headers(Headers) + ), + _ = emqx:publish(MQTTMsg), + lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [MQTTMsg]) end, Hooks), + (NoAck =:= false) andalso + amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}), + emqx_resource_metrics:received_inc(InstanceId), + {noreply, State}; +handle_info(#'basic.cancel_ok'{}, State) -> + {stop, normal, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +render(_Message, QoS) when is_integer(QoS) -> QoS; +render(Message, PayloadTmpl) -> + Opts = #{return => full_binary}, + emqx_placeholder:proc_tmpl(PayloadTmpl, Message, Opts). + +make_message_id(undefined) -> emqx_guid:gen(); +make_message_id(Id) -> Id. + +make_headers(undefined) -> + #{}; +make_headers(Headers) when is_list(Headers) -> + maps:from_list([{Key, Value} || {Key, _Type, Value} <- Headers]). diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_sup.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_sup.erl new file mode 100644 index 000000000..8cc442ada --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_sup.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_rabbitmq_sup). + +-behaviour(supervisor). + +-export([ensure_started/2]). +-export([ensure_deleted/1]). +-export([start_link/0]). +-export([init/1]). + +-define(BRIDGE_SUP, ?MODULE). + +ensure_started(SuperId, Config) -> + {ok, SuperPid} = ensure_supervisor_started(SuperId), + case supervisor:start_child(SuperPid, [Config]) of + {ok, WorkPid} -> + {ok, WorkPid}; + {error, {already_started, WorkPid}} -> + {ok, WorkPid}; + {error, Error} -> + {error, Error} + end. + +ensure_deleted(SuperId) -> + maybe + Pid = erlang:whereis(?BRIDGE_SUP), + true ?= Pid =/= undefined, + ok ?= supervisor:terminate_child(Pid, SuperId), + ok ?= supervisor:delete_child(Pid, SuperId) + else + false -> ok; + {error, not_found} -> ok; + Error -> Error + end. + +ensure_supervisor_started(Id) -> + SupervisorSpec = + #{ + id => Id, + start => {emqx_bridge_rabbitmq_source_sup, start_link, []}, + restart => permanent, + type => supervisor + }, + case supervisor:start_child(?BRIDGE_SUP, SupervisorSpec) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid} + end. + +start_link() -> + supervisor:start_link({local, ?BRIDGE_SUP}, ?MODULE, []). + +init([]) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 50, + period => 10 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl index 0ae7af9fc..7698608d3 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl @@ -14,7 +14,8 @@ %% See comment in %% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to -%% run this without bringing up the whole CI infrastucture +%% run this without bringing up the whole CI infrastructure +-define(TYPE, <<"rabbitmq">>). rabbit_mq_host() -> <<"rabbitmq">>. @@ -34,8 +35,14 @@ rabbit_mq_routing_key() -> get_channel_connection(Config) -> proplists:get_value(channel_connection, Config). +get_rabbitmq(Config) -> + proplists:get_value(rabbitmq, Config). + +get_tls(Config) -> + proplists:get_value(tls, Config). + %%------------------------------------------------------------------------------ -%% Common Test Setup, Teardown and Testcase List +%% Common Test Setup, Tear down and Testcase List %%------------------------------------------------------------------------------ all() -> @@ -101,35 +108,25 @@ common_init_per_group(Opts) -> {ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(amqp_client), emqx_mgmt_api_test_util:init_suite(), - ChannelConnection = setup_rabbit_mq_exchange_and_queue(Opts), - [{channel_connection, ChannelConnection}]. + #{host := Host, port := Port, tls := UseTLS} = Opts, + ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS), + [ + {channel_connection, ChannelConnection}, + {rabbitmq, #{server => Host, port => Port}}, + {tls, UseTLS} + ]. -setup_rabbit_mq_exchange_and_queue(#{host := RabbitMQHost, port := RabbitMQPort, tls := UseTLS}) -> +setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) -> SSLOptions = case UseTLS of - false -> - none; - true -> - CertsDir = filename:join([ - emqx_common_test_helpers:proj_root(), - ".ci", - "docker-compose-file", - "certs" - ]), - emqx_tls_lib:to_client_opts( - #{ - enable => true, - cacertfile => filename:join([CertsDir, "ca.crt"]), - certfile => filename:join([CertsDir, "client.pem"]), - keyfile => filename:join([CertsDir, "client.key"]) - } - ) + false -> none; + true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS)) end, - %% Create an exachange and a queue + %% Create an exchange and a queue {ok, Connection} = amqp_connection:start(#amqp_params_network{ - host = RabbitMQHost, - port = RabbitMQPort, + host = Host, + port = Port, ssl_options = SSLOptions }), {ok, Channel} = amqp_connection:open_channel(Connection), @@ -184,8 +181,7 @@ init_per_testcase(_, Config) -> end_per_testcase(_, _Config) -> ok. -rabbitmq_config(Config) -> - %%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()), +rabbitmq_config(UseTLS, Config) -> BatchSize = maps:get(batch_size, Config, 1), BatchTime = maps:get(batch_time_ms, Config, 0), Name = atom_to_binary(?MODULE), @@ -196,6 +192,7 @@ rabbitmq_config(Config) -> io_lib:format( "bridges.rabbitmq.~s {\n" " enable = true\n" + " ssl = ~s\n" " server = \"~s\"\n" " port = ~p\n" " username = \"guest\"\n" @@ -210,6 +207,7 @@ rabbitmq_config(Config) -> "}\n", [ Name, + hocon_pp:do(ssl_options(UseTLS), #{embedded => true}), Server, Port, rabbit_mq_routing_key(), @@ -222,80 +220,86 @@ rabbitmq_config(Config) -> ct:pal(ConfigString), parse_and_check(ConfigString, <<"rabbitmq">>, Name). +ssl_options(true) -> + CertsDir = filename:join([ + emqx_common_test_helpers:proj_root(), + ".ci", + "docker-compose-file", + "certs" + ]), + #{ + enable => true, + cacertfile => filename:join([CertsDir, "ca.crt"]), + certfile => filename:join([CertsDir, "client.pem"]), + keyfile => filename:join([CertsDir, "client.key"]) + }; +ssl_options(false) -> + #{ + enable => false + }. + parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, RetConfig. -make_bridge(Config) -> - Type = <<"rabbitmq">>, - Name = atom_to_binary(?MODULE), - BridgeConfig = rabbitmq_config(Config), - {ok, _} = emqx_bridge:create( - Type, - Name, - BridgeConfig - ), - emqx_bridge_resource:bridge_id(Type, Name). +create_bridge(Name, UseTLS, Config) -> + BridgeConfig = rabbitmq_config(UseTLS, Config), + {ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig), + emqx_bridge_resource:bridge_id(?TYPE, Name). -delete_bridge() -> - Type = <<"rabbitmq">>, - Name = atom_to_binary(?MODULE), - ok = emqx_bridge:remove(Type, Name). +delete_bridge(Name) -> + ok = emqx_bridge:remove(?TYPE, Name). %%------------------------------------------------------------------------------ %% Test Cases %%------------------------------------------------------------------------------ -t_make_delete_bridge(_Config) -> - make_bridge(#{}), - %% Check that the new brige is in the list of bridges +t_create_delete_bridge(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + UseTLS = get_tls(Config), + create_bridge(Name, UseTLS, RabbitMQ), Bridges = emqx_bridge:list(), - Name = atom_to_binary(?MODULE), - IsRightName = - fun - (#{name := BName}) when BName =:= Name -> - true; - (_) -> - false - end, - ?assert(lists:any(IsRightName, Bridges)), - delete_bridge(), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Bridges), Bridges), + ok = delete_bridge(Name), BridgesAfterDelete = emqx_bridge:list(), - ?assertNot(lists:any(IsRightName, BridgesAfterDelete)), + ?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete), ok. -t_make_delete_bridge_non_existing_server(_Config) -> - make_bridge(#{server => <<"non_existing_server">>, port => 3174}), - %% Check that the new brige is in the list of bridges +t_create_delete_bridge_non_existing_server(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + UseTLS = get_tls(Config), + create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}), + %% Check that the new bridge is in the list of bridges Bridges = emqx_bridge:list(), - Name = atom_to_binary(?MODULE), - IsRightName = - fun - (#{name := BName}) when BName =:= Name -> - true; - (_) -> - false - end, - ?assert(lists:any(IsRightName, Bridges)), - delete_bridge(), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Bridges)), + ok = delete_bridge(Name), BridgesAfterDelete = emqx_bridge:list(), - ?assertNot(lists:any(IsRightName, BridgesAfterDelete)), + ?assertNot(lists:any(Any, BridgesAfterDelete)), ok. t_send_message_query(Config) -> - BridgeID = make_bridge(#{batch_size => 1}), + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + UseTLS = get_tls(Config), + BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}), Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, %% This will use the SQL template included in the bridge emqx_bridge:send_message(BridgeID, Payload), %% Check that the data got to the database ?assertEqual(Payload, receive_simple_test_message(Config)), - delete_bridge(), + ok = delete_bridge(Name), ok. t_send_message_query_with_template(Config) -> - BridgeID = make_bridge(#{ + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + UseTLS = get_tls(Config), + BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{ batch_size => 1, payload_template => << @@ -318,24 +322,27 @@ t_send_message_query_with_template(Config) -> <<"secret">> => 42 }, ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), - delete_bridge(), + ok = delete_bridge(Name), ok. t_send_simple_batch(Config) -> - BridgeConf = - #{ - batch_size => 100 - }, - BridgeID = make_bridge(BridgeConf), + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeConf = RabbitMQ#{batch_size => 100}, + UseTLS = get_tls(Config), + BridgeID = create_bridge(Name, UseTLS, BridgeConf), Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, emqx_bridge:send_message(BridgeID, Payload), ?assertEqual(Payload, receive_simple_test_message(Config)), - delete_bridge(), + ok = delete_bridge(Name), ok. t_send_simple_batch_with_template(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + UseTLS = get_tls(Config), BridgeConf = - #{ + RabbitMQ#{ batch_size => 100, payload_template => << @@ -347,7 +354,7 @@ t_send_simple_batch_with_template(Config) -> "}" >> }, - BridgeID = make_bridge(BridgeConf), + BridgeID = create_bridge(Name, UseTLS, BridgeConf), Payload = #{ <<"key">> => 7, <<"data">> => <<"RabbitMQ">>, @@ -358,20 +365,21 @@ t_send_simple_batch_with_template(Config) -> <<"secret">> => 42 }, ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), - delete_bridge(), + ok = delete_bridge(Name), ok. t_heavy_batching(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), NumberOfMessages = 20000, - BridgeConf = #{ + RabbitMQ = get_rabbitmq(Config), + UseTLS = get_tls(Config), + BridgeConf = RabbitMQ#{ batch_size => 10173, batch_time_ms => 50 }, - BridgeID = make_bridge(BridgeConf), + BridgeID = create_bridge(Name, UseTLS, BridgeConf), SendMessage = fun(Key) -> - Payload = #{ - <<"key">> => Key - }, + Payload = #{<<"key">> => Key}, emqx_bridge:send_message(BridgeID, Payload) end, [SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)], @@ -385,7 +393,7 @@ t_heavy_batching(Config) -> lists:seq(1, NumberOfMessages) ), ?assertEqual(NumberOfMessages, maps:size(AllMessages)), - delete_bridge(), + ok = delete_bridge(Name), ok. receive_simple_test_message(Config) -> @@ -410,17 +418,6 @@ receive_simple_test_message(Config) -> #'basic.cancel_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), emqx_utils_json:decode(Content#amqp_msg.payload) + after 5000 -> + ?assert(false, "Did not receive message within 5 second") end. - -rabbitmq_config() -> - Config = - #{ - server => rabbit_mq_host(), - port => 5672, - exchange => rabbit_mq_exchange(), - routing_key => rabbit_mq_routing_key() - }, - #{<<"config">> => Config}. - -test_data() -> - #{<<"msg_field">> => <<"Hello">>}. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index 689c39dc5..ee5a2609b 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -14,18 +14,18 @@ -include_lib("amqp_client/include/amqp_client.hrl"). %% This test SUITE requires a running RabbitMQ instance. If you don't want to -%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script +%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script %% you can create a clickhouse instance with the following command. %% 5672 is the default port for AMQP 0-9-1 and 15672 is the default port for -%% the HTTP managament interface. +%% the HTTP management interface. %% %% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management rabbit_mq_host() -> - <<"rabbitmq">>. + list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")). rabbit_mq_port() -> - 5672. + list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")). rabbit_mq_password() -> <<"guest">>. @@ -43,17 +43,16 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - case - emqx_common_test_helpers:is_tcp_server_available( - erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port() - ) - of + Host = rabbit_mq_host(), + Port = rabbit_mq_port(), + ct:pal("rabbitmq:~p~n", [{Host, Port}]), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> Apps = emqx_cth_suite:start( [emqx_conf, emqx_connector, emqx_bridge_rabbitmq], #{work_dir => emqx_cth_suite:work_dir(Config)} ), - ChannelConnection = setup_rabbit_mq_exchange_and_queue(), + ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port), [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config]; false -> case os:getenv("IS_CI") of @@ -64,12 +63,12 @@ init_per_suite(Config) -> end end. -setup_rabbit_mq_exchange_and_queue() -> - %% Create an exachange and a queue +setup_rabbit_mq_exchange_and_queue(Host, Port) -> + %% Create an exchange and a queue {ok, Connection} = amqp_connection:start(#amqp_params_network{ - host = erlang:binary_to_list(rabbit_mq_host()), - port = rabbit_mq_port() + host = binary_to_list(Host), + port = Port }), {ok, Channel} = amqp_connection:open_channel(Connection), %% Create an exchange @@ -122,7 +121,7 @@ end_per_suite(Config) -> t_lifecycle(Config) -> perform_lifecycle_check( - erlang:atom_to_binary(?MODULE), + erlang:atom_to_binary(?FUNCTION_NAME), rabbitmq_config(), Config ). @@ -144,12 +143,11 @@ t_start_passfile(Config) -> ). perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> - #{ - channel := Channel - } = get_channel_connection(TestConfig), + #{channel := Channel} = get_channel_connection(TestConfig), CheckedConfig = check_config(InitialConfig), #{ - state := #{poolname := PoolName} = State, + id := PoolName, + state := State, status := InitialStatus } = create_local_resource(ResourceID, CheckedConfig), ?assertEqual(InitialStatus, connected), @@ -211,9 +209,14 @@ create_local_resource(ResourceID, CheckedConfig) -> perform_query(PoolName, Channel) -> %% Send message to queue: - ok = emqx_resource:query(PoolName, {query, test_data()}), + ActionConfig = rabbitmq_action_config(), + ChannelId = <<"test_channel">>, + ?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)), + ok = emqx_resource:query(PoolName, {ChannelId, payload()}), %% Get the message from queue: - ok = receive_simple_test_message(Channel). + ok = receive_simple_test_message(Channel), + ?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)), + ok. receive_simple_test_message(Channel) -> #'basic.consume_ok'{consumer_tag = ConsumerTag} = @@ -238,6 +241,8 @@ receive_simple_test_message(Channel) -> #'basic.cancel_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), ok + after 5000 -> + ?assert(false, "Did not receive message within 5 second") end. rabbitmq_config() -> @@ -255,5 +260,21 @@ rabbitmq_config(Overrides) -> }, #{<<"config">> => maps:merge(Config, Overrides)}. +payload() -> + #{<<"payload">> => test_data()}. + test_data() -> - #{<<"msg_field">> => <<"Hello">>}. + #{<<"Hello">> => <<"World">>}. + +rabbitmq_action_config() -> + #{ + config_root => actions, + parameters => #{ + delivery_mode => non_persistent, + exchange => rabbit_mq_exchange(), + payload_template => <<"${.payload}">>, + publish_confirmation_timeout => 30000, + routing_key => rabbit_mq_routing_key(), + wait_for_publish_confirmations => true + } + }. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 379151639..2935233be 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -64,6 +64,8 @@ resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; resource_type(tdengine) -> emqx_bridge_tdengine_connector; +resource_type(rabbitmq) -> + emqx_bridge_rabbitmq_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -82,6 +84,8 @@ connector_impl_module(opents) -> emqx_bridge_opents_connector; connector_impl_module(tdengine) -> emqx_bridge_tdengine_connector; +connector_impl_module(rabbitmq) -> + emqx_bridge_rabbitmq_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -258,6 +262,14 @@ connector_structs() -> desc => <<"TDengine Connector Config">>, required => false } + )}, + {rabbitmq, + mk( + hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")), + #{ + desc => <<"RabbitMQ Connector Config">>, + required => false + } )} ]. @@ -281,6 +293,7 @@ schema_modules() -> emqx_bridge_redis_schema, emqx_bridge_iotdb_connector, emqx_bridge_es_connector, + emqx_bridge_rabbitmq_connector_schema, emqx_bridge_opents_connector, emqx_bridge_greptimedb, emqx_bridge_tdengine_connector @@ -317,6 +330,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), + api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"), api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method) ]. diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index bec452f08..53622a828 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -91,7 +91,6 @@ api_schemas(Method) -> %% We need to map the `type' field of a request (binary) to a %% connector schema module. api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector"), - % api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt_subscriber">>, Method ++ "_connector"), api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt">>, Method ++ "_connector") ]. @@ -166,7 +165,9 @@ connector_type_to_bridge_types(opents) -> connector_type_to_bridge_types(greptimedb) -> [greptimedb]; connector_type_to_bridge_types(tdengine) -> - [tdengine]. + [tdengine]; +connector_type_to_bridge_types(rabbitmq) -> + [rabbitmq]. actions_config_name(action) -> <<"actions">>; actions_config_name(source) -> <<"sources">>. diff --git a/rebar.config b/rebar.config index 5ebe9da15..1d9d082af 100644 --- a/rebar.config +++ b/rebar.config @@ -14,6 +14,7 @@ warn_unused_import, warn_obsolete_guard, compressed, + {feature, maybe_expr, enable}, nowarn_unused_import, {d, snk_kind, msg} ]}.