getScheduleCallbacks() as $callback) { // Check whether jobs are scheduled. $period = $importer->getSchedulePeriod($callback); if ($period != FEEDS_SCHEDULE_NEVER) { // Refresh feeds that have a refresh time older than now minus // refresh period. $time = FEEDS_REQUEST_TIME - $period; $result = db_query_range('SELECT feed_nid, id AS importer_id, callback, last_scheduled_time FROM {feeds_schedule} WHERE id = "%s" AND callback = "%s" AND scheduled = 0 AND (last_scheduled_time < %d OR last_scheduled_time = 0) ORDER BY last_scheduled_time ASC', $importer->id, $callback, $time, 0, $num); while ($feed_info = db_fetch_array($result)) { // If drupal_queue is present, add to queue, otherwise work off // immediately. if ($use_queue) { if ($queue->createItem($feed_info)) { $this->flag($feed_info['importer_id'], $feed_info['callback'], $feed_info['feed_nid']); } else { watchdog('FeedsScheduler', 'Error adding item to queue.', WATCHDOG_ALERT); } } else { $this->flag($feed_info['importer_id'], $feed_info['callback'], $feed_info['feed_nid']); $this->work($feed_info); } } } } } } // Unflag and post a message that we're done. variable_set('feeds_scheduler_cron', FALSE); watchdog('FeedsScheduler', 'Finished processing schedule after !time.', array('!time' => format_interval(time() - $start))); } /** * Implementation of FeedsSchedulerInterface::add(). * * Add a feed to the scheduler. * * @todo Create optional parameter $last_scheduled_time to pass in. Set this * value if a feed is refreshed on creation. * @todo Create an abstract interface for items that can be added? */ public function add($importer_id, $callback, $feed_nid = 0) { $save = array( 'id' => $importer_id, 'callback' => $callback, 'feed_nid' => $feed_nid, 'last_scheduled_time' => 0, 'scheduled' => 0, // Means NOT scheduled at the moment. ); drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid')); if (!db_affected_rows()) { drupal_write_record('feeds_schedule', $save); } } /** * Implementation of FeedsSchedulerInterface::remove(). */ public function remove($importer_id, $callback, $feed_nid = 0) { db_query('DELETE FROM {feeds_schedule} WHERE id = "%s" AND callback = "%s" AND feed_nid = %d', $importer_id, $callback, $feed_nid); } /** * Implementation of FeedsSchedulerInterface::work(). * * Refresh a feed. * * Used as worker callback invoked from feeds_scheduler_refresh() or * if drupal_queue is not enabled, directly from $this->cron(). */ public function work($feed_info) { $importer = feeds_importer($feed_info['importer_id']); // Remove scheduled flag, if we fail after this we'd like to try again asap. $this->unflag($feed_info['importer_id'], $feed_info['callback'], $feed_info['feed_nid']); try { $importer->work($feed_info); } catch (Exception $e) { watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR); } } /** * Helper function to flag a feed scheduled. * * This function sets the feed's scheduled bit to 1 and updates * last_scheduled_time to FEEDS_REQUEST_TIME. * * @param $id * Id of the importer configuration. * @param $callback * Callback of the job. * @param $feed_nid * Identifier of the feed node. */ protected function flag($id, $callback, $feed_nid) { $save = array( 'id' => $id, 'callback' => $callback, 'feed_nid' => $feed_nid, 'last_scheduled_time' => FEEDS_REQUEST_TIME, 'scheduled' => 1, ); drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid')); } /** * Helper function to flag a feed unscheduled. * * This function sets the feed's scheduled bit to 0 and thus makes * it eligible for being added to the queue again. * * @param $id * Id of the importer configuration. * @param $callback * Callback of the job. * @param $feed_nid * Identifier of the feed node. */ protected function unflag($id, $callback, $feed_nid) { $save = array( 'id' => $id, 'callback' => $callback, 'feed_nid' => $feed_nid, 'scheduled' => 0, ); drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid')); } }