
      Import from Kafka

      This page demonstrates the process of importing data from Kafka topic(s) into an Ultipa graphset.

      From Kafka to Ultipa

      Generate Configuration File

      Execute the following command in your command-line tool and select kafka:

      ./ultipa-importer --sample
      

      The import.sample.kafka.yml file will be generated in the same directory as the ultipa-importer executable. If an import.sample.kafka.yml file already exists in that directory, it will be overwritten.

      Modify Configuration File

      The configuration file consists of several sections. Modify the configuration file according to your needs.

      # Mode options: csv/json/jsonl/rdf/graphml/bigQuery/sql/kafka/neo4j/salesforce; only one mode can be used
      # SQL supports mysql/postgresSQL/sqlserver/snowflake/oracle
      mode: kafka
      
      # Kafka host configurations
      kafka:
        # Host IP/URI and port
        host: "192.168.1.23:4567"
      
      # Ultipa server configurations
      server:
        # Host IP/URI and port
        # If it is a cluster, separate hosts with commas, i.e., "<ip1>:<port1>,<ip2>:<port2>,<ip3>:<port3>"
        host: "10.11.22.33:1234"
        username: "admin"
        password: "admin12345"
        # The new or existing graphset where data will be imported
        graphset: "trading"
        # If the above graphset is new, specify the shards where it will be stored
        shards: "1,2,3"
        # If the above graphset is new, specify the partition function (Crc32/Crc64WE/Crc64XZ/CityHash64) used for sharding
        partitionBy: "Crc32"
        # Path of the certificate file for TLS (optional)
        crt: ""
      
      # Node configurations
      nodeConfig:
        # Specify the node type (schema) that the imported nodes belong to in Ultipa
        - schema: "customer"
          # Specify the topic from which messages will be consumed
          topic: "customer"  
          # offset: Specify where to start consuming messages in a Kafka topic partition
          # options include:
          ## - newest: Start consuming from the latest available message
          ## - oldest(default): Start consuming from the earliest available message
          ## - index: Start consuming from a specific index/offset, e.g. offset: 5
          ## - time: Start consuming after a certain timestamp. The format is yyyy-mm-dd hh:mm:ss (local time) or yyyy-mm-dd hh:mm:ss -7000 (with time zone offset) 
          # For large Kafka topics, it is more efficient to use newest, oldest, or a specified index than a timestamp
          offset: oldest  
          # properties: Map kafka messages to Ultipa graph database properties
          # For each property, you can configure the following:
          ## name: The field name in the Kafka message
          ## new_name: The property name to use in Ultipa graph database; if unset, it defaults to the value of name
          ## type: Supported types include _id, _from, _to, _ignore (to skip importing the column), and other Ultipa property types like int64, int32, float, string, etc.
          ## prefix: Add a prefix to the values of the _id, _from, or _to types; it does not apply to other types
          # Columns mapped to system properties such as _id, _from or _to in kafka messages must be explicitly configured
          properties:
            - name: cust_no
              type: _id
            - name: name
              type: string
            - name: level
              type: int32
        - schema: "merchant"
          topic: "merchant"
          offset: oldest
          properties: 
            - name: merch_no
              type: _id
            - name: name
              type: string
            - name: type
              type: string
      
      # Edge configurations
      edgeConfig:
        - schema: "transaction"
          topic: "transaction"
          offset: oldest
          properties:
            - name: trans_no
              type: string
            - name: cust_no
              type: _from
            - name: merch_no
              type: _to
            - name: trans_time
              type: datetime
          
      
      # Global settings        
      settings:
        # Define the path to output the log file
        logPath: "./logs"
        # Number of rows included in each insertion batch
        batchSize: 10000
        # Import mode supports insert/overwrite/upsert
        importMode: insert
        # Automatically create missing end nodes for edges (applicable only when importing edges)
        createNodeIfNotExist: false
        # Whether to stop the import process when an error occurs
        stopWhenError: false
        # Set to true to automatically create missing graphset, schemas and properties
        yes: true
        # The maximum threads
        threads: 32
        # The maximum size (in MB) of each packet
        maxPacketSize: 40
        # Timezone for the timestamp values
        # timeZone: "+0200"
        # Timestamp value unit, support ms/s
        timestampUnit: s
      

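      As an illustration of the nodeConfig section above, the following Python sketch mimics how a consumed JSON message for the customer schema could be mapped onto an Ultipa node. This is a hypothetical simulation, not the importer's actual code; the casting rules and names here are assumptions.

```python
import json

# Hypothetical sketch: mimic how the "customer" nodeConfig above maps a
# consumed Kafka message onto an Ultipa node. Casting rules are assumptions.
PROPERTIES = [
    {"name": "cust_no", "type": "_id"},    # system property: node identifier
    {"name": "name", "type": "string"},
    {"name": "level", "type": "int32"},
]

CASTS = {"string": str, "int32": int, "int64": int, "float": float}

def map_message(raw: bytes) -> dict:
    msg = json.loads(raw)
    node = {}
    for prop in PROPERTIES:
        if prop["type"] == "_ignore":      # skip this column entirely
            continue
        value = msg[prop["name"]]
        if prop["type"] in ("_id", "_from", "_to"):
            node[prop["type"]] = str(value)   # system properties are stored as strings
        else:
            node[prop["name"]] = CASTS[prop["type"]](value)
    return node

print(map_message(b'{"cust_no": "C001", "name": "Alice", "level": "3"}'))
# -> {'_id': 'C001', 'name': 'Alice', 'level': 3}
```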
      Configuration Items

      Kafka host configurations

      Field   Type     Description
      host    String   IP address or URI of the Kafka broker.

      Ultipa server configuration

      Field        Type     Description
      host         String   IP address or URI of the Ultipa server. For a cluster, separate hosts with commas, i.e., "<ip1>:<port1>,<ip2>:<port2>,<ip3>:<port3>".
      username     String   Database username.
      password     String   Password of the above user.
      graphset     String   Name of the target graphset. If the specified graphset does not exist, it will be created automatically.
      shards       String   Specifies the shards where data will be stored.
      partitionBy  String   Specifies the partitioning algorithm, including Crc32, Crc64WE, Crc64XZ and CityHash64.
      crt          String   Path to the certificate (CRT) file used for TLS encryption.
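      For a cluster deployment, the host field packs several endpoints into one comma-separated string. A small, hypothetical helper (not part of ultipa-importer) shows how such a string splits into individual endpoints:

```python
def parse_hosts(host: str):
    """Split a cluster host string ("<ip1>:<port1>,<ip2>:<port2>,...") into
    (ip, port) pairs. A hypothetical helper, not part of ultipa-importer."""
    pairs = []
    for endpoint in host.split(","):
        # rpartition tolerates extra colons earlier in the endpoint string
        ip, _, port = endpoint.strip().rpartition(":")
        pairs.append((ip, int(port)))
    return pairs

print(parse_hosts("10.11.22.33:1234,10.11.22.34:1234"))
# -> [('10.11.22.33', 1234), ('10.11.22.34', 1234)]
```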

      Node/Edge configurations

      Field     Type     Description
      schema    String   The node type (schema) to which the imported nodes belong.
      topic     String   The Kafka topic from which messages are consumed.
      offset    String   Where to start consuming messages in a Kafka topic partition. It supports:
                         - newest: Start consuming from the latest available message.
                         - oldest: Start consuming from the earliest available message. Default value.
                         - index: Start consuming from a specific index/offset, e.g. offset: 5.
                         - time: Start consuming after a certain timestamp. The format is yyyy-mm-dd hh:mm:ss (local time) or yyyy-mm-dd hh:mm:ss -7000 (with time zone offset).
      name      String   The property name to use in the target graph database.
      type      String   Specifies the data type. Use _ignore to skip importing the column. See supported property value types.
      prefix    String   Adds a prefix to the data. This is only supported for the _id, _from, and _to types.
      new_name  String   Modifies the property name specified in name to a new value.
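      The prefix rule above can be sketched in a few lines of Python. This is an illustration of the described behavior, not the importer's actual code:

```python
def apply_prefix(value, prop_type: str, prefix: str = "") -> str:
    """Hypothetical illustration of the prefix rule: the prefix is prepended
    only for the _id, _from and _to system property types."""
    if prefix and prop_type in ("_id", "_from", "_to"):
        return prefix + str(value)
    return str(value)

print(apply_prefix("001", "_from", "CUST_"))     # -> CUST_001
print(apply_prefix("Alice", "string", "CUST_"))  # -> Alice (prefix ignored)
```

      Prefixing is typically used to keep identifiers from different topics (e.g. customer and merchant numbers) from colliding in the same graphset.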

      Global settings

      Field                 Type     Default    Description
      logPath               String   "./logs"   The path to save the log file.
      batchSize             Integer  10000      Number of nodes or edges to insert per batch.
      importMode            String   upsert     Specifies how the data is inserted into the graph: overwrite, insert, or upsert. When updating nodes or edges, use the upsert mode to prevent overwriting existing data.
      createNodeIfNotExist  Bool     false      Whether missing end nodes are automatically created when inserting edges:
                                                - true: The system automatically creates nodes that do not exist.
                                                - false: The related edges will not be imported.
      stopWhenError         Bool     false      Whether to stop the import process when an error occurs.
      yes                   Bool     false      Whether to automatically create missing graphsets, schemas, and properties.
      threads               Integer  32         The maximum number of threads. 32 is suggested.
      maxPacketSize         Integer  40         The maximum size of data packets in MB that can be sent or received.
      timestampUnit         String   s          The unit of measurement for timestamp values. Supported units are ms (milliseconds) and s (seconds).
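      The batchSize setting controls how rows are grouped before each insertion. A minimal Python sketch of this batching behavior (an illustration only, not the importer's implementation):

```python
def batches(rows, batch_size=10000):
    """Group rows into insertion batches of at most batch_size elements,
    mirroring the batchSize setting described above (illustrative only)."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly smaller batch
        yield batch

print([len(b) for b in batches(range(25000))])  # -> [10000, 10000, 5000]
```

      Larger batches reduce round trips to the server but increase the size of each packet, which is why batchSize interacts with maxPacketSize above.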

      Execute Import

      The import process uses the configuration file specified by the --config parameter to import data from the Kafka topics into the target server, where it is stored in the Ultipa graph structure.

      ./ultipa-importer --config import.sample.kafka.yml
      