Compare commits

...

43 Commits

Author SHA1 Message Date
JianBo He 30ed74bc80 chore(mgmt): update appup.src 2021-07-13 10:29:02 +08:00
JianBo He 73e0cbd644 fix(mgmt): fix dump aborted by print function crash 2021-07-13 10:27:11 +08:00
zhanghongtong 750cb2d491 chore(CI): update emqx-ci-helper tag 2021-07-09 19:37:45 +08:00
Zaiming Shi 780e403262 fix(conf): change wss.external.max_connections from 16 to 102400 2021-07-09 09:30:57 +08:00
JianBo He 05b16c601b chore(presence): more fields for disconnected event 2021-07-07 18:36:09 +08:00
JianBo He 8935d28ed4 fix(exhook): catch the badarg error 2021-07-01 17:56:10 +08:00
zhanghongtong 0c66fcef00 chore(release): update emqx release version 2021-06-28 11:14:33 +08:00
tigercl 637cd5e804
Merge pull request #5105 from terry-xiaoyu/clean_emqx_shared_subscription2
fix(shared_sub): discard all unexpected msgs
2021-06-28 11:07:10 +08:00
Shawn 5fbf83e7f0 fix(shared_sub): discard all unexpected msgs 2021-06-28 09:37:34 +08:00
Shawn 513cd001ac chore(appup): update the appup for 4.3.5 2021-06-25 20:38:43 +08:00
Shawn 868cd6e57c fix(shared_sub): failed to clean the emqx_shared_subscription tab
A trick that fixes the issue that we demonitored the shared subscriber
too early if it not unsubscribed all of the topics.
2021-06-25 20:38:43 +08:00
zhanghongtong a8aabd5f74 revert: chore(CI): update events that trigger workflows
This reverts commit 002cbb6d8b
2021-06-25 20:13:35 +08:00
zhanghongtong 002cbb6d8b chore(CI): update events that trigger workflows 2021-06-24 17:44:30 +08:00
zhanghongtong e87838f272 docs(docker): fix env name error 2021-06-24 14:46:34 +08:00
zhanghongtong f18b9a92bc chore(CI): delete needless link when build packages 2021-06-23 21:38:30 +08:00
Zaiming Shi 49a78c8ef2 fix(script): exclude non-edge apps in relup dependency 2021-06-23 21:30:59 +08:00
zhanghongtong 8ad42cb827 chore(CI): add DIAGNOSTIC=1 when build windows 2021-06-23 20:27:08 +08:00
Zaiming Shi f17962e79a chore: add more info in error message 2021-06-23 19:40:12 +08:00
Turtle 98c4fff43f chore: fix inject deps notfound emqx_reloader 2021-06-23 19:03:06 +08:00
zhanghongtong bfc6c3aa42 chore(release): update emqx release version 2021-06-23 17:52:42 +08:00
Turtle 1a438125c7 chore(review): review 4.3.4 2021-06-23 17:51:17 +08:00
Turtle 2092bedb12 feat(lwm2m): fix check dialyzer fail 2021-06-23 17:08:44 +08:00
Turtle a6bd1c90d5 fix: Ignore repeatedly receiving connection packet in the wait_will_msg/wait_will_topic/connected state 2021-06-23 14:37:26 +08:00
JianBo He 3ddbdbc6c1
fix(emqx_cm): catch noproc exception from rpc_call (#5048) 2021-06-23 09:45:24 +08:00
zhanghongtong 2c0916ff05 chore(CI): upload rebar3.crashdump file when slim build failure 2021-06-23 09:44:58 +08:00
JianBo He 77a41ea88f
Fix coap uri format (#5059) 2021-06-23 08:50:20 +08:00
Zaiming Shi b92940af29 test(ci): add plugin list status check after relup new vsn install 2021-06-22 08:54:55 +08:00
Zaiming Shi bed45417dc chore(relup): add relup dependency injection 2021-06-22 08:54:55 +08:00
JianBo He 8110ef7a64 chore: upgrade lwm2m_coap to 1.1.4 2021-06-21 12:21:25 +08:00
Turtle ecec9bd2f6 feat(lwm2m): add emqx_lwm2m http API 2021-06-21 12:19:35 +08:00
Shawn 6724e59e7a fix(appup): relup for emqx_rule_registry failed 2021-06-21 12:18:39 +08:00
Shawn 5962c9c83c
feat(rules): remove stats update from rule_engine_registry (#5029) 2021-06-19 17:02:43 +08:00
Shawn c0367fb8dd
Delete resource failed when searching dependent rules (#4996) 2021-06-17 16:45:44 +08:00
tigercl 0ecaa80fb8
fix(query string): support query string in path (#4981) 2021-06-17 16:12:08 +08:00
JianBo He bdd9154001 fix(modules): fix start/stop exhook module failure 2021-06-16 14:40:57 +08:00
JianBo He bbed1b55e0 fix(ws): avoid funcation_clause for un-inited websocket 2021-06-15 11:27:47 +08:00
k32 074c0bd2cc fix(auth_ldap): Handle missing attributes
Fixes: #4953
2021-06-11 18:35:21 +08:00
Zaiming (Stone) Shi 69ef5cbdc3
Merge pull request #4979 from zmstone/chore-config-rpc-connections-default-to-one
chore(conf): change default number of gen_rpc connections to 1
2021-06-11 09:58:50 +02:00
JianBo He 42a6f2aba5
fix(mqttsn): fix proto_name to MQTT-SN instead of MQTT (#4961) 2021-06-11 11:08:24 +08:00
Shawn 0184a1b3e8
fix(minirest): encode response message failed (#4965) 2021-06-11 09:56:11 +08:00
JianBo He 86766ee7f1 fix(lwm2m): base64 decode for opaque value 2021-06-11 09:48:57 +08:00
Turtle 8eebdd5cdb chore: remove lager schema info 2021-06-11 09:48:25 +08:00
Zaiming Shi 1f57968c9b chore(conf): change default number of gen_rpc connections to 1 2021-06-10 19:59:10 +02:00
61 changed files with 941 additions and 232 deletions

View File

@ -1,7 +1,7 @@
{erl_opts, [debug_info]}.
{deps,
[
{minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.5"}}}
{minirest, {git, "https://github.com/emqx/minirest.git", {tag, "0.3.6"}}}
]}.
{shell, [

View File

@ -43,7 +43,7 @@
!sed -i '/emqx_telemetry/d' data/loaded_plugins
!./bin/emqx start
?EMQ X (.*) is started successfully!
?EMQ X .* is started successfully!
?SH-PROMPT
!./bin/emqx_ctl cluster join emqx@127.0.0.1
@ -99,6 +99,10 @@
"""
?SH-PROMPT
!./bin/emqx_ctl plugins list | grep emqx_management
?Plugin\(emqx_management.*active=true\)
?SH-PROMPT
[shell emqx2]
!echo "" > log/emqx.log.1
?SH-PROMPT
@ -120,6 +124,10 @@
"""
?SH-PROMPT
!./bin/emqx_ctl plugins list | grep emqx_management
?Plugin\(emqx_management.*active=true\)
?SH-PROMPT
[shell bench]
???publish complete
??SH-PROMPT:

View File

@ -83,6 +83,7 @@ jobs:
- name: build
env:
PYTHON: python
DIAGNOSTIC: 1
run: |
$env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH"
@ -168,9 +169,11 @@ jobs:
- name: build
run: |
. $HOME/.kerl/${{ matrix.erl_otp }}/activate
make -C source ensure-rebar3
sudo cp source/rebar3 /usr/local/bin/rebar3
make -C source ${{ matrix.profile }}-zip
cd source
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
rm -rf _build/${{ matrix.profile }}/lib
make ${{ matrix.profile }}-zip
- name: test
run: |
cd source
@ -465,7 +468,7 @@ jobs:
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
-X POST \
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
- name: update repo.emqx.io
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
@ -474,7 +477,7 @@ jobs:
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
-X POST \
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
- name: update homebrew packages
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
@ -484,7 +487,7 @@ jobs:
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
-X POST \
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
-d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
fi
- uses: geekyeggo/delete-artifact@v1

View File

@ -38,6 +38,11 @@ jobs:
run: make ${EMQX_NAME}-zip
- name: build deb/rpm packages
run: make ${EMQX_NAME}-pkg
- uses: actions/upload-artifact@v1
if: failure()
with:
name: rebar3.crashdump
path: ./rebar3.crashdump
- name: pakcages test
run: |
export CODE_PATH=$GITHUB_WORKSPACE
@ -94,6 +99,11 @@ jobs:
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
make ${EMQX_NAME}-zip
- uses: actions/upload-artifact@v1
if: failure()
with:
name: rebar3.crashdump
path: ./rebar3.crashdump
- name: test
run: |
pkg_name=$(basename _packages/${EMQX_NAME}/emqx-*.zip)

View File

@ -1,6 +1,6 @@
{application, emqx_auth_http,
[{description, "EMQ X Authentication/ACL with HTTP API"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_http_sup]},
{applications, [kernel,stdlib,ehttpc]},

View File

@ -0,0 +1,16 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.0", [
{restart_application, emqx_auth_http}
]},
{<<".*">>, []}
],
[
{"4.3.0", [
{restart_application, emqx_auth_http}
]},
{<<".*">>, []}
]
}.

View File

@ -54,10 +54,9 @@ translate_env(EnvName) ->
{ok, ConnectTimeout} = application:get_env(?APP, connect_timeout),
URL = proplists:get_value(url, Req),
{ok, #{host := Host,
path := Path0,
port := Port,
scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
Path = path(Path0),
scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
Path = path(URIMap),
MoreOpts = case Scheme of
http ->
[{transport_opts, emqx_misc:ipv6_probe([])}];
@ -151,8 +150,12 @@ ensure_content_type_header(Method, Headers)
ensure_content_type_header(_Method, Headers) ->
lists:keydelete("content-type", 1, Headers).
path("") ->
path(#{path := "", 'query' := Query}) ->
"?" ++ Query;
path(#{path := Path, 'query' := Query}) ->
Path ++ "?" ++ Query;
path(#{path := ""}) ->
"/";
path(Path) ->
path(#{path := Path}) ->
Path.

View File

@ -27,10 +27,6 @@
, description/0
]).
-import(proplists, [get_value/2]).
-import(emqx_auth_ldap_cli, [search/4]).
-spec(register_metrics() -> ok).
register_metrics() ->
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
@ -70,14 +66,14 @@ do_check_acl(#{username := Username}, PubSub, Topic, _NoMatchAction,
BaseDN = emqx_auth_ldap:replace_vars(CustomBaseDN, ReplaceRules),
case search(Pool, BaseDN, Filter, [Attribute, Attribute1]) of
case emqx_auth_ldap_cli:search(Pool, BaseDN, Filter, [Attribute, Attribute1]) of
{error, noSuchObject} ->
ok;
{ok, #eldap_search_result{entries = []}} ->
ok;
{ok, #eldap_search_result{entries = [Entry]}} ->
Topics = get_value(Attribute, Entry#eldap_entry.attributes)
++ get_value(Attribute1, Entry#eldap_entry.attributes),
Topics = proplists:get_value(Attribute, Entry#eldap_entry.attributes, [])
++ proplists:get_value(Attribute1, Entry#eldap_entry.attributes, []),
match(Topic, Topics);
Error ->
?LOG(error, "[LDAP] search error:~p", [Error]),
@ -95,4 +91,3 @@ match(Topic, [Filter | Topics]) ->
description() ->
"ACL with LDAP".

View File

@ -1,6 +1,6 @@
{application, emqx_auth_ldap,
[{description, "EMQ X Authentication/ACL with LDAP"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_ldap_sup]},
{applications, [kernel,stdlib,eldap2,ecpool]},

View File

@ -0,0 +1,8 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.0",
[{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.0",
[{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}.

View File

@ -1,6 +1,6 @@
{application, emqx_exhook,
[{description, "EMQ X Extension for Hook"},
{vsn, "4.3.1"},
{vsn, "4.3.3"},
{modules, []},
{registered, []},
{mod, {emqx_exhook_app, []}},

View File

@ -1,14 +1,32 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.2", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
]},
{"4.3.1", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{"4.3.2", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
]},
{"4.3.1", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]

View File

@ -88,7 +88,7 @@ init_hooks_cnter() ->
try
_ = ets:new(?CNTER, [named_table, public]), ok
catch
exit:badarg:_ ->
error:badarg:_ ->
ok
end.

View File

@ -122,7 +122,7 @@ channel_opts(Opts) ->
Scheme = proplists:get_value(scheme, Opts),
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
SvrAddr = lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])),
SvrAddr = format_http_uri(Scheme, Host, Port),
ClientOpts = case Scheme of
https ->
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
@ -133,6 +133,13 @@ channel_opts(Opts) ->
end,
{SvrAddr, ClientOpts}.
format_http_uri(Scheme, Host0, Port) ->
Host = case is_tuple(Host0) of
true -> inet:ntoa(Host0);
_ -> Host0
end,
lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
-spec unload(server()) -> ok.
unload(#server{name = Name, hookspec = HookSpecs}) ->
_ = do_deinit(Name),

View File

@ -1,5 +1,5 @@
{deps,
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.2"}}}
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.5"}}}
]}.
{profiles,

View File

@ -1,6 +1,6 @@
{application,emqx_lwm2m,
[{description,"EMQ X LwM2M Gateway"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules,[]},
{registered,[emqx_lwm2m_sup]},
{applications,[kernel,stdlib,lwm2m_coap]},

View File

@ -1,15 +1,13 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.0", [
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
{<<"4.3.[0-1]">>, [
{restart_application, emqx_lwm2m}
]}
],
[
{"4.3.0", [
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
{<<"4.3.[0-1]">>, [
{restart_application, emqx_lwm2m}
]}
]
}.

View File

@ -0,0 +1,162 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_api).
-import(minirest, [return/1]).
-rest_api(#{name => list,
method => 'GET',
path => "/lwm2m_channels/",
func => list,
descr => "A list of all lwm2m channel"
}).
-rest_api(#{name => list,
method => 'GET',
path => "/nodes/:atom:node/lwm2m_channels/",
func => list,
descr => "A list of lwm2m channel of a node"
}).
-rest_api(#{name => lookup_cmd,
method => 'GET',
path => "/lookup_cmd/:bin:ep/",
func => lookup_cmd,
descr => "Send a lwm2m downlink command"
}).
-rest_api(#{name => lookup_cmd,
method => 'GET',
path => "/nodes/:atom:node/lookup_cmd/:bin:ep/",
func => lookup_cmd,
descr => "Send a lwm2m downlink command of a node"
}).
-export([ list/2
, lookup_cmd/2
]).
list(#{node := Node }, Params) ->
case Node = node() of
true -> list(#{}, Params);
_ -> rpc_call(Node, list, [#{}, Params])
end;
list(#{}, _Params) ->
Channels = emqx_lwm2m_cm:all_channels(),
return({ok, format(Channels)}).
lookup_cmd(#{ep := Ep, node := Node}, Params) ->
case Node = node() of
true -> lookup_cmd(#{ep => Ep}, Params);
_ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params])
end;
lookup_cmd(#{ep := Ep}, Params) ->
MsgType = proplists:get_value(<<"msgType">>, Params),
Path0 = proplists:get_value(<<"path">>, Params),
case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of
[] -> return({ok, []});
[{_, undefined} | _] -> return({ok, []});
[{{IMEI, Path, MsgType}, undefined}] ->
return({ok, [{imei, IMEI},
{'msgType', IMEI},
{'code', <<"6.01">>},
{'codeMsg', <<"reply_not_received">>},
{'path', Path}]});
[{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] ->
Payload1 = format_cmd_content(Content, MsgType),
return({ok, [{imei, IMEI},
{'msgType', IMEI},
{'code', Code},
{'codeMsg', CodeMsg},
{'path', Path}] ++ Payload1})
end.
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
format(Channels) ->
lists:map(fun({IMEI, #{lifetime := LifeTime,
peername := Peername,
version := Version,
reg_info := RegInfo}}) ->
ObjectList = lists:map(fun(Path) ->
[ObjId | _] = path_list(Path),
case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
{error, _} ->
{Path, Path};
ObjDefinition ->
ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition),
{Path, list_to_binary(ObjectName)}
end
end, maps:get(<<"objectList">>, RegInfo)),
{IpAddr, Port} = Peername,
[{imei, IMEI},
{lifetime, LifeTime},
{ip_address, iolist_to_binary(ntoa(IpAddr))},
{port, Port},
{version, Version},
{'objectList', ObjectList}]
end, Channels).
format_cmd_content(undefined, _MsgType) -> [];
format_cmd_content(Content, <<"discover">>) ->
[H | Content1] = Content,
{_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H),
[ObjId | _]= path_list(HObjId),
ObjectList = case Content1 of
[Content2 | _] ->
{_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2),
ObjL;
[] -> []
end,
R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
{error, _} ->
lists:map(fun(Object) -> {Object, Object} end, ObjectList);
ObjDefinition ->
lists:map(fun(Object) ->
[_, _, ResId| _] = path_list(Object),
Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of
"E" -> [{operations, list_to_binary("E")}];
Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))},
{operations, list_to_binary(Oper)}]
end,
[{path, Object},
{name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))}
] ++ Operations
end, ObjectList)
end,
[{content, R}];
format_cmd_content(Content, _) ->
[{content, Content}].
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) ->
inet_parse:ntoa(IP).
path_list(Path) ->
case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
[ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId];
[ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId];
[ObjId, ObjInsId] -> [ObjId, ObjInsId];
[ObjId] -> [ObjId]
end.

View File

@ -0,0 +1,153 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_cm).
-export([start_link/0]).
-export([ register_channel/5
, update_reg_info/2
, unregister_channel/1
]).
-export([ lookup_channel/1
, all_channels/0
]).
-export([ register_cmd/3
, register_cmd/4
, lookup_cmd/3
, lookup_cmd_by_imei/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)).
%% Server name
-define(CM, ?MODULE).
-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel).
-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd).
%% Batch drain
-define(BATCH_SIZE, 100000).
%% @doc Start the channel manager.
start_link() ->
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) ->
Info = #{
reg_info => RegInfo,
lifetime => LifeTime,
version => Ver,
peername => Peername
},
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}),
cast({registered, {IMEI, self()}}).
update_reg_info(IMEI, RegInfo) ->
case lookup_channel(IMEI) of
[{_, RegInfo0}] ->
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}),
ok;
[] ->
ok
end.
unregister_channel(IMEI) when is_binary(IMEI) ->
true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI),
ok.
lookup_channel(IMEI) ->
ets:lookup(?LWM2M_CHANNEL_TAB, IMEI).
all_channels() ->
ets:tab2list(?LWM2M_CHANNEL_TAB).
register_cmd(IMEI, Path, Type) ->
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}).
register_cmd(_IMEI, undefined, _Type, _Result) ->
ok;
register_cmd(IMEI, Path, Type, Result) ->
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}).
lookup_cmd(IMEI, Path, Type) ->
ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}).
lookup_cmd_by_imei(IMEI) ->
ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]).
%% @private
cast(Msg) -> gen_server:cast(?CM, Msg).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}],
ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]),
{ok, #{chan_pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) ->
PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon),
{noreply, State#{chan_pmon := PMon1}};
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
emqx_stats:cancel_update(chan_stats).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
clean_down({_ChanPid, IMEI}) ->
unregister_channel(IMEI).

View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_cm_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
CM = #{id => emqx_lwm2m_cm,
start => {emqx_lwm2m_cm, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_lwm2m_cm]},
SupFlags = #{strategy => one_for_one,
intensity => 100,
period => 10
},
{ok, {SupFlags, [CM]}}.

View File

@ -23,6 +23,7 @@
-export([ mqtt2coap/2
, coap2mqtt/4
, ack2mqtt/1
, extract_path/1
]).
-export([path_list/1]).

View File

@ -48,11 +48,11 @@
-define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
-dialyzer([{nowarn_function, [coap_discover/2]}]).
% we use {'absolute', string(), [{atom(), binary()}]} as coap_uri()
% we use {'absolute', list(binary()), [{atom(), binary()}]} as coap_uri()
% https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
% resource operations
coap_discover(_Prefix, _Args) ->
[{absolute, "mqtt", []}].
[{absolute, [<<"mqtt">>], []}].
coap_get(ChId, [?PREFIX], Query, Content, Lwm2mState) ->
?LOG(debug, "~p ~p GET Query=~p, Content=~p", [self(),ChId, Query, Content]),

View File

@ -197,7 +197,10 @@ value_ex(K, Value) when K =:= <<"Integer">>; K =:= <<"Float">>; K =:= <<"Time">>
value_ex(K, Value) when K =:= <<"String">> ->
Value;
value_ex(K, Value) when K =:= <<"Opaque">> ->
Value;
%% XXX: force to decode it with base64
%% This may not be a good implementation, but it is
%% consistent with the treatment of Opaque in value/3
base64:decode(Value);
value_ex(K, <<"true">>) when K =:= <<"Boolean">> -> <<1>>;
value_ex(K, <<"false">>) when K =:= <<"Boolean">> -> <<0>>;

View File

@ -103,6 +103,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
end),
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
{error, Error} ->
@ -120,10 +121,8 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
life_timer = LifeTimer, register_info = RegInfo,
coap_pid = CoapPid}) ->
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
coap_pid = CoapPid, endpoint_name = Epn}) ->
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
_ = case proplists:get_value(update_msg_publish_condition,
@ -134,6 +133,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
%% - report the registration info update, but only when objectList is updated.
case NewRegInfo of
#{<<"objectList">> := _} ->
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
_ -> ok
end
@ -151,7 +151,8 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
register_info = UpdatedRegInfo}.
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
coap_pid = CoapPid}) ->
coap_pid = CoapPid,
endpoint_name = EndpointName}) ->
_ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
%% - flush cached donwlink commands
@ -161,7 +162,7 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
maps:get(<<"lt">>, NewRegInfo), LifeTimer),
_ = send_auto_observe(CoapPid, NewRegInfo),
_ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
@ -174,15 +175,20 @@ send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
Lwm2mState.
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
coap_pid = CoapPid}) ->
_ = send_auto_observe(CoapPid, RegInfo),
coap_pid = CoapPid,
endpoint_name = EndpointName}) ->
_ = send_auto_observe(CoapPid, RegInfo, EndpointName),
Lwm2mState.
deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) ->
deliver(#message{topic = Topic, payload = Payload},
Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
register_info = RegInfo,
started_at = StartedAt,
endpoint_name = EndpointName}) ->
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode),
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
Lwm2mState.
get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
@ -238,20 +244,21 @@ time_now() -> erlang:system_time(millisecond).
%% Deliver downlink message to coap
%%--------------------------------------------------------------------
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode) when is_binary(JsonData)->
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
try
TermData = emqx_json:decode(JsonData, [return_maps]),
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode)
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
catch
C:R:Stack ->
?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
[JsonData, {C, R}, Stack])
end;
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermData) ->
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
{CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
MsgType = maps:get(<<"msgType">>, Ref),
emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
case CacheMode of
false ->
do_deliver_to_coap(CoapPid, CoapRequest, Ref);
@ -266,7 +273,12 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermDat
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
do_send_to_broker(EventType, Payload, Lwm2mState).
do_send_to_broker(EventType, Payload, Lwm2mState) ->
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
Code = maps:get(<<"code">>, Data, undefined),
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
Content = maps:get(<<"content">>, Data, undefined),
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
Topic = uplink_topic(EventType, Lwm2mState),
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
@ -281,7 +293,7 @@ auto_observe_object_list(Expected, Registered) ->
Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
send_auto_observe(CoapPid, RegInfo) ->
send_auto_observe(CoapPid, RegInfo, EndpointName) ->
%% - auto observe the objects
case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
false ->
@ -292,25 +304,37 @@ send_auto_observe(CoapPid, RegInfo) ->
maps:get(<<"objectList">>, RegInfo, [])
),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
auto_observe(AlternatePath, Objectlists, CoapPid)
auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
end.
auto_observe(AlternatePath, ObjectList, CoapPid) ->
auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
?LOG(info, "Auto Observe on: ~p", [ObjectList]),
erlang:spawn(fun() ->
observe_object_list(AlternatePath, ObjectList, CoapPid)
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
end).
observe_object_list(AlternatePath, ObjectList, CoapPid) ->
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
lists:foreach(fun(ObjectPath) ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100)
[ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
case ObjId of
<<"19">> ->
[ObjInsId | _LastPath1] = LastPath,
case ObjInsId of
<<"0">> ->
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
end;
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
end
end, ObjectList).
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval) ->
observe_object(AlternatePath, ObjectPath, CoapPid),
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
timer:sleep(Interval).
observe_object(AlternatePath, ObjectPath, CoapPid) ->
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
Payload = #{
<<"msgType">> => <<"observe">>,
<<"data">> => #{
@ -318,7 +342,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
}
},
?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
deliver_to_coap(AlternatePath, Payload, CoapPid, false).
deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
erlang:spawn(fun() ->

View File

@ -29,4 +29,11 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init(_Args) ->
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db)] }}.
CmSup = #{id => emqx_lwm2m_cm_sup,
start => {emqx_lwm2m_cm_sup, start_link, []},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [emqx_lwm2m_cm_sup]
},
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db), CmSup] }}.

View File

@ -21,9 +21,11 @@
-export([ get_obj_def/2
, get_object_id/1
, get_object_name/1
, get_object_and_resource_id/2
, get_resource_type/2
, get_resource_name/2
, get_resource_operations/2
]).
-define(LOG(Level, Format, Args),
@ -42,6 +44,10 @@ get_object_id(ObjDefinition) ->
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
ObjectId.
get_object_name(ObjDefinition) ->
[#xmlText{value=ObjectName}] = xmerl_xpath:string("Name/text()", ObjDefinition),
ObjectName.
get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
ResourceNameString = binary_to_list(ResourceNameBinary),
@ -60,3 +66,8 @@ get_resource_name(ResourceIdInt, ObjDefinition) ->
ResourceIdString = integer_to_list(ResourceIdInt),
[#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
Name.
get_resource_operations(ResourceIdInt, ObjDefinition) ->
ResourceIdString = integer_to_list(ResourceIdInt),
[#xmlText{value=Operations}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Operations/text()", ObjDefinition),
Operations.

View File

@ -58,7 +58,7 @@ find_objectid(ObjectId) ->
false -> ObjectId
end,
case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
[] -> error(no_xml_definition);
[] -> {error, no_xml_definition};
[{ObjectId, Xml}] -> Xml
end.
@ -121,8 +121,10 @@ load(BaseDir) ->
true -> BaseDir++"*.xml";
false -> BaseDir++"/*.xml"
end,
AllXmlFiles = filelib:wildcard(Wild),
load_loop(AllXmlFiles).
case filelib:wildcard(Wild) of
[] -> error(no_xml_files_found, BaseDir);
AllXmlFiles -> load_loop(AllXmlFiles)
end.
load_loop([]) ->
ok;

View File

@ -40,6 +40,7 @@ all() ->
, {group, test_grp_4_discover}
, {group, test_grp_5_write_attr}
, {group, test_grp_6_observe}
, {group, test_grp_8_object_19}
].
suite() -> [{timetrap, {seconds, 90}}].
@ -98,9 +99,9 @@ groups() ->
]},
{test_grp_8_object_19, [RepeatOpt], [
case80_specail_object_19_1_0_write,
case80_specail_object_19_0_0_notify,
case80_specail_object_19_0_0_response,
case80_normal_object_19_0_0_read
case80_specail_object_19_0_0_notify
%case80_specail_object_19_0_0_response,
%case80_normal_object_19_0_0_read
]},
{test_grp_9_psm_queue_mode, [RepeatOpt], [
case90_psm_mode,
@ -1655,6 +1656,7 @@ case80_specail_object_19_1_0_write(Config) ->
<<"value">> => base64:encode(<<12345:32>>)
}
},
CommandJson = emqx_json:encode(Command),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
timer:sleep(50),
@ -1663,7 +1665,7 @@ case80_specail_object_19_1_0_write(Config) ->
Path2 = get_coap_path(Options2),
?assertEqual(put, Method2),
?assertEqual(<<"/19/1/0">>, Path2),
?assertEqual(<<12345:32>>, Payload2),
?assertEqual(<<3:2, 0:1, 0:2, 4:3, 0, 12345:32>>, Payload2),
timer:sleep(50),
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, changed}, #coap_content{}, Request2, true),
@ -1672,6 +1674,7 @@ case80_specail_object_19_1_0_write(Config) ->
ReadResult = emqx_json:encode(#{
<<"requestID">> => CmdId, <<"cacheID">> => CmdId,
<<"data">> => #{
<<"reqPath">> => <<"/19/1/0">>,
<<"code">> => <<"2.04">>,
<<"codeMsg">> => <<"changed">>
},

View File

@ -1,6 +1,6 @@
{application, emqx_management,
[{description, "EMQ X Management API and CLI"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_management_sup]},
{applications, [kernel,stdlib,minirest]},

View File

@ -1,11 +1,11 @@
%% -*- mode: erlang -*-
{VSN,
[ {<<"4.3.[0-2]">>,
[ {<<"4.3.[0-3]">>,
[ {restart_application, emqx_management}
]},
{<<".*">>, []}
],
[ {<<"4.3.[0-2]">>,
[ {<<"4.3.[0-3]">>,
[ {restart_application, emqx_management}
]},
{<<".*">>, []}

View File

@ -616,8 +616,18 @@ dump(_Table, _, '$end_of_table', Result) ->
lists:reverse(Result);
dump(Table, Tag, Key, Result) ->
PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)],
dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
Ls = lists:foldl(fun(Record, Acc) ->
try
[print({Tag, Record}) | Acc]
catch
Class : Reason : Stk ->
logger:error("Failed to print ~p, error: {~p, ~p}. "
"Stacktrace: ~0p",
[Record, Class, Reason, Stk]),
Acc
end
end, [], ets:lookup(Table, Key)),
dump(Table, Tag, ets:next(Table, Key), [lists:reverse(Ls) | Result]).
print({_, []}) ->
ok;
@ -634,7 +644,7 @@ print({client, {ClientId, ChanPid}}) ->
ClientInfo = maps:get(clientinfo, Attrs, #{}),
ConnInfo = maps:get(conninfo, Attrs, #{}),
Session = maps:get(session, Attrs, #{}),
Connected = case maps:get(conn_state, Attrs) of
Connected = case maps:get(conn_state, Attrs, undefined) of
connected -> true;
_ -> false
end,

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"},
{vsn, "4.3.2"}, % strict semver, bump manually!
{vsn, "4.3.3"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{applications, [kernel,stdlib,rulesql,getopt]},

View File

@ -1,21 +1,37 @@
%% -*-: erlang -*-
{"4.3.2",
{"4.3.3",
[ {"4.3.0",
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
{load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.1",
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.2",
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{<<".*">>, []}
],
[
{"4.3.0",
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []},
{load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.1",
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.2",
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{<<".*">>, []}
]

View File

@ -93,13 +93,6 @@
-define(REGISTRY, ?MODULE).
%% Statistics
-define(STATS,
[ {?RULE_TAB, 'rules.count', 'rules.max'}
, {?ACTION_TAB, 'actions.count', 'actions.max'}
, {?RES_TAB, 'resources.count', 'resources.max'}
]).
-define(T_CALL, 10000).
%%------------------------------------------------------------------------------
@ -392,8 +385,11 @@ find_rules_depends_on_resource(ResId) ->
end, [], get_rules()).
search_action_despends_on_resource(ResId, Actions) ->
lists:search(fun(#action_instance{args = #{<<"$resource">> := ResId0}}) ->
ResId0 =:= ResId
lists:search(fun
(#action_instance{args = #{<<"$resource">> := ResId0}}) ->
ResId0 =:= ResId;
(_) ->
false
end, Actions).
%%------------------------------------------------------------------------------
@ -439,8 +435,6 @@ delete_resource_type(Type) ->
%%------------------------------------------------------------------------------
init([]) ->
%% Enable stats timer
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
{ok, #{}}.
@ -466,7 +460,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
emqx_stats:cancel_update(rule_registery_stats).
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -475,13 +469,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Private functions
%%------------------------------------------------------------------------------
update_stats() ->
lists:foreach(
fun({Tab, Stat, MaxStat}) ->
Size = mnesia:table_info(Tab, size),
emqx_stats:setstat(Stat, MaxStat, Size)
end, ?STATS).
get_all_records(Tab) ->
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
ets:tab2list(Tab).

View File

@ -1,6 +1,6 @@
{application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.3.2"}, % strict semver, bump manually!
{vsn, "4.3.3"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,esockd]},

View File

@ -1,11 +1,17 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
]},
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}
],
[
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
]},
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}

View File

@ -250,8 +250,9 @@ wait_for_will_topic(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State)
% ignore
keep_state_and_data;
wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
do_2nd_connect(Flags, Duration, ClientId, State);
wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
keep_state_and_data;
wait_for_will_topic(cast, {outgoing, Packet}, State) ->
{keep_state, handle_outgoing(Packet, State)};
@ -275,9 +276,9 @@ wait_for_will_msg(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State) -
% ignore
keep_state_and_data;
%% XXX: ?? Why we will handling the 2nd CONNECT packet ??
wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
do_2nd_connect(Flags, Duration, ClientId, State);
wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
?LOG(warning, "Receive connect packet in wait_for_will_msg state", []),
keep_state_and_data;
wait_for_will_msg(cast, {outgoing, Packet}, State) ->
{keep_state, handle_outgoing(Packet, State)};
@ -365,8 +366,9 @@ connected(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) ->
% ignore
{keep_state, State};
connected(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
do_2nd_connect(Flags, Duration, ClientId, State);
connected(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, _State) ->
?LOG(warning, "Receive connect packet in wait_for_will_topic state", []),
keep_state_and_data;
connected(cast, {outgoing, Packet}, State) ->
{keep_state, handle_outgoing(Packet, State)};
@ -826,8 +828,10 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
clean_start = CleanStart,
username = State#state.username,
password = State#state.password,
proto_name = <<"MQTT-SN">>,
keepalive = Duration,
properties = OnlyOneInflight
properties = OnlyOneInflight,
proto_ver = 1
},
case WillFlag of
true -> State0 = send_message(?SN_WILLTOPICREQ_MSG(), State),
@ -843,26 +847,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
handle_incoming(?CONNECT_PACKET(ConnPkt), NState)
end.
do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
peername = Peername,
channel = Channel}) ->
emqx_logger:set_metadata_clientid(ClientId),
#mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
NChannel = case CleanStart of
true ->
emqx_channel:terminate(normal, Channel),
emqx_sn_registry:unregister_topic(ClientId),
emqx_channel:init(#{socktype => udp,
sockname => Sockname,
peername => Peername,
peercert => ?NO_PEERCERT,
conn_mod => ?MODULE
}, ?DEFAULT_CHAN_OPTIONS);
false -> Channel
end,
NState = State#state{channel = NChannel},
do_connect(ClientId, CleanStart, Will, Duration, NState).
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
State=#state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),

View File

@ -98,19 +98,6 @@ t_connect(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_do_2nd_connect(_) ->
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
timer:sleep(100),
send_connect_msg(Socket, <<"client_id_other">>),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_subscribe(_) ->
Dup = 0,
QoS = 0,

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]},

View File

@ -2,14 +2,16 @@
{VSN,
[
{"4.3.0", [
{load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
{<<"4.3.[0-1]">>, [
{restart_application, emqx_web_hook},
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
]},
{<<".*">>, []}
],
[
{"4.3.0", [
{load_module, emqx_web_hook_actions, brutal_purge, soft_purge, []}
{<<"4.3.[0-1]">>, [
{restart_application, emqx_web_hook},
{apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
]},
{<<".*">>, []}
]

View File

@ -292,7 +292,7 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
Headers = headers(maps:get(<<"headers">>, Params, undefined)),
NHeaders = ensure_content_type_header(Headers, Method),
#{method => Method,
path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
headers => NHeaders,
body => maps:get(<<"body">>, Params, <<>>),
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
@ -306,8 +306,16 @@ ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =
ensure_content_type_header(Headers, _Method) ->
lists:keydelete("content-type", 1, Headers).
path(<<>>) -> <<"/">>;
path(Path) -> Path.
merge_path(CommonPath, <<>>) ->
CommonPath;
merge_path(CommonPath, Path0) ->
case emqx_http_lib:uri_parse(Path0) of
{ok, #{path := Path1, 'query' := Query}} ->
Path2 = filename:join(CommonPath, Path1),
<<Path2/binary, "?", Query/binary>>;
{ok, #{path := Path1}} ->
filename:join(CommonPath, Path1)
end.
method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;

View File

@ -42,10 +42,9 @@ stop(_State) ->
translate_env() ->
{ok, URL} = application:get_env(?APP, url),
{ok, #{host := Host,
path := Path0,
port := Port,
scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
Path = path(Path0),
scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
Path = path(URIMap),
PoolSize = application:get_env(?APP, pool_size, 32),
MoreOpts = case Scheme of
http ->
@ -89,9 +88,13 @@ translate_env() ->
NHeaders = set_content_type(Headers),
application:set_env(?APP, headers, NHeaders).
path("") ->
path(#{path := "", 'query' := Query}) ->
"?" ++ Query;
path(#{path := Path, 'query' := Query}) ->
Path ++ "?" ++ Query;
path(#{path := ""}) ->
"/";
path(Path) ->
path(#{path := Path}) ->
Path.
set_content_type(Headers) ->

View File

@ -26,7 +26,9 @@ COPY . /emqx
ARG PKG_VSN
ARG EMQX_NAME=emqx
RUN cd /emqx && make $EMQX_NAME
RUN cd /emqx \
&& rm -rf _build/$EMQX_NAME/lib \
&& make $EMQX_NAME
FROM $RUN_FROM

View File

@ -83,7 +83,7 @@ These environment variables will ignore for configuration file.
The list is incomplete and may changed with [etc/emqx.conf](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE__NAME``, ``EMQX_NODE__NAME=$EMQX_NAME@$EMQX_HOST``.
If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE_NAME``, ``EMQX_NODE_NAME=$EMQX_NAME@$EMQX_HOST``.
For example, set mqtt tcp port to 1883

View File

@ -343,8 +343,8 @@ rpc.port_discovery = stateless
## Number of outgoing RPC connections.
##
## Value: Interger [0-256]
## Defaults to NumberOfCPUSchedulers / 2 when set to 0
#rpc.tcp_client_num = 0
## Default = 1
#rpc.tcp_client_num = 1
## RCP Client connect timeout.
##
@ -1849,7 +1849,7 @@ listener.wss.external.acceptors = 4
## Maximum number of concurrent MQTT/Webwocket/SSL connections.
##
## Value: Number
listener.wss.external.max_connections = 16
listener.wss.external.max_connections = 102400
## Maximum MQTT/WebSocket/SSL connections per second.
##

View File

@ -30,11 +30,13 @@
%% MQTT Protocol Version and Names
%%--------------------------------------------------------------------
-define(MQTT_SN_PROTO_V1, 1).
-define(MQTT_PROTO_V3, 3).
-define(MQTT_PROTO_V4, 4).
-define(MQTT_PROTO_V5, 5).
-define(PROTOCOL_NAMES, [
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
{?MQTT_PROTO_V3, <<"MQIsdp">>},
{?MQTT_PROTO_V4, <<"MQTT">>},
{?MQTT_PROTO_V5, <<"MQTT">>}]).

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.3.3"}).
-define(EMQX_RELEASE, {opensource, "4.3.5"}).
-else.

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard,
[{description, "EMQ X Web Dashboard"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest]},

View File

@ -0,0 +1,16 @@
%% -*- mode: erlang -*-
{VSN,
[ {"4.3.0",
%% load all plugins
%% NOTE: this depends on the fact that emqx_dashboard is always
%% the last application gets upgraded
[ {apply, {emqx_plugins, load, []}}
]},
{<<".*">>, []}
],
[ {"4.3.0",
[ {apply, {emqx_plugins, load, []}}
]},
{<<".*">>, []}
]
}.

View File

@ -52,46 +52,50 @@ description() ->
%%--------------------------------------------------------------------
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
Presence = connected_presence(ClientInfo, ConnInfo),
case emqx_json:safe_encode(Presence) of
Presence = common_infos(ClientInfo, ConnInfo),
NPresence = Presence#{
connack => 0, %% XXX: connack will be removed in 5.0
keepalive => maps:get(keepalive, ConnInfo, 0),
clean_start => maps:get(clean_start, ConnInfo, true),
expiry_interval => maps:get(expiry_interval, ConnInfo, 0),
connected_at => maps:get(connected_at, ConnInfo)
},
case emqx_json:safe_encode(NPresence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(connected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
?LOG(error, "Failed to encode 'connected' presence: ~p", [NPresence])
end.
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
Presence = #{clientid => ClientId,
username => Username,
on_client_disconnected(ClientInfo = #{clientid := ClientId},
Reason, ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
Presence = common_infos(ClientInfo, ConnInfo),
NPresence = Presence#{
reason => reason(Reason),
disconnected_at => DisconnectedAt,
ts => erlang:system_time(millisecond)
disconnected_at => DisconnectedAt
},
case emqx_json:safe_encode(Presence) of
case emqx_json:safe_encode(NPresence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [NPresence])
end.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
connected_presence(#{peerhost := PeerHost,
sockport := SockPort,
clientid := ClientId,
username := Username
common_infos(
_ClientInfo = #{clientid := ClientId,
username := Username,
peerhost := PeerHost,
sockport := SockPort
},
#{clean_start := CleanStart,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive,
connected_at := ConnectedAt,
expiry_interval := ExpiryInterval
_ConnInfo = #{proto_name := ProtoName,
proto_ver := ProtoVer
}) ->
#{clientid => ClientId,
username => Username,
@ -99,11 +103,6 @@ connected_presence(#{peerhost := PeerHost,
sockport => SockPort,
proto_name => ProtoName,
proto_ver => ProtoVer,
keepalive => Keepalive,
connack => 0, %% Deprecated?
clean_start => CleanStart,
expiry_interval => ExpiryInterval,
connected_at => ConnectedAt,
ts => erlang:system_time(millisecond)
}.

View File

@ -1,6 +1,6 @@
{application, emqx_modules,
[{description, "EMQ X Module Management"},
{vsn, "4.3.2"},
{vsn, "4.3.3"},
{modules, []},
{applications, [kernel,stdlib]},
{mod, {emqx_modules_app, []}},

View File

@ -1,21 +1,31 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.2", [
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
]},
{"4.3.1", [
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{update, emqx_mod_delayed, {advanced, []}},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{"4.3.2", [
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
]},
{"4.3.1", [
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{update, emqx_mod_delayed, {advanced, []}},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}

View File

@ -376,7 +376,7 @@ end}.
{translation, "gen_rpc.tcp_client_num", fun(Conf) ->
case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of
0 -> max(1, erlang:system_info(schedulers) div 2);
0 -> 1; %% keep allowing 0 for backward compatibility
V -> V
end
end}.
@ -582,17 +582,6 @@ end}.
hidden
]}.
%% disable lager
{mapping, "lager.handlers", "lager.handlers", [
{default, []},
hidden
]}.
{mapping, "lager.crash_log", "lager.crash_log", [
{default, off},
{datatype, flag},
hidden
]}.
{translation, "kernel.logger_level", fun(_, _, Conf) ->
cuttlefish:conf_get("log.level", Conf)
end}.

View File

@ -46,7 +46,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.5"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.6"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}

View File

@ -128,23 +128,30 @@ prod_compile_opts() ->
prod_overrides() ->
[{add, [ {erl_opts, [deterministic]}]}].
relup_deps(Profile) ->
{post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "scripts/inject-deps.escript " ++ atom_to_list(Profile)}]}.
profiles() ->
Vsn = get_vsn(),
[ {'emqx', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, cloud, bin)}
, {overrides, prod_overrides()}
, relup_deps('emqx')
]}
, {'emqx-pkg', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, cloud, pkg)}
, {overrides, prod_overrides()}
, relup_deps('emqx-pkg')
]}
, {'emqx-edge', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, edge, bin)}
, {overrides, prod_overrides()}
, relup_deps('emqx-edge')
]}
, {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, edge, pkg)}
, {overrides, prod_overrides()}
, relup_deps('emqx-edge-pkg')
]}
, {check, [ {erl_opts, common_compile_opts()}
]}

157
scripts/inject-deps.escript Executable file
View File

@ -0,0 +1,157 @@
#!/usr/bin/env escript
%% This script injects implicit relup dependencies for emqx applications.
%%
%% By 'implicit', it means that it is not feasible to define application
%% dependencies in .app.src files.
%%
%% For instance, during upgrade/downgrade, emqx_dashboard usually requires
%% a restart after (but not before) all plugins are upgraded (and maybe
%% restarted), however, the dependencies are not resolvable at build time
%% when relup is generated.
%%
%% This script is to be executed after compile, with the profile given as the
%% first argument. For each dependency, it modifies the .app file to
%% have the `relup_deps` list extended to application attributes.
%%
%% The `relup_deps` application attribute is then picked up by (EMQ's fork of)
%% `relx` when top-sorting apps to generate relup instructions
-mode(compile).
usage() ->
"Usage: " ++ escript:script_name() ++ " emqx|emqx-edge".
-type app() :: atom().
-type deps_overlay() :: {re, string()} | app().
%% deps/0 returns the dependency overlays.
%% {re, Pattern} to match application names using regexp pattern
-spec deps(string()) -> [{app(), [deps_overlay()]}].
deps("emqx-edge" ++ _) ->
%% special case for edge
base_deps() ++ [{{re, ".+"}, [{exclude, App} || App <- edge_excludes()]}];
deps(_Profile) ->
base_deps().
edge_excludes() ->
[ emqx_lwm2m
, emqx_auth_ldap
, emqx_auth_pgsql
, emqx_auth_redis
, emqx_auth_mongo
, emqx_lua_hook
, emqx_exhook
, emqx_exproto
, emqx_prometheus
, emqx_psk_file
].
base_deps() ->
%% make sure emqx_dashboard depends on all other emqx_xxx apps
%% so the appup instructions for emqx_dashboard is always the last
%% to be executed
[ {emqx_dashboard, [{re, "emqx_.*"}]}
, {emqx_management, [{re, "emqx_.*"}, {exclude, emqx_dashboard}]}
, {{re, "emqx_.*"}, [emqx]}
].
main([Profile | _]) ->
ok = inject(Profile);
main(_Args) ->
io:format(standard_error, "~s", [usage()]),
erlang:halt(1).
expand_names({Name, Deps}, AppNames) ->
Names = match_pattern(Name, AppNames),
[{N, Deps} || N <- Names].
%% merge k-v pairs with v1 ++ v2
merge([], Acc) -> Acc;
merge([{K, V0} | Rest], Acc) ->
V = case lists:keyfind(K, 1, Acc) of
{K, V1} -> V1 ++ V0;
false -> V0
end,
NewAcc = lists:keystore(K, 1, Acc, {K, V}),
merge(Rest, NewAcc).
expand_deps([], _AppNames, Acc) -> Acc;
expand_deps([{exclude, Dep} | Deps], AppNames, Acc) ->
Matches = expand_deps([Dep], AppNames, []),
expand_deps(Deps, AppNames, Acc -- Matches);
expand_deps([Dep | Deps], AppNames, Acc) ->
NewAcc = add_to_list(Acc, match_pattern(Dep, AppNames)),
expand_deps(Deps, AppNames, NewAcc).
inject(Profile) ->
LibDir = lib_dir(Profile),
AppNames = list_apps(LibDir),
Deps0 = lists:flatmap(fun(Dep) -> expand_names(Dep, AppNames) end, deps(Profile)),
Deps1 = merge(Deps0, []),
Deps2 = lists:map(fun({Name, DepsX}) ->
NewDeps = expand_deps(DepsX, AppNames, []),
{Name, NewDeps}
end, Deps1),
lists:foreach(fun({App, Deps}) -> inject(App, Deps, LibDir) end, Deps2).
%% list the profile/lib dir to get all apps
list_apps(LibDir) ->
Apps = filelib:wildcard("*", LibDir),
lists:foldl(fun(App, Acc) -> [App || is_app(LibDir, App)] ++ Acc end, [], Apps).
is_app(_LibDir, "." ++ _) -> false; %% ignore hidden dir
is_app(LibDir, AppName) ->
Path = filename:join([ebin_dir(LibDir, AppName), AppName ++ ".app"]),
filelib:is_regular(Path) orelse error({unknown_app, AppName, Path}). %% wtf
lib_dir(Profile) ->
filename:join(["_build", Profile, lib]).
ebin_dir(LibDir, AppName) -> filename:join([LibDir, AppName, "ebin"]).
inject(App0, DepsToAdd, LibDir) ->
App = str(App0),
AppEbinDir = ebin_dir(LibDir, App),
[AppFile0] = filelib:wildcard("*.app", AppEbinDir),
AppFile = filename:join(AppEbinDir, AppFile0),
{ok, [{application, AppName, Props}]} = file:consult(AppFile),
Deps0 = case lists:keyfind(relup_deps, 1, Props) of
{_, X} -> X;
false -> []
end,
%% merge extra deps, but do not self-include
Deps = add_to_list(Deps0, DepsToAdd) -- [App0],
case Deps =:= [] of
true -> ok;
_ ->
NewProps = lists:keystore(relup_deps, 1, Props, {relup_deps, Deps}),
AppSpec = {application, AppName, NewProps},
AppSpecIoData = io_lib:format("~p.", [AppSpec]),
io:format(user, "updated_relup_deps for ~p~n", [App]),
file:write_file(AppFile, AppSpecIoData)
end.
str(A) when is_atom(A) -> atom_to_list(A).
match_pattern({re, Re}, AppNames) ->
Match = fun(AppName) -> re:run(AppName, Re) =/= nomatch end,
AppNamesToAdd = lists:filter(Match, AppNames),
AppsToAdd = lists:map(fun(N) -> list_to_atom(N) end, AppNamesToAdd),
case AppsToAdd =:= [] of
true -> error({nomatch, Re});
false -> AppsToAdd
end;
match_pattern(NameAtom, AppNames) ->
case lists:member(str(NameAtom), AppNames) of
true -> [NameAtom];
false -> error({notfound, NameAtom})
end.
%% Append elements to list without duplication. No reordering.
add_to_list(List, []) -> List;
add_to_list(List, [H | T]) ->
case lists:member(H, List) of
true -> add_to_list(List, T);
false -> add_to_list(List ++ [H], T)
end.

View File

@ -1,7 +1,7 @@
{application, emqx,
[{id, "emqx"},
{description, "EMQ X"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "4.3.5"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},

View File

@ -1,12 +1,31 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.2",
[{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
[
{"4.3.4",
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}
]},
{"4.3.3",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}
]},
{"4.3.2",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
]},
{"4.3.1",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -18,7 +37,9 @@
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -34,13 +55,32 @@
{apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.2",
[{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
[
{"4.3.4",
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}
]},
{"4.3.3",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}
]},
{"4.3.2",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}
]},
{"4.3.1",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -52,7 +92,9 @@
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},

View File

@ -294,6 +294,9 @@ do_discard_session(ClientId, Pid) ->
_ : {noproc, _} -> % emqx_connection: gen_server:call
?tp(debug, "session_already_gone", #{pid => Pid}),
ok;
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
?tp(debug, "session_already_gone", #{pid => Pid}),
ok;
_ : {{shutdown, _}, _} ->
?tp(debug, "session_already_shutdown", #{pid => Pid}),
ok;

View File

@ -336,9 +336,13 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
#emqx_shared_subscription{subpid = SubPid} = OldRecord,
{noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
%% it `unsubscribed` the last topic.
%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
%% be disconnected.
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State};
@ -348,8 +352,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMo
cleanup_down(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -403,7 +403,10 @@ websocket_close(Reason, State) ->
terminate(Reason, _Req, #state{channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]),
emqx_channel:terminate(Reason, Channel).
emqx_channel:terminate(Reason, Channel);
terminate(_Reason, _Req, _UnExpectedState) ->
ok.
%%--------------------------------------------------------------------
%% Handle call