Merge pull request #4186 from zmstone/chore-sytle-fix-elvis
Chore sytle fix elvis
This commit is contained in:
commit
9c5c99c94f
|
@ -71,14 +71,13 @@ compile(topic, {eq, Topic}) ->
|
|||
{eq, emqx_topic:words(bin(Topic))};
|
||||
compile(topic, Topic) ->
|
||||
Words = emqx_topic:words(bin(Topic)),
|
||||
case 'pattern?'(Words) of
|
||||
case pattern(Words) of
|
||||
true -> {pattern, Words};
|
||||
false -> Words
|
||||
end.
|
||||
|
||||
'pattern?'(Words) ->
|
||||
lists:member(<<"%u">>, Words)
|
||||
orelse lists:member(<<"%c">>, Words).
|
||||
pattern(Words) ->
|
||||
lists:member(<<"%u">>, Words) orelse lists:member(<<"%c">>, Words).
|
||||
|
||||
bin(L) when is_list(L) ->
|
||||
list_to_binary(L);
|
||||
|
|
|
@ -29,6 +29,8 @@
|
|||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-elvis([{elvis_style, invalid_dynamic_call, #{ ignore => [emqx_connection]}}]).
|
||||
|
||||
%% API
|
||||
-export([ start_link/3
|
||||
, stop/1
|
||||
|
@ -661,7 +663,7 @@ tcp_congestion_alarm_details(Socket, Transport, Channel) ->
|
|||
|
||||
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
|
||||
{IPStr, Port} = emqx_channel:info(Key, Channel),
|
||||
{Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])};
|
||||
{Key, iolist_to_binary([inet:ntoa(IPStr), ":", integer_to_list(Port)])};
|
||||
conn_info(Key, Channel) ->
|
||||
{Key, emqx_channel:info(Key, Channel)}.
|
||||
|
||||
|
@ -671,9 +673,9 @@ conn_info(Key, Channel) ->
|
|||
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||
case activate_socket(State) of
|
||||
{ok, NState = #state{sockstate = NewSst}} ->
|
||||
if OldSst =/= NewSst ->
|
||||
{ok, {event, NewSst}, NState};
|
||||
true -> {ok, NState}
|
||||
case OldSst =/= NewSst of
|
||||
true -> {ok, {event, NewSst}, NState};
|
||||
false -> {ok, NState}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
handle_info({sock_error, Reason}, State)
|
||||
|
|
|
@ -81,7 +81,7 @@ get_mem_check_interval() ->
|
|||
|
||||
set_mem_check_interval(Seconds) when Seconds < 60 ->
|
||||
memsup:set_check_interval(1);
|
||||
set_mem_check_interval(Seconds) ->
|
||||
set_mem_check_interval(Seconds) ->
|
||||
memsup:set_check_interval(Seconds div 60).
|
||||
|
||||
get_sysmem_high_watermark() ->
|
||||
|
|
|
@ -162,86 +162,99 @@ t_append_msg(_) ->
|
|||
|
||||
t_handle_msg(_) ->
|
||||
From = {make_ref(), self()},
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({'$gen_call', From, for_testing}, st())),
|
||||
?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())),
|
||||
?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())),
|
||||
?assertMatch({ok, [], _St}, emqx_connection:handle_msg({tcp, From, <<"for_testing">>}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg(for_testing, st())).
|
||||
?assertMatch({ok, _St}, handle_msg({'$gen_call', From, for_testing}, st())),
|
||||
?assertMatch({stop, {shutdown,discarded}, _St}, handle_msg({'$gen_call', From, discard}, st())),
|
||||
?assertMatch({stop, {shutdown,discarded}, _St}, handle_msg({'$gen_call', From, discard}, st())),
|
||||
?assertMatch({ok, [], _St}, handle_msg({tcp, From, <<"for_testing">>}, st())),
|
||||
?assertMatch({ok, _St}, handle_msg(for_testing, st())).
|
||||
|
||||
t_handle_msg_incoming(_) ->
|
||||
?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())),
|
||||
?assertMatch({ok, _Out, _St},
|
||||
handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())),
|
||||
?assertEqual(ok, handle_msg({incoming, ?PACKET(?PINGREQ)}, st())),
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, undefined}, st())).
|
||||
?assertMatch({ok, _St},
|
||||
handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())),
|
||||
Sub1 = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
|
||||
?assertMatch({ok, _St}, handle_msg({incoming, Sub1}, st())),
|
||||
Sub2 = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
|
||||
?assertMatch({ok, _St}, handle_msg({incoming, Sub2}, st())),
|
||||
?assertMatch({ok, _St}, handle_msg({incoming, undefined}, st())).
|
||||
|
||||
t_handle_msg_outgoing(_) ->
|
||||
?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
|
||||
?assertEqual(ok, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
|
||||
?assertEqual(ok, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
|
||||
?assertEqual(ok, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
|
||||
|
||||
t_handle_msg_tcp_error(_) ->
|
||||
?assertMatch({stop, {shutdown, econnreset}, _St}, emqx_connection:handle_msg({tcp_error, sock, econnreset}, st())).
|
||||
?assertMatch({stop, {shutdown, econnreset}, _St},
|
||||
handle_msg({tcp_error, sock, econnreset}, st())).
|
||||
|
||||
t_handle_msg_tcp_closed(_) ->
|
||||
?assertMatch({stop, {shutdown, tcp_closed}, _St}, emqx_connection:handle_msg({tcp_closed, sock}, st())).
|
||||
?assertMatch({stop, {shutdown, tcp_closed}, _St}, handle_msg({tcp_closed, sock}, st())).
|
||||
|
||||
t_handle_msg_passive(_) ->
|
||||
?assertMatch({ok, _Event, _St}, emqx_connection:handle_msg({tcp_passive, sock}, st())).
|
||||
|
||||
?assertMatch({ok, _Event, _St}, handle_msg({tcp_passive, sock}, st())).
|
||||
|
||||
t_handle_msg_deliver(_) ->
|
||||
ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({deliver, topic, msg}, st())).
|
||||
|
||||
?assertMatch({ok, _St}, handle_msg({deliver, topic, msg}, st())).
|
||||
|
||||
t_handle_msg_inet_reply(_) ->
|
||||
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))),
|
||||
?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({inet_reply, for_testing, {error, for_testing}}, st())).
|
||||
?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))),
|
||||
?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))),
|
||||
?assertMatch({stop, {shutdown, for_testing}, _St},
|
||||
handle_msg({inet_reply, for_testing, {error, for_testing}}, st())).
|
||||
|
||||
t_handle_msg_connack(_) ->
|
||||
?assertEqual(ok, emqx_connection:handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
|
||||
?assertEqual(ok, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
|
||||
|
||||
t_handle_msg_close(_) ->
|
||||
?assertMatch({stop, {shutdown, normal}, _St}, emqx_connection:handle_msg({close, normal}, st())).
|
||||
|
||||
?assertMatch({stop, {shutdown, normal}, _St}, handle_msg({close, normal}, st())).
|
||||
|
||||
t_handle_msg_event(_) ->
|
||||
ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
|
||||
ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
|
||||
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
|
||||
?assertEqual(ok, emqx_connection:handle_msg({event, connected}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({event, disconnected}, st())),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, st())).
|
||||
|
||||
?assertEqual(ok, handle_msg({event, connected}, st())),
|
||||
?assertMatch({ok, _St}, handle_msg({event, disconnected}, st())),
|
||||
?assertMatch({ok, _St}, handle_msg({event, undefined}, st())).
|
||||
|
||||
t_handle_msg_timeout(_) ->
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({timeout, make_ref(), for_testing}, st())).
|
||||
?assertMatch({ok, _St}, handle_msg({timeout, make_ref(), for_testing}, st())).
|
||||
|
||||
t_handle_msg_shutdown(_) ->
|
||||
?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({shutdown, for_testing}, st())).
|
||||
?assertMatch({stop, {shutdown, for_testing}, _St}, handle_msg({shutdown, for_testing}, st())).
|
||||
|
||||
t_handle_call(_) ->
|
||||
St = st(),
|
||||
?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)),
|
||||
?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)),
|
||||
?assertMatch({reply, _Stats, _NSt}, emqx_connection:handle_call(self(), stats, St)),
|
||||
?assertMatch({reply, ok, _NSt}, emqx_connection:handle_call(self(), {ratelimit, []}, St)),
|
||||
?assertMatch({reply, ok, _NSt}, emqx_connection:handle_call(self(), {ratelimit, [{conn_messages_in, {100, 1}}]}, St)),
|
||||
?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)),
|
||||
?assertMatch({stop, {shutdown,kicked}, ok, _NSt}, emqx_connection:handle_call(self(), kick, St)).
|
||||
?assertMatch({ok, _St}, handle_msg({event, undefined}, St)),
|
||||
?assertMatch({reply, _Info, _NSt}, handle_call(self(), info, St)),
|
||||
?assertMatch({reply, _Stats, _NSt}, handle_call(self(), stats, St)),
|
||||
?assertMatch({reply, ok, _NSt}, handle_call(self(), {ratelimit, []}, St)),
|
||||
?assertMatch({reply, ok, _NSt},
|
||||
handle_call(self(), {ratelimit, [{conn_messages_in, {100, 1}}]}, St)),
|
||||
?assertEqual({reply, ignored, St}, handle_call(self(), for_testing, St)),
|
||||
?assertMatch({stop, {shutdown,kicked}, ok, _NSt},
|
||||
handle_call(self(), kick, St)).
|
||||
|
||||
t_handle_timeout(_) ->
|
||||
TRef = make_ref(),
|
||||
State = st(#{idle_timer => TRef, limit_timer => TRef, stats_timer => TRef}),
|
||||
?assertMatch({stop, {shutdown,idle_timeout}, _NState}, emqx_connection:handle_timeout(TRef, idle_timeout, State)),
|
||||
?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_timeout(TRef, limit_timeout, State)),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, emit_stats, State)),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)),
|
||||
?assertMatch({stop, {shutdown,idle_timeout}, _NState},
|
||||
emqx_connection:handle_timeout(TRef, idle_timeout, State)),
|
||||
?assertMatch({ok, {event,running}, _NState},
|
||||
emqx_connection:handle_timeout(TRef, limit_timeout, State)),
|
||||
?assertMatch({ok, _NState},
|
||||
emqx_connection:handle_timeout(TRef, emit_stats, State)),
|
||||
?assertMatch({ok, _NState},
|
||||
emqx_connection:handle_timeout(TRef, keepalive, State)),
|
||||
|
||||
ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
|
||||
?assertMatch({stop, {shutdown,for_testing}, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)),
|
||||
?assertMatch({stop, {shutdown,for_testing}, _NState},
|
||||
emqx_connection:handle_timeout(TRef, keepalive, State)),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)).
|
||||
|
||||
t_parse_incoming(_) ->
|
||||
|
@ -250,10 +263,12 @@ t_parse_incoming(_) ->
|
|||
|
||||
t_next_incoming_msgs(_) ->
|
||||
?assertEqual({incoming, packet}, emqx_connection:next_incoming_msgs([packet])),
|
||||
?assertEqual([{incoming, packet2}, {incoming, packet1}], emqx_connection:next_incoming_msgs([packet1, packet2])).
|
||||
?assertEqual([{incoming, packet2}, {incoming, packet1}],
|
||||
emqx_connection:next_incoming_msgs([packet1, packet2])).
|
||||
|
||||
t_handle_incoming(_) ->
|
||||
?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())),
|
||||
?assertMatch({ok, _Out, _NState},
|
||||
emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())),
|
||||
?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())).
|
||||
|
||||
t_with_channel(_) ->
|
||||
|
@ -265,33 +280,46 @@ t_with_channel(_) ->
|
|||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, Channel} end),
|
||||
?assertMatch({ok, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, ?DISCONNECT_PACKET(),Channel} end),
|
||||
?assertMatch({ok, _Out, _NChannel}, emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
ok = meck:expect(emqx_channel, handle_in,
|
||||
fun(_, _) -> Channel = channel(), {ok, ?DISCONNECT_PACKET(),Channel} end),
|
||||
?assertMatch({ok, _Out, _NChannel},
|
||||
emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], Channel} end),
|
||||
?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
ok = meck:expect(emqx_channel, handle_in,
|
||||
fun(_, _) -> Channel = channel(), {shutdown, [for_testing], Channel} end),
|
||||
?assertMatch({stop, {shutdown,[for_testing]}, _NState},
|
||||
emqx_connection:with_channel(handle_in, [for_testing], State)),
|
||||
|
||||
ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel} end),
|
||||
?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)).
|
||||
ok = meck:expect(emqx_channel, handle_in,
|
||||
fun(_, _) ->
|
||||
Channel = channel(),
|
||||
{shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel}
|
||||
end),
|
||||
?assertMatch({stop, {shutdown,[for_testing]}, _NState},
|
||||
emqx_connection:with_channel(handle_in, [for_testing], State)).
|
||||
|
||||
t_handle_outgoing(_) ->
|
||||
?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
|
||||
?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
|
||||
|
||||
t_handle_info(_) ->
|
||||
?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())),
|
||||
?assertMatch({stop, {shutdown, for_testing}, _NStats}, emqx_connection:handle_info({sock_error, for_testing}, st())),
|
||||
?assertMatch({ok, {event,running}, _NState},
|
||||
emqx_connection:handle_info(activate_socket, st())),
|
||||
?assertMatch({stop, {shutdown, for_testing}, _NStats},
|
||||
emqx_connection:handle_info({sock_error, for_testing}, st())),
|
||||
?assertMatch({ok, _NState}, emqx_connection:handle_info(for_testing, st())).
|
||||
|
||||
t_ensure_rate_limit(_) ->
|
||||
State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})),
|
||||
?assertEqual(undefined, emqx_connection:info(limiter, State)),
|
||||
|
||||
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init(external, [])} end),
|
||||
ok = meck:expect(emqx_limiter, check,
|
||||
fun(_, _) -> {ok, emqx_limiter:init(external, [])} end),
|
||||
State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
||||
?assertEqual(undefined, emqx_connection:info(limiter, State1)),
|
||||
|
||||
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init(external, [])} end),
|
||||
ok = meck:expect(emqx_limiter, check,
|
||||
fun(_, _) -> {pause, 3000, emqx_limiter:init(external, [])} end),
|
||||
State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
||||
?assertEqual(undefined, emqx_connection:info(limiter, State2)),
|
||||
?assertEqual(blocked, emqx_connection:info(sockstate, State2)).
|
||||
|
@ -442,3 +470,7 @@ channel(InitFields) ->
|
|||
session => Session,
|
||||
conn_state => connected
|
||||
}, InitFields)).
|
||||
|
||||
handle_msg(Msg, St) -> emqx_connection:handle_msg(Msg, St).
|
||||
|
||||
handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St).
|
||||
|
|
Loading…
Reference in New Issue