Saturday, December 08, 2007

Getting intermediate results in Streams

Let's say that we want to keep a running total of the number of shares that we have traded, and at 4:00 PM every day, we want to dump out the total. In Coral8, we can do something like this:


SELECT SUM(shares)

This looks pretty straightforward. The Totals stream retains the running total until 4:00 PM. At 4:00 PM every day, it outputs the total shares to any other stream that is "subscribed" to Totals, and then resets itself to start accumulating a new total.
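To make the accumulate, publish, and reset behaviour concrete, here is a minimal sketch in plain Python rather than CCL. It is not how Coral8 implements this; the class name `DailyTotal`, the `advance` method, and the callback-style subscribers are all assumptions made for illustration. The caller supplies the clock, so a timer (or the next trade) has to call `advance` for the 4:00 PM rollover to happen.

```python
from datetime import date, datetime, time, timedelta


class DailyTotal:
    """Hypothetical stand-in for the Totals stream: sums shares, publishes at the cutoff, resets."""

    def __init__(self, cutoff=time(16, 0)):
        self.cutoff = cutoff
        self.total = 0
        self.subscribers = []   # callbacks standing in for subscribed streams
        self._current = None    # date on which the current accumulation period closes

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def _closing_date(self, ts):
        # Trades before the cutoff belong to today's period, later ones to tomorrow's.
        return ts.date() if ts.time() < self.cutoff else ts.date() + timedelta(days=1)

    def advance(self, ts):
        """Move the clock to ts; publish and reset if a cutoff has been crossed."""
        closing = self._closing_date(ts)
        if self._current is not None and closing != self._current:
            for callback in self.subscribers:
                callback(self._current, self.total)
            self.total = 0
        self._current = closing

    def on_trade(self, ts, shares):
        self.advance(ts)
        self.total += shares


totals = DailyTotal()
published = []
totals.subscribe(lambda day, total: published.append((day, total)))

totals.on_trade(datetime(2007, 12, 7, 10, 0), 100)
totals.on_trade(datetime(2007, 12, 7, 15, 59), 250)
totals.advance(datetime(2007, 12, 7, 16, 0))   # the 4:00 PM tick
```

After the 4:00 PM tick, `published` holds one entry for December 7 and `totals.total` is back to zero. Note that nothing in this design lets an outside caller ask for the total at 2:00 PM without reaching into the object, which is exactly the gap discussed next.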

This is something that CEP engines are good at, whether it be Coral8, Aleri, or Esper.

Now, let's enhance this a little bit.

Let's say we give the traders a .NET GUI application, and on this GUI is a "Status" button. The traders can press this button any time they want to know how many shares have been traded so far that day. So, at 2:00, a trader pushes a button on the GUI and we need to return to him the number of orders seen so far that day, the number of shares seen, the notional value of all orders, etc.

So, there are two questions:

1) How can we "dump out" these accumulators on demand? In other words, is there a way to tell these CEP engines to give me the contents of an aggregation stream AS OF THIS MOMENT?

2) How can we "call into" our CEP engine to retrieve these values? Do the CEP engines support an API that I can use from within the GUI to say "Give me the current value of a certain variable in my module"? Something like

IntegerFieldValue field = Coral8Service.GetObject("ccl://localhost:6789/Default/SectorFlowAnalyzer", "sum(Shares)") as IntegerFieldValue;
int shares = field.Value;

In a standard C# application, this would be as simple as putting a getter on a variable and calling it. If I were using Web Services, I could call into a Web Service and ask for the values of some variables or for some sort of object. But, from a C# app, how can I get the current value of a stream that is aggregating totals?
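For comparison, this is roughly what the "getter on a variable" approach looks like in a procedural program. The sketch is in Python rather than C#, and every name in it (`TradeStats`, `Status`, `status()`) is hypothetical. A lock is included on the assumption that trades arrive on one thread while the GUI asks for the status on another.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class Status:
    """Immutable snapshot handed back to the caller (hypothetical shape)."""
    orders: int
    shares: int
    notional: float


class TradeStats:
    def __init__(self):
        self._lock = threading.Lock()
        self._orders = 0
        self._shares = 0
        self._notional = 0.0

    def on_trade(self, shares, price):
        # Called by the feed handler for every order seen.
        with self._lock:
            self._orders += 1
            self._shares += shares
            self._notional += shares * price

    def status(self):
        # The "getter": returns a consistent view of all three accumulators.
        with self._lock:
            return Status(self._orders, self._shares, self._notional)


stats = TradeStats()
stats.on_trade(100, 10.0)
stats.on_trade(50, 20.0)
snapshot = stats.status()   # what the "Status" button would display
```

Returning a frozen snapshot rather than the live fields means the GUI never sees a share count from one trade paired with a notional value from the next.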

Another way of accumulating the total number of shares in a CEP engine is to step into the procedural world, and just define a variable. In Coral8, it would be something like this:


ON TradeInputStream
SET TotalShares = TotalShares + TradeInputStream.shares;

Then, we would need a "pulse" to fire at 4:00 PM every day, and upon this pulse firing, we could send the TotalShares to another stream.

I am sure that there are patterns in every CEP engine for accessing intermediate results, but something that is a no-brainer in a procedural language may not be so easy in a CEP vendor's variant of SQL.

©2007 Marc Adler - All Rights Reserved


Hans said...

I agree that you have to change the way you think with streaming SQL. It's not like working in a regular language. Honestly, even though I've used a streaming SQL language a lot, I can't decide which way is easier.

For your problem, I suggest:

Have the sum output intermediate values on every tuple that updates it (don't know exactly how to express this in Coral8 language, but something like "output all every 1").

Select this output into a window that keeps only the last tuple.

When a GUI requests an update, join this request tuple with the window. This will produce one tuple of output (joining 1 tuple with 1 tuple) that contains the sum. Send this back to the GUI.

Another option is simply to output intermediate values every 30 seconds. The GUI subscribes to this feed and has to wait at most 30 seconds to get the latest value.

In either case, anything needing the summary at the end of the day will have to subscribe to a second stream which filters the intermediate values and only publishes one at the end of the day.
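The one-tuple window and request join described above can be modelled in a few lines of plain Python. This is only a sketch of the dataflow, not Coral8 or any other engine's syntax; `LastValueWindow`, `RunningSum`, and the dictionary-shaped tuples are assumptions made for illustration.

```python
class LastValueWindow:
    """Keeps only the most recent tuple, like a table with one row."""

    def __init__(self):
        self._row = None

    def insert(self, row):
        self._row = row   # the new tuple replaces the old one

    def join(self, request):
        # Joining one request tuple with at most one stored tuple
        # yields at most one output tuple.
        if self._row is None:
            return []
        return [{**request, **self._row}]


class RunningSum:
    """Emits an updated total into the window on every trade."""

    def __init__(self, window):
        self.total = 0
        self.window = window

    def on_trade(self, shares):
        self.total += shares
        self.window.insert({"total_shares": self.total})


window = LastValueWindow()
aggregate = RunningSum(window)
aggregate.on_trade(100)
aggregate.on_trade(250)

# A GUI request arrives as a tuple and is joined against the window.
response = window.join({"request_id": 7})
```

The aggregate still updates on every trade, but nothing leaves the window until a request tuple arrives, so the response carries the total as of that moment.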

Hans said...

From my previous comment: with the window idea, I think that rather than joining in a request message to that window and sending back a response, maybe a client can just connect and select the one value that's in the window. Sort of like a regular database.

marc said...


Having the user wait 30 seconds, or even 10 seconds, is not acceptable to traders who are used to instantaneous response.

I don't want the CEP engine to publish into my GUI every time it receives a new tuple. I want a simple request/response interface into the CEP engine that will let me query the state of any stream or any variable. I want to have "pull" mechanisms as well as "push" mechanisms.

The question comes down to this: has the CEP vendor exposed a fine-grained API that gives the developer total control of the CEP engine, so that values can be pushed into, and pulled out of, streams and variables?

Hans said...

I suppose that it boils down to this:

You can create a window that contains a single tuple with the very most current value of the volume. This is like having a table with one row and doesn't mean publishing a message every time a new execution report comes in.

If Coral8 allows the client to query the value of this window directly, then you have what you want. Every time they want an update, select from this window and use the one row to display the volume.

If not, it will be annoying.

AXW said...

In SB you can do the materialized window to store the last value. Then "query" (join) it to a request tuple and grab your results on the way out.

However, you're not going to get request/response in any 'pure' stream solution.

As a practical solution, you will most likely have to store your current values outside the stream.

--aaron (formerly of SB)

Hans said...

Hi Aaron, didn't have a very long career with SB, huh?

This was the solution I was suggesting but I thought that maybe a product would allow one to query into the materialized window directly, rather than using the request-response tuples.

Jon said...

I'm not sure if other CEP platforms have this, but the Aleri Streaming Platform does have a SQL interface that one can use to pull data from streams. The SQL interface can be accessed by JDBC or ODBC drivers that ship with the product, or a native interface that is the same as the Postgres native interface. In fact, the SQL interface is an integral part of one of Aleri's vertical applications, Liquidity Management.

bernhardttom said...

Esper features a pull API that can be called upon to retrieve current state.