Compare commits
43 Commits
master
...
fix_print_
Author | SHA1 | Date |
---|---|---|
![]() |
30ed74bc80 | |
![]() |
73e0cbd644 | |
![]() |
750cb2d491 | |
![]() |
780e403262 | |
![]() |
05b16c601b | |
![]() |
8935d28ed4 | |
![]() |
0c66fcef00 | |
![]() |
637cd5e804 | |
![]() |
5fbf83e7f0 | |
![]() |
513cd001ac | |
![]() |
868cd6e57c | |
![]() |
a8aabd5f74 | |
![]() |
002cbb6d8b | |
![]() |
e87838f272 | |
![]() |
f18b9a92bc | |
![]() |
49a78c8ef2 | |
![]() |
8ad42cb827 | |
![]() |
f17962e79a | |
![]() |
98c4fff43f | |
![]() |
bfc6c3aa42 | |
![]() |
1a438125c7 | |
![]() |
2092bedb12 | |
![]() |
a6bd1c90d5 | |
![]() |
3ddbdbc6c1 | |
![]() |
2c0916ff05 | |
![]() |
77a41ea88f | |
![]() |
b92940af29 | |
![]() |
bed45417dc | |
![]() |
8110ef7a64 | |
![]() |
ecec9bd2f6 | |
![]() |
6724e59e7a | |
![]() |
5962c9c83c | |
![]() |
c0367fb8dd | |
![]() |
0ecaa80fb8 | |
![]() |
bdd9154001 | |
![]() |
bbed1b55e0 | |
![]() |
074c0bd2cc | |
![]() |
69ef5cbdc3 | |
![]() |
42a6f2aba5 | |
![]() |
0184a1b3e8 | |
![]() |
86766ee7f1 | |
![]() |
8eebdd5cdb | |
![]() |
1f57968c9b |
|
@ -1,7 +1,7 @@
|
||||||
{erl_opts, [debug_info]}.
|
{erl_opts, [debug_info]}.
|
||||||
{deps,
|
{deps,
|
||||||
[
|
[
|
||||||
{minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.5"}}}
|
{minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.6"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{shell, [
|
{shell, [
|
||||||
|
|
|
@ -43,7 +43,7 @@
|
||||||
!sed -i '/emqx_telemetry/d' data/loaded_plugins
|
!sed -i '/emqx_telemetry/d' data/loaded_plugins
|
||||||
|
|
||||||
!./bin/emqx start
|
!./bin/emqx start
|
||||||
?EMQ X (.*) is started successfully!
|
?EMQ X .* is started successfully!
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
!./bin/emqx_ctl cluster join emqx@127.0.0.1
|
!./bin/emqx_ctl cluster join emqx@127.0.0.1
|
||||||
|
@ -99,6 +99,10 @@
|
||||||
"""
|
"""
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
|
!./bin/emqx_ctl plugins list | grep emqx_management
|
||||||
|
?Plugin\(emqx_management.*active=true\)
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
[shell emqx2]
|
[shell emqx2]
|
||||||
!echo "" > log/emqx.log.1
|
!echo "" > log/emqx.log.1
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
@ -120,6 +124,10 @@
|
||||||
"""
|
"""
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
|
!./bin/emqx_ctl plugins list | grep emqx_management
|
||||||
|
?Plugin\(emqx_management.*active=true\)
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
[shell bench]
|
[shell bench]
|
||||||
???publish complete
|
???publish complete
|
||||||
??SH-PROMPT:
|
??SH-PROMPT:
|
||||||
|
|
|
@ -83,6 +83,7 @@ jobs:
|
||||||
- name: build
|
- name: build
|
||||||
env:
|
env:
|
||||||
PYTHON: python
|
PYTHON: python
|
||||||
|
DIAGNOSTIC: 1
|
||||||
run: |
|
run: |
|
||||||
$env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH"
|
$env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH"
|
||||||
|
|
||||||
|
@ -168,9 +169,11 @@ jobs:
|
||||||
- name: build
|
- name: build
|
||||||
run: |
|
run: |
|
||||||
. $HOME/.kerl/${{ matrix.erl_otp }}/activate
|
. $HOME/.kerl/${{ matrix.erl_otp }}/activate
|
||||||
make -C source ensure-rebar3
|
cd source
|
||||||
sudo cp source/rebar3 /usr/local/bin/rebar3
|
make ensure-rebar3
|
||||||
make -C source ${{ matrix.profile }}-zip
|
sudo cp rebar3 /usr/local/bin/rebar3
|
||||||
|
rm -rf _build/${{ matrix.profile }}/lib
|
||||||
|
make ${{ matrix.profile }}-zip
|
||||||
- name: test
|
- name: test
|
||||||
run: |
|
run: |
|
||||||
cd source
|
cd source
|
||||||
|
@ -465,7 +468,7 @@ jobs:
|
||||||
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
||||||
-H "Accept: application/vnd.github.v3+json" \
|
-H "Accept: application/vnd.github.v3+json" \
|
||||||
-X POST \
|
-X POST \
|
||||||
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
|
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
|
||||||
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
|
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
|
||||||
- name: update repo.emqx.io
|
- name: update repo.emqx.io
|
||||||
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
|
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
|
||||||
|
@ -474,7 +477,7 @@ jobs:
|
||||||
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
||||||
-H "Accept: application/vnd.github.v3+json" \
|
-H "Accept: application/vnd.github.v3+json" \
|
||||||
-X POST \
|
-X POST \
|
||||||
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
|
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
|
||||||
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
|
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
|
||||||
- name: update homebrew packages
|
- name: update homebrew packages
|
||||||
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
|
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
|
||||||
|
@ -484,7 +487,7 @@ jobs:
|
||||||
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
||||||
-H "Accept: application/vnd.github.v3+json" \
|
-H "Accept: application/vnd.github.v3+json" \
|
||||||
-X POST \
|
-X POST \
|
||||||
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
|
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
|
||||||
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
|
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
|
||||||
fi
|
fi
|
||||||
- uses: geekyeggo/delete-artifact@v1
|
- uses: geekyeggo/delete-artifact@v1
|
||||||
|
|
|
@ -38,6 +38,11 @@ jobs:
|
||||||
run: make ${EMQX_NAME}-zip
|
run: make ${EMQX_NAME}-zip
|
||||||
- name: build deb/rpm packages
|
- name: build deb/rpm packages
|
||||||
run: make ${EMQX_NAME}-pkg
|
run: make ${EMQX_NAME}-pkg
|
||||||
|
- uses: actions/upload-artifact@v1
|
||||||
|
if: failure()
|
||||||
|
with:
|
||||||
|
name: rebar3.crashdump
|
||||||
|
path: ./rebar3.crashdump
|
||||||
- name: pakcages test
|
- name: pakcages test
|
||||||
run: |
|
run: |
|
||||||
export CODE_PATH=$GITHUB_WORKSPACE
|
export CODE_PATH=$GITHUB_WORKSPACE
|
||||||
|
@ -94,6 +99,11 @@ jobs:
|
||||||
make ensure-rebar3
|
make ensure-rebar3
|
||||||
sudo cp rebar3 /usr/local/bin/rebar3
|
sudo cp rebar3 /usr/local/bin/rebar3
|
||||||
make ${EMQX_NAME}-zip
|
make ${EMQX_NAME}-zip
|
||||||
|
- uses: actions/upload-artifact@v1
|
||||||
|
if: failure()
|
||||||
|
with:
|
||||||
|
name: rebar3.crashdump
|
||||||
|
path: ./rebar3.crashdump
|
||||||
- name: test
|
- name: test
|
||||||
run: |
|
run: |
|
||||||
pkg_name=$(basename _packages/${EMQX_NAME}/emqx-*.zip)
|
pkg_name=$(basename _packages/${EMQX_NAME}/emqx-*.zip)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_auth_http,
|
{application, emqx_auth_http,
|
||||||
[{description, "EMQ X Authentication/ACL with HTTP API"},
|
[{description, "EMQ X Authentication/ACL with HTTP API"},
|
||||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_auth_http_sup]},
|
{registered, [emqx_auth_http_sup]},
|
||||||
{applications, [kernel,stdlib,ehttpc]},
|
{applications, [kernel,stdlib,ehttpc]},
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
%% -*-: erlang -*-
|
||||||
|
|
||||||
|
{VSN,
|
||||||
|
[
|
||||||
|
{"4.3.0", [
|
||||||
|
{restart_application, emqx_auth_http}
|
||||||
|
]},
|
||||||
|
{<<".*">>, []}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{"4.3.0", [
|
||||||
|
{restart_application, emqx_auth_http}
|
||||||
|
]},
|
||||||
|
{<<".*">>, []}
|
||||||
|
]
|
||||||
|
}.
|
|
@ -54,10 +54,9 @@ translate_env(EnvName) ->
|
||||||
{ok, ConnectTimeout} = application:get_env(?APP, connect_timeout),
|
{ok, ConnectTimeout} = application:get_env(?APP, connect_timeout),
|
||||||
URL = proplists:get_value(url, Req),
|
URL = proplists:get_value(url, Req),
|
||||||
{ok, #{host := Host,
|
{ok, #{host := Host,
|
||||||
path := Path0,
|
|
||||||
port := Port,
|
port := Port,
|
||||||
scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
|
scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
|
||||||
Path = path(Path0),
|
Path = path(URIMap),
|
||||||
MoreOpts = case Scheme of
|
MoreOpts = case Scheme of
|
||||||
http ->
|
http ->
|
||||||
[{transport_opts, emqx_misc:ipv6_probe([])}];
|
[{transport_opts, emqx_misc:ipv6_probe([])}];
|
||||||
|
@ -151,8 +150,12 @@ ensure_content_type_header(Method, Headers)
|
||||||
ensure_content_type_header(_Method, Headers) ->
|
ensure_content_type_header(_Method, Headers) ->
|
||||||
lists:keydelete("content-type", 1, Headers).
|
lists:keydelete("content-type", 1, Headers).
|
||||||
|
|
||||||
path("") ->
|
path(#{path := "", 'query' := Query}) ->
|
||||||
|
"?" ++ Query;
|
||||||
|
path(#{path := Path, 'query' := Query}) ->
|
||||||
|
Path ++ "?" ++ Query;
|
||||||
|
path(#{path := ""}) ->
|
||||||
"/";
|
"/";
|
||||||
path(Path) ->
|
path(#{path := Path}) ->
|
||||||
Path.
|
Path.
|
||||||
|
|
||||||
|
|
|
@ -27,10 +27,6 @@
|
||||||
, description/0
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-import(proplists, [get_value/2]).
|
|
||||||
|
|
||||||
-import(emqx_auth_ldap_cli, [search/4]).
|
|
||||||
|
|
||||||
-spec(register_metrics() -> ok).
|
-spec(register_metrics() -> ok).
|
||||||
register_metrics() ->
|
register_metrics() ->
|
||||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||||
|
@ -70,14 +66,14 @@ do_check_acl(#{username := Username}, PubSub, Topic, _NoMatchAction,
|
||||||
|
|
||||||
BaseDN = emqx_auth_ldap:replace_vars(CustomBaseDN, ReplaceRules),
|
BaseDN = emqx_auth_ldap:replace_vars(CustomBaseDN, ReplaceRules),
|
||||||
|
|
||||||
case search(Pool, BaseDN, Filter, [Attribute, Attribute1]) of
|
case emqx_auth_ldap_cli:search(Pool, BaseDN, Filter, [Attribute, Attribute1]) of
|
||||||
{error, noSuchObject} ->
|
{error, noSuchObject} ->
|
||||||
ok;
|
ok;
|
||||||
{ok, #eldap_search_result{entries = []}} ->
|
{ok, #eldap_search_result{entries = []}} ->
|
||||||
ok;
|
ok;
|
||||||
{ok, #eldap_search_result{entries = [Entry]}} ->
|
{ok, #eldap_search_result{entries = [Entry]}} ->
|
||||||
Topics = get_value(Attribute, Entry#eldap_entry.attributes)
|
Topics = proplists:get_value(Attribute, Entry#eldap_entry.attributes, [])
|
||||||
++ get_value(Attribute1, Entry#eldap_entry.attributes),
|
++ proplists:get_value(Attribute1, Entry#eldap_entry.attributes, []),
|
||||||
match(Topic, Topics);
|
match(Topic, Topics);
|
||||||
Error ->
|
Error ->
|
||||||
?LOG(error, "[LDAP] search error:~p", [Error]),
|
?LOG(error, "[LDAP] search error:~p", [Error]),
|
||||||
|
@ -95,4 +91,3 @@ match(Topic, [Filter | Topics]) ->
|
||||||
|
|
||||||
description() ->
|
description() ->
|
||||||
"ACL with LDAP".
|
"ACL with LDAP".
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_auth_ldap,
|
{application, emqx_auth_ldap,
|
||||||
[{description, "EMQ X Authentication/ACL with LDAP"},
|
[{description, "EMQ X Authentication/ACL with LDAP"},
|
||||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_auth_ldap_sup]},
|
{registered, [emqx_auth_ldap_sup]},
|
||||||
{applications, [kernel,stdlib,eldap2,ecpool]},
|
{applications, [kernel,stdlib,eldap2,ecpool]},
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
%% -*- mode: erlang -*-
|
||||||
|
{VSN,
|
||||||
|
[{"4.3.0",
|
||||||
|
[{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
|
||||||
|
{<<".*">>,[]}],
|
||||||
|
[{"4.3.0",
|
||||||
|
[{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
|
||||||
|
{<<".*">>,[]}]}.
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_exhook,
|
{application, emqx_exhook,
|
||||||
[{description, "EMQ X Extension for Hook"},
|
[{description, "EMQ X Extension for Hook"},
|
||||||
{vsn, "4.3.1"},
|
{vsn, "4.3.3"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_exhook_app, []}},
|
{mod, {emqx_exhook_app, []}},
|
||||||
|
|
|
@ -1,14 +1,32 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
{"4.3.2", [
|
||||||
|
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{"4.3.1", [
|
||||||
|
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.3.0", [
|
{"4.3.0", [
|
||||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
|
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
|
{"4.3.2", [
|
||||||
|
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{"4.3.1", [
|
||||||
|
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.3.0", [
|
{"4.3.0", [
|
||||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
|
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -88,7 +88,7 @@ init_hooks_cnter() ->
|
||||||
try
|
try
|
||||||
_ = ets:new(?CNTER, [named_table, public]), ok
|
_ = ets:new(?CNTER, [named_table, public]), ok
|
||||||
catch
|
catch
|
||||||
exit:badarg:_ ->
|
error:badarg:_ ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -122,7 +122,7 @@ channel_opts(Opts) ->
|
||||||
Scheme = proplists:get_value(scheme, Opts),
|
Scheme = proplists:get_value(scheme, Opts),
|
||||||
Host = proplists:get_value(host, Opts),
|
Host = proplists:get_value(host, Opts),
|
||||||
Port = proplists:get_value(port, Opts),
|
Port = proplists:get_value(port, Opts),
|
||||||
SvrAddr = lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])),
|
SvrAddr = format_http_uri(Scheme, Host, Port),
|
||||||
ClientOpts = case Scheme of
|
ClientOpts = case Scheme of
|
||||||
https ->
|
https ->
|
||||||
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
|
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
|
||||||
|
@ -133,6 +133,13 @@ channel_opts(Opts) ->
|
||||||
end,
|
end,
|
||||||
{SvrAddr, ClientOpts}.
|
{SvrAddr, ClientOpts}.
|
||||||
|
|
||||||
|
format_http_uri(Scheme, Host0, Port) ->
|
||||||
|
Host = case is_tuple(Host0) of
|
||||||
|
true -> inet:ntoa(Host0);
|
||||||
|
_ -> Host0
|
||||||
|
end,
|
||||||
|
lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
|
||||||
|
|
||||||
-spec unload(server()) -> ok.
|
-spec unload(server()) -> ok.
|
||||||
unload(#server{name = Name, hookspec = HookSpecs}) ->
|
unload(#server{name = Name, hookspec = HookSpecs}) ->
|
||||||
_ = do_deinit(Name),
|
_ = do_deinit(Name),
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
{deps,
|
{deps,
|
||||||
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.2"}}}
|
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.5"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{profiles,
|
{profiles,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application,emqx_lwm2m,
|
{application,emqx_lwm2m,
|
||||||
[{description,"EMQ X LwM2M Gateway"},
|
[{description,"EMQ X LwM2M Gateway"},
|
||||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||||
{modules,[]},
|
{modules,[]},
|
||||||
{registered,[emqx_lwm2m_sup]},
|
{registered,[emqx_lwm2m_sup]},
|
||||||
{applications,[kernel,stdlib,lwm2m_coap]},
|
{applications,[kernel,stdlib,lwm2m_coap]},
|
||||||
|
|
|
@ -1,15 +1,13 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{<<"4.3.[0-1]">>, [
|
||||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
{restart_application, emqx_lwm2m}
|
||||||
]},
|
]}
|
||||||
{<<".*">>, []}
|
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{<<"4.3.[0-1]">>, [
|
||||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
{restart_application, emqx_lwm2m}
|
||||||
]},
|
]}
|
||||||
{<<".*">>, []}
|
|
||||||
]
|
]
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -0,0 +1,162 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_lwm2m_api).
|
||||||
|
|
||||||
|
-import(minirest, [return/1]).
|
||||||
|
|
||||||
|
-rest_api(#{name => list,
|
||||||
|
method => 'GET',
|
||||||
|
path => "/lwm2m_channels/",
|
||||||
|
func => list,
|
||||||
|
descr => "A list of all lwm2m channel"
|
||||||
|
}).
|
||||||
|
|
||||||
|
-rest_api(#{name => list,
|
||||||
|
method => 'GET',
|
||||||
|
path => "/nodes/:atom:node/lwm2m_channels/",
|
||||||
|
func => list,
|
||||||
|
descr => "A list of lwm2m channel of a node"
|
||||||
|
}).
|
||||||
|
|
||||||
|
-rest_api(#{name => lookup_cmd,
|
||||||
|
method => 'GET',
|
||||||
|
path => "/lookup_cmd/:bin:ep/",
|
||||||
|
func => lookup_cmd,
|
||||||
|
descr => "Send a lwm2m downlink command"
|
||||||
|
}).
|
||||||
|
|
||||||
|
-rest_api(#{name => lookup_cmd,
|
||||||
|
method => 'GET',
|
||||||
|
path => "/nodes/:atom:node/lookup_cmd/:bin:ep/",
|
||||||
|
func => lookup_cmd,
|
||||||
|
descr => "Send a lwm2m downlink command of a node"
|
||||||
|
}).
|
||||||
|
|
||||||
|
-export([ list/2
|
||||||
|
, lookup_cmd/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
list(#{node := Node }, Params) ->
|
||||||
|
case Node = node() of
|
||||||
|
true -> list(#{}, Params);
|
||||||
|
_ -> rpc_call(Node, list, [#{}, Params])
|
||||||
|
end;
|
||||||
|
|
||||||
|
list(#{}, _Params) ->
|
||||||
|
Channels = emqx_lwm2m_cm:all_channels(),
|
||||||
|
return({ok, format(Channels)}).
|
||||||
|
|
||||||
|
lookup_cmd(#{ep := Ep, node := Node}, Params) ->
|
||||||
|
case Node = node() of
|
||||||
|
true -> lookup_cmd(#{ep => Ep}, Params);
|
||||||
|
_ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params])
|
||||||
|
end;
|
||||||
|
|
||||||
|
lookup_cmd(#{ep := Ep}, Params) ->
|
||||||
|
MsgType = proplists:get_value(<<"msgType">>, Params),
|
||||||
|
Path0 = proplists:get_value(<<"path">>, Params),
|
||||||
|
case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of
|
||||||
|
[] -> return({ok, []});
|
||||||
|
[{_, undefined} | _] -> return({ok, []});
|
||||||
|
[{{IMEI, Path, MsgType}, undefined}] ->
|
||||||
|
return({ok, [{imei, IMEI},
|
||||||
|
{'msgType', IMEI},
|
||||||
|
{'code', <<"6.01">>},
|
||||||
|
{'codeMsg', <<"reply_not_received">>},
|
||||||
|
{'path', Path}]});
|
||||||
|
[{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] ->
|
||||||
|
Payload1 = format_cmd_content(Content, MsgType),
|
||||||
|
return({ok, [{imei, IMEI},
|
||||||
|
{'msgType', IMEI},
|
||||||
|
{'code', Code},
|
||||||
|
{'codeMsg', CodeMsg},
|
||||||
|
{'path', Path}] ++ Payload1})
|
||||||
|
end.
|
||||||
|
|
||||||
|
rpc_call(Node, Fun, Args) ->
|
||||||
|
case rpc:call(Node, ?MODULE, Fun, Args) of
|
||||||
|
{badrpc, Reason} -> {error, Reason};
|
||||||
|
Res -> Res
|
||||||
|
end.
|
||||||
|
|
||||||
|
format(Channels) ->
|
||||||
|
lists:map(fun({IMEI, #{lifetime := LifeTime,
|
||||||
|
peername := Peername,
|
||||||
|
version := Version,
|
||||||
|
reg_info := RegInfo}}) ->
|
||||||
|
ObjectList = lists:map(fun(Path) ->
|
||||||
|
[ObjId | _] = path_list(Path),
|
||||||
|
case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
|
||||||
|
{error, _} ->
|
||||||
|
{Path, Path};
|
||||||
|
ObjDefinition ->
|
||||||
|
ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition),
|
||||||
|
{Path, list_to_binary(ObjectName)}
|
||||||
|
end
|
||||||
|
end, maps:get(<<"objectList">>, RegInfo)),
|
||||||
|
{IpAddr, Port} = Peername,
|
||||||
|
[{imei, IMEI},
|
||||||
|
{lifetime, LifeTime},
|
||||||
|
{ip_address, iolist_to_binary(ntoa(IpAddr))},
|
||||||
|
{port, Port},
|
||||||
|
{version, Version},
|
||||||
|
{'objectList', ObjectList}]
|
||||||
|
end, Channels).
|
||||||
|
|
||||||
|
format_cmd_content(undefined, _MsgType) -> [];
|
||||||
|
format_cmd_content(Content, <<"discover">>) ->
|
||||||
|
[H | Content1] = Content,
|
||||||
|
{_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H),
|
||||||
|
[ObjId | _]= path_list(HObjId),
|
||||||
|
ObjectList = case Content1 of
|
||||||
|
[Content2 | _] ->
|
||||||
|
{_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2),
|
||||||
|
ObjL;
|
||||||
|
[] -> []
|
||||||
|
end,
|
||||||
|
R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
|
||||||
|
{error, _} ->
|
||||||
|
lists:map(fun(Object) -> {Object, Object} end, ObjectList);
|
||||||
|
ObjDefinition ->
|
||||||
|
lists:map(fun(Object) ->
|
||||||
|
[_, _, ResId| _] = path_list(Object),
|
||||||
|
Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of
|
||||||
|
"E" -> [{operations, list_to_binary("E")}];
|
||||||
|
Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))},
|
||||||
|
{operations, list_to_binary(Oper)}]
|
||||||
|
end,
|
||||||
|
[{path, Object},
|
||||||
|
{name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))}
|
||||||
|
] ++ Operations
|
||||||
|
end, ObjectList)
|
||||||
|
end,
|
||||||
|
[{content, R}];
|
||||||
|
format_cmd_content(Content, _) ->
|
||||||
|
[{content, Content}].
|
||||||
|
|
||||||
|
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
|
||||||
|
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
|
||||||
|
ntoa(IP) ->
|
||||||
|
inet_parse:ntoa(IP).
|
||||||
|
|
||||||
|
path_list(Path) ->
|
||||||
|
case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
|
||||||
|
[ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId];
|
||||||
|
[ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId];
|
||||||
|
[ObjId, ObjInsId] -> [ObjId, ObjInsId];
|
||||||
|
[ObjId] -> [ObjId]
|
||||||
|
end.
|
|
@ -0,0 +1,153 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_lwm2m_cm).
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
-export([ register_channel/5
|
||||||
|
, update_reg_info/2
|
||||||
|
, unregister_channel/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ lookup_channel/1
|
||||||
|
, all_channels/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ register_cmd/3
|
||||||
|
, register_cmd/4
|
||||||
|
, lookup_cmd/3
|
||||||
|
, lookup_cmd_by_imei/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([ init/1
|
||||||
|
, handle_call/3
|
||||||
|
, handle_cast/2
|
||||||
|
, handle_info/2
|
||||||
|
, terminate/2
|
||||||
|
, code_change/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)).
|
||||||
|
|
||||||
|
%% Server name
|
||||||
|
-define(CM, ?MODULE).
|
||||||
|
|
||||||
|
-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel).
|
||||||
|
-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd).
|
||||||
|
|
||||||
|
%% Batch drain
|
||||||
|
-define(BATCH_SIZE, 100000).
|
||||||
|
|
||||||
|
%% @doc Start the channel manager.
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) ->
|
||||||
|
Info = #{
|
||||||
|
reg_info => RegInfo,
|
||||||
|
lifetime => LifeTime,
|
||||||
|
version => Ver,
|
||||||
|
peername => Peername
|
||||||
|
},
|
||||||
|
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}),
|
||||||
|
cast({registered, {IMEI, self()}}).
|
||||||
|
|
||||||
|
update_reg_info(IMEI, RegInfo) ->
|
||||||
|
case lookup_channel(IMEI) of
|
||||||
|
[{_, RegInfo0}] ->
|
||||||
|
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}),
|
||||||
|
ok;
|
||||||
|
[] ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
unregister_channel(IMEI) when is_binary(IMEI) ->
|
||||||
|
true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
lookup_channel(IMEI) ->
|
||||||
|
ets:lookup(?LWM2M_CHANNEL_TAB, IMEI).
|
||||||
|
|
||||||
|
all_channels() ->
|
||||||
|
ets:tab2list(?LWM2M_CHANNEL_TAB).
|
||||||
|
|
||||||
|
register_cmd(IMEI, Path, Type) ->
|
||||||
|
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}).
|
||||||
|
|
||||||
|
register_cmd(_IMEI, undefined, _Type, _Result) ->
|
||||||
|
ok;
|
||||||
|
register_cmd(IMEI, Path, Type, Result) ->
|
||||||
|
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}).
|
||||||
|
|
||||||
|
lookup_cmd(IMEI, Path, Type) ->
|
||||||
|
ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}).
|
||||||
|
|
||||||
|
lookup_cmd_by_imei(IMEI) ->
|
||||||
|
ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]).
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
cast(Msg) -> gen_server:cast(?CM, Msg).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% gen_server callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}],
|
||||||
|
ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]),
|
||||||
|
ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]),
|
||||||
|
{ok, #{chan_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) ->
|
||||||
|
PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon),
|
||||||
|
{noreply, State#{chan_pmon := PMon1}};
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
||||||
|
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
||||||
|
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
||||||
|
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
|
||||||
|
{noreply, State#{chan_pmon := PMon1}};
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) ->
|
||||||
|
emqx_stats:cancel_update(chan_stats).
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
clean_down({_ChanPid, IMEI}) ->
|
||||||
|
unregister_channel(IMEI).
|
|
@ -0,0 +1,41 @@
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_lwm2m_cm_sup).
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
CM = #{id => emqx_lwm2m_cm,
|
||||||
|
start => {emqx_lwm2m_cm, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => 5000,
|
||||||
|
type => worker,
|
||||||
|
modules => [emqx_lwm2m_cm]},
|
||||||
|
SupFlags = #{strategy => one_for_one,
|
||||||
|
intensity => 100,
|
||||||
|
period => 10
|
||||||
|
},
|
||||||
|
{ok, {SupFlags, [CM]}}.
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
-export([ mqtt2coap/2
|
-export([ mqtt2coap/2
|
||||||
, coap2mqtt/4
|
, coap2mqtt/4
|
||||||
, ack2mqtt/1
|
, ack2mqtt/1
|
||||||
|
, extract_path/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([path_list/1]).
|
-export([path_list/1]).
|
||||||
|
|
|
@ -48,11 +48,11 @@
|
||||||
-define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
|
-define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
|
||||||
|
|
||||||
-dialyzer([{nowarn_function, [coap_discover/2]}]).
|
-dialyzer([{nowarn_function, [coap_discover/2]}]).
|
||||||
% we use {'absolute', string(), [{atom(), binary()}]} as coap_uri()
|
% we use {'absolute', list(binary()), [{atom(), binary()}]} as coap_uri()
|
||||||
% https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
|
% https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
|
||||||
% resource operations
|
% resource operations
|
||||||
coap_discover(_Prefix, _Args) ->
|
coap_discover(_Prefix, _Args) ->
|
||||||
[{absolute, "mqtt", []}].
|
[{absolute, [<<"mqtt">>], []}].
|
||||||
|
|
||||||
coap_get(ChId, [?PREFIX], Query, Content, Lwm2mState) ->
|
coap_get(ChId, [?PREFIX], Query, Content, Lwm2mState) ->
|
||||||
?LOG(debug, "~p ~p GET Query=~p, Content=~p", [self(),ChId, Query, Content]),
|
?LOG(debug, "~p ~p GET Query=~p, Content=~p", [self(),ChId, Query, Content]),
|
||||||
|
|
|
@ -197,7 +197,10 @@ value_ex(K, Value) when K =:= <<"Integer">>; K =:= <<"Float">>; K =:= <<"Time">>
|
||||||
value_ex(K, Value) when K =:= <<"String">> ->
|
value_ex(K, Value) when K =:= <<"String">> ->
|
||||||
Value;
|
Value;
|
||||||
value_ex(K, Value) when K =:= <<"Opaque">> ->
|
value_ex(K, Value) when K =:= <<"Opaque">> ->
|
||||||
Value;
|
%% XXX: force to decode it with base64
|
||||||
|
%% This may not be a good implementation, but it is
|
||||||
|
%% consistent with the treatment of Opaque in value/3
|
||||||
|
base64:decode(Value);
|
||||||
value_ex(K, <<"true">>) when K =:= <<"Boolean">> -> <<1>>;
|
value_ex(K, <<"true">>) when K =:= <<"Boolean">> -> <<1>>;
|
||||||
value_ex(K, <<"false">>) when K =:= <<"Boolean">> -> <<0>>;
|
value_ex(K, <<"false">>) when K =:= <<"Boolean">> -> <<0>>;
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
|
||||||
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
|
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
|
||||||
end),
|
end),
|
||||||
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
|
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
|
||||||
|
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
|
||||||
|
|
||||||
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
|
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
|
@ -120,10 +121,8 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
|
||||||
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
|
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
|
||||||
Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
|
Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
|
||||||
|
|
||||||
update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
|
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
|
||||||
life_timer = LifeTimer, register_info = RegInfo,
|
coap_pid = CoapPid, endpoint_name = Epn}) ->
|
||||||
coap_pid = CoapPid}) ->
|
|
||||||
|
|
||||||
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
|
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
|
||||||
|
|
||||||
_ = case proplists:get_value(update_msg_publish_condition,
|
_ = case proplists:get_value(update_msg_publish_condition,
|
||||||
|
@ -134,6 +133,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
|
||||||
%% - report the registration info update, but only when objectList is updated.
|
%% - report the registration info update, but only when objectList is updated.
|
||||||
case NewRegInfo of
|
case NewRegInfo of
|
||||||
#{<<"objectList">> := _} ->
|
#{<<"objectList">> := _} ->
|
||||||
|
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
|
||||||
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
|
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end
|
end
|
||||||
|
@ -151,7 +151,8 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
|
||||||
register_info = UpdatedRegInfo}.
|
register_info = UpdatedRegInfo}.
|
||||||
|
|
||||||
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
|
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
|
||||||
coap_pid = CoapPid}) ->
|
coap_pid = CoapPid,
|
||||||
|
endpoint_name = EndpointName}) ->
|
||||||
_ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
|
_ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
|
||||||
|
|
||||||
%% - flush cached donwlink commands
|
%% - flush cached donwlink commands
|
||||||
|
@ -161,7 +162,7 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
|
||||||
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
|
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
|
||||||
maps:get(<<"lt">>, NewRegInfo), LifeTimer),
|
maps:get(<<"lt">>, NewRegInfo), LifeTimer),
|
||||||
|
|
||||||
_ = send_auto_observe(CoapPid, NewRegInfo),
|
_ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
|
||||||
|
|
||||||
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
|
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
|
||||||
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
|
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
|
||||||
|
@ -174,15 +175,20 @@ send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
|
||||||
Lwm2mState.
|
Lwm2mState.
|
||||||
|
|
||||||
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
|
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
|
||||||
coap_pid = CoapPid}) ->
|
coap_pid = CoapPid,
|
||||||
_ = send_auto_observe(CoapPid, RegInfo),
|
endpoint_name = EndpointName}) ->
|
||||||
|
_ = send_auto_observe(CoapPid, RegInfo, EndpointName),
|
||||||
Lwm2mState.
|
Lwm2mState.
|
||||||
|
|
||||||
deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) ->
|
deliver(#message{topic = Topic, payload = Payload},
|
||||||
|
Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
|
||||||
|
register_info = RegInfo,
|
||||||
|
started_at = StartedAt,
|
||||||
|
endpoint_name = EndpointName}) ->
|
||||||
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
|
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
|
||||||
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
||||||
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
||||||
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode),
|
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
|
||||||
Lwm2mState.
|
Lwm2mState.
|
||||||
|
|
||||||
get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
|
get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
|
||||||
|
@ -238,20 +244,21 @@ time_now() -> erlang:system_time(millisecond).
|
||||||
%% Deliver downlink message to coap
|
%% Deliver downlink message to coap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode) when is_binary(JsonData)->
|
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
|
||||||
try
|
try
|
||||||
TermData = emqx_json:decode(JsonData, [return_maps]),
|
TermData = emqx_json:decode(JsonData, [return_maps]),
|
||||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode)
|
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
|
||||||
catch
|
catch
|
||||||
C:R:Stack ->
|
C:R:Stack ->
|
||||||
?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
|
?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
|
||||||
[JsonData, {C, R}, Stack])
|
[JsonData, {C, R}, Stack])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermData) ->
|
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
|
||||||
?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
|
?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
|
||||||
{CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
|
{CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
|
||||||
|
MsgType = maps:get(<<"msgType">>, Ref),
|
||||||
|
emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
|
||||||
case CacheMode of
|
case CacheMode of
|
||||||
false ->
|
false ->
|
||||||
do_deliver_to_coap(CoapPid, CoapRequest, Ref);
|
do_deliver_to_coap(CoapPid, CoapRequest, Ref);
|
||||||
|
@ -266,7 +273,12 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermDat
|
||||||
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
|
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
|
||||||
do_send_to_broker(EventType, Payload, Lwm2mState).
|
do_send_to_broker(EventType, Payload, Lwm2mState).
|
||||||
|
|
||||||
do_send_to_broker(EventType, Payload, Lwm2mState) ->
|
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
|
||||||
|
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
|
||||||
|
Code = maps:get(<<"code">>, Data, undefined),
|
||||||
|
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
|
||||||
|
Content = maps:get(<<"content">>, Data, undefined),
|
||||||
|
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
|
||||||
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
|
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
|
||||||
Topic = uplink_topic(EventType, Lwm2mState),
|
Topic = uplink_topic(EventType, Lwm2mState),
|
||||||
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
|
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
|
||||||
|
@ -281,7 +293,7 @@ auto_observe_object_list(Expected, Registered) ->
|
||||||
Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
|
Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
|
||||||
lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
|
lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
|
||||||
|
|
||||||
send_auto_observe(CoapPid, RegInfo) ->
|
send_auto_observe(CoapPid, RegInfo, EndpointName) ->
|
||||||
%% - auto observe the objects
|
%% - auto observe the objects
|
||||||
case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
|
case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
|
||||||
false ->
|
false ->
|
||||||
|
@ -292,25 +304,37 @@ send_auto_observe(CoapPid, RegInfo) ->
|
||||||
maps:get(<<"objectList">>, RegInfo, [])
|
maps:get(<<"objectList">>, RegInfo, [])
|
||||||
),
|
),
|
||||||
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
||||||
auto_observe(AlternatePath, Objectlists, CoapPid)
|
auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
auto_observe(AlternatePath, ObjectList, CoapPid) ->
|
auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
||||||
?LOG(info, "Auto Observe on: ~p", [ObjectList]),
|
?LOG(info, "Auto Observe on: ~p", [ObjectList]),
|
||||||
erlang:spawn(fun() ->
|
erlang:spawn(fun() ->
|
||||||
observe_object_list(AlternatePath, ObjectList, CoapPid)
|
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
observe_object_list(AlternatePath, ObjectList, CoapPid) ->
|
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
||||||
lists:foreach(fun(ObjectPath) ->
|
lists:foreach(fun(ObjectPath) ->
|
||||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100)
|
[ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
|
||||||
|
case ObjId of
|
||||||
|
<<"19">> ->
|
||||||
|
[ObjInsId | _LastPath1] = LastPath,
|
||||||
|
case ObjInsId of
|
||||||
|
<<"0">> ->
|
||||||
|
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
|
||||||
|
_ ->
|
||||||
|
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
||||||
|
end
|
||||||
end, ObjectList).
|
end, ObjectList).
|
||||||
|
|
||||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval) ->
|
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
|
||||||
observe_object(AlternatePath, ObjectPath, CoapPid),
|
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
|
||||||
timer:sleep(Interval).
|
timer:sleep(Interval).
|
||||||
|
|
||||||
observe_object(AlternatePath, ObjectPath, CoapPid) ->
|
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
|
||||||
Payload = #{
|
Payload = #{
|
||||||
<<"msgType">> => <<"observe">>,
|
<<"msgType">> => <<"observe">>,
|
||||||
<<"data">> => #{
|
<<"data">> => #{
|
||||||
|
@ -318,7 +342,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
|
?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
|
||||||
deliver_to_coap(AlternatePath, Payload, CoapPid, false).
|
deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
|
||||||
|
|
||||||
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
|
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
|
||||||
erlang:spawn(fun() ->
|
erlang:spawn(fun() ->
|
||||||
|
|
|
@ -29,4 +29,11 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init(_Args) ->
|
init(_Args) ->
|
||||||
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db)] }}.
|
CmSup = #{id => emqx_lwm2m_cm_sup,
|
||||||
|
start => {emqx_lwm2m_cm_sup, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => infinity,
|
||||||
|
type => supervisor,
|
||||||
|
modules => [emqx_lwm2m_cm_sup]
|
||||||
|
},
|
||||||
|
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db), CmSup] }}.
|
||||||
|
|
|
@ -21,9 +21,11 @@
|
||||||
|
|
||||||
-export([ get_obj_def/2
|
-export([ get_obj_def/2
|
||||||
, get_object_id/1
|
, get_object_id/1
|
||||||
|
, get_object_name/1
|
||||||
, get_object_and_resource_id/2
|
, get_object_and_resource_id/2
|
||||||
, get_resource_type/2
|
, get_resource_type/2
|
||||||
, get_resource_name/2
|
, get_resource_name/2
|
||||||
|
, get_resource_operations/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args),
|
-define(LOG(Level, Format, Args),
|
||||||
|
@ -42,6 +44,10 @@ get_object_id(ObjDefinition) ->
|
||||||
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
|
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
|
||||||
ObjectId.
|
ObjectId.
|
||||||
|
|
||||||
|
get_object_name(ObjDefinition) ->
|
||||||
|
[#xmlText{value=ObjectName}] = xmerl_xpath:string("Name/text()", ObjDefinition),
|
||||||
|
ObjectName.
|
||||||
|
|
||||||
|
|
||||||
get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
|
get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
|
||||||
ResourceNameString = binary_to_list(ResourceNameBinary),
|
ResourceNameString = binary_to_list(ResourceNameBinary),
|
||||||
|
@ -60,3 +66,8 @@ get_resource_name(ResourceIdInt, ObjDefinition) ->
|
||||||
ResourceIdString = integer_to_list(ResourceIdInt),
|
ResourceIdString = integer_to_list(ResourceIdInt),
|
||||||
[#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
|
[#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
|
||||||
Name.
|
Name.
|
||||||
|
|
||||||
|
get_resource_operations(ResourceIdInt, ObjDefinition) ->
|
||||||
|
ResourceIdString = integer_to_list(ResourceIdInt),
|
||||||
|
[#xmlText{value=Operations}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Operations/text()", ObjDefinition),
|
||||||
|
Operations.
|
||||||
|
|
|
@ -58,7 +58,7 @@ find_objectid(ObjectId) ->
|
||||||
false -> ObjectId
|
false -> ObjectId
|
||||||
end,
|
end,
|
||||||
case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
|
case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
|
||||||
[] -> error(no_xml_definition);
|
[] -> {error, no_xml_definition};
|
||||||
[{ObjectId, Xml}] -> Xml
|
[{ObjectId, Xml}] -> Xml
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -121,8 +121,10 @@ load(BaseDir) ->
|
||||||
true -> BaseDir++"*.xml";
|
true -> BaseDir++"*.xml";
|
||||||
false -> BaseDir++"/*.xml"
|
false -> BaseDir++"/*.xml"
|
||||||
end,
|
end,
|
||||||
AllXmlFiles = filelib:wildcard(Wild),
|
case filelib:wildcard(Wild) of
|
||||||
load_loop(AllXmlFiles).
|
[] -> error(no_xml_files_found, BaseDir);
|
||||||
|
AllXmlFiles -> load_loop(AllXmlFiles)
|
||||||
|
end.
|
||||||
|
|
||||||
load_loop([]) ->
|
load_loop([]) ->
|
||||||
ok;
|
ok;
|
||||||
|
|
|
@ -40,6 +40,7 @@ all() ->
|
||||||
, {group, test_grp_4_discover}
|
, {group, test_grp_4_discover}
|
||||||
, {group, test_grp_5_write_attr}
|
, {group, test_grp_5_write_attr}
|
||||||
, {group, test_grp_6_observe}
|
, {group, test_grp_6_observe}
|
||||||
|
, {group, test_grp_8_object_19}
|
||||||
].
|
].
|
||||||
|
|
||||||
suite() -> [{timetrap, {seconds, 90}}].
|
suite() -> [{timetrap, {seconds, 90}}].
|
||||||
|
@ -98,9 +99,9 @@ groups() ->
|
||||||
]},
|
]},
|
||||||
{test_grp_8_object_19, [RepeatOpt], [
|
{test_grp_8_object_19, [RepeatOpt], [
|
||||||
case80_specail_object_19_1_0_write,
|
case80_specail_object_19_1_0_write,
|
||||||
case80_specail_object_19_0_0_notify,
|
case80_specail_object_19_0_0_notify
|
||||||
case80_specail_object_19_0_0_response,
|
%case80_specail_object_19_0_0_response,
|
||||||
case80_normal_object_19_0_0_read
|
%case80_normal_object_19_0_0_read
|
||||||
]},
|
]},
|
||||||
{test_grp_9_psm_queue_mode, [RepeatOpt], [
|
{test_grp_9_psm_queue_mode, [RepeatOpt], [
|
||||||
case90_psm_mode,
|
case90_psm_mode,
|
||||||
|
@ -1655,6 +1656,7 @@ case80_specail_object_19_1_0_write(Config) ->
|
||||||
<<"value">> => base64:encode(<<12345:32>>)
|
<<"value">> => base64:encode(<<12345:32>>)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
CommandJson = emqx_json:encode(Command),
|
CommandJson = emqx_json:encode(Command),
|
||||||
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
|
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
|
||||||
timer:sleep(50),
|
timer:sleep(50),
|
||||||
|
@ -1663,7 +1665,7 @@ case80_specail_object_19_1_0_write(Config) ->
|
||||||
Path2 = get_coap_path(Options2),
|
Path2 = get_coap_path(Options2),
|
||||||
?assertEqual(put, Method2),
|
?assertEqual(put, Method2),
|
||||||
?assertEqual(<<"/19/1/0">>, Path2),
|
?assertEqual(<<"/19/1/0">>, Path2),
|
||||||
?assertEqual(<<12345:32>>, Payload2),
|
?assertEqual(<<3:2, 0:1, 0:2, 4:3, 0, 12345:32>>, Payload2),
|
||||||
timer:sleep(50),
|
timer:sleep(50),
|
||||||
|
|
||||||
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, changed}, #coap_content{}, Request2, true),
|
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, changed}, #coap_content{}, Request2, true),
|
||||||
|
@ -1672,6 +1674,7 @@ case80_specail_object_19_1_0_write(Config) ->
|
||||||
ReadResult = emqx_json:encode(#{
|
ReadResult = emqx_json:encode(#{
|
||||||
<<"requestID">> => CmdId, <<"cacheID">> => CmdId,
|
<<"requestID">> => CmdId, <<"cacheID">> => CmdId,
|
||||||
<<"data">> => #{
|
<<"data">> => #{
|
||||||
|
<<"reqPath">> => <<"/19/1/0">>,
|
||||||
<<"code">> => <<"2.04">>,
|
<<"code">> => <<"2.04">>,
|
||||||
<<"codeMsg">> => <<"changed">>
|
<<"codeMsg">> => <<"changed">>
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_management,
|
{application, emqx_management,
|
||||||
[{description, "EMQ X Management API and CLI"},
|
[{description, "EMQ X Management API and CLI"},
|
||||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
{vsn, "4.3.4"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_management_sup]},
|
{registered, [emqx_management_sup]},
|
||||||
{applications, [kernel,stdlib,minirest]},
|
{applications, [kernel,stdlib,minirest]},
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[ {<<"4.3.[0-2]">>,
|
[ {<<"4.3.[0-3]">>,
|
||||||
[ {restart_application, emqx_management}
|
[ {restart_application, emqx_management}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[ {<<"4.3.[0-2]">>,
|
[ {<<"4.3.[0-3]">>,
|
||||||
[ {restart_application, emqx_management}
|
[ {restart_application, emqx_management}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
|
|
|
@ -616,8 +616,18 @@ dump(_Table, _, '$end_of_table', Result) ->
|
||||||
lists:reverse(Result);
|
lists:reverse(Result);
|
||||||
|
|
||||||
dump(Table, Tag, Key, Result) ->
|
dump(Table, Tag, Key, Result) ->
|
||||||
PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)],
|
Ls = lists:foldl(fun(Record, Acc) ->
|
||||||
dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
|
try
|
||||||
|
[print({Tag, Record}) | Acc]
|
||||||
|
catch
|
||||||
|
Class : Reason : Stk ->
|
||||||
|
logger:error("Failed to print ~p, error: {~p, ~p}. "
|
||||||
|
"Stacktrace: ~0p",
|
||||||
|
[Record, Class, Reason, Stk]),
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, [], ets:lookup(Table, Key)),
|
||||||
|
dump(Table, Tag, ets:next(Table, Key), [lists:reverse(Ls) | Result]).
|
||||||
|
|
||||||
print({_, []}) ->
|
print({_, []}) ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -634,7 +644,7 @@ print({client, {ClientId, ChanPid}}) ->
|
||||||
ClientInfo = maps:get(clientinfo, Attrs, #{}),
|
ClientInfo = maps:get(clientinfo, Attrs, #{}),
|
||||||
ConnInfo = maps:get(conninfo, Attrs, #{}),
|
ConnInfo = maps:get(conninfo, Attrs, #{}),
|
||||||
Session = maps:get(session, Attrs, #{}),
|
Session = maps:get(session, Attrs, #{}),
|
||||||
Connected = case maps:get(conn_state, Attrs) of
|
Connected = case maps:get(conn_state, Attrs, undefined) of
|
||||||
connected -> true;
|
connected -> true;
|
||||||
_ -> false
|
_ -> false
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_rule_engine,
|
{application, emqx_rule_engine,
|
||||||
[{description, "EMQ X Rule Engine"},
|
[{description, "EMQ X Rule Engine"},
|
||||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
||||||
{applications, [kernel,stdlib,rulesql,getopt]},
|
{applications, [kernel,stdlib,rulesql,getopt]},
|
||||||
|
|
|
@ -1,21 +1,37 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{"4.3.2",
|
{"4.3.3",
|
||||||
[ {"4.3.0",
|
[ {"4.3.0",
|
||||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
|
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
|
||||||
{load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||||
|
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||||
|
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||||
|
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||||
|
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||||
|
]},
|
||||||
|
{"4.3.2",
|
||||||
|
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||||
|
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
|
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
|
||||||
{load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||||
|
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||||
|
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||||
|
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||||
|
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||||
|
]},
|
||||||
|
{"4.3.2",
|
||||||
|
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||||
|
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -93,13 +93,6 @@
|
||||||
|
|
||||||
-define(REGISTRY, ?MODULE).
|
-define(REGISTRY, ?MODULE).
|
||||||
|
|
||||||
%% Statistics
|
|
||||||
-define(STATS,
|
|
||||||
[ {?RULE_TAB, 'rules.count', 'rules.max'}
|
|
||||||
, {?ACTION_TAB, 'actions.count', 'actions.max'}
|
|
||||||
, {?RES_TAB, 'resources.count', 'resources.max'}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(T_CALL, 10000).
|
-define(T_CALL, 10000).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -392,8 +385,11 @@ find_rules_depends_on_resource(ResId) ->
|
||||||
end, [], get_rules()).
|
end, [], get_rules()).
|
||||||
|
|
||||||
search_action_despends_on_resource(ResId, Actions) ->
|
search_action_despends_on_resource(ResId, Actions) ->
|
||||||
lists:search(fun(#action_instance{args = #{<<"$resource">> := ResId0}}) ->
|
lists:search(fun
|
||||||
ResId0 =:= ResId
|
(#action_instance{args = #{<<"$resource">> := ResId0}}) ->
|
||||||
|
ResId0 =:= ResId;
|
||||||
|
(_) ->
|
||||||
|
false
|
||||||
end, Actions).
|
end, Actions).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -439,8 +435,6 @@ delete_resource_type(Type) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% Enable stats timer
|
|
||||||
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
|
|
||||||
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
||||||
{read_concurrency, true}]),
|
{read_concurrency, true}]),
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
@ -466,7 +460,7 @@ handle_info(Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
emqx_stats:cancel_update(rule_registery_stats).
|
ok.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -475,13 +469,6 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Private functions
|
%% Private functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
update_stats() ->
|
|
||||||
lists:foreach(
|
|
||||||
fun({Tab, Stat, MaxStat}) ->
|
|
||||||
Size = mnesia:table_info(Tab, size),
|
|
||||||
emqx_stats:setstat(Stat, MaxStat, Size)
|
|
||||||
end, ?STATS).
|
|
||||||
|
|
||||||
get_all_records(Tab) ->
|
get_all_records(Tab) ->
|
||||||
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
||||||
ets:tab2list(Tab).
|
ets:tab2list(Tab).
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_sn,
|
{application, emqx_sn,
|
||||||
[{description, "EMQ X MQTT-SN Plugin"},
|
[{description, "EMQ X MQTT-SN Plugin"},
|
||||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,esockd]},
|
{applications, [kernel,stdlib,esockd]},
|
||||||
|
|
|
@ -1,11 +1,17 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
{"4.3.2", [
|
||||||
|
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{<<"4.3.[0-1]">>, [
|
{<<"4.3.[0-1]">>, [
|
||||||
{restart_application, emqx_sn}
|
{restart_application, emqx_sn}
|
||||||
]}
|
]}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
|
{"4.3.2", [
|
||||||
|
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{<<"4.3.[0-1]">>, [
|
{<<"4.3.[0-1]">>, [
|
||||||
{restart_application, emqx_sn}
|
{restart_application, emqx_sn}
|
||||||
]}
|
]}
|
||||||
|
|
|
@ -250,8 +250,9 @@ wait_for_will_topic(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State)
|
||||||
% ignore
|
% ignore
|
||||||
keep_state_and_data;
|
keep_state_and_data;
|
||||||
|
|
||||||
wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
|
wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
|
||||||
do_2nd_connect(Flags, Duration, ClientId, State);
|
?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
|
||||||
|
keep_state_and_data;
|
||||||
|
|
||||||
wait_for_will_topic(cast, {outgoing, Packet}, State) ->
|
wait_for_will_topic(cast, {outgoing, Packet}, State) ->
|
||||||
{keep_state, handle_outgoing(Packet, State)};
|
{keep_state, handle_outgoing(Packet, State)};
|
||||||
|
@ -275,9 +276,9 @@ wait_for_will_msg(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State) -
|
||||||
% ignore
|
% ignore
|
||||||
keep_state_and_data;
|
keep_state_and_data;
|
||||||
|
|
||||||
%% XXX: ?? Why we will handling the 2nd CONNECT packet ??
|
wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
|
||||||
wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
|
?LOG(warning, "Receive connect packet in wait_for_will_msg state", []),
|
||||||
do_2nd_connect(Flags, Duration, ClientId, State);
|
keep_state_and_data;
|
||||||
|
|
||||||
wait_for_will_msg(cast, {outgoing, Packet}, State) ->
|
wait_for_will_msg(cast, {outgoing, Packet}, State) ->
|
||||||
{keep_state, handle_outgoing(Packet, State)};
|
{keep_state, handle_outgoing(Packet, State)};
|
||||||
|
@ -365,8 +366,9 @@ connected(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) ->
|
||||||
% ignore
|
% ignore
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
connected(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
|
connected(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
|
||||||
do_2nd_connect(Flags, Duration, ClientId, State);
|
?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
|
||||||
|
keep_state_and_data;
|
||||||
|
|
||||||
connected(cast, {outgoing, Packet}, State) ->
|
connected(cast, {outgoing, Packet}, State) ->
|
||||||
{keep_state, handle_outgoing(Packet, State)};
|
{keep_state, handle_outgoing(Packet, State)};
|
||||||
|
@ -826,15 +828,17 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
||||||
clean_start = CleanStart,
|
clean_start = CleanStart,
|
||||||
username = State#state.username,
|
username = State#state.username,
|
||||||
password = State#state.password,
|
password = State#state.password,
|
||||||
|
proto_name = <<"MQTT-SN">>,
|
||||||
keepalive = Duration,
|
keepalive = Duration,
|
||||||
properties = OnlyOneInflight
|
properties = OnlyOneInflight,
|
||||||
|
proto_ver = 1
|
||||||
},
|
},
|
||||||
case WillFlag of
|
case WillFlag of
|
||||||
true -> State0 = send_message(?SN_WILLTOPICREQ_MSG(), State),
|
true -> State0 = send_message(?SN_WILLTOPICREQ_MSG(), State),
|
||||||
NState = State0#state{connpkt = ConnPkt,
|
NState = State0#state{connpkt = ConnPkt,
|
||||||
clientid = ClientId,
|
clientid = ClientId,
|
||||||
keepalive_interval = Duration
|
keepalive_interval = Duration
|
||||||
},
|
},
|
||||||
{next_state, wait_for_will_topic, NState};
|
{next_state, wait_for_will_topic, NState};
|
||||||
false ->
|
false ->
|
||||||
NState = State#state{clientid = ClientId,
|
NState = State#state{clientid = ClientId,
|
||||||
|
@ -843,26 +847,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
||||||
handle_incoming(?CONNECT_PACKET(ConnPkt), NState)
|
handle_incoming(?CONNECT_PACKET(ConnPkt), NState)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
|
|
||||||
peername = Peername,
|
|
||||||
channel = Channel}) ->
|
|
||||||
emqx_logger:set_metadata_clientid(ClientId),
|
|
||||||
#mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
|
|
||||||
NChannel = case CleanStart of
|
|
||||||
true ->
|
|
||||||
emqx_channel:terminate(normal, Channel),
|
|
||||||
emqx_sn_registry:unregister_topic(ClientId),
|
|
||||||
emqx_channel:init(#{socktype => udp,
|
|
||||||
sockname => Sockname,
|
|
||||||
peername => Peername,
|
|
||||||
peercert => ?NO_PEERCERT,
|
|
||||||
conn_mod => ?MODULE
|
|
||||||
}, ?DEFAULT_CHAN_OPTIONS);
|
|
||||||
false -> Channel
|
|
||||||
end,
|
|
||||||
NState = State#state{channel = NChannel},
|
|
||||||
do_connect(ClientId, CleanStart, Will, Duration, NState).
|
|
||||||
|
|
||||||
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
|
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
|
||||||
State=#state{channel = Channel}) ->
|
State=#state{channel = Channel}) ->
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
|
|
|
@ -98,19 +98,6 @@ t_connect(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_do_2nd_connect(_) ->
|
|
||||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
|
||||||
ClientId = ?CLIENTID,
|
|
||||||
send_connect_msg(Socket, ClientId),
|
|
||||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
||||||
timer:sleep(100),
|
|
||||||
send_connect_msg(Socket, <<"client_id_other">>),
|
|
||||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
||||||
|
|
||||||
send_disconnect_msg(Socket, undefined),
|
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
||||||
gen_udp:close(Socket).
|
|
||||||
|
|
||||||
t_subscribe(_) ->
|
t_subscribe(_) ->
|
||||||
Dup = 0,
|
Dup = 0,
|
||||||
QoS = 0,
|
QoS = 0,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_web_hook,
|
{application, emqx_web_hook,
|
||||||
[{description, "EMQ X WebHook Plugin"},
|
[{description, "EMQ X WebHook Plugin"},
|
||||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_web_hook_sup]},
|
{registered, [emqx_web_hook_sup]},
|
||||||
{applications, [kernel,stdlib,ehttpc]},
|
{applications, [kernel,stdlib,ehttpc]},
|
||||||
|
|
|
@ -2,14 +2,16 @@
|
||||||
|
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{<<"4.3.[0-1]">>, [
|
||||||
{load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
|
{restart_application, emqx_web_hook},
|
||||||
]},
|
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
|
||||||
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{<<"4.3.[0-1]">>, [
|
||||||
{load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
|
{restart_application, emqx_web_hook},
|
||||||
|
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -292,7 +292,7 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
|
||||||
Headers = headers(maps:get(<<"headers">>, Params, undefined)),
|
Headers = headers(maps:get(<<"headers">>, Params, undefined)),
|
||||||
NHeaders = ensure_content_type_header(Headers, Method),
|
NHeaders = ensure_content_type_header(Headers, Method),
|
||||||
#{method => Method,
|
#{method => Method,
|
||||||
path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
|
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
|
||||||
headers => NHeaders,
|
headers => NHeaders,
|
||||||
body => maps:get(<<"body">>, Params, <<>>),
|
body => maps:get(<<"body">>, Params, <<>>),
|
||||||
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
|
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
|
||||||
|
@ -306,8 +306,16 @@ ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =
|
||||||
ensure_content_type_header(Headers, _Method) ->
|
ensure_content_type_header(Headers, _Method) ->
|
||||||
lists:keydelete("content-type", 1, Headers).
|
lists:keydelete("content-type", 1, Headers).
|
||||||
|
|
||||||
path(<<>>) -> <<"/">>;
|
merge_path(CommonPath, <<>>) ->
|
||||||
path(Path) -> Path.
|
CommonPath;
|
||||||
|
merge_path(CommonPath, Path0) ->
|
||||||
|
case emqx_http_lib:uri_parse(Path0) of
|
||||||
|
{ok, #{path := Path1, 'query' := Query}} ->
|
||||||
|
Path2 = filename:join(CommonPath, Path1),
|
||||||
|
<<Path2/binary, "?", Query/binary>>;
|
||||||
|
{ok, #{path := Path1}} ->
|
||||||
|
filename:join(CommonPath, Path1)
|
||||||
|
end.
|
||||||
|
|
||||||
method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
|
method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
|
||||||
method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
|
method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
|
||||||
|
|
|
@ -42,10 +42,9 @@ stop(_State) ->
|
||||||
translate_env() ->
|
translate_env() ->
|
||||||
{ok, URL} = application:get_env(?APP, url),
|
{ok, URL} = application:get_env(?APP, url),
|
||||||
{ok, #{host := Host,
|
{ok, #{host := Host,
|
||||||
path := Path0,
|
|
||||||
port := Port,
|
port := Port,
|
||||||
scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
|
scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
|
||||||
Path = path(Path0),
|
Path = path(URIMap),
|
||||||
PoolSize = application:get_env(?APP, pool_size, 32),
|
PoolSize = application:get_env(?APP, pool_size, 32),
|
||||||
MoreOpts = case Scheme of
|
MoreOpts = case Scheme of
|
||||||
http ->
|
http ->
|
||||||
|
@ -89,9 +88,13 @@ translate_env() ->
|
||||||
NHeaders = set_content_type(Headers),
|
NHeaders = set_content_type(Headers),
|
||||||
application:set_env(?APP, headers, NHeaders).
|
application:set_env(?APP, headers, NHeaders).
|
||||||
|
|
||||||
path("") ->
|
path(#{path := "", 'query' := Query}) ->
|
||||||
|
"?" ++ Query;
|
||||||
|
path(#{path := Path, 'query' := Query}) ->
|
||||||
|
Path ++ "?" ++ Query;
|
||||||
|
path(#{path := ""}) ->
|
||||||
"/";
|
"/";
|
||||||
path(Path) ->
|
path(#{path := Path}) ->
|
||||||
Path.
|
Path.
|
||||||
|
|
||||||
set_content_type(Headers) ->
|
set_content_type(Headers) ->
|
||||||
|
|
|
@ -26,7 +26,9 @@ COPY . /emqx
|
||||||
ARG PKG_VSN
|
ARG PKG_VSN
|
||||||
ARG EMQX_NAME=emqx
|
ARG EMQX_NAME=emqx
|
||||||
|
|
||||||
RUN cd /emqx && make $EMQX_NAME
|
RUN cd /emqx \
|
||||||
|
&& rm -rf _build/$EMQX_NAME/lib \
|
||||||
|
&& make $EMQX_NAME
|
||||||
|
|
||||||
FROM $RUN_FROM
|
FROM $RUN_FROM
|
||||||
|
|
||||||
|
|
|
@ -83,7 +83,7 @@ These environment variables will ignore for configuration file.
|
||||||
|
|
||||||
The list is incomplete and may changed with [etc/emqx.conf](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
|
The list is incomplete and may changed with [etc/emqx.conf](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
|
||||||
|
|
||||||
If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE__NAME``, ``EMQX_NODE__NAME=$EMQX_NAME@$EMQX_HOST``.
|
If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE_NAME``, ``EMQX_NODE_NAME=$EMQX_NAME@$EMQX_HOST``.
|
||||||
|
|
||||||
For example, set mqtt tcp port to 1883
|
For example, set mqtt tcp port to 1883
|
||||||
|
|
||||||
|
|
|
@ -343,8 +343,8 @@ rpc.port_discovery = stateless
|
||||||
## Number of outgoing RPC connections.
|
## Number of outgoing RPC connections.
|
||||||
##
|
##
|
||||||
## Value: Interger [0-256]
|
## Value: Interger [0-256]
|
||||||
## Defaults to NumberOfCPUSchedulers / 2 when set to 0
|
## Default = 1
|
||||||
#rpc.tcp_client_num = 0
|
#rpc.tcp_client_num = 1
|
||||||
|
|
||||||
## RCP Client connect timeout.
|
## RCP Client connect timeout.
|
||||||
##
|
##
|
||||||
|
@ -1849,7 +1849,7 @@ listener.wss.external.acceptors = 4
|
||||||
## Maximum number of concurrent MQTT/Webwocket/SSL connections.
|
## Maximum number of concurrent MQTT/Webwocket/SSL connections.
|
||||||
##
|
##
|
||||||
## Value: Number
|
## Value: Number
|
||||||
listener.wss.external.max_connections = 16
|
listener.wss.external.max_connections = 102400
|
||||||
|
|
||||||
## Maximum MQTT/WebSocket/SSL connections per second.
|
## Maximum MQTT/WebSocket/SSL connections per second.
|
||||||
##
|
##
|
||||||
|
|
|
@ -30,11 +30,13 @@
|
||||||
%% MQTT Protocol Version and Names
|
%% MQTT Protocol Version and Names
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(MQTT_SN_PROTO_V1, 1).
|
||||||
-define(MQTT_PROTO_V3, 3).
|
-define(MQTT_PROTO_V3, 3).
|
||||||
-define(MQTT_PROTO_V4, 4).
|
-define(MQTT_PROTO_V4, 4).
|
||||||
-define(MQTT_PROTO_V5, 5).
|
-define(MQTT_PROTO_V5, 5).
|
||||||
|
|
||||||
-define(PROTOCOL_NAMES, [
|
-define(PROTOCOL_NAMES, [
|
||||||
|
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
|
||||||
{?MQTT_PROTO_V3, <<"MQIsdp">>},
|
{?MQTT_PROTO_V3, <<"MQIsdp">>},
|
||||||
{?MQTT_PROTO_V4, <<"MQTT">>},
|
{?MQTT_PROTO_V4, <<"MQTT">>},
|
||||||
{?MQTT_PROTO_V5, <<"MQTT">>}]).
|
{?MQTT_PROTO_V5, <<"MQTT">>}]).
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-ifndef(EMQX_ENTERPRISE).
|
-ifndef(EMQX_ENTERPRISE).
|
||||||
|
|
||||||
-define(EMQX_RELEASE, {opensource, "4.3.3"}).
|
-define(EMQX_RELEASE, {opensource, "4.3.5"}).
|
||||||
|
|
||||||
-else.
|
-else.
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_dashboard,
|
{application, emqx_dashboard,
|
||||||
[{description, "EMQ X Web Dashboard"},
|
[{description, "EMQ X Web Dashboard"},
|
||||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_dashboard_sup]},
|
{registered, [emqx_dashboard_sup]},
|
||||||
{applications, [kernel,stdlib,mnesia,minirest]},
|
{applications, [kernel,stdlib,mnesia,minirest]},
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
%% -*- mode: erlang -*-
|
||||||
|
{VSN,
|
||||||
|
[ {"4.3.0",
|
||||||
|
%% load all plugins
|
||||||
|
%% NOTE: this depends on the fact that emqx_dashboard is always
|
||||||
|
%% the last application gets upgraded
|
||||||
|
[ {apply, {emqx_plugins, load, []}}
|
||||||
|
]},
|
||||||
|
{<<".*">>, []}
|
||||||
|
],
|
||||||
|
[ {"4.3.0",
|
||||||
|
[ {apply, {emqx_plugins, load, []}}
|
||||||
|
]},
|
||||||
|
{<<".*">>, []}
|
||||||
|
]
|
||||||
|
}.
|
|
@ -52,58 +52,57 @@ description() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
|
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
|
||||||
Presence = connected_presence(ClientInfo, ConnInfo),
|
Presence = common_infos(ClientInfo, ConnInfo),
|
||||||
case emqx_json:safe_encode(Presence) of
|
NPresence = Presence#{
|
||||||
|
connack => 0, %% XXX: connack will be removed in 5.0
|
||||||
|
keepalive => maps:get(keepalive, ConnInfo, 0),
|
||||||
|
clean_start => maps:get(clean_start, ConnInfo, true),
|
||||||
|
expiry_interval => maps:get(expiry_interval, ConnInfo, 0),
|
||||||
|
connected_at => maps:get(connected_at, ConnInfo)
|
||||||
|
},
|
||||||
|
case emqx_json:safe_encode(NPresence) of
|
||||||
{ok, Payload} ->
|
{ok, Payload} ->
|
||||||
emqx_broker:safe_publish(
|
emqx_broker:safe_publish(
|
||||||
make_msg(qos(Env), topic(connected, ClientId), Payload));
|
make_msg(qos(Env), topic(connected, ClientId), Payload));
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
|
?LOG(error, "Failed to encode 'connected' presence: ~p", [NPresence])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
|
on_client_disconnected(ClientInfo = #{clientid := ClientId},
|
||||||
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
|
Reason, ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
|
||||||
Presence = #{clientid => ClientId,
|
|
||||||
username => Username,
|
Presence = common_infos(ClientInfo, ConnInfo),
|
||||||
reason => reason(Reason),
|
NPresence = Presence#{
|
||||||
disconnected_at => DisconnectedAt,
|
reason => reason(Reason),
|
||||||
ts => erlang:system_time(millisecond)
|
disconnected_at => DisconnectedAt
|
||||||
},
|
},
|
||||||
case emqx_json:safe_encode(Presence) of
|
case emqx_json:safe_encode(NPresence) of
|
||||||
{ok, Payload} ->
|
{ok, Payload} ->
|
||||||
emqx_broker:safe_publish(
|
emqx_broker:safe_publish(
|
||||||
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
|
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
|
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [NPresence])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
connected_presence(#{peerhost := PeerHost,
|
common_infos(
|
||||||
sockport := SockPort,
|
_ClientInfo = #{clientid := ClientId,
|
||||||
clientid := ClientId,
|
username := Username,
|
||||||
username := Username
|
peerhost := PeerHost,
|
||||||
},
|
sockport := SockPort
|
||||||
#{clean_start := CleanStart,
|
},
|
||||||
proto_name := ProtoName,
|
_ConnInfo = #{proto_name := ProtoName,
|
||||||
proto_ver := ProtoVer,
|
proto_ver := ProtoVer
|
||||||
keepalive := Keepalive,
|
}) ->
|
||||||
connected_at := ConnectedAt,
|
|
||||||
expiry_interval := ExpiryInterval
|
|
||||||
}) ->
|
|
||||||
#{clientid => ClientId,
|
#{clientid => ClientId,
|
||||||
username => Username,
|
username => Username,
|
||||||
ipaddress => ntoa(PeerHost),
|
ipaddress => ntoa(PeerHost),
|
||||||
sockport => SockPort,
|
sockport => SockPort,
|
||||||
proto_name => ProtoName,
|
proto_name => ProtoName,
|
||||||
proto_ver => ProtoVer,
|
proto_ver => ProtoVer,
|
||||||
keepalive => Keepalive,
|
|
||||||
connack => 0, %% Deprecated?
|
|
||||||
clean_start => CleanStart,
|
|
||||||
expiry_interval => ExpiryInterval,
|
|
||||||
connected_at => ConnectedAt,
|
|
||||||
ts => erlang:system_time(millisecond)
|
ts => erlang:system_time(millisecond)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_modules,
|
{application, emqx_modules,
|
||||||
[{description, "EMQ X Module Management"},
|
[{description, "EMQ X Module Management"},
|
||||||
{vsn, "4.3.2"},
|
{vsn, "4.3.3"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{applications, [kernel,stdlib]},
|
{applications, [kernel,stdlib]},
|
||||||
{mod, {emqx_modules_app, []}},
|
{mod, {emqx_modules_app, []}},
|
||||||
|
|
|
@ -1,21 +1,31 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
{"4.3.2", [
|
||||||
|
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.3.1", [
|
{"4.3.1", [
|
||||||
|
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.3.0", [
|
{"4.3.0", [
|
||||||
{update, emqx_mod_delayed, {advanced, []}},
|
{update, emqx_mod_delayed, {advanced, []}},
|
||||||
|
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
|
{"4.3.2", [
|
||||||
|
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.3.1", [
|
{"4.3.1", [
|
||||||
|
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.3.0", [
|
{"4.3.0", [
|
||||||
{update, emqx_mod_delayed, {advanced, []}},
|
{update, emqx_mod_delayed, {advanced, []}},
|
||||||
|
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
|
|
|
@ -376,7 +376,7 @@ end}.
|
||||||
|
|
||||||
{translation, "gen_rpc.tcp_client_num", fun(Conf) ->
|
{translation, "gen_rpc.tcp_client_num", fun(Conf) ->
|
||||||
case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of
|
case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of
|
||||||
0 -> max(1, erlang:system_info(schedulers) div 2);
|
0 -> 1; %% keep allowing 0 for backward compatibility
|
||||||
V -> V
|
V -> V
|
||||||
end
|
end
|
||||||
end}.
|
end}.
|
||||||
|
@ -582,17 +582,6 @@ end}.
|
||||||
hidden
|
hidden
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% disable lager
|
|
||||||
{mapping, "lager.handlers", "lager.handlers", [
|
|
||||||
{default, []},
|
|
||||||
hidden
|
|
||||||
]}.
|
|
||||||
{mapping, "lager.crash_log", "lager.crash_log", [
|
|
||||||
{default, off},
|
|
||||||
{datatype, flag},
|
|
||||||
hidden
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{translation, "kernel.logger_level", fun(_, _, Conf) ->
|
{translation, "kernel.logger_level", fun(_, _, Conf) ->
|
||||||
cuttlefish:conf_get("log.level", Conf)
|
cuttlefish:conf_get("log.level", Conf)
|
||||||
end}.
|
end}.
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.5"}}}
|
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.5"}}}
|
||||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
|
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.6"}}}
|
||||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
|
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
|
||||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
|
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
||||||
|
|
|
@ -128,23 +128,30 @@ prod_compile_opts() ->
|
||||||
prod_overrides() ->
|
prod_overrides() ->
|
||||||
[{add, [ {erl_opts, [deterministic]}]}].
|
[{add, [ {erl_opts, [deterministic]}]}].
|
||||||
|
|
||||||
|
relup_deps(Profile) ->
|
||||||
|
{post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "scripts/inject-deps.escript " ++ atom_to_list(Profile)}]}.
|
||||||
|
|
||||||
profiles() ->
|
profiles() ->
|
||||||
Vsn = get_vsn(),
|
Vsn = get_vsn(),
|
||||||
[ {'emqx', [ {erl_opts, prod_compile_opts()}
|
[ {'emqx', [ {erl_opts, prod_compile_opts()}
|
||||||
, {relx, relx(Vsn, cloud, bin)}
|
, {relx, relx(Vsn, cloud, bin)}
|
||||||
, {overrides, prod_overrides()}
|
, {overrides, prod_overrides()}
|
||||||
|
, relup_deps('emqx')
|
||||||
]}
|
]}
|
||||||
, {'emqx-pkg', [ {erl_opts, prod_compile_opts()}
|
, {'emqx-pkg', [ {erl_opts, prod_compile_opts()}
|
||||||
, {relx, relx(Vsn, cloud, pkg)}
|
, {relx, relx(Vsn, cloud, pkg)}
|
||||||
, {overrides, prod_overrides()}
|
, {overrides, prod_overrides()}
|
||||||
|
, relup_deps('emqx-pkg')
|
||||||
]}
|
]}
|
||||||
, {'emqx-edge', [ {erl_opts, prod_compile_opts()}
|
, {'emqx-edge', [ {erl_opts, prod_compile_opts()}
|
||||||
, {relx, relx(Vsn, edge, bin)}
|
, {relx, relx(Vsn, edge, bin)}
|
||||||
, {overrides, prod_overrides()}
|
, {overrides, prod_overrides()}
|
||||||
|
, relup_deps('emqx-edge')
|
||||||
]}
|
]}
|
||||||
, {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()}
|
, {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()}
|
||||||
, {relx, relx(Vsn, edge, pkg)}
|
, {relx, relx(Vsn, edge, pkg)}
|
||||||
, {overrides, prod_overrides()}
|
, {overrides, prod_overrides()}
|
||||||
|
, relup_deps('emqx-edge-pkg')
|
||||||
]}
|
]}
|
||||||
, {check, [ {erl_opts, common_compile_opts()}
|
, {check, [ {erl_opts, common_compile_opts()}
|
||||||
]}
|
]}
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
#!/usr/bin/env escript
|
||||||
|
|
||||||
|
%% This script injects implicit relup dependencies for emqx applications.
|
||||||
|
%%
|
||||||
|
%% By 'implicit', it means that it is not feasible to define application
|
||||||
|
%% dependencies in .app.src files.
|
||||||
|
%%
|
||||||
|
%% For instance, during upgrade/downgrade, emqx_dashboard usually requires
|
||||||
|
%% a restart after (but not before) all plugins are upgraded (and maybe
|
||||||
|
%% restarted), however, the dependencies are not resolvable at build time
|
||||||
|
%% when relup is generated.
|
||||||
|
%%
|
||||||
|
%% This script is to be executed after compile, with the profile given as the
|
||||||
|
%% first argument. For each dependency, it modifies the .app file to
|
||||||
|
%% have the `relup_deps` list extended to application attributes.
|
||||||
|
%%
|
||||||
|
%% The `relup_deps` application attribute is then picked up by (EMQ's fork of)
|
||||||
|
%% `relx` when top-sorting apps to generate relup instructions
|
||||||
|
|
||||||
|
-mode(compile).
|
||||||
|
|
||||||
|
usage() ->
|
||||||
|
"Usage: " ++ escript:script_name() ++ " emqx|emqx-edge".
|
||||||
|
|
||||||
|
-type app() :: atom().
|
||||||
|
-type deps_overlay() :: {re, string()} | app().
|
||||||
|
|
||||||
|
%% deps/0 returns the dependency overlays.
|
||||||
|
%% {re, Pattern} to match application names using regexp pattern
|
||||||
|
-spec deps(string()) -> [{app(), [deps_overlay()]}].
|
||||||
|
deps("emqx-edge" ++ _) ->
|
||||||
|
%% special case for edge
|
||||||
|
base_deps() ++ [{{re, ".+"}, [{exclude, App} || App <- edge_excludes()]}];
|
||||||
|
deps(_Profile) ->
|
||||||
|
base_deps().
|
||||||
|
|
||||||
|
edge_excludes() ->
|
||||||
|
[ emqx_lwm2m
|
||||||
|
, emqx_auth_ldap
|
||||||
|
, emqx_auth_pgsql
|
||||||
|
, emqx_auth_redis
|
||||||
|
, emqx_auth_mongo
|
||||||
|
, emqx_lua_hook
|
||||||
|
, emqx_exhook
|
||||||
|
, emqx_exproto
|
||||||
|
, emqx_prometheus
|
||||||
|
, emqx_psk_file
|
||||||
|
].
|
||||||
|
|
||||||
|
base_deps() ->
|
||||||
|
%% make sure emqx_dashboard depends on all other emqx_xxx apps
|
||||||
|
%% so the appup instructions for emqx_dashboard is always the last
|
||||||
|
%% to be executed
|
||||||
|
[ {emqx_dashboard, [{re, "emqx_.*"}]}
|
||||||
|
, {emqx_management, [{re, "emqx_.*"}, {exclude, emqx_dashboard}]}
|
||||||
|
, {{re, "emqx_.*"}, [emqx]}
|
||||||
|
].
|
||||||
|
|
||||||
|
main([Profile | _]) ->
|
||||||
|
ok = inject(Profile);
|
||||||
|
main(_Args) ->
|
||||||
|
io:format(standard_error, "~s", [usage()]),
|
||||||
|
erlang:halt(1).
|
||||||
|
|
||||||
|
expand_names({Name, Deps}, AppNames) ->
|
||||||
|
Names = match_pattern(Name, AppNames),
|
||||||
|
[{N, Deps} || N <- Names].
|
||||||
|
|
||||||
|
%% merge k-v pairs with v1 ++ v2
|
||||||
|
merge([], Acc) -> Acc;
|
||||||
|
merge([{K, V0} | Rest], Acc) ->
|
||||||
|
V = case lists:keyfind(K, 1, Acc) of
|
||||||
|
{K, V1} -> V1 ++ V0;
|
||||||
|
false -> V0
|
||||||
|
end,
|
||||||
|
NewAcc = lists:keystore(K, 1, Acc, {K, V}),
|
||||||
|
merge(Rest, NewAcc).
|
||||||
|
|
||||||
|
expand_deps([], _AppNames, Acc) -> Acc;
|
||||||
|
expand_deps([{exclude, Dep} | Deps], AppNames, Acc) ->
|
||||||
|
Matches = expand_deps([Dep], AppNames, []),
|
||||||
|
expand_deps(Deps, AppNames, Acc -- Matches);
|
||||||
|
expand_deps([Dep | Deps], AppNames, Acc) ->
|
||||||
|
NewAcc = add_to_list(Acc, match_pattern(Dep, AppNames)),
|
||||||
|
expand_deps(Deps, AppNames, NewAcc).
|
||||||
|
|
||||||
|
inject(Profile) ->
|
||||||
|
LibDir = lib_dir(Profile),
|
||||||
|
AppNames = list_apps(LibDir),
|
||||||
|
Deps0 = lists:flatmap(fun(Dep) -> expand_names(Dep, AppNames) end, deps(Profile)),
|
||||||
|
Deps1 = merge(Deps0, []),
|
||||||
|
Deps2 = lists:map(fun({Name, DepsX}) ->
|
||||||
|
NewDeps = expand_deps(DepsX, AppNames, []),
|
||||||
|
{Name, NewDeps}
|
||||||
|
end, Deps1),
|
||||||
|
lists:foreach(fun({App, Deps}) -> inject(App, Deps, LibDir) end, Deps2).
|
||||||
|
|
||||||
|
%% list the profile/lib dir to get all apps
|
||||||
|
list_apps(LibDir) ->
|
||||||
|
Apps = filelib:wildcard("*", LibDir),
|
||||||
|
lists:foldl(fun(App, Acc) -> [App || is_app(LibDir, App)] ++ Acc end, [], Apps).
|
||||||
|
|
||||||
|
is_app(_LibDir, "." ++ _) -> false; %% ignore hidden dir
|
||||||
|
is_app(LibDir, AppName) ->
|
||||||
|
Path = filename:join([ebin_dir(LibDir, AppName), AppName ++ ".app"]),
|
||||||
|
filelib:is_regular(Path) orelse error({unknown_app, AppName, Path}). %% wtf
|
||||||
|
|
||||||
|
lib_dir(Profile) ->
|
||||||
|
filename:join(["_build", Profile, lib]).
|
||||||
|
|
||||||
|
ebin_dir(LibDir, AppName) -> filename:join([LibDir, AppName, "ebin"]).
|
||||||
|
|
||||||
|
inject(App0, DepsToAdd, LibDir) ->
|
||||||
|
App = str(App0),
|
||||||
|
AppEbinDir = ebin_dir(LibDir, App),
|
||||||
|
[AppFile0] = filelib:wildcard("*.app", AppEbinDir),
|
||||||
|
AppFile = filename:join(AppEbinDir, AppFile0),
|
||||||
|
{ok, [{application, AppName, Props}]} = file:consult(AppFile),
|
||||||
|
Deps0 = case lists:keyfind(relup_deps, 1, Props) of
|
||||||
|
{_, X} -> X;
|
||||||
|
false -> []
|
||||||
|
end,
|
||||||
|
%% merge extra deps, but do not self-include
|
||||||
|
Deps = add_to_list(Deps0, DepsToAdd) -- [App0],
|
||||||
|
case Deps =:= [] of
|
||||||
|
true -> ok;
|
||||||
|
_ ->
|
||||||
|
NewProps = lists:keystore(relup_deps, 1, Props, {relup_deps, Deps}),
|
||||||
|
AppSpec = {application, AppName, NewProps},
|
||||||
|
AppSpecIoData = io_lib:format("~p.", [AppSpec]),
|
||||||
|
io:format(user, "updated_relup_deps for ~p~n", [App]),
|
||||||
|
file:write_file(AppFile, AppSpecIoData)
|
||||||
|
end.
|
||||||
|
|
||||||
|
str(A) when is_atom(A) -> atom_to_list(A).
|
||||||
|
|
||||||
|
match_pattern({re, Re}, AppNames) ->
|
||||||
|
Match = fun(AppName) -> re:run(AppName, Re) =/= nomatch end,
|
||||||
|
AppNamesToAdd = lists:filter(Match, AppNames),
|
||||||
|
AppsToAdd = lists:map(fun(N) -> list_to_atom(N) end, AppNamesToAdd),
|
||||||
|
case AppsToAdd =:= [] of
|
||||||
|
true -> error({nomatch, Re});
|
||||||
|
false -> AppsToAdd
|
||||||
|
end;
|
||||||
|
match_pattern(NameAtom, AppNames) ->
|
||||||
|
case lists:member(str(NameAtom), AppNames) of
|
||||||
|
true -> [NameAtom];
|
||||||
|
false -> error({notfound, NameAtom})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Append elements to list without duplication. No reordering.
|
||||||
|
add_to_list(List, []) -> List;
|
||||||
|
add_to_list(List, [H | T]) ->
|
||||||
|
case lists:member(H, List) of
|
||||||
|
true -> add_to_list(List, T);
|
||||||
|
false -> add_to_list(List ++ [H], T)
|
||||||
|
end.
|
|
@ -1,7 +1,7 @@
|
||||||
{application, emqx,
|
{application, emqx,
|
||||||
[{id, "emqx"},
|
[{id, "emqx"},
|
||||||
{description, "EMQ X"},
|
{description, "EMQ X"},
|
||||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
{vsn, "4.3.5"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
|
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
|
||||||
|
|
|
@ -1,12 +1,31 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.2",
|
[
|
||||||
[{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{"4.3.4",
|
||||||
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{"4.3.3",
|
||||||
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{"4.3.2",
|
||||||
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -18,7 +37,9 @@
|
||||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
@ -34,13 +55,32 @@
|
||||||
{apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
|
{apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.2",
|
[
|
||||||
[{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{"4.3.4",
|
||||||
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{"4.3.3",
|
||||||
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{"4.3.2",
|
||||||
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -52,7 +92,9 @@
|
||||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -294,6 +294,9 @@ do_discard_session(ClientId, Pid) ->
|
||||||
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
||||||
?tp(debug, "session_already_gone", #{pid => Pid}),
|
?tp(debug, "session_already_gone", #{pid => Pid}),
|
||||||
ok;
|
ok;
|
||||||
|
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
|
||||||
|
?tp(debug, "session_already_gone", #{pid => Pid}),
|
||||||
|
ok;
|
||||||
_ : {{shutdown, _}, _} ->
|
_ : {{shutdown, _}, _} ->
|
||||||
?tp(debug, "session_already_shutdown", #{pid => Pid}),
|
?tp(debug, "session_already_shutdown", #{pid => Pid}),
|
||||||
ok;
|
ok;
|
||||||
|
|
|
@ -336,9 +336,13 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
|
||||||
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
||||||
#emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
%% it `unsubscribed` the last topic.
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
|
||||||
|
%% be disconnected.
|
||||||
|
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
||||||
|
% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
||||||
|
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, _Event}, State) ->
|
handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
@ -348,8 +352,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMo
|
||||||
cleanup_down(SubPid),
|
cleanup_down(SubPid),
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
|
|
@ -403,7 +403,10 @@ websocket_close(Reason, State) ->
|
||||||
|
|
||||||
terminate(Reason, _Req, #state{channel = Channel}) ->
|
terminate(Reason, _Req, #state{channel = Channel}) ->
|
||||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
?LOG(debug, "Terminated due to ~p", [Reason]),
|
||||||
emqx_channel:terminate(Reason, Channel).
|
emqx_channel:terminate(Reason, Channel);
|
||||||
|
|
||||||
|
terminate(_Reason, _Req, _UnExpectedState) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle call
|
%% Handle call
|
||||||
|
|
Loading…
Reference in New Issue