diff --git a/.ci/docker-compose-file/docker-compose-python.yaml b/.ci/docker-compose-file/docker-compose-python.yaml index 14e798c6b..4a02c6378 100644 --- a/.ci/docker-compose-file/docker-compose-python.yaml +++ b/.ci/docker-compose-file/docker-compose-python.yaml @@ -3,7 +3,7 @@ version: '3.9' services: python: container_name: python - image: python:3.7.2-alpine3.9 + image: python:3.9.16-alpine3.18 depends_on: - emqx1 - emqx2 @@ -12,4 +12,3 @@ services: emqx_bridge: volumes: - ./python:/scripts - diff --git a/.ci/docker-compose-file/docker-compose-rocketmq.yaml b/.ci/docker-compose-file/docker-compose-rocketmq.yaml index 7e5a2e42e..7c019e931 100644 --- a/.ci/docker-compose-file/docker-compose-rocketmq.yaml +++ b/.ci/docker-compose-file/docker-compose-rocketmq.yaml @@ -23,6 +23,7 @@ services: - ./rocketmq/logs:/opt/logs - ./rocketmq/store:/opt/store - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf + - ./rocketmq/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml environment: NAMESRV_ADDR: "rocketmq_namesrv:9876" JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99" diff --git a/.ci/docker-compose-file/python/pytest.sh b/.ci/docker-compose-file/python/pytest.sh index 04b0aa1b2..245ee8a2b 100755 --- a/.ci/docker-compose-file/python/pytest.sh +++ b/.ci/docker-compose-file/python/pytest.sh @@ -18,13 +18,13 @@ else fi apk update && apk add git curl -git clone -b develop-4.0 https://github.com/emqx/paho.mqtt.testing.git /paho.mqtt.testing -pip install pytest==6.2.5 +git clone -b develop-5.0 https://github.com/emqx/paho.mqtt.testing.git /paho.mqtt.testing +pip install pytest==7.1.2 pytest-retry -pytest -v /paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "$TARGET_HOST" +pytest --retries 3 -v /paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "$TARGET_HOST" RESULT=$? -pytest -v /paho.mqtt.testing/interoperability/test_client --host "$TARGET_HOST" +pytest --retries 3 -v /paho.mqtt.testing/interoperability/test_client --host "$TARGET_HOST" RESULT=$(( RESULT + $? )) # pytest -v /paho.mqtt.testing/interoperability/test_cluster --host1 "node1.emqx.io" --host2 "node2.emqx.io" diff --git a/.ci/docker-compose-file/rocketmq/conf/broker.conf b/.ci/docker-compose-file/rocketmq/conf/broker.conf index c343090e4..fbd716e54 100644 --- a/.ci/docker-compose-file/rocketmq/conf/broker.conf +++ b/.ci/docker-compose-file/rocketmq/conf/broker.conf @@ -20,3 +20,5 @@ maxMessageSize=65536 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH + +aclEnable=true diff --git a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml new file mode 100644 index 000000000..e2c41a87f --- /dev/null +++ b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml @@ -0,0 +1,11 @@ +globalWhiteRemoteAddresses: + +accounts: + - accessKey: RocketMQ + secretKey: 12345678 + whiteRemoteAddress: + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: PUB|SUB + topicPerms: + - TopicTest=PUB|SUB diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index c685078e0..b5dc01535 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -165,7 +165,7 @@ jobs: path: _packages/**/* docker: - runs-on: ubuntu-22.04 + runs-on: aws-amd64 strategy: fail-fast: false @@ -196,12 +196,17 @@ jobs: tags: ${{ env.EMQX_IMAGE_TAG }} build-args: | EMQX_NAME=${{ env.EMQX_NAME }} - - name: test docker image + - name: smoke test run: | CID=$(docker run -d --rm -P $EMQX_IMAGE_TAG) HTTP_PORT=$(docker inspect --format='{{(index (index .NetworkSettings.Ports "18083/tcp") 0).HostPort}}' $CID) ./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT docker stop $CID + - name: dashboard tests + working-directory: ./scripts/ui-tests + run: | + set -eu + docker compose up --abort-on-container-exit --exit-code-from selenium - name: test two nodes cluster with proto_dist=inet_tls in docker run: | ./scripts/test/start-two-nodes-in-docker.sh -P $EMQX_IMAGE_TAG $EMQX_IMAGE_OLD_VERSION_TAG @@ -216,6 +221,11 @@ jobs: with: name: "${{ matrix.profile[0] }}-docker" path: "${{ env.EMQX_NAME }}-${{ env.PKG_VSN }}.tar.gz" + - name: cleanup + if: always() + working-directory: ./scripts/ui-tests + run: | + docker compose rm -fs spellcheck: needs: linux diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index b1246f745..157ec7dcd 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -228,11 +228,11 @@ jobs: - uses: actions/checkout@v3 with: repository: emqx/paho.mqtt.testing - ref: develop-4.0 + ref: develop-5.0 path: paho.mqtt.testing - name: install pytest run: | - pip install pytest + pip install pytest==7.1.2 pytest-retry echo "$HOME/.local/bin" >> $GITHUB_PATH - name: run paho test timeout-minutes: 10 @@ -250,6 +250,6 @@ jobs: sleep 10 done - pytest -v paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "127.0.0.1" + pytest --retries 3 -v paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "127.0.0.1" - if: failure() run: kubectl logs -l "app.kubernetes.io/instance=${{ matrix.profile }}" -c emqx --tail=1000 diff --git a/apps/emqx/include/emqx_authentication.hrl b/apps/emqx/include/emqx_authentication.hrl index 2b1f4d33f..20ae2bf1e 100644 --- a/apps/emqx/include/emqx_authentication.hrl +++ b/apps/emqx/include/emqx_authentication.hrl @@ -20,6 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -define(AUTHN_TRACE_TAG, "AUTHN"). +-define(GLOBAL, 'mqtt:global'). -define(TRACE_AUTHN_PROVIDER(Msg), ?TRACE_AUTHN_PROVIDER(Msg, #{})). -define(TRACE_AUTHN_PROVIDER(Msg, Meta), ?TRACE_AUTHN_PROVIDER(debug, Msg, Meta)). diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index a86705eae..8623ef04d 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -29,6 +29,7 @@ {emqx_management,3}. {emqx_management,4}. {emqx_mgmt_api_plugins,1}. +{emqx_mgmt_api_plugins,2}. {emqx_mgmt_cluster,1}. {emqx_mgmt_trace,1}. {emqx_mgmt_trace,2}. diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 1cdb563aa..47ff384c9 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -239,14 +239,30 @@ remove_config([RootName | _] = KeyPath, Opts) -> -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -reset_config([RootName | _] = KeyPath, Opts) -> +reset_config([RootName | SubKeys] = KeyPath, Opts) -> case emqx_config:get_default_value(KeyPath) of {ok, Default} -> - emqx_config_handler:update_config( - emqx_config:get_schema_mod(RootName), - KeyPath, - {{update, Default}, Opts} - ); + Mod = emqx_config:get_schema_mod(RootName), + case SubKeys =:= [] of + true -> + emqx_config_handler:update_config( + Mod, + KeyPath, + {{update, Default}, Opts} + ); + false -> + NewConf = + emqx_utils_maps:deep_put( + SubKeys, + emqx_config:get_raw([RootName], #{}), + Default + ), + emqx_config_handler:update_config( + Mod, + [RootName], + {{update, NewConf}, Opts} + ) + end; {error, _} = Error -> Error end. diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl index fe93bed68..cce789f24 100644 --- a/apps/emqx/src/emqx_authentication.erl +++ b/apps/emqx/src/emqx_authentication.erl @@ -60,7 +60,8 @@ update_authenticator/3, lookup_authenticator/2, list_authenticators/1, - move_authenticator/3 + move_authenticator/3, + reorder_authenticator/2 ]). %% APIs for observer built_in_database @@ -86,12 +87,6 @@ %% utility functions -export([authenticator_id/1, metrics_id/2]). -%% proxy callback --export([ - pre_config_update/3, - post_config_update/5 -]). - -export_type([ authenticator_id/0, position/0, @@ -275,12 +270,6 @@ get_enabled(Authenticators) -> %% APIs %%------------------------------------------------------------------------------ -pre_config_update(Path, UpdateReq, OldConfig) -> - emqx_authentication_config:pre_config_update(Path, UpdateReq, OldConfig). - -post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs) -> - emqx_authentication_config:post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs). - %% @doc Get all registered authentication providers. get_providers() -> call(get_providers). @@ -413,6 +402,12 @@ list_authenticators(ChainName) -> move_authenticator(ChainName, AuthenticatorID, Position) -> call({move_authenticator, ChainName, AuthenticatorID, Position}). +-spec reorder_authenticator(chain_name(), [authenticator_id()]) -> ok. +reorder_authenticator(_ChainName, []) -> + ok; +reorder_authenticator(ChainName, AuthenticatorIDs) -> + call({reorder_authenticator, ChainName, AuthenticatorIDs}). + -spec import_users(chain_name(), authenticator_id(), {binary(), binary()}) -> ok | {error, term()}. import_users(ChainName, AuthenticatorID, Filename) -> @@ -447,8 +442,9 @@ list_users(ChainName, AuthenticatorID, FuzzyParams) -> init(_Opts) -> process_flag(trap_exit, true), - ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE), - ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], ?MODULE), + Module = emqx_authentication_config, + ok = emqx_config_handler:add_handler([?CONF_ROOT], Module), + ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], Module), {ok, #{hooked => false, providers => #{}}}. handle_call(get_providers, _From, #{providers := Providers} = State) -> @@ -504,6 +500,12 @@ handle_call({move_authenticator, ChainName, AuthenticatorID, Position}, _From, S end, Reply = with_chain(ChainName, UpdateFun), reply(Reply, State); +handle_call({reorder_authenticator, ChainName, AuthenticatorIDs}, _From, State) -> + UpdateFun = fun(Chain) -> + handle_reorder_authenticator(Chain, AuthenticatorIDs) + end, + Reply = with_chain(ChainName, UpdateFun), + reply(Reply, State); handle_call({import_users, ChainName, AuthenticatorID, Filename}, _From, State) -> Reply = call_authenticator(ChainName, AuthenticatorID, import_users, [Filename]), reply(Reply, State); @@ -609,6 +611,24 @@ handle_move_authenticator(Chain, AuthenticatorID, Position) -> {error, Reason} end. +handle_reorder_authenticator(Chain, AuthenticatorIDs) -> + #chain{authenticators = Authenticators} = Chain, + NAuthenticators = + lists:filtermap( + fun(ID) -> + case lists:keyfind(ID, #authenticator.id, Authenticators) of + false -> + ?SLOG(error, #{msg => "authenticator_not_found", id => ID}), + false; + Authenticator -> + {true, Authenticator} + end + end, + AuthenticatorIDs + ), + NewChain = Chain#chain{authenticators = NAuthenticators}, + {ok, ok, NewChain}. + handle_create_authenticator(Chain, Config, Providers) -> #chain{name = Name, authenticators = Authenticators} = Chain, AuthenticatorID = authenticator_id(Config), diff --git a/apps/emqx/src/emqx_authentication_config.erl b/apps/emqx/src/emqx_authentication_config.erl index 98c0a19f8..a1b55ea43 100644 --- a/apps/emqx/src/emqx_authentication_config.erl +++ b/apps/emqx/src/emqx_authentication_config.erl @@ -65,8 +65,8 @@ -spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) -> {ok, map() | list()} | {error, term()}. -pre_config_update(_, UpdateReq, OldConfig) -> - try do_pre_config_update(UpdateReq, to_list(OldConfig)) of +pre_config_update(Paths, UpdateReq, OldConfig) -> + try do_pre_config_update(Paths, UpdateReq, to_list(OldConfig)) of {error, Reason} -> {error, Reason}; {ok, NewConfig} -> {ok, NewConfig} catch @@ -74,9 +74,9 @@ pre_config_update(_, UpdateReq, OldConfig) -> {error, Reason} end. -do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) -> +do_pre_config_update(_, {create_authenticator, ChainName, Config}, OldConfig) -> NewId = authenticator_id(Config), - case lists:filter(fun(OldConfig0) -> authenticator_id(OldConfig0) =:= NewId end, OldConfig) of + case filter_authenticator(NewId, OldConfig) of [] -> CertsDir = certs_dir(ChainName, Config), NConfig = convert_certs(CertsDir, Config), @@ -84,7 +84,7 @@ do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) -> [_] -> {error, {already_exists, {authenticator, NewId}}} end; -do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) -> +do_pre_config_update(_, {delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) -> NewConfig = lists:filter( fun(OldConfig0) -> AuthenticatorID =/= authenticator_id(OldConfig0) @@ -92,7 +92,7 @@ do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldCon OldConfig ), {ok, NewConfig}; -do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) -> +do_pre_config_update(_, {update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) -> CertsDir = certs_dir(ChainName, AuthenticatorID), NewConfig = lists:map( fun(OldConfig0) -> @@ -104,7 +104,7 @@ do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig ), {ok, NewConfig}; -do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) -> +do_pre_config_update(_, {move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) -> case split_by_id(AuthenticatorID, OldConfig) of {error, Reason} -> {error, Reason}; @@ -129,7 +129,18 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position} {ok, BeforeNFound ++ [FoundRelated, Found | AfterNFound]} end end - end. + end; +do_pre_config_update(_, OldConfig, OldConfig) -> + {ok, OldConfig}; +do_pre_config_update(Paths, NewConfig, _OldConfig) -> + ChainName = chain_name(Paths), + {ok, [ + begin + CertsDir = certs_dir(ChainName, New), + convert_certs(CertsDir, New) + end + || New <- to_list(NewConfig) + ]}. -spec post_config_update( list(atom()), @@ -139,13 +150,16 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position} emqx_config:app_envs() ) -> ok | {ok, map()} | {error, term()}. -post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) -> - do_post_config_update(UpdateReq, to_list(NewConfig), OldConfig, AppEnvs). +post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs) -> + do_post_config_update(Paths, UpdateReq, to_list(NewConfig), OldConfig, AppEnvs). -do_post_config_update({create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs) -> +do_post_config_update( + _, {create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs +) -> NConfig = get_authenticator_config(authenticator_id(Config), NewConfig), emqx_authentication:create_authenticator(ChainName, NConfig); do_post_config_update( + _, {delete_authenticator, ChainName, AuthenticatorID}, _NewConfig, OldConfig, @@ -160,6 +174,7 @@ do_post_config_update( {error, Reason} end; do_post_config_update( + _, {update_authenticator, ChainName, AuthenticatorID, Config}, NewConfig, _OldConfig, @@ -172,12 +187,57 @@ do_post_config_update( emqx_authentication:update_authenticator(ChainName, AuthenticatorID, NConfig) end; do_post_config_update( + _, {move_authenticator, ChainName, AuthenticatorID, Position}, _NewConfig, _OldConfig, _AppEnvs ) -> - emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position). + emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position); +do_post_config_update(_, _UpdateReq, OldConfig, OldConfig, _AppEnvs) -> + ok; +do_post_config_update(Paths, _UpdateReq, NewConfig0, OldConfig0, _AppEnvs) -> + ChainName = chain_name(Paths), + OldConfig = to_list(OldConfig0), + NewConfig = to_list(NewConfig0), + OldIds = lists:map(fun authenticator_id/1, OldConfig), + NewIds = lists:map(fun authenticator_id/1, NewConfig), + ok = delete_authenticators(NewIds, ChainName, OldConfig), + ok = create_or_update_authenticators(OldIds, ChainName, NewConfig), + ok = emqx_authentication:reorder_authenticator(ChainName, NewIds), + ok. + +%% create new authenticators and update existing ones +create_or_update_authenticators(OldIds, ChainName, NewConfig) -> + lists:foreach( + fun(Conf) -> + Id = authenticator_id(Conf), + case lists:member(Id, OldIds) of + true -> + emqx_authentication:update_authenticator(ChainName, Id, Conf); + false -> + emqx_authentication:create_authenticator(ChainName, Conf) + end + end, + NewConfig + ). + +%% delete authenticators that are not in the new config +delete_authenticators(NewIds, ChainName, OldConfig) -> + lists:foreach( + fun(Conf) -> + Id = authenticator_id(Conf), + case lists:member(Id, NewIds) of + true -> + ok; + false -> + _ = emqx_authentication:delete_authenticator(ChainName, Id), + CertsDir = certs_dir(ChainName, Conf), + ok = clear_certs(CertsDir, Conf) + end + end, + OldConfig + ). to_list(undefined) -> []; to_list(M) when M =:= #{} -> []; @@ -213,14 +273,15 @@ clear_certs(CertsDir, Config) -> ok = emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL). get_authenticator_config(AuthenticatorID, AuthenticatorsConfig) -> - case - lists:filter(fun(C) -> AuthenticatorID =:= authenticator_id(C) end, AuthenticatorsConfig) - of + case filter_authenticator(AuthenticatorID, AuthenticatorsConfig) of [C] -> C; [] -> {error, not_found}; _ -> error({duplicated_authenticator_id, AuthenticatorsConfig}) end. +filter_authenticator(ID, Authenticators) -> + lists:filter(fun(A) -> ID =:= authenticator_id(A) end, Authenticators). + split_by_id(ID, AuthenticatorsConfig) -> case lists:foldl( @@ -287,3 +348,8 @@ dir(ChainName, ID) when is_binary(ID) -> emqx_utils:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID])); dir(ChainName, Config) when is_map(Config) -> dir(ChainName, authenticator_id(Config)). + +chain_name([authentication]) -> + ?GLOBAL; +chain_name([listeners, Type, Name, authentication]) -> + binary_to_existing_atom(<<(atom_to_binary(Type))/binary, ":", (atom_to_binary(Name))/binary>>). diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index ebcf9c434..db3d48f5d 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -176,11 +176,13 @@ insert_channel_info(ClientId, Info, Stats) -> %% Note that: It should be called on a lock transaction register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> Chan = {ClientId, ChanPid}, + %% cast (for process monitor) before inserting ets tables + cast({registered, Chan}), true = ets:insert(?CHAN_TAB, Chan), true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), ok = emqx_cm_registry:register_channel(Chan), mark_channel_connected(ChanPid), - cast({registered, Chan}). + ok. %% @doc Unregister a channel. -spec unregister_channel(emqx_types:clientid()) -> ok. diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index b43cd5003..080172c7b 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -279,25 +279,28 @@ get_default_value([RootName | _] = KeyPath) -> end. -spec get_raw(emqx_utils_maps:config_key_path()) -> term(). -get_raw([Root | T]) when is_atom(Root) -> get_raw([bin(Root) | T]); -get_raw(KeyPath) -> do_get_raw(KeyPath). +get_raw([Root | _] = KeyPath) when is_binary(Root) -> do_get_raw(KeyPath); +get_raw([Root | T]) -> get_raw([bin(Root) | T]); +get_raw([]) -> do_get_raw([]). -spec get_raw(emqx_utils_maps:config_key_path(), term()) -> term(). -get_raw([Root | T], Default) when is_atom(Root) -> get_raw([bin(Root) | T], Default); -get_raw(KeyPath, Default) -> do_get_raw(KeyPath, Default). +get_raw([Root | _] = KeyPath, Default) when is_binary(Root) -> do_get_raw(KeyPath, Default); +get_raw([Root | T], Default) -> get_raw([bin(Root) | T], Default); +get_raw([], Default) -> do_get_raw([], Default). -spec put_raw(map()) -> ok. put_raw(Config) -> maps:fold( fun(RootName, RootV, _) -> - ?MODULE:put_raw([RootName], RootV) + ?MODULE:put_raw([bin(RootName)], RootV) end, ok, hocon_maps:ensure_plain(Config) ). -spec put_raw(emqx_utils_maps:config_key_path(), term()) -> ok. -put_raw(KeyPath, Config) -> +put_raw(KeyPath0, Config) -> + KeyPath = [bin(K) || K <- KeyPath0], Putter = fun(Path, Map, Value) -> emqx_utils_maps:deep_force_put(Path, Map, Value) end, diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 0bad19f9e..5576f579d 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -31,8 +31,7 @@ remove_handler/1, update_config/3, get_raw_cluster_override_conf/0, - info/0, - merge_to_old_config/2 + info/0 ]). %% gen_server callbacks @@ -332,31 +331,16 @@ do_post_config_update( SubOldConf = get_sub_config(ConfKey, OldConf), SubNewConf = get_sub_config(ConfKey, NewConf), SubHandlers = get_sub_handlers(ConfKey, Handlers), - case - do_post_config_update( - SubConfKeyPath, - SubHandlers, - SubOldConf, - SubNewConf, - AppEnvs, - UpdateArgs, - Result, - ConfKeyPath - ) - of - {ok, Result1} -> - call_post_config_update( - Handlers, - OldConf, - NewConf, - AppEnvs, - up_req(UpdateArgs), - Result1, - ConfKeyPath - ); - Error -> - Error - end. + do_post_config_update( + SubConfKeyPath, + SubHandlers, + SubOldConf, + SubNewConf, + AppEnvs, + UpdateArgs, + Result, + ConfKeyPath + ). get_sub_handlers(ConfKey, Handlers) -> case maps:find(ConfKey, Handlers) of diff --git a/apps/emqx/src/emqx_maybe.erl b/apps/emqx/src/emqx_maybe.erl index 5b5d5e94b..af2fd04a7 100644 --- a/apps/emqx/src/emqx_maybe.erl +++ b/apps/emqx/src/emqx_maybe.erl @@ -45,8 +45,8 @@ define(Term, _) -> Term. %% @doc Apply a function to a maybe argument. --spec apply(fun((A) -> maybe(A)), maybe(A)) -> - maybe(A). +-spec apply(fun((A) -> B), maybe(A)) -> + maybe(B). apply(_Fun, undefined) -> undefined; apply(Fun, Term) when is_function(Fun) -> diff --git a/apps/emqx/test/emqx_authentication_SUITE.erl b/apps/emqx/test/emqx_authentication_SUITE.erl index 652b634ca..5be3521d9 100644 --- a/apps/emqx/test/emqx_authentication_SUITE.erl +++ b/apps/emqx/test/emqx_authentication_SUITE.erl @@ -174,7 +174,7 @@ t_authenticator(Config) when is_list(Config) -> register_provider(AuthNType1, ?MODULE), ID1 = <<"password_based:built_in_database">>, - % CRUD of authencaticator + % CRUD of authenticator ?assertMatch( {ok, #{id := ID1, state := #{mark := 1}}}, ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1) @@ -296,7 +296,10 @@ t_update_config({init, Config}) -> | Config ]; t_update_config(Config) when is_list(Config) -> - emqx_config_handler:add_handler([?CONF_ROOT], emqx_authentication), + emqx_config_handler:add_handler([?CONF_ROOT], emqx_authentication_config), + ok = emqx_config_handler:add_handler( + [listeners, '?', '?', ?CONF_ROOT], emqx_authentication_config + ), ok = register_provider(?config("auth1"), ?MODULE), ok = register_provider(?config("auth2"), ?MODULE), Global = ?config(global), @@ -355,6 +358,10 @@ t_update_config(Config) when is_list(Config) -> ?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(Global)), + [Raw2, Raw1] = emqx:get_raw_config([?CONF_ROOT]), + ?assertMatch({ok, _}, update_config([?CONF_ROOT], [Raw1, Raw2])), + ?assertMatch({ok, [#{id := ID1}, #{id := ID2}]}, ?AUTHN:list_authenticators(Global)), + ?assertMatch({ok, _}, update_config([?CONF_ROOT], {delete_authenticator, Global, ID1})), ?assertEqual( {error, {not_found, {authenticator, ID1}}}, @@ -417,11 +424,16 @@ t_update_config(Config) when is_list(Config) -> {ok, _}, update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, ?CMD_MOVE_FRONT}) ), - ?assertMatch( {ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(ListenerID) ), + [LRaw2, LRaw1] = emqx:get_raw_config(ConfKeyPath), + ?assertMatch({ok, _}, update_config(ConfKeyPath, [LRaw1, LRaw2])), + ?assertMatch( + {ok, [#{id := ID1}, #{id := ID2}]}, + ?AUTHN:list_authenticators(ListenerID) + ), ?assertMatch( {ok, _}, diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 7aaf93c99..54b3b3ca9 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -277,7 +277,7 @@ wait_for_app_processes(_) -> %% and stop others, and then the `application:start/2' callback is %% never called again for this application. perform_sanity_checks(emqx_rule_engine) -> - ensure_config_handler(emqx_rule_engine, [rule_engine, rules]), + ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']), ok; perform_sanity_checks(emqx_bridge) -> ensure_config_handler(emqx_bridge, [bridges]), @@ -289,7 +289,7 @@ ensure_config_handler(Module, ConfigPath) -> #{handlers := Handlers} = sys:get_state(emqx_config_handler), case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of #{{mod} := Module} -> ok; - _NotFound -> error({config_handler_missing, ConfigPath, Module}) + NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound}) end, ok. diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl index e21c1867f..194198571 100644 --- a/apps/emqx/test/emqx_config_handler_SUITE.erl +++ b/apps/emqx/test/emqx_config_handler_SUITE.erl @@ -130,7 +130,7 @@ t_root_key_update(_Config) -> ?assertEqual( {ok, #{ config => 0.81, - post_config_update => #{?MODULE => ok}, + post_config_update => #{}, raw_config => <<"81%">> }}, emqx:update_config(SubKey, "81%", Opts) @@ -174,7 +174,7 @@ t_sub_key_update_remove(_Config) -> %% remove ?assertEqual( - {ok, #{post_config_update => #{emqx_config_handler_SUITE => ok}}}, + {ok, #{post_config_update => #{?MODULE => ok}}}, emqx:remove_config(KeyPath) ), ?assertError( @@ -184,18 +184,6 @@ t_sub_key_update_remove(_Config) -> ?assertEqual(false, lists:member(<<"cpu_check_interval">>, OSKey)), ?assert(length(OSKey) > 0), - ?assertEqual( - {ok, #{ - config => 60000, - post_config_update => #{?MODULE => ok}, - raw_config => <<"60s">> - }}, - emqx:reset_config(KeyPath, Opts) - ), - OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])), - ?assertEqual(true, lists:member(<<"cpu_check_interval">>, OSKey1)), - ?assert(length(OSKey1) > 1), - ok = emqx_config_handler:remove_handler(KeyPath), ok = emqx_config_handler:remove_handler(KeyPath2), ok. @@ -292,44 +280,6 @@ t_get_raw_cluster_override_conf(_Config) -> ?assertEqual(OldInfo, NewInfo), ok. -t_save_config_failed(_Config) -> - ok. - -t_update_sub(_Config) -> - PathKey = [sysmon], - Opts = #{rawconf_with_defaults => true}, - ok = emqx_config_handler:add_handler(PathKey, ?MODULE), - %% update sub key - #{<<"os">> := OS1} = emqx:get_raw_config(PathKey), - {ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts), - ?assertMatch( - #{ - config := 120000, - post_config_update := #{?MODULE := ok}, - raw_config := <<"120s">> - }, - Res - ), - ?assertMatch(#{os := #{cpu_check_interval := 120000}}, emqx:get_config(PathKey)), - #{<<"os">> := OS2} = emqx:get_raw_config(PathKey), - ?assertEqual(lists:sort(maps:keys(OS1)), lists:sort(maps:keys(OS2))), - - %% update sub key - SubKey = PathKey ++ [os, cpu_high_watermark], - ?assertEqual( - {ok, #{ - config => 0.81, - post_config_update => #{?MODULE => ok}, - raw_config => <<"81%">> - }}, - emqx:update_config(SubKey, "81%", Opts) - ), - ?assertEqual(0.81, emqx:get_config(SubKey)), - ?assertEqual("81%", emqx:get_raw_config(SubKey)), - - ok = emqx_config_handler:remove_handler(PathKey), - ok. - pre_config_update([sysmon], UpdateReq, _RawConf) -> {ok, UpdateReq}; pre_config_update([sysmon, os], UpdateReq, _RawConf) -> diff --git a/apps/emqx/test/emqx_crl_cache_SUITE.erl b/apps/emqx/test/emqx_crl_cache_SUITE.erl index be8f49343..83675f11e 100644 --- a/apps/emqx/test/emqx_crl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_crl_cache_SUITE.erl @@ -279,7 +279,7 @@ does_module_exist(Mod) -> clear_listeners() -> emqx_config:put([listeners], #{}), - emqx_config:put_raw([listeners], #{}), + emqx_config:put_raw([<<"listeners">>], #{}), ok. assert_http_get(URL) -> diff --git a/apps/emqx_authn/include/emqx_authn.hrl b/apps/emqx_authn/include/emqx_authn.hrl index 30482553c..601b161d5 100644 --- a/apps/emqx_authn/include/emqx_authn.hrl +++ b/apps/emqx_authn/include/emqx_authn.hrl @@ -23,8 +23,6 @@ -define(AUTHN, emqx_authentication). --define(GLOBAL, 'mqtt:global'). - -define(RE_PLACEHOLDER, "\\$\\{[a-z0-9\\-]+\\}"). -define(AUTH_SHARD, emqx_authn_shard). diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index f00ca8ed1..65ce0cc32 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -31,6 +31,7 @@ -define(NOT_FOUND, 'NOT_FOUND'). -define(ALREADY_EXISTS, 'ALREADY_EXISTS'). -define(INTERNAL_ERROR, 'INTERNAL_ERROR'). +-define(CONFIG, emqx_authentication_config). % Swagger @@ -833,12 +834,12 @@ with_chain(ListenerID, Fun) -> create_authenticator(ConfKeyPath, ChainName, Config) -> case update_config(ConfKeyPath, {create_authenticator, ChainName, Config}) of {ok, #{ - post_config_update := #{emqx_authentication := #{id := ID}}, + post_config_update := #{?CONFIG := #{id := ID}}, raw_config := AuthenticatorsConfig }} -> {ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig), {200, maps:put(id, ID, convert_certs(fill_defaults(AuthenticatorConfig)))}; - {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} -> + {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} -> serialize_error(Reason); {error, Reason} -> serialize_error(Reason) @@ -1017,7 +1018,7 @@ update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) -> of {ok, _} -> {204}; - {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} -> + {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} -> serialize_error(Reason); {error, Reason} -> serialize_error(Reason) @@ -1027,7 +1028,7 @@ delete_authenticator(ConfKeyPath, ChainName, AuthenticatorID) -> case update_config(ConfKeyPath, {delete_authenticator, ChainName, AuthenticatorID}) of {ok, _} -> {204}; - {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} -> + {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} -> serialize_error(Reason); {error, Reason} -> serialize_error(Reason) @@ -1044,7 +1045,7 @@ move_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Position) -> of {ok, _} -> {204}; - {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} -> + {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} -> serialize_error(Reason); {error, Reason} -> serialize_error(Reason) diff --git a/apps/emqx_authn/test/emqx_authn_SUITE.erl b/apps/emqx_authn/test/emqx_authn_SUITE.erl index 4f96ca2dd..d5df4add3 100644 --- a/apps/emqx_authn/test/emqx_authn_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_SUITE.erl @@ -200,6 +200,127 @@ t_union_selector_errors(Config) when is_list(Config) -> ), ok. +t_update_conf({init, Config}) -> + emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn]), + {ok, _} = emqx:update_config([authentication], []), + Config; +t_update_conf({'end', _Config}) -> + {ok, _} = emqx:update_config([authentication], []), + emqx_common_test_helpers:stop_apps([emqx_authn, emqx_conf]), + ok; +t_update_conf(Config) when is_list(Config) -> + Authn1 = #{ + <<"mechanism">> => <<"password_based">>, + <<"backend">> => <<"built_in_database">>, + <<"user_id_type">> => <<"clientid">>, + <<"enable">> => true + }, + Authn2 = #{ + <<"mechanism">> => <<"password_based">>, + <<"backend">> => <<"http">>, + <<"method">> => <<"post">>, + <<"url">> => <<"http://127.0.0.1:18083">>, + <<"headers">> => #{ + <<"content-type">> => <<"application/json">> + }, + <<"enable">> => true + }, + Authn3 = #{ + <<"mechanism">> => <<"jwt">>, + <<"use_jwks">> => false, + <<"algorithm">> => <<"hmac-based">>, + <<"secret">> => <<"mysecret">>, + <<"secret_base64_encoded">> => false, + <<"verify_claims">> => #{<<"username">> => <<"${username}">>}, + <<"enable">> => true + }, + Chain = 'mqtt:global', + {ok, _} = emqx:update_config([authentication], [Authn1]), + ?assertMatch( + {ok, #{ + authenticators := [ + #{ + enable := true, + id := <<"password_based:built_in_database">>, + provider := emqx_authn_mnesia + } + ] + }}, + emqx_authentication:lookup_chain(Chain) + ), + + {ok, _} = emqx:update_config([authentication], [Authn1, Authn2, Authn3]), + ?assertMatch( + {ok, #{ + authenticators := [ + #{ + enable := true, + id := <<"password_based:built_in_database">>, + provider := emqx_authn_mnesia + }, + #{ + enable := true, + id := <<"password_based:http">>, + provider := emqx_authn_http + }, + #{ + enable := true, + id := <<"jwt">>, + provider := emqx_authn_jwt + } + ] + }}, + emqx_authentication:lookup_chain(Chain) + ), + {ok, _} = emqx:update_config([authentication], [Authn2, Authn1]), + ?assertMatch( + {ok, #{ + authenticators := [ + #{ + enable := true, + id := <<"password_based:http">>, + provider := emqx_authn_http + }, + #{ + enable := true, + id := <<"password_based:built_in_database">>, + provider := emqx_authn_mnesia + } + ] + }}, + emqx_authentication:lookup_chain(Chain) + ), + + {ok, _} = emqx:update_config([authentication], [Authn3, Authn2, Authn1]), + ?assertMatch( + {ok, #{ + authenticators := [ + #{ + enable := true, + id := <<"jwt">>, + provider := emqx_authn_jwt + }, + #{ + enable := true, + id := <<"password_based:http">>, + provider := emqx_authn_http + }, + #{ + enable := true, + id := <<"password_based:built_in_database">>, + provider := emqx_authn_mnesia + } + ] + }}, + emqx_authentication:lookup_chain(Chain) + ), + {ok, _} = emqx:update_config([authentication], []), + ?assertMatch( + {error, {not_found, {chain, Chain}}}, + emqx_authentication:lookup_chain(Chain) + ), + ok. + parse(Bytes) -> {ok, Frame, <<>>, {none, _}} = emqx_frame:parse(Bytes), Frame. diff --git a/apps/emqx_authz/include/emqx_authz.hrl b/apps/emqx_authz/include/emqx_authz.hrl index b7afd5e84..967865868 100644 --- a/apps/emqx_authz/include/emqx_authz.hrl +++ b/apps/emqx_authz/include/emqx_authz.hrl @@ -43,6 +43,7 @@ -define(CMD_MOVE_BEFORE(Before), {before, Before}). -define(CMD_MOVE_AFTER(After), {'after', After}). +-define(ROOT_KEY, [authorization]). -define(CONF_KEY_PATH, [authorization, sources]). -define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}"). diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index 7620f5548..929f292a7 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.21"}, + {vsn, "0.1.22"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 7ceacdb68..a8c678be1 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -101,6 +101,7 @@ init() -> ok = register_metrics(), ok = init_metrics(client_info_source()), emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE), + emqx_conf:add_handler(?ROOT_KEY, ?MODULE), Sources = emqx_conf:get(?CONF_KEY_PATH, []), ok = check_dup_types(Sources), NSources = create_sources(Sources), @@ -109,6 +110,7 @@ init() -> deinit() -> ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}), emqx_conf:remove_handler(?CONF_KEY_PATH), + emqx_conf:remove_handler(?ROOT_KEY), emqx_authz_utils:cleanup_resources(). lookup() -> @@ -139,14 +141,29 @@ update({?CMD_DELETE, Type}, Sources) -> update(Cmd, Sources) -> emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}). -pre_config_update(_, Cmd, Sources) -> - try do_pre_config_update(Cmd, Sources) of +pre_config_update(Path, Cmd, Sources) -> + try do_pre_config_update(Path, Cmd, Sources) of {error, Reason} -> {error, Reason}; NSources -> {ok, NSources} catch _:Reason -> {error, Reason} end. +do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) -> + do_pre_config_update(Cmd, Sources); +do_pre_config_update(?ROOT_KEY, NewConf, OldConf) -> + do_pre_config_replace(NewConf, OldConf). + +%% override the entire config when updating the root key +%% emqx_conf:update(?ROOT_KEY, Conf); +do_pre_config_replace(Conf, Conf) -> + Conf; +do_pre_config_replace(NewConf, OldConf) -> + #{<<"sources">> := NewSources} = NewConf, + #{<<"sources">> := OldSources} = OldConf, + NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources), + NewConf#{<<"sources">> := NewSources1}. + do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) -> do_move(Cmd, Sources); do_pre_config_update({?CMD_PREPEND, Source}, Sources) -> @@ -179,47 +196,53 @@ do_pre_config_update({Op, Source}, Sources) -> post_config_update(_, _, undefined, _OldSource, _AppEnvs) -> ok; -post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) -> - Actions = do_post_config_update(Cmd, NewSources), +post_config_update(Path, Cmd, NewSources, _OldSource, _AppEnvs) -> + Actions = do_post_config_update(Path, Cmd, NewSources), ok = update_authz_chain(Actions), ok = emqx_authz_cache:drain_cache(). -do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) -> - InitedSources = lookup(), - do_move(Cmd, InitedSources); -do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) -> - InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)), - %% create metrics +do_post_config_update(?CONF_KEY_PATH, {?CMD_MOVE, _Type, _Where} = Cmd, _Sources) -> + do_move(Cmd, lookup()); +do_post_config_update(?CONF_KEY_PATH, {?CMD_PREPEND, RawNewSource}, Sources) -> TypeName = type(RawNewSource), - ok = emqx_metrics_worker:create_metrics( - authz_metrics, - TypeName, - [total, allow, deny, nomatch], - [total] - ), - [InitedNewSource] ++ lookup(); -do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) -> - InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)), - lookup() ++ [InitedNewSource]; -do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> + NewSources = create_sources([get_source_by_type(TypeName, Sources)]), + NewSources ++ lookup(); +do_post_config_update(?CONF_KEY_PATH, {?CMD_APPEND, RawNewSource}, Sources) -> + NewSources = create_sources([get_source_by_type(type(RawNewSource), Sources)]), + lookup() ++ NewSources; +do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> OldSources = lookup(), {OldSource, Front, Rear} = take(Type, OldSources), NewSource = get_source_by_type(type(RawNewSource), Sources), InitedSources = update_source(type(RawNewSource), OldSource, NewSource), Front ++ [InitedSources] ++ Rear; -do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> +do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> OldInitedSources = lookup(), {OldSource, Front, Rear} = take(Type, OldInitedSources), - %% delete metrics - ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type), - ok = ensure_resource_deleted(OldSource), - clear_certs(OldSource), + ok = ensure_deleted(OldSource, #{clear_metric => true}), Front ++ Rear; -do_post_config_update({?CMD_REPLACE, _RawNewSources}, Sources) -> - %% overwrite the entire config! - OldInitedSources = lookup(), - lists:foreach(fun ensure_resource_deleted/1, OldInitedSources), - lists:foreach(fun clear_certs/1, OldInitedSources), +do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) -> + overwrite_entire_sources(Sources); +do_post_config_update(?ROOT_KEY, Conf, Conf) -> + #{sources := Sources} = Conf, + Sources; +do_post_config_update(?ROOT_KEY, _Conf, NewConf) -> + #{sources := NewSources} = NewConf, + overwrite_entire_sources(NewSources). + +overwrite_entire_sources(Sources) -> + PrevSources = lookup(), + NewSourcesTypes = lists:map(fun type/1, Sources), + EnsureDelete = fun(S) -> + TypeName = type(S), + Opts = + case lists:member(TypeName, NewSourcesTypes) of + true -> #{clear_metric => false}; + false -> #{clear_metric => true} + end, + ensure_deleted(S, Opts) + end, + lists:foreach(EnsureDelete, PrevSources), create_sources(Sources). %% @doc do source move @@ -238,8 +261,14 @@ do_move({?CMD_MOVE, Type, ?CMD_MOVE_AFTER(After)}, Sources) -> {S2, Front2, Rear2} = take(After, Front1 ++ Rear1), Front2 ++ [S2, S1] ++ Rear2. -ensure_resource_deleted(#{enable := false}) -> +ensure_deleted(#{enable := false}, _) -> ok; +ensure_deleted(Source, #{clear_metric := ClearMetric}) -> + TypeName = type(Source), + ensure_resource_deleted(Source), + clear_certs(Source), + ClearMetric andalso emqx_metrics_worker:clear_metrics(authz_metrics, TypeName). + ensure_resource_deleted(#{type := Type} = Source) -> Module = authz_module(Type), Module:destroy(Source). @@ -287,12 +316,18 @@ update_source(Type, OldSource, NewSource) -> init_metrics(Source) -> TypeName = type(Source), - emqx_metrics_worker:create_metrics( - authz_metrics, - TypeName, - [total, allow, deny, nomatch], - [total] - ). + case emqx_metrics_worker:has_metrics(authz_metrics, TypeName) of + %% Don't reset the metrics if it already exists + true -> + ok; + false -> + emqx_metrics_worker:create_metrics( + authz_metrics, + TypeName, + [total, allow, deny, nomatch], + [total] + ) + end. %%-------------------------------------------------------------------- %% AuthZ callbacks @@ -487,7 +522,9 @@ write_acl_file(#{<<"rules">> := Rules} = Source0) -> ok = check_acl_file_rules(AclPath, Rules), ok = write_file(AclPath, Rules), Source1 = maps:remove(<<"rules">>, Source0), - maps:put(<<"path">>, AclPath, Source1). + maps:put(<<"path">>, AclPath, Source1); +write_acl_file(Source) -> + Source. %% @doc where the acl.conf file is stored. acl_conf_file() -> diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 39c414617..702359509 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -272,9 +272,80 @@ t_update_source(_) -> ], emqx_conf:get([authorization, sources], []) ), + ?assertMatch( + [ + #{type := http, enable := false}, + #{type := mongodb, enable := false}, + #{type := mysql, enable := false}, + #{type := postgresql, enable := false}, + #{type := redis, enable := false}, + #{type := file, enable := false} + ], + emqx_authz:lookup() + ), {ok, _} = emqx_authz:update(?CMD_REPLACE, []). +t_replace_all(_) -> + RootKey = [<<"authorization">>], + Conf = emqx:get_raw_config(RootKey), + emqx_authz_utils:update_config(RootKey, Conf#{ + <<"sources">> => [ + ?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1 + ] + }), + %% config + ?assertMatch( + [ + #{type := file, enable := true}, + #{type := redis, enable := true}, + #{type := postgresql, enable := true}, + #{type := mysql, enable := true}, + #{type := mongodb, enable := true}, + #{type := http, enable := true} + ], + emqx_conf:get([authorization, sources], []) + ), + %% hooks status + ?assertMatch( + [ + #{type := file, enable := true}, + #{type := redis, enable := true}, + #{type := postgresql, enable := true}, + #{type := mysql, enable := true}, + #{type := mongodb, enable := true}, + #{type := http, enable := true} + ], + emqx_authz:lookup() + ), + Ids = [http, mongodb, mysql, postgresql, redis, file], + %% metrics + lists:foreach( + fun(Id) -> + ?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) + end, + Ids + ), + + ?assertMatch( + {ok, _}, + emqx_authz_utils:update_config( + RootKey, + Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]} + ) + ), + %% hooks status + ?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()), + %% metrics + ?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)), + lists:foreach( + fun(Id) -> + ?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) + end, + Ids -- [http] + ), + ok. + t_delete_source(_) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]), diff --git a/apps/emqx_authz/test/emqx_authz_test_lib.erl b/apps/emqx_authz/test/emqx_authz_test_lib.erl index db604d764..e308e0de7 100644 --- a/apps/emqx_authz/test/emqx_authz_test_lib.erl +++ b/apps/emqx_authz/test/emqx_authz_test_lib.erl @@ -25,21 +25,26 @@ -define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000). reset_authorizers() -> - reset_authorizers(deny, false). + reset_authorizers(deny, false, []). restore_authorizers() -> - reset_authorizers(allow, true). + reset_authorizers(allow, true, []). -reset_authorizers(Nomatch, ChacheEnabled) -> +reset_authorizers(Nomatch, CacheEnabled, Source) -> {ok, _} = emqx:update_config( [authorization], #{ <<"no_match">> => atom_to_binary(Nomatch), - <<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)}, - <<"sources">> => [] + <<"cache">> => #{<<"enable">> => atom_to_binary(CacheEnabled)}, + <<"sources">> => Source } ), ok. +%% Don't reset sources +reset_authorizers(Nomatch, CacheEnabled) -> + {ok, _} = emqx:update_config([<<"authorization">>, <<"no_match">>], Nomatch), + {ok, _} = emqx:update_config([<<"authorization">>, <<"cache">>, <<"enable">>], CacheEnabled), + ok. setup_config(BaseConfig, SpecialParams) -> Config = maps:merge(BaseConfig, SpecialParams), diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 46c822ed0..d5d26c770 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -39,7 +39,8 @@ disable_enable/3, remove/2, check_deps_and_remove/3, - list/0 + list/0, + reload_hook/1 ]). -export([ @@ -133,6 +134,10 @@ safe_load_bridge(Type, Name, Conf, Opts) -> }) end. +reload_hook(Bridges) -> + ok = unload_hook(), + ok = load_hook(Bridges). + load_hook() -> Bridges = emqx:get_config([bridges], #{}), load_hook(Bridges). diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index e59259c3e..b226b3b32 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -69,10 +69,32 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> {ok, ConfNew} end. -post_config_update(Path, '$remove', _, OldConf, _AppEnvs) -> - _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf); -post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) -> +post_config_update([bridges, BridgeType, BridgeName] = Path, '$remove', _, OldConf, _AppEnvs) -> + _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf), + ok = emqx_bridge_resource:remove(BridgeType, BridgeName), + Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([bridges])), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, undefined, _AppEnvs) -> + _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, undefined), + ResOpts = emqx_resource:fetch_creation_opts(NewConf), + ok = emqx_bridge_resource:create(BridgeType, BridgeName, NewConf, ResOpts), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf + ), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, OldConf, _AppEnvs) -> _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf), + ResOpts = emqx_resource:fetch_creation_opts(NewConf), + ok = emqx_bridge_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf + ), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), ok. %% internal functions diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 1bde274f3..ea3495e0f 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, ecql]}, {env, []}, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index a3032a9df..98ba587e8 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -480,6 +480,8 @@ prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co handle_result({error, disconnected}) -> {error, {recoverable_error, disconnected}}; +handle_result({error, ecpool_empty}) -> + {error, {recoverable_error, ecpool_empty}}; handle_result({error, Error}) -> {error, {unrecoverable_error, Error}}; handle_result(Res) -> diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index 72669ba8f..58a92fde4 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, clickhouse, emqx_resource]}, {env, []}, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index a2de1d3c9..aefd9112f 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -464,7 +464,12 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> sql => SQL, reason => ClickhouseErrorResult }), - {error, ClickhouseErrorResult}. + case ClickhouseErrorResult of + {error, ecpool_empty} -> + {error, {recoverable_error, ecpool_empty}}; + _ -> + {error, ClickhouseErrorResult} + end. snabbkaffe_log_return(_Result) -> ?tp( diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index 2d2e299d2..0e202b714 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, erlcloud]}, {env, []}, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 981c31090..86e816c5d 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -170,7 +170,12 @@ do_query( query => Query, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( dynamo_connector_query_return, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index ad41c9904..378dda543 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -115,14 +115,6 @@ end_per_testcase(_TestCase, _Config) -> delete_all_bridges(), ok. -set_special_configs(emqx_management) -> - Listeners = #{http => #{port => 8081}}, - Config = #{ - listeners => Listeners, - applications => [#{id => "admin", secret => "public"}] - }, - emqx_config:put([emqx_management], Config), - ok; set_special_configs(emqx_dashboard) -> emqx_dashboard_api_test_helpers:set_default_config(), ok; diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index d001446b3..9037b8840 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_opents, [ {description, "EMQX Enterprise OpenTSDB Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 0366c9dc2..e49f552e9 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -142,7 +142,12 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) -> query => Query, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( opents_connector_query_return, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src index 4a2549f7c..ad96b4744 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_oracle, [ {description, "EMQX Enterprise Oracle Database Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl index 7f384c5e6..500059967 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl @@ -16,7 +16,8 @@ namespace/0, roots/0, fields/1, - desc/1 + desc/1, + config_validator/1 ]). -define(DEFAULT_SQL, << @@ -107,3 +108,12 @@ type_field(Type) -> name_field() -> {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + +config_validator(#{<<"server">> := Server} = Config) when + not is_map(Server) andalso + not is_map_key(<<"sid">>, Config) andalso + not is_map_key(<<"service_name">>, Config) +-> + {error, "neither SID nor Service Name was set"}; +config_validator(_) -> + ok. diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 721beab6e..2e72458b6 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -517,3 +517,15 @@ t_on_get_status(Config) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), ok. + +t_no_sid_nor_service_name(Config0) -> + OracleConfig0 = ?config(oracle_config, Config0), + OracleConfig1 = maps:remove(<<"sid">>, OracleConfig0), + OracleConfig = maps:remove(<<"service_name">>, OracleConfig1), + NewOracleConfig = {oracle_config, OracleConfig}, + Config = lists:keyreplace(oracle_config, 1, Config0, NewOracleConfig), + ?assertMatch( + {error, #{kind := validation_error}}, + create_bridge(Config) + ), + ok. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 749cb8bc1..cbdcbc845 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -431,11 +431,12 @@ on_query( state => emqx_utils:redact(State) }), MessageData = format_data(PayloadTemplate, Data), - ecpool:pick_and_do( + Res = ecpool:pick_and_do( PoolName, {?MODULE, publish_messages, [Config, [MessageData]]}, no_handover - ). + ), + handle_result(Res). %% emqx_resource callback that is called when a batch query is received @@ -467,11 +468,12 @@ on_batch_query( || Data <- MessagesToInsert ], %% Publish the messages - ecpool:pick_and_do( + Res = ecpool:pick_and_do( PoolName, {?MODULE, publish_messages, [Config, FormattedMessages]}, no_handover - ). + ), + handle_result(Res). publish_messages( {_Connection, Channel}, @@ -543,3 +545,8 @@ format_data([], Msg) -> emqx_utils_json:encode(Msg); format_data(Tokens, Msg) -> emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg). + +handle_result({error, ecpool_empty}) -> + {error, {recoverable_error, ecpool_empty}}; +handle_result(Res) -> + Res. diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index 90047e577..a80aee810 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -13,6 +13,9 @@ % Bridge defaults -define(TOPIC, "TopicTest"). +-define(DENY_TOPIC, "DENY_TOPIC"). +-define(ACCESS_KEY, "RocketMQ"). +-define(SECRET_KEY, "12345678"). -define(BATCH_SIZE, 10). -define(PAYLOAD, <<"HELLO">>). @@ -25,17 +28,19 @@ all() -> [ {group, async}, - {group, sync} + {group, sync}, + {group, acl} ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), + TCs = emqx_common_test_helpers:all(?MODULE) -- [t_acl_deny], BatchingGroups = [{group, with_batch}, {group, without_batch}], [ {async, BatchingGroups}, {sync, BatchingGroups}, {with_batch, TCs}, - {without_batch, TCs} + {without_batch, TCs}, + {acl, [t_acl_deny]} ]. init_per_group(async, Config) -> @@ -48,6 +53,9 @@ init_per_group(with_batch, Config0) -> init_per_group(without_batch, Config0) -> Config = [{batch_size, 1} | Config0], common_init(Config); +init_per_group(acl, Config0) -> + Config = [{batch_size, 1}, {query_mode, sync} | Config0], + common_init(Config); init_per_group(_Group, Config) -> Config. @@ -137,6 +145,8 @@ rocketmq_config(BridgeType, Config) -> "bridges.~s.~s {\n" " enable = true\n" " servers = ~p\n" + " access_key = ~p\n" + " secret_key = ~p\n" " topic = ~p\n" " resource_opts = {\n" " request_timeout = 1500ms\n" @@ -148,6 +158,8 @@ rocketmq_config(BridgeType, Config) -> BridgeType, Name, Server, + ?ACCESS_KEY, + ?SECRET_KEY, ?TOPIC, BatchSize, QueryMode @@ -271,3 +283,29 @@ t_simple_query(Config) -> Result = query_resource(Config, Request), ?assertEqual(ok, Result), ok. + +t_acl_deny(Config0) -> + RocketCfg = ?GET_CONFIG(rocketmq_config, Config0), + RocketCfg2 = RocketCfg#{<<"topic">> := ?DENY_TOPIC}, + Config = lists:keyreplace(rocketmq_config, 1, Config0, {rocketmq_config, RocketCfg2}), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertMatch({error, #{<<"code">> := 1}}, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{error := #{<<"code">> := 1}}], Trace), + ok + end + ), + ok. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index a0b4e287b..e5c5ae73d 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, odbc]}, {env, []}, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index ed8134051..341f89852 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -336,6 +336,7 @@ conn_str([{_, _} | Opts], Acc) -> ) -> {ok, list()} | {error, {recoverable_error, term()}} + | {error, {unrecoverable_error, term()}} | {error, term()}. do_query( ResourceId, @@ -374,7 +375,12 @@ do_query( query => Query, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( sqlserver_connector_query_return, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 25fa3a84d..aed5e5e0a 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -200,7 +200,12 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> job => Job, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( tdengine_connector_query_return, diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 8e109a1e6..9240d2116 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -18,16 +18,40 @@ -export([ load/0, admins/1, + conf/1, unload/0 ]). --define(CMD, cluster_call). +-define(CLUSTER_CALL, cluster_call). +-define(CONF, conf). load() -> - emqx_ctl:register_command(?CMD, {?MODULE, admins}, []). + emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, []), + emqx_ctl:register_command(?CONF, {?MODULE, conf}, []). unload() -> - emqx_ctl:unregister_command(?CMD). + emqx_ctl:unregister_command(?CLUSTER_CALL), + emqx_ctl:unregister_command(?CONF). + +conf(["show", "--keys-only"]) -> + print(emqx_config:get_root_names()); +conf(["show"]) -> + print_hocon(get_config()); +conf(["show", Key]) -> + print_hocon(get_config(Key)); +conf(["load", Path]) -> + load_config(Path); +conf(_) -> + emqx_ctl:usage( + [ + %% TODO add reload + %{"conf reload", "reload etc/emqx.conf on local node"}, + {"conf show --keys-only", "print all keys"}, + {"conf show", "print all running configures"}, + {"conf show ", "print a specific configuration"}, + {"conf load ", "load a hocon file to all nodes"} + ] + ). admins(["status"]) -> status(); @@ -43,7 +67,7 @@ admins(["skip", Node0]) -> status(); admins(["tnxid", TnxId0]) -> TnxId = list_to_integer(TnxId0), - emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]); + print(emqx_cluster_rpc:query(TnxId)); admins(["fast_forward"]) -> status(), Nodes = mria:running_nodes(), @@ -91,3 +115,30 @@ status() -> Status ), emqx_ctl:print("-----------------------------------------------\n"). + +print(Json) -> + emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]). + +print_hocon(Hocon) -> + emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]). + +get_config() -> emqx_config:fill_defaults(emqx:get_raw_config([])). +get_config(Key) -> emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}). + +-define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}). +load_config(Path) -> + case hocon:files([Path]) of + {ok, Conf} -> + maps:foreach( + fun(Key, Value) -> + case emqx_conf:update([Key], Value, ?OPTIONS) of + {ok, _} -> emqx_ctl:print("load ~ts ok~n", [Key]); + {error, Reason} -> emqx_ctl:print("load ~ts failed: ~p~n", [Key, Reason]) + end + end, + Conf + ); + {error, Reason} -> + emqx_ctl:print("load ~ts failed~n~p~n", [Path, Reason]), + {error, bad_hocon_file} + end. diff --git a/apps/emqx_conf/test/emqx_global_gc_SUITE.erl b/apps/emqx_conf/test/emqx_global_gc_SUITE.erl index 36639f078..ec1e20b3d 100644 --- a/apps/emqx_conf/test/emqx_global_gc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_global_gc_SUITE.erl @@ -23,14 +23,31 @@ all() -> emqx_common_test_helpers:all(?MODULE). +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_config:erase_all(), + ok. + t_run_gc(_) -> - ok = emqx_config:put([node, global_gc_interval], 1000), + Conf0 = #{ + node => #{ + cookie => <<"cookie">>, + data_dir => <<"data">>, + global_gc_interval => 1000 + } + }, + emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0), + ?assertEqual({ok, 1000}, application:get_env(emqx_machine, global_gc_interval)), {ok, _} = emqx_global_gc:start_link(), ok = timer:sleep(1500), {ok, MilliSecs} = emqx_global_gc:run(), - ct:print("Global GC: ~w(ms)~n", [MilliSecs]), + ct:pal("Global GC: ~w(ms)~n", [MilliSecs]), emqx_global_gc:stop(), - ok = emqx_config:put([node, global_gc_interval], disabled), + Conf1 = emqx_utils_maps:deep_put([node, global_gc_interval], Conf0, disabled), + emqx_common_test_helpers:load_config(emqx_conf_schema, Conf1), {ok, Pid} = emqx_global_gc:start_link(), ?assertMatch(#{timer := undefined}, sys:get_state(Pid)), + ?assertEqual({ok, disabled}, application:get_env(emqx_machine, global_gc_interval)), ok. diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 84048901f..1d969e6f1 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -135,11 +135,16 @@ on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = request => Request, connector => InstId, reason => Reason - }); + }), + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> - ok - end, - Result. + Result + end. on_get_status(_InstId, _State) -> connected. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 3657bb74c..5b63daef3 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -233,6 +233,8 @@ on_query( connector => InstId }), {error, Reason}; + {error, ecpool_empty} -> + {error, {recoverable_error, ecpool_empty}}; {{true, _Info}, _Document} -> ok end; @@ -261,7 +263,12 @@ on_query( reason => Reason, connector => InstId }), - {error, Reason}; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + {error, Reason} + end; {ok, Cursor} when is_pid(Cursor) -> {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)}; Result -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index ba80d0c1d..a2c8df3d3 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -241,7 +241,12 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> sql => NameOrSQL, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; Result -> ?tp( pgsql_connector_query_return, diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index e7b474326..89aab0a8c 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -220,6 +220,8 @@ do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) -> case is_unrecoverable_error(Reason) of true -> {error, {unrecoverable_error, Reason}}; + false when Reason =:= ecpool_empty -> + {error, {recoverable_error, Reason}}; false -> Result end; diff --git a/apps/emqx_ctl/src/emqx_ctl.app.src b/apps/emqx_ctl/src/emqx_ctl.app.src index c3abade67..1196f17a5 100644 --- a/apps/emqx_ctl/src/emqx_ctl.app.src +++ b/apps/emqx_ctl/src/emqx_ctl.app.src @@ -1,6 +1,6 @@ {application, emqx_ctl, [ {description, "Backend for emqx_ctl script"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {mod, {emqx_ctl_app, []}}, {applications, [ diff --git a/apps/emqx_ctl/src/emqx_ctl.erl b/apps/emqx_ctl/src/emqx_ctl.erl index 6123056b9..d2ced7268 100644 --- a/apps/emqx_ctl/src/emqx_ctl.erl +++ b/apps/emqx_ctl/src/emqx_ctl.erl @@ -128,16 +128,21 @@ run_command(Cmd, Args) when is_atom(Cmd) -> }), {error, Reason} end; - [] -> + Error -> help(), - {error, cmd_not_found} + Error end. -spec lookup_command(cmd()) -> [{module(), atom()}]. lookup_command(Cmd) when is_atom(Cmd) -> - case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of - [El] -> El; - [] -> [] + case is_initialized() of + true -> + case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of + [El] -> El; + [] -> {error, cmd_not_found} + end; + false -> + {error, cmd_is_initializing} end. -spec get_commands() -> list({cmd(), module(), atom()}). @@ -145,18 +150,23 @@ get_commands() -> [{Cmd, M, F} || {{_Seq, Cmd}, {M, F}, _Opts} <- ets:tab2list(?CMD_TAB)]. help() -> - case ets:tab2list(?CMD_TAB) of - [] -> - print("No commands available.~n"); - Cmds -> - print("Usage: ~ts~n", ["emqx ctl"]), - lists:foreach( - fun({_, {Mod, Cmd}, _}) -> - print("~110..-s~n", [""]), - apply(Mod, Cmd, [usage]) - end, - Cmds - ) + case is_initialized() of + true -> + case ets:tab2list(?CMD_TAB) of + [] -> + print("No commands available.~n"); + Cmds -> + print("Usage: ~ts~n", ["emqx ctl"]), + lists:foreach( + fun({_, {Mod, Cmd}, _}) -> + print("~110..-s~n", [""]), + apply(Mod, Cmd, [usage]) + end, + Cmds + ) + end; + false -> + print("Command table is initializing.~n") end. -spec print(io:format()) -> ok. @@ -279,3 +289,6 @@ safe_to_existing_atom(Str) -> _:badarg -> undefined end. + +is_initialized() -> + ets:info(?CMD_TAB) =/= undefined. diff --git a/apps/emqx_ctl/test/emqx_ctl_SUITE.erl b/apps/emqx_ctl/test/emqx_ctl_SUITE.erl index 46d9008e8..c11a1d5cb 100644 --- a/apps/emqx_ctl/test/emqx_ctl_SUITE.erl +++ b/apps/emqx_ctl/test/emqx_ctl_SUITE.erl @@ -49,8 +49,8 @@ t_reg_unreg_command(_) -> emqx_ctl:unregister_command(cmd1), emqx_ctl:unregister_command(cmd2), ct:sleep(100), - ?assertEqual([], emqx_ctl:lookup_command(cmd1)), - ?assertEqual([], emqx_ctl:lookup_command(cmd2)), + ?assertEqual({error, cmd_not_found}, emqx_ctl:lookup_command(cmd1)), + ?assertEqual({error, cmd_not_found}, emqx_ctl:lookup_command(cmd2)), ?assertEqual([], emqx_ctl:get_commands()) end ). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index 28bfb709a..e2b02edab 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -62,9 +62,8 @@ fields("dashboard") -> #{ desc => ?DESC(bootstrap_users_file), required => false, - importance => ?IMPORTANCE_HIDDEN, - default => <<>> - %% deprecated => {since, "5.1.0"} + default => <<>>, + deprecated => {since, "5.1.0"} } )} ]; diff --git a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl index 3d354d289..e60f7318f 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl @@ -33,8 +33,9 @@ init_per_suite(Config) -> _ = emqx_mgmt_api_test_util:init_suite([emqx_conf]), Config. -end_per_suite(Config) -> - emqx_mgmt_api_test_util:end_suite([emqx_conf]). +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite([emqx_conf]), + ok. t_object(_Config) -> Spec = #{ diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 898203b51..34dfc09a7 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -45,7 +45,8 @@ offset/0, filemeta/0, segment/0, - checksum/0 + checksum/0, + finopts/0 ]). %% Number of bytes @@ -80,6 +81,10 @@ -type segment() :: {offset(), _Content :: binary()}. +-type finopts() :: #{ + checksum => checksum() +}. + %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- @@ -170,8 +175,8 @@ on_file_command(PacketId, FileId, Msg, FileCommand) -> ChecksumBin = emqx_maybe:from_list(MaybeChecksum), validate( [{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}], - fun([FinalSize, Checksum]) -> - on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) + fun([FinalSize, FinalChecksum]) -> + on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) end ); [<<"abort">>] -> @@ -251,13 +256,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> end end). -on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> +on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) -> ?tp(info, "file_transfer_fin", #{ mqtt_msg => Msg, packet_id => PacketId, transfer => Transfer, final_size => FinalSize, - checksum => Checksum + checksum => FinalChecksum }), %% TODO: handle checksum? Do we need it? FinPacketKey = {self(), PacketId}, @@ -265,7 +270,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) end, with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() -> - case assemble(Transfer, FinalSize) of + case assemble(Transfer, FinalSize, FinalChecksum) of %% Assembling completed, ack through the responder right away ok -> emqx_ft_responder:ack(FinPacketKey, ok); @@ -314,9 +319,10 @@ store_segment(Transfer, Segment) -> {error, {internal_error, E}} end. -assemble(Transfer, FinalSize) -> +assemble(Transfer, FinalSize, FinalChecksum) -> try - emqx_ft_storage:assemble(Transfer, FinalSize) + FinOpts = [{checksum, FinalChecksum} || FinalChecksum /= undefined], + emqx_ft_storage:assemble(Transfer, FinalSize, maps:from_list(FinOpts)) catch C:E:S -> ?tp(error, "start_assemble_failed", #{ @@ -397,8 +403,8 @@ do_validate([{checksum, Checksum} | Rest], Parsed) -> {error, _Reason} -> {error, {invalid_checksum, Checksum}} end; -do_validate([{integrity, Payload, Checksum} | Rest], Parsed) -> - case crypto:hash(sha256, Payload) of +do_validate([{integrity, Payload, {Algo, Checksum}} | Rest], Parsed) -> + case crypto:hash(Algo, Payload) of Checksum -> do_validate(Rest, [Payload | Parsed]); Mismatch -> @@ -411,7 +417,7 @@ do_validate([{{maybe, T}, Value} | Rest], Parsed) -> parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 -> try - {ok, binary:decode_hex(Checksum)} + {ok, {sha256, binary:decode_hex(Checksum)}} catch error:badarg -> {error, invalid_checksum} diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 873efc6ff..c96df224c 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -16,7 +16,7 @@ -module(emqx_ft_assembler). --export([start_link/3]). +-export([start_link/4]). -behaviour(gen_statem). -export([callback_mode/0]). @@ -29,6 +29,7 @@ -type stdata() :: #{ storage := emqx_ft_storage_fs:storage(), transfer := emqx_ft:transfer(), + finopts := emqx_ft:finopts(), assembly := emqx_ft_assembly:t(), export => emqx_ft_storage_exporter:export() }. @@ -38,8 +39,8 @@ %% -start_link(Storage, Transfer, Size) -> - gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []). +start_link(Storage, Transfer, Size, Opts) -> + gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size, Opts}, []). where(Transfer) -> gproc:where(?NAME(Transfer)). @@ -60,11 +61,12 @@ callback_mode() -> handle_event_function. -spec init(_Args) -> {ok, state(), stdata()}. -init({Storage, Transfer, Size}) -> +init({Storage, Transfer, Size, Opts}) -> _ = erlang:process_flag(trap_exit, true), St = #{ storage => Storage, transfer => Transfer, + finopts => Opts, assembly => emqx_ft_assembly:new(Size) }, {ok, idle, St}. @@ -164,8 +166,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := end; handle_event(internal, _, {assemble, []}, St = #{}) -> {next_state, complete, St, ?internal([])}; -handle_event(internal, _, complete, St = #{export := Export}) -> - Result = emqx_ft_storage_exporter:complete(Export), +handle_event(internal, _, complete, St = #{export := Export, finopts := Opts}) -> + Result = emqx_ft_storage_exporter:complete(Export, Opts), _ = maybe_garbage_collect(Result, St), {stop, {shutdown, Result}, maps:remove(export, St)}. diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index 4ba65c290..e6e689a8c 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -17,7 +17,7 @@ -module(emqx_ft_assembler_sup). -export([start_link/0]). --export([ensure_child/3]). +-export([ensure_child/4]). -behaviour(supervisor). -export([init/1]). @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -ensure_child(Storage, Transfer, Size) -> +ensure_child(Storage, Transfer, Size, Opts) -> Childspec = #{ id => Transfer, - start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]}, + start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size, Opts]}, restart => temporary }, case supervisor:start_child(?MODULE, Childspec) of diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 4e1060d88..007b47db9 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -20,7 +20,7 @@ [ store_filemeta/2, store_segment/2, - assemble/2, + assemble/3, files/0, files/1, @@ -88,7 +88,7 @@ ok | {async, pid()} | {error, term()}. -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) -> ok | {async, pid()} | {error, term()}. --callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) -> +-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) -> ok | {async, pid()} | {error, term()}. -callback files(storage(), query(Cursor)) -> @@ -114,10 +114,10 @@ store_filemeta(Transfer, FileMeta) -> store_segment(Transfer, Segment) -> dispatch(store_segment, [Transfer, Segment]). --spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) -> +-spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) -> ok | {async, pid()} | {error, term()}. -assemble(Transfer, Size) -> - dispatch(assemble, [Transfer, Size]). +assemble(Transfer, Size, FinOpts) -> + dispatch(assemble, [Transfer, Size, FinOpts]). -spec files() -> {ok, page(file_info(), _)} | {error, term()}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index 601f8b112..6f2b9bea1 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -24,7 +24,7 @@ %% Export API -export([start_export/3]). -export([write/2]). --export([complete/1]). +-export([complete/2]). -export([discard/1]). %% Listing API @@ -117,12 +117,19 @@ write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) -> Error end. --spec complete(export()) -> +-spec complete(export(), emqx_ft:finopts()) -> ok | {error, _Reason}. -complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) -> - case verify_checksum(Hash, Filemeta) of - {ok, Checksum} -> - ExporterMod:complete(ExportSt, Checksum); +complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}, Opts) -> + Checksum = emqx_maybe:define( + % NOTE + % Checksum in `Opts` takes precedence over one in `Filemeta` according to the spec. + % We do not care if they differ. + maps:get(checksum, Opts, undefined), + maps:get(checksum, Filemeta, undefined) + ), + case verify_checksum(Hash, Checksum) of + {ok, ExportChecksum} -> + ExporterMod:complete(ExportSt, ExportChecksum); {error, _} = Error -> _ = ExporterMod:discard(ExportSt), Error @@ -183,13 +190,13 @@ init_checksum(#{}) -> update_checksum(Ctx, IoData) -> crypto:hash_update(Ctx, IoData). -verify_checksum(Ctx, #{checksum := {Algo, Digest} = Checksum}) -> +verify_checksum(Ctx, {Algo, Digest} = Checksum) -> case crypto:hash_final(Ctx) of Digest -> {ok, Checksum}; Mismatch -> {error, {checksum, Algo, binary:encode_hex(Mismatch)}} end; -verify_checksum(Ctx, #{}) -> +verify_checksum(Ctx, undefined) -> Digest = crypto:hash_final(Ctx), {ok, {sha256, Digest}}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 7a0a6b3b4..99720f521 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -36,7 +36,7 @@ -export([list/3]). -export([pread/5]). -export([lookup_local_assembler/1]). --export([assemble/3]). +-export([assemble/4]). -export([transfers/1]). @@ -211,14 +211,14 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> {error, Reason} end. --spec assemble(storage(), transfer(), emqx_ft:bytes()) -> +-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) -> {async, _Assembler :: pid()} | ok | {error, _TODO}. -assemble(Storage, Transfer, Size) -> +assemble(Storage, Transfer, Size, Opts) -> LookupSources = [ fun() -> lookup_local_assembler(Transfer) end, fun() -> lookup_remote_assembler(Transfer) end, fun() -> check_if_already_exported(Storage, Transfer) end, - fun() -> ensure_local_assembler(Storage, Transfer, Size) end + fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end ], lookup_assembler(LookupSources). @@ -295,8 +295,8 @@ lookup_remote_assembler(Transfer) -> _ -> {error, not_found} end. -ensure_local_assembler(Storage, Transfer, Size) -> - {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size), +ensure_local_assembler(Storage, Transfer, Size, Opts) -> + {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts), {async, Pid}. -spec transfers(storage()) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index e582db01f..ae274cd86 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -159,6 +159,10 @@ t_invalid_topic_format(Config) -> unspecified_error, emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1) ), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/fileid/fin/42/xyz">>, <<>>, 1) + ), ?assertRCName( unspecified_error, emqtt:publish(C, <<"$file/">>, <<>>, 1) @@ -390,9 +394,18 @@ t_invalid_checksum(Config) -> with_offsets(Data) ), + % Send `fin` w/o checksum, should fail since filemeta checksum is invalid + FinTopic = mk_fin_topic(FileId, Filesize), ?assertRCName( unspecified_error, - emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) + emqtt:publish(C, FinTopic, <<>>, 1) + ), + + % Send `fin` with the correct checksum + Checksum = binary:encode_hex(sha256(Data)), + ?assertRCName( + success, + emqtt:publish(C, <>, <<>>, 1) ). t_corrupted_segment_retry(Config) -> @@ -507,7 +520,7 @@ t_assemble_crash(Config) -> C = ?config(client, Config), meck:new(emqx_ft_storage_fs), - meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end), + meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end), ?assertRCName( unspecified_error, diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index c1deeb3bc..1dcc8a79d 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -178,7 +178,7 @@ complete_assemble(Storage, Transfer, Size) -> complete_assemble(Storage, Transfer, Size, 1000). complete_assemble(Storage, Transfer, Size, Timeout) -> - {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), + {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}), MRef = erlang:monitor(process, Pid), Pid ! kickoff, receive diff --git a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl index e717fe262..9e6050f36 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl @@ -110,9 +110,13 @@ t_upload_error(Config) -> Name = "cool_name", Data = <<"data"/utf8>>, - {ok, _} = emqx_conf:update( - [file_transfer, storage, local, exporter, s3, bucket], <<"invalid-bucket">>, #{} + Conf = emqx_conf:get_raw([file_transfer], #{}), + Conf1 = emqx_utils_maps:deep_put( + [<<"storage">>, <<"local">>, <<"exporter">>, <<"s3">>, <<"bucket">>], + Conf, + <<"invalid-bucket">> ), + {ok, _} = emqx_conf:update([file_transfer], Conf1, #{}), ?assertEqual( {error, unspecified_error}, diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index 842ae6bad..12f91c808 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -381,7 +381,7 @@ complete_transfer(Storage, Transfer, Size) -> complete_transfer(Storage, Transfer, Size, 100). complete_transfer(Storage, Transfer, Size, Timeout) -> - case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of + case emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}) of ok -> ok; {async, Pid} -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 92814d112..f50e44771 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -322,7 +322,8 @@ validate_name(Name) -> %% API CallBack Begin list_plugins(get, _) -> - {Plugins, []} = emqx_mgmt_api_plugins_proto_v1:get_plugins(), + Nodes = emqx:running_nodes(), + {Plugins, []} = emqx_mgmt_api_plugins_proto_v2:get_plugins(Nodes), {200, format_plugins(Plugins)}. get_plugins() -> @@ -373,7 +374,8 @@ upload_install(post, #{}) -> do_install_package(FileName, Bin) -> %% TODO: handle bad nodes - {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin), + Nodes = emqx:running_nodes(), + {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin), case lists:filter(fun(R) -> R =/= ok end, Res) of [] -> {200}; @@ -386,7 +388,11 @@ do_install_package(FileName, Bin) -> end, Filtered ), - {error, #{error := Reason}} = hd(Filtered), + Reason = + case hd(Filtered) of + {error, #{error := Reason0}} -> Reason0; + {error, #{reason := Reason0}} -> Reason0 + end, {400, #{ code => 'BAD_PLUGIN_INFO', message => iolist_to_binary([Reason, ":", FileName]) @@ -394,17 +400,18 @@ do_install_package(FileName, Bin) -> end. plugin(get, #{bindings := #{name := Name}}) -> - {Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name), + Nodes = emqx:running_nodes(), + {Plugins, _} = emqx_mgmt_api_plugins_proto_v2:describe_package(Nodes, Name), case format_plugins(Plugins) of [Plugin] -> {200, Plugin}; [] -> {404, #{code => 'NOT_FOUND', message => Name}} end; plugin(delete, #{bindings := #{name := Name}}) -> - Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name), + Res = emqx_mgmt_api_plugins_proto_v2:delete_package(Name), return(204, Res). update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> - Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action), + Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action), return(204, Res). update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_app.erl b/apps/emqx_management/src/emqx_mgmt_app.erl index b4cf9091a..2d48ed662 100644 --- a/apps/emqx_management/src/emqx_mgmt_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_app.erl @@ -31,10 +31,12 @@ start(_Type, _Args) -> ok = mria_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity), case emqx_mgmt_auth:init_bootstrap_file() of ok -> + emqx_conf:add_handler([api_key], emqx_mgmt_auth), emqx_mgmt_sup:start_link(); {error, Reason} -> {error, Reason} end. stop(_State) -> + emqx_conf:remove_handler([api_key]), ok. diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index 12a7a6641..ffb41179f 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -20,6 +20,7 @@ %% API -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). +-behaviour(emqx_config_handler). -export([ create/4, @@ -31,6 +32,7 @@ ]). -export([authorize/3]). +-export([post_config_update/5]). %% Internal exports (RPC) -export([ @@ -65,6 +67,17 @@ mnesia(boot) -> {attributes, record_info(fields, ?APP)} ]). +post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) -> + #{bootstrap_file := File} = NewConf, + case init_bootstrap_file(File) of + ok -> + ?SLOG(debug, #{msg => "init_bootstrap_api_keys_from_file_ok", file => File}); + {error, Reason} -> + Msg = "init_bootstrap_api_keys_from_file_failed", + ?SLOG(error, #{msg => Msg, reason => Reason, file => File}) + end, + ok. + -spec init_bootstrap_file() -> ok | {error, _}. init_bootstrap_file() -> File = bootstrap_file(), @@ -230,13 +243,7 @@ generate_api_secret() -> emqx_base62:encode(Random). bootstrap_file() -> - case emqx:get_config([api_key, bootstrap_file], <<>>) of - %% For compatible remove until 5.1.0 - <<>> -> - emqx:get_config([dashboard, bootstrap_users_file], <<>>); - File -> - File - end. + emqx:get_config([api_key, bootstrap_file], <<>>). init_bootstrap_file(<<>>) -> ok; diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl index d1fbf6706..c415223d0 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl @@ -19,6 +19,8 @@ -export([ introduced_in/0, + deprecated_since/0, + get_plugins/0, install_package/2, describe_package/1, @@ -31,6 +33,9 @@ introduced_in() -> "5.0.0". +deprecated_since() -> + "5.1.0". + -spec get_plugins() -> emqx_rpc:multicall_result(). get_plugins() -> rpc:multicall(emqx_mgmt_api_plugins, get_plugins, [], 15000). diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl new file mode 100644 index 000000000..d0354474b --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugins_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_plugins/1, + install_package/3, + describe_package/2, + delete_package/1, + ensure_action/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.1.0". + +-spec get_plugins([node()]) -> emqx_rpc:multicall_result(). +get_plugins(Nodes) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000). + +-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result(). +install_package(Nodes, Filename, Bin) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000). + +-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result(). +describe_package(Nodes, Name) -> + rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000). + +-spec delete_package(binary() | string()) -> ok | {error, any()}. +delete_package(Name) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000). + +-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}. +ensure_action(Name, Action) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000). diff --git a/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl index 1a396d795..2a78f76fc 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl @@ -29,19 +29,18 @@ groups() -> ]. init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite([emqx_conf]), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]), Config. end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite([emqx_conf]). + emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_management]). t_bootstrap_file(_) -> TestPath = <<"/api/v5/status">>, Bin = <<"test-1:secret-1\ntest-2:secret-2">>, File = "./bootstrap_api_keys.txt", ok = file:write_file(File, Bin), - emqx:update_config([api_key, bootstrap_file], File), - ok = emqx_mgmt_auth:init_bootstrap_file(), + update_file(File), ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-1">>)), ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-2">>)), ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-1">>)), @@ -49,39 +48,33 @@ t_bootstrap_file(_) -> %% relaunch to check if the table is changed. Bin1 = <<"test-1:new-secret-1\ntest-2:new-secret-2">>, ok = file:write_file(File, Bin1), - ok = emqx_mgmt_auth:init_bootstrap_file(), + update_file(File), ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-1">>)), ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-2">>)), ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"new-secret-1">>)), ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"new-secret-2">>)), - %% Compatibility - Bin2 = <<"test-3:new-secret-3\ntest-4:new-secret-4">>, - ok = file:write_file(File, Bin2), - emqx:update_config([api_key, bootstrap_file], <<>>), - emqx:update_config([dashboard, bootstrap_users_file], File), - ok = emqx_mgmt_auth:init_bootstrap_file(), - ?assertMatch(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"new-secret-1">>)), - ?assertMatch(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"new-secret-2">>)), - ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-3">>, <<"new-secret-3">>)), - ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-4">>, <<"new-secret-4">>)), - - %% not found - NotFoundFile = "./bootstrap_apps_not_exist.txt", - emqx:update_config([api_key, bootstrap_file], NotFoundFile), - ?assertMatch({error, "No such file or directory"}, emqx_mgmt_auth:init_bootstrap_file()), + %% not error when bootstrap_file is empty + update_file(<<>>), + update_file("./bootstrap_apps_not_exist.txt"), + ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-1">>)), + ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-2">>)), + ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"new-secret-1">>)), + ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"new-secret-2">>)), %% bad format BadBin = <<"test-1:secret-11\ntest-2 secret-12">>, ok = file:write_file(File, BadBin), - emqx:update_config([api_key, bootstrap_file], File), + update_file(File), ?assertMatch({error, #{reason := "invalid_format"}}, emqx_mgmt_auth:init_bootstrap_file()), ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-11">>)), ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-12">>)), - emqx:update_config([api_key, bootstrap_file], <<>>), - emqx:update_config([dashboard, bootstrap_users_file], <<>>), + update_file(<<>>), ok. +update_file(File) -> + ?assertMatch({ok, _}, emqx:update_config([<<"api_key">>], #{<<"bootstrap_file">> => File})). + t_create(_Config) -> Name = <<"EMQX-API-KEY-1">>, {ok, Create} = create_app(Name), diff --git a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl index f49663682..33292e54e 100644 --- a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl @@ -24,20 +24,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - mria:start(), - ok = emqx_common_test_helpers:start_apps([emqx_management]), - emqx_common_test_helpers:start_apps([] ++ [emqx_dashboard], fun set_special_configs/1), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]), Config. end_per_suite(_) -> - emqx_common_test_helpers:stop_apps([emqx_management] ++ [emqx_dashboard]), - emqx_config:delete_override_conf_files(), - ok. - -set_special_configs(emqx_dashboard) -> - emqx_dashboard_api_test_helpers:set_default_config(); -set_special_configs(_App) -> - ok. + emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). t_status(_Config) -> emqx_ctl:run_command([]), diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 3beda05a4..10dbe7990 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index c5d8ecc77..ae2128a7e 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -222,7 +222,12 @@ on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) -> sql => NameOrSQL, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; Result -> ?tp( oracle_connector_query_return, diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 14d6d06fc..8d168ec8b 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -47,7 +47,8 @@ groups() -> [ {copy_plugin, [sequence], [ group_t_copy_plugin_to_a_new_node, - group_t_copy_plugin_to_a_new_node_single_node + group_t_copy_plugin_to_a_new_node_single_node, + group_t_cluster_leave ]}, {create_tar_copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]} ]. @@ -676,6 +677,86 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) -> ), ok. +group_t_cluster_leave({init, Config}) -> + PrivDataDir = ?config(priv_dir, Config), + ToInstallDir = filename:join(PrivDataDir, "plugins_copy_to"), + file:del_dir_r(ToInstallDir), + ok = filelib:ensure_path(ToInstallDir), + #{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir), + NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), + Cluster = + emqx_common_test_helpers:emqx_cluster( + [core, core], + #{ + apps => [emqx_conf, emqx_plugins], + env => [ + {emqx, init_config_load_done, false}, + {emqx, boot_modules, []} + ], + env_handler => fun + (emqx_plugins) -> + ok = emqx_plugins:put_config(install_dir, ToInstallDir), + %% this is to simulate an user setting the state + %% via environment variables before starting the node + ok = emqx_plugins:put_config( + states, + [#{name_vsn => NameVsn, enable => true}] + ), + ok; + (_) -> + ok + end, + priv_data_dir => PrivDataDir, + schema_mod => emqx_conf_schema, + peer_mod => slave, + load_schema => true + } + ), + Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster], + [ + {to_install_dir, ToInstallDir}, + {cluster, Cluster}, + {nodes, Nodes}, + {name_vsn, NameVsn}, + {plugin_name, PluginName} + | Config + ]; +group_t_cluster_leave({'end', Config}) -> + Nodes = proplists:get_value(nodes, Config), + [ok = emqx_common_test_helpers:stop_slave(N) || N <- Nodes], + ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)), + ok; +group_t_cluster_leave(Config) -> + [N1, N2] = ?config(nodes, Config), + NameVsn = proplists:get_value(name_vsn, Config), + ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]), + ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]), + ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]), + Params = unused, + %% 2 nodes running + ?assertMatch( + {200, [#{running_status := [#{status := running}, #{status := running}]}]}, + erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params]) + ), + ?assertMatch( + {200, [#{running_status := [#{status := running}, #{status := running}]}]}, + erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params]) + ), + + %% Now, one node leaves the cluster. + ok = erpc:call(N2, ekka, leave, []), + + %% Each node will no longer ask the plugin status to the other. + ?assertMatch( + {200, [#{running_status := [#{node := N1, status := running}]}]}, + erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params]) + ), + ?assertMatch( + {200, [#{running_status := [#{node := N2, status := running}]}]}, + erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params]) + ), + ok. + make_tar(Cwd, NameWithVsn) -> make_tar(Cwd, NameWithVsn, NameWithVsn). diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 51ad6ab85..e3fef7e62 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -121,3 +121,6 @@ false end) ). + +-define(KEY_PATH, [rule_engine, rules]). +-define(RULE_PATH(RULE), [rule_engine, rules, RULE]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index d8a798eb8..24ad2c5f0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -26,8 +26,7 @@ -export([start_link/0]). -export([ - post_config_update/5, - config_key_path/0 + post_config_update/5 ]). %% Rule Management @@ -102,9 +101,6 @@ -type action_name() :: binary() | #{function := binary()}. -config_key_path() -> - [rule_engine, rules]. - -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. start_link() -> gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []). @@ -112,7 +108,13 @@ start_link() -> %%------------------------------------------------------------------------------ %% The config handler for emqx_rule_engine %%------------------------------------------------------------------------------ -post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> +post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) -> + create_rule(NewRule#{id => bin(RuleId)}); +post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) -> + delete_rule(bin(RuleId)); +post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) -> + update_rule(NewRule#{id => bin(RuleId)}); +post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) -> #{added := Added, removed := Removed, changed := Updated} = emqx_utils_maps:diff_maps(NewRules, OldRules), try @@ -134,7 +136,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> end, Added ), - {ok, get_rules()} + ok catch throw:#{kind := _} = Error -> {error, Error} @@ -247,11 +249,11 @@ ensure_action_removed(RuleId, ActionName) -> case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of not_found -> ok; - #{<<"actions">> := Acts} -> + #{<<"actions">> := Acts} = Conf -> NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)], {ok, _} = emqx_conf:update( - emqx_rule_engine:config_key_path() ++ [RuleId, actions], - NewActs, + ?RULE_PATH(RuleId), + Conf#{<<"actions">> => NewActs}, #{override_to => cluster} ), ok @@ -372,7 +374,7 @@ init([]) -> {write_concurrency, true}, {read_concurrency, true} ]), - ok = emqx_config_handler:add_handler( + ok = emqx_conf:add_handler( [rule_engine, jq_implementation_module], emqx_rule_engine_schema ), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index b53763d47..54a739b6d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -351,14 +351,13 @@ param_path_id() -> {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}}; Id -> Params = filter_out_request_body(add_metadata(Params0)), - ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}}; not_found -> + ConfPath = ?RULE_PATH(Id), case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of - {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> - [Rule] = get_one_rule(AllRules, Id), + {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {201, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ @@ -396,10 +395,9 @@ param_path_id() -> end; '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> Params = filter_out_request_body(Params0), - ConfPath = emqx_rule_engine:config_key_path() ++ [Id], + ConfPath = ?RULE_PATH(Id), case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of - {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> - [Rule] = get_one_rule(AllRules, Id), + {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {200, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ @@ -412,7 +410,7 @@ param_path_id() -> '/rules/:id'(delete, #{bindings := #{id := Id}}) -> case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> - ConfPath = emqx_rule_engine:config_key_path() ++ [Id], + ConfPath = ?RULE_PATH(Id), case emqx_conf:remove(ConfPath, #{override_to => cluster}) of {ok, _} -> {204}; @@ -655,9 +653,6 @@ aggregate_metrics(AllMetrics) -> AllMetrics ). -get_one_rule(AllRules, Id) -> - [R || R = #{id := Id0} <- AllRules, Id0 == Id]. - add_metadata(Params) -> Params#{ <<"metadata">> => #{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 14d2b1f95..d8b031bdd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -29,11 +29,15 @@ start(_Type, _Args) -> ok = emqx_rule_events:reload(), SupRet = emqx_rule_engine_sup:start_link(), ok = emqx_rule_engine:load_rules(), - emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine), + RulePath = [RuleEngine | _] = ?KEY_PATH, + emqx_conf:add_handler(RulePath ++ ['?'], emqx_rule_engine), + emqx_conf:add_handler([RuleEngine], emqx_rule_engine), emqx_rule_engine_cli:load(), SupRet. stop(_State) -> emqx_rule_engine_cli:unload(), - emqx_conf:remove_handler(emqx_rule_engine:config_key_path()), + RulePath = [RuleEngine | _] = ?KEY_PATH, + emqx_conf:remove_handler(RulePath ++ ['?']), + emqx_conf:remove_handler([RuleEngine]), ok = emqx_rule_events:unload(). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index ca7832717..2ec32173f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -472,19 +472,17 @@ t_ensure_action_removed(_) -> Id = <<"t_ensure_action_removed">>, GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>, emqx:update_config( - [rule_engine, rules], + [rule_engine, rules, Id], #{ - Id => #{ - <<"actions">> => [ - #{<<"function">> => GetSelectedData}, - #{<<"function">> => <<"console">>}, - #{<<"function">> => <<"republish">>}, - <<"mysql:foo">>, - <<"mqtt:bar">> - ], - <<"description">> => <<"">>, - <<"sql">> => <<"SELECT * FROM \"t/#\"">> - } + <<"actions">> => [ + #{<<"function">> => GetSelectedData}, + #{<<"function">> => <<"console">>}, + #{<<"function">> => <<"republish">>}, + <<"mysql:foo">>, + <<"mqtt:bar">> + ], + <<"description">> => <<"">>, + <<"sql">> => <<"SELECT * FROM \"t/#\"">> } ), ?assertMatch( diff --git a/changes/ce/fix-10923.en.md b/changes/ce/fix-10923.en.md new file mode 100644 index 000000000..accd547fc --- /dev/null +++ b/changes/ce/fix-10923.en.md @@ -0,0 +1,4 @@ +Fix a race-condition in channel info registration. + +Prior to this fix, when system is under heavy load, it might happen that a client is disconnected (or has its session expired) but still can be found in the clients page in dashboard. +One of the possible reasons is a race condition fixed in this PR: the connection is killed in the middle of channel data registration. diff --git a/changes/ee/feat-10892.en.md b/changes/ee/feat-10892.en.md new file mode 100644 index 000000000..8802f3b36 --- /dev/null +++ b/changes/ee/feat-10892.en.md @@ -0,0 +1 @@ +Require that SID or Service Name is set on Oracle Database bridge creation. diff --git a/changes/ee/fix-10913.en.md b/changes/ee/fix-10913.en.md new file mode 100644 index 000000000..210a6f645 --- /dev/null +++ b/changes/ee/fix-10913.en.md @@ -0,0 +1 @@ +Fixed an issue where a node that left the cluster would still report plugin status from other nodes. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index be42a913a..42e10179e 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -186,6 +186,7 @@ fields(bridges) -> hoconsc:map(name, ref(emqx_bridge_oracle, "config")), #{ desc => <<"Oracle Bridge Config">>, + validator => fun emqx_bridge_oracle:config_validator/1, required => false } )}, diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src index b74e9fa7d..b71ed01e5 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, [emqx_ee_schema_registry_sup]}, {mod, {emqx_ee_schema_registry_app, []}}, {applications, [ diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 5ffcb2ba6..1390f9bfe 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -104,38 +104,32 @@ list_schemas() -> %%------------------------------------------------------------------------------------------------- %% `emqx_config_handler' API %%------------------------------------------------------------------------------------------------- - +%% remove post_config_update( - [?CONF_KEY_ROOT, schemas] = _Path, - _Cmd, - NewConf = #{schemas := NewSchemas}, - OldConf = #{}, + [?CONF_KEY_ROOT, schemas, Name], + '$remove', + _NewSchemas, + _OldSchemas, _AppEnvs ) -> - OldSchemas = maps:get(schemas, OldConf, #{}), - #{ - added := Added, - changed := Changed0, - removed := Removed - } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas), - Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0), - RemovedNames = maps:keys(Removed), - case RemovedNames of - [] -> - ok; - _ -> - async_delete_serdes(RemovedNames) - end, - SchemasToBuild = maps:to_list(maps:merge(Changed, Added)), - case build_serdes(SchemasToBuild) of + async_delete_serdes([Name]), + ok; +%% add or update +post_config_update( + [?CONF_KEY_ROOT, schemas, NewName], + _Cmd, + NewSchemas, + %% undefined or OldSchemas + _, + _AppEnvs +) -> + case build_serdes([{NewName, NewSchemas}]) of ok -> - {ok, NewConf}; + {ok, #{NewName => NewSchemas}}; {error, Reason, SerdesToRollback} -> lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), {error, Reason} - end; -post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) -> - {ok, NewConf}. + end. %%------------------------------------------------------------------------------------------------- %% `gen_server' API diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl index e82ed95bd..195a54c15 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl @@ -11,9 +11,9 @@ start(_StartType, _StartArgs) -> ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), - emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry), + emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry), emqx_ee_schema_registry_sup:start_link(). stop(_State) -> - emqx_conf:remove_handler(?CONF_KEY_PATH), + emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']), ok. diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 99c4fa155..71f7c7d8b 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -607,21 +607,25 @@ t_fail_rollback(Config) -> SerdeType = ?config(serde_type, Config), OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)), BrokenSchema = OkSchema#{<<"source">> := <<"{}">>}, - %% hopefully, for this small map, the key order is used. - Serdes = #{ - <<"a">> => OkSchema, - <<"z">> => BrokenSchema - }, + ?assertMatch( - {error, _}, + {ok, _}, emqx_conf:update( - [?CONF_KEY_ROOT, schemas], - Serdes, + [?CONF_KEY_ROOT, schemas, <<"a">>], + OkSchema, #{} ) ), - %% no serdes should be in the table - ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)), + ?assertMatch( + {error, _}, + emqx_conf:update( + [?CONF_KEY_ROOT, schemas, <<"z">>], + BrokenSchema, + #{} + ) + ), + ?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)), + %% no z serdes should be in the table ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)), ok. diff --git a/scripts/spellcheck/spellcheck.sh b/scripts/spellcheck/spellcheck.sh index 57ea21c55..ac32359d8 100755 --- a/scripts/spellcheck/spellcheck.sh +++ b/scripts/spellcheck/spellcheck.sh @@ -12,6 +12,11 @@ else SCHEMA="$(realpath "$1")" fi +if ! [ -f "$SCHEMA" ]; then + echo "Schema file $SCHEMA does not exist; did you forget to run 'make emqx{,-enterprise}' ?" + exit 1 +fi + set +e docker run --rm -i --name spellcheck \ -v "${PROJ_ROOT}"/scripts/spellcheck/dicts:/dicts \ diff --git a/scripts/ui-tests/conftest.py b/scripts/ui-tests/conftest.py new file mode 100644 index 000000000..d7b52eaef --- /dev/null +++ b/scripts/ui-tests/conftest.py @@ -0,0 +1,15 @@ +import pytest +from selenium import webdriver + +def pytest_addoption(parser): + parser.addoption("--dashboard-host", action="store", default="localhost", help="Dashboard host") + parser.addoption("--dashboard-port", action="store", default="18083", help="Dashboard port") + +@pytest.fixture +def dashboard_host(request): + return request.config.getoption("--dashboard-host") + +@pytest.fixture +def dashboard_port(request): + return request.config.getoption("--dashboard-port") + diff --git a/scripts/ui-tests/dashboard_test.py b/scripts/ui-tests/dashboard_test.py new file mode 100644 index 000000000..4b93262b1 --- /dev/null +++ b/scripts/ui-tests/dashboard_test.py @@ -0,0 +1,73 @@ +import time +import unittest +import pytest +from urllib.parse import urljoin +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.support.wait import WebDriverWait +from selenium.webdriver.common import utils + +@pytest.fixture +def driver(): + options = Options() + options.add_argument("--headless") + options.add_argument("--no-sandbox") + _driver = webdriver.Chrome(options=options) + yield _driver + _driver.quit() + +@pytest.fixture(autouse=True) +def dashboard_url(dashboard_host, dashboard_port): + count = 0 + while utils.is_connectable(port=dashboard_port, host=dashboard_host) is False: + if count == 30: + raise Exception("Dashboard is not ready") + count += 1 + time.sleep(1) + return f"http://{dashboard_host}:{dashboard_port}" + +@pytest.fixture +def login(driver, dashboard_url): + driver.get(dashboard_url) + assert "EMQX Dashboard" == driver.title + assert f"{dashboard_url}/#/login?to=/dashboard/overview" == driver.current_url + driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='text']").send_keys("admin") + driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='password']").send_keys("admin") + driver.find_element(By.XPATH, "//div[@class='login']//form[1]//button[1]").click() + dest_url = urljoin(dashboard_url, "/#/dashboard/overview") + driver.get(dest_url) + ensure_current_url(driver, dest_url) + +def ensure_current_url(driver, url): + count = 0 + while url != driver.current_url: + if count == 10: + raise Exception(f"Failed to load {url}") + count += 1 + time.sleep(1) + +def wait_title(driver): + return WebDriverWait(driver, 10).until(lambda x: x.find_element("xpath", "//div[@id='app']//h1[@class='header-title']")) + +def test_basic(driver, login, dashboard_url): + driver.get(dashboard_url) + title = wait_title(driver) + assert "Cluster Overview" == title.text + +def test_log(driver, login, dashboard_url): + dest_url = urljoin(dashboard_url, "/#/log") + driver.get(dest_url) + ensure_current_url(driver, dest_url) + title = wait_title(driver) + assert "Logging" == title.text + label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Enable Log Handler']]") + assert driver.find_elements(By.ID, label.get_attribute("for")) + label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Log Level']]") + assert driver.find_elements(By.ID, label.get_attribute("for")) + label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Log Formatter']]") + assert driver.find_elements(By.ID, label.get_attribute("for")) + label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Time Offset']]") + assert driver.find_elements(By.ID, label.get_attribute("for")) + diff --git a/scripts/ui-tests/docker-compose.yaml b/scripts/ui-tests/docker-compose.yaml new file mode 100644 index 000000000..538db5ca8 --- /dev/null +++ b/scripts/ui-tests/docker-compose.yaml @@ -0,0 +1,16 @@ +version: '3.9' + +services: + emqx: + image: ${EMQX_IMAGE_TAG:-emqx/emqx:latest} + environment: + EMQX_DASHBOARD__DEFAULT_PASSWORD: admin + + selenium: + shm_size: '2gb' + image: ghcr.io/emqx/selenium-chrome:latest + volumes: + - ./:/app + depends_on: + - emqx + command: python3 -m pytest --dashboard-host emqx --dashboard-port 18083