Kafka Connect Scalyr Sink Connector

The Kafka Connect Scalyr sink connector allows streaming log messages from Kafka topics to Scalyr.

Features

  • Supports Elastic Filebeat log messages, automatically converting them to Scalyr log events.
  • Supports custom application log messages using user defined conversion of message fields to Scalyr log event attributes.
  • Supports Fluentd and Fluent Bit using custom application event mappings.
  • Exactly once delivery using the topic, partition, and offset to uniquely identify events and prevent duplicate delivery.

Installation

Obtain the scalyr-kafka-connect-scalyr-sink-<version>.zip

Released Version

Released versions of the Kafka Connect Scalyr Connector can be downloaded from GitHub releases.

Development Version

1. Clone the Kafka Connect Scalyr repository: git clone https://github.com/scalyr/kafka-connect-scalyr

2. Build the connector: mvn clean package. The built connector is located in target/components/packages/scalyr-kafka-connect-scalyr-sink-<version>.zip.

Distributed Mode Installation

1. Modify the $KAFKA_HOME/config/connect-distributed.properties to set the following:

bootstrap.servers=<BOOTSTRAP_SERVER1,BOOTSTRAP_SERVER2,BOOTSTRAP_SERVER3>
plugin.path=<PLUGIN_PATH>

2. Unzip scalyr-kafka-connect-scalyr-sink-<version>.zip into the plugins directory defined in <PLUGIN_PATH>.
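For example, assuming <PLUGIN_PATH> is /opt/kafka/plugins and the zip was downloaded to the current directory, the install might look like the following (paths are illustrative):

mkdir -p /opt/kafka/plugins
unzip scalyr-kafka-connect-scalyr-sink-<version>.zip -d /opt/kafka/plugins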

3. Start Kafka Connect in Distributed Mode: $KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties

4. Create the connect-scalyr-sink.json Kafka Connect Scalyr Sink JSON configuration file. Sample Scalyr Sink configuration JSON files are located in the etc directory of the unzipped plugin. Substitute the correct values for your environment for <FIELDS> in the following sample configuration JSON:

{
    "name": "scalyr-sink-connector",
    "config": {
        "connector.class": "com.scalyr.integrations.kafka.ScalyrSinkConnector",
        "tasks.max": "3",
        "topics": "<KAFKA_TOPIC>",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable":"false",
        "scalyr_server": "<SCALYR_SERVER>",
        "api_key": "<WRITE_API_KEY>",
        "event_enrichment": "<key1=value1,key2=value2>"
    }
}

5. Run the following command to install the Scalyr Sink Connector, substituting your Scalyr Sink JSON configuration file name for connect-scalyr-sink.json: curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d @connect-scalyr-sink.json
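Once the POST succeeds, the connector status can be checked through the Kafka Connect REST API. A quick check, assuming the default REST port 8083 and the connector name scalyr-sink-connector from the sample configuration above:

curl localhost:8083/connectors/scalyr-sink-connector/status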

Basic Configuration Properties

Property Description
topics Comma separated list of Kafka topics to ingest from
tasks.max Maximum number of Scalyr Sink Connector tasks to send data to Scalyr. This should not exceed the number of partitions for the topics. See Scalability.
scalyr_server Defaults to app.scalyr.com and can be omitted. For EU, this should be set to upload.eu.scalyr.com.
api_key Logs Write API key from app.scalyr.com/keys
event_enrichment Optional additional attributes to add to the Scalyr log event, specified as a comma separated list of key value pairs. For example, env=prod.
custom_app_event_mapping JSON config describing how to map custom application nested Kafka messages to Scalyr events. See Custom Application Event Mapping.
send_entire_record When true, the entire Kafka message is sent as JSON in the message field. This is used to send the entire Kafka message to Scalyr without needing to define a Custom Application Event Mapping for each field. Default false.
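For example, a configuration that forwards the entire Kafka message and tags events with an environment attribute would add the following to the "config" block from step 4 (values are illustrative):

        "send_entire_record": "true",
        "event_enrichment": "env=prod"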

Advanced Configuration Properties

The default values for these properties should work for most environments and should rarely need to be changed.

Property Default Description
compression_type deflate Compression type to use for sending log events. Valid values are: deflate, none.
compression_level 6 Compression level for the compression_type. Valid values depend on the compression_type. Default will be used if not specified.
add_events_timeout_ms 20000 Timeout in milliseconds for Scalyr add events call.
add_events_retry_delay_ms 500 Delay in milliseconds for initial add events retry. This delay is increased exponentially for each retry.
batch_send_size_bytes 5000000 Batch size that must be reached before events are sent. This is to buffer events into larger batches for increased throughput.
batch_send_wait_ms 5000 Maximum time to wait in milliseconds between batch sends. This ensures events are sent to Scalyr in a timely manner on systems under light load where batch_send_size_bytes may not be reached for longer periods of time.
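These properties are set alongside the basic properties in the connector configuration JSON. For example, a sketch that raises the batch wait time and disables compression (illustrative values; the defaults are usually sufficient):

        "batch_send_wait_ms": "10000",
        "compression_type": "none"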

Filebeat Configuration

Data and Schema Configuration

Filebeat message hostname, logfile, parser, timestamp, and message fields are translated to Scalyr event equivalent fields.

Filebeat uses a JSON data format and does not have schemas, so the following value.converter settings should be used in the Scalyr connector configuration for Filebeat:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",

Parser Configuration

The Scalyr parser should be defined as a log message attribute.

To configure Filebeat to send the parser field, modify the Filebeat configuration to add the fields.parser field:

fields:
    parser: <PARSER>

<PARSER> is a parser defined in app.scalyr.com/parsers.
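A minimal filebeat.yml sketch that tails a log file, adds fields.parser, and publishes to the Kafka topic the connector reads from might look like the following (the input path, broker, and topic are assumptions for illustration; adjust for your Filebeat version and environment):

filebeat.inputs:
- type: log
  paths:
    - /var/log/*.log
  fields:
    parser: <PARSER>

output.kafka:
  hosts: ["<BOOTSTRAP_SERVER1>"]
  topic: "<KAFKA_TOPIC>"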

Custom Application Configuration

Custom Application Event Mapping

The Scalyr Kafka Connector supports sending custom application messages to Scalyr. A mapping of application message fields to Scalyr event attributes must be specified in the custom_app_event_mapping Scalyr connector config. An application may have nested fields, while Scalyr events support a flat key/value structure. Nested fields are specified in the format field1.field2.field3, where the nested field names are separated by a delimiter. By default, the delimiter is a period (.).

The custom_app_event_mapping is defined in JSON.

Sample mapping with matcher attribute mapping first and matchAll mapping last:

[{
    "matcher": {
        "attribute": "app.name",
        "value": "myapp.*"
    },
    "eventMapping": {
         "message": "message",
         "logfile": "log.path",
         "serverHost": "host.hostname",
         "parser": "fields.parser",
         "version": "app.version",
         "appField1", "appField1",
         "appField2", "nested.appField2"
    },
    "delimiter":"\\."
}, {
   "matcher": {
         "matchAll": true
   },
   "eventMapping": {
         "message": "message",
         "logfile": "log.path",
         "serverHost": "host.hostname",
         "parser": "fields.parser"
   }
}]

In this example, the attribute mapping applies to all records that match app.name==myapp.*. The matchAll mapping applies to all other records.
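For example, a Kafka message like the following (an illustrative record, not taken from the connector distribution) would match the first mapping because app.name matches myapp.*, and its fields would be mapped to the message, logfile, serverHost, parser, version, appField1, and appField2 event attributes:

{
    "message": "sample log line",
    "log": {"path": "/var/log/myapp.log"},
    "host": {"hostname": "server1"},
    "fields": {"parser": "myAppParser"},
    "app": {"name": "myapp-web", "version": "1.0"},
    "appField1": "value1",
    "nested": {"appField2": "value2"}
}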

matcher defines which records match the custom application event mapping. The matcher can either specify an attribute to match or matchAll. When specifying a matcher attribute, the event mapping applies to messages where the matcher.attribute value matches the matcher.value. The matcher.value supports regex to match the entire field value.

Multiple custom application event mappings can be specified in a JSON list and are evaluated in the order specified in the list. To prevent mappings from being ignored, only one matchAll mapping is allowed and the matchAll mapping must be the last in the list.

eventMapping defines the message fields that are mapped to Scalyr event attributes. The attribute is the Scalyr event key. The attribute value specifies the delimiter separated nested field names for the attribute value.

The following eventMapping fields have special meaning:

  • message - The log message field. The parser is applied to this field.
  • logfile - The logfile name for the log message.
  • parser - The Scalyr parser to use to parse the message field.
  • serverHost - The hostname generating this log message.

Any of the special Scalyr fields can be omitted. Additional application specific fields can also be specified in the custom app mapping.

delimiter is an optional field and is only needed if the delimiter is not a period. String, boolean, integer, and float/double data types are supported for field values. The list data type is also supported but is converted to a String.

The custom_app_event_mapping JSON must be JSON escaped, since it is specified as an attribute in the connector JSON config.
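For example, a matchAll-only mapping would appear in the connector configuration JSON roughly as follows (a sketch showing the escaping, abbreviated to the message field):

"custom_app_event_mapping": "[{\"matcher\": {\"matchAll\": true}, \"eventMapping\": {\"message\": \"message\"}}]"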

Data and Schema Configuration

The Scalyr Sink sends data in the Kafka message value. The key is not utilized. The value.converter and value.converter.schemas.enable should be set according to the application topic data format and whether schemas are used.
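For example, schemaless JSON topics use the same converter settings shown for Filebeat, while JSON topics that embed schemas (a schema/payload envelope) would typically set the following (a sketch; adjust to your topic's actual format):

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"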

Parser configuration

There are two ways to define a parser.

1. Enrichment attribute: If all applications use the same parser, the parser can be defined as an enrichment attribute. Inside the Scalyr Connector Sink Configuration, specify "event_enrichment": "parser=<PARSER>".

2. Message attribute: The custom application message should contain a field whose value is the <PARSER>. The name of that field should be specified in the parser field of the custom application event mapping.

For example, if the custom application message has the parser field "scalyrParser": <PARSER>, the custom application event mapping should specify: "parser": "scalyrParser".

<PARSER> is a parser defined in app.scalyr.com/parsers.
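The two approaches look like the following in the connector configuration (sketches reusing the hypothetical scalyrParser field from the example above).

Option 1 (enrichment attribute), in the connector config:

    "event_enrichment": "parser=<PARSER>"

Option 2 (message attribute), inside the custom_app_event_mapping eventMapping:

    "parser": "scalyrParser"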

Fluentd and Fluent Bit Configuration

Fluentd and Fluent Bit record fields may differ significantly based on the input plugins, parsers, and record transformers defined. To support these different record formats, Custom Application Event Mapping is used to map the Fluentd and Fluent Bit record fields to Scalyr event attributes. A custom application event mapping should be created for each Fluentd and Fluent Bit record format to send to Scalyr.

Fluentd and Fluent Bit record transformers should be used to add logfile, parser, and serverHost fields to the records before sending the records to Kafka. Record transformers can also be used to add a unique field for the event mapping matcher to use.

Example Fluentd Custom Application Event Mapping

Here is an example Fluentd configuration to tail logs in /var/log with a record transformer to add hostname and tag record fields.

fluent.conf

# Tail all log files in /var/log
<source>
    @type tail
    tag var.log
    path /var/log/*.log
    path_key logfile
    pos_file /var/log/varlog.pos
    <parse>
      @type none
    </parse>
</source>

# Add hostname and tag to the record
<filter var.log>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    tag "varlog"
  </record>
</filter>

This creates Fluentd records in Kafka with the fields message, logfile, hostname, and tag.

The custom_app_event_mapping for this Fluentd tail input configuration is

[{
    "matcher": {
        "attribute": "tag",
        "value": "varlog"
    },
    "eventMapping": {
         "message": "message",
         "logfile": "logfile",
         "serverHost": "hostname"
    }
}]

This matches messages with the tag value varlog and maps the message, logfile, and hostname record fields to event attributes.
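To complete the pipeline, the transformed records must be published to the Kafka topic the connector ingests from. A sketch using the fluent-plugin-kafka output plugin (the plugin choice, broker, and topic are assumptions for illustration):

# Publish the transformed records to Kafka
<match var.log>
  @type kafka2
  brokers <BOOTSTRAP_SERVER1>
  default_topic <KAFKA_TOPIC>
  <format>
    @type json
  </format>
</match>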

Scalability

Task instances can be scaled linearly to achieve the throughput needed for the log volume. The throughput of each Kafka Connect Scalyr Sink Task depends on the message size.

Message Size Throughput per Task (MB/sec)
64 bytes 3.2
256 bytes 6.8
> 1 KB 8.5

Use the following formula to calculate the number of tasks needed: Number of tasks = Log volume (MB/sec) ÷ throughput per task (MB/sec).

For example, if the log volume is 12 MB/sec and the message size is 256 bytes, then Number of tasks = 12 MB/sec ÷ 6.8 MB/sec ≈ 1.8, which rounds up to 2 tasks.

Each task requires 1 vCPU. There are limits to the throughput a single Kafka Connect worker node can support. Additional workers should be created to scale the Kafka Connect cluster horizontally to support the needed number of tasks. The number of tasks should not exceed the number of partitions for the topics.