test(gw): refine exproto tests

This commit is contained in:
JianBo He 2021-07-22 22:05:53 +08:00
parent 4125209d1c
commit 7f1d245a8f
2 changed files with 641 additions and 632 deletions

View File

@ -16,439 +16,448 @@
-module(emqx_exproto_SUITE). -module(emqx_exproto_SUITE).
% -compile(export_all). -compile(export_all).
% -compile(nowarn_export_all). -compile(nowarn_export_all).
% -import(emqx_exproto_echo_svr, -import(emqx_exproto_echo_svr,
% [ frame_connect/2 [ frame_connect/2
% , frame_connack/1 , frame_connack/1
% , frame_publish/3 , frame_publish/3
% , frame_puback/1 , frame_puback/1
% , frame_subscribe/2 , frame_subscribe/2
% , frame_suback/1 , frame_suback/1
% , frame_unsubscribe/1 , frame_unsubscribe/1
% , frame_unsuback/1 , frame_unsuback/1
% , frame_disconnect/0 , frame_disconnect/0
% ]). ]).
% -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
% -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
% -define(TCPOPTS, [binary, {active, false}]). -define(TCPOPTS, [binary, {active, false}]).
% -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% %% Setups %% Setups
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% all() -> all() ->
% [{group, Name} || Name <- metrics()]. [{group, Name} || Name <- metrics()].
% groups() -> groups() ->
% Cases = emqx_ct:all(?MODULE), Cases = emqx_ct:all(?MODULE),
% [{Name, Cases} || Name <- metrics()]. [{Name, Cases} || Name <- metrics()].
% %% @private %% @private
% metrics() -> metrics() ->
% [tcp, ssl, udp, dtls]. [tcp, ssl, udp, dtls].
% init_per_group(GrpName, Cfg) -> init_per_group(GrpName, Cfg) ->
% put(grpname, GrpName), put(grpname, GrpName),
% Svrs = emqx_exproto_echo_svr:start(), Svrs = emqx_exproto_echo_svr:start(),
% emqx_ct_helpers:start_apps([emqx_exproto], fun set_special_cfg/1), emqx_ct_helpers:start_apps([emqx_gateway], fun set_special_cfg/1),
% emqx_logger:set_log_level(debug), emqx_logger:set_log_level(debug),
% [{servers, Svrs}, {listener_type, GrpName} | Cfg]. [{servers, Svrs}, {listener_type, GrpName} | Cfg].
% end_per_group(_, Cfg) -> end_per_group(_, Cfg) ->
% emqx_ct_helpers:stop_apps([emqx_exproto]), emqx_ct_helpers:stop_apps([emqx_gateway]),
% emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
% set_special_cfg(emqx_exproto) -> set_special_cfg(emqx_gateway) ->
% LisType = get(grpname), LisType = get(grpname),
% Listeners = application:get_env(emqx_exproto, listeners, []), emqx_config:put(
% SockOpts = socketopts(LisType), [emqx_gateway, exproto],
% UpgradeOpts = fun(Opts) -> #{'1' =>
% Opts2 = lists:keydelete(tcp_options, 1, Opts), #{authenticator => allow_anonymous,
% Opts3 = lists:keydelete(ssl_options, 1, Opts2), server => #{bind => 9100},
% Opts4 = lists:keydelete(udp_options, 1, Opts3), handler => #{address => "http://127.0.0.1:9001"},
% Opts5 = lists:keydelete(dtls_options, 1, Opts4), listener => listener_confs(LisType)
% SockOpts ++ Opts5 }}),
% end,
% NListeners = [{Proto, LisType, LisOn, UpgradeOpts(Opts)} Listeners = application:get_env(emqx_exproto, listeners, []),
% || {Proto, _Type, LisOn, Opts} <- Listeners], SockOpts = socketopts(LisType),
% application:set_env(emqx_exproto, listeners, NListeners); UpgradeOpts = fun(Opts) ->
% set_special_cfg(emqx) -> Opts2 = lists:keydelete(tcp_options, 1, Opts),
% application:set_env(emqx, allow_anonymous, true), Opts3 = lists:keydelete(ssl_options, 1, Opts2),
% application:set_env(emqx, enable_acl_cache, false), Opts4 = lists:keydelete(udp_options, 1, Opts3),
% ok. Opts5 = lists:keydelete(dtls_options, 1, Opts4),
SockOpts ++ Opts5
% %%-------------------------------------------------------------------- end;
% %% Tests cases
% %%-------------------------------------------------------------------- set_special_cfg(_App) ->
ok.
% t_start_stop(_) ->
% ok. listener_confs(Type) ->
Default = #{bind => 7993, acceptors => 8},
% t_mountpoint_echo(Cfg) -> #{Type => #{'1' => maps:merge(Default, maps:from_list(socketopts(Type)))}}.
% SockType = proplists:get_value(listener_type, Cfg),
% Sock = open(SockType), %%--------------------------------------------------------------------
%% Tests cases
% Client = #{proto_name => <<"demo">>, %%--------------------------------------------------------------------
% proto_ver => <<"v0.1">>,
% clientid => <<"test_client_1">>, t_start_stop(_) ->
% mountpoint => <<"ct/">> ok.
% },
% Password = <<"123456">>, t_mountpoint_echo(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
% ConnBin = frame_connect(Client, Password), Sock = open(SockType),
% ConnAckBin = frame_connack(0),
Client = #{proto_name => <<"demo">>,
% send(Sock, ConnBin), proto_ver => <<"v0.1">>,
% {ok, ConnAckBin} = recv(Sock, 5000), clientid => <<"test_client_1">>,
mountpoint => <<"ct/">>
% SubBin = frame_subscribe(<<"t/dn">>, 1), },
% SubAckBin = frame_suback(0), Password = <<"123456">>,
% send(Sock, SubBin), ConnBin = frame_connect(Client, Password),
% {ok, SubAckBin} = recv(Sock, 5000), ConnAckBin = frame_connack(0),
% emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)), send(Sock, ConnBin),
% PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>), {ok, ConnAckBin} = recv(Sock, 5000),
% {ok, PubBin1} = recv(Sock, 5000),
SubBin = frame_subscribe(<<"t/dn">>, 1),
% PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>), SubAckBin = frame_suback(0),
% PubAckBin = frame_puback(0),
send(Sock, SubBin),
% emqx:subscribe(<<"ct/t/up">>), {ok, SubAckBin} = recv(Sock, 5000),
% send(Sock, PubBin2), emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
% {ok, PubAckBin} = recv(Sock, 5000), PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>),
{ok, PubBin1} = recv(Sock, 5000),
% receive
% {deliver, _, _} -> ok PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>),
% after 1000 -> PubAckBin = frame_puback(0),
% error(echo_not_running)
% end, emqx:subscribe(<<"ct/t/up">>),
% close(Sock).
% t_auth_deny(Cfg) ->
% SockType = proplists:get_value(listener_type, Cfg),
% Sock = open(SockType),
% Client = #{proto_name => <<"demo">>,
% proto_ver => <<"v0.1">>,
% clientid => <<"test_client_1">>
% },
% Password = <<"123456">>,
% ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
% ok = meck:expect(emqx_access_control, authenticate,
% fun(_) -> {error, ?RC_NOT_AUTHORIZED} end),
% ConnBin = frame_connect(Client, Password),
% ConnAckBin = frame_connack(1),
% send(Sock, ConnBin),
% {ok, ConnAckBin} = recv(Sock, 5000),
% SockType =/= udp andalso begin
% {error, closed} = recv(Sock, 5000)
% end,
% meck:unload([emqx_access_control]).
% t_acl_deny(Cfg) ->
% SockType = proplists:get_value(listener_type, Cfg),
% Sock = open(SockType),
% Client = #{proto_name => <<"demo">>,
% proto_ver => <<"v0.1">>,
% clientid => <<"test_client_1">>
% },
% Password = <<"123456">>,
% ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
% ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end),
% ConnBin = frame_connect(Client, Password),
% ConnAckBin = frame_connack(0),
% send(Sock, ConnBin),
% {ok, ConnAckBin} = recv(Sock, 5000),
% SubBin = frame_subscribe(<<"t/#">>, 1), send(Sock, PubBin2),
% SubAckBin = frame_suback(1), {ok, PubAckBin} = recv(Sock, 5000),
% send(Sock, SubBin), receive
% {ok, SubAckBin} = recv(Sock, 5000), {deliver, _, _} -> ok
after 1000 ->
error(echo_not_running)
end,
close(Sock).
t_auth_deny(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {error, ?RC_NOT_AUTHORIZED} end),
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(1),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SockType =/= udp andalso begin
{error, closed} = recv(Sock, 5000)
end,
meck:unload([emqx_access_control]).
t_acl_deny(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
% emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)), ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end),
% PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>), ConnBin = frame_connect(Client, Password),
% PubBinFailedAck = frame_puback(1), ConnAckBin = frame_connack(0),
% PubBinSuccesAck = frame_puback(0),
% send(Sock, PubBin), send(Sock, ConnBin),
% {ok, PubBinFailedAck} = recv(Sock, 5000), {ok, ConnAckBin} = recv(Sock, 5000),
% meck:unload([emqx_access_control]), SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(1),
% send(Sock, PubBin), send(Sock, SubBin),
% {ok, PubBinSuccesAck} = recv(Sock, 5000), {ok, SubAckBin} = recv(Sock, 5000),
% close(Sock).
% t_keepalive_timeout(Cfg) -> emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)),
% SockType = proplists:get_value(listener_type, Cfg),
% Sock = open(SockType),
% Client = #{proto_name => <<"demo">>, PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>),
% proto_ver => <<"v0.1">>, PubBinFailedAck = frame_puback(1),
% clientid => <<"test_client_1">>, PubBinSuccesAck = frame_puback(0),
% keepalive => 2
% },
% Password = <<"123456">>,
% ConnBin = frame_connect(Client, Password), send(Sock, PubBin),
% ConnAckBin = frame_connack(0), {ok, PubBinFailedAck} = recv(Sock, 5000),
% send(Sock, ConnBin), meck:unload([emqx_access_control]),
% {ok, ConnAckBin} = recv(Sock, 5000),
% DisconnectBin = frame_disconnect(), send(Sock, PubBin),
% {ok, DisconnectBin} = recv(Sock, 10000), {ok, PubBinSuccesAck} = recv(Sock, 5000),
close(Sock).
% SockType =/= udp andalso begin t_keepalive_timeout(Cfg) ->
% {error, closed} = recv(Sock, 5000) SockType = proplists:get_value(listener_type, Cfg),
% end, ok. Sock = open(SockType),
% t_hook_connected_disconnected(Cfg) -> Client = #{proto_name => <<"demo">>,
% SockType = proplists:get_value(listener_type, Cfg), proto_ver => <<"v0.1">>,
% Sock = open(SockType), clientid => <<"test_client_1">>,
keepalive => 2
},
Password = <<"123456">>,
% Client = #{proto_name => <<"demo">>, ConnBin = frame_connect(Client, Password),
% proto_ver => <<"v0.1">>, ConnAckBin = frame_connack(0),
% clientid => <<"test_client_1">>
% },
% Password = <<"123456">>,
% ConnBin = frame_connect(Client, Password), send(Sock, ConnBin),
% ConnAckBin = frame_connack(0), {ok, ConnAckBin} = recv(Sock, 5000),
% Parent = self(), DisconnectBin = frame_disconnect(),
% emqx:hook('client.connected', {?MODULE, hook_fun1, [Parent]}), {ok, DisconnectBin} = recv(Sock, 10000),
% emqx:hook('client.disconnected',{?MODULE, hook_fun2, [Parent]}),
% send(Sock, ConnBin), SockType =/= udp andalso begin
% {ok, ConnAckBin} = recv(Sock, 5000), {error, closed} = recv(Sock, 5000)
end, ok.
% receive t_hook_connected_disconnected(Cfg) ->
% connected -> ok SockType = proplists:get_value(listener_type, Cfg),
% after 1000 -> Sock = open(SockType),
% error(hook_is_not_running)
% end,
% DisconnectBin = frame_disconnect(), Client = #{proto_name => <<"demo">>,
% send(Sock, DisconnectBin), proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
% receive ConnBin = frame_connect(Client, Password),
% disconnected -> ok ConnAckBin = frame_connack(0),
% after 1000 ->
% error(hook_is_not_running)
% end,
% SockType =/= udp andalso begin Parent = self(),
% {error, closed} = recv(Sock, 5000) emqx:hook('client.connected', {?MODULE, hook_fun1, [Parent]}),
% end, emqx:hook('client.disconnected',{?MODULE, hook_fun2, [Parent]}),
% emqx:unhook('client.connected', {?MODULE, hook_fun1}),
% emqx:unhook('client.disconnected', {?MODULE, hook_fun2}).
% t_hook_session_subscribed_unsubscribed(Cfg) -> send(Sock, ConnBin),
% SockType = proplists:get_value(listener_type, Cfg), {ok, ConnAckBin} = recv(Sock, 5000),
% Sock = open(SockType),
% Client = #{proto_name => <<"demo">>, receive
% proto_ver => <<"v0.1">>, connected -> ok
% clientid => <<"test_client_1">> after 1000 ->
% }, error(hook_is_not_running)
% Password = <<"123456">>, end,
% ConnBin = frame_connect(Client, Password), DisconnectBin = frame_disconnect(),
% ConnAckBin = frame_connack(0), send(Sock, DisconnectBin),
% send(Sock, ConnBin), receive
% {ok, ConnAckBin} = recv(Sock, 5000), disconnected -> ok
after 1000 ->
error(hook_is_not_running)
end,
% Parent = self(), SockType =/= udp andalso begin
% emqx:hook('session.subscribed', {?MODULE, hook_fun3, [Parent]}), {error, closed} = recv(Sock, 5000)
% emqx:hook('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}), end,
emqx:unhook('client.connected', {?MODULE, hook_fun1}),
emqx:unhook('client.disconnected', {?MODULE, hook_fun2}).
% SubBin = frame_subscribe(<<"t/#">>, 1), t_hook_session_subscribed_unsubscribed(Cfg) ->
% SubAckBin = frame_suback(0), SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
% send(Sock, SubBin),
% {ok, SubAckBin} = recv(Sock, 5000), Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
% receive clientid => <<"test_client_1">>
% subscribed -> ok },
% after 1000 -> Password = <<"123456">>,
% error(hook_is_not_running)
% end, ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
% UnsubBin = frame_unsubscribe(<<"t/#">>),
% UnsubAckBin = frame_unsuback(0), send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
% send(Sock, UnsubBin),
% {ok, UnsubAckBin} = recv(Sock, 5000), Parent = self(),
emqx:hook('session.subscribed', {?MODULE, hook_fun3, [Parent]}),
% receive emqx:hook('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}),
% unsubscribed -> ok
% after 1000 -> SubBin = frame_subscribe(<<"t/#">>, 1),
% error(hook_is_not_running) SubAckBin = frame_suback(0),
% end,
send(Sock, SubBin),
% close(Sock), {ok, SubAckBin} = recv(Sock, 5000),
% emqx:unhook('session.subscribed', {?MODULE, hook_fun3}),
% emqx:unhook('session.unsubscribed', {?MODULE, hook_fun4}). receive
subscribed -> ok
% t_hook_message_delivered(Cfg) -> after 1000 ->
% SockType = proplists:get_value(listener_type, Cfg), error(hook_is_not_running)
% Sock = open(SockType), end,
% Client = #{proto_name => <<"demo">>, UnsubBin = frame_unsubscribe(<<"t/#">>),
% proto_ver => <<"v0.1">>, UnsubAckBin = frame_unsuback(0),
% clientid => <<"test_client_1">>
% }, send(Sock, UnsubBin),
% Password = <<"123456">>, {ok, UnsubAckBin} = recv(Sock, 5000),
% ConnBin = frame_connect(Client, Password), receive
% ConnAckBin = frame_connack(0), unsubscribed -> ok
after 1000 ->
% send(Sock, ConnBin), error(hook_is_not_running)
% {ok, ConnAckBin} = recv(Sock, 5000), end,
% SubBin = frame_subscribe(<<"t/#">>, 1), close(Sock),
% SubAckBin = frame_suback(0), emqx:unhook('session.subscribed', {?MODULE, hook_fun3}),
emqx:unhook('session.unsubscribed', {?MODULE, hook_fun4}).
% send(Sock, SubBin),
% {ok, SubAckBin} = recv(Sock, 5000), t_hook_message_delivered(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
% emqx:hook('message.delivered', {?MODULE, hook_fun5, []}), Sock = open(SockType),
% emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)), Client = #{proto_name => <<"demo">>,
% PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>), proto_ver => <<"v0.1">>,
% {ok, PubBin1} = recv(Sock, 5000), clientid => <<"test_client_1">>
},
% close(Sock), Password = <<"123456">>,
% emqx:unhook('message.delivered', {?MODULE, hook_fun5}).
ConnBin = frame_connect(Client, Password),
% %%-------------------------------------------------------------------- ConnAckBin = frame_connack(0),
% %% Utils
send(Sock, ConnBin),
% hook_fun1(_, _, Parent) -> Parent ! connected, ok. {ok, ConnAckBin} = recv(Sock, 5000),
% hook_fun2(_, _, _, Parent) -> Parent ! disconnected, ok.
SubBin = frame_subscribe(<<"t/#">>, 1),
% hook_fun3(_, _, _, Parent) -> Parent ! subscribed, ok. SubAckBin = frame_suback(0),
% hook_fun4(_, _, _, Parent) -> Parent ! unsubscribed, ok.
send(Sock, SubBin),
% hook_fun5(_, Msg) -> {ok, Msg#message{payload = <<"2">>}}. {ok, SubAckBin} = recv(Sock, 5000),
% rand_bytes() -> emqx:hook('message.delivered', {?MODULE, hook_fun5, []}),
% crypto:strong_rand_bytes(rand:uniform(256)).
emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)),
% %%-------------------------------------------------------------------- PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
% %% Sock funcs {ok, PubBin1} = recv(Sock, 5000),
% open(tcp) -> close(Sock),
% {ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS), emqx:unhook('message.delivered', {?MODULE, hook_fun5}).
% {tcp, Sock};
% open(udp) -> %%--------------------------------------------------------------------
% {ok, Sock} = gen_udp:open(0, ?TCPOPTS), %% Utils
% {udp, Sock};
% open(ssl) -> hook_fun1(_, _, Parent) -> Parent ! connected, ok.
% SslOpts = client_ssl_opts(), hook_fun2(_, _, _, Parent) -> Parent ! disconnected, ok.
% {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts),
% {ssl, SslSock}; hook_fun3(_, _, _, Parent) -> Parent ! subscribed, ok.
% open(dtls) -> hook_fun4(_, _, _, Parent) -> Parent ! unsubscribed, ok.
% SslOpts = client_ssl_opts(),
% {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts), hook_fun5(_, Msg) -> {ok, Msg#message{payload = <<"2">>}}.
% {dtls, SslSock}.
rand_bytes() ->
% send({tcp, Sock}, Bin) -> crypto:strong_rand_bytes(rand:uniform(256)).
% gen_tcp:send(Sock, Bin);
% send({udp, Sock}, Bin) -> %%--------------------------------------------------------------------
% gen_udp:send(Sock, "127.0.0.1", 7993, Bin); %% Sock funcs
% send({ssl, Sock}, Bin) ->
% ssl:send(Sock, Bin); open(tcp) ->
% send({dtls, Sock}, Bin) -> {ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS),
% ssl:send(Sock, Bin). {tcp, Sock};
open(udp) ->
% recv({tcp, Sock}, Ts) -> {ok, Sock} = gen_udp:open(0, ?TCPOPTS),
% gen_tcp:recv(Sock, 0, Ts); {udp, Sock};
% recv({udp, Sock}, Ts) -> open(ssl) ->
% {ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts), SslOpts = client_ssl_opts(),
% {ok, Bin}; {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts),
% recv({ssl, Sock}, Ts) -> {ssl, SslSock};
% ssl:recv(Sock, 0, Ts); open(dtls) ->
% recv({dtls, Sock}, Ts) -> SslOpts = client_ssl_opts(),
% ssl:recv(Sock, 0, Ts). {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts),
{dtls, SslSock}.
% close({tcp, Sock}) ->
% gen_tcp:close(Sock); send({tcp, Sock}, Bin) ->
% close({udp, Sock}) -> gen_tcp:send(Sock, Bin);
% gen_udp:close(Sock); send({udp, Sock}, Bin) ->
% close({ssl, Sock}) -> gen_udp:send(Sock, "127.0.0.1", 7993, Bin);
% ssl:close(Sock); send({ssl, Sock}, Bin) ->
% close({dtls, Sock}) -> ssl:send(Sock, Bin);
% ssl:close(Sock). send({dtls, Sock}, Bin) ->
ssl:send(Sock, Bin).
% %%--------------------------------------------------------------------
% %% Server-Opts recv({tcp, Sock}, Ts) ->
gen_tcp:recv(Sock, 0, Ts);
% socketopts(tcp) -> recv({udp, Sock}, Ts) ->
% [{tcp_options, tcp_opts()}]; {ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts),
% socketopts(ssl) -> {ok, Bin};
% [{tcp_options, tcp_opts()}, recv({ssl, Sock}, Ts) ->
% {ssl_options, ssl_opts()}]; ssl:recv(Sock, 0, Ts);
% socketopts(udp) -> recv({dtls, Sock}, Ts) ->
% [{udp_options, udp_opts()}]; ssl:recv(Sock, 0, Ts).
% socketopts(dtls) ->
% [{udp_options, udp_opts()}, close({tcp, Sock}) ->
% {dtls_options, dtls_opts()}]. gen_tcp:close(Sock);
close({udp, Sock}) ->
% tcp_opts() -> gen_udp:close(Sock);
% [{send_timeout, 15000}, close({ssl, Sock}) ->
% {send_timeout_close, true}, ssl:close(Sock);
% {backlog, 100}, close({dtls, Sock}) ->
% {nodelay, true} | udp_opts()]. ssl:close(Sock).
% udp_opts() -> %%--------------------------------------------------------------------
% [{recbuf, 1024}, %% Server-Opts
% {sndbuf, 1024},
% {buffer, 1024}, socketopts(tcp) ->
% {reuseaddr, true}]. [{tcp_options, tcp_opts()}];
socketopts(ssl) ->
% ssl_opts() -> [{tcp_options, tcp_opts()},
% Certs = certs("key.pem", "cert.pem", "cacert.pem"), {ssl_options, ssl_opts()}];
% [{versions, emqx_tls_lib:default_versions()}, socketopts(udp) ->
% {ciphers, emqx_tls_lib:default_ciphers()}, [{udp_options, udp_opts()}];
% {verify, verify_peer}, socketopts(dtls) ->
% {fail_if_no_peer_cert, true}, [{udp_options, udp_opts()},
% {secure_renegotiate, false}, {dtls_options, dtls_opts()}].
% {reuse_sessions, true},
% {honor_cipher_order, true}]++Certs. tcp_opts() ->
[{send_timeout, 15000},
% dtls_opts() -> {send_timeout_close, true},
% Opts = ssl_opts(), {backlog, 100},
% lists:keyreplace(versions, 1, Opts, {versions, ['dtlsv1.2', 'dtlsv1']}). {nodelay, true} | udp_opts()].
% %%-------------------------------------------------------------------- udp_opts() ->
% %% Client-Opts [{recbuf, 1024},
{sndbuf, 1024},
% client_ssl_opts() -> {buffer, 1024},
% certs( "client-key.pem", "client-cert.pem", "cacert.pem" ). {reuseaddr, true}].
% certs( Key, Cert, CACert ) -> ssl_opts() ->
% CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"), Certs = certs("key.pem", "cert.pem", "cacert.pem"),
% [ { keyfile, filename:join([ CertsPath, Key ]) }, [{versions, emqx_tls_lib:default_versions()},
% { certfile, filename:join([ CertsPath, Cert ]) }, {ciphers, emqx_tls_lib:default_ciphers()},
% { cacertfile, filename:join([ CertsPath, CACert ]) } ]. {verify, verify_peer},
{fail_if_no_peer_cert, true},
{secure_renegotiate, false},
{reuse_sessions, true},
{honor_cipher_order, true}]++Certs.
dtls_opts() ->
Opts = ssl_opts(),
lists:keyreplace(versions, 1, Opts, {versions, ['dtlsv1.2', 'dtlsv1']}).
%%--------------------------------------------------------------------
%% Client-Opts
client_ssl_opts() ->
certs( "client-key.pem", "client-cert.pem", "cacert.pem" ).
certs( Key, Cert, CACert ) ->
CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"),
[ { keyfile, filename:join([ CertsPath, Key ]) },
{ certfile, filename:join([ CertsPath, Cert ]) },
{ cacertfile, filename:join([ CertsPath, CACert ]) } ].

View File

@ -16,263 +16,263 @@
-module(emqx_exproto_echo_svr). -module(emqx_exproto_echo_svr).
% -behavior(emqx_exproto_v_1_connection_handler_bhvr). -behavior(emqx_exproto_v_1_connection_handler_bhvr).
% -export([ start/0 -export([ start/0
% , stop/1 , stop/1
% ]). ]).
% -export([ frame_connect/2 -export([ frame_connect/2
% , frame_connack/1 , frame_connack/1
% , frame_publish/3 , frame_publish/3
% , frame_puback/1 , frame_puback/1
% , frame_subscribe/2 , frame_subscribe/2
% , frame_suback/1 , frame_suback/1
% , frame_unsubscribe/1 , frame_unsubscribe/1
% , frame_unsuback/1 , frame_unsuback/1
% , frame_disconnect/0 , frame_disconnect/0
% ]). ]).
% -export([ on_socket_created/2 -export([ on_socket_created/2
% , on_received_bytes/2 , on_received_bytes/2
% , on_socket_closed/2 , on_socket_closed/2
% , on_timer_timeout/2 , on_timer_timeout/2
% , on_received_messages/2 , on_received_messages/2
% ]). ]).
% -define(LOG(Fmt, Args), io:format(standard_error, Fmt, Args)). -define(LOG(Fmt, Args), io:format(standard_error, Fmt, Args)).
% -define(HTTP, #{grpc_opts => #{service_protos => [emqx_exproto_pb], -define(HTTP, #{grpc_opts => #{service_protos => [emqx_exproto_pb],
% services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}}, services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}},
% listen_opts => #{port => 9001, listen_opts => #{port => 9001,
% socket_options => []}, socket_options => []},
% pool_opts => #{size => 8}, pool_opts => #{size => 8},
% transport_opts => #{ssl => false}}). transport_opts => #{ssl => false}}).
% -define(CLIENT, emqx_exproto_v_1_connection_adapter_client). -define(CLIENT, emqx_exproto_v_1_connection_adapter_client).
% -define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})). -define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})).
% -define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})). -define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})).
% -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})).
% -define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). -define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})).
% -define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). -define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})).
% -define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). -define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})).
% -define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). -define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})).
% -define(TYPE_CONNECT, 1). -define(TYPE_CONNECT, 1).
% -define(TYPE_CONNACK, 2). -define(TYPE_CONNACK, 2).
% -define(TYPE_PUBLISH, 3). -define(TYPE_PUBLISH, 3).
% -define(TYPE_PUBACK, 4). -define(TYPE_PUBACK, 4).
% -define(TYPE_SUBSCRIBE, 5). -define(TYPE_SUBSCRIBE, 5).
% -define(TYPE_SUBACK, 6). -define(TYPE_SUBACK, 6).
% -define(TYPE_UNSUBSCRIBE, 7). -define(TYPE_UNSUBSCRIBE, 7).
% -define(TYPE_UNSUBACK, 8). -define(TYPE_UNSUBACK, 8).
% -define(TYPE_DISCONNECT, 9). -define(TYPE_DISCONNECT, 9).
% -define(loop_recv_and_reply_empty_success(Stream), -define(loop_recv_and_reply_empty_success(Stream),
% ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)). ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)).
% -define(loop_recv_and_reply_empty_success(Stream, Fun), -define(loop_recv_and_reply_empty_success(Stream, Fun),
% begin begin
% LoopRecv = fun _Lp(_St) -> LoopRecv = fun _Lp(_St) ->
% case grpc_stream:recv(_St) of case grpc_stream:recv(_St) of
% {more, _Reqs, _NSt} -> {more, _Reqs, _NSt} ->
% ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]),
% Fun(_Reqs), _Lp(_NSt); Fun(_Reqs), _Lp(_NSt);
% {eos, _Reqs, _NSt} -> {eos, _Reqs, _NSt} ->
% ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]),
% Fun(_Reqs), _NSt Fun(_Reqs), _NSt
% end end
% end, end,
% NStream = LoopRecv(Stream), NStream = LoopRecv(Stream),
% grpc_stream:reply(NStream, #{}), grpc_stream:reply(NStream, #{}),
% {ok, NStream} {ok, NStream}
% end). end).
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% %% APIs %% APIs
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% start() -> start() ->
% application:ensure_all_started(grpc), application:ensure_all_started(grpc),
% [start_channel(), start_server()]. [start_channel(), start_server()].
% start_channel() -> start_channel() ->
% grpc_client_sup:create_channel_pool(ct_test_channel, "http://127.0.0.1:9100", #{}). grpc_client_sup:create_channel_pool(ct_test_channel, "http://127.0.0.1:9100", #{}).
% start_server() -> start_server() ->
% Services = #{protos => [emqx_exproto_pb], Services = #{protos => [emqx_exproto_pb],
% services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE} services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}
% }, },
% Options = [], Options = [],
% grpc:start_server(?MODULE, 9001, Services, Options). grpc:start_server(?MODULE, 9001, Services, Options).
% stop([_ChannPid, _SvrPid]) -> stop([_ChannPid, _SvrPid]) ->
% grpc:stop_server(?MODULE), grpc:stop_server(?MODULE),
% grpc_client_sup:stop_channel_pool(ct_test_channel). grpc_client_sup:stop_channel_pool(ct_test_channel).
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% %% Protocol Adapter callbacks %% Protocol Adapter callbacks
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% -spec on_socket_created(grpc_stream:stream(), grpc:metadata()) -spec on_socket_created(grpc_stream:stream(), grpc:metadata())
% -> {ok, grpc_stream:stream()}. -> {ok, grpc_stream:stream()}.
% on_socket_created(Stream, _Md) -> on_socket_created(Stream, _Md) ->
% ?loop_recv_and_reply_empty_success(Stream). ?loop_recv_and_reply_empty_success(Stream).
% -spec on_socket_closed(grpc_stream:stream(), grpc:metadata()) -spec on_socket_closed(grpc_stream:stream(), grpc:metadata())
% -> {ok, grpc_stream:stream()}. -> {ok, grpc_stream:stream()}.
% on_socket_closed(Stream, _Md) -> on_socket_closed(Stream, _Md) ->
% ?loop_recv_and_reply_empty_success(Stream). ?loop_recv_and_reply_empty_success(Stream).
% -spec on_received_bytes(grpc_stream:stream(), grpc:metadata()) -spec on_received_bytes(grpc_stream:stream(), grpc:metadata())
% -> {ok, grpc_stream:stream()}. -> {ok, grpc_stream:stream()}.
% on_received_bytes(Stream, _Md) -> on_received_bytes(Stream, _Md) ->
% ?loop_recv_and_reply_empty_success(Stream, ?loop_recv_and_reply_empty_success(Stream,
% fun(Reqs) -> fun(Reqs) ->
% lists:foreach( lists:foreach(
% fun(#{conn := Conn, bytes := Bytes}) -> fun(#{conn := Conn, bytes := Bytes}) ->
% #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]), #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]),
% _ = handle_in(Conn, Type, Params) _ = handle_in(Conn, Type, Params)
% end, Reqs) end, Reqs)
% end). end).
% -spec on_timer_timeout(grpc_stream:stream(), grpc:metadata()) -spec on_timer_timeout(grpc_stream:stream(), grpc:metadata())
% -> {ok, grpc_stream:stream()}. -> {ok, grpc_stream:stream()}.
% on_timer_timeout(Stream, _Md) -> on_timer_timeout(Stream, _Md) ->
% ?loop_recv_and_reply_empty_success(Stream, ?loop_recv_and_reply_empty_success(Stream,
% fun(Reqs) -> fun(Reqs) ->
% lists:foreach( lists:foreach(
% fun(#{conn := Conn, type := 'KEEPALIVE'}) -> fun(#{conn := Conn, type := 'KEEPALIVE'}) ->
% ?LOG("Close this connection ~p due to keepalive timeout", [Conn]), ?LOG("Close this connection ~p due to keepalive timeout", [Conn]),
% handle_out(Conn, ?TYPE_DISCONNECT), handle_out(Conn, ?TYPE_DISCONNECT),
% ?close(#{conn => Conn}) ?close(#{conn => Conn})
% end, Reqs) end, Reqs)
% end). end).
% -spec on_received_messages(grpc_stream:stream(), grpc:metadata()) -spec on_received_messages(grpc_stream:stream(), grpc:metadata())
% -> {ok, grpc_stream:stream()}. -> {ok, grpc_stream:stream()}.
% on_received_messages(Stream, _Md) -> on_received_messages(Stream, _Md) ->
% ?loop_recv_and_reply_empty_success(Stream, ?loop_recv_and_reply_empty_success(Stream,
% fun(Reqs) -> fun(Reqs) ->
% lists:foreach( lists:foreach(
% fun(#{conn := Conn, messages := Messages}) -> fun(#{conn := Conn, messages := Messages}) ->
% lists:foreach(fun(Message) -> lists:foreach(fun(Message) ->
% handle_out(Conn, ?TYPE_PUBLISH, Message) handle_out(Conn, ?TYPE_PUBLISH, Message)
% end, Messages) end, Messages)
% end, Reqs) end, Reqs)
% end). end).
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% %% The Protocol Example: %% The Protocol Example:
% %% CONN: %% CONN:
% %% {"type": 1, "clientinfo": {...}} %% {"type": 1, "clientinfo": {...}}
% %% %%
% %% CONNACK: %% CONNACK:
% %% {"type": 2, "code": 0} %% {"type": 2, "code": 0}
% %% %%
% %% PUBLISH: %% PUBLISH:
% %% {"type": 3, "topic": "xxx", "payload": "", "qos": 0} %% {"type": 3, "topic": "xxx", "payload": "", "qos": 0}
% %% %%
% %% PUBACK: %% PUBACK:
% %% {"type": 4, "code": 0} %% {"type": 4, "code": 0}
% %% %%
% %% SUBSCRIBE: %% SUBSCRIBE:
% %% {"type": 5, "topic": "xxx", "qos": 1} %% {"type": 5, "topic": "xxx", "qos": 1}
% %% %%
% %% SUBACK: %% SUBACK:
% %% {"type": 6, "code": 0} %% {"type": 6, "code": 0}
% %% %%
% %% DISCONNECT: %% DISCONNECT:
% %% {"type": 7, "code": 1} %% {"type": 7, "code": 1}
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% handle_in(Conn, ?TYPE_CONNECT, #{<<"clientinfo">> := ClientInfo, <<"password">> := Password}) -> handle_in(Conn, ?TYPE_CONNECT, #{<<"clientinfo">> := ClientInfo, <<"password">> := Password}) ->
% NClientInfo = maps:from_list([{binary_to_atom(K, utf8), V} || {K, V} <- maps:to_list(ClientInfo)]), NClientInfo = maps:from_list([{binary_to_atom(K, utf8), V} || {K, V} <- maps:to_list(ClientInfo)]),
% case ?authenticate(#{conn => Conn, clientinfo => NClientInfo, password => Password}) of case ?authenticate(#{conn => Conn, clientinfo => NClientInfo, password => Password}) of
% {ok, #{code := 'SUCCESS'}, _} -> {ok, #{code := 'SUCCESS'}, _} ->
% case maps:get(keepalive, NClientInfo, 0) of case maps:get(keepalive, NClientInfo, 0) of
% 0 -> ok; 0 -> ok;
% Intv -> Intv ->
% io:format("Try call start_timer with ~ps", [Intv]), io:format("Try call start_timer with ~ps", [Intv]),
% ?start_timer(#{conn => Conn, type => 'KEEPALIVE', interval => Intv}) ?start_timer(#{conn => Conn, type => 'KEEPALIVE', interval => Intv})
% end, end,
% handle_out(Conn, ?TYPE_CONNACK, 0); handle_out(Conn, ?TYPE_CONNACK, 0);
% _ -> _ ->
% handle_out(Conn, ?TYPE_CONNACK, 1), handle_out(Conn, ?TYPE_CONNACK, 1),
% ?close(#{conn => Conn}) ?close(#{conn => Conn})
% end; end;
% handle_in(Conn, ?TYPE_PUBLISH, #{<<"topic">> := Topic, handle_in(Conn, ?TYPE_PUBLISH, #{<<"topic">> := Topic,
% <<"qos">> := Qos, <<"qos">> := Qos,
% <<"payload">> := Payload}) -> <<"payload">> := Payload}) ->
% case ?publish(#{conn => Conn, topic => Topic, qos => Qos, payload => Payload}) of case ?publish(#{conn => Conn, topic => Topic, qos => Qos, payload => Payload}) of
% {ok, #{code := 'SUCCESS'}, _} -> {ok, #{code := 'SUCCESS'}, _} ->
% handle_out(Conn, ?TYPE_PUBACK, 0); handle_out(Conn, ?TYPE_PUBACK, 0);
% _ -> _ ->
% handle_out(Conn, ?TYPE_PUBACK, 1) handle_out(Conn, ?TYPE_PUBACK, 1)
% end; end;
% handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) -> handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) ->
% case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of
% {ok, #{code := 'SUCCESS'}, _} -> {ok, #{code := 'SUCCESS'}, _} ->
% handle_out(Conn, ?TYPE_SUBACK, 0); handle_out(Conn, ?TYPE_SUBACK, 0);
% _ -> _ ->
% handle_out(Conn, ?TYPE_SUBACK, 1) handle_out(Conn, ?TYPE_SUBACK, 1)
% end; end;
% handle_in(Conn, ?TYPE_UNSUBSCRIBE, #{<<"topic">> := Topic}) -> handle_in(Conn, ?TYPE_UNSUBSCRIBE, #{<<"topic">> := Topic}) ->
% case ?unsubscribe(#{conn => Conn, topic => Topic}) of case ?unsubscribe(#{conn => Conn, topic => Topic}) of
% {ok, #{code := 'SUCCESS'}, _} -> {ok, #{code := 'SUCCESS'}, _} ->
% handle_out(Conn, ?TYPE_UNSUBACK, 0); handle_out(Conn, ?TYPE_UNSUBACK, 0);
% _ -> _ ->
% handle_out(Conn, ?TYPE_UNSUBACK, 1) handle_out(Conn, ?TYPE_UNSUBACK, 1)
% end; end;
% handle_in(Conn, ?TYPE_DISCONNECT, _) -> handle_in(Conn, ?TYPE_DISCONNECT, _) ->
% ?close(#{conn => Conn}). ?close(#{conn => Conn}).
% handle_out(Conn, ?TYPE_CONNACK, Code) -> handle_out(Conn, ?TYPE_CONNACK, Code) ->
% ?send(#{conn => Conn, bytes => frame_connack(Code)}); ?send(#{conn => Conn, bytes => frame_connack(Code)});
% handle_out(Conn, ?TYPE_PUBACK, Code) -> handle_out(Conn, ?TYPE_PUBACK, Code) ->
% ?send(#{conn => Conn, bytes => frame_puback(Code)}); ?send(#{conn => Conn, bytes => frame_puback(Code)});
% handle_out(Conn, ?TYPE_SUBACK, Code) -> handle_out(Conn, ?TYPE_SUBACK, Code) ->
% ?send(#{conn => Conn, bytes => frame_suback(Code)}); ?send(#{conn => Conn, bytes => frame_suback(Code)});
% handle_out(Conn, ?TYPE_UNSUBACK, Code) -> handle_out(Conn, ?TYPE_UNSUBACK, Code) ->
% ?send(#{conn => Conn, bytes => frame_unsuback(Code)}); ?send(#{conn => Conn, bytes => frame_unsuback(Code)});
% handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) ->
% ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}). ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}).
% handle_out(Conn, ?TYPE_DISCONNECT) -> handle_out(Conn, ?TYPE_DISCONNECT) ->
% ?send(#{conn => Conn, bytes => frame_disconnect()}). ?send(#{conn => Conn, bytes => frame_disconnect()}).
% %%-------------------------------------------------------------------- %%--------------------------------------------------------------------
% %% Frame %% Frame
% frame_connect(ClientInfo, Password) -> frame_connect(ClientInfo, Password) ->
% emqx_json:encode(#{type => ?TYPE_CONNECT, emqx_json:encode(#{type => ?TYPE_CONNECT,
% clientinfo => ClientInfo, clientinfo => ClientInfo,
% password => Password}). password => Password}).
% frame_connack(Code) -> frame_connack(Code) ->
% emqx_json:encode(#{type => ?TYPE_CONNACK, code => Code}). emqx_json:encode(#{type => ?TYPE_CONNACK, code => Code}).
% frame_publish(Topic, Qos, Payload) -> frame_publish(Topic, Qos, Payload) ->
% emqx_json:encode(#{type => ?TYPE_PUBLISH, emqx_json:encode(#{type => ?TYPE_PUBLISH,
% topic => Topic, topic => Topic,
% qos => Qos, qos => Qos,
% payload => Payload}). payload => Payload}).
% frame_puback(Code) -> frame_puback(Code) ->
% emqx_json:encode(#{type => ?TYPE_PUBACK, code => Code}). emqx_json:encode(#{type => ?TYPE_PUBACK, code => Code}).
% frame_subscribe(Topic, Qos) -> frame_subscribe(Topic, Qos) ->
% emqx_json:encode(#{type => ?TYPE_SUBSCRIBE, topic => Topic, qos => Qos}). emqx_json:encode(#{type => ?TYPE_SUBSCRIBE, topic => Topic, qos => Qos}).
% frame_suback(Code) -> frame_suback(Code) ->
% emqx_json:encode(#{type => ?TYPE_SUBACK, code => Code}). emqx_json:encode(#{type => ?TYPE_SUBACK, code => Code}).
% frame_unsubscribe(Topic) -> frame_unsubscribe(Topic) ->
% emqx_json:encode(#{type => ?TYPE_UNSUBSCRIBE, topic => Topic}). emqx_json:encode(#{type => ?TYPE_UNSUBSCRIBE, topic => Topic}).
% frame_unsuback(Code) -> frame_unsuback(Code) ->
% emqx_json:encode(#{type => ?TYPE_UNSUBACK, code => Code}). emqx_json:encode(#{type => ?TYPE_UNSUBACK, code => Code}).
% frame_disconnect() -> frame_disconnect() ->
% emqx_json:encode(#{type => ?TYPE_DISCONNECT}). emqx_json:encode(#{type => ?TYPE_DISCONNECT}).