emqx/apps/emqx_gateway/src/emqx_gateway_ctx.erl

146 lines
5.3 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The gateway instance context
-module(emqx_gateway_ctx).
-include("include/emqx_gateway.hrl").
%% @doc The running context for a Connection/Channel process.
%%
%% The `Context` encapsulates a complex structure of contextual information.
%% It is convenient to use it directly in Channel/Connection to read
%% configuration, register devices and other common operations.
%%
-type context() ::
#{ %% Gateway Name
gwname := gateway_name()
%% Authentication chains
, auth := [emqx_authentication:chain_name()]
%% The ConnectionManager PID
, cm := pid()
}.
%% Authentication circle
-export([ authenticate/2
, open_session/5
, open_session/6
, insert_channel_info/4
, set_chan_info/3
, set_chan_stats/3
, connection_closed/2
]).
%% Message circle
-export([ authorize/4
% Needless for pub/sub
%, publish/3
%, subscribe/4
]).
%% Metrics & Stats
-export([ metrics_inc/2
, metrics_inc/3
]).
%%--------------------------------------------------------------------
%% Authentication circle
%% @doc Authenticate whether the client has access to the Broker.
-spec authenticate(context(), emqx_types:clientinfo())
-> {ok, emqx_types:clientinfo()}
| {error, any()}.
authenticate(_Ctx = #{auth := _ChainNames}, ClientInfo0)
when is_list(_ChainNames) ->
ClientInfo = ClientInfo0#{zone => default},
case emqx_access_control:authenticate(ClientInfo) of
{ok, _} ->
{ok, mountpoint(ClientInfo)};
{error, Reason} ->
{error, Reason}
end.
%% @doc Register the session to the cluster.
%%
%% This function should be called after the client has authenticated
%% successfully so that the client can be managed in the cluster.
-spec open_session(context(), boolean(), emqx_types:clientinfo(),
emqx_types:conninfo(),
fun((emqx_types:clientinfo(),
emqx_types:conninfo()) -> Session)
)
-> {ok, #{session := Session,
present := boolean(),
pendings => list()
}}
| {error, any()}.
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Ctx, CleanStart, ClientInfo, ConnInfo,
CreateSessionFun, emqx_session).
open_session(_Ctx = #{gwname := GwName},
CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
emqx_gateway_cm:open_session(GwName, CleanStart,
ClientInfo, ConnInfo,
CreateSessionFun, SessionMod).
-spec insert_channel_info(context(),
emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()) -> ok.
insert_channel_info(_Ctx = #{gwname := GwName}, ClientId, Infos, Stats) ->
emqx_gateway_cm:insert_channel_info(GwName, ClientId, Infos, Stats).
%% @doc Set the Channel Info to the ConnectionManager for this client
-spec set_chan_info(context(),
emqx_types:clientid(),
emqx_types:infos()) -> boolean().
set_chan_info(_Ctx = #{gwname := GwName}, ClientId, Infos) ->
emqx_gateway_cm:set_chan_info(GwName, ClientId, Infos).
-spec set_chan_stats(context(),
emqx_types:clientid(),
emqx_types:stats()) -> boolean().
set_chan_stats(_Ctx = #{gwname := GwName}, ClientId, Stats) ->
emqx_gateway_cm:set_chan_stats(GwName, ClientId, Stats).
-spec connection_closed(context(), emqx_types:clientid()) -> boolean().
connection_closed(_Ctx = #{gwname := GwName}, ClientId) ->
emqx_gateway_cm:connection_closed(GwName, ClientId).
-spec authorize(context(), emqx_types:clientinfo(),
emqx_types:pubsub(), emqx_types:topic())
-> allow | deny.
authorize(_Ctx, ClientInfo, PubSub, Topic) ->
emqx_access_control:authorize(ClientInfo, PubSub, Topic).
metrics_inc(_Ctx = #{gwname := GwName}, Name) ->
emqx_gateway_metrics:inc(GwName, Name).
metrics_inc(_Ctx = #{gwname := GwName}, Name, Oct) ->
emqx_gateway_metrics:inc(GwName, Name, Oct).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
mountpoint(ClientInfo = #{mountpoint := undefined}) ->
ClientInfo;
mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
ClientInfo#{mountpoint := MountPoint1}.