Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Managed Location Option for Create Federated Catalog CLI Command #3012

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,10 @@ def export_assessment(w: WorkspaceClient, prompts: Prompts):


@ucx.command
def create_federated_catalog(w: WorkspaceClient, prompts: Prompts):
    """(Experimental) Create federated catalog from current workspace Hive Metastore."""
    ctx = WorkspaceContext(w)
    # `prompts` is handed to the federation layer, which passes it on to catalog creation.
    ctx.federation.register_internal_hms_as_federated_catalog(prompts)


@ucx.command
Expand Down
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/contexts/workspace_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def federation(self):
self.workspace_client,
self.external_locations,
self.workspace_info,
self.catalog_schema,
self.config.enable_hms_federation,
)

Expand Down
40 changes: 39 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/catalog_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class Catalog:
name: str
"""The catalog name"""

connection_name: str | None = None
"""The connection name for the catalog used for federated catalogs."""

@property
def full_name(self) -> str:
"""The full name of the catalog.
Expand Down Expand Up @@ -134,6 +137,35 @@ def create_ucx_catalog(self, prompts: Prompts, *, properties: dict[str, str] | N
"""
self._create_catalog_validate(Catalog(self._ucx_catalog), prompts, properties=properties)

def create_federated_catalog(
    self,
    prompts: Prompts,
    catalog_name: str,
    connection_name: str,
    authorized_paths: str,
    *,
    properties: dict[str, str] | None = None,
) -> None:
    """Create the HMS federated catalog on top of a connection.

    Args:
        prompts: Forwarded unchanged to `_create_catalog_validate`.
        catalog_name: Name of the federated catalog to create.
        connection_name: Name of the connection the catalog is attached to.
        authorized_paths: Value sent as the catalog's `authorized_paths` option.
        properties: Catalog properties, forwarded as given (including `None`).
    """
    catalog_options = {"authorized_paths": authorized_paths}
    federated_catalog = Catalog(catalog_name, connection_name)
    self._create_catalog_validate(
        federated_catalog,
        prompts,
        properties=properties,
        options=catalog_options,
    )

def create_all_catalogs_schemas(self, prompts: Prompts, *, properties: dict[str, str] | None = None) -> None:
"""Create all UC catalogs and schemas referenced by the table mapping file.

Expand Down Expand Up @@ -176,6 +208,7 @@ def _create_catalog_validate(
prompts: Prompts,
*,
properties: dict[str, str] | None,
options: dict[str, str] | None,
) -> Catalog:
catalog_existing = self._get_catalog(catalog)
if catalog_existing:
Expand All @@ -192,7 +225,8 @@ def _create_catalog_validate(
attempts -= 1
if attempts == 0:
raise NotFound(f"Failed to validate location for catalog: {catalog.name}")
return self._create_catalog(catalog, catalog_storage, properties=properties)
return self._create_catalog(catalog, catalog_storage, connection_name=catalog.connection_name,
properties=properties, options=options)

def _validate_location(self, location: str) -> bool:
if location == "metastore":
Expand Down Expand Up @@ -236,7 +270,9 @@ def _create_catalog(
catalog: Catalog,
catalog_storage: str,
*,
connection_name: str | None = None,
properties: dict[str, str] | None,
options: dict[str, str] | None,
) -> Catalog:
logger.info(f"Creating UC catalog: {catalog.name}")
if catalog_storage == "metastore":
Expand All @@ -245,8 +281,10 @@ def _create_catalog(
self._ws.catalogs.create(
catalog.name,
storage_root=catalog_storage,
connection_name=connection_name,
comment="Created by UCX",
properties=properties,
options=options
)
catalog_created = self._get_catalog(catalog)
if catalog_created is None:
Expand Down
39 changes: 22 additions & 17 deletions src/databricks/labs/ucx/hive_metastore/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
import logging

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import AlreadyExists, NotFound, BadRequest
from databricks.sdk.errors import AlreadyExists, NotFound
from databricks.sdk.service.catalog import (
ConnectionType,
ConnectionInfo,
SecurableType,
Privilege,
PermissionsChange,
CatalogInfo,
)

from databricks.labs.ucx.account.workspaces import WorkspaceInfo
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.hive_metastore import ExternalLocations

from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema

logger = logging.getLogger(__name__)

Expand All @@ -37,32 +37,37 @@ def __init__(
workspace_client: WorkspaceClient,
external_locations: ExternalLocations,
workspace_info: WorkspaceInfo,
catalog_schema: CatalogSchema,
enable_hms_federation: bool = False,
):
self._workspace_client = workspace_client
self._external_locations = external_locations
self._workspace_info = workspace_info
self._enable_hms_federation = enable_hms_federation
self._catalog_schema = catalog_schema

def register_internal_hms_as_federated_catalog(self, prompts: Prompts) -> None:
    """Register the workspace's internal Hive Metastore as a federated catalog.

    Looks up (or creates) a connection keyed by the value returned from
    `self._workspace_info.current()`, then asks the catalog schema helper to
    create a catalog with the same name as that connection, passing the
    authorized paths computed by `self._get_authorized_paths()`.

    Args:
        prompts: Forwarded unchanged to `CatalogSchema.create_federated_catalog`.

    Raises:
        RuntimeWarning: If HMS federation has not been enabled.
    """
    if not self._enable_hms_federation:
        raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation')
    name = self._workspace_info.current()
    connection_info = self._get_or_create_connection(name)
    # Narrows the optional `name` field so it can be passed where a `str` is required.
    assert connection_info.name is not None
    # The catalog deliberately shares its name with the connection, so the name is passed twice.
    self._catalog_schema.create_federated_catalog(
        prompts,
        connection_info.name,
        connection_info.name,
        self._get_authorized_paths(),
    )

def _get_or_create_connection(self, name: str) -> ConnectionInfo:
try:
Expand Down
Loading