Questions Marked For Review Flashcards

1
Q

In Kafka Streams, what value are internal topics prefixed by?

  • kafka-streams-
  • group.id
  • tasks-
  • application.id
A

Answer: application.id

In Kafka Streams, the application.id is also the underlying group.id for your consumers, and the prefix for all internal topics (repartition and state).
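
A minimal sketch, assuming a local broker and a placeholder application name, of where application.id is set and how it ends up prefixing internal topic names:

~~~
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ApplicationIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // application.id doubles as the consumer group.id and as the prefix
        // of all internal topics created by the application.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Internal topics then look like, for example:
        //   word-count-app-Counts-changelog        (state store changelog)
        //   word-count-app-<operator>-repartition  (repartition topic)
    }
}
~~~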

2
Q

A consumer starts and has auto.offset.reset=none, and the topic partition currently has data for offsets going from 45 to 2311. The consumer group has committed the offset 10 for the topic before. Where will the consumer read from?

  • offset 45
  • offset 2311
  • offset 10
  • it will crash
A

Answer: it will crash.

Explanation:
auto.offset.reset=none means the consumer will throw an exception (and crash) if the offset it is resuming from no longer exists in Kafka. That is the case here: offset 10 was committed, but the partition now only holds offsets 45 to 2311, so the committed offset has been deleted.

3
Q

What is returned by a producer.send() call in the Java API?

A

A Future<RecordMetadata> object.
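
A minimal sketch, assuming a local broker and placeholder topic/key/value, showing the Future<RecordMetadata> returned by send():

~~~
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendReturnsFuture {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous and returns a Future<RecordMetadata>.
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
            RecordMetadata metadata = future.get(); // blocking here only for demonstration
            System.out.println(metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
        }
    }
}
~~~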

4
Q

You are using the JDBC source connector to copy data from 3 tables to 3 Kafka topics. There is one connector created with tasks.max equal to 2, deployed on a cluster of 3 workers. How many tasks are launched?

A

Answer: 2

Here we have 3 tables, but tasks.max=2 caps the number of tasks the connector may create, so only 2 tasks are launched.

5
Q

In Java, Avro SpecificRecords classes are:

  • Automatically generated from an Avro Schema
  • Automatically generated from an Avro Schema + Maven / Gradle Plugin
  • Written manually by the programmer
A

Answer: Automatically generated from an Avro Schema + Maven/Gradle Plugin.

A SpecificRecord is created from generated record classes.

6
Q

Consider the following code block:

~~~
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s%n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
    }
}
~~~

What kind of delivery guarantee does this consumer offer?

A

Answer: At-most-once

Here the offset is committed before processing the message. If the consumer crashes before processing the message, the message will be lost when the consumer comes back up.
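
For contrast, a sketch of the at-least-once variant of the same loop, where the commit happens only after processing; consumer, log, and the process() helper are assumed from the question's context:

~~~
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical processing step
    }
    try {
        // Commit only after the batch has been processed -> at-least-once.
        // A crash before the commit leads to reprocessing, so processing
        // should be idempotent.
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}
~~~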

7
Q

What data format is NOT natively available with the Confluent REST Proxy?

  • Avro
  • Binary
  • JSON
  • Protobuf
A

Answer: Protobuf.

Explanation:
Protocol buffers are not a natively supported type for the Confluent REST Proxy, but you may use the binary format instead.

8
Q

Compaction is enabled for a topic in Kafka by setting log.cleanup.policy=compact. What is true about log compaction?

  • After cleanup, only one message per key is retained with the latest value.
  • Compaction changes the offset of messages.
  • After cleanup, only one message per key is retained with the first value.
  • Each message stored in the topic is compressed.
  • Kafka automatically de-duplicates incoming messages based on key hashes.
A
  • After cleanup, only one message per key is retained with the latest value.

Log compaction retains at least the last known value for each record key for a single topic partition. All compacted log offsets remain valid, even if a record at an offset has been compacted away, as a consumer will get the next highest offset.

9
Q

You are sending messages with keys to a topic. To increase throughput, you decide to increase the number of partitions of the topic. Select all that apply.

  • New records may get written to a different partition.
  • New records with the same key will get written to the partition where old records with that key were written.
  • All existing records will get rebalanced among the partitions to balance the load.
  • Old records will stay in their partitions.
A
  • New records may get written to a different partition.
  • Old records will stay in their partitions.

Explanation:
Increasing the number of partitions causes new messages to get hashed differently, and breaks the guarantee that the “same key goes to the same partition.”
Kafka logs are immutable and the previous messages are not re-shuffled or reordered.

10
Q

You are building a consumer application that processes events from a Kafka topic. What is the most important metric to monitor to ensure real-time processing?

  • MessagesInPerSec
  • UnderReplicatedPartitions
  • records-lag-max
  • BytesInPerSec
A

Answer: records-lag-max

This metric shows the maximum lag, in number of records, that the consumer is behind the end of the log. If it keeps growing, the consumer is falling behind real time.
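
A hedged sketch of reading this metric from a running consumer's built-in metrics (in practice it is usually scraped via JMX); the consumer instance is passed in as an assumption:

~~~
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class LagMetric {
    // Prints the records-lag-max metric exposed by the consumer.
    static void printMaxLag(Consumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            if ("records-lag-max".equals(entry.getKey().name())) {
                System.out.println("records-lag-max = " + entry.getValue().metricValue());
            }
        }
    }
}
~~~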

11
Q

How can you gracefully make a Kafka consumer immediately stop polling data from Kafka, and gracefully shut down a consumer application?

  • Call consumer.poll() in another thread.
  • Call consumer.wakeup() and catch a WakeupException.
  • Kill the consumer thread.
A
  • Call consumer.wakeup() and catch a WakeupException.
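
A minimal sketch of the shutdown pattern, assuming the poll loop owns the consumer and a shutdown hook in another thread calls consumer.wakeup() (imports omitted):

~~~
// In a shutdown hook (running in another thread): consumer.wakeup();
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // ... process records ...
    }
} catch (WakeupException e) {
    // Expected on shutdown: wakeup() makes the blocked poll() throw this exception.
} finally {
    consumer.close(); // leave the group and commit final offsets cleanly
}
~~~
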
12
Q

A consumer wants to read messages from partitions 0 and 1 of a topic named “topic1”. What will be the result of the following code snippet:

~~~
consumer.subscribe(Arrays.asList("topic1"));
List<TopicPartition> pc = new ArrayList<>();
pc.add(new TopicPartition("topic1", 0));
pc.add(new TopicPartition("topic1", 1));
consumer.assign(pc);
~~~
A

Answer: The code will throw “IllegalStateException”

Explanation:
The subscribe() and assign() methods cannot be used by the same consumer: subscribe() leverages the consumer group mechanism, while assign() is used to manually control which partitions the consumer reads from.
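
For reference, a sketch of the manual-assignment approach on its own (no subscribe() call), which would not throw; the consumer instance and imports for TopicPartition, List and ArrayList are assumed:

~~~
List<TopicPartition> pc = new ArrayList<>();
pc.add(new TopicPartition("topic1", 0));
pc.add(new TopicPartition("topic1", 1));
consumer.assign(pc); // manual control: do not also call subscribe() on this consumer
~~~
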
13
Q

A topic has 3 replicas and you set min.insync.replicas=2. If 2 out of 3 replicas are not available, what happens when a consume request is sent to the broker?

  • A new leader for partition will be elected
  • NotEnoughReplicasException will be returned
  • Data will be returned from the remaining in-sync replica
  • An empty message will be returned
A
  • Data will be returned from the remaining in-sync replica.

With this configuration, a single in-sync replica is still readable, but not writable if the producer is using acks=all.

14
Q
~~~
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines
        .mapValues(textLine -> textLine.toLowerCase())
        .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
        .selectKey((key, word) -> word)
        .groupByKey()
        .count(Materialized.as("Counts"));

wordCounts.toStream()
        .to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

builder.build();
~~~

What is an adequate topic configuration for the topic word-count-output?

  • cleanup.policy=compact
  • max.messages.bytes=100000000
  • compression.type=lz4
  • cleanup.policy=delete
A
  • cleanup.policy=compact

Result is aggregated into a table with key as the unique word and value equal to its frequency.
We have to enable log compaction for this topic to align the topic’s cleanup policy with KTable semantics.

15
Q

What Java library is KSQL based on?

  • Kafka Connect
  • Kafka Streams
  • Schema Registry
  • REST Proxy
A

Answer: Kafka Streams

KSQL is based on Kafka Streams and allows you to express transformations in the SQL language that get automatically converted to a Kafka Streams program in the backend.

16
Q

A consumer sends a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never responds. Meanwhile, the consumer processed another batch and successfully committed offset 3000.
What should you do?

  • Use the kafka-consumer-group command to manually commit the offsets 2000 for the consumer group.
  • Nothing.
  • Restart the consumer.
  • Add a new consumer to the group.
A

Answer: Nothing.

In this case, because the offset 3000 has been committed and all the messages between 0 and 3000 have all been processed, it is okay not to have committed offset 2000. Therefore, you don’t have to do anything - this behavior is acceptable.

17
Q

In Avro, removing a field that does not have a default is what kind of schema evolution?

A

Answer: Backward.

Clients with new schema will be able to read records saved with old schema.

18
Q

When auto.create.topics.enable=true in Kafka configuration, what are the circumstances where a Kafka broker automatically creates a topic?

A

A Kafka broker automatically creates a topic under the following circumstances:

- When a producer starts writing messages to the topic.
- When a consumer starts reading messages from the topic.
- When any client requests metadata for the topic.
19
Q

To allow consumers in a group to resume at the previously committed offset, I need to set the proper value for…

  • group.id
  • auto.offset.reset
  • value.deserializer
  • enable.auto.commit
A

Answer: group.id

Setting a group.id that is consistent across restarts will allow your consumers that are part of the same group to resume reading from where offsets were last committed for that group.
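
A minimal consumer-configuration sketch (imports omitted), assuming a local broker and placeholder group/topic names, showing a stable group.id so committed offsets are resumed on restart:

~~~
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-stable-group"); // same id on every restart
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic1"));
// On startup, the consumer resumes from the offsets last committed for "my-stable-group".
~~~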

20
Q

We have a store selling shoes. What dataset is a great candidate to be modeled as a KTable in Kafka Streams? Choose all that apply.

  • The transaction stream
  • Items returned
  • Money made until now
  • Inventory contents right now
A
  • Money made until now
  • Inventory contents right now

Aggregations of streams (money made until now, current inventory contents) are naturally stored in tables, whereas event streams such as transactions and returned items must be modeled as a KStream to avoid data explosion.

21
Q

When using plain JSON data with Connect, you see the following error message: org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires “schema” and “payload” fields and may not contain additional fields.
How will you fix the error?

  • Set key.converter, value.converter to JsonConverter and the schema registry url
  • Use Single Message Transformation to add schema and payload fields in the message.
  • Set key.converter, value.converter to AvroConverter and the schema registry url
  • Set key.converter.schemas.enable and value.converter.schemas.enable to false
A
  • Set key.converter.schemas.enable and value.converter.schemas.enable to false

You need to set the converters' schemas.enable parameters to false when the data is plain JSON that carries no embedded schema.

22
Q

When using the Confluent Kafka Distribution, where does the schema registry reside?

  • As an in-memory plugin on your Kafka Brokers
  • As a separate JVM component
  • As an in-memory plugin on your Kafka Connect Workers
  • As an in-memory plugin on your Zookeeper cluster
A
  • As a separate JVM component

Schema registry is a separate application that provides a RESTful interface for storing and retrieving Avro schemas.

23
Q

The kafka-console-consumer CLI, when used with the default options…

  • uses a random group id
  • always uses the same group id
  • does not use a group id
A
  • uses a random group id.

If a group is not specified, the kafka-console-consumer generates a random consumer group.

24
Q

When is the onCompletion() method called in the following code:
~~~
private class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
        new ProducerRecord<>("topic1", "key1", "value1");
producer.send(record, new ProducerCallback());
~~~
A

The callback is invoked when a broker response is received.

25
Q

Which actions will trigger partition rebalance for a consumer group?

A

Rebalance occurs when:

- A new consumer is added or removed.
- A consumer in a consumer group shuts down or dies.
- A partition is added to the topic.
26
Q

What is true about partitions? (Select two)

  • You cannot have more partitions than the number of brokers in your cluster.
  • Only out-of-sync replicas are followers; the remaining in-sync replicas are also leaders.
  • A partition has one replica that is a leader, while the other replicas are followers.
  • A broker can have different partition numbers for the same topic on its disk.
  • A broker can have a partition and its replica on its disk.
A

Only one of the replicas is elected as a partition leader, and a broker can hold many partitions from the same topic on its disk.

27
Q

A Kafka topic has a replication factor of 3 and a min.insync.replicas setting of 2. How many brokers can go down before a producer with acks=all can NOT produce?

A

Answer: 1

If acks=all and min.insync.replicas=2, this means that we must have at least 2 brokers up for the partition to be available. Since there is a replication factor of 3, we know that there will be 3 replicas, and therefore if 1 goes down we will still have the 2 brokers remaining that are required by min.insync.replicas=2.

28
Q

After copying a producer application into another developer’s machine, the producer is able to connect to Kafka but unable to produce to the same Kafka topic because of an authorization issue. What is likely the issue?

A

The Kafka ACL does not allow the other machine's IP address.

ACLs take a "Host" parameter, which represents an IP address. It can be * (a wildcard for all IPs) or a specific IP.
Here it must be a specific IP: moving the producer to a different machine broke authorization, so the ACL needs to be updated.

29
Q

A producer just sent a message to the leader broker for a topic partition. The producer used “acks=1” and therefore the data has not yet been replicated to followers. Under which conditions will the consumer see the message?

  • Never, the produce request will fail.
  • When the message has been fully replicated to all replicas.
  • Right away.
  • When the high watermark has advanced.
A
  • When the high watermark has advanced.

The high watermark is an advanced Kafka concept: it is advanced (moved forward) once all the in-sync replicas have replicated the latest offsets. A consumer can only read up to the value of the high watermark.

The value of the High Watermark can be less than the highest offset, in the case of acks=1.

30
Q

Which of the Kafka Streams joins are always windowed joins?

A

KStream-KStream joins are always windowed.

31
Q

Your Streams app is reading from an input topic that has 5 partitions. You run 5 instances of your app, each with num.stream.threads set to 5.
How many stream tasks will be created and how many will be active?

A

25 stream threads will be created, but only 5 of them will be active.

One stream task is created per input partition (5 tasks here), and each task is assigned to a single thread, so only 5 of the 25 threads have work to do; the rest stay idle.
The number of tasks is driven by the number of input partitions. For example, if you have a KS app that only subscribes to one topic, and that topic has 6 partitions, your KS app would have 6 tasks. But if you have multiple topics, the app takes the highest partition count among the topics.
Tasks are assigned to StreamThread(s) for execution. The default KS app has one StreamThread. So, if you have 5 tasks and one StreamThread, that StreamThread will process records for each task in turn. However, in Kafka Streams, you can have as many threads as there are tasks. So, for five tasks, you could configure your app to have 5 threads. Each task gets its own thread, and any remaining threads are idle.

With respect to task assignment, app instances are similar to threads. If you have an input topic with 5 partitions, you could spin up 5 Kafka Streams instances that all have the same application ID, and each instance would be assigned and process one task. Spinning up new instances provides the same increase in throughput as increasing threads. Just like with threads, if you spin up more app instances than tasks, the extra instances will be idle, although available for failover. The advantage is that this behavior is dynamic; it doesn't involve shutting anything down. You can spin up instances on the fly, and you can take down instances.
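
A sketch of how the per-instance thread count is configured (imports omitted); the application id, broker address, and builder topology are placeholders/assumptions:

~~~
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 5 threads per instance; with 5 instances that is 25 threads in total,
// but only as many threads as there are tasks (= input partitions) do work.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);

KafkaStreams streams = new KafkaStreams(builder.build(), props); // builder assumed
streams.start();
~~~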

32
Q

What is the disadvantage of request/response communication?

A

Coupling.

Point-to-point (request/response) style communication will couple the client with the server.

33
Q

What happens if you write the following code in your producer?

producer.send(ProducerRecord).get()
  • Compression will be increased
  • Throughput will be decreased
  • It will force all brokers in Kafka to acknowledge the ProducerRecord
  • Batching will be increased
A
  • Throughput will be decreased.

Using Future.get() to wait for a reply from Kafka will limit throughput.

34
Q

If you want to send binary data through the REST Proxy, it needs to be base64 encoded. Which component needs to encode the binary data into base64?

  • Zookeeper
  • The REST Proxy
  • The Producer
  • The Consumer
  • The Kafka Broker
A

Answer: The Producer.

The REST Proxy requires that binary data is already base64 encoded when it receives it over the REST interface. Sending data is the producer's job, not the consumer's or the broker's, and Zookeeper plays no role here, so the encoding is the responsibility of the producer.
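
A hedged sketch of the producer-side encoding step only (the actual HTTP POST to the REST Proxy and its JSON envelope are omitted; the payload is a placeholder):

~~~
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RestProxyBase64 {
    public static void main(String[] args) {
        byte[] payload = "some-binary-value".getBytes(StandardCharsets.UTF_8);
        // The producing client base64-encodes binary data before sending it
        // to the REST Proxy, e.g. inside the "value" field of the JSON body.
        String encoded = Base64.getEncoder().encodeToString(payload);
        System.out.println(encoded);
    }
}
~~~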

35
Q

You can set the retention for a given topic to 1 hour by setting retention.ms=3600000. At what level is retention.ms configured, or what kind of configuration is it?

  • consumer config
  • topic config
  • broker config
  • producer config
A

The setting for retention.ms can be configured at topic level while creating a topic or by altering a topic.
It should NOT be set at the broker level (using log.retention.ms) as this would impact all of the topics in the cluster, and not just the one we are interested in.
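
A hedged sketch of setting retention.ms at the topic level with the Java AdminClient (topic name and broker address are placeholders; the kafka-configs CLI can achieve the same):

~~~
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetTopicRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "3600000"), AlterConfigOp.OpType.SET);
            // Topic-level override: only this topic's retention changes.
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                    Map.of(topic, List.of(setRetention));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
~~~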

36
Q

An e-commerce website sells some custom made goods. What’s the natural way of modeling this data in Kafka Streams?

  • Purchase as stream, Product as table, Customer as table.
  • Purchase as table, Product as table, Customer as table.
  • Purchase as table, Product as table, Customer as stream.
  • Purchase as stream, Product as stream, Customer as stream.
A
  • Purchase as stream, Product as table, Customer as table.

Mostly static data is modeled as a table, whereas business transactions should be modeled as a stream.
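
A sketch of that modeling in the Streams DSL (imports omitted); topic names are placeholders and value types are left as String for brevity:

~~~
StreamsBuilder builder = new StreamsBuilder();

// Business transactions: every event matters -> KStream
KStream<String, String> purchases = builder.stream("purchases");

// Mostly static reference data: only the latest value per key matters -> KTable
KTable<String, String> products  = builder.table("products");
KTable<String, String> customers = builder.table("customers");
~~~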

37
Q

In Avro, adding an element to an enumeration without a default is what kind of schema evolution?

A

This is a BREAKING schema evolution.

Because a default value was added to the complex type "enum" (in Avro 1.9.1), schema resolution changed:
(< 1.9.1) if both are enums: if the writer's symbol is not present in the reader's enum, an error is signaled.
(>= 1.9.1) if both are enums: if the writer's symbol is not present in the reader's enum and the reader has a default value, then that value is used; otherwise, an error is signaled.

38
Q

A Zookeeper configuration has tickTime of 2000, initLimit of 20 and syncLimit of 5.
What is the timeout value for followers to connect to Zookeeper?

A

40 seconds.

The tick time is 2000 ms, and initLimit is the config taken into account when establishing a connection to Zookeeper, so the timeout is 2000 * 20 = 40000 ms = 40 seconds.

39
Q

In Kafka, every broker…. (Select three)

  • contains all topics and all the partitions.
  • knows all the metadata for all topics and partitions.
  • knows the metadata for the topics and partitions it has on its disk.
  • contains only a subset of the topics and the partitions.
  • is a bootstrap broker.
  • is a controller.
A
  • knows all the metadata for all topics and partitions.
  • contains only a subset of the topics and the partitions.
  • is a bootstrap broker.

Kafka topics are divided into partitions and spread across brokers. Each broker knows all the metadata, and each broker is a bootstrap broker/server, but only one of them is elected as the controller.