Tell us when the orders for a sector show a greater-than-normal level of activity.
Even though this use case seems very simplistic, and would not normally be an ideal way to test a CEP engine, it is ideal for our environment. Why? It forces us to get at various data streams that have previously been inaccessible to most people, and it forces the owners of those streams to clean up their data.
(Note: this is a very generic use case and test for CEP. I am not giving away any special use cases that would give my company a competitive edge, nor will I ever do so in this blog.)
At the Gartner CEP Summit last September, Mary Knox of Gartner mentioned that one of the obstacles to doing successful CEP projects at large organizations was liberating all of the data sources that you need and getting the various silos to talk to each other. We have found this to be the case at our organization too. We figure that if we can get this simple use case to work, then we have won 50% of the battle.
What kind of data do we need to implement this use case?
Let’s say that we are able to get all of the data that we need, and that the stream of data is pristine. We have to get it into the CEP engine for analysis.
If you think of writing a normal procedural program (e.g., a C# app) to do this analysis, the steps are pretty easy.
1) Read in all of the reference data. This includes the ticker-to-sector mappings and the list of normal activity per sector per timeslice. We will consider a timeslice to be a one-minute interval. A 6.5-hour trading day has 390 minutes, so a timeslice will be an integer from 0 to 389. There are also 11 GICS sectors.
2) Subscribe to a stream of FIX orders.
3) As each order comes in, extract the ticker and map it to a sector. We are also interested in the number of shares in the order and the time that the order was placed. For each order, increment a running total for that sector and for that timeslice.
4) Any orders whose timeslice is earlier than the current timeslice are ignored, as are any orders that arrive outside of the normal trading day. This way, we don’t consider any orders that may have been delayed through our systems.
5) If we detect a new and later timeslice, then examine all of the sectors for the previous timeslice. If any of the sectors show heightened activity, then alert the user. Then, clear the totals for all of the sectors, and start accumulating new totals for all of the sectors.
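To make the procedural version concrete, here is a minimal sketch of steps 3) through 5). It is written in Python only for brevity, and a C# version would look much the same. It assumes the step 1) reference data is already loaded into two dictionaries and that each order has already been reduced to a ticker, a share count and a timeslice. The names SectorMonitor and threshold are hypothetical. The use case does not define "greater than normal", so the sketch assumes it means total shares exceeding threshold times the normal figure for that sector and timeslice.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

SLICES_PER_DAY = 390  # one-minute timeslices in a 6.5-hour trading day


@dataclass
class SectorMonitor:
    """Hypothetical sketch of steps 3) to 5): per-sector totals for the current timeslice."""
    ticker_to_sector: Dict[str, str]              # step 1) reference data: ticker -> sector
    normal_activity: Dict[Tuple[str, int], int]   # step 1) reference data: (sector, slice) -> normal shares
    threshold: float = 1.0                        # assumption: alert when total > threshold * normal
    current_slice: Optional[int] = None
    totals: Dict[str, int] = field(default_factory=dict)
    alerts: List[Tuple[int, str, int]] = field(default_factory=list)

    def on_order(self, ticker: str, shares: int, timeslice: int) -> None:
        # Step 4): drop orders outside the trading day or from an already-closed slice.
        if not 0 <= timeslice < SLICES_PER_DAY:
            return
        if self.current_slice is not None and timeslice < self.current_slice:
            return
        # Step 5): an order from a later slice closes out the current one.
        if self.current_slice is not None and timeslice > self.current_slice:
            self._close_slice()
        self.current_slice = timeslice
        # Step 3): map the ticker to its sector and add to that sector's running total.
        sector = self.ticker_to_sector.get(ticker)
        if sector is not None:
            self.totals[sector] = self.totals.get(sector, 0) + shares

    def _close_slice(self) -> None:
        # Compare each sector's total with its normal level, record alerts, then reset.
        for sector, total in self.totals.items():
            normal = self.normal_activity.get((sector, self.current_slice))
            if normal is not None and total > self.threshold * normal:
                self.alerts.append((self.current_slice, sector, total))
        self.totals.clear()


if __name__ == "__main__":
    # Made-up sample data for illustration only.
    monitor = SectorMonitor(
        ticker_to_sector={"XOM": "Energy", "MSFT": "Information Technology"},
        normal_activity={("Energy", 0): 1000, ("Information Technology", 0): 5000},
    )
    monitor.on_order("XOM", 800, 0)
    monitor.on_order("XOM", 700, 0)     # Energy reaches 1500 shares in slice 0
    monitor.on_order("MSFT", 100, 0)
    monitor.on_order("MSFT", 100, 1)    # first order of slice 1 closes slice 0
    monitor.on_order("XOM", 50, 0)      # late order for slice 0: ignored
    print(monitor.alerts)               # [(0, 'Energy', 1500)]
```

In this design, a slice is only examined when an order from a later slice shows up, which mirrors step 5). A real program would also need a timer to close the last slice of the day, because no later order may ever arrive.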
This looks pretty easy. I would assign this to a good C# developer, and hope to get a finished program in one or two days.
Now, the task is to map this into a CEP engine.
Most of the CEP engines have a language that is based on SQL. So, you can imagine all of the processing steps above passing through multiple streams in the CEP engine. For step 1) above, we would have two input streams, one for the ticker-to-sector mapping data and the other for the “normal sector activity” data. You can imagine two simple SELECT statements in SQL that read this data from some external database, and construct two in-memory tables in the CEP engine.
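Outside of a CEP engine, step 1) is just two queries run once at start-up. The sketch below uses Python's built-in sqlite3 module, with an in-memory database standing in for "some external database". The table and column names (ticker_sector, normal_sector_activity, normal_shares) are hypothetical.

```python
import sqlite3
from typing import Dict, Tuple


def load_reference_data(
    conn: sqlite3.Connection,
) -> Tuple[Dict[str, str], Dict[Tuple[str, int], int]]:
    """Run the two SELECTs and build the in-memory lookup tables (hypothetical schema)."""
    ticker_to_sector = {
        ticker: sector
        for ticker, sector in conn.execute("SELECT ticker, sector FROM ticker_sector")
    }
    normal_activity = {
        (sector, timeslice): shares
        for sector, timeslice, shares in conn.execute(
            "SELECT sector, timeslice, normal_shares FROM normal_sector_activity"
        )
    }
    return ticker_to_sector, normal_activity


if __name__ == "__main__":
    # Made-up sample rows for illustration only.
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE ticker_sector (ticker TEXT PRIMARY KEY, sector TEXT);
        CREATE TABLE normal_sector_activity (sector TEXT, timeslice INTEGER, normal_shares INTEGER);
        INSERT INTO ticker_sector VALUES ('XOM', 'Energy'), ('MSFT', 'Information Technology');
        INSERT INTO normal_sector_activity VALUES ('Energy', 0, 1000);
    """)
    print(load_reference_data(conn))
```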
For step 2, you need to write a specialized input adapter that subscribes to a communications channel (sockets or JMS) and reads and decodes the FIX orders. Most orders come through as NewOrderSingle messages (FIX message type = ‘D’). There are various versions of FIX, but let’s say that everything comes in as FIX 4.2 messages.
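The decoding half of such an adapter is not much code. Here is a rough Python sketch that splits a raw FIX tag=value message on the SOH (0x01) delimiter. It pulls out the fields this use case needs from a NewOrderSingle: MsgType (tag 35), Symbol (55), OrderQty (38) and TransactTime (60). It assumes one complete message per string. It skips session-level concerns that a real adapter would have to handle, such as BodyLength, CheckSum and repeating groups. The function names are hypothetical.

```python
from typing import Dict, Optional

SOH = "\x01"  # FIX field delimiter


def parse_fix(raw: str) -> Dict[str, str]:
    """Split a tag=value FIX message into a {tag: value} dict (repeating groups not handled)."""
    fields: Dict[str, str] = {}
    for part in raw.split(SOH):
        if part:
            tag, _, value = part.partition("=")
            fields[tag] = value
    return fields


def decode_new_order_single(raw: str) -> Optional[Dict[str, object]]:
    """Return the fields this use case needs, or None if the message is not a NewOrderSingle."""
    f = parse_fix(raw)
    if f.get("35") != "D":                  # 35 = MsgType; 'D' = NewOrderSingle
        return None
    return {
        "ticker": f["55"],                  # 55 = Symbol
        "shares": int(float(f["38"])),      # 38 = OrderQty; may carry decimals, so parse as float first
        "transact_time": f["60"],           # 60 = TransactTime (UTC), e.g. 20070315-14:31:07
    }


if __name__ == "__main__":
    msg = SOH.join(["8=FIX.4.2", "35=D", "55=IBM", "38=500", "60=20070315-14:31:07"]) + SOH
    print(decode_new_order_single(msg))
```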
Most of the CEP vendors support in-process and out-of-process adapters. In-process adapters are faster than out-of-process adapters, but out-of-process adapters are usually easier to write. An out-of-process adapter will read data from some kind of communications bus (or even from a database table or a flat file), and will write a data stream to the CEP engine. Ideally, the CEP vendors would supply in-process FIX input and output adapters.
Step 4) is easy. We calculate the 0-based timeslice for an order, and if it is below 0 or above 389, then we ignore this order in the stream. This can be done with a simple WHERE clause in the SQL statement.
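In procedural terms, that WHERE clause comes down to a couple of lines of arithmetic. This Python sketch assumes a 9:30 to 16:00 session, which gives the 390 one-minute slices. It also assumes that TransactTime, which FIX carries in UTC, has already been converted to exchange-local time. The function name to_timeslice is hypothetical.

```python
from datetime import datetime
from typing import Optional

OPEN_MINUTE = 9 * 60 + 30   # assumption: 9:30 local market open
SLICES_PER_DAY = 390        # 9:30 to 16:00 in one-minute slices


def to_timeslice(local_time: datetime) -> Optional[int]:
    """0-based one-minute slice, or None outside the trading day (the WHERE clause)."""
    slice_ = local_time.hour * 60 + local_time.minute - OPEN_MINUTE
    return slice_ if 0 <= slice_ < SLICES_PER_DAY else None


if __name__ == "__main__":
    for t in ("09:29:59", "09:30:00", "15:59:59", "16:00:00"):
        ts = datetime.strptime("20070315 " + t, "%Y%m%d %H:%M:%S")
        print(t, to_timeslice(ts))
```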
We also need to record the “current timeslice” and ignore any orders that come before the current timeslice. So, we need the concept of a “global variable” and when we see an order with a later timeslice, we need to update this variable. This is something which is easy to do with a procedural language, but what is the best way to do this in SQL?
Steps 3) and 5) are interesting. We need to keep a one-minute window per sector. This window should only keep running totals for the current timeslice. When a new timeslice comes in, we need to analyze the sector activity in the current timeslice, raise any alerts, and then clear out the totals in all sectors. Again, this is something that is extremely easy to do in a C# application, but translating it into SQL is a bit of a challenge.
In step 3), the mapping of ticker to sector is very easy. It’s just a join of the ticker in the order with the ticker in the mapping table. The interesting thing is the choice of window type for the stream. Do we accumulate all orders for all sectors for the one-minute timeslice, and then, when we see a new timeslice, just take a COUNT() of the number of orders for each sector? Or do we simply have a window with one row per sector, and keep running totals for each sector as each order comes in?
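The two choices can be compared outside any engine. In this Python sketch, option A buffers the sector of every order for the minute and then does the equivalent of COUNT() when the slice ends. Option B keeps one row per sector and increments it as each order arrives. All names here are hypothetical. Both options give the same per-sector counts, and summing shares instead of counting orders works the same way.

```python
from collections import Counter
from typing import Dict, List


def counts_from_buffer(window: List[str]) -> Dict[str, int]:
    """Option A: keep every order's sector for the minute, then COUNT() per sector."""
    return dict(Counter(window))


class RunningCounts:
    """Option B: one row per sector, incremented as each order arrives."""

    def __init__(self) -> None:
        self.rows: Dict[str, int] = {}

    def on_order(self, sector: str) -> None:
        self.rows[sector] = self.rows.get(sector, 0) + 1

    def flush(self) -> Dict[str, int]:
        # Hand back this slice's counts and start the next slice empty.
        out, self.rows = self.rows, {}
        return out


if __name__ == "__main__":
    sectors = ["Energy", "Financials", "Energy"]   # made-up sample orders, already joined to sectors
    running = RunningCounts()
    for s in sectors:
        running.on_order(s)
    print(counts_from_buffer(sectors), running.flush())
```

The practical difference is memory. Option A holds every order in the minute, while option B holds at most one row per sector (11 here), no matter how busy the market gets.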
Coral8 supports the concepts of sliding and jumping windows. Aleri supports only sliding windows right now. With Coral8, we can set up a window that holds one minute’s worth of data, and we can also tell a stream to dump its output after one minute. However, we don’t want to tie the TransactTime in a FIX order message to the actual clock on the computer. We need a stream that produces output when the value in a certain column (here, the timeslice computed from TransactTime) changes, and neither Coral8 nor Aleri seems to have this yet.
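What I am after is essentially a jumping window whose boundaries come from the data rather than from the wall clock. As a rough illustration of those semantics, here is a Python generator that collects events until a key column advances. The key could be, for example, the timeslice derived from TransactTime. When the key advances, the generator emits the finished batch, and it drops any late events. This is not a feature of either product, and the function name is hypothetical.

```python
from typing import Callable, Iterable, Iterator, List, Optional, Tuple, TypeVar

E = TypeVar("E")


def jumping_window_by_column(
    events: Iterable[E], key: Callable[[E], int]
) -> Iterator[Tuple[int, List[E]]]:
    """Emit (slice, events) each time key() advances; events for closed slices are dropped."""
    current: Optional[int] = None
    batch: List[E] = []
    for ev in events:
        k = key(ev)
        if current is None:
            current = k
        elif k < current:
            continue                    # late event: its slice has already been emitted
        elif k > current:
            yield current, batch        # a later value closes the current window
            current, batch = k, []
        batch.append(ev)
    if current is not None:
        yield current, batch            # flush the final, still-open window at end of stream


if __name__ == "__main__":
    orders = [(0, "a"), (0, "b"), (1, "c"), (0, "late"), (3, "d")]  # (timeslice, payload)
    for window in jumping_window_by_column(orders, key=lambda o: o[0]):
        print(window)
```

Because the window only closes when a later value arrives, the last slice has to be flushed explicitly. Here that happens at the end of the stream.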
Here is some Coral8 code that shows windows and streams:
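CREATE WINDOW TickerAndSector
SCHEMA (Ticker STRING, Sector STRING, SectorId INTEGER, Shares INTEGER,
        TimeBucket STRING)  -- TimeBucket: hypothetical column for the TimeToTimeBucket() result
KEEP EVERY 60 SECONDS;
-- Join each incoming order to its sector and stamp it with its time bucket.
INSERT INTO TickerAndSector
SELECT FlattenNewOrder.Ticker, TickerToSectorMap.Sector, TickerToSectorMap.SectorId,
       FlattenNewOrder.Shares,
       TimeToTimeBucket(FlattenNewOrder.TransactTime, 'HH:MI:SS AM')
FROM FlattenNewOrder, TickerToSectorMap
WHERE TickerToSectorMap.Ticker = FlattenNewOrder.Ticker
OUTPUT EVERY 60 SECONDS;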
The first statement defines a window that keeps one minute’s worth of order data. After one minute, the window will empty its contents.
The second statement will insert a new row into the window whenever we get a new order. After one minute, the window will send its output to another stream further down the pipeline. (We hope that the data will be sent to the next stream before the window clears itself. Otherwise, we will lose all of the data.)
So far, in my brief evaluation, I have found step 5) difficult to implement in Coral8. Aleri has implemented this by using a FlexStream. A FlexStream is a stream that has procedural logic attached to it. Aleri has a custom C-like programming language that you can use to implement procedural logic in a FlexStream. But if you write too much logic using FlexStreams, wouldn’t you be better off just writing a nice C# application?
To validate some of the CEP engines, I ended up taking a day and writing a C# application that implements this use-case. For grins, I added a tab that showed some animated graphics using the very excellent ChartFX package. The head of the trading business was so excited by this eye candy that he started to bring over various traders for a look at my simple app. So, in addition to this little app giving the traders information that they did not have before, it provided them a flashy way to see real-time movement across sectors.
In addition to having SQL skills, a good CEP developer needs to readjust their way of thinking in order to consider pipelined streams of SQL processing. There is a big debate going on in the Yahoo CEP forum as to whether SQL is a suitable language for CEP processing. So far, with this use case, I see the suitability of SQL, but I also need to step out of the SQL way of thinking and apply some procedural logic.
One of the things that I still need to be convinced of is that CEP engines can do a better job than custom code. I am all ears. Any CEP vendor (even Streambase) is invited to submit public comments to this blog to tell me how this use case can be implemented with their system.
©2007 Marc Adler - All Rights Reserved