Breaking Change: Add new hooks for client and session lifecircle (#3138)
This commit is contained in:
parent
d000284a40
commit
b7ca3905a6
|
@ -24,6 +24,8 @@
|
||||||
, reload_acl/0
|
, reload_acl/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-import(emqx_hooks, [run_fold/3]).
|
||||||
|
|
||||||
-type(result() :: #{auth_result := emqx_types:auth_result(),
|
-type(result() :: #{auth_result := emqx_types:auth_result(),
|
||||||
anonymous := boolean()
|
anonymous := boolean()
|
||||||
}).
|
}).
|
||||||
|
@ -34,9 +36,9 @@
|
||||||
|
|
||||||
-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
|
-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
|
||||||
authenticate(ClientInfo = #{zone := Zone}) ->
|
authenticate(ClientInfo = #{zone := Zone}) ->
|
||||||
case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
|
case run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
|
||||||
Result = #{auth_result := success} ->
|
Result = #{auth_result := success} ->
|
||||||
{ok, Result};
|
{ok, Result};
|
||||||
Result ->
|
Result ->
|
||||||
{error, maps:get(auth_result, Result, unknown_error)}
|
{error, maps:get(auth_result, Result, unknown_error)}
|
||||||
end.
|
end.
|
||||||
|
@ -61,7 +63,7 @@ check_acl_cache(ClientInfo, PubSub, Topic) ->
|
||||||
|
|
||||||
do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||||
Default = emqx_zone:get_env(Zone, acl_nomatch, deny),
|
Default = emqx_zone:get_env(Zone, acl_nomatch, deny),
|
||||||
case emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
|
case run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
|
||||||
allow -> allow;
|
allow -> allow;
|
||||||
_Other -> deny
|
_Other -> deny
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -230,7 +230,7 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
|
||||||
-spec(route([emqx_types:route_entry()], emqx_types:delivery())
|
-spec(route([emqx_types:route_entry()], emqx_types:delivery())
|
||||||
-> emqx_types:publish_result()).
|
-> emqx_types:publish_result()).
|
||||||
route([], #delivery{message = Msg}) ->
|
route([], #delivery{message = Msg}) ->
|
||||||
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
||||||
ok = inc_dropped_cnt(Msg),
|
ok = inc_dropped_cnt(Msg),
|
||||||
[];
|
[];
|
||||||
|
|
||||||
|
@ -282,8 +282,8 @@ forward(Node, To, Delivery, sync) ->
|
||||||
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
||||||
dispatch(Topic, #delivery{message = Msg}) ->
|
dispatch(Topic, #delivery{message = Msg}) ->
|
||||||
case subscribers(Topic) of
|
case subscribers(Topic) of
|
||||||
[] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
[] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
||||||
_ = inc_dropped_cnt(Topic),
|
ok = inc_dropped_cnt(Topic),
|
||||||
{error, no_subscribers};
|
{error, no_subscribers};
|
||||||
[Sub] -> %% optimize?
|
[Sub] -> %% optimize?
|
||||||
dispatch(Sub, Topic, Msg);
|
dispatch(Sub, Topic, Msg);
|
||||||
|
|
|
@ -45,7 +45,7 @@
|
||||||
, terminate/2
|
, terminate/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% export for ct
|
%% Exports for CT
|
||||||
-export([set_field/3]).
|
-export([set_field/3]).
|
||||||
|
|
||||||
-import(emqx_misc,
|
-import(emqx_misc,
|
||||||
|
@ -204,9 +204,10 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) ->
|
||||||
|
|
||||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
case pipeline([fun enrich_conninfo/2,
|
case pipeline([fun enrich_conninfo/2,
|
||||||
|
fun run_conn_hooks/2,
|
||||||
fun check_connect/2,
|
fun check_connect/2,
|
||||||
fun enrich_client/2,
|
fun enrich_client/2,
|
||||||
fun set_logger_meta/2,
|
fun set_log_meta/2,
|
||||||
fun check_banned/2,
|
fun check_banned/2,
|
||||||
fun auth_connect/2
|
fun auth_connect/2
|
||||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||||
|
@ -549,26 +550,25 @@ not_nacked({deliver, _Topic, Msg}) ->
|
||||||
| {ok, replies(), channel()}
|
| {ok, replies(), channel()}
|
||||||
| {shutdown, Reason :: term(), channel()}
|
| {shutdown, Reason :: term(), channel()}
|
||||||
| {shutdown, Reason :: term(), replies(), channel()}).
|
| {shutdown, Reason :: term(), replies(), channel()}).
|
||||||
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
|
handle_out(connack, {RC = ?RC_SUCCESS, SP, ConnPkt},
|
||||||
|
Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
AckProps = run_fold([fun enrich_connack_caps/2,
|
AckProps = run_fold([fun enrich_connack_caps/2,
|
||||||
fun enrich_server_keepalive/2,
|
fun enrich_server_keepalive/2,
|
||||||
fun enrich_assigned_clientid/2
|
fun enrich_assigned_clientid/2
|
||||||
], #{}, Channel),
|
], #{}, Channel),
|
||||||
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
|
AckPacket = ?CONNACK_PACKET(RC, SP, AckProps),
|
||||||
return_connack(AckPacket,
|
AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket),
|
||||||
|
return_connack(AckPacket1,
|
||||||
ensure_keepalive(AckProps,
|
ensure_keepalive(AckProps,
|
||||||
ensure_connected(ConnPkt, Channel)));
|
ensure_connected(ConnPkt, Channel)));
|
||||||
|
|
||||||
handle_out(connack, {ReasonCode, _ConnPkt},
|
handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
Channel = #channel{conninfo = ConnInfo,
|
AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
|
||||||
clientinfo = ClientInfo}) ->
|
?MQTT_PROTO_V5 -> ReasonCode;
|
||||||
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
|
_ -> emqx_reason_codes:compat(connack, ReasonCode)
|
||||||
AckPacket = ?CONNACK_PACKET(
|
end),
|
||||||
case maps:get(proto_ver, ConnInfo) of
|
AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket),
|
||||||
?MQTT_PROTO_V5 -> ReasonCode;
|
shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel);
|
||||||
_Other -> emqx_reason_codes:compat(connack, ReasonCode)
|
|
||||||
end),
|
|
||||||
shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);
|
|
||||||
|
|
||||||
%% Optimize?
|
%% Optimize?
|
||||||
handle_out(publish, [], Channel) ->
|
handle_out(publish, [], Channel) ->
|
||||||
|
@ -625,10 +625,7 @@ handle_out(Type, Data, Channel) ->
|
||||||
%% Return ConnAck
|
%% Return ConnAck
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
return_connack(AckPacket, Channel = #channel{conninfo = ConnInfo,
|
return_connack(AckPacket, Channel) ->
|
||||||
clientinfo = ClientInfo
|
|
||||||
}) ->
|
|
||||||
ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
|
|
||||||
Replies = [{event, connected}, {connack, AckPacket}],
|
Replies = [{event, connected}, {connack, AckPacket}],
|
||||||
case maybe_resume_session(Channel) of
|
case maybe_resume_session(Channel) of
|
||||||
ignore -> {ok, Replies, Channel};
|
ignore -> {ok, Replies, Channel};
|
||||||
|
@ -754,13 +751,12 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
||||||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
|
||||||
shutdown(Reason, Channel);
|
shutdown(Reason, Channel);
|
||||||
|
|
||||||
handle_info({sock_closed, Reason},
|
handle_info({sock_closed, Reason}, Channel =
|
||||||
Channel = #channel{conn_state = connected,
|
#channel{conn_state = connected,
|
||||||
clientinfo = ClientInfo = #{zone := Zone}}) ->
|
clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||||
emqx_zone:enable_flapping_detect(Zone)
|
emqx_zone:enable_flapping_detect(Zone)
|
||||||
andalso emqx_flapping:detect(ClientInfo),
|
andalso emqx_flapping:detect(ClientInfo),
|
||||||
Channel1 = ensure_disconnected(
|
Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
|
||||||
mabye_publish_will_msg(Channel)),
|
|
||||||
case maybe_shutdown(Reason, Channel1) of
|
case maybe_shutdown(Reason, Channel1) of
|
||||||
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
||||||
Shutdown -> Shutdown
|
Shutdown -> Shutdown
|
||||||
|
@ -879,19 +875,19 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
|
||||||
%% Terminate
|
%% Terminate
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
terminate(_, #channel{conn_state = idle}) ->
|
terminate(_, #channel{conn_state = idle}) -> ok;
|
||||||
ok;
|
terminate(normal, Channel) ->
|
||||||
terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
run_terminate_hook(normal, Channel);
|
||||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
|
terminate({shutdown, Reason}, Channel)
|
||||||
terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, clientinfo = ClientInfo})
|
when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered ->
|
||||||
when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered ->
|
run_terminate_hook(Reason, Channel);
|
||||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]);
|
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
|
||||||
terminate(Reason, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, will_msg = WillMsg}) ->
|
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||||
case WillMsg of
|
run_terminate_hook(Reason, Channel).
|
||||||
undefined -> ok;
|
|
||||||
_ -> publish_will_msg(WillMsg)
|
run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
|
||||||
end,
|
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
|
emqx_session:terminate(ClientInfo, Reason, Session).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
@ -940,6 +936,15 @@ expiry_interval(_ClientInfo, #mqtt_packet_connect{clean_start = true}) ->
|
||||||
receive_maximum(#{zone := Zone}, ConnProps) ->
|
receive_maximum(#{zone := Zone}, ConnProps) ->
|
||||||
emqx_mqtt_props:get('Receive-Maximum', ConnProps, emqx_zone:max_inflight(Zone)).
|
emqx_mqtt_props:get('Receive-Maximum', ConnProps, emqx_zone:max_inflight(Zone)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Run Connect Hooks
|
||||||
|
|
||||||
|
run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
|
case emqx_hooks:run_fold('client.connect', [ConnInfo], ConnPkt) of
|
||||||
|
Error = {error, _Reason} -> Error;
|
||||||
|
NConnPkt -> {ok, NConnPkt, Channel}
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Check Connect Packet
|
%% Check Connect Packet
|
||||||
|
|
||||||
|
@ -987,9 +992,9 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) ->
|
||||||
{ok, ClientInfo#{mountpoint := MountPoint1}}.
|
{ok, ClientInfo#{mountpoint := MountPoint1}}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Set logger metadata
|
%% Set log metadata
|
||||||
|
|
||||||
set_logger_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
|
set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||||
emqx_logger:set_metadata_clientid(ClientId).
|
emqx_logger:set_metadata_clientid(ClientId).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1172,6 +1177,19 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
||||||
_Origin -> AckProps
|
_Origin -> AckProps
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Ensure connected
|
||||||
|
|
||||||
|
ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
|
||||||
|
clientinfo = ClientInfo}) ->
|
||||||
|
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
||||||
|
ok = emqx_hooks:run('client.connected', [ClientInfo, NConnInfo]),
|
||||||
|
Channel#channel{conninfo = NConnInfo,
|
||||||
|
conn_state = connected,
|
||||||
|
will_msg = emqx_packet:will_msg(ConnPkt),
|
||||||
|
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
|
||||||
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Init Alias Maximum
|
%% Init Alias Maximum
|
||||||
|
|
||||||
|
@ -1183,20 +1201,6 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
||||||
};
|
};
|
||||||
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Ensure connected
|
|
||||||
|
|
||||||
ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
|
|
||||||
clientinfo = ClientInfo}) ->
|
|
||||||
WillMsg = emqx_packet:will_msg(ConnPkt),
|
|
||||||
AliasMaximum = init_alias_maximum(ConnPkt, ClientInfo),
|
|
||||||
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
|
||||||
Channel#channel{conninfo = NConnInfo,
|
|
||||||
will_msg = WillMsg,
|
|
||||||
conn_state = connected,
|
|
||||||
alias_maximum = AliasMaximum
|
|
||||||
}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enrich Keepalive
|
%% Enrich Keepalive
|
||||||
|
|
||||||
|
@ -1255,8 +1259,10 @@ parse_topic_filters(TopicFilters) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Ensure disconnected
|
%% Ensure disconnected
|
||||||
|
|
||||||
ensure_disconnected(Channel = #channel{conninfo = ConnInfo}) ->
|
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
||||||
|
clientinfo = ClientInfo}) ->
|
||||||
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)},
|
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)},
|
||||||
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
||||||
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -206,7 +206,7 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
|
||||||
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
CleanStart = fun(_) ->
|
CleanStart = fun(_) ->
|
||||||
ok = discard_session(ClientId),
|
ok = discard_session(ClientId),
|
||||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
{ok, #{session => Session, present => false}}
|
{ok, #{session => Session, present => false}}
|
||||||
end,
|
end,
|
||||||
emqx_cm_locker:trans(ClientId, CleanStart);
|
emqx_cm_locker:trans(ClientId, CleanStart);
|
||||||
|
@ -215,18 +215,23 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
ResumeStart = fun(_) ->
|
ResumeStart = fun(_) ->
|
||||||
case takeover_session(ClientId) of
|
case takeover_session(ClientId) of
|
||||||
{ok, ConnMod, ChanPid, Session} ->
|
{ok, ConnMod, ChanPid, Session} ->
|
||||||
ok = emqx_session:resume(ClientId, Session),
|
ok = emqx_session:resume(ClientInfo, Session),
|
||||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
|
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
|
||||||
{ok, #{session => Session,
|
{ok, #{session => Session,
|
||||||
present => true,
|
present => true,
|
||||||
pendings => Pendings}};
|
pendings => Pendings}};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
Session = emqx_session:init(ClientInfo, ConnInfo),
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
{ok, #{session => Session, present => false}}
|
{ok, #{session => Session, present => false}}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
emqx_cm_locker:trans(ClientId, ResumeStart).
|
emqx_cm_locker:trans(ClientId, ResumeStart).
|
||||||
|
|
||||||
|
create_session(ClientInfo, ConnInfo) ->
|
||||||
|
Session = emqx_session:init(ClientInfo, ConnInfo),
|
||||||
|
ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
|
||||||
|
Session.
|
||||||
|
|
||||||
%% @doc Try to takeover a session.
|
%% @doc Try to takeover a session.
|
||||||
-spec(takeover_session(emqx_types:clientid())
|
-spec(takeover_session(emqx_types:clientid())
|
||||||
-> {ok, emqx_session:session()} | {error, Reason :: term()}).
|
-> {ok, emqx_session:session()} | {error, Reason :: term()}).
|
||||||
|
|
|
@ -28,12 +28,12 @@
|
||||||
, unload/1
|
, unload/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ on_client_connected/4
|
-export([ on_client_connected/3
|
||||||
, on_client_disconnected/4
|
, on_client_disconnected/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([ reason/1 ]).
|
-export([reason/1]).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
load(Env) ->
|
load(Env) ->
|
||||||
|
@ -44,26 +44,12 @@ unload(_Env) ->
|
||||||
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
|
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
|
||||||
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
|
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
|
||||||
|
|
||||||
on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
|
%%--------------------------------------------------------------------
|
||||||
#{peerhost := PeerHost} = ClientInfo,
|
%% Callbacks
|
||||||
#{clean_start := CleanStart,
|
%%--------------------------------------------------------------------
|
||||||
proto_name := ProtoName,
|
|
||||||
proto_ver := ProtoVer,
|
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
|
||||||
keepalive := Keepalive,
|
Presence = connected_presence(ClientInfo, ConnInfo),
|
||||||
expiry_interval := ExpiryInterval} = ConnInfo,
|
|
||||||
ClientId = clientid(ClientInfo, ConnInfo),
|
|
||||||
Username = username(ClientInfo, ConnInfo),
|
|
||||||
Presence = #{clientid => ClientId,
|
|
||||||
username => Username,
|
|
||||||
ipaddress => ntoa(PeerHost),
|
|
||||||
proto_name => ProtoName,
|
|
||||||
proto_ver => ProtoVer,
|
|
||||||
keepalive => Keepalive,
|
|
||||||
connack => ConnAck,
|
|
||||||
clean_start => CleanStart,
|
|
||||||
expiry_interval => ExpiryInterval,
|
|
||||||
ts => erlang:system_time(millisecond)
|
|
||||||
},
|
|
||||||
case emqx_json:safe_encode(Presence) of
|
case emqx_json:safe_encode(Presence) of
|
||||||
{ok, Payload} ->
|
{ok, Payload} ->
|
||||||
emqx_broker:safe_publish(
|
emqx_broker:safe_publish(
|
||||||
|
@ -72,12 +58,12 @@ on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
|
||||||
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
|
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
|
||||||
ClientId = clientid(ClientInfo, ConnInfo),
|
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
|
||||||
Username = username(ClientInfo, ConnInfo),
|
|
||||||
Presence = #{clientid => ClientId,
|
Presence = #{clientid => ClientId,
|
||||||
username => Username,
|
username => Username,
|
||||||
reason => reason(Reason),
|
reason => reason(Reason),
|
||||||
|
disconnected_at => DisconnectedAt,
|
||||||
ts => erlang:system_time(millisecond)
|
ts => erlang:system_time(millisecond)
|
||||||
},
|
},
|
||||||
case emqx_json:safe_encode(Presence) of
|
case emqx_json:safe_encode(Presence) of
|
||||||
|
@ -88,11 +74,35 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
|
||||||
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
|
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clientid(#{clientid := undefined}, #{clientid := ClientId}) -> ClientId;
|
%%--------------------------------------------------------------------
|
||||||
clientid(#{clientid := ClientId}, _ConnInfo) -> ClientId.
|
%% Helper functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
username(#{username := undefined}, #{username := Username}) -> Username;
|
connected_presence(#{peerhost := PeerHost,
|
||||||
username(#{username := Username}, _ConnInfo) -> Username.
|
sockport := SockPort,
|
||||||
|
clientid := ClientId,
|
||||||
|
username := Username
|
||||||
|
},
|
||||||
|
#{clean_start := CleanStart,
|
||||||
|
proto_name := ProtoName,
|
||||||
|
proto_ver := ProtoVer,
|
||||||
|
keepalive := Keepalive,
|
||||||
|
connected_at := ConnectedAt,
|
||||||
|
expiry_interval := ExpiryInterval
|
||||||
|
}) ->
|
||||||
|
#{clientid => ClientId,
|
||||||
|
username => Username,
|
||||||
|
ipaddress => ntoa(PeerHost),
|
||||||
|
sockport => SockPort,
|
||||||
|
proto_name => ProtoName,
|
||||||
|
proto_ver => ProtoVer,
|
||||||
|
keepalive => Keepalive,
|
||||||
|
connack => 0, %% Deprecated?
|
||||||
|
clean_start => CleanStart,
|
||||||
|
expiry_interval => ExpiryInterval,
|
||||||
|
connected_at => ConnectedAt,
|
||||||
|
ts => erlang:system_time(millisecond)
|
||||||
|
}.
|
||||||
|
|
||||||
make_msg(QoS, Topic, Payload) ->
|
make_msg(QoS, Topic, Payload) ->
|
||||||
emqx_message:set_flag(
|
emqx_message:set_flag(
|
||||||
|
@ -106,6 +116,7 @@ topic(disconnected, ClientId) ->
|
||||||
|
|
||||||
qos(Env) -> proplists:get_value(qos, Env, 0).
|
qos(Env) -> proplists:get_value(qos, Env, 0).
|
||||||
|
|
||||||
|
-compile({inline, [reason/1]}).
|
||||||
reason(Reason) when is_atom(Reason) -> Reason;
|
reason(Reason) when is_atom(Reason) -> Reason;
|
||||||
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
|
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
|
||||||
reason({Error, _}) when is_atom(Error) -> Error;
|
reason({Error, _}) when is_atom(Error) -> Error;
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([on_client_connected/4]).
|
-export([on_client_connected/3]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Load/Unload Hook
|
%% Load/Unload Hook
|
||||||
|
@ -36,8 +36,7 @@
|
||||||
load(Topics) ->
|
load(Topics) ->
|
||||||
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
|
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
|
||||||
|
|
||||||
on_client_connected(#{clientid := ClientId,
|
on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo, Topics) ->
|
||||||
username := Username}, ?RC_SUCCESS, _ConnInfo, Topics) ->
|
|
||||||
Replace = fun(Topic) ->
|
Replace = fun(Topic) ->
|
||||||
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
|
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
|
||||||
end,
|
end,
|
||||||
|
@ -47,9 +46,9 @@ on_client_connected(#{clientid := ClientId,
|
||||||
unload(_) ->
|
unload(_) ->
|
||||||
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
|
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
rep(<<"%c">>, ClientId, Topic) ->
|
rep(<<"%c">>, ClientId, Topic) ->
|
||||||
emqx_topic:feed_var(<<"%c">>, ClientId, Topic);
|
emqx_topic:feed_var(<<"%c">>, ClientId, Topic);
|
||||||
|
|
|
@ -76,6 +76,7 @@
|
||||||
-export([ deliver/2
|
-export([ deliver/2
|
||||||
, enqueue/2
|
, enqueue/2
|
||||||
, retry/1
|
, retry/1
|
||||||
|
, terminate/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ takeover/1
|
-export([ takeover/1
|
||||||
|
@ -604,11 +605,12 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
|
||||||
takeover(#session{subscriptions = Subs}) ->
|
takeover(#session{subscriptions = Subs}) ->
|
||||||
lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
|
lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
|
||||||
|
|
||||||
-spec(resume(emqx_types:clientid(), session()) -> ok).
|
-spec(resume(emqx_types:clientinfo(), session()) -> ok).
|
||||||
resume(ClientId, #session{subscriptions = Subs}) ->
|
resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = Subs}) ->
|
||||||
lists:foreach(fun({TopicFilter, SubOpts}) ->
|
lists:foreach(fun({TopicFilter, SubOpts}) ->
|
||||||
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
|
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
|
||||||
end, maps:to_list(Subs)).
|
end, maps:to_list(Subs)),
|
||||||
|
emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
|
||||||
|
|
||||||
-spec(replay(session()) -> {ok, replies(), session()}).
|
-spec(replay(session()) -> {ok, replies(), session()}).
|
||||||
replay(Session = #session{inflight = Inflight}) ->
|
replay(Session = #session{inflight = Inflight}) ->
|
||||||
|
@ -626,6 +628,14 @@ replay(Inflight) ->
|
||||||
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
||||||
end, emqx_inflight:to_list(Inflight)).
|
end, emqx_inflight:to_list(Inflight)).
|
||||||
|
|
||||||
|
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
|
||||||
|
terminate(ClientInfo, discarded, Session) ->
|
||||||
|
emqx_hooks:run('session.discarded', [ClientInfo, info(Session)]);
|
||||||
|
terminate(ClientInfo, takeovered, Session) ->
|
||||||
|
emqx_hooks:run('session.takeovered', [ClientInfo, info(Session)]);
|
||||||
|
terminate(ClientInfo, Reason, Session) ->
|
||||||
|
emqx_hooks:run('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Inc message/delivery expired counter
|
%% Inc message/delivery expired counter
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -44,6 +44,10 @@ init_per_suite(Config) ->
|
||||||
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
||||||
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
|
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
|
||||||
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
|
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
|
||||||
|
%% Meck Hooks
|
||||||
|
ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
|
||||||
|
ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
|
|
@ -30,7 +30,6 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
%% Broker
|
|
||||||
ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker],
|
ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker],
|
||||||
[passthrough, no_history, no_link]),
|
[passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||||
|
@ -329,7 +328,7 @@ t_takeover(_) ->
|
||||||
t_resume(_) ->
|
t_resume(_) ->
|
||||||
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
|
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
|
||||||
Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
|
Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
|
||||||
ok = emqx_session:resume(<<"clientid">>, Session).
|
ok = emqx_session:resume(#{clientid => <<"clientid">>}, Session).
|
||||||
|
|
||||||
t_replay(_) ->
|
t_replay(_) ->
|
||||||
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
||||||
|
|
Loading…
Reference in New Issue