/*
 * Copyright 2011 Bill La Forge
 *
 * This file is part of AgileWiki and is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License (LGPL) as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This code is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 * or navigate to the following url http://www.gnu.org/licenses/lgpl-2.1.txt
 *
 * Note however that only Scala, Java and JavaScript files are being covered by LGPL.
 * All other files are covered by the Common Public License (CPL).
 * A copy of this license is also included and can be
 * found as well at http://www.opensource.org/licenses/cpl1.0.txt
 */
package org.agilewiki.blip
package exchange

import annotation.tailrec
import messenger._
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.Semaphore

/**
 * The Exchange class supports synchronous exchanges of messages when the exchange
 * receiving the request is idle.
 *
 * @param threadManager      Passed through to the ExchangeMessenger superclass.
 * @param async              When true, haveMessage and sendReq simply delegate to the
 *                           superclass and no attempt is made to gain control of this
 *                           exchange for synchronous processing.
 * @param _bufferedMessenger Passed through to the ExchangeMessenger superclass; defaults to null.
 */
abstract class Exchange(threadManager: ThreadManager,
                        async: Boolean = false,
                        _bufferedMessenger: BufferedMessenger[ExchangeMessengerMessage] = null)
  extends ExchangeMessenger(threadManager, _bufferedMessenger) {

  /**
   * Tracks which exchange has control. If an exchange can gain control
   * over another exchange, it can send requests to it synchronously.
   * Holds null while no exchange is in control.
   */
  val atomicControl: AtomicReference[Exchange] = new AtomicReference[Exchange]

  /**
   * Recasts the inherited curReq as an ExchangeRequest.
   */
  override def curReq: ExchangeRequest = super.curReq.asInstanceOf[ExchangeRequest]

  /**
   * Returns the controlling exchange, or null.
   */
  def controllingExchange: Exchange = atomicControl.get

  /**
   * The haveMessage method is called by a thread when the thread has been assigned to
   * the exchange. A call to haveMessage results in a call to poll, but only if
   * no other exchange is in control. In async mode the call is delegated to the superclass.
   */
  override def haveMessage: Unit = {
    if (async) super.haveMessage
    else if (atomicControl.compareAndSet(null, this)) {
      // This exchange now controls itself; always release control, even if poll throws.
      try {
        poll
      } finally {
        atomicControl.set(null)
      }
    }
  }

  /**
   * Process a request synchronously.
   *
   * A non-null request is flagged with fastSend (so that sendResponse returns the
   * response the same way) and handed to exchangeReq. poll is then called whether
   * or not a request was supplied.
   */
  private def _sendReq(exchangeMessengerRequest: ExchangeMessengerRequest): Unit = {
    if (exchangeMessengerRequest != null) {
      exchangeMessengerRequest.asInstanceOf[ExchangeRequest].fastSend = true
      exchangeReq(exchangeMessengerRequest)
    }
    poll
  }

  /**
   * If control can be gained over the target exchange, process the request synchronously,
   * otherwise enqueue the request for subsequent processing on another thread.
   *
   * @param targetActor              Forwarded to the superclass when the request is enqueued.
   * @param exchangeMessengerRequest The request to be processed.
   * @param srcExchange              The sending exchange; in non-async mode it is cast to
   *                                 Exchange to read its controlling exchange.
   */
  final override def sendReq(targetActor: ExchangeMessengerActor,
                             exchangeMessengerRequest: ExchangeMessengerRequest,
                             srcExchange: ExchangeMessenger): Unit = {
    if (async) {
      super.sendReq(targetActor, exchangeMessengerRequest, srcExchange)
    } else {
      exchangeMessengerRequest.setOldRequest(srcExchange.curReq)
      val srcControllingExchange = srcExchange.asInstanceOf[Exchange].controllingExchange
      if (controllingExchange == srcControllingExchange) {
        // Both exchanges already share the same controller: process in place.
        _sendReq(exchangeMessengerRequest)
      } else if (!atomicControl.compareAndSet(null, srcControllingExchange)) {
        // Some other exchange is in control: fall back to enqueueing the request.
        super.sendReq(targetActor, exchangeMessengerRequest, srcExchange)
      } else {
        // Control was gained on behalf of the source's controller; release it when done.
        try {
          _sendReq(exchangeMessengerRequest)
        } finally {
          atomicControl.set(null)
        }
      }
      // Messages may still be pending after the above; try to drain them.
      if (!isEmpty) sendRem(srcExchange)
    }
  }

  /**
   * Handle a potential race condition.
   *
   * While messages remain, repeatedly tries to poll this exchange, either because it
   * already shares the source's controlling exchange or by gaining control for the
   * duration of the poll. Stops as soon as control cannot be gained or the exchange
   * is empty.
   *
   * @param srcExchange The sending exchange; cast to Exchange to read its controlling exchange.
   */
  @tailrec
  final def sendRem(srcExchange: ExchangeMessenger): Unit = {
    val srcControllingExchange = srcExchange.asInstanceOf[Exchange].controllingExchange
    val polled =
      if (controllingExchange == srcControllingExchange) {
        poll
        true
      } else if (atomicControl.compareAndSet(null, srcControllingExchange)) {
        try {
          poll
        } finally {
          atomicControl.set(null)
        }
        true
      } else {
        // Another exchange is in control; nothing more can be done from here.
        false
      }
    if (polled && !isEmpty) sendRem(srcExchange)
  }

  /**
   * Return a response the same way the request was sent: directly to the sender's
   * exchange when the current request was sent synchronously (fastSend), otherwise
   * via the superclass.
   */
  override def sendResponse(senderExchange: ExchangeMessenger, rsp: ExchangeMessengerResponse): Unit = {
    if (curReq.fastSend) {
      senderExchange.exchangeRsp(rsp)
    } else super.sendResponse(senderExchange, rsp)
  }
}