如何構(gòu)建應用程序引擎以及使用 Windows Azure Storage 實現(xiàn)異步消息傳送
現(xiàn)在,我們需要一個簡單的包裝來與我們的隊列交互。從本質(zhì)上說,我們需要能夠?qū)?a class="contentlabel" href="http://www.2s4d.com/news/listbylabel/label/消息">消息插入隊列,獲取任何掛起的消息并清除該隊列(請參見圖 3)。本文引用地址:http://www.2s4d.com/article/149634.htm
圖 3 用于與隊列交互的包裝
namespace HollywoodHackers.Storage.Queue
{
public class StdQueueT> :
StorageBase where T : QueueMessageBase, new()
{
protected CloudQueue queue;
protected CloudQueueClient client;
public StdQueue(string queueName)
{
client = new CloudQueueClient
(StorageBase.QueueBaseUri, StorageBase.Credentials);
queue = client.GetQueueReference(queueName);
queue.CreateIfNotExist();
}
public void AddMessage(T message)
{
CloudQueueMessage msg =
new CloudQueueMessage(message.ToBinary());
queue.AddMessage(msg);
}
public void DeleteMessage(CloudQueueMessage msg)
{
queue.DeleteMessage(msg);
}
public CloudQueueMessage GetMessage()
{
return queue.GetMessage(TimeSpan.FromSeconds(60));
}
}
public class ToastQueue : StdQueueToastQueueMessage>
{
public ToastQueue()
: base(toasts)
{
}
}
}
我們還需要為表存儲設置一個包裝,以便在用戶登錄到站點之前可以存儲用戶通知??梢允褂?PartitionKey(行集合的標識符)和 RowKey(可唯一標識特定分區(qū)中的每個單獨行)組織表數(shù)據(jù)。選擇 PartitionKey 和 RowKey 使用的數(shù)據(jù)是在使用表存儲時所做的最重要的設計決策之一。
這些特點允許跨存儲節(jié)點進行負載平衡,并在應用程序中提供內(nèi)置的可伸縮性選項。不考慮數(shù)據(jù)的數(shù)據(jù)中心關(guān)聯(lián)性,使用同一分區(qū)鍵的表存儲中的行將保留在相同的物理數(shù)據(jù)存儲中。因為針對每個用戶存儲對應的消息,所以分區(qū)鍵將是 UserName,而 RowKey 則成為標識每行的 GUID(請參見圖 4)。
圖 4 表存儲的包裝
namespace HollywoodHackers.Storage.Repositories
{
public class UserTextNotificationRepository : StorageBase
{
public const string EntitySetName =
UserTextNotifications;
CloudTableClient tableClient;
UserTextNotificationContext notificationContext;
public UserTextNotificationRepository()
: base()
{
tableClient = new CloudTableClient
(StorageBase.TableBaseUri, StorageBase.Credentials);
notificationContext = new UserTextNotificationContext
(StorageBase.TableBaseUri,StorageBase.Credentials);
tableClient.CreateTableIfNotExist(EntitySetName);
}
public UserTextNotification[]
GetNotificationsForUser(string userName)
{
var q = from notification in
notificationContext.UserNotifications
where notification.TargetUserName ==
userName select notification;
return q.ToArray();
}
public void AddNotification
(UserTextNotification notification)
{
notification.RowKey = Guid.NewGuid().ToString();
notificationContext.AddObject
(EntitySetName, notification);
notificationContext.SaveChanges();
}
}
}
因為我們的存儲機制已經(jīng)確定,所以我們需要一個工作者角色作為引擎;以便在我們的電子商務站點的后臺處理消息。為此,我們定義了一個從 Microsoft.ServiceHosting.ServiceRuntime.RoleEntryPoint 類繼承的類,并將其與云服務項目中的工作者角色關(guān)聯(lián)(請參見圖 5)。
圖 5 作為引擎的工作者角色
public class WorkerRole : RoleEntryPoint
{
ShoppingCartQueue cartQueue;
ToastQueue toastQueue;
UserTextNotificationRepository toastRepository;
public override void Run()
{
// This is a sample worker implementation.
//Replace with your logic.
Trace.WriteLine(WorkerRole1 entry point called,
Information);
toastRepository = new UserTextNotificationRepository();
InitQueue();
while (true)
{
Thread.Sleep(10000);
Trace.WriteLine(Working, Information);
ProcessNewTextNotifications();
ProcessShoppingCarts();
}
}
private void InitQueue()
{
cartQueue = new ShoppingCartQueue();
toastQueue = new ToastQueue();
}
private void ProcessNewTextNotifications()
{
CloudQueueMessage cqm = toastQueue.GetMessage();
while (cqm != null)
{
ToastQueueMessage message =
QueueMessageBase.FromMessageToastQueueMessage>(cqm);
toastRepository.AddNotification(new
UserTextNotification()
{
MessageText = message.MessageText,
MessageDate = DateTime.Now,
TargetUserName = message.TargetUserName,
Title = message.Title
});
toastQueue.DeleteMessage(cqm);
cqm = toastQueue.GetMessage();
}
}
private void ProcessShoppingCarts()
{
// We will add this later in the article!
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
DiagnosticMonitor.Start(DiagnosticsConnectionString);
// For information on handling configuration changes
// see the MSDN topic at
//http://go.microsoft.com/fwlink/?LinkId=166357.
RoleEnvironment.Changing += RoleEnvironmentChanging;
return base.OnStart();
}
private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e)
{
// If a configuration setting is changing
if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
{
// Set e.Cancel to true to restart this role instance
e.Cancel = true;
}
}
}
讓我們看一下工作者角色代碼。在初始化和設置所需的隊列和表存儲之后,此代碼將進入一個循環(huán)。每 10 秒鐘,它就會處理一次隊列中的消息。每次我們通過處理循環(huán)時都將獲取隊列中的消息,直到最終返回 null,這表示隊列為空。
您從隊列中看到的消息永遠不會超過 20 個,如果不信,您可以反復嘗試來驗證一下。對隊列進行的任何處理都有時間限制,必須在該時間范圍內(nèi)對每個隊列消息執(zhí)行有意義的操作,否則隊列消息將被視為超時,并在隊列中顯示備份,以便可以由其他工作者來處理此消息。每個消息都會作為用戶通知添加到表存儲中。關(guān)于工作者角色需要記住的重要一點是:一旦入口點方法完成,工作者角色也就結(jié)束了。這就是您需要在一個循環(huán)中保持邏輯運行的原因。
從客戶端的角度來說,我們需要能夠以 JSON 形式返回消息,以便 jQuery 可以異步輪詢并顯示新的用戶通知。為此,我們會將一些代碼添加到消息控制器中,以便可以訪問這些通知(請參見圖 6)。
圖 6 以 JSON 形式返回消息
public JsonResult GetMessages()
{
if (User.Identity.IsAuthenticated)
{
UserTextNotification[] userToasts =
toastRepository.GetNotifications(User.Identity.Name);
object[] data =
(from UserTextNotification toast in userToasts
select new { title = toast.Title ?? Notification,
text = toast.MessageText }).ToArray();
return Json(data, JsonRequestBehavior.AllowGet);
}
else
return Json(null);
}
在 Visual Studio 2010 beta 2 下的 ASP.NET MVC 2(我們用于撰寫本文的環(huán)境)中,如果沒有 JsonRequestBehavior.AllowGet 選項,您無法將 JSON 數(shù)據(jù)返回到 jQuery 或其他客戶端。在 ASP.NET MVC 1 中,不需要此選項?,F(xiàn)在,我們可以編寫 JavaScript,它每 15 秒將調(diào)用一次 GetMessages 方法并將以 toast 形式消息顯示通知(請參見圖 7)。
評論