811 lines
26 KiB
Erlang
811 lines
26 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_exproto_channel).
|
|
|
|
-include("emqx_exproto.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("emqx/include/types.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-export([
|
|
info/1,
|
|
info/2,
|
|
stats/1
|
|
]).
|
|
|
|
-export([
|
|
init/2,
|
|
handle_in/2,
|
|
handle_deliver/2,
|
|
handle_timeout/3,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2,
|
|
terminate/2
|
|
]).
|
|
|
|
-export_type([channel/0]).
|
|
|
|
%% Per-connection state of an ExProto channel.
-record(channel, {
    %% Gateway context handed in through the init options
    ctx :: emqx_gateway_ctx:context(),
    %% State of the gRPC client used to call the external handler
    gcli :: emqx_exproto_gcli:grpc_client_state(),
    %% Connection level information
    conninfo :: emqx_types:conninfo(),
    %% Client information; enriched by the `auth` call
    clientinfo :: maybe(map()),
    %% Current connection state
    conn_state :: conn_state(),
    %% Subscriptions keyed by (mounted) topic filter
    subscriptions = #{},
    %% Keepalive checker; `undefined` until a keepalive timer is started
    keepalive :: maybe(emqx_keepalive:keepalive()),
    %% Running timers, keyed by timer name
    timers :: #{atom() => disabled | maybe(reference())},
    %% Reason recorded when the socket was closed or keepalive expired
    closed_reason = undefined
}).
|
|
|
|
-opaque channel() :: #channel{}.

-type conn_state() :: idle | connecting | connected | disconnected.

%% One instruction handed back to the connection process.
%% `{event, _}` is part of the type because handle_call/3 and
%% handle_timeout/3 in this module return items such as
%% `{event, connected}`, `{event, disconnected}` and `{event, updated}`.
-type reply() ::
    {outgoing, binary()}
    | {outgoing, [binary()]}
    | {event, atom()}
    | {close, Reason :: atom()}.

-type replies() :: emqx_types:packet() | reply() | [reply()].
|
|
|
|
%% Timer name => message delivered to handle_timeout/3 when it fires.
-define(TIMER_TABLE, #{
    alive_timer => keepalive,
    force_timer => force_close,
    idle_timer => force_close_idle
}).

%% Keys reported by info/1.
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Info, Attrs and Caps
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Return the channel infos listed in ?INFO_KEYS as a map.
-spec info(channel()) -> emqx_types:infos().
info(Channel) ->
    Pairs = info(?INFO_KEYS, Channel),
    maps:from_list(Pairs).
|
|
|
|
%% Look up one info item, or a list of items (returned as `{Key, Value}`
%% pairs in the same order as the requested keys).
-spec info(list(atom()) | atom(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
    lists:map(fun(Key) -> {Key, info(Key, Channel)} end, Keys);
info(conninfo, #channel{conninfo = ConnInfo}) ->
    ConnInfo;
info(clientid, #channel{clientinfo = ClientInfo}) ->
    maps:get(clientid, ClientInfo, undefined);
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
    ClientInfo;
info(session, #channel{subscriptions = Subs, conninfo = ConnInfo}) ->
    %% Only the subscriptions and the creation time are taken from the
    %% channel; the remaining session fields are fixed values.
    CreatedAt = maps:get(connected_at, ConnInfo),
    #{
        subscriptions => Subs,
        upgrade_qos => false,
        retry_interval => 0,
        await_rel_timeout => 0,
        created_at => CreatedAt
    };
info(conn_state, #channel{conn_state = ConnState}) ->
    ConnState;
info(will_msg, _) ->
    undefined;
info(ctx, #channel{ctx = Ctx}) ->
    Ctx.
|
|
|
|
%% Channel statistics: only the subscription count is derived from the
%% channel state, every other counter is reported as 0.
-spec stats(channel()) -> emqx_types:stats().
stats(#channel{subscriptions = Subs}) ->
    ZeroKeys = [
        subscriptions_max,
        inflight_cnt,
        inflight_max,
        mqueue_len,
        mqueue_max,
        mqueue_dropped,
        next_pkt_id,
        awaiting_rel_cnt,
        awaiting_rel_max
    ],
    [{subscriptions_cnt, maps:size(Subs)} | [{Key, 0} || Key <- ZeroKeys]].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Init the channel
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Build the initial channel from the socket's conninfo and the listener
%% options, then notify the gRPC handler through `on_socket_created`.
-spec init(emqx_exproto_types:conninfo(), map()) -> channel().
init(
    #{
        socktype := Socktype,
        peername := Peername,
        sockname := Sockname,
        peercert := Peercert
    } = ConnInfo0,
    Options
) ->
    %% gRPC client used for every callback towards the handler service
    GrpcChannel = maps:get(grpc_client_channel, Options),
    Service = maps:get(grpc_client_service_name, Options),
    GClient = emqx_exproto_gcli:init(Service, #{channel => GrpcChannel}),

    Ctx = maps:get(ctx, Options),
    IdleTimeout = emqx_gateway_utils:idle_timeout(Options),

    ConnInfo = default_conninfo(ConnInfo0#{idle_timeout => IdleTimeout}),
    ListenerId =
        case maps:get(listener, Options, undefined) of
            undefined ->
                undefined;
            {GwName, Type, LisName} ->
                emqx_gateway_utils:listener_id(GwName, Type, LisName)
        end,

    EnableAuthn = maps:get(enable_authn, Options, true),
    ClientInfo0 = default_clientinfo(ConnInfo),
    ClientInfo = ClientInfo0#{
        listener => ListenerId,
        enable_authn => EnableAuthn
    },
    Channel = #channel{
        ctx = Ctx,
        gcli = GClient,
        conninfo = ConnInfo,
        clientinfo = ClientInfo,
        conn_state = connecting,
        timers = #{}
    },

    %% Socket description sent to the handler; certificate details are
    %% added by peercert/2 when available.
    SockInfo = #{
        socktype => socktype(Socktype),
        peername => address(Peername),
        sockname => address(Sockname)
    },
    Req = #{conninfo => peercert(Peercert, SockInfo)},
    dispatch(on_socket_created, Req, start_idle_checking_timer(Channel)).
|
|
|
|
%% @private
%% Attach the certificate's common name / subject to ConnInfo under the
%% `peercert` key. Without a certificate (`nossl` or `undefined`), or when
%% neither value is defined, ConnInfo is returned unchanged.
peercert(nossl, ConnInfo) ->
    ConnInfo;
peercert(undefined, ConnInfo) ->
    ConnInfo;
peercert(Peercert, ConnInfo) ->
    Candidates = [
        {cn, esockd_peercert:common_name(Peercert)},
        {dn, esockd_peercert:subject(Peercert)}
    ],
    case [Pair || {_, Value} = Pair <- Candidates, Value =/= undefined] of
        [] ->
            ConnInfo;
        Known ->
            ConnInfo#{peercert => maps:from_list(Known)}
    end.
|
|
|
|
%% @private
%% Translate the internal socket type into the upper-case atom placed in
%% the `on_socket_created` request.
socktype(tcp) ->
    'TCP';
socktype(ssl) ->
    'SSL';
socktype(udp) ->
    'UDP';
socktype(dtls) ->
    'DTLS'.
|
|
|
|
%% @private
%% Turn a `{Host, Port}` pair into a map, with the host rendered by
%% inet:ntoa/1.
address({Host, Port}) ->
    HostStr = inet:ntoa(Host),
    #{host => HostStr, port => Port}.
|
|
|
|
%% Only UDP channels get an idle timer at start-up, so that an idle UDP
%% connection process does not leak.
start_idle_checking_timer(#channel{conninfo = #{socktype := udp}} = Channel) ->
    ensure_timer(idle_timer, Channel);
start_idle_checking_timer(Channel) ->
    Channel.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle incoming packet
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Forward raw bytes received from the socket to the handler via the
%% `on_received_bytes` callback.
-spec handle_in(binary(), channel()) ->
    {ok, channel()}
    | {shutdown, Reason :: term(), channel()}.
handle_in(Data, Channel) ->
    NChannel = dispatch(on_received_bytes, #{bytes => Data}, Channel),
    {ok, NChannel}.
|
|
|
|
%% Convert delivered broker messages into plain maps and pass them to the
%% handler through the `on_received_messages` callback.
-spec handle_deliver(list(emqx_types:deliver()), channel()) ->
    {ok, channel()}
    | {shutdown, Reason :: term(), channel()}.
handle_deliver(
    Delivers,
    #channel{ctx = Ctx, clientinfo = ClientInfo} = Channel
) ->
    %% XXX: ?? Nack delivers from shared subscriptions
    Mountpoint = maps:get(mountpoint, ClientInfo),
    NodeStr = atom_to_binary(node(), utf8),
    ToRequestMsg = fun({_, _, Msg0}) ->
        ok = metrics_inc(Ctx, 'messages.delivered'),
        Msg1 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg0),
        %% The mountpoint is removed before the message leaves the gateway
        Msg = emqx_mountpoint:unmount(Mountpoint, Msg1),
        #{
            node => NodeStr,
            id => emqx_guid:to_hexstr(emqx_message:id(Msg)),
            qos => emqx_message:qos(Msg),
            from => fmt_from(emqx_message:from(Msg)),
            topic => emqx_message:topic(Msg),
            payload => emqx_message:payload(Msg),
            timestamp => emqx_message:timestamp(Msg)
        }
    end,
    Req = #{messages => lists:map(ToRequestMsg, Delivers)},
    {ok, dispatch(on_received_messages, Req, Channel)}.
|
|
|
|
%% Handle a fired timer. The message is one of the values of ?TIMER_TABLE:
%%  - `{keepalive, StatVal}`: check the keepalive; on timeout notify the
%%    handler (`on_timer_timeout`) and ask the connection to close.
%%  - `force_close`: shut down with the recorded closed reason.
%%  - `force_close_idle`: shut down with `idle_timeout`.
%% The spec includes `{ok, replies(), channel()}` because the keepalive
%% timeout branch returns replies alongside the channel.
-spec handle_timeout(reference(), Msg :: term(), channel()) ->
    {ok, channel()}
    | {ok, replies(), channel()}
    | {shutdown, Reason :: term(), channel()}.
handle_timeout(
    _TRef,
    {keepalive, _StatVal},
    Channel = #channel{keepalive = undefined}
) ->
    %% No keepalive configured: nothing to check
    {ok, Channel};
handle_timeout(
    _TRef,
    {keepalive, StatVal},
    Channel = #channel{keepalive = Keepalive}
) ->
    case emqx_keepalive:check(StatVal, Keepalive) of
        {ok, NKeepalive} ->
            NChannel = Channel#channel{keepalive = NKeepalive},
            {ok, reset_timer(alive_timer, NChannel)};
        {error, timeout} ->
            Req = #{type => 'KEEPALIVE'},
            NChannel = remove_timer_ref(alive_timer, Channel),
            %% close connection if keepalive timeout
            Replies = [{event, disconnected}, {close, keepalive_timeout}],
            NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{
                closed_reason = keepalive_timeout
            }),
            {ok, Replies, NChannel1}
    end;
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
    {shutdown, Reason, Channel};
handle_timeout(_TRef, force_close_idle, Channel) ->
    {shutdown, idle_timeout, Channel};
handle_timeout(_TRef, Msg, Channel) ->
    ?SLOG(warning, #{
        msg => "unexpected_timeout_signal",
        signal => Msg
    }),
    {ok, Channel}.
|
|
|
|
%% Handle synchronous requests addressed to the channel.
%%
%% Every clause that changes subscriptions or keepalive settings returns
%% `{event, updated}` in its replies. The `{unsubscribe, Topic}` clause
%% previously returned `{event, update}`; it now uses the same event name
%% as its siblings.
%% NOTE(review): this assumes the connection process does not treat
%% `update` differently from `updated` - confirm against the consumer of
%% these replies.
-spec handle_call(Req :: any(), From :: any(), channel()) ->
    {reply, Reply :: term(), channel()}
    | {reply, Reply :: term(), replies(), channel()}
    | {shutdown, Reason :: term(), Reply :: term(), channel()}.

%% Send raw data to the client socket
handle_call({send, Data}, _From, Channel) ->
    {reply, ok, [{outgoing, Data}], Channel};
%% Close the connection; a connected client also gets a disconnected event
handle_call(close, _From, Channel = #channel{conn_state = connected}) ->
    {reply, ok, [{event, disconnected}, {close, normal}], Channel};
handle_call(close, _From, Channel) ->
    {reply, ok, [{close, normal}], Channel};
%% Authentication is rejected once the client is connected ...
handle_call(
    {auth, ClientInfo, _Password},
    _From,
    Channel = #channel{conn_state = connected}
) ->
    ?SLOG(warning, #{
        msg => "ingore_duplicated_authenticate_command",
        request_clientinfo => ClientInfo
    }),
    {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
%% ... or after the socket has been closed
handle_call(
    {auth, ClientInfo, _Password},
    _From,
    Channel = #channel{conn_state = disconnected}
) ->
    ?SLOG(warning, #{
        msg => "authenticate_command_after_socket_disconnected",
        request_clientinfo => ClientInfo
    }),
    {reply, {error, ?RESP_PERMISSION_DENY, <<"Client socket disconnected">>}, Channel};
%% Authenticate and open a session; on any failure the channel is
%% returned unchanged (without the enriched client/conn info)
handle_call(
    {auth, ClientInfo0, Password},
    _From,
    Channel = #channel{
        ctx = Ctx,
        conninfo = ConnInfo,
        clientinfo = ClientInfo
    }
) ->
    ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
    ConnInfo1 = enrich_conninfo(ClientInfo0, ConnInfo),

    Channel1 = Channel#channel{
        conninfo = ConnInfo1,
        clientinfo = ClientInfo1
    },

    #{clientid := ClientId, username := Username} = ClientInfo1,

    case
        emqx_gateway_ctx:authenticate(
            Ctx, ClientInfo1#{password => Password}
        )
    of
        {ok, NClientInfo} ->
            SessFun = fun(_, _) -> #{} end,
            emqx_logger:set_metadata_clientid(ClientId),
            case
                emqx_gateway_ctx:open_session(
                    Ctx,
                    true,
                    NClientInfo,
                    ConnInfo1,
                    SessFun
                )
            of
                {ok, _Session} ->
                    ?SLOG(debug, #{
                        msg => "client_login_succeed",
                        clientid => ClientId,
                        username => Username
                    }),
                    {reply, ok, [{event, connected}],
                        ensure_connected(Channel1#channel{clientinfo = NClientInfo})};
                {error, Reason} ->
                    ?SLOG(warning, #{
                        msg => "client_login_failed",
                        clientid => ClientId,
                        username => Username,
                        reason => Reason
                    }),
                    {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
            end;
        {error, Reason} ->
            ?SLOG(warning, #{
                msg => "client_login_failed",
                clientid => ClientId,
                username => Username,
                reason => Reason
            }),
            {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
    end;
%% Start keepalive checking; the idle timer is cancelled first
handle_call(
    {start_timer, keepalive, Interval},
    _From,
    Channel = #channel{
        conninfo = ConnInfo,
        clientinfo = ClientInfo
    }
) ->
    NConnInfo = ConnInfo#{keepalive => Interval},
    NClientInfo = ClientInfo#{keepalive => Interval},
    NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
    {reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))};
%% Subscription requested by the client: authorized first
handle_call(
    {subscribe_from_client, TopicFilter, Qos},
    _From,
    Channel = #channel{
        ctx = Ctx,
        conn_state = connected,
        clientinfo = ClientInfo
    }
) ->
    case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of
        deny ->
            {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
        _ ->
            {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
            {reply, ok, [{event, updated}], NChannel}
    end;
%% Subscription without an authorization check
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
    {ok, [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
    {reply, {ok, {NTopicFilter, NSubOpts}}, [{event, updated}], NChannel};
handle_call(
    {unsubscribe_from_client, TopicFilter},
    _From,
    Channel = #channel{conn_state = connected}
) ->
    {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
    {reply, ok, [{event, updated}], NChannel};
handle_call({unsubscribe, Topic}, _From, Channel) ->
    {ok, NChannel} = do_unsubscribe([Topic], Channel),
    %% Same event name as the other subscription-changing clauses
    {reply, ok, [{event, updated}], NChannel};
handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
    {reply, {ok, maps:to_list(Subs)}, Channel};
%% Publish on behalf of a connected client: authorized, then mounted
handle_call(
    {publish, Topic, Qos, Payload},
    _From,
    Channel = #channel{
        ctx = Ctx,
        conn_state = connected,
        clientinfo =
            ClientInfo =
                #{
                    clientid := From,
                    mountpoint := Mountpoint
                }
    }
) ->
    case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
        deny ->
            {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
        _ ->
            Msg = emqx_message:make(From, Qos, Topic, Payload),
            NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
            _ = emqx:publish(NMsg),
            {reply, ok, Channel}
    end;
handle_call(kick, _From, Channel) ->
    {reply, ok, [{event, disconnected}, {close, kicked}], Channel};
handle_call(discard, _From, Channel) ->
    {shutdown, discarded, ok, Channel};
%% Anything else (including client requests made while not connected)
handle_call(
    Req,
    _From,
    Channel = #channel{
        conn_state = ConnState,
        clientinfo = ClientInfo,
        closed_reason = ClosedReason
    }
) ->
    ?SLOG(warning, #{
        msg => "unexpected_call",
        call => Req,
        conn_state => ConnState,
        clientid => maps:get(clientid, ClientInfo, undefined),
        closed_reason => ClosedReason
    }),
    {reply, {error, unexpected_call}, Channel}.
|
|
|
|
%% No cast is expected by this channel: log the request and carry on.
-spec handle_cast(any(), channel()) ->
    {ok, channel()}
    | {ok, replies(), channel()}
    | {shutdown, Reason :: term(), channel()}.
handle_cast(Req, Channel) ->
    LogData = #{
        msg => "unexpected_call",
        call => Req
    },
    ?SLOG(warning, LogData),
    {ok, Channel}.
|
|
|
|
%% Handle asynchronous messages:
%%  - `{sock_closed, Reason}`: shut down at once when the gRPC client has
%%    nothing pending, otherwise arm the force timer and wait.
%%  - `{hreply, FunName, Result}`: acknowledgement of a handler callback.
%%  - `{subscribe, _}`: ignored.
-spec handle_info(any(), channel()) ->
    {ok, channel()}
    | {shutdown, Reason :: term(), channel()}.
handle_info(
    {sock_closed, Reason},
    #channel{gcli = GClient, closed_reason = ClosedReason} = Channel
) ->
    case emqx_exproto_gcli:is_empty(GClient) of
        true ->
            {shutdown, Reason, ensure_disconnected(Reason, Channel)};
        _ ->
            %% delayed close process for flushing all callback funcs to gRPC server
            Marked =
                if
                    ClosedReason =:= undefined ->
                        %% keep an already recorded reason untouched
                        Channel#channel{closed_reason = Reason};
                    true ->
                        Channel
                end,
            Armed = ensure_timer(force_timer, Marked),
            {ok, ensure_disconnected(Reason, Armed)}
    end;
handle_info(
    {hreply, FunName, Result},
    #channel{gcli = GClient0, timers = Timers} = Channel0
) when
    FunName =:= on_socket_created orelse
        FunName =:= on_socket_closed orelse
        FunName =:= on_received_bytes orelse
        FunName =:= on_received_messages orelse
        FunName =:= on_timer_timeout
->
    Acked = emqx_exproto_gcli:ack(FunName, GClient0),
    Channel = Channel0#channel{gcli = Acked},

    %% A pending force timer means the socket is already closed; once the
    %% gRPC client has nothing left the channel can stop right away.
    ForceTimer = maps:get(force_timer, Timers, undefined),
    ShutdownNow = emqx_exproto_gcli:is_empty(Acked) andalso ForceTimer =/= undefined,
    case Result of
        ok when ShutdownNow ->
            Stopped = cancel_timer(force_timer, Channel),
            {shutdown, Stopped#channel.closed_reason, Stopped};
        ok ->
            Shot = emqx_exproto_gcli:maybe_shoot(Acked),
            {ok, Channel#channel{gcli = Shot}};
        {error, Reason} ->
            {shutdown, {error, {FunName, Reason}}, Channel}
    end;
handle_info({subscribe, _}, Channel) ->
    {ok, Channel};
handle_info(Info, Channel) ->
    ?SLOG(warning, #{
        msg => "unexpected_info",
        info => Info
    }),
    {ok, Channel}.
|
|
|
|
%% Report the termination reason (rendered as a binary) to the handler
%% through the `on_socket_closed` callback.
-spec terminate(any(), channel()) -> channel().
terminate(Reason, Channel) ->
    ReasonBin = stringfy(Reason),
    %% XXX: close streams?
    dispatch(on_socket_closed, #{reason => ReasonBin}, Channel).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Sub/UnSub
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Subscribe to every topic filter in TopicFilters.
%% Returns `{ok, MadeSubs, NChannel}` where MadeSubs holds one
%% `{TopicFilter, SubOpts}` entry per input, in input order.
%%
%% lists:mapfoldl/3 threads the channel through do_subscribe/3 and
%% collects the results in order, avoiding the quadratic `Acc ++ [Sub]`
%% append inside a fold.
do_subscribe(TopicFilters, Channel) ->
    {MadeSubs, NChannel} = lists:mapfoldl(
        fun({TopicFilter, SubOpts}, ChannelAcc) ->
            do_subscribe(TopicFilter, SubOpts, ChannelAcc)
        end,
        Channel,
        parse_topic_filters(TopicFilters)
    ),
    {ok, MadeSubs, NChannel}.
|
|
|
|
%% @private
%% Subscribe to one topic filter and record it in the channel.
%% The broker subscription is always (re)issued so that sub-options get
%% updated; the 'session.subscribed' hook only runs for a topic filter
%% that was not yet in the channel's subscriptions.
do_subscribe(
    TopicFilter,
    SubOpts,
    #channel{
        clientinfo = ClientInfo = #{mountpoint := Mountpoint},
        subscriptions = Subs
    } = Channel
) ->
    %% Mountpoint first
    Mounted = emqx_mountpoint:mount(Mountpoint, TopicFilter),
    FullOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
    SubId = maps:get(clientid, ClientInfo, undefined),
    %% XXX: is_new?
    AlreadySubscribed = maps:is_key(Mounted, Subs),
    ok = emqx:subscribe(Mounted, SubId, FullOpts),
    case AlreadySubscribed of
        false ->
            ok = emqx_hooks:run(
                'session.subscribed',
                [ClientInfo, Mounted, FullOpts#{is_new => true}]
            );
        true ->
            ok
    end,
    NSubs = Subs#{Mounted => FullOpts},
    {{Mounted, FullOpts}, Channel#channel{subscriptions = NSubs}}.
|
|
|
|
%% Unsubscribe from every topic filter in TopicFilters, threading the
%% channel through do_unsubscribe/3. Returns `{ok, NChannel}`.
do_unsubscribe(TopicFilters, Channel) ->
    Parsed = parse_topic_filters(TopicFilters),
    UnsubOne = fun({TopicFilter, SubOpts}, Acc) ->
        do_unsubscribe(TopicFilter, SubOpts, Acc)
    end,
    {ok, lists:foldl(UnsubOne, Channel, Parsed)}.
|
|
|
|
%% @private
%% Remove one subscription. A topic filter that is not in the channel's
%% subscriptions leaves the channel unchanged.
%% NOTE(review): the hook receives the unmounted TopicFilter, whereas
%% 'session.subscribed' receives the mounted one - confirm this is intended.
do_unsubscribe(
    TopicFilter,
    UnSubOpts,
    #channel{
        clientinfo = ClientInfo = #{mountpoint := Mountpoint},
        subscriptions = Subs
    } = Channel
) ->
    Mounted = emqx_mountpoint:mount(Mountpoint, TopicFilter),
    case Subs of
        #{Mounted := SubOpts} ->
            ok = emqx:unsubscribe(Mounted),
            HookOpts = maps:merge(SubOpts, UnSubOpts),
            ok = emqx_hooks:run(
                'session.unsubscribed',
                [ClientInfo, TopicFilter, HookOpts]
            ),
            Channel#channel{subscriptions = maps:remove(Mounted, Subs)};
        _ ->
            Channel
    end.
|
|
|
|
%% @private
%% Run emqx_topic:parse/1 over each topic filter.
parse_topic_filters(TopicFilters) ->
    [emqx_topic:parse(TopicFilter) || TopicFilter <- TopicFilters].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure & Hooks
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Mark the channel as connected: stamp `connected_at`, run the
%% 'client.connected' hook and switch the connection state.
ensure_connected(
    #channel{
        ctx = Ctx,
        conninfo = ConnInfo,
        clientinfo = ClientInfo
    } = Channel
) ->
    Now = erlang:system_time(millisecond),
    NConnInfo = ConnInfo#{connected_at => Now},
    ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
    Channel#channel{conninfo = NConnInfo, conn_state = connected}.
|
|
|
|
%% Mark the channel as disconnected and stamp `disconnected_at`.
%% The 'client.disconnected' hook only runs when the channel was in the
%% `connected` state.
ensure_disconnected(
    Reason,
    #channel{
        ctx = Ctx,
        conn_state = ConnState,
        conninfo = ConnInfo,
        clientinfo = ClientInfo
    } = Channel
) ->
    Now = erlang:system_time(millisecond),
    NConnInfo = ConnInfo#{disconnected_at => Now},
    case ConnState of
        connected ->
            ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]);
        _ ->
            ok
    end,
    Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
|
|
|
%% Bump the gateway metric named after the hook, then run the hook.
run_hooks(Ctx, Name, Args) ->
    _ = metrics_inc(Ctx, Name),
    emqx_hooks:run(Name, Args).
|
|
|
|
%% Increment the gateway metric Name through the gateway context.
metrics_inc(Ctx, Name) ->
    emqx_gateway_ctx:metrics_inc(Ctx, Name).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich Keepalive
|
|
|
|
%% Start keepalive checking using the `keepalive` interval stored in the
%% client info (0 when absent).
ensure_keepalive(#channel{clientinfo = ClientInfo} = Channel) ->
    Interval = maps:get(keepalive, ClientInfo, 0),
    ensure_keepalive_timer(Interval, Channel).
|
|
|
|
%% A non-positive interval leaves the channel untouched; otherwise a
%% keepalive checker is initialised (Interval is converted with
%% timer:seconds/1) and the alive timer is ensured.
ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
    Channel;
ensure_keepalive_timer(Interval, Channel) ->
    RecvStat = emqx_gateway_conn:keepalive_stats(recv),
    IntervalMs = timer:seconds(Interval),
    Keepalive = emqx_keepalive:init(RecvStat, IntervalMs),
    ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
|
|
|
%% Start the timer Name unless it is already registered or its interval
%% is not positive.
ensure_timer(Name, #channel{timers = Timers} = Channel) ->
    Existing = maps:get(Name, Timers, undefined),
    Timeout = interval(Name, Channel),
    if
        Existing == undefined, Timeout > 0 ->
            ensure_timer(Name, Timeout, Channel);
        true ->
            %% Timer disabled or exists
            Channel
    end.
|
|
|
|
%% Start the timer Name with the given Time and remember its reference.
%% The timeout message is looked up in ?TIMER_TABLE.
ensure_timer(Name, Time, #channel{timers = Timers} = Channel) ->
    TimeoutMsg = maps:get(Name, ?TIMER_TABLE),
    Ref = emqx_utils:start_timer(Time, TimeoutMsg),
    NTimers = Timers#{Name => Ref},
    Channel#channel{timers = NTimers}.
|
|
|
|
%% Forget the stored reference of timer Name and ensure it again.
reset_timer(Name, Channel) ->
    Cleared = remove_timer_ref(Name, Channel),
    ensure_timer(Name, Cleared).
|
|
|
|
%% Cancel timer Name (if any reference is stored) and drop its reference.
cancel_timer(Name, #channel{timers = Timers} = Channel) ->
    Ref = maps:get(Name, Timers, undefined),
    emqx_utils:cancel_timer(Ref),
    remove_timer_ref(Name, Channel).
|
|
|
|
%% Drop the stored reference of timer Name without cancelling the timer.
remove_timer_ref(Name, #channel{timers = Timers} = Channel) ->
    Remaining = maps:without([Name], Timers),
    Channel#channel{timers = Remaining}.
|
|
|
|
%% Interval used when starting each timer:
%%  - idle_timer: the `idle_timeout` stored in the conninfo
%%  - force_timer: the fixed value 15000
%%  - alive_timer: the interval reported by the keepalive checker
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
    IdleTimeout;
interval(force_timer, _Channel) ->
    15000;
interval(alive_timer, #channel{keepalive = Keepalive}) ->
    emqx_keepalive:info(interval, Keepalive).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Dispatch
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Hand a callback request to the gRPC client. The request is tagged with
%% `conn`: the base64-encoded external term format of this process's pid.
dispatch(FunName, Req, #channel{gcli = GClient} = Channel) ->
    ConnRef = base64:encode(term_to_binary(self())),
    TaggedReq = Req#{conn => ConnRef},
    Channel#channel{
        gcli = emqx_exproto_gcli:maybe_shoot(FunName, TaggedReq, GClient)
    }.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Format
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Copy proto_name, proto_ver, clientid and username (when present) from
%% the incoming client info over ConnInfo.
enrich_conninfo(InClientInfo, ConnInfo) ->
    Overrides = maps:with([proto_name, proto_ver, clientid, username], InClientInfo),
    maps:merge(ConnInfo, Overrides).
|
|
|
|
%% Copy clientid, username and mountpoint (when present) from the incoming
%% client info over ClientInfo and set `protocol` from its proto_name.
enrich_clientinfo(#{proto_name := ProtoName} = InClientInfo, ClientInfo) ->
    Overrides = maps:with([clientid, username, mountpoint], InClientInfo),
    Merged = maps:merge(ClientInfo, Overrides),
    Merged#{protocol => proto_name_to_protocol(ProtoName)}.
|
|
|
|
%% Fill ConnInfo with the defaults used before authentication; the keys
%% listed here override any value already present in ConnInfo.
default_conninfo(ConnInfo) ->
    Defaults = #{
        clean_start => true,
        clientid => anonymous_clientid(),
        username => undefined,
        conn_props => #{},
        connected => true,
        proto_name => <<"exproto">>,
        proto_ver => <<"1.0">>,
        connected_at => erlang:system_time(millisecond),
        keepalive => 0,
        receive_maximum => 0,
        expiry_interval => 0
    },
    maps:merge(ConnInfo, Defaults).
|
|
|
|
%% Build the initial client info from the conninfo: peer host, local
%% socket port and client id are taken over, the rest are fixed defaults.
default_clientinfo(ConnInfo) ->
    #{
        peername := {PeerHost, _},
        sockname := {_, SockPort},
        clientid := ClientId
    } = ConnInfo,
    #{
        zone => default,
        protocol => exproto,
        peerhost => PeerHost,
        sockport => SockPort,
        clientid => ClientId,
        username => undefined,
        is_bridge => false,
        is_superuser => false,
        mountpoint => undefined
    }.
|
|
|
|
%% Render any term as a binary using the "~0p" format.
stringfy(Reason) ->
    Chars = io_lib:format("~0p", [Reason]),
    unicode:characters_to_binary(Chars).
|
|
|
|
%% Format a message's `from` field as a binary: `undefined` becomes the
%% empty binary, binaries pass through, anything else goes via stringfy/1.
fmt_from(undefined) ->
    <<>>;
fmt_from(Bin) when is_binary(Bin) ->
    Bin;
fmt_from(Other) ->
    stringfy(Other).
|
|
|
|
%% Map the protocol name given at authentication to the `protocol` atom;
%% an empty name falls back to `exproto`.
%% NOTE(review): this creates an atom from a caller-supplied binary and
%% atoms are never garbage collected - confirm the value only comes from
%% a trusted handler service.
proto_name_to_protocol(<<>>) ->
    exproto;
proto_name_to_protocol(ProtoName) when is_binary(ProtoName) ->
    binary_to_atom(ProtoName, utf8).
|
|
|
|
%% Generate a client id for a not yet authenticated connection:
%% "exproto-" followed by the value of emqx_utils:gen_id/0.
anonymous_clientid() ->
    Id = emqx_utils:gen_id(),
    iolist_to_binary(["exproto-", Id]).
|