diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 6fbd7d217..59ff7c1ef 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -321,9 +321,13 @@ handle_in(?AUTH_PACKET(), Channel) -> handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel); +handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = connecting}) -> + shutdown(frame_too_large, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) -> shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); +handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = connected}) -> + handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = connected}) -> handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 161f7d8e5..1f9f81bc5 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -267,7 +267,7 @@ t_handle_in_frame_error(_) -> ConnackPacket = ?CONNACK_PACKET(?RC_MALFORMED_PACKET), {shutdown, frame_too_large, ConnackPacket, _} = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan), - DisconnectPacket = ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), + DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE), ConnectedChan = channel(#{conn_state => connected}), {ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan), diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index c148b14e0..985d91684 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -22,6 +22,8 @@ -include("emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-define(CM, emqx_cm). + %%-------------------------------------------------------------------- %% CT callbacks %%-------------------------------------------------------------------- @@ -81,10 +83,55 @@ t_open_session(_) -> ?assertEqual(100, emqx_session:info(inflight_max, Session2)). t_discard_session(_) -> - ok = emqx_cm:discard_session(<<"clientid">>). + ok = meck:new(emqx_connection, [passthrough, no_history]), + ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), + ok = emqx_cm:discard_session(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>), + ok = emqx_cm:discard_session(<<"clientid">>), + ok = emqx_cm:unregister_channel(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + ok = emqx_cm:discard_session(<<"clientid">>), + ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), + ok = emqx_cm:discard_session(<<"clientid">>), + ok = emqx_cm:unregister_channel(<<"clientid">>), + ok = meck:unload(emqx_connection). t_takeover_session(_) -> - {error, not_found} = emqx_cm:takeover_session(<<"clientid">>). + ok = meck:new(emqx_connection, [passthrough, no_history]), + ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), + {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>), + {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), + ok = emqx_cm:unregister_channel(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + Pid = self(), + {ok, emqx_connection, Pid, test} = emqx_cm:takeover_session(<<"clientid">>), + erlang:spawn(fun() -> + ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + timer:sleep(1000) + end), + ct:sleep(100), + {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), + ok = emqx_cm:unregister_channel(<<"clientid">>), + ok = meck:unload(emqx_connection). + +t_kick_session(_) -> + ok = meck:new(emqx_connection, [passthrough, no_history]), + ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), + {error, not_found} = emqx_cm:kick_session(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>), + {error, not_found} = emqx_cm:kick_session(<<"clientid">>), + ok = emqx_cm:unregister_channel(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + test = emqx_cm:kick_session(<<"clientid">>), + erlang:spawn(fun() -> + ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + timer:sleep(1000) + end), + ct:sleep(100), + test = emqx_cm:kick_session(<<"clientid">>), + ok = emqx_cm:unregister_channel(<<"clientid">>), + ok = meck:unload(emqx_connection). t_lock_clientid(_) -> {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>), @@ -92,27 +139,7 @@ t_lock_clientid(_) -> {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>), {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>). - -% t_unregister_channel(_) -> -% error('TODO'). - -% t_get_chan_attrs(_) -> -% error('TODO'). - -% t_get_chan_stats(_) -> -% error('TODO'). - -% t_lookup_channels(_) -> -% error('TODO'). - -% t_set_chan_stats(_) -> -% error('TODO'). - -% t_set_chan_attrs(_) -> -% error('TODO'). - -% t_register_channel(_) -> -% error('TODO'). - -% t_stats_fun(_) -> -% error('TODO'). +t_message(_) -> + ?CM ! testing, + gen_server:cast(?CM, testing), + gen_server:call(?CM, testing). diff --git a/test/emqx_router_helper_SUITE.erl b/test/emqx_router_helper_SUITE.erl index 55bb6c5d7..f7ce4b11d 100644 --- a/test/emqx_router_helper_SUITE.erl +++ b/test/emqx_router_helper_SUITE.erl @@ -21,20 +21,30 @@ -include_lib("eunit/include/eunit.hrl"). +-define(ROUTER_HELPER, emqx_router_helper). + all() -> emqx_ct:all(?MODULE). init_per_testcase(_TestCase, Config) -> + emqx_ct_helpers:start_apps([emqx]), Config. end_per_testcase(_TestCase, Config) -> Config. -% t_mnesia(_) -> -% error('TODO'). +t_monitor(_) -> + ok = emqx_router_helper:monitor({undefined, node()}), + emqx_router_helper:monitor(undefined). -% t_monitor(_) -> -% error('TODO'). - -% t_stats_fun(_) -> -% error('TODO'). +t_mnesia(_) -> + ?ROUTER_HELPER ! {mnesia_table_event, {delete, {emqx_routing_node, node()}, undefined}}, + ?ROUTER_HELPER ! {mnesia_table_event, testing}, + ?ROUTER_HELPER ! {mnesia_table_event, {write, {emqx_routing_node, node()}, undefined}}, + ?ROUTER_HELPER ! {membership, testing}, + ?ROUTER_HELPER ! {membership, {mnesia, down, node()}}, + ct:sleep(200). +t_message(_) -> + ?ROUTER_HELPER ! testing, + gen_server:cast(?ROUTER_HELPER, testing), + gen_server:call(?ROUTER_HELPER, testing).