Compare commits
43 Commits
master
...
fix_cm_rpc
Author | SHA1 | Date |
---|---|---|
![]() |
1dbc9c6fd4 | |
![]() |
258dd100f5 | |
![]() |
750cb2d491 | |
![]() |
780e403262 | |
![]() |
05b16c601b | |
![]() |
8935d28ed4 | |
![]() |
0c66fcef00 | |
![]() |
637cd5e804 | |
![]() |
5fbf83e7f0 | |
![]() |
513cd001ac | |
![]() |
868cd6e57c | |
![]() |
a8aabd5f74 | |
![]() |
002cbb6d8b | |
![]() |
e87838f272 | |
![]() |
f18b9a92bc | |
![]() |
49a78c8ef2 | |
![]() |
8ad42cb827 | |
![]() |
f17962e79a | |
![]() |
98c4fff43f | |
![]() |
bfc6c3aa42 | |
![]() |
1a438125c7 | |
![]() |
2092bedb12 | |
![]() |
a6bd1c90d5 | |
![]() |
3ddbdbc6c1 | |
![]() |
2c0916ff05 | |
![]() |
77a41ea88f | |
![]() |
b92940af29 | |
![]() |
bed45417dc | |
![]() |
8110ef7a64 | |
![]() |
ecec9bd2f6 | |
![]() |
6724e59e7a | |
![]() |
5962c9c83c | |
![]() |
c0367fb8dd | |
![]() |
0ecaa80fb8 | |
![]() |
bdd9154001 | |
![]() |
bbed1b55e0 | |
![]() |
074c0bd2cc | |
![]() |
69ef5cbdc3 | |
![]() |
42a6f2aba5 | |
![]() |
0184a1b3e8 | |
![]() |
86766ee7f1 | |
![]() |
8eebdd5cdb | |
![]() |
1f57968c9b |
|
@ -1,7 +1,7 @@
|
|||
{erl_opts, [debug_info]}.
|
||||
{deps,
|
||||
[
|
||||
{minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.5"}}}
|
||||
{minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.6"}}}
|
||||
]}.
|
||||
|
||||
{shell, [
|
||||
|
|
|
@ -43,7 +43,7 @@
|
|||
!sed -i '/emqx_telemetry/d' data/loaded_plugins
|
||||
|
||||
!./bin/emqx start
|
||||
?EMQ X (.*) is started successfully!
|
||||
?EMQ X .* is started successfully!
|
||||
?SH-PROMPT
|
||||
|
||||
!./bin/emqx_ctl cluster join emqx@127.0.0.1
|
||||
|
@ -99,6 +99,10 @@
|
|||
"""
|
||||
?SH-PROMPT
|
||||
|
||||
!./bin/emqx_ctl plugins list | grep emqx_management
|
||||
?Plugin\(emqx_management.*active=true\)
|
||||
?SH-PROMPT
|
||||
|
||||
[shell emqx2]
|
||||
!echo "" > log/emqx.log.1
|
||||
?SH-PROMPT
|
||||
|
@ -120,6 +124,10 @@
|
|||
"""
|
||||
?SH-PROMPT
|
||||
|
||||
!./bin/emqx_ctl plugins list | grep emqx_management
|
||||
?Plugin\(emqx_management.*active=true\)
|
||||
?SH-PROMPT
|
||||
|
||||
[shell bench]
|
||||
???publish complete
|
||||
??SH-PROMPT:
|
||||
|
|
|
@ -83,6 +83,7 @@ jobs:
|
|||
- name: build
|
||||
env:
|
||||
PYTHON: python
|
||||
DIAGNOSTIC: 1
|
||||
run: |
|
||||
$env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH"
|
||||
|
||||
|
@ -168,9 +169,11 @@ jobs:
|
|||
- name: build
|
||||
run: |
|
||||
. $HOME/.kerl/${{ matrix.erl_otp }}/activate
|
||||
make -C source ensure-rebar3
|
||||
sudo cp source/rebar3 /usr/local/bin/rebar3
|
||||
make -C source ${{ matrix.profile }}-zip
|
||||
cd source
|
||||
make ensure-rebar3
|
||||
sudo cp rebar3 /usr/local/bin/rebar3
|
||||
rm -rf _build/${{ matrix.profile }}/lib
|
||||
make ${{ matrix.profile }}-zip
|
||||
- name: test
|
||||
run: |
|
||||
cd source
|
||||
|
@ -465,7 +468,7 @@ jobs:
|
|||
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
-X POST \
|
||||
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
|
||||
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
|
||||
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
|
||||
- name: update repo.emqx.io
|
||||
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
|
||||
|
@ -474,7 +477,7 @@ jobs:
|
|||
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
-X POST \
|
||||
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
|
||||
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
|
||||
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
|
||||
- name: update homebrew packages
|
||||
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
|
||||
|
@ -484,7 +487,7 @@ jobs:
|
|||
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
-X POST \
|
||||
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
|
||||
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
|
||||
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
|
||||
fi
|
||||
- uses: geekyeggo/delete-artifact@v1
|
||||
|
|
|
@ -38,6 +38,11 @@ jobs:
|
|||
run: make ${EMQX_NAME}-zip
|
||||
- name: build deb/rpm packages
|
||||
run: make ${EMQX_NAME}-pkg
|
||||
- uses: actions/upload-artifact@v1
|
||||
if: failure()
|
||||
with:
|
||||
name: rebar3.crashdump
|
||||
path: ./rebar3.crashdump
|
||||
- name: pakcages test
|
||||
run: |
|
||||
export CODE_PATH=$GITHUB_WORKSPACE
|
||||
|
@ -94,6 +99,11 @@ jobs:
|
|||
make ensure-rebar3
|
||||
sudo cp rebar3 /usr/local/bin/rebar3
|
||||
make ${EMQX_NAME}-zip
|
||||
- uses: actions/upload-artifact@v1
|
||||
if: failure()
|
||||
with:
|
||||
name: rebar3.crashdump
|
||||
path: ./rebar3.crashdump
|
||||
- name: test
|
||||
run: |
|
||||
pkg_name=$(basename _packages/${EMQX_NAME}/emqx-*.zip)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_http,
|
||||
[{description, "EMQ X Authentication/ACL with HTTP API"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_http_sup]},
|
||||
{applications, [kernel,stdlib,ehttpc]},
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
%% -*-: erlang -*-
|
||||
|
||||
{VSN,
|
||||
[
|
||||
{"4.3.0", [
|
||||
{restart_application, emqx_auth_http}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[
|
||||
{"4.3.0", [
|
||||
{restart_application, emqx_auth_http}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
}.
|
|
@ -54,10 +54,9 @@ translate_env(EnvName) ->
|
|||
{ok, ConnectTimeout} = application:get_env(?APP, connect_timeout),
|
||||
URL = proplists:get_value(url, Req),
|
||||
{ok, #{host := Host,
|
||||
path := Path0,
|
||||
port := Port,
|
||||
scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
|
||||
Path = path(Path0),
|
||||
scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
|
||||
Path = path(URIMap),
|
||||
MoreOpts = case Scheme of
|
||||
http ->
|
||||
[{transport_opts, emqx_misc:ipv6_probe([])}];
|
||||
|
@ -151,8 +150,12 @@ ensure_content_type_header(Method, Headers)
|
|||
ensure_content_type_header(_Method, Headers) ->
|
||||
lists:keydelete("content-type", 1, Headers).
|
||||
|
||||
path("") ->
|
||||
path(#{path := "", 'query' := Query}) ->
|
||||
"?" ++ Query;
|
||||
path(#{path := Path, 'query' := Query}) ->
|
||||
Path ++ "?" ++ Query;
|
||||
path(#{path := ""}) ->
|
||||
"/";
|
||||
path(Path) ->
|
||||
path(#{path := Path}) ->
|
||||
Path.
|
||||
|
||||
|
|
|
@ -27,10 +27,6 @@
|
|||
, description/0
|
||||
]).
|
||||
|
||||
-import(proplists, [get_value/2]).
|
||||
|
||||
-import(emqx_auth_ldap_cli, [search/4]).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
@ -70,14 +66,14 @@ do_check_acl(#{username := Username}, PubSub, Topic, _NoMatchAction,
|
|||
|
||||
BaseDN = emqx_auth_ldap:replace_vars(CustomBaseDN, ReplaceRules),
|
||||
|
||||
case search(Pool, BaseDN, Filter, [Attribute, Attribute1]) of
|
||||
case emqx_auth_ldap_cli:search(Pool, BaseDN, Filter, [Attribute, Attribute1]) of
|
||||
{error, noSuchObject} ->
|
||||
ok;
|
||||
{ok, #eldap_search_result{entries = []}} ->
|
||||
ok;
|
||||
{ok, #eldap_search_result{entries = [Entry]}} ->
|
||||
Topics = get_value(Attribute, Entry#eldap_entry.attributes)
|
||||
++ get_value(Attribute1, Entry#eldap_entry.attributes),
|
||||
Topics = proplists:get_value(Attribute, Entry#eldap_entry.attributes, [])
|
||||
++ proplists:get_value(Attribute1, Entry#eldap_entry.attributes, []),
|
||||
match(Topic, Topics);
|
||||
Error ->
|
||||
?LOG(error, "[LDAP] search error:~p", [Error]),
|
||||
|
@ -95,4 +91,3 @@ match(Topic, [Filter | Topics]) ->
|
|||
|
||||
description() ->
|
||||
"ACL with LDAP".
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_ldap,
|
||||
[{description, "EMQ X Authentication/ACL with LDAP"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_ldap_sup]},
|
||||
{applications, [kernel,stdlib,eldap2,ecpool]},
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]}.
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_exhook,
|
||||
[{description, "EMQ X Extension for Hook"},
|
||||
{vsn, "4.3.1"},
|
||||
{vsn, "4.3.3"},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{mod, {emqx_exhook_app, []}},
|
||||
|
|
|
@ -1,14 +1,32 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[
|
||||
{"4.3.2", [
|
||||
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.1", [
|
||||
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
|
||||
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[
|
||||
{"4.3.2", [
|
||||
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.1", [
|
||||
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
|
||||
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
|
|
|
@ -88,7 +88,7 @@ init_hooks_cnter() ->
|
|||
try
|
||||
_ = ets:new(?CNTER, [named_table, public]), ok
|
||||
catch
|
||||
exit:badarg:_ ->
|
||||
error:badarg:_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ channel_opts(Opts) ->
|
|||
Scheme = proplists:get_value(scheme, Opts),
|
||||
Host = proplists:get_value(host, Opts),
|
||||
Port = proplists:get_value(port, Opts),
|
||||
SvrAddr = lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])),
|
||||
SvrAddr = format_http_uri(Scheme, Host, Port),
|
||||
ClientOpts = case Scheme of
|
||||
https ->
|
||||
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
|
||||
|
@ -133,6 +133,13 @@ channel_opts(Opts) ->
|
|||
end,
|
||||
{SvrAddr, ClientOpts}.
|
||||
|
||||
format_http_uri(Scheme, Host0, Port) ->
|
||||
Host = case is_tuple(Host0) of
|
||||
true -> inet:ntoa(Host0);
|
||||
_ -> Host0
|
||||
end,
|
||||
lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
|
||||
|
||||
-spec unload(server()) -> ok.
|
||||
unload(#server{name = Name, hookspec = HookSpecs}) ->
|
||||
_ = do_deinit(Name),
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
{deps,
|
||||
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.2"}}}
|
||||
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.5"}}}
|
||||
]}.
|
||||
|
||||
{profiles,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application,emqx_lwm2m,
|
||||
[{description,"EMQ X LwM2M Gateway"},
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||
{modules,[]},
|
||||
{registered,[emqx_lwm2m_sup]},
|
||||
{applications,[kernel,stdlib,lwm2m_coap]},
|
||||
|
|
|
@ -1,15 +1,13 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_lwm2m}
|
||||
]}
|
||||
],
|
||||
[
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_lwm2m}
|
||||
]}
|
||||
]
|
||||
}.
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lwm2m_api).
|
||||
|
||||
-import(minirest, [return/1]).
|
||||
|
||||
-rest_api(#{name => list,
|
||||
method => 'GET',
|
||||
path => "/lwm2m_channels/",
|
||||
func => list,
|
||||
descr => "A list of all lwm2m channel"
|
||||
}).
|
||||
|
||||
-rest_api(#{name => list,
|
||||
method => 'GET',
|
||||
path => "/nodes/:atom:node/lwm2m_channels/",
|
||||
func => list,
|
||||
descr => "A list of lwm2m channel of a node"
|
||||
}).
|
||||
|
||||
-rest_api(#{name => lookup_cmd,
|
||||
method => 'GET',
|
||||
path => "/lookup_cmd/:bin:ep/",
|
||||
func => lookup_cmd,
|
||||
descr => "Send a lwm2m downlink command"
|
||||
}).
|
||||
|
||||
-rest_api(#{name => lookup_cmd,
|
||||
method => 'GET',
|
||||
path => "/nodes/:atom:node/lookup_cmd/:bin:ep/",
|
||||
func => lookup_cmd,
|
||||
descr => "Send a lwm2m downlink command of a node"
|
||||
}).
|
||||
|
||||
-export([ list/2
|
||||
, lookup_cmd/2
|
||||
]).
|
||||
|
||||
list(#{node := Node }, Params) ->
|
||||
case Node = node() of
|
||||
true -> list(#{}, Params);
|
||||
_ -> rpc_call(Node, list, [#{}, Params])
|
||||
end;
|
||||
|
||||
list(#{}, _Params) ->
|
||||
Channels = emqx_lwm2m_cm:all_channels(),
|
||||
return({ok, format(Channels)}).
|
||||
|
||||
lookup_cmd(#{ep := Ep, node := Node}, Params) ->
|
||||
case Node = node() of
|
||||
true -> lookup_cmd(#{ep => Ep}, Params);
|
||||
_ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params])
|
||||
end;
|
||||
|
||||
lookup_cmd(#{ep := Ep}, Params) ->
|
||||
MsgType = proplists:get_value(<<"msgType">>, Params),
|
||||
Path0 = proplists:get_value(<<"path">>, Params),
|
||||
case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of
|
||||
[] -> return({ok, []});
|
||||
[{_, undefined} | _] -> return({ok, []});
|
||||
[{{IMEI, Path, MsgType}, undefined}] ->
|
||||
return({ok, [{imei, IMEI},
|
||||
{'msgType', IMEI},
|
||||
{'code', <<"6.01">>},
|
||||
{'codeMsg', <<"reply_not_received">>},
|
||||
{'path', Path}]});
|
||||
[{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] ->
|
||||
Payload1 = format_cmd_content(Content, MsgType),
|
||||
return({ok, [{imei, IMEI},
|
||||
{'msgType', IMEI},
|
||||
{'code', Code},
|
||||
{'codeMsg', CodeMsg},
|
||||
{'path', Path}] ++ Payload1})
|
||||
end.
|
||||
|
||||
rpc_call(Node, Fun, Args) ->
|
||||
case rpc:call(Node, ?MODULE, Fun, Args) of
|
||||
{badrpc, Reason} -> {error, Reason};
|
||||
Res -> Res
|
||||
end.
|
||||
|
||||
format(Channels) ->
|
||||
lists:map(fun({IMEI, #{lifetime := LifeTime,
|
||||
peername := Peername,
|
||||
version := Version,
|
||||
reg_info := RegInfo}}) ->
|
||||
ObjectList = lists:map(fun(Path) ->
|
||||
[ObjId | _] = path_list(Path),
|
||||
case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
|
||||
{error, _} ->
|
||||
{Path, Path};
|
||||
ObjDefinition ->
|
||||
ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition),
|
||||
{Path, list_to_binary(ObjectName)}
|
||||
end
|
||||
end, maps:get(<<"objectList">>, RegInfo)),
|
||||
{IpAddr, Port} = Peername,
|
||||
[{imei, IMEI},
|
||||
{lifetime, LifeTime},
|
||||
{ip_address, iolist_to_binary(ntoa(IpAddr))},
|
||||
{port, Port},
|
||||
{version, Version},
|
||||
{'objectList', ObjectList}]
|
||||
end, Channels).
|
||||
|
||||
format_cmd_content(undefined, _MsgType) -> [];
|
||||
format_cmd_content(Content, <<"discover">>) ->
|
||||
[H | Content1] = Content,
|
||||
{_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H),
|
||||
[ObjId | _]= path_list(HObjId),
|
||||
ObjectList = case Content1 of
|
||||
[Content2 | _] ->
|
||||
{_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2),
|
||||
ObjL;
|
||||
[] -> []
|
||||
end,
|
||||
R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
|
||||
{error, _} ->
|
||||
lists:map(fun(Object) -> {Object, Object} end, ObjectList);
|
||||
ObjDefinition ->
|
||||
lists:map(fun(Object) ->
|
||||
[_, _, ResId| _] = path_list(Object),
|
||||
Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of
|
||||
"E" -> [{operations, list_to_binary("E")}];
|
||||
Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))},
|
||||
{operations, list_to_binary(Oper)}]
|
||||
end,
|
||||
[{path, Object},
|
||||
{name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))}
|
||||
] ++ Operations
|
||||
end, ObjectList)
|
||||
end,
|
||||
[{content, R}];
|
||||
format_cmd_content(Content, _) ->
|
||||
[{content, Content}].
|
||||
|
||||
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
|
||||
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
|
||||
ntoa(IP) ->
|
||||
inet_parse:ntoa(IP).
|
||||
|
||||
path_list(Path) ->
|
||||
case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
|
||||
[ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId];
|
||||
[ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId];
|
||||
[ObjId, ObjInsId] -> [ObjId, ObjInsId];
|
||||
[ObjId] -> [ObjId]
|
||||
end.
|
|
@ -0,0 +1,153 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lwm2m_cm).
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([ register_channel/5
|
||||
, update_reg_info/2
|
||||
, unregister_channel/1
|
||||
]).
|
||||
|
||||
-export([ lookup_channel/1
|
||||
, all_channels/0
|
||||
]).
|
||||
|
||||
-export([ register_cmd/3
|
||||
, register_cmd/4
|
||||
, lookup_cmd/3
|
||||
, lookup_cmd_by_imei/1
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)).
|
||||
|
||||
%% Server name
|
||||
-define(CM, ?MODULE).
|
||||
|
||||
-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel).
|
||||
-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd).
|
||||
|
||||
%% Batch drain
|
||||
-define(BATCH_SIZE, 100000).
|
||||
|
||||
%% @doc Start the channel manager.
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) ->
|
||||
Info = #{
|
||||
reg_info => RegInfo,
|
||||
lifetime => LifeTime,
|
||||
version => Ver,
|
||||
peername => Peername
|
||||
},
|
||||
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}),
|
||||
cast({registered, {IMEI, self()}}).
|
||||
|
||||
update_reg_info(IMEI, RegInfo) ->
|
||||
case lookup_channel(IMEI) of
|
||||
[{_, RegInfo0}] ->
|
||||
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}),
|
||||
ok;
|
||||
[] ->
|
||||
ok
|
||||
end.
|
||||
|
||||
unregister_channel(IMEI) when is_binary(IMEI) ->
|
||||
true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI),
|
||||
ok.
|
||||
|
||||
lookup_channel(IMEI) ->
|
||||
ets:lookup(?LWM2M_CHANNEL_TAB, IMEI).
|
||||
|
||||
all_channels() ->
|
||||
ets:tab2list(?LWM2M_CHANNEL_TAB).
|
||||
|
||||
register_cmd(IMEI, Path, Type) ->
|
||||
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}).
|
||||
|
||||
register_cmd(_IMEI, undefined, _Type, _Result) ->
|
||||
ok;
|
||||
register_cmd(IMEI, Path, Type, Result) ->
|
||||
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}).
|
||||
|
||||
lookup_cmd(IMEI, Path, Type) ->
|
||||
ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}).
|
||||
|
||||
lookup_cmd_by_imei(IMEI) ->
|
||||
ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]).
|
||||
|
||||
%% @private
|
||||
cast(Msg) -> gen_server:cast(?CM, Msg).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}],
|
||||
ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]),
|
||||
ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]),
|
||||
{ok, #{chan_pmon => emqx_pmon:new()}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) ->
|
||||
PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon),
|
||||
{noreply, State#{chan_pmon := PMon1}};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
||||
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
||||
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
||||
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
|
||||
{noreply, State#{chan_pmon := PMon1}};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
emqx_stats:cancel_update(chan_stats).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
clean_down({_ChanPid, IMEI}) ->
|
||||
unregister_channel(IMEI).
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lwm2m_cm_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
CM = #{id => emqx_lwm2m_cm,
|
||||
start => {emqx_lwm2m_cm, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => [emqx_lwm2m_cm]},
|
||||
SupFlags = #{strategy => one_for_one,
|
||||
intensity => 100,
|
||||
period => 10
|
||||
},
|
||||
{ok, {SupFlags, [CM]}}.
|
||||
|
|
@ -23,6 +23,7 @@
|
|||
-export([ mqtt2coap/2
|
||||
, coap2mqtt/4
|
||||
, ack2mqtt/1
|
||||
, extract_path/1
|
||||
]).
|
||||
|
||||
-export([path_list/1]).
|
||||
|
|
|
@ -48,11 +48,11 @@
|
|||
-define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
|
||||
|
||||
-dialyzer([{nowarn_function, [coap_discover/2]}]).
|
||||
% we use {'absolute', string(), [{atom(), binary()}]} as coap_uri()
|
||||
% we use {'absolute', list(binary()), [{atom(), binary()}]} as coap_uri()
|
||||
% https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
|
||||
% resource operations
|
||||
coap_discover(_Prefix, _Args) ->
|
||||
[{absolute, "mqtt", []}].
|
||||
[{absolute, [<<"mqtt">>], []}].
|
||||
|
||||
coap_get(ChId, [?PREFIX], Query, Content, Lwm2mState) ->
|
||||
?LOG(debug, "~p ~p GET Query=~p, Content=~p", [self(),ChId, Query, Content]),
|
||||
|
|
|
@ -197,7 +197,10 @@ value_ex(K, Value) when K =:= <<"Integer">>; K =:= <<"Float">>; K =:= <<"Time">>
|
|||
value_ex(K, Value) when K =:= <<"String">> ->
|
||||
Value;
|
||||
value_ex(K, Value) when K =:= <<"Opaque">> ->
|
||||
Value;
|
||||
%% XXX: force to decode it with base64
|
||||
%% This may not be a good implementation, but it is
|
||||
%% consistent with the treatment of Opaque in value/3
|
||||
base64:decode(Value);
|
||||
value_ex(K, <<"true">>) when K =:= <<"Boolean">> -> <<1>>;
|
||||
value_ex(K, <<"false">>) when K =:= <<"Boolean">> -> <<0>>;
|
||||
|
||||
|
|
|
@ -103,6 +103,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
|
|||
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
|
||||
end),
|
||||
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
|
||||
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
|
||||
|
||||
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
|
||||
{error, Error} ->
|
||||
|
@ -120,10 +121,8 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
|
|||
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
|
||||
Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
|
||||
|
||||
update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
|
||||
life_timer = LifeTimer, register_info = RegInfo,
|
||||
coap_pid = CoapPid}) ->
|
||||
|
||||
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
|
||||
coap_pid = CoapPid, endpoint_name = Epn}) ->
|
||||
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
|
||||
|
||||
_ = case proplists:get_value(update_msg_publish_condition,
|
||||
|
@ -134,6 +133,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
|
|||
%% - report the registration info update, but only when objectList is updated.
|
||||
case NewRegInfo of
|
||||
#{<<"objectList">> := _} ->
|
||||
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
|
||||
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
|
||||
_ -> ok
|
||||
end
|
||||
|
@ -151,7 +151,8 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
|
|||
register_info = UpdatedRegInfo}.
|
||||
|
||||
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
|
||||
coap_pid = CoapPid}) ->
|
||||
coap_pid = CoapPid,
|
||||
endpoint_name = EndpointName}) ->
|
||||
_ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
|
||||
|
||||
%% - flush cached donwlink commands
|
||||
|
@ -161,7 +162,7 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
|
|||
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
|
||||
maps:get(<<"lt">>, NewRegInfo), LifeTimer),
|
||||
|
||||
_ = send_auto_observe(CoapPid, NewRegInfo),
|
||||
_ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
|
||||
|
||||
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
|
||||
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
|
||||
|
@ -174,15 +175,20 @@ send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
|
|||
Lwm2mState.
|
||||
|
||||
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
|
||||
coap_pid = CoapPid}) ->
|
||||
_ = send_auto_observe(CoapPid, RegInfo),
|
||||
coap_pid = CoapPid,
|
||||
endpoint_name = EndpointName}) ->
|
||||
_ = send_auto_observe(CoapPid, RegInfo, EndpointName),
|
||||
Lwm2mState.
|
||||
|
||||
deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) ->
|
||||
deliver(#message{topic = Topic, payload = Payload},
|
||||
Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
|
||||
register_info = RegInfo,
|
||||
started_at = StartedAt,
|
||||
endpoint_name = EndpointName}) ->
|
||||
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
|
||||
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
||||
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
||||
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode),
|
||||
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
|
||||
Lwm2mState.
|
||||
|
||||
get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
|
||||
|
@ -238,20 +244,21 @@ time_now() -> erlang:system_time(millisecond).
|
|||
%% Deliver downlink message to coap
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode) when is_binary(JsonData)->
|
||||
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
|
||||
try
|
||||
TermData = emqx_json:decode(JsonData, [return_maps]),
|
||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode)
|
||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
|
||||
catch
|
||||
C:R:Stack ->
|
||||
?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
|
||||
[JsonData, {C, R}, Stack])
|
||||
end;
|
||||
|
||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermData) ->
|
||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
|
||||
?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
|
||||
{CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
|
||||
|
||||
MsgType = maps:get(<<"msgType">>, Ref),
|
||||
emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
|
||||
case CacheMode of
|
||||
false ->
|
||||
do_deliver_to_coap(CoapPid, CoapRequest, Ref);
|
||||
|
@ -266,7 +273,12 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermDat
|
|||
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
|
||||
do_send_to_broker(EventType, Payload, Lwm2mState).
|
||||
|
||||
do_send_to_broker(EventType, Payload, Lwm2mState) ->
|
||||
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
|
||||
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
|
||||
Code = maps:get(<<"code">>, Data, undefined),
|
||||
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
|
||||
Content = maps:get(<<"content">>, Data, undefined),
|
||||
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
|
||||
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
|
||||
Topic = uplink_topic(EventType, Lwm2mState),
|
||||
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
|
||||
|
@ -281,7 +293,7 @@ auto_observe_object_list(Expected, Registered) ->
|
|||
Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
|
||||
lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
|
||||
|
||||
send_auto_observe(CoapPid, RegInfo) ->
|
||||
send_auto_observe(CoapPid, RegInfo, EndpointName) ->
|
||||
%% - auto observe the objects
|
||||
case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
|
||||
false ->
|
||||
|
@ -292,25 +304,37 @@ send_auto_observe(CoapPid, RegInfo) ->
|
|||
maps:get(<<"objectList">>, RegInfo, [])
|
||||
),
|
||||
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
||||
auto_observe(AlternatePath, Objectlists, CoapPid)
|
||||
auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
|
||||
end.
|
||||
|
||||
auto_observe(AlternatePath, ObjectList, CoapPid) ->
|
||||
auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
||||
?LOG(info, "Auto Observe on: ~p", [ObjectList]),
|
||||
erlang:spawn(fun() ->
|
||||
observe_object_list(AlternatePath, ObjectList, CoapPid)
|
||||
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
|
||||
end).
|
||||
|
||||
observe_object_list(AlternatePath, ObjectList, CoapPid) ->
|
||||
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
||||
lists:foreach(fun(ObjectPath) ->
|
||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100)
|
||||
[ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
|
||||
case ObjId of
|
||||
<<"19">> ->
|
||||
[ObjInsId | _LastPath1] = LastPath,
|
||||
case ObjInsId of
|
||||
<<"0">> ->
|
||||
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
|
||||
_ ->
|
||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
||||
end;
|
||||
_ ->
|
||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
||||
end
|
||||
end, ObjectList).
|
||||
|
||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval) ->
|
||||
observe_object(AlternatePath, ObjectPath, CoapPid),
|
||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
|
||||
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
|
||||
timer:sleep(Interval).
|
||||
|
||||
observe_object(AlternatePath, ObjectPath, CoapPid) ->
|
||||
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
|
||||
Payload = #{
|
||||
<<"msgType">> => <<"observe">>,
|
||||
<<"data">> => #{
|
||||
|
@ -318,7 +342,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
|
|||
}
|
||||
},
|
||||
?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
|
||||
deliver_to_coap(AlternatePath, Payload, CoapPid, false).
|
||||
deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
|
||||
|
||||
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
|
||||
erlang:spawn(fun() ->
|
||||
|
|
|
@ -29,4 +29,11 @@ start_link() ->
|
|||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init(_Args) ->
|
||||
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db)] }}.
|
||||
CmSup = #{id => emqx_lwm2m_cm_sup,
|
||||
start => {emqx_lwm2m_cm_sup, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => infinity,
|
||||
type => supervisor,
|
||||
modules => [emqx_lwm2m_cm_sup]
|
||||
},
|
||||
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db), CmSup] }}.
|
||||
|
|
|
@ -21,9 +21,11 @@
|
|||
|
||||
-export([ get_obj_def/2
|
||||
, get_object_id/1
|
||||
, get_object_name/1
|
||||
, get_object_and_resource_id/2
|
||||
, get_resource_type/2
|
||||
, get_resource_name/2
|
||||
, get_resource_operations/2
|
||||
]).
|
||||
|
||||
-define(LOG(Level, Format, Args),
|
||||
|
@ -42,6 +44,10 @@ get_object_id(ObjDefinition) ->
|
|||
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
|
||||
ObjectId.
|
||||
|
||||
get_object_name(ObjDefinition) ->
|
||||
[#xmlText{value=ObjectName}] = xmerl_xpath:string("Name/text()", ObjDefinition),
|
||||
ObjectName.
|
||||
|
||||
|
||||
get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
|
||||
ResourceNameString = binary_to_list(ResourceNameBinary),
|
||||
|
@ -60,3 +66,8 @@ get_resource_name(ResourceIdInt, ObjDefinition) ->
|
|||
ResourceIdString = integer_to_list(ResourceIdInt),
|
||||
[#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
|
||||
Name.
|
||||
|
||||
get_resource_operations(ResourceIdInt, ObjDefinition) ->
|
||||
ResourceIdString = integer_to_list(ResourceIdInt),
|
||||
[#xmlText{value=Operations}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Operations/text()", ObjDefinition),
|
||||
Operations.
|
||||
|
|
|
@ -58,7 +58,7 @@ find_objectid(ObjectId) ->
|
|||
false -> ObjectId
|
||||
end,
|
||||
case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
|
||||
[] -> error(no_xml_definition);
|
||||
[] -> {error, no_xml_definition};
|
||||
[{ObjectId, Xml}] -> Xml
|
||||
end.
|
||||
|
||||
|
@ -121,8 +121,10 @@ load(BaseDir) ->
|
|||
true -> BaseDir++"*.xml";
|
||||
false -> BaseDir++"/*.xml"
|
||||
end,
|
||||
AllXmlFiles = filelib:wildcard(Wild),
|
||||
load_loop(AllXmlFiles).
|
||||
case filelib:wildcard(Wild) of
|
||||
[] -> error(no_xml_files_found, BaseDir);
|
||||
AllXmlFiles -> load_loop(AllXmlFiles)
|
||||
end.
|
||||
|
||||
load_loop([]) ->
|
||||
ok;
|
||||
|
|
|
@ -40,6 +40,7 @@ all() ->
|
|||
, {group, test_grp_4_discover}
|
||||
, {group, test_grp_5_write_attr}
|
||||
, {group, test_grp_6_observe}
|
||||
, {group, test_grp_8_object_19}
|
||||
].
|
||||
|
||||
suite() -> [{timetrap, {seconds, 90}}].
|
||||
|
@ -98,9 +99,9 @@ groups() ->
|
|||
]},
|
||||
{test_grp_8_object_19, [RepeatOpt], [
|
||||
case80_specail_object_19_1_0_write,
|
||||
case80_specail_object_19_0_0_notify,
|
||||
case80_specail_object_19_0_0_response,
|
||||
case80_normal_object_19_0_0_read
|
||||
case80_specail_object_19_0_0_notify
|
||||
%case80_specail_object_19_0_0_response,
|
||||
%case80_normal_object_19_0_0_read
|
||||
]},
|
||||
{test_grp_9_psm_queue_mode, [RepeatOpt], [
|
||||
case90_psm_mode,
|
||||
|
@ -1655,6 +1656,7 @@ case80_specail_object_19_1_0_write(Config) ->
|
|||
<<"value">> => base64:encode(<<12345:32>>)
|
||||
}
|
||||
},
|
||||
|
||||
CommandJson = emqx_json:encode(Command),
|
||||
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
|
||||
timer:sleep(50),
|
||||
|
@ -1663,7 +1665,7 @@ case80_specail_object_19_1_0_write(Config) ->
|
|||
Path2 = get_coap_path(Options2),
|
||||
?assertEqual(put, Method2),
|
||||
?assertEqual(<<"/19/1/0">>, Path2),
|
||||
?assertEqual(<<12345:32>>, Payload2),
|
||||
?assertEqual(<<3:2, 0:1, 0:2, 4:3, 0, 12345:32>>, Payload2),
|
||||
timer:sleep(50),
|
||||
|
||||
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, changed}, #coap_content{}, Request2, true),
|
||||
|
@ -1672,6 +1674,7 @@ case80_specail_object_19_1_0_write(Config) ->
|
|||
ReadResult = emqx_json:encode(#{
|
||||
<<"requestID">> => CmdId, <<"cacheID">> => CmdId,
|
||||
<<"data">> => #{
|
||||
<<"reqPath">> => <<"/19/1/0">>,
|
||||
<<"code">> => <<"2.04">>,
|
||||
<<"codeMsg">> => <<"changed">>
|
||||
},
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_rule_engine,
|
||||
[{description, "EMQ X Rule Engine"},
|
||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
||||
{applications, [kernel,stdlib,rulesql,getopt]},
|
||||
|
|
|
@ -1,21 +1,37 @@
|
|||
%% -*-: erlang -*-
|
||||
{"4.3.2",
|
||||
{"4.3.3",
|
||||
[ {"4.3.0",
|
||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||
]},
|
||||
{"4.3.1",
|
||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||
]},
|
||||
{"4.3.2",
|
||||
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[
|
||||
{"4.3.0",
|
||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||
]},
|
||||
{"4.3.1",
|
||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||
]},
|
||||
{"4.3.2",
|
||||
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
|
|
|
@ -93,13 +93,6 @@
|
|||
|
||||
-define(REGISTRY, ?MODULE).
|
||||
|
||||
%% Statistics
|
||||
-define(STATS,
|
||||
[ {?RULE_TAB, 'rules.count', 'rules.max'}
|
||||
, {?ACTION_TAB, 'actions.count', 'actions.max'}
|
||||
, {?RES_TAB, 'resources.count', 'resources.max'}
|
||||
]).
|
||||
|
||||
-define(T_CALL, 10000).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -392,8 +385,11 @@ find_rules_depends_on_resource(ResId) ->
|
|||
end, [], get_rules()).
|
||||
|
||||
search_action_despends_on_resource(ResId, Actions) ->
|
||||
lists:search(fun(#action_instance{args = #{<<"$resource">> := ResId0}}) ->
|
||||
ResId0 =:= ResId
|
||||
lists:search(fun
|
||||
(#action_instance{args = #{<<"$resource">> := ResId0}}) ->
|
||||
ResId0 =:= ResId;
|
||||
(_) ->
|
||||
false
|
||||
end, Actions).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -439,8 +435,6 @@ delete_resource_type(Type) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
%% Enable stats timer
|
||||
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
|
||||
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
||||
{read_concurrency, true}]),
|
||||
{ok, #{}}.
|
||||
|
@ -466,7 +460,7 @@ handle_info(Info, State) ->
|
|||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
emqx_stats:cancel_update(rule_registery_stats).
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
@ -475,13 +469,6 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Private functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
update_stats() ->
|
||||
lists:foreach(
|
||||
fun({Tab, Stat, MaxStat}) ->
|
||||
Size = mnesia:table_info(Tab, size),
|
||||
emqx_stats:setstat(Stat, MaxStat, Size)
|
||||
end, ?STATS).
|
||||
|
||||
get_all_records(Tab) ->
|
||||
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
||||
ets:tab2list(Tab).
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_sn,
|
||||
[{description, "EMQ X MQTT-SN Plugin"},
|
||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,esockd]},
|
||||
|
|
|
@ -1,11 +1,17 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[
|
||||
{"4.3.2", [
|
||||
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_sn}
|
||||
]}
|
||||
],
|
||||
[
|
||||
{"4.3.2", [
|
||||
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_sn}
|
||||
]}
|
||||
|
|
|
@ -250,8 +250,9 @@ wait_for_will_topic(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State)
|
|||
% ignore
|
||||
keep_state_and_data;
|
||||
|
||||
wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
|
||||
do_2nd_connect(Flags, Duration, ClientId, State);
|
||||
wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
|
||||
?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
|
||||
keep_state_and_data;
|
||||
|
||||
wait_for_will_topic(cast, {outgoing, Packet}, State) ->
|
||||
{keep_state, handle_outgoing(Packet, State)};
|
||||
|
@ -275,9 +276,9 @@ wait_for_will_msg(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State) -
|
|||
% ignore
|
||||
keep_state_and_data;
|
||||
|
||||
%% XXX: ?? Why we will handling the 2nd CONNECT packet ??
|
||||
wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
|
||||
do_2nd_connect(Flags, Duration, ClientId, State);
|
||||
wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
|
||||
?LOG(warning, "Receive connect packet in wait_for_will_msg state", []),
|
||||
keep_state_and_data;
|
||||
|
||||
wait_for_will_msg(cast, {outgoing, Packet}, State) ->
|
||||
{keep_state, handle_outgoing(Packet, State)};
|
||||
|
@ -365,8 +366,9 @@ connected(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) ->
|
|||
% ignore
|
||||
{keep_state, State};
|
||||
|
||||
connected(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
|
||||
do_2nd_connect(Flags, Duration, ClientId, State);
|
||||
connected(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
|
||||
?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
|
||||
keep_state_and_data;
|
||||
|
||||
connected(cast, {outgoing, Packet}, State) ->
|
||||
{keep_state, handle_outgoing(Packet, State)};
|
||||
|
@ -826,8 +828,10 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
|||
clean_start = CleanStart,
|
||||
username = State#state.username,
|
||||
password = State#state.password,
|
||||
proto_name = <<"MQTT-SN">>,
|
||||
keepalive = Duration,
|
||||
properties = OnlyOneInflight
|
||||
properties = OnlyOneInflight,
|
||||
proto_ver = 1
|
||||
},
|
||||
case WillFlag of
|
||||
true -> State0 = send_message(?SN_WILLTOPICREQ_MSG(), State),
|
||||
|
@ -843,26 +847,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
|||
handle_incoming(?CONNECT_PACKET(ConnPkt), NState)
|
||||
end.
|
||||
|
||||
do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
|
||||
peername = Peername,
|
||||
channel = Channel}) ->
|
||||
emqx_logger:set_metadata_clientid(ClientId),
|
||||
#mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
|
||||
NChannel = case CleanStart of
|
||||
true ->
|
||||
emqx_channel:terminate(normal, Channel),
|
||||
emqx_sn_registry:unregister_topic(ClientId),
|
||||
emqx_channel:init(#{socktype => udp,
|
||||
sockname => Sockname,
|
||||
peername => Peername,
|
||||
peercert => ?NO_PEERCERT,
|
||||
conn_mod => ?MODULE
|
||||
}, ?DEFAULT_CHAN_OPTIONS);
|
||||
false -> Channel
|
||||
end,
|
||||
NState = State#state{channel = NChannel},
|
||||
do_connect(ClientId, CleanStart, Will, Duration, NState).
|
||||
|
||||
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
|
||||
State=#state{channel = Channel}) ->
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
|
|
|
@ -98,19 +98,6 @@ t_connect(_) ->
|
|||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
gen_udp:close(Socket).
|
||||
|
||||
t_do_2nd_connect(_) ->
|
||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||
ClientId = ?CLIENTID,
|
||||
send_connect_msg(Socket, ClientId),
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
timer:sleep(100),
|
||||
send_connect_msg(Socket, <<"client_id_other">>),
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
gen_udp:close(Socket).
|
||||
|
||||
t_subscribe(_) ->
|
||||
Dup = 0,
|
||||
QoS = 0,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_web_hook,
|
||||
[{description, "EMQ X WebHook Plugin"},
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_web_hook_sup]},
|
||||
{applications, [kernel,stdlib,ehttpc]},
|
||||
|
|
|
@ -2,14 +2,16 @@
|
|||
|
||||
{VSN,
|
||||
[
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_web_hook},
|
||||
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_web_hook},
|
||||
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
|
|
|
@ -292,7 +292,7 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
|
|||
Headers = headers(maps:get(<<"headers">>, Params, undefined)),
|
||||
NHeaders = ensure_content_type_header(Headers, Method),
|
||||
#{method => Method,
|
||||
path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
|
||||
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
|
||||
headers => NHeaders,
|
||||
body => maps:get(<<"body">>, Params, <<>>),
|
||||
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
|
||||
|
@ -306,8 +306,16 @@ ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =
|
|||
ensure_content_type_header(Headers, _Method) ->
|
||||
lists:keydelete("content-type", 1, Headers).
|
||||
|
||||
path(<<>>) -> <<"/">>;
|
||||
path(Path) -> Path.
|
||||
merge_path(CommonPath, <<>>) ->
|
||||
CommonPath;
|
||||
merge_path(CommonPath, Path0) ->
|
||||
case emqx_http_lib:uri_parse(Path0) of
|
||||
{ok, #{path := Path1, 'query' := Query}} ->
|
||||
Path2 = filename:join(CommonPath, Path1),
|
||||
<<Path2/binary, "?", Query/binary>>;
|
||||
{ok, #{path := Path1}} ->
|
||||
filename:join(CommonPath, Path1)
|
||||
end.
|
||||
|
||||
method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
|
||||
method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
|
||||
|
|
|
@ -42,10 +42,9 @@ stop(_State) ->
|
|||
translate_env() ->
|
||||
{ok, URL} = application:get_env(?APP, url),
|
||||
{ok, #{host := Host,
|
||||
path := Path0,
|
||||
port := Port,
|
||||
scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
|
||||
Path = path(Path0),
|
||||
scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
|
||||
Path = path(URIMap),
|
||||
PoolSize = application:get_env(?APP, pool_size, 32),
|
||||
MoreOpts = case Scheme of
|
||||
http ->
|
||||
|
@ -89,9 +88,13 @@ translate_env() ->
|
|||
NHeaders = set_content_type(Headers),
|
||||
application:set_env(?APP, headers, NHeaders).
|
||||
|
||||
path("") ->
|
||||
path(#{path := "", 'query' := Query}) ->
|
||||
"?" ++ Query;
|
||||
path(#{path := Path, 'query' := Query}) ->
|
||||
Path ++ "?" ++ Query;
|
||||
path(#{path := ""}) ->
|
||||
"/";
|
||||
path(Path) ->
|
||||
path(#{path := Path}) ->
|
||||
Path.
|
||||
|
||||
set_content_type(Headers) ->
|
||||
|
|
|
@ -26,7 +26,9 @@ COPY . /emqx
|
|||
ARG PKG_VSN
|
||||
ARG EMQX_NAME=emqx
|
||||
|
||||
RUN cd /emqx && make $EMQX_NAME
|
||||
RUN cd /emqx \
|
||||
&& rm -rf _build/$EMQX_NAME/lib \
|
||||
&& make $EMQX_NAME
|
||||
|
||||
FROM $RUN_FROM
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ These environment variables will ignore for configuration file.
|
|||
|
||||
The list is incomplete and may changed with [etc/emqx.conf](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
|
||||
|
||||
If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE__NAME``, ``EMQX_NODE__NAME=$EMQX_NAME@$EMQX_HOST``.
|
||||
If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE_NAME``, ``EMQX_NODE_NAME=$EMQX_NAME@$EMQX_HOST``.
|
||||
|
||||
For example, set mqtt tcp port to 1883
|
||||
|
||||
|
|
|
@ -343,8 +343,8 @@ rpc.port_discovery = stateless
|
|||
## Number of outgoing RPC connections.
|
||||
##
|
||||
## Value: Interger [0-256]
|
||||
## Defaults to NumberOfCPUSchedulers / 2 when set to 0
|
||||
#rpc.tcp_client_num = 0
|
||||
## Default = 1
|
||||
#rpc.tcp_client_num = 1
|
||||
|
||||
## RCP Client connect timeout.
|
||||
##
|
||||
|
@ -1849,7 +1849,7 @@ listener.wss.external.acceptors = 4
|
|||
## Maximum number of concurrent MQTT/Webwocket/SSL connections.
|
||||
##
|
||||
## Value: Number
|
||||
listener.wss.external.max_connections = 16
|
||||
listener.wss.external.max_connections = 102400
|
||||
|
||||
## Maximum MQTT/WebSocket/SSL connections per second.
|
||||
##
|
||||
|
|
|
@ -30,11 +30,13 @@
|
|||
%% MQTT Protocol Version and Names
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(MQTT_SN_PROTO_V1, 1).
|
||||
-define(MQTT_PROTO_V3, 3).
|
||||
-define(MQTT_PROTO_V4, 4).
|
||||
-define(MQTT_PROTO_V5, 5).
|
||||
|
||||
-define(PROTOCOL_NAMES, [
|
||||
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
|
||||
{?MQTT_PROTO_V3, <<"MQIsdp">>},
|
||||
{?MQTT_PROTO_V4, <<"MQTT">>},
|
||||
{?MQTT_PROTO_V5, <<"MQTT">>}]).
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
-ifndef(EMQX_ENTERPRISE).
|
||||
|
||||
-define(EMQX_RELEASE, {opensource, "4.3.3"}).
|
||||
-define(EMQX_RELEASE, {opensource, "4.3.5"}).
|
||||
|
||||
-else.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_dashboard,
|
||||
[{description, "EMQ X Web Dashboard"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_dashboard_sup]},
|
||||
{applications, [kernel,stdlib,mnesia,minirest]},
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[ {"4.3.0",
|
||||
%% load all plugins
|
||||
%% NOTE: this depends on the fact that emqx_dashboard is always
|
||||
%% the last application gets upgraded
|
||||
[ {apply, {emqx_plugins, load, []}}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[ {"4.3.0",
|
||||
[ {apply, {emqx_plugins, load, []}}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
}.
|
|
@ -52,46 +52,50 @@ description() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
|
||||
Presence = connected_presence(ClientInfo, ConnInfo),
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
Presence = common_infos(ClientInfo, ConnInfo),
|
||||
NPresence = Presence#{
|
||||
connack => 0, %% XXX: connack will be removed in 5.0
|
||||
keepalive => maps:get(keepalive, ConnInfo, 0),
|
||||
clean_start => maps:get(clean_start, ConnInfo, true),
|
||||
expiry_interval => maps:get(expiry_interval, ConnInfo, 0),
|
||||
connected_at => maps:get(connected_at, ConnInfo)
|
||||
},
|
||||
case emqx_json:safe_encode(NPresence) of
|
||||
{ok, Payload} ->
|
||||
emqx_broker:safe_publish(
|
||||
make_msg(qos(Env), topic(connected, ClientId), Payload));
|
||||
{error, _Reason} ->
|
||||
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
|
||||
?LOG(error, "Failed to encode 'connected' presence: ~p", [NPresence])
|
||||
end.
|
||||
|
||||
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
|
||||
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
|
||||
Presence = #{clientid => ClientId,
|
||||
username => Username,
|
||||
on_client_disconnected(ClientInfo = #{clientid := ClientId},
|
||||
Reason, ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
|
||||
|
||||
Presence = common_infos(ClientInfo, ConnInfo),
|
||||
NPresence = Presence#{
|
||||
reason => reason(Reason),
|
||||
disconnected_at => DisconnectedAt,
|
||||
ts => erlang:system_time(millisecond)
|
||||
disconnected_at => DisconnectedAt
|
||||
},
|
||||
case emqx_json:safe_encode(Presence) of
|
||||
case emqx_json:safe_encode(NPresence) of
|
||||
{ok, Payload} ->
|
||||
emqx_broker:safe_publish(
|
||||
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
|
||||
{error, _Reason} ->
|
||||
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
|
||||
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [NPresence])
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
connected_presence(#{peerhost := PeerHost,
|
||||
sockport := SockPort,
|
||||
clientid := ClientId,
|
||||
username := Username
|
||||
common_infos(
|
||||
_ClientInfo = #{clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := PeerHost,
|
||||
sockport := SockPort
|
||||
},
|
||||
#{clean_start := CleanStart,
|
||||
proto_name := ProtoName,
|
||||
proto_ver := ProtoVer,
|
||||
keepalive := Keepalive,
|
||||
connected_at := ConnectedAt,
|
||||
expiry_interval := ExpiryInterval
|
||||
_ConnInfo = #{proto_name := ProtoName,
|
||||
proto_ver := ProtoVer
|
||||
}) ->
|
||||
#{clientid => ClientId,
|
||||
username => Username,
|
||||
|
@ -99,11 +103,6 @@ connected_presence(#{peerhost := PeerHost,
|
|||
sockport => SockPort,
|
||||
proto_name => ProtoName,
|
||||
proto_ver => ProtoVer,
|
||||
keepalive => Keepalive,
|
||||
connack => 0, %% Deprecated?
|
||||
clean_start => CleanStart,
|
||||
expiry_interval => ExpiryInterval,
|
||||
connected_at => ConnectedAt,
|
||||
ts => erlang:system_time(millisecond)
|
||||
}.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_modules,
|
||||
[{description, "EMQ X Module Management"},
|
||||
{vsn, "4.3.2"},
|
||||
{vsn, "4.3.3"},
|
||||
{modules, []},
|
||||
{applications, [kernel,stdlib]},
|
||||
{mod, {emqx_modules_app, []}},
|
||||
|
|
|
@ -1,21 +1,31 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[
|
||||
{"4.3.2", [
|
||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.1", [
|
||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.0", [
|
||||
{update, emqx_mod_delayed, {advanced, []}},
|
||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[
|
||||
{"4.3.2", [
|
||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.1", [
|
||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.0", [
|
||||
{update, emqx_mod_delayed, {advanced, []}},
|
||||
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
|
|
|
@ -376,7 +376,7 @@ end}.
|
|||
|
||||
{translation, "gen_rpc.tcp_client_num", fun(Conf) ->
|
||||
case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of
|
||||
0 -> max(1, erlang:system_info(schedulers) div 2);
|
||||
0 -> 1; %% keep allowing 0 for backward compatibility
|
||||
V -> V
|
||||
end
|
||||
end}.
|
||||
|
@ -582,17 +582,6 @@ end}.
|
|||
hidden
|
||||
]}.
|
||||
|
||||
%% disable lager
|
||||
{mapping, "lager.handlers", "lager.handlers", [
|
||||
{default, []},
|
||||
hidden
|
||||
]}.
|
||||
{mapping, "lager.crash_log", "lager.crash_log", [
|
||||
{default, off},
|
||||
{datatype, flag},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{translation, "kernel.logger_level", fun(_, _, Conf) ->
|
||||
cuttlefish:conf_get("log.level", Conf)
|
||||
end}.
|
||||
|
|
|
@ -46,7 +46,7 @@
|
|||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.5"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.6"}}}
|
||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
|
||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
||||
|
|
|
@ -128,23 +128,30 @@ prod_compile_opts() ->
|
|||
prod_overrides() ->
|
||||
[{add, [ {erl_opts, [deterministic]}]}].
|
||||
|
||||
relup_deps(Profile) ->
|
||||
{post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "scripts/inject-deps.escript " ++ atom_to_list(Profile)}]}.
|
||||
|
||||
profiles() ->
|
||||
Vsn = get_vsn(),
|
||||
[ {'emqx', [ {erl_opts, prod_compile_opts()}
|
||||
, {relx, relx(Vsn, cloud, bin)}
|
||||
, {overrides, prod_overrides()}
|
||||
, relup_deps('emqx')
|
||||
]}
|
||||
, {'emqx-pkg', [ {erl_opts, prod_compile_opts()}
|
||||
, {relx, relx(Vsn, cloud, pkg)}
|
||||
, {overrides, prod_overrides()}
|
||||
, relup_deps('emqx-pkg')
|
||||
]}
|
||||
, {'emqx-edge', [ {erl_opts, prod_compile_opts()}
|
||||
, {relx, relx(Vsn, edge, bin)}
|
||||
, {overrides, prod_overrides()}
|
||||
, relup_deps('emqx-edge')
|
||||
]}
|
||||
, {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()}
|
||||
, {relx, relx(Vsn, edge, pkg)}
|
||||
, {overrides, prod_overrides()}
|
||||
, relup_deps('emqx-edge-pkg')
|
||||
]}
|
||||
, {check, [ {erl_opts, common_compile_opts()}
|
||||
]}
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
#!/usr/bin/env escript
|
||||
|
||||
%% This script injects implicit relup dependencies for emqx applications.
|
||||
%%
|
||||
%% By 'implicit', it means that it is not feasible to define application
|
||||
%% dependencies in .app.src files.
|
||||
%%
|
||||
%% For instance, during upgrade/downgrade, emqx_dashboard usually requires
|
||||
%% a restart after (but not before) all plugins are upgraded (and maybe
|
||||
%% restarted), however, the dependencies are not resolvable at build time
|
||||
%% when relup is generated.
|
||||
%%
|
||||
%% This script is to be executed after compile, with the profile given as the
|
||||
%% first argument. For each dependency, it modifies the .app file to
|
||||
%% have the `relup_deps` list extended to application attributes.
|
||||
%%
|
||||
%% The `relup_deps` application attribute is then picked up by (EMQ's fork of)
|
||||
%% `relx` when top-sorting apps to generate relup instructions
|
||||
|
||||
-mode(compile).
|
||||
|
||||
usage() ->
|
||||
"Usage: " ++ escript:script_name() ++ " emqx|emqx-edge".
|
||||
|
||||
-type app() :: atom().
|
||||
-type deps_overlay() :: {re, string()} | app().
|
||||
|
||||
%% deps/0 returns the dependency overlays.
|
||||
%% {re, Pattern} to match application names using regexp pattern
|
||||
-spec deps(string()) -> [{app(), [deps_overlay()]}].
|
||||
deps("emqx-edge" ++ _) ->
|
||||
%% special case for edge
|
||||
base_deps() ++ [{{re, ".+"}, [{exclude, App} || App <- edge_excludes()]}];
|
||||
deps(_Profile) ->
|
||||
base_deps().
|
||||
|
||||
edge_excludes() ->
|
||||
[ emqx_lwm2m
|
||||
, emqx_auth_ldap
|
||||
, emqx_auth_pgsql
|
||||
, emqx_auth_redis
|
||||
, emqx_auth_mongo
|
||||
, emqx_lua_hook
|
||||
, emqx_exhook
|
||||
, emqx_exproto
|
||||
, emqx_prometheus
|
||||
, emqx_psk_file
|
||||
].
|
||||
|
||||
base_deps() ->
|
||||
%% make sure emqx_dashboard depends on all other emqx_xxx apps
|
||||
%% so the appup instructions for emqx_dashboard is always the last
|
||||
%% to be executed
|
||||
[ {emqx_dashboard, [{re, "emqx_.*"}]}
|
||||
, {emqx_management, [{re, "emqx_.*"}, {exclude, emqx_dashboard}]}
|
||||
, {{re, "emqx_.*"}, [emqx]}
|
||||
].
|
||||
|
||||
main([Profile | _]) ->
|
||||
ok = inject(Profile);
|
||||
main(_Args) ->
|
||||
io:format(standard_error, "~s", [usage()]),
|
||||
erlang:halt(1).
|
||||
|
||||
expand_names({Name, Deps}, AppNames) ->
|
||||
Names = match_pattern(Name, AppNames),
|
||||
[{N, Deps} || N <- Names].
|
||||
|
||||
%% merge k-v pairs with v1 ++ v2
|
||||
merge([], Acc) -> Acc;
|
||||
merge([{K, V0} | Rest], Acc) ->
|
||||
V = case lists:keyfind(K, 1, Acc) of
|
||||
{K, V1} -> V1 ++ V0;
|
||||
false -> V0
|
||||
end,
|
||||
NewAcc = lists:keystore(K, 1, Acc, {K, V}),
|
||||
merge(Rest, NewAcc).
|
||||
|
||||
expand_deps([], _AppNames, Acc) -> Acc;
|
||||
expand_deps([{exclude, Dep} | Deps], AppNames, Acc) ->
|
||||
Matches = expand_deps([Dep], AppNames, []),
|
||||
expand_deps(Deps, AppNames, Acc -- Matches);
|
||||
expand_deps([Dep | Deps], AppNames, Acc) ->
|
||||
NewAcc = add_to_list(Acc, match_pattern(Dep, AppNames)),
|
||||
expand_deps(Deps, AppNames, NewAcc).
|
||||
|
||||
inject(Profile) ->
|
||||
LibDir = lib_dir(Profile),
|
||||
AppNames = list_apps(LibDir),
|
||||
Deps0 = lists:flatmap(fun(Dep) -> expand_names(Dep, AppNames) end, deps(Profile)),
|
||||
Deps1 = merge(Deps0, []),
|
||||
Deps2 = lists:map(fun({Name, DepsX}) ->
|
||||
NewDeps = expand_deps(DepsX, AppNames, []),
|
||||
{Name, NewDeps}
|
||||
end, Deps1),
|
||||
lists:foreach(fun({App, Deps}) -> inject(App, Deps, LibDir) end, Deps2).
|
||||
|
||||
%% list the profile/lib dir to get all apps
|
||||
list_apps(LibDir) ->
|
||||
Apps = filelib:wildcard("*", LibDir),
|
||||
lists:foldl(fun(App, Acc) -> [App || is_app(LibDir, App)] ++ Acc end, [], Apps).
|
||||
|
||||
is_app(_LibDir, "." ++ _) -> false; %% ignore hidden dir
|
||||
is_app(LibDir, AppName) ->
|
||||
Path = filename:join([ebin_dir(LibDir, AppName), AppName ++ ".app"]),
|
||||
filelib:is_regular(Path) orelse error({unknown_app, AppName, Path}). %% wtf
|
||||
|
||||
lib_dir(Profile) ->
|
||||
filename:join(["_build", Profile, lib]).
|
||||
|
||||
ebin_dir(LibDir, AppName) -> filename:join([LibDir, AppName, "ebin"]).
|
||||
|
||||
inject(App0, DepsToAdd, LibDir) ->
|
||||
App = str(App0),
|
||||
AppEbinDir = ebin_dir(LibDir, App),
|
||||
[AppFile0] = filelib:wildcard("*.app", AppEbinDir),
|
||||
AppFile = filename:join(AppEbinDir, AppFile0),
|
||||
{ok, [{application, AppName, Props}]} = file:consult(AppFile),
|
||||
Deps0 = case lists:keyfind(relup_deps, 1, Props) of
|
||||
{_, X} -> X;
|
||||
false -> []
|
||||
end,
|
||||
%% merge extra deps, but do not self-include
|
||||
Deps = add_to_list(Deps0, DepsToAdd) -- [App0],
|
||||
case Deps =:= [] of
|
||||
true -> ok;
|
||||
_ ->
|
||||
NewProps = lists:keystore(relup_deps, 1, Props, {relup_deps, Deps}),
|
||||
AppSpec = {application, AppName, NewProps},
|
||||
AppSpecIoData = io_lib:format("~p.", [AppSpec]),
|
||||
io:format(user, "updated_relup_deps for ~p~n", [App]),
|
||||
file:write_file(AppFile, AppSpecIoData)
|
||||
end.
|
||||
|
||||
str(A) when is_atom(A) -> atom_to_list(A).
|
||||
|
||||
match_pattern({re, Re}, AppNames) ->
|
||||
Match = fun(AppName) -> re:run(AppName, Re) =/= nomatch end,
|
||||
AppNamesToAdd = lists:filter(Match, AppNames),
|
||||
AppsToAdd = lists:map(fun(N) -> list_to_atom(N) end, AppNamesToAdd),
|
||||
case AppsToAdd =:= [] of
|
||||
true -> error({nomatch, Re});
|
||||
false -> AppsToAdd
|
||||
end;
|
||||
match_pattern(NameAtom, AppNames) ->
|
||||
case lists:member(str(NameAtom), AppNames) of
|
||||
true -> [NameAtom];
|
||||
false -> error({notfound, NameAtom})
|
||||
end.
|
||||
|
||||
%% Append elements to list without duplication. No reordering.
|
||||
add_to_list(List, []) -> List;
|
||||
add_to_list(List, [H | T]) ->
|
||||
case lists:member(H, List) of
|
||||
true -> add_to_list(List, T);
|
||||
false -> add_to_list(List ++ [H], T)
|
||||
end.
|
|
@ -1,7 +1,7 @@
|
|||
{application, emqx,
|
||||
[{id, "emqx"},
|
||||
{description, "EMQ X"},
|
||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
|
||||
|
|
|
@ -1,12 +1,35 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.2",
|
||||
[{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
[
|
||||
{"4.3.5", [
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.4", [
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.3", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.2", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.1",
|
||||
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.1", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
|
@ -17,8 +40,10 @@
|
|||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.0",
|
||||
[{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||
{"4.3.0", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
|
@ -34,13 +59,36 @@
|
|||
{apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
|
||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.2",
|
||||
[{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
[
|
||||
{"4.3.5", [
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.4", [
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.3", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.2", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.1",
|
||||
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.3.1", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
|
@ -51,8 +99,10 @@
|
|||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.0",
|
||||
[{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||
{"4.3.0", [
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -294,6 +294,9 @@ do_discard_session(ClientId, Pid) ->
|
|||
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
||||
?tp(debug, "session_already_gone", #{pid => Pid}),
|
||||
ok;
|
||||
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
|
||||
?tp(debug, "session_already_gone", #{pid => Pid}),
|
||||
ok;
|
||||
_ : {{shutdown, _}, _} ->
|
||||
?tp(debug, "session_already_shutdown", #{pid => Pid}),
|
||||
ok;
|
||||
|
@ -373,7 +376,7 @@ lookup_channels(local, ClientId) ->
|
|||
|
||||
%% @private
|
||||
rpc_call(Node, Fun, Args) ->
|
||||
case rpc:call(Node, ?MODULE, Fun, Args) of
|
||||
case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
|
||||
{badrpc, Reason} -> error(Reason);
|
||||
Res -> Res
|
||||
end.
|
||||
|
|
|
@ -336,9 +336,13 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
|
|||
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
||||
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||
|
||||
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
||||
#emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
||||
{noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
||||
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
||||
%% it `unsubscribed` the last topic.
|
||||
%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
|
||||
%% be disconnected.
|
||||
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
||||
% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
||||
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
||||
|
||||
handle_info({mnesia_table_event, _Event}, State) ->
|
||||
{noreply, State};
|
||||
|
@ -348,8 +352,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMo
|
|||
cleanup_down(SubPid),
|
||||
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -403,7 +403,10 @@ websocket_close(Reason, State) ->
|
|||
|
||||
terminate(Reason, _Req, #state{channel = Channel}) ->
|
||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
||||
emqx_channel:terminate(Reason, Channel).
|
||||
emqx_channel:terminate(Reason, Channel);
|
||||
|
||||
terminate(_Reason, _Req, _UnExpectedState) ->
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle call
|
||||
|
|
Loading…
Reference in New Issue