Merge pull request #2924 from emqx/master
Auto-pull-request-by-2019-09-21
This commit is contained in:
commit
8f0e16e119
|
@ -64,7 +64,7 @@
|
|||
%% Message flags
|
||||
flags :: #{atom() => boolean()},
|
||||
%% Message headers, or MQTT 5.0 Properties
|
||||
headers = #{},
|
||||
headers :: map(),
|
||||
%% Topic that the message is published to
|
||||
topic :: binary(),
|
||||
%% Message Payload
|
||||
|
|
|
@ -108,9 +108,9 @@ match_who(#{client_id := ClientId}, {client, ClientId}) ->
|
|||
true;
|
||||
match_who(#{username := Username}, {user, Username}) ->
|
||||
true;
|
||||
match_who(#{peername := undefined}, {ipaddr, _Tup}) ->
|
||||
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
|
||||
false;
|
||||
match_who(#{peername := {IP, _}}, {ipaddr, CIDR}) ->
|
||||
match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
|
||||
esockd_cidr:match(IP, CIDR);
|
||||
match_who(Client, {'and', Conds}) when is_list(Conds) ->
|
||||
lists:foldl(fun(Who, Allow) ->
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([start_link/0, stop/0]).
|
||||
|
||||
-export([ check/1
|
||||
, add/1
|
||||
|
@ -69,11 +69,14 @@ mnesia(copy) ->
|
|||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
%% for tests
|
||||
-spec(stop() -> ok).
|
||||
stop() -> gen_server:stop(?MODULE).
|
||||
|
||||
-spec(check(emqx_types:client()) -> boolean()).
|
||||
check(#{client_id := ClientId,
|
||||
username := Username,
|
||||
peername := {IPAddr, _}
|
||||
}) ->
|
||||
peerhost := IPAddr}) ->
|
||||
ets:member(?BANNED_TAB, {client_id, ClientId})
|
||||
orelse ets:member(?BANNED_TAB, {username, Username})
|
||||
orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).
|
||||
|
@ -82,11 +85,10 @@ check(#{client_id := ClientId,
|
|||
add(Banned) when is_record(Banned, banned) ->
|
||||
mnesia:dirty_write(?BANNED_TAB, Banned).
|
||||
|
||||
-spec(delete({client_id, emqx_types:client_id()} |
|
||||
{username, emqx_types:username()} |
|
||||
{peername, emqx_types:peername()}) -> ok).
|
||||
delete(Key) ->
|
||||
mnesia:dirty_delete(?BANNED_TAB, Key).
|
||||
-spec(delete({client_id, emqx_types:client_id()}
|
||||
| {username, emqx_types:username()}
|
||||
| {peerhost, emqx_types:peerhost()}) -> ok).
|
||||
delete(Key) -> mnesia:dirty_delete(?BANNED_TAB, Key).
|
||||
|
||||
info(InfoKey) ->
|
||||
mnesia:table_info(?BANNED_TAB, InfoKey).
|
||||
|
@ -107,8 +109,7 @@ handle_cast(Msg, State) ->
|
|||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
||||
mnesia:async_dirty(fun expire_banned_items/1,
|
||||
[erlang:system_time(second)]),
|
||||
mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]),
|
||||
{noreply, ensure_expiry_timer(State), hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
|
@ -127,7 +128,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
-ifdef(TEST).
|
||||
ensure_expiry_timer(State) ->
|
||||
State#{expiry_timer := emqx_misc:start_timer(timer:seconds(1), expire)}.
|
||||
State#{expiry_timer := emqx_misc:start_timer(10, expire)}.
|
||||
-else.
|
||||
ensure_expiry_timer(State) ->
|
||||
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}.
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -28,6 +28,7 @@
|
|||
-export([start_link/0]).
|
||||
|
||||
-export([ register_channel/1
|
||||
, unregister_channel/1
|
||||
]).
|
||||
|
||||
-export([ get_chan_attrs/1
|
||||
|
@ -105,6 +106,11 @@ register_channel(ClientId, ChanPid) ->
|
|||
ok = emqx_cm_registry:register_channel(Chan),
|
||||
cast({registered, Chan}).
|
||||
|
||||
-spec(unregister_channel(emqx_types:client_id()) -> ok).
|
||||
unregister_channel(ClientId) when is_binary(ClientId) ->
|
||||
true = do_unregister_channel({ClientId, self()}),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
do_unregister_channel(Chan) ->
|
||||
ok = emqx_cm_registry:unregister_channel(Chan),
|
||||
|
@ -161,15 +167,15 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
|
|||
present := boolean(),
|
||||
pendings => list()}}
|
||||
| {error, Reason :: term()}).
|
||||
open_session(true, Client = #{client_id := ClientId}, Options) ->
|
||||
open_session(true, ClientInfo = #{client_id := ClientId}, ConnInfo) ->
|
||||
CleanStart = fun(_) ->
|
||||
ok = discard_session(ClientId),
|
||||
Session = emqx_session:init(Client, Options),
|
||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
||||
{ok, #{session => Session, present => false}}
|
||||
end,
|
||||
emqx_cm_locker:trans(ClientId, CleanStart);
|
||||
|
||||
open_session(false, Client = #{client_id := ClientId}, Options) ->
|
||||
open_session(false, ClientInfo = #{client_id := ClientId}, ConnInfo) ->
|
||||
ResumeStart = fun(_) ->
|
||||
case takeover_session(ClientId) of
|
||||
{ok, ConnMod, ChanPid, Session} ->
|
||||
|
@ -179,7 +185,7 @@ open_session(false, Client = #{client_id := ClientId}, Options) ->
|
|||
present => true,
|
||||
pendings => Pendings}};
|
||||
{error, not_found} ->
|
||||
Session = emqx_session:init(Client, Options),
|
||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
||||
{ok, #{session => Session, present => false}}
|
||||
end
|
||||
end,
|
||||
|
@ -204,7 +210,7 @@ takeover_session(ClientId) ->
|
|||
|
||||
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||
case get_chan_attrs(ClientId, ChanPid) of
|
||||
#{client := #{conn_mod := ConnMod}} ->
|
||||
#{conninfo := #{conn_mod := ConnMod}} ->
|
||||
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
|
||||
{ok, ConnMod, ChanPid, Session};
|
||||
undefined ->
|
||||
|
@ -233,7 +239,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
|
|||
|
||||
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||
case get_chan_attrs(ClientId, ChanPid) of
|
||||
#{client := #{conn_mod := ConnMod}} ->
|
||||
#{conninfo := #{conn_mod := ConnMod}} ->
|
||||
ConnMod:call(ChanPid, discard);
|
||||
undefined -> ok
|
||||
end;
|
||||
|
@ -336,3 +342,4 @@ update_stats({Tab, Stat, MaxStat}) ->
|
|||
undefined -> ok;
|
||||
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
|
||||
end.
|
||||
|
||||
|
|
|
@ -102,9 +102,9 @@ start_link(Transport, Socket, Options) ->
|
|||
info(CPid) when is_pid(CPid) ->
|
||||
call(CPid, info);
|
||||
info(Conn = #connection{chan_state = ChanState}) ->
|
||||
ConnInfo = info(?INFO_KEYS, Conn),
|
||||
ChanInfo = emqx_channel:info(ChanState),
|
||||
maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}).
|
||||
SockInfo = maps:from_list(info(?INFO_KEYS, Conn)),
|
||||
maps:merge(ChanInfo, #{sockinfo => SockInfo}).
|
||||
|
||||
info(Keys, Conn) when is_list(Keys) ->
|
||||
[{Key, info(Key, Conn)} || Key <- Keys];
|
||||
|
@ -133,9 +133,9 @@ limit_info(Limit) ->
|
|||
attrs(CPid) when is_pid(CPid) ->
|
||||
call(CPid, attrs);
|
||||
attrs(Conn = #connection{chan_state = ChanState}) ->
|
||||
ConnAttrs = info(?ATTR_KEYS, Conn),
|
||||
ChanAttrs = emqx_channel:attrs(ChanState),
|
||||
maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}).
|
||||
SockAttrs = maps:from_list(info(?ATTR_KEYS, Conn)),
|
||||
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
|
||||
|
||||
%% @doc Get stats of the channel.
|
||||
-spec(stats(pid()|connection()) -> emqx_types:stats()).
|
||||
|
@ -180,6 +180,7 @@ init({Transport, RawSocket, Options}) ->
|
|||
ChanState = emqx_channel:init(#{peername => Peername,
|
||||
sockname => Sockname,
|
||||
peercert => Peercert,
|
||||
protocol => mqtt,
|
||||
conn_mod => ?MODULE}, Options),
|
||||
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
|
||||
State = #connection{transport = Transport,
|
||||
|
@ -219,15 +220,18 @@ idle(timeout, _Timeout, State) ->
|
|||
|
||||
idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
||||
#mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt,
|
||||
MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined),
|
||||
MaxPacketSize = emqx_mqtt_props:get('Maximum-Packet-Size', Properties, undefined),
|
||||
NState = State#connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)},
|
||||
SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
|
||||
handle_incoming(Packet, SuccFun, NState);
|
||||
|
||||
idle(cast, {incoming, Packet}, State) ->
|
||||
idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
|
||||
?LOG(warning, "Unexpected incoming: ~p", [Packet]),
|
||||
shutdown(unexpected_incoming_packet, State);
|
||||
|
||||
idle(cast, {incoming, {error, Reason}}, State) ->
|
||||
shutdown(Reason, State);
|
||||
|
||||
idle(EventType, Content, State) ->
|
||||
?HANDLE(EventType, Content, State).
|
||||
|
||||
|
@ -241,6 +245,17 @@ connected(enter, _PrevSt, State) ->
|
|||
connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
|
||||
handle_incoming(Packet, fun keep_state/1, State);
|
||||
|
||||
connected(cast, {incoming, {error, Reason}}, State = #connection{chan_state = ChanState}) ->
|
||||
case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of
|
||||
{wait_session_expire, _, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
{next_state, disconnected, State#connection{chan_state= NChanState}};
|
||||
{wait_session_expire, _, OutPackets, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
NState = State#connection{chan_state= NChanState},
|
||||
{next_state, disconnected, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)}
|
||||
end;
|
||||
|
||||
connected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
|
||||
handle_deliver(emqx_misc:drain_deliver([Deliver]), State);
|
||||
|
||||
|
@ -408,8 +423,7 @@ process_incoming(Data, State) ->
|
|||
process_incoming(<<>>, Packets, State) ->
|
||||
{keep_state, State, next_incoming_events(Packets)};
|
||||
|
||||
process_incoming(Data, Packets, State = #connection{parse_state = ParseState,
|
||||
chan_state = ChanState}) ->
|
||||
process_incoming(Data, Packets, State = #connection{parse_state = ParseState}) ->
|
||||
try emqx_frame:parse(Data, ParseState) of
|
||||
{more, NParseState} ->
|
||||
NState = State#connection{parse_state = NParseState},
|
||||
|
@ -418,32 +432,16 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState,
|
|||
NState = State#connection{parse_state = NParseState},
|
||||
process_incoming(Rest, [Packet|Packets], NState);
|
||||
{error, Reason} ->
|
||||
shutdown(Reason, State)
|
||||
{keep_state, State, next_incoming_events({error, Reason})}
|
||||
catch
|
||||
error:Reason:Stk ->
|
||||
?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Reason, Stk, Data]),
|
||||
Result =
|
||||
case emqx_channel:info(connected, ChanState) of
|
||||
undefined ->
|
||||
emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState);
|
||||
true ->
|
||||
emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState);
|
||||
_ ->
|
||||
ignore
|
||||
end,
|
||||
case Result of
|
||||
{stop, Reason0, OutPackets, NChanState} ->
|
||||
Shutdown = fun(NewSt) -> stop(Reason0, NewSt) end,
|
||||
NState = State#connection{chan_state = NChanState},
|
||||
handle_outgoing(OutPackets, Shutdown, NState);
|
||||
{stop, Reason0, NChanState} ->
|
||||
stop(Reason0, State#connection{chan_state = NChanState});
|
||||
ignore ->
|
||||
keep_state(State)
|
||||
end
|
||||
?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nError data:~p", [Reason, Stk, Data]),
|
||||
{keep_state, State, next_incoming_events({error, Reason})}
|
||||
end.
|
||||
|
||||
-compile({inline, [next_incoming_events/1]}).
|
||||
next_incoming_events({error, Reason}) ->
|
||||
[next_event(cast, {incoming, {error, Reason}})];
|
||||
next_incoming_events(Packets) ->
|
||||
[next_event(cast, {incoming, Packet}) || Packet <- Packets].
|
||||
|
||||
|
@ -459,14 +457,19 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
|
|||
{ok, NChanState} ->
|
||||
SuccFun(State#connection{chan_state= NChanState});
|
||||
{ok, OutPackets, NChanState} ->
|
||||
handle_outgoing(OutPackets, SuccFun,
|
||||
State#connection{chan_state = NChanState});
|
||||
handle_outgoing(OutPackets, SuccFun, State#connection{chan_state = NChanState});
|
||||
{wait_session_expire, Reason, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
{next_state, disconnected, State#connection{chan_state = NChanState}};
|
||||
{wait_session_expire, Reason, OutPackets, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
NState = State#connection{chan_state= NChanState},
|
||||
{next_state, disconnected, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)};
|
||||
{stop, Reason, NChanState} ->
|
||||
stop(Reason, State#connection{chan_state = NChanState});
|
||||
{stop, Reason, OutPackets, NChanState} ->
|
||||
Shutdown = fun(NewSt) -> stop(Reason, NewSt) end,
|
||||
NState = State#connection{chan_state = NChanState},
|
||||
handle_outgoing(OutPackets, Shutdown, NState)
|
||||
NState = State#connection{chan_state= NChanState},
|
||||
stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
|
||||
end.
|
||||
|
||||
%%-------------------------------------------------------------------
|
||||
|
@ -477,10 +480,7 @@ handle_deliver(Delivers, State = #connection{chan_state = ChanState}) ->
|
|||
{ok, NChanState} ->
|
||||
keep_state(State#connection{chan_state = NChanState});
|
||||
{ok, Packets, NChanState} ->
|
||||
NState = State#connection{chan_state = NChanState},
|
||||
handle_outgoing(Packets, fun keep_state/1, NState);
|
||||
{stop, Reason, NChanState} ->
|
||||
stop(Reason, State#connection{chan_state = NChanState})
|
||||
handle_outgoing(Packets, fun keep_state/1, State#connection{chan_state = NChanState})
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -530,12 +530,14 @@ send(IoData, SuccFun, State = #connection{transport = Transport,
|
|||
%% Handle timeout
|
||||
|
||||
handle_timeout(TRef, Msg, State = #connection{chan_state = ChanState}) ->
|
||||
case emqx_channel:timeout(TRef, Msg, ChanState) of
|
||||
case emqx_channel:handle_timeout(TRef, Msg, ChanState) of
|
||||
{ok, NChanState} ->
|
||||
keep_state(State#connection{chan_state = NChanState});
|
||||
{ok, Packets, NChanState} ->
|
||||
handle_outgoing(Packets, fun keep_state/1,
|
||||
State#connection{chan_state = NChanState});
|
||||
handle_outgoing(Packets, fun keep_state/1, State#connection{chan_state = NChanState});
|
||||
{wait_session_expire, Reason, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
{next_state, disconnected, State#connection{chan_state = NChanState}};
|
||||
{stop, Reason, NChanState} ->
|
||||
stop(Reason, State#connection{chan_state = NChanState})
|
||||
end.
|
||||
|
|
|
@ -18,11 +18,12 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("types.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-logger_header("[Ctl]").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([start_link/0, stop/0]).
|
||||
|
||||
-export([ register_command/2
|
||||
, register_command/3
|
||||
|
@ -32,6 +33,7 @@
|
|||
-export([ run_command/1
|
||||
, run_command/2
|
||||
, lookup_command/1
|
||||
, get_commands/0
|
||||
]).
|
||||
|
||||
-export([ print/1
|
||||
|
@ -40,7 +42,7 @@
|
|||
, usage/2
|
||||
]).
|
||||
|
||||
%% format/1,2 and format_usage/1,2 are exported mainly for test cases
|
||||
%% Exports mainly for test cases
|
||||
-export([ format/1
|
||||
, format/2
|
||||
, format_usage/1
|
||||
|
@ -59,38 +61,44 @@
|
|||
-record(state, {seq = 0}).
|
||||
|
||||
-type(cmd() :: atom()).
|
||||
-type(cmd_params() :: string()).
|
||||
-type(cmd_descr() :: string()).
|
||||
-type(cmd_usage() :: {cmd(), cmd_descr()}).
|
||||
-type(cmd_usage() :: {cmd_params(), cmd_descr()}).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(TAB, emqx_command).
|
||||
-define(CMD_TAB, emqx_command).
|
||||
|
||||
-spec(start_link() -> startlink_ret()).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
-spec(stop() -> ok).
|
||||
stop() -> gen_server:stop(?SERVER).
|
||||
|
||||
-spec(register_command(cmd(), {module(), atom()}) -> ok).
|
||||
register_command(Cmd, MF) when is_atom(Cmd) ->
|
||||
register_command(Cmd, MF, []).
|
||||
|
||||
-spec(register_command(cmd(), {module(), atom()}, list()) -> ok).
|
||||
register_command(Cmd, MF, Opts) when is_atom(Cmd) ->
|
||||
cast({register_command, Cmd, MF, Opts}).
|
||||
call({register_command, Cmd, MF, Opts}).
|
||||
|
||||
-spec(unregister_command(cmd()) -> ok).
|
||||
unregister_command(Cmd) when is_atom(Cmd) ->
|
||||
cast({unregister_command, Cmd}).
|
||||
|
||||
cast(Msg) ->
|
||||
gen_server:cast(?SERVER, Msg).
|
||||
call(Req) -> gen_server:call(?SERVER, Req).
|
||||
|
||||
cast(Msg) -> gen_server:cast(?SERVER, Msg).
|
||||
|
||||
-spec(run_command(list(string())) -> ok | {error, term()}).
|
||||
run_command([]) ->
|
||||
run_command(help, []);
|
||||
run_command([Cmd | Args]) ->
|
||||
run_command(list_to_atom(Cmd), Args).
|
||||
|
||||
-spec(run_command(cmd(), [string()]) -> ok | {error, term()}).
|
||||
run_command(help, []) ->
|
||||
help();
|
||||
-spec(run_command(cmd(), list(string())) -> ok | {error, term()}).
|
||||
run_command(help, []) -> help();
|
||||
run_command(Cmd, Args) when is_atom(Cmd) ->
|
||||
case lookup_command(Cmd) of
|
||||
[{Mod, Fun}] ->
|
||||
|
@ -107,15 +115,19 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
|
|||
|
||||
-spec(lookup_command(cmd()) -> [{module(), atom()}]).
|
||||
lookup_command(Cmd) when is_atom(Cmd) ->
|
||||
case ets:match(?TAB, {{'_', Cmd}, '$1', '_'}) of
|
||||
case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of
|
||||
[El] -> El;
|
||||
[] -> []
|
||||
end.
|
||||
|
||||
-spec(get_commands() -> list({cmd(), module(), atom()})).
|
||||
get_commands() ->
|
||||
[{Cmd, M, F} || {{_Seq, Cmd}, {M, F}, _Opts} <- ets:tab2list(?CMD_TAB)].
|
||||
|
||||
help() ->
|
||||
print("Usage: ~s~n", [?MODULE]),
|
||||
[begin print("~80..-s~n", [""]), Mod:Cmd(usage) end
|
||||
|| {_, {Mod, Cmd}, _} <- ets:tab2list(?TAB)].
|
||||
|| {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)].
|
||||
|
||||
-spec(print(io:format()) -> ok).
|
||||
print(Msg) ->
|
||||
|
@ -129,13 +141,13 @@ print(Format, Args) ->
|
|||
usage(UsageList) ->
|
||||
io:format(format_usage(UsageList)).
|
||||
|
||||
-spec(usage(cmd(), cmd_descr()) -> ok).
|
||||
usage(Cmd, Desc) ->
|
||||
io:format(format_usage(Cmd, Desc)).
|
||||
-spec(usage(cmd_params(), cmd_descr()) -> ok).
|
||||
usage(CmdParams, Desc) ->
|
||||
io:format(format_usage(CmdParams, Desc)).
|
||||
|
||||
-spec(format(io:format()) -> string()).
|
||||
format(Msg) ->
|
||||
lists:flatten(io_lib:format("~p", [Msg])).
|
||||
lists:flatten(io_lib:format("~s", [Msg])).
|
||||
|
||||
-spec(format(io:format(), [term()]) -> string()).
|
||||
format(Format, Args) ->
|
||||
|
@ -144,42 +156,41 @@ format(Format, Args) ->
|
|||
-spec(format_usage([cmd_usage()]) -> ok).
|
||||
format_usage(UsageList) ->
|
||||
lists:map(
|
||||
fun({Cmd, Desc}) ->
|
||||
format_usage(Cmd, Desc)
|
||||
fun({CmdParams, Desc}) ->
|
||||
format_usage(CmdParams, Desc)
|
||||
end, UsageList).
|
||||
|
||||
-spec(format_usage(cmd(), cmd_descr()) -> string()).
|
||||
format_usage(Cmd, Desc) ->
|
||||
CmdLines = split_cmd(Cmd),
|
||||
-spec(format_usage(cmd_params(), cmd_descr()) -> string()).
|
||||
format_usage(CmdParams, Desc) ->
|
||||
CmdLines = split_cmd(CmdParams),
|
||||
DescLines = split_cmd(Desc),
|
||||
lists:foldl(
|
||||
fun({CmdStr, DescStr}, Usage) ->
|
||||
Usage ++ format("~-48s# ~s~n", [CmdStr, DescStr])
|
||||
end, "", zip_cmd(CmdLines, DescLines)).
|
||||
lists:foldl(fun({CmdStr, DescStr}, Usage) ->
|
||||
Usage ++ format("~-48s# ~s~n", [CmdStr, DescStr])
|
||||
end, "", zip_cmd(CmdLines, DescLines)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
ok = emqx_tables:new(?TAB, [protected, ordered_set]),
|
||||
ok = emqx_tables:new(?CMD_TAB, [protected, ordered_set]),
|
||||
{ok, #state{seq = 0}}.
|
||||
|
||||
handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq}) ->
|
||||
case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
|
||||
[] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
|
||||
[[OriginSeq] | _] ->
|
||||
?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]),
|
||||
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
|
||||
end,
|
||||
{reply, ok, next_seq(State)};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
|
||||
case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of
|
||||
[] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts});
|
||||
[[OriginSeq] | _] ->
|
||||
?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]),
|
||||
ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts})
|
||||
end,
|
||||
noreply(next_seq(State));
|
||||
|
||||
handle_cast({unregister_command, Cmd}, State) ->
|
||||
ets:match_delete(?TAB, {{'_', Cmd}, '_', '_'}),
|
||||
ets:match_delete(?CMD_TAB, {{'_', Cmd}, '_', '_'}),
|
||||
noreply(State);
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
|
@ -214,3 +225,4 @@ zip_cmd([X | Xs], [Y | Ys]) -> [{X, Y} | zip_cmd(Xs, Ys)];
|
|||
zip_cmd([X | Xs], []) -> [{X, ""} | zip_cmd(Xs, [])];
|
||||
zip_cmd([], [Y | Ys]) -> [{"", Y} | zip_cmd([], Ys)];
|
||||
zip_cmd([], []) -> [].
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@
|
|||
|
||||
-record(flapping, {
|
||||
client_id :: emqx_types:client_id(),
|
||||
peername :: emqx_types:peername(),
|
||||
peerhost :: emqx_types:peerhost(),
|
||||
started_at :: pos_integer(),
|
||||
detect_cnt :: pos_integer(),
|
||||
banned_at :: pos_integer()
|
||||
|
@ -84,7 +84,7 @@ check(ClientId, #{banned_interval := Interval}) ->
|
|||
-spec(detect(emqx_types:client()) -> boolean()).
|
||||
detect(Client) -> detect(Client, get_policy()).
|
||||
|
||||
detect(#{client_id := ClientId, peername := Peername},
|
||||
detect(#{client_id := ClientId, peerhost := PeerHost},
|
||||
Policy = #{threshold := Threshold}) ->
|
||||
try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of
|
||||
Cnt when Cnt < Threshold -> false;
|
||||
|
@ -98,7 +98,7 @@ detect(#{client_id := ClientId, peername := Peername},
|
|||
error:badarg ->
|
||||
%% Create a flapping record.
|
||||
Flapping = #flapping{client_id = ClientId,
|
||||
peername = Peername,
|
||||
peerhost = PeerHost,
|
||||
started_at = emqx_time:now_ms(),
|
||||
detect_cnt = 1
|
||||
},
|
||||
|
@ -132,7 +132,7 @@ handle_call(Req, _From, State) ->
|
|||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({detected, Flapping = #flapping{client_id = ClientId,
|
||||
peername = Peername,
|
||||
peerhost = PeerHost,
|
||||
started_at = StartedAt,
|
||||
detect_cnt = DetectCnt},
|
||||
#{duration := Duration}}, State) ->
|
||||
|
@ -140,16 +140,16 @@ handle_cast({detected, Flapping = #flapping{client_id = ClientId,
|
|||
true -> %% Flapping happened:(
|
||||
%% Log first
|
||||
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
|
||||
[ClientId, esockd_net:format(Peername), DetectCnt, Duration]),
|
||||
%% TODO: Send Alarm
|
||||
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]),
|
||||
%% Banned.
|
||||
BannedFlapping = Flapping#flapping{client_id = {banned, ClientId},
|
||||
banned_at = emqx_time:now_ms()
|
||||
},
|
||||
alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}),
|
||||
ets:insert(?FLAPPING_TAB, BannedFlapping);
|
||||
false ->
|
||||
?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
|
||||
[ClientId, esockd_net:format(Peername), DetectCnt, Interval]),
|
||||
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval]),
|
||||
ets:delete_object(?FLAPPING_TAB, Flapping)
|
||||
end,
|
||||
{noreply, State};
|
||||
|
@ -189,9 +189,17 @@ with_flapping_tab(Fun, Args) ->
|
|||
end.
|
||||
|
||||
expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) ->
|
||||
ets:select_delete(?FLAPPING_TAB,
|
||||
[{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
|
||||
[{'<', '$1', NowTime-Duration}], [true]},
|
||||
{#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'},
|
||||
[{'<', '$1', NowTime-Interval}], [true]}]).
|
||||
case ets:select(?FLAPPING_TAB,
|
||||
[{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
|
||||
[{'<', '$1', NowTime-Duration}], ['$_']},
|
||||
{#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'},
|
||||
[{'<', '$1', NowTime-Interval}], ['$_']}]) of
|
||||
[] -> ok;
|
||||
Flappings ->
|
||||
lists:foreach(fun(Flapping = #flapping{client_id = {banned, ClientId}}) ->
|
||||
ets:delete_object(?FLAPPING_TAB, Flapping),
|
||||
alarm_handler:clear_alarm({flapping_detected, ClientId});
|
||||
(_) -> ok
|
||||
end, Flappings)
|
||||
end.
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
, reset/1
|
||||
]).
|
||||
|
||||
-export_type([gc_state/0]).
|
||||
-export_type([opts/0, gc_state/0]).
|
||||
|
||||
-type(opts() :: #{count => integer(),
|
||||
bytes => integer()}).
|
||||
|
|
|
@ -38,6 +38,7 @@
|
|||
%% Flags
|
||||
-export([ get_flag/2
|
||||
, get_flag/3
|
||||
, get_flags/1
|
||||
, set_flag/2
|
||||
, set_flag/3
|
||||
, set_flags/2
|
||||
|
@ -85,6 +86,7 @@ make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
|
|||
qos = QoS,
|
||||
from = From,
|
||||
flags = #{dup => false},
|
||||
headers = #{},
|
||||
topic = Topic,
|
||||
payload = Payload,
|
||||
timestamp = os:timestamp()}.
|
||||
|
@ -119,6 +121,9 @@ get_flag(Flag, Msg) ->
|
|||
get_flag(Flag, #message{flags = Flags}, Default) ->
|
||||
maps:get(Flag, Flags, Default).
|
||||
|
||||
-spec(get_flags(emqx_types:message()) -> maybe(map())).
|
||||
get_flags(#message{flags = Flags}) -> Flags.
|
||||
|
||||
-spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()).
|
||||
set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) ->
|
||||
Msg#message{flags = #{Flag => true}};
|
||||
|
@ -144,8 +149,7 @@ unset_flag(Flag, Msg = #message{flags = Flags}) ->
|
|||
set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) ->
|
||||
Msg#message{headers = Headers};
|
||||
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
|
||||
Msg#message{headers = maps:merge(Old, New)};
|
||||
set_headers(undefined, Msg) -> Msg.
|
||||
Msg#message{headers = maps:merge(Old, New)}.
|
||||
|
||||
-spec(get_headers(emqx_types:message()) -> map()).
|
||||
get_headers(Msg) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-module(emqx_misc).
|
||||
|
||||
-include("types.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([ merge_opts/2
|
||||
, maybe_apply/2
|
||||
|
@ -70,7 +71,7 @@ pipeline([], Input, State) ->
|
|||
{ok, Input, State};
|
||||
|
||||
pipeline([Fun|More], Input, State) ->
|
||||
case apply_fun(Fun, Input, State) of
|
||||
try apply_fun(Fun, Input, State) of
|
||||
ok -> pipeline(More, Input, State);
|
||||
{ok, NState} ->
|
||||
pipeline(More, Input, NState);
|
||||
|
@ -80,6 +81,11 @@ pipeline([Fun|More], Input, State) ->
|
|||
{error, Reason, State};
|
||||
{error, Reason, NState} ->
|
||||
{error, Reason, NState}
|
||||
catch
|
||||
Error:Reason:Stacktrace ->
|
||||
?LOG(error, "pipeline ~p failed: ~p,\nstacktrace: ~0p",
|
||||
[{Fun, Input, State}, {Error, Reason}, Stacktrace]),
|
||||
{error, Reason, State}
|
||||
end.
|
||||
|
||||
-compile({inline, [apply_fun/3]}).
|
||||
|
|
|
@ -23,70 +23,78 @@
|
|||
|
||||
-logger_header("[Presence]").
|
||||
|
||||
%% APIs
|
||||
-export([ on_client_connected/4
|
||||
, on_client_disconnected/3
|
||||
]).
|
||||
|
||||
%% emqx_gen_mod callbacks
|
||||
-export([ load/1
|
||||
, unload/1
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
-export([ on_client_connected/4
|
||||
, on_client_disconnected/4
|
||||
]).
|
||||
|
||||
load(_Env) ->
|
||||
ok.
|
||||
%% emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}),
|
||||
%% emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
|
||||
-ifdef(TEST).
|
||||
-export([ reason/1 ]).
|
||||
-endif.
|
||||
|
||||
on_client_connected(#{client_id := ClientId,
|
||||
username := Username,
|
||||
peername := {IpAddr, _}
|
||||
}, ConnAck,
|
||||
#{session := Session,
|
||||
proto_name := ProtoName,
|
||||
proto_ver := ProtoVer,
|
||||
keepalive := Keepalive
|
||||
}, Env) ->
|
||||
case emqx_json:safe_encode(maps:merge(#{clientid => ClientId,
|
||||
username => Username,
|
||||
ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
|
||||
proto_name => ProtoName,
|
||||
proto_ver => ProtoVer,
|
||||
keepalive => Keepalive,
|
||||
connack => ConnAck,
|
||||
ts => erlang:system_time(millisecond)
|
||||
}, maps:with([clean_start, expiry_interval], Session))) of
|
||||
{ok, Payload} ->
|
||||
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Encoding connected event error: ~p", [Reason])
|
||||
end.
|
||||
|
||||
|
||||
|
||||
|
||||
on_client_disconnected(#{client_id := ClientId,
|
||||
username := Username}, Reason, Env) ->
|
||||
case emqx_json:safe_encode(#{clientid => ClientId,
|
||||
username => Username,
|
||||
reason => reason(Reason),
|
||||
ts => erlang:system_time(millisecond)
|
||||
}) of
|
||||
{ok, Payload} ->
|
||||
emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload));
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Encoding disconnected event error: ~p", [Reason])
|
||||
end.
|
||||
load(Env) ->
|
||||
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}),
|
||||
emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
|
||||
|
||||
unload(_Env) ->
|
||||
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
|
||||
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
|
||||
|
||||
message(QoS, Topic, Payload) ->
|
||||
on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
|
||||
#{peerhost := PeerHost} = ClientInfo,
|
||||
#{clean_start := CleanStart,
|
||||
proto_name := ProtoName,
|
||||
proto_ver := ProtoVer,
|
||||
keepalive := Keepalive,
|
||||
expiry_interval := ExpiryInterval} = ConnInfo,
|
||||
ClientId = clientid(ClientInfo, ConnInfo),
|
||||
Username = username(ClientInfo, ConnInfo),
|
||||
Presence = #{clientid => ClientId,
|
||||
username => Username,
|
||||
ipaddress => ntoa(PeerHost),
|
||||
proto_name => ProtoName,
|
||||
proto_ver => ProtoVer,
|
||||
keepalive => Keepalive,
|
||||
connack => ConnAck,
|
||||
clean_start => CleanStart,
|
||||
expiry_interval => ExpiryInterval,
|
||||
ts => emqx_time:now_ms()
|
||||
},
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
{ok, Payload} ->
|
||||
emqx_broker:safe_publish(
|
||||
make_msg(qos(Env), topic(connected, ClientId), Payload));
|
||||
{error, _Reason} ->
|
||||
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
|
||||
end.
|
||||
|
||||
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
||||
ClientId = clientid(ClientInfo, ConnInfo),
|
||||
Username = username(ClientInfo, ConnInfo),
|
||||
Presence = #{clientid => ClientId,
|
||||
username => Username,
|
||||
reason => reason(Reason),
|
||||
ts => emqx_time:now_ms()
|
||||
},
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
{ok, Payload} ->
|
||||
emqx_broker:safe_publish(
|
||||
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
|
||||
{error, _Reason} ->
|
||||
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
|
||||
end.
|
||||
|
||||
clientid(#{client_id := undefined}, #{client_id := ClientId}) -> ClientId;
|
||||
clientid(#{client_id := ClientId}, _ConnInfo) -> ClientId.
|
||||
|
||||
username(#{username := undefined}, #{username := Username}) -> Username;
|
||||
username(#{username := Username}, _ConnInfo) -> Username.
|
||||
|
||||
make_msg(QoS, Topic, Payload) ->
|
||||
emqx_message:set_flag(
|
||||
sys, emqx_message:make(
|
||||
?MODULE, QoS, Topic, iolist_to_binary(Payload))).
|
||||
|
@ -99,6 +107,10 @@ topic(disconnected, ClientId) ->
|
|||
qos(Env) -> proplists:get_value(qos, Env, 0).
|
||||
|
||||
reason(Reason) when is_atom(Reason) -> Reason;
|
||||
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
|
||||
reason({Error, _}) when is_atom(Error) -> Error;
|
||||
reason(_) -> internal_error.
|
||||
|
||||
-compile({inline, [ntoa/1]}).
|
||||
ntoa(IpAddr) -> iolist_to_binary(esockd_net:ntoa(IpAddr)).
|
||||
|
||||
|
|
|
@ -22,8 +22,9 @@
|
|||
-include_lib("emqx_mqtt.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
-export([ compile/1
|
||||
, match_and_rewrite/2
|
||||
]).
|
||||
-endif.
|
||||
|
||||
%% APIs
|
||||
|
@ -47,14 +48,14 @@ load(RawRules) ->
|
|||
emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [Rules]}),
|
||||
emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [Rules]}).
|
||||
|
||||
rewrite_subscribe(_Client, _Properties, TopicFilters, Rules) ->
|
||||
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
||||
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
|
||||
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
||||
|
||||
rewrite_unsubscribe(_Client, _Properties, TopicFilters, Rules) ->
|
||||
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
||||
rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
|
||||
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
||||
|
||||
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
|
||||
{ok, Message#message{topic = match_rule(Topic, Rules)}}.
|
||||
{ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}.
|
||||
|
||||
unload(_) ->
|
||||
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
|
||||
|
@ -65,16 +66,22 @@ unload(_) ->
|
|||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
match_rule(Topic, []) ->
|
||||
compile(Rules) ->
|
||||
lists:map(fun({rewrite, Topic, Re, Dest}) ->
|
||||
{ok, MP} = re:compile(Re),
|
||||
{rewrite, Topic, MP, Dest}
|
||||
end, Rules).
|
||||
|
||||
match_and_rewrite(Topic, []) ->
|
||||
Topic;
|
||||
|
||||
match_rule(Topic, [{rewrite, Filter, MP, Dest} | Rules]) ->
|
||||
match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) ->
|
||||
case emqx_topic:match(Topic, Filter) of
|
||||
true -> match_regx(Topic, MP, Dest);
|
||||
false -> match_rule(Topic, Rules)
|
||||
true -> rewrite(Topic, MP, Dest);
|
||||
false -> match_and_rewrite(Topic, Rules)
|
||||
end.
|
||||
|
||||
match_regx(Topic, MP, Dest) ->
|
||||
rewrite(Topic, MP, Dest) ->
|
||||
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
||||
{match, Captured} ->
|
||||
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|
||||
|
@ -86,8 +93,3 @@ match_regx(Topic, MP, Dest) ->
|
|||
nomatch -> Topic
|
||||
end.
|
||||
|
||||
compile(Rules) ->
|
||||
lists:map(fun({rewrite, Topic, Re, Dest}) ->
|
||||
{ok, MP} = re:compile(Re),
|
||||
{rewrite, Topic, MP, Dest}
|
||||
end, Rules).
|
||||
|
|
|
@ -21,14 +21,14 @@
|
|||
-include_lib("emqx.hrl").
|
||||
-include_lib("emqx_mqtt.hrl").
|
||||
|
||||
%% APIs
|
||||
-export([on_client_connected/4]).
|
||||
|
||||
%% emqx_gen_mod callbacks
|
||||
-export([ load/1
|
||||
, unload/1
|
||||
]).
|
||||
|
||||
%% APIs
|
||||
-export([on_client_connected/4]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Load/Unload Hook
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -37,7 +37,7 @@ load(Topics) ->
|
|||
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
|
||||
|
||||
on_client_connected(#{client_id := ClientId,
|
||||
username := Username}, ?RC_SUCCESS, _ConnAttrs, Topics) ->
|
||||
username := Username}, ?RC_SUCCESS, _ConnInfo, Topics) ->
|
||||
Replace = fun(Topic) ->
|
||||
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
|
||||
end,
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
|
||||
-export([ get_caps/1
|
||||
, get_caps/2
|
||||
, get_caps/3
|
||||
]).
|
||||
|
||||
-export([default/0]).
|
||||
|
@ -114,10 +115,13 @@ get_caps(Zone) ->
|
|||
-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
|
||||
get_caps(Zone, publish) ->
|
||||
with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1);
|
||||
|
||||
get_caps(Zone, subscribe) ->
|
||||
with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1).
|
||||
|
||||
-spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()).
|
||||
get_caps(Zone, Cap, Def) ->
|
||||
emqx_zone:get_env(Zone, Cap, Def).
|
||||
|
||||
pub_caps(Zone) ->
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
|
|
|
@ -28,8 +28,8 @@
|
|||
%% For tests
|
||||
-export([all/0]).
|
||||
|
||||
-export([ set_property/3
|
||||
, get_property/3
|
||||
-export([ set/3
|
||||
, get/3
|
||||
]).
|
||||
|
||||
-type(prop_name() :: atom()).
|
||||
|
@ -183,13 +183,13 @@ validate_value(_Type, _Val) -> false.
|
|||
-spec(all() -> map()).
|
||||
all() -> ?PROPS_TABLE.
|
||||
|
||||
set_property(Name, Value, undefined) ->
|
||||
set(Name, Value, undefined) ->
|
||||
#{Name => Value};
|
||||
set_property(Name, Value, Props) ->
|
||||
set(Name, Value, Props) ->
|
||||
Props#{Name => Value}.
|
||||
|
||||
get_property(_Name, undefined, Default) ->
|
||||
get(_Name, undefined, Default) ->
|
||||
Default;
|
||||
get_property(Name, Props, Default) ->
|
||||
get(Name, Props, Default) ->
|
||||
maps:get(Name, Props, Default).
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
, info/1
|
||||
]).
|
||||
|
||||
-export_type([oom_policy/0]).
|
||||
-export_type([opts/0, oom_policy/0]).
|
||||
|
||||
-type(opts() :: #{message_queue_len => non_neg_integer(),
|
||||
max_heap_size => non_neg_integer()
|
||||
|
|
|
@ -89,7 +89,7 @@ proto_name(#mqtt_packet_connect{proto_name = Name}) ->
|
|||
|
||||
%% @doc Protocol version of the CONNECT Packet.
|
||||
-spec(proto_ver(emqx_types:packet()|connect()) -> emqx_types:version()).
|
||||
proto_ver(?CONNACK_PACKET(ConnPkt)) ->
|
||||
proto_ver(?CONNECT_PACKET(ConnPkt)) ->
|
||||
proto_ver(ConnPkt);
|
||||
proto_ver(#mqtt_packet_connect{proto_ver = Ver}) ->
|
||||
Ver.
|
||||
|
@ -241,7 +241,7 @@ validate_topic_filters(TopicFilters) ->
|
|||
|
||||
%% @doc Publish Packet to Message.
|
||||
-spec(to_message(emqx_types:client(), emqx_ypes:packet()) -> emqx_types:message()).
|
||||
to_message(#{client_id := ClientId, username := Username, peername := Peername},
|
||||
to_message(#{client_id := ClientId, username := Username, peerhost := PeerHost},
|
||||
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||
retain = Retain,
|
||||
qos = QoS,
|
||||
|
@ -252,7 +252,7 @@ to_message(#{client_id := ClientId, username := Username, peername := Peername},
|
|||
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
||||
Msg#message{flags = #{dup => Dup, retain => Retain},
|
||||
headers = merge_props(#{username => Username,
|
||||
peername => Peername}, Props)}.
|
||||
peerhost => PeerHost}, Props)}.
|
||||
|
||||
-spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()).
|
||||
will_msg(#mqtt_packet_connect{will_flag = false}) ->
|
||||
|
|
|
@ -1,133 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% MQTT Protocol
|
||||
-module(emqx_protocol).
|
||||
|
||||
-include("types.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-export([ init/2
|
||||
, info/1
|
||||
, info/2
|
||||
, attrs/1
|
||||
]).
|
||||
|
||||
-export([ find_alias/2
|
||||
, save_alias/3
|
||||
, clear_will_msg/1
|
||||
]).
|
||||
|
||||
-export_type([protocol/0]).
|
||||
|
||||
-record(protocol, {
|
||||
%% MQTT Proto Name
|
||||
proto_name :: binary(),
|
||||
%% MQTT Proto Version
|
||||
proto_ver :: emqx_types:ver(),
|
||||
%% Clean Start Flag
|
||||
clean_start :: boolean(),
|
||||
%% MQTT Keepalive interval
|
||||
keepalive :: non_neg_integer(),
|
||||
%% ClientId in CONNECT Packet
|
||||
client_id :: emqx_types:client_id(),
|
||||
%% Username in CONNECT Packet
|
||||
username :: emqx_types:username(),
|
||||
%% MQTT Will Msg
|
||||
will_msg :: emqx_types:message(),
|
||||
%% MQTT Topic Aliases
|
||||
topic_aliases :: maybe(map()),
|
||||
%% MQTT Topic Alias Maximum
|
||||
alias_maximum :: maybe(map())
|
||||
}).
|
||||
|
||||
-opaque(protocol() :: #protocol{}).
|
||||
|
||||
-define(INFO_KEYS, record_info(fields, protocol)).
|
||||
|
||||
-define(ATTR_KEYS, [proto_name, proto_ver, clean_start, keepalive]).
|
||||
|
||||
-spec(init(#mqtt_packet_connect{}, atom()) -> protocol()).
|
||||
init(#mqtt_packet_connect{proto_name = ProtoName,
|
||||
proto_ver = ProtoVer,
|
||||
clean_start = CleanStart,
|
||||
keepalive = Keepalive,
|
||||
properties = Properties,
|
||||
client_id = ClientId,
|
||||
username = Username} = ConnPkt, Zone) ->
|
||||
WillMsg = emqx_packet:will_msg(ConnPkt),
|
||||
#protocol{proto_name = ProtoName,
|
||||
proto_ver = ProtoVer,
|
||||
clean_start = CleanStart,
|
||||
keepalive = Keepalive,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
will_msg = WillMsg,
|
||||
alias_maximum = #{outbound => emqx_mqtt_props:get_property('Topic-Alias-Maximum', Properties, 0),
|
||||
inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone), 0)}
|
||||
}.
|
||||
|
||||
-spec(info(protocol()) -> emqx_types:infos()).
|
||||
info(Proto) ->
|
||||
maps:from_list(info(?INFO_KEYS, Proto)).
|
||||
|
||||
-spec(info(atom()|list(atom()), protocol()) -> term()).
|
||||
info(Keys, Proto) when is_list(Keys) ->
|
||||
[{Key, info(Key, Proto)} || Key <- Keys];
|
||||
info(proto_name, #protocol{proto_name = ProtoName}) ->
|
||||
ProtoName;
|
||||
info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
|
||||
ProtoVer;
|
||||
info(clean_start, #protocol{clean_start = CleanStart}) ->
|
||||
CleanStart;
|
||||
info(keepalive, #protocol{keepalive = Keepalive}) ->
|
||||
Keepalive;
|
||||
info(client_id, #protocol{client_id = ClientId}) ->
|
||||
ClientId;
|
||||
info(username, #protocol{username = Username}) ->
|
||||
Username;
|
||||
info(will_msg, #protocol{will_msg = WillMsg}) ->
|
||||
WillMsg;
|
||||
info(will_delay_interval, #protocol{will_msg = undefined}) ->
|
||||
0;
|
||||
info(will_delay_interval, #protocol{will_msg = WillMsg}) ->
|
||||
emqx_message:get_header('Will-Delay-Interval', WillMsg, 0);
|
||||
info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
|
||||
Aliases;
|
||||
info(alias_maximum, #protocol{alias_maximum = AliasMaximum}) ->
|
||||
AliasMaximum.
|
||||
|
||||
-spec(attrs(protocol()) -> emqx_types:attrs()).
|
||||
attrs(Proto) ->
|
||||
maps:from_list(info(?ATTR_KEYS, Proto)).
|
||||
|
||||
-spec(find_alias(emqx_types:alias_id(), protocol())
|
||||
-> {ok, emqx_types:topic()} | false).
|
||||
find_alias(_AliasId, #protocol{topic_aliases = undefined}) ->
|
||||
false;
|
||||
find_alias(AliasId, #protocol{topic_aliases = Aliases}) ->
|
||||
maps:find(AliasId, Aliases).
|
||||
|
||||
-spec(save_alias(emqx_types:alias_id(), emqx_types:topic(), protocol())
|
||||
-> protocol()).
|
||||
save_alias(AliasId, Topic, Proto = #protocol{topic_aliases = undefined}) ->
|
||||
Proto#protocol{topic_aliases = #{AliasId => Topic}};
|
||||
save_alias(AliasId, Topic, Proto = #protocol{topic_aliases = Aliases}) ->
|
||||
Proto#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}.
|
||||
|
||||
clear_will_msg(Protocol) ->
|
||||
Protocol#protocol{will_msg = undefined}.
|
||||
|
|
@ -25,6 +25,7 @@
|
|||
, text/2
|
||||
, connack_error/1
|
||||
, mqtt_frame_error/1
|
||||
, formalized/2
|
||||
]).
|
||||
|
||||
-export([compat/2]).
|
||||
|
@ -178,3 +179,7 @@ connack_error(_) -> ?RC_NOT_AUTHORIZED.
|
|||
|
||||
mqtt_frame_error(mqtt_frame_too_large) -> ?RC_PACKET_TOO_LARGE;
|
||||
mqtt_frame_error(_) -> ?RC_MALFORMED_PACKET.
|
||||
|
||||
formalized(connack, Code) when is_integer(Code) -> Code;
|
||||
formalized(connack, _Code) ->
|
||||
?RC_SERVER_UNAVAILABLE.
|
||||
|
|
|
@ -61,8 +61,6 @@
|
|||
%% Exports for unit tests
|
||||
-export([set_field/3]).
|
||||
|
||||
-export([update_expiry_interval/2]).
|
||||
|
||||
-export([ subscribe/4
|
||||
, unsubscribe/3
|
||||
]).
|
||||
|
@ -116,13 +114,10 @@
|
|||
max_awaiting_rel :: non_neg_integer(),
|
||||
%% Awaiting PUBREL Timeout
|
||||
await_rel_timeout :: timeout(),
|
||||
%% Session Expiry Interval
|
||||
expiry_interval :: timeout(),
|
||||
%% Enqueue Count
|
||||
enqueue_cnt :: non_neg_integer(),
|
||||
%% Created at
|
||||
created_at :: erlang:timestamp()
|
||||
|
||||
}).
|
||||
|
||||
-opaque(session() :: #session{}).
|
||||
|
@ -130,11 +125,12 @@
|
|||
-type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}).
|
||||
|
||||
-define(DEFAULT_BATCH_N, 1000).
|
||||
-define(ATTR_KEYS, [expiry_interval, created_at]).
|
||||
-define(ATTR_KEYS, [max_inflight, max_mqueue, retry_interval,
|
||||
max_awaiting_rel, await_rel_timeout, created_at]).
|
||||
-define(INFO_KEYS, [subscriptions, max_subscriptions, upgrade_qos, inflight,
|
||||
max_inflight, retry_interval, mqueue_len, max_mqueue,
|
||||
mqueue_dropped, next_pkt_id, awaiting_rel, max_awaiting_rel,
|
||||
await_rel_timeout, expiry_interval, created_at]).
|
||||
await_rel_timeout, created_at]).
|
||||
-define(STATS_KEYS, [subscriptions_cnt, max_subscriptions, inflight, max_inflight,
|
||||
mqueue_len, max_mqueue, mqueue_dropped, awaiting_rel,
|
||||
max_awaiting_rel, enqueue_cnt]).
|
||||
|
@ -145,8 +141,7 @@
|
|||
|
||||
%% @doc Init a session.
|
||||
-spec(init(emqx_types:client(), Options :: map()) -> session()).
|
||||
init(#{zone := Zone}, #{max_inflight := MaxInflight,
|
||||
expiry_interval := ExpiryInterval}) ->
|
||||
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
||||
#session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
||||
subscriptions = #{},
|
||||
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
||||
|
@ -157,7 +152,6 @@ init(#{zone := Zone}, #{max_inflight := MaxInflight,
|
|||
awaiting_rel = #{},
|
||||
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
|
||||
await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
|
||||
expiry_interval = ExpiryInterval,
|
||||
enqueue_cnt = 0,
|
||||
created_at = os:timestamp()
|
||||
}.
|
||||
|
@ -213,8 +207,6 @@ info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) ->
|
|||
MaxAwaitingRel;
|
||||
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
||||
Timeout;
|
||||
info(expiry_interval, #session{expiry_interval = Interval}) ->
|
||||
Interval;
|
||||
info(enqueue_cnt, #session{enqueue_cnt = Cnt}) ->
|
||||
Cnt;
|
||||
info(created_at, #session{created_at = CreatedAt}) ->
|
||||
|
@ -226,9 +218,6 @@ set_field(Name, Val, Channel) ->
|
|||
Pos = emqx_misc:index_of(Name, Fields),
|
||||
setelement(Pos+1, Channel, Val).
|
||||
|
||||
update_expiry_interval(ExpiryInterval, Session) ->
|
||||
Session#session{expiry_interval = ExpiryInterval}.
|
||||
|
||||
-spec(takeover(session()) -> ok).
|
||||
takeover(#session{subscriptions = Subs}) ->
|
||||
lists:foreach(fun({TopicFilter, _SubOpts}) ->
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
, client_id/0
|
||||
, username/0
|
||||
, password/0
|
||||
, peerhost/0
|
||||
, peername/0
|
||||
, protocol/0
|
||||
]).
|
||||
|
@ -102,9 +103,8 @@
|
|||
atom() => term()
|
||||
}).
|
||||
-type(client() :: #{zone := zone(),
|
||||
conn_mod := maybe(module()),
|
||||
peername := peername(),
|
||||
sockname := peername(),
|
||||
protocol := protocol(),
|
||||
peerhost := peerhost(),
|
||||
client_id := client_id(),
|
||||
username := username(),
|
||||
peercert := esockd_peercert:peercert(),
|
||||
|
@ -117,11 +117,12 @@
|
|||
anonymous => boolean(),
|
||||
atom() => term()
|
||||
}).
|
||||
-type(client_id() :: binary() | atom()).
|
||||
-type(client_id() :: binary()|atom()).
|
||||
-type(username() :: maybe(binary())).
|
||||
-type(password() :: maybe(binary())).
|
||||
-type(peerhost() :: inet:ip_address()).
|
||||
-type(peername() :: {inet:ip_address(), inet:port_number()}).
|
||||
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
|
||||
-type(protocol() :: mqtt | 'mqtt-sn' | coap | lwm2m | stomp | none | atom()).
|
||||
-type(auth_result() :: success
|
||||
| client_identifier_not_valid
|
||||
| bad_username_or_password
|
||||
|
|
|
@ -74,9 +74,9 @@
|
|||
info(WsPid) when is_pid(WsPid) ->
|
||||
call(WsPid, info);
|
||||
info(WsConn = #ws_connection{chan_state = ChanState}) ->
|
||||
ConnInfo = info(?INFO_KEYS, WsConn),
|
||||
ChanInfo = emqx_channel:info(ChanState),
|
||||
maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}).
|
||||
SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)),
|
||||
maps:merge(ChanInfo, #{sockinfo => SockInfo}).
|
||||
|
||||
info(Keys, WsConn) when is_list(Keys) ->
|
||||
[{Key, info(Key, WsConn)} || Key <- Keys];
|
||||
|
@ -95,9 +95,9 @@ info(chan_state, #ws_connection{chan_state = ChanState}) ->
|
|||
attrs(WsPid) when is_pid(WsPid) ->
|
||||
call(WsPid, attrs);
|
||||
attrs(WsConn = #ws_connection{chan_state = ChanState}) ->
|
||||
ConnAttrs = info(?ATTR_KEYS, WsConn),
|
||||
ChanAttrs = emqx_channel:attrs(ChanState),
|
||||
maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}).
|
||||
SockAttrs = maps:from_list(info(?ATTR_KEYS, WsConn)),
|
||||
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
|
||||
|
||||
-spec(stats(pid()|ws_connection()) -> emqx_types:stats()).
|
||||
stats(WsPid) when is_pid(WsPid) ->
|
||||
|
@ -181,6 +181,7 @@ websocket_init([Req, Opts]) ->
|
|||
sockname => Sockname,
|
||||
peercert => Peercert,
|
||||
ws_cookie => WsCookie,
|
||||
protocol => mqtt,
|
||||
conn_mod => ?MODULE
|
||||
}, Opts),
|
||||
Zone = proplists:get_value(zone, Opts),
|
||||
|
@ -254,10 +255,26 @@ websocket_info({cast, Msg}, State = #ws_connection{chan_state = ChanState}) ->
|
|||
stop(Reason, State#ws_connection{chan_state = NChanState})
|
||||
end;
|
||||
|
||||
websocket_info({incoming, {error, Reason}}, State = #ws_connection{fsm_state = idle}) ->
|
||||
stop({shutdown, Reason}, State);
|
||||
|
||||
websocket_info({incoming, {error, Reason}}, State = #ws_connection{fsm_state = connected, chan_state = ChanState}) ->
|
||||
case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of
|
||||
{wait_session_expire, _, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
disconnected(State#ws_connection{chan_state= NChanState});
|
||||
{wait_session_expire, _, OutPackets, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
disconnected(enqueue(OutPackets, State#ws_connection{chan_state = NChanState}))
|
||||
end;
|
||||
|
||||
websocket_info({incoming, {error, _Reason}}, State = #ws_connection{fsm_state = disconnected}) ->
|
||||
reply(State);
|
||||
|
||||
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
||||
State = #ws_connection{fsm_state = idle}) ->
|
||||
#mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt,
|
||||
MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined),
|
||||
MaxPacketSize = emqx_mqtt_props:get('Maximum-Packet-Size', Properties, undefined),
|
||||
NState = State#ws_connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)},
|
||||
handle_incoming(Packet, fun connected/1, NState);
|
||||
|
||||
|
@ -276,9 +293,7 @@ websocket_info(Deliver = {deliver, _Topic, _Msg},
|
|||
{ok, NChanState} ->
|
||||
reply(State#ws_connection{chan_state = NChanState});
|
||||
{ok, Packets, NChanState} ->
|
||||
reply(enqueue(Packets, State#ws_connection{chan_state = NChanState}));
|
||||
{stop, Reason, NChanState} ->
|
||||
stop(Reason, State#ws_connection{chan_state = NChanState})
|
||||
reply(enqueue(Packets, State#ws_connection{chan_state = NChanState}))
|
||||
end;
|
||||
|
||||
websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) ->
|
||||
|
@ -307,8 +322,7 @@ websocket_info(Info, State = #ws_connection{chan_state = ChanState}) ->
|
|||
|
||||
terminate(SockError, _Req, #ws_connection{chan_state = ChanState,
|
||||
stop_reason = Reason}) ->
|
||||
?LOG(debug, "Terminated for ~p, sockerror: ~p",
|
||||
[Reason, SockError]),
|
||||
?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]),
|
||||
emqx_channel:terminate(Reason, ChanState).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -318,16 +332,25 @@ connected(State = #ws_connection{chan_state = ChanState}) ->
|
|||
ok = emqx_channel:handle_cast({register, attrs(State), stats(State)}, ChanState),
|
||||
reply(State#ws_connection{fsm_state = connected}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Disconnected callback
|
||||
|
||||
disconnected(State) ->
|
||||
reply(State#ws_connection{fsm_state = disconnected}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle timeout
|
||||
|
||||
handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) ->
|
||||
case emqx_channel:timeout(TRef, Msg, ChanState) of
|
||||
case emqx_channel:handle_timeout(TRef, Msg, ChanState) of
|
||||
{ok, NChanState} ->
|
||||
{ok, State#ws_connection{chan_state = NChanState}};
|
||||
{ok, Packets, NChanState} ->
|
||||
NState = State#ws_connection{chan_state = NChanState},
|
||||
reply(enqueue(Packets, NState));
|
||||
{wait_session_expire, Reason, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
disconnected(State#ws_connection{chan_state = NChanState});
|
||||
{stop, Reason, NChanState} ->
|
||||
stop(Reason, State#ws_connection{chan_state = NChanState})
|
||||
end.
|
||||
|
@ -338,8 +361,7 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) ->
|
|||
process_incoming(<<>>, State) ->
|
||||
{ok, State};
|
||||
|
||||
process_incoming(Data, State = #ws_connection{parse_state = ParseState,
|
||||
chan_state = ChanState}) ->
|
||||
process_incoming(Data, State = #ws_connection{parse_state = ParseState}) ->
|
||||
try emqx_frame:parse(Data, ParseState) of
|
||||
{more, NParseState} ->
|
||||
{ok, State#ws_connection{parse_state = NParseState}};
|
||||
|
@ -347,29 +369,13 @@ process_incoming(Data, State = #ws_connection{parse_state = ParseState,
|
|||
self() ! {incoming, Packet},
|
||||
process_incoming(Rest, State#ws_connection{parse_state = NParseState});
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Frame error: ~p", [Reason]),
|
||||
stop(Reason, State)
|
||||
self() ! {incoming, {error, Reason}},
|
||||
{ok, State}
|
||||
catch
|
||||
error:Reason:Stk ->
|
||||
?LOG(error, "Parse failed for ~p~nStacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]),
|
||||
Result =
|
||||
case emqx_channel:info(connected, ChanState) of
|
||||
undefined ->
|
||||
emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState);
|
||||
true ->
|
||||
emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState);
|
||||
_ ->
|
||||
ignore
|
||||
end,
|
||||
case Result of
|
||||
{stop, Reason0, OutPackets, NChanState} ->
|
||||
NState = State#ws_connection{chan_state = NChanState},
|
||||
stop(Reason0, enqueue(OutPackets, NState));
|
||||
{stop, Reason0, NChanState} ->
|
||||
stop(Reason0, State#ws_connection{chan_state = NChanState});
|
||||
ignore ->
|
||||
{ok, State}
|
||||
end
|
||||
?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data: ~p", [Reason, Stk, Data]),
|
||||
self() ! {incoming, {error, Reason}},
|
||||
{ok, State}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -386,11 +392,17 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
|
|||
{ok, OutPackets, NChanState} ->
|
||||
NState = State#ws_connection{chan_state= NChanState},
|
||||
SuccFun(enqueue(OutPackets, NState));
|
||||
{wait_session_expire, Reason, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
disconnected(State#ws_connection{chan_state = NChanState});
|
||||
{wait_session_expire, Reason, OutPackets, NChanState} ->
|
||||
?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
||||
disconnected(enqueue(OutPackets, State#ws_connection{chan_state = NChanState}));
|
||||
{stop, Reason, NChanState} ->
|
||||
stop(Reason, State#ws_connection{chan_state= NChanState});
|
||||
{stop, Reason, OutPacket, NChanState} ->
|
||||
stop(Reason, State#ws_connection{chan_state = NChanState});
|
||||
{stop, Reason, OutPackets, NChanState} ->
|
||||
NState = State#ws_connection{chan_state= NChanState},
|
||||
stop(Reason, enqueue(OutPacket, NState))
|
||||
stop(Reason, enqueue(OutPackets, NState))
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -28,18 +28,26 @@
|
|||
-export([start_link/0, stop/0]).
|
||||
|
||||
-export([ use_username_as_clientid/1
|
||||
, enable_stats/1
|
||||
, enable_acl/1
|
||||
, enable_banned/1
|
||||
, enable_ban/1
|
||||
, enable_flapping_detect/1
|
||||
, ignore_loop_deliver/1
|
||||
, server_keepalive/1
|
||||
, max_inflight/1
|
||||
, session_expiry_interval/1
|
||||
, force_gc_policy/1
|
||||
, force_shutdown_policy/1
|
||||
]).
|
||||
|
||||
-export([ get_env/2
|
||||
, get_env/3
|
||||
, set_env/3
|
||||
, unset_env/2
|
||||
, force_reload/0
|
||||
]).
|
||||
|
||||
-export([force_reload/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
|
@ -72,18 +80,46 @@ start_link() ->
|
|||
use_username_as_clientid(Zone) ->
|
||||
get_env(Zone, use_username_as_clientid, false).
|
||||
|
||||
-spec(enable_stats(zone()) -> boolean()).
|
||||
enable_stats(Zone) ->
|
||||
get_env(Zone, enable_stats, true).
|
||||
|
||||
-spec(enable_acl(zone()) -> boolean()).
|
||||
enable_acl(Zone) ->
|
||||
get_env(Zone, enable_acl, true).
|
||||
|
||||
-spec(enable_banned(zone()) -> boolean()).
|
||||
enable_banned(Zone) ->
|
||||
get_env(Zone, enable_banned, false).
|
||||
-spec(enable_ban(zone()) -> boolean()).
|
||||
enable_ban(Zone) ->
|
||||
get_env(Zone, enable_ban, false).
|
||||
|
||||
-spec(enable_flapping_detect(zone()) -> boolean()).
|
||||
enable_flapping_detect(Zone) ->
|
||||
get_env(Zone, enable_flapping_detect, false).
|
||||
|
||||
-spec(ignore_loop_deliver(zone()) -> boolean()).
|
||||
ignore_loop_deliver(Zone) ->
|
||||
get_env(Zone, ignore_loop_deliver, false).
|
||||
|
||||
-spec(server_keepalive(zone()) -> pos_integer()).
|
||||
server_keepalive(Zone) ->
|
||||
get_env(Zone, server_keepalive).
|
||||
|
||||
-spec(max_inflight(zone()) -> 0..65535).
|
||||
max_inflight(Zone) ->
|
||||
get_env(Zone, max_inflight, 65535).
|
||||
|
||||
-spec(session_expiry_interval(zone()) -> non_neg_integer()).
|
||||
session_expiry_interval(Zone) ->
|
||||
get_env(Zone, session_expiry_interval, 0).
|
||||
|
||||
-spec(force_gc_policy(zone()) -> maybe(emqx_gc:opts())).
|
||||
force_gc_policy(Zone) ->
|
||||
get_env(Zone, force_gc_policy).
|
||||
|
||||
-spec(force_shutdown_policy(zone()) -> maybe(emqx_oom:opts())).
|
||||
force_shutdown_policy(Zone) ->
|
||||
get_env(Zone, force_shutdown_policy).
|
||||
|
||||
-spec(get_env(maybe(zone()), atom()) -> maybe(term())).
|
||||
get_env(undefined, Key) -> emqx:get_env(Key);
|
||||
get_env(Zone, Key) ->
|
||||
|
|
|
@ -344,35 +344,35 @@ t_compile_rule(_) ->
|
|||
{deny, all} = compile({deny, all}).
|
||||
|
||||
t_match_rule(_) ->
|
||||
Client1 = #{zone => external,
|
||||
client_id => <<"testClient">>,
|
||||
username => <<"TestUser">>,
|
||||
peername => {{127,0,0,1}, 2948}
|
||||
},
|
||||
Client2 = #{zone => external,
|
||||
client_id => <<"testClient">>,
|
||||
username => <<"TestUser">>,
|
||||
peername => {{192,168,0,10}, 3028}
|
||||
},
|
||||
{matched, allow} = match(Client1, <<"Test/Topic">>, {allow, all}),
|
||||
{matched, deny} = match(Client1, <<"Test/Topic">>, {deny, all}),
|
||||
{matched, allow} = match(Client1, <<"Test/Topic">>,
|
||||
ClientInfo1 = #{zone => external,
|
||||
client_id => <<"testClient">>,
|
||||
username => <<"TestUser">>,
|
||||
peerhost => {127,0,0,1}
|
||||
},
|
||||
ClientInfo2 = #{zone => external,
|
||||
client_id => <<"testClient">>,
|
||||
username => <<"TestUser">>,
|
||||
peerhost => {192,168,0,10}
|
||||
},
|
||||
{matched, allow} = match(ClientInfo1, <<"Test/Topic">>, {allow, all}),
|
||||
{matched, deny} = match(ClientInfo1, <<"Test/Topic">>, {deny, all}),
|
||||
{matched, allow} = match(ClientInfo1, <<"Test/Topic">>,
|
||||
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})),
|
||||
{matched, allow} = match(Client2, <<"Test/Topic">>,
|
||||
{matched, allow} = match(ClientInfo2, <<"Test/Topic">>,
|
||||
compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]})),
|
||||
{matched, allow} = match(Client1, <<"d/e/f/x">>,
|
||||
{matched, allow} = match(ClientInfo1, <<"d/e/f/x">>,
|
||||
compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})),
|
||||
nomatch = match(Client1, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
|
||||
{matched, allow} = match(Client1, <<"testTopics/testClient">>,
|
||||
nomatch = match(ClientInfo1, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
|
||||
{matched, allow} = match(ClientInfo1, <<"testTopics/testClient">>,
|
||||
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})),
|
||||
{matched, allow} = match(Client1, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})),
|
||||
{matched, allow} = match(ClientInfo1, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})),
|
||||
{matched, allow} = match(#{username => <<"user2">>}, <<"users/user2/abc/def">>,
|
||||
compile({allow, all, subscribe, ["users/%u/#"]})),
|
||||
{matched, deny} = match(Client1, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})),
|
||||
{matched, deny} = match(ClientInfo1, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})),
|
||||
Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
|
||||
nomatch = match(Client1, <<"Topic">>, Rule),
|
||||
nomatch = match(ClientInfo1, <<"Topic">>, Rule),
|
||||
AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}),
|
||||
{matched, allow} = match(Client1, <<"Topic">>, AndRule),
|
||||
{matched, allow} = match(ClientInfo1, <<"Topic">>, AndRule),
|
||||
OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
|
||||
{matched, allow} = match(Client1, <<"Topic">>, OrRule).
|
||||
{matched, allow} = match(ClientInfo1, <<"Topic">>, OrRule).
|
||||
|
||||
|
|
|
@ -27,6 +27,8 @@ all() -> emqx_ct:all(?MODULE).
|
|||
init_per_suite(Config) ->
|
||||
application:load(emqx),
|
||||
ok = ekka:start(),
|
||||
%% for coverage
|
||||
ok = emqx_banned:mnesia(copy),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
@ -51,32 +53,43 @@ t_check(_) ->
|
|||
ok = emqx_banned:add(#banned{who = {username, <<"BannedUser">>}}),
|
||||
ok = emqx_banned:add(#banned{who = {ipaddr, {192,168,0,1}}}),
|
||||
?assertEqual(3, emqx_banned:info(size)),
|
||||
Client1 = #{client_id => <<"BannedClient">>,
|
||||
username => <<"user">>,
|
||||
peername => {{127,0,0,1}, 5000}
|
||||
},
|
||||
Client2 = #{client_id => <<"client">>,
|
||||
username => <<"BannedUser">>,
|
||||
peername => {{127,0,0,1}, 5000}
|
||||
},
|
||||
Client3 = #{client_id => <<"client">>,
|
||||
username => <<"user">>,
|
||||
peername => {{192,168,0,1}, 5000}
|
||||
},
|
||||
Client4 = #{client_id => <<"client">>,
|
||||
username => <<"user">>,
|
||||
peername => {{127,0,0,1}, 5000}
|
||||
},
|
||||
?assert(emqx_banned:check(Client1)),
|
||||
?assert(emqx_banned:check(Client2)),
|
||||
?assert(emqx_banned:check(Client3)),
|
||||
?assertNot(emqx_banned:check(Client4)),
|
||||
ClientInfo1 = #{client_id => <<"BannedClient">>,
|
||||
username => <<"user">>,
|
||||
peerhost => {127,0,0,1}
|
||||
},
|
||||
ClientInfo2 = #{client_id => <<"client">>,
|
||||
username => <<"BannedUser">>,
|
||||
peerhost => {127,0,0,1}
|
||||
},
|
||||
ClientInfo3 = #{client_id => <<"client">>,
|
||||
username => <<"user">>,
|
||||
peerhost => {192,168,0,1}
|
||||
},
|
||||
ClientInfo4 = #{client_id => <<"client">>,
|
||||
username => <<"user">>,
|
||||
peerhost => {127,0,0,1}
|
||||
},
|
||||
?assert(emqx_banned:check(ClientInfo1)),
|
||||
?assert(emqx_banned:check(ClientInfo2)),
|
||||
?assert(emqx_banned:check(ClientInfo3)),
|
||||
?assertNot(emqx_banned:check(ClientInfo4)),
|
||||
ok = emqx_banned:delete({client_id, <<"BannedClient">>}),
|
||||
ok = emqx_banned:delete({username, <<"BannedUser">>}),
|
||||
ok = emqx_banned:delete({ipaddr, {192,168,0,1}}),
|
||||
?assertNot(emqx_banned:check(Client1)),
|
||||
?assertNot(emqx_banned:check(Client2)),
|
||||
?assertNot(emqx_banned:check(Client3)),
|
||||
?assertNot(emqx_banned:check(Client4)),
|
||||
?assertNot(emqx_banned:check(ClientInfo1)),
|
||||
?assertNot(emqx_banned:check(ClientInfo2)),
|
||||
?assertNot(emqx_banned:check(ClientInfo3)),
|
||||
?assertNot(emqx_banned:check(ClientInfo4)),
|
||||
?assertEqual(0, emqx_banned:info(size)).
|
||||
|
||||
t_unused(_) ->
|
||||
{ok, Banned} = emqx_banned:start_link(),
|
||||
ok = emqx_banned:add(#banned{who = {client_id, <<"BannedClient">>},
|
||||
until = erlang:system_time(second)
|
||||
}),
|
||||
?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
|
||||
?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
|
||||
?assertEqual(ok, Banned ! ok),
|
||||
timer:sleep(500), %% expiry timer
|
||||
ok = emqx_banned:stop().
|
||||
|
||||
|
|
|
@ -144,8 +144,8 @@ t_handle_pingreq(_) ->
|
|||
t_handle_disconnect(_) ->
|
||||
with_channel(
|
||||
fun(Channel) ->
|
||||
{stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
|
||||
?assertMatch(#{will_msg := undefined}, emqx_channel:info(protocol, Channel1))
|
||||
{wait_session_expire, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
|
||||
?assertEqual(undefined, emqx_channel:info(will_msg, Channel1))
|
||||
end).
|
||||
|
||||
t_handle_auth(_) ->
|
||||
|
@ -176,13 +176,20 @@ t_handle_deliver(_) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
t_handle_connack(_) ->
|
||||
ConnPkt = #mqtt_packet_connect{
|
||||
proto_name = <<"MQTT">>,
|
||||
proto_ver = ?MQTT_PROTO_V4,
|
||||
clean_start = true,
|
||||
properties = #{},
|
||||
client_id = <<"clientid">>
|
||||
},
|
||||
with_channel(
|
||||
fun(Channel) ->
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, _), _}
|
||||
= handle_out({connack, ?RC_SUCCESS, 0}, Channel),
|
||||
= handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel),
|
||||
{stop, {shutdown, not_authorized},
|
||||
?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
|
||||
= handle_out({connack, ?RC_NOT_AUTHORIZED}, Channel)
|
||||
= handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel)
|
||||
end).
|
||||
|
||||
t_handle_out_publish(_) ->
|
||||
|
@ -271,30 +278,33 @@ t_terminate(_) ->
|
|||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
with_channel(Fun) ->
|
||||
ConnInfo = #{peername => {{127,0,0,1}, 3456},
|
||||
sockname => {{127,0,0,1}, 1883},
|
||||
with_channel(TestFun) ->
|
||||
ConnInfo = #{peername => {{127,0,0,1}, 3456},
|
||||
sockname => {{127,0,0,1}, 1883},
|
||||
protocol => mqtt,
|
||||
conn_mod => emqx_connection,
|
||||
proto_name => <<"MQTT">>,
|
||||
proto_ver => ?MQTT_PROTO_V5,
|
||||
clean_start => true,
|
||||
keepalive => 30,
|
||||
client_id => <<"clientid">>,
|
||||
username => <<"username">>
|
||||
username => <<"username">>,
|
||||
conn_props => #{},
|
||||
receive_maximum => 100,
|
||||
expiry_interval => 60
|
||||
},
|
||||
Options = [{zone, testing}],
|
||||
Channel = emqx_channel:init(ConnInfo, Options),
|
||||
ConnPkt = #mqtt_packet_connect{
|
||||
proto_name = <<"MQTT">>,
|
||||
proto_ver = ?MQTT_PROTO_V5,
|
||||
clean_start = true,
|
||||
keepalive = 30,
|
||||
properties = #{},
|
||||
client_id = <<"clientid">>,
|
||||
username = <<"username">>,
|
||||
password = <<"passwd">>
|
||||
},
|
||||
Protocol = emqx_protocol:init(ConnPkt, testing),
|
||||
Session = emqx_session:init(#{zone => testing},
|
||||
#{max_inflight => 100,
|
||||
expiry_interval => 0
|
||||
}),
|
||||
Fun(emqx_channel:set_field(protocol, Protocol,
|
||||
emqx_channel:set_field(
|
||||
session, Session, Channel))).
|
||||
ClientInfo = #{zone => <<"external">>,
|
||||
protocol => mqtt,
|
||||
peerhost => {127,0,0,1},
|
||||
client_id => <<"clientid">>,
|
||||
username => <<"username">>,
|
||||
peercert => undefined,
|
||||
is_bridge => false,
|
||||
is_superuser => false,
|
||||
mountpoint => undefined
|
||||
},
|
||||
Channel = emqx_channel:init(ConnInfo, [{zone, testing}]),
|
||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
||||
Channel1 = emqx_channel:set_field(client, ClientInfo, Channel),
|
||||
TestFun(emqx_channel:set_field(session, Session, Channel1)).
|
||||
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_cm_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% CT callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% TODO: Add more test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_reg_unreg_channel(_) ->
|
||||
ok = emqx_cm:register_channel(<<"clientid">>),
|
||||
?assertEqual([self()], emqx_cm:lookup_channels(<<"clientid">>)),
|
||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
|
||||
|
||||
t_get_set_chan_attrs(_) ->
|
||||
Attrs = #{proto_ver => 4, proto_name => <<"MQTT">>},
|
||||
ok = emqx_cm:register_channel(<<"clientid">>),
|
||||
ok = emqx_cm:set_chan_attrs(<<"clientid">>, Attrs),
|
||||
?assertEqual(Attrs, emqx_cm:get_chan_attrs(<<"clientid">>)),
|
||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
?assertEqual(undefined, emqx_cm:get_chan_attrs(<<"clientid">>)).
|
||||
|
||||
t_get_set_chan_stats(_) ->
|
||||
Stats = [{recv_oct, 10}, {send_oct, 8}],
|
||||
ok = emqx_cm:register_channel(<<"clientid">>),
|
||||
ok = emqx_cm:set_chan_stats(<<"clientid">>, Stats),
|
||||
?assertEqual(Stats, emqx_cm:get_chan_stats(<<"clientid">>)),
|
||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).
|
||||
|
||||
t_open_session(_) ->
|
||||
ClientInfo = #{zone => external,
|
||||
client_id => <<"clientid">>,
|
||||
username => <<"username">>,
|
||||
peerhost => {127,0,0,1}},
|
||||
ConnInfo = #{peername => {{127,0,0,1}, 5000},
|
||||
receive_maximum => 100},
|
||||
{ok, #{session := Session1, present := false}}
|
||||
= emqx_cm:open_session(true, ClientInfo, ConnInfo),
|
||||
?assertEqual(100, emqx_session:info(max_inflight, Session1)),
|
||||
{ok, #{session := Session2, present := false}}
|
||||
= emqx_cm:open_session(false, ClientInfo, ConnInfo),
|
||||
?assertEqual(100, emqx_session:info(max_inflight, Session2)).
|
||||
|
||||
t_discard_session(_) ->
|
||||
ok = emqx_cm:discard_session(<<"clientid">>).
|
||||
|
||||
t_takeover_session(_) ->
|
||||
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>).
|
||||
|
||||
t_lock_clientid(_) ->
|
||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
{true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>),
|
||||
{true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>).
|
||||
|
|
@ -25,30 +25,106 @@
|
|||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules([]),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
ok.
|
||||
|
||||
t_command(_) ->
|
||||
emqx_ctl:start_link(),
|
||||
emqx_ctl:register_command(test, {?MODULE, test}),
|
||||
ct:sleep(50),
|
||||
?assertEqual([{emqx_ctl_SUITE,test}], emqx_ctl:lookup_command(test)),
|
||||
?assertEqual(ok, emqx_ctl:run_command(["test", "ok"])),
|
||||
?assertEqual({error, test_failed}, emqx_ctl:run_command(["test", "error"])),
|
||||
?assertEqual({error, cmd_not_found}, emqx_ctl:run_command(["test2", "ok"])),
|
||||
emqx_ctl:unregister_command(test),
|
||||
ct:sleep(50),
|
||||
?assertEqual([], emqx_ctl:lookup_command(test)).
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
test(["ok"]) ->
|
||||
ok;
|
||||
test(["error"]) ->
|
||||
error(test_failed);
|
||||
test(_) ->
|
||||
io:format("Hello world").
|
||||
t_reg_unreg_command(_) ->
|
||||
with_ctl_server(
|
||||
fun(_CtlSrv) ->
|
||||
emqx_ctl:register_command(cmd1, {?MODULE, cmd1_fun}),
|
||||
emqx_ctl:register_command(cmd2, {?MODULE, cmd2_fun}),
|
||||
?assertEqual([{?MODULE, cmd1_fun}], emqx_ctl:lookup_command(cmd1)),
|
||||
?assertEqual([{?MODULE, cmd2_fun}], emqx_ctl:lookup_command(cmd2)),
|
||||
?assertEqual([{cmd1, ?MODULE, cmd1_fun}, {cmd2, ?MODULE, cmd2_fun}],
|
||||
emqx_ctl:get_commands()),
|
||||
emqx_ctl:unregister_command(cmd1),
|
||||
emqx_ctl:unregister_command(cmd2),
|
||||
ct:sleep(100),
|
||||
?assertEqual([], emqx_ctl:lookup_command(cmd1)),
|
||||
?assertEqual([], emqx_ctl:lookup_command(cmd2)),
|
||||
?assertEqual([], emqx_ctl:get_commands())
|
||||
end).
|
||||
|
||||
t_run_commands(_) ->
|
||||
with_ctl_server(
|
||||
fun(_CtlSrv) ->
|
||||
?assertEqual({error, cmd_not_found}, emqx_ctl:run_command(["cmd", "arg"])),
|
||||
emqx_ctl:register_command(cmd1, {?MODULE, cmd1_fun}),
|
||||
emqx_ctl:register_command(cmd2, {?MODULE, cmd2_fun}),
|
||||
ok = emqx_ctl:run_command(["cmd1", "arg"]),
|
||||
{error, badarg} = emqx_ctl:run_command(["cmd1", "badarg"]),
|
||||
ok = emqx_ctl:run_command(["cmd2", "arg1", "arg2"]),
|
||||
{error, badarg} = emqx_ctl:run_command(["cmd2", "arg1", "badarg"])
|
||||
end).
|
||||
|
||||
t_print(_) ->
|
||||
ok = emqx_ctl:print("help"),
|
||||
ok = emqx_ctl:print("~s", [help]),
|
||||
% - check the output of the usage
|
||||
mock_print(),
|
||||
?assertEqual("help", emqx_ctl:print("help")),
|
||||
?assertEqual("help", emqx_ctl:print("~s", [help])),
|
||||
unmock_print().
|
||||
|
||||
t_usage(_) ->
|
||||
CmdParams1 = "emqx_cmd_1 param1 param2",
|
||||
CmdDescr1 = "emqx_cmd_1 is a test command means nothing",
|
||||
Output1 = "emqx_cmd_1 param1 param2 # emqx_cmd_1 is a test command means nothing\n",
|
||||
% - usage/1,2 should return ok
|
||||
ok = emqx_ctl:usage([{CmdParams1, CmdDescr1}, {CmdParams1, CmdDescr1}]),
|
||||
ok = emqx_ctl:usage(CmdParams1, CmdDescr1),
|
||||
|
||||
% - check the output of the usage
|
||||
mock_print(),
|
||||
?assertEqual(Output1, emqx_ctl:usage(CmdParams1, CmdDescr1)),
|
||||
?assertEqual([Output1, Output1], emqx_ctl:usage([{CmdParams1, CmdDescr1}, {CmdParams1, CmdDescr1}])),
|
||||
|
||||
% - for the commands or descriptions have multi-lines
|
||||
CmdParams2 = "emqx_cmd_2 param1 param2",
|
||||
CmdDescr2 = "emqx_cmd_2 is a test command\nmeans nothing",
|
||||
Output2 = "emqx_cmd_2 param1 param2 # emqx_cmd_2 is a test command\n"
|
||||
" ""# means nothing\n",
|
||||
?assertEqual(Output2, emqx_ctl:usage(CmdParams2, CmdDescr2)),
|
||||
?assertEqual([Output2, Output2], emqx_ctl:usage([{CmdParams2, CmdDescr2}, {CmdParams2, CmdDescr2}])),
|
||||
unmock_print().
|
||||
|
||||
t_unexpected(_) ->
|
||||
with_ctl_server(
|
||||
fun(CtlSrv) ->
|
||||
ignored = gen_server:call(CtlSrv, unexpected_call),
|
||||
ok = gen_server:cast(CtlSrv, unexpected_cast),
|
||||
CtlSrv ! unexpected_info,
|
||||
?assert(is_process_alive(CtlSrv))
|
||||
end).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cmds for test
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
cmd1_fun(["arg"]) -> ok;
|
||||
cmd1_fun(["badarg"]) -> error(badarg).
|
||||
|
||||
cmd2_fun(["arg1", "arg2"]) -> ok;
|
||||
cmd2_fun(["arg1", "badarg"]) -> error(badarg).
|
||||
|
||||
with_ctl_server(Fun) ->
|
||||
{ok, Pid} = emqx_ctl:start_link(),
|
||||
_ = Fun(Pid),
|
||||
ok = emqx_ctl:stop().
|
||||
|
||||
mock_print() ->
|
||||
%% proxy usage/1,2 and print/1,2 to format_xx/1,2 funcs
|
||||
meck:new(emqx_ctl, [non_strict, passthrough]),
|
||||
meck:expect(emqx_ctl, print, fun(Arg) -> emqx_ctl:format(Arg) end),
|
||||
meck:expect(emqx_ctl, print, fun(Msg, Arg) -> emqx_ctl:format(Msg, Arg) end),
|
||||
meck:expect(emqx_ctl, usage, fun(Usages) -> emqx_ctl:format_usage(Usages) end),
|
||||
meck:expect(emqx_ctl, usage, fun(CmdParams, CmdDescr) -> emqx_ctl:format_usage(CmdParams, CmdDescr) end).
|
||||
|
||||
unmock_print() ->
|
||||
meck:unload(emqx_ctl).
|
||||
|
|
|
@ -1,17 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ctl_SUTIES).
|
|
@ -22,34 +22,36 @@
|
|||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
prepare_env(),
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
||||
Config.
|
||||
|
||||
prepare_env() ->
|
||||
set_special_configs(emqx) ->
|
||||
emqx_zone:set_env(external, enable_flapping_detect, true),
|
||||
application:set_env(emqx, flapping_detect_policy,
|
||||
#{threshold => 3,
|
||||
duration => 100,
|
||||
banned_interval => 200
|
||||
}).
|
||||
});
|
||||
set_special_configs(_App) -> ok.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
ok.
|
||||
|
||||
t_detect_check(_) ->
|
||||
{ok, _Pid} = emqx_flapping:start_link(),
|
||||
Client = #{zone => external,
|
||||
client_id => <<"clientid">>,
|
||||
peername => {{127,0,0,1}, 5000}
|
||||
},
|
||||
false = emqx_flapping:detect(Client),
|
||||
false = emqx_flapping:check(Client),
|
||||
false = emqx_flapping:detect(Client),
|
||||
false = emqx_flapping:check(Client),
|
||||
true = emqx_flapping:detect(Client),
|
||||
timer:sleep(50),
|
||||
true = emqx_flapping:check(Client),
|
||||
ClientInfo = #{zone => external,
|
||||
client_id => <<"clientid">>,
|
||||
peerhost => {127,0,0,1}
|
||||
},
|
||||
false = emqx_flapping:detect(ClientInfo),
|
||||
false = emqx_flapping:check(ClientInfo),
|
||||
false = emqx_flapping:detect(ClientInfo),
|
||||
false = emqx_flapping:check(ClientInfo),
|
||||
true = emqx_flapping:detect(ClientInfo),
|
||||
timer:sleep(100),
|
||||
true = emqx_flapping:check(ClientInfo),
|
||||
timer:sleep(300),
|
||||
false = emqx_flapping:check(Client),
|
||||
false = emqx_flapping:check(ClientInfo),
|
||||
ok = emqx_flapping:stop().
|
||||
|
||||
|
|
|
@ -84,8 +84,9 @@ t_is_empty(_) ->
|
|||
?assert(emqx_inflight:is_empty(Inflight1)).
|
||||
|
||||
t_window(_) ->
|
||||
?assertEqual([], emqx_inflight:window(emqx_inflight:new(0))),
|
||||
Inflight = emqx_inflight:insert(
|
||||
b, 2, emqx_inflight:insert(
|
||||
a, 1, emqx_inflight:new(2))),
|
||||
[a, b] = emqx_inflight:window(Inflight).
|
||||
?assertEqual([a, b], emqx_inflight:window(Inflight)).
|
||||
|
||||
|
|
|
@ -19,22 +19,10 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-export([ t_make/1
|
||||
, t_flag/1
|
||||
, t_header/1
|
||||
, t_format/1
|
||||
, t_expired/1
|
||||
, t_to_packet/1
|
||||
, t_to_map/1
|
||||
]).
|
||||
|
||||
-export([ all/0
|
||||
, suite/0
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
suite() ->
|
||||
|
@ -55,7 +43,12 @@ t_make(_) ->
|
|||
?assertEqual(<<"topic">>, emqx_message:topic(Msg2)),
|
||||
?assertEqual(<<"payload">>, emqx_message:payload(Msg2)).
|
||||
|
||||
t_flag(_) ->
|
||||
t_get_set_flags(_) ->
|
||||
Msg = #message{id = <<"id">>, qos = ?QOS_1, flags = undefined},
|
||||
Msg1 = emqx_message:set_flags(#{retain => true}, Msg),
|
||||
?assertEqual(#{retain => true}, emqx_message:get_flags(Msg1)).
|
||||
|
||||
t_get_set_flag(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
Msg2 = emqx_message:set_flag(retain, false, Msg),
|
||||
Msg3 = emqx_message:set_flag(dup, Msg2),
|
||||
|
@ -63,32 +56,62 @@ t_flag(_) ->
|
|||
?assertNot(emqx_message:get_flag(retain, Msg3)),
|
||||
Msg4 = emqx_message:unset_flag(dup, Msg3),
|
||||
Msg5 = emqx_message:unset_flag(retain, Msg4),
|
||||
Msg5 = emqx_message:unset_flag(badflag, Msg5),
|
||||
?assertEqual(undefined, emqx_message:get_flag(dup, Msg5, undefined)),
|
||||
?assertEqual(undefined, emqx_message:get_flag(retain, Msg5, undefined)),
|
||||
Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5),
|
||||
?assert(emqx_message:get_flag(dup, Msg6)),
|
||||
?assert(emqx_message:get_flag(retain, Msg6)).
|
||||
?assert(emqx_message:get_flag(retain, Msg6)),
|
||||
Msg7 = #message{id = <<"id">>, qos = ?QOS_1, flags = undefined},
|
||||
Msg8 = emqx_message:set_flag(retain, Msg7),
|
||||
Msg9 = emqx_message:set_flag(retain, true, Msg7),
|
||||
?assertEqual(#{retain => true}, emqx_message:get_flags(Msg8)),
|
||||
?assertEqual(#{retain => true}, emqx_message:get_flags(Msg9)).
|
||||
|
||||
t_header(_) ->
|
||||
t_get_set_headers(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
|
||||
Msg2 = emqx_message:set_header(c, 3, Msg1),
|
||||
?assertEqual(1, emqx_message:get_header(a, Msg2)),
|
||||
Msg2 = emqx_message:set_headers(#{c => 3}, Msg1),
|
||||
?assertEqual(#{a => 1, b => 2, c => 3}, emqx_message:get_headers(Msg2)).
|
||||
|
||||
t_get_set_header(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
Msg1 = emqx_message:set_header(a, 1, Msg),
|
||||
Msg2 = emqx_message:set_header(b, 2, Msg1),
|
||||
Msg3 = emqx_message:set_header(c, 3, Msg2),
|
||||
?assertEqual(1, emqx_message:get_header(a, Msg3)),
|
||||
?assertEqual(4, emqx_message:get_header(d, Msg2, 4)),
|
||||
Msg3 = emqx_message:remove_header(a, Msg2),
|
||||
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg3)).
|
||||
Msg4 = emqx_message:remove_header(a, Msg3),
|
||||
Msg4 = emqx_message:remove_header(a, Msg4),
|
||||
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg4)).
|
||||
|
||||
t_undefined_headers(_) ->
|
||||
Msg = #message{id = <<"id">>, qos = ?QOS_0, headers = undefined},
|
||||
Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
|
||||
?assertEqual(1, emqx_message:get_header(a, Msg1)),
|
||||
Msg2 = emqx_message:set_header(c, 3, Msg),
|
||||
?assertEqual(3, emqx_message:get_header(c, Msg2)).
|
||||
|
||||
t_format(_) ->
|
||||
io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]).
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
io:format("~s~n", [emqx_message:format(Msg)]),
|
||||
Msg1 = #message{id = <<"id">>,
|
||||
qos = ?QOS_0,
|
||||
flags = undefined,
|
||||
headers = undefined
|
||||
},
|
||||
io:format("~s~n", [emqx_message:format(Msg1)]).
|
||||
|
||||
t_expired(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
?assertNot(emqx_message:is_expired(Msg)),
|
||||
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
|
||||
timer:sleep(500),
|
||||
?assertNot(emqx_message:is_expired(Msg1)),
|
||||
timer:sleep(600),
|
||||
?assert(emqx_message:is_expired(Msg1)),
|
||||
timer:sleep(1000),
|
||||
Msg = emqx_message:update_expiry(Msg),
|
||||
Msg2 = emqx_message:update_expiry(Msg1),
|
||||
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
|
||||
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_mod_rewrite_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(rules, [{rewrite,<<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
|
||||
{rewrite,<<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_rewrite_rule(_Config) ->
|
||||
{ok, _} = emqx_hooks:start_link(),
|
||||
ok = emqx_mod_rewrite:load(?rules),
|
||||
RawTopicFilters = [{<<"x/y/2">>, opts},
|
||||
{<<"x/1/2">>, opts},
|
||||
{<<"y/a/z/b">>, opts},
|
||||
{<<"y/def">>, opts}],
|
||||
SubTopicFilters = emqx_hooks:run_fold('client.subscribe', [client, properties], RawTopicFilters),
|
||||
UnSubTopicFilters = emqx_hooks:run_fold('client.unsubscribe', [client, properties], RawTopicFilters),
|
||||
Messages = [emqx_hooks:run_fold('message.publish', [], emqx_message:make(Topic, <<"payload">>))
|
||||
|| {Topic, _Opts} <- RawTopicFilters],
|
||||
ExpectedTopicFilters = [{<<"z/y/2">>, opts},
|
||||
{<<"x/1/2">>, opts},
|
||||
{<<"y/z/b">>, opts},
|
||||
{<<"y/def">>, opts}],
|
||||
?assertEqual(ExpectedTopicFilters, SubTopicFilters),
|
||||
?assertEqual(ExpectedTopicFilters, UnSubTopicFilters),
|
||||
[?assertEqual(ExpectedTopic, emqx_message:topic(Message))
|
||||
|| {{ExpectedTopic, _opts}, Message} <- lists:zip(ExpectedTopicFilters, Messages)],
|
||||
ok = emqx_mod_rewrite:unload(?rules),
|
||||
ok = emqx_hooks:stop().
|
|
@ -1,52 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_mod_subscription_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("emqx.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([emqx]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx]).
|
||||
|
||||
t_mod_subscription(_) ->
|
||||
emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}]),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"}, {client_id, "myclient"}, {username, "admin"}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
% ct:sleep(100),
|
||||
emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
|
||||
receive
|
||||
{publish, #{topic := Topic, payload := Payload}} ->
|
||||
?assertEqual(<<"connected/myclient/admin">>, Topic),
|
||||
?assertEqual(<<"Hello world">>, Payload)
|
||||
after 100 ->
|
||||
ct:fail("no_message")
|
||||
end,
|
||||
ok = emqtt:disconnect(C),
|
||||
emqx_mod_subscription:unload([]).
|
|
@ -0,0 +1,154 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_modules_SUITE).
|
||||
|
||||
%% API
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
%%-include_lib("proper/include/proper.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
||||
|
||||
-define(RULES, [{rewrite,<<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
|
||||
{rewrite,<<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
suite() ->
|
||||
[{ct_hooks,[cth_surefire]}, {timetrap, {seconds, 30}}].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([emqx]),
|
||||
%% Ensure all the modules unloaded.
|
||||
ok = emqx_modules:unload(),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% Test case for emqx_mod_presence
|
||||
t_mod_presence(_) ->
|
||||
ok = emqx_mod_presence:load([{qos, ?QOS_1}]),
|
||||
{ok, C1} = emqtt:start_link([{client_id, <<"monsys">>}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
{ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1),
|
||||
%% Connected Presence
|
||||
{ok, C2} = emqtt:start_link([{client_id, <<"clientid">>},
|
||||
{username, <<"username">>}]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
ok = recv_and_check_presence(<<"clientid">>, <<"connected">>),
|
||||
%% Disconnected Presence
|
||||
ok = emqtt:disconnect(C2),
|
||||
ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>),
|
||||
ok = emqtt:disconnect(C1),
|
||||
ok = emqx_mod_presence:unload([{qos, ?QOS_1}]).
|
||||
|
||||
t_mod_presence_reason(_) ->
|
||||
?assertEqual(normal, emqx_mod_presence:reason(normal)),
|
||||
?assertEqual(discarded, emqx_mod_presence:reason({shutdown, discarded})),
|
||||
?assertEqual(tcp_error, emqx_mod_presence:reason({tcp_error, einval})),
|
||||
?assertEqual(internal_error, emqx_mod_presence:reason(<<"unknown error">>)).
|
||||
|
||||
recv_and_check_presence(ClientId, Presence) ->
|
||||
{ok, #{qos := ?QOS_1, topic := Topic, payload := Payload}} = receive_publish(100),
|
||||
?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence],
|
||||
binary:split(Topic, <<"/">>, [global])),
|
||||
case Presence of
|
||||
<<"connected">> ->
|
||||
?assertMatch(#{clientid := <<"clientid">>,
|
||||
username := <<"username">>,
|
||||
ipaddress := <<"127.0.0.1">>,
|
||||
proto_name := <<"MQTT">>,
|
||||
proto_ver := ?MQTT_PROTO_V4,
|
||||
connack := ?RC_SUCCESS,
|
||||
clean_start := true}, emqx_json:decode(Payload, [{labels, atom}, return_maps]));
|
||||
<<"disconnected">> ->
|
||||
?assertMatch(#{clientid := <<"clientid">>,
|
||||
username := <<"username">>,
|
||||
reason := <<"closed">>}, emqx_json:decode(Payload, [{labels, atom}, return_maps]))
|
||||
end.
|
||||
|
||||
%% Test case for emqx_mod_subscription
|
||||
t_mod_subscription(_) ->
|
||||
emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}]),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"},
|
||||
{client_id, "myclient"},
|
||||
{username, "admin"}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
|
||||
{ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
|
||||
?assertEqual(<<"connected/myclient/admin">>, Topic),
|
||||
?assertEqual(<<"Hello world">>, Payload),
|
||||
ok = emqtt:disconnect(C),
|
||||
emqx_mod_subscription:unload([]).
|
||||
|
||||
%% Test case for emqx_mod_write
|
||||
t_mod_rewrite(_Config) ->
|
||||
ok = emqx_mod_rewrite:load(?RULES),
|
||||
{ok, C} = emqtt:start_link([{client_id, <<"rewrite_client">>}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
OrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"y/a/z/b">>, <<"y/def">>],
|
||||
DestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"y/z/b">>, <<"y/def">>],
|
||||
%% Subscribe
|
||||
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- OrigTopics]),
|
||||
timer:sleep(100),
|
||||
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
|
||||
?assertEqual(DestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
|
||||
%% Publish
|
||||
RecvTopics = [begin
|
||||
ok = emqtt:publish(C, Topic, <<"payload">>),
|
||||
{ok, #{topic := RecvTopic}} = receive_publish(100),
|
||||
RecvTopic
|
||||
end || Topic <- OrigTopics],
|
||||
?assertEqual(DestTopics, RecvTopics),
|
||||
%% Unsubscribe
|
||||
{ok, _, _} = emqtt:unsubscribe(C, OrigTopics),
|
||||
timer:sleep(100),
|
||||
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
|
||||
ok = emqtt:disconnect(C),
|
||||
ok = emqx_mod_rewrite:unload(?RULES).
|
||||
|
||||
t_rewrite_rule(_Config) ->
|
||||
Rules = emqx_mod_rewrite:compile(?RULES),
|
||||
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, Rules)),
|
||||
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, Rules)),
|
||||
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, Rules)),
|
||||
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, Rules)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
receive_publish(Timeout) ->
|
||||
receive
|
||||
{publish, Publish} -> {ok, Publish}
|
||||
after
|
||||
Timeout -> {error, timeout}
|
||||
end.
|
||||
|
|
@ -33,11 +33,13 @@ end_per_suite(_Config) ->
|
|||
|
||||
t_message_expiry_interval_1(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
|
||||
t_message_expiry_interval_2(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
|
||||
message_expiry_interval_init() ->
|
||||
{ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
|
@ -53,7 +55,7 @@ message_expiry_interval_exipred(ClientA, QoS) ->
|
|||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:sleep(1000),
|
||||
ct:sleep(1500),
|
||||
|
||||
%% resume the session for client-b
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
|
|
|
@ -24,45 +24,41 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(PACKETS,
|
||||
[{?CONNECT, 'CONNECT', ?CONNECT_PACKET(#mqtt_packet_connect{})},
|
||||
{?CONNACK, 'CONNACK', ?CONNACK_PACKET(?RC_SUCCESS)},
|
||||
{?PUBLISH, 'PUBLISH', ?PUBLISH_PACKET(?QOS_1)},
|
||||
{?PUBACK, 'PUBACK', ?PUBACK_PACKET(1)},
|
||||
{?PUBREC, 'PUBREC', ?PUBREC_PACKET(1)},
|
||||
{?PUBREL, 'PUBREL', ?PUBREL_PACKET(1)},
|
||||
{?PUBCOMP, 'PUBCOMP', ?PUBCOMP_PACKET(1)},
|
||||
{?SUBSCRIBE, 'SUBSCRIBE', ?SUBSCRIBE_PACKET(1, [])},
|
||||
{?SUBACK, 'SUBACK', ?SUBACK_PACKET(1, [0])},
|
||||
{?UNSUBSCRIBE, 'UNSUBSCRIBE', ?UNSUBSCRIBE_PACKET(1, [])},
|
||||
{?UNSUBACK, 'UNSUBACK', ?UNSUBACK_PACKET(1)},
|
||||
{?DISCONNECT, 'DISCONNECT', ?DISCONNECT_PACKET(?RC_SUCCESS)},
|
||||
{?AUTH, 'AUTH', ?AUTH_PACKET()}
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_type(_) ->
|
||||
?assertEqual(?CONNECT, emqx_packet:type(?CONNECT_PACKET(#mqtt_packet_connect{}))),
|
||||
?assertEqual(?CONNACK, emqx_packet:type(?CONNACK_PACKET(?RC_SUCCESS))),
|
||||
?assertEqual(?PUBLISH, emqx_packet:type(?PUBLISH_PACKET(?QOS_1))),
|
||||
?assertEqual(?PUBACK, emqx_packet:type(?PUBACK_PACKET(1))),
|
||||
?assertEqual(?PUBREC, emqx_packet:type(?PUBREC_PACKET(1))),
|
||||
?assertEqual(?PUBREL, emqx_packet:type(?PUBREL_PACKET(1))),
|
||||
?assertEqual(?PUBCOMP, emqx_packet:type(?PUBCOMP_PACKET(1))),
|
||||
?assertEqual(?SUBSCRIBE, emqx_packet:type(?SUBSCRIBE_PACKET(1, []))),
|
||||
?assertEqual(?SUBACK, emqx_packet:type(?SUBACK_PACKET(1, [0]))),
|
||||
?assertEqual(?UNSUBSCRIBE, emqx_packet:type(?UNSUBSCRIBE_PACKET(1, []))),
|
||||
?assertEqual(?UNSUBACK, emqx_packet:type(?UNSUBACK_PACKET(1))),
|
||||
?assertEqual(?DISCONNECT, emqx_packet:type(?DISCONNECT_PACKET(?RC_SUCCESS))),
|
||||
?assertEqual(?AUTH, emqx_packet:type(?AUTH_PACKET())).
|
||||
lists:foreach(fun({Type, _Name, Packet}) ->
|
||||
?assertEqual(Type, emqx_packet:type(Packet))
|
||||
end, ?PACKETS).
|
||||
|
||||
t_type_name(_) ->
|
||||
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT_PACKET(#mqtt_packet_connect{}))),
|
||||
?assertEqual('CONNACK', emqx_packet:type_name(?CONNACK_PACKET(?RC_SUCCESS))),
|
||||
?assertEqual('PUBLISH', emqx_packet:type_name(?PUBLISH_PACKET(?QOS_1))),
|
||||
?assertEqual('PUBACK', emqx_packet:type_name(?PUBACK_PACKET(1))),
|
||||
?assertEqual('PUBREC', emqx_packet:type_name(?PUBREC_PACKET(1))),
|
||||
?assertEqual('PUBREL', emqx_packet:type_name(?PUBREL_PACKET(1))),
|
||||
?assertEqual('PUBCOMP', emqx_packet:type_name(?PUBCOMP_PACKET(1))),
|
||||
?assertEqual('SUBSCRIBE', emqx_packet:type_name(?SUBSCRIBE_PACKET(1, []))),
|
||||
?assertEqual('SUBACK', emqx_packet:type_name(?SUBACK_PACKET(1, [0]))),
|
||||
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE_PACKET(1, []))),
|
||||
?assertEqual('UNSUBACK', emqx_packet:type_name(?UNSUBACK_PACKET(1))),
|
||||
?assertEqual('DISCONNECT', emqx_packet:type_name(?DISCONNECT_PACKET(?RC_SUCCESS))),
|
||||
?assertEqual('AUTH', emqx_packet:type_name(?AUTH_PACKET())).
|
||||
lists:foreach(fun({_Type, Name, Packet}) ->
|
||||
?assertEqual(Name, emqx_packet:type_name(Packet))
|
||||
end, ?PACKETS).
|
||||
|
||||
t_dup(_) ->
|
||||
?assertEqual(false, emqx_packet:dup(?PUBLISH_PACKET(?QOS_1))).
|
||||
|
||||
t_qos(_) ->
|
||||
?assertEqual(?QOS_0, emqx_packet:qos(?PUBLISH_PACKET(?QOS_0))),
|
||||
?assertEqual(?QOS_1, emqx_packet:qos(?PUBLISH_PACKET(?QOS_1))),
|
||||
?assertEqual(?QOS_2, emqx_packet:qos(?PUBLISH_PACKET(?QOS_2))).
|
||||
lists:foreach(fun(QoS) ->
|
||||
?assertEqual(QoS, emqx_packet:qos(?PUBLISH_PACKET(QoS)))
|
||||
end, [?QOS_0, ?QOS_1, ?QOS_2]).
|
||||
|
||||
t_retain(_) ->
|
||||
?assertEqual(false, emqx_packet:retain(?PUBLISH_PACKET(?QOS_1))).
|
||||
|
@ -78,15 +74,16 @@ t_proto_name(_) ->
|
|||
t_proto_ver(_) ->
|
||||
lists:foreach(
|
||||
fun(Ver) ->
|
||||
?assertEqual(Ver, emqx_packet:proto_ver(#mqtt_packet_connect{proto_ver = Ver}))
|
||||
ConnPkt = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = Ver}),
|
||||
?assertEqual(Ver, emqx_packet:proto_ver(ConnPkt))
|
||||
end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]).
|
||||
|
||||
t_check_publish(_) ->
|
||||
Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
|
||||
ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),
|
||||
ok = emqx_packet:check(#mqtt_packet_publish{packet_id = 1, topic_name = <<"t">>}),
|
||||
{error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>)),
|
||||
{error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"+/+">>, 1, #{}, <<"payload">>)),
|
||||
{error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<"payload">>)),
|
||||
{error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"+/+">>, 1, #{}, <<"payload">>)),
|
||||
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)),
|
||||
%% TODO::
|
||||
%% {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)),
|
||||
|
@ -143,10 +140,10 @@ t_check_connect(_) ->
|
|||
properties = #{'Receive-Maximum' => 0}}), Opts).
|
||||
|
||||
t_from_to_message(_) ->
|
||||
ExpectedMsg = emqx_message:set_headers(
|
||||
#{peername => {{127,0,0,1}, 9527}, username => <<"test">>},
|
||||
emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>)),
|
||||
ExpectedMsg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
|
||||
ExpectedMsg1 = emqx_message:set_flag(retain, false, ExpectedMsg),
|
||||
ExpectedMsg2 = emqx_message:set_headers(#{peerhost => {127,0,0,1},
|
||||
username => <<"test">>}, ExpectedMsg1),
|
||||
Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||
qos = ?QOS_0,
|
||||
retain = false,
|
||||
|
@ -157,8 +154,8 @@ t_from_to_message(_) ->
|
|||
payload = <<"payload">>},
|
||||
MsgFromPkt = emqx_packet:to_message(#{client_id => <<"clientid">>,
|
||||
username => <<"test">>,
|
||||
peername => {{127,0,0,1}, 9527}}, Pkt),
|
||||
?assertEqual(ExpectedMsg1, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg),
|
||||
peerhost => {127,0,0,1}}, Pkt),
|
||||
?assertEqual(ExpectedMsg2, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg),
|
||||
timestamp = emqx_message:timestamp(ExpectedMsg)
|
||||
}).
|
||||
|
||||
|
|
|
@ -1,81 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_protocol_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
[{proto, init_protocol()}|Config].
|
||||
|
||||
init_protocol() ->
|
||||
emqx_protocol:init(#mqtt_packet_connect{
|
||||
proto_name = <<"MQTT">>,
|
||||
proto_ver = ?MQTT_PROTO_V5,
|
||||
is_bridge = false,
|
||||
clean_start = true,
|
||||
keepalive = 30,
|
||||
properties = #{},
|
||||
client_id = <<"clientid">>,
|
||||
username = <<"username">>,
|
||||
password = <<"passwd">>
|
||||
}, testing).
|
||||
|
||||
end_per_suite(_Config) -> ok.
|
||||
|
||||
t_init_info_1(Config) ->
|
||||
Proto = proplists:get_value(proto, Config),
|
||||
?assertEqual(#{proto_name => <<"MQTT">>,
|
||||
proto_ver => ?MQTT_PROTO_V5,
|
||||
clean_start => true,
|
||||
keepalive => 30,
|
||||
will_msg => undefined,
|
||||
client_id => <<"clientid">>,
|
||||
username => <<"username">>,
|
||||
topic_aliases => undefined,
|
||||
alias_maximum => #{outbound => 0, inbound => 0}
|
||||
}, emqx_protocol:info(Proto)).
|
||||
|
||||
t_init_info_2(Config) ->
|
||||
Proto = proplists:get_value(proto, Config),
|
||||
?assertEqual(<<"MQTT">>, emqx_protocol:info(proto_name, Proto)),
|
||||
?assertEqual(?MQTT_PROTO_V5, emqx_protocol:info(proto_ver, Proto)),
|
||||
?assertEqual(true, emqx_protocol:info(clean_start, Proto)),
|
||||
?assertEqual(30, emqx_protocol:info(keepalive, Proto)),
|
||||
?assertEqual(<<"clientid">>, emqx_protocol:info(client_id, Proto)),
|
||||
?assertEqual(<<"username">>, emqx_protocol:info(username, Proto)),
|
||||
?assertEqual(undefined, emqx_protocol:info(will_msg, Proto)),
|
||||
?assertEqual(0, emqx_protocol:info(will_delay_interval, Proto)),
|
||||
?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)),
|
||||
?assertEqual(#{outbound => 0, inbound => 0}, emqx_protocol:info(alias_maximum, Proto)).
|
||||
|
||||
t_find_save_alias(Config) ->
|
||||
Proto = proplists:get_value(proto, Config),
|
||||
?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)),
|
||||
?assertEqual(false, emqx_protocol:find_alias(1, Proto)),
|
||||
Proto1 = emqx_protocol:save_alias(1, <<"t1">>, Proto),
|
||||
Proto2 = emqx_protocol:save_alias(2, <<"t2">>, Proto1),
|
||||
?assertEqual(#{1 => <<"t1">>, 2 => <<"t2">>},
|
||||
emqx_protocol:info(topic_aliases, Proto2)),
|
||||
?assertEqual({ok, <<"t1">>}, emqx_protocol:find_alias(1, Proto2)),
|
||||
?assertEqual({ok, <<"t2">>}, emqx_protocol:find_alias(2, Proto2)).
|
||||
|
|
@ -35,7 +35,7 @@
|
|||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_proper_session(_) ->
|
||||
Opts = [{numtests, 1000}, {to_file, user}],
|
||||
Opts = [{numtests, 100}, {to_file, user}],
|
||||
ok = emqx_logger:set_log_level(emergency),
|
||||
ok = before_proper(),
|
||||
?assert(proper:quickcheck(prop_session(), Opts)),
|
||||
|
@ -72,17 +72,17 @@ apply_ops(Session, [Op | Rest]) ->
|
|||
apply_op(Session, info) ->
|
||||
Info = emqx_session:info(Session),
|
||||
?assert(is_map(Info)),
|
||||
?assertEqual(15, maps:size(Info)),
|
||||
?assert(maps:size(Info) > 0),
|
||||
Session;
|
||||
apply_op(Session, attrs) ->
|
||||
Attrs = emqx_session:attrs(Session),
|
||||
?assert(is_map(Attrs)),
|
||||
?assertEqual(2, maps:size(Attrs)),
|
||||
?assert(maps:size(Attrs) > 0),
|
||||
Session;
|
||||
apply_op(Session, stats) ->
|
||||
Stats = emqx_session:stats(Session),
|
||||
?assert(is_list(Stats)),
|
||||
?assertEqual(10, length(Stats)),
|
||||
?assert(length(Stats) > 0),
|
||||
Session;
|
||||
apply_op(Session, {info, InfoArg}) ->
|
||||
_Ret = emqx_session:info(InfoArg, Session),
|
||||
|
@ -182,7 +182,6 @@ info_args() ->
|
|||
awaiting_rel,
|
||||
max_awaiting_rel,
|
||||
await_rel_timeout,
|
||||
expiry_interval,
|
||||
created_at
|
||||
]).
|
||||
|
||||
|
@ -270,11 +269,8 @@ await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000).
|
|||
|
||||
max_inflight() -> choose(0, 10).
|
||||
|
||||
expiry_interval() -> ?LET(EI, choose(1, 10), EI * 3600).
|
||||
|
||||
option() ->
|
||||
?LET(Option, [{max_inflight, max_inflight()},
|
||||
{expiry_interval, expiry_interval()}],
|
||||
?LET(Option, [{receive_maximum , max_inflight()}],
|
||||
maps:from_list(Option)).
|
||||
|
||||
session() ->
|
||||
|
|
|
@ -21,29 +21,81 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(OPTS, [{enable_acl, true},
|
||||
{enable_banned, false}
|
||||
-define(ENVS, [{use_username_as_clientid, false},
|
||||
{server_keepalive, 60},
|
||||
{upgrade_qos, false},
|
||||
{session_expiry_interval, 7200},
|
||||
{retry_interval, 20000},
|
||||
{mqueue_store_qos0, true},
|
||||
{mqueue_priorities, none},
|
||||
{mqueue_default_priority, highest},
|
||||
{max_subscriptions, 0},
|
||||
{max_mqueue_len, 1000},
|
||||
{max_inflight, 32},
|
||||
{max_awaiting_rel, 100},
|
||||
{keepalive_backoff, 0.75},
|
||||
{ignore_loop_deliver, false},
|
||||
{idle_timeout, 15000},
|
||||
{force_shutdown_policy, #{max_heap_size => 838860800,
|
||||
message_queue_len => 8000}},
|
||||
{force_gc_policy, #{bytes => 1048576, count => 1000}},
|
||||
{enable_stats, true},
|
||||
{enable_flapping_detect, false},
|
||||
{enable_ban, true},
|
||||
{enable_acl, true},
|
||||
{await_rel_timeout, 300000},
|
||||
{acl_deny_action, ignore}
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_set_get_env(_) ->
|
||||
init_per_suite(Config) ->
|
||||
_ = application:load(emqx),
|
||||
application:set_env(emqx, zones, [{external, ?OPTS}]),
|
||||
{ok, _} = emqx_zone:start_link(),
|
||||
?assert(emqx_zone:get_env(external, enable_acl)),
|
||||
?assertNot(emqx_zone:get_env(external, enable_banned)),
|
||||
application:set_env(emqx, zone_env, val),
|
||||
application:set_env(emqx, zones, [{zone, ?ENVS}]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
application:unset_env(emqx, zone_env),
|
||||
application:unset_env(emqx, zones).
|
||||
|
||||
t_zone_env_func(_) ->
|
||||
lists:foreach(fun({Env, Val}) ->
|
||||
case erlang:function_exported(emqx_zone, Env, 1) of
|
||||
true ->
|
||||
?assertEqual(Val, erlang:apply(emqx_zone, Env, [zone]));
|
||||
false -> ok
|
||||
end
|
||||
end, ?ENVS).
|
||||
|
||||
t_get_env(_) ->
|
||||
?assertEqual(val, emqx_zone:get_env(undefined, zone_env)),
|
||||
?assertEqual(val, emqx_zone:get_env(undefined, zone_env, def)),
|
||||
?assert(emqx_zone:get_env(zone, enable_acl)),
|
||||
?assert(emqx_zone:get_env(zone, enable_ban)),
|
||||
?assertEqual(defval, emqx_zone:get_env(extenal, key, defval)),
|
||||
?assertEqual(undefined, emqx_zone:get_env(external, key)),
|
||||
?assertEqual(undefined, emqx_zone:get_env(internal, key)),
|
||||
?assertEqual(def, emqx_zone:get_env(internal, key, def)),
|
||||
emqx_zone:stop().
|
||||
?assertEqual(def, emqx_zone:get_env(internal, key, def)).
|
||||
|
||||
t_get_set_env(_) ->
|
||||
ok = emqx_zone:set_env(zone, key, val),
|
||||
?assertEqual(val, emqx_zone:get_env(zone, key)),
|
||||
true = emqx_zone:unset_env(zone, key),
|
||||
?assertEqual(undefined, emqx_zone:get_env(zone, key)).
|
||||
|
||||
t_force_reload(_) ->
|
||||
{ok, _} = emqx_zone:start_link(),
|
||||
application:set_env(emqx, zones, [{zone, [{key, val}]}]),
|
||||
?assertEqual(undefined, emqx_zone:get_env(zone, key)),
|
||||
?assertEqual(undefined, emqx_zone:get_env(xzone, key)),
|
||||
application:set_env(emqx, zones, [{xzone, [{key, val}]}]),
|
||||
ok = emqx_zone:force_reload(),
|
||||
?assertEqual(val, emqx_zone:get_env(zone, key)),
|
||||
?assertEqual(val, emqx_zone:get_env(xzone, key)),
|
||||
emqx_zone:stop().
|
||||
|
||||
t_uncovered_func(_) ->
|
||||
{ok, Pid} = emqx_zone:start_link(),
|
||||
ignored = gen_server:call(Pid, unexpected_call),
|
||||
ok = gen_server:cast(Pid, unexpected_cast),
|
||||
ok = Pid ! ok,
|
||||
emqx_zone:stop().
|
||||
|
||||
|
|
Loading…
Reference in New Issue