diff --git a/apps/emqx_gateway/test/emqx_coap_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_SUITE.erl index 07a1ac025..0c0c71fb9 100644 --- a/apps/emqx_gateway/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_coap_SUITE.erl @@ -88,6 +88,9 @@ default_config() -> mqtt_prefix() -> ?MQTT_PREFIX. +ps_prefix() -> + ?PS_PREFIX. + %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl b/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl index 92c403b11..68499595f 100644 --- a/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl +++ b/apps/emqx_gateway/test/emqx_gateway_auth_ct.erl @@ -47,8 +47,12 @@ -define(CALL(Msg), gen_server:call(?MODULE, {?FUNCTION_NAME, Msg})). --define(HTTP_PORT, 37333). --define(HTTP_PATH, "/auth"). +-define(AUTHN_HTTP_PORT, 37333). +-define(AUTHN_HTTP_PATH, "/auth"). + +-define(AUTHZ_HTTP_PORT, 38333). +-define(AUTHZ_HTTP_PATH, "/authz/[...]"). + -define(GATEWAYS, [coap, lwm2m, mqttsn, stomp, exproto]). -define(CONFS, [ @@ -123,14 +127,14 @@ format_status(_Opt, Status) -> on_start_auth(authn_http) -> %% start test server - {ok, _} = emqx_authn_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH), + {ok, _} = emqx_authn_http_test_server:start_link(?AUTHN_HTTP_PORT, ?AUTHN_HTTP_PATH), timer:sleep(1000), %% set authn for gateway Setup = fun(Gateway) -> Path = io_lib:format("/gateway/~ts/authentication", [Gateway]), {204, _} = request(delete, Path), - {201, _} = request(post, Path, http_auth_config()) + {201, _} = request(post, Path, http_authn_config()) end, lists:foreach(Setup, ?GATEWAYS), @@ -150,6 +154,33 @@ on_start_auth(authn_http) -> end, emqx_authn_http_test_server:set_handler(Handler), + timer:sleep(500); +on_start_auth(authz_http) -> + ok = emqx_authz_test_lib:reset_authorizers(), + {ok, _} = emqx_authz_http_test_server:start_link(?AUTHZ_HTTP_PORT, ?AUTHZ_HTTP_PATH), + + %% TODO set authz for gateway + ok = emqx_authz_test_lib:setup_config( + http_authz_config(), + #{} + ), + + %% set handler for test server + Handler = fun(Req0, State) -> + case cowboy_req:match_qs([topic, action, username], Req0) of + #{topic := <<"/publish">>, action := <<"publish">>} -> + Req = cowboy_req:reply(200, Req0); + #{topic := <<"/subscribe">>, action := <<"subscribe">>} -> + Req = cowboy_req:reply(200, Req0); + %% for lwm2m + #{username := <<"lwm2m">>} -> + Req = cowboy_req:reply(200, Req0); + _ -> + Req = cowboy_req:reply(400, Req0) + end, + {ok, Req, State} + end, + ok = emqx_authz_http_test_server:set_handler(Handler), timer:sleep(500). on_stop_auth(authn_http) -> @@ -158,13 +189,15 @@ on_stop_auth(authn_http) -> {204, _} = request(delete, Path) end, lists:foreach(Delete, ?GATEWAYS), - ok = emqx_authn_http_test_server:stop(). + ok = emqx_authn_http_test_server:stop(); +on_stop_auth(authz_http) -> + ok = emqx_authz_http_test_server:stop(). %%------------------------------------------------------------------------------ %% Configs %%------------------------------------------------------------------------------ -http_auth_config() -> +http_authn_config() -> #{ <<"mechanism">> => <<"password_based">>, <<"enable">> => <<"true">>, @@ -175,6 +208,16 @@ http_auth_config() -> <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>} }. +http_authz_config() -> + #{ + <<"enable">> => <<"true">>, + <<"type">> => <<"http">>, + <<"method">> => <<"get">>, + <<"url">> => + <<"http://127.0.0.1:38333/authz/users/?topic=${topic}&action=${action}&username=${username}">>, + <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>} + }. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl new file mode 100644 index 000000000..2e04680a7 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl @@ -0,0 +1,446 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_authz_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]). + +-define(checkMatch(Guard), + (fun(Expr) -> + case (Expr) of + Guard -> + ok; + X__V -> + erlang:error( + {assertMatch, [ + {module, ?MODULE}, + {line, ?LINE}, + {expression, (??Expr)}, + {pattern, (??Guard)}, + {value, X__V} + ]} + ) + end + end) +). +-define(FUNCTOR(Expr), fun() -> Expr end). +-define(FUNCTOR(Arg, Expr), fun(Arg) -> Expr end). + +-define(AUTHNS, [authz_http]). + +all() -> + emqx_gateway_auth_ct:group_names(?AUTHNS). + +groups() -> + emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS). + +init_per_group(AuthName, Conf) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + ok = emqx_authz_test_lib:reset_authorizers(), + emqx_gateway_auth_ct:start_auth(AuthName), + timer:sleep(500), + Conf. + +end_per_group(AuthName, Conf) -> + emqx_gateway_auth_ct:stop_auth(AuthName), + Conf. + +init_per_suite(Config) -> + emqx_config:erase(gateway), + init_gateway_conf(), + meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_authz_file, init, fun(S) -> S end), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authz, emqx_gateway]), + application:ensure_all_started(cowboy), + emqx_gateway_auth_ct:start(), + Config. + +end_per_suite(Config) -> + meck:unload(emqx_authz_file), + emqx_gateway_auth_ct:stop(), + ok = emqx_authz_test_lib:restore_authorizers(), + emqx_config:erase(gateway), + emqx_mgmt_api_test_util:end_suite([cowboy, emqx_authz, emqx_gateway]), + Config. + +init_per_testcase(_Case, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + Config. + +end_per_testcase(_Case, Config) -> + Config. + +%%------------------------------------------------------------------------------ +%% Tests +%%------------------------------------------------------------------------------ + +t_case_coap_publish(_) -> + Mod = emqx_coap_SUITE, + Prefix = Mod:ps_prefix(), + Fun = fun(Channel, Token, Topic, Checker) -> + TopicStr = binary_to_list(Topic), + URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + + Req = Mod:make_req(post, <<>>), + Checker(Mod:do_request(Channel, URI, Req)) + end, + Case = fun(Channel, Token) -> + Fun(Channel, Token, <<"/publish">>, ?checkMatch({ok, changed, _})), + Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized})) + end, + Mod:with_connection(Case). + +t_case_coap_subscribe(_) -> + Mod = emqx_coap_SUITE, + Prefix = Mod:ps_prefix(), + Fun = fun(Channel, Token, Topic, Checker) -> + TopicStr = binary_to_list(Topic), + URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + + Req = Mod:make_req(get, <<>>, [{observe, 0}]), + Checker(Mod:do_request(Channel, URI, Req)) + end, + Case = fun(Channel, Token) -> + Fun(Channel, Token, <<"/subscribe">>, ?checkMatch({ok, content, _})), + Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized})) + end, + Mod:with_connection(Case). + +-record(coap_content, {content_format, payload = <<>>}). + +t_case_lwm2m(_) -> + MsgId = 12, + Mod = emqx_lwm2m_SUITE, + Epn = "urn:oma:lwm2m:oma:3", + Port = emqx_lwm2m_SUITE:default_port(), + URI = "coap://127.0.0.1:~b/rd?ep=~ts&imei=~ts<=345&lwm2m=1&password=public", + Test = fun(Username, Checker) -> + SubTopic = list_to_binary("lwm2m/" ++ Username ++ "/dn/#"), + ReportTopic = list_to_binary("lwm2m/" ++ Username ++ "/up/resp"), + with_resource( + ?FUNCTOR(gen_udp:open(0, [binary, {active, false}])), + ?FUNCTOR(Socket, gen_udp:close(Socket)), + fun(Socket) -> + Send = fun() -> + Mod:test_send_coap_request( + Socket, + post, + Mod:sprintf(URI, [Port, Epn, Username]), + #coap_content{ + content_format = <<"text/plain">>, + payload = <<", , , , ">> + }, + [], + MsgId + ), + + LoginResult = Mod:test_recv_coap_response(Socket), + ?assertEqual(ack, emqx_coap_SUITE:get_field(type, LoginResult)), + ?assertEqual({ok, created}, emqx_coap_SUITE:get_field(method, LoginResult)) + end, + try_publish_recv(ReportTopic, Send, fun(Data) -> Checker(SubTopic, Data) end) + end + ) + end, + Test("lwm2m", fun(SubTopic, Msg) -> + ?assertEqual(true, lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics())), + Payload = emqx_message:payload(Msg), + Cmd = emqx_json:decode(Payload, [return_maps]), + ?assertMatch(#{<<"msgType">> := <<"register">>, <<"data">> := _}, Cmd) + end), + + Test("/baduser", fun(SubTopic, Data) -> + ?assertEqual(false, lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics())), + ?assertEqual(timeout, Data) + end), + ok. + +-define(SN_CONNACK, 16#05). +t_case_sn_publish(_) -> + set_sn_predefined_topic(), + Mod = emqx_sn_protocol_SUITE, + Payload = <<"publish with authz">>, + Publish = fun(TopicId, Topic, Checker) -> + with_resource( + ?FUNCTOR(gen_udp:open(0, [binary])), + ?FUNCTOR(Socket, gen_udp:close(Socket)), + fun(Socket) -> + Mod:send_connect_msg(Socket, <<"client_id_test1">>), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, Mod:receive_response(Socket)), + + Send = fun() -> + Mod:send_publish_msg_normal_topic(Socket, 0, 1, TopicId, Payload) + end, + try_publish_recv(Topic, Send, Checker) + end + ) + end, + Publish(1, <<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end), + Publish(2, <<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end), + ok. + +t_case_sn_subscribe(_) -> + set_sn_predefined_topic(), + Mod = emqx_sn_protocol_SUITE, + Payload = <<"subscribe with authz">>, + Sub = fun(Topic, Checker) -> + with_resource( + ?FUNCTOR(gen_udp:open(0, [binary])), + ?FUNCTOR(Socket, gen_udp:close(Socket)), + fun(Socket) -> + Mod:send_connect_msg(Socket, <<"client_id_test1">>), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, Mod:receive_response(Socket)), + + Mod:send_subscribe_msg_normal_topic(Socket, 0, Topic, 1), + _ = Mod:receive_response(Socket), + + timer:sleep(100), + Msg = emqx_message:make(Topic, Payload), + emqx:publish(Msg), + + timer:sleep(100), + + Recv = Mod:receive_response(Socket), + Checker(Recv) + end + ) + end, + Sub(<<"/subscribe">>, fun(Data) -> + {ok, Msg, _, _} = emqx_sn_frame:parse(Data, undefined), + ?assertMatch({mqtt_sn_message, _, {_, 3, 0, Payload}}, Msg) + end), + Sub(<<"/badsubscribe">>, fun(Data) -> + ?assertEqual(udp_receive_timeout, Data) + end), + ok. + +t_case_stomp_publish(_) -> + Mod = emqx_stomp_SUITE, + Payload = <<"publish with authz">>, + Publish = fun(Topic, Checker) -> + Fun = fun(Sock) -> + gen_tcp:send( + Sock, + Mod:serialize( + <<"CONNECT">>, + [ + {<<"accept-version">>, Mod:stomp_ver()}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>} + ] + ) + ), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, Frame, _, _} = Mod:parse(Data), + ?assertEqual(<<"CONNECTED">>, Mod:get_field(command, Frame)), + Send = fun() -> + gen_tcp:send( + Sock, + Mod:serialize( + <<"SEND">>, + [{<<"destination">>, Topic}], + Payload + ) + ) + end, + try_publish_recv(Topic, Send, Checker) + end, + Mod:with_connection(Fun) + end, + Publish(<<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end), + Publish(<<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end), + ok. + +t_case_stomp_subscribe(_) -> + Mod = emqx_stomp_SUITE, + Payload = <<"subscribe with authz">>, + Sub = fun(Topic, Checker) -> + Fun = fun(Sock) -> + gen_tcp:send( + Sock, + Mod:serialize( + <<"CONNECT">>, + [ + {<<"accept-version">>, Mod:stomp_ver()}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>} + ] + ) + ), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, Frame, _, _} = Mod:parse(Data), + ?assertEqual(<<"CONNECTED">>, Mod:get_field(command, Frame)), + + %% Subscribe + gen_tcp:send( + Sock, + Mod:serialize( + <<"SUBSCRIBE">>, + [ + {<<"id">>, 0}, + {<<"destination">>, Topic}, + {<<"ack">>, <<"auto">>} + ] + ) + ), + + timer:sleep(100), + Msg = emqx_message:make(Topic, Payload), + emqx:publish(Msg), + + timer:sleep(100), + {ok, Data1} = gen_tcp:recv(Sock, 0, 2000), + {ok, Frame1, _, _} = Mod:parse(Data1), + Checker(Frame1) + end, + Mod:with_connection(Fun) + end, + Sub(<<"/subscribe">>, fun(Frame) -> + ?assertMatch(<<"MESSAGE">>, Mod:get_field(command, Frame)), + ?assertMatch(Payload, Mod:get_field(body, Frame)) + end), + Sub(<<"/badsubscribe">>, fun(Frame) -> + ?assertMatch(<<"ERROR">>, Mod:get_field(command, Frame)), + ?assertMatch(<<"ACL Deny">>, Mod:get_field(body, Frame)) + end), + ok. + +t_case_exproto_publish(_) -> + Mod = emqx_exproto_SUITE, + SvrMod = emqx_exproto_echo_svr, + Svrs = SvrMod:start(), + Payload = <<"publish with authz">>, + Publish = fun(Topic, Checker) -> + with_resource( + ?FUNCTOR(Mod:open(tcp)), + ?FUNCTOR(Sock, Mod:close(Sock)), + fun(Sock) -> + Client = #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + username => <<"admin">> + }, + + ConnBin = SvrMod:frame_connect(Client, <<"public">>), + + Mod:send(Sock, ConnBin), + {ok, Recv} = Mod:recv(Sock, 5000), + C = ?FUNCTOR(Bin, emqx_json:decode(Bin, [return_maps])), + ?assertEqual(C(SvrMod:frame_connack(0)), C(Recv)), + + Send = fun() -> + PubBin = SvrMod:frame_publish(Topic, 0, Payload), + Mod:send(Sock, PubBin) + end, + try_publish_recv(Topic, Send, Checker) + end + ) + end, + Publish(<<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end), + Publish(<<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end), + SvrMod:stop(Svrs), + ok. + +t_case_exproto_subscribe(_) -> + Mod = emqx_exproto_SUITE, + SvrMod = emqx_exproto_echo_svr, + Svrs = SvrMod:start(), + WaitTime = 5000, + Sub = fun(Topic, ErrorCode) -> + with_resource( + ?FUNCTOR(Mod:open(tcp)), + ?FUNCTOR(Sock, Mod:close(Sock)), + fun(Sock) -> + Client = #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + username => <<"admin">> + }, + + ConnBin = SvrMod:frame_connect(Client, <<"public">>), + + Mod:send(Sock, ConnBin), + {ok, Recv} = Mod:recv(Sock, WaitTime), + C = ?FUNCTOR(Bin, emqx_json:decode(Bin, [return_maps])), + ?assertEqual(C(SvrMod:frame_connack(0)), C(Recv)), + + SubBin = SvrMod:frame_subscribe(Topic, 0), + Mod:send(Sock, SubBin), + {ok, SubAckBin} = Mod:recv(Sock, WaitTime), + ?assertEqual(SvrMod:frame_suback(ErrorCode), SubAckBin) + end + ) + end, + + Sub(<<"/subscribe">>, 0), + Sub(<<"/badsubscribe">>, 1), + SvrMod:stop(Svrs), + ok. + +%%------------------------------------------------------------------------------ +%% Helpers +%%------------------------------------------------------------------------------ +try_publish_recv(Topic, Publish, Checker) -> + emqx:subscribe(Topic), + Clear = fun(Msg) -> + emqx:unsubscribe(Topic), + Checker(Msg) + end, + Publish(), + timer:sleep(200), + receive + {deliver, Topic, Msg} -> + Clear(Msg) + after 500 -> + Clear(timeout) + end. + +set_sn_predefined_topic() -> + RawCfg = emqx_conf:get_raw([gateway, mqttsn], #{}), + NewCfg = RawCfg#{ + <<"predefined">> => [ + #{ + id => 1, + topic => "/publish" + }, + #{ + id => 2, + topic => "/badpublish" + }, + #{ + id => 3, + topic => "/subscribe" + }, + #{ + id => 4, + topic => "/badsubscribe" + } + ] + }, + emqx_gateway_conf:update_gateway(mqttsn, NewCfg), + ok.