829 lines
26 KiB
Erlang
829 lines
26 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_coap_channel).
|
|
|
|
-behaviour(emqx_gateway_channel).
|
|
|
|
%% API
|
|
-export([
|
|
info/1,
|
|
info/2,
|
|
stats/1,
|
|
validator/4,
|
|
metrics_inc/2,
|
|
run_hooks/3,
|
|
send_request/2
|
|
]).
|
|
|
|
-export([
|
|
init/2,
|
|
handle_in/2,
|
|
handle_deliver/2,
|
|
handle_timeout/3,
|
|
terminate/2
|
|
]).
|
|
|
|
-export([
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2
|
|
]).
|
|
|
|
-export_type([channel/0]).
|
|
|
|
-include("emqx_coap.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-record(channel, {
|
|
%% Context
|
|
ctx :: emqx_gateway_ctx:context(),
|
|
%% Connection Info
|
|
conninfo :: emqx_types:conninfo(),
|
|
%% Client Info
|
|
clientinfo :: emqx_types:clientinfo(),
|
|
%% Session
|
|
session :: emqx_coap_session:session() | undefined,
|
|
%% Keepalive
|
|
keepalive :: emqx_keepalive:keepalive() | undefined,
|
|
%% Timer
|
|
timers :: #{atom() => disable | undefined | reference()},
|
|
%% Connection mode
|
|
connection_required :: boolean(),
|
|
%% Connection State
|
|
conn_state :: conn_state(),
|
|
%% Session token to identity this connection
|
|
token :: binary() | undefined
|
|
}).
|
|
|
|
-type channel() :: #channel{}.
|
|
|
|
-type conn_state() :: idle | connecting | connected | disconnected.
|
|
|
|
-type reply() ::
|
|
{outgoing, coap_message()}
|
|
| {outgoing, [coap_message()]}
|
|
| {event, conn_state() | updated}
|
|
| {close, Reason :: atom()}.
|
|
|
|
-type replies() :: reply() | [reply()].
|
|
|
|
-define(TOKEN_MAXIMUM, 4294967295).
|
|
|
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
|
|
|
|
-define(DEF_IDLE_TIME, timer:seconds(30)).
|
|
|
|
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec info(channel()) -> emqx_types:infos().
|
|
info(Channel) ->
|
|
maps:from_list(info(?INFO_KEYS, Channel)).
|
|
|
|
-spec info(list(atom()) | atom(), channel()) -> term().
|
|
info(Keys, Channel) when is_list(Keys) ->
|
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
|
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
|
ConnInfo;
|
|
info(conn_state, #channel{conn_state = ConnState}) ->
|
|
ConnState;
|
|
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
|
ClientInfo;
|
|
info(session, #channel{session = Session}) ->
|
|
emqx_utils:maybe_apply(fun emqx_coap_session:info/1, Session);
|
|
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
|
ClientId;
|
|
info(ctx, #channel{ctx = Ctx}) ->
|
|
Ctx.
|
|
|
|
-spec stats(channel()) -> emqx_types:stats().
|
|
stats(#channel{session = Session}) ->
|
|
emqx_coap_session:stats(Session).
|
|
|
|
-spec init(map(), map()) -> channel().
|
|
init(
|
|
ConnInfo = #{
|
|
peername := {PeerHost, _},
|
|
sockname := {_, SockPort}
|
|
},
|
|
#{ctx := Ctx} = Config
|
|
) ->
|
|
Peercert = maps:get(peercert, ConnInfo, undefined),
|
|
Mountpoint = maps:get(mountpoint, Config, <<>>),
|
|
EnableAuthn = maps:get(enable_authn, Config, true),
|
|
ListenerId =
|
|
case maps:get(listener, Config, undefined) of
|
|
undefined -> undefined;
|
|
{GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
|
|
end,
|
|
ClientInfo = set_peercert_infos(
|
|
Peercert,
|
|
#{
|
|
zone => default,
|
|
listener => ListenerId,
|
|
protocol => 'coap',
|
|
peerhost => PeerHost,
|
|
sockport => SockPort,
|
|
clientid => emqx_guid:to_base62(emqx_guid:gen()),
|
|
username => undefined,
|
|
is_bridge => false,
|
|
is_superuser => false,
|
|
enable_authn => EnableAuthn,
|
|
mountpoint => Mountpoint
|
|
}
|
|
),
|
|
Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME),
|
|
#channel{
|
|
ctx = Ctx,
|
|
conninfo = ConnInfo,
|
|
clientinfo = ClientInfo,
|
|
timers = #{},
|
|
session = emqx_coap_session:new(),
|
|
keepalive = emqx_keepalive:init(Heartbeat),
|
|
connection_required = maps:get(connection_required, Config, false),
|
|
conn_state = idle
|
|
}.
|
|
|
|
validator(Action, Topic, Ctx, ClientInfo) ->
|
|
emqx_gateway_ctx:authorize(Ctx, ClientInfo, Action, Topic).
|
|
|
|
-spec send_request(pid(), coap_message()) -> any().
|
|
send_request(Channel, Request) ->
|
|
gen_server:send_request(Channel, {?FUNCTION_NAME, Request}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle incoming packet
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec handle_in(coap_message() | {frame_error, any()}, channel()) ->
|
|
{ok, channel()}
|
|
| {ok, replies(), channel()}
|
|
| {shutdown, Reason :: term(), channel()}
|
|
| {shutdown, Reason :: term(), replies(), channel()}.
|
|
handle_in(Msg, Channel0) ->
|
|
Channel = ensure_keepalive_timer(Channel0),
|
|
case emqx_coap_message:is_request(Msg) of
|
|
true ->
|
|
check_auth_state(Msg, Channel);
|
|
_ ->
|
|
call_session(handle_response, Msg, Channel)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle Delivers from broker to client
|
|
%%--------------------------------------------------------------------
|
|
handle_deliver(
|
|
Delivers,
|
|
#channel{
|
|
session = Session,
|
|
ctx = Ctx
|
|
} = Channel
|
|
) ->
|
|
handle_result(emqx_coap_session:deliver(Delivers, Ctx, Session), Channel).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle timeout
|
|
%%--------------------------------------------------------------------
|
|
|
|
handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) ->
|
|
case emqx_keepalive:check(NewVal, KeepAlive) of
|
|
{ok, NewKeepAlive} ->
|
|
Channel2 = ensure_keepalive_timer(fun make_timer/4, Channel),
|
|
{ok, Channel2#channel{keepalive = NewKeepAlive}};
|
|
{error, timeout} ->
|
|
{shutdown, timeout, ensure_disconnected(keepalive_timeout, Channel)}
|
|
end;
|
|
handle_timeout(_, {transport, Msg}, Channel) ->
|
|
call_session(timeout, Msg, Channel);
|
|
handle_timeout(_, disconnect, Channel) ->
|
|
{shutdown, normal, Channel};
|
|
handle_timeout(_, _, Channel) ->
|
|
{ok, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle call
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec handle_call(Req :: term(), From :: term(), channel()) ->
|
|
{reply, Reply :: term(), channel()}
|
|
| {reply, Reply :: term(), replies(), channel()}
|
|
| {shutdown, Reason :: term(), Reply :: term(), channel()}
|
|
| {shutdown, Reason :: term(), Reply :: term(), coap_message(), channel()}.
|
|
handle_call({send_request, Msg}, From, Channel) ->
|
|
Result = call_session(handle_out, {{send_request, From}, Msg}, Channel),
|
|
erlang:setelement(1, Result, noreply);
|
|
handle_call(
|
|
{subscribe, Topic, SubOpts},
|
|
_From,
|
|
Channel = #channel{
|
|
ctx = Ctx,
|
|
clientinfo =
|
|
ClientInfo =
|
|
#{
|
|
clientid := ClientId,
|
|
mountpoint := Mountpoint
|
|
},
|
|
session = Session
|
|
}
|
|
) ->
|
|
Token = maps:get(
|
|
token,
|
|
maps:get(sub_props, SubOpts, #{}),
|
|
<<>>
|
|
),
|
|
NSubOpts = maps:merge(
|
|
emqx_gateway_utils:default_subopts(),
|
|
SubOpts
|
|
),
|
|
MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
|
|
_ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts),
|
|
|
|
_ = run_hooks(
|
|
Ctx,
|
|
'session.subscribed',
|
|
[ClientInfo, MountedTopic, NSubOpts]
|
|
),
|
|
%% modify session state
|
|
SubReq = {Topic, Token},
|
|
TempMsg = #coap_message{type = non},
|
|
%% FIXME: The subopts is not used for emqx_coap_session
|
|
Result = emqx_coap_session:process_subscribe(
|
|
SubReq, TempMsg, #{}, Session
|
|
),
|
|
NSession = maps:get(session, Result),
|
|
{reply, {ok, {MountedTopic, NSubOpts}}, [{event, updated}], Channel#channel{session = NSession}};
|
|
handle_call(
|
|
{unsubscribe, Topic},
|
|
_From,
|
|
Channel = #channel{
|
|
ctx = Ctx,
|
|
clientinfo =
|
|
ClientInfo =
|
|
#{mountpoint := Mountpoint},
|
|
session = Session
|
|
}
|
|
) ->
|
|
MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
|
|
ok = emqx_broker:unsubscribe(MountedTopic),
|
|
_ = run_hooks(
|
|
Ctx,
|
|
'session.unsubscribed',
|
|
[ClientInfo, MountedTopic, #{}]
|
|
),
|
|
|
|
%% modify session state
|
|
UnSubReq = Topic,
|
|
TempMsg = #coap_message{type = non},
|
|
Result = emqx_coap_session:process_subscribe(
|
|
UnSubReq, TempMsg, #{}, Session
|
|
),
|
|
NSession = maps:get(session, Result),
|
|
{reply, ok, [{event, updated}], Channel#channel{session = NSession}};
|
|
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
|
|
Subs = emqx_coap_session:info(subscriptions, Session),
|
|
{reply, {ok, maps:to_list(Subs)}, Channel};
|
|
handle_call({check_token, ReqToken}, _From, Channel = #channel{token = Token}) ->
|
|
{reply, ReqToken == Token, Channel};
|
|
handle_call(kick, _From, Channel) ->
|
|
NChannel = ensure_disconnected(kicked, Channel),
|
|
shutdown_and_reply(kicked, ok, NChannel);
|
|
handle_call(discard, _From, Channel) ->
|
|
shutdown_and_reply(discarded, ok, Channel);
|
|
handle_call(Req, _From, Channel) ->
|
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
|
{reply, ignored, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle Cast
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec handle_cast(Req :: term(), channel()) ->
|
|
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
|
|
handle_cast(close, Channel) ->
|
|
?SLOG(info, #{msg => "close_connection"}),
|
|
shutdown(normal, Channel);
|
|
handle_cast(inc_recv_pkt, Channel) ->
|
|
_ = emqx_pd:inc_counter(recv_pkt, 1),
|
|
{ok, Channel};
|
|
handle_cast(Req, Channel) ->
|
|
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
|
|
{ok, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle Info
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec handle_info(Info :: term(), channel()) ->
|
|
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
|
|
handle_info({subscribe, _AutoSubs}, Channel) ->
|
|
{ok, Channel};
|
|
handle_info(Info, Channel) ->
|
|
?SLOG(warning, #{msg => "unexpected_info", info => Info}),
|
|
{ok, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Terminate
|
|
%%--------------------------------------------------------------------
|
|
terminate(Reason, #channel{
|
|
clientinfo = ClientInfo,
|
|
ctx = Ctx,
|
|
session = Session
|
|
}) ->
|
|
run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
set_peercert_infos(NoSSL, ClientInfo) when
|
|
NoSSL =:= nossl;
|
|
NoSSL =:= undefined
|
|
->
|
|
ClientInfo;
|
|
set_peercert_infos(Peercert, ClientInfo) ->
|
|
{DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)},
|
|
ClientInfo#{dn => DN, cn => CN}.
|
|
|
|
ensure_timer(Name, Time, Msg, #channel{timers = Timers} = Channel) ->
|
|
case maps:get(Name, Timers, undefined) of
|
|
undefined ->
|
|
make_timer(Name, Time, Msg, Channel);
|
|
_ ->
|
|
Channel
|
|
end.
|
|
|
|
make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) ->
|
|
TRef = emqx_utils:start_timer(Time, Msg),
|
|
Channel#channel{timers = Timers#{Name => TRef}}.
|
|
|
|
ensure_keepalive_timer(Channel) ->
|
|
ensure_keepalive_timer(fun ensure_timer/4, Channel).
|
|
|
|
ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
|
|
Heartbeat = emqx_keepalive:info(interval, KeepAlive),
|
|
Fun(keepalive, Heartbeat, keepalive, Channel).
|
|
|
|
check_auth_state(Msg, #channel{connection_required = false} = Channel) ->
|
|
call_session(handle_request, Msg, Channel);
|
|
check_auth_state(Msg, #channel{connection_required = true} = Channel) ->
|
|
case is_create_connection_request(Msg) of
|
|
true ->
|
|
call_session(handle_request, Msg, Channel);
|
|
false ->
|
|
URIQuery = emqx_coap_message:extract_uri_query(Msg),
|
|
case maps:get(<<"token">>, URIQuery, undefined) of
|
|
undefined ->
|
|
?SLOG(debug, #{msg => "token_required_in_conn_mode", message => Msg});
|
|
_ ->
|
|
check_token(Msg, Channel)
|
|
end
|
|
end.
|
|
|
|
is_create_connection_request(Msg = #coap_message{method = Method}) when
|
|
is_atom(Method) andalso Method =/= undefined
|
|
->
|
|
URIPath = emqx_coap_message:get_option(uri_path, Msg, []),
|
|
case URIPath of
|
|
[<<"mqtt">>, <<"connection">>] when Method == post ->
|
|
true;
|
|
_ ->
|
|
false
|
|
end;
|
|
is_create_connection_request(_Msg) ->
|
|
false.
|
|
|
|
is_delete_connection_request(Msg = #coap_message{method = Method}) when
|
|
is_atom(Method) andalso Method =/= undefined
|
|
->
|
|
URIPath = emqx_coap_message:get_option(uri_path, Msg, []),
|
|
case URIPath of
|
|
[<<"mqtt">>, <<"connection">>] when Method == delete ->
|
|
true;
|
|
_ ->
|
|
false
|
|
end;
|
|
is_delete_connection_request(_Msg) ->
|
|
false.
|
|
|
|
check_token(
|
|
Msg,
|
|
#channel{
|
|
token = Token,
|
|
clientinfo = ClientInfo
|
|
} = Channel
|
|
) ->
|
|
IsDeleteConn = is_delete_connection_request(Msg),
|
|
#{clientid := ClientId} = ClientInfo,
|
|
case emqx_coap_message:extract_uri_query(Msg) of
|
|
#{
|
|
<<"clientid">> := ClientId,
|
|
<<"token">> := Token
|
|
} ->
|
|
call_session(handle_request, Msg, Channel);
|
|
#{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} ->
|
|
case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of
|
|
undefined when IsDeleteConn ->
|
|
Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
|
|
{shutdown, normal, Reply, Channel};
|
|
undefined ->
|
|
?SLOG(info, #{
|
|
msg => "remote_connection_not_found",
|
|
clientid => ReqClientId,
|
|
token => ReqToken
|
|
}),
|
|
Reply = emqx_coap_message:reset(Msg),
|
|
{shutdown, normal, Reply, Channel};
|
|
false ->
|
|
?SLOG(info, #{
|
|
msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken
|
|
}),
|
|
Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
|
|
{shutdown, normal, Reply, Channel};
|
|
true ->
|
|
%% hack: since each message request can spawn a new connection
|
|
%% process, we can't rely on the `inc_incoming_stats' call in
|
|
%% `emqx_gateway_conn:handle_incoming' to properly keep track of
|
|
%% bumping incoming requests for an existing channel. Since this
|
|
%% number is used by keepalive, we have to bump it inside the
|
|
%% requested channel/connection pid so heartbeats actually work.
|
|
emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt),
|
|
call_session(handle_request, Msg, Channel)
|
|
end;
|
|
_ ->
|
|
ErrMsg = <<"Missing token or clientid in connection mode">>,
|
|
Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
|
|
{shutdown, normal, Reply, Channel}
|
|
end.
|
|
|
|
run_conn_hooks(
|
|
Input,
|
|
Channel = #channel{
|
|
ctx = Ctx,
|
|
conninfo = ConnInfo
|
|
}
|
|
) ->
|
|
ConnProps = #{},
|
|
case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
|
|
Error = {error, _Reason} -> Error;
|
|
_NConnProps -> {ok, Input, Channel}
|
|
end.
|
|
|
|
enrich_conninfo(
|
|
{Queries, _Msg},
|
|
Channel = #channel{
|
|
keepalive = KeepAlive,
|
|
conninfo = ConnInfo
|
|
}
|
|
) ->
|
|
case Queries of
|
|
#{<<"clientid">> := ClientId} ->
|
|
Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
|
|
NConnInfo = ConnInfo#{
|
|
clientid => ClientId,
|
|
proto_name => <<"CoAP">>,
|
|
proto_ver => <<"1">>,
|
|
clean_start => true,
|
|
keepalive => Interval,
|
|
expiry_interval => 0
|
|
},
|
|
{ok, Channel#channel{conninfo = NConnInfo}};
|
|
_ ->
|
|
{error, "clientid is required", Channel}
|
|
end.
|
|
|
|
enrich_clientinfo(
|
|
{Queries, Msg},
|
|
Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo0}
|
|
) ->
|
|
ClientInfo = ClientInfo0#{
|
|
clientid => maps:get(clientid, ConnInfo),
|
|
username => maps:get(<<"username">>, Queries, undefined),
|
|
password => maps:get(<<"password">>, Queries, undefined)
|
|
},
|
|
{ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
|
|
{ok, Channel#channel{clientinfo = NClientInfo}}.
|
|
|
|
set_log_meta(_Input, #channel{clientinfo = #{clientid := ClientId}}) ->
|
|
emqx_logger:set_metadata_clientid(ClientId),
|
|
ok.
|
|
|
|
auth_connect(
|
|
_Input,
|
|
Channel = #channel{
|
|
ctx = Ctx,
|
|
clientinfo = ClientInfo
|
|
}
|
|
) ->
|
|
#{
|
|
clientid := ClientId,
|
|
username := Username
|
|
} = ClientInfo,
|
|
case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of
|
|
{ok, NClientInfo} ->
|
|
{ok, Channel#channel{clientinfo = NClientInfo}};
|
|
{error, Reason} ->
|
|
?SLOG(warning, #{
|
|
msg => "client_login_failed",
|
|
username => Username,
|
|
clientid => ClientId,
|
|
reason => Reason
|
|
}),
|
|
{error, Reason}
|
|
end.
|
|
|
|
fix_mountpoint(_Packet, #{mountpoint := <<>>} = ClientInfo) ->
|
|
{ok, ClientInfo};
|
|
fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
|
|
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
|
|
{ok, ClientInfo#{mountpoint := Mountpoint1}}.
|
|
|
|
process_connect(
|
|
#channel{
|
|
ctx = Ctx,
|
|
session = Session,
|
|
conninfo = ConnInfo,
|
|
clientinfo = ClientInfo
|
|
} = Channel,
|
|
Msg,
|
|
Result,
|
|
Iter
|
|
) ->
|
|
%% inherit the old session
|
|
SessFun = fun(_, _) -> Session end,
|
|
case
|
|
emqx_gateway_ctx:open_session(
|
|
Ctx,
|
|
true,
|
|
ClientInfo,
|
|
ConnInfo,
|
|
SessFun,
|
|
emqx_coap_session
|
|
)
|
|
of
|
|
{ok, _Sess} ->
|
|
%% FIXME: Token in cluster wide?
|
|
RandVal = rand:uniform(?TOKEN_MAXIMUM),
|
|
Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)),
|
|
NResult = Result#{events => [{event, connected}]},
|
|
iter(
|
|
Iter,
|
|
reply({ok, created}, Token, Msg, NResult),
|
|
Channel#channel{token = Token}
|
|
);
|
|
{error, Reason} ->
|
|
?SLOG(error, #{
|
|
msg => "failed_open_session",
|
|
clientid => maps:get(clientid, ClientInfo),
|
|
reason => Reason
|
|
}),
|
|
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
|
|
end.
|
|
|
|
run_hooks(Ctx, Name, Args) ->
|
|
emqx_gateway_ctx:metrics_inc(Ctx, Name),
|
|
emqx_hooks:run(Name, Args).
|
|
|
|
run_hooks(Ctx, Name, Args, Acc) ->
|
|
emqx_gateway_ctx:metrics_inc(Ctx, Name),
|
|
emqx_hooks:run_fold(Name, Args, Acc).
|
|
|
|
metrics_inc(Name, Ctx) ->
|
|
emqx_gateway_ctx:metrics_inc(Ctx, Name).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure connected
|
|
|
|
ensure_connected(
|
|
Channel = #channel{
|
|
ctx = Ctx,
|
|
conninfo = ConnInfo,
|
|
clientinfo = ClientInfo
|
|
}
|
|
) ->
|
|
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
|
|
_ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, #{}]),
|
|
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
|
|
Channel#channel{conninfo = NConnInfo, conn_state = connected}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure disconnected
|
|
|
|
ensure_disconnected(
|
|
Reason,
|
|
Channel = #channel{
|
|
ctx = Ctx,
|
|
conninfo = ConnInfo,
|
|
clientinfo = ClientInfo,
|
|
conn_state = ConnState
|
|
}
|
|
) ->
|
|
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
|
|
|
case ConnState of
|
|
connected ->
|
|
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]);
|
|
_ ->
|
|
ok
|
|
end,
|
|
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
|
|
|
shutdown(Reason, Channel) ->
|
|
{shutdown, Reason, Channel}.
|
|
|
|
shutdown_and_reply(Reason, Reply, Channel) ->
|
|
{shutdown, Reason, Reply, Channel}.
|
|
|
|
%shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
|
|
% {shutdown, Reason, Reply, OutPkt, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Call Chain
|
|
%%--------------------------------------------------------------------
|
|
|
|
call_session(Fun, Msg, #channel{session = Session} = Channel) ->
|
|
Result = emqx_coap_session:Fun(Msg, Session),
|
|
handle_result(Result, Channel).
|
|
|
|
handle_result(Result, Channel) ->
|
|
iter(
|
|
[
|
|
session,
|
|
fun process_session/4,
|
|
proto,
|
|
fun process_protocol/4,
|
|
reply,
|
|
fun process_reply/4,
|
|
out,
|
|
fun process_out/4,
|
|
fun process_nothing/3
|
|
],
|
|
Result,
|
|
Channel
|
|
).
|
|
|
|
call_handler(
|
|
request,
|
|
Msg,
|
|
Result,
|
|
#channel{
|
|
ctx = Ctx,
|
|
clientinfo = ClientInfo
|
|
} = Channel,
|
|
Iter
|
|
) ->
|
|
HandlerResult =
|
|
case emqx_coap_message:get_option(uri_path, Msg) of
|
|
[<<"ps">> | RestPath] ->
|
|
emqx_coap_pubsub_handler:handle_request(RestPath, Msg, Ctx, ClientInfo);
|
|
[<<"mqtt">> | RestPath] ->
|
|
emqx_coap_mqtt_handler:handle_request(RestPath, Msg, Ctx, ClientInfo);
|
|
_ ->
|
|
reply({error, bad_request}, Msg)
|
|
end,
|
|
iter(
|
|
[
|
|
connection,
|
|
fun process_connection/4,
|
|
subscribe,
|
|
fun process_subscribe/4
|
|
| Iter
|
|
],
|
|
maps:merge(Result, HandlerResult),
|
|
Channel
|
|
);
|
|
call_handler(response, {{send_request, From}, Response}, Result, Channel, Iter) ->
|
|
gen_server:reply(From, Response),
|
|
iter(Iter, Result, Channel);
|
|
call_handler(_, _, Result, Channel, Iter) ->
|
|
iter(Iter, Result, Channel).
|
|
|
|
process_session(Session, Result, Channel, Iter) ->
|
|
iter(Iter, Result, Channel#channel{session = Session}).
|
|
|
|
process_protocol({Type, Msg}, Result, Channel, Iter) ->
|
|
call_handler(Type, Msg, Result, Channel, Iter).
|
|
|
|
%% leaf node
|
|
process_out(Outs, Result, Channel, _) ->
|
|
Outs2 = lists:reverse(Outs),
|
|
Outs3 =
|
|
case maps:get(reply, Result, undefined) of
|
|
undefined ->
|
|
Outs2;
|
|
Reply ->
|
|
[Reply | Outs2]
|
|
end,
|
|
Events = maps:get(events, Result, []),
|
|
{ok, [{outgoing, Outs3}] ++ Events, Channel}.
|
|
|
|
%% leaf node
|
|
process_nothing(_, _, Channel) ->
|
|
{ok, Channel}.
|
|
|
|
process_connection(
|
|
{open, Req},
|
|
Result,
|
|
Channel = #channel{conn_state = idle},
|
|
Iter
|
|
) ->
|
|
Queries = emqx_coap_message:extract_uri_query(Req),
|
|
case
|
|
emqx_utils:pipeline(
|
|
[
|
|
fun enrich_conninfo/2,
|
|
fun run_conn_hooks/2,
|
|
fun enrich_clientinfo/2,
|
|
fun set_log_meta/2,
|
|
fun auth_connect/2
|
|
],
|
|
{Queries, Req},
|
|
Channel
|
|
)
|
|
of
|
|
{ok, _Input, NChannel} ->
|
|
process_connect(ensure_connected(NChannel), Req, Result, Iter);
|
|
{error, ReasonCode, NChannel} ->
|
|
ErrMsg = io_lib:format("Login Failed: ~ts", [ReasonCode]),
|
|
Payload = iolist_to_binary(ErrMsg),
|
|
Reply = emqx_coap_message:piggyback({error, bad_request}, Payload, Req),
|
|
process_shutdown(Reply, Result, NChannel, Iter)
|
|
end;
|
|
process_connection(
|
|
{open, Req},
|
|
Result,
|
|
Channel = #channel{
|
|
conn_state = ConnState,
|
|
clientinfo = #{clientid := ClientId}
|
|
},
|
|
Iter
|
|
) when
|
|
ConnState == connected
|
|
->
|
|
Queries = emqx_coap_message:extract_uri_query(Req),
|
|
ErrMsg0 =
|
|
case Queries of
|
|
#{<<"clientid">> := ClientId} ->
|
|
"client has connected";
|
|
#{<<"clientid">> := ReqClientId} ->
|
|
["channel has registered by: ", ReqClientId];
|
|
_ ->
|
|
"invalid queries"
|
|
end,
|
|
ErrMsg = io_lib:format("Bad Request: ~ts", [ErrMsg0]),
|
|
Payload = iolist_to_binary(ErrMsg),
|
|
iter(
|
|
Iter,
|
|
reply({error, bad_request}, Payload, Req, Result),
|
|
Channel
|
|
);
|
|
process_connection({close, Msg}, _, Channel, _) ->
|
|
Queries = emqx_coap_message:extract_uri_query(Msg),
|
|
case maps:get(<<"clientid">>, Queries, undefined) of
|
|
undefined ->
|
|
ok;
|
|
ClientId ->
|
|
%% XXX: A cluster-level connection shutdown needs to be performed here.
|
|
%%
|
|
%% due to the possibility that the current close request may be
|
|
%% from a CoAP client from another IP + Port tuple
|
|
emqx_gateway_cm:cast(coap, ClientId, close)
|
|
end,
|
|
Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
|
|
NChannel = ensure_disconnected(normal, Channel),
|
|
{shutdown, normal, Reply, NChannel}.
|
|
|
|
process_subscribe({Sub, Msg}, Result, #channel{session = Session} = Channel, Iter) ->
|
|
Result2 = emqx_coap_session:process_subscribe(Sub, Msg, Result, Session),
|
|
iter([session, fun process_session/4 | Iter], Result2, Channel).
|
|
|
|
%% leaf node
|
|
process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
|
|
Session2 = emqx_coap_session:set_reply(Reply, Session),
|
|
Outs = maps:get(out, Result, []),
|
|
Outs2 = lists:reverse(Outs),
|
|
Events = maps:get(events, Result, []),
|
|
{ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}.
|
|
|
|
%% leaf node
|
|
process_shutdown(Reply, _Result, Channel, _) ->
|
|
% Outs = maps:get(out, Result, []),
|
|
% Outs2 = lists:reverse(Outs),
|
|
% Events = maps:get(events, Result, []),
|
|
{shutdown, normal, Reply, Channel}.
|