chore(ft): refactor async reply mechanism
This commit is contained in:
parent
e421c963dc
commit
b0d4a22aa8
|
@ -1252,6 +1252,11 @@ handle_info({disconnect, ReasonCode, ReasonName, Props}, Channel) ->
|
||||||
handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel);
|
handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel);
|
||||||
handle_info({puback, PacketId, PubRes, RC}, Channel) ->
|
handle_info({puback, PacketId, PubRes, RC}, Channel) ->
|
||||||
do_finish_publish(PacketId, PubRes, RC, Channel);
|
do_finish_publish(PacketId, PubRes, RC, Channel);
|
||||||
|
handle_info({'DOWN', Ref, process, Pid, Reason}, Channel) ->
|
||||||
|
case emqx_hooks:run_fold('client.monitored_process_down', [Ref, Pid, Reason], []) of
|
||||||
|
[] -> {ok, Channel};
|
||||||
|
Msgs -> {ok, Msgs, Channel}
|
||||||
|
end;
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
@ -1358,9 +1363,13 @@ handle_timeout(
|
||||||
{_, Quota2} ->
|
{_, Quota2} ->
|
||||||
{ok, clean_timer(quota_timer, Channel#channel{quota = Quota2})}
|
{ok, clean_timer(quota_timer, Channel#channel{quota = Quota2})}
|
||||||
end;
|
end;
|
||||||
handle_timeout(_TRef, Msg, Channel) ->
|
handle_timeout(TRef, Msg, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}),
|
case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of
|
||||||
{ok, Channel}.
|
[] ->
|
||||||
|
{ok, Channel};
|
||||||
|
Msgs ->
|
||||||
|
{ok, Msgs, Channel}
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Ensure timers
|
%% Ensure timers
|
||||||
|
|
|
@ -189,7 +189,7 @@ do_unregister_channel({_ClientId, ChanPid} = Chan) ->
|
||||||
true = ets:delete(?CHAN_CONN_TAB, Chan),
|
true = ets:delete(?CHAN_CONN_TAB, Chan),
|
||||||
true = ets:delete(?CHAN_INFO_TAB, Chan),
|
true = ets:delete(?CHAN_INFO_TAB, Chan),
|
||||||
ets:delete_object(?CHAN_TAB, Chan),
|
ets:delete_object(?CHAN_TAB, Chan),
|
||||||
ok = emqx_hooks:run('channel.unregistered', [ChanPid]),
|
ok = emqx_hooks:run('cm.channel.unregistered', [ChanPid]),
|
||||||
true.
|
true.
|
||||||
|
|
||||||
-spec connection_closed(emqx_types:clientid()) -> true.
|
-spec connection_closed(emqx_types:clientid()) -> true.
|
||||||
|
|
|
@ -28,7 +28,10 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
on_message_publish/1,
|
on_message_publish/1,
|
||||||
on_message_puback/4
|
on_message_puback/4,
|
||||||
|
on_client_timeout/3,
|
||||||
|
on_process_down/4,
|
||||||
|
on_channel_unregistered/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -36,8 +39,6 @@
|
||||||
encode_filemeta/1
|
encode_filemeta/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([on_complete/4]).
|
|
||||||
|
|
||||||
-export_type([
|
-export_type([
|
||||||
clientid/0,
|
clientid/0,
|
||||||
transfer/0,
|
transfer/0,
|
||||||
|
@ -85,17 +86,29 @@
|
||||||
checksum => checksum()
|
checksum => checksum()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-define(FT_EVENT(EVENT), {?MODULE, EVENT}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API for app
|
%% API for app
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
hook() ->
|
hook() ->
|
||||||
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
|
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
|
||||||
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).
|
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST),
|
||||||
|
ok = emqx_hooks:put('client.timeout', {?MODULE, on_client_timeout, []}, ?HP_LOWEST),
|
||||||
|
ok = emqx_hooks:put(
|
||||||
|
'client.monitored_process_down', {?MODULE, on_process_down, []}, ?HP_LOWEST
|
||||||
|
),
|
||||||
|
ok = emqx_hooks:put(
|
||||||
|
'cm.channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST
|
||||||
|
).
|
||||||
|
|
||||||
unhook() ->
|
unhook() ->
|
||||||
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||||
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
|
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}),
|
||||||
|
ok = emqx_hooks:del('client.timeout', {?MODULE, on_client_timeout}),
|
||||||
|
ok = emqx_hooks:del('client.monitored_process_down', {?MODULE, on_process_down}),
|
||||||
|
ok = emqx_hooks:del('cm.channel.unregistered', {?MODULE, on_channel_unregistered}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -145,6 +158,25 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
|
||||||
ignore
|
ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
on_channel_unregistered(ChannelPid) ->
|
||||||
|
ok = emqx_ft_async_reply:deregister_all(ChannelPid).
|
||||||
|
|
||||||
|
on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) ->
|
||||||
|
_ = erlang:demonitor(MRef, [flush]),
|
||||||
|
_ = emqx_ft_async_reply:take_by_mref(MRef),
|
||||||
|
{ok, [{outgoing, ?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)} | Acc]};
|
||||||
|
on_client_timeout(_TRef, _Event, Acc) ->
|
||||||
|
{ok, Acc}.
|
||||||
|
|
||||||
|
on_process_down(MRef, _Pid, Reason, Acc) ->
|
||||||
|
case emqx_ft_async_reply:take_by_mref(MRef) of
|
||||||
|
{ok, PacketId, TRef} ->
|
||||||
|
_ = emqx_utils:cancel_timer(TRef),
|
||||||
|
{ok, [{outgoing, ?PUBACK_PACKET(PacketId, reason_to_rc(Reason))} | Acc]};
|
||||||
|
not_found ->
|
||||||
|
{ok, Acc}
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handlers for transfer messages
|
%% Handlers for transfer messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -208,24 +240,13 @@ on_init(PacketId, Msg, Transfer, Meta) ->
|
||||||
transfer => Transfer,
|
transfer => Transfer,
|
||||||
filemeta => Meta
|
filemeta => Meta
|
||||||
}),
|
}),
|
||||||
PacketKey = {self(), PacketId},
|
%% Currently synchronous.
|
||||||
Callback = fun(Result) ->
|
%% If we want to make it async, we need to use `emqx_ft_async_reply`,
|
||||||
?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result)
|
%% like in `on_fin`.
|
||||||
end,
|
case store_filemeta(Transfer, Meta) of
|
||||||
with_responder(PacketKey, Callback, emqx_ft_conf:init_timeout(), fun() ->
|
ok -> ?RC_SUCCESS;
|
||||||
case store_filemeta(Transfer, Meta) of
|
{error, _} -> ?RC_UNSPECIFIED_ERROR
|
||||||
% Stored, ack through the responder right away
|
end.
|
||||||
ok ->
|
|
||||||
emqx_ft_responder:ack(PacketKey, ok);
|
|
||||||
% Storage operation started, packet will be acked by the responder
|
|
||||||
% {async, Pid} ->
|
|
||||||
% ok = emqx_ft_responder:kickoff(PacketKey, Pid),
|
|
||||||
% ok;
|
|
||||||
%% Storage operation failed, ack through the responder
|
|
||||||
{error, _} = Error ->
|
|
||||||
emqx_ft_responder:ack(PacketKey, Error)
|
|
||||||
end
|
|
||||||
end).
|
|
||||||
|
|
||||||
on_abort(_Msg, _FileId) ->
|
on_abort(_Msg, _FileId) ->
|
||||||
%% TODO
|
%% TODO
|
||||||
|
@ -240,21 +261,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
|
||||||
checksum => Checksum
|
checksum => Checksum
|
||||||
}),
|
}),
|
||||||
Segment = {Offset, Msg#message.payload},
|
Segment = {Offset, Msg#message.payload},
|
||||||
PacketKey = {self(), PacketId},
|
%% Currently synchronous.
|
||||||
Callback = fun(Result) ->
|
%% If we want to make it async, we need to use `emqx_ft_async_reply`,
|
||||||
?MODULE:on_complete("store_segment", PacketKey, Transfer, Result)
|
%% like in `on_fin`.
|
||||||
end,
|
case store_segment(Transfer, Segment) of
|
||||||
with_responder(PacketKey, Callback, emqx_ft_conf:store_segment_timeout(), fun() ->
|
ok -> ?RC_SUCCESS;
|
||||||
case store_segment(Transfer, Segment) of
|
{error, _} -> ?RC_UNSPECIFIED_ERROR
|
||||||
ok ->
|
end.
|
||||||
emqx_ft_responder:ack(PacketKey, ok);
|
|
||||||
% {async, Pid} ->
|
|
||||||
% ok = emqx_ft_responder:kickoff(PacketKey, Pid),
|
|
||||||
% ok;
|
|
||||||
{error, _} = Error ->
|
|
||||||
emqx_ft_responder:ack(PacketKey, Error)
|
|
||||||
end
|
|
||||||
end).
|
|
||||||
|
|
||||||
on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
|
on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
|
||||||
?tp(info, "file_transfer_fin", #{
|
?tp(info, "file_transfer_fin", #{
|
||||||
|
@ -265,37 +278,30 @@ on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
|
||||||
checksum => FinalChecksum
|
checksum => FinalChecksum
|
||||||
}),
|
}),
|
||||||
%% TODO: handle checksum? Do we need it?
|
%% TODO: handle checksum? Do we need it?
|
||||||
FinPacketKey = {self(), PacketId},
|
emqx_ft_async_reply:with_new_packet(
|
||||||
Callback = fun(Result) ->
|
PacketId,
|
||||||
?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
|
fun() ->
|
||||||
end,
|
case assemble(Transfer, FinalSize, FinalChecksum) of
|
||||||
with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
|
ok ->
|
||||||
case assemble(Transfer, FinalSize, FinalChecksum) of
|
?RC_SUCCESS;
|
||||||
%% Assembling completed, ack through the responder right away
|
%% Assembling started, packet will be acked by monitor or timeout
|
||||||
ok ->
|
{async, Pid} ->
|
||||||
emqx_ft_responder:ack(FinPacketKey, ok);
|
ok = register_async_reply(Pid, PacketId),
|
||||||
%% Assembling started, packet will be acked by the responder
|
ok = emqx_ft_storage:kickoff(Pid),
|
||||||
{async, Pid} ->
|
undefined;
|
||||||
ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),
|
{error, _} ->
|
||||||
ok;
|
?RC_UNSPECIFIED_ERROR
|
||||||
%% Assembling failed, ack through the responder
|
end
|
||||||
{error, _} = Error ->
|
end,
|
||||||
emqx_ft_responder:ack(FinPacketKey, Error)
|
undefined
|
||||||
end
|
).
|
||||||
end).
|
|
||||||
|
|
||||||
with_responder(Key, Callback, Timeout, CriticalSection) ->
|
register_async_reply(Pid, PacketId) ->
|
||||||
case emqx_ft_responder:start(Key, Callback, Timeout) of
|
MRef = erlang:monitor(process, Pid),
|
||||||
%% We have new packet
|
TRef = erlang:start_timer(
|
||||||
{ok, _} ->
|
emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, PacketId})
|
||||||
CriticalSection();
|
),
|
||||||
%% Packet already received.
|
ok = emqx_ft_async_reply:register(PacketId, MRef, TRef).
|
||||||
%% Since we are still handling the previous one,
|
|
||||||
%% we probably have retransmit here
|
|
||||||
{error, {already_started, _}} ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
undefined.
|
|
||||||
|
|
||||||
store_filemeta(Transfer, Segment) ->
|
store_filemeta(Transfer, Segment) ->
|
||||||
try
|
try
|
||||||
|
@ -335,28 +341,6 @@ transfer(Msg, FileId) ->
|
||||||
ClientId = Msg#message.from,
|
ClientId = Msg#message.from,
|
||||||
{clientid_to_binary(ClientId), FileId}.
|
{clientid_to_binary(ClientId), FileId}.
|
||||||
|
|
||||||
on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
|
|
||||||
?tp(debug, "on_complete", #{
|
|
||||||
operation => Op,
|
|
||||||
packet_id => PacketId,
|
|
||||||
transfer => Transfer
|
|
||||||
}),
|
|
||||||
case Result of
|
|
||||||
{Mode, ok} when Mode == ack orelse Mode == down ->
|
|
||||||
erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
|
|
||||||
{Mode, {error, _} = Reason} when Mode == ack orelse Mode == down ->
|
|
||||||
?tp(error, Op ++ "_failed", #{
|
|
||||||
transfer => Transfer,
|
|
||||||
reason => Reason
|
|
||||||
}),
|
|
||||||
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
|
|
||||||
timeout ->
|
|
||||||
?tp(error, Op ++ "_timed_out", #{
|
|
||||||
transfer => Transfer
|
|
||||||
}),
|
|
||||||
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
|
|
||||||
end.
|
|
||||||
|
|
||||||
validate(Validations, Fun) ->
|
validate(Validations, Fun) ->
|
||||||
case do_validate(Validations, []) of
|
case do_validate(Validations, []) of
|
||||||
{ok, Parsed} ->
|
{ok, Parsed} ->
|
||||||
|
@ -429,3 +413,20 @@ clientid_to_binary(A) when is_atom(A) ->
|
||||||
atom_to_binary(A);
|
atom_to_binary(A);
|
||||||
clientid_to_binary(B) when is_binary(B) ->
|
clientid_to_binary(B) when is_binary(B) ->
|
||||||
B.
|
B.
|
||||||
|
|
||||||
|
reason_to_rc(Reason) ->
|
||||||
|
case map_down_reason(Reason) of
|
||||||
|
ok -> ?RC_SUCCESS;
|
||||||
|
{error, _} -> ?RC_UNSPECIFIED_ERROR
|
||||||
|
end.
|
||||||
|
|
||||||
|
map_down_reason(normal) ->
|
||||||
|
ok;
|
||||||
|
map_down_reason(shutdown) ->
|
||||||
|
ok;
|
||||||
|
map_down_reason({shutdown, Result}) ->
|
||||||
|
Result;
|
||||||
|
map_down_reason(noproc) ->
|
||||||
|
{error, noproc};
|
||||||
|
map_down_reason(Error) ->
|
||||||
|
{error, {internal_error, Error}}.
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
|
|
||||||
start(_StartType, _StartArgs) ->
|
start(_StartType, _StartArgs) ->
|
||||||
{ok, Sup} = emqx_ft_sup:start_link(),
|
{ok, Sup} = emqx_ft_sup:start_link(),
|
||||||
|
ok = emqx_ft_async_reply:create_table(),
|
||||||
ok = emqx_ft_conf:load(),
|
ok = emqx_ft_conf:load(),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,106 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ft_async_reply).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/types.hrl").
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
|
-export([
|
||||||
|
create_tables/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
register/3,
|
||||||
|
take_by_mref/1,
|
||||||
|
with_new_packet/3,
|
||||||
|
deregister_all/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-type channel_pid() :: pid().
|
||||||
|
-type mon_ref() :: reference().
|
||||||
|
-type timer_ref() :: reference().
|
||||||
|
-type packet_id() :: emqx_types:packet_id().
|
||||||
|
|
||||||
|
%% packets waiting for async workers
|
||||||
|
|
||||||
|
-define(WORKER_TAB, emqx_ft_async_mons).
|
||||||
|
-define(WORKER_KEY(MRef), ?WORKER_KEY(self(), MRef)).
|
||||||
|
-define(WORKER_KEY(ChannelPid, MRef), {ChannelPid, MRef}).
|
||||||
|
|
||||||
|
%% async worker monitors by packet ids
|
||||||
|
|
||||||
|
-define(PACKET_TAB, emqx_ft_async_packets).
|
||||||
|
-define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)).
|
||||||
|
-define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec create_tables() -> ok.
|
||||||
|
create_tables() ->
|
||||||
|
_ = ets:new(?WORKER_TAB, [named_table, public, ordered_set]),
|
||||||
|
_ = ets:new(?PACKET_TAB, [named_table, public, ordered_set]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec register(packet_id(), mon_ref(), timer_ref()) -> ok.
|
||||||
|
register(PacketId, MRef, TRef) ->
|
||||||
|
_ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}),
|
||||||
|
_ = ets:insert(?WORKER_TAB, {?WORKER_KEY(MRef), PacketId, TRef}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any().
|
||||||
|
with_new_packet(PacketId, Fun, Default) ->
|
||||||
|
case ets:member(?PACKET_TAB, ?PACKET_KEY(PacketId)) of
|
||||||
|
true -> Default;
|
||||||
|
false -> Fun()
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found.
|
||||||
|
take_by_mref(MRef) ->
|
||||||
|
case ets:take(?WORKER_TAB, ?WORKER_KEY(MRef)) of
|
||||||
|
[{_, PacketId, TRef}] ->
|
||||||
|
_ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)),
|
||||||
|
{ok, PacketId, TRef};
|
||||||
|
[] ->
|
||||||
|
not_found
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec deregister_all(channel_pid()) -> ok.
|
||||||
|
deregister_all(ChannelPid) ->
|
||||||
|
ok = deregister_packets(ChannelPid),
|
||||||
|
ok = deregister_mons(ChannelPid),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec info() -> {non_neg_integer(), non_neg_integer()}.
|
||||||
|
info() ->
|
||||||
|
{ets:info(?MON_TAB, size), ets:info(?PACKET_TAB, size)}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal
|
||||||
|
%%-------------------------------------------------------------------
|
||||||
|
|
||||||
|
deregister_packets(ChannelPid) when is_pid(ChannelPid) ->
|
||||||
|
MS = [{{?PACKET_KEY(ChannelPid, '_'), '_'}, [], [true]}],
|
||||||
|
_ = ets:select_delete(?PACKET_TAB, MS),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
deregister_mons(ChannelPid) ->
|
||||||
|
MS = [{{?MON_KEY(ChannelPid, '_'), '_', '_'}, [], [true]}],
|
||||||
|
_ = ets:select_delete(?MON_TAB, MS),
|
||||||
|
ok.
|
|
@ -1,116 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_ft_responder).
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
-include_lib("emqx/include/types.hrl").
|
|
||||||
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start/3]).
|
|
||||||
-export([kickoff/2]).
|
|
||||||
-export([ack/2]).
|
|
||||||
|
|
||||||
%% Supervisor API
|
|
||||||
-export([start_link/3]).
|
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
|
||||||
|
|
||||||
-define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}).
|
|
||||||
|
|
||||||
-type key() :: term().
|
|
||||||
-type respfun() :: fun(({ack, _Result} | {down, _Result} | timeout) -> _SideEffect).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% API
|
|
||||||
%% -------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec start(key(), respfun(), timeout()) -> startlink_ret().
|
|
||||||
start(Key, RespFun, Timeout) ->
|
|
||||||
emqx_ft_responder_sup:start_child(Key, RespFun, Timeout).
|
|
||||||
|
|
||||||
-spec kickoff(key(), pid()) -> ok.
|
|
||||||
kickoff(Key, Pid) ->
|
|
||||||
gen_server:call(?REF(Key), {kickoff, Pid}).
|
|
||||||
|
|
||||||
-spec ack(key(), _Result) -> _Return.
|
|
||||||
ack(Key, Result) ->
|
|
||||||
% TODO: it's possible to avoid term copy
|
|
||||||
gen_server:call(?REF(Key), {ack, Result}, infinity).
|
|
||||||
|
|
||||||
-spec start_link(key(), timeout(), respfun()) -> startlink_ret().
|
|
||||||
start_link(Key, RespFun, Timeout) ->
|
|
||||||
gen_server:start_link(?REF(Key), ?MODULE, {Key, RespFun, Timeout}, []).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% gen_server callbacks
|
|
||||||
%% -------------------------------------------------------------------
|
|
||||||
|
|
||||||
init({Key, RespFun, Timeout}) ->
|
|
||||||
_ = erlang:process_flag(trap_exit, true),
|
|
||||||
_TRef = erlang:send_after(Timeout, self(), timeout),
|
|
||||||
{ok, {Key, RespFun}}.
|
|
||||||
|
|
||||||
handle_call({kickoff, Pid}, _From, St) ->
|
|
||||||
% TODO: more state?
|
|
||||||
_MRef = erlang:monitor(process, Pid),
|
|
||||||
_ = Pid ! kickoff,
|
|
||||||
{reply, ok, St};
|
|
||||||
handle_call({ack, Result}, _From, {Key, RespFun}) ->
|
|
||||||
Ret = apply(RespFun, [{ack, Result}]),
|
|
||||||
?tp(debug, ft_responder_ack, #{key => Key, result => Result, return => Ret}),
|
|
||||||
{stop, {shutdown, Ret}, Ret, undefined};
|
|
||||||
handle_call(Msg, _From, State) ->
|
|
||||||
?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}),
|
|
||||||
{reply, {error, unknown_call}, State}.
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
|
||||||
?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info(timeout, {Key, RespFun}) ->
|
|
||||||
Ret = apply(RespFun, [timeout]),
|
|
||||||
?tp(debug, ft_responder_timeout, #{key => Key, return => Ret}),
|
|
||||||
{stop, {shutdown, Ret}, undefined};
|
|
||||||
handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) ->
|
|
||||||
Ret = apply(RespFun, [{down, map_down_reason(Reason)}]),
|
|
||||||
?tp(debug, ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}),
|
|
||||||
{stop, {shutdown, Ret}, undefined};
|
|
||||||
handle_info(Msg, State) ->
|
|
||||||
?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, undefined) ->
|
|
||||||
ok;
|
|
||||||
terminate(Reason, {Key, RespFun}) ->
|
|
||||||
Ret = apply(RespFun, [timeout]),
|
|
||||||
?tp(debug, ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
map_down_reason(normal) ->
|
|
||||||
ok;
|
|
||||||
map_down_reason(shutdown) ->
|
|
||||||
ok;
|
|
||||||
map_down_reason({shutdown, Result}) ->
|
|
||||||
Result;
|
|
||||||
map_down_reason(noproc) ->
|
|
||||||
{error, noproc};
|
|
||||||
map_down_reason(Error) ->
|
|
||||||
{error, {internal_error, Error}}.
|
|
|
@ -1,48 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_ft_responder_sup).
|
|
||||||
|
|
||||||
-export([start_link/0]).
|
|
||||||
-export([start_child/3]).
|
|
||||||
|
|
||||||
-behaviour(supervisor).
|
|
||||||
-export([init/1]).
|
|
||||||
|
|
||||||
-define(SUPERVISOR, ?MODULE).
|
|
||||||
|
|
||||||
%%
|
|
||||||
|
|
||||||
-spec start_link() -> {ok, pid()}.
|
|
||||||
start_link() ->
|
|
||||||
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
|
|
||||||
|
|
||||||
start_child(Key, RespFun, Timeout) ->
|
|
||||||
supervisor:start_child(?SUPERVISOR, [Key, RespFun, Timeout]).
|
|
||||||
|
|
||||||
-spec init(_) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
|
|
||||||
init(_) ->
|
|
||||||
Flags = #{
|
|
||||||
strategy => simple_one_for_one,
|
|
||||||
intensity => 100,
|
|
||||||
period => 100
|
|
||||||
},
|
|
||||||
ChildSpec = #{
|
|
||||||
id => responder,
|
|
||||||
start => {emqx_ft_responder, start_link, []},
|
|
||||||
restart => temporary
|
|
||||||
},
|
|
||||||
{ok, {Flags, [ChildSpec]}}.
|
|
|
@ -23,6 +23,7 @@
|
||||||
store_filemeta/2,
|
store_filemeta/2,
|
||||||
store_segment/2,
|
store_segment/2,
|
||||||
assemble/3,
|
assemble/3,
|
||||||
|
kickoff/1,
|
||||||
|
|
||||||
files/0,
|
files/0,
|
||||||
files/1,
|
files/1,
|
||||||
|
@ -121,6 +122,13 @@ store_segment(Transfer, Segment) ->
|
||||||
assemble(Transfer, Size, FinOpts) ->
|
assemble(Transfer, Size, FinOpts) ->
|
||||||
dispatch(assemble, [Transfer, Size, FinOpts]).
|
dispatch(assemble, [Transfer, Size, FinOpts]).
|
||||||
|
|
||||||
|
-spec kickoff(pid()) -> ok.
|
||||||
|
kickoff(Pid) ->
|
||||||
|
_ = erlang:send(Pid, kickoff),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
-spec files() ->
|
-spec files() ->
|
||||||
{ok, page(file_info(), _)} | {error, term()}.
|
{ok, page(file_info(), _)} | {error, term()}.
|
||||||
files() ->
|
files() ->
|
||||||
|
|
|
@ -52,14 +52,5 @@ init([]) ->
|
||||||
modules => [emqx_ft_storage_fs_reader_sup]
|
modules => [emqx_ft_storage_fs_reader_sup]
|
||||||
},
|
},
|
||||||
|
|
||||||
Responder = #{
|
ChildSpecs = [AssemblerSup, FileReaderSup],
|
||||||
id => emqx_ft_responder_sup,
|
|
||||||
start => {emqx_ft_responder_sup, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => infinity,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_ft_responder_sup]
|
|
||||||
},
|
|
||||||
|
|
||||||
ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
|
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
|
@ -37,7 +37,7 @@ all() ->
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
[
|
[
|
||||||
{single_node, [parallel], [
|
{single_node, [], [
|
||||||
t_assemble_crash,
|
t_assemble_crash,
|
||||||
t_corrupted_segment_retry,
|
t_corrupted_segment_retry,
|
||||||
t_invalid_checksum,
|
t_invalid_checksum,
|
||||||
|
|
|
@ -1,84 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_ft_responder_SUITE).
|
|
||||||
|
|
||||||
-compile(export_all).
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
|
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
init_per_testcase(_Case, Config) ->
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_testcase(_Case, _Config) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_start_ack(_Config) ->
|
|
||||||
Key = <<"test">>,
|
|
||||||
DefaultAction = fun({ack, Ref}) -> Ref end,
|
|
||||||
?assertMatch(
|
|
||||||
{ok, _Pid},
|
|
||||||
emqx_ft_responder:start(Key, DefaultAction, 1000)
|
|
||||||
),
|
|
||||||
?assertMatch(
|
|
||||||
{error, {already_started, _Pid}},
|
|
||||||
emqx_ft_responder:start(Key, DefaultAction, 1000)
|
|
||||||
),
|
|
||||||
Ref = make_ref(),
|
|
||||||
?assertEqual(
|
|
||||||
Ref,
|
|
||||||
emqx_ft_responder:ack(Key, Ref)
|
|
||||||
),
|
|
||||||
?assertExit(
|
|
||||||
{noproc, _},
|
|
||||||
emqx_ft_responder:ack(Key, Ref)
|
|
||||||
).
|
|
||||||
|
|
||||||
t_timeout(_Config) ->
|
|
||||||
Key = <<"test">>,
|
|
||||||
Self = self(),
|
|
||||||
DefaultAction = fun(timeout) -> Self ! {timeout, Key} end,
|
|
||||||
{ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20),
|
|
||||||
receive
|
|
||||||
{timeout, Key} ->
|
|
||||||
ok
|
|
||||||
after 100 ->
|
|
||||||
ct:fail("emqx_ft_responder not called")
|
|
||||||
end,
|
|
||||||
?assertExit(
|
|
||||||
{noproc, _},
|
|
||||||
emqx_ft_responder:ack(Key, oops)
|
|
||||||
).
|
|
||||||
|
|
||||||
t_unknown_msgs(_Config) ->
|
|
||||||
{ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_) -> ok end, 100),
|
|
||||||
Pid ! {unknown_msg, <<"test">>},
|
|
||||||
ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}),
|
|
||||||
?assertEqual(
|
|
||||||
{error, unknown_call},
|
|
||||||
gen_server:call(Pid, {unknown_call, <<"test">>})
|
|
||||||
).
|
|
Loading…
Reference in New Issue