From 0e3728c9403f47b3e16b2779981c6ba1a351a9d4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 26 Aug 2018 16:24:51 +0800 Subject: [PATCH] Add emqx_types module and 'credentials' type --- include/emqx.hrl | 20 +++-- src/emqx_access_control.erl | 152 ++++++++++++++++++------------------ src/emqx_access_rule.erl | 87 +++++++++++---------- src/emqx_acl_internal.erl | 49 ++++++------ src/emqx_acl_mod.erl | 8 +- src/emqx_auth_mod.erl | 10 +-- src/emqx_types.erl | 38 +++++++++ src/emqx_ws_connection.erl | 1 + 8 files changed, 201 insertions(+), 164 deletions(-) create mode 100644 src/emqx_types.erl diff --git a/include/emqx.hrl b/include/emqx.hrl index 0372f547e..022660287 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -70,18 +70,26 @@ -type(topic_table() :: [{topic(), subopts()}]). %%-------------------------------------------------------------------- -%% Client and Session +%% Zone, Credentials, Client and Session %%-------------------------------------------------------------------- --type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). - --type(peername() :: {inet:ip_address(), inet:port_number()}). +-type(zone() :: atom()). -type(client_id() :: binary() | atom()). --type(username() :: binary() | atom()). +-type(username() :: binary() | undefined). --type(zone() :: atom()). +-type(password() :: binary() | undefined). + +-type(peername() :: {inet:ip_address(), inet:port_number()}). + +-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). + +-type(credentials() :: #{client_id := binary(), + username := binary(), + peername := peername(), + zone => zone(), + atom() => term()}). -record(client, { id :: client_id(), diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index f43309088..1577ca122 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -20,9 +20,9 @@ -export([start_link/0]). -export([authenticate/2]). --export([check_acl/3, reload_acl/0, lookup_mods/1]). --export([clean_acl_cache/1, clean_acl_cache/2]). +-export([check_acl/3, reload_acl/0]). -export([register_mod/3, register_mod/4, unregister_mod/2]). +-export([lookup_mods/1]). -export([stop/0]). %% gen_server callbacks @@ -32,8 +32,6 @@ -define(TAB, ?MODULE). -define(SERVER, ?MODULE). --type(password() :: undefined | binary()). - -record(state, {}). %%------------------------------------------------------------------------------ @@ -41,81 +39,88 @@ %%------------------------------------------------------------------------------ %% @doc Start access control server. --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link() -> {ok, pid()} | {error, term()}). start_link() -> + start_with(fun register_default_acl/0). + +start_with(Fun) -> case gen_server:start_link({local, ?SERVER}, ?MODULE, [], []) of {ok, Pid} -> - ok = register_default_mod(), - {ok, Pid}; + Fun(), {ok, Pid}; {error, Reason} -> {error, Reason} end. -register_default_mod() -> +register_default_acl() -> case emqx_config:get_env(acl_file) of undefined -> ok; - File -> - emqx_access_control:register_mod(acl, emqx_acl_internal, [File]) + File -> register_mod(acl, emqx_acl_internal, [File]) end. -%% @doc Authenticate Client. --spec(authenticate(Client :: client(), Password :: password()) - -> ok | {ok, boolean()} | {error, term()}). -authenticate(Client, Password) when is_record(Client, client) -> - authenticate(Client, Password, lookup_mods(auth)). +-spec(authenticate(credentials(), password()) + -> ok | {ok, map()} | {continue, map()} | {error, term()}). +authenticate(Credentials, Password) -> + authenticate(Credentials, Password, lookup_mods(auth)). -authenticate(#client{zone = Zone}, _Password, []) -> +authenticate(Credentials, _Password, []) -> + Zone = maps:get(zone, Credentials, undefined), case emqx_zone:get_env(Zone, allow_anonymous, false) of true -> ok; - false -> {error, "No auth module to check!"} + false -> {error, auth_modules_not_found} end; -authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) -> - case catch Mod:check(Client, Password, State) of - ok -> ok; - {ok, IsSuper} -> {ok, IsSuper}; - ignore -> authenticate(Client, Password, Mods); - {error, Reason} -> {error, Reason}; - {'EXIT', Error} -> {error, Error} +authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) -> + case catch Mod:check(Credentials, Password, State) of + ok -> ok; + {ok, IsSuper} when is_boolean(IsSuper) -> + {ok, #{is_superuser => IsSuper}}; + {ok, Result} when is_map(Result) -> + {ok, Result}; + {continue, Result} when is_map(Result) -> + {continue, Result}; + ignore -> + authenticate(Credentials, Password, Mods); + {error, Reason} -> + {error, Reason}; + {'EXIT', Error} -> + {error, Error} end. -%% @doc Check ACL --spec(check_acl(client(), pubsub(), topic()) -> allow | deny). -check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - check_acl(Client, PubSub, Topic, lookup_mods(acl)). +-spec(check_acl(credentials(), pubsub(), topic()) -> allow | deny). +check_acl(Credentials, PubSub, Topic) when ?PS(PubSub) -> + check_acl(Credentials, PubSub, Topic, lookup_mods(acl)). -check_acl(#client{zone = Zone}, _PubSub, _Topic, []) -> +check_acl(Credentials, _PubSub, _Topic, []) -> + Zone = maps:get(zone, Credentials, undefined), emqx_zone:get_env(Zone, acl_nomatch, deny); -check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> - case Mod:check_acl({Client, PubSub, Topic}, State) of +check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|Mods]) -> + case Mod:check_acl({Credentials, PubSub, Topic}, State) of + ignore -> check_acl(Credentials, PubSub, Topic, Mods); allow -> allow; - deny -> deny; - ignore -> check_acl(Client, PubSub, Topic, AclMods) + deny -> deny end. -%% @doc Reload ACL Rules. --spec(reload_acl() -> list(ok | {error, already_existed})). +-spec(reload_acl() -> list(ok | {error, term()})). reload_acl() -> [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)]. -%% @doc Register Authentication or ACL module. --spec(register_mod(auth | acl, atom(), list()) -> ok | {error, term()}). +%% @doc Register an Auth/ACL module. +-spec(register_mod(auth | acl, module(), list()) -> ok | {error, term()}). register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl -> register_mod(Type, Mod, Opts, 0). --spec(register_mod(auth | acl, atom(), list(), non_neg_integer()) +-spec(register_mod(auth | acl, module(), list(), non_neg_integer()) -> ok | {error, term()}). register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl-> gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}). -%% @doc Unregister authentication or ACL module --spec(unregister_mod(Type :: auth | acl, Mod :: atom()) - -> ok | {error, not_found | term()}). +%% @doc Unregister an Auth/ACL module. +-spec(unregister_mod(auth | acl, module()) -> ok | {error, not_found | term()}). unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl -> gen_server:call(?SERVER, {unregister_mod, Type, Mod}). -%% @doc Lookup authentication or ACL modules. +%% @doc Lookup all Auth/ACL modules. -spec(lookup_mods(auth | acl) -> list()). lookup_mods(Type) -> case ets:lookup(?TAB, tab_key(Type)) of @@ -126,19 +131,12 @@ lookup_mods(Type) -> tab_key(auth) -> auth_modules; tab_key(acl) -> acl_modules. -%% @doc Stop access control server. stop() -> - gen_server:stop(?MODULE, normal, infinity). + gen_server:stop(?SERVER, normal, infinity). -%%TODO: Support ACL cache... -clean_acl_cache(_ClientId) -> - ok. -clean_acl_cache(_ClientId, _Topic) -> - ok. - -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- %% gen_server callbacks -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- init([]) -> _ = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]), @@ -146,31 +144,31 @@ init([]) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), - Existed = lists:keyfind(Mod, 1, Mods), - {reply, if_existed(Existed, fun() -> - case catch Mod:init(Opts) of - {ok, ModState} -> - NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> - Seq1 >= Seq2 - end, [{Mod, ModState, Seq} | Mods]), - ets:insert(?TAB, {tab_key(Type), NewMods}), - ok; - {error, Error} -> - {error, Error}; - {'EXIT', Reason} -> - {error, Reason} - end - end), State}; + reply(case lists:keyfind(Mod, 1, Mods) of + true -> + {error, already_existed}; + false -> + case catch Mod:init(Opts) of + {ok, ModState} -> + NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> + Seq1 >= Seq2 + end, [{Mod, ModState, Seq} | Mods]), + ets:insert(?TAB, {tab_key(Type), NewMods}), ok; + {error, Error} -> + {error, Error}; + {'EXIT', Reason} -> + {error, Reason} + end + end, State); handle_call({unregister_mod, Type, Mod}, _From, State) -> Mods = lookup_mods(Type), - case lists:keyfind(Mod, 1, Mods) of - false -> - {reply, {error, not_found}, State}; - _ -> - _ = ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), - {reply, ok, State} - end; + reply(case lists:keyfind(Mod, 1, Mods) of + false -> + {error, not_found}; + true -> + ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok + end, State); handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -197,8 +195,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -if_existed(false, Fun) -> - Fun(); -if_existed(_Mod, _Fun) -> - {error, already_existed}. +reply(Reply, State) -> + {reply, Reply, State}. diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 91c601db4..2f5d190a9 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -17,9 +17,9 @@ -include("emqx.hrl"). -type(who() :: all | binary() | - {ipaddr, esockd_cidr:cidr_string()} | {client, binary()} | - {user, binary()}). + {user, binary()} | + {ipaddr, esockd_cidr:cidr_string()}). -type(access() :: subscribe | publish | pubsub). @@ -30,7 +30,8 @@ -export_type([rule/0]). --export([compile/1, match/3]). +-export([compile/1]). +-export([match/3]). -define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))). @@ -71,7 +72,8 @@ compile(topic, Topic) -> end. 'pattern?'(Words) -> - lists:member(<<"%u">>, Words) orelse lists:member(<<"%c">>, Words). + lists:member(<<"%u">>, Words) + orelse lists:member(<<"%c">>, Words). bin(L) when is_list(L) -> list_to_binary(L); @@ -79,69 +81,70 @@ bin(B) when is_binary(B) -> B. %% @doc Match access rule --spec(match(client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch). -match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> +-spec(match(credentials(), topic(), rule()) + -> {matched, allow} | {matched, deny} | nomatch). +match(_Credentials, _Topic, {AllowDeny, all}) when ?ALLOW_DENY(AllowDeny) -> {matched, AllowDeny}; -match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) - when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> - case match_who(Client, Who) - andalso match_topics(Client, Topic, TopicFilters) of +match(Credentials, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) + when ?ALLOW_DENY(AllowDeny) -> + case match_who(Credentials, Who) + andalso match_topics(Credentials, Topic, TopicFilters) of true -> {matched, AllowDeny}; false -> nomatch end. -match_who(_Client, all) -> +match_who(_Credentials, all) -> true; -match_who(_Client, {user, all}) -> +match_who(_Credentials, {user, all}) -> true; -match_who(_Client, {client, all}) -> +match_who(_Credentials, {client, all}) -> true; -match_who(#client{id = ClientId}, {client, ClientId}) -> +match_who(#{client_id := ClientId}, {client, ClientId}) -> true; -match_who(#client{username = Username}, {user, Username}) -> +match_who(#{username := Username}, {user, Username}) -> true; -match_who(#client{peername = undefined}, {ipaddr, _Tup}) -> +match_who(#{peername := undefined}, {ipaddr, _Tup}) -> false; -match_who(#client{peername = {IP, _}}, {ipaddr, CIDR}) -> +match_who(#{peername := {IP, _}}, {ipaddr, CIDR}) -> esockd_cidr:match(IP, CIDR); -match_who(Client, {'and', Conds}) when is_list(Conds) -> +match_who(Credentials, {'and', Conds}) when is_list(Conds) -> lists:foldl(fun(Who, Allow) -> - match_who(Client, Who) andalso Allow + match_who(Credentials, Who) andalso Allow end, true, Conds); -match_who(Client, {'or', Conds}) when is_list(Conds) -> +match_who(Credentials, {'or', Conds}) when is_list(Conds) -> lists:foldl(fun(Who, Allow) -> - match_who(Client, Who) orelse Allow + match_who(Credentials, Who) orelse Allow end, false, Conds); -match_who(_Client, _Who) -> +match_who(_Credentials, _Who) -> false. -match_topics(_Client, _Topic, []) -> +match_topics(_Credentials, _Topic, []) -> false; -match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) -> - TopicFilter = feed_var(Client, PatternFilter), +match_topics(Credentials, Topic, [{pattern, PatternFilter}|Filters]) -> + TopicFilter = feed_var(Credentials, PatternFilter), match_topic(emqx_topic:words(Topic), TopicFilter) - orelse match_topics(Client, Topic, Filters); -match_topics(Client, Topic, [TopicFilter|Filters]) -> + orelse match_topics(Credentials, Topic, Filters); +match_topics(Credentials, Topic, [TopicFilter|Filters]) -> match_topic(emqx_topic:words(Topic), TopicFilter) - orelse match_topics(Client, Topic, Filters). + orelse match_topics(Credentials, Topic, Filters). match_topic(Topic, {eq, TopicFilter}) -> - Topic =:= TopicFilter; + Topic == TopicFilter; match_topic(Topic, TopicFilter) -> emqx_topic:match(Topic, TopicFilter). -feed_var(Client, Pattern) -> - feed_var(Client, Pattern, []). -feed_var(_Client, [], Acc) -> +feed_var(Credentials, Pattern) -> + feed_var(Credentials, Pattern, []). +feed_var(_Credentials, [], Acc) -> lists:reverse(Acc); -feed_var(Client = #client{id = undefined}, [<<"%c">>|Words], Acc) -> - feed_var(Client, Words, [<<"%c">>|Acc]); -feed_var(Client = #client{id = ClientId}, [<<"%c">>|Words], Acc) -> - feed_var(Client, Words, [ClientId |Acc]); -feed_var(Client = #client{username = undefined}, [<<"%u">>|Words], Acc) -> - feed_var(Client, Words, [<<"%u">>|Acc]); -feed_var(Client = #client{username = Username}, [<<"%u">>|Words], Acc) -> - feed_var(Client, Words, [Username|Acc]); -feed_var(Client, [W|Words], Acc) -> - feed_var(Client, Words, [W|Acc]). +feed_var(Credentials = #{client_id := undefined}, [<<"%c">>|Words], Acc) -> + feed_var(Credentials, Words, [<<"%c">>|Acc]); +feed_var(Credentials = #{client_id := ClientId}, [<<"%c">>|Words], Acc) -> + feed_var(Credentials, Words, [ClientId |Acc]); +feed_var(Credentials = #{username := undefined}, [<<"%u">>|Words], Acc) -> + feed_var(Credentials, Words, [<<"%u">>|Acc]); +feed_var(Credentials = #{username := Username}, [<<"%u">>|Words], Acc) -> + feed_var(Credentials, Words, [Username|Acc]); +feed_var(Credentials, [W|Words], Acc) -> + feed_var(Credentials, Words, [W|Acc]). diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 65a3199ae..07aada812 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -25,11 +25,11 @@ -define(ACL_RULE_TAB, emqx_acl_rule). --record(state, {config}). +-record(state, {acl_file}). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% @doc Read all rules -spec(all_rules() -> list(emqx_access_rule:rule())). @@ -39,17 +39,17 @@ all_rules() -> [{_, Rules}] -> Rules end. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% ACL callbacks -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% @doc Init internal ACL --spec(init([File :: string()]) -> {ok, State :: any()}). +-spec(init([File :: string()]) -> {ok, State :: term()}). init([File]) -> _ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]), - {ok, load_rules_from_file(#state{config = File})}. + {ok, load_rules_from_file(#state{acl_file = File})}. -load_rules_from_file(State = #state{config = AclFile}) -> +load_rules_from_file(State = #state{acl_file = AclFile}) -> {ok, Terms} = file:consult(AclFile), Rules = [emqx_access_rule:compile(Term) || Term <- Terms], lists:foreach(fun(PubSub) -> @@ -73,15 +73,12 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. %% @doc Check ACL --spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when - Client :: client(), - PubSub :: pubsub(), - Topic :: topic(), - State :: #state{}). -check_acl(_Who, #state{config = undefined}) -> +-spec(check_acl({credentials(), pubsub(), topic()}, #state{}) + -> allow | deny | ignore). +check_acl(_Who, #state{acl_file = undefined}) -> allow; -check_acl({Client, PubSub, Topic}, #state{}) -> - case match(Client, Topic, lookup(PubSub)) of +check_acl({Credentials, PubSub, Topic}, #state{}) -> + case match(Credentials, Topic, lookup(PubSub)) of {matched, allow} -> allow; {matched, deny} -> deny; nomatch -> ignore @@ -93,26 +90,24 @@ lookup(PubSub) -> [{PubSub, Rules}] -> Rules end. -match(_Client, _Topic, []) -> +match(_Credentials, _Topic, []) -> nomatch; - -match(Client, Topic, [Rule|Rules]) -> - case emqx_access_rule:match(Client, Topic, Rule) of - nomatch -> match(Client, Topic, Rules); +match(Credentials, Topic, [Rule|Rules]) -> + case emqx_access_rule:match(Credentials, Topic, Rule) of + nomatch -> match(Credentials, Topic, Rules); {matched, AllowDeny} -> {matched, AllowDeny} end. -%% @doc Reload ACL --spec(reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}). -reload_acl(#state{config = undefined}) -> +-spec(reload_acl(#state{}) -> ok | {error, term()}). +reload_acl(#state{acl_file = undefined}) -> ok; reload_acl(State) -> case catch load_rules_from_file(State) of - {'EXIT', Error} -> {error, Error}; - true -> io:format("~s~n", ["reload acl_internal successfully"]), ok + {'EXIT', Error} -> + {error, Error}; + true -> ok end. -%% @doc ACL Module Description -spec(description() -> string()). description() -> "Internal ACL with etc/acl.conf". diff --git a/src/emqx_acl_mod.erl b/src/emqx_acl_mod.erl index 85844b042..716b27967 100644 --- a/src/emqx_acl_mod.erl +++ b/src/emqx_acl_mod.erl @@ -22,14 +22,12 @@ -ifdef(use_specs). --callback(init(AclOpts :: list()) -> {ok, State :: any()}). +-callback(init(AclOpts :: list()) -> {ok, State :: term()}). --callback(check_acl({Client :: client(), - PubSub :: pubsub(), - Topic :: topic()}, State :: any()) +-callback(check_acl({credentials(), pubsub(), topic()}, State :: term()) -> allow | deny | ignore). --callback(reload_acl(State :: any()) -> ok | {error, term()}). +-callback(reload_acl(State :: term()) -> ok | {error, term()}). -callback(description() -> string()). diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl index 65298ef9b..8f03eb4fa 100644 --- a/src/emqx_auth_mod.erl +++ b/src/emqx_auth_mod.erl @@ -22,13 +22,11 @@ -ifdef(use_specs). --callback(init(AuthOpts :: list()) -> {ok, State :: any()}). - --callback(check(Client :: client(), - Password :: binary(), - State :: any()) - -> ok | {ok, boolean()} | ignore | {error, string()}). +-callback(init(AuthOpts :: list()) -> {ok, State :: term()}). +-callback(check(credentials(), password(), State :: term()) + -> ok | {ok, boolean()} | {ok, map()} | + {continue, map()} | ignore | {error, term()}). -callback(description() -> string()). -else. diff --git a/src/emqx_types.erl b/src/emqx_types.erl new file mode 100644 index 000000000..de1f5df4b --- /dev/null +++ b/src/emqx_types.erl @@ -0,0 +1,38 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_types). + +%%-include("emqx.hrl"). + +-export_type([zone/0, client_id/0, username/0, password/0, peername/0, + protocol/0, credentials/0]). +%%-export_type([payload/0, message/0, delivery/0]). + +-type(zone() :: atom()). +-type(client_id() :: binary() | atom()). +-type(username() :: binary() | undefined). +-type(password() :: binary() | undefined). +-type(peername() :: {inet:ip_address(), inet:port_number()}). +-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). +-type(credentials() :: #{client_id := client_id(), + username := username(), + peername := peername(), + zone => zone(), + atom() => term()}). + +-type(payload() :: binary() | iodata()). +%-type(message() :: #message{}). +%-type(delivery() :: #delivery{}). + diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index c36b484c6..d488097bd 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -247,6 +247,7 @@ terminate(SockError, _Req, #state{keepalive = Keepalive, case Reason of undefined -> ok; + %%TODO: %%emqx_protocol:shutdown(SockError, ProtoState); _ -> ok%%emqx_protocol:shutdown(Reason, ProtoState)