feat(psk): chunk size can be configured and improve code

This commit is contained in:
zhouzb 2021-09-27 10:53:25 +08:00
parent f325d7c783
commit 84bb486c62
3 changed files with 53 additions and 19 deletions

View File

@ -16,4 +16,7 @@ psk {
## Specifies the separator for PSKIdentity and SharedSecret in the init file. ## Specifies the separator for PSKIdentity and SharedSecret in the init file.
## The default is colon (:) ## The default is colon (:)
## separator = ":" ## separator = ":"
## The size of each chunk used to import to the built-in database from psk file
## chunk_size = 50
} }

View File

@ -40,7 +40,9 @@
]). ]).
-record(psk_entry, {psk_id :: binary(), -record(psk_entry, {psk_id :: binary(),
shared_secret :: binary()}). shared_secret :: binary(),
extra :: term()
}).
-export([mnesia/1]). -export([mnesia/1]).
@ -64,6 +66,7 @@
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = ekka_mnesia:create_table(?TAB, [
{rlog_shard, ?PSK_SHARD}, {rlog_shard, ?PSK_SHARD},
{type, ordered_set},
{disc_copies, [node()]}, {disc_copies, [node()]},
{record_name, psk_entry}, {record_name, psk_entry},
{attributes, record_info(fields, psk_entry)}, {attributes, record_info(fields, psk_entry)},
@ -108,7 +111,7 @@ stop() ->
init(_Opts) -> init(_Opts) ->
_ = case get_config(enable) of _ = case get_config(enable) of
true -> load(); true -> load();
false -> ok false -> ?SLOG(info, #{msg => "emqx_psk_disabled"})
end, end,
_ = case get_config(init_file) of _ = case get_config(init_file) of
undefined -> ok; undefined -> ok;
@ -120,15 +123,15 @@ handle_call({import, SrcFile}, _From, State) ->
{reply, import_psks(SrcFile), State}; {reply, import_psks(SrcFile), State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?SLOG(info, #{msg => "unexpected_call_discarded", req => Req}),
{reply, ignored, State}. {reply, {error, unexecpted}, State}.
handle_cast(Req, State) -> handle_cast(Req, State) ->
?LOG(error, "Unexpected case: ~p", [Req]), ?SLOG(info, #{msg => "unexpected_cast_discarded", req => Req}),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?SLOG(info, #{msg => "unexpected_info_discarded", info => Info}),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
@ -147,27 +150,40 @@ get_config(enable) ->
get_config(init_file) -> get_config(init_file) ->
emqx_config:get([psk, init_file], undefined); emqx_config:get([psk, init_file], undefined);
get_config(separator) -> get_config(separator) ->
emqx_config:get([psk, separator], ?DEFAULT_DELIMITER). emqx_config:get([psk, separator], ?DEFAULT_DELIMITER);
get_config(chunk_size) ->
emqx_config:get([psk, chunk_size]).
import_psks(SrcFile) -> import_psks(SrcFile) ->
case file:open(SrcFile, [read, raw, binary, read_ahead]) of case file:open(SrcFile, [read, raw, binary, read_ahead]) of
{error, Reason} -> {error, Reason} ->
{error, Reason}; {error, Reason};
{ok, Io} -> {ok, Io} ->
case Result = import_psks(Io, get_config(separator)) of try import_psks(Io, get_config(separator), get_config(chunk_size)) of
ok -> ignore; ok -> ok;
{error, Reason} -> {error, Reason} ->
?LOG(warning, "Failed to import psk from ~s due to ~p", [SrcFile, Reason]) ?SLOG(error, #{msg => "failed_to_import_psk_file",
end, file => SrcFile,
_ = file:close(Io), reason => Reason}),
Result {error, Reason}
catch
Class:Reason:Stacktrace ->
?SLOG(error, #{msg => "failed_to_import_psk_file",
file => SrcFile,
class => Class,
reason => Reason,
stacktrace => Stacktrace}),
{error, Reason}
after
_ = file:close(Io)
end
end. end.
import_psks(Io, Delimiter) -> import_psks(Io, Delimiter, ChunkSize) ->
case get_psks(Io, Delimiter, 50) of case get_psks(Io, Delimiter, ChunkSize) of
{ok, Entries} -> {ok, Entries} ->
_ = trans(fun insert_psks/1, Entries), _ = trans(fun insert_psks/1, Entries),
import_psks(Io, Delimiter); import_psks(Io, Delimiter, ChunkSize);
{eof, Entries} -> {eof, Entries} ->
_ = trans(fun insert_psks/1, Entries), _ = trans(fun insert_psks/1, Entries),
ok; ok;

View File

@ -30,16 +30,31 @@ fields("psk") ->
[ {enable, fun enable/1} [ {enable, fun enable/1}
, {init_file, fun init_file/1} , {init_file, fun init_file/1}
, {separator, fun separator/1} , {separator, fun separator/1}
, {chunk_size, fun chunk_size/1}
]. ].
enable(type) -> boolean(); enable(type) -> boolean();
enable(desc) -> <<"Whether to enable tls psk support">>;
enable(default) -> false; enable(default) -> false;
enable(_) -> undefined. enable(_) -> undefined.
init_file(type) -> binary(); init_file(type) -> binary();
init_file(desc) ->
<<"If init_file is specified, emqx will import PSKs from the file ",
"into the built-in database at startup for use by the runtime. ",
"The file has to be structured line-by-line, each line must be in ",
"the format: <PSKIdentity>:<SharedSecret>">>;
init_file(nullable) -> true; init_file(nullable) -> true;
init_file(_) -> undefined. init_file(_) -> undefined.
separator(type) -> binary(); separator(type) -> binary();
separator(desc) ->
<<"The separator between PSKIdentity and SharedSecret in the psk file">>;
separator(default) -> <<":">>; separator(default) -> <<":">>;
separator(_) -> undefined. separator(_) -> undefined.
chunk_size(type) -> integer();
chunk_size(desc) ->
<<"The size of each chunk used to import to the built-in database from psk file">>;
chunk_size(default) -> 50;
chunk_size(_) -> undefined.