
The Connect API – Building a Pipeline to Connect, Convert and Transform


Kafka Connect is a data pipeline that was introduced in Kafka 0.9.x to provide a runtime component that can be used to connect, convert and perform transformations on data before pushing it to a Kafka topic, an application or a datasource. All of these components are pluggable and configurable to get the best out of its use-cases. In this chapter, we are going to look into each of these components, get a high-level overview of how they work, and see how we can use them together to create a useful data pipeline.

Kafka Connect Architecture

Kafka Connect is a distributed, fault-tolerant and scalable service which is used as a data pipeline to reliably stream sequences of data between Kafka and other systems. At its core, Kafka Connect deals with three major components:

  • Connector – It mainly performs three operations. Firstly, it copies data by having users define jobs at the connector level, which are then further broken down into smaller tasks. Secondly, it provides data parallelism by asking the connector to consider how its job could be broken down into smaller subtasks, with a selective granularity, as soon as data is received. Lastly, it provides an API to register source and sink interfaces, which makes it quite easy to integrate a variety of data streams.
  • Worker – It allows the application to scale. It can either run as a single standalone worker process, where it acts as its own coordinator, or in a distributed or clustered environment where connectors and tasks are dynamically scheduled on workers.
  • Data – Kafka Connect mainly focuses on simply copying data. There are plenty of streaming tools available that can be integrated or used as an ETL process for further processing. This keeps Kafka Connect simple from both a conceptual and an implementation perspective.

Data is produced from a source and consumed to a sink. Kafka Connect uses its own mechanism, different from core Kafka, to track offsets so that a task can restart at the correct starting point after any kind of failure. These offsets are different from the offsets maintained in Kafka. In standalone mode, the source offset is tracked in a local file, and in distributed mode, the source offset is tracked in a Kafka topic.
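As a sketch, the relevant worker configuration properties for each mode look roughly like this (the file and topic names below are just the conventional examples, not requirements):

# connect-standalone.properties (standalone mode)
bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets

# connect-distributed.properties (distributed mode)
bootstrap.servers=localhost:9092
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status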

Running Kafka Connect in Standalone vs Distributed Mode

Kafka Connect in standalone mode is normally used to get started quickly, mostly for development or testing. It is quite easy to set up. In standalone mode, a single process runs your connectors and their associated tasks. Configuration files bundled with the process are the primary way of configuring connectors, instead of the REST API endpoints. As a result, this mode does not provide scalability and is not fault-tolerant. It is also quite hard to monitor.

Kafka Connect Standalone Mode
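A standalone worker is started by passing the worker properties file and one or more connector properties files directly on the command line; for example (the connector file name here is just a placeholder):

bin/connect-standalone.sh config/connect-standalone.properties config/my-file-source.properties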

When we run Kafka Connect in distributed mode, it runs multiple workers, which in turn run multiple connectors and tasks. Although it runs on multiple nodes, surprisingly it does not require any kind of orchestration manager to handle the coordination mechanics. The entire internal coordination between the Connect nodes is handled using the same concept as Kafka Consumer Groups. Like consumer groups, Connect nodes in distributed mode can grow or rebalance among themselves as nodes are added or removed. All configuration in this mode is submitted through the REST API. It also leverages Kafka topics in order to sync between the workers, creating one topic each to store config, offsets and status.

Kafka Connect Distributed
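Once a distributed worker is started with its worker properties, connectors are managed entirely over its REST interface; for example (8083 is the default REST port):

bin/connect-distributed.sh config/connect-distributed.properties

# list the connectors currently deployed in the cluster
curl http://localhost:8083/connectors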

A simple docker-compose instance hosting all of the Kafka components from Confluent can be run using the following configuration:

version: "3.3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: cnfldemos/kafka-connect-datagen:0.1.3-5.3.1
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

Source and Sink Connectors

Connectors are used in Kafka Connect as plugins that act as an interface between external systems and the Connect runtime. Kafka Connect exposes an API which can be used to configure these plugins. There are two kinds of connectors:

  • Source Connector – Used to import data from an external system into a Kafka topic.
  • Sink Connector – Used to export data from Kafka to external systems.

Kafka Connect Connectors Workflow

The primary role of a connector is to create and manage tasks. These tasks perform the actual work of exchanging data with external systems. When we start a connector, it computes the number of tasks it needs to run, and then for each task it computes the configuration and assigns it a portion of the workload.

Usually, when Kafka Connect runs in distributed mode, multiple tasks are scheduled to run in parallel and are spread across multiple workers. They behave much like Kafka Consumer Groups, where the total work is split across the tasks and gets dynamically rebalanced whenever there is a change or failure. At runtime, connectors can detect that the systems they interact with have changed and trigger a reconfiguration. This helps adjust the number of tasks to exactly match the current workload.

Let's quickly create a file source to read data from files and import it into a Kafka topic. First we'll create a topic in Kafka.

bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic file-source \
    --config min.insync.replicas=2

Then we'll create a JSON file to define the source config:

{
  "name": "file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "topic": "file-source",
    "file": "/tmp/source.out",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}

Next we make a POST call to register this source config:

# curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @"file-source.json"

Finally, it will return a response like the one below, which confirms that the source connector is configured:

{
  "name": "file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "topic": "file-source",
    "file": "/tmp/source.out",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "name": "file-source"
  },
  "tasks": [],
  "type": "source"
}
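We can also check the state of the connector and its tasks at any point through the same REST API:

# curl http://localhost:8083/connectors/file-source/status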

Now we can create a "/tmp/source.out" file and add a few strings, and we will see them immediately get processed into the defined Kafka topic.
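For example, assuming the broker is reachable at localhost:9092, appending a line to the file and then consuming the topic should show the record almost immediately:

echo "hello connect" >> /tmp/source.out

bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic file-source \
    --from-beginning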

Similarly, we can also create a FileStreamSinkConnector to sink data from a Kafka topic to a given file.
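A minimal sink configuration could look like the following sketch (the connector name, topic and output file here are placeholders):

{
  "name": "file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "file-source",
    "file": "/tmp/sink.out"
  }
}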

Writing a Custom Connector

We often come across situations where we need to build our own source or sink connector in order to process data with a schema other than the ones supported by Kafka Connect. The Connect API provides interfaces which can be implemented to create a custom connector or task. So let's start by defining the dependencies of a Maven project in pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
    </dependencies>

Basically, we want to stream the data from a Kafka topic and process it as logs. So we will create a LogSinkConnector that parses records and forwards them to downstream applications in the form of logs. Let's start by defining a LogSinkConfig:

import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class LogSinkConfig extends AbstractConfig {

    public static final String LOG_LEVEL = "log.level";
    public static final String LOG_CONTENT = "log.content";
    public static final String LOG_FORMAT = "log.format";
    public static final String TASK_ID = "task.id";
    public static final String TASK_MAX = "task.max";

    public static final String LOG_FORMAT_DEFAULT = "{} {}";
    public static final String LOG_CONTENT_DEFAULT = "all";
    public static final String LOG_LEVEL_DEFAULT = "info";

    public static final ConfigDef CONFIG_DEF =
            new ConfigDef()
                    .define(LOG_LEVEL,
                            ConfigDef.Type.STRING,
                            LOG_LEVEL_DEFAULT,
                            ConfigDef.Importance.HIGH,
                            "Log level.")
                    .define(LOG_FORMAT,
                            ConfigDef.Type.STRING,
                            LOG_FORMAT_DEFAULT,
                            ConfigDef.Importance.HIGH,
                            "Log pattern format.")
                    .define(LOG_CONTENT,
                            ConfigDef.Type.STRING,
                            LOG_CONTENT_DEFAULT,
                            ConfigDef.Importance.HIGH,
                            "Log content.");


    public LogSinkConfig(final Map<?, ?> properties) {
        super(CONFIG_DEF, properties);
    }

    public LogLevel getLogLevel() {
        try {
            return LogLevel.valueOf(getString(LOG_LEVEL).toUpperCase());
        } catch (final IllegalArgumentException ex) {
            throw new ConfigException("Configuration error.", ex);
        }
    }

    public LogContent getLogContent() {
        try {
            return LogContent.valueOf(getString(LOG_CONTENT).toUpperCase());
        } catch (final IllegalArgumentException ex) {
            throw new ConfigException("Configuration error.", ex);
        }
    }

    public String getLogPatternFormat() {
        final String format = getString(LOG_FORMAT);

        if (StringUtils.isEmpty(format)) {
            return LOG_FORMAT_DEFAULT;
        } else {
            return format;
        }
    }

    public enum LogLevel {
        INFO,
        DEBUG,
        WARN,
        ERROR,
        TRACE
    }

    public enum LogContent {
        ALL,
        KEY,
        VALUE,
        KEY_VALUE
    }
}

Next we need to create a task that takes the records from Kafka and passes them to the external system. The expected pattern is that the task takes responsibility for the connection handling and the basic passing of data to the external service; any requirements in terms of filtering and transformation can be applied before that as a separate activity. Here we are going to extend the SinkTask abstract class and implement its methods to create our own LogSinkTask.

import java.util.Collection;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogSinkTask extends SinkTask {

    final Logger logger = LoggerFactory.getLogger(LogSinkTask.class);
    private static final String VERSION = "1.0";
    private LogSinkConfig.LogLevel logLevel;
    private LogSinkConfig.LogContent logContent;
    private String logPatternFormat;

    @Override
    public String version() {
        return VERSION;
    }

    @Override
    public void start(final Map<String, String> properties) {
        final LogSinkConfig config = new LogSinkConfig(properties);
        logLevel = config.getLogLevel();
        logContent = config.getLogContent();
        logPatternFormat = config.getLogPatternFormat();
        logger.info("Starting LogSinkTask with properties {}", properties);
    }

    @Override
    public void put(final Collection<SinkRecord> records) {
        switch (logLevel) {
            case INFO:
                records.forEach(record -> logger.info(logPatternFormat,
                        getLoggingArgs(logContent, record)));
                break;
            case WARN:
                records.forEach(record -> logger.warn(logPatternFormat,
                        getLoggingArgs(logContent, record)));
                break;
            case DEBUG:
                records.forEach(record -> logger.debug(logPatternFormat,
                        getLoggingArgs(logContent, record)));
                break;

            case TRACE:
                records.forEach(record -> logger.trace(logPatternFormat,
                        getLoggingArgs(logContent, record)));
                break;

            case ERROR:
                records.forEach(record -> logger.error(logPatternFormat,
                        getLoggingArgs(logContent, record)));
                break;

        }
    }

    private Object[] getLoggingArgs(final LogSinkConfig.LogContent logContent, final SinkRecord record) {
        switch (logContent) {
            case KEY:
                return new Object[]{record.key(), StringUtils.EMPTY};
            case VALUE:
                return new Object[]{record.value(), StringUtils.EMPTY};
            case KEY_VALUE:
                return new Object[]{record.key(), record.value()};
            default:
                // case ALL
                return new Object[]{record, StringUtils.EMPTY};
        }
    }

    @Override
    public void stop() {
        logger.info("Stopping LogSinkTask.");
    }
}

Finally, we can combine the LogSinkTask and LogSinkConfig to create our LogSinkConnector. We can extend the SinkConnector class and implement the connector methods.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogSinkConnector extends SinkConnector {

    final Logger logger = LoggerFactory.getLogger(LogSinkConnector.class);
    private static final String VERSION = "1.0";
    private Map<String, String> configProps;

    @Override
    public void start(final Map<String, String> properties) {
        logger.info("Starting LogSinkConnector with properties {}", properties);
        configProps = properties;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return LogSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(final int maxTasks) {
        logger.info("Setting task configurations for {} workers.", maxTasks);
        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; ++i) {
            final Map<String, String> taskConfig = new HashMap<>(configProps);
            // add task specific values
            taskConfig.put(LogSinkConfig.TASK_ID, String.valueOf(i));
            taskConfig.put(LogSinkConfig.TASK_MAX, String.valueOf(maxTasks));
            configs.add(taskConfig);
        }
        return configs;
    }

    @Override
    public void stop() {
        logger.info("Stopping LogSinkConnector.");
    }

    @Override
    public ConfigDef config() {
        return LogSinkConfig.CONFIG_DEF;
    }

    @Override
    public String version() {
        return VERSION;
    }
}

That completes our implementation. Now we need to package this into an uber (or fat) jar that contains the full collection of classes and dependencies. We need to include all of the dependent libraries except the direct Kafka dependencies. So we can define a plugin in pom.xml as follows:

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.8.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <finalName>kafka-connect-custom-connector-uber</finalName>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.kafka:kafka_2.13</exclude>
                                        <exclude>org.apache.kafka:connect-api</exclude>
                                    </excludes>
                                </artifactSet>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

Now we can run "mvn package". Once the jar is created in the target directory, we need to place this uber jar in the proper location for Kafka Connect to find it on its plugin path.

Kafka Connect finds plugins using a plugin path defined as a comma-separated list of directory paths in the plugin.path worker configuration property. The following is an example plugin.path worker configuration property:

plugin.path=/usr/local/share/kafka/plugins

In order to install the custom connector plugin that we have developed, we need to place the uber jar in a directory already listed in the plugin path, or update the plugin path by adding the absolute path of the directory containing the plugin. Using the plugin path mentioned above, we can create a /usr/local/share/kafka/plugins directory on each machine running Connect and then place the plugin directories (or uber JARs) there.
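For example, with the shade configuration above producing kafka-connect-custom-connector-uber.jar, the copy step might look like this:

mkdir -p /usr/local/share/kafka/plugins
cp target/kafka-connect-custom-connector-uber.jar /usr/local/share/kafka/plugins/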

Now when we start or restart our Connect workers, each worker discovers all connector, transform, and converter plugins found inside the directories on the plugin path. When we use a connector, transform, or converter, the Kafka Connect worker loads the classes from the respective plugin first, followed by the Kafka Connect runtime and Java libraries. Kafka Connect explicitly avoids the libraries in other plugins. This prevents conflicts and makes it very easy to add and use connectors and transforms developed by different providers. The implementation for this custom connector can be found on GitHub.
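After restarting the workers, we can confirm that the plugin was discovered by listing the installed plugins through the REST API:

# curl http://localhost:8083/connector-plugins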

Converters

The mechanism used to translate between the internal format used by Kafka Connect and the format used by Kafka is known as a Converter. Data received by Kafka Connect is serialized and sent as a stream of bytes to a Kafka topic. Similarly, when data flows from Kafka to Kafka Connect, it is deserialized again.

In the case of source connectors, converters are applied after the connector.

Source Connect Pipeline

In the case of sink connectors, converters are invoked before the connector.

Sink Connect Pipeline

Normally, Kafka records are made up of a key, a value and headers. When data is produced to or consumed from Kafka, it needs to be serialized into bytes, as that is the format Kafka accepts. If Kafka Connect is consuming data from Kafka, it needs to be aware of the serializers that were used to produce the data. Similarly, if Kafka Connect is producing data into Kafka, it needs to serialize it into a format that the consumers will understand. Often the format that the Kafka applications expect is different from the format in your external system. That is why Kafka Connect not only lets you configure the translation between the Kafka Connect internal format and the Kafka format, but also lets you configure it completely independently of the connector you have chosen.

We can configure the converters in the worker properties file or as part of the config when sending the API payload. The fields are called:

  • key.converter
  • value.converter
  • header.converter

Kafka Connect provides some built-in converters which prove quite useful; a small configuration example using a couple of them follows the list:

  • org.apache.kafka.connect.json.JsonConverter
  • org.apache.kafka.connect.storage.StringConverter
  • org.apache.kafka.connect.converters.ByteArrayConverter
  • org.apache.kafka.connect.converters.DoubleConverter
  • org.apache.kafka.connect.converters.FloatConverter
  • org.apache.kafka.connect.converters.IntegerConverter
  • org.apache.kafka.connect.converters.LongConverter
  • org.apache.kafka.connect.converters.ShortConverter
  • org.apache.kafka.connect.storage.SimpleHeaderConverter
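As a sketch, a worker configuration combining a few of these built-in converters (assuming String keys and schemaless JSON values) could look like this:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter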

Transformations

Transformations are also a kind of plugin that can be added to a Kafka Connect runtime. They are known as Single Message Transformations (SMTs). They help transform messages or records that flow through Kafka Connect. A transformation is a Java class that implements the Transformation interface from the Kafka Connect API. Unlike connectors and converters, transformations are optional components in a Kafka Connect pipeline.
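As a quick illustration (not part of the custom connector above), a minimal sketch of such a class, with a hypothetical name and topic suffix, might simply route every record to a renamed topic:

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class TopicSuffixTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public R apply(final R record) {
        // copy the record unchanged, only appending a suffix to the topic name
        return record.newRecord(
                record.topic() + "-processed",
                record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}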

In the case of source connectors, transformations are invoked after the connector and before the converter.

Source Connect Transform pipeline

In the case of sink connectors, transformations are invoked after the converter and before the connector.

Sink Connect Transform pipeline

Transformations usually help us achieve the following use-cases:

  • Routing – Used when someone wants to update or change the topic name or the partition fields. This transformation typically does not touch the key, value or headers of the record.
  • Sanitizing – This transformation lets you omit or remove data that is not supposed to flow downstream through Kafka Connect.
  • Formatting – As the name suggests, this transformation is used to update schema objects and their data types.
  • Enriching – This transformation helps add fields and headers or update the data in some fields.

A typical transformation is defined in the following format:

{
  "name": "sample-connector",
  "config": {
    [...]
    "transforms": "addSuffix",
    "transforms.addSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addSuffix.regex": "(.*)",
    "transforms.addSuffix.replacement": "$1-router"
  }
}

Conclusion

In this chapter, we took a deep dive into the Kafka Connect runtime and its APIs. We looked at each of its components and built a custom connector as well. It is generally recommended to run Kafka Connect in distributed mode in production environments, as it is quite easy to run and maintain. We also took a look at converters and transformations.

In the upcoming chapters we will get hands-on with the Spring Boot integration with Kafka and understand its various modes of configuration and development.
