Merge pull request #3147 from emqx/master
Auto-pull-request-by-2019-12-31
This commit is contained in:
commit
c818b22bd4
|
@ -34,7 +34,7 @@
|
|||
|
||||
-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
|
||||
authenticate(ClientInfo = #{zone := Zone}) ->
|
||||
case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
|
||||
case run_hooks('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
|
||||
Result = #{auth_result := success} ->
|
||||
{ok, Result};
|
||||
Result ->
|
||||
|
@ -61,7 +61,7 @@ check_acl_cache(ClientInfo, PubSub, Topic) ->
|
|||
|
||||
do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||
Default = emqx_zone:get_env(Zone, acl_nomatch, deny),
|
||||
case emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
|
||||
case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
|
||||
allow -> allow;
|
||||
_Other -> deny
|
||||
end.
|
||||
|
@ -77,3 +77,7 @@ default_auth_result(Zone) ->
|
|||
false -> #{auth_result => not_authorized, anonymous => false}
|
||||
end.
|
||||
|
||||
-compile({inline, [run_hooks/3]}).
|
||||
run_hooks(Name, Args, Acc) ->
|
||||
ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
|
||||
|
||||
|
|
|
@ -230,7 +230,7 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
|
|||
-spec(route([emqx_types:route_entry()], emqx_types:delivery())
|
||||
-> emqx_types:publish_result()).
|
||||
route([], #delivery{message = Msg}) ->
|
||||
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
||||
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
||||
ok = inc_dropped_cnt(Msg),
|
||||
[];
|
||||
|
||||
|
@ -282,8 +282,8 @@ forward(Node, To, Delivery, sync) ->
|
|||
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
||||
dispatch(Topic, #delivery{message = Msg}) ->
|
||||
case subscribers(Topic) of
|
||||
[] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
||||
_ = inc_dropped_cnt(Topic),
|
||||
[] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
||||
ok = inc_dropped_cnt(Topic),
|
||||
{error, no_subscribers};
|
||||
[Sub] -> %% optimize?
|
||||
dispatch(Sub, Topic, Msg);
|
||||
|
|
|
@ -45,7 +45,7 @@
|
|||
, terminate/2
|
||||
]).
|
||||
|
||||
%% export for ct
|
||||
%% Exports for CT
|
||||
-export([set_field/3]).
|
||||
|
||||
-import(emqx_misc,
|
||||
|
@ -204,9 +204,10 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) ->
|
|||
|
||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||
case pipeline([fun enrich_conninfo/2,
|
||||
fun run_conn_hooks/2,
|
||||
fun check_connect/2,
|
||||
fun enrich_client/2,
|
||||
fun set_logger_meta/2,
|
||||
fun set_log_meta/2,
|
||||
fun check_banned/2,
|
||||
fun auth_connect/2
|
||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||
|
@ -288,11 +289,13 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
|||
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||
Channel = #channel{clientinfo = ClientInfo}) ->
|
||||
case emqx_packet:check(Packet) of
|
||||
ok -> TopicFilters1 = enrich_subid(Properties, parse_topic_filters(TopicFilters)),
|
||||
TopicFilters2 = emqx_hooks:run_fold('client.subscribe',
|
||||
ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
|
||||
TopicFilters2 = enrich_subid(Properties, TopicFilters1),
|
||||
TopicFilters3 = run_hooks('client.subscribe',
|
||||
[ClientInfo, Properties],
|
||||
TopicFilters1),
|
||||
{ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
|
||||
TopicFilters2
|
||||
),
|
||||
{ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
|
||||
handle_out(suback, {PacketId, ReasonCodes}, NChannel);
|
||||
{error, ReasonCode} ->
|
||||
handle_out(disconnect, ReasonCode, Channel)
|
||||
|
@ -301,9 +304,10 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|||
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||
Channel = #channel{clientinfo = ClientInfo}) ->
|
||||
case emqx_packet:check(Packet) of
|
||||
ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
||||
ok -> TopicFilters1 = run_hooks('client.unsubscribe',
|
||||
[ClientInfo, Properties],
|
||||
parse_topic_filters(TopicFilters)),
|
||||
parse_topic_filters(TopicFilters)
|
||||
),
|
||||
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
||||
handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
|
||||
{error, ReasonCode} ->
|
||||
|
@ -549,26 +553,24 @@ not_nacked({deliver, _Topic, Msg}) ->
|
|||
| {ok, replies(), channel()}
|
||||
| {shutdown, Reason :: term(), channel()}
|
||||
| {shutdown, Reason :: term(), replies(), channel()}).
|
||||
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
|
||||
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
||||
AckProps = run_fold([fun enrich_connack_caps/2,
|
||||
fun enrich_server_keepalive/2,
|
||||
fun enrich_assigned_clientid/2
|
||||
], #{}, Channel),
|
||||
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
|
||||
AckPacket = run_hooks('client.connack', [ConnInfo],
|
||||
?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps)),
|
||||
return_connack(AckPacket,
|
||||
ensure_keepalive(AckProps,
|
||||
ensure_connected(ConnPkt, Channel)));
|
||||
|
||||
handle_out(connack, {ReasonCode, _ConnPkt},
|
||||
Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo}) ->
|
||||
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
|
||||
AckPacket = ?CONNACK_PACKET(
|
||||
case maps:get(proto_ver, ConnInfo) of
|
||||
handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
||||
AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
|
||||
?MQTT_PROTO_V5 -> ReasonCode;
|
||||
_Other -> emqx_reason_codes:compat(connack, ReasonCode)
|
||||
_ -> emqx_reason_codes:compat(connack, ReasonCode)
|
||||
end),
|
||||
shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);
|
||||
AckPacket1 = run_hooks('client.connack', [ConnInfo], AckPacket),
|
||||
shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel);
|
||||
|
||||
%% Optimize?
|
||||
handle_out(publish, [], Channel) ->
|
||||
|
@ -625,10 +627,7 @@ handle_out(Type, Data, Channel) ->
|
|||
%% Return ConnAck
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
return_connack(AckPacket, Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo
|
||||
}) ->
|
||||
ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
|
||||
return_connack(AckPacket, Channel) ->
|
||||
Replies = [{event, connected}, {connack, AckPacket}],
|
||||
case maybe_resume_session(Channel) of
|
||||
ignore -> {ok, Replies, Channel};
|
||||
|
@ -735,16 +734,18 @@ handle_call(Req, Channel) ->
|
|||
-spec(handle_info(Info :: term(), channel())
|
||||
-> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
|
||||
handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
||||
TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
||||
TopicFilters1 = run_hooks('client.subscribe',
|
||||
[ClientInfo, #{'Internal' => true}],
|
||||
parse_topic_filters(TopicFilters)),
|
||||
parse_topic_filters(TopicFilters)
|
||||
),
|
||||
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
||||
{ok, NChannel};
|
||||
|
||||
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
||||
TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
||||
TopicFilters1 = run_hooks('client.unsubscribe',
|
||||
[ClientInfo, #{'Internal' => true}],
|
||||
parse_topic_filters(TopicFilters)),
|
||||
parse_topic_filters(TopicFilters)
|
||||
),
|
||||
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
||||
{ok, NChannel};
|
||||
|
||||
|
@ -754,13 +755,12 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
|||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
|
||||
shutdown(Reason, Channel);
|
||||
|
||||
handle_info({sock_closed, Reason},
|
||||
Channel = #channel{conn_state = connected,
|
||||
handle_info({sock_closed, Reason}, Channel =
|
||||
#channel{conn_state = connected,
|
||||
clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||
emqx_zone:enable_flapping_detect(Zone)
|
||||
andalso emqx_flapping:detect(ClientInfo),
|
||||
Channel1 = ensure_disconnected(
|
||||
mabye_publish_will_msg(Channel)),
|
||||
Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
|
||||
case maybe_shutdown(Reason, Channel1) of
|
||||
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
||||
Shutdown -> Shutdown
|
||||
|
@ -786,9 +786,11 @@ handle_info(Info, Channel) ->
|
|||
-> {ok, channel()}
|
||||
| {ok, replies(), channel()}
|
||||
| {shutdown, Reason :: term(), channel()}).
|
||||
handle_timeout(TRef, {keepalive, StatVal},
|
||||
Channel = #channel{keepalive = Keepalive,
|
||||
timers = #{alive_timer := TRef}}) ->
|
||||
handle_timeout(_TRef, {keepalive, _StatVal},
|
||||
Channel = #channel{keepalive = undefined}) ->
|
||||
{ok, Channel};
|
||||
handle_timeout(_TRef, {keepalive, StatVal},
|
||||
Channel = #channel{keepalive = Keepalive}) ->
|
||||
case emqx_keepalive:check(StatVal, Keepalive) of
|
||||
{ok, NKeepalive} ->
|
||||
NChannel = Channel#channel{keepalive = NKeepalive},
|
||||
|
@ -797,9 +799,8 @@ handle_timeout(TRef, {keepalive, StatVal},
|
|||
handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
|
||||
end;
|
||||
|
||||
handle_timeout(TRef, retry_delivery,
|
||||
Channel = #channel{session = Session,
|
||||
timers = #{retry_timer := TRef}}) ->
|
||||
handle_timeout(_TRef, retry_delivery,
|
||||
Channel = #channel{session = Session}) ->
|
||||
case emqx_session:retry(Session) of
|
||||
{ok, NSession} ->
|
||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
||||
|
@ -811,9 +812,8 @@ handle_timeout(TRef, retry_delivery,
|
|||
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
||||
end;
|
||||
|
||||
handle_timeout(TRef, expire_awaiting_rel,
|
||||
Channel = #channel{session = Session,
|
||||
timers = #{await_timer := TRef}}) ->
|
||||
handle_timeout(_TRef, expire_awaiting_rel,
|
||||
Channel = #channel{session = Session}) ->
|
||||
case emqx_session:expire(awaiting_rel, Session) of
|
||||
{ok, Session} ->
|
||||
{ok, clean_timer(await_timer, Channel#channel{session = Session})};
|
||||
|
@ -821,11 +821,10 @@ handle_timeout(TRef, expire_awaiting_rel,
|
|||
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
|
||||
end;
|
||||
|
||||
handle_timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) ->
|
||||
handle_timeout(_TRef, expire_session, Channel) ->
|
||||
shutdown(expired, Channel);
|
||||
|
||||
handle_timeout(TRef, will_message, Channel = #channel{will_msg = WillMsg,
|
||||
timers = #{will_timer := TRef}}) ->
|
||||
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
||||
|
||||
|
@ -879,19 +878,19 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
|
|||
%% Terminate
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
terminate(_, #channel{conn_state = idle}) ->
|
||||
ok;
|
||||
terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
|
||||
terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, clientinfo = ClientInfo})
|
||||
when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered ->
|
||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]);
|
||||
terminate(Reason, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, will_msg = WillMsg}) ->
|
||||
case WillMsg of
|
||||
undefined -> ok;
|
||||
_ -> publish_will_msg(WillMsg)
|
||||
end,
|
||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
|
||||
terminate(_, #channel{conn_state = idle}) -> ok;
|
||||
terminate(normal, Channel) ->
|
||||
run_terminate_hook(normal, Channel);
|
||||
terminate({shutdown, Reason}, Channel)
|
||||
when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered ->
|
||||
run_terminate_hook(Reason, Channel);
|
||||
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||
run_terminate_hook(Reason, Channel).
|
||||
|
||||
run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
|
||||
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||
emqx_session:terminate(ClientInfo, Reason, Session).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
|
@ -940,6 +939,15 @@ expiry_interval(_ClientInfo, #mqtt_packet_connect{clean_start = true}) ->
|
|||
receive_maximum(#{zone := Zone}, ConnProps) ->
|
||||
emqx_mqtt_props:get('Receive-Maximum', ConnProps, emqx_zone:max_inflight(Zone)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Run Connect Hooks
|
||||
|
||||
run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
|
||||
case run_hooks('client.connect', [ConnInfo], ConnPkt) of
|
||||
Error = {error, _Reason} -> Error;
|
||||
NConnPkt -> {ok, NConnPkt, Channel}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Check Connect Packet
|
||||
|
||||
|
@ -987,9 +995,9 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) ->
|
|||
{ok, ClientInfo#{mountpoint := MountPoint1}}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Set logger metadata
|
||||
%% Set log metadata
|
||||
|
||||
set_logger_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||
set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||
emqx_logger:set_metadata_clientid(ClientId).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -1172,6 +1180,19 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
|||
_Origin -> AckProps
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure connected
|
||||
|
||||
ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo}) ->
|
||||
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
||||
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
||||
Channel#channel{conninfo = NConnInfo,
|
||||
conn_state = connected,
|
||||
will_msg = emqx_packet:will_msg(ConnPkt),
|
||||
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Init Alias Maximum
|
||||
|
||||
|
@ -1183,20 +1204,6 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
|||
};
|
||||
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure connected
|
||||
|
||||
ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo}) ->
|
||||
WillMsg = emqx_packet:will_msg(ConnPkt),
|
||||
AliasMaximum = init_alias_maximum(ConnPkt, ClientInfo),
|
||||
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
||||
Channel#channel{conninfo = NConnInfo,
|
||||
will_msg = WillMsg,
|
||||
conn_state = connected,
|
||||
alias_maximum = AliasMaximum
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Enrich Keepalive
|
||||
|
||||
|
@ -1255,8 +1262,10 @@ parse_topic_filters(TopicFilters) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Ensure disconnected
|
||||
|
||||
ensure_disconnected(Channel = #channel{conninfo = ConnInfo}) ->
|
||||
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo}) ->
|
||||
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)},
|
||||
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
||||
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -1286,6 +1295,13 @@ disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode).
|
|||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-compile({inline, [run_hooks/2, run_hooks/3]}).
|
||||
run_hooks(Name, Args) ->
|
||||
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
|
||||
|
||||
run_hooks(Name, Args, Acc) ->
|
||||
ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
|
||||
|
||||
-compile({inline, [find_alias/2, save_alias/3]}).
|
||||
|
||||
find_alias(_AliasId, undefined) -> false;
|
||||
|
|
|
@ -206,7 +206,7 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
|
|||
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||
CleanStart = fun(_) ->
|
||||
ok = discard_session(ClientId),
|
||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
||||
Session = create_session(ClientInfo, ConnInfo),
|
||||
{ok, #{session => Session, present => false}}
|
||||
end,
|
||||
emqx_cm_locker:trans(ClientId, CleanStart);
|
||||
|
@ -215,18 +215,24 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
|||
ResumeStart = fun(_) ->
|
||||
case takeover_session(ClientId) of
|
||||
{ok, ConnMod, ChanPid, Session} ->
|
||||
ok = emqx_session:resume(ClientId, Session),
|
||||
ok = emqx_session:resume(ClientInfo, Session),
|
||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
|
||||
{ok, #{session => Session,
|
||||
present => true,
|
||||
pendings => Pendings}};
|
||||
{error, not_found} ->
|
||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
||||
Session = create_session(ClientInfo, ConnInfo),
|
||||
{ok, #{session => Session, present => false}}
|
||||
end
|
||||
end,
|
||||
emqx_cm_locker:trans(ClientId, ResumeStart).
|
||||
|
||||
create_session(ClientInfo, ConnInfo) ->
|
||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
||||
ok = emqx_metrics:inc('session.created'),
|
||||
ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
|
||||
Session.
|
||||
|
||||
%% @doc Try to takeover a session.
|
||||
-spec(takeover_session(emqx_types:clientid())
|
||||
-> {ok, emqx_session:session()} | {error, Reason :: term()}).
|
||||
|
|
|
@ -459,17 +459,17 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Handle timeout
|
||||
|
||||
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
|
||||
handle_timeout(_TRef, idle_timeout, State) ->
|
||||
shutdown(idle_timeout, State);
|
||||
|
||||
handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) ->
|
||||
handle_timeout(_TRef, limit_timeout, State) ->
|
||||
NState = State#state{sockstate = idle,
|
||||
limit_timer = undefined
|
||||
},
|
||||
handle_info(activate_socket, NState);
|
||||
|
||||
handle_timeout(TRef, emit_stats, State =
|
||||
#state{stats_timer = TRef, channel = Channel}) ->
|
||||
handle_timeout(_TRef, emit_stats, State =
|
||||
#state{channel = Channel}) ->
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
{ok, State#state{stats_timer = undefined}};
|
||||
|
|
|
@ -157,7 +157,9 @@
|
|||
|
||||
%% Client Lifecircle metrics
|
||||
-define(CLIENT_METRICS,
|
||||
[{counter, 'client.connected'},
|
||||
[{counter, 'client.connect'},
|
||||
{counter, 'client.connack'},
|
||||
{counter, 'client.connected'},
|
||||
{counter, 'client.authenticate'},
|
||||
{counter, 'client.auth.anonymous'},
|
||||
{counter, 'client.check_acl'},
|
||||
|
@ -512,13 +514,15 @@ reserved_idx('delivery.dropped.qos0_msg') -> 121;
|
|||
reserved_idx('delivery.dropped.queue_full') -> 122;
|
||||
reserved_idx('delivery.dropped.expired') -> 123;
|
||||
|
||||
reserved_idx('client.connected') -> 200;
|
||||
reserved_idx('client.authenticate') -> 201;
|
||||
reserved_idx('client.auth.anonymous') -> 202;
|
||||
reserved_idx('client.check_acl') -> 203;
|
||||
reserved_idx('client.subscribe') -> 204;
|
||||
reserved_idx('client.unsubscribe') -> 205;
|
||||
reserved_idx('client.disconnected') -> 206;
|
||||
reserved_idx('client.connect') -> 200;
|
||||
reserved_idx('client.connack') -> 201;
|
||||
reserved_idx('client.connected') -> 202;
|
||||
reserved_idx('client.authenticate') -> 203;
|
||||
reserved_idx('client.auth.anonymous') -> 204;
|
||||
reserved_idx('client.check_acl') -> 205;
|
||||
reserved_idx('client.subscribe') -> 206;
|
||||
reserved_idx('client.unsubscribe') -> 207;
|
||||
reserved_idx('client.disconnected') -> 208;
|
||||
|
||||
reserved_idx('session.created') -> 220;
|
||||
reserved_idx('session.resumed') -> 221;
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
, unload/1
|
||||
]).
|
||||
|
||||
-export([ on_client_connected/4
|
||||
-export([ on_client_connected/3
|
||||
, on_client_disconnected/4
|
||||
]).
|
||||
|
||||
|
@ -44,26 +44,12 @@ unload(_Env) ->
|
|||
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
|
||||
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
|
||||
|
||||
on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
|
||||
#{peerhost := PeerHost} = ClientInfo,
|
||||
#{clean_start := CleanStart,
|
||||
proto_name := ProtoName,
|
||||
proto_ver := ProtoVer,
|
||||
keepalive := Keepalive,
|
||||
expiry_interval := ExpiryInterval} = ConnInfo,
|
||||
ClientId = clientid(ClientInfo, ConnInfo),
|
||||
Username = username(ClientInfo, ConnInfo),
|
||||
Presence = #{clientid => ClientId,
|
||||
username => Username,
|
||||
ipaddress => ntoa(PeerHost),
|
||||
proto_name => ProtoName,
|
||||
proto_ver => ProtoVer,
|
||||
keepalive => Keepalive,
|
||||
connack => ConnAck,
|
||||
clean_start => CleanStart,
|
||||
expiry_interval => ExpiryInterval,
|
||||
ts => erlang:system_time(millisecond)
|
||||
},
|
||||
%%--------------------------------------------------------------------
|
||||
%% Callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
|
||||
Presence = connected_presence(ClientInfo, ConnInfo),
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
{ok, Payload} ->
|
||||
emqx_broker:safe_publish(
|
||||
|
@ -72,12 +58,12 @@ on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
|
|||
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
|
||||
end.
|
||||
|
||||
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
||||
ClientId = clientid(ClientInfo, ConnInfo),
|
||||
Username = username(ClientInfo, ConnInfo),
|
||||
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
|
||||
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
|
||||
Presence = #{clientid => ClientId,
|
||||
username => Username,
|
||||
reason => reason(Reason),
|
||||
disconnected_at => DisconnectedAt,
|
||||
ts => erlang:system_time(millisecond)
|
||||
},
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
|
@ -88,11 +74,35 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
|||
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
|
||||
end.
|
||||
|
||||
clientid(#{clientid := undefined}, #{clientid := ClientId}) -> ClientId;
|
||||
clientid(#{clientid := ClientId}, _ConnInfo) -> ClientId.
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
username(#{username := undefined}, #{username := Username}) -> Username;
|
||||
username(#{username := Username}, _ConnInfo) -> Username.
|
||||
connected_presence(#{peerhost := PeerHost,
|
||||
sockport := SockPort,
|
||||
clientid := ClientId,
|
||||
username := Username
|
||||
},
|
||||
#{clean_start := CleanStart,
|
||||
proto_name := ProtoName,
|
||||
proto_ver := ProtoVer,
|
||||
keepalive := Keepalive,
|
||||
connected_at := ConnectedAt,
|
||||
expiry_interval := ExpiryInterval
|
||||
}) ->
|
||||
#{clientid => ClientId,
|
||||
username => Username,
|
||||
ipaddress => ntoa(PeerHost),
|
||||
sockport => SockPort,
|
||||
proto_name => ProtoName,
|
||||
proto_ver => ProtoVer,
|
||||
keepalive => Keepalive,
|
||||
connack => 0, %% Deprecated?
|
||||
clean_start => CleanStart,
|
||||
expiry_interval => ExpiryInterval,
|
||||
connected_at => ConnectedAt,
|
||||
ts => erlang:system_time(millisecond)
|
||||
}.
|
||||
|
||||
make_msg(QoS, Topic, Payload) ->
|
||||
emqx_message:set_flag(
|
||||
|
@ -106,6 +116,7 @@ topic(disconnected, ClientId) ->
|
|||
|
||||
qos(Env) -> proplists:get_value(qos, Env, 0).
|
||||
|
||||
-compile({inline, [reason/1]}).
|
||||
reason(Reason) when is_atom(Reason) -> Reason;
|
||||
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
|
||||
reason({Error, _}) when is_atom(Error) -> Error;
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
]).
|
||||
|
||||
%% APIs
|
||||
-export([on_client_connected/4]).
|
||||
-export([on_client_connected/3]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Load/Unload Hook
|
||||
|
@ -36,8 +36,7 @@
|
|||
load(Topics) ->
|
||||
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
|
||||
|
||||
on_client_connected(#{clientid := ClientId,
|
||||
username := Username}, ?RC_SUCCESS, _ConnInfo, Topics) ->
|
||||
on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo, Topics) ->
|
||||
Replace = fun(Topic) ->
|
||||
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
|
||||
end,
|
||||
|
@ -47,9 +46,9 @@ on_client_connected(#{clientid := ClientId,
|
|||
unload(_) ->
|
||||
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
rep(<<"%c">>, ClientId, Topic) ->
|
||||
emqx_topic:feed_var(<<"%c">>, ClientId, Topic);
|
||||
|
|
|
@ -27,8 +27,10 @@
|
|||
, retain/1
|
||||
]).
|
||||
|
||||
%% Field APIs
|
||||
-export([ proto_name/1
|
||||
, proto_ver/1
|
||||
, info/2
|
||||
]).
|
||||
|
||||
%% Check API
|
||||
|
@ -95,6 +97,100 @@ proto_ver(?CONNECT_PACKET(ConnPkt)) ->
|
|||
proto_ver(#mqtt_packet_connect{proto_ver = Ver}) ->
|
||||
Ver.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Field Info
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
info(proto_name, #mqtt_packet_connect{proto_name = Name}) ->
|
||||
Name;
|
||||
info(proto_ver, #mqtt_packet_connect{proto_ver = Ver}) ->
|
||||
Ver;
|
||||
info(is_bridge, #mqtt_packet_connect{is_bridge = IsBridge}) ->
|
||||
IsBridge;
|
||||
info(clean_start, #mqtt_packet_connect{clean_start = CleanStart}) ->
|
||||
CleanStart;
|
||||
info(will_flag, #mqtt_packet_connect{will_flag = WillFlag}) ->
|
||||
WillFlag;
|
||||
info(will_qos, #mqtt_packet_connect{will_qos = WillQoS}) ->
|
||||
WillQoS;
|
||||
info(will_retain, #mqtt_packet_connect{will_retain = WillRetain}) ->
|
||||
WillRetain;
|
||||
info(keepalive, #mqtt_packet_connect{keepalive = KeepAlive}) ->
|
||||
KeepAlive;
|
||||
info(properties, #mqtt_packet_connect{properties = Props}) ->
|
||||
Props;
|
||||
info(clientid, #mqtt_packet_connect{clientid = ClientId}) ->
|
||||
ClientId;
|
||||
info(will_props, #mqtt_packet_connect{will_props = WillProps}) ->
|
||||
WillProps;
|
||||
info(will_topic, #mqtt_packet_connect{will_topic = WillTopic}) ->
|
||||
WillTopic;
|
||||
info(will_payload, #mqtt_packet_connect{will_payload = Payload}) ->
|
||||
Payload;
|
||||
info(username, #mqtt_packet_connect{username = Username}) ->
|
||||
Username;
|
||||
info(password, #mqtt_packet_connect{password = Password}) ->
|
||||
Password;
|
||||
|
||||
info(ack_flags, #mqtt_packet_connack{ack_flags = Flags}) ->
|
||||
Flags;
|
||||
info(reason_code, #mqtt_packet_connack{reason_code = RC}) ->
|
||||
RC;
|
||||
info(properties, #mqtt_packet_connack{properties = Props}) ->
|
||||
Props;
|
||||
|
||||
info(topic_name, #mqtt_packet_publish{topic_name = Topic}) ->
|
||||
Topic;
|
||||
info(packet_id, #mqtt_packet_publish{packet_id = PacketId}) ->
|
||||
PacketId;
|
||||
info(properties, #mqtt_packet_publish{properties = Props}) ->
|
||||
Props;
|
||||
|
||||
info(packet_id, #mqtt_packet_puback{packet_id = PacketId}) ->
|
||||
PacketId;
|
||||
info(reason_code, #mqtt_packet_puback{reason_code = RC}) ->
|
||||
RC;
|
||||
info(properties, #mqtt_packet_puback{properties = Props}) ->
|
||||
Props;
|
||||
|
||||
info(packet_id, #mqtt_packet_subscribe{packet_id = PacketId}) ->
|
||||
PacketId;
|
||||
info(properties, #mqtt_packet_subscribe{properties = Props}) ->
|
||||
Props;
|
||||
info(topic_filters, #mqtt_packet_subscribe{topic_filters = Topics}) ->
|
||||
Topics;
|
||||
|
||||
info(packet_id, #mqtt_packet_suback{packet_id = PacketId}) ->
|
||||
PacketId;
|
||||
info(properties, #mqtt_packet_suback{properties = Props}) ->
|
||||
Props;
|
||||
info(reason_codes, #mqtt_packet_suback{reason_codes = RCs}) ->
|
||||
RCs;
|
||||
|
||||
info(packet_id, #mqtt_packet_unsubscribe{packet_id = PacketId}) ->
|
||||
PacketId;
|
||||
info(properties, #mqtt_packet_unsubscribe{properties = Props}) ->
|
||||
Props;
|
||||
info(topic_filters, #mqtt_packet_unsubscribe{topic_filters = Topics}) ->
|
||||
Topics;
|
||||
|
||||
info(packet_id, #mqtt_packet_unsuback{packet_id = PacketId}) ->
|
||||
PacketId;
|
||||
info(properties, #mqtt_packet_unsuback{properties = Props}) ->
|
||||
Props;
|
||||
info(reason_codes, #mqtt_packet_unsuback{reason_codes = RCs}) ->
|
||||
RCs;
|
||||
|
||||
info(reason_code, #mqtt_packet_disconnect{reason_code = RC}) ->
|
||||
RC;
|
||||
info(properties, #mqtt_packet_disconnect{properties = Props}) ->
|
||||
Props;
|
||||
|
||||
info(reason_code, #mqtt_packet_auth{reason_code = RC}) ->
|
||||
RC;
|
||||
info(properties, #mqtt_packet_auth{properties = Props}) ->
|
||||
Props.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Check MQTT Packet
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -76,6 +76,7 @@
|
|||
-export([ deliver/2
|
||||
, enqueue/2
|
||||
, retry/1
|
||||
, terminate/3
|
||||
]).
|
||||
|
||||
-export([ takeover/1
|
||||
|
@ -604,11 +605,13 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
|
|||
takeover(#session{subscriptions = Subs}) ->
|
||||
lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
|
||||
|
||||
-spec(resume(emqx_types:clientid(), session()) -> ok).
|
||||
resume(ClientId, #session{subscriptions = Subs}) ->
|
||||
-spec(resume(emqx_types:clientinfo(), session()) -> ok).
|
||||
resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = Subs}) ->
|
||||
lists:foreach(fun({TopicFilter, SubOpts}) ->
|
||||
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
|
||||
end, maps:to_list(Subs)).
|
||||
end, maps:to_list(Subs)),
|
||||
ok = emqx_metrics:inc('session.resumed'),
|
||||
emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
|
||||
|
||||
-spec(replay(session()) -> {ok, replies(), session()}).
|
||||
replay(Session = #session{inflight = Inflight}) ->
|
||||
|
@ -626,6 +629,18 @@ replay(Inflight) ->
|
|||
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
||||
end, emqx_inflight:to_list(Inflight)).
|
||||
|
||||
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
|
||||
terminate(ClientInfo, discarded, Session) ->
|
||||
run_hook('session.discarded', [ClientInfo, info(Session)]);
|
||||
terminate(ClientInfo, takeovered, Session) ->
|
||||
run_hook('session.takeovered', [ClientInfo, info(Session)]);
|
||||
terminate(ClientInfo, Reason, Session) ->
|
||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||
|
||||
-compile({inline, [run_hook/2]}).
|
||||
run_hook(Name, Args) ->
|
||||
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Inc message/delivery expired counter
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -44,6 +44,10 @@ init_per_suite(Config) ->
|
|||
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
|
||||
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
|
||||
%% Meck Hooks
|
||||
ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
|
||||
ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
|
||||
ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
@ -53,6 +57,7 @@ end_per_suite(_Config) ->
|
|||
ok = meck:unload(emqx_limiter),
|
||||
ok = meck:unload(emqx_pd),
|
||||
ok = meck:unload(emqx_metrics),
|
||||
ok = meck:unload(emqx_hooks),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
|
|
|
@ -78,6 +78,85 @@ t_proto_ver(_) ->
|
|||
?assertEqual(Ver, emqx_packet:proto_ver(ConnPkt))
|
||||
end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]).
|
||||
|
||||
t_connect_info(_) ->
|
||||
ConnPkt = #mqtt_packet_connect{will_flag = true,
|
||||
clientid = <<"clientid">>,
|
||||
username = <<"username">>,
|
||||
will_retain = true,
|
||||
will_qos = ?QOS_2,
|
||||
will_topic = <<"topic">>,
|
||||
will_props = undefined,
|
||||
will_payload = <<"payload">>
|
||||
},
|
||||
?assertEqual(<<"MQTT">>, emqx_packet:info(proto_name, ConnPkt)),
|
||||
?assertEqual(4, emqx_packet:info(proto_ver, ConnPkt)),
|
||||
?assertEqual(false, emqx_packet:info(is_bridge, ConnPkt)),
|
||||
?assertEqual(true, emqx_packet:info(clean_start, ConnPkt)),
|
||||
?assertEqual(true, emqx_packet:info(will_flag, ConnPkt)),
|
||||
?assertEqual(?QOS_2, emqx_packet:info(will_qos, ConnPkt)),
|
||||
?assertEqual(true, emqx_packet:info(will_retain, ConnPkt)),
|
||||
?assertEqual(0, emqx_packet:info(keepalive, ConnPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, ConnPkt)),
|
||||
?assertEqual(<<"clientid">>, emqx_packet:info(clientid, ConnPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(will_props, ConnPkt)),
|
||||
?assertEqual(<<"topic">>, emqx_packet:info(will_topic, ConnPkt)),
|
||||
?assertEqual(<<"payload">>, emqx_packet:info(will_payload, ConnPkt)),
|
||||
?assertEqual(<<"username">>, emqx_packet:info(username, ConnPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(password, ConnPkt)).
|
||||
|
||||
t_connack_info(_) ->
|
||||
AckPkt = #mqtt_packet_connack{ack_flags = 0, reason_code = 0},
|
||||
?assertEqual(0, emqx_packet:info(ack_flags, AckPkt)),
|
||||
?assertEqual(0, emqx_packet:info(reason_code, AckPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, AckPkt)).
|
||||
|
||||
t_publish_info(_) ->
|
||||
PubPkt = #mqtt_packet_publish{topic_name = <<"t">>, packet_id = 1},
|
||||
?assertEqual(1, emqx_packet:info(packet_id, PubPkt)),
|
||||
?assertEqual(<<"t">>, emqx_packet:info(topic_name, PubPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, PubPkt)).
|
||||
|
||||
t_puback_info(_) ->
|
||||
AckPkt = #mqtt_packet_puback{packet_id = 1, reason_code = 0},
|
||||
?assertEqual(1, emqx_packet:info(packet_id, AckPkt)),
|
||||
?assertEqual(0, emqx_packet:info(reason_code, AckPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, AckPkt)).
|
||||
|
||||
t_subscribe_info(_) ->
|
||||
TopicFilters = [{<<"t/#">>, #{}}],
|
||||
SubPkt = #mqtt_packet_subscribe{packet_id = 1, topic_filters = TopicFilters},
|
||||
?assertEqual(1, emqx_packet:info(packet_id, SubPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, SubPkt)),
|
||||
?assertEqual(TopicFilters, emqx_packet:info(topic_filters, SubPkt)).
|
||||
|
||||
t_suback_info(_) ->
|
||||
SubackPkt = #mqtt_packet_suback{packet_id = 1, reason_codes = [0]},
|
||||
?assertEqual(1, emqx_packet:info(packet_id, SubackPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, SubackPkt)),
|
||||
?assertEqual([0], emqx_packet:info(reason_codes, SubackPkt)).
|
||||
|
||||
t_unsubscribe_info(_) ->
|
||||
UnsubPkt = #mqtt_packet_unsubscribe{packet_id = 1, topic_filters = [<<"t/#">>]},
|
||||
?assertEqual(1, emqx_packet:info(packet_id, UnsubPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, UnsubPkt)),
|
||||
?assertEqual([<<"t/#">>], emqx_packet:info(topic_filters, UnsubPkt)).
|
||||
|
||||
t_unsuback_info(_) ->
|
||||
AckPkt = #mqtt_packet_unsuback{packet_id = 1, reason_codes = [0]},
|
||||
?assertEqual(1, emqx_packet:info(packet_id, AckPkt)),
|
||||
?assertEqual([0], emqx_packet:info(reason_codes, AckPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, AckPkt)).
|
||||
|
||||
t_disconnect_info(_) ->
|
||||
DisconnPkt = #mqtt_packet_disconnect{reason_code = 0},
|
||||
?assertEqual(0, emqx_packet:info(reason_code, DisconnPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, DisconnPkt)).
|
||||
|
||||
t_auth_info(_) ->
|
||||
AuthPkt = #mqtt_packet_auth{reason_code = 0},
|
||||
?assertEqual(0, emqx_packet:info(reason_code, AuthPkt)),
|
||||
?assertEqual(undefined, emqx_packet:info(properties, AuthPkt)).
|
||||
|
||||
t_check_publish(_) ->
|
||||
Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
|
||||
ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),
|
||||
|
|
|
@ -30,7 +30,6 @@ all() -> emqx_ct:all(?MODULE).
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
init_per_suite(Config) ->
|
||||
%% Broker
|
||||
ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker],
|
||||
[passthrough, no_history, no_link]),
|
||||
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||
|
@ -329,7 +328,7 @@ t_takeover(_) ->
|
|||
t_resume(_) ->
|
||||
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
|
||||
Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
|
||||
ok = emqx_session:resume(<<"clientid">>, Session).
|
||||
ok = emqx_session:resume(#{clientid => <<"clientid">>}, Session).
|
||||
|
||||
t_replay(_) ->
|
||||
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
||||
|
|
Loading…
Reference in New Issue