From d2430e70a8ef9f69709368f29c828ed36f713a35 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 6 Jul 2021 19:14:35 +0800 Subject: [PATCH 1/3] refactor(gw): move mqtt-sn to gateway --- apps/emqx_gateway/etc/emqx_gateway.conf | 43 +++++ .../etc/emqx_sn.conf | 0 .../etc}/priv/emqx_sn.schema | 0 .../src/bhvrs/emqx_gateway_impl.erl | 1 + apps/emqx_gateway/src/emqx_gateway_app.erl | 8 +- .../src/emqx_gateway_insta_sup.erl | 1 - .../emqx_gateway/src/emqx_gateway_metrics.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_schema.erl | 53 +++++- apps/emqx_gateway/src/emqx_gateway_utils.erl | 8 + .../src/mqttsn}/README.md | 25 +-- .../src/mqttsn}/emqx_sn_app.erl | 0 .../src/mqttsn}/emqx_sn_asleep_timer.erl | 0 .../src/mqttsn}/emqx_sn_broadcast.erl | 2 +- .../src/mqttsn}/emqx_sn_frame.erl | 2 +- .../src/mqttsn}/emqx_sn_gateway.erl | 17 +- apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 161 ++++++++++++++++++ .../src/mqttsn}/emqx_sn_registry.erl | 4 +- .../src/mqttsn}/emqx_sn_sup.erl | 12 +- .../src/mqttsn}/include/emqx_sn.hrl | 0 .../src/stomp/emqx_stomp_impl.erl | 22 +-- .../test/broadcast_test.py | 0 .../test/emqx_sn_frame_SUITE.erl | 0 .../test/emqx_sn_protocol_SUITE.erl | 0 .../test/emqx_sn_registry_SUITE.erl | 0 .../test}/intergration_test/Makefile | 0 .../test}/intergration_test/README.md | 0 .../add_emqx_sn_to_project.py | 0 .../intergration_test/client/case1_qos0pub.c | 0 .../intergration_test/client/case1_qos0sub.c | 0 .../intergration_test/client/case2_qos0pub.c | 0 .../intergration_test/client/case2_qos0sub.c | 0 .../intergration_test/client/case3_qos0pub.c | 0 .../intergration_test/client/case3_qos0sub.c | 0 .../intergration_test/client/case4_qos3pub.c | 0 .../intergration_test/client/case4_qos3sub.c | 0 .../intergration_test/client/case5_qos3pub.c | 0 .../intergration_test/client/case5_qos3sub.c | 0 .../intergration_test/client/case6_sleep.c | 0 .../client/case7_double_connect.c | 0 .../client/int_test_result.c | 0 .../client/int_test_result.h | 0 .../test}/intergration_test/disable_qos3.py | 0 .../test}/intergration_test/enable_qos3.py | 0 .../test/props/emqx_sn_proper_types.erl | 0 .../test/props/prop_emqx_sn_frame.erl | 0 apps/emqx_sn/.gitignore | 40 ----- apps/emqx_sn/examples/simple_example.erl | 126 -------------- apps/emqx_sn/examples/simple_example2.erl | 120 ------------- apps/emqx_sn/examples/simple_example3.erl | 120 ------------- apps/emqx_sn/examples/simple_example4.erl | 151 ---------------- apps/emqx_sn/rebar.config | 26 --- apps/emqx_sn/src/emqx_sn.app.src | 14 -- apps/emqx_sn/src/emqx_sn.appup.src | 19 --- apps/emqx_sn/vars | 8 - 54 files changed, 305 insertions(+), 680 deletions(-) rename apps/{emqx_sn => emqx_gateway}/etc/emqx_sn.conf (100%) rename apps/{emqx_sn => emqx_gateway/etc}/priv/emqx_sn.schema (100%) rename apps/{emqx_sn => emqx_gateway/src/mqttsn}/README.md (94%) rename apps/{emqx_sn/src => emqx_gateway/src/mqttsn}/emqx_sn_app.erl (100%) rename apps/{emqx_sn/src => emqx_gateway/src/mqttsn}/emqx_sn_asleep_timer.erl (100%) rename apps/{emqx_sn/src => emqx_gateway/src/mqttsn}/emqx_sn_broadcast.erl (98%) rename apps/{emqx_sn/src => emqx_gateway/src/mqttsn}/emqx_sn_frame.erl (99%) rename apps/{emqx_sn/src => emqx_gateway/src/mqttsn}/emqx_sn_gateway.erl (99%) create mode 100644 apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl rename apps/{emqx_sn/src => emqx_gateway/src/mqttsn}/emqx_sn_registry.erl (98%) rename apps/{emqx_sn/src => emqx_gateway/src/mqttsn}/emqx_sn_sup.erl (85%) rename apps/{emqx_sn => emqx_gateway/src/mqttsn}/include/emqx_sn.hrl (100%) rename apps/{emqx_sn => emqx_gateway}/test/broadcast_test.py (100%) rename apps/{emqx_sn => emqx_gateway}/test/emqx_sn_frame_SUITE.erl (100%) rename apps/{emqx_sn => emqx_gateway}/test/emqx_sn_protocol_SUITE.erl (100%) rename apps/{emqx_sn => emqx_gateway}/test/emqx_sn_registry_SUITE.erl (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/Makefile (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/README.md (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/add_emqx_sn_to_project.py (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case1_qos0pub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case1_qos0sub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case2_qos0pub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case2_qos0sub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case3_qos0pub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case3_qos0sub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case4_qos3pub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case4_qos3sub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case5_qos3pub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case5_qos3sub.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case6_sleep.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/case7_double_connect.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/int_test_result.c (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/client/int_test_result.h (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/disable_qos3.py (100%) rename apps/{emqx_sn => emqx_gateway/test}/intergration_test/enable_qos3.py (100%) rename apps/{emqx_sn => emqx_gateway}/test/props/emqx_sn_proper_types.erl (100%) rename apps/{emqx_sn => emqx_gateway}/test/props/prop_emqx_sn_frame.erl (100%) delete mode 100644 apps/emqx_sn/.gitignore delete mode 100644 apps/emqx_sn/examples/simple_example.erl delete mode 100644 apps/emqx_sn/examples/simple_example2.erl delete mode 100644 apps/emqx_sn/examples/simple_example3.erl delete mode 100644 apps/emqx_sn/examples/simple_example4.erl delete mode 100644 apps/emqx_sn/rebar.config delete mode 100644 apps/emqx_sn/src/emqx_sn.app.src delete mode 100644 apps/emqx_sn/src/emqx_sn.appup.src delete mode 100644 apps/emqx_sn/vars diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index ab5b52143..ba1e8168b 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -27,4 +27,47 @@ emqx_gateway: { active_n: 100 } } + + mqttsn.1: { + ## The MQTT-SN Gateway ID in ADVERTISE message. + gateway_id: 1 + + ## Enable broadcast this gateway to WLAN + broadcast: true + + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats: true + + ## To control whether accept and process the received + ## publish message with qos=-1. + enable_qos3: true + + ## Idle timeout for a MQTT-SN channel + idle_timeout: 30s + + ## The pre-defined topic name corresponding to the pre-defined topic + ## id of N. + ## Note that the pre-defined topic id of 0 is reserved. + predefined: [ + { id: 1 + topic: "/predefined/topic/name/hello" + }, + { id: 2 + topic: "/predefined/topic/name/nice" + } + ] + + ### ClientInfo override + clientinfo_override: { + username: "mqtt_sn_user" + password: "abc" + } + + listener.udp.1: { + bind: 1884 + max_connections: 10240000 + max_conn_rate: 1000 + } + } } diff --git a/apps/emqx_sn/etc/emqx_sn.conf b/apps/emqx_gateway/etc/emqx_sn.conf similarity index 100% rename from apps/emqx_sn/etc/emqx_sn.conf rename to apps/emqx_gateway/etc/emqx_sn.conf diff --git a/apps/emqx_sn/priv/emqx_sn.schema b/apps/emqx_gateway/etc/priv/emqx_sn.schema similarity index 100% rename from apps/emqx_sn/priv/emqx_sn.schema rename to apps/emqx_gateway/etc/priv/emqx_sn.schema diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl index 9726dad02..8d413e49c 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl @@ -31,6 +31,7 @@ ) -> {error, reason()} | {ok, [GwInstaPid :: pid()], GwInstaState :: state()} + %% TODO: v0.2 The child spec is better for restarting child process | {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()}. %% @doc diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index c99228f17..3982e260b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -45,7 +45,7 @@ load_default_gateway_applications() -> gateway_type_searching() -> %% FIXME: Hardcoded apps - [emqx_stomp_impl]. + [emqx_stomp_impl, emqx_sn_impl]. load(Mod) -> try @@ -65,7 +65,7 @@ create_gateway_by_default([]) -> create_gateway_by_default([{Type, Name, Confs}|More]) -> case emqx_gateway_registry:lookup(Type) of undefined -> - ?LOG(error, "Skip to start ~p#~p: not_registred_type", + ?LOG(error, "Skip to start ~s#~s: not_registred_type", [Type, Name]); _ -> case emqx_gateway:create(Type, @@ -73,9 +73,9 @@ create_gateway_by_default([{Type, Name, Confs}|More]) -> <<>>, Confs) of {ok, _} -> - ?LOG(debug, "Start ~p#~p successfully!", [Type, Name]); + ?LOG(debug, "Start ~s#~s successfully!", [Type, Name]); {error, Reason} -> - ?LOG(error, "Start ~p#~p failed: ~0p", + ?LOG(error, "Start ~s#~s failed: ~0p", [Type, Name, Reason]) end end, diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 9f21f0e05..7994a6cea 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -54,7 +54,6 @@ start_link(Insta, Ctx, GwDscrptr) -> gen_server:start_link( - {local, ?MODULE}, ?MODULE, [Insta, Ctx, GwDscrptr], [] diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index 04b711d0a..461eb3344 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -49,7 +49,7 @@ %%-------------------------------------------------------------------- start_link(Type) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Type], []). + gen_server:start_link(?MODULE, [Type], []). -spec inc(gateway_type(), atom()) -> ok. inc(Type, Name) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 8f05582c7..f1b1f9fa4 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -35,7 +35,9 @@ structs() -> ["emqx_gateway"]. fields("emqx_gateway") -> - [{stomp, t(ref(stomp))}]; + [{stomp, t(ref(stomp))}, + {mqttsn, t(ref(mqttsn))} + ]; fields(stomp) -> [{"$id", t(ref(stomp_structs))}]; @@ -44,7 +46,7 @@ fields(stomp_structs) -> [ {frame, t(ref(stomp_frame))} , {clientinfo_override, t(ref(clientinfo_override))} , {authenticator, t(union([allow_anonymous]))} - , {listener, t(ref(listener))} + , {listener, t(ref(tcp_listener_group))} ]; fields(stomp_frame) -> @@ -53,13 +55,38 @@ fields(stomp_frame) -> , {max_body_length, t(integer(), undefined, 8192)} ]; +fields(mqttsn) -> + [{"$id", t(ref(mqttsn_structs))}]; + +fields(mqttsn_structs) -> + [ {gateway_id, t(integer())} + , {broadcast, t(boolean())} + , {enable_stats, t(boolean())} + , {enable_qos3, t(boolean())} + , {idle_timeout, t(duration())} + , {predefined, hoconsc:array(ref(mqttsn_predefined))} + , {clientinfo_override, t(ref(clientinfo_override))} + , {listener, t(ref(udp_listener_group))} + ]; + +fields(mqttsn_predefined) -> + %% FIXME: How to check the $id is a integer ??? + [ {id, t(integer())} + , {topic, t(string())} + ]; + fields(clientinfo_override) -> [ {username, t(string())} , {password, t(string())} , {clientid, t(string())} ]; -fields(listener) -> +fields(udp_listener_group) -> + [ {udp, t(ref(udp_listener))} + , {dtls, t(ref(dtls_listener))} + ]; + +fields(tcp_listener_group) -> [ {tcp, t(ref(tcp_listener))} , {ssl, t(ref(ssl_listener))} ]; @@ -70,7 +97,14 @@ fields(tcp_listener) -> fields(ssl_listener) -> [ {"$name", t(ref(ssl_listener_settings))}]; +fields(udp_listener) -> + [ {"$name", t(ref(udp_listener_settings))}]; + +fields(dtls_listener) -> + [ {"$name", t(ref(dtls_listener_settings))}]; + fields(listener_settings) -> + % FIXME: %[ {"bind", t(union(ip_port(), integer()))} [ {bind, t(integer())} , {acceptors, t(integer(), undefined, 8)} @@ -107,6 +141,19 @@ fields(ssl_listener_settings) -> , depth => 10 , reuse_sessions => true}) ++ fields(listener_settings); +fields(udp_listener_settings) -> + [ + %% some special confs for udp listener + ] ++ fields(listener_settings); + +fields(dtls_listener_settings) -> + [ + %% some special confs for dtls listener + ] ++ + ssl(undefined, #{handshake_timeout => "15s" + , depth => 10 + , reuse_sessions => true}) ++ fields(listener_settings); + fields(access) -> [ {"$id", #{type => string(), nullable => true}}]; diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 184c3ff87..b7e6658d1 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -25,6 +25,7 @@ ]). -export([ apply/2 + , format_listenon/1 ]). -export([ normalize_rawconf/1 @@ -89,6 +90,13 @@ apply(F, A2) when is_function(F), is_list(A2) -> erlang:apply(F, A2). +format_listenon(Port) when is_integer(Port) -> + io_lib:format("0.0.0.0:~w", [Port]); +format_listenon({Addr, Port}) when is_list(Addr) -> + io_lib:format("~s:~w", [Addr, Port]); +format_listenon({Addr, Port}) when is_tuple(Addr) -> + io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). + -type listener() :: #{}. -type rawconf() :: diff --git a/apps/emqx_sn/README.md b/apps/emqx_gateway/src/mqttsn/README.md similarity index 94% rename from apps/emqx_sn/README.md rename to apps/emqx_gateway/src/mqttsn/README.md index d7251c49c..8179dde62 100644 --- a/apps/emqx_sn/README.md +++ b/apps/emqx_gateway/src/mqttsn/README.md @@ -1,10 +1,9 @@ -emqx-sn -======= +# MQTT-SN Gateway EMQ X MQTT-SN Gateway. -Configure Plugin ----------------- +## Configure Plugin + File: etc/emqx_sn.conf @@ -72,8 +71,7 @@ mqtt.sn.password = abc - mqtt.sn.password * This parameter is optional. Pair with username above. -Load Plugin ------------ +## Load Plugin ``` ./bin/emqx_ctl plugins load emqx_sn @@ -95,23 +93,18 @@ Load Plugin - https://github.com/njh/mqtt-sn-tools - https://github.com/arobenko/mqtt-sn -sleeping device ------------ +### sleeping device PINGREQ must have a ClientId which is identical to the one in CONNECT message. Without ClientId, emqx-sn will ignore such PINGREQ. -pre-defined topics ------------ +### pre-defined topics The mapping of a pre-defined topic id and topic name should be known inadvance by both client's application and gateway. We define this mapping info in emqx_sn.conf file, and which shall be kept equivalent in all client's side. -License -------- +## License Apache License Version 2.0 -Author ------- - -EMQ X-Men Team. +## Author +EMQ X Team. diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_app.erl similarity index 100% rename from apps/emqx_sn/src/emqx_sn_app.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_app.erl diff --git a/apps/emqx_sn/src/emqx_sn_asleep_timer.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_asleep_timer.erl similarity index 100% rename from apps/emqx_sn/src/emqx_sn_asleep_timer.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_asleep_timer.erl diff --git a/apps/emqx_sn/src/emqx_sn_broadcast.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl similarity index 98% rename from apps/emqx_sn/src/emqx_sn_broadcast.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl index a1630b844..69eb9a2c5 100644 --- a/apps/emqx_sn/src/emqx_sn_broadcast.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -export([ start_link/2 , stop/0 diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl similarity index 99% rename from apps/emqx_sn/src/emqx_sn_frame.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl index eed32803d..301247fbc 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl @@ -17,7 +17,7 @@ -module(emqx_sn_frame). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -export([ parse/1 , serialize/1 diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl similarity index 99% rename from apps/emqx_sn/src/emqx_sn_gateway.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl index 1bccf0c1a..27eb16498 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl @@ -18,7 +18,7 @@ -behaviour(gen_statem). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -915,7 +915,8 @@ handle_unsubscribe(_, _TopicId, MsgId, State) -> {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}. do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> - %% XXX: Handle normal topic id as predefined topic id, to be compatible with paho mqtt-sn library + %% XXX: Handle normal topic id as predefined topic id, to be + %% compatible with paho mqtt-sn library <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, @@ -972,8 +973,11 @@ do_puback(TopicId, MsgId, ReturnCode, StateName, undefined -> {keep_state, State}; TopicName -> %%notice that this TopicName maybe normal or predefined, - %% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels - {keep_state, send_register(TopicName, TopicId, MsgId, State)} + %% involving the predefined topic name in register to + %% enhance the gateway's robustness even inconsistent + %% with MQTT-SN channels + {keep_state, send_register(TopicName, TopicId, + MsgId, State)} end; _ -> ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), @@ -1070,8 +1074,9 @@ handle_outgoing(Packet, State) -> send_message(mqtt2sn(Packet, State), State). cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) -> - ?LOG(debug, "cache non-registered publish message for topic-id: ~p, msg: ~0p, pendings: ~0p", - [TopicId, PubPkt, Pendings]), + ?LOG(debug, "cache non-registered publish message " + "for topic-id: ~p, msg: ~0p, pendings: ~0p", + [TopicId, PubPkt, Pendings]), Msgs = maps:get(pending_topic_ids, Pendings, []), Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}. diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl new file mode 100644 index 000000000..c3b679381 --- /dev/null +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -0,0 +1,161 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The MQTT-SN Gateway Implement interface +-module(emqx_sn_impl). + +-behavior(emqx_gateway_impl). + +%% APIs +-export([ load/0 + , unload/0 + ]). + +-export([]). + +-export([ init/1 + , on_insta_create/3 + , on_insta_update/4 + , on_insta_destroy/3 + ]). + +-define(UDP_SOCKOPTS, []). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +load() -> + RegistryOptions = [ {cbkmod, ?MODULE} + ], + YourOptions = [params1, params2], + emqx_gateway_registry:load(mqttsn, RegistryOptions, YourOptions). + +unload() -> + emqx_gateway_registry:unload(mqttsn). + +init(_) -> + GwState = #{}, + {ok, GwState}. + +%%-------------------------------------------------------------------- +%% emqx_gateway_registry callbacks +%%-------------------------------------------------------------------- + +on_insta_create(_Insta = #{ id := InstaId, + rawconf := RawConf + }, Ctx, _GwState) -> + + %% We Also need to start `emqx_sn_broadcast` & + %% `emqx_sn_registry` process + SnGwId = maps:get(gateway_id, RawConf), + case maps:get(broadcast, RawConf) of + false -> + ok; + true -> + %% FIXME: + Port = 1884, + _ = emqx_sn_broadcast:start_link(SnGwId, Port) + end, + + PredefTopics = maps:get(predefined, RawConf), + {ok, RegistrySvr} = emqx_sn_registry:start_link(PredefTopics), + + NRawConf = maps:without( + [gateway_id, broadcast, predefined], + RawConf#{registry => RegistrySvr} + ), + Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf), + + ListenerPids = lists:map(fun(Lis) -> + start_listener(InstaId, Ctx, Lis) + end, Listeners), + {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. + +on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> + InstaId = maps:get(id, NewInsta), + try + %% XXX: 1. How hot-upgrade the changes ??? + %% XXX: 2. Check the New confs first before destroy old instance ??? + on_insta_destroy(OldInsta, GwInstaState, GwState), + on_insta_create(NewInsta, Ctx, GwState) + catch + Class : Reason : Stk -> + logger:error("Failed to update stomp instance ~s; " + "reason: {~0p, ~0p} stacktrace: ~0p", + [InstaId, Class, Reason, Stk]), + {error, {Class, Reason}} + end. + +on_insta_destroy(_Insta = #{ id := InstaId, + rawconf := RawConf + }, _GwInstaState, _GwState) -> + Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), + lists:foreach(fun(Lis) -> + stop_listener(InstaId, Lis) + end, Listeners). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + {ok, Pid} -> + io:format("Start mqttsn ~s:~s listener on ~s successfully.~n", + [InstaId, Type, ListenOnStr]), + Pid; + {error, Reason} -> + io:format(standard_error, + "Failed to start mqttsn ~s:~s listener on ~s: ~0p~n", + [InstaId, Type, ListenOnStr, Reason]), + throw({badconf, Reason}) + end. + +start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(InstaId, Type), + esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), + {emqx_sn_gateway, start_link, [Cfg#{ctx => Ctx}]}). + +name(InstaId, Type) -> + list_to_atom(lists:concat([InstaId, ":", Type])). + +merge_default(Options) -> + case lists:keytake(udp_options, 1, Options) of + {value, {udp_options, TcpOpts}, Options1} -> + [{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Options1]; + false -> + [{udp_options, ?UDP_SOCKOPTS} | Options] + end. + +stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + case StopRet of + ok -> io:format("Stop mqttsn ~s:~s listener on ~s successfully.~n", + [InstaId, Type, ListenOnStr]); + {error, Reason} -> + io:format(standard_error, + "Failed to stop mqttsn ~s:~s listener on ~s: ~0p~n", + [InstaId, Type, ListenOnStr, Reason] + ) + end, + StopRet. + +stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(InstaId, Type), + esockd:close(Name, ListenOn). diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl similarity index 98% rename from apps/emqx_sn/src/emqx_sn_registry.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 903f61c70..f2f87d93b 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -define(LOG(Level, Format, Args), emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). @@ -132,7 +132,7 @@ init([PredefTopics]) -> %% {ClientId, TopicId} -> TopicName %% {ClientId, TopicName} -> TopicId MaxPredefId = lists:foldl( - fun({TopicId, TopicName}, AccId) -> + fun(#{id := TopicId, topic := TopicName}, AccId) -> ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, value = TopicName}), ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, diff --git a/apps/emqx_sn/src/emqx_sn_sup.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_sup.erl similarity index 85% rename from apps/emqx_sn/src/emqx_sn_sup.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_sup.erl index 3d4fe602f..e78b41766 100644 --- a/apps/emqx_sn/src/emqx_sn_sup.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_sup.erl @@ -33,11 +33,9 @@ init([{_Ip, Port}, GwId, PredefTopics]) -> type => worker, modules => [emqx_sn_broadcast]}, Registry = #{id => emqx_sn_registry, - start => {emqx_sn_registry, start_link, [PredefTopics]}, - restart => permanent, - shutdown => brutal_kill, - type => worker, - modules => [emqx_sn_registry]}, + start => {emqx_sn_registry, start_link, [PredefTopics]}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [emqx_sn_registry]}, {ok, {{one_for_one, 10, 3600}, [Broadcast, Registry]}}. - - diff --git a/apps/emqx_sn/include/emqx_sn.hrl b/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl similarity index 100% rename from apps/emqx_sn/include/emqx_sn.hrl rename to apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index e6e62565a..86cce9c91 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -71,13 +71,12 @@ on_insta_create(_Insta = #{ id := InstaId, %% FIXME: Assign ctx to InstaState {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. -%% @private -on_insta_update(NewInsta, OldInstace, GwInstaState = #{ctx := Ctx}, GwState) -> +on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> InstaId = maps:get(id, NewInsta), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_insta_destroy(OldInstace, GwInstaState, GwState), + on_insta_destroy(OldInsta, GwInstaState, GwState), on_insta_create(NewInsta, Ctx, GwState) catch Class : Reason : Stk -> @@ -100,15 +99,16 @@ on_insta_destroy(_Insta = #{ id := InstaId, %%-------------------------------------------------------------------- start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> io:format("Start stomp ~s:~s listener on ~s successfully.~n", - [InstaId, Type, format(ListenOn)]), + [InstaId, Type, ListenOnStr]), Pid; {error, Reason} -> io:format(standard_error, "Failed to start stomp ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, format(ListenOn), Reason]), + [InstaId, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. @@ -128,22 +128,16 @@ merge_default(Options) -> [{tcp_options, ?TCP_OPTS} | Options] end. -format(Port) when is_integer(Port) -> - io_lib:format("0.0.0.0:~w", [Port]); -format({Addr, Port}) when is_list(Addr) -> - io_lib:format("~s:~w", [Addr, Port]); -format({Addr, Port}) when is_tuple(Addr) -> - io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). - stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of ok -> io:format("Stop stomp ~s:~s listener on ~s successfully.~n", - [InstaId, Type, format(ListenOn)]); + [InstaId, Type, ListenOnStr]); {error, Reason} -> io:format(standard_error, "Failed to stop stomp ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, format(ListenOn), Reason] + [InstaId, Type, ListenOnStr, Reason] ) end, StopRet. diff --git a/apps/emqx_sn/test/broadcast_test.py b/apps/emqx_gateway/test/broadcast_test.py similarity index 100% rename from apps/emqx_sn/test/broadcast_test.py rename to apps/emqx_gateway/test/broadcast_test.py diff --git a/apps/emqx_sn/test/emqx_sn_frame_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl similarity index 100% rename from apps/emqx_sn/test/emqx_sn_frame_SUITE.erl rename to apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl similarity index 100% rename from apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl rename to apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl diff --git a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl similarity index 100% rename from apps/emqx_sn/test/emqx_sn_registry_SUITE.erl rename to apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl diff --git a/apps/emqx_sn/intergration_test/Makefile b/apps/emqx_gateway/test/intergration_test/Makefile similarity index 100% rename from apps/emqx_sn/intergration_test/Makefile rename to apps/emqx_gateway/test/intergration_test/Makefile diff --git a/apps/emqx_sn/intergration_test/README.md b/apps/emqx_gateway/test/intergration_test/README.md similarity index 100% rename from apps/emqx_sn/intergration_test/README.md rename to apps/emqx_gateway/test/intergration_test/README.md diff --git a/apps/emqx_sn/intergration_test/add_emqx_sn_to_project.py b/apps/emqx_gateway/test/intergration_test/add_emqx_sn_to_project.py similarity index 100% rename from apps/emqx_sn/intergration_test/add_emqx_sn_to_project.py rename to apps/emqx_gateway/test/intergration_test/add_emqx_sn_to_project.py diff --git a/apps/emqx_sn/intergration_test/client/case1_qos0pub.c b/apps/emqx_gateway/test/intergration_test/client/case1_qos0pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case1_qos0pub.c rename to apps/emqx_gateway/test/intergration_test/client/case1_qos0pub.c diff --git a/apps/emqx_sn/intergration_test/client/case1_qos0sub.c b/apps/emqx_gateway/test/intergration_test/client/case1_qos0sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case1_qos0sub.c rename to apps/emqx_gateway/test/intergration_test/client/case1_qos0sub.c diff --git a/apps/emqx_sn/intergration_test/client/case2_qos0pub.c b/apps/emqx_gateway/test/intergration_test/client/case2_qos0pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case2_qos0pub.c rename to apps/emqx_gateway/test/intergration_test/client/case2_qos0pub.c diff --git a/apps/emqx_sn/intergration_test/client/case2_qos0sub.c b/apps/emqx_gateway/test/intergration_test/client/case2_qos0sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case2_qos0sub.c rename to apps/emqx_gateway/test/intergration_test/client/case2_qos0sub.c diff --git a/apps/emqx_sn/intergration_test/client/case3_qos0pub.c b/apps/emqx_gateway/test/intergration_test/client/case3_qos0pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case3_qos0pub.c rename to apps/emqx_gateway/test/intergration_test/client/case3_qos0pub.c diff --git a/apps/emqx_sn/intergration_test/client/case3_qos0sub.c b/apps/emqx_gateway/test/intergration_test/client/case3_qos0sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case3_qos0sub.c rename to apps/emqx_gateway/test/intergration_test/client/case3_qos0sub.c diff --git a/apps/emqx_sn/intergration_test/client/case4_qos3pub.c b/apps/emqx_gateway/test/intergration_test/client/case4_qos3pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case4_qos3pub.c rename to apps/emqx_gateway/test/intergration_test/client/case4_qos3pub.c diff --git a/apps/emqx_sn/intergration_test/client/case4_qos3sub.c b/apps/emqx_gateway/test/intergration_test/client/case4_qos3sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case4_qos3sub.c rename to apps/emqx_gateway/test/intergration_test/client/case4_qos3sub.c diff --git a/apps/emqx_sn/intergration_test/client/case5_qos3pub.c b/apps/emqx_gateway/test/intergration_test/client/case5_qos3pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case5_qos3pub.c rename to apps/emqx_gateway/test/intergration_test/client/case5_qos3pub.c diff --git a/apps/emqx_sn/intergration_test/client/case5_qos3sub.c b/apps/emqx_gateway/test/intergration_test/client/case5_qos3sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case5_qos3sub.c rename to apps/emqx_gateway/test/intergration_test/client/case5_qos3sub.c diff --git a/apps/emqx_sn/intergration_test/client/case6_sleep.c b/apps/emqx_gateway/test/intergration_test/client/case6_sleep.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case6_sleep.c rename to apps/emqx_gateway/test/intergration_test/client/case6_sleep.c diff --git a/apps/emqx_sn/intergration_test/client/case7_double_connect.c b/apps/emqx_gateway/test/intergration_test/client/case7_double_connect.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case7_double_connect.c rename to apps/emqx_gateway/test/intergration_test/client/case7_double_connect.c diff --git a/apps/emqx_sn/intergration_test/client/int_test_result.c b/apps/emqx_gateway/test/intergration_test/client/int_test_result.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/int_test_result.c rename to apps/emqx_gateway/test/intergration_test/client/int_test_result.c diff --git a/apps/emqx_sn/intergration_test/client/int_test_result.h b/apps/emqx_gateway/test/intergration_test/client/int_test_result.h similarity index 100% rename from apps/emqx_sn/intergration_test/client/int_test_result.h rename to apps/emqx_gateway/test/intergration_test/client/int_test_result.h diff --git a/apps/emqx_sn/intergration_test/disable_qos3.py b/apps/emqx_gateway/test/intergration_test/disable_qos3.py similarity index 100% rename from apps/emqx_sn/intergration_test/disable_qos3.py rename to apps/emqx_gateway/test/intergration_test/disable_qos3.py diff --git a/apps/emqx_sn/intergration_test/enable_qos3.py b/apps/emqx_gateway/test/intergration_test/enable_qos3.py similarity index 100% rename from apps/emqx_sn/intergration_test/enable_qos3.py rename to apps/emqx_gateway/test/intergration_test/enable_qos3.py diff --git a/apps/emqx_sn/test/props/emqx_sn_proper_types.erl b/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl similarity index 100% rename from apps/emqx_sn/test/props/emqx_sn_proper_types.erl rename to apps/emqx_gateway/test/props/emqx_sn_proper_types.erl diff --git a/apps/emqx_sn/test/props/prop_emqx_sn_frame.erl b/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl similarity index 100% rename from apps/emqx_sn/test/props/prop_emqx_sn_frame.erl rename to apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl diff --git a/apps/emqx_sn/.gitignore b/apps/emqx_sn/.gitignore deleted file mode 100644 index 46861cdec..000000000 --- a/apps/emqx_sn/.gitignore +++ /dev/null @@ -1,40 +0,0 @@ -.eunit -deps -*.o -*.beam -*.plt -erl_crash.dump -ebin -rel/example_project -.concrete/DEV_MODE -.rebar -_rel/ -emqx_sn.d -logs/ -.erlang.mk/ -data/ -.idea/ -*.iml -*.d -_build/ -.rebar3 -rebar3.crashdump -.DS_Store -bbmustache/ -etc/gen.emqx.conf -cuttlefish -rebar.lock -xrefr -intergration_test/emqx-rel/ -intergration_test/paho.mqtt-sn.embedded-c/ -intergration_test/client/*.exe -intergration_test/client/*.txt -.DS_Store -cover/ -ct.coverdata -eunit.coverdata -test/ct.cover.spec -erlang.mk -etc/emqx_sn.conf.rendered -.rebar3/ -*.swp diff --git a/apps/emqx_sn/examples/simple_example.erl b/apps/emqx_sn/examples/simple_example.erl deleted file mode 100644 index ce19c4133..000000000 --- a/apps/emqx_sn/examples/simple_example.erl +++ /dev/null @@ -1,126 +0,0 @@ --module(simple_example). - --include("emqx_sn.hrl"). - --define(HOST, {127,0,0,1}). --define(PORT, 1884). - --export([start/0]). - -start() -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% register topic_id - RegisterPacket = gen_register_packet(<<"TopicA">>, 0), - ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket), - io:format("send register packet=~p~n", [RegisterPacket]), - TopicId = wait_response(), - - %% subscribe - SubscribePacket = gen_subscribe_packet(TopicId), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% publish - PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - wait_response(), - - % wait for subscribed message from broker - wait_response(), - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(TopicId) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 1, - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(TopicId, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 1, - Flag = <>, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, _Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/examples/simple_example2.erl b/apps/emqx_sn/examples/simple_example2.erl deleted file mode 100644 index b9ada6d22..000000000 --- a/apps/emqx_sn/examples/simple_example2.erl +++ /dev/null @@ -1,120 +0,0 @@ --module(simple_example2). - --include("emqx_sn.hrl"). - --define(HOST, "localhost"). --define(PORT, 1884). - --export([start/0]). - -start() -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% subscribe, SHORT TOPIC NAME - SubscribePacket = gen_subscribe_packet(<<"T1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% publish, SHORT TOPIC NAME - PublishPacket = gen_publish_packet(<<"T1">>, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - wait_response(), - - % wait for subscribed message from broker - wait_response(), - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(ShortTopic) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 2, % SHORT TOPIC NAME - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(ShortTopic, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 2, % SHORT TOPIC NAME - Flag = <>, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, _Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/examples/simple_example3.erl b/apps/emqx_sn/examples/simple_example3.erl deleted file mode 100644 index 40f0bf572..000000000 --- a/apps/emqx_sn/examples/simple_example3.erl +++ /dev/null @@ -1,120 +0,0 @@ --module(simple_example3). - --include("emqx_sn.hrl"). - --define(HOST, "localhost"). --define(PORT, 1884). - --export([start/0]). - -start() -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% subscribe normal topic name - SubscribePacket = gen_subscribe_packet(<<"T3">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% publish SHORT TOPIC NAME - PublishPacket = gen_publish_packet(<<"T3">>, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - wait_response(), - - % wait for subscribed message from broker - wait_response(), - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(ShortTopic) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 0, % normal topic name - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(ShortTopic, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 2, % SHORT TOPIC NAME - Flag = <>, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, _Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/examples/simple_example4.erl b/apps/emqx_sn/examples/simple_example4.erl deleted file mode 100644 index 6beb5835c..000000000 --- a/apps/emqx_sn/examples/simple_example4.erl +++ /dev/null @@ -1,151 +0,0 @@ --module(simple_example4). - --include("emqx_sn.hrl"). - --define(HOST, {127,0,0,1}). --define(PORT, 1884). - --export([start/0]). - -start(LoopTimes) -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% register topic_id - RegisterPacket = gen_register_packet(<<"TopicA">>, 0), - ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket), - io:format("send register packet=~p~n", [RegisterPacket]), - TopicId = wait_response(), - - %% subscribe - SubscribePacket = gen_subscribe_packet(TopicId), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% loop publish - [begin - timer:sleep(1000), - io:format("~n-------------------- publish ~p start --------------------~n", [N]), - - PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - % wait for publish ack - wait_response(), - % wait for subscribed message from broker - wait_response(), - - PingReqPacket = gen_pingreq_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket), - % wait for pingresp - wait_response(), - - io:format("--------------------- publish ~p end ---------------------~n", [N]) - end || N <- lists:seq(1, LoopTimes)], - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(TopicId) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 1, - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(TopicId, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 1, - Flag = <>, - <>. - -gen_puback_packet(TopicId, MsgId) -> - Length = 7, - MsgType = ?SN_PUBACK, - <>. - -gen_pingreq_packet() -> - Length = 2, - MsgType = ?SN_PINGREQ, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]), - ok = gen_udp:send(Socket, ?HOST, ?PORT, gen_puback_packet(TopicId, MsgId)); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - <<_Len:8, ?SN_PINGRESP>> -> - io:format("recv pingresp~n"); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/rebar.config b/apps/emqx_sn/rebar.config deleted file mode 100644 index cbdac78f6..000000000 --- a/apps/emqx_sn/rebar.config +++ /dev/null @@ -1,26 +0,0 @@ -{deps, []}. -{plugins, [rebar3_proper]}. - -{deps, - [{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}} - ]}. - -{edoc_opts, [{preprocess, true}]}. -{erl_opts, [warn_unused_vars, - warn_shadow_vars, - warn_unused_import, - warn_obsolete_guard, - debug_info, - {parse_transform}]}. - -{dialyzer, [{warnings, [unmatched_returns, error_handling, race_conditions]} - ]}. - -{xref_checks, [undefined_function_calls, undefined_functions, - locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions]}. -{cover_enabled, true}. -{cover_opts, [verbose]}. -{cover_export_enabled, true}. - -{plugins, [coveralls]}. diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src deleted file mode 100644 index 0e4e53dc8..000000000 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ /dev/null @@ -1,14 +0,0 @@ -{application, emqx_sn, - [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.4.0"}, % strict semver, bump manually! - {modules, []}, - {registered, []}, - {applications, [kernel,stdlib,esockd]}, - {mod, {emqx_sn_app,[]}}, - {env, []}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQ X Team "]}, - {links, [{"Homepage", "https://emqx.io/"}, - {"Github", "https://github.com/emqx/emqx-sn"} - ]} - ]}. diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src deleted file mode 100644 index 2bd6f5646..000000000 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ /dev/null @@ -1,19 +0,0 @@ -%% -*-: erlang -*- -{VSN, - [ - {"4.3.2", [ - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} - ]}, - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_sn} - ]} - ], - [ - {"4.3.2", [ - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} - ]}, - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_sn} - ]} - ] -}. diff --git a/apps/emqx_sn/vars b/apps/emqx_sn/vars deleted file mode 100644 index e39aa2801..000000000 --- a/apps/emqx_sn/vars +++ /dev/null @@ -1,8 +0,0 @@ -%% vars here are for test only, not intended for release - -{platform_bin_dir, "bin"}. -{platform_data_dir, "data"}. -{platform_etc_dir, "etc"}. -{platform_lib_dir, "lib"}. -{platform_log_dir, "log"}. -{platform_plugins_dir, "data/plugins"}. From fc5baf8fd40c39782ee12b96e8efd0a670aea6e8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 7 Jul 2021 18:29:47 +0800 Subject: [PATCH 2/3] refactor(gw-sn): support mutil-registry process --- .../src/mqttsn/emqx_sn_gateway.erl | 68 ++++--- apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 6 +- .../src/mqttsn/emqx_sn_registry.erl | 167 +++++++++++------- 3 files changed, 142 insertions(+), 99 deletions(-) diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl index 27eb16498..28d461b9b 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl @@ -76,6 +76,7 @@ }). -record(state, {gwid :: integer(), + registry :: emqx_sn_registry:registry(), socket :: port(), sockpid :: pid(), sockstate :: emqx_types:sockstate(), @@ -145,16 +146,18 @@ kick(GwPid) -> %%-------------------------------------------------------------------- init([{_, SockPid, Sock}, Peername, Options]) -> - GwId = proplists:get_value(gateway_id, Options), - Username = proplists:get_value(username, Options, undefined), - Password = proplists:get_value(password, Options, undefined), - EnableQos3 = proplists:get_value(enable_qos3, Options, false), - IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), - EnableStats = proplists:get_value(enable_stats, Options, false), + GwId = maps:get(gateway_id, Options), + Registry = maps:get(registry, Options), + Username = maps:get(username, Options, undefined), + Password = maps:get(password, Options, undefined), + EnableQos3 = maps:get(enable_qos3, Options, false), + IdleTimeout = maps:get(idle_timeout, Options, 30000), + EnableStats = maps:get(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> Channel = emqx_channel:init(?CONN_INFO(Sockname, Peername), ?DEFAULT_CHAN_OPTIONS), State = #state{gwid = GwId, + registry = Registry, username = Username, password = Password, socket = Sock, @@ -202,10 +205,16 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, topic_id_type = TopicIdType }, TopicId, _MsgId, Data)}, - State = #state{clientid = ClientId}) -> + State = #state{registry = Registry, clientid = ClientId}) -> TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of - false -> emqx_sn_registry:lookup_topic(ClientId, TopicId); - true -> <> + false -> + emqx_sn_registry:lookup_topic( + Registry, + ClientId, + TopicId + ); + true -> + <> end, _ = case TopicName =/= undefined of true -> @@ -290,9 +299,9 @@ wait_for_will_msg(EventType, EventContent, State) -> handle_event(EventType, EventContent, wait_for_will_msg, State). connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, - State = #state{clientid = ClientId}) -> + State = #state{registry = Registry, clientid = ClientId}) -> State0 = - case emqx_sn_registry:register_topic(ClientId, TopicName) of + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of TopicId when is_integer(TopicId) -> ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); @@ -579,13 +588,13 @@ handle_event(EventType, EventContent, StateName, State) -> [StateName, {EventType, EventContent}]), {keep_state, State}. -terminate(Reason, _StateName, #state{channel = Channel}) -> +terminate(Reason, _StateName, #state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), case Reason of {shutdown, takeovered} -> ok; _ -> - emqx_sn_registry:unregister_topic(ClientId) + emqx_sn_registry:unregister_topic(Registry, ClientId) end, emqx_channel:terminate(Reason, Channel), ok. @@ -721,12 +730,13 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); -mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> +mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), + #state{registry = Registry, channel = Channel}) -> NewPacketId = if QoS =:= ?QOS_0 -> 0; true -> PacketId end, ClientId = emqx_channel:info(clientid, Channel), - {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of + {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; TopicId when is_integer(TopicId) -> @@ -848,9 +858,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> end. handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, - State=#state{channel = Channel}) -> + State=#state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:register_topic(ClientId, TopicName) of + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of {error, too_large} -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, ?SN_INVALID_TOPIC_ID, @@ -864,9 +874,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, end; handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId, - State = #state{channel = Channel}) -> + State = #state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, TopicId, @@ -895,9 +905,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) -> proto_unsubscribe(TopicId, MsgId, State); handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId, - State = #state{channel = Channel}) -> + State = #state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}; PredefinedTopic -> @@ -920,11 +930,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, - State=#state{channel = Channel}) -> + State=#state{registry = Registry, channel = Channel}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID, State)}; @@ -963,13 +973,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) -> ok. do_puback(TopicId, MsgId, ReturnCode, StateName, - State=#state{channel = Channel}) -> + State=#state{registry = Registry, channel = Channel}) -> case ReturnCode of ?SN_RC_ACCEPTED -> handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); ?SN_RC_INVALID_TOPIC_ID -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, State}; TopicName -> %%notice that this TopicName maybe normal or predefined, @@ -1061,10 +1071,10 @@ handle_outgoing(Packets, State) when is_list(Packets) -> end, State, Packets); handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), - State = #state{channel = Channel}) -> + State = #state{registry = Registry, channel = Channel}) -> ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]), ClientId = emqx_channel:info(clientid, Channel), - TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), + TopicId = emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of true -> register_and_notify_client(PubPkt, State); false -> send_message(mqtt2sn(PubPkt, State), State) @@ -1089,11 +1099,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, - State = #state{pending_topic_ids = Pendings, channel = Channel}) -> + State = #state{registry = Registry, pending_topic_ids = Pendings, channel = Channel}) -> MsgId = message_id(PacketId), #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, ClientId = emqx_channel:info(clientid, Channel), - TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), + TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName), ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index c3b679381..3085afe89 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -72,11 +72,11 @@ on_insta_create(_Insta = #{ id := InstaId, end, PredefTopics = maps:get(predefined, RawConf), - {ok, RegistrySvr} = emqx_sn_registry:start_link(PredefTopics), + {ok, RegistrySvr} = emqx_sn_registry:start_link(InstaId, PredefTopics), NRawConf = maps:without( - [gateway_id, broadcast, predefined], - RawConf#{registry => RegistrySvr} + [broadcast, predefined], + RawConf#{registry => emqx_sn_registry:lookup_name(RegistrySvr)} ), Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index f2f87d93b..30583c443 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -14,6 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% @doc The MQTT-SN Topic Registry +%% +%% XXX: -module(emqx_sn_registry). -behaviour(gen_server). @@ -23,16 +26,15 @@ -define(LOG(Level, Format, Args), emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). --export([ start_link/1 - , stop/0 +-export([ start_link/2 ]). --export([ register_topic/2 - , unregister_topic/1 +-export([ register_topic/3 + , unregister_topic/2 ]). --export([ lookup_topic/2 - , lookup_topic_id/2 +-export([ lookup_topic/3 + , lookup_topic_id/3 ]). %% gen_server callbacks @@ -44,51 +46,54 @@ , code_change/3 ]). +-export([lookup_name/1]). + -define(SN_SHARD, emqx_sn_shard). --define(TAB, ?MODULE). - --record(state, {max_predef_topic_id = 0}). +-record(state, {tabname, max_predef_topic_id = 0}). -record(emqx_sn_registry, {key, value}). %% Mnesia bootstrap --export([mnesia/1]). +%-export([mnesia/1]). --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). +%-boot_mnesia({mnesia, [boot]}). +%-copy_mnesia({mnesia, [copy]}). --rlog_shard({?SN_SHARD, ?TAB}). +%-rlog_shard({?SN_SHARD, ?TAB}). -%% @doc Create or replicate tables. --spec(mnesia(boot | copy) -> ok). -mnesia(boot) -> - %% Optimize storage - StoreProps = [{ets, [{read_concurrency, true}]}], - ok = ekka_mnesia:create_table(?MODULE, [ - {attributes, record_info(fields, emqx_sn_registry)}, - {ram_copies, [node()]}, - {storage_properties, StoreProps}]); +%%% @doc Create or replicate tables. +%-spec(mnesia(boot | copy) -> ok). +%mnesia(boot) -> +% %% Optimize storage +% StoreProps = [{ets, [{read_concurrency, true}]}], +% ok = ekka_mnesia:create_table(?MODULE, [ +% {attributes, record_info(fields, emqx_sn_registry)}, +% {ram_copies, [node()]}, +% {storage_properties, StoreProps}]); +% +%mnesia(copy) -> +% ok = ekka_mnesia:copy_table(?MODULE, ram_copies). -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?MODULE, ram_copies). +-type registry() :: {Tab :: atom(), + RegistryPid :: pid()}. %%----------------------------------------------------------------------------- --spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). -start_link(PredefTopics) -> - ekka_rlog:wait_for_shards([?SN_SHARD], infinity), - gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). +-spec start_link(atom(), list()) + -> ignore + | {ok, pid()} + | {error, Reason :: term()}. +start_link(InstaId, PredefTopics) -> + gen_server:start_link(?MODULE, [InstaId, PredefTopics], []). --spec(stop() -> ok). -stop() -> - gen_server:stop(?MODULE, normal, infinity). - --spec(register_topic(binary(), binary()) -> integer() | {error, term()}). -register_topic(ClientId, TopicName) when is_binary(TopicName) -> +-spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic()) + -> integer() + | {error, term()}. +register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - gen_server:call(?MODULE, {register, ClientId, TopicName}); + gen_server:call(Pid, {register, ClientId, TopicName}); %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not %% relevant in case of subscriptions to a short topic name or to a topic @@ -96,22 +101,24 @@ register_topic(ClientId, TopicName) when is_binary(TopicName) -> true -> {error, wildcard_topic} end. --spec(lookup_topic(binary(), pos_integer()) -> undefined | binary()). -lookup_topic(ClientId, TopicId) when is_integer(TopicId) -> - case lookup_element(?TAB, {predef, TopicId}, 3) of +-spec lookup_topic(registry(), emqx_types:clientid(), pos_integer()) + -> undefined + | binary(). +lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) -> + case lookup_element(Tab, {predef, TopicId}, 3) of undefined -> - lookup_element(?TAB, {ClientId, TopicId}, 3); + lookup_element(Tab, {ClientId, TopicId}, 3); Topic -> Topic end. --spec(lookup_topic_id(binary(), binary()) - -> undefined - | pos_integer() - | {predef, integer()}). -lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) -> - case lookup_element(?TAB, {predef, TopicName}, 3) of +-spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic()) + -> undefined + | pos_integer() + | {predef, integer()}. +lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) -> + case lookup_element(Tab, {predef, TopicName}, 3) of undefined -> - lookup_element(?TAB, {ClientId, TopicName}, 3); + lookup_element(Tab, {ClientId, TopicName}, 3); TopicId -> {predef, TopicId} end. @@ -120,46 +127,69 @@ lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) -> lookup_element(Tab, Key, Pos) -> try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end. --spec(unregister_topic(binary()) -> ok). -unregister_topic(ClientId) -> - gen_server:call(?MODULE, {unregister, ClientId}). +-spec unregister_topic(registry(), emqx_types:clientid()) -> ok. +unregister_topic({_, Pid}, ClientId) -> + gen_server:call(Pid, {unregister, ClientId}). + +lookup_name(Pid) -> + gen_server:call(Pid, name). %%----------------------------------------------------------------------------- -init([PredefTopics]) -> +name(InstaId) -> + list_to_atom(lists:concat([emqx_sn_, InstaId, '_registry'])). + +init([InstaId, PredefTopics]) -> %% {predef, TopicId} -> TopicName %% {predef, TopicName} -> TopicId %% {ClientId, TopicId} -> TopicName %% {ClientId, TopicName} -> TopicId + Tab = name(InstaId), + ok = ekka_mnesia:create_table(Tab, [ + {ram_copies, [node()]}, + {record_name, emqx_sn_registry}, + {attributes, record_info(fields, emqx_sn_registry)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]} + ]), + ok = ekka_mnesia:copy_table(Tab, ram_copies), + % FIXME: + %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), MaxPredefId = lists:foldl( fun(#{id := TopicId, topic := TopicName}, AccId) -> - ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, - value = TopicName}), - ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, - value = TopicId}), + ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicId}, + value = TopicName} + ), + ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicName}, + value = TopicId} + ), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), - {ok, #state{max_predef_topic_id = MaxPredefId}}. + {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. handle_call({register, ClientId, TopicName}, _From, - State = #state{max_predef_topic_id = PredefId}) -> - case lookup_topic_id(ClientId, TopicName) of + State = #state{tabname = Tab, max_predef_topic_id = PredefId}) -> + case lookup_topic_id({Tab, self()}, ClientId, TopicName) of {predef, PredefTopicId} when is_integer(PredefTopicId) -> {reply, PredefTopicId, State}; TopicId when is_integer(TopicId) -> {reply, TopicId, State}; undefined -> - case next_topic_id(?TAB, PredefId, ClientId) of + case next_topic_id(Tab, PredefId, ClientId) of TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> Fun = fun() -> - mnesia:write(#emqx_sn_registry{key = {ClientId, next_topic_id}, - value = TopicId + 1}), - mnesia:write(#emqx_sn_registry{key = {ClientId, TopicName}, - value = TopicId}), - mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, - value = TopicName}) + mnesia:write(Tab, #emqx_sn_registry{ + key = {ClientId, next_topic_id}, + value = TopicId + 1}, write), + mnesia:write(Tab, #emqx_sn_registry{ + key = {ClientId, TopicName}, + value = TopicId}, write), + mnesia:write(Tab, #emqx_sn_registry{ + key = {ClientId, TopicId}, + value = TopicName}, write) end, case ekka_mnesia:transaction(?SN_SHARD, Fun) of {atomic, ok} -> @@ -170,11 +200,14 @@ handle_call({register, ClientId, TopicName}, _From, end end; -handle_call({unregister, ClientId}, _From, State) -> - Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry), +handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> + Registry = mnesia:dirty_match_object({Tab, {ClientId, '_'}, '_'}), + lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(Tab, R) end, Registry), {reply, ok, State}; +handle_call(name, _From, State = #state{tabname = Tab}) -> + {reply, {Tab, self()}, State}; + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected request: ~p", [Req]), {reply, ignored, State}. From 980c7d91db30e22d95fd6f9d82dec40b78de3f76 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 12 Jul 2021 15:42:45 +0800 Subject: [PATCH 3/3] chore(gw): fix mqtt-sn test cases --- apps/emqx_gateway/etc/emqx_gateway.conf | 1 + apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 2 +- .../src/mqttsn/emqx_sn_registry.erl | 9 +- .../emqx_gateway/test/emqx_sn_frame_SUITE.erl | 2 +- .../test/emqx_sn_protocol_SUITE.erl | 43 ++++-- .../test/emqx_sn_registry_SUITE.erl | 126 +++++++++--------- .../test/props/emqx_sn_proper_types.erl | 2 +- .../test/props/prop_emqx_sn_frame.erl | 2 +- 8 files changed, 108 insertions(+), 79 deletions(-) diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index ba1e8168b..591f2523d 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -5,6 +5,7 @@ ## TODO: emqx_gateway: { + stomp.1: { frame: { max_headers: 10 diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 3085afe89..1e5c5d8cd 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -68,7 +68,7 @@ on_insta_create(_Insta = #{ id := InstaId, true -> %% FIXME: Port = 1884, - _ = emqx_sn_broadcast:start_link(SnGwId, Port) + _ = emqx_sn_broadcast:start_link(SnGwId, Port), ok end, PredefTopics = maps:get(predefined, RawConf), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 30583c443..0fac56ae0 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -201,8 +201,13 @@ handle_call({register, ClientId, TopicName}, _From, end; handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> - Registry = mnesia:dirty_match_object({Tab, {ClientId, '_'}, '_'}), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(Tab, R) end, Registry), + Registry = mnesia:dirty_match_object( + Tab, + {emqx_sn_registry, {ClientId, '_'}, '_'} + ), + lists:foreach(fun(R) -> + ekka_mnesia:dirty_delete_object(Tab, R) + end, Registry), {reply, ok, State}; handle_call(name, _From, State = #state{tabname = Tab}) -> diff --git a/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl index 85042a4be..8b68a6145 100644 --- a/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). -import(emqx_sn_frame, [ parse/1 diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 0947bdaca..a63e248ec 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -59,23 +59,41 @@ all() -> init_per_suite(Config) -> logger:set_module_level(emqx_sn_gateway, debug), - emqx_ct_helpers:start_apps([emqx_sn], fun set_special_confs/1), + emqx_ct_helpers:start_apps([emqx_gateway], fun set_special_confs/1), Config. end_per_suite(_) -> - emqx_ct_helpers:stop_apps([emqx_sn]). + emqx_ct_helpers:stop_apps([emqx_gateway]). set_special_confs(emqx) -> application:set_env(emqx, plugins_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); -set_special_confs(emqx_sn) -> - application:set_env(emqx_sn, enable_qos3, ?ENABLE_QOS3), - application:set_env(emqx_sn, enable_stats, true), - application:set_env(emqx_sn, username, <<"user1">>), - application:set_env(emqx_sn, password, <<"pw123">>), - application:set_env(emqx_sn, predefined, - [{?PREDEF_TOPIC_ID1, ?PREDEF_TOPIC_NAME1}, - {?PREDEF_TOPIC_ID2, ?PREDEF_TOPIC_NAME2}]); +set_special_confs(emqx_gateway) -> + emqx_config:put( + [emqx_gateway], + #{ mqttsn => + #{'1' => + #{broadcast => true, + clientinfo_override => + #{password => "pw123", + username => "user1" + }, + enable_qos3 => true, + enable_stats => true, + gateway_id => 1, + idle_timeout => 30000, + listener => + #{udp => + #{'1' => + #{acceptors => 8,active_n => 100,backlog => 1024,bind => 1884, + high_watermark => 1048576,max_conn_rate => 1000, + max_connections => 10240000,send_timeout => 15000, + send_timeout_close => true}}}, + predefined => + [#{id => ?PREDEF_TOPIC_ID1, topic => ?PREDEF_TOPIC_NAME1}, + #{id => ?PREDEF_TOPIC_ID2, topic => ?PREDEF_TOPIC_NAME2}]}} + }); + set_special_confs(_App) -> ok. @@ -87,7 +105,7 @@ set_special_confs(_App) -> %% Connect t_connect(_) -> - SockName = {'mqttsn:udp', {{0,0,0,0}, 1884}}, + SockName = {'mqttsn#1:udp', 1884}, ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), {ok, Socket} = gen_udp:open(0, [binary]), @@ -170,7 +188,6 @@ t_subscribe_case02(_) -> ReturnCode = 0, {ok, Socket} = gen_udp:open(0, [binary]), - ClientId = ?CLIENTID, send_connect_msg(Socket, ?CLIENTID), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), diff --git a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl index 58a458ecc..6161687f2 100644 --- a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl @@ -23,8 +23,10 @@ -define(REGISTRY, emqx_sn_registry). -define(MAX_PREDEF_ID, 2). --define(PREDEF_TOPICS, [{1, <<"/predefined/topic/name/hello">>}, - {2, <<"/predefined/topic/name/nice">>}]). +-define(PREDEF_TOPICS, [#{id => 1, topic => <<"/predefined/topic/name/hello">>}, + #{id => 2, topic => <<"/predefined/topic/name/nice">>}]). + +-define(INSTA_ID, 'mqttsn#1'). %%-------------------------------------------------------------------- %% Setups @@ -34,88 +36,92 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - _ = application:set_env(emqx_sn, predefined, ?PREDEF_TOPICS), + application:ensure_all_started(ekka), + ekka_mnesia:start(), Config. end_per_suite(_Config) -> + application:stop(ekka), ok. init_per_testcase(_TestCase, Config) -> - application:set_env(ekka, strict_mode, true), - ekka_mnesia:start(), - emqx_sn_registry:mnesia(boot), - ekka_mnesia:clear_table(emqx_sn_registry), - PredefTopics = application:get_env(emqx_sn, predefined, []), - {ok, _Pid} = ?REGISTRY:start_link(PredefTopics), - Config. + {ok, Pid} = ?REGISTRY:start_link(?INSTA_ID, ?PREDEF_TOPICS), + {Tab, Pid} = ?REGISTRY:lookup_name(Pid), + [{reg, {Tab, Pid}} | Config]. end_per_testcase(_TestCase, Config) -> - ?REGISTRY:stop(), + {Tab, _Pid} = proplists:get_value(reg, Config), + ekka_mnesia:clear_table(Tab), Config. %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- -t_register(_Config) -> - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), - emqx_sn_registry:unregister_topic(<<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). +t_register(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), + emqx_sn_registry:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). -t_register_case2(_Config) -> - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic3">>)), - ?REGISTRY:unregister_topic(<<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). +t_register_case2(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic3">>)), + ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). -t_reach_maximum(_Config) -> - register_a_lot(?MAX_PREDEF_ID+1, 16#ffff), - ?assertEqual({error, too_large}, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicABC">>)), +t_reach_maximum(Config) -> + Reg = proplists:get_value(reg, Config), + register_a_lot(?MAX_PREDEF_ID+1, 16#ffff, Reg), + ?assertEqual({error, too_large}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicABC">>)), Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+1])), Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+2])), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)), - ?REGISTRY:unregister_topic(<<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)). + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)), + ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)). -t_register_case4(_Config) -> - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicA">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicB">>)), - ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicC">>)), - ?REGISTRY:unregister_topic(<<"ClientId">>), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicD">>)). +t_register_case4(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicA">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicB">>)), + ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicC">>)), + ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicD">>)). -t_deny_wildcard_topic(_Config) -> - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)), - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)). +t_deny_wildcard_topic(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/TopicA/#">>)), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/+/TopicB">>)). %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- -register_a_lot(Max, Max) -> +register_a_lot(Max, Max, _Reg) -> ok; -register_a_lot(N, Max) when N < Max -> +register_a_lot(N, Max, Reg) when N < Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), - ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), - register_a_lot(N+1, Max). + ?assertEqual(N, ?REGISTRY:register_topic(Reg, <<"ClientId">>, Topic)), + register_a_lot(N+1, Max, Reg). diff --git a/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl b/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl index 8d4dae357..96318788d 100644 --- a/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl +++ b/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl @@ -16,7 +16,7 @@ -module(emqx_sn_proper_types). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl"). -include_lib("proper/include/proper.hrl"). -compile({no_auto_import, [register/1]}). diff --git a/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl b/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl index 0135ebac7..645d24bed 100644 --- a/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl +++ b/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl @@ -16,7 +16,7 @@ -module(prop_emqx_sn_frame). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("src/mqttsn/include/emqx_sn.hrl"). -include_lib("proper/include/proper.hrl"). -compile({no_auto_import, [register/1]}).