queue() ? variable_get('feeds_schedule_queue_num', 200) : variable_get('feeds_schedule_num', 5); foreach ($importers as $importer) { foreach ($importer->getScheduleCallbacks() as $callback) { $period = $importer->getSchedulePeriod($callback); if ($period != FEEDS_SCHEDULE_NEVER) { $result = db_query_range('SELECT feed_nid, id, callback, last_executed_time FROM {feeds_schedule} WHERE id = "%s" AND callback = "%s" AND scheduled = 0 AND (last_executed_time < %d OR last_executed_time = 0) ORDER BY last_executed_time ASC', $importer->id, $callback, FEEDS_REQUEST_TIME - $period, 0, $num); while ($job = db_fetch_array($result)) { $this->schedule($job); // @todo Add time limit. } } } } } // Unflag and post a message that we're done. variable_set('feeds_scheduler_cron', FALSE); watchdog('FeedsScheduler', 'Finished processing schedule after !time.', array('!time' => format_interval(time() - $start))); } /** * Implementation of FeedsSchedulerInterface::add(). * * Add a feed to the scheduler. * * @todo Create optional parameter $last_executed_time to pass in. Set this * value if a feed is refreshed on creation. */ public function add($importer_id, $callback, $feed_nid = 0) { $save = array( 'id' => $importer_id, 'callback' => $callback, 'feed_nid' => $feed_nid, 'last_executed_time' => 0, 'scheduled' => 0, // Means NOT scheduled at the moment. ); drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid')); if (!db_affected_rows()) { drupal_write_record('feeds_schedule', $save); } } /** * Implementation of FeedsSchedulerInterface::remove(). */ public function remove($importer_id, $callback, $feed_nid = 0) { db_query("DELETE FROM {feeds_schedule} WHERE id = '%s' AND callback = '%s' AND feed_nid = %d", $importer_id, $callback, $feed_nid); } /** * Implementation of FeedsSchedulerInterface::work(). * * Refresh a feed. * * Used as worker callback invoked from feeds_scheduler_refresh() or * if drupal_queue is not enabled, directly from $this->cron(). */ public function work($job) { $importer = feeds_importer($job['id']); try { if (FEEDS_BATCH_COMPLETE == $importer->existing()->work($job)) { $this->finished($job); } } catch (Exception $e) { watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR); $this->finished($job); } // Make sure that job is not scheduled after this method has executed. $this->unschedule($job); } /** * @return * Drupal Queue if available, NULL otherwise. */ protected function queue() { if (module_exists('drupal_queue')) { drupal_queue_include(); return drupal_queue_get(FEEDS_SCHEDULER_QUEUE); } } /** * Attempt to reserve a job. If successful work it off or - if Drupal Queue is * available - queue it. * * The lock/release mechanism makes sure that an item does not get queued * twice. It has a different purpose than the FeedsSource level locking * which is in place to avoid concurrent import/clear operations on a source. * * @param $job * A job array. */ protected function schedule($job) { db_query("UPDATE {feeds_schedule} SET scheduled = %d WHERE id = '%s' AND feed_nid = %d AND callback = '%s'", FEEDS_REQUEST_TIME, $job['id'], $job['feed_nid'], $job['callback']); if (db_affected_rows()) { if ($this->queue()) { if (!$this->queue()->createItem($job)) { $this->unschedule($job); watchdog('FeedsScheduler', 'Error adding item to queue.', WATCHDOG_CRITICAL); return; } } else { $this->work($job); } } } /** * Remove a job from schedule. * * This function sets the source's scheduled bit to 0 and thus makes * it eligible for being added to the queue again. * * @param $job * A job array. */ protected function unschedule($job) { unset($job['last_executed_time']); $job = array( 'scheduled' => 0, ) + $job; drupal_write_record('feeds_schedule', $job, array('id', 'callback', 'feed_nid')); } /** * Release a job and set its last_executed_time flag. * * @param $job * A job array. */ protected function finished($job) { $job = array( 'scheduled' => 0, 'last_executed_time' => FEEDS_REQUEST_TIME, ) + $job; drupal_write_record('feeds_schedule', $job, array('id', 'callback', 'feed_nid')); } }