
4.5 Importing from Kafka

The Kafka adapter reads from a Kafka topic/partition and continually consumes messages for as long as Kafka is available, or until no messages have been received for a user-specified timeout period.

Messages from Kafka are expected to be in JSON format. They are then transformed by the adapter into a format which is compatible with the output of the text-based adapters like the CSV adapter. This means that you define your schema mapping files exactly as if the data resource were a CSV file - there is no new syntax to learn for Kafka resources.

When the adapter is restarted it will resume reading messages from the last sync point. You can override this behaviour by specifying the topic offset. This is described in more detail below.


How the Kafka adapter works

The adapter takes a JSON formatted message and presents it to the importer as one or more rows of tabular data, whose column names consist of the union of all the fields in the JSON object.

Simple JSON objects as well as objects containing nested objects (to any depth) are supported. In fact, any JSON object containing more than one nested object will always result in multiple rows of data being returned. This is best illustrated by an example.

Example: A JSON object representing a folder containing an array of documents

{
    "folder": "shared-documents",
    "document": [
        {
            "name": "meetups",
            "editors": [
                {"name": "Mark"},
                {"name": "Jason"}
            ]
        },
        {
            "name": "travel-advice",
            "created": 20170503
        }
    ]
}

In this example, the folder shared-documents contains two documents, meetups and travel-advice, but the structure of these two documents is different. The meetups document has a name and a list of editors, while the travel-advice document has a name and a created date value. This reflects the real-life situation where data is sometimes missing or incomplete. The Kafka adapter resolves these differences by creating a union of all the fields in the message and transforming the JSON into one or more rows of data, setting any missing fields on each row to null:

folder                document.name      document.editors.name    document.created
"shared-documents"    "meetups"          "Mark"                   null
"shared-documents"    "meetups"          "Jason"                  null
"shared-documents"    "travel-advice"    null                     "20170503"

Note that all non-null values are returned as Strings. The adapter does not attempt to infer the type of any variable. You can use the appropriate converters in the schema mapping to coerce the value to the type you want.


Defining a Kafka resource in the resource descriptor file

A Kafka resource is specified as a URI which provides all the information required to connect to a broker and subscribe to a particular topic and partition.

The general format of a Kafka resource URI is:

kafka://[group-id@]broker/topic[?partition=...]

group-id: The group-id part of the URI is optional, and defaults to 'databridge'. The following two URIs are therefore identical:

"resource": "kafka//databridge@localhost:9092/messages"
"resource": "kafka://localhost:9092/messages"

partition: The partition part of the URI is optional, and if omitted, defaults to 0. To have the adapter read from a different partition, add a partition selector to the URI as in the example below:

"resource": "kafka://localhost:9092/messages/?partition=6"

Specifying a timeout

The timeout property signals the adapter to stop once no more messages have been received for the specified timeout period. The period is specified in milliseconds, i.e. a timeout of 60000 would tell the adapter to stop if no messages have been received for a minute.

The timeout property is required when using the non-streaming endpoint, because otherwise the Neo4j store files would never be closed and the graph database would not be usable. However, if you are using the streaming endpoint, you can elect to keep the adapter running indefinitely by specifying "timeout": "none".

Note that if the timeout property is not provided, the adapter will automatically timeout after 5 minutes.
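
For illustration, the following resource descriptor stops the adapter after one minute of inactivity. This is a sketch based on the descriptor format shown in the example further below, assuming the timeout value is supplied as the number of milliseconds:

{
    "name": "kafka-resource",
    "resource" : "kafka://localhost:9092/sales",
    "timeout" : 60000
}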


Changing the topic-partition offset

By default, the Kafka adapter will consume messages from a position based on the last committed sync point. If no sync point has yet been committed, the adapter will only consume messages that arrive after it connects to the broker.

You can change this behaviour by setting the topic.offset property in the resource descriptor file. Setting it to 0 will cause the adapter to start reading from the beginning of the topic/partition. Please note that this does not guarantee that messages will be consumed from exactly the position specified. Because of the way the Kafka consumer commits sync points, the adapter may resume from a slightly earlier position. Please see the Kafka documentation for further details.


Example Kafka resource descriptor

{
    "name": "kafka-resource",
    "resource" : "kafka://localhost:9092/sales",
    "timeout" : "none",
    "topic.offset" : 0
}

In this example, the adapter is configured to read from the topic 'sales' on the default partition 0. The timeout period is 'none' (so the streaming endpoint should be used), and the adapter will start consuming messages from the beginning of the topic.
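
To try out a configuration like this during development, you can publish test messages in JSON format to the topic using the standard Kafka console producer. The command below is illustrative; depending on your Kafka version the broker is specified with --bootstrap-server (newer releases) or --broker-list (older releases). Each line entered at the prompt is sent to the sales topic as a single message, for example the folder/document message shown earlier:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic sales
> {"folder": "shared-documents", "document": [{"name": "meetups"}]}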
