Kafka Connect Scalyr Sink Connector

The Kafka Connect Scalyr sink connector allows streaming log messages from Kafka topics to Scalyr.

Features

  • Supports Elastic Filebeat log messages with automatic conversion of Filebeat log messages to Scalyr log events.
  • Supports custom application log messages using user defined conversion of message fields to Scalyr log event attributes.
  • Supports Fluentd and Fluent Bit using custom application event mappings.
  • Exactly once delivery using the topic, partition, and offset to uniquely identify events and prevent duplicate delivery.

Installation

Build the Scalyr Sink Connector

1. Clone the Kafka Connect Scalyr repository: git clone https://github.com/scalyr/kafka-connect-scalyr

2. Build the connector: mvn clean package. The built connector is located in target/components/packages/scalyr-kafka-connect-scalyr-sink-0.1.zip.
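
For reference, the clone and build steps as a single shell session:

git clone https://github.com/scalyr/kafka-connect-scalyr
cd kafka-connect-scalyr
mvn clean package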

Distributed Mode Installation

1. Modify $KAFKA_HOME/config/connect-distributed.properties to set the following:

bootstrap.servers=<BOOTSTRAP_SERVER1,BOOTSTRAP_SERVER2,BOOTSTRAP_SERVER3>
plugin.path=<PLUGIN_PATH>

2. Unzip scalyr-kafka-connect-scalyr-sink-0.1.zip into the plugins directory defined in <PLUGIN_PATH>.
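
For example, a shell sketch assuming <PLUGIN_PATH> is /opt/kafka/plugins (adjust the paths for your environment):

mkdir -p /opt/kafka/plugins
unzip target/components/packages/scalyr-kafka-connect-scalyr-sink-0.1.zip -d /opt/kafka/plugins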

3. Start Kafka Connect in Distributed Mode: $KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties

4. Create the Kafka Connect Scalyr Sink JSON configuration file, e.g. connect-scalyr-sink.json. Sample Scalyr Sink configuration JSON files are located in the etc directory of the unzipped plugin. Substitute the correct values for your environment for the <FIELDS> in the following sample configuration JSON:

{
    "name": "scalyr-sink-connector",
    "config": {
        "connector.class": "com.scalyr.integrations.kafka.ScalyrSinkConnector",
        "tasks.max": "3",
        "topics": "<KAFKA_TOPIC>",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable":"false",
        "scalyr_server": "<SCALYR_SERVER>",
        "api_key": "<WRITE_API_KEY>",
        "event_enrichment": "<key1=value1,key2=value2>"
    }
}

5. Install the Scalyr Sink Connector by running the following command, substituting your Scalyr Sink JSON configuration file name for connect-scalyr-sink.json: curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d @connect-scalyr-sink.json
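
After installation, the connector and task status can be verified through the standard Kafka Connect REST API (the connector name matches the name field in the configuration JSON):

curl localhost:8083/connectors/scalyr-sink-connector/status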

Basic Configuration Properties

Property                   Description
topics                     Comma separated list of Kafka topics to ingest from.
tasks.max                  Maximum number of Scalyr Sink Connector tasks used to send data to Scalyr. This should not exceed the number of partitions for the topics. See Scalability.
scalyr_server              Scalyr server to send log events to. Defaults to app.scalyr.com and can be omitted. For EU accounts, set this to upload.eu.scalyr.com.
api_key                    Logs Write API key from app.scalyr.com/keys.
event_enrichment           Optional additional attributes to add to the Scalyr log event, specified as a comma separated list of key=value pairs. For example, env=prod.
custom_app_event_mapping   JSON configuration describing how to map nested fields in custom application Kafka messages to Scalyr event attributes. See Custom Application Event Mapping.

Advanced Configuration Properties

The default values for these properties should work for most environments and should rarely need to be changed.

Property                    Default   Description
compression_type            deflate   Compression type to use when sending log events. Valid values are deflate and none.
compression_level           6         Compression level for the compression_type. Valid values depend on the compression_type. The default is used if not specified.
add_events_timeout_ms       20000     Timeout in milliseconds for the Scalyr addEvents call.
add_events_retry_delay_ms   500       Delay in milliseconds before the first addEvents retry. The delay increases exponentially with each subsequent retry.
batch_send_size_bytes       4000000   Batch size in bytes that must be reached before events are sent. Events are buffered into larger batches for increased throughput.
batch_send_wait_ms          5000      Maximum time in milliseconds to wait between batch sends. This ensures events are sent to Scalyr in a timely manner on lightly loaded systems where batch_send_size_bytes may not be reached for long periods.
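
Advanced properties are set alongside the basic properties in the connector configuration JSON. For example, the following lines could be added to the sample configuration above to raise the compression level and batch size (illustrative values, not tuning recommendations):

"compression_level": "9",
"batch_send_size_bytes": "5000000",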

Filebeat Configuration

Data and Schema Configuration

The Filebeat message hostname, logfile, parser, timestamp, and message fields are translated to their Scalyr event equivalents.

Filebeat uses a JSON data format and does not have schemas, so the following value.converter settings should be used in the Scalyr connector configuration for Filebeat:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",

Parser Configuration

The Scalyr parser should be defined as a log message attribute.

To configure Filebeat to send the parser field, modify the Filebeat configuration to add the fields.parser field:

fields:
    parser: <PARSER>

<PARSER> is a parser defined in app.scalyr.com/parsers.
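
Putting it together, a minimal filebeat.yml sketch that tails logs, attaches a parser field, and ships to Kafka. The input paths, parser name, and broker/topic placeholders are assumptions to adapt for your environment:

filebeat.inputs:
  - type: log
    paths:
      - /var/log/*.log
    fields:
      parser: systemLog

output.kafka:
  hosts: ["<BOOTSTRAP_SERVER1>:9092"]
  topic: "<KAFKA_TOPIC>"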

Custom Application Configuration

Custom Application Event Mapping

The Scalyr Kafka Connector supports sending custom application messages to Scalyr. A mapping of application message fields to Scalyr event attributes must be specified in the custom_app_event_mapping Scalyr connector config. An application message may have nested fields, while Scalyr events use a flat key/value structure. Nested fields are specified in the format field1.field2.field3, where the field names are separated by a delimiter. The default delimiter is a period (.).

The custom_app_event_mapping is defined in JSON. Here is a sample mapping:

[{
    "matcher": {
        "attribute": "app.name",
        "value": "myapp"
    },
    "eventMapping": {
         "message": "message",
         "logfile": "log.path",
         "serverHost": "host.hostname",
         "parser": "fields.parser",
         "version": "app.version",
         "appField1", "appField1",
         "appField2", "nested.appField2"
    },
    "delimiter":"\\."
}]

matcher defines an attribute to determine whether the event mapping applies to the message. The event mapping is only applied to messages where the matcher.attribute value matches the matcher.value.

eventMapping defines the message fields that are mapped to Scalyr event attributes. The key is the Scalyr event attribute name, and the value specifies the delimiter separated path of nested message fields that supplies the attribute value.

The following eventMapping fields have special meaning:

  • message - The log message field. The parser is applied to this field.
  • logfile - The logfile name for the log message.
  • parser - The Scalyr parser to use to parse the message field.
  • serverHost - The hostname generating this log message. Internally, the serverHost field name is translated to source, which is also a reserved field for the hostname.

Any of the special Scalyr fields can be omitted. Additional application specific fields can also be specified in the custom app mapping.

delimiter is an optional field and is only needed if the delimiter is not a period. Multiple application mappings can be specified in the mapping JSON list.

String, boolean, integer, and float/double data types are supported for field values. The list data type is also supported but is converted to a String.
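
For illustration, a hypothetical message matching the sample mapping above (all field values here are invented):

{
    "app": { "name": "myapp", "version": "2.5" },
    "message": "Processed request",
    "log": { "path": "/var/log/myapp.log" },
    "host": { "hostname": "server1" },
    "fields": { "parser": "myAppParser" },
    "appField1": "value1",
    "nested": { "appField2": "value2" }
}

would produce a flat Scalyr event with the attributes message=Processed request, logfile=/var/log/myapp.log, serverHost=server1, parser=myAppParser, version=2.5, appField1=value1, and appField2=value2.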

Data and Schema Configuration

The Scalyr Sink sends data in the Kafka message value. The key is not utilized. The value.converter and value.converter.schemas.enable should be set according to the application topic data format and whether schemas are used.
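
For example, schemaless JSON messages use the same JsonConverter settings shown in the Filebeat section, while JSON messages with embedded schemas would set:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",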

Parser Configuration

There are two ways to define a parser.

1. Enrichment attribute: If all applications use the same parser, the parser can be defined as an enrichment attribute. In the Scalyr Sink Connector configuration, specify "event_enrichment": "parser=<PARSER>".

2. Message attribute: The custom application message should contain a field whose value is the <PARSER>. Specify that field's name as the parser field in the custom application event mapping.

For example, if the custom application message has the parser field "scalyrParser": <PARSER>, the custom application event mapping should specify "parser": "scalyrParser".

<PARSER> is a parser defined in app.scalyr.com/parsers.

Fluentd and Fluent Bit Configuration

Fluentd and Fluent Bit record fields may differ significantly based on the input plugins, parsers, and record transformers defined. To support these different record formats, Custom Application Event Mapping is used to map the Fluentd and Fluent Bit record fields to Scalyr event attributes. A custom application event mapping should be created for each Fluentd and Fluent Bit record format to send to Scalyr.

Fluentd and Fluent Bit record transformers should be used to add logfile, parser, and serverHost fields to the records before sending them to Kafka. Record transformers can also be used to add a unique field for the event mapping matcher to use.

Example Fluentd Custom Application Event Mapping

Here is an example Fluentd configuration to tail logs in /var/log with a record transformer to add hostname and tag record fields.

fluent.conf

# Tail all log files in /var/log
<source>
    @type tail
    tag var.log
    path /var/log/*.log
    path_key logfile
    pos_file /var/log/varlog.pos
    <parse>
      @type none
    </parse>
</source>

# Add hostname and tag to the record
<filter var.log>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    tag "varlog"
  </record>
</filter>

This creates Fluentd records in Kafka with the fields message, logfile, hostname, and tag.

The custom_app_event_mapping for this Fluentd tail input configuration is:

[{
    "matcher": {
        "attribute": "tag",
        "value": "varlog"
    },
    "eventMapping": {
         "message": "message",
         "logfile": "logfile",
         "serverHost": "hostname"
    }
}]

This matches messages with the tag value varlog and maps the message, logfile, and hostname record fields to event attributes.
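
An equivalent Fluent Bit configuration sketch, using the tail input, the record_modifier filter, and the kafka output. The paths, hostname variable, and broker/topic placeholders are assumptions; the same custom_app_event_mapping shown above would then apply:

[INPUT]
    Name       tail
    Path       /var/log/*.log
    Path_Key   logfile
    Tag        var.log

[FILTER]
    Name       record_modifier
    Match      var.log
    Record     hostname ${HOSTNAME}
    Record     tag varlog

[OUTPUT]
    Name       kafka
    Match      var.log
    Brokers    <BOOTSTRAP_SERVER1>:9092
    Topics     <KAFKA_TOPIC>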

Scalability

Task instances can be scaled linearly to achieve the throughput needed for the log volume. The throughput of each Kafka Connect Scalyr Sink Task depends on the message size.

Message Size   Throughput per Task (MB/sec)
64 bytes       3.2
256 bytes      6.8
> 1 KB         8.5

Use the following formula to calculate the number of tasks needed: Number of tasks = log volume (MB/sec) ÷ throughput per task (MB/sec), rounded up to the next whole number.

For example, if the log volume is 12 MB/sec and the message size is 256 bytes, then Number of tasks = 12 MB/sec ÷ 6.8 MB/sec ≈ 1.8, which rounds up to 2 tasks.

Each task requires 1 vCPU. Additional workers should be created to support the needed number of tasks. The number of tasks should not exceed the number of partitions for the topics.

For message sizes > 1 KB, the batch_send_size_bytes should be increased to 5000000 (5 MB) for increased throughput.