diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index c54ae7bd9..ae2cf4fa9 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -183,9 +183,19 @@ jobs: img_suffix="elixir-${{ matrix.arch }}" img_labels="org.opencontainers.image.elixir.version=${{ matrix.elixir }}\n${img_labels}" fi + + if [ ${{ matrix.profile }} = "emqx" ]; then + img_labels="org.opencontainers.image.edition=Opensource\n${img_labels}" + fi + + if [ ${{ matrix.profile }} = "emqx-enterprise" ]; then + img_labels="org.opencontainers.image.edition=Enterprise\n${img_labels}" + fi + if [[ ${{ matrix.os[0] }} =~ "alpine" ]]; then img_suffix="${img_suffix}-alpine" fi + echo "::set-output name=emqx_name::${emqx_name}" echo "::set-output name=img_suffix::${img_suffix}" echo "::set-output name=img_labels::${img_labels}" @@ -299,10 +309,19 @@ jobs: img_suffix="elixir-${{ matrix.arch }}" img_labels="org.opencontainers.image.elixir.version=${{ matrix.elixir }}\n$img_labels" fi + + if [ ${{ matrix.profile }} = "emqx" ]; then + img_labels="org.opencontainers.image.edition=Opensource\n${img_labels}" + fi + + if [ ${{ matrix.profile }} = "emqx-enterprise" ]; then + img_labels="org.opencontainers.image.edition=Enterprise\n${img_labels}" + fi + if [[ ${{ matrix.os[0] }} =~ "alpine" ]]; then img_suffix="${img_suffix}-alpine" fi - echo "::set-output name=img::${img}" + echo "::set-output name=emqx_name::${emqx_name}" echo "::set-output name=img_suffix::${img_suffix}" echo "::set-output name=img_labels::${img_labels}" diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index d636cd44b..13a2fbb30 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -9,6 +9,7 @@ * Fix the extra / prefix when CoAP gateway parsing client topics. [#8658](https://github.com/emqx/emqx/pull/8658) * Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857) * Fix delayed publish inaccurate caused by os time change. [#8926](https://github.com/emqx/emqx/pull/8926) +* Fix that EMQX can't start when the retainer is disabled [#8911](https://github.com/emqx/emqx/pull/8911) ## Enhancements @@ -16,6 +17,7 @@ * Change the `/gateway` API path to plural form. [#8823](https://github.com/emqx/emqx/pull/8823) * Remove `node.etc_dir` from emqx.conf, because it is never used. Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892) +* Added a new API `POST /listeners` for creating listener. [#8876](https://github.com/emqx/emqx/pull/8876) # 5.0.7 diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index 684d60e49..4cc00322f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -149,8 +149,8 @@ authenticate( of ok -> {ok, emqx_authn_utils:is_superuser(Selected)}; - {error, Reason} -> - {error, Reason} + {error, _Reason} -> + ignore end; {error, Reason} -> ?TRACE_AUTHN_PROVIDER(error, "redis_query_failed", #{ diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index 889404c5e..f9ed8bcb1 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -173,6 +173,9 @@ test_user_auth(#{ {create_authenticator, ?GLOBAL, AuthConfig} ), + {ok, [#{provider := emqx_authn_redis, state := State}]} = + emqx_authentication:list_authenticators(?GLOBAL), + Credentials = Credentials0#{ listener => 'tcp:default', protocol => mqtt @@ -180,6 +183,15 @@ test_user_auth(#{ ?assertEqual(Result, emqx_access_control:authenticate(Credentials)), + AuthnResult = + case Result of + {error, _} -> + ignore; + Any -> + Any + end, + ?assertEqual(AuthnResult, emqx_authn_redis:authenticate(Credentials, State)), + emqx_authn_test_lib:delete_authenticators( [authentication], ?GLOBAL @@ -466,7 +478,7 @@ user_seeds() -> <<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt is_superuser">>, <<"password_hash_algorithm">> => #{<<"name">> => <<"bcrypt">>} }, - result => {error, bad_username_or_password} + result => {error, not_authorized} }, #{ diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 34f32d8be..52cbc4775 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -778,7 +778,7 @@ to_bin(List) when is_list(List) -> to_bin(Boolean) when is_boolean(Boolean) -> Boolean; to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); to_bin({Type, Args}) -> - unicode:characters_to_binary(io_lib:format("~p(~p)", [Type, Args])); + unicode:characters_to_binary(io_lib:format("~ts(~p)", [Type, Args])); to_bin(X) -> X. diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 31678e0f6..925c20ff1 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -96,6 +96,16 @@ schema("/listeners") -> listener_id_status_example() ) } + }, + post => #{ + tags => [<<"listeners">>], + desc => <<"Create the specified listener on all nodes.">>, + parameters => [], + 'requestBody' => create_listener_schema(#{bind => true}), + responses => #{ + 200 => listener_schema(#{bind => true}), + 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST']) + } } }; schema("/listeners/:id") -> @@ -129,7 +139,8 @@ schema("/listeners/:id") -> responses => #{ 200 => listener_schema(#{bind => true}), 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST']) - } + }, + deprecated => true }, delete => #{ tags => [<<"listeners">>], @@ -251,10 +262,10 @@ fields(node_status) -> })}, {status, ?HOCON(?R_REF(status))} ]; +fields({Type, with_name}) -> + listener_struct_with_name(Type); fields(Type) -> - Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}), - [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], - Schema. + listener_struct(Type). listener_schema(Opts) -> emqx_dashboard_swagger:schema_with_example( @@ -262,6 +273,17 @@ listener_schema(Opts) -> tcp_schema_example() ). +create_listener_schema(Opts) -> + Schemas = [ + ?R_REF(Mod, {Type, with_name}) + || #{ref := ?R_REF(Mod, Type)} <- listeners_info(Opts) + ], + Example = maps:remove(id, tcp_schema_example()), + emqx_dashboard_swagger:schema_with_example( + ?UNION(Schemas), + Example#{name => <<"demo">>} + ). + listeners_type() -> lists:map( fun({Type, _}) -> list_to_existing_atom(Type) end, @@ -339,7 +361,9 @@ list_listeners(get, #{query_string := Query}) -> {ok, Type} -> listener_type_filter(atom_to_binary(Type), Listeners); error -> Listeners end, - {200, listener_status_by_id(NodeL)}. + {200, listener_status_by_id(NodeL)}; +list_listeners(post, #{body := Body}) -> + create_listener(Body). crud_listeners_by_id(get, #{bindings := #{id := Id0}}) -> Listeners = @@ -382,23 +406,8 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> _ -> {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}} end; -crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) -> - case parse_listener_conf(Body0) of - {Id, Type, Name, Conf} -> - Path = [listeners, Type, Name], - case create(Path, Conf) of - {ok, #{raw_config := _RawConf}} -> - crud_listeners_by_id(get, #{bindings => #{id => Id}}); - {error, already_exist} -> - {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}; - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} - end; - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; - _ -> - {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}} - end; +crud_listeners_by_id(post, #{body := Body}) -> + create_listener(Body); crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), case ensure_remove([listeners, Type, Name]) of @@ -408,13 +417,24 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> parse_listener_conf(Conf0) -> Conf1 = maps:without([<<"running">>, <<"current_connections">>], Conf0), - {IdBin, Conf2} = maps:take(<<"id">>, Conf1), - {TypeBin, Conf3} = maps:take(<<"type">>, Conf2), - {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin), + {TypeBin, Conf2} = maps:take(<<"type">>, Conf1), TypeAtom = binary_to_existing_atom(TypeBin), - case Type =:= TypeAtom of - true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; - false -> {error, listener_type_inconsistent} + + case maps:take(<<"id">>, Conf2) of + {IdBin, Conf3} -> + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin), + case Type =:= TypeAtom of + true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; + false -> {error, listener_type_inconsistent} + end; + _ -> + case maps:take(<<"name">>, Conf2) of + {Name, Conf3} -> + IdBin = <>, + {binary_to_atom(IdBin), TypeAtom, Name, Conf3}; + _ -> + {error, listener_config_invalid} + end end. stop_listeners_by_id(Method, Body = #{bindings := Bindings}) -> @@ -787,3 +807,37 @@ tcp_schema_example() -> type => tcp, zone => default }. + +create_listener(Body) -> + case parse_listener_conf(Body) of + {Id, Type, Name, Conf} -> + Path = [listeners, Type, Name], + case create(Path, Conf) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, already_exist} -> + {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end. + +listener_struct(Type) -> + Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}), + [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], + Schema. + +listener_struct_with_name(Type) -> + BaseSchema = listener_struct(Type), + lists:keyreplace( + id, + 1, + BaseSchema, + {name, + ?HOCON(binary(), #{ + desc => "Listener name", + required => true + })} + ). diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index f72f9b762..10d04db85 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -37,6 +37,35 @@ t_list_listeners(_) -> Res = request(get, Path, [], []), #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(), ?assertEqual(length(Expect), length(Res)), + + %% POST /listeners + ListenerId = <<"tcp:default">>, + NewListenerId = <<"tcp:new">>, + + OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), + NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), + + OriginListener = request(get, OriginPath, [], []), + + %% create with full options + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), + + OriginListener2 = maps:remove(<<"id">>, OriginListener), + NewConf = OriginListener2#{ + <<"name">> => <<"new">>, + <<"bind">> => <<"0.0.0.0:2883">> + }, + Create = request(post, Path, [], NewConf), + ?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))), + Get1 = request(get, NewPath, [], []), + ?assertMatch(Create, Get1), + ?assert(is_running(NewListenerId)), + + %% delete + ?assertEqual([], delete(NewPath)), + ?assertEqual({error, not_found}, is_running(NewListenerId)), + ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), ok. t_tcp_crud_listeners_by_id(_) -> diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index c91ba0eec..888335ab4 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.4"}, + {vsn, "5.0.5"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index f5a3ad403..5d911b5f4 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -348,16 +348,12 @@ enable_retainer( #{context_id := ContextId} = State, #{ msg_clear_interval := ClearInterval, - backend := BackendCfg, - flow_control := FlowControl + backend := BackendCfg } ) -> NewContextId = ContextId + 1, Context = create_resource(new_context(NewContextId), BackendCfg), load(Context), - emqx_limiter_server:add_bucket( - ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined) - ), State#{ enable := true, context_id := NewContextId, @@ -373,7 +369,6 @@ disable_retainer( } = State ) -> unload(), - emqx_limiter_server:del_bucket(?APP, internal), ok = close_resource(Context), State#{ enable := false, diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl index 2285d4551..061679cf7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_app.erl +++ b/apps/emqx_retainer/src/emqx_retainer_app.erl @@ -18,6 +18,8 @@ -behaviour(application). +-include("emqx_retainer.hrl"). + -export([ start/2, stop/1 @@ -25,8 +27,19 @@ start(_Type, _Args) -> ok = emqx_retainer_mnesia_cli:load(), + init_bucket(), emqx_retainer_sup:start_link(). stop(_State) -> ok = emqx_retainer_mnesia_cli:unload(), + delete_bucket(), ok. + +init_bucket() -> + #{flow_control := FlowControl} = emqx:get_config([retainer]), + emqx_limiter_server:add_bucket( + ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined) + ). + +delete_bucket() -> + emqx_limiter_server:del_bucket(?APP, internal). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index d7ddc2424..09e6c4bb4 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -31,14 +31,16 @@ all() -> [ {group, mnesia_without_indices}, {group, mnesia_with_indices}, - {group, mnesia_reindex} + {group, mnesia_reindex}, + {group, test_disable_then_start} ]. groups() -> [ {mnesia_without_indices, [sequence], common_tests()}, {mnesia_with_indices, [sequence], common_tests()}, - {mnesia_reindex, [sequence], [t_reindex]} + {mnesia_reindex, [sequence], [t_reindex]}, + {test_disable_then_start, [sequence], [test_disable_then_start]} ]. common_tests() -> @@ -624,6 +626,19 @@ t_get_basic_usage_info(_Config) -> ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()), ok. +%% test whether the app can start normally after disabling emqx_retainer +%% fix: https://github.com/emqx/emqx/pull/8911 +test_disable_then_start(_Config) -> + emqx_retainer:update_config(#{<<"enable">> => false}), + ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)), + ok = application:stop(emqx_retainer), + timer:sleep(100), + ?assertEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)), + ok = application:ensure_started(emqx_retainer), + timer:sleep(100), + ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)), + ok. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/rebar.config.erl b/rebar.config.erl index c6ab19818..ce1930ed6 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -482,11 +482,16 @@ emqx_etc_overlay_per_edition(ee) -> ]. get_vsn(Profile) -> - %% to make it compatible to Linux and Windows, - %% we must use bash to execute the bash file - %% because "./" will not be recognized as an internal or external command - os_cmd("pkg-vsn.sh " ++ atom_to_list(Profile)). + case os:getenv("PKG_VSN") of + false -> + os_cmd("pkg-vsn.sh " ++ atom_to_list(Profile)); + Vsn -> + Vsn + end. +%% to make it compatible to Linux and Windows, +%% we must use bash to execute the bash file +%% because "./" will not be recognized as an internal or external command os_cmd(Cmd) -> Output = os:cmd("bash " ++ Cmd), re:replace(Output, "\n", "", [{return, list}]).