
How to Read an Oracle Table and Stream the Data into Kafka

In this post I'm going to show what streaming ETL looks like in practice. We're replacing batch extracts with event streams, and batch transformation with in-flight transformation. But first, a trip back through time…

My first job from university was building a data warehouse for a retailer in the UK. Back then, it was writing COBOL jobs to load tables in DB2. We waited for all the shops to close and do their end of day system processing, and ship their data back to the central mainframe. From there it was checked and loaded, and then reports generated on it. This was nearly twenty years ago, as my greying beard will attest, and not a lot has changed in the large majority of reporting and analytics systems since then. COBOL is maybe less common, but what has remained constant is the batch-driven nature of processing. Sometimes batches are run more frequently, and get given fancy names like intra-day ETL or even micro-batching. But batch processing it is, and as such latency is built into our reporting by design. When we opt for batch processing we voluntarily inject delays into the availability of data to our end users, and to applications in our business that could be driven by this data in real time.

Back in 2016 Neha Narkhede wrote that ETL Is Dead, Long Live Streams, and since then we've seen more and more companies moving to adopt Apache Kafka as the backbone of their architectures. With Kafka's Connect and Streams APIs, as well as KSQL, we have the tools available to make Streaming ETL a reality.

Streaming ETL with Confluent Platform

By streaming events from the source system as they are created, using Kafka's Connect API, data is available for driving applications throughout the business in real time. Independently and in parallel, that same data can be transformed and processed and be made available to end users as soon as they want it. The key is that we are no longer batch-driven; we are event-driven.

Data enriched through the transform process is streamed back into Kafka. From here it can also be used by other applications. So we can refine raw inbound data, and use the resulting enriched and cleansed data for multiple purposes. Think of all the data cleansing and business logic that gets done as part of ETL…isn't the resulting data useful in more places than just a static data store?

As a data engineer for an online store, you're tasked with providing a real-time view for your sales operations team on current website activity. Which important customers are using the site? What's the rolling value of orders being placed? But as well as an analytical 'cockpit' view, we can use the same enriched data to feed an event-driven microservice responsible for notifying the inside sales team when particularly important customers log on to the site. We can use the same logic and definitions once, for driving both the analytics and the microservice.

So in this post I'm going to show an example of what streaming ETL looks like in practice. I'm replacing batch extracts with event streams, and batch transformation with in-flight transformation of these event streams. We'll take a stream of data from a transactional system built on Oracle, transform it, and stream it into Elasticsearch to land the results, but your choice of datastore is up to you: with Kafka's Connect API you can stream the data to almost anywhere! Using KSQL we'll see how to filter streams of events in real time from a database, how to join between events from two database tables, and how to create rolling aggregates on this data.

Let's Get Started!

My source system is Oracle 12c, with the Order Entry schema and a transactional workload driven by a tool called Swingbench. I'm not using Swingbench here in its common capacity of load/stress-testing, but instead simply to generate a stream of transactions without needing to have access to a real data feed. To stream the data from Oracle, I'm using Oracle GoldenGate for Big Data. This is one of several Change-Data-Capture (CDC) tools available (others include DBVisit's Replicate) which all work on the principle of taking the transaction log and streaming events from it to Kafka. There are plenty of other CDC tools around for other databases, including the popular Debezium project which is open-source and currently supports both MySQL and Postgres.

You can see details on the components I've used, and how to exactly reproduce them for your own experiments here.

The starting point for this is an inbound stream of events in Kafka from our source system (Oracle, via CDC). This is the "Extract" of our ETL, and is running in real time, event-by-event.

We're going to apply some transformations to these events, and do so in real time, not batch! We'll take a small set of the source tables containing:

  • Orders
  • Logon events
  • Customer details

From these we will use KSQL to provide a real-time stream of:

  • Customers logging onto the application, with a second version of the stream filtered just to show customers of highest value who are logging in
  • Aggregated order counts and values

In a subsequent post we'll also see how to use this enriched data that's being written back to Kafka to drive a microservice. This microservice will send an alert to the inside sales team whenever a long-standing business customer logs on to the site.

Join and Filter Streams of Data from Oracle in KSQL

To join the customers to the logon event data, we will create a Table in KSQL on the Customers topic. We're making a table because we only want to look at the current state of each customer; if we wanted to see a history of how a customer had changed over time, then we'd want a stream. The Logon data is a sequence of events, and so we just create a Stream on it. Having created both, we then join the two.
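
To make the table-versus-stream distinction concrete, here is a minimal sketch in plain Python. It is not KSQL and says nothing about how KSQL works internally; the change events and the `latest_by_key` helper are hypothetical, invented for illustration. A stream keeps every event, while a table keeps only the latest value for each key.

```python
# Illustrative only: plain Python, not the KSQL engine.
# The change events below are invented sample data.
customer_events = [
    {"CUSTOMER_ID": 74999, "CUSTOMER_CLASS": "Occasional"},
    {"CUSTOMER_ID": 75000, "CUSTOMER_CLASS": "Occasional"},
    {"CUSTOMER_ID": 74999, "CUSTOMER_CLASS": "Business"},  # later change to the same customer
]


def latest_by_key(events, key_field):
    """Collapse a sequence of change events into one row per key; the last event wins."""
    table = {}
    for event in events:
        table[event[key_field]] = event
    return table


# Stream view: the full history of changes.
stream_view = list(customer_events)

# Table view: only the current state of each customer.
table_view = latest_by_key(customer_events, "CUSTOMER_ID")

print(len(stream_view), "events in the stream")
print(len(table_view), "rows in the table")
print("Current class for 74999:", table_view[74999]["CUSTOMER_CLASS"])
```

Joining logons against the table view means each logon is matched with the customer's current details rather than with every historical version of them.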

Firing up KSQL, first we define the customer table source topic:

ksql> CREATE STREAM CUST_SRC WITH (KAFKA_TOPIC='ora-ogg-SOE-CUSTOMERS-avro', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

Note that we've not had to specify the schema of the data, because it's in Avro format and KSQL pulls the schema automagically from Confluent Schema Registry. To learn more about the schema registry and its importance in building applications see this great presentation from Gwen Shapira here. To specify the Schema Registry location you need to either pass --schema-registry-url to ksql-cli in local mode, or specify ksql.schema.registry.url in a properties file that you pass as an argument when starting up a standalone KSQL server.

ksql> DESCRIBE CUST_SRC;

 Field             | Type
-----------------------------------------------
 ROWTIME           | BIGINT           (system)
 ROWKEY            | VARCHAR(STRING)  (system)
 OP_TYPE           | VARCHAR(STRING)
 OP_TS             | VARCHAR(STRING)
 CURRENT_TS        | VARCHAR(STRING)
 POS               | VARCHAR(STRING)
 CUSTOMER_ID       | BIGINT
 CUST_FIRST_NAME   | VARCHAR(STRING)
 CUST_LAST_NAME    | VARCHAR(STRING)
 NLS_LANGUAGE      | VARCHAR(STRING)
 NLS_TERRITORY     | VARCHAR(STRING)
 CREDIT_LIMIT      | DOUBLE
 CUST_EMAIL        | VARCHAR(STRING)
 ACCOUNT_MGR_ID    | BIGINT
 CUSTOMER_SINCE    | VARCHAR(STRING)
 CUSTOMER_CLASS    | VARCHAR(STRING)
 SUGGESTIONS       | VARCHAR(STRING)
 DOB               | VARCHAR(STRING)
 MAILSHOT          | VARCHAR(STRING)
 PARTNER_MAILSHOT  | VARCHAR(STRING)
 PREFERRED_ADDRESS | BIGINT
 PREFERRED_CARD    | BIGINT
-----------------------------------------------

We'll set the topic offset to earliest so that any queries and derived streams that we create contain all of the data to date:

ksql> SET 'auto.offset.reset' = 'earliest';

Let's have a quick peek at the data:

ksql> SELECT OP_TYPE,OP_TS,CUSTOMER_ID, CUST_FIRST_NAME, CUST_LAST_NAME FROM CUST_SRC LIMIT 1;
I | 2017-09-13 14:50:51.000000 | 74999 | lee | murray
LIMIT reached for the partition.
Query terminated
ksql>

Since we're going to be joining on the customer ID, we need to rekey the table. KSQL's DESCRIBE EXTENDED command can be used to inspect details about an object, including its key:

ksql> DESCRIBE EXTENDED CUST_SRC;

Type                 : STREAM
Key field            :
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
[...]

We can use KSQL to easily rekey a topic, using the PARTITION BY clause:

ksql> CREATE STREAM CUST_REKEYED AS SELECT * FROM CUST_SRC PARTITION BY CUSTOMER_ID;

Check out the key for the new STREAM:

ksql> DESCRIBE EXTENDED CUST_REKEYED;

Type                 : STREAM
Key field            : CUSTOMER_ID
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
Kafka output topic   : CUST_REKEYED (partitions: 4, replication: 1)
[...]

Two things of interest here: the key column is now CUSTOMER_ID, but we can also see that there is a Kafka output topic CUST_REKEYED. Just for completeness, let's check the key on the Kafka topics, using the awesome kafkacat:

Source topic:

Robin@asgard02 > kafkacat -C -c2 -K: -b localhost:9092 -o beginning -f 'Key:    %k\n' -t ora-ogg-SOE-CUSTOMERS-avro
Key:    74999_lee_murray_RC_New Mexico_6000.00_lee.murray@ntlworld.com_561_2009-06-05 00:00:00_Business_Electronics_1974-02-27 00:00:00_Y_N_49851_49851
Key:    75000_larry_perez_VX_Lithuania_6000.00_larry.perez@googlemail.com_514_2011-04-01 00:00:00_Occasional_Health_1960-11-30 00:00:00_Y_Y_114470_114470

Re-keyed topic:

Robin@asgard02 > kafkacat -C -c2 -K: -b localhost:9092 -o beginning -f 'Key:    %k\n' -t CUST_REKEYED
Key:    75000
Key:    75004
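
The effect of rekeying can be sketched in a few lines of Python. This is only an illustration of what changes on each message, not of how KSQL or Kafka implement it. The `rekey` helper is hypothetical, and the record keys and values are shortened stand-ins for the much longer ones shown above.

```python
# Illustrative only: mimics the effect of PARTITION BY CUSTOMER_ID on message keys.
# Records are (key, value) pairs; keys and values are shortened, hypothetical samples.
source_records = [
    ("74999_lee_murray", {"CUSTOMER_ID": 74999, "CUST_FIRST_NAME": "lee", "CUST_LAST_NAME": "murray"}),
    ("75000_larry_perez", {"CUSTOMER_ID": 75000, "CUST_FIRST_NAME": "larry", "CUST_LAST_NAME": "perez"}),
]


def rekey(records, field):
    """Return new (key, value) pairs whose key is the string form of value[field]."""
    return [(str(value[field]), value) for _old_key, value in records]


rekeyed = rekey(source_records, "CUSTOMER_ID")

for key, _value in rekeyed:
    print("Key:   ", key)
```

The values are untouched; only the key changes, so that every message for a given customer carries the same key that the logon events will be joined on.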

So we've got our rekeyed topic. Now let's build a table on top of it:

ksql> CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='CUST_REKEYED', VALUE_FORMAT='AVRO', KEY='CUSTOMER_ID');

 Message
---------------
 Table created
---------------

And finally, query the table that we've built:

ksql> SELECT ROWKEY, CUSTOMER_ID, CUST_FIRST_NAME, CUST_LAST_NAME, CUSTOMER_CLASS FROM CUSTOMERS LIMIT 5;
75000 | 75000 | larry | perez | Occasional
74999 | 74999 | lee | murray | Business
75004 | 75004 | derrick | fernandez | Prime
75007 | 75007 | tony | simmons | Occasional
75014 | 75014 | roy | reed | Business
LIMIT reached for the partition.
Query terminated
ksql>

So that's our Customers reference table built and available for querying. Now to bring in the Logon events stream:

ksql> CREATE STREAM LOGON WITH (KAFKA_TOPIC='ora-ogg-SOE-LOGON-avro', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

That was easy! Let's check we're getting data from it:

ksql> SELECT LOGON_ID,CUSTOMER_ID,LOGON_DATE FROM LOGON LIMIT 5;
178724 | 31809 | 2000-11-08 23:08:51
178725 | 91808 | 2009-06-29 02:38:11
178726 | 78742 | 2007-11-06 15:29:38
178727 | 4565 | 2010-03-25 09:31:44
178728 | 20575 | 2000-05-31 00:22:00
LIMIT reached for the partition.
Query terminated

Now for the magic bit…joining the table and stream!

ksql> SELECT L.LOGON_ID, C.CUSTOMER_ID, C.CUST_FIRST_NAME, C.CUST_LAST_NAME,
             C.CUSTOMER_SINCE, C.CUSTOMER_CLASS
      FROM LOGON L LEFT OUTER JOIN CUSTOMERS C ON L.CUSTOMER_ID = C.CUSTOMER_ID;

You may see some nulls in the results, which is a result of not all logon events having a corresponding customer entry. You can filter these out using:

ksql> SELECT L.LOGON_ID, C.CUSTOMER_ID, C.CUST_FIRST_NAME, C.CUST_LAST_NAME,
             C.CUSTOMER_SINCE, C.CUSTOMER_CLASS
      FROM LOGON L LEFT OUTER JOIN CUSTOMERS C ON L.CUSTOMER_ID = C.CUSTOMER_ID
      WHERE C.CUSTOMER_ID IS NOT NULL LIMIT 5;
178771 | 75515 | earl | butler | 2002-07-19 00:00:00 | Occasional
178819 | 76851 | cesar | mckinney | 2000-10-07 00:00:00 | Regular
178832 | 77941 | randall | tucker | 2010-04-23 00:00:00 | Prime
178841 | 80769 | ramon | hart | 2011-01-24 00:00:00 | Occasional
178870 | 77064 | willard | curtis | 2009-05-26 00:00:00 | Occasional
LIMIT reached for the partition.
Query terminated
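
Why the nulls appear, and what the NOT NULL filter removes, can be sketched in plain Python. This is an illustration of left-join semantics only, not of KSQL's implementation. The `left_join` helper is hypothetical, and logon 178724 is simply assumed here to have no matching customer row.

```python
# Illustrative only: a stream-to-table LEFT JOIN in plain Python.
# Sample values are based on the outputs above; logon 178724 is assumed to have
# no matching customer row, purely for the sake of the example.
customers = {
    75515: {"CUST_FIRST_NAME": "earl", "CUST_LAST_NAME": "butler"},
    76851: {"CUST_FIRST_NAME": "cesar", "CUST_LAST_NAME": "mckinney"},
}
logons = [
    {"LOGON_ID": 178724, "CUSTOMER_ID": 31809},
    {"LOGON_ID": 178771, "CUSTOMER_ID": 75515},
    {"LOGON_ID": 178819, "CUSTOMER_ID": 76851},
]


def left_join(stream, table):
    """Emit one row per stream event; customer columns are None when no table row matches."""
    for event in stream:
        match = table.get(event["CUSTOMER_ID"])
        yield {
            "LOGON_ID": event["LOGON_ID"],
            "C_CUSTOMER_ID": event["CUSTOMER_ID"] if match else None,
            "CUST_FIRST_NAME": match["CUST_FIRST_NAME"] if match else None,
            "CUST_LAST_NAME": match["CUST_LAST_NAME"] if match else None,
        }


joined = list(left_join(logons, customers))

# Equivalent in spirit to WHERE C.CUSTOMER_ID IS NOT NULL
matched = [row for row in joined if row["C_CUSTOMER_ID"] is not None]

print(len(joined), "joined rows,", len(matched), "with a customer match")
```

Every logon produces an output row, which is why unmatched logons show up with null customer columns until they are filtered out.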

Watch out for this current issue if you're joining on non-identical datatypes.

Having tested the simple join, we can start to build on it, adding in column concatenation (first + last name)

CONCAT(C.CUST_FIRST_NAME ,CONCAT(' ',C.CUST_LAST_NAME)) AS CUST_FULL_NAME        

as well as calculations, here taking the date on which the account was opened and using it to determine to the nearest year how long the person has been a customer. The functions used here are

  • STRINGTOTIMESTAMP which converts the string timestamp into an epoch
  • CAST(…AS DOUBLE) so that the BIGINT values can be accurately used in calculations
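
As a worked example of the arithmetic, here is a minimal Python sketch of the same calculation: the difference between two epoch-millisecond values divided down to years. The helper names and input dates are hypothetical, and the sketch assumes the timestamp string is interpreted as UTC, which may not match the time zone of a given KSQL server.

```python
from datetime import datetime, timezone

# Same divisors as the KSQL expression (/ 60 / 60 / 24 / 1000 / 365); leap years are ignored.
MS_PER_YEAR = 60 * 60 * 24 * 1000 * 365


def string_to_epoch_ms(text, fmt="%Y-%m-%d %H:%M:%S"):
    """Rough stand-in for STRINGTOTIMESTAMP: parse the string as UTC, return epoch milliseconds."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp()) * 1000


def customer_since_years(rowtime_ms, customer_since):
    """Years between the account-opened date and the record time, as a float."""
    since_ms = string_to_epoch_ms(customer_since)
    return (float(rowtime_ms) - float(since_ms)) / MS_PER_YEAR


# Hypothetical inputs: a record time of 2017-10-06 and an account opened on 2007-10-06.
rowtime_ms = string_to_epoch_ms("2017-10-06 00:00:00")
years = customer_since_years(rowtime_ms, "2007-10-06 00:00:00")

# Integer arithmetic on the same values drops the fractional part.
whole_years = (rowtime_ms - string_to_epoch_ms("2007-10-06 00:00:00")) // MS_PER_YEAR

print(years)        # fractional number of years
print(whole_years)  # whole years only
```

The result is slightly over ten because the fixed divisor of 365 does not account for leap days. The integer-division line is only there to show what is lost when the values are not floating point; it does not demonstrate how KSQL itself treats BIGINT division.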

The completed statement, wrapped as a CREATE STREAM AS SELECT (CSAS) so that it can be used as the basis of subsequent queries, as well as instantiated as an underlying Kafka topic that can be used outside of KSQL, is as follows:

ksql> CREATE STREAM LOGON_ENRICHED AS
      SELECT L.LOGON_ID, L.LOGON_DATE, C.CUSTOMER_ID,
             CONCAT(C.CUST_FIRST_NAME ,CONCAT(' ',C.CUST_LAST_NAME)) AS CUST_FULL_NAME,
             C.CUST_FIRST_NAME, C.CUST_LAST_NAME, C.CUSTOMER_SINCE,
             C.CUSTOMER_CLASS, C.CUST_EMAIL,
             (CAST(C.ROWTIME AS DOUBLE)-CAST(STRINGTOTIMESTAMP(CUSTOMER_SINCE,'yyyy-MM-dd HH:mm:ss') AS DOUBLE))/ 60 / 60 / 24 / 1000/365 AS CUSTOMER_SINCE_YRS
      FROM LOGON L
           LEFT OUTER JOIN CUSTOMERS C
           ON L.CUSTOMER_ID = C.CUSTOMER_ID ;

From the derived stream, we can then start querying both the original and derived columns, with a nice clear and understandable query:

ksql> SELECT LOGON_ID, LOGON_DATE, CUST_FULL_NAME, CUSTOMER_CLASS, CUSTOMER_SINCE_YRS FROM LOGON_ENRICHED;
178726 | 2007-11-06 15:29:38 | lloyd black | Occasional | 10.771086248255962
178732 | 2009-05-21 06:34:42 | donald hernandez | Occasional | 17.77108626258879
178742 | 2002-11-26 12:48:03 | kyle owens | Occasional | 15.776565717751144
178738 | 2004-09-27 05:36:23 | allen griffin | Business | 16.773825992548197
[...]

We can also start to apply filters to this, either ad-hoc:

ksql> SELECT LOGON_ID, LOGON_DATE, CUST_FULL_NAME, CUSTOMER_CLASS, CUSTOMER_SINCE_YRS
      FROM LOGON_ENRICHED WHERE CUSTOMER_CLASS = 'Prime' LIMIT 5;
181362 | 2011-02-16 13:01:16 | isaac wong | Prime | 10.771086241850583
181551 | 2007-01-15 11:21:19 | ryan turner | Prime | 6.762867074898529
181576 | 2009-07-04 02:19:35 | peter campbell | Prime | 14.779305415810505
181597 | 2006-07-12 04:54:40 | andres fletcher | Prime | 13.782045160768645
181631 | 2002-09-08 03:06:16 | john johnson | Prime | 6.762867062690258
LIMIT reached for the partition.
Query terminated

or creating a further derived stream:

ksql> CREATE STREAM IMPORTANT_CUSTOMER_LOGONS AS SELECT LOGON_ID, LOGON_DATE, CUST_FULL_NAME,
             CUSTOMER_CLASS, CUSTOMER_SINCE_YRS
      FROM LOGON_ENRICHED
      WHERE CUSTOMER_CLASS = 'Business' AND CUSTOMER_SINCE_YRS > 10;

 Message
----------------------------
 Stream created and running

ksql> SELECT * FROM IMPORTANT_CUSTOMER_LOGONS LIMIT 5;
1507286630384 | 83960 | 178738 | 2004-09-27 05:36:23 | allen griffin | Business | 16.773825992548197
1507286630386 | 92074 | 178773 | 2010-02-21 20:04:52 | gabriel garza | Business | 14.779305462899543
1507286630477 | 76111 | 181737 | 2007-05-17 23:59:36 | ray alvarez | Business | 12.765606788305432
1507286630401 | 87118 | 178936 | 2006-02-07 22:34:47 | kelly oliver | Business | 17.771086274733637

An important point here is that these derived streams are executing in real time, on events as they arrive, and populating Kafka topics with their results. So whilst the LOGON_ENRICHED stream might be for streaming into a general analytics platform, the IMPORTANT_CUSTOMER_LOGONS stream maybe directly drives a customer operations dashboard or application.
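
The filter behind IMPORTANT_CUSTOMER_LOGONS is a simple predicate applied to each event as it passes through. A minimal Python sketch of that idea follows; the `is_important_logon` function is hypothetical, and the sample rows are based on the LOGON_ENRICHED output shown earlier.

```python
def is_important_logon(event):
    """Same condition as the KSQL WHERE clause: Business class and more than 10 years a customer."""
    return event["CUSTOMER_CLASS"] == "Business" and event["CUSTOMER_SINCE_YRS"] > 10


# Sample rows based on the LOGON_ENRICHED output earlier in this post.
enriched = [
    {"LOGON_ID": 178726, "CUST_FULL_NAME": "lloyd black",
     "CUSTOMER_CLASS": "Occasional", "CUSTOMER_SINCE_YRS": 10.771086248255962},
    {"LOGON_ID": 178738, "CUST_FULL_NAME": "allen griffin",
     "CUSTOMER_CLASS": "Business", "CUSTOMER_SINCE_YRS": 16.773825992548197},
]

# Each event is tested independently, so the filter can run continuously on arrival.
important = [event for event in enriched if is_important_logon(event)]

for event in important:
    print(event["LOGON_ID"], event["CUST_FULL_NAME"])
```

Because the predicate needs nothing but the event itself, the same definition can feed both a dashboard and an alerting service without being written twice.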

Building Streaming Aggregates in KSQL

As well as denormalizing data in order to make analysis easier by making relevant data available in one place, we can use KSQL to aggregate data. By aggregating inbound streams of events we can make available to other applications a real-time stream of summary metrics about the events being processed. Aggregations are also a common pattern used in data warehousing to improve the performance of accessing data. Instead of storing data at its base granularity, it is "rolled up" to a higher grain at which it is commonly queried. For example, orders are placed as a stream of events, but commonly a business operations analyst will want to know the value of orders placed per hour. Here's a simple example of calculating just that.

First we define our inbound event stream, which is coming from the ORDERS table on Oracle, streamed into the Kafka topic through the CDC process.

ksql> CREATE STREAM ORDERS_SRC WITH (KAFKA_TOPIC='ora-ogg-SOE-ORDERS-avro', VALUE_FORMAT='AVRO');        

Since we're going to be doing some time-based processing, we need to make sure that KSQL is using the appropriate timestamp value. By default it will use the timestamp of the Kafka message itself, which is the time at which the record was streamed into Kafka from the CDC source. You can see which column is being used with DESCRIBE EXTENDED:

ksql> DESCRIBE EXTENDED ORDERS_SRC;

Type                 : STREAM
Key field            :
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
[...]

You can see the actual timestamp too, using the ROWTIME implicit column in any KSQL stream object:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ORDER_ID, ORDER_DATE
      FROM ORDERS_SRC LIMIT 5;
2017-10-25 10:36:12 | 71490 | 2009-01-05 23:00:00.000000000
2017-10-25 10:36:12 | 71491 | 2011-07-26 01:00:00.000000000
2017-10-25 10:36:12 | 71492 | 2008-04-23 15:00:00.000000000
2017-10-25 10:36:12 | 71493 | 2009-04-03 09:00:00.000000000
2017-10-25 10:36:12 | 71494 | 2009-06-22 23:00:00.000000000
LIMIT reached for the partition.
Query terminated
ksql>

In the above output we can see that the ROWTIME (first column) is completely different from ORDER_DATE (third column). The former is the process time and the latter is the event time. For the purposes of our aggregations, since we are reporting on a business event (and not the physical processing detail) we want to make sure KSQL uses the event time (ORDER_DATE). Let's first confirm what format the ORDER_DATE is in:

ksql> SELECT ORDER_DATE FROM ORDERS_SRC LIMIT 5;
2009-01-05 23:00:00.000000000
2011-07-26 01:00:00.000000000
2008-04-23 15:00:00.000000000
2009-04-03 09:00:00.000000000
2009-06-22 23:00:00.000000000
LIMIT reached for the partition.
Query terminated

With this knowledge, we can cast the string column to a timestamp, using STRINGTOTIMESTAMP and the Java time format:

ksql> SELECT ORDER_DATE, STRINGTOTIMESTAMP(ORDER_DATE,'yyyy-MM-dd HH:mm:ss.SSSSSSSSS')
      FROM ORDERS_SRC LIMIT 5;
2009-01-05 23:00:00.000000000 | 1231196400000
2011-07-26 01:00:00.000000000 | 1311638400000
2008-04-23 15:00:00.000000000 | 1208959200000
2009-04-03 09:00:00.000000000 | 1238745600000
2009-06-22 23:00:00.000000000 | 1245708000000
LIMIT reached for the partition.
Query terminated
ksql>

From experience, I can suggest it's always good to validate that you've got the date format strings correct, by checking the epoch value independently, using an online service or simple bash (removing the milliseconds first):

Robin@asgard02 > date -r 1231196400
Mon  5 Jan 2009 23:00:00 GMT

If the epoch doesn't match the string input, check against the Java time format reference, and pay attention to the case particularly. DD means day of the year whilst dd is day of the month, and MM is the month of the year whilst mm is minutes of the hour. What could possibly go wrong…
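
The same independent check can be scripted. Below is a minimal Python sketch that converts the epoch from the first row of the KSQL output back into a string and compares it with the original ORDER_DATE. The `epoch_ms_to_utc_string` helper is hypothetical, and the comparison assumes the string was interpreted as UTC.

```python
from datetime import datetime, timezone


def epoch_ms_to_utc_string(epoch_ms):
    """Format epoch milliseconds as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    moment = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


# First row of the KSQL output above: the ORDER_DATE string and the epoch derived from it.
order_date = "2009-01-05 23:00:00.000000000"
epoch_ms = 1231196400000

round_trip = epoch_ms_to_utc_string(epoch_ms)

# Compare against the input with the fractional seconds trimmed off.
matches = round_trip == order_date.split(".")[0]

print(round_trip)
print("Matches ORDER_DATE:", matches)
```

If the KSQL server runs in a time zone other than UTC, the comparison should be made in that zone instead; a consistent offset of a whole number of hours points to a time zone difference rather than a wrong format string.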

Now we have the event time in epoch format, we can use this as the basis for defining an intermediary derived stream from this source one. We're also going to capture the original ROWTIME since this is useful to know as well (what time the message hit Kafka from the CDC source):

ksql> CREATE STREAM ORDERS_INT_01 AS SELECT ROWTIME AS EXTRACT_TS, ORDER_DATE,
             STRINGTOTIMESTAMP(ORDER_DATE,'yyyy-MM-dd HH:mm:ss.SSSSSSSSS') AS ORDER_DATE_EPOCH,
             ORDER_ID, ORDER_STATUS, ORDER_TOTAL FROM ORDERS_SRC;

 Message
----------------------------
 Stream created and running

ksql> SELECT ORDER_ID, ORDER_TOTAL, TIMESTAMPTOSTRING(EXTRACT_TS,'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(ORDER_DATE_EPOCH, 'yyyy-MM-dd HH:mm:ss') FROM ORDERS_INT_01 LIMIT 5;
71491 | 5141.0 | 2017-10-25 10:36:12 | 2011-07-26 01:00:00
71494 | 3867.0 | 2017-10-25 10:36:12 | 2009-06-22 23:00:00
71498 | 5511.0 | 2017-10-25 10:36:12 | 2007-10-18 05:00:00
71501 | 4705.0 | 2017-10-25 10:36:12 | 2007-08-24 17:00:00
71504 | 6249.0 | 2017-10-25 10:36:12 | 2009-12-01 04:00:00
LIMIT reached for the partition.
Query terminated

The final step is to use the new epoch column as the basis for our new Orders stream, in which we will use the TIMESTAMP property assignment to instruct KSQL to use the event time (ORDER_DATE_EPOCH) as the timestamp for the stream:

ksql> CREATE STREAM ORDERS WITH (TIMESTAMP ='ORDER_DATE_EPOCH') AS
      SELECT EXTRACT_TS, ORDER_DATE_EPOCH, ORDER_ID, ORDER_DATE, ORDER_STATUS, ORDER_TOTAL
      FROM ORDERS_INT_01;

 Message
----------------------------
 Stream created and running

Inspecting DESCRIBE EXTENDED for the new stream shows that the Timestamp field is indeed being driven from the Order date (i.e. event time), and not the time at which the event hit our system:

ksql> DESCRIBE EXTENDED ORDERS;

Type                 : STREAM
Key field            :
Timestamp field      : ORDER_DATE_EPOCH
Key format           : STRING
Value format         : AVRO
Kafka output topic   : ORDERS (partitions: 4, replication: 1)
[...]

Now when we query this stream, and include ROWTIME (which is the actual time value KSQL will use for the aggregation) we can see that it matches what we had in the source ORDER_DATE column, the actual event time:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ORDER_DATE, ORDER_ID, ORDER_TOTAL
      FROM ORDERS LIMIT 5;
2011-07-26 01:00:00 | 2011-07-26 01:00:00.000000000 | 71491 | 5141.0
2009-06-22 23:00:00 | 2009-06-22 23:00:00.000000000 | 71494 | 3867.0
2008-04-23 15:00:00 | 2008-04-23 15:00:00.000000000 | 71492 | 4735.0
2007-10-18 05:00:00 | 2007-10-18 05:00:00.000000000 | 71498 | 5511.0
2007-08-24 17:00:00 | 2007-08-24 17:00:00.000000000 | 71501 | 4705.0
LIMIT reached for the partition.
Query terminated

Phew! Now to actually build our aggregate:

ksql> CREATE TABLE ORDERS_AGG_HOURLY AS
      SELECT ORDER_STATUS, COUNT(*) AS ORDER_COUNT, MAX(ORDER_TOTAL) AS MAX_ORDER_TOTAL,
             MIN(ORDER_TOTAL) AS MIN_ORDER_TOTAL, SUM(ORDER_TOTAL) AS SUM_ORDER_TOTAL,
             SUM(ORDER_TOTAL)/COUNT(*) AS AVG_ORDER_TOTAL
      FROM ORDERS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY ORDER_STATUS;

 Message
---------------------------
 Table created and running

This creates a table in KSQL, backed by a Kafka topic. At this point you have learned that the implicit column ROWTIME carries the information about event time. But now we are dealing with something new, which is the concept of windows. When a table is created and it is based on windows, you ought to use different constructions to refer to the start and end time of a given window. Starting from version 5.0, you now have the UDFs WindowStart() and WindowEnd() to refer to these values. You can learn more about these functions in the KSQL Syntax Reference for aggregates.
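
To show what an hourly tumbling window does to the data, here is a minimal Python sketch of the same aggregation. It is an illustration of the concept rather than of KSQL's implementation: it assumes windows are fixed-size, non-overlapping, and aligned to the epoch, and the `window_start` and `aggregate_hourly` helpers and the sample orders are hypothetical.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000


def window_start(epoch_ms, size_ms=HOUR_MS):
    """Start of the fixed-size window that contains epoch_ms (windows aligned to the epoch)."""
    return epoch_ms - (epoch_ms % size_ms)


def aggregate_hourly(orders):
    """Group orders by (ORDER_STATUS, window start) and compute count, max, min, sum and average."""
    buckets = defaultdict(list)
    for order in orders:
        key = (order["ORDER_STATUS"], window_start(order["ORDER_DATE_EPOCH"]))
        buckets[key].append(order["ORDER_TOTAL"])
    return {
        key: {
            "ORDER_COUNT": len(totals),
            "MAX_ORDER_TOTAL": max(totals),
            "MIN_ORDER_TOTAL": min(totals),
            "SUM_ORDER_TOTAL": sum(totals),
            "AVG_ORDER_TOTAL": sum(totals) / len(totals),
        }
        for key, totals in buckets.items()
    }


# Invented sample orders: two in the same hour, one in the following hour.
base = 1231196400000  # 2009-01-05 23:00:00 UTC
orders = [
    {"ORDER_STATUS": 4, "ORDER_DATE_EPOCH": base, "ORDER_TOTAL": 4000.0},
    {"ORDER_STATUS": 4, "ORDER_DATE_EPOCH": base + 15 * 60 * 1000, "ORDER_TOTAL": 2000.0},
    {"ORDER_STATUS": 4, "ORDER_DATE_EPOCH": base + HOUR_MS, "ORDER_TOTAL": 1000.0},
]

aggregates = aggregate_hourly(orders)

for (status, start), values in sorted(aggregates.items()):
    print(status, start, values)
```

The key of each aggregate is the combination of the grouping column and the window start, which is why a windowed table needs a way to refer to the window boundaries in addition to ROWTIME.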

Using ROWTIME formatted in a human-readable format along with the WindowStart() function, we can inspect the aggregate:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), ROWKEY, WindowStart(), ORDER_STATUS, MAX_ORDER_TOTAL, MIN_ORDER_TOTAL, SUM_ORDER_TOTAL, ORDER_COUNT, AVG_ORDER_TOTAL FROM ORDERS_AGG_HOURLY LIMIT 5;
2008-04-21 16:00:00 | 4 : Window{start=1208790000000} | 1545160440000 | 4 | 4067.0 | 4067.0 | 4067.0 | 1 | 4067.0
2007-11-20 21:00:00 | 4 : Window{start=1195592400000} | 1545160440000 | 4 | 3745.0 | 3745.0 | 3745.0 | 1 | 3745.0
2008-08-24 06:00:00 | 7 : Window{start=1219554000000} | 1545160440000 | 7 | 7354.0 | 7354.0 | 7354.0 | 1 | 7354.0
2008-03-25 05:00:00 | 3 : Window{start=1206421200000} | 1545160440000 | 3 | 2269.0 | 2269.0 | 2269.0 | 1 | 2269.0
2009-11-13 23:00:00 | 3 : Window{start=1258153200000} | 1545160440000 | 3 | 2865.0 | 2865.0 | 2865.0 | 1 | 2865.0

This implicit metadata can be exposed properly with a CTAS:

ksql> CREATE TABLE ORDERS_AGG_HOURLY_WITH_WINDOW AS
      SELECT TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY,
             ORDER_STATUS, MAX_ORDER_TOTAL, MIN_ORDER_TOTAL, SUM_ORDER_TOTAL, ORDER_COUNT, AVG_ORDER_TOTAL
      FROM ORDERS_AGG_HOURLY;

Checking out the raw Kafka messages shows that our aggregates are in place along with the window timestamp:

Robin@asgard02 > kafka-avro-console-consumer \
                  --bootstrap-server localhost:9092 \
                  --property schema.registry.url=http://localhost:8081 \
                  --from-beginning \
                  --topic ORDERS_AGG_HOURLY_WITH_WINDOW \
                  --max-messages 1|jq '.'
{
  "WINDOW_START_TS": "2009-04-10 23:00:00",
  "ORDER_STATUS": 4,
  "MAX_ORDER_TOTAL": 3753,
  "MIN_ORDER_TOTAL": 3753,
  "SUM_ORDER_TOTAL": 33777,
  "ORDER_COUNT": 9,
  "AVG_ORDER_TOTAL": 3753
}
Processed a total of 1 messages

Note that as an aggregate is updated (either by data arriving within the current window, or late-arriving data) it is re-emitted, but with the same key (which includes the window) as before. This means that downstream we just need to take the key as the basis for storing the aggregate, and overwrite any existing keys with new values.
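
The downstream handling described here amounts to an upsert keyed on the aggregate's key. A minimal Python sketch, using a dictionary as a stand-in for the target datastore; the `upsert` helper and the emitted values are hypothetical.

```python
# Illustrative only: a dictionary stands in for the downstream datastore.
store = {}


def upsert(target, key, value):
    """Insert the aggregate for a key, or overwrite it if the key already exists."""
    target[key] = value


# Invented emissions: the first window's aggregate is re-emitted as another order arrives.
emissions = [
    ((4, 1231196400000), {"ORDER_COUNT": 1, "SUM_ORDER_TOTAL": 4000.0}),
    ((4, 1231196400000), {"ORDER_COUNT": 2, "SUM_ORDER_TOTAL": 6000.0}),
    ((7, 1231196400000), {"ORDER_COUNT": 1, "SUM_ORDER_TOTAL": 500.0}),
]

for key, value in emissions:
    upsert(store, key, value)

print(len(store), "aggregates stored")
print(store[(4, 1231196400000)])
```

Three messages arrive but only two aggregates are stored, and the stored value for the repeated key is the most recent one, so the store never double counts.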

Streaming enriched data from Kafka into Elasticsearch

Let's now take the data that originated in Oracle, streamed in through Kafka, enriched in KSQL, and land it to Elasticsearch. We can do this using Kafka Connect. We're going to use a single connector to land the contents of both the enriched logons and the aggregated order metrics into Elasticsearch at once. You can load them individually too if you want.

We're going to use a Single Message Transform (SMT) to set the Timestamp datatype for the LOGON_DATE string. This came from GoldenGate as a string, and in order for Elasticsearch to work seamlessly through Kibana we want the Kafka Connect sink to pass the datatype as a timestamp, which using the SMT will enable. The alternative is to use document templates in Elasticsearch to set the datatypes of certain columns, but SMTs are neater in this case. We'll use an SMT for the WINDOW_START_TS too, as this column we cast as a string for display purposes.
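
Conceptually, the transform takes one named field of each record and replaces its string value with a real timestamp before the record reaches the sink. A minimal Python sketch of that idea follows. It is not the Kafka Connect implementation; the `convert_field_to_timestamp` helper is hypothetical, and it assumes the string is interpreted as UTC.

```python
from datetime import datetime, timezone


def convert_field_to_timestamp(record, field, fmt="%Y-%m-%d %H:%M:%S"):
    """Return a copy of record with record[field] parsed from a string into a UTC datetime."""
    converted = dict(record)
    converted[field] = datetime.strptime(record[field], fmt).replace(tzinfo=timezone.utc)
    return converted


# Sample record using a LOGON_DATE value from the output earlier in this post.
record = {"LOGON_ID": 178726, "LOGON_DATE": "2007-11-06 15:29:38"}

converted = convert_field_to_timestamp(record, "LOGON_DATE")

print(type(record["LOGON_DATE"]).__name__, "->", type(converted["LOGON_DATE"]).__name__)
print(converted["LOGON_DATE"].isoformat())
```

The Python format string here plays the same role as the `yyyy-MM-dd HH:mm:ss` pattern in the connector configuration: it tells the parser how to read the incoming string, and every other field passes through unchanged.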

Here is the necessary Kafka Connect configuration to stream the Kafka data from two of the topics populated by KSQL into Elasticsearch:

cat > ~/es_sink.json <<EOF
{
  "name": "es_sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081/",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "type.name": "type.name=kafkaconnect",
    "topics": "LOGON_ENRICHED,ORDERS_AGG_HOURLY_WITH_WINDOW",
    "topic.index.map": "LOGON_ENRICHED:logon_enriched,ORDERS_AGG_HOURLY_WITH_WINDOW:orders_agg_hourly",
    "connection.url": "http://localhost:9200",
    "transforms": "convert_logon_date,convert_window_ts",
    "transforms.convert_logon_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_logon_date.target.type": "Timestamp",
    "transforms.convert_logon_date.field": "LOGON_DATE",
    "transforms.convert_logon_date.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.convert_window_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_window_ts.target.type": "Timestamp",
    "transforms.convert_window_ts.field": "WINDOW_START_TS",
    "transforms.convert_window_ts.format": "yyyy-MM-dd HH:mm:ss"
  }
}
EOF

Load connector:

$ confluent load es_sink_logon_enriched -d ~/es_sink_logon_enriched.json        

Confirm it's running:

$ confluent status connectors| jq '.[]'| xargs -I{connector} confluent status {connector}| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
es_sink_logon_enriched_01  |  RUNNING  |  RUNNING

If there's an error then use confluent log connect to see details.

Verify that the SMT has done the trick for the date column, by inspecting the mapping defined for logons:

$ curl -s "http://localhost:9200/logon_enriched/_mappings"|jq '.logon_enriched.mappings."type.name=kafkaconnect".properties.LOGON_DATE.type'
"date"

and for the aggregated orders:

$ curl -s "http://localhost:9200/orders_agg_hourly/_mappings"|jq '."orders_agg_hourly".mappings."type.name=kafkaconnect".properties.WINDOW_START_TS.type'
"date"

Sample the data:

$ curl -s -Xget "http://localhost:9200/logon_enriched/_search?q=rick"|jq '.hits.hits[1]'
{
  "_index": "logon_enriched",
  "_type": "type.name=kafkaconnect",
  "_id": "83280",
  "_score": 7.5321684,
  "_source": {
    "CUST_LAST_NAME": "hansen",
    "CUSTOMER_SINCE_YRS": 11.806762565068492,
    "CUSTOMER_SINCE": "2006-12-16 00:00:00",
    "CUSTOMER_CLASS": "Occasional",
    "LOGON_DATE": "2003-07-03 05:53:03",
    "C_CUSTOMER_ID": 83280,
    "CUST_FULL_NAME": "rick hansen",
    "CUST_EMAIL": "rick.hansen@googlemail.com",
    "LOGON_ID": 65112,
    "CUST_FIRST_NAME": "rick"
  }
}

Add the index in Kibana and now we can monitor in real time what's happening, using data from Oracle, streamed through Kafka, dynamically enriched and joined, and streamed into Elasticsearch.

Here's a list of all logins, with the full details of the customer included:

Now filtering all real-time logins to show only those of "Business" type:

A useful point here is that whilst we can dynamically filter the data in the end-user tool, we could as easily generate a dedicated stream of just CUSTOMER_CLASS = 'Business' records using KSQL. It comes down to whether the data is to support exploratory/ad-hoc analytics, or to drive a business process that only needs data matching certain criteria.

With our data streaming from the transactional RDBMS system through Kafka and into a datastore such as Elasticsearch, it's easy to build full dashboards too. These give a real-time view over business events as they occur:

With the aggregate landed in our datastore, we can easily view the raw aggregate data in a table:

Building on the aggregates that we have created, we can add to the dashboard we created above, including information about the orders placed:

All of this data is driven in real time from our source transaction system! Using Kafka we have been able to stream and persist the raw events, transform and enrich them with KSQL, and stream them to target datastores such as Elasticsearch with Kafka Connect.

ETL Is Dead, Long Live Streams

We've seen in this article how we can stream database changes in real time into Kafka, and use these to drive multiple applications. With KSQL we can easily transform data, from simple filtering of streams of events from a database, to enriching events from multiple sources, denormalizing normalized structures, and creating rolling aggregates. Since KSQL writes transformed streams back to Kafka, we can use predicates in KSQL to easily implement exception detection, driving real-time applications without the need for complex coding. By defaulting to being event-driven, we can build systems that provide data for analytics when it's needed, and use the same enriched data for driving applications in real time.

Where To Go From Here

If you have enjoyed this article, you might want to continue with the following resources to learn more about KSQL and Streaming ETL:

  • Get started with KSQL to process and analyze your company's data in real time.
  • Watch our 3-part online talk series for the ins and outs behind how KSQL works.
  • Watch Neha Narkhede's online talk: ETL Is Dead, Long Live Streams.
  • Check out our Streaming ETL – The New Data Integration online talk series.

If you are interested in contributing to KSQL, we encourage you to get involved by sharing your feedback via the KSQL issue tracker, voting on existing issues by giving your +1, or opening pull requests. Use the #ksql channel in our public Confluent Slack community to ask questions, discuss use cases or help fellow KSQL users.


Source: https://www.confluent.io/blog/ksql-in-action-real-time-streaming-etl-from-oracle-transactional-data
