SWE 622, Spring 2017, Homework 6, due 5/10/2017 4:00pm.

Introduction

Throughout the semester, you will build a sizable distributed system: a distributed in-memory filesystem. The goal of this project is to give you significant hands-on experience with building distributed systems, along with a tangible project that you can describe to future employers to demonstrate your (now considerable) distributed software engineering experience. We’ll do this in pieces, with a new assignment every two weeks that builds on the prior. You’re going to create an actual filesystem – so when you run CFS, it will mount a new filesystem on your computer (as if you are inserting a new hard disk), except that the contents of that filesystem are defined dynamically, by CFS (rather than being stored on a hard disk).

While we’re not focusing on making this a high-throughput system (for simplicity, we are implementing the filesystem driver in Java, which will inherently add some sizable performance penalty compared to implementing it in native code), you should be able to see how our system applies to the real world: often, we build services that need to share data. If the total amount of data that we need to store is large, then we’ll probably set up some service specifically devoted to storing that data (rather than storing it all on the same machines that are performing the computation). For performance, we’ll probably want to do some caching on our clients to ensure that we don’t have to keep fetching data from the storage layer. Once we think about this caching layer a little more, it becomes somewhat trickier in terms of managing consistency, and that’s where a lot of the fun in this project will come in. Here’s a high-level overview of the system that we are building:

Each computer that is interested in using the filesystem will run its own copy of the filesystem driver (we’ll call it the CloudFS, or CFS for short). We’ll use Dropbox’s web API to provide the permanent storage, and will implement a cache of our Dropbox contents in our cfs driver. A lock service will make sure that only a single client writes to a file at a time (HW2). Over the course of the semester, we’ll set up the cache to be replicated among multiple CFS clients (HW3), distribute the lock server to increase fault tolerance (HW4), investigate fault recovery (HW5), and use sharding to distribute the cache (HW6).

Acknowledgements

Our CloudFS is largely inspired by the yfs lab series from MIT’s 6.824 course, developed by Robert Morris, Frans Kaashoek and the 6.824 staff.

Homework 6 Overview

For the final version of CFS, you will be configuring CFS for sharding, so that the cached file data is split between multiple Redis servers. As shown in the figure below, we will set up two different groups of Redis servers, each with a master and replicas:

You’ll have two different shards (each equivalent to how HW5 was set up). Half of the data will be allocated to one shard, and the other half to the other shard. Within each shard, there will be a master and some number of slaves (when you start out, there might be 0 slaves, as with HW5).

Now, when your CFS client starts up, it will need to decide whether it joins shard 1 or shard 2. Each client will join whichever group currently has the fewest members. Once it decides which shard to join, it will configure its own Redis cache to be the master (if there isn’t one yet), or a replica of the current master of that shard. To read data, CFS will first identify which shard to access, then read from a slave of that shard (or the master, if there are no slaves). To write data, CFS will first identify which shard to access, and then write to the master of that shard. After writing, CFS will issue its usual WAIT command, noting that it now WAITs for only the number of replicas in that shard, rather than the total number of Redis servers in the entire system (that is, the total across the two shards).

In terms of fault tolerance, we will look at an interesting twist: re-balancing the shards. Consider the case where shard 1 has two Redis members (one is the master, one is the standby replica), and shard 2 has only a single Redis member (the master). If the master in shard 2 goes offline, then that shard will have no leader. In this case, we would want the replica from shard 1 to switch to shard 2 and become its master. You will not need to worry about promotion from slave to master within a shard (that was effectively what we did in HW5), and you do not need to worry about proactive health checks with PINGs to Redis (that was also part of HW5). More details on this are below.

This assignment totals 100 points, and will correspond to 10% of your final grade for this course.

Getting Started

Start out by importing the HW6 GitHub repository. Then, clone your private repository on your machine, and import it into your IDE. Copy your CFS client from HW5 into your HW6 project. You may choose to start your HW6 CloudProvider from scratch, or build directly upon what you had in HW5. HW6 will not require any of the failure detection code.

Academic honesty reminder: You may NOT share any of your code with anyone else. You may NOT post your code in a publicly viewable place (e.g. in a public GitHub repository).  We are providing you with your own private git repository (using the link above) to host your code securely. You may face severe penalties for sharing your code, even “unintentionally.” Please review the course’s academic honesty policy.

You’ll notice there are now several projects in the repository:

  • fusedriver – Your CFS driver, just like in HW5
  • lock-server – The Lock server code, just like in HW5. You should not change anything in this directory. The semantics for the lock server will be implemented by each client, on top of ZooKeeper. The lock server exists to create a cluster of ZooKeeper servers for your CFS clients to connect to.

You’ll also find a Vagrantfile in the repository, similar to the configuration used for HW5. It will not run the default Redis server.

VM warning: All of the VMs I’m distributing forward several ports from your local machine into the VM for ease of use/debugging (for instance, 5005 is used for remotely debugging stuck Maven test processes). This means that you can NOT run multiple VMs from this class at the same time: if you still have your HW5 (or Lab 6 or other) VM running when you try to start this one, you’ll get an error. To resolve this, run ‘vagrant suspend’ in the other VM’s directory, and then you’ll be able to start this one.

At a high level, your task is to modify your edu.gmu.swe622.cloud.RedisCacheProvider so that: (1) it uses a custom GroupMember implementation to decide which shard to join (and then joins it), (2) it uses a LeaderLatch to determine which Redis server will be the master of the shard (and configures itself accordingly), (3) it directs reads and writes to the appropriate shard and master/replica, and (4) it re-balances Redis servers between shards. Note that for this assignment, you will be graded on all error handling aspects of your ZooKeeper interactions (that is, prepare for the possibility that a ZooKeeper operation might not succeed).

Additional details are provided below.

Tips

You should copy your config.properties over from HW4.

You can run the lock server in the VM by running:

And you can run the CFS client:

The first time you run it, you’ll need to re-authorize it for Dropbox (the config files that authorized it before were local to the HW1,2,3,4 VM).

Debugging remote JVMs

You can launch a Java process and have it wait for your Eclipse debugger to connect to it by adding this to the command line:
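
For reference, the standard JVM remote-debugging flag looks like this (the port shown here is an assumption; use whichever port your VM forwards, per the VM warning above):

    -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

With suspend=y, the JVM will not start running your code until a debugger attaches.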

After you make this change, disconnect from the VM if you are connected, then do “vagrant reload” before “vagrant ssh” to apply your changes.

Then, in Eclipse, select Debug… Debug configurations, then “Remote Java Application”, then “create new debug configuration.” Select the appropriate project, then “Connection Type” => “Standard (socket attach)”, “Host” => “localhost” and “port” => 5005 (if you are NOT using Vagrant) or 5006 (if you ARE using Vagrant).

Testing

A good way to test this assignment will be to have multiple instances of CFS running, each mounting your filesystem to a different place. Then, you can observe if writes appear correctly across the clients.

Dealing with ZooKeeper

ZooKeeper is set up to not persist any of its state to disk. That is, if you kill the ZooKeeper server and then restart it, it will be empty. This is done to make it easier for you to test: if you accidentally take out locks on everything and then your client crashes, it’s annoying to clean everything up. This way, if you ever end up in an inconsistent state, you can always stop the ZooKeeper server and restart it (bringing it back to a clean state). Of course, this means that ZooKeeper is not very fault tolerant – but this is just a configuration option that’s easily changed if we wanted to make it so.

Remember to make sure that you close your ZooKeeper session before your CFS client terminates! This will allow for automatic cleanup of things like locks and group membership. A good way to do it is by adding this code early on, for instance in your main function:
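
A minimal sketch of such a hook, assuming your Curator/ZooKeeper client is stored in a variable named curator (adapt it to however your client is structured):

    // Sketch: register a JVM shutdown hook that closes the Curator client, so
    // that ephemeral nodes (locks, group membership) are cleaned up promptly.
    // CloseableUtils is org.apache.curator.utils.CloseableUtils.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        CloseableUtils.closeQuietly(curator);
    }));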

 

Part 1: Shard Membership (30 points)

When starting up, each CFS client will determine which shard its Redis server will belong to: shard 1 or shard 2. It should join the smaller shard group: the one with the fewest current members. A simple solution would be to have two groups, one for each shard, and have each client join whichever group currently has fewer members.

The problem with this, unfortunately, is that it would be prone to data races. Consider the following interleaving:

Client 1: Read number of clients in shard 1 (1), shard 2 (0)
Client 2: Read number of clients in shard 1 (1), shard 2 (0)
Client 3: Read number of clients in shard 1 (1), shard 2 (0)
Client 1: Join shard 2 (now has 1)
Client 2: Join shard 2 (now has 2)
Client 3: Join shard 2 (now has 3)
Result: shard 1 has 1 client, shard 2 has 3 clients – and it is not balanced.

Hence, you’ll need to make a new lock in Curator/ZooKeeper to control this process, so that we can ensure that when a node is reading the group contents, it does not race against other nodes joining those groups.

So, when a node comes online, it will:

  1. Acquire a lock, say, /lock/shardMembership
  2. Find the number of members in shard 1
  3. Find the number of members in shard 2
  4. Become a  GroupMember of /groups/shardN (where N is the correct group)
  5. Release the lock

There are a few tricky bits here though, and we cannot use Curator out-of-the-box for tracking the group membership here. It is very important that when we assign nodes to a shard, we do it evenly. Otherwise, if we started 100 nodes at once, it is possible that we might end up with a very disproportionate number (perhaps all) of the nodes flocking to one shard. Why?

Curator’s GroupMember starts asynchronously – when you call start, it goes off and joins the group, but returns immediately (perhaps before the group has been joined). Hence, it is possible, even when using the lock, that clients will see the “wrong” number of members in the group, because even though a node intended to join the group, it didn’t actually complete its join operation before releasing the lock. Similarly, Curator’s GroupMember caches the membership of a group, and hence getMembers() can return stale values.

Instead, create your own implementation of GroupMember, in edu.gmu.swe622.cloud.BlockingGroupMember (similar to lab 10), which will behave as follows:

  • Creating a new BlockingGroupMember() will set up some resources, but not join the group
  • BlockingGroupMember.getMembers() will return the current list of members in the group. It will retrieve this value directly from ZooKeeper (without caching it), and will work regardless of whether that BlockingGroupMember has been started yet.
  • BlockingGroupMember.startAndBlockUntilJoined() will join the group, and will wait for ZooKeeper to acknowledge the join before returning.

Rather than re-invent the entire wheel, you should use Curator’s org.apache.curator.framework.recipes.nodes.PersistentNode to represent your client’s presence in a group (using CreateMode.EPHEMERAL when creating the node, which will ensure that the node doesn’t outlive the client).
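
To make this concrete, here is a rough sketch of what such a class could look like. This is only a sketch under the assumptions stated in the comments; the constructor arguments, helper names, and timeout are illustrative, not required:

    // Sketch of BlockingGroupMember. It assumes members are represented as
    // ephemeral child znodes of the group path (e.g. /groups/shard1/<memberId>);
    // the constructor arguments and the 30-second timeout are illustrative
    // choices, not requirements.
    package edu.gmu.swe622.cloud;

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.nodes.PersistentNode;
    import org.apache.zookeeper.CreateMode;

    public class BlockingGroupMember {
        private final CuratorFramework curator;
        private final String groupPath;    // e.g. "/groups/shard1"
        private final PersistentNode node; // this client's presence in the group

        public BlockingGroupMember(CuratorFramework curator, String groupPath,
                                   String memberId, byte[] payload) {
            this.curator = curator;
            this.groupPath = groupPath;
            // EPHEMERAL: the znode disappears if this client's session dies.
            this.node = new PersistentNode(curator, CreateMode.EPHEMERAL, false,
                                           groupPath + "/" + memberId, payload);
        }

        /** Reads the membership directly from ZooKeeper (no caching); works
         *  whether or not this member has been started yet. */
        public List<String> getMembers() throws Exception {
            if (curator.checkExists().forPath(groupPath) == null) {
                return Collections.emptyList();
            }
            return curator.getChildren().forPath(groupPath);
        }

        /** Joins the group, blocking until ZooKeeper acknowledges the join. */
        public void startAndBlockUntilJoined() throws Exception {
            node.start();
            if (!node.waitForInitialCreate(30, TimeUnit.SECONDS)) {
                throw new Exception("Timed out joining group " + groupPath);
            }
        }

        /** Leaves the group by deleting this client's ephemeral znode. */
        public void leave() throws Exception {
            node.close();
        }
    }

Used together with the membership lock, the five-step join above might then look roughly like this (myId and payload are placeholders for whatever identifier and data your client publishes):

    // Sketch of the locked shard-join (steps 1-5 above). InterProcessMutex is
    // org.apache.curator.framework.recipes.locks.InterProcessMutex.
    InterProcessMutex membershipLock =
            new InterProcessMutex(curator, "/lock/shardMembership");
    membershipLock.acquire();
    try {
        BlockingGroupMember shard1 =
                new BlockingGroupMember(curator, "/groups/shard1", myId, payload);
        BlockingGroupMember shard2 =
                new BlockingGroupMember(curator, "/groups/shard2", myId, payload);
        BlockingGroupMember target =
                shard1.getMembers().size() <= shard2.getMembers().size() ? shard1 : shard2;
        target.startAndBlockUntilJoined(); // returns only after ZooKeeper confirms the join
    } finally {
        membershipLock.release();
    }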

Part 2: Shard Leadership (10 points)

Each shard will have a single Redis master, and a variable number of slaves. In this regard, each shard will function just like in HW5, with a leader latch being used to elect a single leader.

Technically, this will involve:

  1. Each CFS client will create a LeaderLatch at the same path (say, /leader/redisShardN). When creating the latch, each CFS client will specify the hostname and port for its local Redis server.
  2. After starting the latch, the leader can be determined by calling getLeader().
  3. If your CFS client is the leader, then it should configure its Redis server as a master. If it is not, then it should configure itself as a slave of the leader.

Make sure to handle various error conditions, which might include:

  • Inability to create the latch
  • Inability to get the current leader
  • Disconnection or other failure causing you to no longer be the leader

In these cases, it is probably wise to throw an IOException, which will then get propagated back to the process trying to use your filesystem.
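
A rough sketch of how these pieces could fit together (names like myRedisHost, myRedisPort, and myShard are assumptions about how your client tracks its own configuration, and the error handling is deliberately coarse):

    // Sketch of shard leadership (Part 2). LeaderLatch and Participant come from
    // org.apache.curator.framework.recipes.leader; Jedis comes from redis.clients.jedis.
    String myId = myRedisHost + ":" + myRedisPort;
    LeaderLatch latch = new LeaderLatch(curator, "/leader/redisShard" + myShard, myId);
    try {
        latch.start();
        Participant leader = latch.getLeader(); // who currently holds the latch
        try (Jedis local = new Jedis(myRedisHost, myRedisPort)) {
            if (myId.equals(leader.getId())) {
                local.slaveofNoOne();           // we are this shard's master
            } else {
                String[] hostPort = leader.getId().split(":");
                local.slaveof(hostPort[0], Integer.parseInt(hostPort[1])); // replicate the master
            }
        }
    } catch (Exception e) {
        // Could not create/start the latch, or could not determine the leader:
        // surface the failure to whoever is using the filesystem.
        throw new IOException("Could not establish shard leadership", e);
    }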

Part 3: Directing Reads and Writes (40 points)

Each shard will be responsible for handling roughly half of the data being cached. So, now when we want to write data, we will direct that write to the master of the correct shard, and when we want to read data, we will read that data from the master of that shard (note: there is no need to read from a slave; you can always read and write via whichever server is the master). If there are no servers in that shard, then we will ignore the cache, and read/write directly from Dropbox. Hence, your RedisCacheProvider will need to keep track of up to two different Redis pools: a JedisPool for Shard 1 and another for Shard 2.

Given a key, you will decide which shard to use based on consistent hashing. Use Guava’s com.google.common.hash.Hashing class to generate hash codes (use SHA-256), and com.google.common.hash.Hashing.consistentHash to decide which shard to use (using 2 buckets).
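
For example, a small helper along these lines (the class and method names are illustrative assumptions) would map each key to one of the two shards:

    // Sketch: map a key to shard 0 or shard 1 via SHA-256 plus Guava's
    // consistent hashing with 2 buckets.
    import java.nio.charset.StandardCharsets;

    import com.google.common.hash.Hashing;

    public final class ShardRouter {
        private ShardRouter() { }

        /** Returns 0 or 1: the shard responsible for the given key. */
        public static int shardFor(String key) {
            return Hashing.consistentHash(
                    Hashing.sha256().hashString(key, StandardCharsets.UTF_8), 2);
        }
    }

In RedisCacheProvider you would then pick the JedisPool for that bucket, e.g. (shardFor(path) == 0) ? shard1Pool : shard2Pool, where shard1Pool and shard2Pool are whatever fields you use for the two pools.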

You do not need to worry about using  WAIT for the purposes of this assignment.

There are two tricky bits with ZooKeeper here:

1 – Again, Curator’s GroupMember implementation is not sufficient here. Curator’s GroupMember will only report the clients participating in a group after YOU have joined it, but we want to find members of groups that we are not in (namely, members of the other shard). In part 1, you implemented the BlockingGroupMember, which has a method to find the current members of a group without joining it. You’ll use that here.

2 – Curator’s LeaderLatch can only be used to report the leader of a latch that we are participating in. Again, this is over-restrictive: we will need to find out the leader of a LeaderLatch that we are NOT participating in, namely, the master node in the other shard. We have provided code in RedisCacheProvider to do this, in  getLeader(String latchPath), which will return the current leader of a latch regardless of whether your client is involved in that latch.

Part 4: Cross-shard Failover (20 points)

In this assignment, we’ll consider a new kind of failover: in the case that all members of a shard leave (by crashing and timing out from ZooKeeper, or simply quitting), if there are multiple Redis servers currently assigned to the other shard, then you should move one of them to the now-empty shard. You do NOT need to worry about re-balancing slaves (e.g. if one shard has a master and 9 replicas, and the other shard has 0 servers, the correct behavior is to promote a replica from the occupied shard to become the master of the empty shard, so that the end result is one shard with a master and 8 replicas and the other with a master and 0 replicas). A node which is currently the master in one shard, then, should never consider moving to the other shard; at most a single slave should move between shards on a failure.

You do NOT need to implement any proactive failure detector: you can notice failures just by noticing that there are now zero group members, and initiating a failover. Similarly, you do NOT need to worry about promoting a replica to a master within a shard (that’s exactly what you did in HW5!).

Imagine you are in the following situation: shard 1 has nodes M1 (master), R2, R3, and R4 (all replicas). Shard 2 is now empty. R2, R3, and R4 will all attempt to become leader of shard 2; one will succeed, and the remainder will exit that leadership contention (consider the importance of this in the event of new nodes joining shard 2 later on).
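
One way that contention could play out in code (a sketch only; the 5-second wait and variable names are assumptions, and TimeUnit is java.util.concurrent.TimeUnit):

    // Sketch: a shard-1 replica notices that shard 2 is empty and contends to
    // become its new master. Losers close their latch so that they do not block
    // nodes that join shard 2 later.
    LeaderLatch shard2Latch = new LeaderLatch(curator, "/leader/redisShard2", myId);
    shard2Latch.start();
    if (shard2Latch.await(5, TimeUnit.SECONDS)) {   // true if we acquired leadership
        // Won the race: leave shard 1's group, join shard 2's group, and
        // reconfigure the local Redis server (SLAVEOF NO ONE + FLUSHALL, below).
    } else {
        shard2Latch.close(); // lost the race: remain a replica in shard 1
    }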

You will want to use the Redis SLAVEOF command to promote and reconfigure replicas. When switching between shards, you’ll also want to use the FLUSHALL command to remove the now-irrelevant data (which belonged to the node’s previous shard).
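
Via Jedis, that reconfiguration could look roughly like this (newMasterHost and newMasterPort are placeholders for the local Redis server of the node that just won shard 2’s leadership):

    // Sketch: the replica that is switching shards promotes itself and drops
    // the data it cached for its old shard.
    try (Jedis local = new Jedis(newMasterHost, newMasterPort)) {
        local.slaveofNoOne(); // SLAVEOF NO ONE: stop replicating the old shard's master
        local.flushAll();     // FLUSHALL: discard the old shard's cached data
    }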

Submission

Perform all of your work in your homework-6 git repository. Commit and push your assignment. Once you are ready to submit, create a release, tagged “hw6.” Unless you want to submit a different version of your code, leave it at “master” to release the most recent code that you’ve pushed to GitHub. Make sure that your name is specified somewhere in the release notes. The time that you create your release will be the time used to judge that you have submitted by the deadline. This is NOT the time that you push your code.

Make sure that your released code includes all of your files and builds properly. You can do this by clicking to download the archive, and inspecting/trying to build it on your own machine/VM. There is no need to submit binaries/jar files (the target/ directory is purposely ignored by git). It is acceptable (and normal) to NOT include the config.properties file in your release.

If you want to resubmit before the deadline: simply create a new release; we will automatically use the last release you published. If you submitted before the deadline but then decide to make a new release within 24 hours of the deadline, we will automatically grade your late submission and subtract 10%. Any releases created more than 24 hours past the deadline will be ignored.

Reminder – Late policy: Late assignments will be accepted for 24 hours after the due date, for a penalty of 10%. After 24 hours have passed since the deadline, no late submissions will be accepted. Your release is what will be graded: if you forget to make a release, then we will assume that you chose to not submit the assignment.

Contact