Code Trace

The REST API handler of StaticFlowPusher is registered here.

// src/main/java/net/floodlightcontroller/staticflowentry/web/StaticFlowEntryWebRoutable.java
router.attach("/json", StaticFlowEntryPusherResource.class);
router.attach("/json/store", StaticFlowEntryPusherResource.class);
router.attach("/json/delete", StaticFlowEntryDeleteResource.class);

When adding a static flow, the store() will be invoked.

// src/main/java/net/floodlightcontroller/staticflowentry/web/StaticFlowEntryPusherResource.java
@Post
@LogMessageDoc(level="ERROR",
    message="Error parsing push flow mod request: {request}",
    explanation="An invalid request was sent to static flow pusher",
    recommendation="Fix the format of the static flow mod request")
public String store(String fmJson) {
    IStorageSourceService storageSource =
        (IStorageSourceService)getContext().getAttributes().
        get(IStorageSourceService.class.getCanonicalName());

    Map<String, Object> rowValues;
    try {
        rowValues = StaticFlowEntries.jsonToStorageEntry(fmJson);
        String status = null;
        if (!checkMatchIp(rowValues)) {
            status = "Warning! Pushing a static flow entry that matches IP " +
                "fields without matching for IP payload (ether-type 2048) will cause " +
                "the switch to wildcard higher level fields.";
            log.error(status);
        } else {
            status = "Entry pushed";
        }
        storageSource.insertRowAsync(StaticFlowEntryPusher.TABLE_NAME, rowValues);
        return ("{\"status\" : \"" + status + "\"}");
    } catch (IOException e) {
        log.error("Error parsing push flow mod request: " + fmJson, e);
        return "{\"status\" : \"Error! Could not parse flod mod, see log for details.\"}";
    }
}

StaticFlowPusher first inserts the flow into a database by calling insertRowAsync(), followed by insertRow(), insertRowImpl() and insertRowsAndNotify().

// src/main/java/net/floodlightcontroller/staticflowentry/StaticFlowEntryPusher.java
public static final String TABLE_NAME = "controller_staticflowtableentry";
// src/main/java/net/floodlightcontroller/storage/AbstractStorageSource.java
@Override
public Future<?> insertRowAsync(final String tableName,
    final Map<String,Object> values) {
    Future<?> future = executorService.submit(
        new StorageRunnable() {
            public void doStorageOperation() {
                insertRow(tableName, values);
            }
        }, null);
    return future;
}
// src/main/java/net/floodlightcontroller/storage/AbstractStorageSource.java
@Override
public void insertRow(String tableName, Map<String, Object> values) {
    updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
    insertRowImpl(tableName, values);
}

After the successful database insertion, a StorageSourceNotification.Action.MODIFY event is triggered.

// src/main/java/net/floodlightcontroller/storage/nosql/NoSqlStorageSource.java
protected void insertRowsAndNotify(String tableName, List<Map<String,Object>> insertRowList) {
    insertRows(tableName, insertRowList);
    sendNotification(tableName, StorageSourceNotification.Action.MODIFY, insertRowList);
}

@Override
public void insertRowImpl(String tableName, Map<String, Object> values) {
    ArrayList<Map<String,Object>> rowList = new ArrayList<Map<String,Object>>();
    rowList.add(values);
    insertRowsAndNotify(tableName, rowList);
}

The StorageSourceNotification.Action.MODIFY event can be captured by implementing IStorageSourceListener.

// src/main/java/net/floodlightcontroller/storage/AbstractStorageSource.java
protected synchronized void notifyListeners(StorageSourceNotification notification) {
   if (logger.isTraceEnabled()) {
       logger.trace("Notifying storage listeneres: {}", notification);
   }
   String tableName = notification.getTableName();
   Set<Object> keys = notification.getKeys();
   Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
   if (tableListeners != null) {
       for (IStorageSourceListener listener : tableListeners) {
           try {
               switch (notification.getAction()) {
                   case MODIFY:
                       listener.rowsModified(tableName, keys);
                       break;
                   case DELETE:
                       listener.rowsDeleted(tableName, keys);
                       break;
               }
           }
           catch (Exception e) {
               logger.error("Exception caught handling storage notification", e);
           }
       }
   }
}

The event handler is defined in rowsModified(). Upon receiving the event, StaticFlowPusher installs the flow into the switch by writeOFMessagesToSwitch().

// src/main/java/net/floodlightcontroller/staticflowentry/StaticFlowEntryPusher.java
@Override
public void rowsModified(String tableName, Set<Object> rowKeys) {
    // This handles both rowInsert() and rowUpdate()
    log.debug("Modifying Table {}", tableName);
    HashMap<String, Map<String, OFFlowMod>> entriesToAdd =
        new HashMap<String, Map<String, OFFlowMod>>();
    // build up list of what was added
    for (Object key: rowKeys) {
        IResultSet resultSet = storageSource.getRow(tableName, key);
        Iterator<IResultSet> it = resultSet.iterator();
        while (it.hasNext()) {
            Map<String, Object> row = it.next().getRow();
            parseRow(row, entriesToAdd);
        }
    }
    // batch updates by switch and blast them out
    for (String dpid : entriesToAdd.keySet()) {
        if (!entriesFromStorage.containsKey(dpid))
            entriesFromStorage.put(dpid, new HashMap<String, OFFlowMod>());

        List<OFMessage> outQueue = new ArrayList<OFMessage>();
        for(String entry : entriesToAdd.get(dpid).keySet()) {
            OFFlowMod newFlowMod = entriesToAdd.get(dpid).get(entry);
            //OFFlowMod oldFlowMod = entriesFromStorage.get(dpid).get(entry);
            OFFlowMod oldFlowMod = null;
            String dpidOldFlowMod = entry2dpid.get(entry);
            if (dpidOldFlowMod != null) {
                oldFlowMod = entriesFromStorage.get(dpidOldFlowMod).remove(entry);
            }
            if (oldFlowMod != null && newFlowMod != null) {
                // set the new flow mod to modify a pre-existing rule if these fields match
                if(oldFlowMod.getMatch().equals(newFlowMod.getMatch())
                        && oldFlowMod.getCookie() == newFlowMod.getCookie()
                        && oldFlowMod.getPriority() == newFlowMod.getPriority()){
                    newFlowMod.setCommand(OFFlowMod.OFPFC_MODIFY_STRICT);
                // if they don't match delete the old flow
                } else{
                    oldFlowMod.setCommand(OFFlowMod.OFPFC_DELETE_STRICT);
                    if (dpidOldFlowMod.equals(dpid)) {
                        outQueue.add(oldFlowMod);
                    } else {
                        writeOFMessageToSwitch(HexString.toLong(dpidOldFlowMod), oldFlowMod);
                    }
                }
            }
            // write the new flow
            if (newFlowMod != null) {
                entriesFromStorage.get(dpid).put(entry, newFlowMod);
                outQueue.add(newFlowMod);
                entry2dpid.put(entry, dpid);
            } else {
                entriesFromStorage.get(dpid).remove(entry);
                entry2dpid.remove(entry);
            }
        }
        writeOFMessagesToSwitch(HexString.toLong(dpid), outQueue);
    }
}

Note

It seems that StaticFlowPusher does not utilize barrier request/response to ensure the successful installation of flow entries.