DataHub
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[datahub]'
Config Details
- Options
- Schema
Note that a `.` is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
commit_state_interval integer | Number of records to process before committing state Default: 1000 |
commit_with_parse_errors boolean | Whether to update createdon timestamp and kafka offset despite parse errors. Enable if you want to ignore the errors. Default: False |
include_all_versions boolean | If enabled, include all versions of each aspect. Otherwise, only include the latest version of each aspect. Default: False |
kafka_topic_name string | Name of kafka topic containing timeseries MCLs Default: MetadataChangeLog_Timeseries_v1 |
mysql_batch_size integer | Number of records to fetch from MySQL at a time Default: 10000 |
mysql_table_name string | Name of MySQL table containing all versioned aspects Default: metadata_aspect_v2 |
kafka_connection KafkaConsumerConnectionConfig | Kafka connection config Default: {'bootstrap': 'localhost:9092', 'schema_registry_u... |
kafka_connection.bootstrap string | Default: localhost:9092 |
kafka_connection.client_timeout_seconds integer | The request timeout used when interacting with the Kafka APIs. Default: 60 |
kafka_connection.consumer_config object | Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md . |
kafka_connection.schema_registry_config object | Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient |
kafka_connection.schema_registry_url string | Default: http://localhost:8080/schema-registry/api/ |
mysql_connection MySQLConnectionConfig | MySQL connection config Default: {'username': None, 'host_port': 'localhost:3306', ... |
mysql_connection.database string | database (catalog) |
mysql_connection.database_alias string | [Deprecated] Alias to apply to database when ingesting. |
mysql_connection.host_port string | MySQL host URL. Default: localhost:3306 |
mysql_connection.options object | Any options specified here will be passed to SQLAlchemy.create_engine as kwargs. |
mysql_connection.password string(password) | password |
mysql_connection.scheme string | Default: mysql+pymysql |
mysql_connection.sqlalchemy_uri string | URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters. |
mysql_connection.username string | username |
stateful_ingestion StatefulIngestionConfig | Stateful Ingestion Config Default: {'enabled': True, 'max_checkpoint_state_size': 167... |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingestion. Default: False |
The JSONSchema for this configuration is inlined below.
{
"title": "DataHubSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "Stateful Ingestion Config",
"default": {
"enabled": true,
"max_checkpoint_state_size": 16777216,
"state_provider": {
"type": "datahub",
"config": null
},
"ignore_old_state": false,
"ignore_new_state": false
},
"allOf": [
{
"$ref": "#/definitions/StatefulIngestionConfig"
}
]
},
"mysql_connection": {
"title": "Mysql Connection",
"description": "MySQL connection config",
"default": {
"username": null,
"host_port": "localhost:3306",
"database": null,
"database_alias": null,
"scheme": "mysql+pymysql",
"sqlalchemy_uri": null,
"options": {}
},
"allOf": [
{
"$ref": "#/definitions/MySQLConnectionConfig"
}
]
},
"kafka_connection": {
"title": "Kafka Connection",
"description": "Kafka connection config",
"default": {
"bootstrap": "localhost:9092",
"schema_registry_url": "http://localhost:8080/schema-registry/api/",
"schema_registry_config": {},
"client_timeout_seconds": 60,
"consumer_config": {}
},
"allOf": [
{
"$ref": "#/definitions/KafkaConsumerConnectionConfig"
}
]
},
"include_all_versions": {
"title": "Include All Versions",
"description": "If enabled, include all versions of each aspect. Otherwise, only include the latest version of each aspect.",
"default": false,
"type": "boolean"
},
"mysql_batch_size": {
"title": "Mysql Batch Size",
"description": "Number of records to fetch from MySQL at a time",
"default": 10000,
"type": "integer"
},
"mysql_table_name": {
"title": "Mysql Table Name",
"description": "Name of MySQL table containing all versioned aspects",
"default": "metadata_aspect_v2",
"type": "string"
},
"kafka_topic_name": {
"title": "Kafka Topic Name",
"description": "Name of kafka topic containing timeseries MCLs",
"default": "MetadataChangeLog_Timeseries_v1",
"type": "string"
},
"commit_state_interval": {
"title": "Commit State Interval",
"description": "Number of records to process before committing state",
"default": 1000,
"type": "integer"
},
"commit_with_parse_errors": {
"title": "Commit With Parse Errors",
"description": "Whether to update createdon timestamp and kafka offset despite parse errors. Enable if you want to ignore the errors.",
"default": false,
"type": "boolean"
}
},
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulIngestionConfig": {
"title": "StatefulIngestionConfig",
"description": "Basic Stateful Ingestion Specific Configuration for any source.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
}
},
"additionalProperties": false
},
"MySQLConnectionConfig": {
"title": "MySQLConnectionConfig",
"type": "object",
"properties": {
"username": {
"title": "Username",
"description": "username",
"type": "string"
},
"password": {
"title": "Password",
"description": "password",
"type": "string",
"writeOnly": true,
"format": "password"
},
"host_port": {
"title": "Host Port",
"description": "MySQL host URL.",
"default": "localhost:3306",
"type": "string"
},
"database": {
"title": "Database",
"description": "database (catalog)",
"type": "string"
},
"database_alias": {
"title": "Database Alias",
"description": "[Deprecated] Alias to apply to database when ingesting.",
"type": "string"
},
"scheme": {
"title": "Scheme",
"default": "mysql+pymysql",
"type": "string"
},
"sqlalchemy_uri": {
"title": "Sqlalchemy Uri",
"description": "URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
"type": "string"
},
"options": {
"title": "Options",
"description": "Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",
"type": "object"
}
},
"additionalProperties": false
},
"KafkaConsumerConnectionConfig": {
"title": "KafkaConsumerConnectionConfig",
"description": "Configuration class for holding connectivity information for Kafka consumers",
"type": "object",
"properties": {
"bootstrap": {
"title": "Bootstrap",
"default": "localhost:9092",
"type": "string"
},
"schema_registry_url": {
"title": "Schema Registry Url",
"default": "http://localhost:8080/schema-registry/api/",
"type": "string"
},
"schema_registry_config": {
"title": "Schema Registry Config",
"description": "Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient",
"type": "object"
},
"client_timeout_seconds": {
"title": "Client Timeout Seconds",
"description": "The request timeout used when interacting with the Kafka APIs.",
"default": 60,
"type": "integer"
},
"consumer_config": {
"title": "Consumer Config",
"description": "Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
"type": "object"
}
},
"additionalProperties": false
}
}
}
Code Coordinates
- Class Name:
datahub.ingestion.source.datahub.datahub_source.DataHubSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for DataHub, feel free to ping us on our Slack.
Is this page helpful?