Laravel中schedule调度的运行机制

2022-04-16 16:13:46
目录
⒈ runInBackground 和 withoutOverlapping⑴ runInBackground() 方法⑵ withoutOverlapping() 方法⒉ mutex 互斥锁⑴ 超时时间⑵ 回调方法

Laravel 的 console 命令行极大的方便了 php 定时任务的设置以及运行。以往通过 crontab 配置定时任务过程相对比较繁琐,并且通过 crontab 设置的定时任务很难防止任务的交叠运行。

所谓任务的交叠运行,是指由于定时任务运行时间较长,在 crontab 设置的运行周期不尽合理的情况下,已经启动的任务还没有结束运行,而系统又启动了新的任务去执行相同的操作。如果程序内部没有处理好数据一致性的问题,那么两个任务同时操作同一份数据,很可能会导致严重的后果。

⒈ runInBackground 和 withoutOverlapping

为了防止任务的交叠运行,Laravel 提供了 withoutOverlapping() 方法;为了能让多任务在后台并行执行,Laravel 提供了 runInBackground() 方法。

⑴ runInBackground() 方法

console 命令行中的每一个命令都代表一个 Event ,AppConsoleKernel 中的 schedule() 方法的作用只是将这些命令行代表的 Event 注册到 IlluminateConsoleSchedulingSchedule 的属性 $events 中。

// namespace IlluminateConsoleSchedulingSchedulepublic function command($command, array $parameters = []){  if (class_exists($command)) {    $command = Container::getInstance()->make($command)->getName();  }  return $this->exec(    Application::formatCommandString($command), $parameters  );}public function exec($command, array $parameters = []){  if (count($parameters)) {    $command .= ' '.$this->compileParameters($parameters);  }  $this->events[] = $event = new Event($this->eventMutex, $command, $this->timezone);  return $event;}

Event 的运行方式有两种:Foreground 和 Background 。二者的区别就在于多个 Event 是否可以并行执行。Event 默认以 Foreground 的方式运行,在这种运行方式下,多个 Event 顺序执行,后面的 Event 需要等到前面的 Event 运行完成之后才能开始执行。

但在实际应用中,我们往往是希望多个 Event 可以并行执行,此时就需要调用 Event 的 runInBackground() 方法将其运行方式设置为 Background 。
Laravel 框架对这两种运行方式的处理区别在于命令行的组装方式和回调方法的调用方式。

// namespace IlluminateConsoleSchedulingEventprotected function runCommandInForeground(Container $container){  $this->callBeforeCallbacks($container);  $this->exitCode = Process::fromShellCommandline($this->buildCommand(), base_path(), null, null, null)->run();  $this->callAfterCallbacks($container);}protected function runCommandInBackground(Container $container){  $this->callBeforeCallbacks($container);  Process::fromShellCommandline($this->buildCommand(), base_path(), null, null, null)->run();}public function buildCommand(){  return (new CommandBuilder)->buildCommand($this);}// namespace IlluminateConsoleSchedulingCommandBuilderpublic function buildCommand(Event $event){  if ($event->runInBackground) {    return $this->buildBackgroundCommand($event);  }  return $this->buildForegroundCommand($event);}protected function buildForegroundCommand(Event $event){  $output = ProcessUtils::escapeArgument($event->output);  return $this->ensureCorrectUser(    $event, $event->command.($event->shouldAppendOutput ? ' >> ' : ' > ').$output.' 2>&1'  );}protected function buildBackgroundCommand(Event $event){  $output = ProcessUtils::escapeArgument($event->output);  $redirect = $event->shouldAppendOutput ? ' >> ' : ' > ';  $finished = Application::formatCommandString('schedule:finish').' "'.$event->mutexName().'"';  if (windows_os()) {    return 'start /b cmd /c "('.$event->command.' & '.$finished.' "%errorlevel%")'.$redirect.$output.' 2>&1"';  }  return $this->ensureCorrectUser($event,    '('.$event->command.$redirect.$output.' 2>&1 ; '.$finished.' "$?") > '    .ProcessUtils::escapeArgument($event->getDefaultOutput()).' 2>&1 &' www.easck.com );}

从代码中可以看出,采用 Background 方式运行的 Event ,其命令行在组装的时候结尾会增加一个 & 符号,其作用是使命令行程序进入后台运行;另外,采用 Foreground 方式运行的 Event ,其回调方法是同步调用的,而采用 Background 方式运行的 Event ,其 after 回调则是通过 schedule:finish 命令行来执行的。

⑵ withoutOverlapping() 方法

在设置 Event 的运行周期时,由于应用场景的不断变化,很难避免某个特定的 Event 在某个时间段内需要运行较长的时间才能完成,甚至在下一个运行周期开始时还没有执行完成。如果不对这种情况进行处理,就会导致多个相同的 Event 同时运行,而如果这些 Event 当中涉及到对数据的操作并且程序中没有处理好幂等问题,很可能会造成严重后果。
为了避免出现上述的问题,Event 中提供了 withoutOverlapping() 方法,该方法通过将 Event 的 withoutOverlapping 属性设置为 TRUE ,在每次要执行 Event 时会检查当前是否存在正在执行的相同的 Event ,如果存在,则不执行新的 Event 任务。

// namespace IlluminateConsoleSchedulingEventpublic function withoutOverlapping($expiresAt = 1440){  $this->withoutOverlapping = true;  $this->expiresAt = $expiresAt;  return $this->then(function () {    $this->mutex->forget($this);  })->skip(function () {    return $this->mutex->exists($this);  });}public function run(Container $container){  if ($this->withoutOverlapping &&    ! $this->mutex->create($this)) {    return;  }  $this->runInBackground        ? $this->runCommandInBackground($container)        : $this->runCommandInForeground($container);}

⒉ mutex 互斥锁

在调用 withoutOverlapping() 方法时,该方法还实现了另外两个功能:一个是设置超时时间,默认为 24 小时;另一个是设置 Event 的回调。

⑴ 超时时间

首先说超时时间,这个超时时间并不是 Event 的超时时间,而是 Event 的属性 mutex 的超时时间。在向 IlluminateConsoleSchedulingSchedule 的属性 $events 中注册 Event 时,会调用 Schedule 中的 exec() 方法,在该方法中会新建 Event 对象,此时会向 Event 的构造方法中传入一个 eventMutex ,这就是 Event 对象中的属性 mutex ,超时时间就是为这个 mutex 设置的。而 Schedule 中的 eventMutex 则是通过实例化 CacheEventMutex 来创建的。

// namespace IlluminateConsoleSchedulingSchedule$this->eventMutex = $container->bound(EventMutex::class)                ? $container->make(EventMutex::class)                : $container->make(CacheEventMutex::class);

设置了 withoutOverlapping 的 Event 在执行之前,首先会尝试获取 mutex 互斥锁,如果无法成功获取到锁,那么 Event 就不会执行。获取互斥锁的操作通过调用 mutex 的 create() 方法完成。
CacheEventMutex 在实例化时需要传入一个 IlluminateContractsCacheFactory 类型的实例,其最终传入的是一个 IlluminateCacheCacheManager 实例。在调用 create() 方法获取互斥锁时,还需要通过调用 store() 方法设置存储引擎。

// namespace IlluminateFoundationConsoleKernelprotected function defineConsoleSchedule(){  $this->app->singleton(Schedule::class, function ($app) {    return tap(new Schedule($this->scheduleTimezone()), function ($schedule) {      $this->schedule($schedule->useCache($this->scheduleCache()));    });  });}protected function scheduleCache(){  return Env::get('SCHEDULE_CACHE_DRIVER');}// namespace IlluminateConsoleSchedulingSchedulepublic function useCache($store){  if ($this->eventMutex instanceof CacheEventMutex) {    $this->eventMutex->useStore($store);  }  /* ... ... */  return $this;}// namespace IlluminateConsoleSchedulingCacheEventMutexpublic function create(Event $event){  return $this->cache->store($this->store)->add(    $event->mutexName(), true, $event->expiresAt * 60  );}// namespace IlluminateCacheCacheManagerpublic function store($name = null){  $name = $name ?: $this->getDefaultDriver();  return $this->stores[$name] = $this->get($name);}public function getDefaultDriver(){  return $this->app['config']['cache.default'];}protected function get($name){  return $this->stores[$name] ?? $this->resolve($name);}protected function resolve($name){  $config = $this->getConfig($name);  if (is_null($config)) {    throw new InvalidArgumentException("Cache store [{$name}] is not defined.");  }  if (isset($this->customCreators[$config['driver']])) {    return $this->callCustomCreator($config);  } else {    $driverMethod = 'create'.ucfirst($config['driver']).'Driver';    if (method_exists($this, $driverMe接着我们会发现这些被 kill 掉的命令行在一段时间内无法按照设置的运行周期自动调度,其原因就在于手动 kill 掉的命令行没有调用 schedule:finish 清理缓存文件,释放互斥锁。这就导致在设置的过期时间到达之前,互斥锁会一直被占用,新的 Event 不会再次运行。

相关文章 大家在看