From 7f1d245a8f5857c24984be969d2326509fed6977 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 22 Jul 2021 22:05:53 +0800 Subject: [PATCH] test(gw): refine exproto tests --- apps/emqx_gateway/test/emqx_exproto_SUITE.erl | 823 +++++++++--------- .../test/emqx_exproto_echo_svr.erl | 450 +++++----- 2 files changed, 641 insertions(+), 632 deletions(-) diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 802204c14..221d4d7f4 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -16,439 +16,448 @@ -module(emqx_exproto_SUITE). -% -compile(export_all). -% -compile(nowarn_export_all). - -% -import(emqx_exproto_echo_svr, -% [ frame_connect/2 -% , frame_connack/1 -% , frame_publish/3 -% , frame_puback/1 -% , frame_subscribe/2 -% , frame_suback/1 -% , frame_unsubscribe/1 -% , frame_unsuback/1 -% , frame_disconnect/0 -% ]). - -% -include_lib("emqx/include/emqx.hrl"). -% -include_lib("emqx/include/emqx_mqtt.hrl"). - -% -define(TCPOPTS, [binary, {active, false}]). -% -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). - -% %%-------------------------------------------------------------------- -% %% Setups -% %%-------------------------------------------------------------------- - -% all() -> -% [{group, Name} || Name <- metrics()]. - -% groups() -> -% Cases = emqx_ct:all(?MODULE), -% [{Name, Cases} || Name <- metrics()]. - -% %% @private -% metrics() -> -% [tcp, ssl, udp, dtls]. - -% init_per_group(GrpName, Cfg) -> -% put(grpname, GrpName), -% Svrs = emqx_exproto_echo_svr:start(), -% emqx_ct_helpers:start_apps([emqx_exproto], fun set_special_cfg/1), -% emqx_logger:set_log_level(debug), -% [{servers, Svrs}, {listener_type, GrpName} | Cfg]. - -% end_per_group(_, Cfg) -> -% emqx_ct_helpers:stop_apps([emqx_exproto]), -% emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). - -% set_special_cfg(emqx_exproto) -> -% LisType = get(grpname), -% Listeners = application:get_env(emqx_exproto, listeners, []), -% SockOpts = socketopts(LisType), -% UpgradeOpts = fun(Opts) -> -% Opts2 = lists:keydelete(tcp_options, 1, Opts), -% Opts3 = lists:keydelete(ssl_options, 1, Opts2), -% Opts4 = lists:keydelete(udp_options, 1, Opts3), -% Opts5 = lists:keydelete(dtls_options, 1, Opts4), -% SockOpts ++ Opts5 -% end, -% NListeners = [{Proto, LisType, LisOn, UpgradeOpts(Opts)} -% || {Proto, _Type, LisOn, Opts} <- Listeners], -% application:set_env(emqx_exproto, listeners, NListeners); -% set_special_cfg(emqx) -> -% application:set_env(emqx, allow_anonymous, true), -% application:set_env(emqx, enable_acl_cache, false), -% ok. - -% %%-------------------------------------------------------------------- -% %% Tests cases -% %%-------------------------------------------------------------------- - -% t_start_stop(_) -> -% ok. - -% t_mountpoint_echo(Cfg) -> -% SockType = proplists:get_value(listener_type, Cfg), -% Sock = open(SockType), - -% Client = #{proto_name => <<"demo">>, -% proto_ver => <<"v0.1">>, -% clientid => <<"test_client_1">>, -% mountpoint => <<"ct/">> -% }, -% Password = <<"123456">>, - -% ConnBin = frame_connect(Client, Password), -% ConnAckBin = frame_connack(0), - -% send(Sock, ConnBin), -% {ok, ConnAckBin} = recv(Sock, 5000), - -% SubBin = frame_subscribe(<<"t/dn">>, 1), -% SubAckBin = frame_suback(0), - -% send(Sock, SubBin), -% {ok, SubAckBin} = recv(Sock, 5000), - -% emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)), -% PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>), -% {ok, PubBin1} = recv(Sock, 5000), - -% PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>), -% PubAckBin = frame_puback(0), - -% emqx:subscribe(<<"ct/t/up">>), - -% send(Sock, PubBin2), -% {ok, PubAckBin} = recv(Sock, 5000), - -% receive -% {deliver, _, _} -> ok -% after 1000 -> -% error(echo_not_running) -% end, -% close(Sock). - -% t_auth_deny(Cfg) -> -% SockType = proplists:get_value(listener_type, Cfg), -% Sock = open(SockType), - -% Client = #{proto_name => <<"demo">>, -% proto_ver => <<"v0.1">>, -% clientid => <<"test_client_1">> -% }, -% Password = <<"123456">>, - -% ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), -% ok = meck:expect(emqx_access_control, authenticate, -% fun(_) -> {error, ?RC_NOT_AUTHORIZED} end), - -% ConnBin = frame_connect(Client, Password), -% ConnAckBin = frame_connack(1), - -% send(Sock, ConnBin), -% {ok, ConnAckBin} = recv(Sock, 5000), - -% SockType =/= udp andalso begin -% {error, closed} = recv(Sock, 5000) -% end, -% meck:unload([emqx_access_control]). - -% t_acl_deny(Cfg) -> -% SockType = proplists:get_value(listener_type, Cfg), -% Sock = open(SockType), - -% Client = #{proto_name => <<"demo">>, -% proto_ver => <<"v0.1">>, -% clientid => <<"test_client_1">> -% }, -% Password = <<"123456">>, - -% ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), -% ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), - -% ConnBin = frame_connect(Client, Password), -% ConnAckBin = frame_connack(0), - -% send(Sock, ConnBin), -% {ok, ConnAckBin} = recv(Sock, 5000), +-compile(export_all). +-compile(nowarn_export_all). + +-import(emqx_exproto_echo_svr, + [ frame_connect/2 + , frame_connack/1 + , frame_publish/3 + , frame_puback/1 + , frame_subscribe/2 + , frame_suback/1 + , frame_unsubscribe/1 + , frame_unsuback/1 + , frame_disconnect/0 + ]). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +-define(TCPOPTS, [binary, {active, false}]). +-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + [{group, Name} || Name <- metrics()]. + +groups() -> + Cases = emqx_ct:all(?MODULE), + [{Name, Cases} || Name <- metrics()]. + +%% @private +metrics() -> + [tcp, ssl, udp, dtls]. + +init_per_group(GrpName, Cfg) -> + put(grpname, GrpName), + Svrs = emqx_exproto_echo_svr:start(), + emqx_ct_helpers:start_apps([emqx_gateway], fun set_special_cfg/1), + emqx_logger:set_log_level(debug), + [{servers, Svrs}, {listener_type, GrpName} | Cfg]. + +end_per_group(_, Cfg) -> + emqx_ct_helpers:stop_apps([emqx_gateway]), + emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). + +set_special_cfg(emqx_gateway) -> + LisType = get(grpname), + emqx_config:put( + [emqx_gateway, exproto], + #{'1' => + #{authenticator => allow_anonymous, + server => #{bind => 9100}, + handler => #{address => "http://127.0.0.1:9001"}, + listener => listener_confs(LisType) + }}), + + Listeners = application:get_env(emqx_exproto, listeners, []), + SockOpts = socketopts(LisType), + UpgradeOpts = fun(Opts) -> + Opts2 = lists:keydelete(tcp_options, 1, Opts), + Opts3 = lists:keydelete(ssl_options, 1, Opts2), + Opts4 = lists:keydelete(udp_options, 1, Opts3), + Opts5 = lists:keydelete(dtls_options, 1, Opts4), + SockOpts ++ Opts5 + end; + +set_special_cfg(_App) -> + ok. + +listener_confs(Type) -> + Default = #{bind => 7993, acceptors => 8}, + #{Type => #{'1' => maps:merge(Default, maps:from_list(socketopts(Type)))}}. + +%%-------------------------------------------------------------------- +%% Tests cases +%%-------------------------------------------------------------------- + +t_start_stop(_) -> + ok. + +t_mountpoint_echo(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + mountpoint => <<"ct/">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + SubBin = frame_subscribe(<<"t/dn">>, 1), + SubAckBin = frame_suback(0), + + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), + + emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)), + PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>), + {ok, PubBin1} = recv(Sock, 5000), + + PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>), + PubAckBin = frame_puback(0), + + emqx:subscribe(<<"ct/t/up">>), -% SubBin = frame_subscribe(<<"t/#">>, 1), -% SubAckBin = frame_suback(1), + send(Sock, PubBin2), + {ok, PubAckBin} = recv(Sock, 5000), -% send(Sock, SubBin), -% {ok, SubAckBin} = recv(Sock, 5000), + receive + {deliver, _, _} -> ok + after 1000 -> + error(echo_not_running) + end, + close(Sock). + +t_auth_deny(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, authenticate, + fun(_) -> {error, ?RC_NOT_AUTHORIZED} end), + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(1), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + SockType =/= udp andalso begin + {error, closed} = recv(Sock, 5000) + end, + meck:unload([emqx_access_control]). + +t_acl_deny(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, -% emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)), + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), -% PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>), -% PubBinFailedAck = frame_puback(1), -% PubBinSuccesAck = frame_puback(0), + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), -% send(Sock, PubBin), -% {ok, PubBinFailedAck} = recv(Sock, 5000), + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), -% meck:unload([emqx_access_control]), + SubBin = frame_subscribe(<<"t/#">>, 1), + SubAckBin = frame_suback(1), -% send(Sock, PubBin), -% {ok, PubBinSuccesAck} = recv(Sock, 5000), -% close(Sock). + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), -% t_keepalive_timeout(Cfg) -> -% SockType = proplists:get_value(listener_type, Cfg), -% Sock = open(SockType), + emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)), -% Client = #{proto_name => <<"demo">>, -% proto_ver => <<"v0.1">>, -% clientid => <<"test_client_1">>, -% keepalive => 2 -% }, -% Password = <<"123456">>, + PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>), + PubBinFailedAck = frame_puback(1), + PubBinSuccesAck = frame_puback(0), -% ConnBin = frame_connect(Client, Password), -% ConnAckBin = frame_connack(0), + send(Sock, PubBin), + {ok, PubBinFailedAck} = recv(Sock, 5000), -% send(Sock, ConnBin), -% {ok, ConnAckBin} = recv(Sock, 5000), + meck:unload([emqx_access_control]), -% DisconnectBin = frame_disconnect(), -% {ok, DisconnectBin} = recv(Sock, 10000), + send(Sock, PubBin), + {ok, PubBinSuccesAck} = recv(Sock, 5000), + close(Sock). -% SockType =/= udp andalso begin -% {error, closed} = recv(Sock, 5000) -% end, ok. +t_keepalive_timeout(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), -% t_hook_connected_disconnected(Cfg) -> -% SockType = proplists:get_value(listener_type, Cfg), -% Sock = open(SockType), + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + keepalive => 2 + }, + Password = <<"123456">>, -% Client = #{proto_name => <<"demo">>, -% proto_ver => <<"v0.1">>, -% clientid => <<"test_client_1">> -% }, -% Password = <<"123456">>, + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), -% ConnBin = frame_connect(Client, Password), -% ConnAckBin = frame_connack(0), + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), -% Parent = self(), -% emqx:hook('client.connected', {?MODULE, hook_fun1, [Parent]}), -% emqx:hook('client.disconnected',{?MODULE, hook_fun2, [Parent]}), + DisconnectBin = frame_disconnect(), + {ok, DisconnectBin} = recv(Sock, 10000), -% send(Sock, ConnBin), -% {ok, ConnAckBin} = recv(Sock, 5000), + SockType =/= udp andalso begin + {error, closed} = recv(Sock, 5000) + end, ok. -% receive -% connected -> ok -% after 1000 -> -% error(hook_is_not_running) -% end, +t_hook_connected_disconnected(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), -% DisconnectBin = frame_disconnect(), -% send(Sock, DisconnectBin), + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, -% receive -% disconnected -> ok -% after 1000 -> -% error(hook_is_not_running) -% end, + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), -% SockType =/= udp andalso begin -% {error, closed} = recv(Sock, 5000) -% end, -% emqx:unhook('client.connected', {?MODULE, hook_fun1}), -% emqx:unhook('client.disconnected', {?MODULE, hook_fun2}). + Parent = self(), + emqx:hook('client.connected', {?MODULE, hook_fun1, [Parent]}), + emqx:hook('client.disconnected',{?MODULE, hook_fun2, [Parent]}), -% t_hook_session_subscribed_unsubscribed(Cfg) -> -% SockType = proplists:get_value(listener_type, Cfg), -% Sock = open(SockType), + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), -% Client = #{proto_name => <<"demo">>, -% proto_ver => <<"v0.1">>, -% clientid => <<"test_client_1">> -% }, -% Password = <<"123456">>, + receive + connected -> ok + after 1000 -> + error(hook_is_not_running) + end, -% ConnBin = frame_connect(Client, Password), -% ConnAckBin = frame_connack(0), + DisconnectBin = frame_disconnect(), + send(Sock, DisconnectBin), -% send(Sock, ConnBin), -% {ok, ConnAckBin} = recv(Sock, 5000), + receive + disconnected -> ok + after 1000 -> + error(hook_is_not_running) + end, -% Parent = self(), -% emqx:hook('session.subscribed', {?MODULE, hook_fun3, [Parent]}), -% emqx:hook('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}), + SockType =/= udp andalso begin + {error, closed} = recv(Sock, 5000) + end, + emqx:unhook('client.connected', {?MODULE, hook_fun1}), + emqx:unhook('client.disconnected', {?MODULE, hook_fun2}). -% SubBin = frame_subscribe(<<"t/#">>, 1), -% SubAckBin = frame_suback(0), - -% send(Sock, SubBin), -% {ok, SubAckBin} = recv(Sock, 5000), - -% receive -% subscribed -> ok -% after 1000 -> -% error(hook_is_not_running) -% end, - -% UnsubBin = frame_unsubscribe(<<"t/#">>), -% UnsubAckBin = frame_unsuback(0), - -% send(Sock, UnsubBin), -% {ok, UnsubAckBin} = recv(Sock, 5000), - -% receive -% unsubscribed -> ok -% after 1000 -> -% error(hook_is_not_running) -% end, - -% close(Sock), -% emqx:unhook('session.subscribed', {?MODULE, hook_fun3}), -% emqx:unhook('session.unsubscribed', {?MODULE, hook_fun4}). - -% t_hook_message_delivered(Cfg) -> -% SockType = proplists:get_value(listener_type, Cfg), -% Sock = open(SockType), - -% Client = #{proto_name => <<"demo">>, -% proto_ver => <<"v0.1">>, -% clientid => <<"test_client_1">> -% }, -% Password = <<"123456">>, - -% ConnBin = frame_connect(Client, Password), -% ConnAckBin = frame_connack(0), - -% send(Sock, ConnBin), -% {ok, ConnAckBin} = recv(Sock, 5000), - -% SubBin = frame_subscribe(<<"t/#">>, 1), -% SubAckBin = frame_suback(0), - -% send(Sock, SubBin), -% {ok, SubAckBin} = recv(Sock, 5000), - -% emqx:hook('message.delivered', {?MODULE, hook_fun5, []}), - -% emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)), -% PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>), -% {ok, PubBin1} = recv(Sock, 5000), - -% close(Sock), -% emqx:unhook('message.delivered', {?MODULE, hook_fun5}). - -% %%-------------------------------------------------------------------- -% %% Utils - -% hook_fun1(_, _, Parent) -> Parent ! connected, ok. -% hook_fun2(_, _, _, Parent) -> Parent ! disconnected, ok. - -% hook_fun3(_, _, _, Parent) -> Parent ! subscribed, ok. -% hook_fun4(_, _, _, Parent) -> Parent ! unsubscribed, ok. - -% hook_fun5(_, Msg) -> {ok, Msg#message{payload = <<"2">>}}. - -% rand_bytes() -> -% crypto:strong_rand_bytes(rand:uniform(256)). - -% %%-------------------------------------------------------------------- -% %% Sock funcs - -% open(tcp) -> -% {ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS), -% {tcp, Sock}; -% open(udp) -> -% {ok, Sock} = gen_udp:open(0, ?TCPOPTS), -% {udp, Sock}; -% open(ssl) -> -% SslOpts = client_ssl_opts(), -% {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts), -% {ssl, SslSock}; -% open(dtls) -> -% SslOpts = client_ssl_opts(), -% {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts), -% {dtls, SslSock}. - -% send({tcp, Sock}, Bin) -> -% gen_tcp:send(Sock, Bin); -% send({udp, Sock}, Bin) -> -% gen_udp:send(Sock, "127.0.0.1", 7993, Bin); -% send({ssl, Sock}, Bin) -> -% ssl:send(Sock, Bin); -% send({dtls, Sock}, Bin) -> -% ssl:send(Sock, Bin). - -% recv({tcp, Sock}, Ts) -> -% gen_tcp:recv(Sock, 0, Ts); -% recv({udp, Sock}, Ts) -> -% {ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts), -% {ok, Bin}; -% recv({ssl, Sock}, Ts) -> -% ssl:recv(Sock, 0, Ts); -% recv({dtls, Sock}, Ts) -> -% ssl:recv(Sock, 0, Ts). - -% close({tcp, Sock}) -> -% gen_tcp:close(Sock); -% close({udp, Sock}) -> -% gen_udp:close(Sock); -% close({ssl, Sock}) -> -% ssl:close(Sock); -% close({dtls, Sock}) -> -% ssl:close(Sock). - -% %%-------------------------------------------------------------------- -% %% Server-Opts - -% socketopts(tcp) -> -% [{tcp_options, tcp_opts()}]; -% socketopts(ssl) -> -% [{tcp_options, tcp_opts()}, -% {ssl_options, ssl_opts()}]; -% socketopts(udp) -> -% [{udp_options, udp_opts()}]; -% socketopts(dtls) -> -% [{udp_options, udp_opts()}, -% {dtls_options, dtls_opts()}]. - -% tcp_opts() -> -% [{send_timeout, 15000}, -% {send_timeout_close, true}, -% {backlog, 100}, -% {nodelay, true} | udp_opts()]. - -% udp_opts() -> -% [{recbuf, 1024}, -% {sndbuf, 1024}, -% {buffer, 1024}, -% {reuseaddr, true}]. - -% ssl_opts() -> -% Certs = certs("key.pem", "cert.pem", "cacert.pem"), -% [{versions, emqx_tls_lib:default_versions()}, -% {ciphers, emqx_tls_lib:default_ciphers()}, -% {verify, verify_peer}, -% {fail_if_no_peer_cert, true}, -% {secure_renegotiate, false}, -% {reuse_sessions, true}, -% {honor_cipher_order, true}]++Certs. - -% dtls_opts() -> -% Opts = ssl_opts(), -% lists:keyreplace(versions, 1, Opts, {versions, ['dtlsv1.2', 'dtlsv1']}). - -% %%-------------------------------------------------------------------- -% %% Client-Opts - -% client_ssl_opts() -> -% certs( "client-key.pem", "client-cert.pem", "cacert.pem" ). - -% certs( Key, Cert, CACert ) -> -% CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"), -% [ { keyfile, filename:join([ CertsPath, Key ]) }, -% { certfile, filename:join([ CertsPath, Cert ]) }, -% { cacertfile, filename:join([ CertsPath, CACert ]) } ]. +t_hook_session_subscribed_unsubscribed(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + Parent = self(), + emqx:hook('session.subscribed', {?MODULE, hook_fun3, [Parent]}), + emqx:hook('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}), + + SubBin = frame_subscribe(<<"t/#">>, 1), + SubAckBin = frame_suback(0), + + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), + + receive + subscribed -> ok + after 1000 -> + error(hook_is_not_running) + end, + + UnsubBin = frame_unsubscribe(<<"t/#">>), + UnsubAckBin = frame_unsuback(0), + + send(Sock, UnsubBin), + {ok, UnsubAckBin} = recv(Sock, 5000), + + receive + unsubscribed -> ok + after 1000 -> + error(hook_is_not_running) + end, + + close(Sock), + emqx:unhook('session.subscribed', {?MODULE, hook_fun3}), + emqx:unhook('session.unsubscribed', {?MODULE, hook_fun4}). + +t_hook_message_delivered(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + SubBin = frame_subscribe(<<"t/#">>, 1), + SubAckBin = frame_suback(0), + + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), + + emqx:hook('message.delivered', {?MODULE, hook_fun5, []}), + + emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)), + PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>), + {ok, PubBin1} = recv(Sock, 5000), + + close(Sock), + emqx:unhook('message.delivered', {?MODULE, hook_fun5}). + +%%-------------------------------------------------------------------- +%% Utils + +hook_fun1(_, _, Parent) -> Parent ! connected, ok. +hook_fun2(_, _, _, Parent) -> Parent ! disconnected, ok. + +hook_fun3(_, _, _, Parent) -> Parent ! subscribed, ok. +hook_fun4(_, _, _, Parent) -> Parent ! unsubscribed, ok. + +hook_fun5(_, Msg) -> {ok, Msg#message{payload = <<"2">>}}. + +rand_bytes() -> + crypto:strong_rand_bytes(rand:uniform(256)). + +%%-------------------------------------------------------------------- +%% Sock funcs + +open(tcp) -> + {ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS), + {tcp, Sock}; +open(udp) -> + {ok, Sock} = gen_udp:open(0, ?TCPOPTS), + {udp, Sock}; +open(ssl) -> + SslOpts = client_ssl_opts(), + {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts), + {ssl, SslSock}; +open(dtls) -> + SslOpts = client_ssl_opts(), + {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts), + {dtls, SslSock}. + +send({tcp, Sock}, Bin) -> + gen_tcp:send(Sock, Bin); +send({udp, Sock}, Bin) -> + gen_udp:send(Sock, "127.0.0.1", 7993, Bin); +send({ssl, Sock}, Bin) -> + ssl:send(Sock, Bin); +send({dtls, Sock}, Bin) -> + ssl:send(Sock, Bin). + +recv({tcp, Sock}, Ts) -> + gen_tcp:recv(Sock, 0, Ts); +recv({udp, Sock}, Ts) -> + {ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts), + {ok, Bin}; +recv({ssl, Sock}, Ts) -> + ssl:recv(Sock, 0, Ts); +recv({dtls, Sock}, Ts) -> + ssl:recv(Sock, 0, Ts). + +close({tcp, Sock}) -> + gen_tcp:close(Sock); +close({udp, Sock}) -> + gen_udp:close(Sock); +close({ssl, Sock}) -> + ssl:close(Sock); +close({dtls, Sock}) -> + ssl:close(Sock). + +%%-------------------------------------------------------------------- +%% Server-Opts + +socketopts(tcp) -> + [{tcp_options, tcp_opts()}]; +socketopts(ssl) -> + [{tcp_options, tcp_opts()}, + {ssl_options, ssl_opts()}]; +socketopts(udp) -> + [{udp_options, udp_opts()}]; +socketopts(dtls) -> + [{udp_options, udp_opts()}, + {dtls_options, dtls_opts()}]. + +tcp_opts() -> + [{send_timeout, 15000}, + {send_timeout_close, true}, + {backlog, 100}, + {nodelay, true} | udp_opts()]. + +udp_opts() -> + [{recbuf, 1024}, + {sndbuf, 1024}, + {buffer, 1024}, + {reuseaddr, true}]. + +ssl_opts() -> + Certs = certs("key.pem", "cert.pem", "cacert.pem"), + [{versions, emqx_tls_lib:default_versions()}, + {ciphers, emqx_tls_lib:default_ciphers()}, + {verify, verify_peer}, + {fail_if_no_peer_cert, true}, + {secure_renegotiate, false}, + {reuse_sessions, true}, + {honor_cipher_order, true}]++Certs. + +dtls_opts() -> + Opts = ssl_opts(), + lists:keyreplace(versions, 1, Opts, {versions, ['dtlsv1.2', 'dtlsv1']}). + +%%-------------------------------------------------------------------- +%% Client-Opts + +client_ssl_opts() -> + certs( "client-key.pem", "client-cert.pem", "cacert.pem" ). + +certs( Key, Cert, CACert ) -> + CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"), + [ { keyfile, filename:join([ CertsPath, Key ]) }, + { certfile, filename:join([ CertsPath, Cert ]) }, + { cacertfile, filename:join([ CertsPath, CACert ]) } ]. diff --git a/apps/emqx_gateway/test/emqx_exproto_echo_svr.erl b/apps/emqx_gateway/test/emqx_exproto_echo_svr.erl index 653b481ef..69c64d4ca 100644 --- a/apps/emqx_gateway/test/emqx_exproto_echo_svr.erl +++ b/apps/emqx_gateway/test/emqx_exproto_echo_svr.erl @@ -16,263 +16,263 @@ -module(emqx_exproto_echo_svr). -% -behavior(emqx_exproto_v_1_connection_handler_bhvr). +-behavior(emqx_exproto_v_1_connection_handler_bhvr). -% -export([ start/0 -% , stop/1 -% ]). +-export([ start/0 + , stop/1 + ]). -% -export([ frame_connect/2 -% , frame_connack/1 -% , frame_publish/3 -% , frame_puback/1 -% , frame_subscribe/2 -% , frame_suback/1 -% , frame_unsubscribe/1 -% , frame_unsuback/1 -% , frame_disconnect/0 -% ]). +-export([ frame_connect/2 + , frame_connack/1 + , frame_publish/3 + , frame_puback/1 + , frame_subscribe/2 + , frame_suback/1 + , frame_unsubscribe/1 + , frame_unsuback/1 + , frame_disconnect/0 + ]). -% -export([ on_socket_created/2 -% , on_received_bytes/2 -% , on_socket_closed/2 -% , on_timer_timeout/2 -% , on_received_messages/2 -% ]). +-export([ on_socket_created/2 + , on_received_bytes/2 + , on_socket_closed/2 + , on_timer_timeout/2 + , on_received_messages/2 + ]). -% -define(LOG(Fmt, Args), io:format(standard_error, Fmt, Args)). +-define(LOG(Fmt, Args), io:format(standard_error, Fmt, Args)). -% -define(HTTP, #{grpc_opts => #{service_protos => [emqx_exproto_pb], -% services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}}, -% listen_opts => #{port => 9001, -% socket_options => []}, -% pool_opts => #{size => 8}, -% transport_opts => #{ssl => false}}). +-define(HTTP, #{grpc_opts => #{service_protos => [emqx_exproto_pb], + services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}}, + listen_opts => #{port => 9001, + socket_options => []}, + pool_opts => #{size => 8}, + transport_opts => #{ssl => false}}). -% -define(CLIENT, emqx_exproto_v_1_connection_adapter_client). +-define(CLIENT, emqx_exproto_v_1_connection_adapter_client). -% -define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})). -% -define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})). -% -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). -% -define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). -% -define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). -% -define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). -% -define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). +-define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})). +-define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})). +-define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). +-define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). +-define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). +-define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). +-define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). -% -define(TYPE_CONNECT, 1). -% -define(TYPE_CONNACK, 2). -% -define(TYPE_PUBLISH, 3). -% -define(TYPE_PUBACK, 4). -% -define(TYPE_SUBSCRIBE, 5). -% -define(TYPE_SUBACK, 6). -% -define(TYPE_UNSUBSCRIBE, 7). -% -define(TYPE_UNSUBACK, 8). -% -define(TYPE_DISCONNECT, 9). +-define(TYPE_CONNECT, 1). +-define(TYPE_CONNACK, 2). +-define(TYPE_PUBLISH, 3). +-define(TYPE_PUBACK, 4). +-define(TYPE_SUBSCRIBE, 5). +-define(TYPE_SUBACK, 6). +-define(TYPE_UNSUBSCRIBE, 7). +-define(TYPE_UNSUBACK, 8). +-define(TYPE_DISCONNECT, 9). -% -define(loop_recv_and_reply_empty_success(Stream), -% ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)). +-define(loop_recv_and_reply_empty_success(Stream), + ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)). -% -define(loop_recv_and_reply_empty_success(Stream, Fun), -% begin -% LoopRecv = fun _Lp(_St) -> -% case grpc_stream:recv(_St) of -% {more, _Reqs, _NSt} -> -% ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), -% Fun(_Reqs), _Lp(_NSt); -% {eos, _Reqs, _NSt} -> -% ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), -% Fun(_Reqs), _NSt -% end -% end, -% NStream = LoopRecv(Stream), -% grpc_stream:reply(NStream, #{}), -% {ok, NStream} -% end). +-define(loop_recv_and_reply_empty_success(Stream, Fun), + begin + LoopRecv = fun _Lp(_St) -> + case grpc_stream:recv(_St) of + {more, _Reqs, _NSt} -> + ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), + Fun(_Reqs), _Lp(_NSt); + {eos, _Reqs, _NSt} -> + ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), + Fun(_Reqs), _NSt + end + end, + NStream = LoopRecv(Stream), + grpc_stream:reply(NStream, #{}), + {ok, NStream} + end). -% %%-------------------------------------------------------------------- -% %% APIs -% %%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- -% start() -> -% application:ensure_all_started(grpc), -% [start_channel(), start_server()]. +start() -> + application:ensure_all_started(grpc), + [start_channel(), start_server()]. -% start_channel() -> -% grpc_client_sup:create_channel_pool(ct_test_channel, "http://127.0.0.1:9100", #{}). +start_channel() -> + grpc_client_sup:create_channel_pool(ct_test_channel, "http://127.0.0.1:9100", #{}). -% start_server() -> -% Services = #{protos => [emqx_exproto_pb], -% services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE} -% }, -% Options = [], -% grpc:start_server(?MODULE, 9001, Services, Options). +start_server() -> + Services = #{protos => [emqx_exproto_pb], + services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE} + }, + Options = [], + grpc:start_server(?MODULE, 9001, Services, Options). -% stop([_ChannPid, _SvrPid]) -> -% grpc:stop_server(?MODULE), -% grpc_client_sup:stop_channel_pool(ct_test_channel). +stop([_ChannPid, _SvrPid]) -> + grpc:stop_server(?MODULE), + grpc_client_sup:stop_channel_pool(ct_test_channel). -% %%-------------------------------------------------------------------- -% %% Protocol Adapter callbacks -% %%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% Protocol Adapter callbacks +%%-------------------------------------------------------------------- -% -spec on_socket_created(grpc_stream:stream(), grpc:metadata()) -% -> {ok, grpc_stream:stream()}. -% on_socket_created(Stream, _Md) -> -% ?loop_recv_and_reply_empty_success(Stream). +-spec on_socket_created(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_socket_created(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream). -% -spec on_socket_closed(grpc_stream:stream(), grpc:metadata()) -% -> {ok, grpc_stream:stream()}. -% on_socket_closed(Stream, _Md) -> -% ?loop_recv_and_reply_empty_success(Stream). +-spec on_socket_closed(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_socket_closed(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream). -% -spec on_received_bytes(grpc_stream:stream(), grpc:metadata()) -% -> {ok, grpc_stream:stream()}. -% on_received_bytes(Stream, _Md) -> -% ?loop_recv_and_reply_empty_success(Stream, -% fun(Reqs) -> -% lists:foreach( -% fun(#{conn := Conn, bytes := Bytes}) -> -% #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]), -% _ = handle_in(Conn, Type, Params) -% end, Reqs) -% end). +-spec on_received_bytes(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_received_bytes(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream, + fun(Reqs) -> + lists:foreach( + fun(#{conn := Conn, bytes := Bytes}) -> + #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]), + _ = handle_in(Conn, Type, Params) + end, Reqs) + end). -% -spec on_timer_timeout(grpc_stream:stream(), grpc:metadata()) -% -> {ok, grpc_stream:stream()}. -% on_timer_timeout(Stream, _Md) -> -% ?loop_recv_and_reply_empty_success(Stream, -% fun(Reqs) -> -% lists:foreach( -% fun(#{conn := Conn, type := 'KEEPALIVE'}) -> -% ?LOG("Close this connection ~p due to keepalive timeout", [Conn]), -% handle_out(Conn, ?TYPE_DISCONNECT), -% ?close(#{conn => Conn}) -% end, Reqs) -% end). +-spec on_timer_timeout(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_timer_timeout(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream, + fun(Reqs) -> + lists:foreach( + fun(#{conn := Conn, type := 'KEEPALIVE'}) -> + ?LOG("Close this connection ~p due to keepalive timeout", [Conn]), + handle_out(Conn, ?TYPE_DISCONNECT), + ?close(#{conn => Conn}) + end, Reqs) + end). -% -spec on_received_messages(grpc_stream:stream(), grpc:metadata()) -% -> {ok, grpc_stream:stream()}. -% on_received_messages(Stream, _Md) -> -% ?loop_recv_and_reply_empty_success(Stream, -% fun(Reqs) -> -% lists:foreach( -% fun(#{conn := Conn, messages := Messages}) -> -% lists:foreach(fun(Message) -> -% handle_out(Conn, ?TYPE_PUBLISH, Message) -% end, Messages) -% end, Reqs) -% end). +-spec on_received_messages(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_received_messages(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream, + fun(Reqs) -> + lists:foreach( + fun(#{conn := Conn, messages := Messages}) -> + lists:foreach(fun(Message) -> + handle_out(Conn, ?TYPE_PUBLISH, Message) + end, Messages) + end, Reqs) + end). -% %%-------------------------------------------------------------------- -% %% The Protocol Example: -% %% CONN: -% %% {"type": 1, "clientinfo": {...}} -% %% -% %% CONNACK: -% %% {"type": 2, "code": 0} -% %% -% %% PUBLISH: -% %% {"type": 3, "topic": "xxx", "payload": "", "qos": 0} -% %% -% %% PUBACK: -% %% {"type": 4, "code": 0} -% %% -% %% SUBSCRIBE: -% %% {"type": 5, "topic": "xxx", "qos": 1} -% %% -% %% SUBACK: -% %% {"type": 6, "code": 0} -% %% -% %% DISCONNECT: -% %% {"type": 7, "code": 1} -% %%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% The Protocol Example: +%% CONN: +%% {"type": 1, "clientinfo": {...}} +%% +%% CONNACK: +%% {"type": 2, "code": 0} +%% +%% PUBLISH: +%% {"type": 3, "topic": "xxx", "payload": "", "qos": 0} +%% +%% PUBACK: +%% {"type": 4, "code": 0} +%% +%% SUBSCRIBE: +%% {"type": 5, "topic": "xxx", "qos": 1} +%% +%% SUBACK: +%% {"type": 6, "code": 0} +%% +%% DISCONNECT: +%% {"type": 7, "code": 1} +%%-------------------------------------------------------------------- -% handle_in(Conn, ?TYPE_CONNECT, #{<<"clientinfo">> := ClientInfo, <<"password">> := Password}) -> -% NClientInfo = maps:from_list([{binary_to_atom(K, utf8), V} || {K, V} <- maps:to_list(ClientInfo)]), -% case ?authenticate(#{conn => Conn, clientinfo => NClientInfo, password => Password}) of -% {ok, #{code := 'SUCCESS'}, _} -> -% case maps:get(keepalive, NClientInfo, 0) of -% 0 -> ok; -% Intv -> -% io:format("Try call start_timer with ~ps", [Intv]), -% ?start_timer(#{conn => Conn, type => 'KEEPALIVE', interval => Intv}) -% end, -% handle_out(Conn, ?TYPE_CONNACK, 0); -% _ -> -% handle_out(Conn, ?TYPE_CONNACK, 1), -% ?close(#{conn => Conn}) -% end; -% handle_in(Conn, ?TYPE_PUBLISH, #{<<"topic">> := Topic, -% <<"qos">> := Qos, -% <<"payload">> := Payload}) -> -% case ?publish(#{conn => Conn, topic => Topic, qos => Qos, payload => Payload}) of -% {ok, #{code := 'SUCCESS'}, _} -> -% handle_out(Conn, ?TYPE_PUBACK, 0); -% _ -> -% handle_out(Conn, ?TYPE_PUBACK, 1) -% end; -% handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) -> -% case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of -% {ok, #{code := 'SUCCESS'}, _} -> -% handle_out(Conn, ?TYPE_SUBACK, 0); -% _ -> -% handle_out(Conn, ?TYPE_SUBACK, 1) -% end; -% handle_in(Conn, ?TYPE_UNSUBSCRIBE, #{<<"topic">> := Topic}) -> -% case ?unsubscribe(#{conn => Conn, topic => Topic}) of -% {ok, #{code := 'SUCCESS'}, _} -> -% handle_out(Conn, ?TYPE_UNSUBACK, 0); -% _ -> -% handle_out(Conn, ?TYPE_UNSUBACK, 1) -% end; +handle_in(Conn, ?TYPE_CONNECT, #{<<"clientinfo">> := ClientInfo, <<"password">> := Password}) -> + NClientInfo = maps:from_list([{binary_to_atom(K, utf8), V} || {K, V} <- maps:to_list(ClientInfo)]), + case ?authenticate(#{conn => Conn, clientinfo => NClientInfo, password => Password}) of + {ok, #{code := 'SUCCESS'}, _} -> + case maps:get(keepalive, NClientInfo, 0) of + 0 -> ok; + Intv -> + io:format("Try call start_timer with ~ps", [Intv]), + ?start_timer(#{conn => Conn, type => 'KEEPALIVE', interval => Intv}) + end, + handle_out(Conn, ?TYPE_CONNACK, 0); + _ -> + handle_out(Conn, ?TYPE_CONNACK, 1), + ?close(#{conn => Conn}) + end; +handle_in(Conn, ?TYPE_PUBLISH, #{<<"topic">> := Topic, + <<"qos">> := Qos, + <<"payload">> := Payload}) -> + case ?publish(#{conn => Conn, topic => Topic, qos => Qos, payload => Payload}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_PUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_PUBACK, 1) + end; +handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) -> + case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_SUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_SUBACK, 1) + end; +handle_in(Conn, ?TYPE_UNSUBSCRIBE, #{<<"topic">> := Topic}) -> + case ?unsubscribe(#{conn => Conn, topic => Topic}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_UNSUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_UNSUBACK, 1) + end; -% handle_in(Conn, ?TYPE_DISCONNECT, _) -> -% ?close(#{conn => Conn}). +handle_in(Conn, ?TYPE_DISCONNECT, _) -> + ?close(#{conn => Conn}). -% handle_out(Conn, ?TYPE_CONNACK, Code) -> -% ?send(#{conn => Conn, bytes => frame_connack(Code)}); -% handle_out(Conn, ?TYPE_PUBACK, Code) -> -% ?send(#{conn => Conn, bytes => frame_puback(Code)}); -% handle_out(Conn, ?TYPE_SUBACK, Code) -> -% ?send(#{conn => Conn, bytes => frame_suback(Code)}); -% handle_out(Conn, ?TYPE_UNSUBACK, Code) -> -% ?send(#{conn => Conn, bytes => frame_unsuback(Code)}); -% handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> -% ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}). +handle_out(Conn, ?TYPE_CONNACK, Code) -> + ?send(#{conn => Conn, bytes => frame_connack(Code)}); +handle_out(Conn, ?TYPE_PUBACK, Code) -> + ?send(#{conn => Conn, bytes => frame_puback(Code)}); +handle_out(Conn, ?TYPE_SUBACK, Code) -> + ?send(#{conn => Conn, bytes => frame_suback(Code)}); +handle_out(Conn, ?TYPE_UNSUBACK, Code) -> + ?send(#{conn => Conn, bytes => frame_unsuback(Code)}); +handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> + ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}). -% handle_out(Conn, ?TYPE_DISCONNECT) -> -% ?send(#{conn => Conn, bytes => frame_disconnect()}). +handle_out(Conn, ?TYPE_DISCONNECT) -> + ?send(#{conn => Conn, bytes => frame_disconnect()}). -% %%-------------------------------------------------------------------- -% %% Frame +%%-------------------------------------------------------------------- +%% Frame -% frame_connect(ClientInfo, Password) -> -% emqx_json:encode(#{type => ?TYPE_CONNECT, -% clientinfo => ClientInfo, -% password => Password}). -% frame_connack(Code) -> -% emqx_json:encode(#{type => ?TYPE_CONNACK, code => Code}). +frame_connect(ClientInfo, Password) -> + emqx_json:encode(#{type => ?TYPE_CONNECT, + clientinfo => ClientInfo, + password => Password}). +frame_connack(Code) -> + emqx_json:encode(#{type => ?TYPE_CONNACK, code => Code}). -% frame_publish(Topic, Qos, Payload) -> -% emqx_json:encode(#{type => ?TYPE_PUBLISH, -% topic => Topic, -% qos => Qos, -% payload => Payload}). +frame_publish(Topic, Qos, Payload) -> + emqx_json:encode(#{type => ?TYPE_PUBLISH, + topic => Topic, + qos => Qos, + payload => Payload}). -% frame_puback(Code) -> -% emqx_json:encode(#{type => ?TYPE_PUBACK, code => Code}). +frame_puback(Code) -> + emqx_json:encode(#{type => ?TYPE_PUBACK, code => Code}). -% frame_subscribe(Topic, Qos) -> -% emqx_json:encode(#{type => ?TYPE_SUBSCRIBE, topic => Topic, qos => Qos}). +frame_subscribe(Topic, Qos) -> + emqx_json:encode(#{type => ?TYPE_SUBSCRIBE, topic => Topic, qos => Qos}). -% frame_suback(Code) -> -% emqx_json:encode(#{type => ?TYPE_SUBACK, code => Code}). +frame_suback(Code) -> + emqx_json:encode(#{type => ?TYPE_SUBACK, code => Code}). -% frame_unsubscribe(Topic) -> -% emqx_json:encode(#{type => ?TYPE_UNSUBSCRIBE, topic => Topic}). +frame_unsubscribe(Topic) -> + emqx_json:encode(#{type => ?TYPE_UNSUBSCRIBE, topic => Topic}). -% frame_unsuback(Code) -> -% emqx_json:encode(#{type => ?TYPE_UNSUBACK, code => Code}). +frame_unsuback(Code) -> + emqx_json:encode(#{type => ?TYPE_UNSUBACK, code => Code}). -% frame_disconnect() -> -% emqx_json:encode(#{type => ?TYPE_DISCONNECT}). +frame_disconnect() -> + emqx_json:encode(#{type => ?TYPE_DISCONNECT}).