This page demonstrates how to import data from Kafka topics into a graph in Ultipa.

The following steps are demonstrated using PowerShell (Windows).
Generate the Configuration File
Open the terminal and navigate to the folder containing ultipa-importer. Then, run the following command and select kafka to generate a sample configuration file for Kafka:
./ultipa-importer --sample

A file named import.sample.kafka.yml will be generated in the same directory as ultipa-importer. If the file already exists, it will be overwritten.
Modify the Configuration File
Customize the import.sample.kafka.yml configuration file based on your specific requirements. It includes the following sections:

- mode: Set to kafka.
- kafka: Configure the Kafka host address or URI for connection.
- server: Provide your Ultipa server details and specify the target graph (new or existing) for data import.
- nodeConfig: Define node schemas, where each schema corresponds to a topic. All columns are mapped to node properties sequentially.
- edgeConfig: Define edge schemas, where each schema corresponds to a topic. All columns are mapped to edge properties sequentially.
- settings: Set global import preferences and parameters.
# Mode options: csv/json/jsonl/rdf/graphml/bigQuery/sql/kafka/neo4j/salesforce
mode: kafka
# Kafka host configurations
kafka:
  # Host IP/URI and port
  host: "192.168.1.23:4567"
# Ultipa server configurations
server:
  # Host IP/URI and port; if it's a cluster, separate multiple hosts with commas
  host: "10.11.22.33:1234"
  username: "admin"
  password: "admin12345"
  # The new or existing graph for data import
  graphset: "trading"
  # If the above graphset is new, specify the shards where the graph will be stored
  shards: "1,2,3"
  # If the above graphset is new, specify the partition function (Crc32/Crc64WE/Crc64XZ/CityHash64) used for sharding
  partitionBy: "Crc32"
  # Path of the certificate file for TLS encryption
  crt: ""
# Node configurations
nodeConfig:
  # Specify the schema
  - schema: "Customer"
    # Specify the topic from which messages will be consumed
    topic: "customer"
    # offset: Specify where to start consuming messages in a Kafka topic partition
    # Options include:
    ## - newest: Start from the latest message
    ## - oldest (default): Start from the earliest message
    ## - index: Start from a specific offset
    ## - time: Start from a specific timestamp. Format: yyyy-mm-dd hh:mm:ss (local time) or yyyy-mm-dd hh:mm:ss -0700 (with time zone offset)
    # For large Kafka topics, it is more efficient to use newest, oldest, or a specified index than a timestamp
    offset: oldest
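    # Illustrative examples of the other offset options; the exact accepted
    # syntax is an assumption based on the option descriptions above:
    # offset: 1024                  # index: start from partition offset 1024
    # offset: 2024-05-01 08:00:00   # time: start from this local timestamp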
    # properties: Map Kafka message columns to properties; all columns are mapped sequentially (see the sample messages after this configuration)
    ## name: The property name
    ## new_name: The new property name; it defaults to the name above
    ## type: The property type; you can set it to _id, _from, _to, or other Ultipa property value types like int64, int32, float, string, etc.; set it to _ignore to skip importing the column
    ## prefix: Add a prefix to the values of a property; only applies to _id, _from, and _to
    properties:
      - name: cust_no
        type: _id
        prefix:
      - name: name
        type: string
      - name: level
        type: int32
  - schema: "Merchant"
    topic: "merchant"
    offset: oldest
    properties:
      - name: merch_no
        type: _id
      - name: name
        type: string
      - name: type
        type: string
# Edge configurations
edgeConfig:
  - schema: "Transfers"
    topic: "transaction"
    offset: oldest
    properties:
      - name: trans_no
        type: string
      - name: cust_no
        type: _from
      - name: merch_no
        type: _to
      - name: time
        type: datetime
# Global settings
settings:
  # Path of the log file
  logPath: "./logs"
  # Number of rows included in each insertion batch
  batchSize: 10000
  # Import mode: insert/overwrite/upsert
  importMode: insert
  # Automatically create missing end nodes for edges
  createNodeIfNotExist: false
  # Stop the import process when an error occurs
  stopWhenError: false
  # Set to true to automatically create the new graph, schemas, and properties
  yes: true
  # The maximum number of threads
  threads: 32
  # The maximum size (in MB) of each packet
  maxPacketSize: 40
  # Timezone for the timestamp values
  # timeZone: "+0200"
  # Timestamp value unit; supports ms/s
  timestampUnit: s
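
For reference, here is a minimal sketch of what messages in the customer topic could look like with this configuration, assuming comma-separated message values whose columns follow the property order defined above (cust_no, name, level), published with Kafka's bundled console producer:

kafka-console-producer.sh --bootstrap-server 192.168.1.23:4567 --topic customer
> C0001,Alice,3
> C0002,Bob,1

Each such message becomes one Customer node, with the first column ingested as the node's _id.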
Execute Import
Execute the import by specifying the configuration file using the --config flag:
./ultipa-importer --config import.sample.kafka.yml
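
Once the import finishes, a quick sanity check is to count the imported entities. A minimal sketch in UQL, assuming the trading graph and the schemas defined above:

find().nodes({@Customer}) as n return count(n)
find().edges({@Transfers}) as e return count(e)

If the counts stay at zero, recheck the topic names and the offset settings; an offset of newest only consumes messages produced after the importer starts.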
