Merge pull request #12514 from savonarola/0214-fix-ft-responses
fix(ft): report ft assemble status from a dedicated process
This commit is contained in:
commit
c42583550d
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_ft, [
|
{application, emqx_ft, [
|
||||||
{description, "EMQX file transfer over MQTT"},
|
{description, "EMQX file transfer over MQTT"},
|
||||||
{vsn, "0.1.14"},
|
{vsn, "0.1.15"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_ft_app, []}},
|
{mod, {emqx_ft_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -30,10 +30,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
on_message_publish/1,
|
on_message_publish/1,
|
||||||
on_message_puback/4,
|
on_message_puback/4
|
||||||
on_client_timeout/3,
|
|
||||||
on_process_down/4,
|
|
||||||
on_channel_unregistered/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -88,8 +85,6 @@
|
||||||
checksum => checksum()
|
checksum => checksum()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-define(FT_EVENT(EVENT), {?MODULE, EVENT}).
|
|
||||||
|
|
||||||
-define(ACK_AND_PUBLISH(Result), {true, Result}).
|
-define(ACK_AND_PUBLISH(Result), {true, Result}).
|
||||||
-define(ACK(Result), {false, Result}).
|
-define(ACK(Result), {false, Result}).
|
||||||
-define(DELAY_ACK, delay).
|
-define(DELAY_ACK, delay).
|
||||||
|
@ -100,21 +95,11 @@
|
||||||
|
|
||||||
hook() ->
|
hook() ->
|
||||||
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
|
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
|
||||||
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST),
|
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).
|
||||||
ok = emqx_hooks:put('client.timeout', {?MODULE, on_client_timeout, []}, ?HP_LOWEST),
|
|
||||||
ok = emqx_hooks:put(
|
|
||||||
'client.monitored_process_down', {?MODULE, on_process_down, []}, ?HP_LOWEST
|
|
||||||
),
|
|
||||||
ok = emqx_hooks:put(
|
|
||||||
'cm.channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST
|
|
||||||
).
|
|
||||||
|
|
||||||
unhook() ->
|
unhook() ->
|
||||||
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||||
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}),
|
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
|
||||||
ok = emqx_hooks:del('client.timeout', {?MODULE, on_client_timeout}),
|
|
||||||
ok = emqx_hooks:del('client.monitored_process_down', {?MODULE, on_process_down}),
|
|
||||||
ok = emqx_hooks:del('cm.channel.unregistered', {?MODULE, on_channel_unregistered}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -152,40 +137,6 @@ on_message_puback(PacketId, #message{from = From, topic = Topic} = Msg, _PubRes,
|
||||||
ignore
|
ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_channel_unregistered(ChannelPid) ->
|
|
||||||
ok = emqx_ft_async_reply:deregister_all(ChannelPid).
|
|
||||||
|
|
||||||
on_client_timeout(_TRef0, ?FT_EVENT({MRef, TopicReplyData}), Acc) ->
|
|
||||||
_ = erlang:demonitor(MRef, [flush]),
|
|
||||||
Result = {error, timeout},
|
|
||||||
_ = publish_response(Result, TopicReplyData),
|
|
||||||
case emqx_ft_async_reply:take_by_mref(MRef) of
|
|
||||||
{ok, undefined, _TRef1, _TopicReplyData} ->
|
|
||||||
{stop, Acc};
|
|
||||||
{ok, PacketId, _TRef1, _TopicReplyData} ->
|
|
||||||
{stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]};
|
|
||||||
not_found ->
|
|
||||||
{ok, Acc}
|
|
||||||
end;
|
|
||||||
on_client_timeout(_TRef, _Event, Acc) ->
|
|
||||||
{ok, Acc}.
|
|
||||||
|
|
||||||
on_process_down(MRef, _Pid, DownReason, Acc) ->
|
|
||||||
case emqx_ft_async_reply:take_by_mref(MRef) of
|
|
||||||
{ok, PacketId, TRef, TopicReplyData} ->
|
|
||||||
_ = emqx_utils:cancel_timer(TRef),
|
|
||||||
Result = down_reason_to_result(DownReason),
|
|
||||||
_ = publish_response(Result, TopicReplyData),
|
|
||||||
case PacketId of
|
|
||||||
undefined ->
|
|
||||||
{stop, Acc};
|
|
||||||
_ ->
|
|
||||||
{stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]}
|
|
||||||
end;
|
|
||||||
not_found ->
|
|
||||||
{ok, Acc}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handlers for transfer messages
|
%% Handlers for transfer messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -253,7 +204,7 @@ on_init(#{packet_id := PacketId}, Msg, Transfer, Meta) ->
|
||||||
filemeta => Meta
|
filemeta => Meta
|
||||||
}),
|
}),
|
||||||
%% Currently synchronous.
|
%% Currently synchronous.
|
||||||
%% If we want to make it async, we need to use `emqx_ft_async_reply`,
|
%% If we want to make it async, we need to use `with_responder`,
|
||||||
%% like in `on_fin`.
|
%% like in `on_fin`.
|
||||||
?ACK_AND_PUBLISH(store_filemeta(Transfer, Meta)).
|
?ACK_AND_PUBLISH(store_filemeta(Transfer, Meta)).
|
||||||
|
|
||||||
|
@ -271,11 +222,13 @@ on_segment(#{packet_id := PacketId}, Msg, Transfer, Offset, Checksum) ->
|
||||||
}),
|
}),
|
||||||
Segment = {Offset, Msg#message.payload},
|
Segment = {Offset, Msg#message.payload},
|
||||||
%% Currently synchronous.
|
%% Currently synchronous.
|
||||||
%% If we want to make it async, we need to use `emqx_ft_async_reply`,
|
%% If we want to make it async, we need to use `with_responder`,
|
||||||
%% like in `on_fin`.
|
%% like in `on_fin`.
|
||||||
?ACK_AND_PUBLISH(store_segment(Transfer, Segment)).
|
?ACK_AND_PUBLISH(store_segment(Transfer, Segment)).
|
||||||
|
|
||||||
on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum) ->
|
on_fin(
|
||||||
|
#{packet_id := PacketId, mode := Mode} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum
|
||||||
|
) ->
|
||||||
?tp(info, "file_transfer_fin", #{
|
?tp(info, "file_transfer_fin", #{
|
||||||
mqtt_msg => Msg,
|
mqtt_msg => Msg,
|
||||||
packet_id => PacketId,
|
packet_id => PacketId,
|
||||||
|
@ -283,39 +236,78 @@ on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, Fina
|
||||||
final_size => FinalSize,
|
final_size => FinalSize,
|
||||||
checksum => FinalChecksum
|
checksum => FinalChecksum
|
||||||
}),
|
}),
|
||||||
%% TODO: handle checksum? Do we need it?
|
FinPacketKey = {self(), PacketId},
|
||||||
with_new_packet(
|
Callback = fun(Result) ->
|
||||||
TopicReplyData,
|
on_complete("assemble", TopicReplyData, FinPacketKey, Transfer, Result)
|
||||||
PacketId,
|
end,
|
||||||
fun() ->
|
with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
|
||||||
case assemble(Transfer, FinalSize, FinalChecksum) of
|
case assemble(Transfer, FinalSize, FinalChecksum) of
|
||||||
ok ->
|
%% Assembling completed, ack through the responder right away
|
||||||
?ACK_AND_PUBLISH(ok);
|
ok ->
|
||||||
%% Assembling started, packet will be acked/replied by monitor or timeout
|
emqx_ft_responder:ack(FinPacketKey, ok),
|
||||||
{async, Pid} ->
|
?DELAY_ACK;
|
||||||
register_async_worker(Pid, TopicReplyData);
|
%% Assembling started, packet will be acked by the responder
|
||||||
{error, _} = Error ->
|
{async, Pid} ->
|
||||||
?ACK_AND_PUBLISH(Error)
|
ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),
|
||||||
end
|
ack_if_async(Mode);
|
||||||
|
%% Assembling failed, ack through the responder
|
||||||
|
{error, _} = Error ->
|
||||||
|
emqx_ft_responder:ack(FinPacketKey, Error),
|
||||||
|
?DELAY_ACK
|
||||||
end
|
end
|
||||||
).
|
end).
|
||||||
|
|
||||||
register_async_worker(Pid, #{mode := Mode, packet_id := PacketId} = TopicReplyData) ->
|
with_responder(Key, Callback, Timeout, CriticalSection) ->
|
||||||
MRef = erlang:monitor(process, Pid),
|
case emqx_ft_responder:start(Key, Callback, Timeout) of
|
||||||
TRef = erlang:start_timer(
|
%% We have new packet
|
||||||
emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, TopicReplyData})
|
{ok, _} ->
|
||||||
),
|
CriticalSection();
|
||||||
case Mode of
|
%% Packet already received.
|
||||||
async ->
|
%% Since we are still handling the previous one,
|
||||||
ok = emqx_ft_async_reply:register(MRef, TRef, TopicReplyData),
|
%% we probably have retransmit here
|
||||||
ok = emqx_ft_storage:kickoff(Pid),
|
{error, {already_started, _}} ->
|
||||||
?ACK(ok);
|
|
||||||
sync ->
|
|
||||||
ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, TopicReplyData),
|
|
||||||
ok = emqx_ft_storage:kickoff(Pid),
|
|
||||||
?DELAY_ACK
|
?DELAY_ACK
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
ack_if_async(sync) ->
|
||||||
|
?DELAY_ACK;
|
||||||
|
ack_if_async(async) ->
|
||||||
|
?ACK(ok).
|
||||||
|
|
||||||
|
on_complete(Op, #{mode := Mode} = TopicReplyData, {ChanPid, PacketId}, Transfer, ResponderResult) ->
|
||||||
|
?tp(debug, "on_complete", #{
|
||||||
|
operation => Op,
|
||||||
|
packet_id => PacketId,
|
||||||
|
transfer => Transfer
|
||||||
|
}),
|
||||||
|
Result =
|
||||||
|
case ResponderResult of
|
||||||
|
{RespMode, ok} when RespMode == ack orelse RespMode == down ->
|
||||||
|
ok;
|
||||||
|
{RespMode, {error, _} = Reason} when RespMode == ack orelse RespMode == down ->
|
||||||
|
?tp(error, Op ++ "_failed", #{
|
||||||
|
transfer => Transfer,
|
||||||
|
reason => Reason
|
||||||
|
}),
|
||||||
|
Reason;
|
||||||
|
timeout ->
|
||||||
|
?tp(error, Op ++ "_timed_out", #{
|
||||||
|
transfer => Transfer
|
||||||
|
}),
|
||||||
|
{error, timeout}
|
||||||
|
end,
|
||||||
|
NeedAck =
|
||||||
|
case {ResponderResult, Mode} of
|
||||||
|
{{down, _}, async} -> false;
|
||||||
|
{timeout, async} -> false;
|
||||||
|
_ -> true
|
||||||
|
end,
|
||||||
|
NeedAck andalso ack_packet(ChanPid, PacketId, Result),
|
||||||
|
_ = publish_response(Result, TopicReplyData).
|
||||||
|
|
||||||
|
ack_packet(ChanPid, PacketId, Result) ->
|
||||||
|
erlang:send(ChanPid, {puback, PacketId, [], result_to_rc(Result)}).
|
||||||
|
|
||||||
topic_reply_data(Mode, From, PacketId, #message{topic = Topic, headers = Headers}) ->
|
topic_reply_data(Mode, From, PacketId, #message{topic = Topic, headers = Headers}) ->
|
||||||
Props = maps:get(properties, Headers, #{}),
|
Props = maps:get(properties, Headers, #{}),
|
||||||
#{
|
#{
|
||||||
|
@ -483,19 +475,3 @@ clientid_to_binary(A) when is_atom(A) ->
|
||||||
atom_to_binary(A);
|
atom_to_binary(A);
|
||||||
clientid_to_binary(B) when is_binary(B) ->
|
clientid_to_binary(B) when is_binary(B) ->
|
||||||
B.
|
B.
|
||||||
|
|
||||||
down_reason_to_result(normal) ->
|
|
||||||
ok;
|
|
||||||
down_reason_to_result(shutdown) ->
|
|
||||||
ok;
|
|
||||||
down_reason_to_result({shutdown, Result}) ->
|
|
||||||
Result;
|
|
||||||
down_reason_to_result(noproc) ->
|
|
||||||
{error, noproc};
|
|
||||||
down_reason_to_result(Error) ->
|
|
||||||
{error, {internal_error, Error}}.
|
|
||||||
|
|
||||||
with_new_packet(#{mode := async}, _PacketId, Fun) ->
|
|
||||||
Fun();
|
|
||||||
with_new_packet(#{mode := sync}, PacketId, Fun) ->
|
|
||||||
emqx_ft_async_reply:with_new_packet(PacketId, Fun, undefined).
|
|
||||||
|
|
|
@ -1,122 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_ft_async_reply).
|
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
-include_lib("emqx/include/types.hrl").
|
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
|
||||||
|
|
||||||
-export([
|
|
||||||
create_tables/0,
|
|
||||||
info/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
register/3,
|
|
||||||
register/4,
|
|
||||||
take_by_mref/1,
|
|
||||||
with_new_packet/3,
|
|
||||||
deregister_all/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-type channel_pid() :: pid().
|
|
||||||
-type mon_ref() :: reference().
|
|
||||||
-type timer_ref() :: reference().
|
|
||||||
-type packet_id() :: emqx_types:packet_id().
|
|
||||||
|
|
||||||
%% packets waiting for async workers
|
|
||||||
|
|
||||||
-define(MON_TAB, emqx_ft_async_mons).
|
|
||||||
-define(MON_KEY(MRef), ?MON_KEY(self(), MRef)).
|
|
||||||
-define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}).
|
|
||||||
-define(MON_RECORD(KEY, PACKET_ID, TREF, DATA), {KEY, PACKET_ID, TREF, DATA}).
|
|
||||||
|
|
||||||
%% async worker monitors by packet ids
|
|
||||||
|
|
||||||
-define(PACKET_TAB, emqx_ft_async_packets).
|
|
||||||
-define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)).
|
|
||||||
-define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}).
|
|
||||||
-define(PACKET_RECORD(KEY, MREF, DATA), {KEY, MREF, DATA}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% API
|
|
||||||
%% -------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec create_tables() -> ok.
|
|
||||||
create_tables() ->
|
|
||||||
EtsOptions = [
|
|
||||||
named_table,
|
|
||||||
public,
|
|
||||||
ordered_set,
|
|
||||||
{read_concurrency, true},
|
|
||||||
{write_concurrency, true}
|
|
||||||
],
|
|
||||||
ok = emqx_utils_ets:new(?MON_TAB, EtsOptions),
|
|
||||||
ok = emqx_utils_ets:new(?PACKET_TAB, EtsOptions),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
-spec register(packet_id(), mon_ref(), timer_ref(), term()) -> ok.
|
|
||||||
register(PacketId, MRef, TRef, Data) ->
|
|
||||||
_ = ets:insert(?PACKET_TAB, ?PACKET_RECORD(?PACKET_KEY(PacketId), MRef, Data)),
|
|
||||||
_ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), PacketId, TRef, Data)),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
-spec register(mon_ref(), timer_ref(), term()) -> ok.
|
|
||||||
register(MRef, TRef, Data) ->
|
|
||||||
_ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), undefined, TRef, Data)),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
-spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any().
|
|
||||||
with_new_packet(PacketId, Fun, Default) ->
|
|
||||||
case ets:member(?PACKET_TAB, ?PACKET_KEY(PacketId)) of
|
|
||||||
true -> Default;
|
|
||||||
false -> Fun()
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec take_by_mref(mon_ref()) -> {ok, packet_id() | undefined, timer_ref(), term()} | not_found.
|
|
||||||
take_by_mref(MRef) ->
|
|
||||||
case ets:take(?MON_TAB, ?MON_KEY(MRef)) of
|
|
||||||
[?MON_RECORD(_, PacketId, TRef, Data)] ->
|
|
||||||
PacketId =/= undefined andalso ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)),
|
|
||||||
{ok, PacketId, TRef, Data};
|
|
||||||
[] ->
|
|
||||||
not_found
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec deregister_all(channel_pid()) -> ok.
|
|
||||||
deregister_all(ChannelPid) ->
|
|
||||||
ok = deregister_packets(ChannelPid),
|
|
||||||
ok = deregister_mons(ChannelPid),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
-spec info() -> {non_neg_integer(), non_neg_integer()}.
|
|
||||||
info() ->
|
|
||||||
{ets:info(?MON_TAB, size), ets:info(?PACKET_TAB, size)}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal
|
|
||||||
%%-------------------------------------------------------------------
|
|
||||||
|
|
||||||
deregister_packets(ChannelPid) when is_pid(ChannelPid) ->
|
|
||||||
MS = [{?PACKET_RECORD(?PACKET_KEY(ChannelPid, '_'), '_', '_'), [], [true]}],
|
|
||||||
_ = ets:select_delete(?PACKET_TAB, MS),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
deregister_mons(ChannelPid) ->
|
|
||||||
MS = [{?MON_RECORD(?MON_KEY(ChannelPid, '_'), '_', '_', '_'), [], [true]}],
|
|
||||||
_ = ets:select_delete(?MON_TAB, MS),
|
|
||||||
ok.
|
|
|
@ -0,0 +1,116 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ft_responder).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/types.hrl").
|
||||||
|
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start/3]).
|
||||||
|
-export([kickoff/2]).
|
||||||
|
-export([ack/2]).
|
||||||
|
|
||||||
|
%% Supervisor API
|
||||||
|
-export([start_link/3]).
|
||||||
|
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||||
|
|
||||||
|
-define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}).
|
||||||
|
|
||||||
|
-type key() :: term().
|
||||||
|
-type respfun() :: fun(({ack, _Result} | {down, _Result} | timeout) -> _SideEffect).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec start(key(), respfun(), timeout()) -> startlink_ret().
|
||||||
|
start(Key, RespFun, Timeout) ->
|
||||||
|
emqx_ft_responder_sup:start_child(Key, RespFun, Timeout).
|
||||||
|
|
||||||
|
-spec kickoff(key(), pid()) -> ok.
|
||||||
|
kickoff(Key, Pid) ->
|
||||||
|
gen_server:call(?REF(Key), {kickoff, Pid}).
|
||||||
|
|
||||||
|
-spec ack(key(), _Result) -> _Return.
|
||||||
|
ack(Key, Result) ->
|
||||||
|
% TODO: it's possible to avoid term copy
|
||||||
|
gen_server:call(?REF(Key), {ack, Result}, infinity).
|
||||||
|
|
||||||
|
-spec start_link(key(), timeout(), respfun()) -> startlink_ret().
|
||||||
|
start_link(Key, RespFun, Timeout) ->
|
||||||
|
gen_server:start_link(?REF(Key), ?MODULE, {Key, RespFun, Timeout}, []).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% gen_server callbacks
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
init({Key, RespFun, Timeout}) ->
|
||||||
|
_ = erlang:process_flag(trap_exit, true),
|
||||||
|
_TRef = erlang:send_after(Timeout, self(), timeout),
|
||||||
|
{ok, {Key, RespFun}}.
|
||||||
|
|
||||||
|
handle_call({kickoff, Pid}, _From, St) ->
|
||||||
|
% TODO: more state?
|
||||||
|
_MRef = erlang:monitor(process, Pid),
|
||||||
|
_ = Pid ! kickoff,
|
||||||
|
{reply, ok, St};
|
||||||
|
handle_call({ack, Result}, _From, {Key, RespFun}) ->
|
||||||
|
Ret = apply(RespFun, [{ack, Result}]),
|
||||||
|
?tp(debug, ft_responder_ack, #{key => Key, result => Result, return => Ret}),
|
||||||
|
{stop, {shutdown, Ret}, Ret, undefined};
|
||||||
|
handle_call(Msg, _From, State) ->
|
||||||
|
?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}),
|
||||||
|
{reply, {error, unknown_call}, State}.
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(timeout, {Key, RespFun}) ->
|
||||||
|
Ret = apply(RespFun, [timeout]),
|
||||||
|
?tp(debug, ft_responder_timeout, #{key => Key, return => Ret}),
|
||||||
|
{stop, {shutdown, Ret}, undefined};
|
||||||
|
handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) ->
|
||||||
|
Ret = apply(RespFun, [{down, map_down_reason(Reason)}]),
|
||||||
|
?tp(debug, ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}),
|
||||||
|
{stop, {shutdown, Ret}, undefined};
|
||||||
|
handle_info(Msg, State) ->
|
||||||
|
?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, undefined) ->
|
||||||
|
ok;
|
||||||
|
terminate(Reason, {Key, RespFun}) ->
|
||||||
|
Ret = apply(RespFun, [timeout]),
|
||||||
|
?tp(debug, ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
map_down_reason(normal) ->
|
||||||
|
ok;
|
||||||
|
map_down_reason(shutdown) ->
|
||||||
|
ok;
|
||||||
|
map_down_reason({shutdown, Result}) ->
|
||||||
|
Result;
|
||||||
|
map_down_reason(noproc) ->
|
||||||
|
{error, noproc};
|
||||||
|
map_down_reason(Error) ->
|
||||||
|
{error, {internal_error, Error}}.
|
|
@ -0,0 +1,48 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ft_responder_sup).
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
-export([start_child/3]).
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
-define(SUPERVISOR, ?MODULE).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-spec start_link() -> {ok, pid()}.
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
|
||||||
|
|
||||||
|
start_child(Key, RespFun, Timeout) ->
|
||||||
|
supervisor:start_child(?SUPERVISOR, [Key, RespFun, Timeout]).
|
||||||
|
|
||||||
|
-spec init(_) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
|
||||||
|
init(_) ->
|
||||||
|
Flags = #{
|
||||||
|
strategy => simple_one_for_one,
|
||||||
|
intensity => 100,
|
||||||
|
period => 100
|
||||||
|
},
|
||||||
|
ChildSpec = #{
|
||||||
|
id => responder,
|
||||||
|
start => {emqx_ft_responder, start_link, []},
|
||||||
|
restart => temporary
|
||||||
|
},
|
||||||
|
{ok, {Flags, [ChildSpec]}}.
|
|
@ -28,8 +28,6 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ok = emqx_ft_async_reply:create_tables(),
|
|
||||||
|
|
||||||
SupFlags = #{
|
SupFlags = #{
|
||||||
strategy => one_for_one,
|
strategy => one_for_one,
|
||||||
intensity => 100,
|
intensity => 100,
|
||||||
|
@ -54,5 +52,14 @@ init([]) ->
|
||||||
modules => [emqx_ft_storage_fs_reader_sup]
|
modules => [emqx_ft_storage_fs_reader_sup]
|
||||||
},
|
},
|
||||||
|
|
||||||
ChildSpecs = [AssemblerSup, FileReaderSup],
|
ResponderSup = #{
|
||||||
|
id => emqx_ft_responder_sup,
|
||||||
|
start => {emqx_ft_responder_sup, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => infinity,
|
||||||
|
type => supervisor,
|
||||||
|
modules => [emqx_ft_responder_sup]
|
||||||
|
},
|
||||||
|
|
||||||
|
ChildSpecs = [ResponderSup, AssemblerSup, FileReaderSup],
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
|
@ -55,7 +55,8 @@ groups() ->
|
||||||
t_assemble_timeout
|
t_assemble_timeout
|
||||||
]},
|
]},
|
||||||
{async_mode, [], [
|
{async_mode, [], [
|
||||||
{group, single_node}
|
{group, single_node},
|
||||||
|
t_client_disconnect_while_assembling
|
||||||
]},
|
]},
|
||||||
{sync_mode, [], [
|
{sync_mode, [], [
|
||||||
{group, single_node}
|
{group, single_node}
|
||||||
|
@ -116,9 +117,9 @@ init_per_group(Group = cluster, Config) ->
|
||||||
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
|
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
|
||||||
[{group, Group}, {cluster_nodes, Nodes} | Config];
|
[{group, Group}, {cluster_nodes, Nodes} | Config];
|
||||||
init_per_group(_Group = async_mode, Config) ->
|
init_per_group(_Group = async_mode, Config) ->
|
||||||
[{mode, sync} | Config];
|
|
||||||
init_per_group(_Group = sync_mode, Config) ->
|
|
||||||
[{mode, async} | Config];
|
[{mode, async} | Config];
|
||||||
|
init_per_group(_Group = sync_mode, Config) ->
|
||||||
|
[{mode, sync} | Config];
|
||||||
init_per_group(Group, Config) ->
|
init_per_group(Group, Config) ->
|
||||||
[{group, Group} | Config].
|
[{group, Group} | Config].
|
||||||
|
|
||||||
|
@ -529,6 +530,62 @@ t_assemble_timeout(Config) ->
|
||||||
|
|
||||||
?assert(2_000_000 < Time).
|
?assert(2_000_000 < Time).
|
||||||
|
|
||||||
|
t_client_disconnect_while_assembling(Config) ->
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
|
||||||
|
Filename = "topsecret.pdf",
|
||||||
|
FileId = <<"f1">>,
|
||||||
|
|
||||||
|
Data = <<"data">>,
|
||||||
|
|
||||||
|
Meta = #{size := Filesize} = meta(Filename, Data),
|
||||||
|
|
||||||
|
?assertRCName(
|
||||||
|
success,
|
||||||
|
emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertRCName(
|
||||||
|
success,
|
||||||
|
emqtt:publish(Client, mk_segment_topic(Config, FileId, 0), Data, 1)
|
||||||
|
),
|
||||||
|
|
||||||
|
ResponseTopic = emqx_ft_test_helpers:response_topic(ClientId),
|
||||||
|
|
||||||
|
{ok, OtherClient} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"other">>}]),
|
||||||
|
{ok, _} = emqtt:connect(OtherClient),
|
||||||
|
{ok, _, _} = emqtt:subscribe(OtherClient, ResponseTopic, 1),
|
||||||
|
|
||||||
|
FinTopic = mk_fin_topic(Config, FileId, Filesize),
|
||||||
|
|
||||||
|
?assertRCName(
|
||||||
|
success,
|
||||||
|
emqtt:publish(Client, FinTopic, <<>>, 1)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client),
|
||||||
|
|
||||||
|
ResultReceive = fun Receive() ->
|
||||||
|
receive
|
||||||
|
{publish, #{payload := Payload, topic := ResponseTopic}} ->
|
||||||
|
case emqx_utils_json:decode(Payload) of
|
||||||
|
#{<<"topic">> := FinTopic, <<"reason_code">> := 0} ->
|
||||||
|
ok;
|
||||||
|
#{<<"topic">> := FinTopic, <<"reason_code">> := _} = FTResult ->
|
||||||
|
ct:fail("unexpected fin result: ~p", [FTResult]);
|
||||||
|
_ ->
|
||||||
|
Receive()
|
||||||
|
end
|
||||||
|
after 1000 ->
|
||||||
|
ct:fail("timeout waiting for fin result")
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
ResultReceive(),
|
||||||
|
|
||||||
|
ok = emqtt:stop(OtherClient).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Cluster tests
|
%% Cluster tests
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -1,247 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_ft_async_reply_SUITE).
|
|
||||||
|
|
||||||
-compile(export_all).
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
|
||||||
-include_lib("emqx/include/asserts.hrl").
|
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
|
||||||
Apps = emqx_cth_suite:start(
|
|
||||||
[
|
|
||||||
{emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
|
|
||||||
{emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s }"}
|
|
||||||
],
|
|
||||||
#{work_dir => ?config(priv_dir, Config)}
|
|
||||||
),
|
|
||||||
[{suite_apps, Apps} | Config].
|
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
|
||||||
ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
init_per_testcase(_Case, Config) ->
|
|
||||||
ok = snabbkaffe:start_trace(),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_testcase(_Case, _Config) ->
|
|
||||||
ok = snabbkaffe:stop(),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Tests
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
t_register(_Config) ->
|
|
||||||
PacketId = 1,
|
|
||||||
MRef = make_ref(),
|
|
||||||
TRef = make_ref(),
|
|
||||||
ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
undefined,
|
|
||||||
emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
|
|
||||||
),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
ok,
|
|
||||||
emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
|
|
||||||
),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
{ok, PacketId, TRef, somedata},
|
|
||||||
emqx_ft_async_reply:take_by_mref(MRef)
|
|
||||||
).
|
|
||||||
|
|
||||||
t_process_independence(_Config) ->
|
|
||||||
PacketId = 1,
|
|
||||||
MRef = make_ref(),
|
|
||||||
TRef = make_ref(),
|
|
||||||
ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
|
|
||||||
|
|
||||||
Self = self(),
|
|
||||||
|
|
||||||
spawn_link(fun() ->
|
|
||||||
Self ! emqx_ft_async_reply:take_by_mref(MRef)
|
|
||||||
end),
|
|
||||||
|
|
||||||
Res1 =
|
|
||||||
receive
|
|
||||||
Msg1 -> Msg1
|
|
||||||
end,
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
not_found,
|
|
||||||
Res1
|
|
||||||
),
|
|
||||||
|
|
||||||
spawn_link(fun() ->
|
|
||||||
Self ! emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
|
|
||||||
end),
|
|
||||||
|
|
||||||
Res2 =
|
|
||||||
receive
|
|
||||||
Msg2 -> Msg2
|
|
||||||
end,
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
ok,
|
|
||||||
Res2
|
|
||||||
).
|
|
||||||
|
|
||||||
t_take(_Config) ->
|
|
||||||
PacketId = 1,
|
|
||||||
MRef = make_ref(),
|
|
||||||
TRef = make_ref(),
|
|
||||||
ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
{ok, PacketId, TRef, somedata},
|
|
||||||
emqx_ft_async_reply:take_by_mref(MRef)
|
|
||||||
),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
not_found,
|
|
||||||
emqx_ft_async_reply:take_by_mref(MRef)
|
|
||||||
),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
ok,
|
|
||||||
emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
|
|
||||||
).
|
|
||||||
|
|
||||||
t_cleanup(_Config) ->
|
|
||||||
PacketId = 1,
|
|
||||||
MRef0 = make_ref(),
|
|
||||||
TRef0 = make_ref(),
|
|
||||||
MRef1 = make_ref(),
|
|
||||||
TRef1 = make_ref(),
|
|
||||||
ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0, somedata0),
|
|
||||||
|
|
||||||
Self = self(),
|
|
||||||
|
|
||||||
Pid = spawn_link(fun() ->
|
|
||||||
ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1, somedata1),
|
|
||||||
receive
|
|
||||||
kickoff ->
|
|
||||||
?assertEqual(
|
|
||||||
undefined,
|
|
||||||
emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
|
|
||||||
),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
{ok, PacketId, TRef1, somedata1},
|
|
||||||
emqx_ft_async_reply:take_by_mref(MRef1)
|
|
||||||
),
|
|
||||||
|
|
||||||
Self ! done
|
|
||||||
end
|
|
||||||
end),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
undefined,
|
|
||||||
emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
|
|
||||||
),
|
|
||||||
|
|
||||||
ok = emqx_ft_async_reply:deregister_all(Self),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
ok,
|
|
||||||
emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
|
|
||||||
),
|
|
||||||
|
|
||||||
Pid ! kickoff,
|
|
||||||
|
|
||||||
receive
|
|
||||||
done -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
t_reply_by_tiemout(_Config) ->
|
|
||||||
process_flag(trap_exit, true),
|
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
C = emqx_ft_test_helpers:start_client(ClientId, node()),
|
|
||||||
|
|
||||||
SleepForever = fun() ->
|
|
||||||
Ref = make_ref(),
|
|
||||||
receive
|
|
||||||
Ref -> ok
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
|
|
||||||
ok = meck:new(emqx_ft_storage, [passthrough]),
|
|
||||||
meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> {async, spawn_link(SleepForever)} end),
|
|
||||||
|
|
||||||
FinTopic = <<"$file/fakeid/fin/999999">>,
|
|
||||||
|
|
||||||
?assertMatch(
|
|
||||||
{ok, #{reason_code_name := unspecified_error}},
|
|
||||||
emqtt:publish(C, FinTopic, <<>>, 1)
|
|
||||||
),
|
|
||||||
|
|
||||||
meck:unload(emqx_ft_storage),
|
|
||||||
emqtt:stop(C).
|
|
||||||
|
|
||||||
t_cleanup_by_cm(_Config) ->
|
|
||||||
process_flag(trap_exit, true),
|
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
C = emqx_ft_test_helpers:start_client(ClientId, node()),
|
|
||||||
|
|
||||||
ok = meck:new(emqx_ft_storage, [passthrough]),
|
|
||||||
meck:expect(emqx_ft_storage, kickoff, fun(_) -> meck:exception(error, oops) end),
|
|
||||||
|
|
||||||
FinTopic = <<"$file/fakeid/fin/999999">>,
|
|
||||||
|
|
||||||
[ClientPid] = emqx_cm:lookup_channels(ClientId),
|
|
||||||
|
|
||||||
?assertWaitEvent(
|
|
||||||
begin
|
|
||||||
emqtt:publish(C, FinTopic, <<>>, 1),
|
|
||||||
exit(ClientPid, kill)
|
|
||||||
end,
|
|
||||||
#{?snk_kind := emqx_cm_clean_down, client_id := ClientId},
|
|
||||||
1000
|
|
||||||
),
|
|
||||||
|
|
||||||
?assertEqual(
|
|
||||||
{0, 0},
|
|
||||||
emqx_ft_async_reply:info()
|
|
||||||
),
|
|
||||||
|
|
||||||
meck:unload(emqx_ft_storage).
|
|
||||||
|
|
||||||
t_unrelated_events(_Config) ->
|
|
||||||
process_flag(trap_exit, true),
|
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
C = emqx_ft_test_helpers:start_client(ClientId, node()),
|
|
||||||
[ClientPid] = emqx_cm:lookup_channels(ClientId),
|
|
||||||
|
|
||||||
erlang:monitor(process, ClientPid),
|
|
||||||
|
|
||||||
ClientPid ! {'DOWN', make_ref(), process, self(), normal},
|
|
||||||
ClientPid ! {timeout, make_ref(), unknown_timer_event},
|
|
||||||
|
|
||||||
?assertNotReceive(
|
|
||||||
{'DOWN', _Ref, process, ClientPid, _Reason},
|
|
||||||
500
|
|
||||||
),
|
|
||||||
|
|
||||||
emqtt:stop(C).
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ft_responder_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("emqx/include/asserts.hrl").
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[
|
||||||
|
{emqx_ft, "file_transfer {enable = true}"}
|
||||||
|
],
|
||||||
|
#{work_dir => ?config(priv_dir, Config)}
|
||||||
|
),
|
||||||
|
[{suite_apps, Apps} | Config].
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(_Case, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_Case, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_start_ack(_Config) ->
|
||||||
|
Key = <<"test">>,
|
||||||
|
DefaultAction = fun({ack, Ref}) -> Ref end,
|
||||||
|
{ok, ResponderPid} = emqx_ft_responder:start(Key, DefaultAction, 1000),
|
||||||
|
erlang:monitor(process, ResponderPid),
|
||||||
|
?assertMatch(
|
||||||
|
{error, {already_started, _Pid}},
|
||||||
|
emqx_ft_responder:start(Key, DefaultAction, 1000)
|
||||||
|
),
|
||||||
|
Ref = make_ref(),
|
||||||
|
?assertEqual(
|
||||||
|
Ref,
|
||||||
|
emqx_ft_responder:ack(Key, Ref)
|
||||||
|
),
|
||||||
|
?assertExit(
|
||||||
|
{noproc, _},
|
||||||
|
emqx_ft_responder:ack(Key, Ref)
|
||||||
|
),
|
||||||
|
?assertReceive(
|
||||||
|
{'DOWN', _, process, ResponderPid, {shutdown, _}},
|
||||||
|
1000
|
||||||
|
).
|
||||||
|
|
||||||
|
t_timeout(_Config) ->
|
||||||
|
Key = <<"test">>,
|
||||||
|
Self = self(),
|
||||||
|
DefaultAction = fun(timeout) -> Self ! {timeout, Key} end,
|
||||||
|
{ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20),
|
||||||
|
receive
|
||||||
|
{timeout, Key} ->
|
||||||
|
ok
|
||||||
|
after 100 ->
|
||||||
|
ct:fail("emqx_ft_responder not called")
|
||||||
|
end,
|
||||||
|
?assertExit(
|
||||||
|
{noproc, _},
|
||||||
|
emqx_ft_responder:ack(Key, oops)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_unknown_msgs(_Config) ->
|
||||||
|
{ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_) -> ok end, 100),
|
||||||
|
Pid ! {unknown_msg, <<"test">>},
|
||||||
|
ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}),
|
||||||
|
?assertEqual(
|
||||||
|
{error, unknown_call},
|
||||||
|
gen_server:call(Pid, {unknown_call, <<"test">>})
|
||||||
|
).
|
|
@ -0,0 +1,3 @@
|
||||||
|
Fixed file transfer command result reporting to the `$file-response/${clientid}` channel.
|
||||||
|
|
||||||
|
Previously, if a channel issued an `assemble` command but disconnected before the assemble was completed, the status message was lost (not sent to the response topic).
|
Loading…
Reference in New Issue