diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 724139142..766acd4b8 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -172,7 +172,6 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), ok = emqx_cm_registry:register_channel(Chan), mark_channel_connected(ChanPid), - ok = emqx_hooks:run('channel.registered', [ConnMod, ChanPid]), cast({registered, Chan}). %% @doc Unregister a channel. @@ -282,7 +281,7 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> {ok, #{session => Session1, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); -open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnMod} = ConnInfo) -> +open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> CreateSess = @@ -309,11 +308,10 @@ open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnM {living, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of - {ok, Pendings, TakoverData} -> + {ok, Pendings} -> Session1 = emqx_persistent_session:persist( ClientInfo, ConnInfo, Session ), - ok = emqx_hooks:run('channel.takenover', [NewConnMod, Self, TakoverData]), {ok, #{ session => clean_session(Session1), present => true, @@ -399,18 +397,11 @@ takeover_session(ClientId) -> end. takeover_finish(ConnMod, ChanPid) -> - TakoverData = emqx_hooks:run_fold('channel.takeover', [ConnMod, ChanPid], #{}), - case - %% node-local call - request_stepdown( - {takeover, 'end'}, - ConnMod, - ChanPid - ) - of - {ok, Pendings} -> {ok, Pendings, TakoverData}; - {error, _} = Error -> Error - end. + request_stepdown( + {takeover, 'end'}, + ConnMod, + ChanPid + ). takeover_session(ClientId, Pid) -> try diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf index 85fb81cfe..481ad8154 100644 --- a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf +++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf @@ -2,13 +2,13 @@ emqx_ft_schema { local { desc { - en: "Use local file system to store uploaded files and temporary data." - zh: "使用本地文件系统来存储上传的文件和临时数据。" + en: "Use local file system to store uploaded files and temporary data." + zh: "使用本地文件系统来存储上传的文件和临时数据。" } label: { - en: "Local Storage" - zh: "本地存储" - } + en: "Local Storage" + zh: "本地存储" + } } } diff --git a/apps/emqx_ft/include/emqx_ft.hrl b/apps/emqx_ft/include/emqx_ft.hrl index 2cbd24fb4..e46d79490 100644 --- a/apps/emqx_ft/include/emqx_ft.hrl +++ b/apps/emqx_ft/include/emqx_ft.hrl @@ -13,5 +13,3 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- - --define(FT_TAB, emqx_ft). diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index a286a6186..be2f80831 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -16,29 +16,21 @@ -module(emqx_ft). --include("emqx_ft.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -export([ - create_tab/0, hook/0, unhook/0 ]). -export([ - on_channel_unregistered/1, - on_channel_takeover/3, - on_channel_takenover/3, on_message_publish/1, on_message_puback/4 ]). -%% For Debug --export([transfer/2, storage/0]). - -export([on_assemble_timeout/1]). -export_type([ @@ -79,43 +71,17 @@ -type segment() :: {offset(), _Content :: binary()}. --type ft_data() :: #{ - nodes := list(node()) -}. - --record(emqx_ft, { - chan_pid :: pid(), - ft_data :: ft_data() -}). - -define(ASSEMBLE_TIMEOUT, 5000). %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- -create_tab() -> - _Tab = ets:new(?FT_TAB, [ - set, - public, - named_table, - {keypos, #emqx_ft.chan_pid} - ]), - ok. - hook() -> - ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST), - ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}, ?HP_LOWEST), - ok = emqx_hooks:put('channel.takenover', {?MODULE, on_channel_takenover, []}, ?HP_LOWEST), - ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST), ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST). unhook() -> - ok = emqx_hooks:del('channel.unregistered', {?MODULE, on_channel_unregistered}), - ok = emqx_hooks:del('channel.takeover', {?MODULE, on_channel_takeover}), - ok = emqx_hooks:del('channel.takenover', {?MODULE, on_channel_takenover}), - ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}). @@ -123,22 +89,6 @@ unhook() -> %% Hooks %%-------------------------------------------------------------------- -on_channel_unregistered(ChanPid) -> - ok = delete_ft_data(ChanPid). - -on_channel_takeover(_ConnMod, ChanPid, TakeoverData) -> - case get_ft_data(ChanPid) of - {ok, FTData} -> - {ok, TakeoverData#{ft_data => FTData}}; - none -> - ok - end. - -on_channel_takenover(_ConnMod, ChanPid, #{ft_data := FTData}) -> - ok = put_ft_data(ChanPid, FTData); -on_channel_takenover(_ConnMod, _ChanPid, _) -> - ok. - on_message_publish( Msg = #message{ id = _Id, @@ -190,8 +140,7 @@ on_init(Msg, FileId) -> % %% Add validations here Meta = emqx_json:decode(Payload, [return_maps]), case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of - {ok, Ctx} -> - ok = put_context(Ctx), + ok -> ?RC_SUCCESS; {error, _Reason} -> ?RC_UNSPECIFIED_ERROR @@ -213,9 +162,8 @@ on_segment(Msg, FileId, Offset, Checksum) -> Payload = Msg#message.payload, Segment = {binary_to_integer(Offset), Payload}, %% Add offset/checksum validations - case emqx_ft_storage:store_segment(get_context(), transfer(Msg, FileId), Segment) of - {ok, Ctx} -> - ok = put_context(Ctx), + case emqx_ft_storage:store_segment(transfer(Msg, FileId), Segment) of + ok -> ?RC_SUCCESS; {error, _Reason} -> ?RC_UNSPECIFIED_ERROR @@ -267,7 +215,6 @@ on_fin(PacketId, Msg, FileId, Checksum) -> assemble(Transfer, Callback) -> emqx_ft_storage:assemble( - get_context(), Transfer, Callback ). @@ -277,12 +224,12 @@ callback({ChanPid, PacketId} = Key, _FileId) -> case emqx_ft_responder:unregister(Key) of ok -> case Result of - {ok, _} -> + ok -> erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS}); {error, _} -> erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}) end; - {error, not_registered} -> + {error, not_found} -> ok end end. @@ -291,35 +238,6 @@ transfer(Msg, FileId) -> ClientId = Msg#message.from, {ClientId, FileId}. -%% TODO: configure - -storage() -> - emqx_config:get([file_transfer, storage]). - on_assemble_timeout({ChanPid, PacketId}) -> ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}), erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}). - -%%-------------------------------------------------------------------- -%% Context management -%%-------------------------------------------------------------------- - -get_context() -> - get_ft_data(self()). - -put_context(Context) -> - put_ft_data(self(), Context). - -get_ft_data(ChanPid) -> - case ets:lookup(?FT_TAB, ChanPid) of - [#emqx_ft{ft_data = FTData}] -> {ok, FTData}; - [] -> none - end. - -delete_ft_data(ChanPid) -> - true = ets:delete(?FT_TAB, ChanPid), - ok. - -put_ft_data(ChanPid, FTData) -> - true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}), - ok. diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 5e945965d..8729a2ad4 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -19,12 +19,11 @@ -export( [ store_filemeta/2, - store_segment/3, - assemble/3 + store_segment/2, + assemble/2 ] ). --type ctx() :: term(). -type storage() :: emqx_config:config(). -export_type([assemble_callback/0]). @@ -32,14 +31,14 @@ -type assemble_callback() :: fun((ok | {error, term()}) -> any()). %%-------------------------------------------------------------------- -%% behaviour +%% Behaviour %%-------------------------------------------------------------------- -callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) -> - {ok, ctx()} | {error, term()}. --callback store_segment(storage(), ctx(), emqx_ft:transfer(), emqx_ft:segment()) -> - {ok, ctx()} | {error, term()}. --callback assemble(storage(), ctx(), emqx_ft:transfer(), assemble_callback()) -> + ok | {error, term()}. +-callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) -> + ok | {error, term()}. +-callback assemble(storage(), emqx_ft:transfer(), assemble_callback()) -> {ok, pid()} | {error, term()}. %%-------------------------------------------------------------------- @@ -47,28 +46,28 @@ %%-------------------------------------------------------------------- -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) -> - {ok, ctx()} | {error, term()}. + ok | {error, term()}. store_filemeta(Transfer, FileMeta) -> Mod = mod(), Mod:store_filemeta(storage(), Transfer, FileMeta). --spec store_segment(ctx(), emqx_ft:transfer(), emqx_ft:segment()) -> - {ok, ctx()} | {error, term()}. -store_segment(Ctx, Transfer, Segment) -> +-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) -> + ok | {error, term()}. +store_segment(Transfer, Segment) -> Mod = mod(), - Mod:store_segment(storage(), Ctx, Transfer, Segment). + Mod:store_segment(storage(), Transfer, Segment). --spec assemble(ctx(), emqx_ft:transfer(), assemble_callback()) -> +-spec assemble(emqx_ft:transfer(), assemble_callback()) -> {ok, pid()} | {error, term()}. -assemble(Ctx, Transfer, Callback) -> +assemble(Transfer, Callback) -> Mod = mod(), - Mod:assemble(storage(), Ctx, Transfer, Callback). + Mod:assemble(storage(), Transfer, Callback). mod() -> case storage() of #{type := local} -> - % emqx_ft_storage_fs - emqx_ft_storage_dummy + emqx_ft_storage_fs + % emqx_ft_storage_dummy end. storage() -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_dummy.erl b/apps/emqx_ft/src/emqx_ft_storage_dummy.erl index 1ab7f558e..d486c0c29 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_dummy.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_dummy.erl @@ -20,16 +20,16 @@ -export([ store_filemeta/3, - store_segment/4, - assemble/4 + store_segment/3, + assemble/3 ]). store_filemeta(_Storage, _Transfer, _Meta) -> - {ok, #{}}. + ok. -store_segment(_Storage, Ctx, _Transfer, _Segment) -> - {ok, Ctx}. +store_segment(_Storage, _Transfer, _Segment) -> + ok. -assemble(_Storage, _Ctx, _Transfer, Callback) -> +assemble(_Storage, _Transfer, Callback) -> Pid = spawn(fun() -> Callback({error, not_implemented}) end), {ok, Pid}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index bbd61eba9..a530c9cf8 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -22,7 +22,7 @@ -behaviour(emqx_ft_storage). -export([store_filemeta/3]). --export([store_segment/4]). +-export([store_segment/3]). -export([list/2]). -export([read_segment/5]). -export([assemble/3]). @@ -84,8 +84,7 @@ store_filemeta(Storage, Transfer, Meta) -> case read_file(Filepath, fun decode_filemeta/1) of {ok, Meta} -> _ = touch_file(Filepath), - %% No context is needed for this implementation. - {ok, #{}}; + ok; {ok, _Conflict} -> % TODO % We won't see conflicts in case of concurrent `store_filemeta` @@ -98,18 +97,13 @@ store_filemeta(Storage, Transfer, Meta) -> %% Store a segment in the backing filesystem. %% Atomic operation. --spec store_segment(storage(), emqx_ft_storage:ctx(), transfer(), segment()) -> +-spec store_segment(storage(), transfer(), segment()) -> % Where is the checksum gets verified? Upper level probably. % Quota? Some lower level errors? ok | {error, _TODO}. -store_segment(Storage, Ctx, Transfer, Segment = {_Offset, Content}) -> +store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)), - case write_file_atomic(Filepath, Content) of - ok -> - {ok, Ctx}; - {error, _} = Error -> - Error - end. + write_file_atomic(Filepath, Content). -spec list(storage(), transfer()) -> % Some lower level errors? {error, notfound}? @@ -149,8 +143,9 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) -> end. -spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) -> + % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. {ok, _Assembler :: pid()} | {error, _TODO}. -assemble(Storage, _Ctx, Transfer, Callback) -> +assemble(Storage, Transfer, Callback) -> emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). -type handle() :: {file:name(), io:device(), crypto:hash_state()}. @@ -321,8 +316,8 @@ mk_filedir(Storage, {ClientId, FileId}) -> mk_filepath(Storage, Transfer, Filename) -> filename:join(mk_filedir(Storage, Transfer), Filename). -get_storage_root(_Storage) -> - filename:join(emqx:data_dir(), "file_transfer"). +get_storage_root(Storage) -> + maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")). -include_lib("kernel/include/file.hrl"). diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index ef2d8033f..b4ce52edb 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -27,17 +27,7 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional init([]) -> - ok = emqx_ft:create_tab(), SupFlags = #{ strategy => one_for_all, intensity => 100, @@ -64,5 +54,3 @@ init([]) -> ChildSpecs = [Responder, AssemblerSup], {ok, {SupFlags, ChildSpecs}}. - -%% internal functions diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 1112ccbb7..9ec2fce61 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -54,7 +54,7 @@ end_per_testcase(_TC, Config) -> -define(CLIENTID, <<"thatsme">>). t_assemble_empty_transfer(Config) -> - Storage = ?config(storage_root, Config), + Storage = storage(Config), Transfer = {?CLIENTID, mk_fileid()}, Filename = "important.pdf", Meta = #{ @@ -84,7 +84,7 @@ t_assemble_empty_transfer(Config) -> ok. t_assemble_complete_local_transfer(Config) -> - Storage = ?config(storage_root, Config), + Storage = storage(Config), Transfer = {?CLIENTID, mk_fileid()}, Filename = "topsecret.pdf", TransferSize = 10000 + rand:uniform(50000), @@ -161,3 +161,8 @@ inspect_file(Filename) -> mk_fileid() -> integer_to_binary(erlang:system_time(millisecond)). + +storage(Config) -> + #{ + root => ?config(storage_root, Config) + }.