mapTable; }

  /**
   * Name of the message table that backs this map.
   *
   * @return string
   */
  public function getMessageTable() {
    return $this->messageTable;
  }

  /**
   * Source key definition as passed to the constructor: field names mapped to
   * the Drupal schema definition of each field.
   *
   * @return array
   */
  public function getSourceKey() {
    return $this->sourceKey;
  }

  /**
   * Destination key definition as passed to the constructor: field names
   * mapped to the Drupal schema definition of each field.
   *
   * @return array
   */
  public function getDestinationKey() {
    return $this->destinationKey;
  }

  /**
   * Drupal connection object on which the map/message tables are created.
   *
   * @var DatabaseConnection
   */
  protected $connection;

  /**
   * Connection holding the map and message tables.
   *
   * @return DatabaseConnection
   */
  public function getConnection() {
    return $this->connection;
  }

  /**
   * Set to TRUE by ensureTables() so the table check runs only once for this
   * object.
   *
   * @var boolean
   */
  protected $ensured;

  /**
   * Set up table names, key-column maps and the connection, then make sure
   * the map and message tables exist.
   *
   * @param string $machine_name
   *   Migration machine name; lower-cased to build the table names.
   * @param array $source_key
   *   Source key fields, keyed by field name, values are schema definitions.
   * @param array $destination_key
   *   Destination key fields, keyed by field name, values are schema
   *   definitions.
   * @param string $connection_key
   *   Database connection key passed to Database::getConnection().
   */
  public function __construct($machine_name, array $source_key, array $destination_key, $connection_key = 'default') {
    $lower_name = drupal_strtolower($machine_name);
    // Generated table names are capped at 63 characters.
    $this->mapTable = substr('migrate_map_' . $lower_name, 0, 63);
    $this->messageTable = substr('migrate_message_' . $lower_name, 0, 63);

    $this->sourceKey = $source_key;
    $this->destinationKey = $destination_key;
    $this->connection = Database::getConnection('default', $connection_key);

    // Each key field gets a positional column name (sourceid1.., destid1..),
    // keyed by the original field name.
    $this->sourceKeyMap = array();
    $position = 0;
    foreach (array_keys($source_key) as $field) {
      $position++;
      $this->sourceKeyMap[$field] = 'sourceid' . $position;
    }

    $this->destinationKeyMap = array();
    $position = 0;
    foreach (array_keys($destination_key) as $field) {
      $position++;
      $this->destinationKeyMap[$field] = 'destid' . $position;
    }

    $this->ensureTables();
  }

  /**
   * Create the map and message tables if they don't already exist.
*/
  protected function ensureTables() {
    // The check only needs to run once per object.
    if ($this->ensured) {
      return;
    }
    $schema_api = $this->connection->schema();

    // Positional source key columns (sourceid1..N). Both tables use them, and
    // they form the map table's primary key and the message table's index.
    $source_key_schema = array();
    $pks = array();
    $count = 1;
    foreach ($this->sourceKey as $field_schema) {
      $mapkey = 'sourceid' . $count++;
      $source_key_schema[$mapkey] = $field_schema;
      $pks[] = $mapkey;
    }

    // Each table is checked on its own, so a missing message table is still
    // created when the map table already exists.
    if (!$schema_api->tableExists($this->mapTable)) {
      $fields = $source_key_schema;
      // Add destination keys to map table.
      // TODO: How do we discover the destination schema?
      $count = 1;
      foreach ($this->destinationKey as $field_schema) {
        $fields['destid' . $count++] = $field_schema;
      }
      $fields['needs_update'] = array(
        'type' => 'int',
        'size' => 'tiny',
        'unsigned' => TRUE,
        'not null' => TRUE,
        'default' => (int) FALSE,
        'description' => 'Flags existing mapped data to be updated',
      );
      $fields['last_imported'] = array(
        'type' => 'int',
        'unsigned' => TRUE,
        'not null' => TRUE,
        'default' => 0,
        'description' => 'UNIX timestamp of the last time this row was imported',
      );
      $schema = array(
        'description' => t('Mappings from source key to destination key'),
        'fields' => $fields,
        'primary key' => $pks,
      );
      $schema_api->createTable($this->mapTable, $schema);
    }

    if (!$schema_api->tableExists($this->messageTable)) {
      $fields = array();
      $fields['msgid'] = array(
        'type' => 'serial',
        'unsigned' => TRUE,
        'not null' => TRUE,
      );
      $fields += $source_key_schema;
      $fields['level'] = array(
        'type' => 'int',
        'unsigned' => TRUE,
        'not null' => TRUE,
        'default' => 1,
      );
      $fields['message'] = array(
        'type' => 'text',
        'size' => 'medium',
        'not null' => TRUE,
      );
      $schema = array(
        'description' => t('Messages generated during a migration process'),
        'fields' => $fields,
        'primary key' => array('msgid'),
        'indexes' => array('sourcekey' => $pks),
      );
      $schema_api->createTable($this->messageTable, $schema);
    }

    $this->ensured = TRUE;
  }

  /**
   * Retrieve a row from the
map table, given a source ID.
   *
   * @param array $source_id
   *   Source key values, in the same order as the source key fields.
   *
   * @return array|false
   *   The destination key columns (destidN) plus needs_update and
   *   last_imported for the matching map row, or FALSE when there is none.
   */
  public function getRowBySource(array $source_id) {
    migrate_instrument_start('mapRowBySource');
    $columns = array_merge(array_values($this->destinationKeyMap), array('needs_update', 'last_imported'));
    $query = $this->connection->select($this->mapTable, 'map');
    $query->fields('map', $columns);
    // Match each positional source column against the next supplied value.
    foreach ($this->sourceKeyMap as $column) {
      $query->condition("map.$column", array_shift($source_id), '=');
    }
    $statement = $query->execute();
    migrate_instrument_stop('mapRowBySource');
    return $statement->fetchAssoc();
  }

  /**
   * Retrieve a row from the map table, given a destination ID.
   *
   * @param array $destination_id
   *   Destination key values, in the same order as the destination key fields.
   *
   * @return array|false
   *   The source key columns (sourceidN) plus needs_update and last_imported
   *   for the matching map row, or FALSE when there is none.
   */
  public function getRowByDestination(array $destination_id) {
    migrate_instrument_start('mapRowByDestination');
    $columns = array_merge(array_values($this->sourceKeyMap), array('needs_update', 'last_imported'));
    $query = $this->connection->select($this->mapTable, 'map');
    $query->fields('map', $columns);
    // Match each positional destination column against the next value.
    foreach ($this->destinationKeyMap as $column) {
      $query->condition("map.$column", array_shift($destination_id), '=');
    }
    $statement = $query->execute();
    migrate_instrument_stop('mapRowByDestination');
    return $statement->fetchAssoc();
  }

  /**
   * Given a (possibly multi-field) destination key, return the (possibly
   * multi-field) source key mapped to it.
   *
   * @param array $destination_id
   *   Array of destination key values.
   *
   * @return array|false
   *   Array of source key values keyed by sourceidN column, or FALSE if no
   *   mapping exists.
*/
  public function lookupSourceID(array $destination_id) {
    migrate_instrument_start('lookupSourceID');
    $query = $this->connection->select($this->mapTable, 'map')
      ->fields('map', $this->sourceKeyMap);
    foreach ($this->destinationKeyMap as $key_name) {
      $query->condition("map.$key_name", array_shift($destination_id), '=');
    }
    $source_id = $query->execute()->fetchAssoc();
    migrate_instrument_stop('lookupSourceID');
    return $source_id;
  }

  /**
   * Given a (possibly multi-field) source key, return the (possibly
   * multi-field) destination key it is mapped to.
   *
   * @param array $source_id
   *   Array of source key values, in source key field order.
   * @param Migration $destination_migration
   *   Not used at present. It is kept for interface compatibility and is
   *   reserved for the lookup cache described in the TODO below.
   *
   * @return array|false
   *   Array of destination key values keyed by destidN column, or FALSE if
   *   the source key is not mapped.
   */
  public function lookupDestinationID(array $source_id, Migration $destination_migration) {
    migrate_instrument_start('lookupDestinationID');
    // TODO: Reinstate a per-map cache of frequent lookups (with hit/miss
    // counters and needsUpdate flagging for self-referencing migrations) once
    // the cache code is updated for the new world. Until then every lookup
    // queries the map table directly.
    $query = $this->connection->select($this->mapTable, 'map')
      ->fields('map', $this->destinationKeyMap);
    foreach ($this->sourceKeyMap as $key_name) {
      $query->condition("map.$key_name", array_shift($source_id), '=');
    }
    $destination_id = $query->execute()->fetchAssoc();
    migrate_instrument_stop('lookupDestinationID');
    return $destination_id;
  }

  /**
   * Called upon successfully import of one record, we record a mapping from
   * the source key to the destination key. Also may be called, setting the
   * third parameter to TRUE, to signal an existing record should be remigrated.
   *
   * @param stdClass $source_row
   *   The raw source data. We use the key map derived from the source object
   *   to get the source key values.
   * @param array $dest_ids
   *   The destination key values.
   * @param boolean $needs_update
   *   Value for the needs_update field in the map. Defaults to FALSE.
   */
  public function saveIDMapping(stdClass $source_row, array $dest_ids, $needs_update = FALSE) {
    migrate_instrument_start('saveIDMapping');
    // Construct the source key from the row's key fields.
    $keys = array();
    foreach ($this->sourceKeyMap as $field_name => $key_name) {
      $keys[$key_name] = $source_row->$field_name;
    }

    $fields = array('needs_update' => (int) $needs_update);
    $count = 1;
    foreach ($dest_ids as $dest_id) {
      $fields['destid' . $count++] = $dest_id;
    }
    if ($this->trackLastImported) {
      $fields['last_imported'] = time();
    }
    // Insert a new mapping or overwrite the existing one for this source key.
    $this->connection->merge($this->mapTable)
      ->key($keys)
      ->fields($fields)
      ->execute();
    migrate_instrument_stop('saveIDMapping');
  }

  /**
   * Record a message in the migration's message table.
   *
   * @param array $source_key
   *   Source ID of the record in error. If this is not an array, the message
   *   is printed instead of stored.
   * @param string $message
   *   The message to record.
   * @param int $level
   *   Optional message severity (defaults to MESSAGE_ERROR).
   */
  public function saveMessage($source_key, $message, $level = Migration::MESSAGE_ERROR) {
    if (!is_array($source_key)) {
      // TODO: What else can we do?
      print($message);
      return;
    }
    // Source IDs go into the positional sourceidN columns.
    $fields = array();
    $count = 1;
    foreach ($source_key as $key_value) {
      $fields['sourceid' . $count++] = $key_value;
    }
    $fields['level'] = $level;
    $fields['message'] = $message;
    $this->connection->insert($this->messageTable)
      ->fields($fields)
      ->execute();
  }

  /**
   * Prepares this migration to run as an update - that is, in addition to
   * unmigrated content (source records not in the map table) being imported,
   * previously-migrated content will also be updated in place.
   */
  public function prepareUpdate() {
    $this->connection->update($this->mapTable)
      ->fields(array('needs_update' => 1))
      ->execute();
  }

  /**
   * Get the number of rows in the map table.
   *
   * @return int
   *   Number of mapped records.
   */
  public function importedCount() {
    $query = $this->connection->select($this->mapTable);
    $query->addExpression('COUNT(*)', 'count');
    $count = $query->execute()->fetchField();
    return $count;
  }

  /**
   * Get the number of source records which failed to import.
   *
   * Counts distinct source keys that have at least one message whose level
   * is not MESSAGE_INFORMATIONAL.
   *
   * @return int
   *   Number of records errored out.
   */
  public function errorCount() {
    $query = $this->connection->select($this->messageTable, 'msg')
      ->fields('msg', array_values($this->sourceKeyMap))
      ->condition('msg.level', MigrationBase::MESSAGE_INFORMATIONAL, '<>')
      ->distinct()
      ->countQuery();
    $count = $query->execute()->fetchField();
    return $count;
  }

  /**
   * Get the number of messages saved.
   *
   * @return int
   *   Number of messages.
   */
  public function messageCount() {
    $query = $this->connection->select($this->messageTable);
    $query->addExpression('COUNT(*)', 'count');
    $count = $query->execute()->fetchField();
    return $count;
  }

  /**
   * Delete the map entry and any message table entries for the specified source row.
   *
   * @param array $source_key
   *   Source key values, in source key field order. An empty key matches no
   *   row, so nothing is deleted.
   * @param boolean $messages_only
   *   If TRUE, only the message table entries are deleted.
   */
  public function delete(array $source_key, $messages_only = FALSE) {
    // Without conditions the DELETEs below would wipe both tables entirely.
    if (empty($source_key)) {
      return;
    }
    if (!$messages_only) {
      $map_query = $this->connection->delete($this->mapTable);
    }
    $message_query = $this->connection->delete($this->messageTable);
    $count = 1;
    foreach ($source_key as $key_value) {
      $column = 'sourceid' . $count++;
      if (!$messages_only) {
        $map_query->condition($column, $key_value);
      }
      $message_query->condition($column, $key_value);
    }
    if (!$messages_only) {
      $map_query->execute();
    }
    $message_query->execute();
  }

  /**
   * Delete the map entry and any message table entries for the specified destination row.
   *
   * Does nothing if the destination key is not mapped to a source key.
   *
   * @param array $destination_key
   *   Destination key values, in destination key field order.
   */
  public function deleteDestination(array $destination_key) {
    $source_key = $this->lookupSourceID($destination_key);
    if (empty($source_key)) {
      return;
    }

    $map_query = $this->connection->delete($this->mapTable);
    $count = 1;
    foreach ($destination_key as $key_value) {
      $map_query->condition('destid' . $count++, $key_value);
    }
    $map_query->execute();

    // Messages are stored by source key, so use the key looked up above.
    $message_query = $this->connection->delete($this->messageTable);
    $count = 1;
    foreach ($source_key as $key_value) {
      $message_query->condition('sourceid' . $count++, $key_value);
    }
    $message_query->execute();
  }

  /**
   * Set the specified row to be updated, if it exists.
   *
   * @param array $source_key
   *   Source key values, in source key field order. An empty key matches no
   *   row (use prepareUpdate() to flag every row).
   */
  public function setUpdate(array $source_key) {
    // Without conditions the UPDATE below would flag every row.
    if (empty($source_key)) {
      return;
    }
    $query = $this->connection->update($this->mapTable)
      ->fields(array('needs_update' => 1));
    $count = 1;
    foreach ($source_key as $key_value) {
      $query->condition('sourceid' . $count++, $key_value);
    }
    $query->execute();
  }

  /**
   * Delete all map and message table entries specified.
   *
   * @param array $source_keys
   *   Each array member is an array of key fields for one source row.
*/
  public function deleteBulk(array $source_keys) {
    // Nothing to delete. This also avoids issuing an invalid "IN ()" query.
    if (empty($source_keys)) {
      return;
    }
    // If we have a single-column key, we can shortcut it with one IN query
    // per table.
    if (count($this->sourceKey) == 1) {
      $sourceids = array();
      foreach ($source_keys as $source_key) {
        // Accept either a one-element key array or a bare scalar ID.
        $key_values = (array) $source_key;
        $sourceids[] = reset($key_values);
      }
      $this->connection->delete($this->mapTable)
        ->condition('sourceid1', $sourceids, 'IN')
        ->execute();
      $this->connection->delete($this->messageTable)
        ->condition('sourceid1', $sourceids, 'IN')
        ->execute();
    }
    else {
      foreach ($source_keys as $source_key) {
        // An empty key would produce unconditioned DELETEs that wipe the
        // tables, so skip it.
        if (empty($source_key)) {
          continue;
        }
        $map_query = $this->connection->delete($this->mapTable);
        $message_query = $this->connection->delete($this->messageTable);
        $count = 1;
        foreach ($source_key as $key_value) {
          // Both tables use the same positional column for this key field.
          // Advance the counter once per field, not once per query.
          $column = 'sourceid' . $count++;
          $map_query->condition($column, $key_value);
          $message_query->condition($column, $key_value);
        }
        $map_query->execute();
        $message_query->execute();
      }
    }
  }

  /**
   * Clear all messages from the message table.
   */
  public function clearMessages() {
    $this->connection->truncate($this->messageTable)
      ->execute();
  }

  /**
   * Remove the associated map and message tables.
   */
  public function destroy() {
    $this->connection->schema()->dropTable($this->mapTable);
    $this->connection->schema()->dropTable($this->messageTable);
  }

  // Iterator state: the map-table result set, the current row (destination
  // columns only), and the current row's source key (sourceidN => value).
  protected $result = NULL;
  protected $currentRow = NULL;
  protected $currentKey = array();

  /**
   * Source key of the current iterator row, keyed by sourceidN column.
   *
   * @return array
   */
  public function getCurrentKey() {
    return $this->currentKey;
  }

  /**
   * Implementation of Iterator::rewind() - called before beginning a foreach loop.
* TODO: Support idlist, itemlimit
   *
   * Runs a query that selects every source and destination key column from
   * the map table, then advances to the first row.
   */
  public function rewind() {
    $this->currentRow = NULL;
    // Source key columns first, then destination key columns.
    $columns = array_merge(array_values($this->sourceKeyMap), array_values($this->destinationKeyMap));
    // TODO: Honor $this->options['itemlimit'] via $query->range(0, $limit).
    $this->result = $this->connection
      ->select($this->mapTable, 'map')
      ->fields('map', $columns)
      ->execute();
    $this->next();
  }

  /**
   * Implementation of Iterator::current().
   *
   * @return object|null
   *   The current row holding only destination key columns, or NULL when the
   *   iteration is exhausted.
   */
  public function current() {
    return $this->currentRow;
  }

  /**
   * Implementation of Iterator::key().
   *
   * Iterator keys must be scalar, so the source key array is serialized.
   * Callers should prefer getCurrentKey().
   *
   * @return string
   */
  public function key() {
    return serialize($this->currentKey);
  }

  /**
   * Implementation of Iterator::next().
   *
   * Called implicitly at the bottom of each loop and explicitly from
   * rewind(). It fetches the next map row and moves its source key columns
   * out of the row into $this->currentKey.
   */
  public function next() {
    $row = $this->result->fetchObject();
    $this->currentKey = array();
    if (!is_object($row)) {
      $this->currentRow = NULL;
      return;
    }
    foreach ($this->sourceKeyMap as $column) {
      $this->currentKey[$column] = $row->$column;
      // The row keeps only destination fields.
      unset($row->$column);
    }
    $this->currentRow = $row;
  }

  /**
   * Implementation of Iterator::valid().
   *
   * @return boolean
   *   TRUE while there is a current row to process.
   */
  public function valid() {
    // TODO: Check numProcessed against itemlimit
    return $this->currentRow !== NULL;
  }
}