refactor(gw-exproto): move exproto into gateway

This commit is contained in:
JianBo He 2021-07-22 19:55:12 +08:00
parent 6bc096d22e
commit 5f47ceb118
15 changed files with 669 additions and 72 deletions

View File

@ -18,3 +18,8 @@ _build
rebar3.crashdump
*~
rebar.lock
src/exproto/emqx_exproto_pb.erl
src/exproto/emqx_exproto_v_1_connection_adapter_bhvr.erl
src/exproto/emqx_exproto_v_1_connection_adapter_client.erl
src/exproto/emqx_exproto_v_1_connection_handler_bhvr.erl
src/exproto/emqx_exproto_v_1_connection_handler_client.erl

View File

@ -71,4 +71,36 @@ emqx_gateway: {
max_conn_rate: 1000
}
}
## Extension Protocol Gateway
exproto.1: {
## The gRPC server to accept requests
server: {
bind: 9100
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
handler: {
address: "http://127.0.0.1:9001"
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
authenticator: allow_anonymous
listener.tcp.1: {
bind: 7993
acceptors: 8
max_connections: 10240
max_conn_rate: 1000
}
#listener.ssl.1: {}
#listener.udp.1: {}
#listener.dtls.1: {}
}
}

View File

@ -5,18 +5,26 @@
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_gateway]}
{plugins, [
{grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
]}.
% {plugins,
% [rebar3_proper,
% {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
% ]}.
{grpc,
[{protos, ["src/exproto/protos"]},
{out_dir, "src/exproto/"},
{gpb_opts, [{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}]}
]}.
% {grpc,
% [{protos, ["priv/protos"]},
% {gpb_opts, [{module_name_prefix, "emqx_"},
% {module_name_suffix, "_pb"}]}
% ]}.
{provider_hooks,
[{pre, [{compile, {grpc, gen}},
{clean, {grpc, clean}}]}
]}.
{xref_ignores, [emqx_exproto_pb]}.
{cover_excl_mods, [emqx_exproto_pb,
emqx_exproto_v_1_connection_adapter_client,
emqx_exproto_v_1_connection_adapter_bhvr,
emqx_exproto_v_1_connection_handler_client,
emqx_exproto_v_1_connection_handler_bhvr]}.

View File

@ -3,7 +3,7 @@
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib]},
{applications, [kernel, stdlib, grpc]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},

View File

@ -45,7 +45,7 @@ load_default_gateway_applications() ->
gateway_type_searching() ->
%% FIXME: Hardcoded apps
[emqx_stomp_impl, emqx_sn_impl].
[emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl].
load(Mod) ->
try

View File

@ -33,7 +33,8 @@ structs() -> ["emqx_gateway"].
fields("emqx_gateway") ->
[{stomp, t(ref(stomp))},
{mqttsn, t(ref(mqttsn))}
{mqttsn, t(ref(mqttsn))},
{exproto, t(ref(exproto))}
];
fields(stomp) ->
@ -72,6 +73,26 @@ fields(mqttsn_predefined) ->
, {topic, t(string())}
];
fields(exproto) ->
[{"$id", t(ref(exproto_structs))}];
fields(exproto_structs) ->
[ {server, t(ref(exproto_grpc_server))}
, {handler, t(ref(exproto_grpc_handler))}
, {authenticator, t(union([allow_anonymous]))}
, {listener, t(ref(udp_tcp_listener_group))}
];
fields(exproto_grpc_server) ->
[ {bind, t(integer())}
%% TODO: ssl options
];
fields(exproto_grpc_handler) ->
[ {address, t(string())}
%% TODO: ssl
];
fields(clientinfo_override) ->
[ {username, t(string())}
, {password, t(string())}
@ -88,6 +109,13 @@ fields(tcp_listener_group) ->
, {ssl, t(ref(ssl_listener))}
];
fields(udp_tcp_listener_group) ->
[ {udp, t(ref(udp_listener))}
, {dtls, t(ref(dtls_listener))}
, {tcp, t(ref(tcp_listener))}
, {ssl, t(ref(ssl_listener))}
];
fields(tcp_listener) ->
[ {"$name", t(ref(tcp_listener_settings))}];

View File

@ -1,12 +0,0 @@
{application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"},
{vsn, "4.4.0"}, %% strict semver
{modules, []},
{registered, []},
{mod, {emqx_exproto_app, []}},
{applications, [kernel,stdlib,grpc]},
{env,[]},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"}]}
]}.

View File

@ -41,6 +41,8 @@
-export_type([channel/0]).
-record(channel, {
%% Context
ctx :: emqx_gateway_ctx:context(),
%% gRPC channel options
gcli :: map(),
%% Conn info
@ -121,7 +123,9 @@ info(session, #channel{subscriptions = Subs,
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(will_msg, _) ->
undefined.
undefined;
info(ctx, #channel{ctx = Ctx}) ->
Ctx.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{subscriptions = Subs}) ->
@ -145,15 +149,19 @@ init(ConnInfo = #{socktype := Socktype,
peername := Peername,
sockname := Sockname,
peercert := Peercert}, Options) ->
GRpcChann = proplists:get_value(handler, Options),
Ctx = maps:get(ctx, Options),
GRpcChann = maps:get(handler, Options),
PoolName = maps:get(pool_name, Options),
NConnInfo = default_conninfo(ConnInfo),
ClientInfo = default_clientinfo(ConnInfo),
Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo,
clientinfo = ClientInfo,
conn_state = connecting,
timers = #{}
},
Channel = #channel{
ctx = Ctx,
gcli = #{channel => GRpcChann, pool_name => PoolName},
conninfo = NConnInfo,
clientinfo = ClientInfo,
conn_state = connecting,
timers = #{}
},
Req = #{conninfo =>
peercert(Peercert,
@ -203,12 +211,13 @@ handle_in(Data, Channel) ->
-spec(handle_deliver(list(emqx_types:deliver()), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
handle_deliver(Delivers, Channel = #channel{clientinfo = ClientInfo}) ->
handle_deliver(Delivers, Channel = #channel{ctx = Ctx,
clientinfo = ClientInfo}) ->
%% XXX: ?? Nack delivers from shared subscriptions
Mountpoint = maps:get(mountpoint, ClientInfo),
NodeStr = atom_to_binary(node(), utf8),
Msgs = lists:map(fun({_, _, Msg}) ->
ok = emqx_metrics:inc('messages.delivered'),
ok = metrics_inc(Ctx, 'messages.delivered'),
Msg1 = emqx_hooks:run_fold('message.delivered',
[ClientInfo], Msg),
NMsg = emqx_mountpoint:unmount(Mountpoint, Msg1),
@ -462,28 +471,35 @@ is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser
%% Ensure & Hooks
%%--------------------------------------------------------------------
ensure_connected(Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ensure_connected(Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{conninfo = NConnInfo,
conn_state = connected
}.
ensure_disconnected(Reason, Channel = #channel{
ctx = Ctx,
conn_state = connected,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected};
ensure_disconnected(_Reason, Channel = #channel{conninfo = ConnInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
run_hooks(Name, Args) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args).
metrics_inc(Ctx, Name) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name).
%%--------------------------------------------------------------------
%% Enrich Keepalive

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The frame parser for ExProto
-module(emqx_exproto_frame).
-behavior(emqx_gateway_frame).
-export([ initial_parse_state/1
, serialize_opts/0
, parse/2
, serialize_pkt/2
, format/1
]).
initial_parse_state(_) ->
#{}.
serialize_opts() ->
#{}.
parse(Data, State) ->
{ok, Data, <<>>, State}.
serialize_pkt(Data, _Opts) ->
Data.
format(Data) ->
io_lib:format("~p", [Data]).

View File

@ -53,20 +53,21 @@ start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], []).
async_call(FunName, Req = #{conn := Conn}, Options) ->
cast(pick(Conn), {rpc, FunName, Req, Options, self()}).
async_call(FunName, Req = #{conn := Conn},
Options = #{pool_name := PoolName}) ->
cast(pick(PoolName, Conn), {rpc, FunName, Req, Options, self()}).
%%--------------------------------------------------------------------
%% cast, pick
%%--------------------------------------------------------------------
-compile({inline, [cast/2, pick/1]}).
-compile({inline, [cast/2, pick/2]}).
cast(Deliver, Msg) ->
gen_server:cast(Deliver, Msg).
pick(Conn) ->
gproc_pool:pick_worker(exproto_gcli_pool, Conn).
pick(PoolName, Conn) ->
gproc_pool:pick_worker(PoolName, Conn).
%%--------------------------------------------------------------------
%% gen_server callbacks

View File

@ -0,0 +1,212 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The ExProto Gateway Implement interface
-module(emqx_exproto_impl).
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
]).
-export([]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
]).
-define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true},
{backlog, 512}, {nodelay, true}]).
-define(UDP_SOCKOPTS, []).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
load() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
emqx_gateway_registry:load(exproto, RegistryOptions, []).
unload() ->
emqx_gateway_registry:unload(exproto).
init(_) ->
GwState = #{},
{ok, GwState}.
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
start_grpc_server(InstaId, Options = #{bind := ListenOn}) ->
Services = #{protos => [emqx_exproto_pb],
services => #{
'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}
},
SvrOptions = case maps:to_list(maps:get(ssl, Options, #{})) of
[] -> [];
SslOpts ->
[{ssl_options, SslOpts}]
end,
grpc:start_server(InstaId, ListenOn, Services, SvrOptions),
io:format("Start ~s gRPC server on ~p successfully.~n",
[InstaId, ListenOn]).
start_grpc_client_channel(InstaId, Options = #{address := UriStr}) ->
UriMap = uri_string:parse(UriStr),
Scheme = maps:get(scheme, UriMap),
Host = maps:get(host, UriMap),
Port = maps:get(port, UriMap),
SvrAddr = lists:flatten(
io_lib:format(
"~s://~s:~w", [Scheme, Host, Port])
),
ClientOpts = case Scheme of
https ->
SslOpts = maps:to_list(maps:get(ssl, Options, #{})),
#{gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}};
_ -> #{}
end,
grpc_client_sup:create_channel_pool(InstaId, SvrAddr, ClientOpts).
on_insta_create(_Insta = #{ id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
%% XXX: How to monitor it ?
%% Start grpc client pool & client channel
PoolName = pool_name(InstaId),
PoolSize = emqx_vm:schedulers() * 2,
{ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize,
{emqx_exproto_gcli, start_link, []}),
_ = start_grpc_client_channel(InstaId, maps:get(handler, RawConf)),
%% XXX: How to monitor it ?
_ = start_grpc_server(InstaId, maps:get(server, RawConf)),
NRawConf = maps:without(
[server, handler],
RawConf#{pool_name => PoolName}
),
Listeners = emqx_gateway_utils:normalize_rawconf(
NRawConf#{handler => InstaId}
),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
catch
Class : Reason : Stk ->
logger:error("Failed to update exproto instance ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
}, _GwInstaState, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
end, Listeners).
pool_name(InstaId) ->
list_to_atom(lists:concat([InstaId, "_gcli_pool"])).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
io:format("Start exproto ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
Pid;
{error, Reason} ->
io:format(standard_error,
"Failed to start exproto ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_exproto_frame,
chann_mod => emqx_exproto_channel
},
esockd:open(Name, ListenOn, merge_default_by_type(Type, SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
merge_default_by_type(Type, Options) when Type =:= tcp;
Type =:= ssl ->
case lists:keytake(tcp_options, 1, Options) of
{value, {tcp_options, TcpOpts}, Options1} ->
[{tcp_options, emqx_misc:merge_opts(?TCP_SOCKOPTS, TcpOpts)}
| Options1];
false ->
[{tcp_options, ?TCP_SOCKOPTS} | Options]
end;
merge_default_by_type(Type, Options) when Type =:= udp;
Type =:= dtls ->
case lists:keytake(udp_options, 1, Options) of
{value, {udp_options, TcpOpts}, Options1} ->
[{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)}
| Options1];
false ->
[{udp_options, ?UDP_SOCKOPTS} | Options]
end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> io:format("Stop exproto ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
{error, Reason} ->
io:format(standard_error,
"Failed to stop exproto ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]
)
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
esockd:close(Name, ListenOn).

View File

@ -0,0 +1,259 @@
//------------------------------------------------------------------------------
// Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//------------------------------------------------------------------------------
syntax = "proto3";
package emqx.exproto.v1;
// The Broker side serivce. It provides a set of APIs to
// handle a protcol access
service ConnectionAdapter {
// -- socket layer
rpc Send(SendBytesRequest) returns (CodeResponse) {};
rpc Close(CloseSocketRequest) returns (CodeResponse) {};
// -- protocol layer
rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {};
rpc StartTimer(TimerRequest) returns (CodeResponse) {};
// -- pub/sub layer
rpc Publish(PublishRequest) returns (CodeResponse) {};
rpc Subscribe(SubscribeRequest) returns (CodeResponse) {};
rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {};
}
service ConnectionHandler {
// -- socket layer
rpc OnSocketCreated(stream SocketCreatedRequest) returns (EmptySuccess) {};
rpc OnSocketClosed(stream SocketClosedRequest) returns (EmptySuccess) {};
rpc OnReceivedBytes(stream ReceivedBytesRequest) returns (EmptySuccess) {};
// -- pub/sub layer
rpc OnTimerTimeout(stream TimerTimeoutRequest) returns (EmptySuccess) {};
rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {};
}
message EmptySuccess { }
enum ResultCode {
// Operation successfully
SUCCESS = 0;
// Unknown Error
UNKNOWN = 1;
// Connection process is not alive
CONN_PROCESS_NOT_ALIVE = 2;
// Miss the required parameter
REQUIRED_PARAMS_MISSED = 3;
// Params type or values incorrect
PARAMS_TYPE_ERROR = 4;
// No permission or Pre-conditions not fulfilled
PERMISSION_DENY = 5;
}
message CodeResponse {
ResultCode code = 1;
// The reason message if result is false
string message = 2;
}
message SendBytesRequest {
string conn = 1;
bytes bytes = 2;
}
message CloseSocketRequest {
string conn = 1;
}
message AuthenticateRequest {
string conn = 1;
ClientInfo clientinfo = 2;
string password = 3;
}
message TimerRequest {
string conn = 1;
TimerType type = 2;
uint32 interval = 3;
}
enum TimerType {
KEEPALIVE = 0;
}
message PublishRequest {
string conn = 1;
string topic = 2;
uint32 qos = 3;
bytes payload = 4;
}
message SubscribeRequest {
string conn = 1;
string topic = 2;
uint32 qos = 3;
}
message UnsubscribeRequest {
string conn = 1;
string topic = 2;
}
message SocketCreatedRequest {
string conn = 1;
ConnInfo conninfo = 2;
}
message ReceivedBytesRequest {
string conn = 1;
bytes bytes = 2;
}
message TimerTimeoutRequest {
string conn = 1;
TimerType type = 2;
}
message SocketClosedRequest {
string conn = 1;
string reason = 2;
}
message ReceivedMessagesRequest {
string conn = 1;
repeated Message messages = 2;
}
//--------------------------------------------------------------------
// Basic data types
//--------------------------------------------------------------------
message ConnInfo {
SocketType socktype = 1;
Address peername = 2;
Address sockname = 3;
CertificateInfo peercert = 4;
}
enum SocketType {
TCP = 0;
SSL = 1;
UDP = 2;
DTLS = 3;
}
message Address {
string host = 1;
uint32 port = 2;
}
message CertificateInfo {
string cn = 1;
string dn = 2;
}
message ClientInfo {
string proto_name = 1;
string proto_ver = 2;
string clientid = 3;
string username = 4;
string mountpoint = 5;
}
message Message {
string node = 1;
string id = 2;
uint32 qos = 3;
string from = 4;
string topic = 5;
bytes payload = 6;
uint64 timestamp = 7;
}

View File

@ -497,6 +497,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
Channel = #channel{
ctx = Ctx,
registry = Registry,
session = Session,
clientinfo = ClientInfo = #{clientid := ClientId}}) ->
@ -514,12 +515,12 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBACK MsgId ~w is inuse.",
[MsgId]),
ok = metrics_inc('packets.puback.inuse', Channel),
ok = metrics_inc(Ctx, 'packets.puback.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBACK MsgId ~w is not found.",
[MsgId]),
ok = metrics_inc('packets.puback.missed', Channel),
ok = metrics_inc(Ctx, 'packets.puback.missed'),
{ok, Channel}
end;
?SN_RC_INVALID_TOPIC_ID ->
@ -540,7 +541,9 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
end;
handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
Channel = #channel{ctx = Ctx,
session = Session,
clientinfo = ClientInfo}) ->
case emqx_session:pubrec(MsgId, Session) of
{ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel),
@ -548,28 +551,28 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
handle_out(pubrel, MsgId, NChannel);
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBREC MsgId ~w is inuse.", [MsgId]),
ok = metrics_inc('packets.pubrec.inuse', Channel),
ok = metrics_inc(Ctx, 'packets.pubrec.inuse'),
handle_out(pubrel, MsgId, Channel);
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREC ~w is not found.", [MsgId]),
ok = metrics_inc('packets.pubrec.missed', Channel),
ok = metrics_inc(Ctx, 'packets.pubrec.missed'),
handle_out(pubrel, MsgId, Channel)
end;
handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
Channel = #channel{session = Session}) ->
Channel = #channel{ctx = Ctx, session = Session}) ->
case emqx_session:pubrel(MsgId, Session) of
{ok, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, MsgId, NChannel);
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREL MsgId ~w is not found.", [MsgId]),
ok = metrics_inc('packets.pubrel.missed', Channel),
ok = metrics_inc(Ctx, 'packets.pubrel.missed'),
handle_out(pubcomp, MsgId, Channel)
end;
handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
Channel = #channel{session = Session}) ->
Channel = #channel{ctx = Ctx, session = Session}) ->
case emqx_session:pubcomp(MsgId, Session) of
{ok, NSession} ->
{ok, Channel#channel{session = NSession}};
@ -577,11 +580,11 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
handle_out(publish, Publishes,
Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = metrics_inc('packets.pubcomp.inuse', Channel),
ok = metrics_inc(Ctx, 'packets.pubcomp.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBCOMP MsgId ~w is not found", [MsgId]),
ok = metrics_inc('packets.pubcomp.missed', Channel),
ok = metrics_inc(Ctx, 'packets.pubcomp.missed'),
{ok, Channel}
end;
@ -664,9 +667,8 @@ handle_in({frame_error, Reason},
?LOG(error, "Unexpected frame error: ~p", [Reason]),
shutdown(Reason, Channel).
after_message_acked(ClientInfo, Msg,
Channel = #channel{ctx = Ctx}) ->
ok = metrics_inc('messages.acked', Channel),
after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) ->
ok = metrics_inc(Ctx, 'messages.acked'),
run_hooks_without_metrics(Ctx,
'message.acked',
[ClientInfo, emqx_message:set_header(puback_props, #{}, Msg)]).
@ -756,7 +758,7 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_1}, Channel) ->
handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel);
do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2},
Channel = #channel{session = Session}) ->
Channel = #channel{ctx = Ctx, session = Session}) ->
case emqx_session:publish(MsgId, Msg, Session) of
{ok, _PubRes, NSession} ->
NChannel1 = ensure_timer(await_timer,
@ -764,14 +766,14 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2},
),
handle_out(pubrec, MsgId, NChannel1);
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = metrics_inc('packets.publish.inuse', Channel),
ok = metrics_inc(Ctx, 'packets.publish.inuse'),
%% XXX: Use PUBACK to reply a PUBLISH Error Code
handle_out(puback , {TopicId, MsgId, ?SN_RC_NOT_SUPPORTED},
Channel);
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
?LOG(warning, "Dropped the qos2 packet ~w "
"due to awaiting_rel is full.", [MsgId]),
ok = emqx_metrics:inc('packets.publish.dropped'),
ok = metrics_inc(Ctx, 'packets.publish.dropped'),
handle_out(puback, {TopicId, MsgId, ?SN_RC_CONGESTION}, Channel)
end.
@ -1022,7 +1024,7 @@ do_deliver({MsgId, Msg},
ctx = Ctx,
clientinfo = ClientInfo
= #{mountpoint := Mountpoint}}) ->
metrics_inc('messages.delivered', Channel),
metrics_inc(Ctx, 'messages.delivered'),
Msg1 = run_hooks_without_metrics(
Ctx,
'message.delivered',
@ -1197,33 +1199,36 @@ publish_will_msg(Msg) ->
-> {ok, channel()}
| {ok, replies(), channel()}.
handle_deliver(Delivers, Channel = #channel{
ctx = Ctx,
conn_state = ConnState,
session = Session,
clientinfo = #{clientid := ClientId}})
when ConnState =:= disconnected;
ConnState =:= asleep ->
NSession = emqx_session:enqueue(
ignore_local(maybe_nack(Delivers), ClientId, Session),
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx),
Session
),
{ok, Channel#channel{session = NSession}};
handle_deliver(Delivers, Channel = #channel{
ctx = Ctx,
takeover = true,
pendings = Pendings,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
NPendings = lists:append(
Pendings,
ignore_local(maybe_nack(Delivers), ClientId, Session)
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx)
),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{
ctx = Ctx,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
case emqx_session:deliver(
ignore_local(Delivers, ClientId, Session),
ignore_local(Delivers, ClientId, Session, Ctx),
Session
) of
{ok, Publishes, NSession} ->
@ -1234,13 +1239,13 @@ handle_deliver(Delivers, Channel = #channel{
{ok, Channel#channel{session = NSession}}
end.
ignore_local(Delivers, Subscriber, Session) ->
ignore_local(Delivers, Subscriber, Session, Ctx) ->
Subs = emqx_session:info(subscriptions, Session),
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
ok = metrics_inc(Ctx, 'delivery.dropped'),
ok = metrics_inc(Ctx, 'delivery.dropped.no_local'),
true;
_ ->
false
@ -1413,7 +1418,7 @@ run_hooks_without_metrics(_Ctx, Name, Args) ->
run_hooks_without_metrics(_Ctx, Name, Args, Acc) ->
emqx_hooks:run_fold(Name, Args, Acc).
metrics_inc(Name, #channel{ctx = Ctx}) ->
metrics_inc(Ctx, Name) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name).
returncode_name(?SN_RC_ACCEPTED) -> accepted;