$info) {
      if ($info['enabled']) {
        // Dispatch only when the time elapsed since the queue's last recorded
        // run has reached its calculated frequency.
        if (($now - $info["last"]) >= $info["calc_frequency"]) {
          $cmd = hosting_generate_command("hosting", $queue, node_load(HOSTING_OWN_PLATFORM), array(), array('items' => $info['calc_items'], 'background' => TRUE));
          hosting_log('hosting_dispatch', t("dispatching command !cmd", array('!cmd' => $cmd)));
          // The 'background' option makes the generated command end in " &".
          system($cmd);
        }
        else {
          hosting_log(false, t("too early for queue @queue", array('@queue' => $queue)));
        }
      }
      else {
        hosting_log(false, t("queue @queue disabled", array('@queue' => $queue)));
      }
    }
  }
  else {
    hosting_log('hosting_dispatch', t("dispatching disabled"));
  }
  exit(PROVISION_SUCCESS);
}

/**
 * Retrieve a list of queues that need to be dispatched
 *
 * Generate a list of queues, and the frequency / amount of items
 * that need to be processed for each of them.
 *
 * Each queue returned by hook_hosting_queues is merged over a set of
 * defaults, then overridden by the stored 'hosting_queue_<key>_*' variables.
 * The derived values 'calc_frequency' and 'calc_items' (plus 'calc_threads'
 * for batch queues) are added, together with 'last' and 'running'.
 *
 * @param $refresh
 *   When TRUE, rebuild the list even if an earlier call already cached it
 *   in this function's static variable.
 * @return
 *   An associative array of queue definitions, keyed by queue name.
 */
function hosting_get_queues($refresh = false) {
  static $cache = null;

  if (is_null($cache) || $refresh) {
    $cache = array();
    $defaults = array(
      'type' => 'serial',
      'max_threads' => 6,
      'threshold' => '100',
      'min_threads' => 1,
      'timeout' => strtotime("10 minutes", 0),
      'frequency' => strtotime("5 minutes", 0),
      'items' => 5,
      'enabled' => TRUE,
      'singular' => t('item'),
      'plural' => t('items')
    );
    $queues = module_invoke_all("hosting_queues");
    foreach ($queues as $key => $queue) {
      $queue = array_merge($defaults, $queue);

      // Configurable settings.
      $configured = array(
        'frequency' => variable_get('hosting_queue_' . $key . '_frequency', $queue['frequency']),
        'items' => variable_get('hosting_queue_' . $key . '_items', $queue['items']),
        'enabled' => variable_get('hosting_queue_' . $key . '_enabled', $queue['enabled']),
        'last_run' => variable_get('hosting_queue_' . $key . '_last_run', false),
        'running' => variable_get('hosting_queue_' . $key . '_running', false),
        'interval' => variable_get('hosting_queue_' . $key . '_interval', false),
      );
      $queue = array_merge($queue, $configured);

      if ($queue['type'] == 'batch') {
        // One thread per 'threshold' items, clamped to [min_threads, max_threads].
        $threads = $queue['total_items'] / $queue['threshold'];
        if ($threads <= $queue['min_threads']) {
          $threads = $queue['min_threads'];
        }
        // Fixed: this comparison used the undefined variable $thread, so the
        // upper bound was never enforced.
        elseif ($threads > $queue['max_threads']) {
          $threads = $queue['max_threads'];
        }
        $queue['calc_threads'] = $threads;
        $queue['calc_frequency'] = ceil($queue['frequency'] / $threads);
        $queue['calc_items'] = ceil($queue['total_items'] / $threads);
      }
      else {
        // Serial queues run with their configured values unchanged.
        $queue['calc_frequency'] = $queue['frequency'];
        $queue['calc_items'] = $queue['items'];
      }

      // Same stored variables as 'last_run' / 'running' above, but defaulting
      // to 0 instead of false; 'running' is overwritten here.
      $queue['last'] = variable_get('hosting_queue_' . $key . '_last_run', 0);
      $queue['running'] = variable_get('hosting_queue_' . $key . '_running', 0);
      $queues[$key] = $queue;
    }
    $cache = $queues;
  }

  return $cache;
}

/**
 * Implementation of hook_hosting_queues
 *
 * Return a list of queues that this module needs to manage.
 *
 * @return
 *   An array holding the single 'tasks' queue definition.
 */
function hosting_hosting_queues() {
  $queue = array();
  $queue['tasks'] = array(
    'name' => t('Task queue'),
    'description' => t('Process the queue of outstanding hosting tasks.'),
    'type' => 'serial', # run queue sequentially. always with the same parameters.
    'frequency' => strtotime("1 minute", 0), # run queue every minute.
    'items' => 20, # process 20 queue items per execution.
    'total_items' => hosting_task_count(),
    'singular' => t('task'),
    'plural' => t('tasks'),
  );
  return $queue;
}

/**
 * Run a queue specified by hook_hosting_queues
 *
 * Run an instance of a queue processor. This function contains all the book keeping
 * functionality needed to ensure that the queues are running as scheduled.
 *
 * The queue name is read from the global $args['commands'][1]. The function
 * "hosting_<queue>_queue" is called with the item count if it exists. This
 * function terminates the script with exit(PROVISION_SUCCESS).
 */
function hosting_run_queue() {
  global $args;
  $queue = $args['commands'][1];
  $count = drush_get_option(array('i', 'items'), 5); # process a default of 5 items at a time.

  // Record the start time both as the last run and as the 'running' marker.
  // time() replaces the argument-less mktime(), which returns the same value
  // but is deprecated and rejected by PHP 8.
  $started = time();
  variable_set('hosting_queue_' . $queue . '_last_run', $started);
  variable_set('hosting_queue_' . $queue . '_running', $started);

  $func = "hosting_" . $queue . "_queue";
  if (function_exists($func)) {
    $func($count);
  }

  // The queue has finished: clear the 'running' marker.
  variable_del('hosting_queue_' . $queue . '_running');
  exit(PROVISION_SUCCESS);
}

/**
 * Generate a drush command.
 *
 * Generates a drush command for either the provision or hosting system.
 *
 * @param $system
 *   First word passed to drush.php (callers in this file use "hosting" and
 *   "provision").
 * @param $command
 *   Second word passed to drush.php.
 * @param $ref
 *   Value handed to hook_provision_args together with $command.
 * @param $args
 *   Positional arguments. Hook/option values whose key starts with '#' are
 *   added to these; the arguments are emitted in key order.
 * @param $options
 *   Options emitted as "--key=value", or as "--key" when the value is
 *   exactly TRUE. 'backend' is emitted as a trailing " --backend " instead,
 *   and a truthy 'background' additionally appends " &" to the command.
 * @return
 *   The command line as a string.
 */
function hosting_generate_command($system, $command, $ref, $args = array(), $options = array()) {
  $data = module_invoke_all('provision_args', $ref, $command);
  $data = array_merge($data, $options);

  // Sort every usable scalar value into positional arguments ('#'-prefixed
  // keys) or named options (everything else).
  foreach ($data as $key => $value) {
    if (!is_array($value) && !is_object($value) && !is_null($value) && ($value != '')) {
      if (substr($key, 0, 1) == '#') {
        $args[$key] = $value;
      }
      else {
        $options[$key] = $value;
      }
    }
  }
  ksort($args);

  // 'backend' is taken out of the option list and appended at the very end.
  // $backend is initialised so it is defined even when the option is absent.
  $backend = '';
  if (!empty($options['backend'])) {
    $backend = " --backend ";
    unset($options['backend']);
  }

  $arg_str = '';
  foreach ($args as $value) {
    $arg_str .= ' ' . escapeshellarg(filter_xss($value));
  }

  $option_str = '';
  foreach ($options as $key => $value) {
    if ($value !== TRUE) {
      $option_str .= " --$key=" . escapeshellarg(filter_xss($value));
    }
    else {
      $option_str .= " --$key";
    }
  }

  $drush_path = sprintf("%s/drush.php", $data['publish_path']);
  // @TODO: Implement proper multi platform / multi server support.
  $cmd = sprintf(
    escapeshellcmd("%s %s %s %s --root=%s %s"),
    escapeshellcmd($drush_path),
    escapeshellarg($system),
    escapeshellarg($command),
    $arg_str,
    escapeshellarg($data['publish_path']),
    $option_str
  );

  $cmd .= $backend;

  // 'background' stays in $options, so "--background" is also part of
  // $option_str above; here the shell is told to detach the process.
  if (!empty($options['background'])) {
    $cmd .= " &";
  }
  return $cmd;
}

/**
 * Retrieve a list of outstanding tasks.
 *
 * @param limit
 *   The amount of items to return.
 * @return
 *   An associative array containing task nodes, indexed by node id.
 */
function _hosting_get_new_tasks($limit = 20) {
  $tasks = array();
  $result = db_query("SELECT nid FROM {hosting_task_queue} WHERE status=%d ORDER BY timestamp, nid ASC LIMIT %d", PROVISION_QUEUED, $limit);
  while ($row = db_fetch_object($result)) {
    $tasks[$row->nid] = node_load($row->nid);
  }
  return $tasks;
}

/**
 * Check if a certain task is still in the queue
 *
 * @param nid
 *   The nid of the task being checked
 * @return
 *   A boolean depending on whether the task is still queued.
 */
function _hosting_task_in_queue($nid) {
  $queued = db_result(db_query("SELECT nid FROM {hosting_task_queue} WHERE status=%d AND nid=%d", PROVISION_QUEUED, $nid));
  return $queued ? TRUE : FALSE;
}

/**
 * Process the hosting task queue.
 *
 * Iterates through the list of outstanding tasks, and execute the commands on the back end.
 *
 * For every task this saves a new node revision, invokes
 * hook_hosting_pre_<task_type>, runs the generated provision command, copies
 * the parsed log and warning messages into the task log, stores the result
 * code on the task and in {hosting_task_queue}, and finally invokes
 * hook_hosting_post_<task_type> or hook_hosting_failed_<task_type>.
 *
 * @param $count
 *   Maximum number of queued tasks to process in this run.
 */
function hosting_tasks_queue($count = 20) {
  hosting_log('hosting_tasks', t("Running tasks queue"));

  $tasks = _hosting_get_new_tasks($count);
  foreach ($tasks as $task) {
    // create a new revision
    // time() replaces the argument-less mktime() (same value, but deprecated
    // and rejected by PHP 8).
    $task->changed = time();
    $task->executed = time();
    $task->revision = true;
    node_save($task);
    hosting_task_log($task->vid, "queue", t("Task starts processing"));

    // execute the hosting_pre_${task_type} hook.
    module_invoke_all("hosting_pre_" . $task->task_type, $task);

    // @todo Allow multiple commands to be run.
    // For now tasks just map to one backend command, but that might need to change with additional complexity.
    $cmd = hosting_generate_command("provision", $task->task_type, $task, array(), array('backend' => TRUE));
    hosting_task_log($task->vid, 'command', 'Running:' . escapeshellcmd("$cmd"));

    $proc = provision_proc_open($cmd);
    $code = $proc['code'];
    $return = $proc['output'];

    if (!$data = provision_parse_output($return)) {
      hosting_task_log($task->vid, 'error', t("The command could not be executed succesfully (returned: !return, code: %code)", array("!return" => $return, "%code" => $code)));
      $code = PROVISION_FRAMEWORK_ERROR;
    }
    else {
      // Timestamp of this task's last log entry, reused for the warning
      // messages below. It is reset per task so that a value from a previous
      // task's log can no longer leak in.
      // NOTE(review): NULL is passed when the task produced no log entries;
      // confirm hosting_task_log() handles a NULL timestamp as intended.
      $last_timestamp = NULL;
      foreach ((array) $data['log'] as $log) {
        hosting_task_log($task->vid, $log['type'], $log['message'], $log['severity'], $log['timestamp']);
        $last_timestamp = $log['timestamp'];
      }

      # Drupal message errors.
      // Fixed: the prefixes were stripped with ereg_replace() and a
      // "/.../"-delimited pattern. POSIX regexes take no delimiters, so the
      // pattern never matched; preg_replace() applies it as written.
      foreach ((array) $data['messages']['error'] as $error) {
        if (preg_match("/^warning:/", $error)) {
          hosting_task_log($task->vid, "warning", preg_replace("/^warning: /", '', $error), 0, $last_timestamp);
        }
        elseif (preg_match("/^user warning:/", $error)) {
          hosting_task_log($task->vid, "warning", preg_replace("/^user warning: /", '', $error), 0, $last_timestamp);
        }
      }
    }

    // record status
    $task->task_status = $code;

    // New revision is created at the beginning of function.
    $task->revision = false;
    node_save($task);

    if ($code == PROVISION_SUCCESS) {
      # The task has been successful. Run the post hook.
      module_invoke_all("hosting_post_" . $task->task_type, $task, $data);
    }
    else {
      module_invoke_all("hosting_failed_". $task->task_type, $task, $data, $code);
    }

    // remove from queue
    hosting_task_log($task->vid, "queue", t("Removing task from hosting queue"), 0);
    db_query("UPDATE {hosting_task_queue} SET status=%d WHERE nid=%d", $code, $task->nid);
  }
}

/**
 * Pass command output through filter_xss() with an empty list of allowed tags.
 *
 * @param $return
 *   The text to filter.
 * @return
 *   The filtered text.
 */
function _hosting_queues_clean_output($return) {
  return filter_xss($return, array());
}

/**
 * Page callback
 *
 * Configure the frequency of tasks.
 *
 * Builds, for every queue from hosting_get_queues(), an 'enabled' checkbox,
 * a description, the last run time and a frequency group made of an item
 * count, a tick count and a unit selector.
 *
 * @return
 *   The form array.
 */
function hosting_queues_configure() {
  drupal_add_css(drupal_get_path('module', 'hosting') . '/hosting.css');

  // Unit lengths (as returned by strtotime() relative to 0) mapped to labels.
  $units = array(
    strtotime("1 second", 0) => t("Seconds"),
    strtotime("1 minute", 0) => t("Minutes"),
    strtotime("1 hour", 0) => t("Hours"),
    strtotime("1 day", 0) => t("Days"),
    strtotime("1 week", 0) => t("Weeks"),
  );
  // Unit lengths in reverse order of definition, so the largest is tried first.
  $lengths = array_reverse(array_keys($units));

  $queues = hosting_get_queues();
  $form = array();
  $form['#tree'] = TRUE;

  foreach ($queues as $queue => $info) {
    $form[$queue]['description'] = array(
      '#type' => 'item',
      '#value' => $info['name'],
      '#description' => $info['description']
    );

    $form[$queue]["enabled"] = array(
      '#type' => 'checkbox',
      '#default_value' => $info['enabled']
    );

    $form[$queue]["last_run"] = array(
      '#value' => hosting_format_interval(variable_get('hosting_queue_' . $queue . '_last_run', false))
    );

    // NOTE(review): both wrapper strings contain only a newline as found
    // here; any markup they were meant to carry is not visible - confirm.
    $form[$queue]['frequency']['#prefix'] = "
";
    $form[$queue]['frequency']['#suffix'] = "
";

    if ($info['type'] == 'batch') {
      // Batch queues show their total item count as read-only text.
      $form[$queue]['frequency']['items'] = array(
        '#value' => t('%count %items every ', array("%count" => $info['total_items'], "%items" => format_plural($info['total_items'], $info['singular'], $info['plural']))),
      );
    }
    else {
      $form[$queue]['frequency']['items'] = array(
        '#type' => 'textfield',
        '#size' => 3,
        '#maxlength' => 3,
        '#default_value' => $info['items'],
        '#suffix' => t(' %items every ', array('%items' => $info['plural'])),
      );
    }

    // Express the frequency in the first (largest) unit that divides it
    // without remainder.
    foreach ($lengths as $length) {
      if (!($info['frequency'] % $length)) {
        $frequency_ticks = $info['frequency'] / $length;
        $frequency_length = $length;
        break;
      }
    }

    $form[$queue]['frequency']["ticks"] = array(
      '#type' => 'textfield',
      '#default_value' => $frequency_ticks,
      '#maxlength' => 5,
      '#size' => 5
    );
    $form[$queue]['frequency']["unit"] = array(
      '#type' => 'select',
      '#options' => $units,
      '#default_value' => $frequency_length,
    );
  }

  $form['submit'] = array('#type' => 'submit', '#value' => t('Save changes'));
  return $form;
}

/**
 * Render the queue configuration form as a table with one row per queue.
 *
 * @param $form
 *   The form array built by hosting_queues_configure().
 * @return
 *   The rendered table, followed by the submit button and whatever remains
 *   of the form.
 */
function theme_hosting_queues_configure($form) {
  $queues = hosting_get_queues();

  $rows = array();
  $header = array('', t('Description'),
    array('data' => t('Frequency'), 'class' => 'hosting-queue-frequency-head'),
    t('Last run'),);
  foreach ($queues as $key => $info) {
    $row = array();
    $row[] = drupal_render($form[$key]['enabled']);
    $row[] = drupal_render($form[$key]['description']);
    $row[] = drupal_render($form[$key]['frequency']);
    $row[] = drupal_render($form[$key]['last_run']);
    $rows[] = $row;
  }
  $output = theme('table', $header, $rows);
  $output .= drupal_render($form['submit']);
  $output .= drupal_render($form);
  return $output;
}

/**
 * Submit handler for the queue configuration form.
 *
 * Stores the 'enabled' flag and the frequency (ticks multiplied by the unit
 * length) of every queue, and the item count of serial queues.
 *
 * @param $form_id
 *   The form identifier (unused).
 * @param $values
 *   The submitted values, nested per queue because the form sets #tree.
 */
function hosting_queues_configure_submit($form_id, $values) {
  foreach (hosting_get_queues() as $queue => $info) {
    if (empty($values[$queue])) {
      continue;
    }
    $submitted = $values[$queue];

    variable_set("hosting_queue_" . $queue . "_enabled", $submitted['enabled']);
    variable_set("hosting_queue_" . $queue . "_frequency", $submitted['frequency']['ticks'] * $submitted['frequency']['unit']);

    if ($info['type'] == 'serial') {
      // Fixed: the items field is defined at [$queue]['frequency']['items']
      // in hosting_queues_configure(), but was read from [$queue]['items'],
      // which that form never fills. The flat key is kept as a fallback.
      if (isset($submitted['frequency']['items'])) {
        variable_set("hosting_queue_" . $queue . "_items", $submitted['frequency']['items']);
      }
      elseif (isset($submitted['items'])) {
        variable_set("hosting_queue_" . $queue . "_items", $submitted['items']);
      }
    }
  }
}

/**
 * Build the shell command that runs "hosting dispatch" through drush.php.
 *
 * @return
 *   The command line, using PROVISION_DOCROOT_PATH both as the location of
 *   drush.php and as the --root value.
 */
function _hosting_dispatch_cmd() {
  $drush_path = sprintf("%s/drush.php", PROVISION_DOCROOT_PATH);
  $cmd = sprintf("php %s hosting dispatch --root=%s", escapeshellarg($drush_path), escapeshellarg(PROVISION_DOCROOT_PATH));
  return $cmd;
}

/**
 * Build the crontab line that runs the dispatch command every minute.
 *
 * @return
 *   The crontab entry as a string.
 */
function hosting_queues_cron_cmd() {
  return sprintf("*/1 * * * * (%s)", _hosting_dispatch_cmd());
}