Move the 'pipeline' function to 'emqx_misc' module

This commit is contained in:
Feng Lee 2019-08-27 16:47:34 +08:00
parent 0e9647b601
commit 08ab350fec
2 changed files with 31 additions and 29 deletions

View File

@ -50,9 +50,11 @@
-export([gc/3]).
-import(emqx_misc, [maybe_apply/2]).
-import(emqx_access_control, [check_acl/3]).
-import(emqx_misc,
[ run_fold/3
, pipeline/3
, maybe_apply/2
]).
-export_type([channel/0]).
@ -480,10 +482,10 @@ do_unsubscribe(TopicFilter, _SubOpts,
%%--------------------------------------------------------------------
handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) ->
AckProps = emqx_misc:run_fold([fun enrich_caps/2,
fun enrich_server_keepalive/2,
fun enrich_assigned_clientid/2
], #{}, Channel),
AckProps = run_fold([fun enrich_caps/2,
fun enrich_server_keepalive/2,
fun enrich_assigned_clientid/2
], #{}, Channel),
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)),
ok = emqx_hooks:run('client.connected',
@ -1016,7 +1018,8 @@ check_publish(Packet, Channel) ->
%% Check Pub ACL
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
#channel{client = Client}) ->
case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of
case is_acl_enabled(Client) andalso
emqx_access_control:check_acl(Client, publish, Topic) of
false -> ok;
allow -> ok;
deny -> {error, ?RC_NOT_AUTHORIZED}
@ -1057,7 +1060,7 @@ check_subscribe(TopicFilter, SubOpts, Channel) ->
%% Check Sub ACL
check_sub_acl(TopicFilter, #channel{client = Client}) ->
case is_acl_enabled(Client) andalso
check_acl(Client, subscribe, TopicFilter) of
emqx_access_control:check_acl(Client, subscribe, TopicFilter) of
false -> allow;
Result -> Result
end.
@ -1176,26 +1179,6 @@ unmount(Client = #{mountpoint := MountPoint}, TopicOrMsg) ->
emqx_mountpoint:unmount(
emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg).
%%--------------------------------------------------------------------
%% Pipeline
%%--------------------------------------------------------------------
pipeline([], Packet, Channel) ->
{ok, Packet, Channel};
pipeline([Fun|More], Packet, Channel) ->
case Fun(Packet, Channel) of
ok -> pipeline(More, Packet, Channel);
{ok, NChannel} ->
pipeline(More, Packet, NChannel);
{ok, NPacket, NChannel} ->
pipeline(More, NPacket, NChannel);
{error, ReasonCode} ->
{error, ReasonCode, Channel};
{error, ReasonCode, NChannel} ->
{error, ReasonCode, NChannel}
end.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@ -21,6 +21,7 @@
-export([ merge_opts/2
, maybe_apply/2
, run_fold/3
, pipeline/3
, start_timer/2
, start_timer/3
, cancel_timer/1
@ -57,11 +58,29 @@ maybe_apply(_Fun, undefined) ->
maybe_apply(Fun, Arg) when is_function(Fun) ->
erlang:apply(Fun, [Arg]).
%% @doc RunFold
run_fold([], Acc, _State) ->
Acc;
run_fold([Fun|More], Acc, State) ->
run_fold(More, Fun(Acc, State), State).
%% @doc Pipeline
pipeline([], Input, State) ->
{ok, Input, State};
pipeline([Fun|More], Input, State) ->
case Fun(Input, State) of
ok -> pipeline(More, Input, State);
{ok, NState} ->
pipeline(More, Input, NState);
{ok, NInput, NState} ->
pipeline(More, NInput, NState);
{error, Reason} ->
{error, Reason, State};
{error, Reason, NState} ->
{error, Reason, NState}
end.
-spec(start_timer(integer(), term()) -> reference()).
start_timer(Interval, Msg) ->
start_timer(Interval, self(), Msg).