Lab 7 - Example 3 : Multiple File Sort Using Supervisor Worker

Download the code and the large files that need to be sorted

Tuple Space interface

/**
 * A store of objects ("tuples") that are deposited and looked up by a
 * string tag. Lookups come in blocking and non-blocking flavours, and in
 * extracting ("in") and non-extracting ("rd") flavours.
 */
public interface TupleSpace{

    /**
     * Deposits data in the tuple space.
     *
     * @param tag  tag under which the data is deposited
     * @param data object to deposit
     */
    void out (String tag, Object data);

    /**
     * Extracts an object with the given tag from the tuple space, blocking
     * if none is available.
     *
     * @param tag tag to look up
     * @return the extracted object
     * @throws InterruptedException if the caller is interrupted while blocked
     */
    Object in (String tag) throws InterruptedException;

    /**
     * Reads an object with the given tag from the tuple space, blocking if
     * none is available.
     *
     * @param tag tag to look up
     * @return the object that was read
     * @throws InterruptedException if the caller is interrupted while blocked
     */
    Object rd (String tag) throws InterruptedException;

    /**
     * Extracts an object with the given tag if one is available.
     *
     * @param tag tag to look up
     * @return the extracted object, or null if none is available
     */
    Object inp (String tag);

    /**
     * Reads an object with the given tag if one is available.
     *
     * @param tag tag to look up
     * @return the object that was read, or null if none is available
     */
    Object rdp (String tag);
}

Tuple Space Implementation

import java.util.*;
 
/**
 * In-memory {@link TupleSpace} that keeps, for every tag, a first-in
 * first-out queue of the objects deposited under that tag.
 *
 * Every public method is synchronized on this instance. The blocking
 * operations wait on the instance monitor and are woken by each call to
 * {@code out}.
 *
 * Neither tags nor data may be null: null is what {@code inp}/{@code rdp}
 * return for "nothing available", so a stored null could never be told
 * apart from an empty space.
 */
public class TupleSpaceImpl implements TupleSpace {
  // tag -> objects deposited under that tag, oldest first.
  // A queue is removed as soon as it becomes empty, so every mapped queue
  // holds at least one element.
  private final Map<String, Deque<Object>> tuples =
      new HashMap<String, Deque<Object>>();

  /**
   * Deposits data in the tuple space and wakes every thread blocked in
   * {@code in} or {@code rd}.
   *
   * @param tag  tag under which the data is stored
   * @param data object to store
   * @throws NullPointerException if tag or data is null
   */
  public synchronized void out (String tag, Object data) {
    if (tag == null) throw new NullPointerException("tag");
    if (data == null) throw new NullPointerException("data");
    Deque<Object> queue = tuples.get(tag);
    if (queue == null) {
        queue = new ArrayDeque<Object>();
        tuples.put(tag, queue);
    }
    queue.addLast(data);
    // Waiters for any tag share this monitor, so all of them must re-check.
    notifyAll();
  }

  /**
   * Looks up the oldest object stored under tag without blocking.
   * Callers must hold the monitor of this instance.
   *
   * @param tag    tag to look up
   * @param remove true to extract the object, false to leave it in place
   * @return the oldest object for tag, or null if there is none
   * @throws NullPointerException if tag is null
   */
  private Object get(String tag, boolean remove) {
    if (tag == null) throw new NullPointerException("tag");
    Deque<Object> queue = tuples.get(tag);
    if (queue == null) return null;
    if (!remove) return queue.peekFirst();
    Object head = queue.pollFirst();
    // Drop drained queues so tags that are no longer used do not pile up.
    if (queue.isEmpty()) tuples.remove(tag);
    return head;
  }

  /**
   * Extracts the oldest object with tag, blocking until one is available.
   *
   * @param tag tag to look up
   * @return the extracted object, never null
   * @throws InterruptedException if the thread is interrupted while waiting
   * @throws NullPointerException if tag is null
   */
  public synchronized Object in (String tag) throws InterruptedException {
    Object o;
    // Loop: a wake-up may be for another tag, or another thread may have
    // taken the object first.
    while ((o=get(tag,true))==null) wait();
    return o;
  }

  /**
   * Reads the oldest object with tag without removing it, blocking until one
   * is available.
   *
   * @param tag tag to look up
   * @return the object that was read, never null
   * @throws InterruptedException if the thread is interrupted while waiting
   * @throws NullPointerException if tag is null
   */
  public synchronized Object rd (String tag) throws InterruptedException {
    Object o;
    while ((o=get(tag,false))==null) wait();
    return o;
  }

  /**
   * Extracts the oldest object with tag if one is available.
   *
   * @param tag tag to look up
   * @return the extracted object, or null if none is available
   * @throws NullPointerException if tag is null
   */
  public synchronized Object inp (String tag) {
    return get(tag,true);
  }

  /**
   * Reads the oldest object with tag, without removing it, if one is
   * available.
   *
   * @param tag tag to look up
   * @return the object that was read, or null if none is available
   * @throws NullPointerException if tag is null
   */
  public synchronized Object rdp (String tag) {
    return get(tag,false);
  }

}

Supervisor

import java.io.*;
import java.util.*;
 
/**
 * Supervisor thread: reads each input file, deposits its lines into the
 * tuple space as one "task" tuple, collects one "result" tuple per deposited
 * task, writes everything collected to "sorted.txt" and finally deposits the
 * "stop" tuples that tell the workers to finish.
 *
 * The output is the concatenation of the result lists in the order in which
 * they arrive; the lists are not merged with each other.
 *
 * @see TupleSpaceImpl
 * @see TupleSpace
 */
public class FileSortSupervisor extends Thread{
    /** Number of workers assumed by the two-argument constructor. */
    private static final int DEFAULT_WORKERS = 2;

    private final TupleSpace ts;
    private final String files[];
    // one "stop" tuple is deposited per worker
    private final int workers;

    /**
     * Creates a supervisor for two workers.
     *
     * @param _ts    tuple space shared with the workers
     * @param _files names of the files to be sorted
     */
    public FileSortSupervisor(TupleSpace _ts, String _files[]){
        this(_ts, _files, DEFAULT_WORKERS);
    }

    /**
     * Creates a supervisor for the given number of workers.
     *
     * @param _ts      tuple space shared with the workers
     * @param _files   names of the files to be sorted
     * @param _workers number of workers taking tasks from the tuple space;
     *                 exactly this many "stop" tuples are deposited at the end
     * @throws IllegalArgumentException if _workers is negative
     */
    public FileSortSupervisor(TupleSpace _ts, String _files[], int _workers){
        if(_workers < 0)
            throw new IllegalArgumentException("negative worker count: " + _workers);
        ts = _ts;
        files = _files;
        workers = _workers;
    }

    /**
     * Runs the whole supervisor cycle. The "stop" tuples are deposited even
     * when reading, collecting or writing fails, because a worker that never
     * receives one blocks forever.
     */
    @Override
    public void run(){
        try{
            int submitted = submitTasks();
            writeOutput(collectResults(submitted));
        }catch(InterruptedException iex){
            // give up on the remaining results but keep the interrupt visible
            Thread.currentThread().interrupt();
        }finally{
            stopWorkers();
        }
    }

    /**
     * Deposits one "task" tuple per readable file. A file that cannot be
     * read is reported and skipped.
     *
     * @return the number of tasks actually deposited
     */
    private int submitTasks(){
        int submitted = 0;
        for(String file: files){
            try{
                ts.out("task", readLines(file));
                submitted++;
            }catch(IOException ioex){
                System.err.println("Skipping unreadable file: " + file);
                ioex.printStackTrace();
            }
        }
        return submitted;
    }

    /**
     * Reads all lines of a file, closing the file afterwards.
     *
     * @param file name of the file to read
     * @return the lines of the file in file order
     * @throws IOException if the file cannot be opened or read
     */
    private static List<String> readLines(String file) throws IOException{
        BufferedReader br = new BufferedReader(new FileReader(file));
        try{
            List<String> lines = new ArrayList<String>();
            String line;
            while((line = br.readLine()) != null)
                lines.add(line);
            return lines;
        }finally{
            br.close();
        }
    }

    /**
     * Takes the given number of "result" tuples from the tuple space and
     * appends their contents to one list.
     *
     * @param expected number of results to wait for; must not exceed the
     *                 number of deposited tasks or this call never returns
     * @return the contents of all results, in arrival order
     * @throws InterruptedException if interrupted while waiting for a result
     */
    private List<Object> collectResults(int expected) throws InterruptedException{
        List<Object> all = new ArrayList<Object>();
        for(int i = 0; i < expected; i++){
            //the result will be a list so cast it to a list
            List<?> sortedList = (List<?>) ts.in("result");
            all.addAll(sortedList);
        }
        return all;
    }

    /**
     * Writes the given lines to "sorted.txt", one per line, closing the file
     * afterwards. An I/O failure is reported, not propagated.
     *
     * @param lines lines to write; every element must be a String
     */
    private static void writeOutput(List<Object> lines){
        try{
            Writer out = new BufferedWriter(new FileWriter("sorted.txt"));
            try{
                for(Object obj : lines)
                    out.write((String) obj + "\n");
            }finally{
                out.close();
            }
        }catch(IOException ioex){
            ioex.printStackTrace();
        }
    }

    /**
     * Deposits one "stop" task per worker. The number of "stop" tuples must
     * equal the number of workers, otherwise workers hang and never finish.
     */
    private void stopWorkers(){
        for(int i = 0; i < workers; i++)
            ts.out("task","stop");
    }
}

Worker

import java.util.*;
 
/**
 * Worker thread: repeatedly takes a "task" tuple from the tuple space,
 * sorts the list it contains and deposits the sorted list as a "result"
 * tuple. The worker ends when it takes the String "stop" as a task or when
 * it is interrupted while waiting for a task.
 */
public class SortWorker extends Thread{
    private final TupleSpace ts;

    /**
     * @param _ts tuple space shared with the supervisor
     */
    public SortWorker(TupleSpace _ts){
        ts = _ts;
    }

    /**
     * Processes tasks until the "stop" tuple arrives.
     *
     * @throws ClassCastException if a task is neither "stop" nor a List, or
     *         if the list elements cannot be compared with each other
     */
    @Override
    public void run(){
        while(true){
            //take a task from the Tuple space
            Object task;
            try{
                task = ts.in("task");
            }catch(InterruptedException iex){
                // No task was obtained, so there is nothing to sort: stop
                // this worker and leave the interrupt flag set.
                Thread.currentThread().interrupt();
                return;
            }

            //if the task is the "stop" tuple - stop this worker thread
            if("stop".equals(task))
                return;

            //any other task is expected to be a list of mutually comparable
            //elements; the cast is unchecked because tuples are plain Objects
            @SuppressWarnings("unchecked")
            List<Comparable<Object>> list = (List<Comparable<Object>>) task;

            //do some work - sort the list in place
            Collections.sort(list);

            //"out" the result (the list after sorting) into the tuple space
            ts.out("result", list);
        }
    }
}

Main program, which initializes the TupleSpace and starts the supervisor and the workers

/**
 * Entry point of the supervisor/worker file-sort example.
 *
 * One tuple space is created and shared by a single supervisor and two
 * workers. The command-line arguments are the names of the unsorted input
 * files. The supervisor deposits the contents of each file into the tuple
 * space as a list; a worker takes such a list, sorts it and deposits the
 * sorted list as a result; the supervisor gathers the results into a single
 * list, writes it to a file and finally deposits the "stop" tuples that let
 * the workers finish (see FileSortSupervisor for the details).
 *
 * @see FileSortSupervisor
 * @see SortWorker
 */
public class FileSortSuperWorker{
    /**
     * Starts the supervisor and then the workers.
     *
     * @param args names of the files to be sorted; when none are given a
     *             usage message is printed and the JVM exits with status 1
     */
    public static void main(String args[]){
        if(args.length < 1){
            System.out.println("No files to process");
            System.out.println("Usage:java FileSortSuperWorker file1 file2 ..");
            System.exit(1);
        }

        //the connector between the supervisor and the workers
        TupleSpace space = new TupleSpaceImpl();

        //the supervisor receives every file name and is started first
        new FileSortSupervisor(space, args).start();

        //two workers, each holding a reference to the shared tuple space
        Thread[] pool = { new SortWorker(space), new SortWorker(space) };
        for(Thread worker : pool)
            worker.start();
    }
}