I have been working on a Scala client, which I forked from Alejandro Crosa's repository. I implemented pubsub very recently and also have integrated it with Akka actors. The full implementation of the pubsub client in Scala is in my github repository. And if you like to play around with the Akka actor based implementation, have a look at the Akka repository.
You define your publishers and subscribers as actors and exchange messages over channels. You can define your own callbacks as to what you would like to do when you receive a particular message. Let's have a look at a sample implementation at the client level .. I will assume that you want to implement your own pub/sub application on top of the Akka actor based pubsub substrate that uses the redis service underneath.
Implementing the publisher interface is easy .. here is how you can bootstrap your own publishing service ..
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Pub { | |
println("starting publishing service ..") | |
val p = new Publisher(new RedisClient("localhost", 6379)) | |
p.start | |
def publish(channel: String, message: String) = { | |
p ! Publish(channel, message) | |
} | |
} |
The
publish
method just sends a Publish
message to the Publisher
. Publisher
is an actor defined in Akka as follows:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Publisher(client: RedisClient) extends Actor { | |
def receive = { | |
case Publish(channel, message) => | |
client.publish(channel, message) | |
reply(true) | |
} | |
} |
The subscriber implementation is a bit more complex since you need to register your callback as to what you would like to do when you receive a specific type of message. This depends on your use case and it's your responsibility to provide a proper callback function downstream.
Here is a sample implementation for the subscriber. We need two methods to subscribe and unsubscribe from channels. Remember in Redis the subscriber cannot publish - hence our
Sub
cannot do a Pub
.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Sub { | |
println("starting subscription service ..") | |
val s = new Subscriber(new RedisClient("localhost", 6379)) | |
s.start | |
s ! Register(callback) | |
def sub(channels: String*) = { | |
s ! Subscribe(channels.toArray) | |
} | |
def unsub(channels: String*) = { | |
s ! Unsubscribe(channels.toArray) | |
} | |
def callback(pubsub: PubSubMessage) = pubsub match { | |
//.. | |
} | |
} |
I have not yet specified the implementation of the callback. How should it look like ?
The callback will be invoked when the subscriber receives a specific type of message. According to Redis specification, the types of messages which a subscriber can receive are:
a. subscribe
b. unsubscribe
c. message
Refer to the Redis documentation for details of these message formats. In our case, we model them as case classes as part of the core Redis client implementation ..
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sealed trait PubSubMessage | |
case class S(channel: String, noSubscribed: Int) extends PubSubMessage | |
case class U(channel: String, noSubscribed: Int) extends PubSubMessage | |
case class M(origChannel: String, message: String) extends PubSubMessage |
Our callback needs to take appropriate custom action on receipt of any of these types of messages. The following can be one such implementation .. It is customized for a specific application which treats various formats of messages and gives appropriate application dependent semantics ..
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def callback(pubsub: PubSubMessage) = pubsub match { | |
case S(channel, no) => println("subscribed to " + channel + " and count = " + no) | |
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no) | |
case M(channel, msg) => | |
msg match { | |
// exit will unsubscribe from all channels and stop subscription service | |
case "exit" => | |
println("unsubscribe all ..") | |
r.unsubscribe | |
// message "+x" will subscribe to channel x | |
case x if x startsWith "+" => | |
val s: Seq[Char] = x | |
s match { | |
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => } | |
} | |
// message "-x" will unsubscribe from channel x | |
case x if x startsWith "-" => | |
val s: Seq[Char] = x | |
s match { | |
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString) | |
} | |
// other message receive | |
case x => | |
println("received message on channel " + channel + " as : " + x) | |
} | |
} |
Note in the above implementation we specialize some of the messages to give additional semantics. e.g. if I receive a message as "+t", I will interpret it as subscribing to the channel "t". Similarly "exit" will unsubscribe me from all channels.
How to run this application ?
I will assume that you have the Akka master with you. Also you need to have a version of Redis running that implements pubsub. You can start the subscription service using the above implementation and then use any other Redis client to publish messages. Here's a sample recipe for a run ..
Prerequisite: Need Redis Server running (the version that supports pubsub)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1. Download redis from http://github.com/antirez/redis | |
2. build using "make" | |
3. Run server as ./redis-server | |
For running this sample application :-
Starting the Subscription service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1. Open a shell and set AKKA_HOME to the distribution root | |
2. cd $AKKA_HOME | |
3. sbt console | |
4. scala> import sample.pubsub._ | |
5. scala> Sub.sub("a", "b") // starts Subscription server & subscribes to channels "a" and "b" | |
Starting a Publishing service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1. Open up another shell similarly as the above and set AKKA_HOME | |
2. cd $AKKA_HOME | |
3. sbt console | |
4. scala> import sample.pubsub._ | |
5. scala> Pub.publish("a", "hello") // the first shell should get the message | |
6. scala> Pub.publish("c", "hi") // the first shell should NOT get this message | |
Another publishing client using redis-cli
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Open up a redis-client from where you installed redis and issue a publish command | |
./redis-cli publish a "hi there" ## the first shell should get the message |
Have fun with the message formats
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1. Go back to the first shell | |
2. Sub.unsub("a") // should unsubscribe the first shell from channel "a" | |
3. Study the callback function defined below. It supports many other message formats. | |
4. In the second shell window do the following: | |
scala> Pub.publish("b", "+c") // will subscribe the first window to channel "c" | |
scala> Pub.publish("b", "+d") // will subscribe the first window to channel "d" | |
scala> Pub.publish("b", "-c") // will unsubscribe the first window from channel "c" | |
scala> Pub.publish("b", "exit") // will unsubscribe the first window from all channels |
The full implementation of the above is there as a sample project in Akka master. And in case you are not using Akka, I also have a version of the above implemented using Scala actors in the scala-redis distribution.
Have fun!
3 comments:
Hello,
i'm interested in this approach as I'm using redis in a context where it could some day make use of this. How does it compare to the existing akka remote actors? I see you did a commit recently:
https://github.com/debasishg/akka-redis-pubsub/blob/master/src/main/scala/RedisPubSub.scala
I don't understand why you need to import akka.persistenc.redis ? (support of akka persistence has been dropped )
BTW, i'd consider to use your scala redis, but I need transactions. any chance?
regards,
Ingvar
Hello -
Akka remote actors are more powerful and offers a complete implementation of the actor model. Redis pubsub is lightweight and is only focused for that functionality only. If u need only pubsub, then u can give it a try.
Regarding the import, it's not required. I will rectify it whenever I get some time.
Regarding transaction support, r u talking about MULTI/EXEC ? It's supported in scala-redis. Have a look at https://github.com/debasishg/scala-redis/blob/master/src/main/scala/com/redis/RedisClient.scala#L62 and https://github.com/debasishg/scala-redis/blob/master/src/test/scala/com/redis/PipelineSpec.scala ..
Thanks for your interest in scala-redis.
Hi!
thanks for the answer. Really nice code. much shorter than jedis.
Understanding the code is a good scala exercise :-)
Is there any documentation I'm missing? some starting point? When I get some time to try it out, I might contrib with some simple tutorial on the github wiki if you like.
Another question: it seems you only have commands that accept strings as input, is that right? No way to directly use byte[] as input?
Btw, relying on String.getBytes and new String(somebytearray) is very slow! have a look at my converter https://github.com/ib84/castriba it would be suitable only for "payload" strings, not internals, but it's 3 times faster!
regards,
Ingvar
Post a Comment