Merge pull request #7853 from HJianBo/fix-gw-bad-ssl
Fix gateway listener API return 500
This commit is contained in:
commit
39a26f8225
|
@ -489,7 +489,7 @@ zone(Opts) ->
|
||||||
maps:get(zone, Opts, undefined).
|
maps:get(zone, Opts, undefined).
|
||||||
|
|
||||||
limiter(Opts) ->
|
limiter(Opts) ->
|
||||||
maps:get(limiter, Opts).
|
maps:get(limiter, Opts, #{}).
|
||||||
|
|
||||||
ssl_opts(Opts) ->
|
ssl_opts(Opts) ->
|
||||||
maps:to_list(
|
maps:to_list(
|
||||||
|
|
|
@ -368,6 +368,7 @@ do_ensure_ssl_file(Dir, Key, SSL, MaybePem, DryRun) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
is_valid_string(Empty) when Empty == <<>>; Empty == "" -> false;
|
||||||
is_valid_string(String) when is_list(String) ->
|
is_valid_string(String) when is_list(String) ->
|
||||||
io_lib:printable_unicode_list(String);
|
io_lib:printable_unicode_list(String);
|
||||||
is_valid_string(Binary) when is_binary(Binary) ->
|
is_valid_string(Binary) when is_binary(Binary) ->
|
||||||
|
|
|
@ -104,6 +104,17 @@ ssl_files_failure_test_() ->
|
||||||
)
|
)
|
||||||
end},
|
end},
|
||||||
{"bad_pem_string", fun() ->
|
{"bad_pem_string", fun() ->
|
||||||
|
%% empty string
|
||||||
|
?assertMatch(
|
||||||
|
{error, #{
|
||||||
|
reason := invalid_file_path_or_pem_string, which_options := [<<"keyfile">>]
|
||||||
|
}},
|
||||||
|
emqx_tls_lib:ensure_ssl_files("/tmp", #{
|
||||||
|
<<"keyfile">> => <<>>,
|
||||||
|
<<"certfile">> => bin(test_key()),
|
||||||
|
<<"cacertfile">> => bin(test_key())
|
||||||
|
})
|
||||||
|
),
|
||||||
%% not valid unicode
|
%% not valid unicode
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{error, #{
|
{error, #{
|
||||||
|
|
|
@ -245,6 +245,13 @@ emqx_gateway_api_clients {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mountpoint {
|
||||||
|
desc {
|
||||||
|
en: """Topic mountpoint"""
|
||||||
|
zh: """主题固定前缀"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
proto_name {
|
proto_name {
|
||||||
desc {
|
desc {
|
||||||
en: """Client protocol name"""
|
en: """Client protocol name"""
|
||||||
|
|
|
@ -579,7 +579,7 @@ ensure_connected(
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
|
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
|
||||||
_ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]),
|
_ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, #{}]),
|
||||||
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
|
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
|
||||||
Channel#channel{conninfo = NConnInfo, conn_state = connected}.
|
Channel#channel{conninfo = NConnInfo, conn_state = connected}.
|
||||||
|
|
||||||
|
|
|
@ -174,9 +174,7 @@ clients_insta(delete, #{
|
||||||
{204}
|
{204}
|
||||||
end).
|
end).
|
||||||
|
|
||||||
%% FIXME:
|
%% List the established subscriptions with mountpoint
|
||||||
%% List the subscription without mountpoint, but has SubOpts,
|
|
||||||
%% for example, share group ...
|
|
||||||
subscriptions(get, #{
|
subscriptions(get, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
|
@ -189,7 +187,7 @@ subscriptions(get, #{
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
return_http_error(404, "client process not found");
|
return_http_error(404, "client process not found");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
return_http_error(500, Reason);
|
return_http_error(400, Reason);
|
||||||
{ok, Subs} ->
|
{ok, Subs} ->
|
||||||
{200, Subs}
|
{200, Subs}
|
||||||
end
|
end
|
||||||
|
@ -216,7 +214,7 @@ subscriptions(post, #{
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
return_http_error(404, "client process not found");
|
return_http_error(404, "client process not found");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
return_http_error(500, Reason);
|
return_http_error(400, Reason);
|
||||||
{ok, {NTopic, NSubOpts}} ->
|
{ok, {NTopic, NSubOpts}} ->
|
||||||
{201, maps:merge(NSubOpts, #{topic => NTopic})}
|
{201, maps:merge(NSubOpts, #{topic => NTopic})}
|
||||||
end
|
end
|
||||||
|
@ -368,6 +366,7 @@ format_channel_info({_, Infos, Stats} = R) ->
|
||||||
{node, ClientInfo, Node},
|
{node, ClientInfo, Node},
|
||||||
{clientid, ClientInfo},
|
{clientid, ClientInfo},
|
||||||
{username, ClientInfo},
|
{username, ClientInfo},
|
||||||
|
{mountpoint, ClientInfo},
|
||||||
{proto_name, ConnInfo},
|
{proto_name, ConnInfo},
|
||||||
{proto_ver, ConnInfo},
|
{proto_ver, ConnInfo},
|
||||||
{ip_address, {peername, ConnInfo, fun peer_to_binary_addr/1}},
|
{ip_address, {peername, ConnInfo, fun peer_to_binary_addr/1}},
|
||||||
|
@ -813,6 +812,11 @@ common_client_props() ->
|
||||||
binary(),
|
binary(),
|
||||||
#{desc => ?DESC(username)}
|
#{desc => ?DESC(username)}
|
||||||
)},
|
)},
|
||||||
|
{mountpoint,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{desc => ?DESC(mountpoint)}
|
||||||
|
)},
|
||||||
{proto_name,
|
{proto_name,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
binary(),
|
||||||
|
|
|
@ -514,8 +514,9 @@ with_gateway(GwName0, Fun) ->
|
||||||
),
|
),
|
||||||
return_http_error(404, "Resource not found. path: " ++ Path);
|
return_http_error(404, "Resource not found. path: " ++ Path);
|
||||||
%% Exceptions from emqx_gateway_conf:convert_certs/2,3
|
%% Exceptions from emqx_gateway_conf:convert_certs/2,3
|
||||||
error:{bad_ssl_config, #{which_option := Option}} ->
|
error:{bad_ssl_config, Reason0} ->
|
||||||
return_http_error(400, ["Bad SSL config, option: ", Option]);
|
Reason = emqx_gateway_utils:stringfy(Reason0),
|
||||||
|
return_http_error(400, ["Bad SSL config, reason: ", Reason]);
|
||||||
Class:Reason:Stk ->
|
Class:Reason:Stk ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "uncatched_error",
|
msg => "uncatched_error",
|
||||||
|
|
|
@ -1782,25 +1782,11 @@ message_to_packet(
|
||||||
| {shutdown, Reason :: term(), Reply :: term(), channel()}
|
| {shutdown, Reason :: term(), Reply :: term(), channel()}
|
||||||
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}.
|
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}.
|
||||||
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
|
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
|
||||||
%% XXX: Only support short_topic_name
|
case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of
|
||||||
SubProps = maps:get(sub_props, SubOpts, #{}),
|
{ok, {_, NTopicName, NSubOpts}, NChannel} ->
|
||||||
case maps:get(subtype, SubProps, short_topic_name) of
|
reply({ok, {NTopicName, NSubOpts}}, NChannel);
|
||||||
short_topic_name ->
|
{error, ?SN_RC2_EXCEED_LIMITATION} ->
|
||||||
case byte_size(Topic) of
|
reply({error, exceed_limitation}, Channel)
|
||||||
2 ->
|
|
||||||
case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of
|
|
||||||
{ok, {_, NTopicName, NSubOpts}, NChannel} ->
|
|
||||||
reply({ok, {NTopicName, NSubOpts}}, NChannel);
|
|
||||||
{error, ?SN_RC2_EXCEED_LIMITATION} ->
|
|
||||||
reply({error, exceed_limitation}, Channel)
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
reply({error, bad_topic_name}, Channel)
|
|
||||||
end;
|
|
||||||
predefined_topic_id ->
|
|
||||||
reply({error, only_support_short_name_topic}, Channel);
|
|
||||||
_ ->
|
|
||||||
reply({error, only_support_short_name_topic}, Channel)
|
|
||||||
end;
|
end;
|
||||||
handle_call({unsubscribe, Topic}, _From, Channel) ->
|
handle_call({unsubscribe, Topic}, _From, Channel) ->
|
||||||
TopicFilters = [emqx_topic:parse(Topic)],
|
TopicFilters = [emqx_topic:parse(Topic)],
|
||||||
|
|
|
@ -417,10 +417,10 @@ handle_in(Packet = ?PACKET(?CMD_CONNECT), Channel) ->
|
||||||
emqx_misc:pipeline(
|
emqx_misc:pipeline(
|
||||||
[
|
[
|
||||||
fun enrich_conninfo/2,
|
fun enrich_conninfo/2,
|
||||||
fun run_conn_hooks/2,
|
|
||||||
fun negotiate_version/2,
|
fun negotiate_version/2,
|
||||||
fun enrich_clientinfo/2,
|
fun enrich_clientinfo/2,
|
||||||
fun assign_clientid_to_conninfo/2,
|
fun assign_clientid_to_conninfo/2,
|
||||||
|
fun run_conn_hooks/2,
|
||||||
fun set_log_meta/2,
|
fun set_log_meta/2,
|
||||||
%% TODO: How to implement the banned in the gateway instance?
|
%% TODO: How to implement the banned in the gateway instance?
|
||||||
%, fun check_banned/2
|
%, fun check_banned/2
|
||||||
|
@ -755,7 +755,7 @@ handle_out(
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
%% XXX: connection_accepted is not defined by stomp protocol
|
%% XXX: connection_accepted is not defined by stomp protocol
|
||||||
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]),
|
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]),
|
||||||
Replies = [
|
Replies = [
|
||||||
{outgoing, connected_frame(Headers)},
|
{outgoing, connected_frame(Headers)},
|
||||||
{event, connected}
|
{event, connected}
|
||||||
|
|
|
@ -492,6 +492,7 @@ fields(client) ->
|
||||||
" Maximum number of subscriptions allowed by this client">>
|
" Maximum number of subscriptions allowed by this client">>
|
||||||
})},
|
})},
|
||||||
{username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
|
{username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
|
||||||
|
{mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})},
|
||||||
{will_msg, hoconsc:mk(binary(), #{desc => <<"Client will message">>})},
|
{will_msg, hoconsc:mk(binary(), #{desc => <<"Client will message">>})},
|
||||||
{zone,
|
{zone,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
|
@ -849,7 +850,6 @@ format_channel_info({_, ClientInfo0, ClientStats}) ->
|
||||||
is_superuser,
|
is_superuser,
|
||||||
sockport,
|
sockport,
|
||||||
anonymous,
|
anonymous,
|
||||||
mountpoint,
|
|
||||||
socktype,
|
socktype,
|
||||||
active_n,
|
active_n,
|
||||||
await_rel_timeout,
|
await_rel_timeout,
|
||||||
|
|
Loading…
Reference in New Issue