|
Related articles: The JXTA Position -- an introduction JXTA Takes Its Position -- an analysis of the framework, by Rael Dornfest The JuxtaNet -- a look at how JXTA may enable a network as popular as Gnutella, by Kelly Truelove Learning the JXTA Shell -- a tutorial by Rael Dornfest |
|
|
Want to build the next great peer-to-peer application? Sun
Microsystem's juxtapose group has anticipated your
move. Foreseeing waves of companies and developers building
peer-to-peer applications, Sun has released its open source platform
juxtapose (JXTA) to make building distributed processes easier.
JXTA isn't a library of code that can be used to enable P2P applications; rather, it's a set of protocols that can be implemented in any language that will allow distributed client interoperability. It provides a platform to perform the most basic functionality required by any P2P application: peer discovery and peer communication.
As part of its open source release, Sun is distributing a preliminary Java binding for JXTA with the goal of having early-adopter engineers create simple P2P applications in Java. Sun's binding isn't complete, however; interfaces, implementations, and protocols are bound to change. Sun welcomes others to suggest changes as part of releasing JXTA as a open source project. So there's no excuse to avoid writing a 'Hello JXTA World' application in order to become familiar with JXTA.
At its core JXTA is simply a protocol for inter-peer communication. Each peer is assigned a unique identifier (peer ID). Each peer belongs to one or more peer groups in which the peers cooperate and function similarly and under a unified set of capabilities and restrictions. JXTA provides protocols for the basic functions -- create groups, find groups, join and leave groups, monitor groups, talk to other groups and peers, share content and services -- all of which are performed by publishing and exchanging XML advertisements and messages between peers.
Conceptually, each peer in JXTA abstracts three layers: the core layer, the services layer, and the applications layer. The core layer is responsible for managing the JXTA protocol; it should encapsulate the knowledge of all basic P2P operations. The core creates an addressing space separate from IP addresses by providing each peer its own unique peer ID, while also boot-strapping each peer into a peer group. Through protocols which the core knows how to implement, a JXTA peer can locate other peers and other peer groups to join (in a secure, authenticated manner, if so desired). The core layer can also open a pipe, a very simple one-way message queue, to another peer or group of peers. By using pipes, distinct parties can communicate with each other. The details of the core itself aren't of much concern to application developers; core development is like application server development, both of which functionality upon which application writers can rely.
The services layer serves as a place for common functionality which more than one, but not necessarily every, P2P program might use. For example, Sun has released a Content Management System (CMS) service that's implemented as a JXTA service. CMS provides a generic way for different applications to share files on JXTA peers so that decentralized Gnutella-like searchs may be performed against all of them. Once specific content has been located, the CMS provides ways for peers to download content. So the services layer provides library-like functionality which JXTA applications can control via logic located in application layer.
The third layer is where the P2P application truly lives. The upper layer might host a UI, so that the user can control different services, or it might be where the logic of an autonomous application operates. For example, a simple chat program can be built on this layer, making use of both the service and the core layer to allow people to send messages back and forth to each other. P2P applications should be fairly easy to build once a developer is familiar with JXTA, as the platform provides the basic peer-to-peer framework.
Sun's preliminary Java binding offers a set of Java classes that
implement the JXTA protocol and hooks into itself via a default set of
objects and services. The code contains two main packages,
net.jxta and net.jxta.impl. The former
encompasses all the JXTA-Java binding nterfaces, while the latter
contains the implementations of those interfaces. The mechanism to
discover other peers and peer groups is abstracted into the
net.jxta.discovery.Discovery class (the implementation of
which is jxta.discovery.impl.service.DiscoveryService);
the factory to create pipes is located in the
net.jxta.pipe.Pipe class (the implementation of which is
net.jxta.impl.pipe.PipeService), and so on. These
services handle the core protocols and are available via method calls
in the net.jxta.peergroup.PeerGroup object.
In order to publish advertisements for peers, peer groups, pipes,
and so on, you must first create the advertisement with the
net.jxta.document.AdvertisementFactory factory. Then all
the bean-type methods can be set before the advertisement is published
in the Discovery service. Along with publishing advertisements, the
Discovery service can be used to discover advertisements located in
other hosts. Calling the getRemoteAdvertisements method
sends a query either to a specific peer or to the peer group as a
whole, and the results will be added to the cache of advertisements
maintained by the local binding. Calling the
getLocalAdvertisements method will query the local cache
for the advertisements.
All JXTA applications and services follow a simple life cycle. The
init method is the first to be called; it is handed an advertisement
(net.jxta.document.Advertisement) and information about
the peer group it has booted into
(net.jxta.peergroup.PeerGroup). The advertisement is
dependent on what is booting within the platform: if initializing a
service, then a service advertisement
(net.jxta.protocol.ServiceAdvertisement) is passed to it.
When booting an application, the group's peer group advertisement
(net.jxta.protocol.PeerGroupAdvertisement) is handed to
it. Once initialization has finished, the startApp
method is invoked, which is the signal to begin. At
startApp, most applications will spawn a thread in which
to run application code. A stopApp method must also be
implemented to give the platform a way to end the program cleanly. (In
future Java bindings, the program may not be allowed to access the
platform after a given timeout if it cannot handle the
stopApp method properly.)
The binding requires a little more of services. Services need to
expose to applications and other services their methods, which is
accomplished by way of a Java interface returned by the
getInterface method. This interface contains methods
that can be called upon the service; or listeners can be registered
for asynchronous events. The interface must extend
net.jxta.service.Service. In practice, a proxy that
implements a shared interface with the service is preferred. Using a
proxy allows exposure only of the methods in the interface, hiding the
other methods in the service. A service also needs to be able to
return its advertisement for publication. In practice, this
advertisement is initially passed to the service as part of its
init method.
|
Let's start with building something simple, a service that will run in the Java binding, open a listening input pipe, and wait until a message has been received from the pipe. The service will also provide a single method that can be invoked to find a random peer's hello service input pipe and then send it a single message.
First we have to define the interface that the Service is going to
export to any application running within the platform. We're going to
build a very simple one, only providing a single method that finds a
peer running HelloService, opens a pipe to it, and sends
it a small message.
public interface HelloService
extends net.jxta.service.Service {
// find another peer to send a message to, then send
// it a message
public void sendMessage();
}
Next we need to provide an implementation of the service for the platform to run. Initialization of this service should be dealt with first.
public class HelloServiceImpl
extends Object
implements HelloService {
// a handle to the discovery service
private Discovery discovery;
// a handle to the pipe management service
private Pipe pipes;
// the peer group that this service is being
// booted into
private PeerGroup peerGroup;
// the advertisement for the peer group
private PeerGroupAdvertisement peerGroupAdvertisement;
// the advertisement for this service
private ServiceAdvertisement serviceAdvertisement;
// the advertisement
private PipeAdvertisement helloPipeAdvertisement;
// initialize this service
public void init( PeerGroup group, Advertisement advertisement )
throws PeerGroupException {
// save away all the variables given to us
this.peerGroup = group;
this.peerGroupAdvertisement = group.getAdvertisement();
this.serviceAdvertisement = (ServiceAdvertisement)advertisement;
// get handles to the services that we are going to need later
this.discovery = this.peerGroup.getDiscovery();
this.pipes = this.peerGroup.getPipe();
// start the application running
startApp( null );
}
// provide the ability to get a handle to this
// service. in more robust services, a proxy object
// should be passed out to protect the service from
// malicious applications
public Service getInterface() {
return this;
}
// simply return the service advertisement that
// has been passed to us
public Advertisement getAdvertisement() {
return this.serviceAdvertisement;
}
Next we provide a method to start the service. This service
creates a pipe to listen for incoming messages. When it gets a
message, it writes to System.out who sent the message.
Before we open the pipe, however, we need to publish the pipe
advertisement into the peer group so that other peers may find it.
// a simple helper function that will create a pipe advertisement
// for the input pipe that we are going to open up
private PipeAdvertisement createPipeAdvertisement() {
try {
// create the pipe advertisement object
PipeAdvertisement pipeAdvertisement = (PipeAdvertisement)AdvertisementFactory.newAdvertisement( PipeAdvertisement.getAdvertisementType() );
pipeAdvertisement.setPipeID( new PipeID( this.peerGroup.getID() ) );
// create a simple name for the pipe that we can do
// an easy lookup on when we are searching later on
pipeAdvertisement.setName( "HelloService:" + this.peerGroup.getPeerID() );
return pipeAdvertisement;
} catch( InvocationTargetException error ) {
System.err.println( "problem creating the pipe advertisement" );
return null;
}
}
// the input pipe that this service is going to be
// listening on
private InputPipe helloPipe;
// the thread that the service is going to be
// running within
private Thread runningThread;
// start this service -- calling this method will spawn
// a thread that we can use to listen for messages coming
// down the pipe
public int startApp( String[] arguments ) {
// create the pipe that we are going to use to listen for
// messages
try {
// now we are going to publish the pipe that we are
// going to use both in our local cache and into the
// caches of the other peers in this peer group
this.helloPipeAdvertisement = createPipeAdvertisement();
if( this.helloPipeAdvertisement == null )
throw new PeerGroupException();
this.discovery.publish( this.helloPipeAdvertisement, Discovery.ADV );
this.discovery.remotePublish( this.helloPipeAdvertisement, Discovery.ADV );
// create the input pipe that we can use
this.helloPipe = this.pipes.createInputPipe( this.helloPipeAdvertisement );
} catch( IOException error ) {
System.err.println( "problem publishing the advertisement for the input pipie" );
} catch( Exception error ) {
System.err.println( "problem opening the pipe to read from" );
} finally {
if( this.helloPipe == null ) {
System.err.println( "do not have a pipe to read from" );
return 1;
}
}
// start a listening thread that constantly listens
// for messages on the input pipe. as a standard, we choose to
// format our messages with a "sender" symbolic name that
// contains the peer id that sent us the message. when
// a message comes down the pipe, we will just write
// something to System.out
this.runningThread = new Thread( new Runnable() {
public void run() {
try {
// block until we receive a message
Message message = null;
while( ( message = HelloServiceImpl.this.helloPipe.waitForMessage() ) != null ) {
// see if this thread has been
// interrupted
if( Thread.isInterrupted() )
break;
// read the sender information
InputStream senderInputStream = message.pop( "sender" );
byte[] senderBytes = new byte[senderInputStream.available()];
senderInputStream.read( senderBytes );
String sender = new String( senderBytes );
// print out the information from this message
System.out.println( "\"" + sender + "\" says hello" );
}
} catch( IOException error ) {
System.err.println( "problem reading from the input pipe" );
} finally {
HelloServiceImpl.this.helloPipe.close();
}
}
});
// start running the listening thread
this.runningThread.start();
return 0;
}
// the corresponding method to startApp -- this
// interrupts the currently running thread
public void stopApp() {
if( this.runningThread != null )
this.runningThread.interrupt();
}
|
Now that the "server" end of the P2P service has been written, it's
time to write the "client" end. The HelloService
interface provides one method, sendMessage, that will
open an output pipe to a random peer's input pipe and send it a
message.
// the method that the application may call to send
// a message to another random peer running the HelloService
public void sendMessage() {
// send out a query to all peers in our peer group
// looking for advertisements of pipes that are
// prefixed with "HelloService:" as the pipes
// for the HelloService are all named like that
this.discovery.getRemoteAdvertisements( null, Discovery.ADV, "name", "HelloService:*", 10 );
// wait 5 seconds for all the responses to come
// in and be incorporated into the platform's cache
try {
Thread.sleep( 5000 );
} catch( InterruptedException error ) {
}
try {
// retrieve a list of all the advertisements
// from our cache of responses that match the
// query
Enumeration pipeEnumeration = this.discovery.getLocalAdvertisements( Discovery.ADV, "name", "HelloService:*" );
if( pipeEnumeration != null ) {
// look for the first PipeAdvertisement
// out of all the advertisements that
// have been returned
PipeAdvertisement pipeAdvertisement = null;
while( pipeEnumeration.hasMoreElements() ) {
Advertisement advertisement = (Advertisement)pipeEnumeration.nextElement();
if( advertisement instanceof PipeAdvertisement ) {
pipeAdvertisement = (PipeAdvertisement)advertisement;
break;
}
}
if( pipeAdvertisement != null ) {
try {
// construct a non-blocking output pipe
// to the pipe that we found being
// advertised
OutputPipe helloPipe = this.pipes.createOutputPipe( pipeAdvertisement, Pipe.NonBlocking, -1 );
// construct a message with the sender
// name's as part of the object
Message helloMessage = this.pipes.createMessage();
helloMessage.push( "sender", new ByteArrayInputStream( this.peerGroup.getPeerID().toString().getBytes() ) );
// send the message down the pipe
helloPipe.send( helloMessage );
System.out.println( "message sent!" );
} catch( IOException error ) {
System.err.println( "problem sending the message to the peer" );
} finally {
helloPipe.close();
}
}
} else
System.err.println( "could not find a pipe to talk to" );
} catch( IOException error ) {
System.err.println( "problem retrieving advertisements from the local cache" );
}
}
}
A service is useless without an application to call it. The next
step is to create an application that will call the
HelloService.
public class HelloApp
extends Object
implements Application {
// a handle to the running hello service on this platform
private HelloService helloService;
// initialize the application
public void init( PeerGroup group, Advertisement advertisement )
throws PeerGroupException {
// get a handle to the hello service
try {
this.helloService = (HelloService)group.lookupService( "HelloService" );
} catch( ServiceNotFoundException error ) {
System.err.println( "could not find the hello service" );
throw new PeerGroupException();
}
}
// start the application -- all that is done here is to call
// the sendMessage method on the hello service
public int startApp( String[] arg ) {
// call the one method on the service
this.helloService.sendMessage();
return 0;
}
// does not actually need to do anything
public void stopApp() {
}
}
Running this code is harder than is ideal. The JXTA Java platform
is presently optimized to run the JXTA Shell. Launching the Shell
prepares the platform to run JXTA (it creates a
jxtaConfig file and a PlatformPeerGroup
advertisement directory in the current working directory), so you have
to launch the Shell once to allow it to configure the node. Then you
can terminate the shell and edit the jxtaConfig and the
PlatformPeerGroup files to make the platform do what we
want it to do.
The jxtaConfig properties file contains information
that's used to create the PlatformPeerGroup advertisement
file that the node uses to configure itself. There are two lines to
edit. First, change the value of the isRendezvous
property to "true". This peer will act as a rendezvous for its
specific peer group (a discussion of rendezvous and router peers is
beyond the scope of this article). Second, change the value of
InitialNetPeerGroupAppCode to the class name of the
HelloApp. If you look carefully, you will notice
InitialNetPeerGroupAppCodeURL which ideally would be set
to a URL from which the platform could download the application code.
This property will be ignored if the application classes are in the
classpath when the platform's invoked.
PlatformPeerGroup is the group advertisement (XML
document) for the group that the platform boots into. Modify the
value of the Name tag to HelloPeerGroup,
which will cause this platform to boot into its own custom peer group,
separate from the default network peer group. Then modify
isRendezvous to "true". Finally, we need to add a new
service advertisement to this peer group so that the platform will
boot the HelloService. Add the following advertisement
at the end of the Services element, which is the advertisement passed
to the HelloServiceImpl upon initialization:
<jxta:ServiceAdvertisement>
<Name>
HelloService
</Name>
<Version>
1.0
</Version>
<Keywords>
HelloService
</Keywords>
<PipeService>
</PipeService>
<Uri>
file://hello.jar
</Uri>
<Provider>
oreilly.com
</Provider>
<Security>
TBD
</Security>
<Code>
HelloServiceImpl
</Code>
</jxta:ServiceAdvertisement>
When the platform is restarted, it will boot into the
HelloPeerGroup while running the
HelloService. It will create an input pipe, create an
output pipe, connect the two together, and send a message. If you are
lucky (or are running two peers), then the application will connect to
another service and send it a message containing its peer ID.
Otherwise, the service will probably connect to its own pipe and
sending itself a message.
Sun's Java binding for JXTA is a long way from being completed. The code needs to mature before developers can use it to develop robust P2P applications. However, by building toy projects and learning the terminology along with the basic concepts, developers will be better poised to use it when it matures.
Raffi Krikorian makes a career of hacking everything and anything. Professionally, he is the founding partner at Synthesis Studios: a technological design and consulting firm that orchestrates his disjointed train of thought.
Return to ONJava.com.
Copyright © 2009 O'Reilly Media, Inc.