What do we use Apache Flink/Kinesis at Rumble Fish for?

Mon, Oct 24, 2022 · 10 min read

Category: Code-stories

Software developers are often faced with the challenging task of managing large datasets in real time. Modern apps often require some serious big data processing, and many of them (like Spotify or Zalando, for example) use Apache Flink to get the job done. Flink is an excellent choice for any event-driven application thanks to its extensive feature set, which supports stream and batch processing, event-time processing, and sophisticated state management. In today’s article, we’ll explain how we use Apache Flink (mixed with Amazon Kinesis Analytics) at Rumble Fish, and walk through a relevant use case with code so you can start using Flink too. Let’s jump right in!

In short, at Rumble Fish we use Apache Flink to process large amounts of real-time data and aggregate it in whatever way a given use case needs. One common pattern is to process chunks of data in a tumbling time window and then aggregate each chunk by, for example, taking the largest value of a given field. This reduces a potentially huge stream of data to a stream with a defined throughput that slower components (such as database or frontend updates) can keep up with.
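To make the idea concrete, here is a minimal plain-Java sketch of that reduction. It does not use the Flink API at all: TumblingMaxSketch, Reading and maxPerWindow are hypothetical names, and we assume non-negative timestamps in milliseconds with windows aligned to zero.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names are Flink classes.
final class TumblingMaxSketch {

    // A single measurement: event time in milliseconds and the field we aggregate on.
    static final class Reading {
        final long timestampMs;
        final int value;

        Reading(long timestampMs, int value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Start of the tumbling window that a non-negative timestamp falls into.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Reduce the input to one value per window: the largest value seen in that window.
    static Map<Long, Integer> maxPerWindow(List<Reading> readings, long windowSizeMs) {
        Map<Long, Integer> result = new TreeMap<>();
        for (Reading reading : readings) {
            long start = windowStart(reading.timestampMs, windowSizeMs);
            result.merge(start, reading.value, Integer::max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
            new Reading(1_000, 7),
            new Reading(4_000, 42),
            new Reading(9_999, 13),
            new Reading(12_000, 5));

        // Four readings collapse into two values, one per 10-second window.
        System.out.println(maxPerWindow(readings, 10_000)); // prints {0=42, 10000=5}
    }
}
```

However many readings arrive within a window, only one value per window leaves the aggregation, which is what gives the downstream components a predictable load.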

Another nice feature unlocked by Flink is that data processing can be stateful, without the need for an external database. Thanks to this, we can easily process information in the context of its previous state. Without this feature, we would have to call an external database for each processed record and this would definitely not be fast enough for the datasets we work with.

Finally, Apache Flink provides developers with a large number of connectors, such as Kafka, FileSystem, Kinesis, etc. At Rumble Fish, we use an AWS service, Kinesis Analytics. The Kinesis connector gives us access to Amazon Kinesis Data Streams which allows us to further simplify our real-time data processing work. It is easy to use, as we can create an Amazon Kinesis Stream in seconds, and the service is highly reliable. Kinesis Analytics offers a standard way of deploying Flink applications at scale. 

What systems is Apache Flink a good choice for?

Handling massive amounts of data made simple

Apache Flink allows us to process large amounts of data while offering high fault tolerance. The secret sauce here is the consistent recording of the application state, unlocked by generating consistent snapshots of a distributed data stream and operator state. In the event of a system failure, Apache Flink automatically recovers data from these snapshots.

Real-time processing unlocked

Flink also supports real-time processing of bounded or unbounded data streams. We can freely aggregate the data, process it, or divide it into appropriate time windows. How does it work in a real-life scenario? Let’s say we need to design a lottery with a set number of prizes, in which the player has 10 seconds to redeem the reward, and after that time window the next prize is shown. Apache Flink is an ideal solution for this use case. All you need to do is create a data stream that accepts all the prizes, then create a 10-second tumbling window and draw a (pseudo-)random coupon associated with a particular user and a particular prize. Done!

Statefulness achieved

Flink also allows us to hold state without using a database, so we can make decisions in real time based on historic data. Let's go back to our lottery use case. We want to check whether the user who has clicked on a lottery has any coupons at all. Below is an example illustrating state holding for the described situation:

// State is kept by Flink itself; no external database is involved.
private transient BroadcastState<Integer, UserState> userState;
private transient BroadcastState<Integer, LotteryState> lotteryState;

@Override
public void processElement2(UserEvent userEvent, Context ctx,
    Collector<EventOutput> out) throws Exception {
    log.info("Processing UserEvent is started: " + userEvent);
    log.info("Current UserState: " + getUserState());

    // Look up how many coupons the current user (the stream key) still has.
    Integer userAvailableCoupon = userState.get(ctx.getCurrentKey()).getAvailableCoupon();
    if (userAvailableCoupon > 0) {
        // Spend one coupon and pass the event on for further processing.
        userState.put(ctx.getCurrentKey(), new UserState(ctx.getCurrentKey(),
            userAvailableCoupon - 1));
        out.collect(new EventOutput(userEvent.getLotteryId(), userEvent.getUserId(),
            userEvent.getTimestamp()));
    }

    log.info("Processing UserEvent is done: " + userEvent);
    log.info("Updated UserState: " + getUserState());
}

In the example above, we’ve checked whether the user who wants to click on a lottery has any coupons available. If so, we’ve updated the state by reducing the number of coupons by one and sent the event on for further processing.
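The same coupon check can be sketched in plain Java, which makes the decision logic easy to reason about in isolation. This is only an illustration under our own assumptions: CouponLedger is a hypothetical class, and its HashMap merely imitates the per-user state that Flink would manage for us. Unlike the snippet above, it also treats a user with no recorded state as having zero coupons.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for per-user state; not part of the Flink API.
final class CouponLedger {

    private final Map<Integer, Integer> couponsByUser = new HashMap<>();

    // A "buy coupon" event increases the user's balance.
    void addCoupons(int userId, int amount) {
        couponsByUser.merge(userId, amount, Integer::sum);
    }

    // A "click" event spends one coupon.
    // Returns true if the click should be passed on for further processing.
    boolean tryRedeem(int userId) {
        int available = couponsByUser.getOrDefault(userId, 0); // unknown user: no coupons
        if (available <= 0) {
            return false;
        }
        couponsByUser.put(userId, available - 1);
        return true;
    }

    int available(int userId) {
        return couponsByUser.getOrDefault(userId, 0);
    }

    public static void main(String[] args) {
        CouponLedger ledger = new CouponLedger();
        ledger.addCoupons(1, 2);

        System.out.println(ledger.tryRedeem(1));  // prints true
        System.out.println(ledger.tryRedeem(1));  // prints true
        System.out.println(ledger.tryRedeem(1));  // prints false, no coupons left
        System.out.println(ledger.tryRedeem(99)); // prints false, user never bought any
    }
}
```

Because each decision only needs the previous state for the same user, it can be made without a round trip to an external database.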

Benefits of using AWS Kinesis Analytics

As mentioned earlier, Apache Flink has a wide variety of connectors. Today, we focus on the tooling provided by Amazon: AWS Kinesis. One drawback of running Apache Flink on your own is that autoscaling is not something you get out of the box. This is where Kinesis Analytics comes in handy. It’s a serverless, fully managed service that lets us process data continuously in real time. What’s more, it can automatically scale computing resources to match the volume of incoming data and avoid processing delays.

How to set up infrastructure for the Kinesis Analytics application?

Let's now take a moment to analyze Apache Flink's connection to AWS Kinesis from the CloudFormation level. To start, let's create a file named template.yml where our configuration will be placed and see how our streams are defined.

Outputs:
  <STREAM_NAME>:
    Value: !Ref <STREAM-NAME>

  # other streams...

Resources:
  <STREAM_NAME>:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub ${NamePrefix}<STREAM-NAME>
      RetentionPeriodHours: !Ref StreamRetentionPeriodHours
      ShardCount: !Ref StreamShardCount

  # other streams…

As we can see, the whole procedure is not too complicated and is pretty self-explanatory. All we need to do at the beginning is declare our streams, then define them with the appropriate type and basic properties. Next, we define a policy for our streams. We just need to give all of them the same permissions, i.e. read and write.

KinensisApplicationServiceRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Principal:
            Service:
              - "kinesisanalytics.amazonaws.com"
          Action:
            - "sts:AssumeRole"
    Path: "/"
    Policies:
      - PolicyName: StatesExecutionPolicy
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Sid: S3
              Effect: Allow
              Action:
                - s3:GetObject
                - s3:GetObjectVersion

              Resource:
                - !Sub arn:aws:s3:::${ArtifactsBucketName}
                - !Sub arn:aws:s3:::${ArtifactsBucketName}/*
            - Sid: <SID-NAME>
              Effect: Allow
              Action: kinesis:*
              Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${<STREAM-NAME>}

  # other streams configuration

Finally, let’s define the Kinesis Analytics Application, which shouldn’t be more complicated than what we already did above.

KinesisAnalyticsApp:
  Type: AWS::KinesisAnalyticsV2::Application
  Properties:
    ApplicationName: !Sub ${NamePrefix}AnalyticsApp
    ApplicationConfiguration:
      ApplicationCodeConfiguration:
        CodeContent:
          S3ContentLocation:
            BucketARN: !Sub arn:aws:s3:::${ArtifactsBucketName}
            FileKey: !Ref KinesisAppBucketKey
        CodeContentType: "ZIPFILE"
      FlinkApplicationConfiguration:
        MonitoringConfiguration:
          ConfigurationType: DEFAULT
      EnvironmentProperties:
        PropertyGroups:
          - PropertyGroupId: ProducerConfigProperties
            PropertyMap:
              flink.stream.initpos: LATEST
              aws.region: !Ref AWS::Region
              AggregationEnabled: false
          - PropertyGroupId: ConsumerConfigProperties
            PropertyMap:
              aws.region: !Ref AWS::Region
          - PropertyGroupId: <STREAM-NAME>
            PropertyMap:
              name: !Ref <STREAM-NAME>
          # other streams properties
    RuntimeEnvironment: FLINK-1_13
    ServiceExecutionRole: !GetAtt KinensisApplicationServiceRole.Arn

As we can see above, the first step was to specify a few things: the type of our application, basic properties such as its name, the S3 location where our Flink application is stored, and its format (ZIP in our case). At the end, we’ve set the environment properties that are passed to the application. And, as the French say: voilà.

That's all it takes to make our application work on AWS.

Our recommendation for the structure of the Flink app

As we already know, Apache Flink offers multiple connectors, but what if we want to have more than one of them in our application? For example, we’d like to use Kafka for testing and Kinesis for production. We can easily do that by configuring two connectors and choosing between them with, for example, the input arguments of our program. It is enough that both connectors implement the same interface. Below is a code snippet that demonstrates this:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Take a checkpoint every 1000 ms.
    env.enableCheckpointing(1000);

    // Pick Kafka or Kinesis based on the first program argument.
    ConnectorConfig connectorConfig = assignConnector(args);

    DataStream<String> lotteryStream = connectorConfig.createSource(env, connectorConfig.getFirstSourceInput());
    DataStream<String> userStream = connectorConfig.createSource(env, connectorConfig.getSecondSourceInput());

    SinkFunction<String> lotteryEventsProducer = connectorConfig.createSink();

    LotteryStreams.prepareLotteryPipeline(lotteryStream, userStream, lotteryEventsProducer);

    env.execute("lottery");
}

private static ConnectorConfig assignConnector(String[] args) {
    if (args.length == 0) {
        throw new IllegalArgumentException("Expected the connector type as the first argument");
    }
    String connector = args[0];
    if (connector.equals(ConnectorType.KAFKA_CONNECTOR.getValue())) {
        return new KafkaConnectorConfig();
    }
    if (connector.equals(ConnectorType.KINESIS_CONNECTOR.getValue())) {
        return new KinesisConnectorConfig();
    }
    throw new IllegalStateException("Connector does not exist: " + connector);
}

- starting place of the application

public interface ConnectorConfig {

    SinkFunction<String> createSink() throws IOException;

    DataStream<String> createSource(StreamExecutionEnvironment env, String input) throws IOException;

    String getFirstSourceInput();

    String getSecondSourceInput();
}

- common interface

public class KafkaConnectorConfig implements ConnectorConfig {

    @Override
    public SinkFunction<String> createSink() {
        return new FlinkKafkaProducer011<>("localhost:9092", "lottery-output-topic", new SimpleStringSchema());
    }

    @Override
    public DataStream<String> createSource(StreamExecutionEnvironment env, String inputTopic) {
        FlinkKafkaConsumer011<String> kafkaConsumer = createStringConsumerForTopic(inputTopic);
        return env.addSource(kafkaConsumer);
    }

    @Override
    public String getFirstSourceInput() {
        return "lottery-input-topic";
    }

    @Override
    public String getSecondSourceInput() {
        return "user-input-topic";
    }

    private FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "bid-club");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("enable.auto.commit", "false");

        return new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
    }
}

- Kafka configuration

For the implementation of this interface for AWS Kinesis, the situation looks very similar. Of course, we need to use another connector. In addition, we will need the names of the predefined streams and basic configuration. You can find all of this in the repository that is linked below.

Apache Flink in action: the testing

Now let's move on to testing our application. We will cover both unit tests and integration tests. It is worth mentioning that Flink provides us with very useful tools for this purpose. All we need to do is to pull the appropriate dependencies into the project and we are done.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
  <version>1.13.2</version>
  <scope>test</scope>
</dependency>

Unit tests

Let’s start with unit tests. In our application, we use the so-called KeyedCoProcessFunction. This means that we can separately process the two kinds of events arriving in our application from Kafka or Kinesis. In our case, these are events related to the user (buying coupons, clicking on a lottery) and events related to the lottery itself (launching a specific lottery). In addition, a click on a lottery is an event that we need to pass on for further processing, because we will be choosing a winner from among these clicks.

Next, we move on to writing the first test, which checks that an exception is thrown if the user tries to click on an inactive lottery.

First of all, we initialize our TestHarness, which came to us with a previously pulled dependency.

@Before
public void beforeClass() throws Exception {
    KeyedCoProcessFunction<Integer, LotteryEvent, UserEvent, EventOutput> lotteryProcessFunction = new LotteryEventsCoProcessFunction();

    // Wrap the function in a harness keyed by lottery id (first input) and user id (second input).
    harness = ProcessFunctionTestHarnesses.forKeyedCoProcessFunction(
        lotteryProcessFunction, LotteryEvent::getLotteryId, UserEvent::getUserId, TypeInformation.of(Integer.class));

    harness.open();
}

It’s a kind of wrapper for our class that we will test. Then, on such a prepared instance, we can execute processing methods.

@Test
public void userCannotClickOnInactiveLottery() {
    //when: the user is trying to click on inactive lottery, an exception should be thrown
    Exception exception = assertThrows(IllegalStateException.class,
        () -> harness.processElement2(new UserClickEvent(1000, 1, 1), DUMMY_TIMESTAMP));

    String expectedMessage = "Cannot click on inactive lottery, lotteryId: 1";
    String actualMessage = exception.getMessage();

    assertEquals(expectedMessage, actualMessage);
}

As we can see, no lottery was activated before the click event was processed, which is why we get an exception. But what if we need to check whether the event of clicking on a particular lottery will actually be processed further? This is also possible thanks to our harness.

@Test
public void userClickEventShouldBeCollected() throws Exception {
    //given: the lottery is activated and the user has an available coupon
    harness.processElement1(new LotteryStartEvent(1), DUMMY_TIMESTAMP);
    harness.processElement2(new UserBuyCouponEvent(1, 100), DUMMY_TIMESTAMP);

    //when: userClickEvent is sent
    harness.processElement2(new UserClickEvent(1000, 1, 1), DUMMY_TIMESTAMP);

    //then: only userClickEvent should be collected (expected value first, actual value second)
    assertEquals(List.of(new EventOutput(1, 1, 1000)), harness.extractOutputValues());
}

So first we activated our lottery, then a particular user bought coupons so that they could click on a particular lottery. Finally, the user clicked on the lottery, and this event should be processed further. It was enough to call the extractOutputValues() method and compare its result with the expected one.

Integration tests

Now we swiftly move on to integration testing. Of course, we can run Kafka as a Docker container and test the whole flow manually, but why should we when Flink provides us with a ready-made mini cluster on which we can work? Let's assume that we have already prepared the output events, which in our case are clicks on a particular lottery. Now we need to draw the winner. This is where the previously mentioned time windows will come in handy.

public class LotteryStreams implements Serializable {

    public static void prepareLotteryPipeline(DataStream<String> lotteryStream, DataStream<String> userStream, SinkFunction<String> lotterySink) {
        SingleOutputStreamOperator<EventOutput> processedEvents = prepareConnectedStream(lotteryStream, userStream)
            .process(new LotteryEventsCoProcessFunction());

        processedEvents
            // Use the timestamp carried by each event as its event time.
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<EventOutput>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
            .keyBy(EventOutput::getLotteryId)
            // Group clicks per lottery into 10-second tumbling windows and draw a winner from each.
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .process(new GetWinnerWindowFunction())
            .map(event -> new Gson().toJson(event))
            .addSink(lotterySink);
    }

    private static ConnectedStreams<LotteryEvent, UserEvent> prepareConnectedStream(DataStream<String> lotteryEvents, DataStream<String> userEvents) {
        return lotteryEvents
            .map(lotteryEventAsString -> (LotteryEvent) createEvent(lotteryEventAsString))
            .keyBy(LotteryEvent::getLotteryId)
            .connect(
                userEvents
                .map(userEventAsString -> (UserEvent) createEvent(userEventAsString))
                .keyBy(UserEvent::getUserId));
    }

}

public class GetWinnerWindowFunction extends ProcessWindowFunction<EventOutput, Integer, Integer, TimeWindow> {

    @Override
    public void process(Integer lotteryId, ProcessWindowFunction<EventOutput, Integer, Integer, TimeWindow>.Context context,
        Iterable<EventOutput> elements, Collector<Integer> collector) {
        // Copy the window's clicks into a list so that we can pick one by index.
        List<EventOutput> elementsAsList = new ArrayList<>();
        elements.iterator().forEachRemaining(elementsAsList::add);

        // Draw one click at random; the user behind it wins this window.
        Random rand = new Random();
        Integer winnerId = elementsAsList.get(rand.nextInt(elementsAsList.size())).getUserId();
        collector.collect(winnerId);
    }
}

The above code is actually the heart of our application. First, we prepared our events for processing, that is, we turned them from Strings into concrete objects and combined them into one ConnectedStream. Then we processed them as described above for unit testing. Next, we created a 10-second time window, and based on the data from this window, we selected the winner using GetWinnerWindowFunction. At the very end, we added our Sink, to which we passed the information about the winner. 
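Because the winner is drawn at random, the exact result of a window cannot be predicted in a test. One possible variation, sketched below in plain Java, is to pass the random generator in from outside so that a test can supply a seeded one. WinnerPicker is a hypothetical helper and not part of the lottery application, and the empty-input guard is purely defensive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;

// Hypothetical helper: draws a winner from the user ids collected in one window.
final class WinnerPicker {

    private final Random random;

    // The generator is injected, so tests can pass new Random(seed) for repeatable draws.
    WinnerPicker(Random random) {
        this.random = random;
    }

    // Returns the winning user id, or an empty Optional if there were no candidates.
    Optional<Integer> pick(Iterable<Integer> userIds) {
        List<Integer> candidates = new ArrayList<>();
        userIds.forEach(candidates::add);
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(candidates.get(random.nextInt(candidates.size())));
    }

    public static void main(String[] args) {
        WinnerPicker picker = new WinnerPicker(new Random());

        // With a single candidate the outcome is certain.
        System.out.println(picker.pick(List.of(3))); // prints Optional[3]

        // With several candidates any of them may win.
        System.out.println(picker.pick(List.of(1, 2)));
    }
}
```

Two pickers created with the same seed draw the same winner from the same candidates, which is what makes such a test repeatable.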

Now let's see what an integration test for this could look like.  First, let's create an instance of our Flink cluster and annotate it with @ClassRule to make it run before testing.

@ClassRule
public static MiniClusterWithClientResource flinkCluster =
    new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
        .setNumberSlotsPerTaskManager(2)
        .setNumberTaskManagers(1)
        .build());

Now we need some test Sink to see who won.

private static class CollectSink implements SinkFunction<String> {

    // Static, because Flink serializes the sink instance before running it.
    public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(String value, Context context) {
        values.add(value);
    }
}

As you can see, you just need to implement the SinkFunction interface and override the invoke method. The principle is simple: the invoke method adds the win information to the static list. Let's see our test:

@Test
public void testLotteryPipeline() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    Gson mapper = new Gson();

    // The sink's list is static, so clear whatever earlier tests left behind.
    CollectSink.values.clear();

    DataStreamSource<String> lotteryDataStreamSource = env.fromElements(
        mapper.toJson(new LotteryStartEvent(1))
    );

    DataStreamSource<String> userDataStreamSource = env.fromElements(
        mapper.toJson(new UserBuyCouponEvent(1, 100)),
        mapper.toJson(new UserBuyCouponEvent(2, 100)),
        mapper.toJson(new UserBuyCouponEvent(3, 100)),
        mapper.toJson(new UserClickEvent(1000, 1, 1)),
        mapper.toJson(new UserClickEvent(5000, 2, 1)),
        mapper.toJson(new UserClickEvent(11000, 3, 1))
    );

    LotteryStreams.prepareLotteryPipeline(lotteryDataStreamSource, userDataStreamSource, new CollectSink());

    env.execute();

    // Expected value first, actual value second.
    assertEquals(2, CollectSink.values.size());
    assertTrue(CollectSink.values.contains("3"));
}

As a first step, we created our execution environment. Next, we built our input streams using the intuitive fromElements(...) method. Note that our UserClickEvent has a timestamp, based on which we'll be able to further choose the winner. Then, we executed the static method that created our pipeline and, at the very end, we fired up our environment. After all of those operations, we were able to easily check the data from Sink, pulling it from the static list and comparing it with the expected result.
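To see why the test above expects two results, and why "3" must be among them, it helps to work out which 10-second window each click falls into. The sketch below does this in plain Java for the three UserClickEvent timestamps from the test. ClickWindowsSketch and usersPerWindow are hypothetical names, and the arithmetic assumes windows aligned to zero with no offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of how the test's clicks are split into tumbling windows.
final class ClickWindowsSketch {

    static final long WINDOW_MS = 10_000;

    // Each click is {timestampMs, userId}. Groups user ids by the start of their window.
    static Map<Long, List<Integer>> usersPerWindow(long[][] clicks) {
        Map<Long, List<Integer>> windows = new TreeMap<>();
        for (long[] click : clicks) {
            long timestampMs = click[0];
            int userId = (int) click[1];
            long windowStart = timestampMs - (timestampMs % WINDOW_MS);
            windows.computeIfAbsent(windowStart, start -> new ArrayList<>()).add(userId);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Timestamps and user ids taken from the UserClickEvent instances in the test.
        long[][] clicks = {
            {1_000, 1},
            {5_000, 2},
            {11_000, 3}
        };
        System.out.println(usersPerWindow(clicks)); // prints {0=[1, 2], 10000=[3]}
    }
}
```

Users 1 and 2 share the first window, so either of them can win it, while user 3 is the only candidate in the second window. That is why the test can only assert the number of winners and the presence of user 3.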

Flink checkpointing

Let's say that new events flood our application all the time and we have to process them in real-time. What if suddenly for some reason an exception is thrown and our application stops working? What about the events that didn’t have the time to be processed? This is when checkpointing comes to our aid, which provides the application with fault tolerance. 

env.enableCheckpointing(1000);

We’ve activated checkpointing and set the interval to 1000 milliseconds.

Checkpointing is nothing more than periodically saving a snapshot of the application state, together with the current position in the input streams, in some specific place. It could be S3 in the case of AWS, or some file on our local computer. When an exception occurs, the app restores the state from the latest checkpoint and resumes from the point at which that checkpoint was taken.
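The recovery idea can be imitated in a few lines of plain Java. This is a deliberately simplified simulation and not how Flink implements checkpoints: CheckpointSketch, Snapshot and sumWithRecovery are hypothetical names, a snapshot is taken every few events instead of every 1000 ms, and the "application" simply sums its input.

```java
import java.util.List;

// Plain-Java simulation of "snapshot, crash, restore, resume"; it does not use Flink.
final class CheckpointSketch {

    // What a snapshot remembers: how far the input was read and the state at that point.
    static final class Snapshot {
        final int offset;
        final long sum;

        Snapshot(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    // Sums the events, taking a snapshot every `interval` events.
    // Simulates one crash just before the event at index `failAt` (pass -1 for no crash).
    static long sumWithRecovery(List<Integer> events, int interval, int failAt) {
        Snapshot last = new Snapshot(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean alreadyFailed = false;

        while (offset < events.size()) {
            if (!alreadyFailed && offset == failAt) {
                // Crash: in-memory progress is lost, so roll back to the last snapshot
                // and re-read the input from the offset stored in it.
                alreadyFailed = true;
                offset = last.offset;
                sum = last.sum;
                continue;
            }
            sum += events.get(offset);
            offset++;
            if (offset % interval == 0) {
                last = new Snapshot(offset, sum);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(1, 2, 3, 4, 5, 6, 7);

        System.out.println(sumWithRecovery(events, 3, -1)); // prints 28
        System.out.println(sumWithRecovery(events, 3, 5));  // prints 28, despite the crash
    }
}
```

Because the state and the read position are saved together, the events processed after the last snapshot are simply read again after the crash, and each of them ends up in the final sum exactly once.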

The conclusion

In this article, we’ve shown how to use Apache Flink for real-time data processing, using the lottery application as an example. We’ve also showcased how Flink helps to store state and how to connect and configure various connectors, which definitely makes devs’ work easier. This is of course just a fraction of what Apache Flink is capable of. Those passionate about Big Data will certainly take something away from the above text and the case we used as an example. The entire lottery application with tests and infrastructure can be found on Rumble Fish’s GitHub.

Kamil

Software Engineer
