Tuesday, November 20, 2007

Our First CEP Use Case (and thoughts on Coral8 and Aleri)

For the Complex Event Processing (CEP) engine evaluation, we have chosen a very simple use case. This use case is:

Tell us when orders for a sector show a greater-than-normal level.

Even though this use case seems simplistic, and would not tend to be an ideal test of a CEP engine, it is an ideal use case for our environment. Why? It forces us to get at various data streams that have previously been inaccessible to most people, and it forces the owners of these streams to clean up their data.

(Note: this use case is a very generic use case and test for CEP. I am not giving away any special use cases that would give my company a competitive edge, nor will I ever do so in this blog.)

At the Gartner CEP Summit last September, Mary Knox of Gartner mentioned that one of the obstacles to doing successful CEP projects at large organizations is the process of liberating all of the data sources that you need, and getting the various silos to talk to each other. We have found this to be the case at our organization too. We figure that if we can get this simple use case to work, then we have won 50% of the battle.

What kind of data do we need to implement this use case?




  • We need to tap into the real-time order flow. Order flow comes to us through FIX messages, and for older systems, through proprietary messages that will one day be deprecated. Luckily, we have found a system that provides us this information. Although this system is a monitoring GUI, we have identified its importance to our company, and we are working with the product owner to split his app into a subscribable order service and a thinner GUI.
  • We need historical order data in order to determine what “normal activity” is for a sector. Luckily, we have this data, and we are in the process of getting access to it. We also need to define what we mean by “abnormal activity.” Does this mean “2 standard deviations above the 30-day moving average for a sector”?
  • We need to be able to get a list of sectors, and for each order, we need to map the ticker symbol to its sector. Sectors are signified by something called GIC codes, and there are 4 levels of GICs. The important thing is to ensure that all corporate actions get percolated down to these mapping tables. So, if a company changes its ticker symbol (like SUNW to JAVA), then the new ticker symbol needs to be automatically added to these mapping tables.
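One possible reading of “abnormal activity” can be sketched in a few lines of Python (used here only for brevity). This assumes that we already have a list of historical volumes for one sector, one value per trading day, and that “abnormal” means more than two sample standard deviations above the 30-day mean. The function names and the sample numbers are made up for illustration.

```python
from statistics import mean, stdev


def abnormal_threshold(daily_volumes, window=30, num_stdevs=2.0):
    """Mean plus num_stdevs sample standard deviations over the last `window` days."""
    recent = daily_volumes[-window:]
    if len(recent) < 2:
        raise ValueError("need at least two observations to compute a deviation")
    return mean(recent) + num_stdevs * stdev(recent)


def is_abnormal(volume, daily_volumes, window=30, num_stdevs=2.0):
    """True when `volume` exceeds the threshold derived from the history."""
    return volume > abnormal_threshold(daily_volumes, window, num_stdevs)


# Hypothetical history for one sector (shares per day).
history = [100, 110, 90, 105, 95]
print(is_abnormal(120, history))
print(is_abnormal(110, history))
```

The same calculation could be run per sector and per one-minute timeslice if that turns out to be the better definition of “normal.”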


    Let’s say that we are able to get all of the data that we need, and that the stream of data is pristine. We have to get it into the CEP engine for analysis.

    If you think of writing a normal, procedural program (e.g., a C# app) to do this analysis, the steps are pretty easy.

    1) Read in all of the reference data. This includes the ticker-to-sector mappings and the list of normal activity per sector per timeslice. We will consider a timeslice to be a one-minute interval. In a 6.5-hour trading day, there are 390 minutes, so a timeslice will be an integer from 0 to 389. There are also 11 “GIC0” sectors.

    2) Subscribe to a stream of FIX orders.

    3) As each order comes in, extract the ticker and map it to a sector. We are also interested in the number of shares in the order and the time that the order was placed. For each order, increment a running total for that sector and for that timeslice.

    4) Any orders whose timeslice is earlier than the current timeslice are ignored. Also, any orders that fall outside of the normal trading day are ignored. This way, we don’t consider any orders that may have been delayed through our systems.

    5) If we detect a new and later timeslice, then examine all of the sectors for the previous timeslice. If any of the sectors show heightened activity, then alert the user. Then, clear the totals for all of the sectors, and start accumulating new totals for all of the sectors.
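    To make the five steps concrete, here is a rough sketch of the procedural version, written in Python rather than C# purely to keep it short. The class name, the shape of the reference data, and the sample tickers and thresholds are all assumptions for illustration; the FIX subscription of step 2 is replaced by a hard-coded list of orders.

```python
from collections import defaultdict

SLICES_PER_DAY = 390  # one-minute timeslices in a 6.5-hour trading day


class SectorActivityMonitor:
    """Accumulates shares per sector for the current timeslice (steps 3 to 5)."""

    def __init__(self, ticker_to_sector, normal_shares):
        # Step 1: reference data, assumed to be loaded by the caller.
        # normal_shares maps (sector, timeslice) -> alert threshold in shares.
        self.ticker_to_sector = ticker_to_sector
        self.normal_shares = normal_shares
        self.current_slice = None
        self.totals = defaultdict(int)

    def on_order(self, ticker, shares, timeslice):
        """Process one order; return alerts for a timeslice that just closed."""
        # Step 4: ignore orders outside the trading day or behind the current slice.
        if not 0 <= timeslice < SLICES_PER_DAY:
            return []
        if self.current_slice is not None and timeslice < self.current_slice:
            return []
        # Step 5: a later timeslice closes the previous one.
        alerts = []
        if self.current_slice is not None and timeslice > self.current_slice:
            alerts = self._close_current_slice()
        self.current_slice = timeslice
        # Step 3: map ticker to sector and add to the running total.
        sector = self.ticker_to_sector.get(ticker)
        if sector is not None:  # assumption: unmapped tickers are skipped
            self.totals[sector] += shares
        return alerts

    def _close_current_slice(self):
        alerts = []
        for sector, total in sorted(self.totals.items()):
            threshold = self.normal_shares.get((sector, self.current_slice))
            if threshold is not None and total > threshold:
                alerts.append((sector, self.current_slice, total))
        self.totals.clear()
        return alerts


# Step 2 would feed this from a FIX subscription; here the orders are hard-coded.
monitor = SectorActivityMonitor(
    {"IBM": "Technology", "XOM": "Energy"},
    {("Technology", 0): 1000, ("Energy", 0): 5000},
)
for ticker, shares, timeslice in [("IBM", 800, 0), ("IBM", 700, 0), ("XOM", 100, 1)]:
    for alert in monitor.on_order(ticker, shares, timeslice):
        print("ALERT", alert)
```

    Note that the alert for a timeslice only fires when the first order of a later timeslice arrives, so a quiet market delays the alert. A real implementation would probably also want a timer to flush the last timeslice.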

    This looks pretty easy. I would assign this to a good C# developer, and hope to get a finished program in one or two days.

    Now, the task is to map this into a CEP engine.

    Most of the CEP engines have a language that is based on SQL. So, you can imagine all of the processing steps above passing through multiple streams in the CEP engine. For step 1) above, we would have two input streams, one for the ticker-to-sector mapping data and the other for the “normal sector activity” data. You can imagine two simple SELECT statements in SQL that read this data from some external database, and construct two in-memory tables in the CEP engine.

    For step 2, you need to write a specialized input adapter that subscribes to a communications channel (sockets or JMS) and reads and decodes the FIX orders. Most orders come through as NewOrderSingle messages (FIX message type = ‘D’). There are various versions of FIX, but let’s say that everything comes in as FIX 4.2 messages.
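    As a rough idea of what the decoding half of such an adapter does, here is a small Python sketch. It assumes the raw message is a string of tag=value pairs separated by the SOH character, and it pulls out the standard FIX 4.2 tags MsgType (35), Symbol (55), OrderQty (38) and TransactTime (60). The function names are made up, and there is no session handling, checksum validation, or support for repeating groups.

```python
SOH = "\x01"  # FIX field delimiter


def parse_fix(raw):
    """Split a raw FIX message into a {tag: value} dict (no repeating groups)."""
    fields = {}
    for pair in raw.split(SOH):
        if not pair:
            continue  # skip the empty piece after the trailing delimiter
        tag, _, value = pair.partition("=")
        fields[int(tag)] = value
    return fields


def extract_new_order(raw):
    """Return ticker, shares and TransactTime for a NewOrderSingle, else None."""
    fields = parse_fix(raw)
    if fields.get(35) != "D":  # 35 = MsgType, 'D' = NewOrderSingle
        return None
    return {
        "ticker": fields[55],                # 55 = Symbol
        "shares": int(float(fields[38])),    # 38 = OrderQty (a Qty may carry decimals)
        "transact_time": fields[60],         # 60 = TransactTime, UTC, YYYYMMDD-HH:MM:SS
    }


# Abbreviated sample message: BodyLength and CheckSum are omitted on purpose.
sample = SOH.join(
    ["8=FIX.4.2", "35=D", "55=JAVA", "54=1", "38=500", "60=20071120-14:31:07"]
) + SOH
print(extract_new_order(sample))
```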

    Most of the CEP vendors support in-process and out-of-process adapters. In-process adapters are faster than out-of-process adapters, but out-of-process adapters are usually easier to write. An out-of-process adapter will read data from some kind of communications bus (or even from a database table or a flat file), and will write a data stream to the CEP engine. It would be ideal to have the CEP vendors support FIX in in-process input and output adapters.

    Step 4) is easy. We calculate the 0-based timeslice for an order, and if it is below 0 or above 389, then we ignore this order in the stream. This can be done with a simple WHERE clause in the SQL statement.
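    The timeslice arithmetic itself is tiny. The Python sketch below assumes a 9:30 AM market open (which matches a 6.5-hour day ending at 4:00 PM) and assumes that the order time has already been converted to exchange-local time. Both are assumptions on my part, as are the function names.

```python
from datetime import datetime, time

MARKET_OPEN = time(9, 30)  # assumption: 9:30 AM local open
SLICES_PER_DAY = 390       # 6.5 hours of one-minute timeslices


def timeslice(transact_time):
    """0-based minute index since the open; negative or >= 390 outside the day."""
    minutes = transact_time.hour * 60 + transact_time.minute
    open_minutes = MARKET_OPEN.hour * 60 + MARKET_OPEN.minute
    return minutes - open_minutes


def in_trading_day(slice_index):
    """The equivalent of the WHERE clause: keep only timeslices 0 to 389."""
    return 0 <= slice_index < SLICES_PER_DAY


order_time = datetime(2007, 11, 20, 15, 59, 59)
print(timeslice(order_time), in_trading_day(timeslice(order_time)))
```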

    We also need to record the “current timeslice” and ignore any orders that come before the current timeslice. So, we need the concept of a “global variable” and when we see an order with a later timeslice, we need to update this variable. This is something which is easy to do with a procedural language, but what is the best way to do this in SQL?

    Steps 3) and 5) are interesting. We need to keep a one-minute window per sector. This window should only keep running totals for the current timeslice. When a new timeslice comes in, we need to analyze the sector activity in the timeslice that just ended, do any alerts, and then clear out the totals in all sectors. Again, this is something that is extremely easy to do in a C# application, but translating it into SQL is a bit of a challenge.

    In step 3), the mapping of ticker to sector is very easy. It’s just a join of the ticker in the order with the ticker in the mapping table. The interesting thing is the choice of window type for the stream. Do we accumulate all orders for all sectors for the one-minute timeslice, and then, when we see a new timeslice, do we just take a COUNT() of the number of orders for each sector? Or, do we simply have a window with one row per sector, and keep running totals for each sector as an order comes in?
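    The two choices can be compared side by side in a small Python sketch. Both hypothetical classes below produce the same per-sector order count and share total when the timeslice closes; the difference is that the first holds one row per order, while the second holds one row per sector.

```python
class BufferedWindow:
    """Keeps every order row for the timeslice and aggregates at close."""

    def __init__(self):
        self.rows = []

    def add(self, sector, shares):
        self.rows.append((sector, shares))

    def close(self):
        result = {}
        for sector, shares in self.rows:
            count, total = result.get(sector, (0, 0))
            result[sector] = (count + 1, total + shares)
        self.rows = []
        return result


class RunningTotalsWindow:
    """Keeps one (order_count, total_shares) row per sector, updated per order."""

    def __init__(self):
        self.totals = {}

    def add(self, sector, shares):
        count, total = self.totals.get(sector, (0, 0))
        self.totals[sector] = (count + 1, total + shares)

    def close(self):
        result, self.totals = self.totals, {}
        return result


orders = [("Technology", 800), ("Energy", 100), ("Technology", 700)]
buffered, running = BufferedWindow(), RunningTotalsWindow()
for sector, shares in orders:
    buffered.add(sector, shares)
    running.add(sector, shares)
print(buffered.close() == running.close())
```

    The buffered version is the natural fit if the individual orders are needed later (for drill-down, say); the running-totals version uses memory proportional to the number of sectors rather than the number of orders.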

    Coral8 supports the concepts of sliding and jumping windows. Aleri supports only sliding windows right now. With Coral8, we can set a window that will hold one minute’s worth of data, and we can also tell a stream that it should dump its output after one minute. However, we don’t want to tie the TransactTime in a FIX order message to the actual clock on the computer. We need a stream that will produce output when the value in a certain column changes, and neither Coral8 nor Aleri seems to have this yet.
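    To pin down what I mean by output driven by a column value rather than by the wall clock, here is a rough Python generator. It is not how any CEP engine implements this; the function name and the event layout (dicts with "sector" and "shares" keys) are assumptions for illustration. It emits the per-sector totals for a bucket as soon as an event with a later bucket value shows up.

```python
from collections import defaultdict


def windows_by_value(events, bucket_of):
    """Yield (bucket, {sector: total_shares}) each time the bucket value advances."""
    current = None
    totals = defaultdict(int)
    for event in events:
        bucket = bucket_of(event)
        if current is not None and bucket < current:
            continue  # late event for a bucket that has already been emitted
        if current is not None and bucket > current:
            yield current, dict(totals)
            totals.clear()
        current = bucket
        totals[event["sector"]] += event["shares"]
    if current is not None:
        yield current, dict(totals)  # flush the last bucket at end of stream


events = [
    {"sector": "Technology", "shares": 100, "bucket": 0},
    {"sector": "Energy", "shares": 50, "bucket": 0},
    {"sector": "Technology", "shares": 25, "bucket": 1},
    {"sector": "Technology", "shares": 999, "bucket": 0},  # delayed, dropped
    {"sector": "Energy", "shares": 10, "bucket": 3},
]
for bucket, totals in windows_by_value(events, lambda e: e["bucket"]):
    print(bucket, totals)
```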

    Here is some Coral8 code that shows windows and streams:

    CREATE WINDOW TickerAndSector
    SCHEMA (Ticker STRING, Sector STRING, SectorId INTEGER, Shares INTEGER,
            TransactTimeBucket INTEGER)
    KEEP EVERY 60 SECONDS;

    INSERT INTO TickerAndSector
    SELECT
        FlattenNewOrder.Ticker,
        TickerToSectorMap.SectorName,
        TickerToSectorMap.SectorId,
        TO_INTEGER(FlattenNewOrder.Qty),
        TimeToTimeBucket(FlattenNewOrder.TransactTime, 'HH:MI:SS AM')
    FROM
        FlattenNewOrder,
        TickerToSectorMap
    WHERE
        TickerToSectorMap.Ticker = FlattenNewOrder.Ticker
    OUTPUT EVERY 60 SECONDS;

    The first statement defines a window that keeps one minute’s worth of order data. After one minute, the window will empty its contents.

    The second statement will insert a new row into the window whenever we get a new order. After one minute, the window will send its output to another stream further down the pipeline. (We hope that the data will be sent to the next stream before the window clears itself. Otherwise, we will lose all of the data.)

    So far, in my brief evaluation, I have found step 5) difficult to implement in Coral8. Aleri has implemented this by using a FlexStream. A FlexStream is a stream that has procedural logic attached to it. Aleri has a custom C-like programming language that you can use to implement procedural logic in a FlexStream. But, if you write too much logic using FlexStreams, then wouldn’t you be better off just writing a nice C# application?

    To validate some of the CEP engines, I ended up taking a day and writing a C# application that implements this use-case. For grins, I added a tab that showed some animated graphics using the very excellent ChartFX package. The head of the trading business was so excited by this eye candy that he started to bring over various traders for a look at my simple app. So, in addition to this little app giving the traders information that they did not have before, it provided them a flashy way to see real-time movement across sectors.

    In addition to having SQL skills, a good CEP developer needs to readjust their way of thinking in order to consider pipelined streams of SQL processing. There is a big debate going on in the Yahoo CEP forum as to whether SQL is a suitable language for CEP. So far, with this use case, I see the suitability of SQL, but I also need to step out of the SQL way of thinking and apply some procedural logic.

    One of the things that I still need to be convinced of is that CEP engines can do a better job than custom code. I am all ears. Any CEP vendor (even Streambase) is invited to submit public comments to this blog to tell me how this use case can be implemented with their system.


    ©2007 Marc Adler - All Rights Reserved

    1 comment:

    tibbetts said...

    Nice post. Not only do you present a straightforward use case, but you've hit two of my personal favorite issues in CEP: opening access to data and value-based windowing.

    Getting access to siloed and proprietary data is the big challenge for many CEP deployments. Even simple monitoring applications can require access to lots of data. On the other hand, building simple monitoring applications can get you access to lots of data. Once you have access to the data and good tools, CEP adoption starts to snowball. Applications that were once impossible become just one more query, and CEP gets really exciting.

    Value-based windowing is the other issue you hit on, possibly without even knowing it. Windowing and other operations that are driven by data values rather than a global notion of time have differentiated StreamBase from other CEP and streaming tools since we were an academic project (Aurora). I've written publicly about this before: my post “Global Time is a Dangerous Assumption” talks about it in high-level terms.

    Value-based windowing allows you to tie the maintenance of windows to the values of a field or expression in the data stream. Your use case might look like this (code formatting mangled by blogging software):

    -- Enrich every order with the sector
    CREATE STREAM NewOrdersWithSector AS
    SELECT fno.*, tsm.SectorID, tsm.SectorName, int(TransactTime / minutes(1)) AS TimeBucket
    FROM FlattenNewOrder fno, TickerToSectorMap tsm
    WHERE fno.Ticker = tsm.Ticker;

    -- Calculate the total trading in each minute by sector.
    -- Pass along the opening value for the window as TimeBucket.
    CREATE OUTPUT STREAM OrdersBySector AS
    SELECT SectorID, sum(Qty) AS Volume, count() AS OrderCount, openval() as TimeBucket
    FROM NewOrdersWithSector[SIZE 1 ON TimeBucket]
    GROUP BY SectorID;

    You can see the anonymous window definition appearing in the second SELECT statement, in the square brackets. You could write this as one statement, with the first SELECT nested inside the second, but for examples I like to name my intermediate streams.

    Value-based windowing works with any numeric value. You could do one-minute value-based windows over your raw timestamp, rather than calculating your own buckets. That's probably how I would do it. StreamBase does sliding and tumbling windows.

    Of course, this functionality works equally well in textual StreamSQL and in graphical EventFlow. I'm happy to send along the graphical version of the application; it is just two operators plus the reference data table.

    The documentation is on the web: see the StreamSQL CREATE WINDOW Statement and EventFlow Field-Based Dimension Options pages.

    As long as I'm telling you about StreamBase features, we also have your global variables. We call them dynamic variables. The doc pages are EventFlow Dynamic Variables and StreamSQL DECLARE Statement. You don't need them for this use case, but they come in handy for other things.

    Regards,
    Richard Tibbetts, StreamBase Systems.