query = $query;
    $this->countQuery = $count_query;
    $this->configuration = $configuration;
    $this->fields = $fields;
    $this->batchSize = isset($options['batch_size']) ? $options['batch_size'] : 500;
  }

  /**
   * Return a string representing the source query.
   *
   * @return string
   */
  public function __toString() {
    return (string) $this->query;
  }

  /**
   * Connect lazily to the DB server.
   *
   * The connection is opened on first use and reused afterwards. When a
   * 'database' entry is configured it is selected right after connecting.
   *
   * @return boolean
   *   TRUE when a connection is available (and the configured database, if
   *   any, was selected); FALSE when connecting or selecting the database
   *   failed. A failed attempt is not cached, so a later call retries.
   */
  public function connect() {
    if (isset($this->connection)) {
      // Already connected by an earlier call.
      return TRUE;
    }

    $host = $this->configuration['servername'];
    if (isset($this->configuration['port'])) {
      $host .= ':' . $this->configuration['port'];
    }

    // The last argument (TRUE) asks for a new link rather than reusing an
    // existing one opened with the same credentials.
    $connection = mssql_connect(
      $host,
      $this->configuration['username'],
      $this->configuration['password'],
      TRUE);
    if (!$connection) {
      // Do not store the failure in $this->connection: isset() would then
      // treat it as an established connection on the next call.
      return FALSE;
    }

    if (isset($this->configuration['database'])
      && !mssql_select_db($this->configuration['database'], $connection)) {
      mssql_close($connection);
      return FALSE;
    }

    $this->connection = $connection;
    return TRUE;
  }

  /**
   * Returns a list of fields available to be mapped from the source query.
   *
   * @return array
   *  Keys: machine names of the fields (to be passed to addFieldMapping)
   *  Values: Human-friendly descriptions of the fields.
   */
  public function fields() {
    // The fields are defined in the Constructor for this plugin.
    return $this->fields;
  }

  /**
   * Return a count of all available source records.
   *
   * Runs the count query supplied to the constructor and returns the first
   * column of its first row.
   *
   * @param boolean $refresh
   *   Currently ignored.
   *   @todo: Not implemented.
   *
   * @return mixed
   *   The value of the first column of the count query's first row, or FALSE
   *   when the connection, the query or the fetch fails.
   */
  public function count($refresh = FALSE) {
    if (!$this->connect()) {
      return FALSE;
    }

    // Query on our own link explicitly; without a link argument the
    // extension falls back to the last opened connection.
    $result = mssql_query($this->countQuery, $this->connection);
    if (!$result) {
      return FALSE;
    }

    $row = mssql_fetch_object($result);
    if (!is_object($row)) {
      return FALSE;
    }

    // reset() takes its argument by reference, so it needs a real variable.
    $values = get_object_vars($row);
    return reset($values);
  }

  /**
   * Implementation of Iterator::rewind() - called before beginning a foreach loop.
   *
   * Substitutes the :criteria placeholder in the query, runs the query and
   * positions the iterator on the first row by calling next().
   */
  public function rewind() {
    $migration = Migration::currentMigration();
    $this->result = NULL;
    $this->currentRow = NULL;
    $this->numProcessed = 0;
    $map = $migration->getMap();

    $keys = array();
    foreach ($map->getSourceKey() as $field_name => $field_schema) {
      // Allow caller to provide an alias to table containing the primary key.
      // TODO: Alias should be determined automatically
      if (!empty($field_schema['alias'])) {
        $field_name = $field_schema['alias'] . '.' . $field_name;
      }
      $keys[] = $field_name;
    }

    /*
     * Replace :criteria placeholder with idlist or highwater clauses. We
     * considered supporting both but it is not worth the complexity. Run twice
     * instead.
     *
     * NOTE(review): the replacement overwrites $this->query, so the
     * placeholder is gone after the first rewind() and a later rewind() on
     * the same object keeps the first criteria - confirm whether sources are
     * ever rewound more than once.
     */
    $idlist = $migration->getOption('idlist');
    if (isset($idlist)) {
      // Quote values. Works fine for numeric keys as well. Embedded single
      // quotes are doubled so a value cannot terminate the string literal.
      $quoted = array();
      foreach (explode(',', $idlist) as $id) {
        $quoted[] = "'" . str_replace("'", "''", $id) . "'";
      }
      $criteria = $keys[0] . ' IN (' . implode(', ', $quoted) . ')';
    }
    else {
      $highwaterField = $migration->getHighwaterField();
      if (isset($highwaterField['name']) && $highwater = $migration->getHighwater()) {
        if (empty($highwaterField['alias'])) {
          $highwater_name = $highwaterField['name'];
        }
        else {
          $highwater_name = $highwaterField['alias'] . '.' . $highwaterField['name'];
        }
        $criteria = "$highwater_name > '" . str_replace("'", "''", $highwater) . "'";
      }
      else {
        // No idlist or highwater. Replace :criteria placeholder with harmless WHERE
        // clause instead of empty since we don't know if an AND follows.
        $criteria = '1=1';
      }
    }
    $this->query = str_replace(':criteria', $criteria, $this->query);

    // @todo: can't do this when not map is non joinable.
    // Assume query starts with SELECT.
    // if ($itemlimit = $this->migration->getOption('itemlimit')) {
    //   $this->query = substr_replace($this->query, "SELECT TOP $itemlimit", 0, 6);
    // }

    migrate_instrument_start('mssql_query');
    $connected = $this->connect();
    $migration->showMessage($this->query, 'debug');
    // Only query when there is a link to query on; next() treats a FALSE
    // result as "no rows".
    $this->result = $connected
      ? mssql_query($this->query, $this->connection, $this->batchSize)
      : FALSE;
    migrate_instrument_stop('mssql_query');

    $this->next();
  }

  /**
   * Implementation of Iterator::current() - called when entering a loop
   * iteration, returning the current row
   */
  public function current() {
    return $this->currentRow;
  }

  /**
   * Implementation of Iterator::key - called when entering a loop iteration, returning
   * the key of the current row. It must be a scalar - we will serialize
   * to fulfill the requirement, but using getCurrentKey() is preferable.
   *
   * @todo: sql.inc does not implement this method.
   */
  public function key() {
    return serialize($this->currentKey);
  }

  /**
   * Implementation of Iterator::next() - called at the bottom of the loop implicitly,
   * as well as explicitly from rewind().
   *
   * Advances to the next row that should be processed and stores it in
   * $this->currentRow (NULL once there is nothing left), with its source key
   * values in $this->currentKey.
   */
  public function next() {
    $this->currentRow = NULL;
    $this->currentKey = NULL;

    // Nothing to iterate when the query failed or rewind() has not run.
    if (empty($this->result)) {
      return;
    }

    $migration = Migration::currentMigration();

    // If we couldn't add the itemlimit to the query directly, enforce it here
    if (!$this->mapJoinable) {
      $itemlimit = $migration->getOption('itemlimit');
      if ($itemlimit && $this->numProcessed >= $itemlimit) {
        return;
      }
    }

    // get next row
    migrate_instrument_start('MigrateSourceMSSQL next');
    $map = $migration->getMap();
    while (TRUE) {
      $this->currentRow = mssql_fetch_object($this->result);
      if (!is_object($this->currentRow)) {
        // Might be totally out of data, or just out of this batch - request
        // another batch and see.
        migrate_instrument_start('mssql_fetch_batch');
        mssql_fetch_batch($this->result);
        migrate_instrument_stop('mssql_fetch_batch');
        if (!$this->currentRow = mssql_fetch_object($this->result)) {
          break;
        }
      }

      // Collect the source key values for this row.
      $this->currentKey = array();
      foreach ($map->getSourceKey() as $field_name => $field_schema) {
        $this->currentKey[$field_name] = $this->currentRow->$field_name;
      }

      if (!$this->mapJoinable) {
        // Check the map - if it's already mapped, and not marked for update, skip it
        $map_row = $map->getRowBySource($this->currentKey);
        if ($map_row && $map_row['needs_update'] == 0) {
          continue;
        }
        // Add map info to the row, if present
        if ($map_row) {
          foreach ($map_row as $field => $value) {
            $field = 'migrate_map_' . $field;
            $this->currentRow->$field = $value;
          }
        }
      }

      // Add some debugging, just for the first row.
      if (empty($this->numProcessed)) {
        $migration->showMessage(print_r($this->currentRow, TRUE));
      }

      // Allow the Migration to prepare this row. prepareRow() can return boolean
      // FALSE to stop processing this row. To add/modify fields on the
      // result, modify $row by reference.
      $return = TRUE;
      if (method_exists($migration, 'prepareRow')) {
        $return = $migration->prepareRow($this->currentRow);
      }

      if ($return !== FALSE) {
        $this->numProcessed++;
        break;
      }
    }

    if (!is_object($this->currentRow)) {
      // Out of data: make sure neither the row nor a key left over from a
      // skipped row survives.
      $this->currentRow = NULL;
      $this->currentKey = NULL;
    }
    migrate_instrument_stop('MigrateSourceMSSQL next');
  }

  /**
   * Implementation of Iterator::valid() - called at the top of the loop, returning
   * TRUE to process the loop and FALSE to terminate it
   */
  public function valid() {
    return !is_null($this->currentRow);
  }
}