diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 6b491a807..b5ac046ce 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -92,6 +92,8 @@ info(Channel) -> -spec info(list(atom()) | atom(), channel()) -> term(). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; +info(ctx, #channel{ctx = Ctx}) -> + Ctx; info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; info(zone, #channel{clientinfo = #{zone := Zone}}) -> @@ -279,6 +281,9 @@ handle_out({AckCode, Frame}, Channel) when -> {ok, [{outgoing, ack(AckCode, Frame)}], Channel}. +handle_out({AckCode, Frame}, Outgoings, Channel) when ?IS_ACK_CODE(AckCode) -> + {ok, [{outgoing, ack(AckCode, Frame)} | Outgoings], Channel}. + %%-------------------------------------------------------------------- %% Handle Delivers from broker to client %%-------------------------------------------------------------------- @@ -609,7 +614,7 @@ process_connect( _ = upstreaming(Frame, NChannel), %% XXX: connection_accepted is not defined by stomp protocol _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), - {ok, NChannel}; + handle_out({?ACK_SUCCESS, Frame}, [{event, connected}], NChannel); {error, Reason} -> log( error, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl index 641f2c02d..8fd95a6b5 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl @@ -40,6 +40,10 @@ phase := phase() }. +-ifdef(TEST). +-export([serialize/1]). +-endif. + %%-------------------------------------------------------------------- %% Init a Parser %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl index bc86b9686..abc1623f6 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl @@ -7,12 +7,14 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). +-define(DEFAULT_MOUNTPOINT, <<"gbt32960/${clientid}">>). + %% config schema provides -export([fields/1, desc/1]). fields(gbt32960) -> [ - {mountpoint, emqx_gateway_schema:mountpoint()}, + {mountpoint, emqx_gateway_schema:mountpoint(?DEFAULT_MOUNTPOINT)}, {retry_interval, sc( emqx_schema:duration_ms(), diff --git a/apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl b/apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl new file mode 100644 index 000000000..16d56e28a --- /dev/null +++ b/apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl @@ -0,0 +1,1444 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_gbt32960_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_gbt32960.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(BYTE, 8 / big - integer). +-define(WORD, 16 / big - integer). +-define(DWORD, 32 / big - integer). + +-define(PORT, 7325). +-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). + +-define(CONF_DEFAULT, << + "\n" + "gateway.gbt32960 {\n" + " retry_interval = \"1s\"\n" + " listeners.tcp.default {\n" + " bind = 7325\n" + " }\n" + "}\n" +>>). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + application:load(emqx_gateway_gbt32960), + Apps = emqx_cth_suite:start( + [ + {emqx_conf, ?CONF_DEFAULT}, + emqx_gateway, + emqx_auth, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + emqx_common_test_http:create_default_app(), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_common_test_http:delete_default_app(), + emqx_cth_suite:stop(?config(suite_apps, Config)), + ok. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% helper functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +encode(Cmd, Vin, Data) -> + encode(Cmd, ?ACK_IS_CMD, Vin, ?ENCRYPT_NONE, Data). + +encode(Cmd, Ack, Vin, Data) -> + encode(Cmd, Ack, Vin, ?ENCRYPT_NONE, Data). + +encode(Cmd, Ack, Vin, Encrypt, Data) -> + Size = byte_size(Data), + S1 = <>, + Crc = make_crc(S1, undefined), + Stream = <<"##", S1/binary, Crc:8>>, + ?LOGT("encode a packet=~p", [binary_to_hex_string(Stream)]), + Stream. + +make_crc(<<>>, Xor) -> Xor; +make_crc(<>, undefined) -> make_crc(Rest, C); +make_crc(<>, Xor) -> make_crc(Rest, C bxor Xor). + +make_time() -> + {Year, Mon, Day} = date(), + {Hour, Min, Sec} = time(), + Year1 = list_to_integer(string:substr(integer_to_list(Year), 3, 2)), + <>. + +binary_to_hex_string(Data) -> + lists:flatten([io_lib:format("~2.16.0B ", [X]) || <> <= Data]). + +to_json(#frame{cmd = Cmd, vin = Vin, encrypt = Encrypt, data = Data}) -> + emqx_utils_json:encode(#{'Cmd' => Cmd, 'Vin' => Vin, 'Encrypt' => Encrypt, 'Data' => Data}). + +get_published_msg() -> + receive + {deliver, _Topic, #message{topic = Topic, payload = Payload}} -> + {Topic, Payload} + after 5000 -> + error(timeout) + end. + +get_subscriptions() -> + lists:map(fun({_, Topic}) -> Topic end, ets:tab2list(emqx_subscription)). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%% test cases %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +login_first() -> + emqx:subscribe("gbt32960/+/upstream/#"), + + % + % send VEHICLE LOGIN + % + Time = <<12, 12, 29, 12, 19, 20>>, + Data = <