emqx/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

811 lines
26 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exproto_channel).
-include("emqx_exproto.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-export([
info/1,
info/2,
stats/1
]).
-export([
init/2,
handle_in/2,
handle_deliver/2,
handle_timeout/3,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
-export_type([channel/0]).
-record(channel, {
%% Context
ctx :: emqx_gateway_ctx:context(),
%% gRPC channel options
gcli :: emqx_exproto_gcli:grpc_client_state(),
%% Conn info
conninfo :: emqx_types:conninfo(),
%% Client info from `register` function
clientinfo :: maybe(map()),
%% Connection state
conn_state :: conn_state(),
%% Subscription
subscriptions = #{},
%% Keepalive
keepalive :: maybe(emqx_keepalive:keepalive()),
%% Timers
timers :: #{atom() => disabled | maybe(reference())},
%% Closed reason
closed_reason = undefined
}).
-opaque channel() :: #channel{}.
-type conn_state() :: idle | connecting | connected | disconnected.
-type reply() ::
{outgoing, binary()}
| {outgoing, [binary()]}
| {close, Reason :: atom()}.
-type replies() :: emqx_types:packet() | reply() | [reply()].
-define(TIMER_TABLE, #{
alive_timer => keepalive,
force_timer => force_close,
idle_timer => force_close_idle
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
%% @doc Get infos of the channel.
-spec info(channel()) -> emqx_types:infos().
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
-spec info(list(atom()) | atom(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(clientid, #channel{clientinfo = ClientInfo}) ->
maps:get(clientid, ClientInfo, undefined);
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{
subscriptions = Subs,
conninfo = ConnInfo
}) ->
#{
subscriptions => Subs,
upgrade_qos => false,
retry_interval => 0,
await_rel_timeout => 0,
created_at => maps:get(connected_at, ConnInfo)
};
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(will_msg, _) ->
undefined;
info(ctx, #channel{ctx = Ctx}) ->
Ctx.
-spec stats(channel()) -> emqx_types:stats().
stats(#channel{subscriptions = Subs}) ->
[
{subscriptions_cnt, maps:size(Subs)},
{subscriptions_max, 0},
{inflight_cnt, 0},
{inflight_max, 0},
{mqueue_len, 0},
{mqueue_max, 0},
{mqueue_dropped, 0},
{next_pkt_id, 0},
{awaiting_rel_cnt, 0},
{awaiting_rel_max, 0}
].
%%--------------------------------------------------------------------
%% Init the channel
%%--------------------------------------------------------------------
-spec init(emqx_exproto_types:conninfo(), map()) -> channel().
init(
ConnInfo = #{
socktype := Socktype,
peername := Peername,
sockname := Sockname,
peercert := Peercert
},
Options
) ->
GRpcChann = maps:get(grpc_client_channel, Options),
ServiceName = maps:get(grpc_client_service_name, Options),
GRpcClient = emqx_exproto_gcli:init(ServiceName, #{channel => GRpcChann}),
Ctx = maps:get(ctx, Options),
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
ListenerId =
case maps:get(listener, Options, undefined) of
undefined -> undefined;
{GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
end,
EnableAuthn = maps:get(enable_authn, Options, true),
DefaultClientInfo = default_clientinfo(NConnInfo),
ClientInfo = DefaultClientInfo#{
listener => ListenerId,
enable_authn => EnableAuthn
},
Channel = #channel{
ctx = Ctx,
gcli = GRpcClient,
conninfo = NConnInfo,
clientinfo = ClientInfo,
conn_state = connecting,
timers = #{}
},
Req = #{
conninfo =>
peercert(
Peercert,
#{
socktype => socktype(Socktype),
peername => address(Peername),
sockname => address(Sockname)
}
)
},
dispatch(on_socket_created, Req, start_idle_checking_timer(Channel)).
%% @private
peercert(NoSsl, ConnInfo) when
NoSsl == nossl;
NoSsl == undefined
->
ConnInfo;
peercert(Peercert, ConnInfo) ->
Fn = fun(_, V) -> V =/= undefined end,
Infos = maps:filter(
Fn,
#{
cn => esockd_peercert:common_name(Peercert),
dn => esockd_peercert:subject(Peercert)
}
),
case maps:size(Infos) of
0 ->
ConnInfo;
_ ->
ConnInfo#{peercert => Infos}
end.
%% @private
socktype(tcp) -> 'TCP';
socktype(ssl) -> 'SSL';
socktype(udp) -> 'UDP';
socktype(dtls) -> 'DTLS'.
%% @private
address({Host, Port}) ->
#{host => inet:ntoa(Host), port => Port}.
%% avoid udp connection process leak
start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
ensure_timer(idle_timer, Channel);
start_idle_checking_timer(Channel) ->
Channel.
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec handle_in(binary(), channel()) ->
{ok, channel()}
| {shutdown, Reason :: term(), channel()}.
handle_in(Data, Channel) ->
Req = #{bytes => Data},
{ok, dispatch(on_received_bytes, Req, Channel)}.
-spec handle_deliver(list(emqx_types:deliver()), channel()) ->
{ok, channel()}
| {shutdown, Reason :: term(), channel()}.
handle_deliver(
Delivers,
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
}
) ->
%% XXX: ?? Nack delivers from shared subscriptions
Mountpoint = maps:get(mountpoint, ClientInfo),
NodeStr = atom_to_binary(node(), utf8),
Msgs = lists:map(
fun({_, _, Msg}) ->
ok = metrics_inc(Ctx, 'messages.delivered'),
Msg1 = emqx_hooks:run_fold(
'message.delivered',
[ClientInfo],
Msg
),
NMsg = emqx_mountpoint:unmount(Mountpoint, Msg1),
#{
node => NodeStr,
id => emqx_guid:to_hexstr(emqx_message:id(NMsg)),
qos => emqx_message:qos(NMsg),
from => fmt_from(emqx_message:from(NMsg)),
topic => emqx_message:topic(NMsg),
payload => emqx_message:payload(NMsg),
timestamp => emqx_message:timestamp(NMsg)
}
end,
Delivers
),
Req = #{messages => Msgs},
{ok, dispatch(on_received_messages, Req, Channel)}.
-spec handle_timeout(reference(), Msg :: term(), channel()) ->
{ok, channel()}
| {shutdown, Reason :: term(), channel()}.
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{keepalive = undefined}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, StatVal},
Channel = #channel{keepalive = Keepalive}
) ->
case emqx_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)};
{error, timeout} ->
Req = #{type => 'KEEPALIVE'},
NChannel = remove_timer_ref(alive_timer, Channel),
%% close connection if keepalive timeout
Replies = [{event, disconnected}, {close, keepalive_timeout}],
NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{
closed_reason = keepalive_timeout
}),
{ok, Replies, NChannel1}
end;
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, Reason, Channel};
handle_timeout(_TRef, force_close_idle, Channel) ->
{shutdown, idle_timeout, Channel};
handle_timeout(_TRef, Msg, Channel) ->
?SLOG(warning, #{
msg => "unexpected_timeout_signal",
signal => Msg
}),
{ok, Channel}.
-spec handle_call(Req :: any(), From :: any(), channel()) ->
{reply, Reply :: term(), channel()}
| {reply, Reply :: term(), replies(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}.
handle_call({send, Data}, _From, Channel) ->
{reply, ok, [{outgoing, Data}], Channel};
handle_call(close, _From, Channel = #channel{conn_state = connected}) ->
{reply, ok, [{event, disconnected}, {close, normal}], Channel};
handle_call(close, _From, Channel) ->
{reply, ok, [{close, normal}], Channel};
handle_call(
{auth, ClientInfo, _Password},
_From,
Channel = #channel{conn_state = connected}
) ->
?SLOG(warning, #{
msg => "ingore_duplicated_authenticate_command",
request_clientinfo => ClientInfo
}),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
handle_call(
{auth, ClientInfo, _Password},
_From,
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(warning, #{
msg => "authenticate_command_after_socket_disconnected",
request_clientinfo => ClientInfo
}),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Client socket disconnected">>}, Channel};
handle_call(
{auth, ClientInfo0, Password},
_From,
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
ConnInfo1 = enrich_conninfo(ClientInfo0, ConnInfo),
Channel1 = Channel#channel{
conninfo = ConnInfo1,
clientinfo = ClientInfo1
},
#{clientid := ClientId, username := Username} = ClientInfo1,
case
emqx_gateway_ctx:authenticate(
Ctx, ClientInfo1#{password => Password}
)
of
{ok, NClientInfo} ->
SessFun = fun(_, _) -> #{} end,
emqx_logger:set_metadata_clientid(ClientId),
case
emqx_gateway_ctx:open_session(
Ctx,
true,
NClientInfo,
ConnInfo1,
SessFun
)
of
{ok, _Session} ->
?SLOG(debug, #{
msg => "client_login_succeed",
clientid => ClientId,
username => Username
}),
{reply, ok, [{event, connected}],
ensure_connected(Channel1#channel{clientinfo = NClientInfo})};
{error, Reason} ->
?SLOG(warning, #{
msg => "client_login_failed",
clientid => ClientId,
username => Username,
reason => Reason
}),
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
end;
{error, Reason} ->
?SLOG(warning, #{
msg => "client_login_failed",
clientid => ClientId,
username => Username,
reason => Reason
}),
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
end;
handle_call(
{start_timer, keepalive, Interval},
_From,
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{keepalive => Interval},
NClientInfo = ClientInfo#{keepalive => Interval},
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
{reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))};
handle_call(
{subscribe_from_client, TopicFilter, Qos},
_From,
Channel = #channel{
ctx = Ctx,
conn_state = connected,
clientinfo = ClientInfo
}
) ->
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of
deny ->
{reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
_ ->
{ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
{reply, ok, [{event, updated}], NChannel}
end;
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
{ok, [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
{reply, {ok, {NTopicFilter, NSubOpts}}, [{event, updated}], NChannel};
handle_call(
{unsubscribe_from_client, TopicFilter},
_From,
Channel = #channel{conn_state = connected}
) ->
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
{reply, ok, [{event, updated}], NChannel};
handle_call({unsubscribe, Topic}, _From, Channel) ->
{ok, NChannel} = do_unsubscribe([Topic], Channel),
{reply, ok, [{event, update}], NChannel};
handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
{reply, {ok, maps:to_list(Subs)}, Channel};
handle_call(
{publish, Topic, Qos, Payload},
_From,
Channel = #channel{
ctx = Ctx,
conn_state = connected,
clientinfo =
ClientInfo =
#{
clientid := From,
mountpoint := Mountpoint
}
}
) ->
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
deny ->
{reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
_ ->
Msg = emqx_message:make(From, Qos, Topic, Payload),
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
_ = emqx:publish(NMsg),
{reply, ok, Channel}
end;
handle_call(kick, _From, Channel) ->
{reply, ok, [{event, disconnected}, {close, kicked}], Channel};
handle_call(discard, _From, Channel) ->
{shutdown, discarded, ok, Channel};
handle_call(
Req,
_From,
Channel = #channel{
conn_state = ConnState,
clientinfo = ClientInfo,
closed_reason = ClosedReason
}
) ->
?SLOG(warning, #{
msg => "unexpected_call",
call => Req,
conn_state => ConnState,
clientid => maps:get(clientid, ClientInfo, undefined),
closed_reason => ClosedReason
}),
{reply, {error, unexpected_call}, Channel}.
-spec handle_cast(any(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}.
handle_cast(Req, Channel) ->
?SLOG(warning, #{
msg => "unexpected_call",
call => Req
}),
{ok, Channel}.
-spec handle_info(any(), channel()) ->
{ok, channel()}
| {shutdown, Reason :: term(), channel()}.
handle_info(
{sock_closed, Reason},
Channel = #channel{gcli = GClient, closed_reason = ClosedReason}
) ->
case emqx_exproto_gcli:is_empty(GClient) of
true ->
Channel1 = ensure_disconnected(Reason, Channel),
{shutdown, Reason, Channel1};
_ ->
%% delayed close process for flushing all callback funcs to gRPC server
Channel1 =
case ClosedReason of
undefined ->
Channel#channel{closed_reason = Reason};
_ ->
Channel
end,
Channel2 = ensure_timer(force_timer, Channel1),
{ok, ensure_disconnected(Reason, Channel2)}
end;
handle_info(
{hreply, FunName, Result},
Channel0 = #channel{gcli = GClient0, timers = Timers}
) when
FunName =:= on_socket_created;
FunName =:= on_socket_closed;
FunName =:= on_received_bytes;
FunName =:= on_received_messages;
FunName =:= on_timer_timeout
->
GClient = emqx_exproto_gcli:ack(FunName, GClient0),
Channel = Channel0#channel{gcli = GClient},
ShutdownNow =
emqx_exproto_gcli:is_empty(GClient) andalso
maps:get(force_timer, Timers, undefined) =/= undefined,
case Result of
ok when not ShutdownNow ->
GClient1 = emqx_exproto_gcli:maybe_shoot(GClient),
{ok, Channel#channel{gcli = GClient1}};
ok when ShutdownNow ->
Channel1 = cancel_timer(force_timer, Channel),
{shutdown, Channel1#channel.closed_reason, Channel1};
{error, Reason} ->
{shutdown, {error, {FunName, Reason}}, Channel}
end;
handle_info({subscribe, _}, Channel) ->
{ok, Channel};
handle_info(Info, Channel) ->
?SLOG(warning, #{
msg => "unexpected_info",
info => Info
}),
{ok, Channel}.
-spec terminate(any(), channel()) -> channel().
terminate(Reason, Channel) ->
Req = #{reason => stringfy(Reason)},
%% XXX: close streams?
dispatch(on_socket_closed, Req, Channel).
%%--------------------------------------------------------------------
%% Sub/UnSub
%%--------------------------------------------------------------------
do_subscribe(TopicFilters, Channel) ->
{MadeSubs, NChannel} = lists:foldl(
fun({TopicFilter, SubOpts}, {MadeSubs, ChannelAcc}) ->
{Sub, Channel1} = do_subscribe(TopicFilter, SubOpts, ChannelAcc),
{MadeSubs ++ [Sub], Channel1}
end,
{[], Channel},
parse_topic_filters(TopicFilters)
),
{ok, MadeSubs, NChannel}.
%% @private
do_subscribe(
TopicFilter,
SubOpts,
Channel =
#channel{
clientinfo = ClientInfo = #{mountpoint := Mountpoint},
subscriptions = Subs
}
) ->
%% Mountpoint first
NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
SubId = maps:get(clientid, ClientInfo, undefined),
%% XXX: is_new?
IsNew = not maps:is_key(NTopicFilter, Subs),
case IsNew of
true ->
ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
ok = emqx_hooks:run(
'session.subscribed',
[ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]
),
{{NTopicFilter, NSubOpts}, Channel#channel{
subscriptions = Subs#{NTopicFilter => NSubOpts}
}};
_ ->
%% Update subopts
ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
{{NTopicFilter, NSubOpts}, Channel#channel{
subscriptions = Subs#{NTopicFilter => NSubOpts}
}}
end.
do_unsubscribe(TopicFilters, Channel) ->
NChannel = lists:foldl(
fun({TopicFilter, SubOpts}, ChannelAcc) ->
do_unsubscribe(TopicFilter, SubOpts, ChannelAcc)
end,
Channel,
parse_topic_filters(TopicFilters)
),
{ok, NChannel}.
%% @private
do_unsubscribe(
TopicFilter,
UnSubOpts,
Channel =
#channel{
clientinfo = ClientInfo = #{mountpoint := Mountpoint},
subscriptions = Subs
}
) ->
NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
case maps:find(NTopicFilter, Subs) of
{ok, SubOpts} ->
ok = emqx:unsubscribe(NTopicFilter),
ok = emqx_hooks:run(
'session.unsubscribed',
[ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]
),
Channel#channel{subscriptions = maps:remove(NTopicFilter, Subs)};
_ ->
Channel
end.
%% @private
parse_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
%%--------------------------------------------------------------------
%% Ensure & Hooks
%%--------------------------------------------------------------------
ensure_connected(
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
ensure_disconnected(
Reason,
Channel = #channel{
ctx = Ctx,
conn_state = connected,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected};
ensure_disconnected(_Reason, Channel = #channel{conninfo = ConnInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args).
metrics_inc(Ctx, Name) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name).
%%--------------------------------------------------------------------
%% Enrich Keepalive
ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) ->
ensure_keepalive_timer(maps:get(keepalive, ClientInfo, 0), Channel).
ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
Channel;
ensure_keepalive_timer(Interval, Channel) ->
StatVal = emqx_gateway_conn:keepalive_stats(recv),
Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
TRef = maps:get(Name, Timers, undefined),
Time = interval(Name, Channel),
case TRef == undefined andalso Time > 0 of
true -> ensure_timer(Name, Time, Channel);
%% Timer disabled or exists
false -> Channel
end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, remove_timer_ref(Name, Channel)).
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
emqx_utils:cancel_timer(maps:get(Name, Timers, undefined)),
remove_timer_ref(Name, Channel).
remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout;
interval(force_timer, _) ->
15000;
interval(alive_timer, #channel{keepalive = Keepalive}) ->
emqx_keepalive:info(interval, Keepalive).
%%--------------------------------------------------------------------
%% Dispatch
%%--------------------------------------------------------------------
dispatch(FunName, Req, Channel = #channel{gcli = GClient}) ->
Req1 = Req#{conn => base64:encode(term_to_binary(self()))},
NGClient = emqx_exproto_gcli:maybe_shoot(FunName, Req1, GClient),
Channel#channel{gcli = NGClient}.
%%--------------------------------------------------------------------
%% Format
%%--------------------------------------------------------------------
enrich_conninfo(InClientInfo, ConnInfo) ->
Ks = [proto_name, proto_ver, clientid, username],
maps:merge(ConnInfo, maps:with(Ks, InClientInfo)).
enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
Ks = [clientid, username, mountpoint],
NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}.
default_conninfo(ConnInfo) ->
ConnInfo#{
clean_start => true,
clientid => anonymous_clientid(),
username => undefined,
conn_props => #{},
connected => true,
proto_name => <<"exproto">>,
proto_ver => <<"1.0">>,
connected_at => erlang:system_time(millisecond),
keepalive => 0,
receive_maximum => 0,
expiry_interval => 0
}.
default_clientinfo(#{
peername := {PeerHost, _},
sockname := {_, SockPort},
clientid := ClientId
}) ->
#{
zone => default,
protocol => exproto,
peerhost => PeerHost,
sockport => SockPort,
clientid => ClientId,
username => undefined,
is_bridge => false,
is_superuser => false,
mountpoint => undefined
}.
stringfy(Reason) ->
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).
fmt_from(undefined) -> <<>>;
fmt_from(Bin) when is_binary(Bin) -> Bin;
fmt_from(T) -> stringfy(T).
proto_name_to_protocol(<<>>) ->
exproto;
proto_name_to_protocol(ProtoName) when is_binary(ProtoName) ->
binary_to_atom(ProtoName).
anonymous_clientid() ->
iolist_to_binary(["exproto-", emqx_utils:gen_id()]).