From 9e6966110ddefa5c706ba1d4de53bfe75f66c89b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 26 Mar 2020 18:32:24 +0800 Subject: [PATCH] Move emqx_delay_publish here (#3323) Move emqx_delay_publish here --- etc/emqx.conf | 8 ++ priv/emqx.schema | 9 ++ src/emqx_mod_delayed.erl | 195 ++++++++++++++++++++++++++++++++ src/emqx_sup.erl | 3 +- test/emqx_mod_delayed_SUITE.erl | 84 ++++++++++++++ 5 files changed, 298 insertions(+), 1 deletion(-) create mode 100644 src/emqx_mod_delayed.erl create mode 100644 test/emqx_mod_delayed_SUITE.erl diff --git a/etc/emqx.conf b/etc/emqx.conf index 0dc1b314e..173aedb3d 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1896,6 +1896,14 @@ module.rewrite = off ## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1 ## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 +##-------------------------------------------------------------------- +## Delayed Module + +## Enable Delayed Module. +## +## Value: on | off +module.delayed = off + ##------------------------------------------------------------------- ## Plugins ##------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 2f862c9bd..12bf88bcc 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1841,6 +1841,11 @@ end}. {datatype, string} ]}. +{mapping, "module.delayed", "emqx.modules", [ + {default, off}, + {datatype, flag} +]}. + {translation, "emqx.modules", fun(Conf) -> Subscriptions = fun() -> List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), @@ -1870,6 +1875,10 @@ end}. case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite true -> [{emqx_mod_rewrite, Rewrites()}]; false -> [] + end, + case cuttlefish:conf_get("module.delayed", Conf) of %% Delayed + true -> [{emqx_mod_delayed, []}]; + false -> [] end ]) end}. diff --git a/src/emqx_mod_delayed.erl b/src/emqx_mod_delayed.erl new file mode 100644 index 000000000..ca5f2218e --- /dev/null +++ b/src/emqx_mod_delayed.erl @@ -0,0 +1,195 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_delayed). + +-behaviour(gen_server). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% emqx_gen_mod callbacks +-export([ load/1 + , unload/1 + ]). + +-export([ start_link/0 + , on_message_publish/1 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(delayed_message, + { key + , msg + }). + +-define(TAB, ?MODULE). +-define(SERVER, ?MODULE). +-define(MAX_INTERVAL, 4294967). + +%%-------------------------------------------------------------------- +%% Load/Unload +%%-------------------------------------------------------------------- + +-spec(load(list()) -> ok). +load(_Env) -> + emqx_mod_sup:start_child(?MODULE, worker), + emqx:hook('message.publish', {?MODULE, on_message_publish, []}). + +-spec(unload(list()) -> ok). +unload(_Env) -> + emqx_mod_sup:stop_child(?MODULE), + emqx:unhook('message.publish', {?MODULE, on_message_publish, []}). + +%%-------------------------------------------------------------------- +%% Hooks +%%-------------------------------------------------------------------- + +on_message_publish(Msg = #message{id = Id, topic = <<"$delayed/", Topic/binary>>, timestamp = Ts}) -> + [Delay, Topic1] = binary:split(Topic, <<"/">>), + PubAt = case binary_to_integer(Delay) of + Interval when Interval < ?MAX_INTERVAL -> + Interval + erlang:round(Ts / 1000); + Timestamp -> + %% Check malicious timestamp? + case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of + true -> error(invalid_delayed_timestamp); + false -> Timestamp + end + end, + PubMsg = Msg#message{topic = Topic1}, + Headers = case PubMsg#message.headers of + undefined -> #{}; + Headers0 -> Headers0 + end, + ok = store(#delayed_message{key = {PubAt, delayed_mid(Id)}, msg = PubMsg}), + {stop, PubMsg#message{headers = Headers#{allow_publish => false}}}; + +on_message_publish(Msg) -> + {ok, Msg}. + +%% @private +delayed_mid(undefined) -> + emqx_guid:gen(); +delayed_mid(MsgId) -> MsgId. + +%%-------------------------------------------------------------------- +%% Start delayed publish server +%%-------------------------------------------------------------------- + +-spec(start_link() -> emqx_types:startlink_ret()). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +-spec(store(#delayed_message{}) -> ok). +store(DelayedMsg) -> + gen_server:call(?SERVER, {store, DelayedMsg}, infinity). + +%%-------------------------------------------------------------------- +%% gen_server callback +%%-------------------------------------------------------------------- + +init([]) -> + ok = ekka_mnesia:create_table(?TAB, [ + {type, ordered_set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, delayed_message}, + {attributes, record_info(fields, delayed_message)}]), + ok = ekka_mnesia:copy_table(?TAB, disc_copies), + {ok, ensure_publish_timer(#{timer => undefined, publish_at => 0})}. + +handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> + ok = mnesia:dirty_write(?TAB, DelayedMsg), + emqx_metrics:set('messages.delayed', delayed_count()), + {reply, ok, ensure_publish_timer(Key, State)}; + +handle_call(Req, _From, State) -> + ?LOG(error, "[Delayed] Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "[Delayed] Unexpected cast: ~p", [Msg]), + {noreply, State}. + +%% Do Publish... +handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> + DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), + lists:foreach(fun(Key) -> mnesia:dirty_delete(?TAB, Key) end, DeletedKeys), + emqx_metrics:set('messages.delayed', delayed_count()), + {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; + +handle_info(Info, State) -> + ?LOG(error, "[Delayed] Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{timer := TRef}) -> + emqx_misc:cancel_timer(TRef). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +%% Ensure publish timer +ensure_publish_timer(State) -> + ensure_publish_timer(mnesia:dirty_first(?TAB), State). + +ensure_publish_timer('$end_of_table', State) -> + State#{timer := undefined, publish_at := 0}; +ensure_publish_timer({Ts, _Id}, State = #{timer := undefined}) -> + ensure_publish_timer(Ts, os:system_time(seconds), State); +ensure_publish_timer({Ts, _Id}, State = #{timer := TRef, publish_at := PubAt}) + when Ts < PubAt -> + ok = emqx_misc:cancel_timer(TRef), + ensure_publish_timer(Ts, os:system_time(seconds), State); +ensure_publish_timer(_Key, State) -> + State. + +ensure_publish_timer(Ts, Now, State) -> + Interval = max(1, Ts - Now), + TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish), + State#{timer := TRef, publish_at := Now + Interval}. + +do_publish(Key, Now) -> + do_publish(Key, Now, []). + +%% Do publish +do_publish('$end_of_table', _Now, Acc) -> + Acc; +do_publish({Ts, _Id}, Now, Acc) when Ts > Now -> + Acc; +do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> + case mnesia:dirty_read(?TAB, Key) of + [] -> ok; + [#delayed_message{msg = Msg}] -> + emqx_pool:async_submit(fun emqx_broker:publish/1, [Msg]) + end, + do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key|Acc]). + +-spec(delayed_count() -> non_neg_integer()). +delayed_count() -> mnesia:table_info(?TAB, size). + diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 1782b64a0..c0aa3a2a8 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -67,11 +67,12 @@ init([]) -> BrokerSup = child_spec(emqx_broker_sup, supervisor), CMSup = child_spec(emqx_cm_sup, supervisor), SysSup = child_spec(emqx_sys_sup, supervisor), + ModSup = child_spec(emqx_mod_sup, supervisor), Childs = [KernelSup] ++ [RouterSup || emqx_boot:is_enabled(router)] ++ [BrokerSup || emqx_boot:is_enabled(broker)] ++ [CMSup || emqx_boot:is_enabled(broker)] ++ - [SysSup], + [SysSup] ++ [ModSup], SupFlags = #{strategy => one_for_all, intensity => 0, period => 1 diff --git a/test/emqx_mod_delayed_SUITE.erl b/test/emqx_mod_delayed_SUITE.erl new file mode 100644 index 000000000..b75180e01 --- /dev/null +++ b/test/emqx_mod_delayed_SUITE.erl @@ -0,0 +1,84 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_delayed_SUITE). + +-import(emqx_mod_delayed, [on_message_publish/1]). + +-compile(export_all). +-compile(nowarn_export_all). + +-record(delayed_message, {key, msg}). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([], fun set_special_configs/1), + Config. + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([]). + +set_special_configs(emqx) -> + application:set_env(emqx, modules, [{emqx_mod_delayed, []}]), + application:set_env(emqx, allow_anonymous, false), + application:set_env(emqx, enable_acl_cache, false), + application:set_env(emqx, plugins_loaded_file, + emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); +set_special_configs(_App) -> + ok. + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_load_case(_) -> + ok = emqx_mod_delayed:unload([]), + timer:sleep(100), + UnHooks = emqx_hooks:lookup('message.publish'), + ?assertEqual([], UnHooks), + ok = emqx_mod_delayed:load([]), + Hooks = emqx_hooks:lookup('message.publish'), + ?assertEqual(1, length(Hooks)), + ok. + +t_delayed_message(_) -> + DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/5/publish">>, <<"delayed_m">>), + ?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)), + + Msg = emqx_message:make(?MODULE, 1, <<"publish">>, <<"delayed_m">>), + ?assertEqual({ok, Msg}, on_message_publish(Msg)), + + [Key] = mnesia:dirty_all_keys(emqx_mod_delayed), + [#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_mod_delayed, Key}), + ?assertEqual(<<"delayed_m">>, Payload), + timer:sleep(6000), + + EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed), + ?assertEqual([], EmptyKey), + %%TODO + %% ExMsg = emqx_message:make(emqx_mod_delayed_SUITE, 1, <<"$delayed/time/publish">>, <<"delayed_message">>), + %% {ok, _} = on_message_publish(ExMsg), + ok.