array( 'label' => t('MongoDB field storage'), 'description' => t('Stores nodes and fields in a MongoDB database.'), ), ); }

/**
 * Implements hook_field_storage_details().
 */
function mongodb_field_storage_field_storage_details($field) {
  // We probably want to implement that at one point, but this is informative only.
}

/**
 * Implements hook_field_storage_create_field().
 */
function mongodb_field_storage_field_storage_create_field($field) {
  // Nothing to do?
}

/**
 * Implements hook_field_storage_update_field().
 */
function mongodb_field_storage_field_storage_update_field($field, $prior_field, $has_data) {
  // Nothing to do?
}

/**
 * Implements hook_field_storage_delete_field().
 */
function mongodb_field_storage_field_storage_delete_field($field) {
  // Nothing to do?
}

/**
 * Implements hook_field_storage_load().
 *
 * Reads the stored documents for the requested ids from the 'fields_current'
 * or 'fields_revision' collection and rebuilds the
 * $entity->{field_name}[language][delta] structure on the passed entities.
 *
 * @param $entity_type
 *   Entity type; selects the MongoDB collection.
 * @param $entities
 *   Entity objects to populate, keyed by entity id.
 * @param $age
 *   FIELD_LOAD_CURRENT reads 'fields_current'; any other value reads
 *   'fields_revision'.
 * @param $fields
 *   Array keyed by field id; each value is the list of document ids (entity
 *   ids, or revision ids for revision loads) to fetch.
 * @param $options
 *   Not used by this implementation.
 */
function mongodb_field_storage_field_storage_load($entity_type, $entities, $age, $fields, $options) {
  $load_current = $age == FIELD_LOAD_CURRENT;

  // Fetch information about the fields and collect the ids to query.
  $keys = array();
  $saved_fields = array();
  foreach ($fields as $field_id => $ids) {
    $saved_fields[$field_id] = field_info_field_by_id($field_id);
    foreach ($ids as $id) {
      $keys[$id] = TRUE;
    }
  }

  // Revision documents use the revision id as _id while $entities is keyed by
  // entity id, so build a revision id => entity id lookup for that case.
  $revision_owner = array();
  if (!$load_current) {
    foreach ($entities as $id => $entity) {
      list(, $vid) = entity_extract_ids($entity_type, $entity);
      if (isset($vid)) {
        $revision_owner[$vid] = $id;
      }
    }
  }

  // Execute the query and save the fields to the parent object.
  $collection = mongodb_collection($load_current ? 'fields_current' : 'fields_revision', $entity_type);
  $result = $collection->find(array('_id' => array('$in' => array_keys($keys))));

  foreach ($result as $row) {
    if ($load_current) {
      $entity_id = $row['_id'];
    }
    else {
      $entity_id = isset($revision_owner[$row['_id']]) ? $revision_owner[$row['_id']] : NULL;
    }
    // Skip documents that do not belong to any of the passed entities.
    if (!isset($entity_id) || !isset($entities[$entity_id])) {
      continue;
    }

    foreach ($saved_fields as $field) {
      $field_name = $field['field_name'];
      $field_values = array();
      if (isset($row[$field_name])) {
        // Restore the field structure.
        if ($field['cardinality'] == 1) {
          // Single-value fields are stored as one flat column array.
          $language = isset($row[$field_name]['_language']) ? $row[$field_name]['_language'] : LANGUAGE_NONE;
          unset($row[$field_name]['_language']);
          $field_values[$language][0] = $row[$field_name];
        }
        else {
          // Multi-value fields are stored as a list of column arrays.
          foreach ($row[$field_name] as $delta => $column_values) {
            $language = isset($column_values['_language']) ? $column_values['_language'] : LANGUAGE_NONE;
            unset($column_values['_language']);
            $field_values[$language][$delta] = $column_values;
          }
        }
      }
      $entities[$entity_id]->{$field_name} = $field_values;
    }
  }
}

/**
 * Implements hook_field_storage_write().
 *
 * Builds one document holding the entity's base table columns and the given
 * fields, saves it to 'fields_current' and, when the entity has a revision
 * id, to 'fields_revision' as well (keyed by the revision id).
 *
 * @param $entity_type
 *   Entity type; selects the MongoDB collection.
 * @param $entity
 *   The entity being saved.
 * @param $op
 *   Not used by this implementation.
 * @param $fields
 *   List of field ids to store.
 * @param $entity_write
 *   TRUE when called from this module's entity insert/update hooks rather
 *   than from field attach.
 *
 * @throws MongodbStorageException
 *   When a multi-value field holds more items than its cardinality allows.
 */
function mongodb_field_storage_field_storage_write($entity_type, $entity, $op, $fields, $entity_write = FALSE) {
  $wrote = &drupal_static(__FUNCTION__, array());
  list($entity_id, $revision_id, $bundle) = entity_extract_ids($entity_type, $entity);

  // There is always a field_attach_insert/update call followed by an
  // entity_insert/update. field_attach does not call storage_write needlessly
  // so we need to make sure if the entity was not saved during field_attach,
  // it will be during entity write. The static is set during attach write if
  // one happened and always unset on entity write.
  if ($entity_write) {
    if (isset($wrote[$entity_type][$entity_id])) {
      unset($wrote[$entity_type][$entity_id]);
      return;
    }
  }
  else {
    $wrote[$entity_type][$entity_id] = TRUE;
  }

  // Create a new object.
  $new_entity = new stdClass();
  $new_entity->_id = intval($entity_id);
  $new_entity->_type = $entity_type;
  $new_entity->_bundle = $bundle;
  if (isset($revision_id)) {
    $new_entity->_revision_id = intval($revision_id);
  }

  // Add the base table's fields to the new object.
  $entity_info = entity_get_info($entity_type);
  // Although it's rare not to define a base table, test entities for sure
  // don't.
  if (isset($entity_info['base table'])) {
    $table_schema = drupal_get_schema($entity_info['base table']);
    foreach ($table_schema['fields'] as $field_name => $column_definition) {
      if (isset($entity->$field_name)) {
        $new_entity->$field_name = _mongodb_field_storage_value($column_definition['type'], $entity->$field_name);
      }
    }
  }

  // Add the fieldapi fields to the new object.
  foreach ($fields as $field_id) {
    $field = field_info_field_by_id($field_id);
    $field_name = $field['field_name'];
    $field_values = array();
    if (isset($entity->$field_name)) {
      foreach ($entity->$field_name as $language => $values) {
        // According to field.test, we should not save anything for NULL.
        if (!isset($values)) {
          continue;
        }
        if ($field['cardinality'] == 1) {
          // Only delta 0 is stored; an emptied field has no delta 0 at all,
          // so guard before iterating its columns.
          if (!empty($values[0]) && is_array($values[0])) {
            foreach ($values[0] as $column_name => $column_value) {
              if (isset($field['columns'][$column_name])) {
                $field_values[$column_name] = _mongodb_field_storage_value($field['columns'][$column_name]['type'], $column_value);
              }
            }
            if ($language != LANGUAGE_NONE) {
              $field_values['_language'] = $language;
            }
          }
        }
        else {
          // Collapse deltas.
          $values = array_values($values);
          if ($field['cardinality'] > 1 && count($values) > $field['cardinality']) {
            throw new MongodbStorageException('Invalid delta, not saving');
          }
          foreach ($values as $column_values) {
            $store_values = array();
            foreach ($column_values as $column_name => $column_value) {
              if (isset($field['columns'][$column_name])) {
                $store_values[$column_name] = _mongodb_field_storage_value($field['columns'][$column_name]['type'], $column_value);
              }
            }
            // Collapse the field structure.
            if ($language != LANGUAGE_NONE) {
              $store_values['_language'] = $language;
            }
            $field_values[] = $store_values;
          }
        }
      }
    }
    $new_entity->$field_name = empty($field_values) ? NULL : $field_values;
  }

  // Save the object.
  $collection = mongodb_collection('fields_current', $entity_type);
  $collection->save($new_entity);

  // When slaves are configured, wait until the write reached them.
  $slaves = variable_get('mongodb_slave');
  if ($slaves) {
    $return = $collection->db->runCommand(array(
      'getlasterror' => 1,
      'w' => $slaves,
      'wtimeout' => 3000,
    ));
    // 'wtimeout' is only read when present, hence !empty() rather than a
    // direct index.
    if (!empty($return['wtimeout'])) {
      header('HTTP/1.0 500 Database write failed');
      exit;
    }
  }

  if (isset($revision_id)) {
    // Store the revision copy under an integer _id: load and delete look it
    // up with integers and MongoDB does not match "5" against 5.
    $new_entity->_id = intval($revision_id);
    mongodb_collection('fields_revision', $entity_type)->save($new_entity);
  }
}

/**
 * Casts a value according to its schema column type.
 *
 * @param $type
 *   Schema type name; 'int' and 'serial' cast to integer, 'float' to float.
 * @param $value
 *   The value to cast.
 *
 * @return
 *   The cast value, or $value unchanged for any other type.
 */
function _mongodb_field_storage_value($type, $value) {
  switch ($type) {
    case 'int':
    case 'serial':
      return (int) $value;

    case 'float':
      return (float) $value;

    default:
      return $value;
  }
}

/**
 * Implements hook_field_storage_delete().
 *
 * This function deletes data for all fields for an entity from the database.
 */
function mongodb_field_storage_field_storage_delete($entity_type, $entity, $fields) {
  list($entity_id, $revision_id, $bundle) = entity_extract_ids($entity_type, $entity);
  mongodb_collection('fields_current', $entity_type)->remove(array(
    '_id' => (int) $entity_id,
  ));
  // Revision documents are keyed by revision id, so match them through the
  // entity's id column instead.
  $entity_info = entity_get_info($entity_type);
  mongodb_collection('fields_revision', $entity_type)->remove(array(
    $entity_info['entity keys']['id'] => (int) $entity_id,
  ));
}

/**
 * Implements hook_field_storage_delete_revision().
 *
 * This function actually deletes the data from the database.
 */
function mongodb_field_storage_field_storage_delete_revision($entity_type, $entity, $fields) {
  list($entity_id, $revision_id, $bundle) = entity_extract_ids($entity_type, $entity);
  mongodb_collection('fields_revision', $entity_type)->remove(array(
    '_id' => (int) $revision_id,
  ));
}

/**
 * Implements hook_field_storage_delete_instance().
 *
 * This function simply marks for deletion all data associated with the field.
 */
function mongodb_field_storage_field_storage_delete_instance($instance) {
  // TODO: figure out what to do.
}

/**
 * Thrown when field data cannot be stored.
 */
class MongodbStorageException extends Exception {}

/**
 * Implements hook_entity_query_alter().
*/
function mongodb_field_storage_entity_query_alter($query) {
  // Route execution of the query to the MongoDB implementation below.
  $query->executeCallback = 'mongodb_field_storage_query';
}

/**
 * Executes an entity field query against MongoDB.
 *
 * @param $query
 *   The query object whose conditions, ordering, range and count flag are
 *   translated into a MongoDB find().
 *
 * @return
 *   A count when $query->count is set, TRUE/FALSE for the field_has_data()
 *   shortcut, otherwise an array of stub entities keyed by entity type and
 *   then by entity id (or revision id for revision queries).
 *
 * @throws EntityFieldQueryException
 *   When no entity type can be determined or an operator is unsupported.
 */
function mongodb_field_storage_query($query) {
  $find = array();
  if (isset($query->entityConditions['entity_type'])) {
    $entity_type = $query->entityConditions['entity_type']['value'];
    unset($query->entityConditions['entity_type']);
  }
  else {
    if (count($query->fields) == 1) {
      $field = $query->fields[0];
      $field_has_data_query = new EntityFieldQuery;
      // Compile the same query as field_has_data().
      $field_has_data_query
        ->fieldCondition($field)
        ->range(0, 1)
        ->count();
      $field_has_data_query->executeCallback = 'mongodb_field_storage_query';
      $success = TRUE;
      // Check that the original query is identical to this one.
      foreach (get_object_vars($field_has_data_query) as $key => $value) {
        if ($value != $query->$key) {
          $success = FALSE;
        }
      }
      // If the query is coming from field_has_data() then try it with every
      // entity type.
      // NOTE(review): this only checks that the collection holds any document
      // at all, not that this particular field has a value.
      if ($success) {
        foreach ($field['bundles'] as $entity_type => $data) {
          if (mongodb_collection('fields_current', $entity_type)->find()->limit(1)->count(TRUE)) {
            return TRUE;
          }
        }
        return FALSE;
      }
    }
    throw new EntityFieldQueryException('No entity_type');
  }

  $entity_map = array('entity_id' => '_id', 'revision_id' => '_revision_id', 'bundle' => '_bundle');
  foreach ($query->entityConditions as $field => $data) {
    $value = $data['value'];
    if ($field != 'bundle') {
      // Ids are stored as integers. Cast element-wise for IN/BETWEEN style
      // conditions: intval() on an array would collapse it to 1.
      $value = is_array($value) ? array_map('intval', array_values($value)) : intval($value);
    }
    $find[$entity_map[$field]] = _mongodb_field_storage_query_value($value, $data['operator']);
  }
  foreach ($query->fieldConditions as $data) {
    $find[$data['field']['field_name'] . '.' . $data['column']] = _mongodb_field_storage_query_value($data['value'], $data['operator']);
  }
  foreach ($query->propertyConditions as $data) {
    $find[$data['column']] = _mongodb_field_storage_query_value($data['value'], $data['operator']);
  }

  if ($query->age == FIELD_LOAD_CURRENT) {
    $collection = mongodb_collection('fields_current', $entity_type);
    $id_key = '_id';
  }
  else {
    $collection = mongodb_collection('fields_revision', $entity_type);
    $id_key = '_revision_id';
  }

  // An unranged count does not need a cursor.
  if ($query->count && !$query->range) {
    return $collection->count($find);
  }

  $cursor = $collection->find($find);
  if ($query->range) {
    $cursor->skip($query->range['start'])->limit($query->range['length']);
  }
  if ($query->count) {
    // TRUE makes the count honour skip/limit.
    return $cursor->count(TRUE);
  }
  else {
    $sort = array();
    foreach ($query->entityOrder as $name => $direction) {
      $sort[$entity_map[$name]] = $direction == 'ASC' ? 1 : -1;
    }
    foreach ($query->propertyOrder as $data) {
      $sort[$data['column']] = $data['direction'] == 'ASC' ? 1 : -1;
    }
    foreach ($query->fieldOrder as $data) {
      $sort[$data['field']['field_name'] . '.' . $data['column']] = $data['direction'] == 'ASC' ? 1 : -1;
    }
    if ($sort) {
      $cursor->sort($sort);
    }
  }

  $return = array();
  foreach ($cursor as $row) {
    // Documents of unrevisioned entities carry no _revision_id.
    $row += array('_revision_id' => NULL);
    $entity = entity_create_stub_entity($entity_type, array($row['_id'], $row['_revision_id'], $row['_bundle']));
    $return[$entity_type][$row[$id_key]] = $entity;
  }
  return $return;
}

/**
 * Translates a condition value and operator into a MongoDB match expression.
 *
 * @param $value
 *   The condition value: a scalar, or an array for IN, NOT IN, ALL and
 *   BETWEEN.
 * @param $operator
 *   The operator; when NULL it defaults to 'IN' for arrays and '=' otherwise.
 *   For LIKE, '%' is the only wildcard that is interpreted.
 *
 * @return
 *   A scalar, NULL, a MongoRegex or an operator array usable as the value of
 *   a find() criterion.
 *
 * @throws EntityFieldQueryException
 *   When the operator is not supported.
 */
function _mongodb_field_storage_query_value($value, $operator) {
  if (!isset($operator)) {
    $operator = is_array($value) ? 'IN' : '=';
  }

  if ($operator == 'LIKE') {
    // Split on the wildcard and escape the literal pieces so that regex
    // metacharacters in the value are matched literally.
    $parts = explode('%', $value);
    // A trailing wildcard means "starts with": leave the end unanchored so
    // the expression stays a plain prefix match.
    $open_ended = count($parts) > 1 && end($parts) === '';
    if ($open_ended) {
      array_pop($parts);
    }
    foreach ($parts as $index => $part) {
      $parts[$index] = preg_quote($part, '/');
    }
    $pattern = '^' . implode('.*', $parts) . ($open_ended ? '' : '$');
    // Remaining wildcards must also span line breaks, hence the "s" flag.
    $flags = count($parts) > 1 ? 's' : '';
    return new MongoRegex('/' . $pattern . '/' . $flags);
  }

  switch ($operator) {
    case '=':
      return $value;

    case 'IN':
      // Re-index so the value is always sent as a list.
      return array('$in' => array_values((array) $value));

    case 'NOT IN':
      return array('$nin' => array_values((array) $value));

    case 'ALL':
      return array('$all' => array_values((array) $value));

    case '<':
      return array('$lt' => $value);

    case '>':
      return array('$gt' => $value);

    case '<=':
      return array('$lte' => $value);

    case '>=':
      return array('$gte' => $value);

    case '!=':
    case '<>':
      return array('$ne' => $value);

    case 'STARTS_WITH':
      return new MongoRegex('/^' . preg_quote($value, '/') . '/');

    case 'CONTAINS':
      return new MongoRegex('/' . preg_quote($value, '/') . '/');

    case 'BETWEEN':
      return array('$gte' => $value[0], '$lte' => $value[1]);

    case 'IS NULL':
      return NULL;

    default:
      throw new EntityFieldQueryException("$operator not implemented");
  }
}

/**
 * Implements hook_field_attach_rename_bundle().
 */
function mongodb_field_storage_field_attach_rename_bundle($entity_type, $bundle_old, $bundle_new) {
  // Use $set so only the bundle name changes; a bare replacement document
  // would discard every other key of the matched documents.
  $criteria = array('_bundle' => $bundle_old);
  $change = array('$set' => array('_bundle' => $bundle_new));
  $options = array('multiple' => TRUE);
  mongodb_collection('fields_current', $entity_type)->update($criteria, $change, $options);
  mongodb_collection('fields_revision', $entity_type)->update($criteria, $change, $options);
}

/**
 * Implements hook_entity_insert().
 */
function mongodb_field_storage_entity_insert($entity, $entity_type) {
  // Entity-level write: stores the entity unless field attach already did.
  mongodb_field_storage_field_storage_write($entity_type, $entity, NULL, array(), TRUE);
}

/**
 * Implements hook_entity_update().
 */
function mongodb_field_storage_entity_update($entity, $entity_type) {
  // Entity-level write: stores the entity unless field attach already did.
  mongodb_field_storage_field_storage_write($entity_type, $entity, NULL, array(), TRUE);
}

/**
 * Implements hook_field_attach_delete().
*/
function mongodb_field_storage_field_attach_delete($entity_type, $entity) {
  // Only the entity id and the revision id are needed here.
  $ids = entity_extract_ids($entity_type, $entity);
  $entity_id = $ids[0];
  $revision_id = $ids[1];

  // Drop the current document of the entity.
  $current = mongodb_collection('fields_current', $entity_type);
  $current->remove(array('_id' => (int) $entity_id));

  // Entities without a revision id have nothing stored in the revisions
  // collection under that key.
  if (!$revision_id) {
    return;
  }

  $revisions = mongodb_collection('fields_revision', $entity_type);
  $revisions->remove(array('_id' => (int) $revision_id));
}