diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index 5ccd0f52b..efffabee3 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -64,7 +64,7 @@ %% Connection mode connection_required :: boolean(), %% Connection State - conn_state :: idle | connected | disconnected, + conn_state :: conn_state(), %% Session token to identity this connection token :: binary() | undefined }). @@ -103,8 +103,8 @@ info(Keys, Channel) when is_list(Keys) -> info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; -info(conn_state, #channel{conn_state = CState}) -> - CState; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> @@ -236,7 +236,11 @@ handle_call(subscriptions, _From, Channel) -> {reply, {error, noimpl}, Channel}; handle_call(kick, _From, Channel) -> - {reply, {error, noimpl}, Channel}; + NChannel = ensure_disconnected(kicked, Channel), + shutdown_and_reply(kicked, ok, NChannel); + +handle_call(discard, _From, Channel) -> + shutdown_and_reply(discarded, ok, Channel); handle_call(Req, _From, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -398,15 +402,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), {ok, ClientInfo#{mountpoint := Mountpoint1}}. -ensure_connected(Channel = #channel{ctx = Ctx, - conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond) - }, - ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]), - Channel#channel{conninfo = NConnInfo}. - process_connect(#channel{ctx = Ctx, session = Session, conninfo = ConnInfo, @@ -447,6 +442,21 @@ run_hooks(Ctx, Name, Args, Acc) -> metrics_inc(Name, Ctx) -> emqx_gateway_ctx:metrics_inc(Ctx, Name). +%%-------------------------------------------------------------------- +%% Ensure connected + +ensure_connected(Channel = #channel{ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond) + }, + _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]), + ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), + Channel#channel{conninfo = NConnInfo, conn_state = connected}. + +%%-------------------------------------------------------------------- +%% Ensure disconnected + ensure_disconnected(Reason, Channel = #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -455,6 +465,12 @@ ensure_disconnected(Reason, Channel = #channel{ ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. +shutdown_and_reply(Reason, Reply, Channel) -> + {shutdown, Reason, Reply, Channel}. + +%shutdown_and_reply(Reason, Reply, OutPkt, Channel) -> +% {shutdown, Reason, Reply, OutPkt, Channel}. + %%-------------------------------------------------------------------- %% Call Chain %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index be3b81bdd..ef59cfb19 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -87,8 +87,7 @@ paths() -> , {<<"lte_lifetime">>, timestamp} ]). --define(query_fun, {?MODULE, query}). --define(format_fun, {?MODULE, format_channel_info}). +-define(QUERY_FUN, {?MODULE, query}). clients(get, #{ bindings := #{name := Name0} , query_string := Params @@ -99,14 +98,14 @@ clients(get, #{ bindings := #{name := Name0} undefined -> Response = emqx_mgmt_api:cluster_query( Params, TabName, - ?CLIENT_QS_SCHEMA, ?query_fun), + ?CLIENT_QS_SCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response); Node1 -> Node = binary_to_atom(Node1, utf8), ParamsWithoutNode = maps:without([<<"node">>], Params), Response = emqx_mgmt_api:node_query( Node, ParamsWithoutNode, - TabName, ?CLIENT_QS_SCHEMA, ?query_fun), + TabName, ?CLIENT_QS_SCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response) end end). @@ -456,8 +455,7 @@ schema("/gateway/:name/clients/:clientid/subscriptions") -> , post => #{ description => <<"Create a subscription membership">> , parameters => params_client_insta() - %% FIXME: - , requestBody => emqx_dashboard_swagger:schema_with_examples( + , 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(subscription), examples_subsctiption()) , responses => diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index 5919701cb..c28a4724d 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -51,11 +51,26 @@ clientinfo :: emqx_types:clientinfo(), %% Session session :: emqx_lwm2m_session:session() | undefined, + %% Channl State + %% TODO: is there need + conn_state :: conn_state(), %% Timer timers :: #{atom() => disable | undefined | reference()}, + %% FIXME: don't store anonymouse func with_context :: function() }). +-type channel() :: #channel{}. + +-type conn_state() :: idle | connecting | connected | disconnected. + +-type reply() :: {outgoing, coap_message()} + | {outgoing, [coap_message()]} + | {event, conn_state()|updated} + | {close, Reason :: atom()}. + +-type replies() :: reply() | [reply()]. + %% TODO: -define(DEFAULT_OVERRIDE, #{ clientid => <<"">> %% Generate clientid by default @@ -79,8 +94,8 @@ info(Keys, Channel) when is_list(Keys) -> info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; -info(conn_state, _) -> - connected; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> @@ -125,7 +140,7 @@ init(ConnInfoT = #{peername := {PeerHost, _}, , clientinfo = ClientInfo , timers = #{} , session = emqx_lwm2m_session:new() - %% FIXME: don't store anonymouse func + , conn_state = idle , with_context = with_context(Ctx, ClientInfo) }. @@ -143,9 +158,15 @@ send_cmd(Channel, Cmd) -> %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- -handle_in(Msg, ChannleT) -> - Channel = update_life_timer(ChannleT), - call_session(handle_coap_in, Msg, Channel). + +-spec handle_in(coap_message() | {frame_error, any()}, channel()) + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()} + | {shutdown, Reason :: term(), replies(), channel()}. +handle_in(Msg, Channle) -> + NChannel = update_life_timer(Channle), + call_session(handle_coap_in, Msg, NChannel). %%-------------------------------------------------------------------- %% Handle Delivers from broker to client @@ -193,10 +214,23 @@ handle_call(subscriptions, _From, Channel) -> {reply, {error, noimpl}, Channel}; handle_call(kick, _From, Channel) -> - {reply, {error, noimpl}, Channel}; + NChannel = ensure_disconnected(kicked, Channel), + shutdown_and_reply(kicked, ok, NChannel); handle_call(discard, _From, Channel) -> - {reply, {error, noimpl}, Channel}; + shutdown_and_reply(discarded, ok, Channel); + +%% TODO: No Session Takeover +%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) -> +% reply(Session, Channel#channel{takeover = true}); +% +%handle_call({takeover, 'end'}, _From, Channel = #channel{session = Session, +% pendings = Pendings}) -> +% ok = emqx_session:takeover(Session), +% %% TODO: Should not drain deliver here (side effect) +% Delivers = emqx_misc:drain_deliver(), +% AllPendings = lists:append(Delivers, Pendings), +% shutdown_and_reply(takenover, AllPendings, Channel); handle_call(Req, _From, Channel) -> ?SLOG(error, #{ msg => "unexpected_call" @@ -239,6 +273,41 @@ terminate(Reason, #channel{ctx = Ctx, %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% Ensure connected + +ensure_connected(Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]), + + NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, + ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected + }. + +%%-------------------------------------------------------------------- +%% Ensure disconnected + +ensure_disconnected(Reason, Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = run_hooks(Ctx, 'client.disconnected', + [ClientInfo, Reason, NConnInfo]), + Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. + +shutdown_and_reply(Reason, Reply, Channel) -> + {shutdown, Reason, Reply, Channel}. + +%shutdown_and_reply(Reason, Reply, OutPkt, Channel) -> +% {shutdown, Reason, Reply, OutPkt, Channel}. + set_peercert_infos(NoSSL, ClientInfo) when NoSSL =:= nossl; NoSSL =:= undefined -> @@ -335,6 +404,7 @@ enrich_clientinfo(#coap_message{options = Options} = Msg, Query = maps:get(uri_query, Options, #{}), case Query of #{<<"ep">> := Epn, <<"lt">> := Lifetime} -> + %% FIXME: the following keys is not belong standrad protocol Username = maps:get(<<"imei">>, Query, Epn), Password = maps:get(<<"password">>, Query, undefined), ClientId = maps:get(<<"device_id">>, Query, Epn), @@ -379,13 +449,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), {ok, ClientInfo#{mountpoint := Mountpoint1}}. -ensure_connected(Channel = #channel{ctx = Ctx, - conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]), - ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]), - Channel. - process_connect(Channel = #channel{ctx = Ctx, session = Session, conninfo = ConnInfo, diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 4c5d84caa..48cef70f8 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -61,7 +61,7 @@ session :: undefined | map(), %% ClientInfo override specs clientinfo_override :: map(), - %% Connection Channel + %% Channel State conn_state :: conn_state(), %% Heartbeat heartbeat :: emqx_stomp_heartbeat:heartbeat(), @@ -294,9 +294,9 @@ ensure_connected(Channel = #channel{ clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{conninfo = NConnInfo, - conn_state = connected - }. + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected}. process_connect(Channel = #channel{ ctx = Ctx,