%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_gateway_authz_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

-import(emqx_gateway_auth_ct, [with_resource/3]).

%% Expands to a one-argument fun: returns `ok' when the argument matches
%% `Guard', otherwise raises an `assertMatch' error that carries the module,
%% line, pattern text and the offending value.
-define(checkMatch(Guard),
    (fun(Expr) ->
        case (Expr) of
            Guard ->
                ok;
            X__V ->
                erlang:error(
                    {assertMatch, [
                        {module, ?MODULE},
                        {line, ?LINE},
                        {expression, (??Expr)},
                        {pattern, (??Guard)},
                        {value, X__V}
                    ]}
                )
        end
    end)
).

%% Wrap an expression into a zero-arity / one-arity fun.
-define(FUNCTOR(Expr), fun() -> Expr end).
-define(FUNCTOR(Arg, Expr), fun(Arg) -> Expr end).

%% Authorization backends the suite is run against (one CT group each).
-define(AUTHNS, [authz_http]).

%% Group names are derived from ?AUTHNS by the shared gateway auth CT helper.
all() ->
    emqx_gateway_auth_ct:group_names(?AUTHNS).

groups() ->
    emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS).

%% Boots the application set needed by the gateways, then starts the
%% authorization backend named after the group. The started apps are kept
%% under `group_apps' so end_per_group/2 can stop them.
init_per_group(AuthName, Conf) ->
    AuthzConf = "authorization { no_match = deny, cache { enable = false } }",
    GatewayConf = emqx_gateway_auth_ct:list_gateway_conf(),
    AppSpecs =
        [
            {emqx_conf, AuthzConf},
            emqx_auth,
            emqx_auth_http,
            {emqx_gateway, GatewayConf}
            | emqx_gateway_test_utils:all_gateway_apps()
        ],
    SuiteOpts = #{work_dir => emqx_cth_suite:work_dir(Conf)},
    StartedApps = emqx_cth_suite:start(AppSpecs, SuiteOpts),
    ok = emqx_gateway_auth_ct:start_auth(AuthName),
    [{group_apps, StartedApps} | Conf].

%% Stops the authorization backend first, then the apps started for the group.
end_per_group(AuthName, Conf) ->
    ok = emqx_gateway_auth_ct:stop_auth(AuthName),
    GroupApps = ?config(group_apps, Conf),
    ok = emqx_cth_suite:stop(GroupApps),
    Conf.
%% Starts grpc and cowboy plus the shared gateway auth CT helper; the
%% applications started here are remembered under `suite_apps'.
init_per_suite(Config) ->
    {ok, GrpcApps} = application:ensure_all_started(grpc),
    {ok, CowboyApps} = application:ensure_all_started(cowboy),
    {ok, _} = emqx_gateway_auth_ct:start(),
    SuiteApps = GrpcApps ++ CowboyApps,
    [{suite_apps, SuiteApps} | Config].

%% Stops the auth CT helper, then the applications recorded by init_per_suite/1.
end_per_suite(Config) ->
    ok = emqx_gateway_auth_ct:stop(),
    SuiteApps = ?config(suite_apps, Config),
    ok = emqx_cth_suite:stop_apps(SuiteApps),
    Config.

%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------

%% CoAP publish: POST to "/publish" must be accepted, POST to "/badpublish"
%% must be rejected.
t_case_coap_publish(_) ->
    Mod = emqx_coap_SUITE,
    Prefix = Mod:ps_prefix(),
    Post = fun(Channel, Token, Topic, Checker) ->
        Path = Prefix ++ "/" ++ binary_to_list(Topic),
        URI = Path ++ "?clientid=client1&token=" ++ Token,
        Reply = Mod:do_request(Channel, URI, Mod:make_req(post, <<>>)),
        Checker(Reply)
    end,
    Scenario = fun(Channel, Token) ->
        Post(Channel, Token, <<"/publish">>, ?checkMatch({ok, changed, _})),
        %% NOTE(review): `uauthorized' (sic) presumably mirrors the atom
        %% returned by the CoAP test helpers - confirm before "fixing" it.
        Post(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized}))
    end,
    Mod:with_connection(Scenario).

%% CoAP subscribe: an observe GET on "/subscribe" must return content, the
%% same request on "/badsubscribe" must be rejected.
t_case_coap_subscribe(_) ->
    Mod = emqx_coap_SUITE,
    Prefix = Mod:ps_prefix(),
    Observe = fun(Channel, Token, Topic, Checker) ->
        Path = Prefix ++ "/" ++ binary_to_list(Topic),
        URI = Path ++ "?clientid=client1&token=" ++ Token,
        Reply = Mod:do_request(Channel, URI, Mod:make_req(get, <<>>, [{observe, 0}])),
        Checker(Reply)
    end,
    Scenario = fun(Channel, Token) ->
        Observe(Channel, Token, <<"/subscribe">>, ?checkMatch({ok, content, _})),
        %% NOTE(review): `uauthorized' (sic) - see t_case_coap_publish/1.
        Observe(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized}))
    end,
    Mod:with_connection(Scenario).

%% CoAP payload wrapper used when building the LwM2M registration request.
-record(coap_content, {content_format, payload = <<>>}).
%% LwM2M: registers a client over CoAP/UDP and checks the effect of
%% authorization on the gateway's own subscribe/publish:
%%   * user "lwm2m"    - the downlink topic is subscribed and the register
%%                       report is delivered on the uplink topic;
%%   * user "/baduser" - the downlink topic is not subscribed and nothing is
%%                       delivered (the helper reports `timeout').
t_case_lwm2m(_) ->
    MsgId = 12,
    Mod = emqx_lwm2m_SUITE,
    Epn = "urn:oma:lwm2m:oma:3",
    Port = emqx_lwm2m_SUITE:default_port(),
    %% Format string for Mod:sprintf/2 (port, endpoint name, imei).
    %% `lt' is the registration lifetime query parameter; it must be spelled
    %% `&lt=345' - a literal `<=345' would be glued onto the imei value and
    %% the lifetime parameter would be missing from the request.
    URI = "coap://127.0.0.1:~b/rd?ep=~ts&imei=~ts&lt=345&lwm2m=1&password=public",
    Test = fun(Username, Checker) ->
        SubTopic = list_to_binary("lwm2m/" ++ Username ++ "/dn/#"),
        ReportTopic = list_to_binary("lwm2m/" ++ Username ++ "/up/resp"),
        with_resource(
            ?FUNCTOR(gen_udp:open(0, [binary, {active, false}])),
            ?FUNCTOR(Socket, gen_udp:close(Socket)),
            fun(Socket) ->
                %% Sends the registration and asserts it is acknowledged
                %% with 2.01 Created.
                Send = fun() ->
                    Mod:test_send_coap_request(
                        Socket,
                        post,
                        Mod:sprintf(URI, [Port, Epn, Username]),
                        #coap_content{
                            content_format = <<"text/plain">>,
                            %% Registered object links in CoRE link-format.
                            payload = <<"</1>, </2>, </3>, </4>, </5>">>
                        },
                        [],
                        MsgId
                    ),
                    LoginResult = Mod:test_recv_coap_response(Socket),
                    ?assertEqual(ack, emqx_coap_SUITE:get_field(type, LoginResult)),
                    ?assertEqual({ok, created}, emqx_coap_SUITE:get_field(method, LoginResult))
                end,
                try_publish_recv(ReportTopic, Send, fun(Data) -> Checker(SubTopic, Data) end)
            end
        )
    end,
    Test("lwm2m", fun(SubTopic, Msg) ->
        ?assertEqual(true, lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics())),
        Payload = emqx_message:payload(Msg),
        Cmd = emqx_utils_json:decode(Payload, [return_maps]),
        ?assertMatch(#{<<"msgType">> := <<"register">>, <<"data">> := _}, Cmd)
    end),
    Test("/baduser", fun(SubTopic, Data) ->
        ?assertEqual(false, lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics())),
        ?assertEqual(timeout, Data)
    end),
    ok.

%% MQTT-SN CONNACK message type byte.
-define(SN_CONNACK, 16#05).
%% MQTT-SN publish: a publish on predefined topic id 1 ("/publish") must be
%% delivered; one on id 2 ("/badpublish") must not (helper yields `timeout').
t_case_sn_publish(_) ->
    set_sn_predefined_topic(),
    Mod = emqx_sn_protocol_SUITE,
    Payload = <<"publish with authz">>,
    OpenSocket = fun() -> gen_udp:open(0, [binary]) end,
    CloseSocket = fun(Opened) -> gen_udp:close(Opened) end,
    PublishAndCheck = fun(TopicId, Topic, Checker) ->
        Session = fun(Socket) ->
            Mod:send_connect_msg(Socket, <<"client_id_test1">>),
            ConnAck = Mod:receive_response(Socket),
            ?assertEqual(<<3, ?SN_CONNACK, 0>>, ConnAck),
            SendPublish = fun() ->
                Mod:send_publish_msg_normal_topic(Socket, 0, 1, TopicId, Payload)
            end,
            try_publish_recv(Topic, SendPublish, Checker)
        end,
        with_resource(OpenSocket, CloseSocket, Session)
    end,
    ExpectDelivered = fun(Delivered) ->
        ?assertMatch(Payload, emqx_message:payload(Delivered))
    end,
    ExpectNothing = fun(Outcome) -> ?assertEqual(timeout, Outcome) end,
    PublishAndCheck(1, <<"/publish">>, ExpectDelivered),
    PublishAndCheck(2, <<"/badpublish">>, ExpectNothing),
    ok.

%% MQTT-SN subscribe: after subscribing to "/subscribe" a broker-side publish
%% must reach the client; after subscribing to "/badsubscribe" the client
%% must receive nothing (`udp_receive_timeout').
t_case_sn_subscribe(_) ->
    set_sn_predefined_topic(),
    Mod = emqx_sn_protocol_SUITE,
    Payload = <<"subscribe with authz">>,
    OpenSocket = fun() -> gen_udp:open(0, [binary]) end,
    CloseSocket = fun(Opened) -> gen_udp:close(Opened) end,
    SubscribeAndCheck = fun(Topic, Checker) ->
        Session = fun(Socket) ->
            Mod:send_connect_msg(Socket, <<"client_id_test1">>),
            ConnAck = Mod:receive_response(Socket),
            ?assertEqual(<<3, ?SN_CONNACK, 0>>, ConnAck),
            Mod:send_subscribe_msg_normal_topic(Socket, 0, Topic, 1),
            %% The reply to the subscribe request itself is not inspected.
            _ = Mod:receive_response(Socket),
            timer:sleep(100),
            emqx:publish(emqx_message:make(Topic, Payload)),
            timer:sleep(100),
            Checker(Mod:receive_response(Socket))
        end,
        with_resource(OpenSocket, CloseSocket, Session)
    end,
    SubscribeAndCheck(<<"/subscribe">>, fun(Datagram) ->
        {ok, Parsed, _, _} = emqx_mqttsn_frame:parse(Datagram, undefined),
        ?assertMatch({mqtt_sn_message, _, {_, 3, 0, Payload}}, Parsed)
    end),
    SubscribeAndCheck(<<"/badsubscribe">>, fun(Datagram) ->
        ?assertEqual(udp_receive_timeout, Datagram)
    end),
    ok.
%% NOTE: the two STOMP cases below are disabled (commented out).

%% t_case_stomp_publish(_) ->
%%     Mod = emqx_stomp_SUITE,
%%     Payload = <<"publish with authz">>,
%%     Publish = fun(Topic, Checker) ->
%%         Fun = fun(Sock) ->
%%             gen_tcp:send(
%%                 Sock,
%%                 Mod:serialize(
%%                     <<"CONNECT">>,
%%                     [
%%                         {<<"accept-version">>, Mod:stomp_ver()},
%%                         {<<"host">>, <<"127.0.0.1:61613">>},
%%                         {<<"login">>, <<"guest">>},
%%                         {<<"passcode">>, <<"guest">>},
%%                         {<<"heart-beat">>, <<"0,0">>}
%%                     ]
%%                 )
%%             ),
%%             {ok, Data} = gen_tcp:recv(Sock, 0),
%%             {ok, Frame, _, _} = Mod:parse(Data),
%%             ?assertEqual(<<"CONNECTED">>, Mod:get_field(command, Frame)),
%%             Send = fun() ->
%%                 gen_tcp:send(
%%                     Sock,
%%                     Mod:serialize(
%%                         <<"SEND">>,
%%                         [{<<"destination">>, Topic}],
%%                         Payload
%%                     )
%%                 )
%%             end,
%%             try_publish_recv(Topic, Send, Checker)
%%         end,
%%         Mod:with_connection(Fun)
%%     end,
%%     Publish(<<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end),
%%     Publish(<<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end),
%%     ok.

%% t_case_stomp_subscribe(_) ->
%%     Mod = emqx_stomp_SUITE,
%%     Payload = <<"subscribe with authz">>,
%%     Sub = fun(Topic, Checker) ->
%%         Fun = fun(Sock) ->
%%             gen_tcp:send(
%%                 Sock,
%%                 Mod:serialize(
%%                     <<"CONNECT">>,
%%                     [
%%                         {<<"accept-version">>, Mod:stomp_ver()},
%%                         {<<"host">>, <<"127.0.0.1:61613">>},
%%                         {<<"login">>, <<"guest">>},
%%                         {<<"passcode">>, <<"guest">>},
%%                         {<<"heart-beat">>, <<"0,0">>}
%%                     ]
%%                 )
%%             ),
%%             {ok, Data} = gen_tcp:recv(Sock, 0),
%%             {ok, Frame, _, _} = Mod:parse(Data),
%%             ?assertEqual(<<"CONNECTED">>, Mod:get_field(command, Frame)),
%%             %% Subscribe
%%             gen_tcp:send(
%%                 Sock,
%%                 Mod:serialize(
%%                     <<"SUBSCRIBE">>,
%%                     [
%%                         {<<"id">>, 0},
%%                         {<<"destination">>, Topic},
%%                         {<<"ack">>, <<"auto">>}
%%                     ]
%%                 )
%%             ),
%%             timer:sleep(200),
%%             Msg = emqx_message:make(Topic, Payload),
%%             emqx:publish(Msg),
%%             timer:sleep(200),
%%             {ok, Data1} = gen_tcp:recv(Sock, 0, 10000),
%%             {ok, Frame1, _, _} = Mod:parse(Data1),
%%             Checker(Frame1)
%%         end,
%%         Mod:with_connection(Fun)
%%     end,
%%     Sub(<<"/subscribe">>, fun(Frame) ->
%%         ?assertMatch(<<"MESSAGE">>, Mod:get_field(command, Frame)),
%%         ?assertMatch(Payload, Mod:get_field(body, Frame))
%%     end),
%%     Sub(<<"/badsubscribe">>, fun(Frame) ->
%%         ?assertMatch(<<"ERROR">>, Mod:get_field(command, Frame)),
%%         ?assertMatch(<<"ACL Deny">>, Mod:get_field(body, Frame))
%%     end),
%%     ok.

%% ExProto publish: after a successful connect, a publish to "/publish" must
%% be delivered while a publish to "/badpublish" must not (the helper yields
%% `timeout').
t_case_exproto_publish(_) ->
    Mod = emqx_exproto_SUITE,
    SvrMod = emqx_exproto_echo_svr,
    Svrs = SvrMod:start(http),
    Payload = <<"publish with authz">>,
    Publish = fun(Topic, Checker) ->
        with_resource(
            ?FUNCTOR(Mod:open(tcp)),
            ?FUNCTOR(Sock, Mod:close(Sock)),
            fun(Sock) ->
                Client = #{
                    proto_name => <<"demo">>,
                    proto_ver => <<"v0.1">>,
                    clientid => <<"test_client_1">>,
                    username => <<"admin">>
                },
                ConnBin = SvrMod:frame_connect(Client, <<"public">>),
                Mod:send(Sock, ConnBin),
                {ok, Recv} = Mod:recv(Sock, 5000),
                %% Compare decoded JSON rather than raw bytes.
                C = ?FUNCTOR(Bin, emqx_utils_json:decode(Bin, [return_maps])),
                ?assertEqual(C(SvrMod:frame_connack(0)), C(Recv)),
                Send = fun() ->
                    PubBin = SvrMod:frame_publish(Topic, 0, Payload),
                    Mod:send(Sock, PubBin)
                end,
                try_publish_recv(Topic, Send, Checker)
            end
        )
    end,
    %% Stop the echo server even when an assertion raises; otherwise it would
    %% keep running and interfere with the cases that start it again.
    try
        Publish(<<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end),
        Publish(<<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end)
    after
        SvrMod:stop(Svrs)
    end,
    ok.
%% ExProto subscribe: subscribing to "/subscribe" must be acknowledged with
%% code 0, subscribing to "/badsubscribe" with code 1.
t_case_exproto_subscribe(_) ->
    Mod = emqx_exproto_SUITE,
    SvrMod = emqx_exproto_echo_svr,
    Svrs = SvrMod:start(http),
    WaitTime = 5000,
    Sub = fun(Topic, ErrorCode) ->
        with_resource(
            ?FUNCTOR(Mod:open(tcp)),
            ?FUNCTOR(Sock, Mod:close(Sock)),
            fun(Sock) ->
                Client = #{
                    proto_name => <<"demo">>,
                    proto_ver => <<"v0.1">>,
                    clientid => <<"test_client_1">>,
                    username => <<"admin">>
                },
                ConnBin = SvrMod:frame_connect(Client, <<"public">>),
                Mod:send(Sock, ConnBin),
                {ok, Recv} = Mod:recv(Sock, WaitTime),
                %% Compare decoded JSON rather than raw bytes.
                C = ?FUNCTOR(Bin, emqx_utils_json:decode(Bin, [return_maps])),
                ?assertEqual(C(SvrMod:frame_connack(0)), C(Recv)),
                SubBin = SvrMod:frame_subscribe(Topic, 0),
                Mod:send(Sock, SubBin),
                {ok, SubAckBin} = Mod:recv(Sock, WaitTime),
                ?assertEqual(SvrMod:frame_suback(ErrorCode), SubAckBin)
            end
        )
    end,
    %% Stop the echo server even when an assertion raises; otherwise it would
    %% keep running and interfere with the cases that start it again.
    try
        Sub(<<"/subscribe">>, 0),
        Sub(<<"/badsubscribe">>, 1)
    after
        SvrMod:stop(Svrs)
    end,
    ok.

%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------

%% Same as try_publish_recv/4 with a 500 ms receive timeout.
try_publish_recv(Topic, Publish, Checker) ->
    try_publish_recv(Topic, Publish, Checker, 500).

%% Subscribes the calling process to `Topic', runs `Publish()', then waits up
%% to `Timeout' ms for a `{deliver, Topic, Msg}' message.
%% `Checker' is called with the delivered message, or with the atom `timeout'
%% when nothing arrived; its result is returned.
%% The subscription is removed before `Checker' runs, and also when
%% `Publish()' raises, so a failing case does not leave the test process
%% subscribed.
try_publish_recv(Topic, Publish, Checker, Timeout) ->
    emqx:subscribe(Topic),
    Outcome =
        try
            %% Short pauses around the publish, kept from the original flow.
            timer:sleep(200),
            Publish(),
            timer:sleep(200),
            receive
                {deliver, Topic, Msg} -> Msg
            after Timeout -> timeout
            end
        after
            emqx:unsubscribe(Topic)
        end,
    Checker(Outcome).

%% Installs the predefined MQTT-SN topic ids used by the SN cases
%% (1 "/publish", 2 "/badpublish", 3 "/subscribe", 4 "/badsubscribe") on top
%% of the current raw mqttsn gateway config.
%% NOTE(review): the result of update_gateway/2 is ignored here - confirm
%% whether a failed update should fail the case.
set_sn_predefined_topic() ->
    RawCfg = emqx_conf:get_raw([gateway, mqttsn], #{}),
    NewCfg = RawCfg#{
        <<"predefined">> => [
            #{id => 1, topic => "/publish"},
            #{id => 2, topic => "/badpublish"},
            #{id => 3, topic => "/subscribe"},
            #{id => 4, topic => "/badsubscribe"}
        ]
    },
    emqx_gateway_conf:update_gateway(mqttsn, NewCfg),
    ok.