Improve the 'channel' module and add more test cases
- Rename the 'Client' field to 'ClientInfo' - Remove the 'expiry_interval' from session record - Add more test cases for emqx_zone module - Add more test cases for emqx_banned module - Add more test cases for emqx_message module - Remove 'sockname', 'conn_mod' fields from type 'client'
This commit is contained in:
parent
81e2f47126
commit
3202ed2392
|
@ -30,7 +30,7 @@
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
-copy_mnesia({mnesia, [copy]}).
|
-copy_mnesia({mnesia, [copy]}).
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0, stop/0]).
|
||||||
|
|
||||||
-export([ check/1
|
-export([ check/1
|
||||||
, add/1
|
, add/1
|
||||||
|
@ -69,6 +69,10 @@ mnesia(copy) ->
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%% for tests
|
||||||
|
-spec(stop() -> ok).
|
||||||
|
stop() -> gen_server:stop(?MODULE).
|
||||||
|
|
||||||
-spec(check(emqx_types:client()) -> boolean()).
|
-spec(check(emqx_types:client()) -> boolean()).
|
||||||
check(#{client_id := ClientId,
|
check(#{client_id := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
|
@ -105,8 +109,7 @@ handle_cast(Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
||||||
mnesia:async_dirty(fun expire_banned_items/1,
|
mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]),
|
||||||
[erlang:system_time(second)]),
|
|
||||||
{noreply, ensure_expiry_timer(State), hibernate};
|
{noreply, ensure_expiry_timer(State), hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -125,7 +128,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
ensure_expiry_timer(State) ->
|
ensure_expiry_timer(State) ->
|
||||||
State#{expiry_timer := emqx_misc:start_timer(timer:seconds(1), expire)}.
|
State#{expiry_timer := emqx_misc:start_timer(10, expire)}.
|
||||||
-else.
|
-else.
|
||||||
ensure_expiry_timer(State) ->
|
ensure_expiry_timer(State) ->
|
||||||
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}.
|
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}.
|
||||||
|
|
|
@ -24,8 +24,6 @@
|
||||||
|
|
||||||
-logger_header("[Channel]").
|
-logger_header("[Channel]").
|
||||||
|
|
||||||
-export([init/2]).
|
|
||||||
|
|
||||||
-export([ info/1
|
-export([ info/1
|
||||||
, info/2
|
, info/2
|
||||||
, attrs/1
|
, attrs/1
|
||||||
|
@ -36,12 +34,13 @@
|
||||||
%% Exports for unit tests:(
|
%% Exports for unit tests:(
|
||||||
-export([set_field/3]).
|
-export([set_field/3]).
|
||||||
|
|
||||||
-export([ handle_in/2
|
-export([ init/2
|
||||||
|
, handle_in/2
|
||||||
, handle_out/2
|
, handle_out/2
|
||||||
, handle_call/2
|
, handle_call/2
|
||||||
, handle_cast/2
|
, handle_cast/2
|
||||||
, handle_info/2
|
, handle_info/2
|
||||||
, timeout/3
|
, handle_timeout/3
|
||||||
, terminate/2
|
, terminate/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -58,19 +57,25 @@
|
||||||
-export_type([channel/0]).
|
-export_type([channel/0]).
|
||||||
|
|
||||||
-record(channel, {
|
-record(channel, {
|
||||||
%% MQTT Client
|
%% MQTT ConnInfo
|
||||||
|
conninfo :: emqx_types:conninfo(),
|
||||||
|
%% MQTT ClientInfo
|
||||||
client :: emqx_types:client(),
|
client :: emqx_types:client(),
|
||||||
%% MQTT Session
|
%% MQTT Session
|
||||||
session :: emqx_session:session(),
|
session :: emqx_session:session(),
|
||||||
%% MQTT Protocol
|
|
||||||
protocol :: emqx_protocol:protocol(),
|
|
||||||
%% Keepalive
|
%% Keepalive
|
||||||
keepalive :: emqx_keepalive:keepalive(),
|
keepalive :: emqx_keepalive:keepalive(),
|
||||||
|
%% MQTT Will Msg
|
||||||
|
will_msg :: emqx_types:message(),
|
||||||
|
%% MQTT Topic Aliases
|
||||||
|
topic_aliases :: maybe(map()),
|
||||||
|
%% MQTT Topic Alias Maximum
|
||||||
|
alias_maximum :: maybe(map()),
|
||||||
%% Timers
|
%% Timers
|
||||||
timers :: #{atom() => disabled | maybe(reference())},
|
timers :: #{atom() => disabled | maybe(reference())},
|
||||||
%% GC State
|
%% GC State
|
||||||
gc_state :: maybe(emqx_gc:gc_state()),
|
gc_state :: maybe(emqx_gc:gc_state()),
|
||||||
%% OOM Policy
|
%% OOM Policy TODO: should be removed from channel.
|
||||||
oom_policy :: maybe(emqx_oom:oom_policy()),
|
oom_policy :: maybe(emqx_oom:oom_policy()),
|
||||||
%% Connected
|
%% Connected
|
||||||
connected :: undefined | boolean(),
|
connected :: undefined | boolean(),
|
||||||
|
@ -97,53 +102,8 @@
|
||||||
will_timer => will_message
|
will_timer => will_message
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(ATTR_KEYS, [client, session, protocol, connected, connected_at, disconnected_at]).
|
-define(ATTR_KEYS, [conninfo, client, session, connected, connected_at, disconnected_at]).
|
||||||
|
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, topic_aliases, alias_maximum, gc_state, disconnected_at]).
|
||||||
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, gc_state, disconnected_at]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Init the channel
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()).
|
|
||||||
init(ConnInfo, Options) ->
|
|
||||||
Zone = proplists:get_value(zone, Options),
|
|
||||||
Peercert = maps:get(peercert, ConnInfo, undefined),
|
|
||||||
Username = case peer_cert_as_username(Options) of
|
|
||||||
cn -> esockd_peercert:common_name(Peercert);
|
|
||||||
dn -> esockd_peercert:subject(Peercert);
|
|
||||||
crt -> Peercert;
|
|
||||||
_ -> undefined
|
|
||||||
end,
|
|
||||||
MountPoint = emqx_zone:get_env(Zone, mountpoint),
|
|
||||||
Client = maps:merge(#{zone => Zone,
|
|
||||||
username => Username,
|
|
||||||
client_id => <<>>,
|
|
||||||
mountpoint => MountPoint,
|
|
||||||
is_bridge => false,
|
|
||||||
is_superuser => false
|
|
||||||
}, ConnInfo),
|
|
||||||
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
|
|
||||||
StatsTimer = if
|
|
||||||
EnableStats -> undefined;
|
|
||||||
?Otherwise -> disabled
|
|
||||||
end,
|
|
||||||
GcState = maybe_apply(fun emqx_gc:init/1,
|
|
||||||
emqx_zone:get_env(Zone, force_gc_policy)),
|
|
||||||
OomPolicy = maybe_apply(fun emqx_oom:init/1,
|
|
||||||
emqx_zone:get_env(Zone, force_shutdown_policy)),
|
|
||||||
#channel{client = Client,
|
|
||||||
gc_state = GcState,
|
|
||||||
oom_policy = OomPolicy,
|
|
||||||
timers = #{stats_timer => StatsTimer},
|
|
||||||
connected = undefined,
|
|
||||||
takeover = false,
|
|
||||||
resuming = false,
|
|
||||||
pendings = []
|
|
||||||
}.
|
|
||||||
|
|
||||||
peer_cert_as_username(Options) ->
|
|
||||||
proplists:get_value(peer_cert_as_username, Options).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Attrs and Caps
|
%% Info, Attrs and Caps
|
||||||
|
@ -157,14 +117,18 @@ info(Channel) ->
|
||||||
-spec(info(list(atom())|atom(), channel()) -> term()).
|
-spec(info(list(atom())|atom(), channel()) -> term()).
|
||||||
info(Keys, Channel) when is_list(Keys) ->
|
info(Keys, Channel) when is_list(Keys) ->
|
||||||
[{Key, info(Key, Channel)} || Key <- Keys];
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
||||||
info(client, #channel{client = Client}) ->
|
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
||||||
Client;
|
ConnInfo;
|
||||||
|
info(client, #channel{client = ClientInfo}) ->
|
||||||
|
ClientInfo;
|
||||||
info(session, #channel{session = Session}) ->
|
info(session, #channel{session = Session}) ->
|
||||||
maybe_apply(fun emqx_session:info/1, Session);
|
maybe_apply(fun emqx_session:info/1, Session);
|
||||||
info(protocol, #channel{protocol = Protocol}) ->
|
|
||||||
maybe_apply(fun emqx_protocol:info/1, Protocol);
|
|
||||||
info(keepalive, #channel{keepalive = Keepalive}) ->
|
info(keepalive, #channel{keepalive = Keepalive}) ->
|
||||||
maybe_apply(fun emqx_keepalive:info/1, Keepalive);
|
maybe_apply(fun emqx_keepalive:info/1, Keepalive);
|
||||||
|
info(topic_aliases, #channel{topic_aliases = Aliases}) ->
|
||||||
|
Aliases;
|
||||||
|
info(alias_maximum, #channel{alias_maximum = Limits}) ->
|
||||||
|
Limits;
|
||||||
info(gc_state, #channel{gc_state = GcState}) ->
|
info(gc_state, #channel{gc_state = GcState}) ->
|
||||||
maybe_apply(fun emqx_gc:info/1, GcState);
|
maybe_apply(fun emqx_gc:info/1, GcState);
|
||||||
info(oom_policy, #channel{oom_policy = OomPolicy}) ->
|
info(oom_policy, #channel{oom_policy = OomPolicy}) ->
|
||||||
|
@ -181,8 +145,8 @@ info(disconnected_at, #channel{disconnected_at = DisconnectedAt}) ->
|
||||||
attrs(Channel) ->
|
attrs(Channel) ->
|
||||||
maps:from_list([{Key, attr(Key, Channel)} || Key <- ?ATTR_KEYS]).
|
maps:from_list([{Key, attr(Key, Channel)} || Key <- ?ATTR_KEYS]).
|
||||||
|
|
||||||
attr(protocol, #channel{protocol = Proto}) ->
|
attr(conninfo, #channel{conninfo = ConnInfo}) ->
|
||||||
maybe_apply(fun emqx_protocol:attrs/1, Proto);
|
ConnInfo;
|
||||||
attr(session, #channel{session = Session}) ->
|
attr(session, #channel{session = Session}) ->
|
||||||
maybe_apply(fun emqx_session:attrs/1, Session);
|
maybe_apply(fun emqx_session:attrs/1, Session);
|
||||||
attr(Key, Channel) -> info(Key, Channel).
|
attr(Key, Channel) -> info(Key, Channel).
|
||||||
|
@ -201,6 +165,54 @@ set_field(Name, Val, Channel) ->
|
||||||
Pos = emqx_misc:index_of(Name, Fields),
|
Pos = emqx_misc:index_of(Name, Fields),
|
||||||
setelement(Pos+1, Channel, Val).
|
setelement(Pos+1, Channel, Val).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Init the channel
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()).
|
||||||
|
init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
|
||||||
|
Zone = proplists:get_value(zone, Options),
|
||||||
|
Peercert = maps:get(peercert, ConnInfo, undefined),
|
||||||
|
Username = case peer_cert_as_username(Options) of
|
||||||
|
cn -> esockd_peercert:common_name(Peercert);
|
||||||
|
dn -> esockd_peercert:subject(Peercert);
|
||||||
|
crt -> Peercert;
|
||||||
|
_ -> undefined
|
||||||
|
end,
|
||||||
|
MountPoint = emqx_zone:get_env(Zone, mountpoint),
|
||||||
|
ClientInfo = #{zone => Zone,
|
||||||
|
peerhost => PeerHost,
|
||||||
|
peercert => Peercert,
|
||||||
|
client_id => undefined,
|
||||||
|
username => Username,
|
||||||
|
mountpoint => MountPoint,
|
||||||
|
is_bridge => false,
|
||||||
|
is_superuser => false
|
||||||
|
},
|
||||||
|
StatsTimer = case emqx_zone:enable_stats(Zone) of
|
||||||
|
true -> undefined;
|
||||||
|
false -> disabled
|
||||||
|
end,
|
||||||
|
#channel{conninfo = ConnInfo,
|
||||||
|
client = ClientInfo,
|
||||||
|
gc_state = init_gc_state(Zone),
|
||||||
|
oom_policy = init_oom_policy(Zone),
|
||||||
|
timers = #{stats_timer => StatsTimer},
|
||||||
|
connected = undefined,
|
||||||
|
takeover = false,
|
||||||
|
resuming = false,
|
||||||
|
pendings = []
|
||||||
|
}.
|
||||||
|
|
||||||
|
peer_cert_as_username(Options) ->
|
||||||
|
proplists:get_value(peer_cert_as_username, Options).
|
||||||
|
|
||||||
|
init_gc_state(Zone) ->
|
||||||
|
maybe_apply(fun emqx_gc:init/1, emqx_zone:force_gc_policy(Zone)).
|
||||||
|
|
||||||
|
init_oom_policy(Zone) ->
|
||||||
|
maybe_apply(fun emqx_oom:init/1, emqx_zone:force_shutdown_policy(Zone)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle incoming packet
|
%% Handle incoming packet
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -215,8 +227,8 @@ handle_in(?CONNECT_PACKET(_), Channel = #channel{connected = true}) ->
|
||||||
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
|
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
|
||||||
|
|
||||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
case pipeline([fun check_connpkt/2,
|
case pipeline([fun enrich_conninfo/2,
|
||||||
fun init_protocol/2,
|
fun check_connect/2,
|
||||||
fun enrich_client/2,
|
fun enrich_client/2,
|
||||||
fun set_logger_meta/2,
|
fun set_logger_meta/2,
|
||||||
fun check_banned/2,
|
fun check_banned/2,
|
||||||
|
@ -225,31 +237,25 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
{ok, NConnPkt, NChannel} ->
|
{ok, NConnPkt, NChannel} ->
|
||||||
process_connect(NConnPkt, NChannel);
|
process_connect(NConnPkt, NChannel);
|
||||||
{error, ReasonCode, NChannel} ->
|
{error, ReasonCode, NChannel} ->
|
||||||
handle_out({connack, ReasonCode}, NChannel)
|
handle_out({connack, ReasonCode, ConnPkt}, NChannel)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
|
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
||||||
Channel = #channel{protocol = Protocol}) ->
|
case emqx_packet:check(Packet) of
|
||||||
case pipeline([fun emqx_packet:check/1,
|
ok ->
|
||||||
fun process_alias/2,
|
handle_publish(Packet, Channel);
|
||||||
fun check_publish/2], Packet, Channel) of
|
{error, ReasonCode} ->
|
||||||
{ok, NPacket, NChannel} ->
|
handle_out({disconnect, ReasonCode}, Channel)
|
||||||
process_publish(NPacket, NChannel);
|
|
||||||
{error, ReasonCode, NChannel} ->
|
|
||||||
ProtoVer = emqx_protocol:info(proto_ver, Protocol),
|
|
||||||
?LOG(warning, "Cannot publish message to ~s due to ~s",
|
|
||||||
[Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]),
|
|
||||||
handle_out({disconnect, ReasonCode}, NChannel)
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
||||||
Channel = #channel{client = Client, session = Session}) ->
|
Channel = #channel{client = ClientInfo, session = Session}) ->
|
||||||
case emqx_session:puback(PacketId, Session) of
|
case emqx_session:puback(PacketId, Session) of
|
||||||
{ok, Msg, Publishes, NSession} ->
|
{ok, Msg, Publishes, NSession} ->
|
||||||
ok = emqx_hooks:run('message.acked', [Client, Msg]),
|
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||||
handle_out({publish, Publishes}, Channel#channel{session = NSession});
|
handle_out({publish, Publishes}, Channel#channel{session = NSession});
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = emqx_hooks:run('message.acked', [Client, Msg]),
|
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, Channel#channel{session = NSession}};
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
||||||
|
@ -262,10 +268,10 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
|
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
|
||||||
Channel = #channel{client = Client, session = Session}) ->
|
Channel = #channel{client = ClientInfo, session = Session}) ->
|
||||||
case emqx_session:pubrec(PacketId, Session) of
|
case emqx_session:pubrec(PacketId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = emqx_hooks:run('message.acked', [Client, Msg]),
|
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = Channel#channel{session = NSession},
|
||||||
handle_out({pubrel, PacketId, ?RC_SUCCESS}, NChannel);
|
handle_out({pubrel, PacketId, ?RC_SUCCESS}, NChannel);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
|
@ -301,10 +307,10 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
Channel = #channel{client = Client}) ->
|
Channel = #channel{client = ClientInfo}) ->
|
||||||
case emqx_packet:check(Packet) of
|
case emqx_packet:check(Packet) of
|
||||||
ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
||||||
[Client, Properties],
|
[ClientInfo, Properties],
|
||||||
parse_topic_filters(TopicFilters)),
|
parse_topic_filters(TopicFilters)),
|
||||||
TopicFilters2 = enrich_subid(Properties, TopicFilters1),
|
TopicFilters2 = enrich_subid(Properties, TopicFilters1),
|
||||||
{ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
|
{ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
|
||||||
|
@ -314,10 +320,10 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
Channel = #channel{client = Client}) ->
|
Channel = #channel{client = ClientInfo}) ->
|
||||||
case emqx_packet:check(Packet) of
|
case emqx_packet:check(Packet) of
|
||||||
ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
||||||
[Client, Properties],
|
[ClientInfo, Properties],
|
||||||
parse_topic_filters(TopicFilters)),
|
parse_topic_filters(TopicFilters)),
|
||||||
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
||||||
handle_out({unsuback, PacketId, ReasonCodes}, NChannel);
|
handle_out({unsuback, PacketId, ReasonCodes}, NChannel);
|
||||||
|
@ -328,18 +334,18 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
handle_in(?PACKET(?PINGREQ), Channel) ->
|
handle_in(?PACKET(?PINGREQ), Channel) ->
|
||||||
{ok, ?PACKET(?PINGRESP), Channel};
|
{ok, ?PACKET(?PINGRESP), Channel};
|
||||||
|
|
||||||
handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Session, protocol = Protocol}) ->
|
handle_in(?DISCONNECT_PACKET(RC, Props),
|
||||||
OldInterval = emqx_session:info(expiry_interval, Session),
|
Channel = #channel{conninfo = ConnInfo = #{expiry_interval := OldInterval}}) ->
|
||||||
Interval = get_property('Session-Expiry-Interval', Properties, OldInterval),
|
Interval = emqx_mqtt_props:get('Session-Expiry-Interval', Props, OldInterval),
|
||||||
case OldInterval =:= 0 andalso Interval =/= OldInterval of
|
case OldInterval =:= 0 andalso Interval =/= OldInterval of
|
||||||
true ->
|
true ->
|
||||||
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
|
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
|
||||||
false ->
|
false ->
|
||||||
Channel1 = case RC of
|
Channel1 = case RC of
|
||||||
?RC_SUCCESS -> Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)};
|
?RC_SUCCESS -> Channel#channel{will_msg = undefined};
|
||||||
_ -> Channel
|
_ -> Channel
|
||||||
end,
|
end,
|
||||||
Channel2 = Channel1#channel{session = emqx_session:update_expiry_interval(Interval, Session)},
|
Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}},
|
||||||
case Interval of
|
case Interval of
|
||||||
?UINT_MAX ->
|
?UINT_MAX ->
|
||||||
{ok, ensure_timer(will_timer, Channel2)};
|
{ok, ensure_timer(will_timer, Channel2)};
|
||||||
|
@ -348,9 +354,7 @@ handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Sessi
|
||||||
_Other ->
|
_Other ->
|
||||||
Reason = case RC of
|
Reason = case RC of
|
||||||
?RC_SUCCESS -> normal;
|
?RC_SUCCESS -> normal;
|
||||||
_ ->
|
_ -> emqx_reason_codes:name(RC, maps:get(proto_ver, ConnInfo))
|
||||||
Ver = emqx_protocol:info(proto_ver, Protocol),
|
|
||||||
emqx_reason_codes:name(RC, Ver)
|
|
||||||
end,
|
end,
|
||||||
{stop, {shutdown, Reason}, Channel2}
|
{stop, {shutdown, Reason}, Channel2}
|
||||||
end
|
end
|
||||||
|
@ -368,28 +372,43 @@ handle_in(Packet, Channel) ->
|
||||||
%% Process Connect
|
%% Process Connect
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
process_connect(ConnPkt, Channel) ->
|
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
||||||
case open_session(ConnPkt, Channel) of
|
Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) ->
|
||||||
|
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
||||||
{ok, #{session := Session, present := false}} ->
|
{ok, #{session := Session, present := false}} ->
|
||||||
NChannel = Channel#channel{session = Session},
|
NChannel = Channel#channel{session = Session},
|
||||||
handle_out({connack, ?RC_SUCCESS, sp(false)}, NChannel);
|
handle_out({connack, ?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
|
||||||
{ok, #{session := Session, present := true, pendings := Pendings}} ->
|
{ok, #{session := Session, present := true, pendings := Pendings}} ->
|
||||||
%%TODO: improve later.
|
%%TODO: improve later.
|
||||||
NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
|
NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
|
||||||
NChannel = Channel#channel{session = Session,
|
NChannel = Channel#channel{session = Session,
|
||||||
resuming = true,
|
resuming = true,
|
||||||
pendings = NPendings},
|
pendings = NPendings},
|
||||||
handle_out({connack, ?RC_SUCCESS, sp(true)}, NChannel);
|
handle_out({connack, ?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
%% TODO: Unknown error?
|
%% TODO: Unknown error?
|
||||||
?LOG(error, "Failed to open session: ~p", [Reason]),
|
?LOG(error, "Failed to open session: ~p", [Reason]),
|
||||||
handle_out({connack, ?RC_UNSPECIFIED_ERROR}, Channel)
|
handle_out({connack, ?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Process Publish
|
%% Process Publish
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
|
||||||
|
Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) ->
|
||||||
|
case pipeline([fun process_alias/2,
|
||||||
|
fun check_pub_acl/2,
|
||||||
|
fun check_pub_alias/2,
|
||||||
|
fun check_pub_caps/2], Packet, Channel) of
|
||||||
|
{ok, NPacket, NChannel} ->
|
||||||
|
process_publish(NPacket, NChannel);
|
||||||
|
{error, ReasonCode, NChannel} ->
|
||||||
|
?LOG(warning, "Cannot publish message to ~s due to ~s",
|
||||||
|
[Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]),
|
||||||
|
handle_out({disconnect, ReasonCode}, NChannel)
|
||||||
|
end.
|
||||||
|
|
||||||
process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) ->
|
process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) ->
|
||||||
Msg = publish_to_msg(Packet, Channel),
|
Msg = publish_to_msg(Packet, Channel),
|
||||||
process_publish(PacketId, Msg, Channel).
|
process_publish(PacketId, Msg, Channel).
|
||||||
|
@ -424,11 +443,10 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||||
handle_out({pubrec, PacketId, RC}, Channel)
|
handle_out({pubrec, PacketId, RC}, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint},
|
publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
|
||||||
protocol = Protocol}) ->
|
client = ClientInfo = #{mountpoint := MountPoint}}) ->
|
||||||
Msg = emqx_packet:to_message(Client, Packet),
|
Msg = emqx_packet:to_message(ClientInfo, Packet),
|
||||||
Msg1 = emqx_message:set_flag(dup, false, Msg),
|
Msg1 = emqx_message:set_flag(dup, false, Msg),
|
||||||
ProtoVer = emqx_protocol:info(proto_ver, Protocol),
|
|
||||||
Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1),
|
Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1),
|
||||||
emqx_mountpoint:mount(MountPoint, Msg2).
|
emqx_mountpoint:mount(MountPoint, Msg2).
|
||||||
|
|
||||||
|
@ -447,13 +465,13 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
||||||
process_subscribe(More, [RC|Acc], NChannel).
|
process_subscribe(More, [RC|Acc], NChannel).
|
||||||
|
|
||||||
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
||||||
#channel{client = Client = #{mountpoint := MountPoint},
|
#channel{client = ClientInfo = #{mountpoint := MountPoint},
|
||||||
session = Session}) ->
|
session = Session}) ->
|
||||||
case check_subscribe(TopicFilter, SubOpts, Channel) of
|
case check_subscribe(TopicFilter, SubOpts, Channel) of
|
||||||
ok ->
|
ok ->
|
||||||
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||||
SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
||||||
case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
|
case emqx_session:subscribe(ClientInfo, TopicFilter1, SubOpts1, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{QoS, Channel#channel{session = NSession}};
|
{QoS, Channel#channel{session = NSession}};
|
||||||
{error, RC} -> {RC, Channel}
|
{error, RC} -> {RC, Channel}
|
||||||
|
@ -477,10 +495,10 @@ process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
||||||
process_unsubscribe(More, [RC|Acc], NChannel).
|
process_unsubscribe(More, [RC|Acc], NChannel).
|
||||||
|
|
||||||
do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
||||||
#channel{client = Client = #{mountpoint := MountPoint},
|
#channel{client = ClientInfo = #{mountpoint := MountPoint},
|
||||||
session = Session}) ->
|
session = Session}) ->
|
||||||
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||||
case emqx_session:unsubscribe(Client, TopicFilter1, Session) of
|
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
||||||
{error, RC} -> {RC, Channel}
|
{error, RC} -> {RC, Channel}
|
||||||
|
@ -491,35 +509,37 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%%TODO: RunFold or Pipeline
|
%%TODO: RunFold or Pipeline
|
||||||
handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) ->
|
handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
|
||||||
|
Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) ->
|
||||||
AckProps = run_fold([fun enrich_caps/2,
|
AckProps = run_fold([fun enrich_caps/2,
|
||||||
fun enrich_server_keepalive/2,
|
fun enrich_server_keepalive/2,
|
||||||
fun enrich_assigned_clientid/2
|
fun enrich_assigned_clientid/2
|
||||||
], #{}, Channel),
|
], #{}, Channel),
|
||||||
Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)),
|
Channel1 = Channel#channel{will_msg = emqx_packet:will_msg(ConnPkt),
|
||||||
ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(Channel1)]),
|
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo),
|
||||||
|
connected = true,
|
||||||
|
connected_at = os:timestamp()
|
||||||
|
},
|
||||||
|
Channel2 = ensure_keepalive(AckProps, Channel1),
|
||||||
|
ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
|
||||||
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
|
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
|
||||||
case maybe_resume_session(Channel1) of
|
case maybe_resume_session(Channel2) of
|
||||||
ignore -> {ok, AckPacket, Channel1};
|
ignore ->
|
||||||
|
{ok, AckPacket, Channel2};
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
Channel2 = Channel1#channel{session = NSession,
|
Channel3 = Channel2#channel{session = NSession,
|
||||||
resuming = false,
|
resuming = false,
|
||||||
pendings = []},
|
pendings = []},
|
||||||
{ok, Packets, _} = handle_out({publish, Publishes}, Channel2),
|
{ok, Packets, _} = handle_out({publish, Publishes}, Channel3),
|
||||||
{ok, [AckPacket|Packets], Channel2}
|
{ok, [AckPacket|Packets], Channel2}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_out({connack, ReasonCode}, Channel = #channel{client = Client,
|
handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
|
||||||
protocol = Protocol
|
client = ClientInfo}) ->
|
||||||
}) ->
|
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
|
||||||
ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(Channel)]),
|
ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of
|
||||||
ProtoVer = case Protocol of
|
?MQTT_PROTO_V5 -> ReasonCode;
|
||||||
undefined -> ?MQTT_PROTO_V5;
|
_Ver -> emqx_reason_codes:compat(connack, ReasonCode)
|
||||||
_ -> emqx_protocol:info(proto_ver, Protocol)
|
|
||||||
end,
|
|
||||||
ReasonCode1 = if
|
|
||||||
ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode;
|
|
||||||
true -> emqx_reason_codes:compat(connack, ReasonCode)
|
|
||||||
end,
|
end,
|
||||||
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
|
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
|
||||||
{stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel};
|
{stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel};
|
||||||
|
@ -563,9 +583,9 @@ handle_out({publish, _PacketId, #message{from = ClientId,
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_out({publish, PacketId, Msg}, Channel =
|
handle_out({publish, PacketId, Msg}, Channel =
|
||||||
#channel{client = Client = #{mountpoint := MountPoint}}) ->
|
#channel{client = ClientInfo = #{mountpoint := MountPoint}}) ->
|
||||||
Msg1 = emqx_message:update_expiry(Msg),
|
Msg1 = emqx_message:update_expiry(Msg),
|
||||||
Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1),
|
Msg2 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg1),
|
||||||
Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
|
Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
|
||||||
{ok, emqx_message:to_packet(PacketId, Msg3), Channel};
|
{ok, emqx_message:to_packet(PacketId, Msg3), Channel};
|
||||||
|
|
||||||
|
@ -581,24 +601,23 @@ handle_out({pubrec, PacketId, ReasonCode}, Channel) ->
|
||||||
handle_out({pubcomp, PacketId, ReasonCode}, Channel) ->
|
handle_out({pubcomp, PacketId, ReasonCode}, Channel) ->
|
||||||
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
|
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
|
||||||
|
|
||||||
handle_out({suback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) ->
|
handle_out({suback, PacketId, ReasonCodes},
|
||||||
ReasonCodes1 = case emqx_protocol:info(proto_ver, Protocol) of
|
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
|
||||||
?MQTT_PROTO_V5 -> ReasonCodes;
|
{ok, ?SUBACK_PACKET(PacketId, ReasonCodes), Channel};
|
||||||
_Ver ->
|
|
||||||
[emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes]
|
handle_out({suback, PacketId, ReasonCodes}, Channel) ->
|
||||||
end,
|
ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
|
||||||
{ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel};
|
{ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel};
|
||||||
|
|
||||||
handle_out({unsuback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) ->
|
handle_out({unsuback, PacketId, ReasonCodes},
|
||||||
Unsuback = case emqx_protocol:info(proto_ver, Protocol) of
|
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
|
||||||
?MQTT_PROTO_V5 ->
|
{ok, ?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel};
|
||||||
?UNSUBACK_PACKET(PacketId, ReasonCodes);
|
|
||||||
_Ver -> ?UNSUBACK_PACKET(PacketId)
|
|
||||||
end,
|
|
||||||
{ok, Unsuback, Channel};
|
|
||||||
|
|
||||||
handle_out({disconnect, ReasonCode}, Channel = #channel{protocol = Protocol}) ->
|
handle_out({unsuback, PacketId, _ReasonCodes}, Channel) ->
|
||||||
case emqx_protocol:info(proto_ver, Protocol) of
|
{ok, ?UNSUBACK_PACKET(PacketId), Channel};
|
||||||
|
|
||||||
|
handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
|
case maps:get(proto_ver, ConnInfo) of
|
||||||
?MQTT_PROTO_V5 ->
|
?MQTT_PROTO_V5 ->
|
||||||
Reason = emqx_reason_codes:name(ReasonCode),
|
Reason = emqx_reason_codes:name(ReasonCode),
|
||||||
Packet = ?DISCONNECT_PACKET(ReasonCode),
|
Packet = ?DISCONNECT_PACKET(ReasonCode),
|
||||||
|
@ -660,16 +679,16 @@ handle_cast(Msg, Channel) ->
|
||||||
|
|
||||||
-spec(handle_info(Info :: term(), channel())
|
-spec(handle_info(Info :: term(), channel())
|
||||||
-> {ok, channel()} | {stop, Reason :: term(), channel()}).
|
-> {ok, channel()} | {stop, Reason :: term(), channel()}).
|
||||||
handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) ->
|
handle_info({subscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) ->
|
||||||
TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
||||||
[Client, #{'Internal' => true}],
|
[ClientInfo, #{'Internal' => true}],
|
||||||
parse_topic_filters(TopicFilters)),
|
parse_topic_filters(TopicFilters)),
|
||||||
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
||||||
{ok, NChannel};
|
{ok, NChannel};
|
||||||
|
|
||||||
handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
|
handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) ->
|
||||||
TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
||||||
[Client, #{'Internal' => true}],
|
[ClientInfo, #{'Internal' => true}],
|
||||||
parse_topic_filters(TopicFilters)),
|
parse_topic_filters(TopicFilters)),
|
||||||
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
||||||
{ok, NChannel};
|
{ok, NChannel};
|
||||||
|
@ -677,12 +696,11 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
|
||||||
handle_info(disconnected, Channel = #channel{connected = undefined}) ->
|
handle_info(disconnected, Channel = #channel{connected = undefined}) ->
|
||||||
shutdown(closed, Channel);
|
shutdown(closed, Channel);
|
||||||
|
|
||||||
handle_info(disconnected, Channel = #channel{protocol = Protocol,
|
handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := Interval},
|
||||||
session = Session}) ->
|
will_msg = WillMsg}) ->
|
||||||
%% TODO: Why handle will_msg here?
|
%% TODO: Why handle will_msg here?
|
||||||
publish_will_msg(emqx_protocol:info(will_msg, Protocol)),
|
publish_will_msg(WillMsg),
|
||||||
NChannel = Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)},
|
NChannel = Channel#channel{will_msg = undefined},
|
||||||
Interval = emqx_session:info(expiry_interval, Session),
|
|
||||||
case Interval of
|
case Interval of
|
||||||
?UINT_MAX ->
|
?UINT_MAX ->
|
||||||
{ok, ensure_disconnected(NChannel)};
|
{ok, ensure_disconnected(NChannel)};
|
||||||
|
@ -699,20 +717,19 @@ handle_info(Info, Channel) ->
|
||||||
%% Handle timeout
|
%% Handle timeout
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(timeout(reference(), Msg :: term(), channel())
|
-spec(handle_timeout(reference(), Msg :: term(), channel())
|
||||||
-> {ok, channel()}
|
-> {ok, channel()}
|
||||||
| {ok, Result :: term(), channel()}
|
| {ok, Result :: term(), channel()}
|
||||||
| {stop, Reason :: term(), channel()}).
|
| {stop, Reason :: term(), channel()}).
|
||||||
timeout(TRef, {emit_stats, Stats},
|
handle_timeout(TRef, {emit_stats, Stats},
|
||||||
Channel = #channel{client = #{client_id := ClientId},
|
Channel = #channel{client = #{client_id := ClientId},
|
||||||
timers = #{stats_timer := TRef}
|
timers = #{stats_timer := TRef}}) ->
|
||||||
}) ->
|
|
||||||
ok = emqx_cm:set_chan_stats(ClientId, Stats),
|
ok = emqx_cm:set_chan_stats(ClientId, Stats),
|
||||||
{ok, clean_timer(stats_timer, Channel)};
|
{ok, clean_timer(stats_timer, Channel)};
|
||||||
|
|
||||||
timeout(TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive,
|
handle_timeout(TRef, {keepalive, StatVal},
|
||||||
timers = #{alive_timer := TRef}
|
Channel = #channel{keepalive = Keepalive,
|
||||||
}) ->
|
timers = #{alive_timer := TRef}}) ->
|
||||||
case emqx_keepalive:check(StatVal, Keepalive) of
|
case emqx_keepalive:check(StatVal, Keepalive) of
|
||||||
{ok, NKeepalive} ->
|
{ok, NKeepalive} ->
|
||||||
NChannel = Channel#channel{keepalive = NKeepalive},
|
NChannel = Channel#channel{keepalive = NKeepalive},
|
||||||
|
@ -721,9 +738,9 @@ timeout(TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive,
|
||||||
{stop, {shutdown, keepalive_timeout}, Channel}
|
{stop, {shutdown, keepalive_timeout}, Channel}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
timeout(TRef, retry_delivery, Channel = #channel{session = Session,
|
handle_timeout(TRef, retry_delivery,
|
||||||
timers = #{retry_timer := TRef}
|
Channel = #channel{session = Session,
|
||||||
}) ->
|
timers = #{retry_timer := TRef}}) ->
|
||||||
case emqx_session:retry(Session) of
|
case emqx_session:retry(Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
||||||
|
@ -735,8 +752,9 @@ timeout(TRef, retry_delivery, Channel = #channel{session = Session,
|
||||||
handle_out({publish, Publishes}, reset_timer(retry_timer, Timeout, NChannel))
|
handle_out({publish, Publishes}, reset_timer(retry_timer, Timeout, NChannel))
|
||||||
end;
|
end;
|
||||||
|
|
||||||
timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session,
|
handle_timeout(TRef, expire_awaiting_rel,
|
||||||
timers = #{await_timer := TRef}}) ->
|
Channel = #channel{session = Session,
|
||||||
|
timers = #{await_timer := TRef}}) ->
|
||||||
case emqx_session:expire(awaiting_rel, Session) of
|
case emqx_session:expire(awaiting_rel, Session) of
|
||||||
{ok, Session} ->
|
{ok, Session} ->
|
||||||
{ok, clean_timer(await_timer, Channel#channel{session = Session})};
|
{ok, clean_timer(await_timer, Channel#channel{session = Session})};
|
||||||
|
@ -744,15 +762,15 @@ timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session,
|
||||||
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
|
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) ->
|
handle_timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) ->
|
||||||
shutdown(expired, Channel);
|
shutdown(expired, Channel);
|
||||||
|
|
||||||
timeout(TRef, will_message, Channel = #channel{protocol = Protocol,
|
handle_timeout(TRef, will_message, Channel = #channel{will_msg = WillMsg,
|
||||||
timers = #{will_timer := TRef}}) ->
|
timers = #{will_timer := TRef}}) ->
|
||||||
publish_will_msg(emqx_protocol:info(will_msg, Protocol)),
|
publish_will_msg(WillMsg),
|
||||||
{ok, clean_timer(will_timer, Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)})};
|
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
||||||
|
|
||||||
timeout(_TRef, Msg, Channel) ->
|
handle_timeout(_TRef, Msg, Channel) ->
|
||||||
?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
|
?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
|
@ -796,25 +814,21 @@ interval(retry_timer, #channel{session = Session}) ->
|
||||||
emqx_session:info(retry_interval, Session);
|
emqx_session:info(retry_interval, Session);
|
||||||
interval(await_timer, #channel{session = Session}) ->
|
interval(await_timer, #channel{session = Session}) ->
|
||||||
emqx_session:info(await_rel_timeout, Session);
|
emqx_session:info(await_rel_timeout, Session);
|
||||||
interval(expire_timer, #channel{session = Session}) ->
|
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
|
||||||
timer:seconds(emqx_session:info(expiry_interval, Session));
|
timer:seconds(maps:get(expiry_interval, ConnInfo));
|
||||||
interval(will_timer, #channel{protocol = Protocol}) ->
|
interval(will_timer, #channel{will_msg = WillMsg}) ->
|
||||||
timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)).
|
%% TODO: Ensure the header exists.
|
||||||
|
timer:seconds(emqx_message:get_header('Will-Delay-Interval', WillMsg)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Terminate
|
%% Terminate
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
terminate(normal, #channel{client = Client}) ->
|
terminate(normal, #channel{conninfo = ConnInfo, client = ClientInfo}) ->
|
||||||
ok = emqx_hooks:run('client.disconnected', [Client, normal]);
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
|
||||||
terminate(Reason, #channel{client = Client,
|
terminate(Reason, #channel{conninfo = ConnInfo, client = ClientInfo, will_msg = WillMsg}) ->
|
||||||
protocol = Protocol
|
publish_will_msg(WillMsg),
|
||||||
}) ->
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
|
||||||
ok = emqx_hooks:run('client.disconnected', [Client, Reason]),
|
|
||||||
if
|
|
||||||
Protocol == undefined -> ok;
|
|
||||||
true -> publish_will_msg(emqx_protocol:info(will_msg, Protocol))
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec(received(pos_integer(), channel()) -> channel()).
|
-spec(received(pos_integer(), channel()) -> channel()).
|
||||||
received(Oct, Channel) ->
|
received(Oct, Channel) ->
|
||||||
|
@ -830,51 +844,78 @@ publish_will_msg(undefined) ->
|
||||||
publish_will_msg(Msg) ->
|
publish_will_msg(Msg) ->
|
||||||
emqx_broker:publish(Msg).
|
emqx_broker:publish(Msg).
|
||||||
|
|
||||||
|
%% @doc Enrich MQTT Connect Info.
|
||||||
|
enrich_conninfo(#mqtt_packet_connect{
|
||||||
|
proto_name = ProtoName,
|
||||||
|
proto_ver = ProtoVer,
|
||||||
|
clean_start = CleanStart,
|
||||||
|
keepalive = Keepalive,
|
||||||
|
properties = ConnProps,
|
||||||
|
client_id = ClientId,
|
||||||
|
username = Username}, Channel) ->
|
||||||
|
#channel{conninfo = ConnInfo, client = #{zone := Zone}} = Channel,
|
||||||
|
MaxInflight = emqx_mqtt_props:get('Receive-Maximum',
|
||||||
|
ConnProps, emqx_zone:max_inflight(Zone)),
|
||||||
|
Interval = if ProtoVer == ?MQTT_PROTO_V5 ->
|
||||||
|
emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0);
|
||||||
|
true -> case CleanStart of
|
||||||
|
true -> 0;
|
||||||
|
false -> emqx_zone:session_expiry_interval(Zone)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
NConnInfo = ConnInfo#{proto_name => ProtoName,
|
||||||
|
proto_ver => ProtoVer,
|
||||||
|
clean_start => CleanStart,
|
||||||
|
keepalive => Keepalive,
|
||||||
|
client_id => ClientId,
|
||||||
|
username => Username,
|
||||||
|
conn_props => ConnProps,
|
||||||
|
receive_maximum => MaxInflight,
|
||||||
|
expiry_interval => Interval
|
||||||
|
},
|
||||||
|
{ok, Channel#channel{conninfo = NConnInfo}}.
|
||||||
|
|
||||||
%% @doc Check connect packet.
|
%% @doc Check connect packet.
|
||||||
check_connpkt(ConnPkt, #channel{client = #{zone := Zone}}) ->
|
check_connect(ConnPkt, #channel{client = #{zone := Zone}}) ->
|
||||||
emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)).
|
emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)).
|
||||||
|
|
||||||
%% @doc Init protocol record.
|
|
||||||
init_protocol(ConnPkt, Channel = #channel{client = #{zone := Zone}}) ->
|
|
||||||
{ok, Channel#channel{protocol = emqx_protocol:init(ConnPkt, Zone)}}.
|
|
||||||
|
|
||||||
%% @doc Enrich client
|
%% @doc Enrich client
|
||||||
enrich_client(ConnPkt, Channel = #channel{client = Client}) ->
|
enrich_client(ConnPkt, Channel = #channel{client = ClientInfo}) ->
|
||||||
{ok, NConnPkt, NClient} = pipeline([fun set_username/2,
|
{ok, NConnPkt, NClientInfo} =
|
||||||
fun set_bridge_mode/2,
|
pipeline([fun set_username/2,
|
||||||
fun maybe_username_as_clientid/2,
|
fun set_bridge_mode/2,
|
||||||
fun maybe_assign_clientid/2,
|
fun maybe_username_as_clientid/2,
|
||||||
fun fix_mountpoint/2
|
fun maybe_assign_clientid/2,
|
||||||
], ConnPkt, Client),
|
fun fix_mountpoint/2], ConnPkt, ClientInfo),
|
||||||
{ok, NConnPkt, Channel#channel{client = NClient}}.
|
{ok, NConnPkt, Channel#channel{client = NClientInfo}}.
|
||||||
|
|
||||||
set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) ->
|
set_username(#mqtt_packet_connect{username = Username},
|
||||||
{ok, Client#{username => Username}};
|
ClientInfo = #{username := undefined}) ->
|
||||||
set_username(_ConnPkt, Client) ->
|
{ok, ClientInfo#{username => Username}};
|
||||||
{ok, Client}.
|
set_username(_ConnPkt, ClientInfo) ->
|
||||||
|
{ok, ClientInfo}.
|
||||||
|
|
||||||
set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, Client) ->
|
set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, ClientInfo) ->
|
||||||
{ok, Client#{is_bridge => true}};
|
{ok, ClientInfo#{is_bridge => true}};
|
||||||
set_bridge_mode(_ConnPkt, _Client) -> ok.
|
set_bridge_mode(_ConnPkt, _ClientInfo) -> ok.
|
||||||
|
|
||||||
maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) ->
|
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) ->
|
||||||
{ok, Client};
|
{ok, ClientInfo};
|
||||||
maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) ->
|
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Username}) ->
|
||||||
case emqx_zone:use_username_as_clientid(Zone) of
|
case emqx_zone:use_username_as_clientid(Zone) of
|
||||||
true -> {ok, Client#{client_id => Username}};
|
true -> {ok, ClientInfo#{client_id => Username}};
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) ->
|
maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, ClientInfo) ->
|
||||||
%% Generate a rand clientId
|
%% Generate a rand clientId
|
||||||
RandId = emqx_guid:to_base62(emqx_guid:gen()),
|
{ok, ClientInfo#{client_id => emqx_guid:to_base62(emqx_guid:gen())}};
|
||||||
{ok, Client#{client_id => RandId}};
|
maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, ClientInfo) ->
|
||||||
maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) ->
|
{ok, ClientInfo#{client_id => ClientId}}.
|
||||||
{ok, Client#{client_id => ClientId}}.
|
|
||||||
|
|
||||||
fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) -> ok;
|
fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) -> ok;
|
||||||
fix_mountpoint(_ConnPkt, Client = #{mountpoint := Mountpoint}) ->
|
fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := Mountpoint}) ->
|
||||||
{ok, Client#{mountpoint := emqx_mountpoint:replvar(Mountpoint, Client)}}.
|
{ok, ClientInfo#{mountpoint := emqx_mountpoint:replvar(Mountpoint, ClientInfo)}}.
|
||||||
|
|
||||||
%% @doc Set logger metadata.
|
%% @doc Set logger metadata.
|
||||||
set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) ->
|
set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) ->
|
||||||
|
@ -884,15 +925,15 @@ set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) ->
|
||||||
%% Check banned/flapping
|
%% Check banned/flapping
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
check_banned(_ConnPkt, #channel{client = Client = #{zone := Zone}}) ->
|
check_banned(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) ->
|
||||||
case emqx_zone:enable_banned(Zone) andalso emqx_banned:check(Client) of
|
case emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo) of
|
||||||
true -> {error, ?RC_BANNED};
|
true -> {error, ?RC_BANNED};
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_flapping(_ConnPkt, #channel{client = Client = #{zone := Zone}}) ->
|
check_flapping(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) ->
|
||||||
case emqx_zone:enable_flapping_detect(Zone)
|
case emqx_zone:enable_flapping_detect(Zone)
|
||||||
andalso emqx_flapping:check(Client) of
|
andalso emqx_flapping:check(ClientInfo) of
|
||||||
true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
|
true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
@ -904,38 +945,16 @@ check_flapping(_ConnPkt, #channel{client = Client = #{zone := Zone}}) ->
|
||||||
auth_connect(#mqtt_packet_connect{client_id = ClientId,
|
auth_connect(#mqtt_packet_connect{client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
password = Password},
|
password = Password},
|
||||||
Channel = #channel{client = Client}) ->
|
Channel = #channel{client = ClientInfo}) ->
|
||||||
case emqx_access_control:authenticate(Client#{password => Password}) of
|
case emqx_access_control:authenticate(ClientInfo#{password => Password}) of
|
||||||
{ok, AuthResult} ->
|
{ok, AuthResult} ->
|
||||||
{ok, Channel#channel{client = maps:merge(Client, AuthResult)}};
|
{ok, Channel#channel{client = maps:merge(ClientInfo, AuthResult)}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
|
?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
|
||||||
[ClientId, Username, Reason]),
|
[ClientId, Username, Reason]),
|
||||||
{error, emqx_reason_codes:connack_error(Reason)}
|
{error, emqx_reason_codes:connack_error(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Open session
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
open_session(#mqtt_packet_connect{clean_start = CleanStart,
|
|
||||||
properties = ConnProps},
|
|
||||||
#channel{client = Client = #{zone := Zone}, protocol = Protocol}) ->
|
|
||||||
MaxInflight = get_property('Receive-Maximum', ConnProps,
|
|
||||||
emqx_zone:get_env(Zone, max_inflight, 65535)),
|
|
||||||
Interval =
|
|
||||||
case emqx_protocol:info(proto_ver, Protocol) of
|
|
||||||
?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0);
|
|
||||||
_ ->
|
|
||||||
case CleanStart of
|
|
||||||
true -> 0;
|
|
||||||
false -> emqx_zone:get_env(Zone, session_expiry_interval, 0)
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight,
|
|
||||||
expiry_interval => Interval
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Process publish message: Client -> Broker
|
%% Process publish message: Client -> Broker
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -945,8 +964,8 @@ process_alias(Packet = #mqtt_packet{
|
||||||
properties = #{'Topic-Alias' := AliasId}
|
properties = #{'Topic-Alias' := AliasId}
|
||||||
} = Publish
|
} = Publish
|
||||||
},
|
},
|
||||||
Channel = #channel{protocol = Protocol}) ->
|
Channel = #channel{topic_aliases = Aliases}) ->
|
||||||
case emqx_protocol:find_alias(AliasId, Protocol) of
|
case find_alias(AliasId, Aliases) of
|
||||||
{ok, Topic} ->
|
{ok, Topic} ->
|
||||||
{ok, Packet#mqtt_packet{
|
{ok, Packet#mqtt_packet{
|
||||||
variable = Publish#mqtt_packet_publish{
|
variable = Publish#mqtt_packet_publish{
|
||||||
|
@ -958,23 +977,23 @@ process_alias(#mqtt_packet{
|
||||||
variable = #mqtt_packet_publish{topic_name = Topic,
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
||||||
properties = #{'Topic-Alias' := AliasId}
|
properties = #{'Topic-Alias' := AliasId}
|
||||||
}
|
}
|
||||||
}, Channel = #channel{protocol = Protocol}) ->
|
}, Channel = #channel{topic_aliases = Aliases}) ->
|
||||||
{ok, Channel#channel{protocol = emqx_protocol:save_alias(AliasId, Topic, Protocol)}};
|
{ok, Channel#channel{topic_aliases = save_alias(AliasId, Topic, Aliases)}};
|
||||||
|
|
||||||
process_alias(_Packet, Channel) ->
|
process_alias(_Packet, Channel) ->
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%% Check Publish
|
find_alias(_AliasId, undefined) -> false;
|
||||||
check_publish(Packet, Channel) ->
|
find_alias(AliasId, Aliases) -> maps:find(AliasId, Aliases).
|
||||||
pipeline([fun check_pub_acl/2,
|
|
||||||
fun check_pub_alias/2,
|
save_alias(AliasId, Topic, undefined) -> #{AliasId => Topic};
|
||||||
fun check_pub_caps/2], Packet, Channel).
|
save_alias(AliasId, Topic, Aliases) -> maps:put(AliasId, Topic, Aliases).
|
||||||
|
|
||||||
%% Check Pub ACL
|
%% Check Pub ACL
|
||||||
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
||||||
#channel{client = Client}) ->
|
#channel{client = ClientInfo}) ->
|
||||||
case is_acl_enabled(Client) andalso
|
case is_acl_enabled(ClientInfo) andalso
|
||||||
emqx_access_control:check_acl(Client, publish, Topic) of
|
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
||||||
false -> ok;
|
false -> ok;
|
||||||
allow -> ok;
|
allow -> ok;
|
||||||
deny -> {error, ?RC_NOT_AUTHORIZED}
|
deny -> {error, ?RC_NOT_AUTHORIZED}
|
||||||
|
@ -986,9 +1005,8 @@ check_pub_alias(#mqtt_packet{
|
||||||
properties = #{'Topic-Alias' := AliasId}
|
properties = #{'Topic-Alias' := AliasId}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
#channel{protocol = Protocol}) ->
|
#channel{alias_maximum = Limits}) ->
|
||||||
%% TODO: Move to Protocol
|
%% TODO: Move to Protocol
|
||||||
Limits = emqx_protocol:info(alias_maximum, Protocol),
|
|
||||||
case (Limits == undefined)
|
case (Limits == undefined)
|
||||||
orelse (Max = maps:get(inbound, Limits, 0)) == 0
|
orelse (Max = maps:get(inbound, Limits, 0)) == 0
|
||||||
orelse (AliasId > Max) of
|
orelse (AliasId > Max) of
|
||||||
|
@ -1013,9 +1031,9 @@ check_subscribe(TopicFilter, SubOpts, Channel) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Check Sub ACL
|
%% Check Sub ACL
|
||||||
check_sub_acl(TopicFilter, #channel{client = Client}) ->
|
check_sub_acl(TopicFilter, #channel{client = ClientInfo}) ->
|
||||||
case is_acl_enabled(Client) andalso
|
case is_acl_enabled(ClientInfo) andalso
|
||||||
emqx_access_control:check_acl(Client, subscribe, TopicFilter) of
|
emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of
|
||||||
false -> allow;
|
false -> allow;
|
||||||
Result -> Result
|
Result -> Result
|
||||||
end.
|
end.
|
||||||
|
@ -1029,64 +1047,63 @@ enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
|
||||||
enrich_subid(_Properties, TopicFilters) ->
|
enrich_subid(_Properties, TopicFilters) ->
|
||||||
TopicFilters.
|
TopicFilters.
|
||||||
|
|
||||||
enrich_subopts(SubOpts, #channel{client = Client, protocol = Proto}) ->
|
enrich_subopts(SubOpts, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
|
||||||
#{zone := Zone, is_bridge := IsBridge} = Client,
|
SubOpts;
|
||||||
case emqx_protocol:info(proto_ver, Proto) of
|
|
||||||
?MQTT_PROTO_V5 -> SubOpts;
|
|
||||||
_Ver -> Rap = flag(IsBridge),
|
|
||||||
Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)),
|
|
||||||
SubOpts#{rap => Rap, nl => Nl}
|
|
||||||
end.
|
|
||||||
|
|
||||||
enrich_caps(AckProps, #channel{client = #{zone := Zone}, protocol = Protocol}) ->
|
enrich_subopts(SubOpts, #channel{client = #{zone := Zone, is_bridge := IsBridge}}) ->
|
||||||
case emqx_protocol:info(proto_ver, Protocol) of
|
NL = flag(emqx_zone:ignore_loop_deliver(Zone)),
|
||||||
?MQTT_PROTO_V5 ->
|
SubOpts#{rap => flag(IsBridge), nl => NL}.
|
||||||
#{max_packet_size := MaxPktSize,
|
|
||||||
max_qos_allowed := MaxQoS,
|
enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5},
|
||||||
retain_available := Retain,
|
client = #{zone := Zone}}) ->
|
||||||
max_topic_alias := MaxAlias,
|
#{max_packet_size := MaxPktSize,
|
||||||
shared_subscription := Shared,
|
max_qos_allowed := MaxQoS,
|
||||||
wildcard_subscription := Wildcard
|
retain_available := Retain,
|
||||||
} = emqx_mqtt_caps:get_caps(Zone),
|
max_topic_alias := MaxAlias,
|
||||||
AckProps#{'Retain-Available' => flag(Retain),
|
shared_subscription := Shared,
|
||||||
'Maximum-Packet-Size' => MaxPktSize,
|
wildcard_subscription := Wildcard
|
||||||
'Topic-Alias-Maximum' => MaxAlias,
|
} = emqx_mqtt_caps:get_caps(Zone),
|
||||||
'Wildcard-Subscription-Available' => flag(Wildcard),
|
AckProps#{'Retain-Available' => flag(Retain),
|
||||||
'Subscription-Identifier-Available' => 1,
|
'Maximum-Packet-Size' => MaxPktSize,
|
||||||
'Shared-Subscription-Available' => flag(Shared),
|
'Topic-Alias-Maximum' => MaxAlias,
|
||||||
'Maximum-QoS' => MaxQoS
|
'Wildcard-Subscription-Available' => flag(Wildcard),
|
||||||
};
|
'Subscription-Identifier-Available' => 1,
|
||||||
_Ver -> AckProps
|
'Shared-Subscription-Available' => flag(Shared),
|
||||||
end.
|
'Maximum-QoS' => MaxQoS
|
||||||
|
};
|
||||||
|
enrich_caps(AckProps, _Channel) ->
|
||||||
|
AckProps.
|
||||||
|
|
||||||
enrich_server_keepalive(AckProps, #channel{client = #{zone := Zone}}) ->
|
enrich_server_keepalive(AckProps, #channel{client = #{zone := Zone}}) ->
|
||||||
case emqx_zone:get_env(Zone, server_keepalive) of
|
case emqx_zone:server_keepalive(Zone) of
|
||||||
undefined -> AckProps;
|
undefined -> AckProps;
|
||||||
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
|
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId},
|
enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
||||||
protocol = Protocol}) ->
|
client = #{client_id := ClientId}
|
||||||
case emqx_protocol:info(client_id, Protocol) of
|
}) ->
|
||||||
<<>> -> %% Original ClientId.
|
case maps:get(client_id, ConnInfo) of
|
||||||
|
<<>> -> %% Original ClientId is null.
|
||||||
AckProps#{'Assigned-Client-Identifier' => ClientId};
|
AckProps#{'Assigned-Client-Identifier' => ClientId};
|
||||||
_Origin -> AckProps
|
_Origin -> AckProps
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_connected(Channel) ->
|
init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
||||||
Channel#channel{connected = true, connected_at = os:timestamp(), disconnected_at = undefined}.
|
properties = Properties}, #{zone := Zone}) ->
|
||||||
|
#{outbound => emqx_mqtt_props:get('Topic-Alias-Maximum', Properties, 0),
|
||||||
|
inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)};
|
||||||
|
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
||||||
|
|
||||||
ensure_disconnected(Channel) ->
|
ensure_disconnected(Channel) ->
|
||||||
Channel#channel{connected = false, disconnected_at = os:timestamp()}.
|
Channel#channel{connected = false, disconnected_at = os:timestamp()}.
|
||||||
|
|
||||||
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
|
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
|
||||||
ensure_keepalive_timer(Interval, Channel);
|
ensure_keepalive_timer(Interval, Channel);
|
||||||
ensure_keepalive(_AckProp, Channel = #channel{protocol = Protocol}) ->
|
ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
case emqx_protocol:info(keepalive, Protocol) of
|
ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
|
||||||
0 -> Channel;
|
|
||||||
Interval -> ensure_keepalive_timer(Interval, Channel)
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
ensure_keepalive_timer(0, Channel) -> Channel;
|
||||||
ensure_keepalive_timer(Interval, Channel = #channel{client = #{zone := Zone}}) ->
|
ensure_keepalive_timer(Interval, Channel = #channel{client = #{zone := Zone}}) ->
|
||||||
Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
|
Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
|
||||||
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
|
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
|
||||||
|
@ -1137,11 +1154,6 @@ check_oom(OomPolicy) ->
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
get_property(_Name, undefined, Default) ->
|
|
||||||
Default;
|
|
||||||
get_property(Name, Props, Default) ->
|
|
||||||
maps:get(Name, Props, Default).
|
|
||||||
|
|
||||||
sp(true) -> 1;
|
sp(true) -> 1;
|
||||||
sp(false) -> 0.
|
sp(false) -> 0.
|
||||||
|
|
||||||
|
|
|
@ -114,8 +114,6 @@
|
||||||
max_awaiting_rel :: non_neg_integer(),
|
max_awaiting_rel :: non_neg_integer(),
|
||||||
%% Awaiting PUBREL Timeout
|
%% Awaiting PUBREL Timeout
|
||||||
await_rel_timeout :: timeout(),
|
await_rel_timeout :: timeout(),
|
||||||
%% Session Expiry Interval
|
|
||||||
expiry_interval :: timeout(),
|
|
||||||
%% Enqueue Count
|
%% Enqueue Count
|
||||||
enqueue_cnt :: non_neg_integer(),
|
enqueue_cnt :: non_neg_integer(),
|
||||||
%% Created at
|
%% Created at
|
||||||
|
@ -127,11 +125,12 @@
|
||||||
-type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}).
|
-type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}).
|
||||||
|
|
||||||
-define(DEFAULT_BATCH_N, 1000).
|
-define(DEFAULT_BATCH_N, 1000).
|
||||||
-define(ATTR_KEYS, [expiry_interval, created_at]).
|
-define(ATTR_KEYS, [max_inflight, max_mqueue, retry_interval,
|
||||||
|
max_awaiting_rel, await_rel_timeout, created_at]).
|
||||||
-define(INFO_KEYS, [subscriptions, max_subscriptions, upgrade_qos, inflight,
|
-define(INFO_KEYS, [subscriptions, max_subscriptions, upgrade_qos, inflight,
|
||||||
max_inflight, retry_interval, mqueue_len, max_mqueue,
|
max_inflight, retry_interval, mqueue_len, max_mqueue,
|
||||||
mqueue_dropped, next_pkt_id, awaiting_rel, max_awaiting_rel,
|
mqueue_dropped, next_pkt_id, awaiting_rel, max_awaiting_rel,
|
||||||
await_rel_timeout, expiry_interval, created_at]).
|
await_rel_timeout, created_at]).
|
||||||
-define(STATS_KEYS, [subscriptions_cnt, max_subscriptions, inflight, max_inflight,
|
-define(STATS_KEYS, [subscriptions_cnt, max_subscriptions, inflight, max_inflight,
|
||||||
mqueue_len, max_mqueue, mqueue_dropped, awaiting_rel,
|
mqueue_len, max_mqueue, mqueue_dropped, awaiting_rel,
|
||||||
max_awaiting_rel, enqueue_cnt]).
|
max_awaiting_rel, enqueue_cnt]).
|
||||||
|
@ -142,8 +141,7 @@
|
||||||
|
|
||||||
%% @doc Init a session.
|
%% @doc Init a session.
|
||||||
-spec(init(emqx_types:client(), Options :: map()) -> session()).
|
-spec(init(emqx_types:client(), Options :: map()) -> session()).
|
||||||
init(#{zone := Zone}, #{receive_maximum := MaxInflight,
|
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
||||||
expiry_interval := ExpiryInterval}) ->
|
|
||||||
#session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
#session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
||||||
subscriptions = #{},
|
subscriptions = #{},
|
||||||
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
||||||
|
@ -154,7 +152,6 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight,
|
||||||
awaiting_rel = #{},
|
awaiting_rel = #{},
|
||||||
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
|
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
|
||||||
await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
|
await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
|
||||||
expiry_interval = ExpiryInterval,
|
|
||||||
enqueue_cnt = 0,
|
enqueue_cnt = 0,
|
||||||
created_at = os:timestamp()
|
created_at = os:timestamp()
|
||||||
}.
|
}.
|
||||||
|
@ -210,8 +207,6 @@ info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) ->
|
||||||
MaxAwaitingRel;
|
MaxAwaitingRel;
|
||||||
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
||||||
Timeout;
|
Timeout;
|
||||||
info(expiry_interval, #session{expiry_interval = Interval}) ->
|
|
||||||
Interval;
|
|
||||||
info(enqueue_cnt, #session{enqueue_cnt = Cnt}) ->
|
info(enqueue_cnt, #session{enqueue_cnt = Cnt}) ->
|
||||||
Cnt;
|
Cnt;
|
||||||
info(created_at, #session{created_at = CreatedAt}) ->
|
info(created_at, #session{created_at = CreatedAt}) ->
|
||||||
|
|
|
@ -103,9 +103,7 @@
|
||||||
atom() => term()
|
atom() => term()
|
||||||
}).
|
}).
|
||||||
-type(client() :: #{zone := zone(),
|
-type(client() :: #{zone := zone(),
|
||||||
conn_mod := maybe(module()),
|
|
||||||
peerhost := peerhost(),
|
peerhost := peerhost(),
|
||||||
sockname := peername(),
|
|
||||||
client_id := client_id(),
|
client_id := client_id(),
|
||||||
username := username(),
|
username := username(),
|
||||||
peercert := esockd_peercert:peercert(),
|
peercert := esockd_peercert:peercert(),
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
-export([ use_username_as_clientid/1
|
-export([ use_username_as_clientid/1
|
||||||
, enable_stats/1
|
, enable_stats/1
|
||||||
, enable_acl/1
|
, enable_acl/1
|
||||||
, enable_banned/1
|
, enable_ban/1
|
||||||
, enable_flapping_detect/1
|
, enable_flapping_detect/1
|
||||||
, ignore_loop_deliver/1
|
, ignore_loop_deliver/1
|
||||||
, server_keepalive/1
|
, server_keepalive/1
|
||||||
|
@ -88,9 +88,9 @@ enable_stats(Zone) ->
|
||||||
enable_acl(Zone) ->
|
enable_acl(Zone) ->
|
||||||
get_env(Zone, enable_acl, true).
|
get_env(Zone, enable_acl, true).
|
||||||
|
|
||||||
-spec(enable_banned(zone()) -> boolean()).
|
-spec(enable_ban(zone()) -> boolean()).
|
||||||
enable_banned(Zone) ->
|
enable_ban(Zone) ->
|
||||||
get_env(Zone, enable_banned, false).
|
get_env(Zone, enable_ban, false).
|
||||||
|
|
||||||
-spec(enable_flapping_detect(zone()) -> boolean()).
|
-spec(enable_flapping_detect(zone()) -> boolean()).
|
||||||
enable_flapping_detect(Zone) ->
|
enable_flapping_detect(Zone) ->
|
||||||
|
|
|
@ -27,6 +27,8 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
application:load(emqx),
|
application:load(emqx),
|
||||||
ok = ekka:start(),
|
ok = ekka:start(),
|
||||||
|
%% for coverage
|
||||||
|
ok = emqx_banned:mnesia(copy),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
@ -80,3 +82,14 @@ t_check(_) ->
|
||||||
?assertNot(emqx_banned:check(ClientInfo4)),
|
?assertNot(emqx_banned:check(ClientInfo4)),
|
||||||
?assertEqual(0, emqx_banned:info(size)).
|
?assertEqual(0, emqx_banned:info(size)).
|
||||||
|
|
||||||
|
t_unused(_) ->
|
||||||
|
{ok, Banned} = emqx_banned:start_link(),
|
||||||
|
ok = emqx_banned:add(#banned{who = {client_id, <<"BannedClient">>},
|
||||||
|
until = erlang:system_time(second)
|
||||||
|
}),
|
||||||
|
?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
|
||||||
|
?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
|
||||||
|
?assertEqual(ok, Banned ! ok),
|
||||||
|
timer:sleep(500), %% expiry timer
|
||||||
|
ok = emqx_banned:stop().
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_cm_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
t_reg_unreg_channel(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_get_set_chan_attrs(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_get_set_chan_stats(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_open_session(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_discard_session(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_takeover_session(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_lookup_channels(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_lock_clientid(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
||||||
|
t_unlock_clientid(_) ->
|
||||||
|
error(not_implemented).
|
||||||
|
|
|
@ -84,8 +84,9 @@ t_is_empty(_) ->
|
||||||
?assert(emqx_inflight:is_empty(Inflight1)).
|
?assert(emqx_inflight:is_empty(Inflight1)).
|
||||||
|
|
||||||
t_window(_) ->
|
t_window(_) ->
|
||||||
|
?assertEqual([], emqx_inflight:window(emqx_inflight:new(0))),
|
||||||
Inflight = emqx_inflight:insert(
|
Inflight = emqx_inflight:insert(
|
||||||
b, 2, emqx_inflight:insert(
|
b, 2, emqx_inflight:insert(
|
||||||
a, 1, emqx_inflight:new(2))),
|
a, 1, emqx_inflight:new(2))),
|
||||||
[a, b] = emqx_inflight:window(Inflight).
|
?assertEqual([a, b], emqx_inflight:window(Inflight)).
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,7 @@ t_get_set_flag(_) ->
|
||||||
?assertNot(emqx_message:get_flag(retain, Msg3)),
|
?assertNot(emqx_message:get_flag(retain, Msg3)),
|
||||||
Msg4 = emqx_message:unset_flag(dup, Msg3),
|
Msg4 = emqx_message:unset_flag(dup, Msg3),
|
||||||
Msg5 = emqx_message:unset_flag(retain, Msg4),
|
Msg5 = emqx_message:unset_flag(retain, Msg4),
|
||||||
|
Msg5 = emqx_message:unset_flag(badflag, Msg5),
|
||||||
?assertEqual(undefined, emqx_message:get_flag(dup, Msg5, undefined)),
|
?assertEqual(undefined, emqx_message:get_flag(dup, Msg5, undefined)),
|
||||||
?assertEqual(undefined, emqx_message:get_flag(retain, Msg5, undefined)),
|
?assertEqual(undefined, emqx_message:get_flag(retain, Msg5, undefined)),
|
||||||
Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5),
|
Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5),
|
||||||
|
@ -81,7 +82,7 @@ t_get_set_header(_) ->
|
||||||
?assertEqual(1, emqx_message:get_header(a, Msg3)),
|
?assertEqual(1, emqx_message:get_header(a, Msg3)),
|
||||||
?assertEqual(4, emqx_message:get_header(d, Msg2, 4)),
|
?assertEqual(4, emqx_message:get_header(d, Msg2, 4)),
|
||||||
Msg4 = emqx_message:remove_header(a, Msg3),
|
Msg4 = emqx_message:remove_header(a, Msg3),
|
||||||
Msg4 = emqx_message:remove_header(a, Msg3),
|
Msg4 = emqx_message:remove_header(a, Msg4),
|
||||||
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg4)).
|
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg4)).
|
||||||
|
|
||||||
t_undefined_headers(_) ->
|
t_undefined_headers(_) ->
|
||||||
|
@ -93,16 +94,24 @@ t_undefined_headers(_) ->
|
||||||
|
|
||||||
t_format(_) ->
|
t_format(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
io:format("~s", [emqx_message:format(Msg)]).
|
io:format("~s~n", [emqx_message:format(Msg)]),
|
||||||
|
Msg1 = #message{id = <<"id">>,
|
||||||
|
qos = ?QOS_0,
|
||||||
|
flags = undefined,
|
||||||
|
headers = undefined
|
||||||
|
},
|
||||||
|
io:format("~s~n", [emqx_message:format(Msg1)]).
|
||||||
|
|
||||||
t_expired(_) ->
|
t_expired(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
|
?assertNot(emqx_message:is_expired(Msg)),
|
||||||
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
|
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
?assertNot(emqx_message:is_expired(Msg1)),
|
?assertNot(emqx_message:is_expired(Msg1)),
|
||||||
timer:sleep(600),
|
timer:sleep(600),
|
||||||
?assert(emqx_message:is_expired(Msg1)),
|
?assert(emqx_message:is_expired(Msg1)),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
|
Msg = emqx_message:update_expiry(Msg),
|
||||||
Msg2 = emqx_message:update_expiry(Msg1),
|
Msg2 = emqx_message:update_expiry(Msg1),
|
||||||
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
|
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
|
||||||
|
|
||||||
|
|
|
@ -21,29 +21,81 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(OPTS, [{enable_acl, true},
|
-define(ENVS, [{use_username_as_clientid, false},
|
||||||
{enable_banned, false}
|
{server_keepalive, 60},
|
||||||
|
{upgrade_qos, false},
|
||||||
|
{session_expiry_interval, 7200},
|
||||||
|
{retry_interval, 20000},
|
||||||
|
{mqueue_store_qos0, true},
|
||||||
|
{mqueue_priorities, none},
|
||||||
|
{mqueue_default_priority, highest},
|
||||||
|
{max_subscriptions, 0},
|
||||||
|
{max_mqueue_len, 1000},
|
||||||
|
{max_inflight, 32},
|
||||||
|
{max_awaiting_rel, 100},
|
||||||
|
{keepalive_backoff, 0.75},
|
||||||
|
{ignore_loop_deliver, false},
|
||||||
|
{idle_timeout, 15000},
|
||||||
|
{force_shutdown_policy, #{max_heap_size => 838860800,
|
||||||
|
message_queue_len => 8000}},
|
||||||
|
{force_gc_policy, #{bytes => 1048576, count => 1000}},
|
||||||
|
{enable_stats, true},
|
||||||
|
{enable_flapping_detect, false},
|
||||||
|
{enable_ban, true},
|
||||||
|
{enable_acl, true},
|
||||||
|
{await_rel_timeout, 300000},
|
||||||
|
{acl_deny_action, ignore}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
t_set_get_env(_) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx),
|
_ = application:load(emqx),
|
||||||
application:set_env(emqx, zones, [{external, ?OPTS}]),
|
application:set_env(emqx, zone_env, val),
|
||||||
{ok, _} = emqx_zone:start_link(),
|
application:set_env(emqx, zones, [{zone, ?ENVS}]),
|
||||||
?assert(emqx_zone:get_env(external, enable_acl)),
|
Config.
|
||||||
?assertNot(emqx_zone:get_env(external, enable_banned)),
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
application:unset_env(emqx, zone_env),
|
||||||
|
application:unset_env(emqx, zones).
|
||||||
|
|
||||||
|
t_zone_env_func(_) ->
|
||||||
|
lists:foreach(fun({Env, Val}) ->
|
||||||
|
case erlang:function_exported(emqx_zone, Env, 1) of
|
||||||
|
true ->
|
||||||
|
?assertEqual(Val, erlang:apply(emqx_zone, Env, [zone]));
|
||||||
|
false -> ok
|
||||||
|
end
|
||||||
|
end, ?ENVS).
|
||||||
|
|
||||||
|
t_get_env(_) ->
|
||||||
|
?assertEqual(val, emqx_zone:get_env(undefined, zone_env)),
|
||||||
|
?assertEqual(val, emqx_zone:get_env(undefined, zone_env, def)),
|
||||||
|
?assert(emqx_zone:get_env(zone, enable_acl)),
|
||||||
|
?assert(emqx_zone:get_env(zone, enable_ban)),
|
||||||
?assertEqual(defval, emqx_zone:get_env(extenal, key, defval)),
|
?assertEqual(defval, emqx_zone:get_env(extenal, key, defval)),
|
||||||
?assertEqual(undefined, emqx_zone:get_env(external, key)),
|
?assertEqual(undefined, emqx_zone:get_env(external, key)),
|
||||||
?assertEqual(undefined, emqx_zone:get_env(internal, key)),
|
?assertEqual(undefined, emqx_zone:get_env(internal, key)),
|
||||||
?assertEqual(def, emqx_zone:get_env(internal, key, def)),
|
?assertEqual(def, emqx_zone:get_env(internal, key, def)).
|
||||||
emqx_zone:stop().
|
|
||||||
|
t_get_set_env(_) ->
|
||||||
|
ok = emqx_zone:set_env(zone, key, val),
|
||||||
|
?assertEqual(val, emqx_zone:get_env(zone, key)),
|
||||||
|
true = emqx_zone:unset_env(zone, key),
|
||||||
|
?assertEqual(undefined, emqx_zone:get_env(zone, key)).
|
||||||
|
|
||||||
t_force_reload(_) ->
|
t_force_reload(_) ->
|
||||||
{ok, _} = emqx_zone:start_link(),
|
{ok, _} = emqx_zone:start_link(),
|
||||||
application:set_env(emqx, zones, [{zone, [{key, val}]}]),
|
?assertEqual(undefined, emqx_zone:get_env(xzone, key)),
|
||||||
?assertEqual(undefined, emqx_zone:get_env(zone, key)),
|
application:set_env(emqx, zones, [{xzone, [{key, val}]}]),
|
||||||
ok = emqx_zone:force_reload(),
|
ok = emqx_zone:force_reload(),
|
||||||
?assertEqual(val, emqx_zone:get_env(zone, key)),
|
?assertEqual(val, emqx_zone:get_env(xzone, key)),
|
||||||
|
emqx_zone:stop().
|
||||||
|
|
||||||
|
t_uncovered_func(_) ->
|
||||||
|
{ok, Pid} = emqx_zone:start_link(),
|
||||||
|
ignored = gen_server:call(Pid, unexpected_call),
|
||||||
|
ok = gen_server:cast(Pid, unexpected_cast),
|
||||||
|
ok = Pid ! ok,
|
||||||
emqx_zone:stop().
|
emqx_zone:stop().
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue