Merge pull request #10278 from HJianBo/refactor-gw-dir

Refactor gateway application dirs
This commit is contained in:
JianBo He 2023-04-08 09:42:14 +08:00 committed by GitHub
commit e186477531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
148 changed files with 1818 additions and 2469 deletions

19
apps/emqx_coap/.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

31
apps/emqx_coap/README.md Normal file
View File

@ -0,0 +1,31 @@
# emqx_coap
The CoAP gateway implements publish, subscribe, and receive messages as standard
with [Publish-Subscribe Broker for the CoAP](https://datatracker.ietf.org/doc/html/draft-ietf-core-coap-pubsub-09).
## Quick Start
In EMQX 5.0, CoAP gateways can be configured and enabled through the Dashboard.
It can also be enabled via the HTTP API or emqx.conf, e.g. In emqx.conf:
```properties
gateway.coap {
mountpoint = "coap/"
connection_required = false
listeners.udp.default {
bind = "5683"
max_connections = 1024000
max_conn_rate = 1000
}
}
```
> Note:
> Configuring the gateway via emqx.conf requires changes on a per-node basis,
> but configuring it via Dashboard or the HTTP API will take effect across the cluster.
More documentations: [CoAP Gateway](https://www.emqx.io/docs/en/v5.0/gateway/coap.html)

View File

Before

Width:  |  Height:  |  Size: 75 KiB

After

Width:  |  Height:  |  Size: 75 KiB

View File

Before

Width:  |  Height:  |  Size: 31 KiB

After

Width:  |  Height:  |  Size: 31 KiB

View File

Before

Width:  |  Height:  |  Size: 148 KiB

After

Width:  |  Height:  |  Size: 148 KiB

View File

@ -0,0 +1,4 @@
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../../apps/emqx"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
]}.

View File

@ -0,0 +1,10 @@
{application, emqx_coap, [
{description, "CoAP Gateway"},
{vsn, "0.1.0"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -14,13 +14,29 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_impl).
-behaviour(emqx_gateway_impl).
%% @doc The CoAP Gateway implement
-module(emqx_coap).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
%% define a gateway named stomp
-gateway(#{
name => coap,
callback_module => ?MODULE,
config_schema_module => emqx_coap_schema
}).
%% callback_module must implement the emqx_gateway_impl behaviour
-behaviour(emqx_gateway_impl).
%% callback for emqx_gateway_impl
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
-import(
emqx_gateway_utils,
[
@ -30,31 +46,8 @@
]
).
%% APIs
-export([
reg/0,
unreg/0
]).
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
reg() ->
RegistryOptions = [{cbkmod, ?MODULE}],
emqx_gateway_registry:reg(coap, RegistryOptions).
unreg() ->
emqx_gateway_registry:unreg(coap).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%% emqx_gateway_impl callbacks
%%--------------------------------------------------------------------
on_gateway_load(

View File

@ -18,10 +18,10 @@
-behaviour(minirest_api).
-include("emqx_coap.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
%% API
-export([api_spec/0, paths/0, schema/1, namespace/0]).
@ -34,9 +34,12 @@
-import(hoconsc, [mk/2, enum/1]).
-import(emqx_dashboard_swagger, [error_codes/2]).
-elvis([{elvis_style, atom_naming_convention, disable}]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
namespace() -> "gateway_coap".
api_spec() ->

View File

@ -45,8 +45,8 @@
-export_type([channel/0]).
-include("emqx_coap.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include_lib("emqx/include/emqx_authentication.hrl").
-define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).

View File

@ -29,7 +29,7 @@
is_message/1
]).
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_coap.hrl").
-include_lib("emqx/include/types.hrl").
-define(VERSION, 1).
@ -55,6 +55,8 @@
-define(OPTION_PROXY_SCHEME, 39).
-define(OPTION_SIZE1, 60).
-elvis([{elvis_style, no_if_expression, disable}]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------

View File

@ -20,7 +20,7 @@
-module(emqx_coap_medium).
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_coap.hrl").
%% API
-export([

View File

@ -43,7 +43,7 @@
set_payload_block/3, set_payload_block/4
]).
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_coap.hrl").
request(Type, Method) ->
request(Type, Method, <<>>, []).

View File

@ -16,7 +16,7 @@
-module(emqx_coap_mqtt_handler).
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_coap.hrl").
-export([handle_request/4]).
-import(emqx_coap_message, [response/2, response/3]).

View File

@ -18,7 +18,7 @@
-module(emqx_coap_pubsub_handler).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_coap.hrl").
-export([handle_request/4]).

View File

@ -0,0 +1,95 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-type duration() :: non_neg_integer().
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-reflect_type([duration/0]).
%% config schema provides
-export([fields/1, desc/1]).
fields(coap) ->
[
{heartbeat,
sc(
duration(),
#{
default => <<"30s">>,
desc => ?DESC(coap_heartbeat)
}
)},
{connection_required,
sc(
boolean(),
#{
default => false,
desc => ?DESC(coap_connection_required)
}
)},
{notify_type,
sc(
hoconsc:enum([non, con, qos]),
#{
default => qos,
desc => ?DESC(coap_notify_type)
}
)},
{subscribe_qos,
sc(
hoconsc:enum([qos0, qos1, qos2, coap]),
#{
default => coap,
desc => ?DESC(coap_subscribe_qos)
}
)},
{publish_qos,
sc(
hoconsc:enum([qos0, qos1, qos2, coap]),
#{
default => coap,
desc => ?DESC(coap_publish_qos)
}
)},
{mountpoint, emqx_gateway_schema:mountpoint()},
{listeners,
sc(
ref(emqx_gateway_schema, udp_listeners),
#{desc => ?DESC(udp_listeners)}
)}
] ++ emqx_gateway_schema:gateway_common_options().
desc(coap) ->
"The CoAP protocol gateway provides EMQX with the access capability of the CoAP protocol.\n"
"It allows publishing, subscribing, and receiving messages to EMQX in accordance\n"
"with a certain defined CoAP message format.";
desc(_) ->
undefined.
%%--------------------------------------------------------------------
%% helpers
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).
ref(Mod, Field) ->
hoconsc:ref(Mod, Field).

View File

@ -15,10 +15,10 @@
%%--------------------------------------------------------------------
-module(emqx_coap_session).
-include("emqx_coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
%% API
-export([

View File

@ -29,8 +29,8 @@
-export_type([manager/0, event_result/1]).
-include("emqx_coap.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
-type direction() :: in | out.
@ -80,6 +80,8 @@
-import(emqx_coap_medium, [empty/0, iter/4, reset/1, proto_out/2]).
-elvis([{elvis_style, no_if_expression, disable}]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@ -401,9 +403,9 @@ alloc_message_id(MsgId, TM) ->
next_message_id(MsgId) ->
Next = MsgId + 1,
if
Next >= ?MAX_MESSAGE_ID ->
1;
case Next >= ?MAX_MESSAGE_ID of
true ->
1;
false ->
Next
end.

View File

@ -16,8 +16,8 @@
-module(emqx_coap_transport).
-include("emqx_coap.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
-define(ACK_TIMEOUT, 2000).
-define(ACK_RANDOM_FACTOR, 1000).
@ -60,6 +60,12 @@
reply/2
]).
-elvis([{elvis_style, atom_naming_convention, disable}]).
-elvis([{elvis_style, no_if_expression, disable}]).
%%--------------------------------------------------------------------
%% APIs
-spec new() -> transport().
new() ->
new(undefined).

View File

@ -56,6 +56,7 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_coap),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_authn, emqx_gateway]),
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_coap.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -56,6 +56,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_coap),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_authn, emqx_gateway]),
Config.

24
apps/emqx_exproto/.gitignore vendored Normal file
View File

@ -0,0 +1,24 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~
src/emqx_exproto_pb.erl
src/emqx_exproto_v_1_connection_adapter_bhvr.erl
src/emqx_exproto_v_1_connection_adapter_client.erl
src/emqx_exproto_v_1_connection_handler_bhvr.erl
src/emqx_exproto_v_1_connection_handler_client.erl

View File

@ -0,0 +1,34 @@
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../../apps/emqx"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
]}.
{plugins, [
{grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
]}.
{grpc, [
{protos, ["priv/protos"]},
{out_dir, "src"},
{gpb_opts, [
{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}
]}
]}.
{provider_hooks, [
{pre, [
{compile, {grpc, gen}},
{clean, {grpc, clean}}
]}
]}.
{xref_ignores, [emqx_exproto_pb]}.
{cover_excl_mods, [
emqx_exproto_pb,
emqx_exproto_v_1_connection_adapter_client,
emqx_exproto_v_1_connection_adapter_bhvr,
emqx_exproto_v_1_connection_handler_client,
emqx_exproto_v_1_connection_handler_bhvr
]}.

View File

@ -0,0 +1,10 @@
{application, emqx_exproto, [
{description, "ExProto Gateway"},
{vsn, "0.1.0"},
{registered, []},
{applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -14,12 +14,28 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The ExProto Gateway Implement interface
-module(emqx_exproto_impl).
-behaviour(emqx_gateway_impl).
%% @doc The ExProto Gateway implement
-module(emqx_exproto).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
%% define a gateway named stomp
-gateway(#{
name => exproto,
callback_module => ?MODULE,
config_schema_module => emqx_exproto_schema
}).
%% callback_module must implement the emqx_gateway_impl behaviour
-behaviour(emqx_gateway_impl).
%% callback for emqx_gateway_impl
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
-import(
emqx_gateway_utils,
@ -30,31 +46,8 @@
]
).
%% APIs
-export([
reg/0,
unreg/0
]).
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
reg() ->
RegistryOptions = [{cbkmod, ?MODULE}],
emqx_gateway_registry:reg(exproto, RegistryOptions).
unreg() ->
emqx_gateway_registry:unreg(exproto).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%% emqx_gateway_impl callbacks
%%--------------------------------------------------------------------
on_gateway_load(

View File

@ -15,7 +15,8 @@
%%--------------------------------------------------------------------
-module(emqx_exproto_channel).
-include("src/exproto/include/emqx_exproto.hrl").
-include("emqx_exproto.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/types.hrl").

View File

@ -19,7 +19,7 @@
% -behaviour(emqx_exproto_v_1_connection_adapter_bhvr).
-include("src/exproto/include/emqx_exproto.hrl").
-include("emqx_exproto.hrl").
-include_lib("emqx/include/logger.hrl").
-define(IS_QOS(X), (X =:= 0 orelse X =:= 1 orelse X =:= 2)).

View File

@ -0,0 +1,117 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exproto_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-type ip_port() :: tuple() | integer().
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
-reflect_type([
ip_port/0
]).
%% config schema provides
-export([fields/1, desc/1]).
fields(exproto) ->
[
{server,
sc(
ref(exproto_grpc_server),
#{
required => true,
desc => ?DESC(exproto_server)
}
)},
{handler,
sc(
ref(exproto_grpc_handler),
#{
required => true,
desc => ?DESC(exproto_handler)
}
)},
{mountpoint, emqx_gateway_schema:mountpoint()},
{listeners,
sc(ref(emqx_gateway_schema, tcp_udp_listeners), #{desc => ?DESC(tcp_udp_listeners)})}
] ++ emqx_gateway_schema:gateway_common_options();
fields(exproto_grpc_server) ->
[
{bind,
sc(
hoconsc:union([ip_port(), integer()]),
#{
required => true,
desc => ?DESC(exproto_grpc_server_bind)
}
)},
{ssl_options,
sc(
ref(ssl_server_opts),
#{
required => {false, recursively},
desc => ?DESC(exproto_grpc_server_ssl)
}
)}
];
fields(exproto_grpc_handler) ->
[
{address, sc(binary(), #{required => true, desc => ?DESC(exproto_grpc_handler_address)})},
{ssl_options,
sc(
ref(emqx_schema, "ssl_client_opts"),
#{
required => {false, recursively},
desc => ?DESC(exproto_grpc_handler_ssl)
}
)}
];
fields(ssl_server_opts) ->
emqx_schema:server_ssl_opts_schema(
#{
depth => 10,
reuse_sessions => true,
versions => tls_all_available
},
true
).
desc(exproto) ->
"Settings for EMQX extension protocol (exproto).";
desc(exproto_grpc_server) ->
"Settings for the exproto gRPC server.";
desc(exproto_grpc_handler) ->
"Settings for the exproto gRPC connection handler.";
desc(ssl_server_opts) ->
"SSL configuration for the server.";
desc(_) ->
undefined.
%%--------------------------------------------------------------------
%% helpers
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).
ref(StructName) ->
ref(?MODULE, StructName).
ref(Mod, Field) ->
hoconsc:ref(Mod, Field).

View File

@ -76,6 +76,7 @@ metrics() ->
[tcp, ssl, udp, dtls].
init_per_group(GrpName, Cfg) ->
application:load(emqx_exproto),
put(grpname, GrpName),
Svrs = emqx_exproto_echo_svr:start(),
emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway], fun set_special_cfg/1),

View File

@ -18,8 +18,4 @@ _build
rebar3.crashdump
*~
rebar.lock
src/exproto/emqx_exproto_pb.erl
src/exproto/emqx_exproto_v_1_connection_adapter_bhvr.erl
src/exproto/emqx_exproto_v_1_connection_adapter_client.erl
src/exproto/emqx_exproto_v_1_connection_handler_bhvr.erl
src/exproto/emqx_exproto_v_1_connection_handler_client.erl

View File

@ -1,28 +0,0 @@
## shallow clone for speed
REBAR_GIT_CLONE_OPTIONS += --depth 1
export REBAR_GIT_CLONE_OPTIONS
REBAR = rebar3
all: compile
compile:
$(REBAR) compile
clean: distclean
ct:
$(REBAR) as test ct -v
eunit:
$(REBAR) as test eunit
xref:
$(REBAR) xref
cover:
$(REBAR) cover
distclean:
@rm -rf _build
@rm -f data/app.*.config data/vm.*.args rebar.lock

View File

@ -1,332 +1,58 @@
# emqx_gateway
EMQX Gateway
EMQX Gateway is an application that managing all gateways in EMQX.
## Concept
It provides a set of standards to define how to implement a certain type of
protocol access on EMQX. For example:
EMQX Gateway Management
- Gateway-Registry (or Gateway Type)
- *Load
- *UnLoad
- *List
- Frame parsing
- Access authentication
- Publish and subscribe
- Configuration & Schema
- HTTP/CLI management interfaces
- Gateway
- *Create
- *Delete
- *Update
- *Stop-And-Start
- *Hot-Upgrade
- *Satrt/Enable
- *Stop/Disable
- Listener
There are some standard implementations available, such as [Stomp](../emqx_stomp/README.md),
[MQTT-SN](../emqx_mqttsn/README.md), [CoAP](../emqx_coap/README.md),
and [LwM2M](../emqx_lwm2m/README.md) gateway.
## ROADMAP
The emqx_gateway application depends on `emqx`, `emqx_authn`, `emqx_ctl` that
provide the foundation for protocol access.
Gateway v0.1: "Basic Functionals"
- Management support
- Conn/Frame/Protocol Template
- Support Stomp/MQTT-SN/CoAP/LwM2M/ExProto
## Three ways to create your gateway
Gateway v0.2: "Integration & Friendly Management"
- Hooks & Metrics & Statistic
- HTTP APIs
- Management in the cluster
- Integrate with AuthN
- Integrate with `emqx_config`
- Improve hocon config
- Mountpoint & ClientInfo's Metadata
- The Concept Review
## Raw Erlang Application
Gateway v0.3: "Fault tolerance and high availability"
- A common session modoule for message delivery policy
- The restart mechanism for gateway-instance
- Consistency of cluster state
- Configuration hot update
This approach is the same as in EMQX 4.x. You need to implement an Erlang application,
which is packaged in EMQX as a [Plugin](todo) or as a source code dependency.
In this approach, you do not need to respect any specifications of emqx_gateway,
and you can freely implement the features you need.
Gateway v1.0: "Best practices for each type of protocol"
- CoAP
- Stomp
- MQTT-SN
- LwM2M
### Compatible with EMQX
Steps guide: [Implement Gateway via Raw Application](doc/implement_gateway_via_raw_appliction.md)
> Why we need to compatible
## Respect emqx_gateway framework
1. Authentication
2. Hooks/Event system
3. Messages Mode & Rule Engine
4. Cluster registration
5. Metrics & Statistic
Similar to the first approach, you still need to implement an application using Erlang
and package it into EMQX.
The only difference is that you need to follow the standard behaviors(callbacks) provided
by emqx_gateway.
> How to do it
This is the approach we recommend. In this approach, your implementation can be managed
by the emqx_gateway framework, even if it may require you to understand more details about it.
>
### User Interface
Steps guide: [Implement Gateway via Gateway framework](doc/implement_gateway_via_gateway_framekwork.md)
#### Configurations
## Use ExProto Gateway (Non-Erlang developers)
```hocon
gateway {
If you want to implement your gateway using other programming languages such as
Java, Python, Go, etc.
## ... some confs for top scope
..
## End.
You need to implement a gRPC service in the other programming language to parse
your device protocol and integrate it with EMQX.
## Gateway Instances
Refer to: [ExProto Gateway](../emqx_exproto/README.md)
lwm2m[.name] {
## Cookbook for emqx_gateway framework
## variable support
mountpoint: lwm2m/%e/
lifetime_min: 1s
lifetime_max: 86400s
#qmode_time_window: 22
#auto_observe: off
#update_msg_publish_condition: contains_object_list
xml_dir: {{ platform_etc_dir }}/lwm2m_xml
clientinfo_override: {
username: ${register.opts.uname}
password: ${register.opts.passwd}
clientid: ${epn}
}
#authenticator: allow_anonymous
authenticator: [
{
type: auth-http
method: post
//?? how to generate clientinfo ??
params: $client.credential
}
]
translator: {
downlink: "dn/#"
uplink: {
notify: "up/notify"
response: "up/resp"
register: "up/resp"
update: "up/reps"
}
}
%% ?? listener.$type.name ??
listener.udp[.name] {
listen_on: 0.0.0.0:5683
max_connections: 1024000
max_conn_rate: 1000
## ?? udp keepalive in socket level ???
#keepalive:
## ?? udp proxy-protocol in socket level ???
#proxy_protocol: on
#proxy_timeout: 30s
recbuf: 2KB
sndbuf: 2KB
buffer: 2KB
tune_buffer: off
#access: allow all
read_packets: 20
}
listener.dtls[.name] {
listen_on: 0.0.0.0:5684
...
}
}
## The CoAP Gateway
coap[.name] {
#enable_stats: on
authenticator: [
...
]
listener.udp[.name] {
...
}
listener.dtls[.name] {
...
}
}
## The Stomp Gateway
stomp[.name] {
allow_anonymous: true
default_user.login: guest
default_user.passcode: guest
frame.max_headers: 10
frame.max_header_length: 1024
frame.max_body_length: 8192
listener.tcp[.name] {
...
}
listener.ssl[.name] {
...
}
}
exproto[.name] {
proto_name: DL-648
authenticators: [...]
adapter: {
type: grpc
options: {
listen_on: 9100
}
}
handler: {
type: grpc
options: {
url: <http://127.0.0.1:9001>
}
}
listener.tcp[.name] {
...
}
}
## ============================ Enterpise gateways
## The JT/T 808 Gateway
jtt808[.name] {
idle_timeout: 30s
enable_stats: on
max_packet_size: 8192
clientinfo_override: {
clientid: $phone
username: xxx
password: xxx
}
authenticator: [
{
type: auth-http
method: post
params: $clientinfo.credential
}
]
translator: {
subscribe: [jt808/%c/dn]
publish: [jt808/%c/up]
}
listener.tcp[.name] {
...
}
listener.ssl[.name] {
...
}
}
gbt32960[.name] {
frame.max_length: 8192
retx_interval: 8s
retx_max_times: 3
message_queue_len: 10
authenticators: [...]
translator: {
## upstream
login: gbt32960/${vin}/upstream/vlogin
logout: gbt32960/${vin}/upstream/vlogout
informing: gbt32960/${vin}/upstream/info
reinforming: gbt32960/${vin}/upstream/reinfo
## downstream
downstream: gbt32960/${vin}/dnstream
response: gbt32960/${vin}/upstream/response
}
listener.tcp[.name] {
...
}
listener.ssl[.name] {
...
}
}
privtcp[.name] {
max_packet_size: 65535
idle_timeout: 15s
enable_stats: on
force_gc_policy: 1000|1MB
force_shutdown_policy: 8000|800MB
translator: {
up_topic: tcp/%c/up
dn_topic: tcp/%c/dn
}
listener.tcp[.name]: {
...
}
}
}
```
#### CLI
##### Gateway
```bash
## List all started gateway and gateway-instance
emqx_ctl gateway list
emqx_ctl gateway lookup <GatewayId>
emqx_ctl gateway stop <GatewayId>
emqx_ctl gateway start <GatewayId>
emqx_ctl gateway-registry re-searching
emqx_ctl gateway-registry list
emqx_ctl gateway-clients list <Type>
emqx_ctl gateway-clients show <Type> <ClientId>
emqx_ctl gateway-clients kick <Type> <ClientId>
## Banned ??
emqx_ctl gateway-banned
## Metrics
emqx_ctl gateway-metrics [<GatewayId>]
```
#### Management by HTTP-API/Dashboard/
#### How to integrate a protocol to your platform
### Develop your protocol gateway
There are 3 way to create your protocol gateway for EMQX 5.0:
1. Use Erlang to create a new emqx plugin to handle all of protocol packets (same as v5.0 before)
2. Based on the emqx-gateway-impl-bhvr and emqx-gateway
3. Use the gRPC Gateway
*WIP*

View File

@ -37,4 +37,11 @@
config => emqx_config:config()
}.
-type gateway_def() ::
#{
name := gateway_name(),
callback_module := module(),
config_schema_module := module()
}.
-endif.

View File

@ -1,38 +1,5 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../emqx"}}
]}.
{plugins, [
{grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
]}.
{grpc, [
{protos, ["src/exproto/protos"]},
{out_dir, "src/exproto/"},
{gpb_opts, [
{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}
]}
]}.
{provider_hooks, [
{pre, [
{compile, {grpc, gen}},
{clean, {grpc, clean}}
]}
]}.
{xref_ignores, [emqx_exproto_pb]}.
{cover_excl_mods, [
emqx_exproto_pb,
emqx_exproto_v_1_connection_adapter_client,
emqx_exproto_v_1_connection_adapter_bhvr,
emqx_exproto_v_1_connection_handler_client,
emqx_exproto_v_1_connection_handler_bhvr
]}.
{project_plugins, [erlfmt]}.

View File

@ -1,443 +0,0 @@
# Table of Contents
1. [EMQX 5.0 CoAP Gateway](#org61e5bb8)
1. [Features](#orgeddbc94)
1. [PubSub Handler](#orgfc7be2d)
2. [MQTT Handler](#org55be508)
3. [Heartbeat](#org3d1a32e)
4. [Query String](#org9a6b996)
2. [Implementation](#org9985dfe)
1. [Request/Response flow](#orge94210c)
3. [Example](#ref_example)
<a id="org61e5bb8"></a>
# EMQX 5.0 CoAP Gateway
emqx-coap is a CoAP Gateway for EMQX. It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients.
<a id="orgeddbc94"></a>
## Features
- Partially achieves [Publish-Subscribe Broker for the Constrained Application Protocol (CoAP)](https://datatracker.ietf.org/doc/html/draft-ietf-core-coap-pubsub-09)
we called this as ps handler, include following functions:
- Publish
- Subscribe
- UnSubscribe
- Long connection and authorization verification called as MQTT handler
<a id="orgfc7be2d"></a>
### PubSub Handler
1. Publish
Method: POST\
URI Schema: ps/{+topic}{?q\*}\
q\*: [Shared Options](#orgc50043b)\
Response:
- 2.04 "Changed" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" when with wrong auth uri query
2. Subscribe
Method: GET
Options:
- Observer = 0
URI Schema: ps/{+topic}{?q\*}\
q\*: see [Shared Options](#orgc50043b)\
Response:
- 2.05 "Content" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" when with wrong auth uri query
```
Client1 Client2 Broker
| | Subscribe |
| | ----- GET /ps/topic1 Observe:0 Token:XX ----> |
| | |
| | <---------- 2.05 Content Observe:10---------- |
| | |
| | |
| | Publish |
| ---------|----------- PUT /ps/topic1 "1033.3" --------> |
| | Notify |
| | <---------- 2.05 Content Observe:11 --------- |
| | |
```
3. UnSubscribe
Method : GET
Options:
- Observe = 1
URI Schema: ps/{+topic}{?q\*}\
q\*: see [Shared Options](#orgc50043b)\
Response:
- 2.07 "No Content" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" when with wrong auth uri query
<a id="org55be508"></a>
### MQTT Handler
Establishing a connection is optional. If the CoAP client needs to use connection-based operations, it must first establish a connection.
At the same time, the connectionless mode and the connected mode cannot be mixed.
In connection mode, the Publish/Subscribe/UnSubscribe sent by the client must be has Token and ClientId in query string.
If the Token and Clientid is wrong/miss, EMQX will reset the request.
The communication token is the data carried in the response payload after the client successfully establishes a connection.
After obtaining the token, the client's subsequent request must attach "token=Token" to the Query String
ClientId is necessary when there is a connection, and is a unique identifier defined by the client.
The server manages the client through the ClientId. If the ClientId is wrong, EMQX will reset the request.
1. Create a Connection
Method: POST
URI Schema: mqtt/connection{?q\*}
q\*:
- clientid := client uid
- username
- password
Response:
- 2.01 "Created" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" wrong username or password
Payload: Token if success
2. Close a Connection
Method : DELETE
URI Schema: mqtt/connection{?q\*}
q\*:
- clientid := client uid
- token
Response:
- 2.01 "Deleted" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" wrong clientid or token
<a id="org3d1a32e"></a>
### Heartbeat
The Coap client can maintain the "connection" with the server through the heartbeat,
regardless of whether it is authenticated or not,
so that the server will not release related resources
Method : PUT
URI Schema: mqtt/connection{?q\*}
q\*:
- clientid if authenticated
- token if authenticated
Response:
- 2.01 "Changed" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" wrong clientid or token
<a id="org9a6b996"></a>
### Query String
CoAP gateway uses some options in query string to conversion between MQTT CoAP.
1. Shared Options <a id="orgc50043b"></a>
- clientid
- token
2. Connect Options
- username
- password
3. Publish
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-left" />
<col class="org-left" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">Option</th>
<th scope="col" class="org-left">Type</th>
<th scope="col" class="org-left">Default</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">retain</td>
<td class="org-left">boolean</td>
<td class="org-left">false</td>
</tr>
<tr>
<td class="org-left">qos</td>
<td class="org-left">MQTT Qos</td>
<td class="org-left">See <a href="#org0345c3e">here</a></td>
</tr>
<tr>
<td class="org-left">expiry</td>
<td class="org-left">Message Expiry Interval</td>
<td class="org-left">0(Never expiry)</td>
</tr>
</tbody>
</table>
4. Subscribe
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-left" />
<col class="org-right" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">Option</th>
<th scope="col" class="org-left">Type</th>
<th scope="col" class="org-right">Default</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">qos</td>
<td class="org-left">MQTT Qos</td>
<td class="org-right">See <a href="#org2325c7d">here</a></td>
</tr>
<tr>
<td class="org-left">nl</td>
<td class="org-left">MQTT Subscribe No Local</td>
<td class="org-right">0</td>
</tr>
<tr>
<td class="org-left">rh</td>
<td class="org-left">MQTT Subscribe Retain Handing</td>
<td class="org-right">0</td>
</tr>
</tbody>
</table>
5. MQTT Qos <=> CoAP non/con
1.notif_type
Control the type of notify messages when the observed object has changed.Can be:
- non
- con
- qos
in this value, MQTT Qos0 -> non, Qos1/Qos2 -> con
2.subscribe_qos <a id="org2325c7d"></a>
Control the qos of subscribe.Can be:
- qos0
- qos1
- qos2
- coap
in this value, CoAP non -> qos0, con -> qos1
3.publish_qos <a id="org0345c3e"></a>
like subscribe_qos, but control the qos of the publish MQTT message
<a id="org9985dfe"></a>
## Implementation
<a id="orge94210c"></a>
### Request/Response flow
![img](./doc/flow.png)
1. Authorization check
Check whether the clientid and token in the query string match the current connection
2. Session
Manager the "Transport Manager" "Observe Resources Manager" and next message id
3. Transport Mnager
Manager "Transport" create/close/dispatch
4. Observe resources Mnager
Mnager observe topic and token
5. Transport
![img](./doc/transport.png)
1. Shared State
![img](./doc/shared_state.png)
6. Handler
1. pubsub
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-right" />
<col class="org-left" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">Method</th>
<th scope="col" class="org-right">Observe</th>
<th scope="col" class="org-left">Action</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">GET</td>
<td class="org-right">0</td>
<td class="org-left">subscribe and reply result</td>
</tr>
<tr>
<td class="org-left">GET</td>
<td class="org-right">1</td>
<td class="org-left">unsubscribe and reply result</td>
</tr>
<tr>
<td class="org-left">POST</td>
<td class="org-right">X</td>
<td class="org-left">publish and reply result</td>
</tr>
</tbody>
</table>
2. mqtt
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-left" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">Method</th>
<th scope="col" class="org-left">Action</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">PUT</td>
<td class="org-left">reply result</td>
</tr>
<tr>
<td class="org-left">POST</td>
<td class="org-left">return create connection action</td>
</tr>
<tr>
<td class="org-left">DELETE</td>
<td class="org-left">return close connection action</td>
</tr>
</tbody>
</table>
<a id="ref_example"></a>
## Example
1. Create Connection
```
coap-client -m post -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&username=admin&password=public"
```
Server will return token **X** in payload
2. Update Connection
```
coap-client -m put -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&token=X"
```
3. Publish
```
coap-client -m post -e "Hellow" "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public"
```
if you want to publish with auth, you must first establish a connection, and then post publish request on the same socket, so libcoap client can't simulation publish with a token
```
coap-client -m post -e "Hellow" "coap://127.0.0.1/ps/coap/test?clientid=123&token=X"
```
4. Subscribe
```
coap-client -m get -s 60 -O 6,0x00 -o - -T "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public"
```
**Or**
```
coap-client -m get -s 60 -O 6,0x00 -o - -T "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&token=X"
```
5. Close Connection
```
coap-client -m delete -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&token=X
```

View File

@ -4,7 +4,7 @@
{vsn, "0.1.14"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, grpc, emqx, emqx_authn, emqx_ctl]},
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},

View File

@ -395,7 +395,7 @@ fields(Gw) when
Gw == exproto
->
[{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++
convert_listener_struct(emqx_gateway_schema:fields(Gw));
convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw));
fields(Gw) when
Gw == update_stomp;
Gw == update_mqttsn;
@ -405,7 +405,7 @@ fields(Gw) when
->
"update_" ++ GwStr = atom_to_list(Gw),
Gw1 = list_to_existing_atom(GwStr),
remove_listener_and_authn(emqx_gateway_schema:fields(Gw1));
remove_listener_and_authn(emqx_gateway_schema:gateway_schema(Gw1));
fields(Listener) when
Listener == tcp_listener;
Listener == ssl_listener;

View File

@ -41,35 +41,38 @@ stop(_State) ->
%% Internal funcs
load_default_gateway_applications() ->
Apps = gateway_type_searching(),
lists:foreach(fun reg/1, Apps).
lists:foreach(
fun(Def) ->
load_gateway_application(Def)
end,
emqx_gateway_utils:find_gateway_definations()
).
gateway_type_searching() ->
%% FIXME: Hardcoded apps
[
emqx_stomp_impl,
emqx_sn_impl,
emqx_exproto_impl,
emqx_coap_impl,
emqx_lwm2m_impl
].
reg(Mod) ->
try
Mod:reg(),
?SLOG(debug, #{
msg => "register_gateway_succeed",
callback_module => Mod
})
catch
Class:Reason:Stk ->
load_gateway_application(
#{
name := Name,
callback_module := CbMod,
config_schema_module := SchemaMod
}
) ->
RegistryOptions = [{cbkmod, CbMod}, {schema, SchemaMod}],
case emqx_gateway_registry:reg(Name, RegistryOptions) of
ok ->
?SLOG(debug, #{
msg => "register_gateway_succeed",
callback_module => CbMod
});
{error, already_registered} ->
?SLOG(error, #{
msg => "failed_to_register_gateway",
callback_module => Mod,
reason => {Class, Reason},
stacktrace => Stk
msg => "gateway_already_registered",
name => Name,
callback_module => CbMod
})
end.
end;
load_gateway_application(_) ->
?SLOG(error, #{
msg => "invalid_gateway_defination"
}).
load_gateway_by_default() ->
load_gateway_by_default(confs()).

View File

@ -53,329 +53,29 @@
-export([proxy_protocol_opts/0]).
-export([mountpoint/0, mountpoint/1, gateway_common_options/0, gateway_schema/1]).
namespace() -> gateway.
tags() ->
[<<"Gateway">>].
roots() -> [gateway].
roots() ->
[{gateway, sc(ref(?MODULE, gateway), #{importance => ?IMPORTANCE_HIDDEN})}].
fields(gateway) ->
[
{stomp,
sc(
ref(stomp),
#{
required => {false, recursively},
desc => ?DESC(stomp)
}
)},
{mqttsn,
sc(
ref(mqttsn),
#{
required => {false, recursively},
desc => ?DESC(mqttsn)
}
)},
{coap,
sc(
ref(coap),
#{
required => {false, recursively},
desc => ?DESC(coap)
}
)},
{lwm2m,
sc(
ref(lwm2m),
#{
required => {false, recursively},
desc => ?DESC(lwm2m)
}
)},
{exproto,
sc(
ref(exproto),
#{
required => {false, recursively},
desc => ?DESC(exproto)
}
)}
];
fields(stomp) ->
[
{frame, sc(ref(stomp_frame))},
{mountpoint, mountpoint()},
{listeners, sc(ref(tcp_listeners), #{desc => ?DESC(tcp_listeners)})}
] ++ gateway_common_options();
fields(stomp_frame) ->
[
{max_headers,
sc(
non_neg_integer(),
#{
default => 10,
desc => ?DESC(stom_frame_max_headers)
}
)},
{max_headers_length,
sc(
non_neg_integer(),
#{
default => 1024,
desc => ?DESC(stomp_frame_max_headers_length)
}
)},
{max_body_length,
sc(
integer(),
#{
default => 65536,
desc => ?DESC(stom_frame_max_body_length)
}
)}
];
fields(mqttsn) ->
[
{gateway_id,
sc(
integer(),
#{
default => 1,
required => true,
desc => ?DESC(mqttsn_gateway_id)
}
)},
{broadcast,
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqttsn_broadcast)
}
)},
%% TODO: rename
{enable_qos3,
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqttsn_enable_qos3)
}
)},
{subs_resume,
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqttsn_subs_resume)
}
)},
{predefined,
sc(
hoconsc:array(ref(mqttsn_predefined)),
#{
default => [],
required => {false, recursively},
desc => ?DESC(mqttsn_predefined)
}
)},
{mountpoint, mountpoint()},
{listeners, sc(ref(udp_listeners), #{desc => ?DESC(udp_listeners)})}
] ++ gateway_common_options();
fields(mqttsn_predefined) ->
[
{id,
sc(integer(), #{
required => true,
desc => ?DESC(mqttsn_predefined_id)
})},
{topic,
sc(binary(), #{
required => true,
desc => ?DESC(mqttsn_predefined_topic)
})}
];
fields(coap) ->
[
{heartbeat,
sc(
duration(),
#{
default => <<"30s">>,
desc => ?DESC(coap_heartbeat)
}
)},
{connection_required,
sc(
boolean(),
#{
default => false,
desc => ?DESC(coap_connection_required)
}
)},
{notify_type,
sc(
hoconsc:enum([non, con, qos]),
#{
default => qos,
desc => ?DESC(coap_notify_type)
}
)},
{subscribe_qos,
sc(
hoconsc:enum([qos0, qos1, qos2, coap]),
#{
default => coap,
desc => ?DESC(coap_subscribe_qos)
}
)},
{publish_qos,
sc(
hoconsc:enum([qos0, qos1, qos2, coap]),
#{
default => coap,
desc => ?DESC(coap_publish_qos)
}
)},
{mountpoint, mountpoint()},
{listeners,
sc(
ref(udp_listeners),
#{desc => ?DESC(udp_listeners)}
)}
] ++ gateway_common_options();
fields(lwm2m) ->
[
{xml_dir,
sc(
binary(),
#{
%% since this is not packaged with emqx, nor
%% present in the packages, we must let the user
%% specify it rather than creating a dynamic
%% default (especially difficult to handle when
%% generating docs).
example => <<"/etc/emqx/lwm2m_xml">>,
required => true,
desc => ?DESC(lwm2m_xml_dir)
}
)},
{lifetime_min,
sc(
duration(),
#{
default => <<"15s">>,
desc => ?DESC(lwm2m_lifetime_min)
}
)},
{lifetime_max,
sc(
duration(),
#{
default => <<"86400s">>,
desc => ?DESC(lwm2m_lifetime_max)
}
)},
{qmode_time_window,
sc(
duration_s(),
#{
default => <<"22s">>,
desc => ?DESC(lwm2m_qmode_time_window)
}
)},
%% TODO: Support config resource path
{auto_observe,
sc(
boolean(),
#{
default => false,
desc => ?DESC(lwm2m_auto_observe)
}
)},
%% FIXME: not working now
{update_msg_publish_condition,
sc(
hoconsc:enum([always, contains_object_list]),
#{
default => contains_object_list,
desc => ?DESC(lwm2m_update_msg_publish_condition)
}
)},
{translators,
sc(
ref(lwm2m_translators),
#{
required => true,
desc => ?DESC(lwm2m_translators)
}
)},
{mountpoint, mountpoint("lwm2m/${endpoint_name}/")},
{listeners, sc(ref(udp_listeners), #{desc => ?DESC(udp_listeners)})}
] ++ gateway_common_options();
fields(exproto) ->
[
{server,
sc(
ref(exproto_grpc_server),
#{
required => true,
desc => ?DESC(exproto_server)
}
)},
{handler,
sc(
ref(exproto_grpc_handler),
#{
required => true,
desc => ?DESC(exproto_handler)
}
)},
{mountpoint, mountpoint()},
{listeners, sc(ref(tcp_udp_listeners), #{desc => ?DESC(tcp_udp_listeners)})}
] ++ gateway_common_options();
fields(exproto_grpc_server) ->
[
{bind,
sc(
hoconsc:union([ip_port(), integer()]),
#{
required => true,
desc => ?DESC(exproto_grpc_server_bind)
}
)},
{ssl_options,
sc(
ref(ssl_server_opts),
#{
required => {false, recursively},
desc => ?DESC(exproto_grpc_server_ssl)
}
)}
];
fields(exproto_grpc_handler) ->
[
{address, sc(binary(), #{required => true, desc => ?DESC(exproto_grpc_handler_address)})},
{ssl_options,
sc(
ref(emqx_schema, "ssl_client_opts"),
#{
required => {false, recursively},
desc => ?DESC(exproto_grpc_handler_ssl)
}
)}
];
fields(ssl_server_opts) ->
emqx_schema:server_ssl_opts_schema(
#{
depth => 10,
reuse_sessions => true,
versions => tls_all_available
},
true
lists:map(
fun(#{name := Name, config_schema_module := Mod}) ->
{Name,
sc(
ref(Mod, Name),
#{
required => {false, recursively},
desc => ?DESC(Name)
}
)}
end,
emqx_gateway_utils:find_gateway_definations()
);
fields(clientinfo_override) ->
[
@ -389,68 +89,6 @@ fields(clientinfo_override) ->
})},
{clientid, sc(binary(), #{desc => ?DESC(gateway_common_clientinfo_override_clientid)})}
];
fields(lwm2m_translators) ->
[
{command,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_command),
required => true
}
)},
{response,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_response),
required => true
}
)},
{notify,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_notify),
required => true
}
)},
{register,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_register),
required => true
}
)},
{update,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_update),
required => true
}
)}
];
fields(translator) ->
[
{topic,
sc(
binary(),
#{
required => true,
desc => ?DESC(translator_topic)
}
)},
{qos,
sc(
emqx_schema:qos(),
#{
default => 0,
desc => ?DESC(translator_qos)
}
)}
];
fields(udp_listeners) ->
[
{udp, sc(map(name, ref(udp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
@ -522,37 +160,8 @@ fields(dtls_opts) ->
desc(gateway) ->
"EMQX Gateway configuration root.";
desc(stomp) ->
"The STOMP protocol gateway provides EMQX with the ability to access STOMP\n"
"(Simple (or Streaming) Text Orientated Messaging Protocol) protocol.";
desc(stomp_frame) ->
"Size limits for the STOMP frames.";
desc(mqttsn) ->
"The MQTT-SN (MQTT for Sensor Networks) protocol gateway.";
desc(mqttsn_predefined) ->
"The pre-defined topic name corresponding to the pre-defined topic\n"
"ID of N.\n\n"
"Note: the pre-defined topic ID of 0 is reserved.";
desc(coap) ->
"The CoAP protocol gateway provides EMQX with the access capability of the CoAP protocol.\n"
"It allows publishing, subscribing, and receiving messages to EMQX in accordance\n"
"with a certain defined CoAP message format.";
desc(lwm2m) ->
"The LwM2M protocol gateway.";
desc(exproto) ->
"Settings for EMQX extension protocol (exproto).";
desc(exproto_grpc_server) ->
"Settings for the exproto gRPC server.";
desc(exproto_grpc_handler) ->
"Settings for the exproto gRPC connection handler.";
desc(ssl_server_opts) ->
"SSL configuration for the server.";
desc(clientinfo_override) ->
"ClientInfo override.";
desc(lwm2m_translators) ->
"MQTT topics that correspond to LwM2M events.";
desc(translator) ->
"MQTT topic that corresponds to a particular type of event.";
desc(udp_listeners) ->
"Settings for the UDP listeners.";
desc(tcp_listeners) ->
@ -715,8 +324,18 @@ proxy_protocol_opts() ->
)}
].
sc(Type) ->
sc(Type, #{}).
%%--------------------------------------------------------------------
%% dynamic schemas
%% FIXME: don't hardcode the gateway names
gateway_schema(stomp) -> emqx_stomp_schema:fields(stomp);
gateway_schema(mqttsn) -> emqx_mqttsn_schema:fields(mqttsn);
gateway_schema(coap) -> emqx_coap_schema:fields(coap);
gateway_schema(lwm2m) -> emqx_lwm2m_schema:fields(lwm2m);
gateway_schema(exproto) -> emqx_exproto_schema:fields(exproto).
%%--------------------------------------------------------------------
%% helpers
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).

View File

@ -46,7 +46,8 @@
global_chain/1,
listener_chain/3,
make_deprecated_paths/1,
make_compatible_schema/2
make_compatible_schema/2,
find_gateway_definations/0
]).
-export([stringfy/1]).
@ -562,3 +563,82 @@ make_compatible_schema2(Path, SchemaFun) ->
end,
Schema
).
-spec find_gateway_definations() -> list(gateway_def()).
find_gateway_definations() ->
lists:flatten(
lists:map(
fun(App) ->
gateways(find_attrs(App, gateway))
end,
ignore_lib_apps(application:loaded_applications())
)
).
gateways([]) ->
[];
gateways([
{_App, _Mod,
Defination =
#{
name := Name,
callback_module := CbMod,
config_schema_module := SchemaMod
}}
| More
]) when is_atom(Name), is_atom(CbMod), is_atom(SchemaMod) ->
[Defination | gateways(More)].
find_attrs(App, Def) ->
[
{App, Mod, Attr}
|| {ok, Modules} <- [application:get_key(App, modules)],
Mod <- Modules,
{Name, Attrs} <- module_attributes(Mod),
Name =:= Def,
Attr <- Attrs
].
module_attributes(Module) ->
try
apply(Module, module_info, [attributes])
catch
error:undef -> []
end.
ignore_lib_apps(Apps) ->
LibApps = [
kernel,
stdlib,
sasl,
appmon,
eldap,
erts,
syntax_tools,
ssl,
crypto,
mnesia,
os_mon,
inets,
goldrush,
gproc,
runtime_tools,
snmp,
otp_mibs,
public_key,
asn1,
ssh,
hipe,
common_test,
observer,
webtool,
xmerl,
tools,
test_server,
compiler,
debugger,
eunit,
et,
wx
],
[AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)].

View File

@ -1,25 +0,0 @@
deps/
ebin/
_rel/
.erlang.mk/
*.d
*.o
*.exe
data/
*.iml
.idea/
logs/
*.beam
emqx_coap.d
erlang.mk
integration_test/emqx-rel/
integration_test/build_wakaama/
integration_test/case*.txt
integration_test/paho/
integration_test/wakaama/
_build/
rebar.lock
rebar3.crashdump
*.conf.rendered
.rebar3/
*.swp

View File

@ -1,357 +0,0 @@
# LwM2M Gateway
[The LwM2M Specifications](http://www.openmobilealliance.org/release/LightweightM2M) is a Lightweight Machine to Machine protocol.
With `emqx_lwm2m`, user is able to send LwM2M commands(READ/WRITE/EXECUTE/...) and get LwM2M response in MQTT way. `emqx_lwm2m` transforms data between MQTT and LwM2M protocol.
emqx_lwm2m needs object definitions to parse data from lwm2m devices. Object definitions are declared by organizations in XML format, you could find those XMLs from [LwM2MRegistry](http://www.openmobilealliance.org/wp/OMNA/LwM2M/LwM2MRegistry.html), download and put them into the directory specified by `lwm2m.xml_dir`. If no associated object definition is found, response from device will be discarded and report an error message in log.
## Load emqx_lwm2m
```
./bin/emqx_ctl plugins load emqx_lwm2m
```
## Test emqx-lwm2m using *wakaama*
[wakaama](https://github.com/eclipse/wakaama) is an easy-to-use lwm2m client command line tool.
Start *lwm2mclient* using an endpoint name `ep1`:
```
./lwm2mclient -n ep1 -h 127.0.0.1 -p 5683 -4
```
To send an LwM2M DISCOVER command to *lwm2mclient*, publish an MQTT message to topic `lwm2m/<epn>/dn` (where `<epn>` is the endpoint name of the client), with following payload:
```json
{
"reqID": "2",
"msgType": "discover",
"data": {
"path": "/3/0"
}
}
```
The MQTT message will be translated to an LwM2M DISCOVER command and sent to the *lwm2mclient*. Then the response of *lwm2mclient* will be in turn translated to an MQTT message, with topic `lwm2m/<epn>/up/resp`, with following payload:
```json
{
"reqID": "2",
"msgType": "discover",
"data": {
"code":"2.05",
"codeMsg": "content",
"content": [
"</3/0>;dim=8",
"</3/0/0>",
"</3/0/1>",
"</3/0/4>",
"</3/0/16>"
]
}
}
```
## LwM2M <--> MQTT Mapping
### Register/Update (LwM2M Client Registration Interface)
- **LwM2M Register and Update message will be converted to following MQTT message:**
- **Method:** PUBLISH
- **Topic:** `lwm2m/{?EndpointName}/up/resp` (configurable)
- **Payload**:
- MsgType **register** and **update**:
```json
{
"msgType": {?MsgType},
"data": {
"ep": {?EndpointName},
"lt": {?LifeTime},
"sms": {?MSISDN},
"lwm2m": {?Lwm2mVersion},
"b": {?Binding},
"alternatePath": {?AlternatePath},
"objectList": {?ObjectList}
}
}
```
- {?EndpointName}: String, the endpoint name of the LwM2M client
- {?MsgType}: String, could be:
- "register": LwM2M Register
- "update": LwM2M Update
- "data" contains the query options and the object-list of the register message
- The *update* message is only published if the object-list changed.
### Downlink Command and Uplink Response (LwM2M Device Management & Service Enablement Interface)
- **To send a downlink command to device, publish following MQTT message:**
- **Method:** PUBLISH
- **Topic:** `lwm2m/{?EndpointName}/dn`
- **Request Payload**:
```json
{
"reqID": {?ReqID},
"msgType": {?MsgType},
"data": {?Data}
}
```
- {?ReqID}: Integer, request-id, used for matching the response to the request
- {?MsgType}: String, can be one of the following:
- "read": LwM2M Read
- "discover": LwM2M Discover
- "write": LwM2M Write
- "write-attr": LwM2M Write Attributes
- "execute": LwM2M Execute
- "create": LwM2M Create
- "delete": LwM2M Delete
- {?Data}: JSON Object, its value depends on the {?MsgType}:
- **If {?MsgType} = "read" or "discover"**:
```json
{
"path": {?ResourcePath}
}
```
- {?ResourcePath}: String, LwM2M full resource path. e.g. "3/0", "/3/0/0", "/3/0/6/0"
- **If {?MsgType} = "write" (single write)**:
```json
{
"path": {?ResourcePath},
"type": {?ValueType},
"value": {?Value}
}
```
- {?ValueType}: String, can be: "Time", "String", "Integer", "Float", "Boolean", "Opaque", "Objlnk"
- {?Value}: Value of the resource, depends on "type".
- **If {?MsgType} = "write" (batch write)**:
```json
{
"basePath": {?BasePath},
"content": [
{
"path": {?ResourcePath},
"type": {?ValueType},
"value": {?Value}
}
]
}
```
- The full path is concatenation of "basePath" and "path".
- **If {?MsgType} = "write-attr"**:
```json
{
"path": {?ResourcePath},
"pmin": {?PeriodMin},
"pmax": {?PeriodMax},
"gt": {?GreaterThan},
"lt": {?LessThan},
"st": {?Step}
}
```
- {?PeriodMin}: Number, LwM2M Notification Class Attribute - Minimum Period.
- {?PeriodMax}: Number, LwM2M Notification Class Attribute - Maximum Period.
- {?GreaterThan}: Number, LwM2M Notification Class Attribute - Greater Than.
- {?LessThan}: Number, LwM2M Notification Class Attribute - Less Than.
- {?Step}: Number, LwM2M Notification Class Attribute - Step.
- **If {?MsgType} = "execute"**:
```json
{
"path": {?ResourcePath},
"args": {?Arguments}
}
```
- {?Arguments}: String, LwM2M Execute Arguments.
- **If {?MsgType} = "create"**:
```json
{
"basePath": "/{?ObjectID}",
"content": [
{
"path": {?ResourcePath},
"type": {?ValueType},
"value": {?Value}
}
]
}
```
- {?ObjectID}: Integer, LwM2M Object ID
- **If {?MsgType} = "delete"**:
```json
{
"path": "{?ObjectID}/{?ObjectInstanceID}"
}
```
- {?ObjectInstanceID}: Integer, LwM2M Object Instance ID
- **The response of LwM2M will be converted to following MQTT message:**
- **Method:** PUBLISH
- **Topic:** `"lwm2m/{?EndpointName}/up/resp"`
- **Response Payload:**
```json
{
"reqID": {?ReqID},
"imei": {?IMEI},
"imsi": {?IMSI},
"msgType": {?MsgType},
"data": {?Data}
}
```
- {?MsgType}: String, can be:
- "read": LwM2M Read
- "discover": LwM2M Discover
- "write": LwM2M Write
- "write-attr": LwM2M Write Attributes
- "execute": LwM2M Execute
- "create": LwM2M Create
- "delete": LwM2M Delete
- **"ack"**: [CoAP Empty ACK](https://tools.ietf.org/html/rfc7252#section-5.2.2)
- {?Data}: JSON Object, its value depends on {?MsgType}:
- **If {?MsgType} = "write", "write-attr", "execute", "create", "delete", or "read"(when response without content)**:
```json
{
"code": {?StatusCode},
"codeMsg": {?CodeMsg},
"reqPath": {?RequestPath}
}
```
- {?StatusCode}: String, LwM2M status code, e.g. "2.01", "4.00", etc.
- {?CodeMsg}: String, LwM2M response message, e.g. "content", "bad_request"
- {?RequestPath}: String, the requested "path" or "basePath"
- **If {?MsgType} = "discover"**:
```json
{
"code": {?StatusCode},
"codeMsg": {?CodeMsg},
"reqPath": {?RequestPath},
"content": [
{?Link},
...
]
}
```
- {?Link}: String(LwM2M link format) e.g. `"</3>"`, `"<3/0/1>;dim=8"`
- **If {?MsgType} = "read"(when response with content)**:
```json
{
"code": {?StatusCode},
"codeMsg": {?CodeMsg},
"content": {?Content}
}
```
- {?Content}
```json
[
{
"path": {?ResourcePath},
"value": {?Value}
}
]
```
- **If {?MsgType} = "ack", "data" does not exists**
### Observe (Information Reporting Interface - Observe/Cancel-Observe)
- **To observe/cancel-observe LwM2M client, send following MQTT PUBLISH:**
- **Method:** PUBLISH
- **Topic:** `lwm2m/{?EndpointName}/dn`
- **Request Payload**:
```json
{
"reqID": {?ReqID},
"msgType": {?MsgType},
"data": {
"path": {?ResourcePath}
}
}
```
- {?ResourcePath}: String, the LwM2M resource to be observed/cancel-observed.
- {?MsgType}: String, can be:
- "observe": LwM2M Observe
- "cancel-observe": LwM2M Cancel Observe
- {?ReqID}: Integer, request-id, is the {?ReqID} in the request
- **Responses will be converted to following MQTT message:**
- **Method:** PUBLISH
- **Topic:** `lwm2m/{?EndpointName}/up/resp`
- **Response Payload**:
```json
{
"reqID": {?ReqID},
"msgType": {?MsgType},
"data": {
"code": {?StatusCode},
"codeMsg": {?CodeMsg},
"reqPath": {?RequestPath},
"content": [
{
"path": {?ResourcePath},
"value": {?Value}
}
]
}
}
```
- {?MsgType}: String, can be:
- "observe": LwM2M Observe
- "cancel-observe": LwM2M Cancel Observe
- **"ack"**: [CoAP Empty ACK](https://tools.ietf.org/html/rfc7252#section-5.2.2)
### Notification (Information Reporting Interface - Notify)
- **The notifications from LwM2M clients will be converted to MQTT PUBLISH:**
- **Method:** PUBLISH
- **Topic:** `lwm2m/{?EndpiontName}/up/notify`
- **Notification Payload**:
```json
{
"reqID": {?ReqID},
"msgType": {?MsgType},
"seqNum": {?ObserveSeqNum},
"data": {
"code": {?StatusCode},
"codeMsg": {?CodeMsg},
"reqPath": {?RequestPath},
"content": [
{
"path": {?ResourcePath},
"value": {?Value}
}
]
}
}
```
- {?MsgType}: String, must be "notify"
- {?ObserveSeqNum}: Number, value of "Observe" option in CoAP message
- "content": same to the "content" field contains in the response of "read" command
## Feature limitations
- emqx_lwm2m implements LwM2M gateway to EMQX, not a full-featured and independent LwM2M server.
- emqx_lwm2m does not include LwM2M bootstrap server.
- emqx_lwm2m supports UDP binding, no SMS binding yet.
- Firmware object is not fully supported now since mqtt to coap block-wise transfer is not available.
- Object Versioning is not supported now.
## DTLS
emqx-lwm2m support DTLS to secure UDP data.
Please config lwm2m.certfile and lwm2m.keyfile in emqx_lwm2m.conf. If certfile or keyfile are invalid, DTLS will be turned off and you could read a error message in the log.
## License
Apache License Version 2.0
## Author
EMQX-Men Team.

View File

@ -1,110 +0,0 @@
# MQTT-SN Gateway
EMQX MQTT-SN Gateway.
## Configure Plugin
File: etc/emqx_sn.conf
```
## The UDP port which emq-sn is listening on.
##
## Value: IP:Port | Port
##
## Examples: 1884, 127.0.0.1:1884, ::1:1884
mqtt.sn.port = 1884
## The duration(seconds) that emq-sn broadcast ADVERTISE message through.
##
## Value: Second
mqtt.sn.advertise_duration = 900
## The MQTT-SN Gateway id in ADVERTISE message.
##
## Value: Number
mqtt.sn.gateway_id = 1
## To control whether write statistics data into ETS table for dashboard to read.
##
## Value: on | off
mqtt.sn.enable_stats = off
## To control whether accept and process the received publish message with qos=-1.
##
## Value: on | off
mqtt.sn.enable_qos3 = off
## The pre-defined topic name corresponding to the pre-defined topic id of N.
## Note that the pre-defined topic id of 0 is reserved.
mqtt.sn.predefined.topic.0 = reserved
mqtt.sn.predefined.topic.1 = /predefined/topic/name/hello
mqtt.sn.predefined.topic.2 = /predefined/topic/name/nice
## Default username for MQTT-SN. This parameter is optional. If specified,
## emq-sn will connect EMQ core with this username. It is useful if any auth
## plug-in is enabled.
##
## Value: String
mqtt.sn.username = mqtt_sn_user
## This parameter is optional. Pair with username above.
##
## Value: String
mqtt.sn.password = abc
```
- mqtt.sn.port
* The UDP port which emqx-sn is listening on.
- mqtt.sn.advertise_duration
* The duration(seconds) that emqx-sn broadcast ADVERTISE message through.
- mqtt.sn.gateway_id
* Gateway id in ADVERTISE message.
- mqtt.sn.enable_stats
* To control whether write statistics data into ETS table for dashboard to read.
- mqtt.sn.enable_qos3
* To control whether accept and process the received publish message with qos=-1.
- mqtt.sn.predefined.topic.N
* The pre-defined topic name corresponding to the pre-defined topic id of N. Note that the pre-defined topic id of 0 is reserved.
- mqtt.sn.username
* This parameter is optional. If specified, emqx-sn will connect EMQX core with this username. It is useful if any auth plug-in is enabled.
- mqtt.sn.password
* This parameter is optional. Pair with username above.
## Load Plugin
```
./bin/emqx_ctl plugins load emqx_sn
```
## Client
### NOTE
- Topic ID is per-client, and will be cleared if client disconnected with broker or keepalive failure is detected in broker.
- Please register your topics again each time connected with broker.
- If your udp socket(mqtt-sn client) has successfully connected to broker, don't try to send another CONNECT on this socket again, which will lead to confusing behaviour. If you want to start from beging, please do as following:
+ destroy your present socket and create a new socket to connect again
+ or send DISCONNECT on the same socket and connect again.
### Library
- https://github.com/eclipse/paho.mqtt-sn.embedded-c/
- https://github.com/ty4tw/MQTT-SN
- https://github.com/njh/mqtt-sn-tools
- https://github.com/arobenko/mqtt-sn
### sleeping device
PINGREQ must have a ClientId which is identical to the one in CONNECT message. Without ClientId, emqx-sn will ignore such PINGREQ.
### pre-defined topics
The mapping of a pre-defined topic id and topic name should be known inadvance by both client's application and gateway. We define this mapping info in emqx_sn.conf file, and which shall be kept equivalent in all client's side.
## License
Apache License Version 2.0
## Author
EMQX Team.

View File

@ -1,73 +0,0 @@
# emqx-stomp
The plugin adds STOMP 1.0/1.1/1.2 protocol supports to the EMQX broker.
The STOMP clients could PubSub to the MQTT clients.
## Configuration
etc/emqx_stomp.conf
```
## The Port that stomp listener will bind.
##
## Value: Port
stomp.listener = 61613
## The acceptor pool for stomp listener.
##
## Value: Number
stomp.listener.acceptors = 4
## Maximum number of concurrent stomp connections.
##
## Value: Number
stomp.listener.max_connections = 512
## Default login user
##
## Value: String
stomp.default_user.login = guest
## Default login password
##
## Value: String
stomp.default_user.passcode = guest
## Allow anonymous authentication.
##
## Value: true | false
stomp.allow_anonymous = true
## Maximum numbers of frame headers.
##
## Value: Number
stomp.frame.max_headers = 10
## Maximum length of frame header.
##
## Value: Number
stomp.frame.max_header_length = 1024
## Maximum body length of frame.
##
## Value: Number
stomp.frame.max_body_length = 8192
```
## Load the Plugin
```
./bin/emqx_ctl plugins load emqx_stomp
```
## License
Apache License Version 2.0
## Author
EMQX Team.

View File

@ -33,6 +33,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:erase(gateway),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway]),
Conf.
@ -67,11 +68,11 @@ end_per_testcase(_TestCase, _Config) ->
t_registered_gateway(_) ->
[
{coap, #{cbkmod := emqx_coap_impl}},
{exproto, #{cbkmod := emqx_exproto_impl}},
{lwm2m, #{cbkmod := emqx_lwm2m_impl}},
{mqttsn, #{cbkmod := emqx_sn_impl}},
{stomp, #{cbkmod := emqx_stomp_impl}}
{coap, #{cbkmod := emqx_coap}},
{exproto, #{cbkmod := emqx_exproto}},
{lwm2m, #{cbkmod := emqx_lwm2m}},
{mqttsn, #{cbkmod := emqx_mqttsn}},
{stomp, #{cbkmod := emqx_stomp}}
] = emqx_gateway:registered_gateway().
t_load_unload_list_lookup(_) ->
@ -187,7 +188,14 @@ read_lwm2m_conf(DataDir) ->
Conf.
setup_fake_usage_data(Lwm2mDataDir) ->
XmlDir = emqx_common_test_helpers:deps_path(emqx_gateway, "src/lwm2m/lwm2m_xml"),
XmlDir = filename:join(
[
emqx_common_test_helpers:proj_root(),
"apps",
"emqx_lwm2m",
"lwm2m_xml"
]
),
Lwm2mConf = read_lwm2m_conf(Lwm2mDataDir),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, Lwm2mConf),
emqx_config:put([gateway, lwm2m, xml_dir], XmlDir),

View File

@ -46,6 +46,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
application:load(emqx),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_config:delete_override_conf_files(),
emqx_config:erase(gateway),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
@ -214,9 +215,17 @@ t_gateway_coap(_) ->
t_gateway_lwm2m(_) ->
{200, Gw} = request(get, "/gateways/lwm2m"),
assert_gw_unloaded(Gw),
XmlDir = filename:join(
[
emqx_common_test_helpers:proj_root(),
"apps",
"emqx_lwm2m",
"lwm2m_xml"
]
),
GwConf = #{
name => <<"lwm2m">>,
xml_dir => <<"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml">>,
xml_dir => list_to_binary(XmlDir),
lifetime_min => <<"1s">>,
lifetime_max => <<"1000s">>,
qmode_time_window => <<"30s">>,

View File

@ -66,6 +66,7 @@ end_per_group(AuthName, Conf) ->
init_per_suite(Config) ->
emqx_config:erase(gateway),
emqx_gateway_test_utils:load_all_gateway_apps(),
init_gateway_conf(),
meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_authz_file, create, fun(S) -> S end),
@ -225,7 +226,7 @@ t_case_sn_subscribe(_) ->
)
end,
Sub(<<"/subscribe">>, fun(Data) ->
{ok, Msg, _, _} = emqx_sn_frame:parse(Data, undefined),
{ok, Msg, _, _} = emqx_mqttsn_frame:parse(Data, undefined),
?assertMatch({mqtt_sn_message, _, {_, 3, 0, Payload}}, Msg)
end),
Sub(<<"/badsubscribe">>, fun(Data) ->

View File

@ -62,6 +62,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:erase(gateway),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),
Conf.
@ -116,11 +117,11 @@ t_gateway_registry_usage(_) ->
t_gateway_registry_list(_) ->
emqx_gateway_cli:'gateway-registry'(["list"]),
?assertEqual(
"Registered Name: coap, Callback Module: emqx_coap_impl\n"
"Registered Name: exproto, Callback Module: emqx_exproto_impl\n"
"Registered Name: lwm2m, Callback Module: emqx_lwm2m_impl\n"
"Registered Name: mqttsn, Callback Module: emqx_sn_impl\n"
"Registered Name: stomp, Callback Module: emqx_stomp_impl\n",
"Registered Name: coap, Callback Module: emqx_coap\n"
"Registered Name: exproto, Callback Module: emqx_exproto\n"
"Registered Name: lwm2m, Callback Module: emqx_lwm2m\n"
"Registered Name: mqttsn, Callback Module: emqx_mqttsn\n"
"Registered Name: stomp, Callback Module: emqx_stomp\n",
acc_print()
).

View File

@ -34,6 +34,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:erase(gateway),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([]),

View File

@ -34,6 +34,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:erase(gateway),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([]),
Conf.

View File

@ -37,6 +37,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:load_config(emqx_gateway_schema, <<"gateway {}">>),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn, emqx_gateway]),
Conf.

View File

@ -28,6 +28,7 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_gateway_test_utils:load_all_gateway_apps(),
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(
emqx_access_control,

View File

@ -33,6 +33,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
emqx_config:erase(gateway),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([]),
Conf.

View File

@ -37,6 +37,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
%%--------------------------------------------------------------------
init_per_suite(Cfg) ->
emqx_gateway_test_utils:load_all_gateway_apps(),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway]),
Cfg.

View File

@ -101,6 +101,12 @@ assert_fields_exist(Ks, Map) ->
end,
Ks
).
load_all_gateway_apps() ->
application:load(emqx_stomp),
application:load(emqx_mqttsn),
application:load(emqx_coap),
application:load(emqx_lwm2m),
application:load(emqx_exproto).
%%--------------------------------------------------------------------
%% http

19
apps/emqx_lwm2m/.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

61
apps/emqx_lwm2m/README.md Normal file
View File

@ -0,0 +1,61 @@
# emqx_lwm2m
[LwM2M (Lightweight Machine-to-Machine)](https://lwm2m.openmobilealliance.org/)
is a protocol designed for IoT devices and machine-to-machine communication.
It is a lightweight protocol that supports devices with limited processing power and memory.
The **LwM2M Gateway** in EMQX can accept LwM2M clients and translate theirevents
and messages into MQTT Publish messages.
In the current implementation, it has the following limitations:
- Based UDP/DTLS transport.
- Only supports v1.0.2. The v1.1.x and v1.2.x is not supported yet.
- Not included LwM2M Bootstrap services.
## Quick Start
In EMQX 5.0, LwM2M gateways can be configured and enabled through the Dashboard.
It can also be enabled via the HTTP API, and emqx.conf e.g, In emqx.conf:
```properties
gateway.lwm2m {
xml_dir = "etc/lwm2m_xml/"
auto_observe = true
enable_stats = true
idle_timeout = "30s"
lifetime_max = "86400s"
lifetime_min = "1s"
mountpoint = "lwm2m/${endpoint_namea}/"
qmode_time_window = "22s"
update_msg_publish_condition = "contains_object_list"
translators {
command {qos = 0, topic = "dn/#"}
notify {qos = 0, topic = "up/notify"}
register {qos = 0, topic = "up/resp"}
response {qos = 0, topic = "up/resp"}
update {qos = 0, topic = "up/update"}
}
listeners {
udp {
default {
bind = "5783"
max_conn_rate = 1000
max_connections = 1024000
}
}
}
}
```
> Note:
> Configuring the gateway via emqx.conf requires changes on a per-node basis,
> but configuring it via Dashboard or the HTTP API will take effect across the cluster.
:::
## Object definations
emqx_lwm2m needs object definitions to parse data from lwm2m devices. Object definitions are declared by organizations in XML format, you could find those XMLs from [LwM2MRegistry](http://www.openmobilealliance.org/wp/OMNA/LwM2M/LwM2MRegistry.html), download and put them into the directory specified by `lwm2m.xml_dir`. If no associated object definition is found, response from device will be discarded and report an error message in log.
More documentations: [LwM2M Gateway](https://www.emqx.io/docs/en/v5.0/gateway/lwm2m.html)

View File

@ -0,0 +1,4 @@
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../../apps/emqx"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
]}.

View File

@ -0,0 +1,10 @@
{application, emqx_lwm2m, [
{description, "LwM2M Gateway"},
{vsn, "0.1.0"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_coap]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -14,35 +14,37 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The LwM2M Gateway Implement interface
-module(emqx_lwm2m_impl).
-behaviour(emqx_gateway_impl).
%% @doc The LwM2M Gateway implement
-module(emqx_lwm2m).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
%% APIs
-export([
reg/0,
unreg/0
]).
%% define a gateway named stomp
-gateway(#{
name => lwm2m,
callback_module => ?MODULE,
config_schema_module => emqx_lwm2m_schema
}).
%% callback_module must implement the emqx_gateway_impl behaviour
-behaviour(emqx_gateway_impl).
%% callback for emqx_gateway_impl
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
reg() ->
RegistryOptions = [{cbkmod, ?MODULE}],
emqx_gateway_registry:reg(lwm2m, RegistryOptions).
unreg() ->
emqx_gateway_registry:unreg(lwm2m).
-import(
emqx_gateway_utils,
[
normalize_config/1,
start_listeners/4,
stop_listeners/2
]
).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks

View File

@ -32,6 +32,8 @@
-import(hoconsc, [mk/2, ref/1, ref/2]).
-import(emqx_dashboard_swagger, [error_codes/2]).
-elvis([{elvis_style, atom_naming_convention, disable}]).
namespace() -> "lwm2m".
api_spec() ->

View File

@ -16,9 +16,9 @@
-module(emqx_lwm2m_channel).
-include("emqx_lwm2m.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include_lib("emqx_coap/include/emqx_coap.hrl").
%% API
-export([
@ -464,14 +464,14 @@ check_lwm2m_version(
_ ->
false
end,
if
IsValid ->
case IsValid of
true ->
NConnInfo = ConnInfo#{
connected_at => erlang:system_time(millisecond),
proto_ver => Ver
},
{ok, Channel#channel{conninfo = NConnInfo}};
true ->
_ ->
?SLOG(error, #{
msg => "reject_REGISTRE_request",
reason => {unsupported_version, Ver}

View File

@ -16,9 +16,9 @@
-module(emqx_lwm2m_cmd).
-include("emqx_lwm2m.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include_lib("emqx_coap/include/emqx_coap.hrl").
-export([
mqtt_to_coap/2,
@ -292,9 +292,9 @@ make_response(Code, Ref = #{}) ->
BaseRsp = make_base_response(Ref),
make_data_response(BaseRsp, Code).
make_response(Code, Ref = #{}, _Format, Result) ->
make_response(Code, Ref = #{}, Format, Result) ->
BaseRsp = make_base_response(Ref),
make_data_response(BaseRsp, Code, _Format, Result).
make_data_response(BaseRsp, Code, Format, Result).
%% The base response format is what included in the request:
%%

View File

@ -24,7 +24,7 @@
translate_json/1
]).
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("emqx_lwm2m.hrl").
tlv_to_json(BaseName, TlvData) ->
DecodedTlv = emqx_lwm2m_tlv:parse(TlvData),
@ -412,9 +412,11 @@ byte_size_of_signed(UInt) ->
byte_size_of_signed(UInt, N) ->
BitSize = (8 * N - 1),
Max = (1 bsl BitSize),
if
UInt =< Max -> N;
UInt > Max -> byte_size_of_signed(UInt, N + 1)
case UInt =< Max of
true ->
N;
false ->
byte_size_of_signed(UInt, N + 1)
end.
binary_to_number(NumStr) ->

View File

@ -0,0 +1,184 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-type duration() :: non_neg_integer().
-type duration_s() :: non_neg_integer().
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
-reflect_type([duration/0, duration_s/0]).
%% config schema provides
-export([fields/1, desc/1]).
fields(lwm2m) ->
[
{xml_dir,
sc(
binary(),
#{
%% since this is not packaged with emqx, nor
%% present in the packages, we must let the user
%% specify it rather than creating a dynamic
%% default (especially difficult to handle when
%% generating docs).
example => <<"/etc/emqx/lwm2m_xml">>,
required => true,
desc => ?DESC(lwm2m_xml_dir)
}
)},
{lifetime_min,
sc(
duration(),
#{
default => <<"15s">>,
desc => ?DESC(lwm2m_lifetime_min)
}
)},
{lifetime_max,
sc(
duration(),
#{
default => <<"86400s">>,
desc => ?DESC(lwm2m_lifetime_max)
}
)},
{qmode_time_window,
sc(
duration_s(),
#{
default => <<"22s">>,
desc => ?DESC(lwm2m_qmode_time_window)
}
)},
%% TODO: Support config resource path
{auto_observe,
sc(
boolean(),
#{
default => false,
desc => ?DESC(lwm2m_auto_observe)
}
)},
%% FIXME: not working now
{update_msg_publish_condition,
sc(
hoconsc:enum([always, contains_object_list]),
#{
default => contains_object_list,
desc => ?DESC(lwm2m_update_msg_publish_condition)
}
)},
{translators,
sc(
ref(lwm2m_translators),
#{
required => true,
desc => ?DESC(lwm2m_translators)
}
)},
{mountpoint, emqx_gateway_schema:mountpoint("lwm2m/${endpoint_name}/")},
{listeners, sc(ref(emqx_gateway_schema, udp_listeners), #{desc => ?DESC(udp_listeners)})}
] ++ emqx_gateway_schema:gateway_common_options();
fields(lwm2m_translators) ->
[
{command,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_command),
required => true
}
)},
{response,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_response),
required => true
}
)},
{notify,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_notify),
required => true
}
)},
{register,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_register),
required => true
}
)},
{update,
sc(
ref(translator),
#{
desc => ?DESC(lwm2m_translators_update),
required => true
}
)}
];
fields(translator) ->
[
{topic,
sc(
binary(),
#{
required => true,
desc => ?DESC(translator_topic)
}
)},
{qos,
sc(
emqx_schema:qos(),
#{
default => 0,
desc => ?DESC(translator_qos)
}
)}
].
desc(lwm2m) ->
"The LwM2M protocol gateway.";
desc(lwm2m_translators) ->
"MQTT topics that correspond to LwM2M events.";
desc(translator) ->
"MQTT topic that corresponds to a particular type of event.";
desc(_) ->
undefined.
%%--------------------------------------------------------------------
%% helpers
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).
ref(StructName) ->
ref(?MODULE, StructName).
ref(Mod, Field) ->
hoconsc:ref(Mod, Field).

View File

@ -15,12 +15,12 @@
%%--------------------------------------------------------------------
-module(emqx_lwm2m_session).
-include("src/coap/include/emqx_coap.hrl").
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("emqx_lwm2m.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_coap/include/emqx_coap.hrl").
%% API
-export([
@ -379,8 +379,8 @@ is_alternate_path(LinkAttrs) ->
true;
[AttrKey, _] when AttrKey =/= <<>> ->
false;
_BadAttr ->
throw({bad_attr, _BadAttr})
BadAttr ->
throw({bad_attr, BadAttr})
end
end,
LinkAttrs
@ -679,10 +679,10 @@ send_to_coap(#session{queue = Queue} = Session) ->
case queue:out(Queue) of
{{value, {Timestamp, Ctx, Req}}, Q2} ->
Now = ?NOW,
if
Timestamp =:= 0 orelse Timestamp > Now ->
send_to_coap(Ctx, Req, Session#session{queue = Q2});
case Timestamp =:= 0 orelse Timestamp > Now of
true ->
send_to_coap(Ctx, Req, Session#session{queue = Q2});
false ->
send_to_coap(Session#session{queue = Q2})
end;
{empty, _} ->

View File

@ -25,7 +25,7 @@
-export([binary_to_hex_string/1]).
-endif.
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("emqx_lwm2m.hrl").
-define(TLV_TYPE_OBJECT_INSTANCE, 0).
-define(TLV_TYPE_RESOURCE_INSTANCE, 1).
@ -37,13 +37,18 @@
-define(TLV_LEGNTH_16_BIT, 2).
-define(TLV_LEGNTH_24_BIT, 3).
%----------------------------------------------------------------------------------------------------------------------------------------
% [#{tlv_object_instance := Id11, value := Value11}, #{tlv_object_instance := Id12, value := Value12}, ...]
-elvis([{elvis_style, no_if_expression, disable}]).
%%--------------------------------------------------------------------
% [#{tlv_object_instance := Id11, value := Value11},
% #{tlv_object_instance := Id12, value := Value12}, ...]
% where Value11 and Value12 is a list:
% [#{tlv_resource_with_value => Id21, value => Value21}, #{tlv_multiple_resource => Id22, value = Value22}, ...]
% [#{tlv_resource_with_value => Id21, value => Value21},
% #{tlv_multiple_resource => Id22, value = Value22}, ...]
% where Value21 is a binary
% Value22 is a list:
% [#{tlv_resource_instance => Id31, value => Value31}, #{tlv_resource_instance => Id32, value => Value32}, ...]
% [#{tlv_resource_instance => Id31, value => Value31},
% #{tlv_resource_instance => Id32, value => Value32}, ...]
% where Value31 and Value32 is a binary
%
% correspond to three levels:
@ -51,8 +56,9 @@
% 2) Resource Level
% 3) Resource Instance Level
%
% NOTE: TLV does not has object level, only has object instance level. It implies TLV can not represent multiple objects
%----------------------------------------------------------------------------------------------------------------------------------------
% NOTE: TLV does not has object level, only has object instance level.
% It implies TLV can not represent multiple objects
%%--------------------------------------------------------------------
parse(Data) ->
parse_loop(Data, []).

View File

@ -16,7 +16,7 @@
-module(emqx_lwm2m_xml_object).
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("emqx_lwm2m.hrl").
-include_lib("xmerl/include/xmerl.hrl").
-export([

View File

@ -16,7 +16,7 @@
-module(emqx_lwm2m_xml_object_db).
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("emqx_lwm2m.hrl").
-include_lib("xmerl/include/xmerl.hrl").
-include_lib("emqx/include/logger.hrl").
@ -45,6 +45,8 @@
-record(state, {}).
-elvis([{elvis_style, atom_naming_convention, disable}]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
@ -124,10 +126,10 @@ code_change(_OldVsn, State, _Extra) ->
load(BaseDir) ->
Wild = filename:join(BaseDir, "*.xml"),
Wild2 =
if
is_binary(Wild) ->
erlang:binary_to_list(Wild);
case is_binary(Wild) of
true ->
erlang:binary_to_list(Wild);
false ->
Wild
end,
case filelib:wildcard(Wild2) of

View File

@ -31,8 +31,8 @@
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_lwm2m.hrl").
-include_lib("emqx_coap/include/emqx_coap.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -134,6 +134,7 @@ groups() ->
init_per_suite(Config) ->
%% load application first for minirest api searching
application:load(emqx_gateway),
application:load(emqx_lwm2m),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn]),
Config.
@ -176,11 +177,19 @@ default_config() ->
default_config(#{}).
default_config(Overrides) ->
XmlDir = filename:join(
[
emqx_common_test_helpers:proj_root(),
"apps",
"emqx_lwm2m",
"lwm2m_xml"
]
),
iolist_to_binary(
io_lib:format(
"\n"
"gateway.lwm2m {\n"
" xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"\n"
" xml_dir = \"~s\"\n"
" lifetime_min = 1s\n"
" lifetime_max = 86400s\n"
" qmode_time_window = 22\n"
@ -199,6 +208,7 @@ default_config(Overrides) ->
" }\n"
"}\n",
[
XmlDir,
maps:get(auto_observe, Overrides, false),
maps:get(bind, Overrides, ?PORT)
]

View File

@ -23,34 +23,11 @@
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_lwm2m.hrl").
-include("emqx_coap/include/emqx_coap.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<
"\n"
"gateway.lwm2m {\n"
" xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"\n"
" lifetime_min = 100s\n"
" lifetime_max = 86400s\n"
" qmode_time_window = 200\n"
" auto_observe = false\n"
" mountpoint = \"lwm2m/${username}\"\n"
" update_msg_publish_condition = contains_object_list\n"
" translators {\n"
" command = {topic = \"/dn/#\", qos = 0}\n"
" response = {topic = \"/up/resp\", qos = 0}\n"
" notify = {topic = \"/up/notify\", qos = 0}\n"
" register = {topic = \"/up/resp\", qos = 0}\n"
" update = {topic = \"/up/resp\", qos = 0}\n"
" }\n"
" listeners.udp.default {\n"
" bind = 5783\n"
" }\n"
"}\n"
>>).
-define(assertExists(Map, Key),
?assertNotEqual(maps:get(Key, Map, undefined), undefined)
).
@ -81,8 +58,10 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
application:load(emqx_gateway),
application:load(emqx_lwm2m),
DefaultConfig = emqx_lwm2m_SUITE:default_config(),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, DefaultConfig),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn]),
Config.
@ -93,7 +72,8 @@ end_per_suite(Config) ->
Config.
init_per_testcase(_AllTestCase, Config) ->
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
DefaultConfig = emqx_lwm2m_SUITE:default_config(),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, DefaultConfig),
{ok, _} = application:ensure_all_started(emqx_gateway),
{ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]),

View File

@ -21,8 +21,8 @@
-define(LOGT(Format, Args), logger:debug("TEST_SUITE: " ++ Format, Args)).
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include("emqx_lwm2m.hrl").
-include("emqx_coap/include/emqx_coap.hrl").
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------

View File

@ -45,6 +45,7 @@ init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
raw_with_default => true
}),
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authn, emqx_authz, emqx_modules],
fun set_special_configs/1

19
apps/emqx_mqttsn/.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -0,0 +1,34 @@
# emqx_mqttsn
The MQTT-SN gateway is based on the
[MQTT-SN v1.2](https://www.oasis-open.org/committees/download.php/66091/MQTT-SN_spec_v1.2.pdf).
## Quick Start
In EMQX 5.0, MQTT-SN gateway can be configured and enabled through the Dashboard.
It can also be enabled via the HTTP API or emqx.conf, e.g. In emqx.conf:
```properties
gateway.mqttsn {
mountpoint = "mqtt/sn"
gateway_id = 1
broadcast = true
enable_qos3 = true
listeners.udp.default {
bind = 1884
max_connections = 10240000 max_conn_rate = 1000
}
}
```
> Note:
> Configuring the gateway via emqx.conf requires changes on a per-node basis,
> but configuring it via Dashboard or the HTTP API will take effect across the cluster.
More documentations: [MQTT-SN Gateway](https://www.emqx.io/docs/en/v5.0/gateway/mqttsn.html)

View File

@ -0,0 +1,4 @@
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../../apps/emqx"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
]}.

View File

@ -0,0 +1,10 @@
{application, emqx_mqttsn, [
{description, "MQTT-SN Gateway"},
{vsn, "0.1.0"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,13 +14,28 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The MQTT-SN Gateway Implement interface
-module(emqx_sn_impl).
-behaviour(emqx_gateway_impl).
%% @doc The MQTT-SN Gateway implement interface
-module(emqx_mqttsn).
-include_lib("emqx/include/logger.hrl").
%% define a gateway named stomp
-gateway(#{
name => mqttsn,
callback_module => ?MODULE,
config_schema_module => emqx_mqttsn_schema
}).
%% callback_module must implement the emqx_gateway_impl behaviour
-behaviour(emqx_gateway_impl).
%% callback for emqx_gateway_impl
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
-import(
emqx_gateway_utils,
[
@ -30,31 +45,8 @@
]
).
%% APIs
-export([
reg/0,
unreg/0
]).
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
reg() ->
RegistryOptions = [{cbkmod, ?MODULE}],
emqx_gateway_registry:reg(mqttsn, RegistryOptions).
unreg() ->
emqx_gateway_registry:unreg(mqttsn).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%% emqx_gateway_impl callbacks
%%--------------------------------------------------------------------
on_gateway_load(
@ -64,8 +56,8 @@ on_gateway_load(
},
Ctx
) ->
%% We Also need to start `emqx_sn_broadcast` &
%% `emqx_sn_registry` process
%% We Also need to start `emqx_mqttsn_broadcast` &
%% `emqx_mqttsn_registry` process
case maps:get(broadcast, Config, false) of
false ->
ok;
@ -73,23 +65,23 @@ on_gateway_load(
%% FIXME:
Port = 1884,
SnGwId = maps:get(gateway_id, Config, undefined),
_ = emqx_sn_broadcast:start_link(SnGwId, Port),
_ = emqx_mqttsn_broadcast:start_link(SnGwId, Port),
ok
end,
PredefTopics = maps:get(predefined, Config, []),
{ok, RegistrySvr} = emqx_sn_registry:start_link(GwName, PredefTopics),
{ok, RegistrySvr} = emqx_mqttsn_registry:start_link(GwName, PredefTopics),
NConfig = maps:without(
[broadcast, predefined],
Config#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
Config#{registry => emqx_mqttsn_registry:lookup_name(RegistrySvr)}
),
Listeners = emqx_gateway_utils:normalize_config(NConfig),
ModCfg = #{
frame_mod => emqx_sn_frame,
chann_mod => emqx_sn_channel
frame_mod => emqx_mqttsn_frame,
chann_mod => emqx_mqttsn_channel
},
case

View File

@ -14,17 +14,11 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sn_broadcast).
-module(emqx_mqttsn_broadcast).
-behaviour(gen_server).
-ifdef(TEST).
%% make rebar3 ct happy when testing with --suite path/to/module_SUITE.erl
-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl").
-else.
%% make mix happy
-include("src/mqttsn/include/emqx_sn.hrl").
-endif.
-include("emqx_mqttsn.hrl").
-include_lib("emqx/include/logger.hrl").
-export([
@ -65,7 +59,7 @@ stop() ->
init([GwId, Port]) ->
%% FIXME:
Duration = application:get_env(emqx_sn, advertise_duration, ?DEFAULT_DURATION),
Duration = application:get_env(emqx_mqttsn, advertise_duration, ?DEFAULT_DURATION),
{ok, Sock} = gen_udp:open(0, [binary, {broadcast, true}]),
{ok,
ensure_advertise(#state{
@ -121,7 +115,7 @@ send_advertise(#state{
addrs = Addrs,
duration = Duration
}) ->
Data = emqx_sn_frame:serialize_pkt(?SN_ADVERTISE_MSG(GwId, Duration), #{}),
Data = emqx_mqttsn_frame:serialize_pkt(?SN_ADVERTISE_MSG(GwId, Duration), #{}),
lists:foreach(
fun(Addr) ->
?SLOG(debug, #{

View File

@ -14,11 +14,11 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sn_channel).
-module(emqx_mqttsn_channel).
-behaviour(emqx_gateway_channel).
-include("src/mqttsn/include/emqx_sn.hrl").
-include("emqx_mqttsn.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
@ -51,7 +51,7 @@
%% Context
ctx :: emqx_gateway_ctx:context(),
%% Registry
registry :: emqx_sn_registry:registry(),
registry :: emqx_mqttsn_registry:registry(),
%% Gateway Id
gateway_id :: integer(),
%% Enable QoS3
@ -478,7 +478,7 @@ handle_in(
true ->
<<TopicId:16>>;
false ->
emqx_sn_registry:lookup_topic(
emqx_mqttsn_registry:lookup_topic(
Registry,
?NEG_QOS_CLIENT_ID,
TopicId
@ -624,7 +624,7 @@ handle_in(
clientinfo = #{clientid := ClientId}
}
) ->
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of
TopicId when is_integer(TopicId) ->
?SLOG(debug, #{
msg => "registered_topic_name",
@ -778,7 +778,7 @@ handle_in(
{ok, Channel}
end;
?SN_RC_INVALID_TOPIC_ID ->
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
{ok, Channel};
TopicName ->
@ -1093,7 +1093,7 @@ convert_topic_id_to_name(
clientinfo = #{clientid := ClientId}
}
) ->
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
{error, ?SN_RC_INVALID_TOPIC_ID};
TopicName ->
@ -1202,7 +1202,7 @@ preproc_subs_type(
%% If the gateway is able accept the subscription,
%% it assigns a topic id to the received topic name
%% and returns it within a SUBACK message
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of
{error, too_large} ->
{error, ?SN_RC2_EXCEED_LIMITATION};
{error, wildcard_topic} ->
@ -1228,7 +1228,7 @@ preproc_subs_type(
}
) ->
case
emqx_sn_registry:lookup_topic(
emqx_mqttsn_registry:lookup_topic(
Registry,
ClientId,
TopicId
@ -1344,7 +1344,7 @@ preproc_unsub_type(
}
) ->
case
emqx_sn_registry:lookup_topic(
emqx_mqttsn_registry:lookup_topic(
Registry,
ClientId,
TopicId
@ -1765,7 +1765,7 @@ message_to_packet(
?QOS_0 -> 0;
_ -> MsgId
end,
case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of
case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, Topic) of
{predef, PredefTopicId} ->
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC},
?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload);
@ -1932,9 +1932,9 @@ ensure_registered_topic_name(
Channel = #channel{registry = Registry}
) ->
ClientId = clientid(Channel),
case emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) of
case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, TopicName) of
undefined ->
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of
{error, Reason} -> {error, Reason};
TopicId -> {ok, TopicId}
end;

View File

@ -16,11 +16,11 @@
%%--------------------------------------------------------------------
%% @doc The frame parser for MQTT-SN protocol
-module(emqx_sn_frame).
-module(emqx_mqttsn_frame).
-behaviour(emqx_gateway_frame).
-include("src/mqttsn/include/emqx_sn.hrl").
-include("emqx_mqttsn.hrl").
-export([
initial_parse_state/1,
@ -58,10 +58,10 @@ serialize_opts() ->
%% Parse MQTT-SN Message
%%--------------------------------------------------------------------
parse(<<16#01:?byte, Len:?short, Type:?byte, Var/binary>>, _State) ->
{ok, parse(Type, Len - 4, Var), <<>>, _State};
parse(<<Len:?byte, Type:?byte, Var/binary>>, _State) ->
{ok, parse(Type, Len - 2, Var), <<>>, _State}.
parse(<<16#01:?byte, Len:?short, Type:?byte, Var/binary>>, State) ->
{ok, parse(Type, Len - 4, Var), <<>>, State};
parse(<<Len:?byte, Type:?byte, Var/binary>>, State) ->
{ok, parse(Type, Len - 2, Var), <<>>, State}.
parse(Type, Len, Var) when Len =:= size(Var) ->
#mqtt_sn_message{type = Type, variable = parse_var(Type, Var)};
@ -160,9 +160,11 @@ parse_topic(2#11, Topic) -> Topic.
serialize_pkt(#mqtt_sn_message{type = Type, variable = Var}, Opts) ->
VarBin = serialize(Type, Var, Opts),
VarLen = size(VarBin),
if
VarLen < 254 -> <<(VarLen + 2), Type, VarBin/binary>>;
true -> <<16#01, (VarLen + 4):?short, Type, VarBin/binary>>
case VarLen < 254 of
true ->
<<(VarLen + 2), Type, VarBin/binary>>;
false ->
<<16#01, (VarLen + 4):?short, Type, VarBin/binary>>
end.
serialize(?SN_ADVERTISE, {GwId, Duration}, _Opts) ->
@ -438,7 +440,7 @@ format(?SN_DISCONNECT_MSG(Duration)) ->
format(#mqtt_sn_message{type = Type, variable = Var}) ->
io_lib:format(
"mqtt_sn_message(type=~s, Var=~w)",
[emqx_sn_frame:message_type(Type), Var]
[emqx_mqttsn_frame:message_type(Type), Var]
).
is_message(#mqtt_sn_message{type = Type}) when

View File

@ -15,13 +15,11 @@
%%--------------------------------------------------------------------
%% @doc The MQTT-SN Topic Registry
%%
%% XXX:
-module(emqx_sn_registry).
-module(emqx_mqttsn_registry).
-behaviour(gen_server).
-include("src/mqttsn/include/emqx_sn.hrl").
-include("emqx_mqttsn.hrl").
-include_lib("emqx/include/logger.hrl").
-export([start_link/2]).
@ -53,11 +51,11 @@
-export([lookup_name/1]).
-define(SN_SHARD, emqx_sn_shard).
-define(SN_SHARD, emqx_mqttsn_shard).
-record(state, {tabname, max_predef_topic_id = 0}).
-record(emqx_sn_registry, {key, value}).
-record(emqx_mqttsn_registry, {key, value}).
-type registry() :: {Tab :: atom(), RegistryPid :: pid()}.
@ -126,7 +124,7 @@ lookup_name(Pid) ->
%%-----------------------------------------------------------------------------
name(InstaId) ->
list_to_atom(lists:concat([emqx_sn_, InstaId, '_registry'])).
list_to_atom(lists:concat([emqx_mqttsn_, InstaId, '_registry'])).
init([InstaId, PredefTopics]) ->
%% {predef, TopicId} -> TopicName
@ -136,8 +134,8 @@ init([InstaId, PredefTopics]) ->
Tab = name(InstaId),
ok = mria:create_table(Tab, [
{storage, ram_copies},
{record_name, emqx_sn_registry},
{attributes, record_info(fields, emqx_sn_registry)},
{record_name, emqx_mqttsn_registry},
{attributes, record_info(fields, emqx_mqttsn_registry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]},
{rlog_shard, ?SN_SHARD}
]),
@ -145,17 +143,17 @@ init([InstaId, PredefTopics]) ->
MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName0}, AccId) ->
TopicName = iolist_to_binary(TopicName0),
mria:dirty_write(Tab, #emqx_sn_registry{
mria:dirty_write(Tab, #emqx_mqttsn_registry{
key = {predef, TopicId},
value = TopicName
}),
mria:dirty_write(Tab, #emqx_sn_registry{
mria:dirty_write(Tab, #emqx_mqttsn_registry{
key = {predef, TopicName},
value = TopicId
}),
if
TopicId > AccId -> TopicId;
true -> AccId
case TopicId > AccId of
true -> TopicId;
false -> AccId
end
end,
0,
@ -193,7 +191,7 @@ handle_call(
handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) ->
Registry = mnesia:dirty_match_object(
Tab,
{emqx_sn_registry, {ClientId, '_'}, '_'}
{emqx_mqttsn_registry, {ClientId, '_'}, '_'}
),
lists:foreach(
fun(R) ->
@ -234,7 +232,7 @@ code_change(_OldVsn, State, _Extra) ->
do_register(Tab, ClientId, TopicId, TopicName) ->
mnesia:write(
Tab,
#emqx_sn_registry{
#emqx_mqttsn_registry{
key = {ClientId, next_topic_id},
value = TopicId + 1
},
@ -242,7 +240,7 @@ do_register(Tab, ClientId, TopicId, TopicName) ->
),
mnesia:write(
Tab,
#emqx_sn_registry{
#emqx_mqttsn_registry{
key = {ClientId, TopicName},
value = TopicId
},
@ -250,7 +248,7 @@ do_register(Tab, ClientId, TopicId, TopicName) ->
),
mnesia:write(
Tab,
#emqx_sn_registry{
#emqx_mqttsn_registry{
key = {ClientId, TopicId},
value = TopicName
},
@ -261,6 +259,6 @@ do_register(Tab, ClientId, TopicId, TopicName) ->
next_topic_id(Tab, PredefId, ClientId) ->
case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of
[#emqx_sn_registry{value = Id}] -> Id;
[#emqx_mqttsn_registry{value = Id}] -> Id;
[] -> PredefId + 1
end.

Some files were not shown because too many files have changed in this diff Show More