fix(gw): close the stomp connections once an error frame occured
According to the Stomp v1.2 specification: > The server MAY send ERROR frames if something goes wrong. In this case, > it MUST then close the connection just after sending the ERROR frame Additional, fixes the `is_superuser` is not working for all gateways
This commit is contained in:
parent
d41a23b7d0
commit
e3d208f678
|
@ -69,8 +69,9 @@
|
||||||
authenticate(_Ctx, ClientInfo0) ->
|
authenticate(_Ctx, ClientInfo0) ->
|
||||||
ClientInfo = ClientInfo0#{zone => default},
|
ClientInfo = ClientInfo0#{zone => default},
|
||||||
case emqx_access_control:authenticate(ClientInfo) of
|
case emqx_access_control:authenticate(ClientInfo) of
|
||||||
{ok, _} ->
|
{ok, AuthResult} ->
|
||||||
{ok, mountpoint(ClientInfo)};
|
ClientInfo1 = merge_auth_result(ClientInfo, AuthResult),
|
||||||
|
{ok, eval_mountpoint(ClientInfo1)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -174,8 +175,12 @@ metrics_inc(_Ctx = #{gwname := GwName}, Name, Oct) ->
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
mountpoint(ClientInfo = #{mountpoint := undefined}) ->
|
eval_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
|
||||||
ClientInfo;
|
ClientInfo;
|
||||||
mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
|
eval_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
|
||||||
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
|
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
|
||||||
ClientInfo#{mountpoint := MountPoint1}.
|
ClientInfo#{mountpoint := MountPoint1}.
|
||||||
|
|
||||||
|
merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) ->
|
||||||
|
IsSuperuser = maps:get(is_superuser, AuthResult, false),
|
||||||
|
maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}).
|
||||||
|
|
|
@ -36,8 +36,10 @@ init_per_suite(Conf) ->
|
||||||
fun
|
fun
|
||||||
(#{clientid := bad_client}) ->
|
(#{clientid := bad_client}) ->
|
||||||
{error, bad_username_or_password};
|
{error, bad_username_or_password};
|
||||||
(ClientInfo) ->
|
(#{clientid := admin}) ->
|
||||||
{ok, ClientInfo}
|
{ok, #{is_superuser => true}};
|
||||||
|
(_) ->
|
||||||
|
{ok, #{}}
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
Conf.
|
Conf.
|
||||||
|
@ -56,15 +58,15 @@ t_authenticate(_) ->
|
||||||
mountpoint => undefined,
|
mountpoint => undefined,
|
||||||
clientid => <<"user1">>
|
clientid => <<"user1">>
|
||||||
},
|
},
|
||||||
NInfo1 = zone(Info1),
|
NInfo1 = default_result(Info1),
|
||||||
?assertEqual({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
|
?assertMatch({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
|
||||||
|
|
||||||
Info2 = #{
|
Info2 = #{
|
||||||
mountpoint => <<"mqttsn/${clientid}/">>,
|
mountpoint => <<"mqttsn/${clientid}/">>,
|
||||||
clientid => <<"user1">>
|
clientid => <<"user1">>
|
||||||
},
|
},
|
||||||
NInfo2 = zone(Info2#{mountpoint => <<"mqttsn/user1/">>}),
|
NInfo2 = default_result(Info2#{mountpoint => <<"mqttsn/user1/">>}),
|
||||||
?assertEqual({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
|
?assertMatch({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
|
||||||
|
|
||||||
Info3 = #{
|
Info3 = #{
|
||||||
mountpoint => <<"mqttsn/${clientid}/">>,
|
mountpoint => <<"mqttsn/${clientid}/">>,
|
||||||
|
@ -72,6 +74,12 @@ t_authenticate(_) ->
|
||||||
},
|
},
|
||||||
{error, bad_username_or_password} =
|
{error, bad_username_or_password} =
|
||||||
emqx_gateway_ctx:authenticate(Ctx, Info3),
|
emqx_gateway_ctx:authenticate(Ctx, Info3),
|
||||||
|
|
||||||
|
Info4 = #{
|
||||||
|
mountpoint => undefined,
|
||||||
|
clientid => admin
|
||||||
|
},
|
||||||
|
?assertMatch({ok, #{is_superuser := true}}, emqx_gateway_ctx:authenticate(Ctx, Info4)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
zone(Info) -> Info#{zone => default}.
|
default_result(Info) -> Info#{zone => default, is_superuser => false}.
|
||||||
|
|
|
@ -448,7 +448,9 @@ handle_in(
|
||||||
Topic = header(<<"destination">>, Headers),
|
Topic = header(<<"destination">>, Headers),
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
||||||
deny ->
|
deny ->
|
||||||
handle_out(error, {receipt_id(Headers), "Authorization Deny"}, Channel);
|
ErrMsg = io_lib:format("Insufficient permissions for ~s", [Topic]),
|
||||||
|
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
|
||||||
|
shutdown(acl_denied, ErrorFrame, Channel);
|
||||||
allow ->
|
allow ->
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -494,20 +496,25 @@ handle_in(
|
||||||
),
|
),
|
||||||
case do_subscribe(NTopicFilters, NChannel) of
|
case do_subscribe(NTopicFilters, NChannel) of
|
||||||
[] ->
|
[] ->
|
||||||
ErrMsg = "Permission denied",
|
ErrMsg = io_lib:format(
|
||||||
handle_out(error, {receipt_id(Headers), ErrMsg}, Channel);
|
"The client.subscribe hook blocked the ~s subscription request",
|
||||||
|
[TopicFilter]
|
||||||
|
),
|
||||||
|
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
|
||||||
|
shutdown(normal, ErrorFrame, Channel);
|
||||||
[{MountedTopic, SubOpts} | _] ->
|
[{MountedTopic, SubOpts} | _] ->
|
||||||
NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs],
|
NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs],
|
||||||
NChannel1 = NChannel#channel{subscriptions = NSubs},
|
NChannel1 = NChannel#channel{subscriptions = NSubs},
|
||||||
handle_out_and_update(receipt, receipt_id(Headers), NChannel1)
|
handle_out_and_update(receipt, receipt_id(Headers), NChannel1)
|
||||||
end;
|
end;
|
||||||
{error, ErrMsg, NChannel} ->
|
{error, subscription_id_inused, NChannel} ->
|
||||||
?SLOG(error, #{
|
ErrMsg = io_lib:format("Subscription id ~w is in used", [SubId]),
|
||||||
msg => "failed_top_subscribe_topic",
|
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
|
||||||
topic => Topic,
|
shutdown(subscription_id_inused, ErrorFrame, NChannel);
|
||||||
reason => ErrMsg
|
{error, acl_denied, NChannel} ->
|
||||||
}),
|
ErrMsg = io_lib:format("Insufficient permissions for ~s", [Topic]),
|
||||||
handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel)
|
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
|
||||||
|
shutdown(acl_denied, ErrorFrame, NChannel)
|
||||||
end;
|
end;
|
||||||
handle_in(
|
handle_in(
|
||||||
?PACKET(?CMD_UNSUBSCRIBE, Headers),
|
?PACKET(?CMD_UNSUBSCRIBE, Headers),
|
||||||
|
@ -691,7 +698,7 @@ check_subscribed_status(
|
||||||
{SubId, MountedTopic, _Ack, _} ->
|
{SubId, MountedTopic, _Ack, _} ->
|
||||||
ok;
|
ok;
|
||||||
{SubId, _OtherTopic, _Ack, _} ->
|
{SubId, _OtherTopic, _Ack, _} ->
|
||||||
{error, "Conflict subscribe id"};
|
{error, subscription_id_inused};
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
@ -704,7 +711,7 @@ check_sub_acl(
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, ParsedTopic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, ParsedTopic) of
|
||||||
deny -> {error, "ACL Deny"};
|
deny -> {error, acl_denied};
|
||||||
allow -> ok
|
allow -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -185,6 +185,8 @@ parse(headers, Bin, State) ->
|
||||||
parse(hdname, Bin, State);
|
parse(hdname, Bin, State);
|
||||||
parse(hdname, <<?LF, _Rest/binary>>, _State) ->
|
parse(hdname, <<?LF, _Rest/binary>>, _State) ->
|
||||||
error(unexpected_linefeed);
|
error(unexpected_linefeed);
|
||||||
|
parse(hdname, <<?COLON, $\s, Rest/binary>>, State = #parser_state{acc = Acc}) ->
|
||||||
|
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
|
||||||
parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
|
parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
|
||||||
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
|
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
|
||||||
parse(hdname, <<Ch:8, Rest/binary>>, State) ->
|
parse(hdname, <<Ch:8, Rest/binary>>, State) ->
|
||||||
|
|
|
@ -78,68 +78,28 @@ stomp_ver() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_connect(_) ->
|
t_connect(_) ->
|
||||||
%% Connect should be succeed
|
%% Successful connect
|
||||||
with_connection(fun(Sock) ->
|
ConnectSucced = fun(Sock) ->
|
||||||
gen_tcp:send(
|
ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>, <<"1000,2000">>),
|
||||||
Sock,
|
{ok, Frame} = recv_a_frame(Sock),
|
||||||
serialize(
|
?assertMatch(<<"CONNECTED">>, Frame#stomp_frame.command),
|
||||||
<<"CONNECT">>,
|
?assertEqual(
|
||||||
[
|
<<"2000,1000">>, proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers)
|
||||||
{<<"accept-version">>, ?STOMP_VER},
|
|
||||||
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
||||||
{<<"login">>, <<"guest">>},
|
|
||||||
{<<"passcode">>, <<"guest">>},
|
|
||||||
{<<"heart-beat">>, <<"1000,2000">>}
|
|
||||||
]
|
|
||||||
)
|
|
||||||
),
|
|
||||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
||||||
{ok,
|
|
||||||
Frame = #stomp_frame{
|
|
||||||
command = <<"CONNECTED">>,
|
|
||||||
headers = _,
|
|
||||||
body = _
|
|
||||||
},
|
|
||||||
_, _} = parse(Data),
|
|
||||||
<<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers),
|
|
||||||
|
|
||||||
gen_tcp:send(
|
|
||||||
Sock,
|
|
||||||
serialize(
|
|
||||||
<<"DISCONNECT">>,
|
|
||||||
[{<<"receipt">>, <<"12345">>}]
|
|
||||||
)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, Data1} = gen_tcp:recv(Sock, 0),
|
ok = send_disconnect_frame(Sock, <<"12345">>),
|
||||||
{ok,
|
?assertMatch(
|
||||||
#stomp_frame{
|
{ok, #stomp_frame{
|
||||||
command = <<"RECEIPT">>,
|
command = <<"RECEIPT">>,
|
||||||
headers = [{<<"receipt-id">>, <<"12345">>}],
|
headers = [{<<"receipt-id">>, <<"12345">>}]
|
||||||
body = _
|
}},
|
||||||
},
|
recv_a_frame(Sock)
|
||||||
_, _} = parse(Data1)
|
)
|
||||||
end),
|
end,
|
||||||
|
with_connection(ConnectSucced),
|
||||||
%% Connect will be failed, because of bad login or passcode
|
|
||||||
%% FIXME: Waiting for authentication works
|
|
||||||
%with_connection(
|
|
||||||
% fun(Sock) ->
|
|
||||||
% gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
||||||
% [{<<"accept-version">>, ?STOMP_VER},
|
|
||||||
% {<<"host">>, <<"127.0.0.1:61613">>},
|
|
||||||
% {<<"login">>, <<"admin">>},
|
|
||||||
% {<<"passcode">>, <<"admin">>},
|
|
||||||
% {<<"heart-beat">>, <<"1000,2000">>}])),
|
|
||||||
% {ok, Data} = gen_tcp:recv(Sock, 0),
|
|
||||||
% {ok, Frame, _, _} = parse(Data),
|
|
||||||
% #stomp_frame{command = <<"ERROR">>,
|
|
||||||
% headers = _,
|
|
||||||
% body = <<"Login or passcode error!">>} = Frame
|
|
||||||
% end),
|
|
||||||
|
|
||||||
%% Connect will be failed, because of bad version
|
%% Connect will be failed, because of bad version
|
||||||
with_connection(fun(Sock) ->
|
ProtocolError = fun(Sock) ->
|
||||||
gen_tcp:send(
|
gen_tcp:send(
|
||||||
Sock,
|
Sock,
|
||||||
serialize(
|
serialize(
|
||||||
|
@ -160,7 +120,8 @@ t_connect(_) ->
|
||||||
headers = _,
|
headers = _,
|
||||||
body = <<"Login Failed: Supported protocol versions < 1.2">>
|
body = <<"Login Failed: Supported protocol versions < 1.2">>
|
||||||
} = Frame
|
} = Frame
|
||||||
end).
|
end,
|
||||||
|
with_connection(ProtocolError).
|
||||||
|
|
||||||
t_heartbeat(_) ->
|
t_heartbeat(_) ->
|
||||||
%% Test heart beat
|
%% Test heart beat
|
||||||
|
@ -755,8 +716,7 @@ t_frame_error_too_many_headers(_) ->
|
||||||
),
|
),
|
||||||
Assert =
|
Assert =
|
||||||
fun(Sock) ->
|
fun(Sock) ->
|
||||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
{ok, ErrorFrame} = recv_a_frame(Sock),
|
||||||
{ok, ErrorFrame, _, _} = parse(Data),
|
|
||||||
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
|
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
match, re:run(ErrorFrame#stomp_frame.body, "too_many_headers", [{capture, none}])
|
match, re:run(ErrorFrame#stomp_frame.body, "too_many_headers", [{capture, none}])
|
||||||
|
@ -777,8 +737,7 @@ t_frame_error_too_long_header(_) ->
|
||||||
),
|
),
|
||||||
Assert =
|
Assert =
|
||||||
fun(Sock) ->
|
fun(Sock) ->
|
||||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
{ok, ErrorFrame} = recv_a_frame(Sock),
|
||||||
{ok, ErrorFrame, _, _} = parse(Data),
|
|
||||||
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
|
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
match, re:run(ErrorFrame#stomp_frame.body, "too_long_header", [{capture, none}])
|
match, re:run(ErrorFrame#stomp_frame.body, "too_long_header", [{capture, none}])
|
||||||
|
@ -796,8 +755,7 @@ t_frame_error_too_long_body(_) ->
|
||||||
),
|
),
|
||||||
Assert =
|
Assert =
|
||||||
fun(Sock) ->
|
fun(Sock) ->
|
||||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
{ok, ErrorFrame} = recv_a_frame(Sock),
|
||||||
{ok, ErrorFrame, _, _} = parse(Data),
|
|
||||||
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
|
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
match, re:run(ErrorFrame#stomp_frame.body, "too_long_body", [{capture, none}])
|
match, re:run(ErrorFrame#stomp_frame.body, "too_long_body", [{capture, none}])
|
||||||
|
@ -808,54 +766,16 @@ t_frame_error_too_long_body(_) ->
|
||||||
|
|
||||||
test_frame_error(Frame, AssertFun) ->
|
test_frame_error(Frame, AssertFun) ->
|
||||||
with_connection(fun(Sock) ->
|
with_connection(fun(Sock) ->
|
||||||
gen_tcp:send(
|
send_connection_frame(Sock, <<"guest">>, <<"guest">>),
|
||||||
Sock,
|
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
|
||||||
serialize(
|
|
||||||
<<"CONNECT">>,
|
|
||||||
[
|
|
||||||
{<<"accept-version">>, ?STOMP_VER},
|
|
||||||
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
||||||
{<<"login">>, <<"guest">>},
|
|
||||||
{<<"passcode">>, <<"guest">>},
|
|
||||||
{<<"heart-beat">>, <<"0,0">>}
|
|
||||||
]
|
|
||||||
)
|
|
||||||
),
|
|
||||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
||||||
{ok,
|
|
||||||
#stomp_frame{
|
|
||||||
command = <<"CONNECTED">>,
|
|
||||||
headers = _,
|
|
||||||
body = _
|
|
||||||
},
|
|
||||||
_, _} = parse(Data),
|
|
||||||
gen_tcp:send(Sock, Frame),
|
gen_tcp:send(Sock, Frame),
|
||||||
AssertFun(Sock)
|
AssertFun(Sock)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
t_rest_clienit_info(_) ->
|
t_rest_clienit_info(_) ->
|
||||||
with_connection(fun(Sock) ->
|
with_connection(fun(Sock) ->
|
||||||
gen_tcp:send(
|
send_connection_frame(Sock, <<"guest">>, <<"guest">>),
|
||||||
Sock,
|
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
|
||||||
serialize(
|
|
||||||
<<"CONNECT">>,
|
|
||||||
[
|
|
||||||
{<<"accept-version">>, ?STOMP_VER},
|
|
||||||
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
||||||
{<<"login">>, <<"guest">>},
|
|
||||||
{<<"passcode">>, <<"guest">>},
|
|
||||||
{<<"heart-beat">>, <<"0,0">>}
|
|
||||||
]
|
|
||||||
)
|
|
||||||
),
|
|
||||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
||||||
{ok,
|
|
||||||
#stomp_frame{
|
|
||||||
command = <<"CONNECTED">>,
|
|
||||||
headers = _,
|
|
||||||
body = _
|
|
||||||
},
|
|
||||||
_, _} = parse(Data),
|
|
||||||
|
|
||||||
%% client lists
|
%% client lists
|
||||||
{200, Clients} = request(get, "/gateways/stomp/clients"),
|
{200, Clients} = request(get, "/gateways/stomp/clients"),
|
||||||
|
@ -909,18 +829,8 @@ t_rest_clienit_info(_) ->
|
||||||
|
|
||||||
%% sub & unsub
|
%% sub & unsub
|
||||||
{200, []} = request(get, ClientPath ++ "/subscriptions"),
|
{200, []} = request(get, ClientPath ++ "/subscriptions"),
|
||||||
gen_tcp:send(
|
ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>),
|
||||||
Sock,
|
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
|
||||||
serialize(
|
|
||||||
<<"SUBSCRIBE">>,
|
|
||||||
[
|
|
||||||
{<<"id">>, 0},
|
|
||||||
{<<"destination">>, <<"/queue/foo">>},
|
|
||||||
{<<"ack">>, <<"client">>}
|
|
||||||
]
|
|
||||||
)
|
|
||||||
),
|
|
||||||
timer:sleep(100),
|
|
||||||
|
|
||||||
{200, Subs} = request(get, ClientPath ++ "/subscriptions"),
|
{200, Subs} = request(get, ClientPath ++ "/subscriptions"),
|
||||||
?assertEqual(1, length(Subs)),
|
?assertEqual(1, length(Subs)),
|
||||||
|
@ -956,6 +866,78 @@ t_rest_clienit_info(_) ->
|
||||||
?assertEqual(0, length(maps:get(data, Clients2)))
|
?assertEqual(0, length(maps:get(data, Clients2)))
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
t_authn_superuser(_) ->
|
||||||
|
%% mock authn
|
||||||
|
meck:new(emqx_access_control, [passthrough]),
|
||||||
|
meck:expect(
|
||||||
|
emqx_access_control,
|
||||||
|
authenticate,
|
||||||
|
fun
|
||||||
|
(#{username := <<"admin">>}) ->
|
||||||
|
{ok, #{is_superuser => true}};
|
||||||
|
(#{username := <<"bad_user">>}) ->
|
||||||
|
{error, not_authorized};
|
||||||
|
(_) ->
|
||||||
|
{ok, #{is_superuser => false}}
|
||||||
|
end
|
||||||
|
),
|
||||||
|
%% mock authz
|
||||||
|
meck:expect(
|
||||||
|
emqx_access_control,
|
||||||
|
authorize,
|
||||||
|
fun
|
||||||
|
(_ClientInfo = #{is_superuser := true}, _PubSub, _Topic) ->
|
||||||
|
allow;
|
||||||
|
(_ClientInfo, _PubSub, _Topic) ->
|
||||||
|
deny
|
||||||
|
end
|
||||||
|
),
|
||||||
|
|
||||||
|
LoginFailure = fun(Sock) ->
|
||||||
|
ok = send_connection_frame(Sock, <<"bad_user">>, <<"public">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"ERROR">>}}, recv_a_frame(Sock)),
|
||||||
|
?assertMatch({error, closed}, recv_a_frame(Sock))
|
||||||
|
end,
|
||||||
|
|
||||||
|
PublishFailure = fun(Sock) ->
|
||||||
|
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
|
||||||
|
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"ERROR">>}}, recv_a_frame(Sock)),
|
||||||
|
?assertMatch({error, closed}, recv_a_frame(Sock))
|
||||||
|
end,
|
||||||
|
|
||||||
|
SubscribeFailed = fun(Sock) ->
|
||||||
|
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
|
||||||
|
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"ERROR">>}}, recv_a_frame(Sock)),
|
||||||
|
?assertMatch({error, closed}, recv_a_frame(Sock))
|
||||||
|
end,
|
||||||
|
|
||||||
|
LoginAsSuperUser = fun(Sock) ->
|
||||||
|
ok = send_connection_frame(Sock, <<"admin">>, <<"public">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
|
||||||
|
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
|
||||||
|
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
|
||||||
|
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, #stomp_frame{
|
||||||
|
command = <<"MESSAGE">>,
|
||||||
|
body = <<"hello">>
|
||||||
|
}},
|
||||||
|
recv_a_frame(Sock)
|
||||||
|
),
|
||||||
|
ok = send_disconnect_frame(Sock)
|
||||||
|
end,
|
||||||
|
|
||||||
|
with_connection(LoginFailure),
|
||||||
|
with_connection(PublishFailure),
|
||||||
|
with_connection(SubscribeFailed),
|
||||||
|
with_connection(LoginAsSuperUser),
|
||||||
|
meck:unload(emqx_access_control).
|
||||||
|
|
||||||
%% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride,
|
%% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride,
|
||||||
%% Listeners, Metrics, Stats, ClientInfo
|
%% Listeners, Metrics, Stats, ClientInfo
|
||||||
%%
|
%%
|
||||||
|
@ -963,6 +945,9 @@ t_rest_clienit_info(_) ->
|
||||||
%%
|
%%
|
||||||
%% TODO: RateLimit, OOM,
|
%% TODO: RateLimit, OOM,
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% helpers
|
||||||
|
|
||||||
with_connection(DoFun) ->
|
with_connection(DoFun) ->
|
||||||
{ok, Sock} = gen_tcp:connect(
|
{ok, Sock} = gen_tcp:connect(
|
||||||
{127, 0, 0, 1},
|
{127, 0, 0, 1},
|
||||||
|
@ -973,6 +958,8 @@ with_connection(DoFun) ->
|
||||||
try
|
try
|
||||||
DoFun(Sock)
|
DoFun(Sock)
|
||||||
after
|
after
|
||||||
|
erase(parser),
|
||||||
|
erase(rest),
|
||||||
gen_tcp:close(Sock)
|
gen_tcp:close(Sock)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -982,6 +969,46 @@ serialize(Command, Headers) ->
|
||||||
serialize(Command, Headers, Body) ->
|
serialize(Command, Headers, Body) ->
|
||||||
emqx_stomp_frame:serialize_pkt(emqx_stomp_frame:make(Command, Headers, Body), #{}).
|
emqx_stomp_frame:serialize_pkt(emqx_stomp_frame:make(Command, Headers, Body), #{}).
|
||||||
|
|
||||||
|
recv_a_frame(Sock) ->
|
||||||
|
Parser =
|
||||||
|
case get(parser) of
|
||||||
|
undefined ->
|
||||||
|
ProtoEnv = #{
|
||||||
|
max_headers => 1024,
|
||||||
|
max_header_length => 10240,
|
||||||
|
max_body_length => 81920
|
||||||
|
},
|
||||||
|
emqx_stomp_frame:initial_parse_state(ProtoEnv);
|
||||||
|
P ->
|
||||||
|
P
|
||||||
|
end,
|
||||||
|
LastRest =
|
||||||
|
case get(rest) of
|
||||||
|
undefined -> <<>>;
|
||||||
|
R -> R
|
||||||
|
end,
|
||||||
|
case emqx_stomp_frame:parse(LastRest, Parser) of
|
||||||
|
{more, NParser} ->
|
||||||
|
case gen_tcp:recv(Sock, 0, 5000) of
|
||||||
|
{ok, Data} ->
|
||||||
|
put(parser, NParser),
|
||||||
|
put(rest, <<LastRest/binary, Data/binary>>),
|
||||||
|
recv_a_frame(Sock);
|
||||||
|
{error, _} = Err1 ->
|
||||||
|
erase(parser),
|
||||||
|
erase(rest),
|
||||||
|
Err1
|
||||||
|
end;
|
||||||
|
{ok, Frame, Rest, NParser} ->
|
||||||
|
put(parser, NParser),
|
||||||
|
put(rest, Rest),
|
||||||
|
{ok, Frame};
|
||||||
|
{error, _} = Err ->
|
||||||
|
erase(parser),
|
||||||
|
erase(rest),
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
parse(Data) ->
|
parse(Data) ->
|
||||||
ProtoEnv = #{
|
ProtoEnv = #{
|
||||||
max_headers => 1024,
|
max_headers => 1024,
|
||||||
|
@ -991,10 +1018,51 @@ parse(Data) ->
|
||||||
Parser = emqx_stomp_frame:initial_parse_state(ProtoEnv),
|
Parser = emqx_stomp_frame:initial_parse_state(ProtoEnv),
|
||||||
emqx_stomp_frame:parse(Data, Parser).
|
emqx_stomp_frame:parse(Data, Parser).
|
||||||
|
|
||||||
get_field(command, #stomp_frame{command = Command}) ->
|
send_connection_frame(Sock, Username, Password) ->
|
||||||
Command;
|
send_connection_frame(Sock, Username, Password, <<"0,0">>).
|
||||||
get_field(body, #stomp_frame{body = Body}) ->
|
|
||||||
Body.
|
send_connection_frame(Sock, Username, Password, Heartbeat) ->
|
||||||
|
Headers =
|
||||||
|
case Username == undefined of
|
||||||
|
true -> [];
|
||||||
|
false -> [{<<"login">>, Username}]
|
||||||
|
end ++
|
||||||
|
case Password == undefined of
|
||||||
|
true -> [];
|
||||||
|
false -> [{<<"passcode">>, Password}]
|
||||||
|
end,
|
||||||
|
Headers1 = [
|
||||||
|
{<<"accept-version">>, ?STOMP_VER},
|
||||||
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
||||||
|
{<<"heart-beat">>, Heartbeat}
|
||||||
|
| Headers
|
||||||
|
],
|
||||||
|
ok = gen_tcp:send(Sock, serialize(<<"CONNECT">>, Headers1)).
|
||||||
|
|
||||||
|
send_subscribe_frame(Sock, Id, Topic) ->
|
||||||
|
Headers =
|
||||||
|
[
|
||||||
|
{<<"id">>, Id},
|
||||||
|
{<<"receipt">>, Id},
|
||||||
|
{<<"destination">>, Topic},
|
||||||
|
{<<"ack">>, <<"auto">>}
|
||||||
|
],
|
||||||
|
ok = gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, Headers)).
|
||||||
|
|
||||||
|
send_message_frame(Sock, Topic, Payload) ->
|
||||||
|
Headers =
|
||||||
|
[
|
||||||
|
{<<"destination">>, Topic},
|
||||||
|
{<<"receipt">>, <<"rp-", Topic/binary>>}
|
||||||
|
],
|
||||||
|
ok = gen_tcp:send(Sock, serialize(<<"SEND">>, Headers, Payload)).
|
||||||
|
|
||||||
|
send_disconnect_frame(Sock) ->
|
||||||
|
ok = gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, [])).
|
||||||
|
|
||||||
|
send_disconnect_frame(Sock, ReceiptId) ->
|
||||||
|
Headers = [{<<"receipt">>, ReceiptId}],
|
||||||
|
ok = gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, Headers)).
|
||||||
|
|
||||||
clients() ->
|
clients() ->
|
||||||
{200, Clients} = request(get, "/gateways/stomp/clients"),
|
{200, Clients} = request(get, "/gateways/stomp/clients"),
|
||||||
|
|
Loading…
Reference in New Issue