diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf
index f07e740da..45549edec 100644
--- a/apps/emqx_modules/etc/emqx_modules.conf
+++ b/apps/emqx_modules/etc/emqx_modules.conf
@@ -13,16 +13,6 @@ telemetry {
enable = true
}
-event_message {
- client_connected = true
- client_disconnected = true
- # client_subscribed = false
- # client_unsubscribed = false
- # message_delivered = false
- # message_acked = false
- # message_dropped = false
-}
-
topic_metrics: [
#{topic: "test/1"}
]
diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl
deleted file mode 100644
index c462e7e25..000000000
--- a/apps/emqx_modules/src/emqx_event_message.erl
+++ /dev/null
@@ -1,320 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_event_message).
-
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/logger.hrl").
--include("emqx_modules.hrl").
-
--export([
- list/0,
- update/1,
- enable/0,
- disable/0,
- post_config_update/5,
- init_conf_handler/0
-]).
-
--export([
- on_client_connected/2,
- on_client_disconnected/3,
- on_client_subscribed/3,
- on_client_unsubscribed/3,
- on_message_dropped/3,
- on_message_delivered/2,
- on_message_acked/2
-]).
-
--ifdef(TEST).
--export([reason/1]).
--endif.
-
-init_conf_handler() ->
- emqx_conf:add_handler([event_message], ?MODULE).
-
-list() ->
- emqx_conf:get([event_message], #{}).
-
-update(Params) ->
- case
- emqx_conf:update(
- [event_message],
- Params,
- #{rawconf_with_defaults => true, override_to => cluster}
- )
- of
- {ok, #{raw_config := NewEventMessage}} ->
- {ok, NewEventMessage};
- {error, Reason} ->
- {error, Reason}
- end.
-
-post_config_update(_KeyPath, _Config, NewConf, _OldConf, _AppEnvs) ->
- disable(),
- enable(maps:to_list(NewConf)).
-
-enable() ->
- enable(maps:to_list(list())).
-
-disable() ->
- foreach_with(fun check_enable/2, fun emqx_hooks:del/2, maps:to_list(list())).
-
-%%--------------------------------------------------------------------
-%% Callbacks
-%%--------------------------------------------------------------------
-
-on_client_connected(ClientInfo, ConnInfo) ->
- Payload0 = common_infos(ClientInfo, ConnInfo),
- Payload = Payload0#{
- keepalive => maps:get(keepalive, ConnInfo, 0),
- clean_start => maps:get(clean_start, ConnInfo, true),
- expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
- },
- publish_event_msg(<<"$event/client_connected">>, Payload).
-
-on_client_disconnected(
- ClientInfo,
- Reason,
- ConnInfo = #{disconnected_at := DisconnectedAt}
-) ->
- Payload0 = common_infos(ClientInfo, ConnInfo),
- Payload = Payload0#{
- reason => reason(Reason),
- disconnected_at => DisconnectedAt
- },
- publish_event_msg(<<"$event/client_disconnected">>, Payload).
-
-on_client_subscribed(
- _ClientInfo = #{
- clientid := ClientId,
- username := Username
- },
- Topic,
- SubOpts
-) ->
- Payload = #{
- clientid => ClientId,
- username => Username,
- topic => Topic,
- subopts => SubOpts,
- ts => erlang:system_time(millisecond)
- },
- publish_event_msg(<<"$event/client_subscribed">>, Payload).
-
-on_client_unsubscribed(
- _ClientInfo = #{
- clientid := ClientId,
- username := Username
- },
- Topic,
- _SubOpts
-) ->
- Payload = #{
- clientid => ClientId,
- username => Username,
- topic => Topic,
- ts => erlang:system_time(millisecond)
- },
- publish_event_msg(<<"$event/client_unsubscribed">>, Payload).
-
-on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
- case ignore_sys_message(Message) of
- true ->
- ok;
- false ->
- Payload0 = base_message(Message),
- Payload = Payload0#{
- reason => Reason,
- clientid => ClientId,
- username => emqx_message:get_header(username, Message, undefined),
- peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined))
- },
- publish_event_msg(<<"$event/message_dropped">>, Payload)
- end,
- {ok, Message}.
-
-on_message_delivered(
- _ClientInfo = #{
- peerhost := PeerHost,
- clientid := ReceiverCId,
- username := ReceiverUsername
- },
- #message{from = ClientId} = Message
-) ->
- case ignore_sys_message(Message) of
- true ->
- ok;
- false ->
- Payload0 = base_message(Message),
- Payload = Payload0#{
- from_clientid => ClientId,
- from_username => emqx_message:get_header(username, Message, undefined),
- clientid => ReceiverCId,
- username => ReceiverUsername,
- peerhost => ntoa(PeerHost)
- },
- publish_event_msg(<<"$event/message_delivered">>, Payload)
- end,
- {ok, Message}.
-
-on_message_acked(
- _ClientInfo = #{
- peerhost := PeerHost,
- clientid := ReceiverCId,
- username := ReceiverUsername
- },
- #message{from = ClientId} = Message
-) ->
- case ignore_sys_message(Message) of
- true ->
- ok;
- false ->
- Payload0 = base_message(Message),
- Payload = Payload0#{
- from_clientid => ClientId,
- from_username => emqx_message:get_header(username, Message, undefined),
- clientid => ReceiverCId,
- username => ReceiverUsername,
- peerhost => ntoa(PeerHost)
- },
- publish_event_msg(<<"$event/message_acked">>, Payload)
- end,
- {ok, Message}.
-
-%%--------------------------------------------------------------------
-%% Helper functions
-%%--------------------------------------------------------------------
-common_infos(
- _ClientInfo = #{
- clientid := ClientId,
- username := Username,
- peerhost := PeerHost,
- sockport := SockPort
- },
- _ConnInfo = #{
- proto_name := ProtoName,
- proto_ver := ProtoVer,
- connected_at := ConnectedAt
- }
-) ->
- #{
- clientid => ClientId,
- username => Username,
- ipaddress => ntoa(PeerHost),
- sockport => SockPort,
- proto_name => ProtoName,
- proto_ver => ProtoVer,
- connected_at => ConnectedAt,
- ts => erlang:system_time(millisecond)
- }.
-
-make_msg(Topic, Payload) ->
- emqx_message:set_flag(
- sys,
- emqx_message:make(
- ?MODULE, 0, Topic, iolist_to_binary(Payload)
- )
- ).
-
--compile({inline, [reason/1]}).
-reason(Reason) when is_atom(Reason) -> Reason;
-reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
-reason({Error, _}) when is_atom(Error) -> Error;
-reason(_) -> internal_error.
-
-ntoa(undefined) -> undefined;
-ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
-ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
-
-printable_maps(undefined) ->
- #{};
-printable_maps(Headers) ->
- maps:fold(
- fun
- (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
- AccIn#{K => ntoa(V0)};
- ('User-Property', V0, AccIn) when is_list(V0) ->
- AccIn#{
- 'User-Property' => maps:from_list(V0),
- 'User-Property-Pairs' => [
- #{
- key => Key,
- value => Value
- }
- || {Key, Value} <- V0
- ]
- };
- (K, V0, AccIn) ->
- AccIn#{K => V0}
- end,
- #{},
- Headers
- ).
-
-base_message(Message) ->
- #message{
- id = Id,
- qos = QoS,
- flags = Flags,
- topic = Topic,
- headers = Headers,
- payload = Payload,
- timestamp = Timestamp
- } = Message,
- #{
- id => emqx_guid:to_hexstr(Id),
- payload => Payload,
- topic => Topic,
- qos => QoS,
- flags => Flags,
- headers => printable_maps(Headers),
- pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
- publish_received_at => Timestamp
- }.
-
-ignore_sys_message(#message{flags = Flags}) ->
- maps:get(sys, Flags, false).
-
-publish_event_msg(Topic, Payload) ->
- _ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))),
- ok.
-
-enable(List) ->
- foreach_with(fun check_enable/2, fun emqx_hooks:put/2, List).
-
-check_enable(Handler, {client_connected, true}) ->
- Handler('client.connected', {?MODULE, on_client_connected, []});
-check_enable(Handler, {client_disconnected, true}) ->
- Handler('client.disconnected', {?MODULE, on_client_disconnected, []});
-check_enable(Handler, {client_subscribed, true}) ->
- Handler('session.subscribed', {?MODULE, on_client_subscribed, []});
-check_enable(Handler, {client_unsubscribed, true}) ->
- Handler('session.unsubscribed', {?MODULE, on_client_unsubscribed, []});
-check_enable(Handler, {message_delivered, true}) ->
- Handler('message.delivered', {?MODULE, on_message_delivered, []});
-check_enable(Handler, {message_acked, true}) ->
- Handler('message.acked', {?MODULE, on_message_acked, []});
-check_enable(Handler, {message_dropped, true}) ->
- Handler('message.dropped', {?MODULE, on_message_dropped, []});
-check_enable(_Handler, {_Topic, _Enable}) ->
- ok.
-
-foreach_with(Fun, With, [H | T]) ->
- Fun(With, H),
- foreach_with(Fun, With, T);
-foreach_with(_Fun, _With, []) ->
- ok.
diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl
deleted file mode 100644
index c6c71b804..000000000
--- a/apps/emqx_modules/src/emqx_event_message_api.erl
+++ /dev/null
@@ -1,70 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_event_message_api).
-
--include("emqx_modules.hrl").
-
--behaviour(minirest_api).
-
--import(hoconsc, [mk/2, ref/2]).
-
--export([
- api_spec/0,
- paths/0,
- schema/1
-]).
-
--export([event_message/2]).
-
-api_spec() ->
- emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
-
-paths() ->
- ["/mqtt/event_message"].
-
-schema("/mqtt/event_message") ->
- #{
- 'operationId' => event_message,
- get =>
- #{
- description => <<"Event Message">>,
- tags => ?API_TAG_MQTT,
- responses =>
- #{200 => status_schema(<<"Get Event Message config successfully">>)}
- },
- put =>
- #{
- description => <<"Update Event Message">>,
- tags => ?API_TAG_MQTT,
- 'requestBody' => status_schema(<<"Update Event Message config">>),
- responses =>
- #{200 => status_schema(<<"Update Event Message config successfully">>)}
- }
- }.
-
-status_schema(Desc) ->
- mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}).
-
-event_message(get, _Params) ->
- {200, emqx_event_message:list()};
-event_message(put, #{body := Body}) ->
- case emqx_event_message:update(Body) of
- {ok, NewConfig} ->
- {200, NewConfig};
- {error, Reason} ->
- Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
- {500, 'INTERNAL_ERROR', Message}
- end.
diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl
index bf8efc8f6..51b314861 100644
--- a/apps/emqx_modules/src/emqx_modules_app.erl
+++ b/apps/emqx_modules/src/emqx_modules_app.erl
@@ -36,7 +36,6 @@ maybe_enable_modules() ->
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(),
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
- emqx_event_message:enable(),
emqx_conf_cli:load(),
ok = emqx_rewrite:enable(),
emqx_topic_metrics:enable(),
@@ -46,7 +45,6 @@ maybe_disable_modules() ->
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
- emqx_event_message:disable(),
emqx_rewrite:disable(),
emqx_conf_cli:unload(),
emqx_topic_metrics:disable(),
diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl
index 8400311d5..4b27879e2 100644
--- a/apps/emqx_modules/src/emqx_modules_schema.erl
+++ b/apps/emqx_modules/src/emqx_modules_schema.erl
@@ -32,7 +32,6 @@ roots() ->
[
"delayed",
"telemetry",
- "event_message",
array("rewrite"),
array("topic_metrics")
].
@@ -63,76 +62,6 @@ fields("rewrite") ->
)},
{re, fun regular_expression/1}
];
-fields("event_message") ->
- Fields =
- [
- {client_connected,
- sc(
- boolean(),
- #{
- desc => <<"Enable/disable client_connected event messages">>,
- default => false
- }
- )},
- {client_disconnected,
- sc(
- boolean(),
- #{
- desc => <<"Enable/disable client_disconnected event messages">>,
- default => false
- }
- )},
- {client_subscribed,
- sc(
- boolean(),
- #{
- desc => <<"Enable/disable client_subscribed event messages">>,
- default => false
- }
- )},
- {client_unsubscribed,
- sc(
- boolean(),
- #{
- desc => <<"Enable/disable client_unsubscribed event messages">>,
- default => false
- }
- )},
- {message_delivered,
- sc(
- boolean(),
- #{
- desc => <<"Enable/disable message_delivered event messages">>,
- default => false
- }
- )},
- {message_acked,
- sc(
- boolean(),
- #{
- desc => <<"Enable/disable message_acked event messages">>,
- default => false
- }
- )},
- {message_dropped,
- sc(
- boolean(),
- #{
- desc => <<"Enable/disable message_dropped event messages">>,
- default => false
- }
- )}
- ],
- #{
- fields => Fields,
- desc =>
- "Enable/Disable system event messages.\n"
- "The messages are published to $event
prefixed topics.\n"
- "For example, if `client_disconnected` is set to `true`,\n"
- "a message is published to $event/client_connected
topic\n"
- "whenever a client is connected.\n"
- ""
- };
fields("topic_metrics") ->
[{topic, sc(binary(), #{})}].
diff --git a/apps/emqx_modules/src/emqx_modules_sup.erl b/apps/emqx_modules/src/emqx_modules_sup.erl
index a0d71bcbc..42b000eaa 100644
--- a/apps/emqx_modules/src/emqx_modules_sup.erl
+++ b/apps/emqx_modules/src/emqx_modules_sup.erl
@@ -39,7 +39,6 @@ start_link() ->
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) ->
- emqx_event_message:init_conf_handler(),
{ok,
{{one_for_one, 10, 3600}, [
?CHILD(emqx_telemetry),
diff --git a/apps/emqx_modules/test/emqx_event_message_SUITE.erl b/apps/emqx_modules/test/emqx_event_message_SUITE.erl
deleted file mode 100644
index beb9175fe..000000000
--- a/apps/emqx_modules/test/emqx_event_message_SUITE.erl
+++ /dev/null
@@ -1,162 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_event_message_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--define(EVENT_MESSAGE, <<
- ""
- "\n"
- "event_message: {\n"
- " client_connected: true\n"
- " client_disconnected: true\n"
- " client_subscribed: true\n"
- " client_unsubscribed: true\n"
- " message_delivered: true\n"
- " message_acked: true\n"
- " message_dropped: true\n"
- "}"
- ""
->>).
-
-all() -> emqx_common_test_helpers:all(?MODULE).
-
-init_per_suite(Config) ->
- emqx_common_test_helpers:boot_modules(all),
- emqx_common_test_helpers:start_apps([emqx_modules]),
- load_config(),
- Config.
-
-load_config() ->
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE).
-
-end_per_suite(_Config) ->
- emqx_common_test_helpers:stop_apps([emqx_modules]).
-
-t_event_topic(_) ->
- ok = emqx_event_message:enable(),
- {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
- {ok, _} = emqtt:connect(C1),
- {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1),
- {ok, C2} = emqtt:start_link([
- {clientid, <<"clientid">>},
- {username, <<"username">>}
- ]),
- {ok, _} = emqtt:connect(C2),
- ok = recv_connected(<<"clientid">>),
-
- {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_subscribed">>, qos1),
- _ = receive_publish(100),
- timer:sleep(50),
- {ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1),
- ok = recv_subscribed(<<"clientid">>),
- emqtt:unsubscribe(C1, <<"$event/client_subscribed">>),
- timer:sleep(50),
-
- {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1),
- {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1),
- _ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)),
- {ok, #{qos := QOS1, topic := Topic1}} = receive_publish(100),
- {ok, #{qos := QOS2, topic := Topic2}} = receive_publish(100),
- recv_message_publish_or_delivered(<<"clientid">>, QOS1, Topic1),
- recv_message_publish_or_delivered(<<"clientid">>, QOS2, Topic2),
- recv_message_acked(<<"clientid">>),
-
- {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1),
- ok = emqtt:publish(C2, <<"test_sub1">>, <<"test">>),
- recv_message_dropped(<<"clientid">>),
-
- {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1),
- _ = emqtt:unsubscribe(C2, <<"test_sub">>),
- ok = recv_unsubscribed(<<"clientid">>),
-
- {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_disconnected">>, qos1),
- ok = emqtt:disconnect(C2),
- ok = recv_disconnected(<<"clientid">>),
- ok = emqtt:disconnect(C1),
- ok = emqx_event_message:disable().
-
-t_reason(_) ->
- ?assertEqual(normal, emqx_event_message:reason(normal)),
- ?assertEqual(discarded, emqx_event_message:reason({shutdown, discarded})),
- ?assertEqual(tcp_error, emqx_event_message:reason({tcp_error, einval})),
- ?assertEqual(internal_error, emqx_event_message:reason(<<"unknown error">>)).
-
-recv_connected(ClientId) ->
- {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
- ?assertMatch(<<"$event/client_connected">>, Topic),
- ?assertMatch(
- #{
- <<"clientid">> := ClientId,
- <<"username">> := <<"username">>,
- <<"ipaddress">> := <<"127.0.0.1">>,
- <<"proto_name">> := <<"MQTT">>,
- <<"proto_ver">> := ?MQTT_PROTO_V4,
- <<"clean_start">> := true,
- <<"expiry_interval">> := 0,
- <<"keepalive">> := 60
- },
- emqx_json:decode(Payload, [return_maps])
- ).
-
-recv_subscribed(_ClientId) ->
- {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
- ?assertMatch(<<"$event/client_subscribed">>, Topic).
-
-recv_message_dropped(_ClientId) ->
- {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
- ?assertMatch(<<"$event/message_dropped">>, Topic).
-
-recv_message_publish_or_delivered(_ClientId, 0, Topic) ->
- ?assertMatch(<<"$event/message_delivered">>, Topic);
-recv_message_publish_or_delivered(_ClientId, 1, Topic) ->
- ?assertMatch(<<"test_sub">>, Topic).
-
-recv_message_acked(_ClientId) ->
- {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
- ?assertMatch(<<"$event/message_acked">>, Topic).
-
-recv_unsubscribed(_ClientId) ->
- {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
- ?assertMatch(<<"$event/client_unsubscribed">>, Topic).
-
-recv_disconnected(ClientId) ->
- {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
- ?assertMatch(<<"$event/client_disconnected">>, Topic),
- ?assertMatch(
- #{
- <<"clientid">> := ClientId,
- <<"username">> := <<"username">>,
- <<"reason">> := <<"normal">>
- },
- emqx_json:decode(Payload, [return_maps])
- ).
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-receive_publish(Timeout) ->
- receive
- {publish, Publish} ->
- {ok, Publish}
- after Timeout -> {error, timeout}
- end.