Purchase Transaction Alerting with Couchbase and Kafka

During my experience with Couchbase, I have found a number of useful integration points that enhance its capabilities. One example is Apache Kafka. Kafka is a distributed commit log messaging system that has recently gained popularity as a data transport layer for large systems.

With Couchbase, as with any data store, it may eventually become necessary to make your data available to other systems. You may need to move your data to aggregate it with data from other systems, add an additional processing layer, or even create a copy for archival. Kafka has grown into a go-to technology for data movement use cases, and thanks to the Couchbase Kafka Connector, using Kafka to move Couchbase data is now simple.

In this post I want to show an example of how to set up a data transport layer using the Couchbase Kafka Connector. We will then consume the data on the other end to power a small alerting application.

You can find support code for this example on our GitHub: https://github.com/Avalon-Consulting-LLC/couchbase-kafka. Check out the README in the repository for instructions on how to get started.

The scenario for this example is to use Couchbase as a datastore for purchase transactions. I want to be alerted about any transaction greater than a certain amount. To do this I will make the purchase transactions in Couchbase available for real-time consumption using Kafka. Lastly, we will monitor all incoming transactions with a Kafka consumer and raise an alert when we see a large transaction amount.

The Couchbase Kafka Connector is a small Java application that you can build and deploy. For this example we will stick to a minimal implementation, but with the connector you can actively filter and transform records as you pass them to Kafka.

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.filter.Filter;

public class KafkaTransportFilter implements Filter {
    // Forward only document mutations (inserts and updates) to Kafka.
    public boolean pass(final DCPEvent dcpEvent) {
        return dcpEvent.message() instanceof MutationMessage;
    }
}

Our first class is KafkaTransportFilter. This class is used to filter the messages that get sent to Kafka. It consists of a single method, pass, that returns a boolean signifying whether the given record should be passed to Kafka. Currently we are sending all instances of MutationMessage. You can extend this method to filter on more complex criteria, such as different message types or even data within the message, as in the sketch below.
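For instance, here is a minimal sketch of a filter that only forwards mutations whose document key starts with a given prefix. The PrefixTransportFilter class and the txn:: key convention are assumptions for illustration, not part of the demo repository:

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.filter.Filter;

public class PrefixTransportFilter implements Filter {
    // Hypothetical convention: transaction documents are keyed "txn::<id>".
    private static final String KEY_PREFIX = "txn::";

    public boolean pass(final DCPEvent dcpEvent) {
        return dcpEvent.message() instanceof MutationMessage
            && ((MutationMessage) dcpEvent.message()).key().startsWith(KEY_PREFIX);
    }
}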

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.coder.AbstractEncoder;
import kafka.utils.VerifiableProperties;

public class KafkaTransportEncoder extends AbstractEncoder {
    public KafkaTransportEncoder(final VerifiableProperties properties) {
        super(properties);
    }

    // Serialize the mutated document's content as UTF-8 bytes.
    @Override
    public byte[] toBytes(final DCPEvent dcpEvent) {
        MutationMessage message = (MutationMessage) dcpEvent.message();
        return message.content().toString(CharsetUtil.UTF_8).getBytes(CharsetUtil.UTF_8);
    }
}

The second class is KafkaTransportEncoder. This class is used to encode our message for Kafka. Currently, it simply converts the change message to a UTF-8 encoded byte array. The toBytes method can also be extended to transform the message if needed: you have full access to the message and only have to return the encoded bytes. A sketch of such a transformation follows.
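As an illustration, here is a hypothetical encoder that wraps each document in a small JSON envelope carrying the document key alongside the body. The EnrichingTransportEncoder class is an assumption for this sketch, and it uses the same org.json library our consumer relies on later:

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.coder.AbstractEncoder;
import kafka.utils.VerifiableProperties;
import org.json.JSONObject;

public class EnrichingTransportEncoder extends AbstractEncoder {
    public EnrichingTransportEncoder(final VerifiableProperties properties) {
        super(properties);
    }

    // Emit {"key": <document key>, "doc": <document body>} instead of the raw body.
    @Override
    public byte[] toBytes(final DCPEvent dcpEvent) {
        MutationMessage message = (MutationMessage) dcpEvent.message();
        JSONObject envelope = new JSONObject()
            .put("key", message.key())
            .put("doc", new JSONObject(message.content().toString(CharsetUtil.UTF_8)));
        return envelope.toString().getBytes(CharsetUtil.UTF_8);
    }
}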

import com.couchbase.kafka.CouchbaseKafkaConnector;
import com.couchbase.kafka.CouchbaseKafkaEnvironment;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;

public class KafkaTransport {
    private static final String FILTER_CLASS = "com.avalonconsult.couchbase.kafka.KafkaTransportFilter";
    private static final String ENCODER_CLASS = "com.avalonconsult.couchbase.kafka.KafkaTransportEncoder";
    private static final String COUCHBASE_NODE = "couchbase.vagrant";
    private static final String COUCHBASE_BUCKET = "transactions";
    private static final String COUCHBASE_BUCKET_PASSWORD = "";
    private static final String KAFKA_NODE = "kafka.vagrant";
    private static final String KAFKA_TOPIC = "transactions";

    public static void main(String[] args) {
        // Register our filter and encoder classes and enable DCP streaming.
        DefaultCouchbaseKafkaEnvironment.Builder builder =
            (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment
                .builder()
                .kafkaFilterClass(FILTER_CLASS)
                .kafkaValueSerializerClass(ENCODER_CLASS)
                .dcpEnabled(true);

        // Wire the Couchbase bucket to the Kafka topic and start streaming.
        CouchbaseKafkaEnvironment env = builder.build();
        CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(
            env, COUCHBASE_NODE, COUCHBASE_BUCKET, COUCHBASE_BUCKET_PASSWORD, KAFKA_NODE, KAFKA_TOPIC);
        connector.run();
    }
}

Finally, we tie the connector together with our KafkaTransport class. Here we build an environment object that registers our filter and encoder classes. With the environment created, we create and run the connector.

At this point you should be able to build and launch your transport application and start seeing a stream of Couchbase records in Kafka.

The last part of this example is to add a simple Kafka consumer to show how we can alert on Couchbase records:

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaAlerter {
    private static final String ZOOKEEPER_ADDRESS = "kafka.vagrant:2181";
    private static final String CONSUMER_GROUP = "alerters";
    private static final String KAFKA_TOPIC = "transactions";
    private static final int NUM_THREADS = 4;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", ZOOKEEPER_ADDRESS);
        props.put("group.id", CONSUMER_GROUP);

        // Open one stream per thread, matched against the exact topic name.
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> consumerStreams =
            consumer.createMessageStreamsByFilter(new Whitelist("^" + KAFKA_TOPIC + "$"), NUM_THREADS);
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

        // Hand each stream to a KafkaAlertConsumer running on its own thread.
        for (int i = 0; i < consumerStreams.size(); i++) {
            executor.submit(new KafkaAlertConsumer(consumerStreams.get(i), i));
        }
    }
}

The KafkaAlerter class holds the main method of our alerting application. It connects to Kafka, initializes a thread pool, and launches one consumer per stream.
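As written, the alerter runs until the process is killed. If you want it to release its Kafka and ZooKeeper connections cleanly on exit, one option is a JVM shutdown hook along these lines. This is a sketch to append to the end of main (with consumer and executor declared final), not code from the demo repository:

// Hypothetical addition to the end of KafkaAlerter.main.
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        consumer.shutdown();     // releases the ZooKeeper and Kafka connections
        executor.shutdownNow();  // interrupts the KafkaAlertConsumer threads
    }
});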

import java.nio.charset.Charset;

import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.json.JSONObject;

public class KafkaAlertConsumer implements Runnable {
    private static final double ALERT_LIMIT = 9000.00;
    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNum;

    public KafkaAlertConsumer(KafkaStream<byte[], byte[]> s, int t) {
        stream = s;
        threadNum = t;
    }

    public void run() {
        // Block on the stream, inspecting each record as it arrives.
        for (MessageAndMetadata<byte[], byte[]> message : stream) {
            String content = new String(message.message(), Charset.forName("UTF-8"));
            JSONObject json = new JSONObject(content);
            double amount = json.getDouble("amount");

            if (amount > ALERT_LIMIT) {
                System.out.println("Thread " + threadNum + " caught amount limit: " + content);
            }
        }
    }
}

The second part of our alerting application is the consumer. It reads in records, parses the JSON, and prints a message whenever our alerting criterion is satisfied.

Now we can build and run our KafkaAlerter application. With Couchbase, Kafka, and the KafkaTransport and KafkaAlerter applications all in place, we can throw new transaction records into Couchbase and watch large transaction amounts get printed by our KafkaAlerter.
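If you would rather generate test traffic from code than by hand, a small loader using the Couchbase Java SDK could look like the sketch below. The TransactionLoader class, the txn::1 key, and the type field are assumptions for illustration; amount is the field our consumer actually checks:

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;

public class TransactionLoader {
    public static void main(String[] args) {
        // Connect to the same node and bucket the connector streams from.
        Cluster cluster = CouchbaseCluster.create("couchbase.vagrant");
        Bucket bucket = cluster.openBucket("transactions");

        // An amount over the 9000.00 alert limit should trigger a printout.
        JsonObject txn = JsonObject.create()
            .put("type", "purchase")
            .put("amount", 9500.25);
        bucket.upsert(JsonDocument.create("txn::1", txn));

        cluster.disconnect();
    }
}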

As a reminder, you can find a full demo environment to test this example in Avalon's GitHub. Have questions about Couchbase or Kafka and how they can be integrated into your architecture? Please contact us.

About Andy Kruth

Andy is the Manager of Avalon's Hadoop COE, a group focused on providing knowledge and expertise in the world of Big Data, including the Hadoop ecosystem and NoSQL technologies.
