From 5f47ceb118b0689d051da2d5cefeb13b0a420aee Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 22 Jul 2021 19:55:12 +0800 Subject: [PATCH] refactor(gw-exproto): move exproto into gateway --- apps/emqx_gateway/.gitignore | 5 + apps/emqx_gateway/etc/emqx_gateway.conf | 32 +++ apps/emqx_gateway/rebar.config | 32 ++- apps/emqx_gateway/src/emqx_gateway.app.src | 2 +- apps/emqx_gateway/src/emqx_gateway_app.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_schema.erl | 30 +- .../src/exproto/emqx_exproto.app.src | 12 - .../src/exproto/emqx_exproto_channel.erl | 48 ++-- .../src/exproto/emqx_exproto_frame.erl | 43 +++ .../src/exproto/emqx_exproto_gcli.erl | 11 +- .../src/exproto/emqx_exproto_impl.erl | 212 ++++++++++++++ .../src/exproto/protos/exproto.proto | 259 ++++++++++++++++++ .../src/mqttsn/emqx_sn_channel.erl | 53 ++-- .../exproto => }/test/emqx_exproto_SUITE.erl | 0 .../test/emqx_exproto_echo_svr.erl | 0 15 files changed, 669 insertions(+), 72 deletions(-) delete mode 100644 apps/emqx_gateway/src/exproto/emqx_exproto.app.src create mode 100644 apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl create mode 100644 apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl create mode 100644 apps/emqx_gateway/src/exproto/protos/exproto.proto rename apps/emqx_gateway/{src/exproto => }/test/emqx_exproto_SUITE.erl (100%) rename apps/emqx_gateway/{src/exproto => }/test/emqx_exproto_echo_svr.erl (100%) diff --git a/apps/emqx_gateway/.gitignore b/apps/emqx_gateway/.gitignore index 71ab0135c..5bff8a84d 100644 --- a/apps/emqx_gateway/.gitignore +++ b/apps/emqx_gateway/.gitignore @@ -18,3 +18,8 @@ _build rebar3.crashdump *~ rebar.lock +src/exproto/emqx_exproto_pb.erl +src/exproto/emqx_exproto_v_1_connection_adapter_bhvr.erl +src/exproto/emqx_exproto_v_1_connection_adapter_client.erl +src/exproto/emqx_exproto_v_1_connection_handler_bhvr.erl +src/exproto/emqx_exproto_v_1_connection_handler_client.erl diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index 591f2523d..55dd04526 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -71,4 +71,36 @@ emqx_gateway: { max_conn_rate: 1000 } } + + ## Extension Protocol Gateway + exproto.1: { + + ## The gRPC server to accept requests + server: { + bind: 9100 + #ssl.keyfile: + #ssl.certfile: + #ssl.cacertfile: + } + + handler: { + address: "http://127.0.0.1:9001" + #ssl.keyfile: + #ssl.certfile: + #ssl.cacertfile: + } + + authenticator: allow_anonymous + + listener.tcp.1: { + bind: 7993 + acceptors: 8 + max_connections: 10240 + max_conn_rate: 1000 + } + + #listener.ssl.1: {} + #listener.udp.1: {} + #listener.dtls.1: {} + } } diff --git a/apps/emqx_gateway/rebar.config b/apps/emqx_gateway/rebar.config index 93b3f858a..a209d9723 100644 --- a/apps/emqx_gateway/rebar.config +++ b/apps/emqx_gateway/rebar.config @@ -5,18 +5,26 @@ {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} ]}. -{shell, [ - % {config, "config/sys.config"}, - {apps, [emqx_gateway]} +{plugins, [ + {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} ]}. -% {plugins, -% [rebar3_proper, -% {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} -% ]}. +{grpc, + [{protos, ["src/exproto/protos"]}, + {out_dir, "src/exproto/"}, + {gpb_opts, [{module_name_prefix, "emqx_"}, + {module_name_suffix, "_pb"}]} +]}. -% {grpc, -% [{protos, ["priv/protos"]}, -% {gpb_opts, [{module_name_prefix, "emqx_"}, -% {module_name_suffix, "_pb"}]} -% ]}. +{provider_hooks, + [{pre, [{compile, {grpc, gen}}, + {clean, {grpc, clean}}]} +]}. + +{xref_ignores, [emqx_exproto_pb]}. + +{cover_excl_mods, [emqx_exproto_pb, + emqx_exproto_v_1_connection_adapter_client, + emqx_exproto_v_1_connection_adapter_bhvr, + emqx_exproto_v_1_connection_handler_client, + emqx_exproto_v_1_connection_handler_bhvr]}. diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 287d710eb..541f31a23 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -3,7 +3,7 @@ {vsn, "0.1.0"}, {registered, []}, {mod, {emqx_gateway_app, []}}, - {applications, [kernel, stdlib]}, + {applications, [kernel, stdlib, grpc]}, {env, []}, {modules, []}, {licenses, ["Apache 2.0"]}, diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index 3982e260b..e3ed0fbe3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -45,7 +45,7 @@ load_default_gateway_applications() -> gateway_type_searching() -> %% FIXME: Hardcoded apps - [emqx_stomp_impl, emqx_sn_impl]. + [emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl]. load(Mod) -> try diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 92dc834c9..2b5269758 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -33,7 +33,8 @@ structs() -> ["emqx_gateway"]. fields("emqx_gateway") -> [{stomp, t(ref(stomp))}, - {mqttsn, t(ref(mqttsn))} + {mqttsn, t(ref(mqttsn))}, + {exproto, t(ref(exproto))} ]; fields(stomp) -> @@ -72,6 +73,26 @@ fields(mqttsn_predefined) -> , {topic, t(string())} ]; +fields(exproto) -> + [{"$id", t(ref(exproto_structs))}]; + +fields(exproto_structs) -> + [ {server, t(ref(exproto_grpc_server))} + , {handler, t(ref(exproto_grpc_handler))} + , {authenticator, t(union([allow_anonymous]))} + , {listener, t(ref(udp_tcp_listener_group))} + ]; + +fields(exproto_grpc_server) -> + [ {bind, t(integer())} + %% TODO: ssl options + ]; + +fields(exproto_grpc_handler) -> + [ {address, t(string())} + %% TODO: ssl + ]; + fields(clientinfo_override) -> [ {username, t(string())} , {password, t(string())} @@ -88,6 +109,13 @@ fields(tcp_listener_group) -> , {ssl, t(ref(ssl_listener))} ]; +fields(udp_tcp_listener_group) -> + [ {udp, t(ref(udp_listener))} + , {dtls, t(ref(dtls_listener))} + , {tcp, t(ref(tcp_listener))} + , {ssl, t(ref(ssl_listener))} + ]; + fields(tcp_listener) -> [ {"$name", t(ref(tcp_listener_settings))}]; diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto.app.src b/apps/emqx_gateway/src/exproto/emqx_exproto.app.src deleted file mode 100644 index 9841f5bcb..000000000 --- a/apps/emqx_gateway/src/exproto/emqx_exproto.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqx_exproto, - [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.4.0"}, %% strict semver - {modules, []}, - {registered, []}, - {mod, {emqx_exproto_app, []}}, - {applications, [kernel,stdlib,grpc]}, - {env,[]}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQ X Team "]}, - {links, [{"Homepage", "https://emqx.io/"}]} - ]}. diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 5978e83b7..805dfb832 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -41,6 +41,8 @@ -export_type([channel/0]). -record(channel, { + %% Context + ctx :: emqx_gateway_ctx:context(), %% gRPC channel options gcli :: map(), %% Conn info @@ -121,7 +123,9 @@ info(session, #channel{subscriptions = Subs, info(conn_state, #channel{conn_state = ConnState}) -> ConnState; info(will_msg, _) -> - undefined. + undefined; +info(ctx, #channel{ctx = Ctx}) -> + Ctx. -spec(stats(channel()) -> emqx_types:stats()). stats(#channel{subscriptions = Subs}) -> @@ -145,15 +149,19 @@ init(ConnInfo = #{socktype := Socktype, peername := Peername, sockname := Sockname, peercert := Peercert}, Options) -> - GRpcChann = proplists:get_value(handler, Options), + Ctx = maps:get(ctx, Options), + GRpcChann = maps:get(handler, Options), + PoolName = maps:get(pool_name, Options), NConnInfo = default_conninfo(ConnInfo), ClientInfo = default_clientinfo(ConnInfo), - Channel = #channel{gcli = #{channel => GRpcChann}, - conninfo = NConnInfo, - clientinfo = ClientInfo, - conn_state = connecting, - timers = #{} - }, + Channel = #channel{ + ctx = Ctx, + gcli = #{channel => GRpcChann, pool_name => PoolName}, + conninfo = NConnInfo, + clientinfo = ClientInfo, + conn_state = connecting, + timers = #{} + }, Req = #{conninfo => peercert(Peercert, @@ -203,12 +211,13 @@ handle_in(Data, Channel) -> -spec(handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} | {shutdown, Reason :: term(), channel()}). -handle_deliver(Delivers, Channel = #channel{clientinfo = ClientInfo}) -> +handle_deliver(Delivers, Channel = #channel{ctx = Ctx, + clientinfo = ClientInfo}) -> %% XXX: ?? Nack delivers from shared subscriptions Mountpoint = maps:get(mountpoint, ClientInfo), NodeStr = atom_to_binary(node(), utf8), Msgs = lists:map(fun({_, _, Msg}) -> - ok = emqx_metrics:inc('messages.delivered'), + ok = metrics_inc(Ctx, 'messages.delivered'), Msg1 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg), NMsg = emqx_mountpoint:unmount(Mountpoint, Msg1), @@ -462,28 +471,35 @@ is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser %% Ensure & Hooks %%-------------------------------------------------------------------- -ensure_connected(Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> +ensure_connected(Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, - ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), + ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = connected }. ensure_disconnected(Reason, Channel = #channel{ + ctx = Ctx, conn_state = connected, conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, - ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), + ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}; ensure_disconnected(_Reason, Channel = #channel{conninfo = ConnInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. -run_hooks(Name, Args) -> - ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). +run_hooks(Ctx, Name, Args) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name), + emqx_hooks:run(Name, Args). + +metrics_inc(Ctx, Name) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name). %%-------------------------------------------------------------------- %% Enrich Keepalive diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl new file mode 100644 index 000000000..656766b9b --- /dev/null +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The frame parser for ExProto +-module(emqx_exproto_frame). + +-behavior(emqx_gateway_frame). + +-export([ initial_parse_state/1 + , serialize_opts/0 + , parse/2 + , serialize_pkt/2 + , format/1 + ]). + +initial_parse_state(_) -> + #{}. + +serialize_opts() -> + #{}. + +parse(Data, State) -> + {ok, Data, <<>>, State}. + +serialize_pkt(Data, _Opts) -> + Data. + +format(Data) -> + io_lib:format("~p", [Data]). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl index 650922c4b..34f0606ef 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl @@ -53,20 +53,21 @@ start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). -async_call(FunName, Req = #{conn := Conn}, Options) -> - cast(pick(Conn), {rpc, FunName, Req, Options, self()}). +async_call(FunName, Req = #{conn := Conn}, + Options = #{pool_name := PoolName}) -> + cast(pick(PoolName, Conn), {rpc, FunName, Req, Options, self()}). %%-------------------------------------------------------------------- %% cast, pick %%-------------------------------------------------------------------- --compile({inline, [cast/2, pick/1]}). +-compile({inline, [cast/2, pick/2]}). cast(Deliver, Msg) -> gen_server:cast(Deliver, Msg). -pick(Conn) -> - gproc_pool:pick_worker(exproto_gcli_pool, Conn). +pick(PoolName, Conn) -> + gproc_pool:pick_worker(PoolName, Conn). %%-------------------------------------------------------------------- %% gen_server callbacks diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl new file mode 100644 index 000000000..92bc42716 --- /dev/null +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -0,0 +1,212 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The ExProto Gateway Implement interface +-module(emqx_exproto_impl). + +-behavior(emqx_gateway_impl). + +%% APIs +-export([ load/0 + , unload/0 + ]). + +-export([]). + +-export([ init/1 + , on_insta_create/3 + , on_insta_update/4 + , on_insta_destroy/3 + ]). + +-define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, + {backlog, 512}, {nodelay, true}]). + +-define(UDP_SOCKOPTS, []). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +load() -> + RegistryOptions = [ {cbkmod, ?MODULE} + ], + emqx_gateway_registry:load(exproto, RegistryOptions, []). + +unload() -> + emqx_gateway_registry:unload(exproto). + +init(_) -> + GwState = #{}, + {ok, GwState}. + + +%%-------------------------------------------------------------------- +%% emqx_gateway_registry callbacks +%%-------------------------------------------------------------------- + +start_grpc_server(InstaId, Options = #{bind := ListenOn}) -> + Services = #{protos => [emqx_exproto_pb], + services => #{ + 'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr} + }, + SvrOptions = case maps:to_list(maps:get(ssl, Options, #{})) of + [] -> []; + SslOpts -> + [{ssl_options, SslOpts}] + end, + grpc:start_server(InstaId, ListenOn, Services, SvrOptions), + io:format("Start ~s gRPC server on ~p successfully.~n", + [InstaId, ListenOn]). + +start_grpc_client_channel(InstaId, Options = #{address := UriStr}) -> + UriMap = uri_string:parse(UriStr), + Scheme = maps:get(scheme, UriMap), + Host = maps:get(host, UriMap), + Port = maps:get(port, UriMap), + SvrAddr = lists:flatten( + io_lib:format( + "~s://~s:~w", [Scheme, Host, Port]) + ), + ClientOpts = case Scheme of + https -> + SslOpts = maps:to_list(maps:get(ssl, Options, #{})), + #{gun_opts => + #{transport => ssl, + transport_opts => SslOpts}}; + _ -> #{} + end, + grpc_client_sup:create_channel_pool(InstaId, SvrAddr, ClientOpts). + +on_insta_create(_Insta = #{ id := InstaId, + rawconf := RawConf + }, Ctx, _GwState) -> + %% XXX: How to monitor it ? + %% Start grpc client pool & client channel + PoolName = pool_name(InstaId), + PoolSize = emqx_vm:schedulers() * 2, + {ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize, + {emqx_exproto_gcli, start_link, []}), + _ = start_grpc_client_channel(InstaId, maps:get(handler, RawConf)), + + %% XXX: How to monitor it ? + _ = start_grpc_server(InstaId, maps:get(server, RawConf)), + + NRawConf = maps:without( + [server, handler], + RawConf#{pool_name => PoolName} + ), + Listeners = emqx_gateway_utils:normalize_rawconf( + NRawConf#{handler => InstaId} + ), + ListenerPids = lists:map(fun(Lis) -> + start_listener(InstaId, Ctx, Lis) + end, Listeners), + {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. + +on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> + InstaId = maps:get(id, NewInsta), + try + %% XXX: 1. How hot-upgrade the changes ??? + %% XXX: 2. Check the New confs first before destroy old instance ??? + on_insta_destroy(OldInsta, GwInstaState, GwState), + on_insta_create(NewInsta, Ctx, GwState) + catch + Class : Reason : Stk -> + logger:error("Failed to update exproto instance ~s; " + "reason: {~0p, ~0p} stacktrace: ~0p", + [InstaId, Class, Reason, Stk]), + {error, {Class, Reason}} + end. + +on_insta_destroy(_Insta = #{ id := InstaId, + rawconf := RawConf + }, _GwInstaState, _GwState) -> + Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), + lists:foreach(fun(Lis) -> + stop_listener(InstaId, Lis) + end, Listeners). + +pool_name(InstaId) -> + list_to_atom(lists:concat([InstaId, "_gcli_pool"])). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + {ok, Pid} -> + io:format("Start exproto ~s:~s listener on ~s successfully.~n", + [InstaId, Type, ListenOnStr]), + Pid; + {error, Reason} -> + io:format(standard_error, + "Failed to start exproto ~s:~s listener on ~s: ~0p~n", + [InstaId, Type, ListenOnStr, Reason]), + throw({badconf, Reason}) + end. + +start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(InstaId, Type), + NCfg = Cfg#{ + ctx => Ctx, + frame_mod => emqx_exproto_frame, + chann_mod => emqx_exproto_channel + }, + esockd:open(Name, ListenOn, merge_default_by_type(Type, SocketOpts), + {emqx_gateway_conn, start_link, [NCfg]}). + +name(InstaId, Type) -> + list_to_atom(lists:concat([InstaId, ":", Type])). + +merge_default_by_type(Type, Options) when Type =:= tcp; + Type =:= ssl -> + case lists:keytake(tcp_options, 1, Options) of + {value, {tcp_options, TcpOpts}, Options1} -> + [{tcp_options, emqx_misc:merge_opts(?TCP_SOCKOPTS, TcpOpts)} + | Options1]; + false -> + [{tcp_options, ?TCP_SOCKOPTS} | Options] + end; +merge_default_by_type(Type, Options) when Type =:= udp; + Type =:= dtls -> + case lists:keytake(udp_options, 1, Options) of + {value, {udp_options, TcpOpts}, Options1} -> + [{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} + | Options1]; + false -> + [{udp_options, ?UDP_SOCKOPTS} | Options] + end. + +stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + case StopRet of + ok -> io:format("Stop exproto ~s:~s listener on ~s successfully.~n", + [InstaId, Type, ListenOnStr]); + {error, Reason} -> + io:format(standard_error, + "Failed to stop exproto ~s:~s listener on ~s: ~0p~n", + [InstaId, Type, ListenOnStr, Reason] + ) + end, + StopRet. + +stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(InstaId, Type), + esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/exproto/protos/exproto.proto b/apps/emqx_gateway/src/exproto/protos/exproto.proto new file mode 100644 index 000000000..bbc10073c --- /dev/null +++ b/apps/emqx_gateway/src/exproto/protos/exproto.proto @@ -0,0 +1,259 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//------------------------------------------------------------------------------ + +syntax = "proto3"; + +package emqx.exproto.v1; + +// The Broker side serivce. It provides a set of APIs to +// handle a protcol access +service ConnectionAdapter { + + // -- socket layer + + rpc Send(SendBytesRequest) returns (CodeResponse) {}; + + rpc Close(CloseSocketRequest) returns (CodeResponse) {}; + + // -- protocol layer + + rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {}; + + rpc StartTimer(TimerRequest) returns (CodeResponse) {}; + + // -- pub/sub layer + + rpc Publish(PublishRequest) returns (CodeResponse) {}; + + rpc Subscribe(SubscribeRequest) returns (CodeResponse) {}; + + rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {}; +} + +service ConnectionHandler { + + // -- socket layer + + rpc OnSocketCreated(stream SocketCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSocketClosed(stream SocketClosedRequest) returns (EmptySuccess) {}; + + rpc OnReceivedBytes(stream ReceivedBytesRequest) returns (EmptySuccess) {}; + + // -- pub/sub layer + + rpc OnTimerTimeout(stream TimerTimeoutRequest) returns (EmptySuccess) {}; + + rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {}; +} + +message EmptySuccess { } + +enum ResultCode { + + // Operation successfully + SUCCESS = 0; + + // Unknown Error + UNKNOWN = 1; + + // Connection process is not alive + CONN_PROCESS_NOT_ALIVE = 2; + + // Miss the required parameter + REQUIRED_PARAMS_MISSED = 3; + + // Params type or values incorrect + PARAMS_TYPE_ERROR = 4; + + // No permission or Pre-conditions not fulfilled + PERMISSION_DENY = 5; +} + +message CodeResponse { + + ResultCode code = 1; + + // The reason message if result is false + string message = 2; +} + +message SendBytesRequest { + + string conn = 1; + + bytes bytes = 2; +} + +message CloseSocketRequest { + + string conn = 1; +} + +message AuthenticateRequest { + + string conn = 1; + + ClientInfo clientinfo = 2; + + string password = 3; +} + +message TimerRequest { + + string conn = 1; + + TimerType type = 2; + + uint32 interval = 3; +} + +enum TimerType { + + KEEPALIVE = 0; +} + +message PublishRequest { + + string conn = 1; + + string topic = 2; + + uint32 qos = 3; + + bytes payload = 4; +} + +message SubscribeRequest { + + string conn = 1; + + string topic = 2; + + uint32 qos = 3; +} + +message UnsubscribeRequest { + + string conn = 1; + + string topic = 2; +} + +message SocketCreatedRequest { + + string conn = 1; + + ConnInfo conninfo = 2; +} + +message ReceivedBytesRequest { + + string conn = 1; + + bytes bytes = 2; +} + +message TimerTimeoutRequest { + + string conn = 1; + + TimerType type = 2; +} + +message SocketClosedRequest { + + string conn = 1; + + string reason = 2; +} + +message ReceivedMessagesRequest { + + string conn = 1; + + repeated Message messages = 2; +} + +//-------------------------------------------------------------------- +// Basic data types +//-------------------------------------------------------------------- + +message ConnInfo { + + SocketType socktype = 1; + + Address peername = 2; + + Address sockname = 3; + + CertificateInfo peercert = 4; +} + +enum SocketType { + + TCP = 0; + + SSL = 1; + + UDP = 2; + + DTLS = 3; +} + +message Address { + + string host = 1; + + uint32 port = 2; +} + +message CertificateInfo { + + string cn = 1; + + string dn = 2; +} + +message ClientInfo { + + string proto_name = 1; + + string proto_ver = 2; + + string clientid = 3; + + string username = 4; + + string mountpoint = 5; +} + +message Message { + + string node = 1; + + string id = 2; + + uint32 qos = 3; + + string from = 4; + + string topic = 5; + + bytes payload = 6; + + uint64 timestamp = 7; +} diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 0ebbc5195..af4143436 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -497,6 +497,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) -> handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), Channel = #channel{ + ctx = Ctx, registry = Registry, session = Session, clientinfo = ClientInfo = #{clientid := ClientId}}) -> @@ -514,12 +515,12 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK MsgId ~w is inuse.", [MsgId]), - ok = metrics_inc('packets.puback.inuse', Channel), + ok = metrics_inc(Ctx, 'packets.puback.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBACK MsgId ~w is not found.", [MsgId]), - ok = metrics_inc('packets.puback.missed', Channel), + ok = metrics_inc(Ctx, 'packets.puback.missed'), {ok, Channel} end; ?SN_RC_INVALID_TOPIC_ID -> @@ -540,7 +541,9 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), end; handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId), - Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + Channel = #channel{ctx = Ctx, + session = Session, + clientinfo = ClientInfo}) -> case emqx_session:pubrec(MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), @@ -548,28 +551,28 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId), handle_out(pubrel, MsgId, NChannel); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBREC MsgId ~w is inuse.", [MsgId]), - ok = metrics_inc('packets.pubrec.inuse', Channel), + ok = metrics_inc(Ctx, 'packets.pubrec.inuse'), handle_out(pubrel, MsgId, Channel); {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBREC ~w is not found.", [MsgId]), - ok = metrics_inc('packets.pubrec.missed', Channel), + ok = metrics_inc(Ctx, 'packets.pubrec.missed'), handle_out(pubrel, MsgId, Channel) end; handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), - Channel = #channel{session = Session}) -> + Channel = #channel{ctx = Ctx, session = Session}) -> case emqx_session:pubrel(MsgId, Session) of {ok, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, MsgId, NChannel); {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBREL MsgId ~w is not found.", [MsgId]), - ok = metrics_inc('packets.pubrel.missed', Channel), + ok = metrics_inc(Ctx, 'packets.pubrel.missed'), handle_out(pubcomp, MsgId, Channel) end; handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), - Channel = #channel{session = Session}) -> + Channel = #channel{ctx = Ctx, session = Session}) -> case emqx_session:pubcomp(MsgId, Session) of {ok, NSession} -> {ok, Channel#channel{session = NSession}}; @@ -577,11 +580,11 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> - ok = metrics_inc('packets.pubcomp.inuse', Channel), + ok = metrics_inc(Ctx, 'packets.pubcomp.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?LOG(warning, "The PUBCOMP MsgId ~w is not found", [MsgId]), - ok = metrics_inc('packets.pubcomp.missed', Channel), + ok = metrics_inc(Ctx, 'packets.pubcomp.missed'), {ok, Channel} end; @@ -664,9 +667,8 @@ handle_in({frame_error, Reason}, ?LOG(error, "Unexpected frame error: ~p", [Reason]), shutdown(Reason, Channel). -after_message_acked(ClientInfo, Msg, - Channel = #channel{ctx = Ctx}) -> - ok = metrics_inc('messages.acked', Channel), +after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) -> + ok = metrics_inc(Ctx, 'messages.acked'), run_hooks_without_metrics(Ctx, 'message.acked', [ClientInfo, emqx_message:set_header(puback_props, #{}, Msg)]). @@ -756,7 +758,7 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_1}, Channel) -> handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel); do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2}, - Channel = #channel{session = Session}) -> + Channel = #channel{ctx = Ctx, session = Session}) -> case emqx_session:publish(MsgId, Msg, Session) of {ok, _PubRes, NSession} -> NChannel1 = ensure_timer(await_timer, @@ -764,14 +766,14 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2}, ), handle_out(pubrec, MsgId, NChannel1); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> - ok = metrics_inc('packets.publish.inuse', Channel), + ok = metrics_inc(Ctx, 'packets.publish.inuse'), %% XXX: Use PUBACK to reply a PUBLISH Error Code handle_out(puback , {TopicId, MsgId, ?SN_RC_NOT_SUPPORTED}, Channel); {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> ?LOG(warning, "Dropped the qos2 packet ~w " "due to awaiting_rel is full.", [MsgId]), - ok = emqx_metrics:inc('packets.publish.dropped'), + ok = metrics_inc(Ctx, 'packets.publish.dropped'), handle_out(puback, {TopicId, MsgId, ?SN_RC_CONGESTION}, Channel) end. @@ -1022,7 +1024,7 @@ do_deliver({MsgId, Msg}, ctx = Ctx, clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) -> - metrics_inc('messages.delivered', Channel), + metrics_inc(Ctx, 'messages.delivered'), Msg1 = run_hooks_without_metrics( Ctx, 'message.delivered', @@ -1197,33 +1199,36 @@ publish_will_msg(Msg) -> -> {ok, channel()} | {ok, replies(), channel()}. handle_deliver(Delivers, Channel = #channel{ + ctx = Ctx, conn_state = ConnState, session = Session, clientinfo = #{clientid := ClientId}}) when ConnState =:= disconnected; ConnState =:= asleep -> NSession = emqx_session:enqueue( - ignore_local(maybe_nack(Delivers), ClientId, Session), + ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx), Session ), {ok, Channel#channel{session = NSession}}; handle_deliver(Delivers, Channel = #channel{ + ctx = Ctx, takeover = true, pendings = Pendings, session = Session, clientinfo = #{clientid := ClientId}}) -> NPendings = lists:append( Pendings, - ignore_local(maybe_nack(Delivers), ClientId, Session) + ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx) ), {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{ + ctx = Ctx, session = Session, clientinfo = #{clientid := ClientId}}) -> case emqx_session:deliver( - ignore_local(Delivers, ClientId, Session), + ignore_local(Delivers, ClientId, Session, Ctx), Session ) of {ok, Publishes, NSession} -> @@ -1234,13 +1239,13 @@ handle_deliver(Delivers, Channel = #channel{ {ok, Channel#channel{session = NSession}} end. -ignore_local(Delivers, Subscriber, Session) -> +ignore_local(Delivers, Subscriber, Session, Ctx) -> Subs = emqx_session:info(subscriptions, Session), lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) -> case maps:find(Topic, Subs) of {ok, #{nl := 1}} when Subscriber =:= Publisher -> - ok = emqx_metrics:inc('delivery.dropped'), - ok = emqx_metrics:inc('delivery.dropped.no_local'), + ok = metrics_inc(Ctx, 'delivery.dropped'), + ok = metrics_inc(Ctx, 'delivery.dropped.no_local'), true; _ -> false @@ -1413,7 +1418,7 @@ run_hooks_without_metrics(_Ctx, Name, Args) -> run_hooks_without_metrics(_Ctx, Name, Args, Acc) -> emqx_hooks:run_fold(Name, Args, Acc). -metrics_inc(Name, #channel{ctx = Ctx}) -> +metrics_inc(Ctx, Name) -> emqx_gateway_ctx:metrics_inc(Ctx, Name). returncode_name(?SN_RC_ACCEPTED) -> accepted; diff --git a/apps/emqx_gateway/src/exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl similarity index 100% rename from apps/emqx_gateway/src/exproto/test/emqx_exproto_SUITE.erl rename to apps/emqx_gateway/test/emqx_exproto_SUITE.erl diff --git a/apps/emqx_gateway/src/exproto/test/emqx_exproto_echo_svr.erl b/apps/emqx_gateway/test/emqx_exproto_echo_svr.erl similarity index 100% rename from apps/emqx_gateway/src/exproto/test/emqx_exproto_echo_svr.erl rename to apps/emqx_gateway/test/emqx_exproto_echo_svr.erl