Enterprise Streaming
by Amir Shevat04/20/2005
The concept of input and output streams was first introduced in
Java 1.0. Not to be confused with the concept of multimedia
streaming,
Java streams serve as a standard way to write data to a
destination and to read data from a source. Components such as
files, sockets, and even the keyboard and the screen
(System.in and System.out) are common
examples of destinations and sources that can communicate using
input and output streams. In fact, some objects, such as sockets, can
be both a source and a destination at the same time.
The Java Message Service (JMS) is a standard way for enterprise applications to communicate with each other in a distributed environment. While this is a well-known and proven method, it is a complex and sometimes cumbersome message-driven framework that lacks some of the power that the simpler stream framework provides. MantaRay, an open source data messaging project (JMS provider) that is based on a peer-to-peer, serverless architecture, has solved this problem by combining the benefits of JMS and streaming.
This article will review the power hidden in the concept of streams and introduce the enterprise stream developed by the MantaRay open source project, a new type of stream whose destination and source are JMS topics and queues.
The Power of Input and Output Streams
Generalization
One of the most powerful features of streams is that whatever
the destination or the source may be, the communications API always
stays the same. Writing to a socket is the same as writing to a
file. A FileOutputStream object may give you
additional functionality that is unique to files, but the basic API
to read and write still stays the same. The result is that if you
do not use any additional functionality, you can easily replace the
source/destination without changing your code.
Figure 1 shows how streams are used.

Figure 1. Using streams
The stream API to every destination is the same and is quite
primitive. For example, the return value of the read()
method for the class InputStream is an int in
the range 0 to 255. For this reason, Java
provides a set of utilities that helps you write more complex data
to these streams. These utilities include helper classes like
writers and readers, as well as wrapper streams that wrap over the
original stream. These utilities will be discussed in the next
section.
Stream Wrapping and Changing
Another powerful feature is wrapping one stream over another.
If you write data to a file but want the data to be compressed, you
simply create a FileOutputStream object and wrap it
with a ZipOutputStream object. You then write to
ZipOutputStream, which compresses the data and
delegates it to FileOutputStream, which writes it to
the physical file. Similarly, you can read from that .zip file using the
FileInputStream and ZipInputStream
objects.
In fact, you can chain multiple streams together. For example,
you can wrap a SocketOutputStream object with a
CipherOutputStream object in order to encrypt the data,
and a ZipInputStream object in order to compress the
data. As the data passes from stream to stream, each object
performs its own operation on it.
Figure 2 shows an example of wrapping streams.

Figure 2. Wrapping streams
MantaRay's Enterprise Streams
Why Are Enterprise Streams Needed?
While most streams work with physical IO components, MantaRay enterprise streams work with JMS queues and topics. JMS is a message-oriented standard for transferring data asynchronously over queues and topics, commonly used in the enterprise environment.
For example, let's take two applications that want to communicate over a queue. One application sends a message to a queue and the other receives the message. Here is the code involved in the process, implemented using JMS 1.02:
// sender code:
javax.jms.QueueConnectionFactory conFactory
= new ...// look up in JNDI or create an instance
javax.jms.QueueConnection con
= conFactory.createQueueConnection();
// creates a non transacted session with
// automatic acknowledge
javax.jms.QueueSession sendSession
= con.createQueueSession(false
,Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue sendQueue
= sendSession.createQueue (sQueue);
javax.jms.QueueSender sender
= sendSession.createSender(sendQueue);
javax.jms.TextMessage msg
= sendSession.createTextMessage();
msg.setText( "some text" );
sender.send( msg,
javax.jms.DeliveryMode.NON_PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,
MESSAGE_TTL);
// receiver code:
javax.jms.QueueConnectionFactory conFactory
= new ... // look up in JNDI or create an instance
javax.jms.QueueConnection con
= conFactory.createQueueConnection();
// creates a non transacted session with
// automatic acknowledge
javax.jms.QueueSession receiveSession
= con.createQueueSession(false
,Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue receiveQueue
= receiveSession.createQueue (rQueue);
javax.jms.QueueReceiver qReceiver
= receiveSession.createReceiver(receiveQueue);
javax.jms.TextMessageMessage
=(TextMessageMessage) qReceiver.receive();
As you can see, not only is this code a bit complex, it is also message-oriented and not stream-oriented. When a user writes data into an enterprise stream, the stream cuts the data into packets and wraps them in a JMS message envelope. It then sends the message on the predefined queue or topic, where it is processed by the input stream, which unwraps the data and makes it available for reading by the user at its destination.
Figure 3 shows how data is processed in MantaRay enterprise stream.

Figure 3. The process of data in MantaRay enterprise
stream
Because enterprise streams extend the InputStream
and OutputStream objects, in the same way a socket or
file Input/OutputStream object does, you can use them to utilize
all of the powers of streaming:
- They are easy to use because they provide the same interface all streams share.
- You can wrap them with other streams to archive extended functionality such as compression and encryption.
- You do not have to worry about breaking the data into packets, buffer allocation, or any other low-level issue regarding data delivery.
Working with Enterprise Streams
JMS queues are all about point-to-point communications. When using an enterprise stream over a queue, there should be only one output stream and only one input stream on the same queue. Although most JMS providers, including MantaRay, enable multiple producers and consumers on the same queue, this is not defined in the JMS specification, and is thus considered misuse when working with enterprise streams over a queue. The reason for this is that a message on a specific queue is delivered to one and only one consumer; therefore, multiple consumers may "steal" vital data from one another.
JMS topics define many-to-many communication. With enterprise streams that use a topic as the destination or source, there can be multiple subscribes but only one publisher on the same topic. The reason for this is that otherwise, multiple publishers on the same topic will get their data scrambled and create senseless output for the subscribers.
However, the fact that enterprise streams over topics can have multiple subscribers enabling one-to-many communication provides a broadcast-like ability. The user of the output stream does not need to manage a separate output stream for each peer. Instead, the user simply writes the data to one output stream and the data packets are received by all of the topic's subscribers.
Enterprise streams bind to a topic or queue by using the connect
method. Only after the stream is connected can you write to it or
read from it. When streaming over a queue, the data produced by the
output stream is stored in the queue until it is consumed by the
input stream. Because of this ability to store data, the order of
connection is not important. The output stream can connect and send
data, and the input stream can then connect at any time to receive
all of the data from the beginning of the stream. This is not the case
when using topics as an input stream, as topics will only receive
data produced after they have connected to the stream. While this
may not be a problem in some cases (for example, with a continuous
CPU usage report), it may be important in others cases (for
example, in the case of file transfers).
MantaRay Enterprise Streaming Example
Graph Feeder-Viewer
Consider a graph feeder component that generates continuous data--anything from memory usage, to stock prices, to factory output, to the number of Martians on earth at the moment. This data needs to be displayed in several different locations in the form of a dynamically changing graph called the graph viewer.
Because the data is continuous and the communication is of the one-to-many type, this task can be easily solved by using MantaRay enterprise streams.
Figure 4 shows the graph view component.

Figure 4. Graph view component
Below is a short code example of the graph feeder component:
import org.mr.api.blocks.MantaOutputStream;
...
/**
* creates random data and sends it to the graph viewer
*/
public class GraphFeeder {
public static void main(String[] args)
throws Exception {
// create an enterprise output stream with
// a 4 bytes packet hint, the hint helps
// the stream cut the info into packets
MantaOutputStream out =
new MantaOutputStream(4);
// connect the enterprise output stream to
// a topic called graph
out.connect("graph",
MantaOutputStream.TOPIC);
// wrap the data in a DataOutputStream
// so we can easily write integers
DataOutputStream dos =
new DataOutputStream(out);
int currentStatus = 0;
for(int rounds =0 ;rounds < 30000
;rounds++ ){
// generate random fluctuations
// in the graph
int rand =(int)
(System.currentTimeMillis()%777);
if(rand%2 ==0){
currentStatus++;
}else{
currentStatus--;
}
// write data to the stream
dos.writeInt(currentStatus);
// sleep for a while before generating
// more random data
Thread.sleep(rand/3);
}
}
}
Below is a short code example of the graph viewer component:
import org.mr.api.blocks.MantaInputStream;
...
/**
* Shows the data in from
* the input stream in a graphical way
*/
public class GraphViewer {
// runs the program
public static void main(String[] args)
throws IOException {
// init the program
GraphViewer view = new GraphViewer();
view.init();
}
// the program logic
public void init() throws IOException{
// init graph view
InitGraph();
// create an enterprise input stream
MantaInputStream in =
new MantaInputStream();
// connect the enterprise output stream to
// a topic called graph
in.connect("graph",
MantaOutputStream.TOPIC);
//wrap the data in a DataOutputStream so
// we can easily read integers
DataInputStream dis =
new DataInputStream(in);
int input =0;
boolean go = true;
while(go){
try{
//read data
input=dis.readInt();
}catch(IOException e){
e.printStackTrace();
go =false;
}
// update the graph
updateGraph(input);
}
}
}
This example shows the ability to wrap one stream around another. As you can see in this example, a
DataInputStream and a DataOutputStream were
used to write the data in the form of an integer. The full code for
the graph viewer example can be found in the sample folder of
MantaRay's latest release. This article simplified the code example
to make it more suitable for an article format.
JMS provides enhanced features, such as selectors, that may be
useful for some tasks and are not provided by the enterprise
stream. In addition, message-oriented tasks may be done more simply
using the JMS API. This example shows how a simple stream-oriented
task such as this one can benefit from the simple stream-oriented
API of MantaRay's enterprise InputStream and
OutputStream.
Conclusion
Combining the power of JMS and streaming can be very powerful. Applications can use all of the powerful capabilities of streams while still using the J2EE framework as a communications infrastructure. Streams are also very useful for users who do not want to work with JMS objects, as they can sometimes be complex and cumbersome.
Enterprise streams are part of MantaRay's set of "building blocks"--simple and intuitive utilities that simplify the process of writing a distributed application and extend the capabilities MantaRay provides for distributed application communication.
Resources
- MantaRay project page on SourceForge
- MantaRay home page
- JMS
- Java Streams
- InputStream API
- OutputStream API
Amir Shevat is a senior software developer with eight years of experience in computing.
|
Related Reading Java Message Service |
Return to ONJava.com.
