diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 009dc72ea..c67c02d66 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -744,9 +744,13 @@ do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) -> {ok, NChannel}; do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) -> PubRes = emqx_broker:publish(Msg), - RC = puback_reason_code(PubRes), - NChannel = ensure_quota(PubRes, Channel), - handle_out(puback, {PacketId, RC}, NChannel); + RC = puback_reason_code(PacketId, Msg, PubRes), + case RC of + undefined -> + {ok, Channel}; + _Value -> + do_finish_publish(PacketId, PubRes, RC, Channel) + end; do_publish( PacketId, Msg = #message{qos = ?QOS_2}, @@ -754,7 +758,7 @@ do_publish( ) -> case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of {ok, PubRes, NSession} -> - RC = puback_reason_code(PubRes), + RC = pubrec_reason_code(PubRes), NChannel0 = set_session(NSession, Channel), NChannel1 = ensure_timer(await_timer, NChannel0), NChannel2 = ensure_quota(PubRes, NChannel1), @@ -767,6 +771,10 @@ do_publish( handle_out(disconnect, RC, Channel) end. +do_finish_publish(PacketId, PubRes, RC, Channel) -> + NChannel = ensure_quota(PubRes, Channel), + handle_out(puback, {PacketId, RC}, NChannel). + ensure_quota(_, Channel = #channel{quota = undefined}) -> Channel; ensure_quota(PubRes, Channel = #channel{quota = Limiter}) -> @@ -786,9 +794,14 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) -> ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter}) end. --compile({inline, [puback_reason_code/1]}). -puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; -puback_reason_code([_ | _]) -> ?RC_SUCCESS. +-compile({inline, [pubrec_reason_code/1]}). +pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; +pubrec_reason_code([_ | _]) -> ?RC_SUCCESS. + +puback_reason_code(PacketId, Msg, [] = PubRes) -> + emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS); +puback_reason_code(PacketId, Msg, [_ | _] = PubRes) -> + emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS). -compile({inline, [after_message_acked/3]}). after_message_acked(ClientInfo, Msg, PubAckProps) -> @@ -1283,6 +1296,8 @@ handle_info(die_if_test = Info, Channel) -> die_if_test_compiled(), ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}; +handle_info({puback, PacketId, PubRes, RC}, Channel) -> + do_finish_publish(PacketId, PubRes, RC, Channel); handle_info(Info, Channel) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index d2ac642ca..724139142 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -313,7 +313,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnM Session1 = emqx_persistent_session:persist( ClientInfo, ConnInfo, Session ), - ok = emqx_hooks:run('channel.takeovered', [NewConnMod, Self, TakoverData]), + ok = emqx_hooks:run('channel.takenover', [NewConnMod, Self, TakoverData]), {ok, #{ session => clean_session(Session1), present => true, diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 6dd389350..633337f80 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -1133,7 +1133,7 @@ t_ws_cookie_init(_) -> ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). %%-------------------------------------------------------------------- -%% Test cases for other mechnisms +%% Test cases for other mechanisms %%-------------------------------------------------------------------- t_flapping_detect(_) -> diff --git a/apps/emqx/test/emqx_channel_delayed_puback_SUITE.erl b/apps/emqx/test/emqx_channel_delayed_puback_SUITE.erl new file mode 100644 index 000000000..4f2938b24 --- /dev/null +++ b/apps/emqx/test/emqx_channel_delayed_puback_SUITE.erl @@ -0,0 +1,70 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_channel_delayed_puback_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(Case, Config) -> + ?MODULE:Case({init, Config}). + +end_per_testcase(Case, Config) -> + ?MODULE:Case({'end', Config}). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_delayed_puback({init, Config}) -> + emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST), + Config; +t_delayed_puback({'end', _Config}) -> + emqx_hooks:del('message.puback', {?MODULE, on_message_puback}); +t_delayed_puback(_Config) -> + {ok, ConnPid} = emqtt:start_link([{clientid, <<"clientid">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(ConnPid), + {ok, #{reason_code := ?RC_UNSPECIFIED_ERROR}} = emqtt:publish( + ConnPid, <<"topic">>, <<"hello">>, 1 + ), + emqtt:disconnect(ConnPid). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +on_message_puback(PacketId, _Msg, PubRes, _RC) -> + erlang:send(self(), {puback, PacketId, PubRes, ?RC_UNSPECIFIED_ERROR}), + {stop, undefined}. diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 1f8a90a6f..c73d31559 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -17,6 +17,10 @@ -module(emqx_ft). -include("emqx_ft.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -export([ create_tab/0, @@ -27,9 +31,14 @@ -export([ on_channel_unregistered/1, on_channel_takeover/3, - on_channel_takeovered/3 + on_channel_takenover/3, + on_message_publish/1, + on_message_puback/4 ]). +%% For Debug +-export([transfer/2, storage/0]). + -export_type([clientid/0]). -export_type([transfer/0]). -export_type([offset/0]). @@ -67,16 +76,20 @@ create_tab() -> ok. hook() -> - % ok = emqx_hooks:put('channel.registered', {?MODULE, on_channel_registered, []}), - ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}), - ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}), - ok = emqx_hooks:put('channel.takeovered', {?MODULE, on_channel_takeovered, []}). + ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST), + ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}, ?HP_LOWEST), + ok = emqx_hooks:put('channel.takenover', {?MODULE, on_channel_takenover, []}, ?HP_LOWEST), + + ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST), + ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST). unhook() -> - % ok = emqx_hooks:del('channel.registered', {?MODULE, on_channel_registered}), ok = emqx_hooks:del('channel.unregistered', {?MODULE, on_channel_unregistered}), ok = emqx_hooks:del('channel.takeover', {?MODULE, on_channel_takeover}), - ok = emqx_hooks:del('channel.takeovered', {?MODULE, on_channel_takeovered}). + ok = emqx_hooks:del('channel.takenover', {?MODULE, on_channel_takenover}), + + ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), + ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}). %%-------------------------------------------------------------------- %% Hooks @@ -93,11 +106,30 @@ on_channel_takeover(_ConnMod, ChanPid, TakeoverData) -> ok end. -on_channel_takeovered(_ConnMod, ChanPid, #{ft_data := FTData}) -> +on_channel_takenover(_ConnMod, ChanPid, #{ft_data := FTData}) -> ok = put_ft_data(ChanPid, FTData); -on_channel_takeovered(_ConnMod, _ChanPid, _) -> +on_channel_takenover(_ConnMod, _ChanPid, _) -> ok. +on_message_publish( + Msg = #message{ + id = _Id, + topic = <<"$file/", _/binary>> + } +) -> + Headers = Msg#message.headers, + {stop, Msg#message{headers = Headers#{allow_publish => false}}}; +on_message_publish(Msg) -> + {ok, Msg}. + +on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> + case Topic of + <<"$file/", FileCommand/binary>> -> + {stop, on_file_command(PacketId, Msg, FileCommand)}; + _ -> + ignore + end. + %%-------------------------------------------------------------------- %% Private funs %%-------------------------------------------------------------------- @@ -115,3 +147,92 @@ delete_ft_data(ChanPid) -> put_ft_data(ChanPid, FTData) -> true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}), ok. + +on_file_command(PacketId, Msg, FileCommand) -> + case string:split(FileCommand, <<"/">>, all) of + [FileId, <<"init">>] -> + on_init(Msg, FileId); + [FileId, <<"fin">>] -> + on_fin(PacketId, Msg, FileId, undefined); + [FileId, <<"fin">>, Checksum] -> + on_fin(PacketId, Msg, FileId, Checksum); + [FileId, <<"abort">>] -> + on_abort(Msg, FileId); + [FileId, Offset] -> + on_segment(Msg, FileId, Offset, undefined); + [FileId, Offset, Checksum] -> + on_segment(Msg, FileId, Offset, Checksum); + _ -> + ?RC_UNSPECIFIED_ERROR + end. + +on_init(Msg, FileId) -> + ?SLOG(info, #{ + msg => "on_init", + mqtt_msg => Msg, + file_id => FileId + }), + % Payload = Msg#message.payload, + % %% Add validations here + % Meta = emqx_json:decode(Payload, [return_maps]), + % ok = emqx_ft_storage_fs:store_filemeta(storage(), transfer(Msg, FileId), Meta), + % ?RC_SUCCESS. + ?RC_UNSPECIFIED_ERROR. + +on_abort(_Msg, _FileId) -> + %% TODO + ?RC_SUCCESS. + +on_segment(Msg, FileId, Offset, Checksum) -> + ?SLOG(info, #{ + msg => "on_segment", + mqtt_msg => Msg, + file_id => FileId, + offset => Offset, + checksum => Checksum + }), + % %% TODO: handle checksum + % Payload = Msg#message.payload, + % %% Add offset/checksum validations + % ok = emqx_ft_storage_fs:store_segment( + % storage(), + % transfer(Msg, FileId), + % {binary_to_integer(Offset), Payload} + % ), + % ?RC_SUCCESS. + ?RC_UNSPECIFIED_ERROR. + +on_fin(PacketId, Msg, FileId, Checksum) -> + ?SLOG(info, #{ + msg => "on_fin", + mqtt_msg => Msg, + file_id => FileId, + checksum => Checksum, + packet_id => PacketId + }), + % %% TODO: handle checksum? Do we need it? + % {ok, _} = emqx_ft_storage_fs:assemble( + % storage(), + % transfer(Msg, FileId), + % callback(FileId, Msg) + % ), + Callback = callback(FileId, PacketId), + spawn(fun() -> Callback({error, not_implemented}) end), + undefined. + +callback(_FileId, PacketId) -> + ChanPid = self(), + fun + (ok) -> + erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS}); + ({error, _}) -> + erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}) + end. + +transfer(Msg, FileId) -> + ClientId = Msg#message.from, + {ClientId, FileId}. + +%% TODO: configure +storage() -> + filename:join(emqx:data_dir(), "file_transfer"). diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 2bd01c8b0..dfbb2bd3e 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -16,7 +16,7 @@ -module(emqx_ft_assembler). --export([start_link/2]). +-export([start_link/3]). -behaviour(gen_statem). -export([callback_mode/0]). @@ -35,7 +35,8 @@ transfer :: emqx_ft:transfer(), assembly :: _TODO, file :: io:device(), - hash + hash, + callback :: fun((ok | {error, term()}) -> any()) }). -define(RPC_LIST_TIMEOUT, 1000). @@ -43,8 +44,8 @@ %% -start_link(Storage, Transfer) -> - gen_server:start_link(?MODULE, {Storage, Transfer}, []). +start_link(Storage, Transfer, Callback) -> + gen_server:start_link(?MODULE, {Storage, Transfer, Callback}, []). %% @@ -53,12 +54,13 @@ start_link(Storage, Transfer) -> callback_mode() -> handle_event_function. -init({Storage, Transfer}) -> +init({Storage, Transfer, Callback}) -> St = #st{ storage = Storage, transfer = Transfer, assembly = emqx_ft_assembly:new(), - hash = crypto:hash_init(sha256) + hash = crypto:hash_init(sha256), + callback = Callback }, {ok, list_local_fragments, St, ?internal([])}. @@ -91,7 +93,7 @@ handle_event({list_remote_fragments, Nodes}, internal, _, St) -> fun ({Node, {ok, {ok, Fragments}}}, Asm) -> emqx_ft_assembly:append(Asm, Node, Fragments); - ({Node, Result}, Asm) -> + ({_Node, _Result}, Asm) -> % TODO: log? Asm end, @@ -128,9 +130,11 @@ handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) -> end; handle_event({assemble, []}, internal, _, St = #st{}) -> {next_state, complete, St, ?internal([])}; -handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle}) -> +handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, callback = Callback}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), - ok = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), + Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), + %% TODO: safe apply + _ = Callback(Result), {stop, shutdown}. % handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) -> diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index fbf948fbb..e837d96f1 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -16,24 +16,22 @@ -module(emqx_ft_assembler_sup). --export([start_link/1]). +-export([start_link/0]). -export([start_child/3]). -behaviour(supervisor). -export([init/1]). --define(REF(ID), {via, gproc, {n, l, {?MODULE, ID}}}). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). -start_link(ID) -> - supervisor:start_link(?REF(ID), ?MODULE, []). - -start_child(ID, Storage, Transfer) -> +start_child(Storage, Transfer, Callback) -> Childspec = #{ id => {Storage, Transfer}, - start => {emqx_ft_assembler, start_link, [Storage, Transfer]}, + start => {emqx_ft_assembler, start_link, [Storage, Transfer, Callback]}, restart => transient }, - supervisor:start_child(?REF(ID), Childspec). + supervisor:start_child(?MODULE, Childspec). init(_) -> SupFlags = #{ @@ -41,4 +39,4 @@ init(_) -> intensity => 100, period => 1000 }, - {ok, SupFlags, []}. + {ok, {SupFlags, []}}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 2d50e1b18..15efa142b 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -24,7 +24,7 @@ -export([store_filemeta/3]). -export([store_segment/3]). -export([list/2]). --export([assemble/2]). +-export([assemble/3]). -export([open_file/3]). -export([complete/4]). @@ -101,12 +101,12 @@ %% --define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}). +% -define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}). --spec start_link(root()) -> - {ok, pid()} | {error, already_started}. -start_link(Root) -> - gen_server:start_link(?PROCREF(Root), ?MODULE, [], []). +% -spec start_link(root()) -> +% {ok, pid()} | {error, already_started}. +% start_link(Root) -> +% gen_server:start_link(?PROCREF(Root), ?MODULE, [], []). %% Store manifest in the backing filesystem. %% Atomic operation. @@ -119,13 +119,13 @@ store_filemeta(Storage, Transfer, Meta) -> {ok, Meta} -> _ = touch_file(Filepath), ok; - {ok, Conflict} -> + {ok, _Conflict} -> % TODO % We won't see conflicts in case of concurrent `store_filemeta` % requests. It's rather odd scenario so it's fine not to worry % about it too much now. {error, conflict}; - {error, Reason} when Reason =:= notfound; Reason =:= corrupted -> + {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent -> write_file_atomic(Filepath, encode_filemeta(Meta)) end. @@ -154,15 +154,15 @@ list(Storage, Transfer) -> Error end. --spec assemble(storage(), transfer()) -> +-spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) -> % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. {ok, _Assembler :: pid()} | {error, _TODO}. -assemble(Storage, Transfer) -> - emqx_ft_assembler_sup:start_child(Storage, Storage, Transfer). +assemble(Storage, Transfer, Callback) -> + emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). %% --opaque handle() :: {file:name(), io:device(), crypto:hash_state()}. +-type handle() :: {file:name(), io:device(), crypto:hash_state()}. -spec open_file(storage(), transfer(), filemeta()) -> {ok, handle()} | {error, _TODO}. @@ -229,12 +229,12 @@ verify_checksum(undefined, _) -> %% --spec init(root()) -> {ok, storage()}. -init(Root) -> - % TODO: garbage_collect(...) - {ok, Root}. +% -spec init(root()) -> {ok, storage()}. +% init(Root) -> +% % TODO: garbage_collect(...) +% {ok, Root}. -%% +% %% -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]). @@ -243,7 +243,7 @@ schema() -> roots => [ {name, hoconsc:mk(string(), #{required => true})}, {size, hoconsc:mk(non_neg_integer())}, - {expire_at, hoconsc:mk(non_neg_integer(), #{required => true})}, + {expire_at, hoconsc:mk(non_neg_integer())}, {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})}, {segments_ttl, hoconsc:mk(pos_integer())}, {user_data, hoconsc:mk(json_value())} @@ -353,7 +353,7 @@ safe_decode(Content, DecodeFun) -> try {ok, DecodeFun(Content)} catch - C:R:Stacktrace -> + _C:_R:_Stacktrace -> % TODO: Log? {error, corrupted} end. @@ -414,7 +414,7 @@ mk_filefrag(Dirname, Filename, Fun) -> timestamp => Fileinfo#file_info.mtime, fragment => Frag }}; - {error, Reason} -> + {error, _Reason} -> false end. diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index 4c976246b..fb7a6104f 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -43,7 +43,17 @@ init([]) -> intensity => 100, period => 10 }, - ChildSpecs = [], + + AssemblerSup = #{ + id => emqx_ft_assembler_sup, + start => {emqx_ft_assembler_sup, start_link, []}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [emqx_ft_assembler_sup] + }, + + ChildSpecs = [AssemblerSup], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/rebar.config.erl b/rebar.config.erl index 98cd30570..ea0016ca9 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -400,7 +400,8 @@ relx_apps(ReleaseType, Edition) -> emqx_prometheus, emqx_psk, emqx_slow_subs, - emqx_plugins + emqx_plugins, + emqx_ft ] ++ [quicer || is_quicer_supported()] ++ [bcrypt || provide_bcrypt_release(ReleaseType)] ++