From 43284768d046aab0cfe207f8d75f6d587ca11d5c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 13 Jan 2022 15:56:23 +0800 Subject: [PATCH] chore(gw): more code coverage for emqx_gateway_conn module --- .../src/bhvrs/emqx_gateway_conn.erl | 25 ++++--- apps/emqx_gateway/src/emqx_gateway_ctx.erl | 16 ++--- apps/emqx_gateway/src/emqx_gateway_http.erl | 32 ++++----- .../src/emqx_gateway_insta_sup.erl | 4 +- .../test/emqx_gateway_ctx_SUITE.erl | 67 +++++++++++++++++++ .../test/emqx_sn_protocol_SUITE.erl | 1 - apps/emqx_gateway/test/emqx_stomp_SUITE.erl | 34 ++++++++++ 7 files changed, 139 insertions(+), 40 deletions(-) create mode 100644 apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 51653d3c0..8e0c3d7ba 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -720,20 +720,29 @@ serialize_and_inc_stats_fun(#state{ channel = Channel}) -> Ctx = ChannMod:info(ctx, Channel), fun(Packet) -> - case FrameMod:serialize_pkt(Packet, Serialize) of - <<>> -> + try + Data = FrameMod:serialize_pkt(Packet, Serialize), + ?SLOG(debug, #{ msg => "SEND_packet" + %% XXX: optimize it, less cpu comsuption? + , packet => FrameMod:format(Packet) + }), + ok = inc_outgoing_stats(Ctx, FrameMod, Packet), + Data + catch + _ : too_large -> ?SLOG(warning, #{ msg => "packet_too_large_discarded" , packet => FrameMod:format(Packet) }), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), <<>>; - Data -> - ?SLOG(debug, #{ msg => "SEND_packet" - , packet => FrameMod:format(Packet) - }), - ok = inc_outgoing_stats(Ctx, FrameMod, Packet), - Data + _ : Reason -> + ?SLOG(warning, #{ msg => "packet_serialize_failure" + , reason => Reason + , packet => FrameMod:format(Packet) + }), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), + <<>> end end. diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index ca170814f..ab81c1ddb 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -30,7 +30,7 @@ #{ %% Gateway Name gwname := gateway_name() %% Authentication chains - , auth := [emqx_authentication:chain_name()] | undefined + , auth := [emqx_authentication:chain_name()] %% The ConnectionManager PID , cm := pid() }. @@ -64,18 +64,15 @@ -spec authenticate(context(), emqx_types:clientinfo()) -> {ok, emqx_types:clientinfo()} | {error, any()}. -authenticate(_Ctx = #{auth := undefined}, ClientInfo) -> - {ok, mountpoint(ClientInfo)}; -authenticate(_Ctx = #{auth := _ChainName}, ClientInfo0) -> +authenticate(_Ctx = #{auth := _ChainNames}, ClientInfo0) + when is_list(_ChainNames) -> ClientInfo = ClientInfo0#{zone => default}, case emqx_access_control:authenticate(ClientInfo) of {ok, _} -> {ok, mountpoint(ClientInfo)}; {error, Reason} -> {error, Reason} - end; -authenticate(_Ctx, ClientInfo) -> - {ok, mountpoint(ClientInfo)}. + end. %% @doc Register the session to the cluster. %% @@ -95,11 +92,6 @@ open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session). -open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> - logger:warning("clean_start=false is not supported now, " - "fallback to clean_start mode"), - open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod); - open_session(_Ctx = #{gwname := GwName}, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> emqx_gateway_cm:open_session(GwName, CleanStart, diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 7a1ac519d..641f29932 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -79,7 +79,6 @@ -elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, no_nested_try_catch, disable}]). - -define(DEFAULT_CALL_TIMEOUT, 15000). %%-------------------------------------------------------------------- @@ -336,53 +335,51 @@ return_http_error(Code, Msg) -> }) }. --spec reason2msg({atom(), map()} | any()) -> error | io_lib:chars(). +-spec reason2msg({atom(), map()} | any()) -> error | string(). reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) -> - io_lib:format("Bad config value '~s' for '~s', reason: ~s", - [Value, Key, Reason]); + fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]); reason2msg({badres, #{resource := gateway, gateway := GwName, reason := not_found}}) -> - io_lib:format("The ~s gateway is unloaded", [GwName]); + fmtstr("The ~s gateway is unloaded", [GwName]); reason2msg({badres, #{resource := gateway, gateway := GwName, reason := already_exist}}) -> - io_lib:format("The ~s gateway already loaded", [GwName]); + fmtstr("The ~s gateway already loaded", [GwName]); reason2msg({badres, #{resource := listener, listener := {GwName, LType, LName}, reason := not_found}}) -> - io_lib:format("Listener ~s not found", - [listener_id(GwName, LType, LName)]); + fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]); reason2msg({badres, #{resource := listener, listener := {GwName, LType, LName}, reason := already_exist}}) -> - io_lib:format("The listener ~s of ~s already exist", - [listener_id(GwName, LType, LName), GwName]); + fmtstr("The listener ~s of ~s already exist", + [listener_id(GwName, LType, LName), GwName]); reason2msg({badres, #{resource := authn, gateway := GwName, reason := not_found}}) -> - io_lib:format("The authentication not found on ~s", [GwName]); + fmtstr("The authentication not found on ~s", [GwName]); reason2msg({badres, #{resource := authn, gateway := GwName, reason := already_exist}}) -> - io_lib:format("The authentication already exist on ~s", [GwName]); + fmtstr("The authentication already exist on ~s", [GwName]); reason2msg({badres, #{resource := listener_authn, listener := {GwName, LType, LName}, reason := not_found}}) -> - io_lib:format("The authentication not found on ~s", - [listener_id(GwName, LType, LName)]); + fmtstr("The authentication not found on ~s", + [listener_id(GwName, LType, LName)]); reason2msg({badres, #{resource := listener_authn, listener := {GwName, LType, LName}, reason := already_exist}}) -> - io_lib:format("The authentication already exist on ~s", - [listener_id(GwName, LType, LName)]); + fmtstr("The authentication already exist on ~s", + [listener_id(GwName, LType, LName)]); reason2msg(_) -> error. @@ -393,6 +390,9 @@ codestr(405) -> 'METHOD_NOT_ALLOWED'; codestr(500) -> 'UNKNOW_ERROR'; codestr(501) -> 'NOT_IMPLEMENTED'. +fmtstr(Fmt, Args) -> + lists:flatten(io_lib:format(Fmt, Args)). + -spec with_authn(binary(), function()) -> any(). with_authn(GwName0, Fun) -> with_gateway(GwName0, fun(GwName, _GwConf) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 096ab1f81..ddeb3620d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -122,7 +122,6 @@ handle_call(info, _From, State) -> {reply, detailed_gateway_info(State), State}; handle_call(disable, _From, State = #state{status = Status}) -> - %% XXX: The `disable` opertaion is not persist to config database case Status of running -> case cb_gateway_unload(State) of @@ -328,7 +327,7 @@ do_update_one_by_one(NCfg, State = #state{ AuthnNames = init_authn(State#state.name, NCfg), State#state{authns = AuthnNames} end, - %% XXX: minimum impact update ??? + %% TODO: minimum impact update ??? cb_gateway_update(NCfg, NState); {running, false} -> case cb_gateway_unload(State) of @@ -413,7 +412,6 @@ cb_gateway_update(Config, case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of {error, Reason} -> {error, Reason}; {ok, ChildPidOrSpecs, NGwState} -> - %% XXX: Hot-upgrade ??? ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ config = Config, diff --git a/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl new file mode 100644 index 000000000..e09e5bc1b --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_ctx_SUITE). + +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +%%-------------------------------------------------------------------- +%% setups +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, authenticate, + fun(#{clientid := bad_client}) -> + {error, bad_username_or_password}; + (ClientInfo) -> {ok, ClientInfo} + end), + Conf. + +end_per_suite(_Conf) -> + ok. + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_authenticate(_) -> + Ctx = #{gwname => mqttsn, auth => [], cm => self()}, + Info1 = #{ mountpoint => undefined + , clientid => <<"user1">> + }, + NInfo1 = zone(Info1), + ?assertEqual({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)), + + Info2 = #{ mountpoint => <<"mqttsn/${clientid}/">> + , clientid => <<"user1">> + }, + NInfo2 = zone(Info2#{mountpoint => <<"mqttsn/user1/">>}), + ?assertEqual({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)), + + Info3 = #{ mountpoint => <<"mqttsn/${clientid}/">> + , clientid => bad_client + }, + {error, bad_username_or_password} + = emqx_gateway_ctx:authenticate(Ctx, Info3), + ok. + +zone(Info) -> Info#{zone => default}. diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 114a5ceed..ab333cf87 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -648,7 +648,6 @@ t_publish_qos0_case05(_) -> gen_udp:close(Socket). - t_publish_qos0_case06(_) -> Dup = 0, QoS = 0, diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index f2ed76114..53aae7562 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -351,6 +351,40 @@ t_ack(_) -> body = _}, _, _} = parse(Data4) end). +t_1000_msg_send(_) -> + with_connection(fun(Sock) -> + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), + + Topic = <<"/queue/foo">>, + SendFun = fun() -> + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, Topic}], + <<"msgtest">>)) + end, + + RecvFun = fun() -> + receive + {deliver, Topic, _Msg}-> + ok + after 100 -> + ?assert(false, "waiting message timeout") + end + end, + + emqx:subscribe(Topic), + lists:foreach(fun(_) -> SendFun() end, lists:seq(1, 1000)), + lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000)) + end). + t_rest_clienit_info(_) -> with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>,