fix(gw): handle discard/kick event

This commit is contained in:
JianBo He 2021-12-16 17:19:58 +08:00
parent a2823f2ad6
commit 3443aeff18
4 changed files with 115 additions and 38 deletions

View File

@ -64,7 +64,7 @@
%% Connection mode
connection_required :: boolean(),
%% Connection State
conn_state :: idle | connected | disconnected,
conn_state :: conn_state(),
%% Session token to identity this connection
token :: binary() | undefined
}).
@ -103,8 +103,8 @@ info(Keys, Channel) when is_list(Keys) ->
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(conn_state, #channel{conn_state = CState}) ->
CState;
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
@ -236,7 +236,11 @@ handle_call(subscriptions, _From, Channel) ->
{reply, {error, noimpl}, Channel};
handle_call(kick, _From, Channel) ->
{reply, {error, noimpl}, Channel};
NChannel = ensure_disconnected(kicked, Channel),
shutdown_and_reply(kicked, ok, NChannel);
handle_call(discard, _From, Channel) ->
shutdown_and_reply(discarded, ok, Channel);
handle_call(Req, _From, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
@ -398,15 +402,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
{ok, ClientInfo#{mountpoint := Mountpoint1}}.
ensure_connected(Channel = #channel{ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond)
},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
_ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]),
Channel#channel{conninfo = NConnInfo}.
process_connect(#channel{ctx = Ctx,
session = Session,
conninfo = ConnInfo,
@ -447,6 +442,21 @@ run_hooks(Ctx, Name, Args, Acc) ->
metrics_inc(Name, Ctx) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name).
%%--------------------------------------------------------------------
%% Ensure connected
ensure_connected(Channel = #channel{ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond)
},
_ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]),
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = connected}.
%%--------------------------------------------------------------------
%% Ensure disconnected
ensure_disconnected(Reason, Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
@ -455,6 +465,12 @@ ensure_disconnected(Reason, Channel = #channel{
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
shutdown_and_reply(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
%shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
% {shutdown, Reason, Reply, OutPkt, Channel}.
%%--------------------------------------------------------------------
%% Call Chain
%%--------------------------------------------------------------------

View File

@ -87,8 +87,7 @@ paths() ->
, {<<"lte_lifetime">>, timestamp}
]).
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format_channel_info}).
-define(QUERY_FUN, {?MODULE, query}).
clients(get, #{ bindings := #{name := Name0}
, query_string := Params
@ -99,14 +98,14 @@ clients(get, #{ bindings := #{name := Name0}
undefined ->
Response = emqx_mgmt_api:cluster_query(
Params, TabName,
?CLIENT_QS_SCHEMA, ?query_fun),
?CLIENT_QS_SCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response);
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Params),
Response = emqx_mgmt_api:node_query(
Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?query_fun),
TabName, ?CLIENT_QS_SCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response)
end
end).
@ -456,8 +455,7 @@ schema("/gateway/:name/clients/:clientid/subscriptions") ->
, post =>
#{ description => <<"Create a subscription membership">>
, parameters => params_client_insta()
%% FIXME:
, requestBody => emqx_dashboard_swagger:schema_with_examples(
, 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(subscription),
examples_subsctiption())
, responses =>

View File

@ -51,11 +51,26 @@
clientinfo :: emqx_types:clientinfo(),
%% Session
session :: emqx_lwm2m_session:session() | undefined,
%% Channl State
%% TODO: is there need
conn_state :: conn_state(),
%% Timer
timers :: #{atom() => disable | undefined | reference()},
%% FIXME: don't store anonymouse func
with_context :: function()
}).
-type channel() :: #channel{}.
-type conn_state() :: idle | connecting | connected | disconnected.
-type reply() :: {outgoing, coap_message()}
| {outgoing, [coap_message()]}
| {event, conn_state()|updated}
| {close, Reason :: atom()}.
-type replies() :: reply() | [reply()].
%% TODO:
-define(DEFAULT_OVERRIDE,
#{ clientid => <<"">> %% Generate clientid by default
@ -79,8 +94,8 @@ info(Keys, Channel) when is_list(Keys) ->
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(conn_state, _) ->
connected;
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
@ -125,7 +140,7 @@ init(ConnInfoT = #{peername := {PeerHost, _},
, clientinfo = ClientInfo
, timers = #{}
, session = emqx_lwm2m_session:new()
%% FIXME: don't store anonymouse func
, conn_state = idle
, with_context = with_context(Ctx, ClientInfo)
}.
@ -143,9 +158,15 @@ send_cmd(Channel, Cmd) ->
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
handle_in(Msg, ChannleT) ->
Channel = update_life_timer(ChannleT),
call_session(handle_coap_in, Msg, Channel).
-spec handle_in(coap_message() | {frame_error, any()}, channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_in(Msg, Channle) ->
NChannel = update_life_timer(Channle),
call_session(handle_coap_in, Msg, NChannel).
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
@ -193,10 +214,23 @@ handle_call(subscriptions, _From, Channel) ->
{reply, {error, noimpl}, Channel};
handle_call(kick, _From, Channel) ->
{reply, {error, noimpl}, Channel};
NChannel = ensure_disconnected(kicked, Channel),
shutdown_and_reply(kicked, ok, NChannel);
handle_call(discard, _From, Channel) ->
{reply, {error, noimpl}, Channel};
shutdown_and_reply(discarded, ok, Channel);
%% TODO: No Session Takeover
%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) ->
% reply(Session, Channel#channel{takeover = true});
%
%handle_call({takeover, 'end'}, _From, Channel = #channel{session = Session,
% pendings = Pendings}) ->
% ok = emqx_session:takeover(Session),
% %% TODO: Should not drain deliver here (side effect)
% Delivers = emqx_misc:drain_deliver(),
% AllPendings = lists:append(Delivers, Pendings),
% shutdown_and_reply(takenover, AllPendings, Channel);
handle_call(Req, _From, Channel) ->
?SLOG(error, #{ msg => "unexpected_call"
@ -239,6 +273,41 @@ terminate(Reason, #channel{ctx = Ctx,
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Ensure connected
ensure_connected(Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]),
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
%%--------------------------------------------------------------------
%% Ensure disconnected
ensure_disconnected(Reason, Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.disconnected',
[ClientInfo, Reason, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
shutdown_and_reply(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
%shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
% {shutdown, Reason, Reply, OutPkt, Channel}.
set_peercert_infos(NoSSL, ClientInfo)
when NoSSL =:= nossl;
NoSSL =:= undefined ->
@ -335,6 +404,7 @@ enrich_clientinfo(#coap_message{options = Options} = Msg,
Query = maps:get(uri_query, Options, #{}),
case Query of
#{<<"ep">> := Epn, <<"lt">> := Lifetime} ->
%% FIXME: the following keys is not belong standrad protocol
Username = maps:get(<<"imei">>, Query, Epn),
Password = maps:get(<<"password">>, Query, undefined),
ClientId = maps:get(<<"device_id">>, Query, Epn),
@ -379,13 +449,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
{ok, ClientInfo#{mountpoint := Mountpoint1}}.
ensure_connected(Channel = #channel{ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]),
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]),
Channel.
process_connect(Channel = #channel{ctx = Ctx,
session = Session,
conninfo = ConnInfo,

View File

@ -61,7 +61,7 @@
session :: undefined | map(),
%% ClientInfo override specs
clientinfo_override :: map(),
%% Connection Channel
%% Channel State
conn_state :: conn_state(),
%% Heartbeat
heartbeat :: emqx_stomp_heartbeat:heartbeat(),
@ -294,9 +294,9 @@ ensure_connected(Channel = #channel{
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{conninfo = NConnInfo,
conn_state = connected
}.
Channel#channel{
conninfo = NConnInfo,
conn_state = connected}.
process_connect(Channel = #channel{
ctx = Ctx,