SourceForge: sasa/sasa: Sasa.Reactive/Property.cs@9ac61e30fa69
Sasa.Reactive/Property.cs
author Sandro Magi <naasking@gmail.com>
Fri Dec 23 12:19:52 2011 -0500 (4 months ago)
changeset 735 9ac61e30fa69
parent 684 a26cfdd5314e
child 737 3e401bb5267e
permissions -rw-r--r--
-cleaned up comments and documentation
-fixed reference to latest stable System.Reactive
     1 using System;
     2 using System.Threading;
     3 using System.Collections.Generic;
     4 using System.Linq;
     5 using System.Text;
     6 using System.Reactive;
     7 using System.Reactive.Linq;
     8 
     9 namespace Sasa.Reactive
    10 {
    11     /// <summary>
    12     /// Indicates the thread should retry its updates.
    13     /// </summary>
    14     public sealed class RetryException : System.Exception
    15     {
    16     }
    17 
    18     /// <summary>
    19     /// Indicates an error occurred while propagating the update.
    20     /// </summary>
    21     public sealed class UpdateException : System.Exception
    22     {
    23         /// <summary>
    24         /// Construct an exception.
    25         /// </summary>
    26         /// <param name="message">The message to report.</param>
    27         /// <param name="inner">The inner exception.</param>
    28         public UpdateException(string message, Exception inner)
    29             : base(message, inner)
    30         {
    31         }
    32     }
    33     /// <summary>
    34     /// Used only
    35     /// </summary>
    36     public sealed class NoValueException : System.Exception
    37     {
    38         internal static NoValueException instance = new NoValueException();
    39         NoValueException()
    40             : base("The Property<T> currently has no value.")
    41         {
    42         }
    43     }
    44 
    45     /// <summary>
    46     /// An observable property that retains only the current value, and notifies clients only of distinct changes.
    47     /// </summary>
    48     /// <typeparam name="T">The type of values.</typeparam>
    49     /// <remarks>
    50     /// Clients should first invoke <seealso cref="Ticket.Acquire" /> or <seealso cref="Transaction.Begin" /> before operating on signals.
    51     /// </remarks>
    52     public class Property<T> : IObservable<T>, IObserver<T>, IDisposable, IRef<T>, IValue<T>, IResult<T>
    53     {
    54         // the queue of subscribed observers
    55         internal Node head;
    56         internal Node tail;
    57         // the current value or error
    58         internal T value;
    59         internal Exception error;
    60         // monotonically increasing version number; crude approximation of transactional memory
    61         long version = long.MinValue;
    62         // the current subscription
    63         IDisposable subscription;
    64 
    65         #region Public API
    66         /// <summary>
    67         /// Initialize an empty signal.
    68         /// </summary>
    69         public Property() : this(NoValueException.instance)
    70         {
    71         }
    72         /// <summary>
    73         /// Initialize a signal with a default value.
    74         /// </summary>
    75         /// <param name="value">The default value of the signal.</param>
    76         public Property(T value)
    77             : this(error: null)
    78         {
    79             this.value = value;
    80         }
    81         /// <summary>
    82         /// Initialize a signal with a default value.
    83         /// </summary>
    84         /// <param name="error">The default value of the signal.</param>
    85         public Property(Exception error)
    86         {
    87             head = tail = new Node();
    88             this.error = error;
    89         }
    90         /// <summary>
    91         /// Initialize the signal with the given observable sequence.
    92         /// </summary>
    93         /// <param name="observable">The observable to listen to.</param>
    94         public Property(IObservable<T> observable)
    95             : this(observable.FirstOrDefault())
    96         {
    97             Switch(observable);
    98         }
    99         /// <summary>
   100         /// Dispose of a <seealso cref="Property{T}"/>.
   101         /// </summary>
   102         ~Property()
   103         {
   104             Dispose();
   105         }
   106         /// <summary>
   107         /// The current value of the signal.
   108         /// </summary>
   109         /// <exception cref="System.Exception">Thrown if the current value is an error.</exception>
   110         public T Value
   111         {
   112             get
   113             {
   114                 //FIXME: why lock this?
   115                 lock (head)
   116                 {
   117                     Acquire();
   118                     if (error != null) throw error;
   119                     return value;
   120                 }
   121             }
   122             set
   123             {
   124                 bool changed;
   125                 //FIXME: why lock this?
   126                 lock (head)
   127                 {
   128                     Acquire();
   129                     changed = !EqualityComparer<T>.Default.Equals(this.value, this.value = value);
   130                     this.error = null;
   131                 }
   132                 if (changed) Notify(head.next, value, null);
   133             }
   134         }
   135         /// <summary>
   136         /// The current error value of the signal, if any.
   137         /// </summary>
   138         public Exception Error
   139         {
   140             get
   141             {
   142                 lock (head)
   143                 {
   144                     Acquire();
   145                     return error;
   146                 }
   147             }
   148             set
   149             {
   150                 T ovalue;
   151                 bool changed;
   152                 lock (head)
   153                 {
   154                     Acquire();
   155                     changed = !EqualityComparer<Exception>.Default.Equals(this.error, this.error = value);
   156                     ovalue = this.value;
   157                 }
   158                 if (changed) Notify(head.next, ovalue, value);
   159             }
   160         }
   161         /// <summary>
   162         /// True if the <seealso cref="Property{T}"/> has a value, and no error.
   163         /// </summary>
   164         public bool HasValue
   165         {
   166             get { return Error != null; }
   167         }
   168         /// <summary>
   169         /// Obtains the encapsulated value, if any.
   170         /// </summary>
   171         /// <param name="value">The value to extract.</param>
   172         /// <returns>True if value was available, false otherwise.</returns>
   173         public bool TryGetValue(out T value)
   174         {
   175             value = this.value;
   176             return this.error == null;
   177         }
   178         /// <summary>
   179         /// Returns true if this signal has subscribed to observables.
   180         /// </summary>
   181         public bool IsSubscribed
   182         {
   183             get { return subscription != null; }
   184         }
   185         /// <summary>
   186         /// Notify <paramref name="observer"/> of any changes to this signal.
   187         /// </summary>
   188         /// <param name="observer">The observer to notify.</param>
   189         /// <returns>A disposable handle representing the subscription.</returns>
   190         public IDisposable Subscribe(IObserver<T> observer)
   191         {
   192             // notify current observer
   193             T value;
   194             Exception error;
   195             //FIXME: why acquire ticket?
   196             Ticket.Acquire();
   197             lock (head)
   198             {
   199                 Acquire();
   200                 value = this.value;
   201                 error = this.error;
   202             }
   203             var node = Enqueue(observer);
   204             // does not notify if property has not yet been initialized
   205             Notify(node, value, error);
   206             // if ticket acquired locally, then release it immediately after notifications complete
   207             Ticket.Release();
   208             return new DisposeObserver { node = node, property = this };
   209         }
   210         /// <summary>
   211         /// Subscribe to any changes made to this property.
   212         /// </summary>
   213         /// <param name="onNext">The delegate to invoke for a new value.</param>
   214         /// <returns>A disposable handle representing the subscription.</returns>
   215         public IDisposable Subscribe(Action<T> onNext)
   216         {
   217             return Subscribe(Observer.Create(onNext));
   218         }
   219         /// <summary>
   220         /// Subscribe to any changes made to this property.
   221         /// </summary>
   222         /// <param name="onNext">The delegate to invoke for a new value.</param>
   223         /// <param name="onError">The delegate to invoke when the property receives an error.</param>
   224         /// <returns>A disposable handle representing the subscription.</returns>
   225         public IDisposable Subscribe(Action<T> onNext, Action<Exception> onError)
   226         {
   227             return Subscribe(Observer.Create(onNext, onError));
   228         }
   229         /// <summary>
   230         /// Subscribe to any changes made to this property.
   231         /// </summary>
   232         /// <param name="onNext">The delegate to invoke for a new value.</param>
   233         /// <param name="onError">The delegate to invoke on error.</param>
   234         /// <param name="onDispose">The delegate to invoke when the property is disposed.</param>
   235         /// <returns>A disposable handle representing the subscription.</returns>
   236         public IDisposable Subscribe(Action<T> onNext, Action<Exception> onError, Action onDispose)
   237         {
   238             return Subscribe(Observer.Create(onNext, onError, onDispose));
   239         }
   240         /// <summary>
   241         /// Subscribe this signal to listen to <paramref name="observable"/>.
   242         /// </summary>
   243         /// <param name="observable">The observable to listen to.</param>
   244         public void Switch(IObservable<T> observable)
   245         {
   246             //FIXME: why acquire this lock?
   247             lock (head)
   248             {
   249                 Acquire();
   250                 if (subscription != null) subscription.Dispose();
   251                 subscription = observable.Subscribe(this);
   252             }
   253         }
   254 
   255         /// <summary>
   256         /// Dispose of any subscriptions.
   257         /// </summary>
   258         public void Dispose()
   259         {
   260             Node head;
   261             lock (this.head)
   262             {
   263                 tail = null;
   264                 if (subscription != null) subscription.Dispose();
   265                 head = this.head.next;
   266                 this.head.next = null;
   267             }
   268             for (var y = head; y != null; y = y.next)
   269             {
   270                 try
   271                 {
   272                     y.observer.OnCompleted();
   273                 }
   274                 catch
   275                 {
   276                     continue;
   277                 }
   278             }
   279         }
   280         /// <summary>
   281         /// Generates a string representation.
   282         /// </summary>
   283         /// <returns>A string representation of this property.</returns>
   284         public override string ToString()
   285         {
   286             return Value.ToString();
   287         }
   288         #endregion
   289 
   290         #region Private API
   291         void IObserver<T>.OnNext(T value)
   292         {
   293             Value = value;
   294         }
   295         void IObserver<T>.OnError(Exception error)
   296         {
   297             Error = error;
   298         }
   299         void IObserver<T>.OnCompleted()
   300         {
   301             Dispose();
   302         }
   303         void Acquire()
   304         {
   305             CheckDispose();
   306             if (version > Ticket.Current) throw new RetryException();
   307             version = Ticket.Current;
   308         }
   309         void CheckDispose()
   310         {
   311             if (tail == null) throw new ObjectDisposedException("Property<T>", "This property has been disposed.");
   312         }
   313         void Notify(Node head, T value, Exception error)
   314         {
   315             if (error != null)
   316             {
   317                 if (error is NoValueException) return;
   318                 for (var x = head; x != null; x = x.next)
   319                 {
   320                     Notify(x.observer, error);
   321                 }
   322             }
   323             else
   324             {
   325                 for (var x = head; x != null; x = x.next)
   326                 {
   327                     Notify(x.observer, value);
   328                 }
   329             }
   330         }
   331         void Notify(IObserver<T> x, T value)
   332         {
   333             try
   334             {
   335                 x.OnNext(value);
   336             }
   337             catch (RetryException)
   338             {
   339                 //FIXME: should this propagate, ignore, or accumulate errors?
   340                 throw; // propagate retry exceptions
   341             }
   342             catch (Exception e)
   343             {
   344                 // notify only the current observer of the error
   345                 Notify(x, new UpdateException("Error propagating value", e));
   346             }
   347         }
   348         void Notify(IObserver<T> x, Exception error)
   349         {
   350             while (true)
   351             {
   352                 try
   353                 {
   354                     x.OnError(error);
   355                     return;
   356                 }
   357                 catch (RetryException)
   358                 {
   359                     //FIXME: should this propagate, ignore, or accumulate errors?
   360                     throw; // propagate retry exceptions
   361                 }
   362                 catch (Exception e)
   363                 {
   364                     // try notifying of new update error
   365                     error = new UpdateException("Error propagating exception.", e);
   366                 }
   367             }
   368         }
   369 
   370         internal Node Enqueue(IObserver<T> o)
   371         {
   372             lock (tail)
   373             {
   374                 tail.next = new Node { observer = o };
   375                 return tail = tail.next;
   376             }
   377         }
   378         void Remove(Node node)
   379         {
   380             // must lock for thread-safety:
   381             // head -> o1 -> o2 -> o3
   382             //T1:rem(o1)       T2:rem(o2)
   383             //head.n = o1.n    o1.n = o2.n
   384             lock (head)
   385             {
   386                 for (var x = head; x.next != null; x = x.next)
   387                 {
   388                     if (node == x.next)
   389                     {
   390                         // must lock in case node is tail
   391                         lock (tail) x.next = node.next;
   392                         return;
   393                     }
   394                 }
   395             }
   396         }
   397         #endregion
   398 
   399         #region Public operators
   400         /// <summary>
   401         /// Implicitly create a <seealso cref="Property{T}"/>.
   402         /// </summary>
   403         /// <param name="value">The default value of the property.</param>
   404         /// <returns>A property with the given value.</returns>
   405         public static implicit operator Property<T>(T value)
   406         {
   407             return new Property<T>(value);
   408         }
   409         /// <summary>
   410         /// Implicitly create a <seealso cref="Property{T}"/>.
   411         /// </summary>
   412         /// <param name="error">The starting error of the property.</param>
   413         /// <returns>A property with the given error.</returns>
   414         public static implicit operator Property<T>(Exception error)
   415         {
   416             return new Property<T>(error);
   417         }
   418         //public static implicit operator Signal<T>(IObservable<T> value)
   419         //{
   420         //    return new Signal<T>(value);
   421         //}
   422         /// <summary>
   423         /// Merge events from two sources.
   424         /// </summary>
   425         /// <param name="lhs">The first property.</param>
   426         /// <param name="rhs">The second property.</param>
   427         /// <returns>A stream accumulating the changes from both sources.</returns>
   428         public static IObservable<T> operator |(Property<T> lhs, IObservable<T> rhs)
   429         {
   430             return lhs.Merge(rhs);
   431         }
   432         /// <summary>
   433         /// Merge events from two properties.
   434         /// </summary>
   435         /// <param name="lhs">The first property.</param>
   436         /// <param name="rhs">The second property.</param>
   437         /// <returns>A stream accumulating the changes from both sources.</returns>
   438         public static IObservable<T> operator |(IObservable<T> lhs, Property<T> rhs)
   439         {
   440             return lhs.Merge(rhs);
   441         }
   442         /// <summary>
   443         /// Merge events from two properties.
   444         /// </summary>
   445         /// <param name="lhs">The first property.</param>
   446         /// <param name="rhs">The second property.</param>
   447         /// <returns>A stream accumulating the changes from both sources.</returns>
   448         public static IObservable<T> operator |(Property<T> lhs, Property<T> rhs)
   449         {
   450             return lhs.Merge(rhs);
   451         }
   452         #endregion
   453 
   454         class DisposeObserver : IDisposable
   455         {
   456             internal Node node;
   457             internal Property<T> property;
   458             ~DisposeObserver()
   459             {
   460                 Dispose();
   461             }
   462             public void Dispose()
   463             {
   464                 var x = Interlocked.CompareExchange(ref property, null, property);
   465                 if (x != null)
   466                 {
   467                     x.Remove(node);
   468                     GC.SuppressFinalize(this);
   469                 }
   470             }
   471         }
   472         internal sealed class Node
   473         {
   474             internal IObserver<T> observer;
   475             internal Node next;
   476         }
   477 
   478         #region Linq extensions
   479         /// <summary>
   480         /// 
   481         /// </summary>
   482         /// <typeparam name="R"></typeparam>
   483         /// <param name="selector"></param>
   484         /// <returns></returns>
   485         public Property<R> Select<R>(Func<T, R> selector)
   486         {
   487             var x = new Property<R>();
   488                 x.subscription = Subscribe(t =>
   489                 {
   490                     try
   491                     {
   492                         x.Value = selector(t);
   493                     }
   494                     catch (Exception e)
   495                     {
   496                         x.Error = e;
   497                     }
   498                 },
   499                 e => x.Error = e);
   500             return x;
   501         }
   502         public Property<R> SelectMany<R>(Func<T, Property<R>> selector)
   503         {
   504             var x = new Property<R>();
   505                 x.subscription = Subscribe(t =>
   506                 {
   507                     try
   508                     {
   509                         x.Value = selector(t).Value;
   510                     }
   511                     catch (Exception e)
   512                     {
   513                         x.Error = e;
   514                     }
   515                 }, e => x.Error = e);
   516             return x;
   517         }
   518         public Property<R> SelectMany<U, R>(Func<T, Property<U>> collector, Func<T, U, R> selector)
   519         {
   520             var x = new Property<R>();
   521                 x.subscription = Subscribe(t =>
   522                 {
   523                     try
   524                     {
   525                         x.Value = selector(t, collector(t).Value);
   526                     }
   527                     catch (Exception e)
   528                     {
   529                         x.Error = e;
   530                     }
   531                 }, e => x.Error = e);
   532             return x;
   533         }
   534         public Property<T> Where(Func<T, bool> predicate)
   535         {
   536             var x = new Property<T>();
   537                 x.subscription = Subscribe(t =>
   538                 {
   539                     try
   540                     {
   541                         if (predicate(t)) x.Value = t;
   542                     }
   543                     catch (Exception e)
   544                     {
   545                         x.Error = e;
   546                     }
   547                 },
   548                 e => x.Error = e);
   549             return x;
   550         }
   551         public Property<T> Skip(int count)
   552         {
   553             var x = new Property<T>();
   554                 x.subscription = Subscribe(t =>
   555                 {
   556                     if (count-- > 0) return;
   557                     try
   558                     {
   559                         x.Value = t;
   560                     }
   561                     catch (Exception e)
   562                     {
   563                         x.Error = e;
   564                     }
   565                 },
   566                 e => x.Error = e);
   567             return x;
   568         }
   569         /// <summary>
   570         /// Register a permanent event handler.
   571         /// </summary>
   572         /// <param name="onNext">The handler to invoke.</param>
   573         public void Register(Action<T> onNext)
   574         {
   575             Enqueue(Observer.Create(onNext));
   576         }
   577         #endregion
   578     }
   579     /// <summary>
   580     /// Some convenient extensions.
   581     /// </summary>
   582     public static class Property
   583     {
   584         /// <summary>
   585         /// Bind an object property.
   586         /// </summary>
   587         /// <typeparam name="T">The type of the property value.</typeparam>
   588         /// <param name="value">The current property value.</param>
   589         /// <param name="setter">A function used to set the underlying property value.</param>
   590         /// <returns>A property that's bidirectionally bound to the underlying property.</returns>
   591         /// <remarks>
   592         /// The returned Property&lt;T&gt; must be the ONLY way that the underlying object property is updated,
   593         /// otherwise they will become out of sync. Use the more elaborate Bind operation if the underlying
   594         /// property is updated by other means.
   595         /// </remarks>
   596         public static Property<T> Bind<T>(this T value, Action<T> setter)
   597         {
   598             var prop = new Property<T>(value);
   599                 prop.Enqueue(Observer.Create(setter));
   600             return prop;
   601         }
   602         /// <summary>
   603         /// Bind an object property.
   604         /// </summary>
   605         /// <typeparam name="T">The type of the property value.</typeparam>
   606         /// <typeparam name="TEventArgs">The event args.</typeparam>
   607         /// <param name="getter">A function used to obtain the underlying property value.</param>
   608         /// <param name="setter">A function used to set the underlying property value.</param>
   609         /// <param name="register">Registers an event handler that notifies when the underlying property has been updated.</param>
   610         /// <param name="unregister">Unregisters an event handler for the underlying property.</param>
   611         /// <returns>A property that's bidirectionally bound to an underlying object property.</returns>
   612         public static Property<T> Bind<T, TEventArgs>(Func<T> getter, Action<T> setter, Action<EventHandler<TEventArgs>> register, Action<EventHandler<TEventArgs>> unregister = null)
   613             where TEventArgs : EventArgs
   614         {
   615             var prop = new Property<T>();
   616             EventHandler<TEventArgs> h = (o, e) =>
   617             {
   618                 try
   619                 {
   620                     prop.Value = getter();
   621                 }
   622                 catch (Exception ex)
   623                 {
   624                     prop.Error = ex;
   625                 }
   626             };
   627             // invoke event handler to set the initial property
   628             h(prop, default(TEventArgs));
   629             prop.Enqueue(Observer.Create(setter, () => unregister(h)));
   630             register(h);
   631             return prop;
   632         }
   633     }
   634 }