From aaaef30be66ab7291732f3fb634afdd0210b983b Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 6 Jan 2023 23:53:25 +0300 Subject: [PATCH] feat(ft): add file transfer app and bootstrap replicated ft data structure --- apps/emqx/src/emqx_channel.erl | 7 +++ apps/emqx_ft/README.md | 9 +++ apps/emqx_ft/include/emqx_ft.hrl | 17 +++++ apps/emqx_ft/rebar.config | 11 ++++ apps/emqx_ft/src/emqx_ft.app.src | 12 ++++ apps/emqx_ft/src/emqx_ft.erl | 103 +++++++++++++++++++++++++++++++ apps/emqx_ft/src/emqx_ft_app.erl | 30 +++++++++ apps/emqx_ft/src/emqx_ft_sup.erl | 49 +++++++++++++++ 8 files changed, 238 insertions(+) create mode 100644 apps/emqx_ft/README.md create mode 100644 apps/emqx_ft/include/emqx_ft.hrl create mode 100644 apps/emqx_ft/rebar.config create mode 100644 apps/emqx_ft/src/emqx_ft.app.src create mode 100644 apps/emqx_ft/src/emqx_ft.erl create mode 100644 apps/emqx_ft/src/emqx_ft_app.erl create mode 100644 apps/emqx_ft/src/emqx_ft_sup.erl diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 29a59e482..009dc72ea 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -641,6 +641,13 @@ process_connect( %%-------------------------------------------------------------------- process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> + ?SLOG( + warning, + #{ + packet => Packet, + packet_id => PacketId + } + ), case pipeline( [ diff --git a/apps/emqx_ft/README.md b/apps/emqx_ft/README.md new file mode 100644 index 000000000..c483b3169 --- /dev/null +++ b/apps/emqx_ft/README.md @@ -0,0 +1,9 @@ +emqx_ft +===== + +EMQX file transfer over MQTT + +Build +----- + + $ rebar3 compile diff --git a/apps/emqx_ft/include/emqx_ft.hrl b/apps/emqx_ft/include/emqx_ft.hrl new file mode 100644 index 000000000..2cbd24fb4 --- /dev/null +++ b/apps/emqx_ft/include/emqx_ft.hrl @@ -0,0 +1,17 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(FT_TAB, emqx_ft). diff --git a/apps/emqx_ft/rebar.config b/apps/emqx_ft/rebar.config new file mode 100644 index 000000000..2c0962035 --- /dev/null +++ b/apps/emqx_ft/rebar.config @@ -0,0 +1,11 @@ +%% -*- mode: erlang -*- + +{erl_opts, [debug_info]}. +{deps, [{emqx, {path, "../emqx"}}]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_ft]} +]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_ft/src/emqx_ft.app.src b/apps/emqx_ft/src/emqx_ft.app.src new file mode 100644 index 000000000..855451bfb --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft.app.src @@ -0,0 +1,12 @@ +{application, emqx_ft, [ + {description, "EMQX file transfer over MQTT"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_ft_app, []}}, + {applications, [ + kernel, + stdlib + ]}, + {env, []}, + {modules, []} +]}. diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl new file mode 100644 index 000000000..0ba9c17a6 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -0,0 +1,103 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft). + +-include("emqx_ft.hrl"). + +-export([ + create_tab/0, + hook/0, + unhook/0 +]). + +-export([ + on_channel_unregistered/1, + on_channel_takeover/3, + on_channel_takeovered/3 +]). + +-type ft_data() :: #{ + nodes := list(node()) +}. + +-record(emqx_ft, { + chan_pid :: pid(), + ft_data :: ft_data() +}). + +%%-------------------------------------------------------------------- +%% API for app +%%-------------------------------------------------------------------- + +create_tab() -> + _Tab = ets:new(?FT_TAB, [ + set, + public, + named_table, + {keypos, #emqx_ft.chan_pid} + ]), + ok. + +hook() -> + % ok = emqx_hooks:put('channel.registered', {?MODULE, on_channel_registered, []}), + ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}), + ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}), + ok = emqx_hooks:put('channel.takeovered', {?MODULE, on_channel_takeovered, []}). + +unhook() -> + % ok = emqx_hooks:del('channel.registered', {?MODULE, on_channel_registered}), + ok = emqx_hooks:del('channel.unregistered', {?MODULE, on_channel_unregistered}), + ok = emqx_hooks:del('channel.takeover', {?MODULE, on_channel_takeover}), + ok = emqx_hooks:del('channel.takeovered', {?MODULE, on_channel_takeovered}). + +%%-------------------------------------------------------------------- +%% Hooks +%%-------------------------------------------------------------------- + +on_channel_unregistered(ChanPid) -> + ok = delete_ft_data(ChanPid). + +on_channel_takeover(_ConnMod, ChanPid, TakeoverData) -> + case get_ft_data(ChanPid) of + {ok, FTData} -> + {ok, TakeoverData#{ft_data => FTData}}; + none -> + ok + end. + +on_channel_takeovered(_ConnMod, ChanPid, #{ft_data := FTData}) -> + ok = put_ft_data(ChanPid, FTData); +on_channel_takeovered(_ConnMod, _ChanPid, _) -> + ok. + +%%-------------------------------------------------------------------- +%% Private funs +%%-------------------------------------------------------------------- + +get_ft_data(ChanPid) -> + case ets:lookup(?FT_TAB, ChanPid) of + [#emqx_ft{ft_data = FTData}] -> {ok, FTData}; + [] -> none + end. + +delete_ft_data(ChanPid) -> + true = ets:delete(?FT_TAB, ChanPid), + ok. + +put_ft_data(ChanPid, FTData) -> + true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}), + ok. diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl new file mode 100644 index 000000000..4778da1a1 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_app.erl @@ -0,0 +1,30 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + {ok, Sup} = emqx_ft_sup:start_link(), + ok = emqx_ft:hook(), + {ok, Sup}. + +stop(_State) -> + ok = emqx_ft:unhook(), + ok. diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl new file mode 100644 index 000000000..4c976246b --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + ok = emqx_ft:create_tab(), + SupFlags = #{ + strategy => one_for_all, + intensity => 100, + period => 10 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions