
Implementing a Pipeline and a Clever Use of array_reduce

Author: Internet

The general logic of a pipeline

A simple implementation

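function pipeA($in) {
  // $condition stands in for whatever check decides to stop the pipeline
  if ($condition) {
    return 'break';
  }
  // do_something() stands in for the pipe's real work
  return do_something($in);
}
// function pipeB
// function pipeC
$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'before pipe';
foreach ($pipes as $pipe) {
  $data = $pipe($data);
  // Stop as soon as a pipe returns the 'break' marker
  if ($data === 'break') {
    break;
  }
}
printf("after pipe: %s", $data);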

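The code looks fine apart from one small flaw. Notice that pipeA always has to return a marker that tells the pipeline when to move on to the next pipe and when to stop, so a single return value has to serve as both the flag and the data. We can improve on that: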

function pipeA($in) {
  if ($condition) {
    return ['break', 'return value'];
  }
  return ['continue', 'prepare to next'];
}
// function pipeB
// function pipeC
$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'before pipe';
foreach ($pipes as $pipe) {
  list($flag, $data) = $pipe($data);
  if ($flag == 'break') {
    break;
  }
}
printf("after pipe: %s", $data);

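This time the problem looks solved, but in engineering practice it invites trouble: the pipe functions are written by different developers, who may not follow such a strict return format to the letter. We would like a simpler and more explicit way to write a pipe: return if the condition is met, otherwise move on to the next pipe. Passing in a closure works nicely, for example: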

function pipe($data, $next) {
  if ($condition) {
    return $result;
  } else {
    // process by next pipe
    return $next($data);
  }
}
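
To make the $next contract concrete, here is a minimal runnable sketch of a single pipe called by hand. The pipe name pipe_reject_empty and the stand-in $next closure are illustrative assumptions, not part of the original code.

```php
<?php
// Hypothetical pipe: stops early on blank input, otherwise hands the
// trimmed value to whatever comes next.
function pipe_reject_empty(string $data, callable $next): string
{
    if (trim($data) === '') {
        return 'rejected: empty input';   // short-circuit: $next is never called
    }
    return $next(trim($data));            // delegate to the rest of the pipeline
}

// Stand-in for "everything after this pipe".
$next = function (string $data): string {
    return strtoupper($data);
};

echo pipe_reject_empty('  hello ', $next), PHP_EOL; // HELLO
echo pipe_reject_empty('   ', $next), PHP_EOL;      // rejected: empty input
```

The pipe no longer needs to return a flag: calling or not calling $next is the whole protocol.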

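But then how should the code that runs the pipeline be written? Notice that the $next closure actually contains, nested inside it, every pipe function that comes after the current one: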

pipeA()
else $nextA()

$nextA = function(){
  pipeB()
  else $nextB()
};
$nextB = function() {
  pipeC()
  else $nextC()
}
...
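
The nesting sketched above can be written out by hand for three pipes. This is only an illustration: stepA, stepB, stepC and $done are hypothetical, deterministic stand-ins so the call order is easy to follow.

```php
<?php
// Deterministic stand-ins for the pipes: each appends a tag and delegates.
function stepA($in, $next) { return $next($in . ' > A'); }
function stepB($in, $next) { return $next($in . ' > B'); }
function stepC($in, $next) { return $next($in . ' > C'); }

// Innermost closure: what runs once every pipe has delegated.
$done = function ($data) { return $data . ' > done'; };

// Build from the tail: each closure captures the one created before it.
$nextB = function ($data) use ($done)  { return stepC($data, $done); };
$nextA = function ($data) use ($nextB) { return stepB($data, $nextB); };

echo stepA('start', $nextA), PHP_EOL; // start > A > B > C > done
```

Because $nextA must capture $nextB, and $nextB must capture $done, the closures have to be created in reverse order even though they run front to back.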

In other words, we need to define these closures by nesting from the last pipe backwards, and only then run the resulting closure:


function pipeA($in, $next) {
  if (!random_int(0, 3)) {
    return $in . ' break in pipe A';
  }
  return $next($in.' through A');
}
function pipeB($in, $next) {
  if (!random_int(0, 2)) {
    return $in . ' break in pipe B';
  }
  return $next($in.' through B');
}
function pipeC($in, $next) {
  if (!random_int(0, 1)) {
    return $in . ' break in pipe C';
  }
  return $next($in.' through C');
}

$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'Dapianzi';

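// Build a closure
function create_next($next, $curr) {
  // Return a closure that receives the data to be processed
  return function($data) use ($next, $curr) {
    // Pass the data and the next pipe into the current pipe
    return $curr($data, $next);
  };
}
// The initial $next closure.
// It is actually the last one to run.
$next = function($data) {
  return $data . ' pipe done';
};

// Define the $next closures starting from the tail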
$pipes = array_reverse($pipes);
foreach ($pipes as $pipe) {
  $next = create_next($next, $pipe);
}

$data = $next($data);

printf("after pipe: %s", $data);
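
Since the pipes above break at random, the output changes from run to run. The following sketch uses the same loop-based construction with deterministic pipes, so both the full pass and the short-circuit can be checked. The helpers make_pipe and build_pipeline, and the stop words, are assumptions introduced for illustration.

```php
<?php
// Hypothetical pipe factory: the pipe stops early when the incoming data
// contains $stopWord, otherwise it tags the data and delegates.
function make_pipe(string $name, string $stopWord): callable
{
    return function (string $in, callable $next) use ($name, $stopWord): string {
        if (strpos($in, $stopWord) !== false) {
            return $in . ' break in ' . $name;
        }
        return $next($in . ' through ' . $name);
    };
}

// Same idea as the foreach loop above: wrap from the tail forwards.
function build_pipeline(array $pipes): callable
{
    $next = function (string $data): string {
        return $data . ' pipe done';
    };
    foreach (array_reverse($pipes) as $pipe) {
        // The closure captures the previous $next by value before reassignment.
        $next = function (string $data) use ($pipe, $next): string {
            return $pipe($data, $next);
        };
    }
    return $next;
}

$run = build_pipeline([
    make_pipe('A', 'stopA'),
    make_pipe('B', 'stopB'),
    make_pipe('C', 'stopC'),
]);

echo $run('x'), PHP_EOL;       // x through A through B through C pipe done
echo $run('x stopB'), PHP_EOL; // x stopB through A break in B
```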

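We start with an initial $next, visit each element of the $pipes array in turn, and end up with a single $next. If that rings a bell, you will recognize it right away: isn't this exactly array_reduce?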

// function pipeA, pipeB, pipeC

$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'Dapianzi';

$pipeline = array_reduce(array_reverse($pipes), function($next, $curr) {
    return function($data) use ($next, $curr) {
        return $curr($data, $next);
    };
}, function($data) {
    return $data . ' pipe done';
});

$data = $pipeline($data);

printf("after pipe: %s", $data);

With that, a simple Pipeline is complete.
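
As a possible next step, the array_reduce version can be wrapped in a reusable function. The sketch below is one way to do it; run_pipeline and the numeric pipes are hypothetical names chosen to show that the pipes run in the order they are listed, which is what array_reverse guarantees.

```php
<?php
// Hypothetical helper: folds $pipes into one closure and runs it on $data.
function run_pipeline(array $pipes, $data, callable $destination)
{
    $pipeline = array_reduce(
        array_reverse($pipes),
        function (callable $next, callable $curr): callable {
            return function ($data) use ($next, $curr) {
                return $curr($data, $next);
            };
        },
        $destination // initial carry: runs last, after every pipe has delegated
    );

    return $pipeline($data);
}

$double   = function (int $n, callable $next) { return $next($n * 2); };
$addOne   = function (int $n, callable $next) { return $next($n + 1); };
$cap      = function (int $n, callable $next) { return $n > 100 ? 100 : $next($n); };
$identity = function ($n) { return $n; };

echo run_pipeline([$double, $addOne], 5, $identity), PHP_EOL; // 11
echo run_pipeline([$addOne, $double], 5, $identity), PHP_EOL; // 12
echo run_pipeline([$cap, $double], 500, $identity), PHP_EOL;  // 100
```

Swapping the two pipes changes the result from 11 to 12, which confirms that the first pipe in the array is the first one to run. In the last call $cap returns without calling $next, so $double never executes.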

Comparing with the Pipeline source code

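Finally, here is the source of Laravel's Pipeline implementation. Calling it similar to the code above would be an understatement: it is practically identical (I'm sure of it).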

<?php

namespace Illuminate\Pipeline;

use Closure;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Pipeline\Pipeline as PipelineContract;
use RuntimeException;
use Throwable;

class Pipeline implements PipelineContract
{
    /**
     * The container implementation.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * The object being passed through the pipeline.
     *
     * @var mixed
     */
    protected $passable;

    /**
     * The array of class pipes.
     *
     * @var array
     */
    protected $pipes = [];

    /**
     * The method to call on each pipe.
     *
     * @var string
     */
    protected $method = 'handle';

    /**
     * Create a new class instance.
     *
     * @param  \Illuminate\Contracts\Container\Container|null  $container
     * @return void
     */
    public function __construct(Container $container = null)
    {
        $this->container = $container;
    }

    /**
     * Set the object being sent through the pipeline.
     *
     * @param  mixed  $passable
     * @return $this
     */
    public function send($passable)
    {
        $this->passable = $passable;

        return $this;
    }

    /**
     * Set the array of pipes.
     *
     * @param  array|mixed  $pipes
     * @return $this
     */
    public function through($pipes)
    {
        $this->pipes = is_array($pipes) ? $pipes : func_get_args();

        return $this;
    }

    /**
     * Set the method to call on the pipes.
     *
     * @param  string  $method
     * @return $this
     */
    public function via($method)
    {
        $this->method = $method;

        return $this;
    }

    /**
     * Run the pipeline with a final destination callback.
     *
     * @param  \Closure  $destination
     * @return mixed
     */
    public function then(Closure $destination)
    {
        $pipeline = array_reduce(
            array_reverse($this->pipes()), $this->carry(), $this->prepareDestination($destination)
        );

        return $pipeline($this->passable);
    }

    /**
     * Run the pipeline and return the result.
     *
     * @return mixed
     */
    public function thenReturn()
    {
        return $this->then(function ($passable) {
            return $passable;
        });
    }

    /**
     * Get the final piece of the Closure onion.
     *
     * @param  \Closure  $destination
     * @return \Closure
     */
    protected function prepareDestination(Closure $destination)
    {
        return function ($passable) use ($destination) {
            try {
                return $destination($passable);
            } catch (Throwable $e) {
                return $this->handleException($passable, $e);
            }
        };
    }

    /**
     * Get a Closure that represents a slice of the application onion.
     *
     * @return \Closure
     */
    protected function carry()
    {
        return function ($stack, $pipe) {
            return function ($passable) use ($stack, $pipe) {
                try {
                    if (is_callable($pipe)) {
                        // If the pipe is a callable, then we will call it directly, but otherwise we
                        // will resolve the pipes out of the dependency container and call it with
                        // the appropriate method and arguments, returning the results back out.
                        return $pipe($passable, $stack);
                    } elseif (! is_object($pipe)) {
                        [$name, $parameters] = $this->parsePipeString($pipe);

                        // If the pipe is a string we will parse the string and resolve the class out
                        // of the dependency injection container. We can then build a callable and
                        // execute the pipe function giving in the parameters that are required.
                        $pipe = $this->getContainer()->make($name);

                        $parameters = array_merge([$passable, $stack], $parameters);
                    } else {
                        // If the pipe is already an object we'll just make a callable and pass it to
                        // the pipe as-is. There is no need to do any extra parsing and formatting
                        // since the object we're given was already a fully instantiated object.
                        $parameters = [$passable, $stack];
                    }

                    $carry = method_exists($pipe, $this->method)
                                    ? $pipe->{$this->method}(...$parameters)
                                    : $pipe(...$parameters);

                    return $this->handleCarry($carry);
                } catch (Throwable $e) {
                    return $this->handleException($passable, $e);
                }
            };
        };
    }

    /**
     * Parse full pipe string to get name and parameters.
     *
     * @param  string  $pipe
     * @return array
     */
    protected function parsePipeString($pipe)
    {
        [$name, $parameters] = array_pad(explode(':', $pipe, 2), 2, []);

        if (is_string($parameters)) {
            $parameters = explode(',', $parameters);
        }

        return [$name, $parameters];
    }

    /**
     * Get the array of configured pipes.
     *
     * @return array
     */
    protected function pipes()
    {
        return $this->pipes;
    }

    /**
     * Get the container instance.
     *
     * @return \Illuminate\Contracts\Container\Container
     *
     * @throws \RuntimeException
     */
    protected function getContainer()
    {
        if (! $this->container) {
            throw new RuntimeException('A container instance has not been passed to the Pipeline.');
        }

        return $this->container;
    }

    /**
     * Set the container instance.
     *
     * @param  \Illuminate\Contracts\Container\Container  $container
     * @return $this
     */
    public function setContainer(Container $container)
    {
        $this->container = $container;

        return $this;
    }

    /**
     * Handle the value returned from each pipe before passing it to the next.
     *
     * @param  mixed  $carry
     * @return mixed
     */
    protected function handleCarry($carry)
    {
        return $carry;
    }

    /**
     * Handle the given exception.
     *
     * @param  mixed  $passable
     * @param  \Throwable  $e
     * @return mixed
     *
     * @throws \Throwable
     */
    protected function handleException($passable, Throwable $e)
    {
        throw $e;
    }
}
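
To see the fluent send / through / then style without installing the framework, here is a stripped-down stand-in. MiniPipeline is a hypothetical class written for this illustration, not Laravel's class: it accepts only callable pipes and leaves out the container, via(), and the exception hooks.

```php
<?php
// Simplified stand-in for illustration only; not Laravel's Pipeline.
final class MiniPipeline
{
    private $passable;
    private $pipes = [];

    public function send($passable): self
    {
        $this->passable = $passable;
        return $this;
    }

    public function through(array $pipes): self
    {
        $this->pipes = $pipes;
        return $this;
    }

    public function then(callable $destination)
    {
        // Same fold as before: $destination is the core of the onion.
        $pipeline = array_reduce(
            array_reverse($this->pipes),
            function ($stack, $pipe) {
                return function ($passable) use ($stack, $pipe) {
                    return $pipe($passable, $stack);
                };
            },
            $destination
        );

        return $pipeline($this->passable);
    }
}

$result = (new MiniPipeline())
    ->send(' Hello Pipeline ')
    ->through([
        function ($text, $next) { return $next(trim($text)); },
        function ($text, $next) { return $next(strtolower($text)); },
    ])
    ->then(function ($text) { return str_replace(' ', '-', $text); });

echo $result, PHP_EOL; // hello-pipeline
```

The then() body has the same shape as the then() method in the quoted source: reverse the pipes, reduce them with a carry function, and seed the reduction with the destination closure.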

Source: https://www.cnblogs.com/dapianzi/p/16399006.html