rotected function is_queue_empty() { return empty( $this->get_queue() ); } /** * Is process running * * Check whether the current process is already running * in a background process. */ protected function is_process_running() { if ( get_site_transient( $this->get_last_run_transient_key() ) ) { // Process already running. return true; } return false; } protected function update_timestamp( $instance_id ) { $timestamp = time(); $this->start_time = $timestamp; // Set start time of current process. set_site_transient( $this->get_last_run_transient_key(), $timestamp, $this->get_instance_expiry_duration_seconds() ); $human_readable_timestamp = wp_date( 'Y-m-d H:i:s', $timestamp ); $this->logger()->info( "Setting last run timestamp for instance ID $instance_id to $human_readable_timestamp" ); } /** * Get queue * * @return array Return the first queue from the queue */ protected function get_queue() { $queue = $this->utils->get_site_option( $this->get_queue_key(), array() ); return empty( $queue ) || ! is_array( $queue ) ? array() : $queue; } /** * Handle * * Pass each queue item to the task handler, while remaining * within server memory and time limit constraints. */ protected function handle( $instance_id ) { $this->logger()->info( "Handling instance ID $instance_id." ); $this->update_timestamp( $instance_id ); $queue = $this->get_queue(); $processed_tasks_count = 0; foreach ( $queue as $key => $value ) { $this->logger()->info( "Executing task $value." ); $task = $this->task( $value ); if ( $task ) { $this->status->task_successful(); } else { $this->status->task_failed(); } if ( $this->status->is_cancelled() ) { $this->logger()->info( "While we were busy doing the task $value, the process got cancelled. Clean up and stop." ); return; } unset( $queue[ $key ] ); if ( $this->should_update_queue_after_task() ) { $this->update_queue( $queue ); } $processed_tasks_count ++; if ( $this->task_limit_reached( $processed_tasks_count ) ) { $tasks_per_request = $this->get_tasks_per_request(); $this->logger()->info( "Stopping because we are only supposed to perform $tasks_per_request tasks in a single request and we have reached that limit." ); break; } if ( $this->time_exceeded() || $this->memory_exceeded() ) { $this->logger()->warning( "Time/Memory limits reached, save the queue and dispatch a new request." ); break; } } if ( empty( $queue ) ) { $this->complete(); } else { if ( ! $this->should_update_queue_after_task() ) { $this->update_queue( $queue ); } $this->dispatch( $instance_id ); } } /** * Memory exceeded * * Ensures the process never exceeds 90% * of the maximum WordPress memory. * * @return bool */ protected function memory_exceeded() { $memory_limit = $this->server_utils->get_memory_limit() * 0.75; // 75% of max memory $current_memory = $this->server_utils->get_memory_usage(); $return = false; if ( $current_memory >= $memory_limit ) { $return = true; } return apply_filters( $this->identifier . '_memory_exceeded', $return ); } /** * Time exceeded. * * Ensures the process never exceeds a sensible time limit. * A timeout limit of 30s is common on shared hosting. * * @return bool */ protected function time_exceeded() { $finish = $this->start_time + $this->get_time_limit(); $return = false; if ( time() >= $finish ) { $return = true; } return apply_filters( $this->identifier . '_time_exceeded', $return ); } /** * Complete. * * Override if applicable, but ensure that the below actions are * performed, or, call parent::complete(). */ protected function complete() { $this->do_action( 'completed' ); $this->logger()->info( "Process completed." ); $this->cleanup(); $this->status->complete(); } /** * Schedule cron healthcheck * * @access public * * @param mixed $schedules Schedules. * * @return mixed */ public function schedule_cron_healthcheck( $schedules ) { $interval = $this->get_cron_interval_seconds(); // Adds every 5 minutes to the existing schedules. $schedules[ $this->identifier . '_cron_interval' ] = array( 'interval' => $interval, /* translators: %s: Cron interval in minutes */ 'display' => sprintf( __( 'Every %d Minutes', 'wp-smushit' ), $interval / MINUTE_IN_SECONDS ), ); return $schedules; } /** * Handle cron healthcheck * * Restart the background process if not already running * and data exists in the queue. */ public function handle_cron_healthcheck() { $mutex = new Mutex( $this->identifier . '_cron_healthcheck' ); $mutex->set_break_on_timeout( true ) ->set_timeout( 1 ) // We don't want two health checks running ->execute( function () { $this->logger()->info( "Running scheduled health check." ); if ( $this->is_process_running() ) { $this->logger()->info( "Health check: Process seems healthy, no action required." ); exit; } if ( $this->is_queue_empty() ) { $this->logger()->info( "Health check: Process not in progress but the queue is empty, no action required." ); $this->clear_scheduled_event(); exit; } if ( $this->status->is_cancelled() ) { $this->logger()->info( "Health check: Process has been cancelled already, no action required." ); $this->clear_scheduled_event(); exit; } if ( ! $this->is_revival_limit_reached() ) { $this->logger()->warning( "Health check: Process instance seems to have died. Spawn a new instance." ); $this->revive_process(); } else { $this->logger()->warning( "Health check: Process instance seems to have died. Restart disabled, marking the process as dead." ); $this->mark_as_dead(); } } ); exit; } private function revive_process() { $this->do_action( 'revived' ); $this->increment_revival_count(); $this->spawn(); } protected function mark_as_dead() { $this->do_action( 'dead' ); $this->status->mark_as_dead(); $this->cleanup(); } /** * Schedule event */ protected function schedule_event() { $hook = $this->cron_hook_identifier; if ( ! wp_next_scheduled( $hook ) ) { $interval = $this->cron_interval_identifier; $next_run = time() + $this->get_cron_interval_seconds(); wp_schedule_event( $next_run, $interval, $hook ); $this->logger()->info( "Scheduling new event with hook $hook to run $interval." ); } } /** * Clear scheduled event */ protected function clear_scheduled_event() { $hook = $this->cron_hook_identifier; $this->logger()->info( "Cancelling event with hook $hook." ); wp_clear_scheduled_hook( $hook ); } /** * Cancel Process * * Stop processing queue items, clear cronjob and delete queue. */ private function cancel_process() { $this->cleanup(); $this->logger()->info( "Process cancelled." ); } public function cancel() { // Update the cancel flag first $active_instance_id = $this->get_active_instance_id(); $this->logger()->info( "Starting cancellation (Instance: $active_instance_id)." ); $this->status->cancel(); // Since actual cancellation involves deletion of the queue and the handler // might be in the middle of updating the queue, we need to use a mutex $mutex = new Mutex( $this->get_handler_mutex_id() ); $mutex ->set_break_on_timeout( false ) // Since this is a user operation, we must cancel, even if there is a timeout ->set_timeout( $this->get_time_limit() ) // Shouldn't take more time than the time allocated to the process itself ->execute( function () use ( $active_instance_id ) { // Do this before cleanup, so we still have data available to us $this->do_action( 'cancelled' ); $this->logger()->info( "Cancelling the process (Instance: $active_instance_id)." ); $this->cancel_process(); $this->logger()->info( "Cancellation completed (Instance: $active_instance_id)." ); } ); } /** * Task * * Override this method to perform any actions required on each * queue item. Return the modified item for further processing * in the next pass through. Or, return false to remove the * item from the queue. * * @param mixed $task Queue item to iterate over. * * @return mixed */ abstract protected function task( $task ); private function is_active_instance( $instance_id ) { return $instance_id === $this->get_active_instance_id(); } /** * Save the unique ID of the process we are presuming to be dead, so we can prevent it from coming back. * * @param $instance_id * * @return void */ private function set_active_instance_id( $instance_id ) { update_site_option( $this->get_active_instance_option_id(), $instance_id ); } private function get_active_instance_id() { return get_site_option( $this->get_active_instance_option_id(), '' ); } private function get_active_instance_option_id() { return $this->identifier . '_active_instance'; } private function set_process_id( $instance_id ) { update_site_option( $this->get_process_id_option_key(), $instance_id ); } public function get_process_id() { return get_site_option( $this->get_process_id_option_key() ); } private function delete_process_id() { delete_site_option( $this->get_process_id_option_key() ); } private function get_process_id_option_key() { return $this->identifier . '_process_id'; } public function set_logger( $logger ) { $this->logger_container->set_logger( $logger ); } /** * @return Background_Logger_Container */ private function logger() { return $this->logger_container; } public function get_status() { return $this->status; } /** * @param $tasks array * * @return void */ public function start( $tasks ) { $this->do_action( 'before_start' ); $total_items = count( $tasks ); $this->status->start( $total_items ); $this->update_queue( $tasks ); // Generate ID for the whole process. $this->set_process_id( $this->generate_unique_id() ); $this->logger()->info( "Starting new process with $total_items tasks" ); // Trigger the started event before dispatching the request to ensure it is called before the completed event. $this->do_action( 'started' ); $this->spawn(); } private function mutex( $operation ) { $mutex = new Mutex( $this->get_handler_mutex_id() ); $mutex->set_break_on_timeout( true ) // Let the previous handler do its thing ->set_timeout( $this->get_lock_duration() ) ->execute( $operation ); } private function get_handler_mutex_id() { return $this->identifier . '_handler_lock'; } private function get_time_limit() { return apply_filters( $this->identifier . '_default_time_limit', 20 ); // 20 seconds } private function get_lock_duration() { $lock_duration = ( property_exists( $this, 'queue_lock_time' ) ) ? $this->queue_lock_time : 60; // 1 minute return apply_filters( $this->identifier . '_queue_lock_time', $lock_duration ); } protected function get_instance_expiry_duration_seconds() { return MINUTE_IN_SECONDS * 2; } private function get_last_run_transient_key() { return $this->identifier . '_last_run'; } private function clear_last_run_timestamp() { delete_site_transient( $this->get_last_run_transient_key() ); } private function cleanup() { // Delete options and transients $this->delete_queue(); delete_site_option( $this->get_active_instance_option_id() ); $this->delete_process_id(); $this->delete_revival_count(); $this->clear_last_run_timestamp(); // Cancel all events $this->clear_scheduled_event(); } private function task_limit_reached( $processed_tasks_count ) { if ( $this->get_tasks_per_request() === self::TASKS_PER_REQUEST_UNLIMITED ) { return false; } return $processed_tasks_count >= $this->get_tasks_per_request(); } public function get_tasks_per_request() { return $this->tasks_per_request; } /** * @param int $tasks_per_request */ public function set_tasks_per_request( $tasks_per_request ) { $this->tasks_per_request = $tasks_per_request; } private function do_action( $action ) { do_action( "{$this->identifier}_$action", $this->identifier, $this ); } private function get_cron_interval_seconds() { $minutes = property_exists( $this, 'cron_interval' ) ? $this->cron_interval : 5; $interval = apply_filters( $this->identifier . '_cron_interval', $minutes ); return $interval * MINUTE_IN_SECONDS; } public function get_identifier() { return $this->identifier; } protected function should_update_queue_after_task() { return false; } private function increment_revival_count() { $revival_count = $this->get_revival_count(); $this->set_revival_count( $revival_count + 1 ); } private function set_revival_count( $instance_id ) { update_site_option( $this->get_revival_count_option_key(), $instance_id ); } public function get_revival_count() { return (int) get_site_option( $this->get_revival_count_option_key(), 0 ); } private function delete_revival_count() { delete_site_option( $this->get_revival_count_option_key() ); } private function get_revival_count_option_key() { return $this->identifier . '_revival_count'; } protected function get_revival_limit() { return apply_filters( $this->identifier . '_revival_limit', 5 ); } protected function is_revival_limit_reached() { return $this->get_revival_count() >= $this->get_revival_limit(); } }