'Queued jobs',
    'description' => 'View a list of the queued jobs.',
    'page callback' => 'job_queue_list',
    'access arguments' => array('view queued jobs'),
  );
  $items['admin/settings/job_queue'] = array(
    'title' => 'Job queue',
    'description' => 'Set queue priorities.',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('job_queue_settings_form'),
    'access arguments' => array('administer job queue'),
  );
  return $items;
}

/**
 * Lists the permission strings this module defines (the function is named as
 * a hook_perm() implementation).
 *
 * @return
 *   Array containing 'view queued jobs' and 'administer job queue', the same
 *   strings the menu items in this file use as access arguments.
 */
function job_queue_perm() {
  return array('view queued jobs', 'administer job queue');
}

/**
 * Reports the number of queued jobs as a requirements entry.
 *
 * @param $phase
 *   The requirements phase being checked. Only 'runtime' produces an entry.
 *
 * @return
 *   An array that is empty for every phase other than 'runtime'. For
 *   'runtime' it holds a single 'job_queue' entry whose severity is
 *   REQUIREMENT_WARNING when at least one row exists in {job_queue} and
 *   REQUIREMENT_OK otherwise, with a matching human-readable value.
 */
function job_queue_requirements($phase) {
  $requirements = array();

  if ($phase == 'runtime') {
    // Counts every row, whatever its priority.
    $count = db_result(db_query('SELECT count(*) FROM {job_queue}'));
    $requirements['job_queue'] = array(
      'title' => t('Job queue'),
      'severity' => ($count > 0) ? REQUIREMENT_WARNING : REQUIREMENT_OK,
      'value' => ($count > 0) ? format_plural($count, 'There is @count queued job.', 'There are @count queued jobs.') : t('There are no queued jobs.'),
    );
  }

  return $requirements;
}

/**
 * Declares the theme entries this module provides (the function is named as a
 * hook_theme() implementation).
 *
 * @return
 *   Array with one entry, 'job_queue_priorities_table', which takes a single
 *   'element' argument.
 */
function job_queue_theme() {
  return array(
    'job_queue_priorities_table' => array(
      'arguments' => array('element'),
    )
  );
}

/**
 * Add a job to the queue. The function added will be called in the order it
 * was added during cron.
 *
 * @param $function
 *   The function name to call.
 * @param $description
 *   A human-readable description of the queued job.
 * @param $arguments
 *   Optional array of arguments to pass to the function.
 * @param $file
 *   Optional file path which needs to be included for $fucntion.
 * @param $no_duplicate
 *   If TRUE, do not add the job to the queue if one with the same function and
 *   arguments already exists.
*/
function job_queue_add($function, $description, $arguments = array(), $file = '', $no_duplicate = FALSE) {
  // Per-request cache of configured priorities, keyed by function name, so
  // queuing many jobs for one function looks the priority up only once.
  static $priorities;

  if (!isset($priorities[$function])) {
    // NOTE(review): when the function has no row in {job_queue_functions} the
    // lookup yields no number; the %d placeholder in the INSERT below is then
    // expected to store 0, the default the settings form shows - confirm.
    $priorities[$function] = db_result(db_query("SELECT priority FROM {job_queue_functions} WHERE function = '%s'", $function));
  }

  // Serialize once; the same string is used for the duplicate check and the
  // stored row, so both always compare like for like.
  $serialized = serialize($arguments);

  if ($no_duplicate) {
    $existing = db_result(db_query("SELECT jqid FROM {job_queue} WHERE function = '%s' AND arguments = '%s'", $function, $serialized));
    if (!empty($existing)) {
      return;
    }
  }

  db_query("INSERT INTO {job_queue} (created, description, function, arguments, file, priority) VALUES (%d, '%s', '%s', '%s', '%s', %d)", time(), $description, $function, $serialized, $file, $priorities[$function]);
}

/**
 * Dequeue and execute a queued job. Actions are logged via watchdog().
 *
 * The job with the lowest priority value is selected, ties being broken by
 * the lowest jqid. Jobs whose priority equals JOB_QUEUE_DO_NOT_RUN are never
 * selected. The job's row is deleted after the attempt, whether or not its
 * function could be called.
 *
 * @return
 *   TRUE if a job was dequeued, regardless of success or failure executing.
 *   FALSE if no runnable job could be fetched.
 */
function job_queue_dequeue() {
  $job = db_fetch_object(db_query_range('SELECT jqid, description, function, arguments, file FROM {job_queue} WHERE priority <> %d ORDER BY priority, jqid', JOB_QUEUE_DO_NOT_RUN, 0, 1));

  // Anything other than a row object means there is nothing to run. Checking
  // only for FALSE would let any other empty result through and report a
  // phantom job as dequeued.
  if (!is_object($job)) {
    return FALSE;
  }

  if (!empty($job->file)) {
    include_once './'. $job->file;
  }

  if (!function_exists($job->function)) {
    watchdog('job_queue', 'Failed to run queued job "!description" because the function %function is not defined.', array('!description' => $job->description, '%function' => $job->function), WATCHDOG_ERROR);
  }
  else {
    $arguments = unserialize($job->arguments);

    if (!is_array($arguments)) {
      // A payload that does not decode to an array cannot be handed to
      // call_user_func_array(); record the failure instead of claiming the
      // job ran.
      watchdog('job_queue', 'Failed to run queued job "!description" because its stored arguments could not be read.', array('!description' => $job->description), WATCHDOG_ERROR);
    }
    else {
      call_user_func_array($job->function, $arguments);
      // Filter objects out of arguments before using them as placeholders.
      watchdog('job_queue', 'Ran queued job "!description"', array('!description' => t($job->description, array_filter($arguments, 'is_scalar'))));
    }
  }

  // The row is removed on every path above so that a job which cannot run
  // does not stay at the head of the queue.
  db_query('DELETE FROM {job_queue} WHERE jqid = %d', $job->jqid);
  return TRUE;
}

/**
 * Runs queued jobs (the function is named as a hook_cron() implementation).
 *
 * Jobs are dequeued one at a time until the queue reports nothing runnable,
 * as many jobs have been processed as were queued when this call started, or
 * more than half of the allowed execution time has been used.
 */
function job_queue_cron() {
  // Snapshot of the queue size; it caps the number of iterations below.
  $job_count = db_result(db_query('SELECT count(*) FROM {job_queue}'));

  $max_execution_time = ini_get('max_execution_time');
  if (empty($max_execution_time)) {
    // No usable limit configured: fall back to five minutes.
    $max_execution_time = 5 * 60;
  }

  while ($job_count > 0 && job_queue_dequeue()) {
    // Stop once over half of the maximum execution time has been used.
    // NOTE(review): the division by 1000 presumes timer_read() reports
    // milliseconds while the limit is in seconds - confirm.
    if ((timer_read('page') / 1000) > ($max_execution_time / 2)) {
      break;
    }
    $job_count -= 1;
  }
}

/**
 * Page callback: lists the queued jobs.
 *
 * @return
 *   The page output: a summary sentence and, when jobs exist, a paged table
 *   (20 rows per page) showing each job's creation date and description in
 *   the order the jobs are stored by priority and jqid.
 */
function job_queue_list() {
  $count = db_result(db_query('SELECT count(*) FROM {job_queue}'));

  // NOTE(review): the summary text is wrapped in bare blank lines rather than
  // markup; confirm whether paragraph tags were intended here.
  if ($count == 0) {
    return "\n\n". t('There are no queued jobs. Modules may programmatically add queued jobs.') ."\n\n";
  }

  $output = "\n\n". format_plural($count, 'There is @count queued job.', 'There are @count queued jobs.') ."\n\n";

  $header = array(
    t('Created'),
    t('Description'),
  );

  $result = pager_query('SELECT created, description, arguments FROM {job_queue} ORDER BY priority, jqid', 20);
  $rows = array();
  while ($job = db_fetch_object($result)) {
    // Only scalar arguments are used as placeholders. A payload that does not
    // decode to an array contributes none, so one bad row cannot break the
    // whole listing.
    $arguments = unserialize($job->arguments);
    $placeholders = is_array($arguments) ? array_filter($arguments, 'is_scalar') : array();

    $rows[] = array(
      format_date($job->created),
      t($job->description, $placeholders),
    );
  }

  $output .= theme('table', $header, $rows);
  $output .= theme('pager', array(), 20);

  return $output;
}

/**
 * Job queue function prioritization.
 *
 * hook_job_queue_functions() should return
 *   array(
 *     'function_name' => array(
 *       'title' => t('Human-readable explanation'),
 *     ),
 *     ...
 *   );
 *
 * Functions which do not have entries in an implementation of this hook will
 * always have the default priority.
 *
 * @return
 *   The form array: one title/priority row per function reported by
 *   hook_job_queue_functions(), a fixed "All other functions" row showing 0,
 *   and a submit button.
 */
function job_queue_settings_form() {
  // Load the priorities saved so far, keyed by function name.
  $priorities = array();
  $result = db_query('SELECT function, priority FROM {job_queue_functions}');
  while ($row = db_fetch_object($result)) {
    $priorities[$row->function] = $row->priority;
  }

  // The choices are the same for every function, so build them once.
  $options = drupal_map_assoc(range(-10, 10)) + array(JOB_QUEUE_DO_NOT_RUN => t('Do not run'));

  $form = array();
  $form['priorities'] = array(
    '#theme' => 'job_queue_priorities_table',
    '#tree' => TRUE,
  );

  foreach (module_invoke_all('job_queue_functions') as $function => $metadata) {
    $form['priorities'][$function] = array();
    $form['priorities'][$function]['title'] = array(
      '#value' => $metadata['title'],
    );
    $form['priorities'][$function]['priority'] = array(
      '#type' => 'select',
      '#options' => $options,
      '#default_value' => array_key_exists($function, $priorities) ? $priorities[$function] : 0,
    );
  }

  // Display-only row (keyed by the empty string) for functions no module has
  // described.
  $form['priorities'][''] = array();
  $form['priorities']['']['title'] = array(
    '#value' => t('All other functions'),
  );
  $form['priorities']['']['priority'] = array(
    '#value' => '0',
  );

  $form['submit'] = array(
    '#type' => 'submit',
    '#value' => t('Save settings'),
  );

  return $form;
}

/**
 * Renders the priorities form element as a two-column table.
 *
 * @param $element
 *   The form element whose children each carry a 'title' and a 'priority'
 *   sub-element.
 *
 * @return
 *   The themed table, one row per child of $element.
 */
function theme_job_queue_priorities_table($element) {
  $headers = array(
    t('Job type'),
    t('Priority'),
  );

  $rows = array();
  foreach (element_children($element) as $function) {
    $rows[] = array(
      drupal_render($element[$function]['title']),
      drupal_render($element[$function]['priority']),
    );
  }

  return theme('table', $headers, $rows);
}

/**
 * Submit handler for job_queue_settings_form().
 *
 * For every submitted function the stored priority is replaced and the new
 * priority is also applied to that function's jobs already in the queue.
 */
function job_queue_settings_form_submit($form, &$form_state) {
  foreach ($form_state['values']['priorities'] as $function => $data) {
    // Replace the stored priority: delete any previous row, then insert.
    db_query("DELETE FROM {job_queue_functions} WHERE function = '%s'", $function);
    db_query("INSERT INTO {job_queue_functions} (function, priority) VALUES ('%s', %d)", $function, $data['priority']);
    // Re-prioritize jobs that were queued before the setting changed.
    db_query("UPDATE {job_queue} SET priority = %d WHERE function = '%s'", $data['priority'], $function);
  }
  drupal_set_message(t('Saved job queue settings.'));
}