189 lines
5.4 KiB
Erlang
189 lines
5.4 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc The gateway instance context
|
|
-module(emqx_gateway_ctx).
|
|
|
|
-export_type([context/0]).
|
|
|
|
-include("emqx_gateway.hrl").
|
|
|
|
%% @doc The running context for a Connection/Channel process.
|
|
%%
|
|
%% The `Context` encapsulates a complex structure of contextual information.
|
|
%% It is convenient to use it directly in Channel/Connection to read
|
|
%% configuration, register devices and other common operations.
|
|
%%
|
|
-type context() ::
|
|
#{
|
|
%% Gateway Name
|
|
gwname := gateway_name(),
|
|
%% FIXME: use process name instead of pid()
|
|
%% The ConnectionManager PID
|
|
cm := pid()
|
|
}.
|
|
|
|
%% Authentication circle
|
|
-export([
|
|
authenticate/2,
|
|
open_session/5,
|
|
open_session/6,
|
|
insert_channel_info/4,
|
|
set_chan_info/3,
|
|
set_chan_stats/3,
|
|
connection_closed/2
|
|
]).
|
|
|
|
%% Message circle
|
|
-export([
|
|
authorize/4
|
|
% Needless for pub/sub
|
|
%, publish/3
|
|
%, subscribe/4
|
|
]).
|
|
|
|
%% Metrics & Stats
|
|
-export([
|
|
metrics_inc/2,
|
|
metrics_inc/3
|
|
]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Authentication circle
|
|
|
|
%% @doc Authenticate whether the client has access to the Broker.
|
|
-spec authenticate(context(), emqx_types:clientinfo()) ->
|
|
{ok, emqx_types:clientinfo()}
|
|
| {error, any()}.
|
|
authenticate(_Ctx, ClientInfo0) ->
|
|
ClientInfo = ClientInfo0#{zone => default},
|
|
case emqx_access_control:authenticate(ClientInfo) of
|
|
{ok, AuthResult} ->
|
|
ClientInfo1 = merge_auth_result(ClientInfo, AuthResult),
|
|
{ok, eval_mountpoint(ClientInfo1)};
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
%% @doc Register the session to the cluster.
|
|
%%
|
|
%% This function should be called after the client has authenticated
|
|
%% successfully so that the client can be managed in the cluster.
|
|
-spec open_session(
|
|
context(),
|
|
boolean(),
|
|
emqx_types:clientinfo(),
|
|
emqx_types:conninfo(),
|
|
fun(
|
|
(
|
|
emqx_types:clientinfo(),
|
|
emqx_types:conninfo()
|
|
) -> Session
|
|
)
|
|
) ->
|
|
{ok, #{
|
|
session := Session,
|
|
present := boolean(),
|
|
pendings => list()
|
|
}}
|
|
| {error, any()}.
|
|
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
|
|
open_session(
|
|
Ctx,
|
|
CleanStart,
|
|
ClientInfo,
|
|
ConnInfo,
|
|
CreateSessionFun,
|
|
emqx_session
|
|
).
|
|
|
|
open_session(
|
|
_Ctx = #{gwname := GwName},
|
|
CleanStart,
|
|
ClientInfo,
|
|
ConnInfo,
|
|
CreateSessionFun,
|
|
SessionMod
|
|
) ->
|
|
emqx_gateway_cm:open_session(
|
|
GwName,
|
|
CleanStart,
|
|
ClientInfo,
|
|
ConnInfo,
|
|
CreateSessionFun,
|
|
SessionMod
|
|
).
|
|
|
|
-spec insert_channel_info(
|
|
context(),
|
|
emqx_types:clientid(),
|
|
emqx_types:infos(),
|
|
emqx_types:stats()
|
|
) -> ok.
|
|
insert_channel_info(_Ctx = #{gwname := GwName}, ClientId, Infos, Stats) ->
|
|
emqx_gateway_cm:insert_channel_info(GwName, ClientId, Infos, Stats).
|
|
|
|
%% @doc Set the Channel Info to the ConnectionManager for this client
|
|
-spec set_chan_info(
|
|
context(),
|
|
emqx_types:clientid(),
|
|
emqx_types:infos()
|
|
) -> boolean().
|
|
set_chan_info(_Ctx = #{gwname := GwName}, ClientId, Infos) ->
|
|
emqx_gateway_cm:set_chan_info(GwName, ClientId, Infos).
|
|
|
|
-spec set_chan_stats(
|
|
context(),
|
|
emqx_types:clientid(),
|
|
emqx_types:stats()
|
|
) -> boolean().
|
|
set_chan_stats(_Ctx = #{gwname := GwName}, ClientId, Stats) ->
|
|
emqx_gateway_cm:set_chan_stats(GwName, ClientId, Stats).
|
|
|
|
-spec connection_closed(context(), emqx_types:clientid()) -> boolean().
|
|
connection_closed(_Ctx = #{gwname := GwName}, ClientId) ->
|
|
emqx_gateway_cm:connection_closed(GwName, ClientId).
|
|
|
|
-spec authorize(
|
|
context(),
|
|
emqx_types:clientinfo(),
|
|
emqx_types:pubsub(),
|
|
emqx_types:topic()
|
|
) ->
|
|
allow | deny.
|
|
authorize(_Ctx, ClientInfo, Action, Topic) ->
|
|
emqx_access_control:authorize(ClientInfo, Action, Topic).
|
|
|
|
metrics_inc(_Ctx = #{gwname := GwName}, Name) ->
|
|
emqx_gateway_metrics:inc(GwName, Name).
|
|
|
|
metrics_inc(_Ctx = #{gwname := GwName}, Name, Oct) ->
|
|
emqx_gateway_metrics:inc(GwName, Name, Oct).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
eval_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
|
|
ClientInfo;
|
|
eval_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
|
|
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
|
|
ClientInfo#{mountpoint := MountPoint1}.
|
|
|
|
merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) ->
|
|
IsSuperuser = maps:get(is_superuser, AuthResult, false),
|
|
maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}).
|