chore(retainer): reformat retainer codes

This commit is contained in:
firest 2022-04-19 10:41:54 +08:00
parent ec29b8381c
commit f1acfece6b
14 changed files with 895 additions and 605 deletions

View File

@ -24,8 +24,10 @@
-type payload() :: binary().
-type message() :: #message{}.
-type context() :: #{context_id := pos_integer(),
atom() => term()}.
-type context() :: #{
context_id := pos_integer(),
atom() => term()
}.
-define(DELIVER_SEMAPHORE, deliver_remained_quota).
-type semaphore() :: ?DELIVER_SEMAPHORE.

View File

@ -1,27 +1,35 @@
%% -*- mode: erlang -*-
{deps, [ {emqx, {path, "../emqx"}}
]}.
{deps, [{emqx, {path, "../emqx"}}]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
{erl_opts, [
warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{parse_transform}
]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{xref_checks, [
undefined_function_calls,
undefined_functions,
locals_not_used,
deprecated_function_calls,
warnings_as_errors,
deprecated_functions
]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}}]}
{profiles, [
{test, [
{deps, [
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}}
]}
]}.
]}
]}.
{project_plugins, [erlfmt]}.

View File

@ -1,15 +1,17 @@
%% -*- mode: erlang -*-
{application, emqx_retainer,
[{description, "EMQX Retainer"},
{vsn, "5.0.0"}, % strict semver, bump manually!
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
{vsn, "5.0.0"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib,emqx]},
{mod, {emqx_retainer_app,[]}},
{applications, [kernel, stdlib, emqx]},
{mod, {emqx_retainer_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{links, [
{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-retainer"}
]}
]}.
]}.

View File

@ -23,38 +23,43 @@
-export([start_link/0]).
-export([ on_session_subscribed/4
, on_message_publish/2
]).
-export([
on_session_subscribed/4,
on_message_publish/2
]).
-export([ delete_message/2
, store_retained/2
, get_backend_module/0
]).
-export([
delete_message/2,
store_retained/2,
get_backend_module/0
]).
-export([ get_expiry_time/1
, update_config/1
, clean/0
, delete/1
, page_read/3
, post_config_update/5
, stats_fun/0
]).
-export([
get_expiry_time/1,
update_config/1,
clean/0,
delete/1,
page_read/3,
post_config_update/5,
stats_fun/0
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-type state() :: #{ enable := boolean()
, context_id := non_neg_integer()
, context := undefined | context()
, clear_timer := undefined | reference()
}.
-type state() :: #{
enable := boolean(),
context_id := non_neg_integer(),
context := undefined | context(),
clear_timer := undefined | reference()
}.
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
-define(DEF_EXPIRY_INTERVAL, 0).
@ -86,10 +91,14 @@ on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) ->
end.
%% RETAIN flag set to 1 and payload containing zero bytes
on_message_publish(Msg = #message{flags = #{retain := true},
on_message_publish(
Msg = #message{
flags = #{retain := true},
topic = Topic,
payload = <<>>},
Context) ->
payload = <<>>
},
Context
) ->
delete_message(Context, Topic),
case get_stop_publish_clear_msg() of
true ->
@ -97,7 +106,6 @@ on_message_publish(Msg = #message{flags = #{retain := true},
_ ->
{ok, Msg}
end;
on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
Msg1 = emqx_message:set_header(retained, true, Msg),
store_retained(Context, Msg1),
@ -110,14 +118,16 @@ on_message_publish(Msg, _) ->
%%--------------------------------------------------------------------
%% @doc Start the retainer
-spec(start_link() -> emqx_types:startlink_ret()).
-spec start_link() -> emqx_types:startlink_ret().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
0;
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
timestamp = Ts}) ->
get_expiry_time(#message{
headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
timestamp = Ts
}) ->
Ts + Interval * 1000;
get_expiry_time(#message{timestamp = Ts}) ->
Interval = emqx_conf:get([retainer, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
@ -171,20 +181,16 @@ init([]) ->
handle_call({update_config, NewConf, OldConf}, _, State) ->
State2 = update_config(State, NewConf, OldConf),
{reply, ok, State2};
handle_call(clean, _, #{context := Context} = State) ->
clean(Context),
{reply, ok, State};
handle_call({delete, Topic}, _, #{context := Context} = State) ->
delete_message(Context, Topic),
{reply, ok, State};
handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) ->
Mod = get_backend_module(),
Result = Mod:page_read(Context, Topic, Page, Limit),
{reply, Result, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
@ -194,7 +200,6 @@ handle_cast(stats_fun, #{context := Context} = State) ->
Size = Mod:size(Context),
emqx_stats:setstat('retained.count', 'retained.max', Size),
{noreply, State};
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}.
@ -204,7 +209,6 @@ handle_info(clear_expired, #{context := Context} = State) ->
Mod:clear_expired(Context),
Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
@ -222,7 +226,8 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-spec new_state() -> state().
new_state() ->
#{enable => false,
#{
enable => false,
context_id => 0,
context => undefined,
clear_timer => undefined
@ -249,11 +254,13 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
Size = iolist_size(Payload),
case payload_size_limit() of
Limit when Limit > 0 andalso Limit < Size ->
?SLOG(error, #{msg => "retain_failed_for_payload_size_exceeded_limit",
?SLOG(error, #{
msg => "retain_failed_for_payload_size_exceeded_limit",
topic => Topic,
config => emqx_hocon:format_path(?MAX_PAYLOAD_SIZE_CONFIG_PATH),
size => Size,
limit => Limit});
limit => Limit
});
_ ->
Mod = get_backend_module(),
Mod:store_retained(Context, Msg)
@ -266,23 +273,30 @@ clean(Context) ->
-spec update_config(state(), hocons:config(), hocons:config()) -> state().
update_config(State, Conf, OldConf) ->
update_config(maps:get(enable, Conf),
update_config(
maps:get(enable, Conf),
maps:get(enable, OldConf),
State,
Conf,
OldConf).
OldConf
).
-spec update_config(boolean(), boolean(), state(), hocons:config(), hocons:config()) -> state().
update_config(false, _, State, _, _) ->
disable_retainer(State);
update_config(true, false, State, NewConf, _) ->
enable_retainer(State, NewConf);
update_config(true, true,
#{clear_timer := ClearTimer} = State, NewConf, OldConf) ->
#{backend := BackendCfg,
msg_clear_interval := ClearInterval} = NewConf,
update_config(
true,
true,
#{clear_timer := ClearTimer} = State,
NewConf,
OldConf
) ->
#{
backend := BackendCfg,
msg_clear_interval := ClearInterval
} = NewConf,
#{backend := OldBackendCfg} = OldConf,
@ -290,32 +304,47 @@ update_config(true, true,
OldStrorageType = maps:get(type, OldBackendCfg),
case OldStrorageType of
StorageType ->
State#{clear_timer := check_timer(ClearTimer,
State#{
clear_timer := check_timer(
ClearTimer,
ClearInterval,
clear_expired)};
clear_expired
)
};
_ ->
State2 = disable_retainer(State),
enable_retainer(State2, NewConf)
end.
-spec enable_retainer(state(), hocon:config()) -> state().
enable_retainer(#{context_id := ContextId} = State,
#{msg_clear_interval := ClearInterval,
backend := BackendCfg}) ->
enable_retainer(
#{context_id := ContextId} = State,
#{
msg_clear_interval := ClearInterval,
backend := BackendCfg
}
) ->
NewContextId = ContextId + 1,
Context = create_resource(new_context(NewContextId), BackendCfg),
load(Context),
State#{enable := true,
State#{
enable := true,
context_id := NewContextId,
context := Context,
clear_timer := add_timer(ClearInterval, clear_expired)}.
clear_timer := add_timer(ClearInterval, clear_expired)
}.
-spec disable_retainer(state()) -> state().
disable_retainer(#{clear_timer := ClearTimer,
context := Context} = State) ->
disable_retainer(
#{
clear_timer := ClearTimer,
context := Context
} = State
) ->
unload(),
ok = close_resource(Context),
State#{enable := false,
State#{
enable := false,
clear_timer := stop_timer(ClearTimer)
}.
@ -344,7 +373,8 @@ check_timer(Timer, _, _) ->
-spec get_backend_module() -> backend().
get_backend_module() ->
ModName = case emqx:get_config([retainer, backend]) of
ModName =
case emqx:get_config([retainer, backend]) of
#{type := built_in_database} -> mnesia;
#{type := Backend} -> Backend
end,
@ -353,15 +383,17 @@ get_backend_module() ->
create_resource(Context, #{type := built_in_database} = Cfg) ->
emqx_retainer_mnesia:create_resource(Cfg),
Context;
create_resource(Context, #{type := DB} = Config) ->
ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]),
case emqx_resource:create(
case
emqx_resource:create(
ResourceID,
<<"emqx_retainer">>,
list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
Config,
#{}) of
#{}
)
of
{ok, already_created} ->
Context#{resource_id => ResourceID};
{ok, _} ->

View File

@ -24,14 +24,17 @@
%% API
-export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]).
-export([ lookup_retained_warp/2
, with_topic_warp/2
, config/2]).
-export([
lookup_retained_warp/2,
with_topic_warp/2,
config/2
]).
-import(hoconsc, [mk/2, ref/1, ref/2, array/1]).
-import(emqx_dashboard_swagger, [error_codes/2]).
-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
%% 1MB = 1024 x 1024
-define(MAX_PAYLOAD_SIZE, 1048576).
-define(PREFIX, "/mqtt/retainer").
-define(TAGS, [<<"retainer">>]).
@ -44,49 +47,62 @@ paths() ->
[?PREFIX, ?PREFIX ++ "/messages", ?PREFIX ++ "/message/:topic"].
schema(?PREFIX) ->
#{'operationId' => config,
get => #{tags => ?TAGS,
#{
'operationId' => config,
get => #{
tags => ?TAGS,
description => <<"Get retainer config">>,
responses => #{200 => mk(conf_schema(), #{desc => "The config content"}),
responses => #{
200 => mk(conf_schema(), #{desc => "The config content"}),
404 => error_codes(['NOT_FOUND'], <<"Config not found">>)
}
},
put => #{tags => ?TAGS,
put => #{
tags => ?TAGS,
description => <<"Update retainer config">>,
'requestBody' => mk(conf_schema(), #{desc => "The config content"}),
responses => #{200 => mk(conf_schema(), #{desc => "Update configs successfully"}),
responses => #{
200 => mk(conf_schema(), #{desc => "Update configs successfully"}),
400 => error_codes(['UPDATE_FAILED'], <<"Update config failed">>)
}
}
};
schema(?PREFIX ++ "/messages") ->
#{'operationId' => lookup_retained_warp,
get => #{tags => ?TAGS,
#{
'operationId' => lookup_retained_warp,
get => #{
tags => ?TAGS,
description => <<"List retained messages">>,
parameters => page_params(),
responses => #{200 => mk(array(ref(message_summary)), #{desc => "The result list"}),
responses => #{
200 => mk(array(ref(message_summary)), #{desc => "The result list"}),
400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>)
}
}
};
schema(?PREFIX ++ "/message/:topic") ->
#{'operationId' => with_topic_warp,
get => #{tags => ?TAGS,
#{
'operationId' => with_topic_warp,
get => #{
tags => ?TAGS,
description => <<"Lookup a message by a topic without wildcards">>,
parameters => parameters(),
responses => #{200 => mk(ref(message), #{desc => "Details of the message"}),
responses => #{
200 => mk(ref(message), #{desc => "Details of the message"}),
404 => error_codes(['NOT_FOUND'], <<"Viewed message doesn't exist">>),
400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>)
}
},
delete => #{tags => ?TAGS,
delete => #{
tags => ?TAGS,
description => <<"Delete matching messages">>,
parameters => parameters(),
responses => #{204 => <<>>,
400 => error_codes(['BAD_REQUEST'],
<<"Unsupported backend">>)
responses => #{
204 => <<>>,
400 => error_codes(
['BAD_REQUEST'],
<<"Unsupported backend">>
)
}
}
}.
@ -98,23 +114,29 @@ conf_schema() ->
ref(emqx_retainer_schema, "retainer").
parameters() ->
[{topic, mk(binary(), #{in => path,
[
{topic,
mk(binary(), #{
in => path,
required => true,
desc => "The target topic"
})}].
})}
].
fields(message_summary) ->
[ {msgid, mk(binary(), #{desc => <<"Message ID">>})}
, {topic, mk(binary(), #{desc => "The topic"})}
, {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})}
, {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})}
, {from_clientid, mk(binary(), #{desc => "Publisher ClientId"})}
, {from_username, mk(binary(), #{desc => "Publisher Username"})}
[
{msgid, mk(binary(), #{desc => <<"Message ID">>})},
{topic, mk(binary(), #{desc => "The topic"})},
{qos, mk(emqx_schema:qos(), #{desc => "The QoS"})},
{publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})},
{from_clientid, mk(binary(), #{desc => "Publisher ClientId"})},
{from_username, mk(binary(), #{desc => "Publisher Username"})}
];
fields(message) ->
[{payload, mk(binary(), #{desc => "The payload content"})} |
fields(message_summary)].
[
{payload, mk(binary(), #{desc => "The payload content"})}
| fields(message_summary)
].
lookup_retained_warp(Type, Params) ->
check_backend(Type, Params, fun lookup_retained/2).
@ -124,15 +146,16 @@ with_topic_warp(Type, Params) ->
config(get, _) ->
{200, emqx:get_raw_config([retainer])};
config(put, #{body := Body}) ->
try
{ok, _} = emqx_retainer:update_config(Body),
{200, emqx:get_raw_config([retainer])}
catch _:Reason:_ ->
{400,
#{code => <<"UPDATE_FAILED">>,
message => iolist_to_binary(io_lib:format("~p~n", [Reason]))}}
catch
_:Reason:_ ->
{400, #{
code => <<"UPDATE_FAILED">>,
message => iolist_to_binary(io_lib:format("~p~n", [Reason]))
}}
end.
%%------------------------------------------------------------------------------
@ -151,23 +174,33 @@ with_topic(get, #{bindings := Bindings}) ->
[H | _] ->
{200, format_detail_message(H)};
_ ->
{404, #{code => <<"NOT_FOUND">>,
{404, #{
code => <<"NOT_FOUND">>,
message => <<"Viewed message doesn't exist">>
}}
end;
with_topic(delete, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings),
emqx_retainer_mnesia:delete_message(undefined, Topic),
{204}.
format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
, timestamp = Timestamp, headers = Headers}) ->
#{msgid => emqx_guid:to_hexstr(ID),
format_message(#message{
id = ID,
qos = Qos,
topic = Topic,
from = From,
timestamp = Timestamp,
headers = Headers
}) ->
#{
msgid => emqx_guid:to_hexstr(ID),
qos => Qos,
topic => Topic,
publish_at => list_to_binary(calendar:system_time_to_rfc3339(
Timestamp, [{unit, millisecond}])),
publish_at => list_to_binary(
calendar:system_time_to_rfc3339(
Timestamp, [{unit, millisecond}]
)
),
from_clientid => to_bin_string(From),
from_username => maps:get(username, Headers, <<>>)
}.

View File

@ -18,13 +18,13 @@
-behaviour(application).
-export([ start/2
, stop/1
]).
-export([
start/2,
stop/1
]).
start(_Type, _Args) ->
emqx_retainer_sup:start_link().
stop(_State) ->
ok.

View File

@ -22,15 +22,23 @@
-include_lib("emqx/include/logger.hrl").
%% API
-export([ start_link/2
, dispatch/2
, refresh_limiter/0
, worker/0
]).
-export([
start_link/2,
dispatch/2,
refresh_limiter/0,
worker/0
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, format_status/2]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
format_status/2
]).
-type limiter() :: emqx_htb_limiter:limiter().
@ -46,10 +54,12 @@ dispatch(Context, Topic) ->
%% an limiter update handler maybe added later, now this is a workaround
refresh_limiter() ->
Workers = gproc_pool:active_workers(?POOL),
lists:foreach(fun({_, Pid}) ->
lists:foreach(
fun({_, Pid}) ->
gen_server:cast(Pid, ?FUNCTION_NAME)
end,
Workers).
Workers
).
worker() ->
gproc_pool:pick_worker(?POOL, self()).
@ -59,13 +69,18 @@ worker() ->
%% Starts the server
%% @end
%%--------------------------------------------------------------------
-spec start_link(atom(), pos_integer()) -> {ok, Pid :: pid()} |
{error, Error :: {already_started, pid()}} |
{error, Error :: term()} |
ignore.
-spec start_link(atom(), pos_integer()) ->
{ok, Pid :: pid()}
| {error, Error :: {already_started, pid()}}
| {error, Error :: term()}
| ignore.
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
gen_server:start_link(
{local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[{hibernate_after, 1000}]
).
%%%===================================================================
%%% gen_server callbacks
@ -77,11 +92,12 @@ start_link(Pool, Id) ->
%% Initializes the server
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) -> {ok, State :: term()} |
{ok, State :: term(), Timeout :: timeout()} |
{ok, State :: term(), hibernate} |
{stop, Reason :: term()} |
ignore.
-spec init(Args :: term()) ->
{ok, State :: term()}
| {ok, State :: term(), Timeout :: timeout()}
| {ok, State :: term(), hibernate}
| {stop, Reason :: term()}
| ignore.
init([Pool, Id]) ->
erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
@ -96,14 +112,14 @@ init([Pool, Id]) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
{reply, Reply :: term(), NewState :: term()} |
{reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
{reply, Reply :: term(), NewState :: term(), hibernate} |
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
{stop, Reason :: term(), NewState :: term()}.
{reply, Reply :: term(), NewState :: term()}
| {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()}
| {reply, Reply :: term(), NewState :: term(), hibernate}
| {noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), Reply :: term(), NewState :: term()}
| {stop, Reason :: term(), NewState :: term()}.
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
@ -115,19 +131,17 @@ handle_call(Req, _From, State) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), NewState :: term()}.
{noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), NewState :: term()}.
handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
{noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) ->
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
Limiter = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}.
@ -139,10 +153,10 @@ handle_cast(Msg, State) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: normal | term(), NewState :: term()}.
{noreply, NewState :: term()}
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: normal | term(), NewState :: term()}.
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
@ -156,8 +170,10 @@ handle_info(Info, State) ->
%% with Reason. The return value is ignored.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()) -> any().
-spec terminate(
Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()
) -> any().
terminate(_Reason, #{pool := Pool, id := Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
%%--------------------------------------------------------------------
@ -166,10 +182,13 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
%% Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()},
-spec code_change(
OldVsn :: term() | {down, term()},
State :: term(),
Extra :: term()) -> {ok, NewState :: term()} |
{error, Reason :: term()}.
Extra :: term()
) ->
{ok, NewState :: term()}
| {error, Reason :: term()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -181,8 +200,10 @@ code_change(_OldVsn, State, _Extra) ->
%% or when it appears in termination error logs.
%% @end
%%--------------------------------------------------------------------
-spec format_status(Opt :: normal | terminate,
Status :: list()) -> Status :: term().
-spec format_status(
Opt :: normal | terminate,
Status :: list()
) -> Status :: term().
format_status(_Opt, Status) ->
Status.
@ -209,10 +230,8 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) ->
{ok, limiter()}.
deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
{ok, Limiter};
deliver([], Context, Pid, Topic, Cursor, Limiter) ->
dispatch(Context, Pid, Topic, Cursor, Limiter);
deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
case erlang:is_process_alive(Pid) of
false ->
@ -235,7 +254,6 @@ deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
{ok, Limiter};
do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
{Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
case emqx_htb_limiter:consume(Num, Limiter) of
@ -243,7 +261,8 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
do_deliver(ToDelivers, Pid, Topic),
do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
{drop, _} = Drop ->
?SLOG(error, #{msg => "retained_message_dropped",
?SLOG(error, #{
msg => "retained_message_dropped",
reason => "reached_ratelimit",
dropped_count => length(ToDelivers)
}),
@ -253,7 +272,6 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
do_deliver([Msg | T], Pid, Topic) ->
Pid ! {deliver, Topic, Msg},
do_deliver(T, Pid, Topic);
do_deliver([], _, _) ->
ok.
@ -262,9 +280,7 @@ safe_split(N, List) ->
safe_split(0, List, Count, Acc) ->
{Count, lists:reverse(Acc), List};
safe_split(_N, [], Count, Acc) ->
{Count, lists:reverse(Acc), []};
safe_split(N, [H | T], Count, Acc) ->
safe_split(N - 1, T, Count + 1, [H | Acc]).

View File

@ -23,16 +23,16 @@
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("stdlib/include/qlc.hrl").
-export([ delete_message/2
, store_retained/2
, read_message/2
, page_read/4
, match_messages/3
, clear_expired/1
, clean/1
, size/1
]).
-export([
delete_message/2,
store_retained/2,
read_message/2,
page_read/4,
match_messages/3,
clear_expired/1,
clean/1,
size/1
]).
-export([create_resource/1]).
@ -45,15 +45,20 @@
%% emqx_retainer_storage callbacks
%%--------------------------------------------------------------------
create_resource(#{storage_type := StorageType}) ->
Copies = case StorageType of
Copies =
case StorageType of
ram -> ram_copies;
disc -> disc_copies
end,
StoreProps = [{ets, [compressed,
StoreProps = [
{ets, [
compressed,
{read_concurrency, true},
{write_concurrency, true}]},
{dets, [{auto_save, 1000}]}],
{write_concurrency, true}
]},
{dets, [{auto_save, 1000}]}
],
ok = mria:create_table(?TAB, [
{type, ordered_set},
@ -65,40 +70,51 @@ create_resource(#{storage_type := StorageType}) ->
]),
ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
case mnesia:table_info(?TAB, storage_type) of
Copies -> ok;
Copies ->
ok;
_Other ->
{atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies),
ok
end.
store_retained(_, Msg =#message{topic = Topic}) ->
store_retained(_, Msg = #message{topic = Topic}) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
case is_table_full() of
false ->
mria:dirty_write(?TAB,
#retained{topic = topic2tokens(Topic),
mria:dirty_write(
?TAB,
#retained{
topic = topic2tokens(Topic),
msg = Msg,
expiry_time = ExpiryTime});
expiry_time = ExpiryTime
}
);
_ ->
Tokens = topic2tokens(Topic),
Fun = fun() ->
case mnesia:read(?TAB, Tokens) of
[_] ->
mnesia:write(?TAB,
#retained{topic = Tokens,
mnesia:write(
?TAB,
#retained{
topic = Tokens,
msg = Msg,
expiry_time = ExpiryTime},
write);
expiry_time = ExpiryTime
},
write
);
[] ->
mnesia:abort(table_is_full)
end
end,
case mria:transaction(?RETAINER_SHARD, Fun) of
{atomic, ok} -> ok;
{atomic, ok} ->
ok;
{aborted, Reason} ->
?SLOG(error, #{ msg => "failed_to_retain_message"
, topic => Topic
, reason => Reason
?SLOG(error, #{
msg => "failed_to_retain_message",
topic => Topic,
reason => Reason
})
end
end.
@ -116,7 +132,8 @@ clear_expired(_) ->
delete_message(_, Topic) ->
case emqx_topic:wildcard(Topic) of
true -> match_delete_messages(Topic);
true ->
match_delete_messages(Topic);
false ->
Tokens = topic2tokens(Topic),
Fun = fun() ->
@ -169,8 +186,7 @@ size(_) ->
%%--------------------------------------------------------------------
sort_retained([]) -> [];
sort_retained([Msg]) -> [Msg];
sort_retained(Msgs) ->
lists:sort(fun compare_message/2, Msgs).
sort_retained(Msgs) -> lists:sort(fun compare_message/2, Msgs).
compare_message(M1, M2) ->
M1#message.timestamp =< M2#message.timestamp.
@ -194,12 +210,13 @@ batch_read_messages(Cursor, MaxReadNum) ->
{ok, Answers, Cursor}
end.
-spec(read_messages(emqx_types:topic())
-> [emqx_types:message()]).
-spec read_messages(emqx_types:topic()) ->
[emqx_types:message()].
read_messages(Topic) ->
Tokens = topic2tokens(Topic),
case mnesia:dirty_read(?TAB, Tokens) of
[] -> [];
[] ->
[];
[#retained{msg = Msg, expiry_time = Et}] ->
case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of
true -> [Msg];
@ -207,13 +224,13 @@ read_messages(Topic) ->
end
end.
-spec(match_messages(emqx_types:topic())
-> [emqx_types:message()]).
-spec match_messages(emqx_types:topic()) ->
[emqx_types:message()].
match_messages(Filter) ->
Ms = make_match_spec(Filter),
mnesia:dirty_select(?TAB, Ms).
-spec(match_delete_messages(emqx_types:topic()) -> ok).
-spec match_delete_messages(emqx_types:topic()) -> ok.
match_delete_messages(Filter) ->
Cond = condition(emqx_topic:words(Filter)),
MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'},
@ -223,7 +240,13 @@ match_delete_messages(Filter) ->
%% @private
condition(Ws) ->
Ws1 = [case W =:= '+' of true -> '_'; _ -> W end || W <- Ws],
Ws1 = [
case W =:= '+' of
true -> '_';
_ -> W
end
|| W <- Ws
],
case lists:last(Ws1) =:= '#' of
false -> Ws1;
_ -> (Ws1 -- ['#']) ++ '_'
@ -240,8 +263,10 @@ make_match_spec(Topic) ->
condition(emqx_topic:words(Topic))
end,
MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'},
[{MsHd, [{'=:=', '$3', 0}], ['$2']},
{MsHd, [{'>', '$3', NowMs}], ['$2']}].
[
{MsHd, [{'=:=', '$3', 0}], ['$2']},
{MsHd, [{'>', '$3', NowMs}], ['$2']}
].
-spec make_cursor(undefined | topic()) -> qlc:query_cursor().
make_cursor(Topic) ->

View File

@ -11,47 +11,74 @@ namespace() -> "retainer".
roots() -> ["retainer"].
fields("retainer") ->
[ {enable, sc(boolean(), "Enable retainer feature.", false)}
, {msg_expiry_interval, sc(emqx_schema:duration_ms(),
[
{enable, sc(boolean(), "Enable retainer feature.", false)},
{msg_expiry_interval,
sc(
emqx_schema:duration_ms(),
"Message retention time. 0 means message will never be expired.",
"0s")}
, {msg_clear_interval, sc(emqx_schema:duration_ms(),
"0s"
)},
{msg_clear_interval,
sc(
emqx_schema:duration_ms(),
"Periodic interval for cleaning up expired messages. "
"Never clear if the value is 0.",
"0s")}
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
, {max_payload_size, sc(emqx_schema:bytesize(),
"0s"
)},
{flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))},
{max_payload_size,
sc(
emqx_schema:bytesize(),
"Maximum retained message size.",
"1MB")}
, {stop_publish_clear_msg, sc(boolean(),
"1MB"
)},
{stop_publish_clear_msg,
sc(
boolean(),
"When the retained flag of the `PUBLISH` message is set and Payload is empty, "
"whether to continue to publish the message.<br/>"
"See: "
"http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038",
false)}
, {backend, backend_config()}
false
)},
{backend, backend_config()}
];
fields(mnesia_config) ->
[ {type, hoconsc:mk(hoconsc:union([built_in_database]), #{desc => "Backend type."})}
, {storage_type, sc(hoconsc:union([ram, disc]),
[
{type, hoconsc:mk(hoconsc:union([built_in_database]), #{desc => "Backend type."})},
{storage_type,
sc(
hoconsc:union([ram, disc]),
"Specifies whether the messages are stored in RAM or persisted on disc.",
ram)}
, {max_retained_messages, sc(integer(),
ram
)},
{max_retained_messages,
sc(
integer(),
"Maximum number of retained messages. 0 means no limit.",
0,
fun is_pos_integer/1)}
fun is_pos_integer/1
)}
];
fields(flow_control) ->
[ {batch_read_number, sc(integer(),
[
{batch_read_number,
sc(
integer(),
"Size of the batch when reading messages from storage. 0 means no limit.",
0,
fun is_pos_integer/1)}
, {batch_deliver_number, sc(range(0, 1000),
fun is_pos_integer/1
)},
{batch_deliver_number,
sc(
range(0, 1000),
"The number of retained messages can be delivered per batch.",
0)}
, {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(),
0
)},
{batch_deliver_limiter,
sc(
emqx_limiter_schema:bucket_name(),
"The rate limiter name for retained messages' delivery.<br/>"
"Limiter helps to avoid delivering too many messages to the client at once, "
"which may cause the client "
@ -60,7 +87,8 @@ fields(flow_control) ->
"The names of the available rate limiters are taken from the existing rate "
"limiters under `limiter.batch`.<br/>"
"If this field is empty, limiter is not used.",
undefined)}
undefined
)}
].
desc("retainer") ->
@ -79,9 +107,11 @@ sc(Type, Desc, Default) ->
hoconsc:mk(Type, #{default => Default, desc => Desc}).
sc(Type, Desc, Default, Validator) ->
hoconsc:mk(Type, #{default => Default,
hoconsc:mk(Type, #{
default => Default,
desc => Desc,
validator => Validator}).
validator => Validator
}).
is_pos_integer(V) ->
V >= 0.

View File

@ -26,13 +26,21 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
PoolSpec = emqx_pool_sup:spec([emqx_retainer_dispatcher, hash, emqx_vm:schedulers(),
{emqx_retainer_dispatcher, start_link, []}]),
{ok, {{one_for_one, 10, 3600},
[#{id => retainer,
PoolSpec = emqx_pool_sup:spec([
emqx_retainer_dispatcher,
hash,
emqx_vm:schedulers(),
{emqx_retainer_dispatcher, start_link, []}
]),
{ok,
{{one_for_one, 10, 3600}, [
#{
id => retainer,
start => {emqx_retainer, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_retainer]},
PoolSpec]}}.
modules => [emqx_retainer]
},
PoolSpec
]}}.

View File

@ -27,23 +27,27 @@
all() -> emqx_common_test_helpers:all(?MODULE).
-define(BASE_CONF, <<"""
retainer {
enable = true
msg_clear_interval = 0s
msg_expiry_interval = 0s
max_payload_size = 1MB
flow_control {
batch_read_number = 0
batch_deliver_number = 0
batch_deliver_limiter = retainer
}
backend {
type = built_in_database
storage_type = ram
max_retained_messages = 0
}
}""">>).
-define(BASE_CONF, <<
""
"\n"
"retainer {\n"
" enable = true\n"
" msg_clear_interval = 0s\n"
" msg_expiry_interval = 0s\n"
" max_payload_size = 1MB\n"
" flow_control {\n"
" batch_read_number = 0\n"
" batch_deliver_number = 0\n"
" batch_deliver_limiter = retainer\n"
" }\n"
" backend {\n"
" type = built_in_database\n"
" storage_type = ram\n"
" max_retained_messages = 0\n"
" }\n"
"}"
""
>>).
%%--------------------------------------------------------------------
%% Setups
@ -82,9 +86,11 @@ t_store_and_clean(_) ->
{ok, _} = emqtt:connect(C1),
emqtt:publish(
C1, <<"retained">>,
C1,
<<"retained">>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
timer:sleep(100),
{ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10),
@ -121,9 +127,11 @@ t_retain_handling(_) ->
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
emqtt:publish(
C1, <<"retained">>,
C1,
<<"retained">>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))),
@ -154,17 +162,23 @@ t_wildcard_subscription(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(
C1, <<"retained/0">>,
C1,
<<"retained/0">>,
<<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/1">>,
C1,
<<"retained/1">>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/a/b/c">>,
C1,
<<"retained/a/b/c">>,
<<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
@ -180,25 +194,38 @@ t_message_expiry(_) ->
{ok, _} = emqtt:connect(C1),
emqtt:publish(
C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0},
C1,
<<"retained/0">>,
#{'Message-Expiry-Interval' => 0},
<<"don't expire">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2},
C1,
<<"retained/1">>,
#{'Message-Expiry-Interval' => 2},
<<"expire">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5},
C1,
<<"retained/2">>,
#{'Message-Expiry-Interval' => 5},
<<"don't expire">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/3">>,
C1,
<<"retained/3">>,
<<"don't expire">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"$SYS/retained/4">>,
C1,
<<"$SYS/retained/4">>,
<<"don't expire">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
@ -240,17 +267,23 @@ t_clean(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(
C1, <<"retained/0">>,
C1,
<<"retained/0">>,
<<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/1">>,
C1,
<<"retained/1">>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/test/0">>,
C1,
<<"retained/test/0">>,
<<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]),
[{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
?assertEqual(3, length(receive_messages(3))),
@ -266,7 +299,8 @@ t_stop_publish_clear_msg(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(
C1, <<"retained/0">>,
C1,
<<"retained/0">>,
<<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]
),
@ -282,9 +316,13 @@ t_stop_publish_clear_msg(_) ->
t_flow_control(_) ->
#{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]),
RetainerCfg2 = RetainerCfg#{per_client :=
PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
capacity := 1}},
RetainerCfg2 = RetainerCfg#{
per_client :=
PerClient#{
rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
capacity := 1
}
},
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
emqx_limiter_manager:restart_server(shared),
timer:sleep(500),
@ -292,24 +330,31 @@ t_flow_control(_) ->
emqx_retainer_dispatcher:refresh_limiter(),
timer:sleep(500),
emqx_retainer:update_config(#{<<"flow_control">> =>
#{<<"batch_read_number">> => 1,
emqx_retainer:update_config(#{
<<"flow_control">> =>
#{
<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1,
<<"batch_deliver_limiter">> => retainer}}),
<<"batch_deliver_limiter">> => retainer
}
}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(
C1, <<"retained/0">>,
C1,
<<"retained/0">>,
<<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/1">>,
C1,
<<"retained/1">>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/3">>,
C1,
<<"retained/3">>,
<<"this is a retained message 3">>,
[{qos, 0}, {retain, true}]
),
@ -319,8 +364,10 @@ t_flow_control(_) ->
End = erlang:system_time(millisecond),
Diff = End - Begin,
?assert(Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))),
?assert(
Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
),
ok = emqtt:disconnect(C1),
@ -335,22 +382,28 @@ t_flow_control(_) ->
t_clear_expired(_) ->
ConfMod = fun(Conf) ->
Conf#{<<"msg_clear_interval">> := <<"1s">>,
<<"msg_expiry_interval">> := <<"3s">>}
Conf#{
<<"msg_clear_interval">> := <<"1s">>,
<<"msg_expiry_interval">> := <<"3s">>
}
end,
Case = fun() ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
lists:foreach(fun(I) ->
emqtt:publish(C1,
lists:foreach(
fun(I) ->
emqtt:publish(
C1,
<<"retained/", (I + 60):8/unsigned-integer>>,
#{'Message-Expiry-Interval' => 3},
<<"retained">>,
[{qos, 0}, {retain, true}])
[{qos, 0}, {retain, true}]
)
end,
lists:seq(1, 5)),
lists:seq(1, 5)
),
timer:sleep(1000),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
@ -373,11 +426,21 @@ t_max_payload_size(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(C1,
<<"retained/1">>, #{}, <<"1234">>, [{qos, 0}, {retain, true}]),
emqtt:publish(
C1,
<<"retained/1">>,
#{},
<<"1234">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(C1,
<<"retained/2">>, #{}, <<"1234567">>, [{qos, 0}, {retain, true}]),
emqtt:publish(
C1,
<<"retained/2">>,
#{},
<<"1234567">>,
[{qos, 0}, {retain, true}]
),
timer:sleep(500),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
@ -394,7 +457,8 @@ t_page_read(_) ->
timer:sleep(500),
Fun = fun(I) ->
emqtt:publish(C1,
emqtt:publish(
C1,
<<"retained/", (I + 60)>>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]
@ -436,9 +500,9 @@ receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
ct:log("Msg: ~p ~n", [Msg]),
receive_messages(Count-1, [Msg|Msgs]);
receive_messages(Count - 1, [Msg | Msgs]);
Other ->
ct:log("Other Msg: ~p~n",[Other]),
ct:log("Other Msg: ~p~n", [Other]),
receive_messages(Count, Msgs)
after 2000 ->
Msgs
@ -451,7 +515,8 @@ with_conf(ConfMod, Case) ->
try
Case(),
emqx_retainer:update_config(Conf)
catch Type:Error:Strace ->
catch
Type:Error:Strace ->
emqx_retainer:update_config(Conf),
erlang:raise(Type, Error, Strace)
end.

View File

@ -56,16 +56,28 @@ t_config(_Config) ->
Path = api_path(["mqtt", "retainer"]),
{ok, ConfJson} = request_api(get, Path),
ReturnConf = decode_json(ConfJson),
?assertMatch(#{backend := _, enable := _, flow_control := _,
max_payload_size := _, msg_clear_interval := _,
msg_expiry_interval := _},
ReturnConf),
?assertMatch(
#{
backend := _,
enable := _,
flow_control := _,
max_payload_size := _,
msg_clear_interval := _,
msg_expiry_interval := _
},
ReturnConf
),
UpdateConf = fun(Enable) ->
RawConf = emqx_json:decode(ConfJson, [return_maps]),
UpdateJson = RawConf#{<<"enable">> := Enable},
{ok, UpdateResJson} = request_api(put,
Path, [], auth_header_(), UpdateJson),
{ok, UpdateResJson} = request_api(
put,
Path,
[],
auth_header_(),
UpdateJson
),
UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]),
?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf))
end,
@ -80,9 +92,12 @@ t_messages(_) ->
timer:sleep(500),
Each = fun(I) ->
emqtt:publish(C1, <<"retained/", (I + 60)>>,
emqtt:publish(
C1,
<<"retained/", (I + 60)>>,
<<"retained">>,
[{qos, 0}, {retain, true}])
[{qos, 0}, {retain, true}]
)
end,
lists:foreach(Each, lists:seq(1, 5)),
@ -91,19 +106,28 @@ t_messages(_) ->
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
Msgs = decode_json(MsgsJson),
MsgLen = erlang:length(Msgs),
?assert(MsgLen >= 5,
io_lib:format("message length is:~p~n", [MsgLen])), %% maybe has $SYS messages
?assert(
MsgLen >= 5,
%% maybe has $SYS messages
io_lib:format("message length is:~p~n", [MsgLen])
),
[First | _] = Msgs,
?assertMatch(#{msgid := _, topic := _, qos := _,
publish_at := _, from_clientid := _, from_username := _
?assertMatch(
#{
msgid := _,
topic := _,
qos := _,
publish_at := _,
from_clientid := _,
from_username := _
},
First),
First
),
ok = emqtt:disconnect(C1).
t_lookup_and_delete(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
@ -116,10 +140,18 @@ t_lookup_and_delete(_) ->
{ok, LookupJson} = request_api(get, API),
LookupResult = decode_json(LookupJson),
?assertMatch(#{msgid := _, topic := _, qos := _, payload := _,
publish_at := _, from_clientid := _, from_username := _
?assertMatch(
#{
msgid := _,
topic := _,
qos := _,
payload := _,
publish_at := _,
from_clientid := _,
from_username := _
},
LookupResult),
LookupResult
),
{ok, []} = request_api(delete, API),

View File

@ -37,4 +37,3 @@ end_per_testcase(_TestCase, Config) ->
% t_load(_) ->
% error('TODO').

View File

@ -43,7 +43,7 @@ receive_messages(0, Msgs) ->
receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]);
receive_messages(Count - 1, [Msg | Msgs]);
_Other ->
receive_messages(Count, Msgs)
after 300 ->
@ -74,17 +74,26 @@ t_publish_retain_message(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1),
{ok, _} = emqtt:publish(
Client1, Topic, #{},
Client1,
Topic,
#{},
<<"retained message">>,
[{qos, 2}, {retain, true}]),
[{qos, 2}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, Topic, #{},
Client1,
Topic,
#{},
<<"new retained message">>,
[{qos, 2}, {retain, true}]),
[{qos, 2}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, Topic, #{},
Client1,
Topic,
#{},
<<"not retained message">>,
[{qos, 2}, {retain, false}]),
[{qos, 2}, {retain, false}]
),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
[Msg] = receive_messages(3),
@ -95,7 +104,8 @@ t_publish_retain_message(_) ->
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.3.1-6] [MQTT-3.3.1-7]
%% [MQTT-3.3.1-6] [MQTT-3.3.1-7]
?assertEqual(0, length(receive_messages(1))),
ok = emqtt:disconnect(Client1).
@ -103,38 +113,55 @@ t_publish_message_expiry_interval(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1),
{ok, _} = emqtt:publish(
Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1},
Client1,
<<"topic/A">>,
#{'Message-Expiry-Interval' => 1},
<<"retained message">>,
[{qos, 1}, {retain, true}]),
[{qos, 1}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1},
Client1,
<<"topic/B">>,
#{'Message-Expiry-Interval' => 1},
<<"retained message">>,
[{qos, 2}, {retain, true}]),
[{qos, 2}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10},
Client1,
<<"topic/C">>,
#{'Message-Expiry-Interval' => 10},
<<"retained message">>,
[{qos, 1}, {retain, true}]),
[{qos, 1}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10},
Client1,
<<"topic/D">>,
#{'Message-Expiry-Interval' => 10},
<<"retained message">>,
[{qos, 2}, {retain, true}]),
[{qos, 2}, {retain, true}]
),
timer:sleep(1500),
{ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2),
Msgs = receive_messages(6),
?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5]
%% [MQTT-3.3.2-5]
?assertEqual(2, length(Msgs)),
L = lists:map(
fun(Msg) ->
MessageExpiryInterval = maps:get('Message-Expiry-Interval',
maps:get(properties, Msg)),
MessageExpiryInterval = maps:get(
'Message-Expiry-Interval',
maps:get(properties, Msg)
),
MessageExpiryInterval < 10
end, Msgs),
?assertEqual(2, length(L)), %% [MQTT-3.3.2-6]
end,
Msgs
),
%% [MQTT-3.3.2-6]
?assertEqual(2, length(L)),
ok = emqtt:disconnect(Client1),
clean_retained( <<"topic/C">>),
clean_retained( <<"topic/D">>).
clean_retained(<<"topic/C">>),
clean_retained(<<"topic/D">>).
%%--------------------------------------------------------------------
%% Subsctibe
@ -144,17 +171,23 @@ t_subscribe_retain_handing(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1),
ok = emqtt:publish(
Client1, <<"topic/A">>, #{},
Client1,
<<"topic/A">>,
#{},
<<"retained message">>,
[{qos, 0}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, <<"topic/B">>, #{},
Client1,
<<"topic/B">>,
#{},
<<"retained message">>,
[{qos, 1}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, <<"topic/C">>, #{},
Client1,
<<"topic/C">>,
#{},
<<"retained message">>,
[{qos, 2}, {retain, true}]
),
@ -162,21 +195,26 @@ t_subscribe_retain_handing(_) ->
timer:sleep(200),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]),
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10]
%% [MQTT-3.3.1-10]
?assertEqual(3, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 2}, {qos, 2}]}]),
?assertEqual(0, length(receive_messages(3))), %% [MQTT-3.3.1-11]
%% [MQTT-3.3.1-11]
?assertEqual(0, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]),
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-9]
%% [MQTT-3.3.1-9]
?assertEqual(3, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]),
?assertEqual(0, length(receive_messages(3))), %% [MQTT-3.3.1-10]
%% [MQTT-3.3.1-10]
?assertEqual(0, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]),
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.8.4-4]
%% [MQTT-3.8.4-4]
?assertEqual(3, length(receive_messages(3))),
ok = emqtt:disconnect(Client1),
clean_retained( <<"topic/A">>),
clean_retained( <<"topic/B">>),
clean_retained( <<"topic/C">>).
clean_retained(<<"topic/A">>),
clean_retained(<<"topic/B">>),
clean_retained(<<"topic/C">>).