diff --git a/.ci/build_packages/tests.sh b/.ci/build_packages/tests.sh index 48e7c27a3..8ae3cc838 100755 --- a/.ci/build_packages/tests.sh +++ b/.ci/build_packages/tests.sh @@ -1,8 +1,9 @@ #!/bin/sh set -x -e -u +export CODE_PATH=${CODE_PATH:-"/emqx"} export EMQX_NAME=${EMQX_NAME:-"emqx"} -export PACKAGE_PATH="/emqx/_packages/${EMQX_NAME}" -export RELUP_PACKAGE_PATH="/emqx/relup_packages/${EMQX_NAME}" +export PACKAGE_PATH="${CODE_PATH}/_packages/${EMQX_NAME}" +export RELUP_PACKAGE_PATH="${CODE_PATH}/relup_packages/${EMQX_NAME}" # export EMQX_NODE_NAME="emqx-on-$(uname -m)@127.0.0.1" # export EMQX_NODE_COOKIE=$(date +%s%N) diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 057435e4e..2562a725f 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -356,7 +356,7 @@ jobs: if: github.event_name == 'release' run: | curl --silent --show-error \ - -H "Authorization: token ${{ secrets.AccessToken }}" \ + -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \ -H "Accept: application/vnd.github.v3+json" \ -X POST \ -d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \ @@ -366,7 +366,7 @@ jobs: run: | if [ -z $(echo $version | grep -oE "(alpha|beta|rc)\.[0-9]") ]; then curl --silent --show-error \ - -H "Authorization: token ${{ secrets.AccessToken }}" \ + -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \ -H "Accept: application/vnd.github.v3+json" \ -X POST \ -d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \ diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml new file mode 100644 index 000000000..217e06917 --- /dev/null +++ b/.github/workflows/build_slim_packages.yaml @@ -0,0 +1,26 @@ +name: Build slim packages + +on: [pull_request] + +jobs: + build: + runs-on: ubuntu-20.04 + + strategy: + matrix: + erl_otp: + - erl23.2.2 + os: + - ubuntu20.04 + - centos8 + + container: emqx/build-env:${{ matrix.erl_otp }}-${{ matrix.os }} + + steps: + - uses: actions/checkout@v1 + - name: build packages + run: make emqx-pkg + - name: pakcages test + run: | + export CODE_PATH=$GITHUB_WORKSPACE + .ci/build_packages/tests.sh diff --git a/.github/workflows/git_sync.yaml b/.github/workflows/git_sync.yaml new file mode 100644 index 000000000..6fe19eae6 --- /dev/null +++ b/.github/workflows/git_sync.yaml @@ -0,0 +1,29 @@ +name: Sync to enterprise + +on: + push: + branches: + - master + +jobs: + sync_to_enterprise: + runs-on: ubuntu-20.04 + if: github.repository == 'emqx/emqx' + steps: + - name: git-sync + uses: Rory-Z/git-sync@v3.0.1 + with: + source_repo: ${{ github.repository }} + source_branch: ${{ github.ref }} + destination_repo: "${{ github.repository_owner }}/emqx-enterprise" + destination_branch: ${{ github.ref }} + destination_ssh_private_key: "${{ secrets.CI_SSH_PRIVATE_KEY }}" + - name: create pull request + run: | + set -euo pipefail + curl --silent --show-error \ + -H "Accept: application/vnd.github.v3+json" \ + -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \ + -X POST \ + -d '{"title": "Sync code into enterprise from opensource", "head": "master", "base":"enterprise"}' \ + https://api.github.com/repos/${{ github.repository_owner }}/emqx-enterprise/pulls diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 6adf099cb..0a2f13eca 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -38,9 +38,13 @@ jobs: docker-compose -f .ci/apps_tests/docker-compose.yaml build --no-cache docker-compose -f .ci/apps_tests/docker-compose.yaml up -d - name: run eunit - run: docker exec -i erlang bash -c "make eunit" + run: | + docker exec -i erlang bash -c "make eunit" + docker exec --env EMQX_EXTRA_PLUGINS=all -i erlang bash -c "./rebar3 eunit --dir $(find lib-extra/ -mindepth 1 -maxdepth 2 -type l | tr '\n' ',')" - name: run common test - run: docker exec -i erlang bash -c "make ct" + run: | + docker exec -i erlang bash -c "make ct" + docker exec --env EMQX_EXTRA_PLUGINS=all -i erlang bash -c "./rebar3 ct --dir $(find lib-extra/ -mindepth 1 -maxdepth 2 -type l | tr '\n' ',')" - name: run cover run: | docker exec -i erlang bash -c "make cover" diff --git a/.gitignore b/.gitignore index 59ebdd433..c702d3955 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ emqx_dialyzer_*_plt */emqx_dashboard/priv/www dist.zip scripts/git-token +etc/*.seg diff --git a/Makefile b/Makefile index dadc9dd3b..69682f29c 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,16 @@ -REBAR_VERSION = 3.14.3-emqx-4 -DASHBOARD_VERSION = v4.3.0-beta.1 +$(shell scripts/git-hooks-init.sh) +REBAR_VERSION = 3.14.3-emqx-5 REBAR = $(CURDIR)/rebar3 BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) +export EMQX_DESC ?= EMQ X +export EMQX_CE_DASHBOARD_VERSION ?= v4.3.0-beta.1 PROFILE ?= emqx REL_PROFILES := emqx emqx-edge PKG_PROFILES := emqx-pkg emqx-edge-pkg -PROFILES := $(REL_PROFILES) $(PKG_PROFILES) +PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default export REBAR_GIT_CLONE_OPTIONS += --depth=1 @@ -27,7 +29,7 @@ $(REBAR): ensure-rebar3 .PHONY: get-dashboard get-dashboard: - @$(SCRIPTS)/get-dashboard.sh $(DASHBOARD_VERSION) + @$(SCRIPTS)/get-dashboard.sh .PHONY: eunit eunit: $(REBAR) @@ -51,30 +53,33 @@ coveralls: $(REBAR) .PHONY: $(REL_PROFILES) $(REL_PROFILES:%=%): $(REBAR) get-dashboard -ifneq ($(shell echo $(@) |grep edge),) - @export EMQX_DESC="EMQ X Edge" -else - @export EMQX_DESC="EMQ X Broker" -endif @$(REBAR) as $(@) release -# rebar clean +## Not calling rebar3 clean because +## 1. rebar3 clean relies on rebar3, meaning it reads config, fetches dependencies etc. +## 2. it's slow +## NOTE: this does not force rebar3 to fetch new version dependencies +## make clean-all to delete all fetched dependencies for a fresh start-over .PHONY: clean $(PROFILES:%=clean-%) clean: $(PROFILES:%=clean-%) -$(PROFILES:%=clean-%): $(REBAR) - @$(REBAR) as $(@:clean-%=%) clean - @rm -rf apps/emqx_dashboard/priv/www +$(PROFILES:%=clean-%): + @if [ -d _build/$(@:clean-%=%) ]; then \ + rm -rf _build/$(@:clean-%=%)/rel; \ + find _build/$(@:clean-%=%) -name '*.beam' -o -name '*.so' -o -name '*.app' -o -name '*.appup' -o -name '*.o' -o -name '*.d' -type f | xargs rm -f; \ + fi + +.PHONY: clean-all +clean-all: + @rm -rf _build .PHONY: deps-all deps-all: $(REBAR) $(PROFILES:%=deps-%) +## deps- is used in CI scripts to download deps and the +## share downloads between CI steps and/or copied into containers +## which may not have the right credentials .PHONY: $(PROFILES:%=deps-%) $(PROFILES:%=deps-%): $(REBAR) get-dashboard -ifneq ($(shell echo $(@) |grep edge),) - @export EMQX_DESC="EMQ X Edge" -else - @export EMQX_DESC="EMQ X Broker" -endif @$(REBAR) as $(@:deps-%=%) get-deps .PHONY: xref diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index 89f42a2cd..acbb67bf4 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -53,19 +53,16 @@ translate_env(EnvName) -> {ok, PoolSize} = application:get_env(?APP, pool_size), {ok, ConnectTimeout} = application:get_env(?APP, connect_timeout), URL = proplists:get_value(url, Req), - #{host := Host0, - path := Path0, - scheme := Scheme} = URIMap = uri_string:parse(add_default_scheme(uri_string:normalize(URL))), - Port = maps:get(port, URIMap, case Scheme of - "https" -> 443; - "http" -> 80 - end), + {ok, #{host := Host0, + path := Path0, + port := Port, + scheme := Scheme}} = emqx_http_lib:uri_parse(URL), Path = path(Path0), {Inet, Host} = parse_host(Host0), MoreOpts = case Scheme of - "http" -> + http -> [{transport_opts, [Inet]}]; - "https" -> + https -> CACertFile = application:get_env(?APP, cacertfile, undefined), CertFile = application:get_env(?APP, certfile, undefined), KeyFile = application:get_env(?APP, keyfile, undefined), @@ -158,15 +155,6 @@ ensure_content_type_header(Method, Headers) ensure_content_type_header(_Method, Headers) -> lists:keydelete("content-type", 1, Headers). -add_default_scheme(URL) when is_list(URL) -> - binary_to_list(add_default_scheme(list_to_binary(URL))); -add_default_scheme(<<"http://", _/binary>> = URL) -> - URL; -add_default_scheme(<<"https://", _/binary>> = URL) -> - URL; -add_default_scheme(URL) -> - <<"http://", URL/binary>>. - path("") -> "/"; path(Path) -> diff --git a/apps/emqx_exhook/docs/design.md b/apps/emqx_exhook/docs/design-cn.md similarity index 93% rename from apps/emqx_exhook/docs/design.md rename to apps/emqx_exhook/docs/design-cn.md index 671e240cc..6686e96e3 100644 --- a/apps/emqx_exhook/docs/design.md +++ b/apps/emqx_exhook/docs/design-cn.md @@ -19,7 +19,7 @@ 2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook` -旧版本的设计参考:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md) +旧版本的设计:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md) ## 设计 @@ -39,13 +39,13 @@ `emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。 -和 emqx 原生的钩子一致,emqx-exhook 也支持链式的方式计算和返回: +和 emqx 原生的钩子一致,emqx-exhook 也按照链式的方式执行: ### gRPC 服务示例 -用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中。例如,其支持的接口有: +用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中: ```protobuff syntax = "proto3"; diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 8dc9641b9..41f471d14 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -16,6 +16,12 @@ syntax = "proto3"; +option csharp_namespace = "Emqx.Exhook.V1"; +option go_package = "emqx.io/grpc/exhook"; +option java_multiple_files = true; +option java_package = "io.emqx.exhook"; +option java_outer_classname = "EmqxExHookProto"; + package emqx.exhook.v1; service HookProvider { diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index b14cb446f..eafa20d85 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -1,11 +1,11 @@ %%-*- mode: erlang -*- {plugins, [rebar3_proper, - {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}} + {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} ]}. {grpc, @@ -15,7 +15,9 @@ ]}. {provider_hooks, - [{pre, [{compile, {grpc, gen}}]}]}. + [{pre, [{compile, {grpc, gen}}, + {clean, {grpc, clean}}]} +]}. {edoc_opts, [{preprocess, true}]}. diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl index b008c251a..3b829e7cd 100644 --- a/apps/emqx_exhook/src/emqx_exhook_app.erl +++ b/apps/emqx_exhook/src/emqx_exhook_app.erl @@ -22,6 +22,8 @@ -emqx_plugin(extension). +-define(REGISTRAY, emqx_exhook_registray). + -export([ start/2 , stop/1 , prep_stop/1 @@ -30,8 +32,8 @@ %% Internal export -export([ load_server/2 , unload_server/1 - , load_exhooks/0 , unload_exhooks/0 + , init_hook_registray/0 ]). %%-------------------------------------------------------------------- @@ -41,12 +43,12 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_exhook_sup:start_link(), + %% Collect all available hooks + _ = init_hook_registray(), + %% Load all dirvers load_all_servers(), - %% Register all hooks - _ = load_exhooks(), - %% Register CLI emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []), {ok, Sup}. @@ -55,6 +57,7 @@ prep_stop(State) -> emqx_ctl:unregister_command(exhook), _ = unload_exhooks(), ok = unload_all_servers(), + _ = deinit_hook_registray(), State. stop(_State) -> @@ -81,11 +84,17 @@ unload_server(Name) -> %%-------------------------------------------------------------------- %% Exhooks -load_exhooks() -> - [emqx:hook(Name, {M, F, A}) || {Name, {M, F, A}} <- search_exhooks()]. +init_hook_registray() -> + _ = ets:new(?REGISTRAY, [public, named_table]), + [ets:insert(?REGISTRAY, {Name, {M, F, A}, 0}) + || {Name, {M, F, A}} <- search_exhooks()]. unload_exhooks() -> - [emqx:unhook(Name, {M, F}) || {Name, {M, F, _A}} <- search_exhooks()]. + [emqx:unhook(Name, {M, F}) || + {Name, {M, F, _A}, _} <- ets:tab2list(?REGISTRAY)]. + +deinit_hook_registray() -> + ets:delete(?REGISTRAY). search_exhooks() -> search_exhooks(ignore_lib_apps(application:loaded_applications())). diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 3a35073ca..e60eadaa7 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -42,6 +42,12 @@ , on_session_terminated/3 ]). +-export([ on_message_publish/1 + , on_message_dropped/3 + , on_message_delivered/2 + , on_message_acked/2 + ]). + %% Utils -export([ message/1 , stringfy/1 @@ -71,6 +77,10 @@ , {'session.discarded', {?MODULE, on_session_discarded, []}} , {'session.takeovered', {?MODULE, on_session_takeovered, []}} , {'session.terminated', {?MODULE, on_session_terminated, []}} + , {'message.publish', {?MODULE, on_message_publish, []}} + , {'message.delivered', {?MODULE, on_message_delivered, []}} + , {'message.acked', {?MODULE, on_message_acked, []}} + , {'message.dropped', {?MODULE, on_message_dropped, []}} ]). %%-------------------------------------------------------------------- @@ -185,6 +195,45 @@ on_session_terminated(ClientInfo, Reason, _SessInfo) -> reason => stringfy(Reason)}, cast('session.terminated', Req). +%%-------------------------------------------------------------------- +%% Message +%%-------------------------------------------------------------------- + +on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) -> + ok; +on_message_publish(Message) -> + Req = #{message => message(Message)}, + case call_fold('message.publish', Req, + fun emqx_exhook_handler:merge_responsed_message/2) of + {StopOrOk, #{message := NMessage}} -> + {StopOrOk, assign_to_message(NMessage, Message)}; + _ -> {ok, Message} + end. + +on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) -> + ok; +on_message_dropped(Message, _By, Reason) -> + Req = #{message => message(Message), + reason => stringfy(Reason) + }, + cast('message.dropped', Req). + +on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) -> + ok; +on_message_delivered(ClientInfo, Message) -> + Req = #{clientinfo => clientinfo(ClientInfo), + message => message(Message) + }, + cast('message.delivered', Req). + +on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) -> + ok; +on_message_acked(ClientInfo, Message) -> + Req = #{clientinfo => clientinfo(ClientInfo), + message => message(Message) + }, + cast('message.acked', Req). + %%-------------------------------------------------------------------- %% Types @@ -256,7 +305,10 @@ stringfy(Term) -> unicode:characters_to_binary((io_lib:format("~0p", [Term]))). hexstr(B) -> - iolist_to_binary([io_lib:format("~2.16.0B", [X]) || X <- binary_to_list(B)]). + << <<(hexchar(H)), (hexchar(L))>> || <> <= B>>. + +hexchar(I) when I >= 0 andalso I < 10 -> I + $0; +hexchar(I) -> I - 10 + $A. %%-------------------------------------------------------------------- %% Acc funcs diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 451983437..76a2e491d 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -20,6 +20,7 @@ -logger_header("[ExHook Svr]"). +-define(REGISTRAY, emqx_exhook_registray). -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). %% Load/Unload @@ -83,13 +84,19 @@ load(Name0, Opts0) -> Name = prefix(Name0), {SvrAddr, ClientOpts} = channel_opts(Opts0), - case emqx_exhook_sup:start_grpc_client_channel(Name, SvrAddr, ClientOpts) of + case emqx_exhook_sup:start_grpc_client_channel( + Name, + SvrAddr, + ClientOpts) of {ok, _ChannPoolPid} -> case do_init(Name) of {ok, HookSpecs} -> %% Reigster metrics - Prefix = lists:flatten(io_lib:format("exhook.~s.", [Name])), + Prefix = lists:flatten( + io_lib:format("exhook.~s.", [Name])), ensure_metrics(Prefix, HookSpecs), + %% Ensure hooks + ensure_hooks(HookSpecs), {ok, #server{name = Name, options = Opts0, channel = _ChannPoolPid, @@ -126,8 +133,9 @@ channel_opts(Opts) -> {SvrAddr, ClientOpts}. -spec unload(server()) -> ok. -unload(#server{name = Name}) -> +unload(#server{name = Name, hookspec = HookSpecs}) -> _ = do_deinit(Name), + _ = may_unload_hooks(HookSpecs), _ = emqx_exhook_sup:stop_grpc_client_channel(Name), ok. @@ -177,6 +185,31 @@ ensure_metrics(Prefix, HookSpecs) -> || Hookpoint <- maps:keys(HookSpecs)], lists:foreach(fun emqx_metrics:ensure/1, Keys). +ensure_hooks(HookSpecs) -> + lists:foreach(fun(Hookpoint) -> + case ets:lookup(?REGISTRAY, Hookpoint) of + [] -> + ?LOG(warning, "Hoook ~s not found in registray", [Hookpoint]); + [{Hookpoint, {M, F, A}, _}] -> + emqx_hooks:put(Hookpoint, {M, F, A}), + ets:update_counter(?REGISTRAY, Hookpoint, {3, 1}) + end + end, maps:keys(HookSpecs)). + +may_unload_hooks(HookSpecs) -> + lists:foreach(fun(Hookpoint) -> + case ets:update_counter(?REGISTRAY, Hookpoint, {3, -1}) of + Cnt when Cnt =< 0 -> + case ets:lookup(?REGISTRAY, Hookpoint) of + [{Hookpoint, {M, F, _A}, _}] -> + emqx_hooks:del(Hookpoint, {M, F}); + _ -> ok + end, + ets:delete(?REGISTRAY, Hookpoint); + _ -> ok + end + end, maps:keys(HookSpecs)). + format(#server{name = Name, hookspec = Hooks}) -> io_lib:format("name=~p, hooks=~0p", [Name, Hooks]). diff --git a/apps/emqx_exproto/rebar.config b/apps/emqx_exproto/rebar.config index d3b297dca..3fa9f6f8a 100644 --- a/apps/emqx_exproto/rebar.config +++ b/apps/emqx_exproto/rebar.config @@ -9,11 +9,11 @@ {parse_transform}]}. {plugins, [rebar3_proper, - {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}} + {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} ]}. {grpc, @@ -21,10 +21,12 @@ {protos, ["priv/protos"]}, {gpb_opts, [{module_name_prefix, "emqx_"}, {module_name_suffix, "_pb"}]} - ]}. +]}. {provider_hooks, - [{pre, [{compile, {grpc, gen}}]}]}. + [{pre, [{compile, {grpc, gen}}, + {clean, {grpc, clean}}]} +]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index b29e0b7f5..efe4b7f22 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -515,8 +515,12 @@ handle_timeout(TRef, keepalive, State = #state{socket = Socket, end; handle_timeout(_TRef, emit_stats, State = #state{channel = Channel}) -> - ClientId = emqx_exproto_channel:info(clientid, Channel), - emqx_cm:set_chan_stats(ClientId, stats(State)), + case emqx_exproto_channel:info(clientid, Channel) of + undefined -> + ignore; + ClientId -> + emqx_cm:set_chan_stats(ClientId, stats(State)) + end, {ok, State#state{stats_timer = undefined}}; handle_timeout(TRef, Msg, State) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index b2e75d7ff..382c1051b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -61,6 +61,8 @@ restart(#{node := Node, identifier := Identifier}, _Params) -> end; %% Restart listeners in the cluster. +restart(#{identifier := <<"http", _/binary>>}, _Params) -> + {403, <<"http_listener_restart_unsupported">>}; restart(#{identifier := Identifier}, _Params) -> Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()], case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index fc18bc1bf..20b4f1b01 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -509,6 +509,18 @@ listeners(["stop", _Proto, ListenOn]) -> end, stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1); +listeners(["restart", "http:management"]) -> + restart_http_listener(http, emqx_management); + +listeners(["restart", "https:management"]) -> + restart_http_listener(https, emqx_management); + +listeners(["restart", "http:dashboard"]) -> + restart_http_listener(http, emqx_dashboard); + +listeners(["restart", "https:dashboard"]) -> + restart_http_listener(https, emqx_dashboard); + listeners(["restart", Identifier]) -> case emqx_listeners:restart_listener(Identifier) of ok -> @@ -661,3 +673,17 @@ listener_identifier(Protocol, ListenOn) -> ID -> ID end. + +restart_http_listener(Scheme, AppName) -> + Listeners = application:get_env(AppName, listeners, []), + case lists:keyfind(Scheme, 1, Listeners) of + false -> + emqx_ctl:print("Listener ~s not exists!~n", [AppName]); + {Scheme, Port, Options} -> + ModName = http_mod_name(AppName), + ModName:stop_listener({Scheme, Port, Options}), + ModName:start_listener({Scheme, Port, Options}) + end. + +http_mod_name(emqx_management) -> emqx_mgmt_http; +http_mod_name(Name) -> Name. diff --git a/apps/emqx_management/src/emqx_mgmt_http.erl b/apps/emqx_management/src/emqx_mgmt_http.erl index 2bf6ef38a..aee057d39 100644 --- a/apps/emqx_management/src/emqx_mgmt_http.erl +++ b/apps/emqx_management/src/emqx_mgmt_http.erl @@ -21,6 +21,8 @@ -export([ start_listeners/0 , handle_request/2 , stop_listeners/0 + , start_listener/1 + , stop_listener/1 ]). -export([init/2]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 409095e02..e178fc9d2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -202,15 +202,11 @@ http_connectivity(Url) -> -spec(http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}). http_connectivity(Url, Timeout) -> - case uri_string:parse(uri_string:normalize(Url)) of - {error, Reason, _} -> - {error, Reason}; - #{host := Host, port := Port} -> + case emqx_http_lib:uri_parse(Url) of + {ok, #{host := Host, port := Port}} -> tcp_connectivity(str(Host), Port, Timeout); - #{host := Host, scheme := Scheme} -> - tcp_connectivity(str(Host), default_port(Scheme), Timeout); - _ -> - {error, {invalid_url, Url}} + {error, Reason} -> + {error, Reason} end. -spec tcp_connectivity(Host :: inet:socket_address() | inet:hostname(), @@ -229,13 +225,6 @@ tcp_connectivity(Host, Port, Timeout) -> {error, Reason} -> {error, Reason} end. -default_port("http") -> 80; -default_port("https") -> 443; -default_port(<<"http">>) -> 80; -default_port(<<"https">>) -> 443; -default_port(Scheme) -> throw({bad_scheme, Scheme}). - - unwrap(<<"${", Val/binary>>) -> binary:part(Val, {0, byte_size(Val)-1}). diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index e5aa25c3a..4e63b54df 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -281,7 +281,7 @@ create_req(_, Path, Headers, Body) -> parse_action_params(Params = #{<<"url">> := URL}) -> try - #{path := CommonPath} = uri_string:parse(URL), + {ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL), Method = method(maps:get(<<"method">>, Params, <<"POST">>)), Headers = headers(maps:get(<<"headers">>, Params, undefined)), NHeaders = ensure_content_type_header(Headers, Method), @@ -318,31 +318,19 @@ str(Str) when is_list(Str) -> Str; str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Bin) when is_binary(Bin) -> binary_to_list(Bin). -add_default_scheme(<<"http://", _/binary>> = URL) -> - URL; -add_default_scheme(<<"https://", _/binary>> = URL) -> - URL; -add_default_scheme(URL) -> - <<"http://", URL/binary>>. - pool_opts(Params = #{<<"url">> := URL}, ResId) -> - #{host := Host0, scheme := Scheme} = URIMap = - uri_string:parse(binary_to_list(add_default_scheme(URL))), - DefaultPort = case is_https(Scheme) of - true -> 443; - false -> 80 - end, - Port = maps:get(port, URIMap, DefaultPort), + {ok, #{host := Host0, + port := Port, + scheme := Scheme}} = emqx_http_lib:uri_parse(URL), PoolSize = maps:get(<<"pool_size">>, Params, 32), ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), {Inet, Host} = parse_host(Host0), - TransportOpts = - case is_https(Scheme) of - true -> [Inet | get_ssl_opts(Params, ResId)]; - false -> [Inet] - end, - Opts = case is_https(Scheme) of + TransportOpts = case Scheme =:= https of + true -> [Inet | get_ssl_opts(Params, ResId)]; + false -> [Inet] + end, + Opts = case Scheme =:= https of true -> [{transport_opts, TransportOpts}, {transport, ssl}]; false -> [{transport_opts, TransportOpts}] end, @@ -357,10 +345,6 @@ pool_opts(Params = #{<<"url">> := URL}, ResId) -> pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). -is_https(Scheme) when is_list(Scheme) -> is_https(list_to_binary(Scheme)); -is_https(<<"https", _/binary>>) -> true; -is_https(_) -> false. - get_ssl_opts(Opts, ResId) -> Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]), [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Opts, Dir)}]. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_app.erl b/apps/emqx_web_hook/src/emqx_web_hook_app.erl index 54ac9c317..67775e00f 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_app.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_app.erl @@ -39,31 +39,19 @@ stop(_State) -> emqx_web_hook:unload(), ehttpc_sup:stop_pool(?APP). -add_default_scheme(URL) when is_list(URL) -> - binary_to_list(add_default_scheme(list_to_binary(URL))); -add_default_scheme(<<"http://", _/binary>> = URL) -> - URL; -add_default_scheme(<<"https://", _/binary>> = URL) -> - URL; -add_default_scheme(URL) -> - <<"http://", URL/binary>>. - translate_env() -> {ok, URL} = application:get_env(?APP, url), - #{host := Host0, - path := Path0, - scheme := Scheme} = URIMap = uri_string:parse(add_default_scheme(uri_string:normalize(URL))), - Port = maps:get(port, URIMap, case Scheme of - "https" -> 443; - "http" -> 80 - end), + {ok, #{host := Host0, + path := Path0, + port := Port, + scheme := Scheme}} = emqx_http_lib:uri_parse(URL), Path = path(Path0), {Inet, Host} = parse_host(Host0), PoolSize = application:get_env(?APP, pool_size, 32), MoreOpts = case Scheme of - "http" -> + http -> [{transport_opts, [Inet]}]; - "https" -> + https -> CACertFile = application:get_env(?APP, cacertfile, undefined), CertFile = application:get_env(?APP, certfile, undefined), KeyFile = application:get_env(?APP, keyfile, undefined), diff --git a/bin/emqx b/bin/emqx index b3bac5b66..006284b4c 100755 --- a/bin/emqx +++ b/bin/emqx @@ -209,7 +209,12 @@ generate_config() { # the vm, we need to pass it in twice. CONFIG_ARGS=" -config $RUNNER_ETC_DIR/app.config -args_file $RUNNER_ETC_DIR/vm.args -vm_args $RUNNER_ETC_DIR/vm.args " else - CONFIG_ARGS=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf -d "$RUNNER_DATA_DIR"/configs generate) + EMQX_LICENSE_CONF_OPTION="" + if [ "${EMQX_LICENSE_CONF:-}" != "" ]; then + EMQX_LICENSE_CONF_OPTION="-i ${EMQX_LICENSE_CONF}" + fi + # shellcheck disable=SC2086 + CONFIG_ARGS=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema $EMQX_LICENSE_CONF_OPTION -c "$RUNNER_ETC_DIR"/emqx.conf -d "$RUNNER_DATA_DIR"/configs generate) ## Merge cuttlefish generated *.args into the vm.args CUTTLE_GEN_ARG_FILE=$(echo "$CONFIG_ARGS" | sed -n 's/^.*\(vm_args[[:space:]]\)//p' | awk '{print $1}') @@ -388,10 +393,10 @@ case "$1" in fi sleep 1 if relx_nodetool "ping" >/dev/null 2>&1; then - echo "$EMQX_DISCR $REL_VSN is started successfully!" + echo "$EMQX_DESCRIPTION $REL_VSN is started successfully!" exit 0 fi - done && echo "$EMQX_DISCR $REL_VSN failed to start within ${WAIT_FOR_ERLANG:-15} seconds," + done && echo "$EMQX_DESCRIPTION $REL_VSN failed to start within ${WAIT_FOR_ERLANG:-15} seconds," echo "see the output of '$0 console' for more information." echo "If you want to wait longer, set the environment variable" echo "WAIT_FOR_ERLANG to the number of seconds to wait." @@ -410,7 +415,7 @@ case "$1" in ;; restart|reboot) - echo "$EMQX_DISCR $REL_VSN is stopped: $("$RUNNER_BIN_DIR"/emqx stop)" + echo "$EMQX_DESCRIPTION $REL_VSN is stopped: $("$RUNNER_BIN_DIR"/emqx stop)" "$RUNNER_BIN_DIR"/emqx start ;; diff --git a/build b/build index 03b734fc8..d7cd01183 100755 --- a/build +++ b/build @@ -12,19 +12,6 @@ ARTIFACT="$2" # ensure dir cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" -case "$PROFILE" in - emqx-edge*) - export EMQX_DESC="EMQ X Edge" - ;; - emqx*) - export EMQX_DESC="EMQ X Broker" - ;; - *) - echo "Unknown profile $PROFILE" - exit 1 - ;; -esac - PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}" export PKG_VSN diff --git a/data/emqx_vars b/data/emqx_vars index 5159fa246..8081badbe 100644 --- a/data/emqx_vars +++ b/data/emqx_vars @@ -13,9 +13,11 @@ RUNNER_LIB_DIR="{{ runner_lib_dir }}" RUNNER_ETC_DIR="{{ runner_etc_dir }}" RUNNER_DATA_DIR="{{ runner_data_dir }}" RUNNER_USER="{{ runner_user }}" -EMQX_DISCR="{{ emqx_description }}" LIB_EKKA_DIR="${RUNNER_LIB_DIR}/ekka-$(grep ekka "${RUNNER_ROOT_DIR}/releases/RELEASES" | awk -F '\"' '{print $2}')" +EMQX_LICENSE_CONF='' +export EMQX_DESCRIPTION='{{ emqx_description }}' + ## computed vars REL_NAME="emqx" ERTS_PATH="$RUNNER_ROOT_DIR/erts-$ERTS_VSN/bin" diff --git a/data/loaded_plugins.tmpl b/data/loaded_plugins.tmpl index 6fde9cace..d0dac7fe1 100644 --- a/data/loaded_plugins.tmpl +++ b/data/loaded_plugins.tmpl @@ -1,7 +1,8 @@ {emqx_management, true}. -{emqx_recon, true}. -{emqx_retainer, true}. {emqx_dashboard, true}. -{emqx_telemetry, true}. +{emqx_modules, {{enable_plugin_emqx_modules}}}. +{emqx_recon, {{enable_plugin_emqx_recon}}}. +{emqx_retainer, {{enable_plugin_emqx_retainer}}}. +{emqx_telemetry, {{enable_plugin_emqx_telemetry}}}. {emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}. {emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}. diff --git a/etc/emqx.conf b/etc/emqx.conf index b7835283f..dd586dccb 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1,10 +1,8 @@ -##==================================================================== -## EMQ X Configuration R4.0 -##==================================================================== +## EMQ X Configuration 4.3 -##-------------------------------------------------------------------- -## Cluster -##-------------------------------------------------------------------- +## NOTE: Do not change format of CONFIG_SECTION_{BGN,END} comments! + +## CONFIG_SECTION_BGN=cluster ================================================== ## Cluster name. ## @@ -173,6 +171,8 @@ cluster.autoclean = 5m ## Value: String ## cluster.k8s.namespace = default +## CONFIG_SECTION_END=cluster ================================================== + ##-------------------------------------------------------------------- ## Node ##-------------------------------------------------------------------- @@ -301,9 +301,8 @@ node.crash_dump = "{{ platform_log_dir }}/crash.dump" node.dist_listen_min = 6369 node.dist_listen_max = 6369 -##-------------------------------------------------------------------- -## RPC -##-------------------------------------------------------------------- +## CONFIG_SECTION_BGN=rpc ====================================================== + ## RPC Mode. ## ## Value: sync | async @@ -400,9 +399,9 @@ rpc.socket_recbuf = 1MB ## Value: Seconds rpc.socket_buffer = 1MB -##-------------------------------------------------------------------- -## Log -##-------------------------------------------------------------------- +## CONFIG_SECTION_END=rpc ====================================================== + +## CONFIG_SECTION_BGN=logger =================================================== ## Where to emit the logs. ## Enable the console (standard output) logs. @@ -571,6 +570,8 @@ log.rotation.count = 5 ## #log.burst_limit = "20000, 1s" +## CONFIG_SECTION_END=logger =================================================== + ##-------------------------------------------------------------------- ## Authentication/Access Control ##-------------------------------------------------------------------- @@ -685,9 +686,7 @@ mqtt.strict_mode = false ## Value: String ## mqtt.response_information = example -##-------------------------------------------------------------------- -## Zones -##-------------------------------------------------------------------- +## CONFIG_SECTION_BGN=zones =================================================== ##-------------------------------------------------------------------- ## External Zone @@ -1021,9 +1020,9 @@ zone.internal.strict_mode = false ## Value: true | false zone.internal.bypass_auth_plugins = true -##-------------------------------------------------------------------- -## Listeners -##-------------------------------------------------------------------- +## CONFIG_SECTION_END=zones ==================================================== + +## CONFIG_SECTION_BGN=listeners ================================================ ##-------------------------------------------------------------------- ## MQTT/TCP - External TCP Listener for MQTT Protocol @@ -1575,7 +1574,7 @@ listener.ws.external.access.1 = "allow all" ## Set to false for WeChat MiniApp. ## ## Value: true | false -## listener.ws.external.fail_if_no_subprotocol = on +## listener.ws.external.fail_if_no_subprotocol = true ## Supported subprotocols ## @@ -2047,9 +2046,10 @@ listener.wss.external.allow_origin_absence = true ## Value: http://url eg. https://localhost:8084, https://127.0.0.1:8084 listener.wss.external.check_origins = "https://localhost:8084, https://127.0.0.1:8084" -##-------------------------------------------------------------------- -## Modules -##-------------------------------------------------------------------- +## CONFIG_SECTION_END=listeners ================================================ + +## CONFIG_SECTION_BGN=modules ================================================== + ## The file to store loaded module names. ## ## Value: File @@ -2176,9 +2176,9 @@ broker.shared_dispatch_ack_enabled = false ## Value: Flag broker.route_batch_clean = off -##-------------------------------------------------------------------- -## System Monitor -##-------------------------------------------------------------------- +## CONFIG_SECTION_END=modules ================================================== + +## CONFIG_SECTION_BGN=sys_mon ================================================== ## Enable Long GC monitoring. Disable if the value is 0. ## Notice: don't enable the monitor in production for: @@ -2324,4 +2324,6 @@ alarm.size_limit = 1000 ## Default: 24h alarm.validity_period = 24h +## CONFIG_SECTION_END=sys_mon ================================================== + {{ additional_configs }} diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl new file mode 100644 index 000000000..b2c93a2bb --- /dev/null +++ b/include/emqx_release.hrl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_RELEASE_HRL). +-define(EMQX_RELEASE_HRL, true). + +%% NOTE: this is the release version which is not always the same +%% as the emqx app version defined in emqx.app.src +%% App (plugin) versions are bumped independently. +%% e.g. EMQX_RELEASE being 4.3.1 does no always imply emqx app +%% should be 4.3.1, as it might be the case that only one of the +%% plugins had a bug to fix. So for a hot beam upgrade, only the app +%% with beam files changed needs an upgrade. + +%% NOTE: This version number should be manually bumped for each release + +-ifndef(EMQX_ENTERPRISE). + +-define(EMQX_RELEASE, {opensource, "4.3-beta.1"}). + +-else. + + +-endif. + +-endif. diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl index cbf0d81d5..b7b6ca42e 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl @@ -23,6 +23,8 @@ -export([ start_listeners/0 , stop_listeners/0 + , start_listener/1 + , stop_listener/1 ]). %% for minirest @@ -87,7 +89,8 @@ listener_name(Proto) -> http_handlers() -> Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()), [{"/api/v4/", - minirest:handler(#{apps => Plugins, filter => fun ?MODULE:filter/1}), + minirest:handler(#{apps => Plugins ++ [emqx_modules], + filter => fun ?MODULE:filter/1}), [{authorization, fun ?MODULE:is_authorized/1}]}]. %%-------------------------------------------------------------------- @@ -113,6 +116,7 @@ is_authorized(_Path, Req) -> _ -> false end. +filter(#{app := emqx_modules}) -> true; filter(#{app := App}) -> case emqx_plugins:find_plugin(App) of false -> false; diff --git a/lib-extra/README.md b/lib-extra/README.md new file mode 100644 index 000000000..3196f4b8c --- /dev/null +++ b/lib-extra/README.md @@ -0,0 +1,50 @@ +# EMQ X Extra plugin apps + +This directory keeps a `plugins` file which defines all the approved +external plugins from open-source community. + +The (maybe broken) symlinks are keept to help testing plugins +in this umbrella project. + +## How to build `plugin_foo` + +Add `plugin_foo` as a rebar3 dependency in `plugins` file. + +e.g. + +``` +{erlang_plugins, + [ {plugin_foo, {git, "https://github.com/bar/plugin-foo.git", {tag, "0.1.0"}}} + ] +}. +``` + +Exeucte command + +``` +export EMQX_EXTRA_PLUGINS='plugin_foo' +make +``` + +The plugin source code should downloaded to `_build/default/lib/plugin_foo` + +NOTE: Shallow clone with depth=1 is used for git dependencies. + +## How to test `plugin_foo` + +If the source code in `_build` is already symlinked from `lib-extra/`, +you may directlly run tests with commands below. + +```bash +./rebar3 eunit --dir lib-extra/plugin_foo +./rebar3 ct --dir lib-extra/plugin_foo +``` + +In case the plugin is being actively developed +it can be cloned to `lib-extra`, e.g. `lib-extra/plugin-bar-dev` +then it can be tested with commands below: + +```bash +./rebar3 eunit --dir lib-extra/plugin-bar-dev +./rebar3 ct --dir lib-extra/plugin-bar-dev +``` diff --git a/lib-extra/emqx_plugin_template b/lib-extra/emqx_plugin_template new file mode 120000 index 000000000..6cf8106d2 --- /dev/null +++ b/lib-extra/emqx_plugin_template @@ -0,0 +1 @@ +../_build/default/lib/emqx_plugin_template \ No newline at end of file diff --git a/lib-extra/plugins b/lib-extra/plugins new file mode 100644 index 000000000..0f3177b9e --- /dev/null +++ b/lib-extra/plugins @@ -0,0 +1,4 @@ +{erlang_plugins, + [ {emqx_plugin_template, {git, "https://github.com/emqx/emqx-plugin-template", {branch, "master"}}} + ] +}. diff --git a/pkg-vsn.sh b/pkg-vsn.sh index 0c1c0cb33..9e8c26320 100755 --- a/pkg-vsn.sh +++ b/pkg-vsn.sh @@ -1,20 +1,21 @@ #!/usr/bin/env bash -set -e -u +set -euo pipefail # This script prints the release version for emqx # ensure dir cd -P -- "$(dirname -- "$0")" -case $(uname) in - *Darwin*) SED="sed -E";; - *) SED="sed -r";; -esac +if [ -f EMQX_ENTERPRISE ]; then + EDITION='enterprise' +else + EDITION='opensource' +fi -# comment SUFFIX out when finalising RELEASE -RELEASE="$(grep -oE '\{vsn, (.*)\}' src/emqx.app.src | $SED 's/\{vsn, (.*)\}/\1/g' | $SED 's/\"//g')" -if [ -d .git ] && ! git describe --tags --match "v${RELEASE}" --exact >/dev/null 2>&1; then - SUFFIX="-$(git rev-parse HEAD | cut -b1-8)" +RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl | cut -d '"' -f2)" + +if [ -d .git ] && ! git describe --tags --match "${RELEASE}" --exact >/dev/null 2>&1; then + SUFFIX="-$(git rev-parse HEAD | cut -b1-8)" fi echo "${RELEASE}${SUFFIX:-}" diff --git a/rebar.config.erl b/rebar.config.erl index 6aafeffd1..69d18e628 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -11,21 +11,43 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}. deps(Config) -> - {deps, OldDpes} = lists:keyfind(deps, 1, Config), + {deps, OldDeps} = lists:keyfind(deps, 1, Config), MoreDeps = case provide_bcrypt_dep() of true -> [bcrypt()]; false -> [] end, - lists:keystore(deps, 1, Config, {deps, OldDpes ++ MoreDeps}). + lists:keystore(deps, 1, Config, {deps, OldDeps ++ MoreDeps ++ extra_deps()}). + +extra_deps() -> + {ok, Proplist} = file:consult("lib-extra/plugins"), + AllPlugins = proplists:get_value(erlang_plugins, Proplist), + Filter = string:split(os:getenv("EMQX_EXTRA_PLUGINS", ""), ",", all), + filter_extra_deps(AllPlugins, Filter). + +filter_extra_deps(AllPlugins, ["all"]) -> + AllPlugins; +filter_extra_deps(AllPlugins, Filter) -> + filter_extra_deps(AllPlugins, Filter, []). +filter_extra_deps([], _, Acc) -> + lists:reverse(Acc); +filter_extra_deps([{Plugin, _}=P|More], Filter, Acc) -> + case lists:member(atom_to_list(Plugin), Filter) of + true -> + filter_extra_deps(More, Filter, [P|Acc]); + false -> + filter_extra_deps(More, Filter, Acc) + end. overrides() -> [ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]} , {erl_opts, [ deterministic , {compile_info, [{emqx_vsn, get_vsn()}]} - | [{d, 'EMQX_ENTERPRISE'} || is_enterprise()] ]} ]} - ]. + ] ++ community_plugin_overrides(). + +community_plugin_overrides() -> + [{add, App, [ {erl_opts, [{i, "include"}]}]} || App <- relx_plugin_apps_extra()]. config() -> [ {plugins, plugins()} @@ -36,14 +58,14 @@ config() -> is_enterprise() -> filelib:is_regular("EMQX_ENTERPRISE"). -extra_lib_dir() -> +alternative_lib_dir() -> case is_enterprise() of true -> "lib-ee"; false -> "lib-ce" end. project_app_dirs() -> - ["apps/*", extra_lib_dir() ++ "/*", "."]. + ["apps/*", alternative_lib_dir() ++ "/*", "."]. plugins() -> [ {relup_helper,{git,"https://github.com/emqx/relup_helper", {branch,"master"}}}, @@ -80,17 +102,18 @@ test_compile_opts() -> ]. profiles() -> + Vsn = get_vsn(), [ {'emqx', [ {erl_opts, prod_compile_opts()} - , {relx, relx('emqx')} + , {relx, relx(Vsn, cloud, bin)} ]} , {'emqx-pkg', [ {erl_opts, prod_compile_opts()} - , {relx, relx('emqx-pkg')} + , {relx, relx(Vsn, cloud, pkg)} ]} , {'emqx-edge', [ {erl_opts, prod_compile_opts()} - , {relx, relx('emqx-edge')} + , {relx, relx(Vsn, edge, bin)} ]} , {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()} - , {relx, relx('emqx-edge-pkg')} + , {relx, relx(Vsn, edge, pkg)} ]} , {check, [ {erl_opts, test_compile_opts()} ]} @@ -99,41 +122,81 @@ profiles() -> , {erl_opts, test_compile_opts() ++ erl_opts_i()} , {extra_src_dirs, [{"test", [{recursive,true}]}]} ]} - ]. + ] ++ ee_profiles(Vsn). -relx(Profile) -> - Vsn = get_vsn(), +%% RelType: cloud (full size) | edge (slim size) +%% PkgType: bin | pkg +relx(Vsn, RelType, PkgType) -> + IsEnterprise = is_enterprise(), [ {include_src,false} , {include_erts, true} , {extended_start_script,false} , {generate_start_script,false} , {sys_config,false} , {vm_args,false} - ] ++ do_relx(Profile, Vsn). - -do_relx('emqx', Vsn) -> - [ {release, {emqx, Vsn}, relx_apps(cloud)} - , {overlay, relx_overlay(cloud)} - , {overlay_vars, overlay_vars(["vars/vars-cloud.config","vars/vars-bin.config"])} - ]; -do_relx('emqx-pkg', Vsn) -> - [ {release, {emqx, Vsn}, relx_apps(cloud)} - , {overlay, relx_overlay(cloud)} - , {overlay_vars, overlay_vars(["vars/vars-cloud.config","vars/vars-pkg.config"])} - ]; -do_relx('emqx-edge', Vsn) -> - [ {release, {emqx, Vsn}, relx_apps(edge)} - , {overlay, relx_overlay(edge)} - , {overlay_vars, overlay_vars(["vars/vars-edge.config","vars/vars-bin.config"])} - ]; -do_relx('emqx-edge-pkg', Vsn) -> - [ {release, {emqx, Vsn}, relx_apps(edge)} - , {overlay, relx_overlay(edge)} - , {overlay_vars, overlay_vars(["vars/vars-edge.config","vars/vars-pkg.config"])} + , {release, {emqx, Vsn}, relx_apps(RelType)} + , {overlay, relx_overlay(RelType)} + , {overlay_vars, [ {built_on_arch, rebar_utils:get_arch()} + , {emqx_description, emqx_description(RelType, IsEnterprise)} + | overlay_vars(RelType, PkgType, IsEnterprise)]} ]. -overlay_vars(Files) -> - [{built_on_arch, rebar_utils:get_arch()} | Files]. +emqx_description(cloud, true) -> "EMQ X Enterprise"; +emqx_description(cloud, false) -> "EMQ X Broker"; +emqx_description(edge, _) -> "EMQ X Edge". + + +overlay_vars(_RelType, PkgType, true) -> + ee_overlay_vars(PkgType); +overlay_vars(RelType, PkgType, false) -> + overlay_vars_rel(RelType) ++ overlay_vars_pkg(PkgType). + +%% vars per release type, cloud or edge +overlay_vars_rel(RelType) -> + VmArgs = case RelType of + cloud -> "vm.args"; + edge -> "vm.args.edge" + end, + [ {enable_plugin_emqx_rule_engine, RelType =:= cloud} + , {enable_plugin_emqx_bridge_mqtt, RelType =:= edge} + , {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce + , {enable_plugin_emqx_recon, true} + , {enable_plugin_emqx_retainer, true} + , {enable_plugin_emqx_telemetry, true} + , {vm_args_file, VmArgs} + ]. + +%% vars per packaging type, bin(zip/tar.gz/docker) or pkg(rpm/deb) +overlay_vars_pkg(bin) -> + [ {platform_bin_dir, "bin"} + , {platform_data_dir, "data"} + , {platform_etc_dir, "etc"} + , {platform_lib_dir, "lib"} + , {platform_log_dir, "log"} + , {platform_plugins_dir, "plugins"} + , {runner_root_dir, "$(cd $(dirname $(readlink $0 || echo $0))/..; pwd -P)"} + , {runner_bin_dir, "$RUNNER_ROOT_DIR/bin"} + , {runner_etc_dir, "$RUNNER_ROOT_DIR/etc"} + , {runner_lib_dir, "$RUNNER_ROOT_DIR/lib"} + , {runner_log_dir, "$RUNNER_ROOT_DIR/log"} + , {runner_data_dir, "$RUNNER_ROOT_DIR/data"} + , {runner_user, ""} + ]; +overlay_vars_pkg(pkg) -> + [ {platform_bin_dir, ""} + , {platform_data_dir, "/var/lib/emqx"} + , {platform_etc_dir, "/etc/emqx"} + , {platform_lib_dir, ""} + , {platform_log_dir, "/var/log/emqx"} + , {platform_plugins_dir, "/var/lib/emqx/plugins"} + , {runner_root_dir, "/usr/lib/emqx"} + , {runner_bin_dir, "/usr/bin"} + , {runner_etc_dir, "/etc/emqx"} + , {runner_lib_dir, "$RUNNER_ROOT_DIR/lib"} + , {runner_log_dir, "/var/log/emqx"} + , {runner_data_dir, "/var/lib/emqx"} + , {runner_user, "emqx"} + ]. relx_apps(ReleaseType) -> [ kernel @@ -152,8 +215,9 @@ relx_apps(ReleaseType) -> , {mnesia, load} , {ekka, load} , {emqx_plugin_libs, load} - , emqx_modules ] + ++ [emqx_modules || not is_enterprise()] + ++ [emqx_license || is_enterprise()] ++ [bcrypt || provide_bcrypt_release(ReleaseType)] ++ relx_apps_per_rel(ReleaseType) ++ [{N, load} || N <- relx_plugin_apps(ReleaseType)]. @@ -182,11 +246,11 @@ relx_plugin_apps(ReleaseType) -> , emqx_recon , emqx_rule_engine , emqx_sasl - , emqx_telemetry - , emqx_modules ] + ++ [emqx_telemetry || not is_enterprise()] ++ relx_plugin_apps_per_rel(ReleaseType) - ++ relx_plugin_apps_enterprise(is_enterprise()). + ++ relx_plugin_apps_enterprise(is_enterprise()) + ++ relx_plugin_apps_extra(). relx_plugin_apps_per_rel(cloud) -> [ emqx_lwm2m @@ -208,34 +272,41 @@ relx_plugin_apps_enterprise(true) -> filelib:is_dir(filename:join(["lib-ee", A]))]; relx_plugin_apps_enterprise(false) -> []. +relx_plugin_apps_extra() -> + [Plugin || {Plugin, _} <- extra_deps()]. + relx_overlay(ReleaseType) -> - [ {mkdir,"log/"} - , {mkdir,"data/"} - , {mkdir,"data/mnesia"} - , {mkdir,"data/configs"} - , {mkdir,"data/scripts"} + [ {mkdir, "log/"} + , {mkdir, "data/"} + , {mkdir, "data/mnesia"} + , {mkdir, "data/configs"} + , {mkdir, "data/scripts"} , {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"} , {template, "data/loaded_modules.tmpl", "data/loaded_modules"} - , {template,"data/emqx_vars","releases/emqx_vars"} - , {copy,"bin/emqx","bin/emqx"} - , {copy,"bin/emqx_ctl","bin/emqx_ctl"} - , {copy,"bin/install_upgrade.escript", "bin/install_upgrade.escript"} - , {copy,"bin/emqx","bin/emqx-{{release_version}}"} %% for relup - , {copy,"bin/emqx_ctl","bin/emqx_ctl-{{release_version}}"} %% for relup - , {copy,"bin/install_upgrade.escript", "bin/install_upgrade.escript-{{release_version}}"} %% for relup - , {template,"bin/emqx.cmd","bin/emqx.cmd"} - , {template,"bin/emqx_ctl.cmd","bin/emqx_ctl.cmd"} - , {copy,"bin/nodetool","bin/nodetool"} - , {copy,"bin/nodetool","bin/nodetool-{{release_version}}"} - , {copy,"_build/default/lib/cuttlefish/cuttlefish","bin/cuttlefish"} - , {copy,"_build/default/lib/cuttlefish/cuttlefish","bin/cuttlefish-{{release_version}}"} - , {copy,"priv/emqx.schema","releases/{{release_version}}/"} - ] ++ etc_overlay(ReleaseType). + , {template, "data/emqx_vars", "releases/emqx_vars"} + , {copy, "bin/emqx", "bin/emqx"} + , {copy, "bin/emqx_ctl", "bin/emqx_ctl"} + , {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript"} + , {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup + , {copy, "bin/emqx_ctl", "bin/emqx_ctl-{{release_version}}"} %% for relup + , {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript-{{release_version}}"} %% for relup + , {template, "bin/emqx.cmd", "bin/emqx.cmd"} + , {template, "bin/emqx_ctl.cmd", "bin/emqx_ctl.cmd"} + , {copy, "bin/nodetool", "bin/nodetool"} + , {copy, "bin/nodetool", "bin/nodetool-{{release_version}}"} + , {copy, "_build/default/lib/cuttlefish/cuttlefish", "bin/cuttlefish"} + , {copy, "_build/default/lib/cuttlefish/cuttlefish", "bin/cuttlefish-{{release_version}}"} + , {copy, "priv/emqx.schema", "releases/{{release_version}}/"} + ] ++ case is_enterprise() of + true -> ee_etc_overlay(ReleaseType); + false -> etc_overlay(ReleaseType) + end. etc_overlay(ReleaseType) -> PluginApps = relx_plugin_apps(ReleaseType), Templates = emqx_etc_overlay(ReleaseType) ++ - lists:append([plugin_etc_overlays(App) || App <- PluginApps]), + lists:append([plugin_etc_overlays(App) || App <- PluginApps]) ++ + [community_plugin_etc_overlays(App) || App <- relx_plugin_apps_extra()], [ {mkdir, "etc/"} , {mkdir, "etc/plugins"} , {template, "etc/BUILT_ON", "releases/{{release_version}}/BUILT_ON"} @@ -275,11 +346,15 @@ plugin_etc_overlays(App0) -> [{"{{base_dir}}/lib/"++ App ++"/etc/" ++ F, "etc/plugins/" ++ F} || F <- ConfFiles]. +community_plugin_etc_overlays(App0) -> + App = atom_to_list(App0), + {"{{base_dir}}/lib/"++ App ++"/etc/" ++ App ++ ".conf", "etc/plugins/" ++ App ++ ".conf"}. + %% NOTE: for apps fetched as rebar dependency (there is so far no such an app) %% the overlay should be hand-coded but not to rely on build-time wildcards. find_conf_files(App) -> Dir1 = filename:join(["apps", App, "etc"]), - Dir2 = filename:join([extra_lib_dir(), App, "etc"]), + Dir2 = filename:join([alternative_lib_dir(), App, "etc"]), filelib:wildcard("*.conf", Dir1) ++ filelib:wildcard("*.conf", Dir2). env(Name, Default) -> @@ -294,8 +369,7 @@ get_vsn() -> false -> os:cmd("./pkg-vsn.sh"); Vsn -> Vsn end, - Vsn2 = re:replace(PkgVsn, "v", "", [{return ,list}]), - re:replace(Vsn2, "\n", "", [{return ,list}]). + re:replace(PkgVsn, "\n", "", [{return ,list}]). maybe_dump(Config) -> is_debug() andalso file:write_file("rebar.config.rendered", [io_lib:format("~p.\n", [I]) || I <- Config]), @@ -322,7 +396,7 @@ provide_bcrypt_release(ReleaseType) -> erl_opts_i() -> [{i, "apps"}] ++ [{i, Dir} || Dir <- filelib:wildcard(filename:join(["apps", "*", "include"]))] ++ - [{i, Dir} || Dir <- filelib:wildcard(filename:join([extra_lib_dir(), "*", "include"]))]. + [{i, Dir} || Dir <- filelib:wildcard(filename:join([alternative_lib_dir(), "*", "include"]))]. dialyzer(Config) -> {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config), @@ -334,7 +408,7 @@ dialyzer(Config) -> [ list_to_atom(App) || App <- string:tokens(Value, ",")] end, - AppNames = [emqx | list_dir("apps")] ++ list_dir(extra_lib_dir()), + AppNames = [emqx | list_dir("apps")] ++ list_dir(alternative_lib_dir()), KnownApps = [Name || Name <- AppsToAnalyse, lists:member(Name, AppNames)], @@ -370,3 +444,9 @@ coveralls() -> list_dir(Dir) -> {ok, Names} = file:list_dir(Dir), [list_to_atom(Name) || Name <- Names, filelib:is_dir(filename:join([Dir, Name]))]. + +%% ==== Enterprise supports below ================================================================== + +ee_profiles(_Vsn) -> []. +ee_etc_overlay(_) -> []. +ee_overlay_vars(_PkgType) -> []. diff --git a/scripts/get-dashboard.sh b/scripts/get-dashboard.sh index a0f484d12..b53741048 100755 --- a/scripts/get-dashboard.sh +++ b/scripts/get-dashboard.sh @@ -5,14 +5,15 @@ set -euo pipefail # ensure dir cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." -VERSION="$1" RELEASE_ASSET_FILE="emqx-dashboard.zip" if [ -f 'EMQX_ENTERPRISE' ]; then + VERSION="${EMQX_EE_DASHBOARD_VERSION}" DASHBOARD_PATH='lib-ee/emqx_dashboard/priv' DASHBOARD_REPO='emqx-enterprise-dashboard-frontend-src' AUTH="Authorization: token $(cat scripts/git-token)" else + VERSION="${EMQX_CE_DASHBOARD_VERSION}" DASHBOARD_PATH='lib-ce/emqx_dashboard/priv' DASHBOARD_REPO='emqx-dashboard-frontend' AUTH="" diff --git a/scripts/git-hook-pre-push.sh b/scripts/git-hook-pre-push.sh new file mode 100755 index 000000000..2f5a9abcd --- /dev/null +++ b/scripts/git-hook-pre-push.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -euo pipefail + +url="$2" + +if [ -f 'EMQX_ENTERPRISE' ]; then + if [[ "$url" != *emqx-enterprise* ]]; then + echo "$(tput setaf 1)error: enterprise_code_to_non_enterprise_repo" + exit 1 + fi +fi diff --git a/scripts/git-hooks-init.sh b/scripts/git-hooks-init.sh new file mode 100755 index 000000000..a9f02ab3a --- /dev/null +++ b/scripts/git-hooks-init.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +set -euo pipefail + +if [ ! -d .git ]; then + exit 0 +fi + +mkdir -p ".git/hooks" + +if [ ! -L '.git/hooks/pre-push' ]; then + ln -sf '../../scripts/git-hook-pre-push.sh' '.git/hooks/pre-push' +fi diff --git a/scripts/split-config.escript b/scripts/split-config.escript new file mode 100755 index 000000000..07caf6503 --- /dev/null +++ b/scripts/split-config.escript @@ -0,0 +1,62 @@ +#!/usr/bin/env escript + +%% This script reads up emqx.conf and split the sections +%% and dump sections to separate files. +%% Sections are grouped between CONFIG_SECTION_BGN and +%% CONFIG_SECTION_END pairs +%% +%% NOTE: this feature is so far not used in opensource +%% edition due to backward-compatibility reasons. + +-mode(compile). + +-define(BASE, <<"emqx">>). + +main(_) -> + {ok, Bin} = file:read_file("etc/emqx.conf"), + Lines = binary:split(Bin, <<"\n">>, [global]), + Sections0 = parse_sections(Lines), + Sections = lists:filter(fun({<<"modules">>, _}) -> false; + (_) -> true + end, Sections0), + ok = dump_sections(Sections). + +parse_sections(Lines) -> + {ok, P} = re:compile("#+\s*CONFIG_SECTION_(BGN|END)\s*=\s*([^\s-]+)\s*="), + Parser = + fun(Line) -> + case re:run(Line, P, [{capture, all_but_first, binary}]) of + {match, [<<"BGN">>, Name]} -> {section_bgn, Name}; + {match, [<<"END">>, Name]} -> {section_end, Name}; + nomatch -> continue + end + end, + parse_sections(Lines, Parser, ?BASE, #{?BASE => []}). + +parse_sections([], _Parse, _Section, Sections) -> + lists:map(fun({N, Lines}) -> {N, lists:reverse(Lines)} end, + maps:to_list(Sections)); +parse_sections([Line | Lines], Parse, Section, Sections) -> + case Parse(Line) of + {section_bgn, Name} -> + ?BASE = Section, %% assert + true = (Name =/= ?BASE), %% assert + false = maps:is_key(Name, Sections), %% assert + Include = iolist_to_binary(["include {{ platform_etc_dir }}/", Name, ".conf"]), + Base = maps:get(?BASE, Sections), + NewSections = Sections#{?BASE := [Include | Base], Name => []}, + parse_sections(Lines, Parse, Name, NewSections); + {section_end, Name} -> + true = (Name =:= Section), %% assert + parse_sections(Lines, Parse, ?BASE, Sections); + continue -> + Acc = maps:get(Section, Sections), + parse_sections(Lines, Parse, Section, Sections#{Section => [Line | Acc]}) + end. + +dump_sections([]) -> ok; +dump_sections([{Name, Lines0} | Rest]) -> + Filename = filename:join(["etc", iolist_to_binary([Name, ".conf.seg"])]), + Lines = [[L, "\n"] || L <- Lines0], + ok = file:write_file(Filename, Lines), + dump_sections(Rest). diff --git a/src/emqx.app.src b/src/emqx.app.src index 70d64fff8..a9ae81cac 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,7 +1,7 @@ {application, emqx, - [{description, "EMQ X Broker"}, - {id, "emqx"}, - {vsn, "4.3-beta.1"}, % strict semver, bump manually! + [{id, "emqx"}, + {description, "EMQ X"}, + {vsn, "4.3.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, diff --git a/src/emqx_app.erl b/src/emqx_app.erl index cf4f8753d..4f8d47390 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -20,10 +20,14 @@ -export([ start/2 , stop/1 + , get_description/0 + , get_release/0 ]). -define(APP, emqx). +-include("emqx_release.hrl"). + %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- @@ -56,9 +60,31 @@ print_banner() -> io:format("Starting ~s on node ~s~n", [?APP, node()]). print_vsn() -> - {ok, Descr} = application:get_key(description), - {ok, Vsn} = application:get_key(vsn), - io:format("~s ~s is running now!~n", [Descr, Vsn]). + io:format("~s ~s is running now!~n", [get_description(), get_release()]). + +get_description() -> + {ok, Descr0} = application:get_key(?APP, description), + case os:getenv("EMQX_DESCRIPTION") of + false -> Descr0; + "" -> Descr0; + Str -> string:strip(Str, both, $\n) + end. + +-ifdef(TEST). +%% When testing, the 'cover' compiler stripps aways compile info +get_release() -> release_in_macro(). +-else. +%% Otherwise print the build number, +%% which may have a git commit in its suffix. +get_release() -> + {_, Vsn} = lists:keyfind(emqx_vsn, 1, ?MODULE:module_info(compile)), + VsnStr = release_in_macro(), + 1 = string:str(Vsn, VsnStr), %% assert + Vsn. +-endif. + +release_in_macro() -> + element(2, ?EMQX_RELEASE). %%-------------------------------------------------------------------- %% Autocluster diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index d09b8416e..8dd3fa602 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -141,6 +141,9 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> %% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 2, Options); +parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options) + when Multiplier > 2097152 -> + error(malformed_variable_byte_integer); parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, diff --git a/src/emqx_http_lib.erl b/src/emqx_http_lib.erl index 695879236..195656622 100644 --- a/src/emqx_http_lib.erl +++ b/src/emqx_http_lib.erl @@ -16,7 +16,20 @@ -module(emqx_http_lib). --export([uri_encode/1, uri_decode/1]). +-export([ uri_encode/1 + , uri_decode/1 + , uri_parse/1 + ]). + +-export_type([uri_map/0]). + +-type uri_map() :: #{scheme := http | https, + host := unicode:chardata(), + port := non_neg_integer(), + path => unicode:chardata(), + query => unicode:chardata(), + fragment => unicode:chardata(), + userinfo => unicode:chardata()}. %% @doc Decode percent-encoded URI. %% This is copied from http_uri.erl which has been deprecated since OTP-23 @@ -35,6 +48,51 @@ uri_decode(<<>>) -> uri_encode(URI) when is_binary(URI) -> << <<(uri_encode_binary(Char))/binary>> || <> <= URI >>. +%% @doc Parse URI into a map as uri_string:uri_map(), but with two fields +%% normalised: (1): port number is never 'undefined', default ports are used +%% if missing. (2): scheme is always atom. +-spec uri_parse(string() | binary()) -> {ok, uri_map()} | {error, any()}. +uri_parse(URI) -> + try + {ok, do_parse(uri_string:normalize(URI))} + catch + throw : Reason -> + {error, Reason} + end. + +do_parse({error, Reason, Which}) -> throw({Reason, Which}); +do_parse(URI) -> + %% ensure we return string() instead of binary() in uri_map() values. + Map = uri_string:parse(unicode:characters_to_list(URI)), + case maps:is_key(scheme, Map) of + true -> + normalise_parse_result(Map); + false -> + %% missing scheme, add "http://" and try again + Map2 = uri_string:parse(unicode:characters_to_list(["http://", URI])), + normalise_parse_result(Map2) + end. + +normalise_parse_result(#{host := _, scheme := Scheme0} = Map) -> + Scheme = atom_scheme(Scheme0), + DefaultPort = case https =:= Scheme of + true -> 443; + false -> 80 + end, + Port = case maps:get(port, Map, undefined) of + N when is_number(N) -> N; + _ -> DefaultPort + end, + Map#{ scheme => Scheme + , port => Port + }. + +%% NOTE: so far we only support http schemes. +atom_scheme(Scheme) when is_list(Scheme) -> atom_scheme(list_to_binary(Scheme)); +atom_scheme(<<"https">>) -> https; +atom_scheme(<<"http">>) -> http; +atom_scheme(Other) -> throw({unsupported_scheme, Other}). + uri_encode_binary(Char) -> case reserved(Char) of true -> diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 9b256df0b..11f9f5635 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -85,13 +85,11 @@ stop() -> %% @doc Get sys version -spec(version() -> string()). -version() -> - {ok, Version} = application:get_key(?APP, vsn), Version. +version() -> emqx_app:get_release(). %% @doc Get sys description -spec(sysdescr() -> string()). -sysdescr() -> - {ok, Descr} = application:get_key(?APP, description), Descr. +sysdescr() -> emqx_app:get_description(). %% @doc Get sys uptime -spec(uptime() -> string()). @@ -155,7 +153,8 @@ handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> publish_any(datetime, iolist_to_binary(datetime())), {noreply, heartbeat(State)}; -handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> +handle_info({timeout, TRef, tick}, + State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> publish_any(version, Version), publish_any(sysdescr, Descr), publish_any(brokers, ekka_mnesia:running_nodes()), diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index fa24dc910..c1fb1b580 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -42,7 +42,8 @@ all() -> groups() -> [{parse, [parallel], [t_parse_cont, - t_parse_frame_too_large + t_parse_frame_too_large, + t_parse_frame_malformed_variable_byte_integer ]}, {connect, [parallel], [t_serialize_parse_v3_connect, @@ -129,6 +130,12 @@ t_parse_frame_too_large(_) -> ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). +t_parse_frame_malformed_variable_byte_integer(_) -> + MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>, + ParseState = emqx_frame:initial_parse_state(#{}), + ?catch_error(malformed_variable_byte_integer, + emqx_frame:parse(MalformedPayload, ParseState)). + t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, diff --git a/test/emqx_http_lib_tests.erl b/test/emqx_http_lib_tests.erl index 78a647417..b5c50e7c9 100644 --- a/test/emqx_http_lib_tests.erl +++ b/test/emqx_http_lib_tests.erl @@ -44,3 +44,31 @@ test_prop_uri(URI) -> Decoded2 = uri_string:percent_decode(Encoded), ?assertEqual(URI, Decoded2), true. + +uri_parse_test_() -> + [ {"default port http", + fun() -> ?assertMatch({ok, #{port := 80, scheme := http, host := "localhost"}}, + emqx_http_lib:uri_parse("localhost")) + end + } + , {"default port https", + fun() -> ?assertMatch({ok, #{port := 443, scheme := https}}, + emqx_http_lib:uri_parse("https://localhost")) + end + } + , {"bad url", + fun() -> ?assertMatch({error, {invalid_uri, _}}, + emqx_http_lib:uri_parse("https://localhost:notnumber")) + end + } + , {"normalise", + fun() -> ?assertMatch({ok, #{scheme := https}}, + emqx_http_lib:uri_parse("HTTPS://127.0.0.1")) + end + } + , {"unsupported_scheme", + fun() -> ?assertEqual({error, {unsupported_scheme, <<"wss">>}}, + emqx_http_lib:uri_parse("wss://127.0.0.1")) + end + } + ]. diff --git a/vars/vars-bin.config b/vars/vars-bin.config deleted file mode 100644 index 3d56fb4f4..000000000 --- a/vars/vars-bin.config +++ /dev/null @@ -1,23 +0,0 @@ -%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- -%% ex: ft=erlang ts=4 sw=4 et - -%% Platform-specific installation paths -{platform_bin_dir, "bin"}. -{platform_data_dir, "data"}. -{platform_etc_dir, "etc"}. -{platform_lib_dir, "lib"}. -{platform_log_dir, "log"}. -{platform_plugins_dir, "plugins"}. - -%% -%% bin/emqx -%% -{runner_root_dir, "$(cd $(dirname $(readlink $0 || echo $0))/..; pwd -P)"}. -{runner_bin_dir, "$RUNNER_ROOT_DIR/bin"}. -{runner_etc_dir, "$RUNNER_ROOT_DIR/etc"}. -{runner_lib_dir, "$RUNNER_ROOT_DIR/lib"}. -{runner_log_dir, "$RUNNER_ROOT_DIR/log"}. -{runner_data_dir, "$RUNNER_ROOT_DIR/data"}. -{pipe_dir, "/tmp/$RUNNER_SCRIPT/"}. -{runner_user, ""}. - diff --git a/vars/vars-cloud.config b/vars/vars-cloud.config deleted file mode 100644 index 266ef717c..000000000 --- a/vars/vars-cloud.config +++ /dev/null @@ -1,4 +0,0 @@ -{enable_plugin_emqx_rule_engine, true}. -{enable_plugin_emqx_bridge_mqtt, false}. -{vm_args_file, "vm.args"}. -{emqx_description, "EMQ X Broker"}. \ No newline at end of file diff --git a/vars/vars-edge.config b/vars/vars-edge.config deleted file mode 100644 index 9b1d76401..000000000 --- a/vars/vars-edge.config +++ /dev/null @@ -1,4 +0,0 @@ -{enable_plugin_emqx_rule_engine, false}. -{enable_plugin_emqx_bridge_mqtt, true}. -{vm_args_file, "vm.args.edge"}. -{emqx_description, "EMQ X Edge"}. diff --git a/vars/vars-pkg.config b/vars/vars-pkg.config deleted file mode 100644 index a48227a75..000000000 --- a/vars/vars-pkg.config +++ /dev/null @@ -1,22 +0,0 @@ -%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- -%% ex: ft=erlang ts=4 sw=4 et - -%% Platform-specific installation paths -{platform_bin_dir, ""}. -{platform_data_dir, "/var/lib/emqx"}. -{platform_etc_dir, "/etc/emqx"}. -{platform_lib_dir, ""}. -{platform_log_dir, "/var/log/emqx"}. -{platform_plugins_dir, "/var/lib/emqx/plugins"}. - -%% -%% bin/emqx -%% -{runner_root_dir, "/usr/lib/emqx"}. -{runner_bin_dir, "/usr/bin"}. -{runner_etc_dir, "/etc/emqx"}. -{runner_lib_dir, "$RUNNER_ROOT_DIR/lib"}. -{runner_log_dir, "/var/log/emqx"}. -{runner_data_dir, "/var/lib/emqx"}. -{pipe_dir, "/tmp/$RUNNER_SCRIPT/"}. -{runner_user, "emqx"}.