Pulsar

Integration Details

The DataHub Pulsar source plugin extracts topic and schema metadata from an Apache Pulsar instance and ingests it into DataHub. The plugin interacts with the Pulsar instance through the Pulsar admin REST API.

The data is extracted on a per-tenant and per-namespace basis. Topics, with their corresponding schema if available, are ingested as Datasets into DataHub. Additional values such as the schema description, schema_version, schema_type and partitioned are included as DatasetProperties.
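
The tenant, namespace, topic and schema walk described above can be pictured as a handful of admin REST calls. The sketch below only builds the URLs and sends no requests. The paths are those of the Pulsar admin REST API v2; that the plugin calls exactly these endpoints is an assumption, and the helper name admin_urls is hypothetical.

```python
from urllib.parse import quote


def admin_urls(web_service_url: str, tenant: str, namespace: str, topic: str) -> dict:
    """Build Pulsar admin REST (v2) URLs for one tenant/namespace/topic.

    Hypothetical helper for illustration; it performs no network I/O.
    """
    base = web_service_url.rstrip("/") + "/admin/v2"
    # Percent-encode each path segment so unusual names stay valid in a URL.
    t, n, p = (quote(part, safe="") for part in (tenant, namespace, topic))
    return {
        "tenants": f"{base}/tenants",
        "namespaces": f"{base}/namespaces/{t}",
        "topics": f"{base}/persistent/{t}/{n}",
        "partitioned_topics": f"{base}/persistent/{t}/{n}/partitioned",
        "schema": f"{base}/schemas/{t}/{n}/{p}/schema",
    }


urls = admin_urls("http://localhost:8080", "tenant_1", "sales", "orders")
for name, url in urls.items():
    print(f"{name}: {url}")
```

Non-persistent topics live under a parallel non-persistent path segment; the sketch leaves them out for brevity.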

Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

Source Concept | DataHub Concept | Notes
pulsar | Data Platform |
Pulsar Topic | Dataset | subType: topic
Pulsar Schema | SchemaField | Maps to the fields defined within the Avro or JSON schema definition.
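
To make the mapping concrete, here is a minimal sketch of how a Pulsar topic could end up as a DataHub Dataset URN. It follows DataHub's general dataset URN shape, with the platform instance prefixed to the dataset name. Using the full topic name as the dataset name is an assumption, and the function pulsar_dataset_urn is hypothetical rather than part of the plugin.

```python
from typing import Optional


def pulsar_dataset_urn(
    topic: str, env: str = "PROD", platform_instance: Optional[str] = None
) -> str:
    """Compose a dataset URN for a Pulsar topic (illustrative only)."""
    # Assumption: the platform instance, when set, is prefixed to the name.
    name = f"{platform_instance}.{topic}" if platform_instance else topic
    return f"urn:li:dataset:(urn:li:dataPlatform:pulsar,{name},{env})"


urn = pulsar_dataset_urn(
    "persistent://tenant_1/sales/orders", env="TEST", platform_instance="local"
)
print(urn)
```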

Metadata Ingestion Quickstart

For context on getting started with ingestion, check out our metadata ingestion guide.

Support status: Incubating

Important Capabilities

Capability | Status | Notes
Domains | | Supported via the domain config field
Platform Instance | | Enabled by default

PulsarSource(config: datahub.ingestion.source_config.pulsar.PulsarSourceConfig, ctx: datahub.ingestion.api.common.PipelineContext)

NOTE: Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).

Prerequisites

In order to ingest metadata from Apache Pulsar, you will need:

  • Access to a Pulsar instance and, if authentication is enabled, a valid access token.
  • Pulsar version >= 2.7.0

NOTE: A superUser role is required for listing all existing tenants within a Pulsar instance.
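
As a rough illustration of what a valid access token means on the wire, the sketch below builds the HTTP header for token-based authentication and the OpenID discovery URL that an OAuth setup (the issuer_url option in the recipe) would typically resolve. The bearer header and the /.well-known/openid-configuration path are standard conventions; both helper names are hypothetical, and whether the plugin proceeds exactly this way is an assumption.

```python
def bearer_headers(token: str) -> dict:
    """Return the Authorization header for token-based authentication."""
    return {"Authorization": f"Bearer {token}"}


def discovery_url(issuer_url: str) -> str:
    """Return the OpenID Connect discovery document URL for an issuer."""
    return issuer_url.rstrip("/") + "/.well-known/openid-configuration"


headers = bearer_headers("example-token")  # placeholder value, not a real token
oidc_url = discovery_url("http://localhost:8083/realms/pulsar")
print(headers)
print(oidc_url)
```

In practice the token or client secret would come from an environment variable rather than a literal, in line with the note on variable substitution.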

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[pulsar]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "pulsar"
  config:
    env: "TEST"
    platform_instance: "local"
    ## Pulsar client connection config ##
    web_service_url: "https://localhost:8443"
    verify_ssl: "/opt/certs/ca.cert.pem"
    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
    issuer_url: <issuer_url>
    client_id: ${CLIENT_ID}
    client_secret: ${CLIENT_SECRET}
    # Tenant list to scrape
    tenants:
      - tenant_1
      - tenant_2
    # Topic filter pattern
    topic_patterns:
      allow:
        - ".*sales.*"

sink:
  # sink configs
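
The topic_patterns entry in the recipe is an allow/deny regex filter. The following sketch shows one plausible reading of those semantics: a name is kept when no deny pattern matches and at least one allow pattern matches. Matching from the start of the string with re.match and case-insensitivity by default are assumptions, and is_allowed is a hypothetical stand-in, not the plugin's AllowDenyPattern class.

```python
import re
from typing import Sequence


def is_allowed(
    name: str,
    allow: Sequence[str] = (".*",),
    deny: Sequence[str] = (),
    ignore_case: bool = True,
) -> bool:
    """Return True if name passes the allow/deny regex filter."""
    flags = re.IGNORECASE if ignore_case else 0
    # Deny wins: any matching deny pattern rejects the name outright.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)


topics = [
    "persistent://tenant_1/retail/sales-orders",
    "persistent://tenant_1/retail/inventory",
    "persistent://tenant_2/retail/SALES-refunds",
]
selected = [t for t in topics if is_allowed(t, allow=[".*sales.*"])]
print(selected)
```

With case-insensitive matching, the pattern ".*sales.*" keeps both the sales-orders and the SALES-refunds topics and drops inventory.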

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

Field | Description
client_id (string) | The application's client ID.
client_secret (string) | The application's client secret.
exclude_individual_partitions (boolean) | Exclude each individual partition of a partitioned topic. For example, when turned off, a topic with 100 partitions results in 100 Datasets. Default: True
issuer_url (string) | The complete URL for a Custom Authorization Server. Mandatory for OAuth-based authentication.
oid_config (object) | Placeholder for the OpenID discovery document.
platform_instance (string) | The instance of the platform that all assets produced by this recipe belong to.
tenants (array(string)) |
timeout (integer) | Timeout setting: how long to wait for the Pulsar REST API to send data before giving up. Default: 5
token (string) | The access token for the application. Mandatory for token-based authentication.
verify_ssl (one of boolean, string) | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Default: True
web_service_url (string) | The web URL for the cluster. Default: http://localhost:8080
env (string) | The environment that all assets produced by this connector belong to. Default: PROD
domain (map(str,AllowDenyPattern)) | A class to store allow deny regexes.
domain.key.allow (array(string)) |
domain.key.deny (array(string)) |
domain.key.ignoreCase (boolean) | Whether to ignore case sensitivity during pattern matching. Default: True
namespace_patterns (AllowDenyPattern) | List of regex patterns for namespaces to include/exclude from ingestion. By default the functions namespace is denied. Default: {'allow': ['.*'], 'deny': ['public/functions'], 'i...
namespace_patterns.allow (array(string)) |
namespace_patterns.deny (array(string)) |
namespace_patterns.ignoreCase (boolean) | Whether to ignore case sensitivity during pattern matching. Default: True
tenant_patterns (AllowDenyPattern) | List of regex patterns for tenants to include/exclude from ingestion. By default all tenants are allowed. Default: {'allow': ['.*'], 'deny': ['pulsar'], 'ignoreCase'...
tenant_patterns.allow (array(string)) |
tenant_patterns.deny (array(string)) |
tenant_patterns.ignoreCase (boolean) | Whether to ignore case sensitivity during pattern matching. Default: True
topic_patterns (AllowDenyPattern) | List of regex patterns for topics to include/exclude from ingestion. By default the Pulsar system topics are denied. Default: {'allow': ['.*'], 'deny': ['/__.*$'], 'ignoreCase'...
topic_patterns.allow (array(string)) |
topic_patterns.deny (array(string)) |
topic_patterns.ignoreCase (boolean) | Whether to ignore case sensitivity during pattern matching. Default: True
stateful_ingestion (StatefulStaleMetadataRemovalConfig) | See Stateful Ingestion.
stateful_ingestion.enabled (boolean) | Whether stateful ingestion is enabled. Default: False
stateful_ingestion.remove_stale_metadata (boolean) | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True
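
The effect of exclude_individual_partitions can be sketched as follows. Pulsar names the partitions of a partitioned topic with a -partition-N suffix, so excluding them amounts to collapsing those names onto their parent topic. That the plugin emits the parent topic as a single Dataset is an assumption, and collapse_partitions is a hypothetical helper.

```python
import re
from typing import List

# Pulsar appends "-partition-<N>" to each partition of a partitioned topic.
_PARTITION_SUFFIX = re.compile(r"-partition-\d+$")


def collapse_partitions(
    topics: List[str], exclude_individual_partitions: bool = True
) -> List[str]:
    """Collapse partition topics onto their parent, preserving first-seen order."""
    if not exclude_individual_partitions:
        return list(topics)
    parents = {}
    for topic in topics:
        # Strip the partition suffix; non-partitioned names pass through unchanged.
        parents.setdefault(_PARTITION_SUFFIX.sub("", topic), None)
    return list(parents)


raw = [
    "persistent://tenant_1/sales/orders-partition-0",
    "persistent://tenant_1/sales/orders-partition-1",
    "persistent://tenant_1/sales/users",
]
print(collapse_partitions(raw))
print(collapse_partitions(raw, exclude_individual_partitions=False))
```

With the option on, the two orders partitions yield one entry; with it off, every partition is kept as its own entry.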

Code Coordinates

  • Class Name: datahub.ingestion.source.pulsar.PulsarSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Pulsar, feel free to ping us on our Slack.