%% emqx/apps/emqx_gateway/src/emqx_gateway_ctx.erl
%%
%% 189 lines
%% 5.4 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The gateway instance context
-module(emqx_gateway_ctx).
-export_type([context/0]).
-include("emqx_gateway.hrl").
%% @doc The running context handed to a Connection/Channel process.
%%
%% A `context()' is a map bundling contextual information. Channel and
%% Connection code can use it directly to read configuration, register
%% devices and perform other common operations.
-type context() ::
    #{
        %% Gateway name
        gwname := gateway_name(),
        %% PID of the ConnectionManager
        %% FIXME: use process name instead of pid()
        cm := pid()
    }.
%% Authentication and connection lifecycle: authenticating a client,
%% opening its session, recording channel info/stats and closing the
%% connection
-export([
authenticate/2,
open_session/5,
open_session/6,
insert_channel_info/4,
set_chan_info/3,
set_chan_stats/3,
connection_closed/2
]).
%% Message lifecycle
-export([
authorize/4
% Not needed for pub/sub
%, publish/3
%, subscribe/4
]).
%% Metrics & Stats
-export([
metrics_inc/2,
metrics_inc/3
]).
%%--------------------------------------------------------------------
%% Authentication circle
%% @doc Authenticate whether the client has access to the Broker.
%%
%% The client info is first given `zone => default' and then passed to
%% `emqx_access_control:authenticate/1'. On success the authentication
%% result is merged into the client info (`merge_auth_result/2') and the
%% mountpoint is evaluated (`eval_mountpoint/1'). On failure the
%% `{error, Reason}' tuple is returned as is. The context argument is
%% not used.
-spec authenticate(context(), emqx_types:clientinfo()) ->
    {ok, emqx_types:clientinfo()} | {error, any()}.
authenticate(_Ctx, ClientInfo0) ->
    ZonedInfo = ClientInfo0#{zone => default},
    case emqx_access_control:authenticate(ZonedInfo) of
        {error, _Reason} = Failure ->
            Failure;
        {ok, AuthResult} ->
            Merged = merge_auth_result(ZonedInfo, AuthResult),
            {ok, eval_mountpoint(Merged)}
    end.
%% @doc Register the session to the cluster, using `emqx_session' as the
%% session module.
%%
%% Call this only after the client has authenticated successfully, so
%% that the client can be managed in the cluster. All arguments are
%% passed on unchanged to `open_session/6'.
-spec open_session(
    context(),
    boolean(),
    emqx_types:clientinfo(),
    emqx_types:conninfo(),
    fun((emqx_types:clientinfo(), emqx_types:conninfo()) -> Session)
) ->
    {ok, #{session := Session, present := boolean(), pendings => list()}}
    | {error, any()}.
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
    DefaultSessionMod = emqx_session,
    open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, DefaultSessionMod).
%% @doc Same as `open_session/5', but with an explicit session module.
%%
%% Calls `emqx_gateway_cm:open_session/6' with the gateway name taken
%% from the context's `gwname' key, followed by the remaining arguments
%% in their original order.
open_session(#{gwname := Gateway}, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
    emqx_gateway_cm:open_session(
        Gateway, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod
    ).
%% @doc Insert the channel infos and stats of a client.
%%
%% Calls `emqx_gateway_cm:insert_channel_info/4' with the gateway name
%% taken from the context's `gwname' key.
-spec insert_channel_info(
    context(), emqx_types:clientid(), emqx_types:infos(), emqx_types:stats()
) -> ok.
insert_channel_info(#{gwname := Gateway}, ClientId, Infos, Stats) ->
    emqx_gateway_cm:insert_channel_info(Gateway, ClientId, Infos, Stats).
%% @doc Set the Channel Info to the ConnectionManager for this client.
%%
%% Calls `emqx_gateway_cm:set_chan_info/3' with the gateway name taken
%% from the context's `gwname' key.
-spec set_chan_info(context(), emqx_types:clientid(), emqx_types:infos()) -> boolean().
set_chan_info(#{gwname := Gateway}, ClientId, Infos) ->
    emqx_gateway_cm:set_chan_info(Gateway, ClientId, Infos).
%% @doc Set the channel stats for this client.
%%
%% Calls `emqx_gateway_cm:set_chan_stats/3' with the gateway name taken
%% from the context's `gwname' key.
-spec set_chan_stats(context(), emqx_types:clientid(), emqx_types:stats()) -> boolean().
set_chan_stats(#{gwname := Gateway}, ClientId, Stats) ->
    emqx_gateway_cm:set_chan_stats(Gateway, ClientId, Stats).
%% @doc Report that the connection of the given client has been closed.
%%
%% Calls `emqx_gateway_cm:connection_closed/2' with the gateway name
%% taken from the context's `gwname' key and the client id.
-spec connection_closed(context(), emqx_types:clientid()) -> boolean().
connection_closed(#{gwname := Gateway}, ClientId) ->
    emqx_gateway_cm:connection_closed(Gateway, ClientId).
%% @doc Check whether the client may perform the given pub/sub action on
%% the topic.
%%
%% The decision is made by `emqx_access_control:authorize/3'; the
%% context argument is not used.
-spec authorize(
    context(), emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()
) -> allow | deny.
authorize(_Ctx, ClientInfo, PubSub, Topic) ->
    emqx_access_control:authorize(ClientInfo, PubSub, Topic).
%% @doc Increment the metric `Name' of the context's gateway.
%%
%% Calls `emqx_gateway_metrics:inc/2' with the gateway name taken from
%% the context's `gwname' key.
metrics_inc(#{gwname := Gateway}, Name) ->
    emqx_gateway_metrics:inc(Gateway, Name).
%% @doc Increment the metric `Name' of the context's gateway, passing an
%% extra argument.
%%
%% Calls `emqx_gateway_metrics:inc/3' with the gateway name taken from
%% the context's `gwname' key. `Oct' is forwarded unchanged
%% (NOTE(review): presumably the increment amount -- confirm against
%% `emqx_gateway_metrics').
metrics_inc(#{gwname := Gateway}, Name, Oct) ->
    emqx_gateway_metrics:inc(Gateway, Name, Oct).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
%% Evaluate the mountpoint stored in the client info.
%%
%% An `undefined' mountpoint leaves the client info untouched. Any other
%% mountpoint is replaced by the result of `emqx_mountpoint:replvar/2'
%% applied to it and the client info. The `mountpoint' key must be
%% present; no clause matches otherwise.
eval_mountpoint(#{mountpoint := undefined} = ClientInfo) ->
    ClientInfo;
eval_mountpoint(#{mountpoint := Template} = ClientInfo) ->
    ClientInfo#{mountpoint := emqx_mountpoint:replvar(Template, ClientInfo)}.
%% Merge an authentication result into the client info.
%%
%% Keys of `AuthResult' take precedence over those of `ClientInfo'. The
%% result always carries `is_superuser': the value from `AuthResult'
%% when present there, `false' otherwise (a value in `ClientInfo' alone
%% is overridden). Both arguments must be maps.
merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo), is_map(AuthResult) ->
    %% Layering the auth result over the default keeps an explicit
    %% is_superuser value and fills in `false' when it is absent.
    WithSuperuser = maps:merge(#{is_superuser => false}, AuthResult),
    maps:merge(ClientInfo, WithSuperuser).